package org.infinispan.stream;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.infinispan.Cache;
import org.infinispan.CacheStream;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.ImmortalCacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.MagicKey;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.lock.StripedLockTest;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.stream.impl.ClusterStreamManager;
import org.infinispan.stream.impl.StreamResponseCommand;
import org.infinispan.test.AbstractCacheTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.mockito.AdditionalAnswers;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups = {"functional", "smoke"}, testName = "iteration.DistributedStreamIteratorTest")
/* loaded from: input_file:org/infinispan/stream/DistributedStreamIteratorTest.class */
public class DistributedStreamIteratorTest extends BaseClusteredStreamIteratorTest {
    /**
     * Creates the test by delegating to {@link #DistributedStreamIteratorTest(boolean)} with {@code false}.
     */
    public DistributedStreamIteratorTest() {
        this(false);
    }

    /**
     * Creates the test for {@link CacheMode#DIST_SYNC} caches and selects the
     * {@code AFTER_METHOD} cleanup phase.
     *
     * @param z forwarded unchanged to the superclass constructor; presumably selects transactional
     *          caches (NOTE(review): confirm against BaseClusteredStreamIteratorTest)
     */
    public DistributedStreamIteratorTest(boolean z) {
        super(z, CacheMode.DIST_SYNC);
        // Tests in this class kill cluster members, which is presumably why cleanup runs after every method.
        this.cleanup = AbstractCacheTest.CleanupPhase.AFTER_METHOD;
    }

    /**
     * Creates a key tied to the given cache by wrapping the cache in a new {@link MagicKey}.
     *
     * <p>NOTE(review): the decompiler reported that the access modifier was changed from
     * {@code protected}; the modifier is left as found.
     *
     * @param cache the cache the returned key is created for
     * @return a new {@code MagicKey} built from {@code cache}
     */
    @Override // declared in org.infinispan.stream.BaseStreamIteratorTest
    public Object getKeyTiedToCache(Cache<?, ?> cache) {
        return new MagicKey(cache);
    }

    /**
     * Checks that an entry iteration started on node 0 still returns every inserted entry when
     * node 1 is killed while it is held just before sending its stream response.
     */
    @Test
    public void verifyNodeLeavesBeforeGettingData() throws TimeoutException, InterruptedException, ExecutionException {
        Map<Object, String> expected = putValueInEachCache(3);
        Cache originator = cache(0, this.CACHE_NAME);
        Cache<?, ?> leavingNode = cache(1, this.CACHE_NAME);

        // Only the pre-send phase is gated in this test; the post-send phase is released up front.
        CheckPoint checkPoint = new CheckPoint();
        checkPoint.triggerForever("post_send_response_released");
        waitUntilSendingResponse(leavingNode, checkPoint);

        ArrayBlockingQueue received = new ArrayBlockingQueue(10);
        Future iteration = fork(() -> {
            for (Iterator entries = originator.entrySet().stream().iterator(); entries.hasNext(); ) {
                received.add((Map.Entry) entries.next());
            }
            return null;
        });

        // Wait until node 1 is about to respond, kill it, and only then let the held sender continue.
        checkPoint.awaitStrict("pre_send_response_invoked", 10L, TimeUnit.SECONDS);
        killMember(1, this.CACHE_NAME);
        checkPoint.trigger("pre_send_response_released");

        iteration.get(10L, TimeUnit.SECONDS);
        for (Map.Entry<Object, String> wanted : expected.entrySet()) {
            AssertJUnit.assertTrue("Entry wasn't found:" + wanted, received.contains(wanted));
        }
    }

