package org.infinispan.stream;

import io.reactivex.rxjava3.core.Flowable;
import java.lang.invoke.SerializedLambda;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.infinispan.Cache;
import org.infinispan.CacheStream;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.statetransfer.StateTransferStartCommand;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.container.impl.InternalDataContainer;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.distribution.MagicKey;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.reactive.publisher.impl.ClusterPublisherManager;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
import org.infinispan.reactive.publisher.impl.LocalPublisherManager;
import org.infinispan.reactive.publisher.impl.PublisherHandler;
import org.infinispan.reactive.publisher.impl.SegmentCompletionPublisher;
import org.infinispan.reactive.publisher.impl.commands.batch.InitialPublisherCommand;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.ResponseCollector;
import org.infinispan.test.AbstractCacheTest;
import org.infinispan.test.Mocks;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.test.fwk.TransportFlags;
import org.mockito.AdditionalAnswers;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups = {"functional", "smoke"}, testName = "iteration.DistributedStreamIteratorTest")
/* loaded from: input_file:org/infinispan/stream/DistributedStreamIteratorTest.class */
public class DistributedStreamIteratorTest extends BaseClusteredStreamIteratorTest {
    /**
     * Creates the test with {@code false} and {@link CacheMode#DIST_SYNC} passed to the
     * two-argument constructor.
     */
    public DistributedStreamIteratorTest() {
        this(false, CacheMode.DIST_SYNC);
    }

    /**
     * Creates the test for the given settings and requests cleanup after every test method.
     *
     * @param z         forwarded unchanged to the superclass constructor
     *                  (NOTE(review): presumably the transactional flag; confirm against the superclass)
     * @param cacheMode cache mode forwarded to the superclass constructor
     */
    public DistributedStreamIteratorTest(boolean z, CacheMode cacheMode) {
        super(z, cacheMode);
        cleanup = AbstractCacheTest.CleanupPhase.AFTER_METHOD;
    }

    /**
     * Produces a key bound to the supplied cache by wrapping the cache in a new {@link MagicKey}.
     *
     * <p>NOTE(review): the decompiler reported that this method's access modifier was changed from
     * {@code protected}; the visibility is left as found.
     *
     * @param cache cache the new key is created for
     * @return a freshly created {@link MagicKey} for {@code cache}
     */
    @Override
    public Object getKeyTiedToCache(Cache<?, ?> cache) {
        MagicKey keyForCache = new MagicKey(cache);
        return keyForCache;
    }

    /**
     * Iterates over the values of a cache on a newly added node that does not await its initial
     * state transfer, while inbound state-transfer start commands on node 0 are blocked, and checks
     * that every value written beforehand is returned.
     *
     * <p>The joiner cache and the iterator are fully typed so that the value-extracting function
     * type-checks without raw types or element casts.
     *
     * @throws Exception if the checkpoint is not reached within 10 seconds or any cluster operation fails
     */
    @Test
    public void testIterationDuringInitialTransfer() throws Exception {
        Map<Object, String> expectedValues = putValueInEachCache(3);
        killMember(2, "testCache");
        Cache<?, ?> existingCache = cache(0, "testCache");

        CheckPoint checkPoint = new CheckPoint();
        // The "after release" event is pre-triggered so the blocked command only waits on BEFORE_RELEASE.
        checkPoint.triggerForever(Mocks.AFTER_RELEASE);
        blockStateTransfer(existingCache, checkPoint);

        EmbeddedCacheManager joiner = addClusterEnabledCacheManager(this.sci, new ConfigurationBuilder(), new TransportFlags().withFD(true));
        ConfigurationBuilder joinerConfig = new ConfigurationBuilder();
        joinerConfig.read(this.builderUsed.build());
        // Do not wait for state: the iteration below must run while the transfer is still blocked.
        joinerConfig.clustering().stateTransfer().awaitInitialTransfer(false);
        joiner.defineConfiguration("testCache", joinerConfig.build());
        Cache<Object, String> joinerCache = joiner.getCache("testCache", true);

        checkPoint.awaitStrict(Mocks.BEFORE_INVOCATION, 10L, TimeUnit.SECONDS);
        Set<String> seenValues = new HashSet<>();
        try {
            Iterator<String> values = joinerCache.entrySet().stream().map(Map.Entry::getValue).iterator();
            while (values.hasNext()) {
                seenValues.add(values.next());
            }
            for (Map.Entry<Object, String> entry : expectedValues.entrySet()) {
                AssertJUnit.assertTrue("Entry wasn't found:" + entry, seenValues.contains(entry.getValue()));
            }
        } finally {
            // Always let the blocked state transfer proceed, even when an assertion failed.
            checkPoint.triggerForever(Mocks.BEFORE_RELEASE);
        }
    }

