package org.infinispan.scattered.stream;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.infinispan.Cache;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commands.statetransfer.ScatteredStateGetKeysCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.remoting.inboundhandler.AbstractDelegatingHandler;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.inboundhandler.Reply;
import org.infinispan.stream.DistributedStreamIteratorTest;
import org.infinispan.test.Mocks;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.testng.annotations.Test;

@Test(groups = {"functional", "smoke"}, testName = "iteration.ScatteredStreamIteratorTest")
/* loaded from: input_file:org/infinispan/scattered/stream/ScatteredStreamIteratorTest.class */
public class ScatteredStreamIteratorTest extends DistributedStreamIteratorTest {
    public ScatteredStreamIteratorTest() {
        super(false, CacheMode.SCATTERED_SYNC);
    }

    @Override // org.infinispan.stream.DistributedStreamIteratorTest
    public void testNodeLeavesWhileIteratingOverContainerCausingRehashToLoseValues() {
    }

    @Override // org.infinispan.stream.DistributedStreamIteratorTest
    public void waitUntilProcessingResults() {
    }

    @Override // org.infinispan.stream.DistributedStreamIteratorTest
    protected <K> void blockStateTransfer(Cache<?, ?> cache, CheckPoint checkPoint) {
        Executor executor = (Executor) TestingUtil.extractGlobalComponent(cache.getCacheManager(), ExecutorService.class, "org.infinispan.executors.non-blocking");
        TestingUtil.wrapInboundInvocationHandler(cache, perCacheInboundInvocationHandler -> {
            return new AbstractDelegatingHandler(perCacheInboundInvocationHandler) { // from class: org.infinispan.scattered.stream.ScatteredStreamIteratorTest.1
                public void handle(CacheRpcCommand cacheRpcCommand, Reply reply, DeliverOrder deliverOrder) {
                    if (!(cacheRpcCommand instanceof ScatteredStateGetKeysCommand)) {
                        this.delegate.handle(cacheRpcCommand, reply, deliverOrder);
                        return;
                    }
                    checkPoint.trigger(Mocks.BEFORE_INVOCATION);
                    CompletableFuture<Void> whenComplete = checkPoint.future(Mocks.BEFORE_RELEASE, 2L, TimeUnit.SECONDS, executor).whenComplete((r9, th) -> {
                        this.delegate.handle(cacheRpcCommand, reply, deliverOrder);
                    });
                    CheckPoint checkPoint2 = checkPoint;
                    Executor executor2 = executor;
                    whenComplete.thenCompose(r92 -> {
                        checkPoint2.trigger(Mocks.AFTER_INVOCATION);
                        return checkPoint2.future(Mocks.AFTER_RELEASE, 20L, TimeUnit.SECONDS, executor2);
                    });
                }
            };
        });
    }
}