    /**
     * Checks that an entry iteration started on node 0 still yields every entry keyed to node 1
     * when node 1 is killed after its first response has gone through.
     *
     * @throws TimeoutException     if a checkpoint or the forked iteration does not complete within 10 seconds
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws ExecutionException   if the forked iteration fails
     */
    @Test
    public void verifyNodeLeavesAfterSendingBackSomeData() throws TimeoutException, InterruptedException, ExecutionException {
        Cache<Object, String> cache = cache(0, this.CACHE_NAME);
        // Needs concrete type arguments: put() cannot be invoked through a Cache<?, ?> reference.
        Cache<Object, String> cache2 = cache(1, this.CACHE_NAME);

        // Insert ten entries more than the configured state-transfer chunk size, presumably so that
        // node 1 cannot hand everything back in a single response.
        Map<Object, String> expected = new HashMap<>();
        int chunkSize = cache.getCacheConfiguration().clustering().stateTransfer().chunkSize();
        for (int i = 0; i < chunkSize + 10; i++) {
            MagicKey key = new MagicKey(cache2);
            cache2.put(key, key.toString());
            expected.put(key, key.toString());
        }

        // A single (non-"forever") trigger lets exactly one pre-send wait pass.
        CheckPoint checkPoint = new CheckPoint();
        checkPoint.trigger("pre_send_response_released");
        waitUntilSendingResponse(cache2, checkPoint);

        LinkedBlockingQueue<Map.Entry<?, ?>> received = new LinkedBlockingQueue<>();
        Future<?> iteration = fork(() -> {
            Iterator<?> it = cache.entrySet().stream().iterator();
            while (it.hasNext()) {
                received.add((Map.Entry<?, ?>) it.next());
            }
            return null;
        });

        // Wait for the first response to be sent, take one entry off the queue, then kill node 1.
        checkPoint.awaitStrict("post_send_response_invoked", 10L, TimeUnit.SECONDS);
        Map.Entry<?, ?> firstReceived = received.poll(10L, TimeUnit.SECONDS);
        killMember(1, this.CACHE_NAME);
        iteration.get(10L, TimeUnit.SECONDS);

        // The polled entry is no longer in the queue, so it is accepted separately.
        for (Map.Entry<Object, String> wanted : expected.entrySet()) {
            AssertJUnit.assertTrue("Entry wasn't found:" + wanted, received.contains(wanted) || wanted.equals(firstReceived));
        }
    }

    /**
     * Checks that an entry iteration started on node 0 returns, segment by segment, every entry
     * keyed to node 1 when node 1 is killed while node 0 is about to process a received response.
     *
     * @throws TimeoutException     if a checkpoint or the forked iteration does not complete within 10 seconds
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws ExecutionException   if the forked iteration fails
     */
    @Test
    public void waitUntilProcessingResults() throws TimeoutException, InterruptedException, ExecutionException {
        Cache<?, ?> cache = cache(0, this.CACHE_NAME);
        Cache<Object, String> cache2 = cache(1, this.CACHE_NAME);

        Map<Object, String> expected = new HashMap<>();
        for (int i = 0; i < 501; i++) {
            MagicKey key = new MagicKey(cache2);
            cache2.put(key, key.toString());
            expected.put(key, key.toString());
        }

        // Only the pre-receive phase is gated; the post-receive phase is released up front.
        CheckPoint checkPoint = new CheckPoint();
        checkPoint.triggerForever("post_receive_response_released");
        waitUntilStartOfProcessingResult(cache, checkPoint);

        LinkedBlockingQueue<Map.Entry<Object, String>> received = new LinkedBlockingQueue<>();
        Future<?> iteration = fork(() -> {
            Iterator<?> it = cache.entrySet().stream().iterator();
            while (it.hasNext()) {
                @SuppressWarnings("unchecked") // the iterated cache holds the MagicKey -> String entries put above
                Map.Entry<Object, String> next = (Map.Entry<Object, String>) it.next();
                received.add(next);
            }
            return null;
        });

        // Once node 0 starts handling a response, release it and kill node 1.
        checkPoint.awaitStrict("pre_receive_response_invoked", 10L, TimeUnit.SECONDS);
        checkPoint.triggerForever("pre_receive_response_released");
        killMember(1, this.CACHE_NAME);
        iteration.get(10L, TimeUnit.SECONDS);

        DistributionManager distributionManager = (DistributionManager) cache.getAdvancedCache().getComponentRegistry().getComponent(DistributionManager.class);
        ConsistentHash readHash = distributionManager.getReadConsistentHash();
        Map<Integer, Set<Map.Entry<Object, String>>> expectedBySegment = generateEntriesPerSegment(readHash, expected.entrySet());
        Map<Integer, Set<Map.Entry<Object, String>>> receivedBySegment = generateEntriesPerSegment(readHash, received);
        for (Map.Entry<Integer, Set<Map.Entry<Object, String>>> segment : expectedBySegment.entrySet()) {
            // assertEquals(message, expected, actual): the inserted entries are the expectation.
            AssertJUnit.assertEquals("Segment " + segment.getKey() + " had a mismatch", segment.getValue(), receivedBySegment.get(segment.getKey()));
        }
    }