    /**
     * Starts a value iteration from node 0, kills node 1 while its local entry publisher call is
     * held at the checkpoint, and then checks that every previously written value was still returned.
     *
     * @throws Exception if a checkpoint or the forked iteration does not complete within 10 seconds
     */
    @Test
    public void verifyNodeLeavesBeforeGettingData() throws Exception {
        Map<Object, String> expectedValues = putValueInEachCache(3);
        Cache<Object, String> originator = cache(0, "testCache");
        Cache<?, ?> leavingNode = cache(1, "testCache");

        CheckPoint checkPoint = new CheckPoint();
        checkPoint.triggerForever(Mocks.AFTER_RELEASE);
        waitUntilSendingResponse(leavingNode, checkPoint);

        LinkedBlockingQueue<String> received = new LinkedBlockingQueue<>();
        Future<?> iteration = fork(() -> {
            for (Iterator<String> values = originator.values().stream().iterator(); values.hasNext(); ) {
                received.add(values.next());
            }
            return null;
        });

        // Only kill the node once its publisher call has actually been reached.
        checkPoint.awaitStrict(Mocks.BEFORE_INVOCATION, 10L, TimeUnit.SECONDS);
        killMember(1, "testCache");
        checkPoint.trigger(Mocks.BEFORE_RELEASE);
        iteration.get(10L, TimeUnit.SECONDS);

        for (Map.Entry<Object, String> expected : expectedValues.entrySet()) {
            boolean found = received.contains(expected.getValue());
            AssertJUnit.assertTrue("Entry wasn't found:" + expected, found);
        }
    }

    /**
     * Writes {@code chunkSize + 2} entries through node 1 using keys created for node 1, starts an
     * entry iteration from node 0, takes one entry off the result queue, kills node 1 and then checks
     * that every written entry was returned either as that first entry or in the rest of the queue.
     *
     * <p>The caches and collections are fully typed: the writes go through a
     * {@code Cache<Object, String>} rather than a wildcard-typed reference, on which {@code put}
     * cannot be type-checked.
     *
     * @throws TimeoutException     if a checkpoint or the forked iteration does not finish within 10 seconds
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws ExecutionException   if the forked iteration fails
     */
    @Test
    public void verifyNodeLeavesAfterSendingBackSomeData() throws TimeoutException, InterruptedException, ExecutionException {
        Cache<Object, String> originator = cache(0, "testCache");
        Cache<Object, String> leavingNode = cache(1, "testCache");
        Map<Object, String> expected = new HashMap<>();
        int chunkSize = originator.getCacheConfiguration().clustering().stateTransfer().chunkSize();
        for (int i = 0; i < chunkSize + 2; i++) {
            MagicKey key = new MagicKey(leavingNode);
            leavingNode.put(key, key.toString());
            expected.put(key, key.toString());
        }

        CheckPoint checkPoint = new CheckPoint();
        // The publisher call itself is not held back; only its completion is observed below.
        checkPoint.trigger(Mocks.BEFORE_RELEASE);
        waitUntilSendingResponse(leavingNode, checkPoint);

        LinkedBlockingQueue<Map.Entry<Object, String>> received = new LinkedBlockingQueue<>();
        Future<?> iteration = fork(() -> {
            Iterator<Map.Entry<Object, String>> entries = originator.entrySet().stream().iterator();
            while (entries.hasNext()) {
                received.add(entries.next());
            }
            return null;
        });

        checkPoint.awaitStrict(Mocks.AFTER_INVOCATION, 10L, TimeUnit.SECONDS);
        checkPoint.trigger(Mocks.AFTER_RELEASE);
        // Take one entry off the queue before killing the node; null if none arrives within 10 seconds.
        Map.Entry<Object, String> firstReceived = received.poll(10L, TimeUnit.SECONDS);
        killMember(1, "testCache");
        iteration.get(10L, TimeUnit.SECONDS);

        for (Map.Entry<Object, String> entry : expected.entrySet()) {
            AssertJUnit.assertTrue("Entry wasn't found:" + entry, received.contains(entry) || entry.equals(firstReceived));
        }
    }