    /**
     * Checks that an entry iteration started on node 2 returns, segment by segment, every entry
     * keyed to node 0 when node 1 is killed while node 0's data container iteration is held back.
     *
     * @throws TimeoutException     if a checkpoint or the forked iteration does not complete within 10 seconds
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws ExecutionException   if the forked iteration fails
     */
    @Test
    public void testNodeLeavesWhileIteratingOverContainerCausingRehashToLoseValues() throws TimeoutException, InterruptedException, ExecutionException {
        Cache<?, ?> cache = cache(0, this.CACHE_NAME);
        Cache<Object, String> cache2 = cache(1, this.CACHE_NAME);
        Cache<?, ?> cache3 = cache(2, this.CACHE_NAME);
        // An extra cache manager joins the cluster before any data is written.
        addClusterEnabledCacheManager(this.builderUsed);

        // Keys are tied to node 0 but written through node 1.
        Map<Object, String> expected = new HashMap<>();
        for (int i = 0; i < 501; i++) {
            MagicKey key = new MagicKey(cache);
            cache2.put(key, key.toString());
            expected.put(key, key.toString());
        }

        // Only the pre-iterator phase is gated; the post-iterator phase is released up front.
        CheckPoint checkPoint = new CheckPoint();
        checkPoint.triggerForever("post_iterator_released");
        waitUntilDataContainerWillBeIteratedOn(cache, checkPoint);

        LinkedBlockingQueue<Map.Entry<Object, String>> received = new LinkedBlockingQueue<>();
        Future<?> iteration = fork(() -> {
            Iterator<?> it = cache3.entrySet().stream().iterator();
            while (it.hasNext()) {
                @SuppressWarnings("unchecked") // the iterated cache holds the MagicKey -> String entries put above
                Map.Entry<Object, String> next = (Map.Entry<Object, String>) it.next();
                received.add(next);
            }
            return null;
        });

        // Kill node 1 while node 0 is held before iterating its container, then release node 0.
        checkPoint.awaitStrict("pre_iterator_invoked", 10L, TimeUnit.SECONDS);
        killMember(1, this.CACHE_NAME);
        checkPoint.triggerForever("pre_iterator_released");
        iteration.get(10L, TimeUnit.SECONDS);

        DistributionManager distributionManager = (DistributionManager) cache.getAdvancedCache().getComponentRegistry().getComponent(DistributionManager.class);
        ConsistentHash readHash = distributionManager.getReadConsistentHash();
        Map<Integer, Set<Map.Entry<Object, String>>> expectedBySegment = generateEntriesPerSegment(readHash, expected.entrySet());
        Map<Integer, Set<Map.Entry<Object, String>>> receivedBySegment = generateEntriesPerSegment(readHash, received);
        for (Map.Entry<Integer, Set<Map.Entry<Object, String>>> segment : expectedBySegment.entrySet()) {
            // assertEquals(message, expected, actual): the inserted entries are the expectation.
            AssertJUnit.assertEquals("Segment " + segment.getKey() + " had a mismatch", segment.getValue(), receivedBySegment.get(segment.getKey()));
        }
    }

    /**
     * Checks that an entry stream obtained with {@link Flag#CACHE_MODE_LOCAL} on node 0 yields
     * exactly as many entries as were written with keys tied to node 0, and that each yielded
     * value matches what a regular {@code get} on node 0 returns.
     */
    @Test
    public void testLocallyForcedStream() {
        Cache<Object, String> cache = cache(0, this.CACHE_NAME);
        Cache<Object, String> cache2 = cache(1, this.CACHE_NAME);
        Cache<Object, String> cache3 = cache(2, this.CACHE_NAME);

        // Only the keys tied to node 0 are recorded; the other two thirds are tied to nodes 1 and 2.
        Map<Object, String> expectedLocal = new HashMap<>();
        for (int i = 0; i < 501; i++) {
            switch (i % 3) {
                case 0: {
                    MagicKey key = new MagicKey(cache);
                    cache.put(key, key.toString());
                    expectedLocal.put(key, key.toString());
                    break;
                }
                case 1: {
                    MagicKey key = new MagicKey(cache2, new Cache<?, ?>[]{cache3});
                    cache2.put(key, key.toString());
                    break;
                }
                case 2: {
                    MagicKey key = new MagicKey(cache3, new Cache<?, ?>[]{cache2});
                    cache3.put(key, key.toString());
                    break;
                }
                default:
                    AssertJUnit.fail("Unexpected switch case!");
                    break;
            }
        }

        // A Stream cannot be used in an enhanced for loop, so walk its iterator explicitly.
        int localCount = 0;
        Iterator<?> localEntries = cache.getAdvancedCache().withFlags(new Flag[]{Flag.CACHE_MODE_LOCAL}).entrySet().stream().iterator();
        while (localEntries.hasNext()) {
            Map.Entry<?, ?> entry = (Map.Entry<?, ?>) localEntries.next();
            String stored = cache.get(entry.getKey());
            AssertJUnit.assertNotNull(stored);
            AssertJUnit.assertEquals(stored, (String) entry.getValue());
            localCount++;
        }
        AssertJUnit.assertEquals(expectedLocal.size(), localCount);
    }

    /**
     * Runs {@code testStayLocalIfAllSegmentsPresentLocally(true)}, i.e. without calling
     * {@code disableRehashAware()} on the stream.
     */
    @Test
    public void testStayLocalIfAllSegmentsPresentLocallyWithReHash() throws Exception {
        testStayLocalIfAllSegmentsPresentLocally(true);
    }

    /**
     * Runs {@code testStayLocalIfAllSegmentsPresentLocally(false)}, i.e. with
     * {@code disableRehashAware()} applied to the stream.
     */
    @Test
    public void testStayLocalIfAllSegmentsPresentLocallyWithoutRehash() throws Exception {
        testStayLocalIfAllSegmentsPresentLocally(false);
    }

    /**
     * Verifies that iterating an entry stream restricted to the segments owned by node 0 never
     * calls {@code awaitCompletion} on the spied {@link ClusterStreamManager}.
     *
     * @param z when {@code false}, {@code disableRehashAware()} is applied to the stream first
     * @throws Exception if the iteration or the verification fails
     */
    private void testStayLocalIfAllSegmentsPresentLocally(boolean z) throws Exception {
        // Needs concrete type arguments: put() cannot be invoked through a Cache<?, ?> reference.
        Cache<Object, String> cache = cache(0, this.CACHE_NAME);
        ClusterStreamManager streamManagerSpy = replaceWithSpy(cache);

        // Populate the cache; with an empty cache there would be no entries to compare per segment.
        IntStream.rangeClosed(0, 499).boxed().forEach(num -> cache.put(num, num.toString()));

        ConsistentHash consistentHash = cache.getAdvancedCache().getDistributionManager().getConsistentHash();
        Set<Integer> segmentsForOwner = consistentHash.getSegmentsForOwner(cache.getCacheManager().getAddress());

        CacheStream stream = cache.entrySet().stream();
        if (!z) {
            stream = stream.disableRehashAware();
        }

        // The segments seen while iterating must be exactly the segments owned by this node.
        AssertJUnit.assertEquals(segmentsForOwner, generateEntriesPerSegment(consistentHash, mapFromIterator(stream.filterKeySegments(segmentsForOwner).iterator()).entrySet()).keySet());
        Mockito.verify(streamManagerSpy, Mockito.never()).awaitCompletion((UUID) Matchers.any(UUID.class), Mockito.anyLong(), (TimeUnit) Matchers.any(TimeUnit.class));
    }