    /**
     * Writes nine entries through node 1 using keys created for node 1, wraps the originator's
     * cluster entry publisher in a blocking publisher, kills node 1 once the blocked publisher has
     * been reached and finally compares the received entries with the expected ones, segment by segment.
     *
     * @throws TimeoutException     if a checkpoint or the forked iteration does not finish within 10 seconds
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws ExecutionException   if the forked iteration fails
     */
    @Test
    public void waitUntilProcessingResults() throws TimeoutException, InterruptedException, ExecutionException {
        Cache<Object, String> originator = cache(0, "testCache");
        Cache<Object, String> leavingNode = cache(1, "testCache");
        Map<Object, String> expected = new HashMap<>();
        for (int written = 0; written < 9; written++) {
            MagicKey key = new MagicKey(leavingNode);
            leavingNode.put(key, key.toString());
            expected.put(key, key.toString());
        }

        CheckPoint checkPoint = new CheckPoint();
        checkPoint.triggerForever(Mocks.AFTER_RELEASE);
        // Wrap whatever the real entryPublisher returns so it blocks on the checkpoint.
        ((ClusterPublisherManager) Mockito.doAnswer(invocation ->
                Mocks.blockingPublisher((SegmentCompletionPublisher) invocation.callRealMethod(), checkPoint)
        ).when((ClusterPublisherManager) Mocks.replaceComponentWithSpy(originator, ClusterPublisherManager.class)))
                .entryPublisher((IntSet) ArgumentMatchers.any(), (Set) ArgumentMatchers.any(), (InvocationContext) ArgumentMatchers.any(), Mockito.anyBoolean(), (DeliveryGuarantee) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), (Function) ArgumentMatchers.any());

        LinkedBlockingQueue<Map.Entry<Object, String>> received = new LinkedBlockingQueue<>();
        Future<?> iteration = fork(() -> {
            for (Iterator<Map.Entry<Object, String>> entries = originator.entrySet().stream().iterator(); entries.hasNext(); ) {
                received.add(entries.next());
            }
            return null;
        });

        checkPoint.awaitStrict(Mocks.BEFORE_INVOCATION, 10L, TimeUnit.SECONDS);
        checkPoint.triggerForever(Mocks.BEFORE_RELEASE);
        killMember(1, "testCache");
        iteration.get(10L, TimeUnit.SECONDS);

        KeyPartitioner keyPartitioner = (KeyPartitioner) TestingUtil.extractComponent(originator, KeyPartitioner.class);
        Map<Integer, Set<Map.Entry<Object, String>>> expectedBySegment = generateEntriesPerSegment(keyPartitioner, expected.entrySet());
        Map<Integer, Set<Map.Entry<Object, String>>> receivedBySegment = generateEntriesPerSegment(keyPartitioner, received);