    /**
     * Wraps the cache's current {@link ClusterStreamManager} in a Mockito spy and registers the
     * spy in its place.
     *
     * @param cache the cache whose component is replaced
     * @return the spy that is now registered with the cache
     */
    private ClusterStreamManager replaceWithSpy(Cache<?, ?> cache) {
        ClusterStreamManager realManager = (ClusterStreamManager) TestingUtil.extractComponent(cache, ClusterStreamManager.class);
        ClusterStreamManager spiedManager = Mockito.spy(realManager);
        TestingUtil.replaceComponent(cache, ClusterStreamManager.class, spiedManager, false);
        return spiedManager;
    }

    /**
     * Groups entries by the segment their key maps to under the given consistent hash. Each
     * entry is copied into a new {@link ImmortalCacheEntry} before it is added to its segment's set.
     *
     * @param consistentHash the hash used to look up the segment of every key
     * @param iterable       the entries to group
     * @return a map from segment id to the set of entries whose keys fall into that segment
     */
    private Map<Integer, Set<Map.Entry<Object, String>>> generateEntriesPerSegment(ConsistentHash consistentHash, Iterable<Map.Entry<Object, String>> iterable) {
        Map<Integer, Set<Map.Entry<Object, String>>> bySegment = new HashMap<>();
        for (Map.Entry<Object, String> current : iterable) {
            Integer segmentId = Integer.valueOf(consistentHash.getSegment(current.getKey()));
            // Left raw, as in the original: ImmortalCacheEntry's generic signature is not visible here.
            Set bucket = bySegment.computeIfAbsent(segmentId, unused -> new HashSet<>());
            bucket.add(new ImmortalCacheEntry(current.getKey(), current.getValue()));
        }
        return bySegment;
    }

    /**
     * Replaces the cache's {@link RpcManager} with a Mockito mock that forwards every call to the
     * real manager, but brackets each {@code invokeRemotely} call carrying a
     * {@link StreamResponseCommand} with checkpoint events:
     * <ol>
     *   <li>triggers {@code pre_send_response_invoked}, then waits up to 10 seconds for
     *       {@code pre_send_response_released};</li>
     *   <li>forwards the call to the real manager;</li>
     *   <li>triggers {@code post_send_response_invoked}, then waits up to 10 seconds for
     *       {@code post_send_response_released}, whether or not the forwarded call threw.</li>
     * </ol>
     *
     * @param cache      the cache whose {@code RpcManager} is replaced
     * @param checkPoint the checkpoint used to synchronise with the test thread
     * @return the original {@code RpcManager} that was extracted from the cache
     */
    protected RpcManager waitUntilSendingResponse(Cache<?, ?> cache, CheckPoint checkPoint) {
        RpcManager realManager = (RpcManager) TestingUtil.extractComponent(cache, RpcManager.class);
        Answer<?> forwardToReal = AdditionalAnswers.delegatesTo(realManager);
        RpcManager mockManager = Mockito.mock(RpcManager.class, Mockito.withSettings().defaultAnswer(forwardToReal));
        Mockito.doAnswer(invocation -> {
            checkPoint.trigger("pre_send_response_invoked");
            checkPoint.awaitStrict("pre_send_response_released", 10L, TimeUnit.SECONDS);
            try {
                return forwardToReal.answer(invocation);
            } finally {
                // finally runs the post-phase exactly once on both the normal and the exceptional path.
                checkPoint.trigger("post_send_response_invoked");
                checkPoint.awaitStrict("post_send_response_released", 10L, TimeUnit.SECONDS);
            }
        }).when(mockManager).invokeRemotely(Matchers.anyCollectionOf(Address.class), (ReplicableCommand) Matchers.any(StreamResponseCommand.class), (RpcOptions) Matchers.any(RpcOptions.class));
        TestingUtil.replaceComponent(cache, RpcManager.class, mockManager, true);
        return realManager;
    }

    /**
     * Replaces the cache's {@link ClusterStreamManager} with a Mockito mock that forwards every
     * call to the real manager, but brackets each {@code receiveResponse} call with checkpoint
     * events:
     * <ol>
     *   <li>triggers {@code pre_receive_response_invoked}, then waits up to 10 seconds for
     *       {@code pre_receive_response_released};</li>
     *   <li>forwards the call to the real manager;</li>
     *   <li>triggers {@code post_receive_response_invoked}, then waits up to 10 seconds for
     *       {@code post_receive_response_released}, whether or not the forwarded call threw.</li>
     * </ol>
     *
     * @param cache      the cache whose {@code ClusterStreamManager} is replaced
     * @param checkPoint the checkpoint used to synchronise with the test thread
     * @return the original {@code ClusterStreamManager} that was extracted from the cache
     */
    protected ClusterStreamManager waitUntilStartOfProcessingResult(Cache<?, ?> cache, CheckPoint checkPoint) {
        ClusterStreamManager realManager = (ClusterStreamManager) TestingUtil.extractComponent(cache, ClusterStreamManager.class);
        Answer<?> forwardToReal = AdditionalAnswers.delegatesTo(realManager);
        ClusterStreamManager mockManager = Mockito.mock(ClusterStreamManager.class, Mockito.withSettings().defaultAnswer(forwardToReal));
        Mockito.doAnswer(invocation -> {
            checkPoint.trigger("pre_receive_response_invoked");
            checkPoint.awaitStrict("pre_receive_response_released", 10L, TimeUnit.SECONDS);
            try {
                return forwardToReal.answer(invocation);
            } finally {
                // finally runs the post-phase exactly once on both the normal and the exceptional path.
                checkPoint.trigger("post_receive_response_invoked");
                checkPoint.awaitStrict("post_receive_response_released", 10L, TimeUnit.SECONDS);
            }
        }).when(mockManager).receiveResponse((UUID) Matchers.any(UUID.class), (Address) Matchers.any(Address.class), Mockito.anyBoolean(), Matchers.anySetOf(Integer.class), Matchers.any());
        TestingUtil.replaceComponent(cache, ClusterStreamManager.class, mockManager, true);
        return realManager;
    }

    /**
     * Replaces the cache's {@link DataContainer} with a Mockito mock that forwards every call to
     * the real container, but gates {@code iterator()} with checkpoint events. Only a call that
     * finds the in-flight counter at zero is gated; calls that arrive while another
     * {@code iterator()} call is still in flight are forwarded without waiting.
     *
     * <p>A gated call triggers {@code pre_iterator_invoked} and waits up to 10 seconds for
     * {@code pre_iterator_released} before forwarding. After forwarding it triggers
     * {@code post_iterator_invoked} and waits up to 10 seconds for {@code post_iterator_released},
     * whether or not the forwarded call threw.
     *
     * @param cache      the cache whose {@code DataContainer} is replaced
     * @param checkPoint the checkpoint used to synchronise with the test thread
     * @return the original {@code DataContainer} that was extracted from the cache
     */
    protected DataContainer waitUntilDataContainerWillBeIteratedOn(Cache<?, ?> cache, CheckPoint checkPoint) {
        DataContainer realContainer = (DataContainer) TestingUtil.extractComponent(cache, DataContainer.class);
        Answer<?> forwardToReal = AdditionalAnswers.delegatesTo(realContainer);
        DataContainer mockContainer = Mockito.mock(DataContainer.class, Mockito.withSettings().defaultAnswer(forwardToReal));
        AtomicInteger inFlight = new AtomicInteger();
        Mockito.doAnswer(invocation -> {
            boolean gated = false;
            if (inFlight.getAndIncrement() == 0) {
                gated = true;
                checkPoint.trigger("pre_iterator_invoked");
                checkPoint.awaitStrict("pre_iterator_released", 10L, TimeUnit.SECONDS);
            }
            try {
                return forwardToReal.answer(invocation);
            } finally {
                // finally decrements the counter and runs the post-phase exactly once per call.
                inFlight.getAndDecrement();
                if (gated) {
                    checkPoint.trigger("post_iterator_invoked");
                    checkPoint.awaitStrict("post_iterator_released", 10L, TimeUnit.SECONDS);
                }
            }
        }).when(mockContainer).iterator();
        TestingUtil.replaceComponent(cache, DataContainer.class, mockContainer, true);
        return realContainer;
    }
}