        for (Map.Entry<Integer, Set<Map.Entry<Object, String>>> expectedSegment : expectedBySegment.entrySet()) {
            Integer segment = expectedSegment.getKey();
            Set<Map.Entry<Object, String>> wanted = expectedSegment.getValue();
            Set<Map.Entry<Object, String>> got = receivedBySegment.get(segment);
            if (got != null) {
                // Log every individual difference before asserting, to make failures diagnosable.
                for (Map.Entry<Object, String> candidate : wanted) {
                    if (got.contains(candidate)) {
                        continue;
                    }
                    log.errorf("Segment %d, missing %s", segment, candidate);
                }
                for (Map.Entry<Object, String> candidate : got) {
                    if (wanted.contains(candidate)) {
                        continue;
                    }
                    log.errorf("Segment %d, extra %s", segment, candidate);
                }
                AssertJUnit.assertEquals(wanted.size(), got.size());
            }
            AssertJUnit.assertEquals("Segment " + segment + " had a mismatch", wanted, got);
        }
    }

    /**
     * Stores three entries keyed for node 0 and one keyed for node 1, starts an entry iteration from
     * node 2 with a distributed batch size of 2, kills node 0 while its data container publisher call
     * is held at the checkpoint and finally compares expected and received entries per segment.
     *
     * @throws Exception if a checkpoint or the forked iteration does not complete within 10 seconds
     */
    @Test
    public void testNodeLeavesWhileIteratingOverContainerCausingRehashToLoseValues() throws Exception {
        Cache<?, ?> leavingNode = cache(0, "testCache");
        Cache<Object, String> survivor = cache(1, "testCache");
        Cache<Object, String> originator = cache(2, "testCache");

        Map<Object, String> expected = new HashMap<>();
        for (int n = 0; n < 3; n++) {
            expected.put(new MagicKey(leavingNode), "ignore");
        }
        expected.put(new MagicKey(survivor), "ignore");
        survivor.putAll(expected);

        CheckPoint checkPoint = new CheckPoint();
        checkPoint.triggerForever("post_iterator_released");
        waitUntilDataContainerWillBeIteratedOn(leavingNode, checkPoint);

        LinkedBlockingQueue<Map.Entry<Object, String>> received = new LinkedBlockingQueue<>();
        Future<?> iteration = fork(() -> {
            for (Iterator<Map.Entry<Object, String>> entries = originator.entrySet().stream().distributedBatchSize(2).iterator(); entries.hasNext(); ) {
                received.add(entries.next());
            }
            return null;
        });

        checkPoint.awaitStrict("pre_iterator_invoked", 10L, TimeUnit.SECONDS);
        killMember(0, "testCache", false);
        checkPoint.triggerForever("pre_iterator_released");
        iteration.get(10L, TimeUnit.SECONDS);

        KeyPartitioner keyPartitioner = (KeyPartitioner) TestingUtil.extractComponent(survivor, KeyPartitioner.class);
        Map<Integer, Set<Map.Entry<Object, String>>> expectedBySegment = generateEntriesPerSegment(keyPartitioner, expected.entrySet());
        Map<Integer, Set<Map.Entry<Object, String>>> receivedBySegment = generateEntriesPerSegment(keyPartitioner, received);
        for (Map.Entry<Integer, Set<Map.Entry<Object, String>>> expectedSegment : expectedBySegment.entrySet()) {
            Integer segment = expectedSegment.getKey();
            try {
                AssertJUnit.assertEquals("Segment " + segment + " had a mismatch", expectedSegment.getValue(), receivedBySegment.get(segment));
            } catch (AssertionError mismatch) {
                // Mark the failure point in the log before propagating the assertion error.
                log.fatal("TEST ENDED");
                throw mismatch;
            }
        }
    }

    /**
     * Checks that an entry stream obtained with {@link Flag#CACHE_MODE_LOCAL} on node 0 yields exactly
     * as many entries as were recorded as local to node 0, and that each yielded entry matches what
     * {@code get} returns for its key.
     *
     * <p>The stream is consumed through an explicit {@link Iterator}: a stream is not an
     * {@link Iterable}, so it cannot be the target of an enhanced {@code for} statement.
     */
    @Test
    public void testLocallyForcedStream() {
        Cache<Object, String> cache0 = cache(0, "testCache");
        Cache<Object, String> cache1 = cache(1, "testCache");
        Cache<Object, String> cache2 = cache(2, "testCache");

        // Only the entry whose key was created for node 0 is recorded as expected.
        Map<Object, String> expectedLocal = new HashMap<>();
        MagicKey localKey = new MagicKey(cache0);
        cache0.put(localKey, localKey.toString());
        expectedLocal.put(localKey, localKey.toString());

        // Two more entries whose keys are created for nodes 1 and 2 only.
        MagicKey keyFor1And2 = magicKey(cache1, cache2);
        cache2.put(keyFor1And2, keyFor1And2.toString());
        MagicKey keyFor2And1 = magicKey(cache2, cache1);
        cache1.put(keyFor2And1, keyFor2And1.toString());

        int count = 0;
        Iterator<Map.Entry<Object, String>> localEntries =
                cache0.getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL).entrySet().stream().iterator();
        while (localEntries.hasNext()) {
            Map.Entry<Object, String> entry = localEntries.next();
            String cacheValue = cache0.get(entry.getKey());
            AssertJUnit.assertNotNull(cacheValue);
            AssertJUnit.assertEquals(cacheValue, entry.getValue());
            count++;
        }
        AssertJUnit.assertEquals(expectedLocal.size(), count);
    }

    /**
     * Checks that closing a partially consumed entry stream releases the publisher it opened.
     *
     * <p>Writes {@code Flowable.bufferSize() + 2} entries through {@code cache}, opens an entry stream
     * on {@code cache2} with a distributed batch size of 1, verifies that exactly one publisher is
     * open on {@code cache}'s {@link PublisherHandler} once the first element is available, closes the
     * stream and waits for the open-publisher count to return to zero.
     *
     * <p>The stream is managed with try-with-resources so it is closed exactly once, whether or not
     * an assertion fails.
     *
     * @param cache  cache the entries are written through and whose publisher handler is inspected
     * @param cache2 cache on which the stream is opened
     */
    private void testIteratorClosedProperlyOnClose(Cache<Object, String> cache, Cache<Object, String> cache2) {
        Cache<Object, String> backupOwner = cache(2, "testCache");
        for (int i = 0; i < Flowable.bufferSize() + 2; i++) {
            cache.put(magicKey(cache, backupOwner), "value");
        }

        PublisherHandler handler = TestingUtil.extractComponent(cache, PublisherHandler.class);
        AssertJUnit.assertEquals(0, handler.openPublishers());

        try (CacheStream<Map.Entry<Object, String>> stream = cache2.entrySet().stream()) {
            AssertJUnit.assertTrue(stream.distributedBatchSize(1).iterator().hasNext());
            AssertJUnit.assertEquals(1, handler.openPublishers());
        }

        // The count is polled rather than asserted immediately after close.
        eventuallyEquals(0, handler::openPublishers);
    }

    /**
     * Runs the close-on-partial-iteration check with node 1 used both for writing the entries and
     * for opening the stream.
     */
    public void testIteratorClosedWhenPartiallyIteratedLocal() {
        Cache<Object, String> writeCache = cache(1, "testCache");
        Cache<Object, String> streamCache = cache(1, "testCache");
        testIteratorClosedProperlyOnClose(writeCache, streamCache);
    }

    /**
     * Runs the close-on-partial-iteration check with node 1 used for writing the entries and node 0
     * used for opening the stream.
     */
    public void testIteratorClosedWhenPartiallyIteratedRemote() {
        Cache<Object, String> writeCache = cache(1, "testCache");
        Cache<Object, String> streamCache = cache(0, "testCache");
        testIteratorClosedProperlyOnClose(writeCache, streamCache);
    }

    /**
     * Checks that fully draining an entry iterator releases the publisher it opened.
     *
     * <p>Writes {@code Flowable.bufferSize() + 2} entries from node 0 using keys created for nodes 1
     * and 2, iterates from node 0 with a distributed batch size of 1, verifies one publisher is open on
     * node 1 after the first {@code hasNext()}, drains the iterator and waits for the count on node 1
     * to return to zero.
     */
    public void testIteratorClosedWhenIteratedFully() {
        Cache<Object, String> originator = cache(0, "testCache");
        Cache<Object, String> primaryOwner = cache(1, "testCache");
        Cache<Object, String> backupOwner = cache(2, "testCache");

        int written = 0;
        while (written < Flowable.bufferSize() + 2) {
            originator.put(magicKey(primaryOwner, backupOwner), "not-local");
            written++;
        }

        PublisherHandler handler = (PublisherHandler) TestingUtil.extractComponent(primaryOwner, PublisherHandler.class);
        AssertJUnit.assertEquals(0, handler.openPublishers());

        Iterator<Map.Entry<Object, String>> entries = originator.entrySet().stream().distributedBatchSize(1).iterator();
        AssertJUnit.assertTrue(entries.hasNext());
        AssertJUnit.assertEquals(1, handler.openPublishers());

        // Drain the remainder; the elements themselves are irrelevant.
        entries.forEachRemaining(ignored -> {
        });

        eventuallyEquals(0, handler::openPublishers);
    }

    /**
     * Creates a {@link MagicKey} for the given caches, taking the configured number of owners into
     * account.
     *
     * @param cache  first cache passed to the key; its configuration supplies {@code numOwners}
     * @param cache2 additional cache passed to the key when {@code numOwners} is at least 2
     * @return a key built from {@code cache} alone when fewer than two owners are configured,
     *         otherwise a key built from {@code cache} and {@code cache2}
     */
    protected MagicKey magicKey(Cache<Object, String> cache, Cache<Object, String> cache2) {
        int numOwners = cache.getCacheConfiguration().clustering().hash().numOwners();
        if (numOwners < 2) {
            return new MagicKey(cache);
        }
        return new MagicKey((Cache<?, ?>) cache, (Cache<?, ?>[]) new Cache[]{cache2});
    }

    /**
     * Runs the stay-local check with the flag set to {@code true}, i.e. without calling
     * {@code disableRehashAware()} on the stream.
     *
     * @throws Exception declared for the test framework; the delegate declares no checked exception
     */
    @Test
    public void testStayLocalIfAllSegmentsPresentLocallyWithReHash() throws Exception {
        final boolean rehashAware = true;
        testStayLocalIfAllSegmentsPresentLocally(rehashAware);
    }

    /**
     * Runs the stay-local check with the flag set to {@code false}, i.e. with
     * {@code disableRehashAware()} applied to the stream.
     *
     * @throws Exception declared for the test framework; the delegate declares no checked exception
     */
    @Test
    public void testStayLocalIfAllSegmentsPresentLocallyWithoutRehash() throws Exception {
        final boolean rehashAware = false;
        testStayLocalIfAllSegmentsPresentLocally(rehashAware);
    }

    /**
     * Iterates over node 0's entries restricted to the segments node 0 owns in the write consistent
     * hash and verifies that (a) every returned entry maps to one of those segments and (b) no
     * {@link InitialPublisherCommand} was sent through the spied {@link RpcManager}.
     *
     * @param z when {@code false}, {@code disableRehashAware()} is applied to the stream before iterating
     */
    private void testStayLocalIfAllSegmentsPresentLocally(boolean z) {
        Cache<Object, String> localCache = cache(0, "testCache");
        RpcManager rpcManagerSpy = (RpcManager) Mocks.replaceComponentWithSpy(localCache, RpcManager.class);
        putValueInEachCache(3);

        KeyPartitioner keyPartitioner = (KeyPartitioner) TestingUtil.extractComponent(localCache, KeyPartitioner.class);
        IntSet ownedSegments = IntSets.from(localCache.getAdvancedCache().getDistributionManager().getWriteConsistentHash().getSegmentsForOwner(address(0)));

        CacheStream<Map.Entry<Object, String>> entryStream = localCache.entrySet().stream();
        if (!z) {
            entryStream = entryStream.disableRehashAware();
        }
        Iterator<Map.Entry<Object, String>> ownedEntries = entryStream.filterKeySegments(ownedSegments).iterator();

        boolean onlyOwnedSegmentsSeen = ownedSegments.containsAll(
                generateEntriesPerSegment(keyPartitioner, mapFromIterator(ownedEntries).entrySet()).keySet());
        AssertJUnit.assertTrue(onlyOwnedSegmentsSeen);

        // No remote publisher request may have been issued for a purely local iteration.
        ((RpcManager) Mockito.verify(rpcManagerSpy, Mockito.never())).invokeCommand((Address) ArgumentMatchers.any(Address.class), (ReplicableCommand) ArgumentMatchers.any(InitialPublisherCommand.class), (ResponseCollector) ArgumentMatchers.any(), (RpcOptions) ArgumentMatchers.any());
    }

    /**
     * Installs a blocking mock of {@link LocalPublisherManager} on the given cache so that calls to
     * {@code entryPublisher} with any arguments are coordinated through the checkpoint.
     *
     * @param cache      cache whose {@link LocalPublisherManager} is replaced
     * @param checkPoint checkpoint handed to {@code Mocks.blockingMock}
     */
    protected void waitUntilSendingResponse(Cache<?, ?> cache, CheckPoint checkPoint) {
        Mocks.blockingMock(
                checkPoint,
                LocalPublisherManager.class,
                cache,
                (stub, manager) -> ((LocalPublisherManager) stub.when(manager)).entryPublisher((IntSet) ArgumentMatchers.any(), (Set) ArgumentMatchers.any(), (Set) ArgumentMatchers.any(), Mockito.anyBoolean(), (DeliveryGuarantee) ArgumentMatchers.any(), (Function) ArgumentMatchers.any()),
                new Class[0]);
    }

    /**
     * Blocks inbound cache RPC commands on the given cache that are instances of
     * {@link StateTransferStartCommand}, coordinated through the checkpoint.
     *
     * @param cache      cache whose inbound commands are intercepted
     * @param checkPoint checkpoint handed to {@code Mocks.blockInboundCacheRpcCommand}
     * @param <K>        unused type parameter, retained for signature compatibility
     */
    protected <K> void blockStateTransfer(Cache<?, ?> cache, CheckPoint checkPoint) {
        Mocks.blockInboundCacheRpcCommand(cache, checkPoint, command -> command instanceof StateTransferStartCommand);
    }

    /**
     * Replaces the cache's {@link InternalDataContainer} with a mock that delegates every call to the
     * real container but coordinates {@code publisher(int)} through the checkpoint.
     *
     * <p>Only an invocation that finds the in-flight counter at zero waits: it triggers
     * {@code "pre_iterator_invoked"} and awaits {@code "pre_iterator_released"} before delegating, then
     * triggers {@code "post_iterator_invoked"} and awaits {@code "post_iterator_released"} afterwards.
     * The post-delegation bookkeeping lives in a single {@code finally} block so it runs exactly once
     * per invocation, whether the delegate returns or throws; a failure inside the bookkeeping itself
     * is propagated instead of re-running the bookkeeping.
     *
     * @param cache      cache whose data container is replaced
     * @param checkPoint checkpoint used for the four synchronisation events listed above
     */
    protected void waitUntilDataContainerWillBeIteratedOn(Cache<?, ?> cache, CheckPoint checkPoint) {
        InternalDataContainer realContainer = TestingUtil.extractComponent(cache, InternalDataContainer.class);
        Answer<Object> forwardedAnswer = AdditionalAnswers.delegatesTo(realContainer);
        InternalDataContainer mockContainer = Mockito.mock(InternalDataContainer.class, Mockito.withSettings().defaultAnswer(forwardedAnswer));
        AtomicInteger invocationCount = new AtomicInteger();
        Mockito.doAnswer(invocation -> {
            boolean waiting = false;
            if (invocationCount.getAndIncrement() == 0) {
                waiting = true;
                // Tell the test thread the publisher call was reached, then wait to be let through.
                checkPoint.trigger("pre_iterator_invoked");
                checkPoint.awaitStrict("pre_iterator_released", 10L, TimeUnit.SECONDS);
            }
            try {
                return forwardedAnswer.answer(invocation);
            } finally {
                invocationCount.getAndDecrement();
                if (waiting) {
                    checkPoint.trigger("post_iterator_invoked");
                    checkPoint.awaitStrict("post_iterator_released", 10L, TimeUnit.SECONDS);
                }
            }
        }).when(mockContainer).publisher(ArgumentMatchers.anyInt());
        TestingUtil.replaceComponent(cache, InternalDataContainer.class, mockContainer, true);
    }

    /**
     * Lambda-deserialization hook as rendered by the decompiler (marked synthetic).
     *
     * <p>Accepts a {@link SerializedLambda} whose implementation method is named {@code getValue} on
     * {@code java/util/Map$Entry} and whose functional interface is
     * {@code org/infinispan/util/function/SerializableFunction#apply}, and returns a lambda that calls
     * {@code getValue()} on its argument. Anything else is rejected.
     *
     * <p>NOTE(review): a method with this name is normally emitted by the compiler for classes that
     * contain serializable lambdas; this explicit copy looks like a decompilation artifact and may
     * clash with the generated one. Confirm whether it should exist in source form at all.
     *
     * @param serializedLambda serialized form of the lambda to recreate
     * @return a lambda equivalent to the serialized one
     * @throws IllegalArgumentException if the serialized lambda does not match the single supported shape
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // NOTE(review): a boolean cannot hold -1; this looks like the decompiler's rendering of an
        // int selector (-1 = no match, 0 = "getValue"). Confirm before relying on this code.
        boolean z = -1;
        // First switch: map the implementation method name to a selector via its hash code.
        switch (implMethodName.hashCode()) {
            case 1967798203:
                if (implMethodName.equals("getValue")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second switch: for the matched selector, verify the full lambda shape before recreating it.
        switch (z) {
            case false:
                // Method kind 9 is presumably MethodHandleInfo.REF_invokeInterface; confirm.
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("org/infinispan/util/function/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/util/Map$Entry") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0.getValue();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
