package org.infinispan.notifications.cachelistener;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.stream.Stream;
import org.infinispan.Cache;
import org.infinispan.commons.util.IntSet;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.context.InvocationContext;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
import org.infinispan.notifications.cachelistener.event.Event;
import org.infinispan.reactive.publisher.impl.ClusterPublisherManager;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
import org.infinispan.reactive.publisher.impl.SegmentCompletionPublisher;
import org.infinispan.test.Mocks;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.mockito.AdditionalAnswers;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups = {"unit"}, testName = "notifications.cachelistener.CacheNotifierImplInitialTransferDistTest")
/* loaded from: input_file:org/infinispan/notifications/cachelistener/CacheNotifierImplInitialTransferDistTest.class */
public class CacheNotifierImplInitialTransferDistTest extends MultipleCacheManagersTest {
    private final String CACHE_NAME = "DistInitialTransferListener";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/notifications/cachelistener/CacheNotifierImplInitialTransferDistTest$Operation.class */
    public enum Operation {
        PUT(Event.Type.CACHE_ENTRY_MODIFIED),
        CREATE(Event.Type.CACHE_ENTRY_CREATED),
        REMOVE(Event.Type.CACHE_ENTRY_REMOVED) { // from class: org.infinispan.notifications.cachelistener.CacheNotifierImplInitialTransferDistTest.Operation.1
            @Override // org.infinispan.notifications.cachelistener.CacheNotifierImplInitialTransferDistTest.Operation
            public <K, V> Object perform(Cache<K, V> cache, K k, V v) {
                return cache.remove(k);
            }
        };

        private final Event.Type type;

        Operation(Event.Type type) {
            this.type = type;
        }

        public Event.Type getType() {
            return this.type;
        }

        public <K, V> Object perform(Cache<K, V> cache, K k, V v) {
            return cache.put(k, v);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/infinispan/notifications/cachelistener/CacheNotifierImplInitialTransferDistTest$StateListener.class */
    public static abstract class StateListener<K, V> {
        final List<CacheEntryEvent<K, V>> events = Collections.synchronizedList(new ArrayList());
        private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());

        protected StateListener() {
        }

        @CacheEntryCreated
        @CacheEntryModified
        @CacheEntryRemoved
        public void onCacheNotification(CacheEntryEvent<K, V> cacheEntryEvent) {
            log.tracef("Received event: %s", cacheEntryEvent);
            this.events.add(cacheEntryEvent);
        }
    }

    @Listener(includeCurrentState = true, clustered = true)
    /* loaded from: input_file:org/infinispan/notifications/cachelistener/CacheNotifierImplInitialTransferDistTest$StateListenerClustered.class */
    private static class StateListenerClustered extends StateListener<String, String> {
        private StateListenerClustered() {
        }
    }

    @Listener(includeCurrentState = true, clustered = false)
    /* loaded from: input_file:org/infinispan/notifications/cachelistener/CacheNotifierImplInitialTransferDistTest$StateListenerNotClustered.class */
    private static class StateListenerNotClustered extends StateListener<String, String> {
        private StateListenerNotClustered() {
        }
    }

    /* loaded from: input_file:org/infinispan/notifications/cachelistener/CacheNotifierImplInitialTransferDistTest$StreamMocking.class */
    private interface StreamMocking {
        void additionalInformation(Stream stream, Stream stream2, StreamMocking streamMocking);
    }

    @Override // org.infinispan.test.MultipleCacheManagersTest
    protected void createCacheManagers() throws Throwable {
        createClusteredCaches(3, "DistInitialTransferListener", getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC));
    }

    public void testSimpleCacheStartingClusterListener() {
        testSimpleCacheStarting(new StateListenerClustered());
    }

    private void testSimpleCacheStarting(StateListener<String, String> stateListener) {
        HashMap hashMap = new HashMap(10);
        Cache<String, String> cache = cache(0, "DistInitialTransferListener");
        populateCache(cache, hashMap);
        cache.addListener(stateListener);
        try {
            verifyEvents(isClustered(stateListener), stateListener, hashMap);
            cache.removeListener(stateListener);
        } catch (Throwable th) {
            cache.removeListener(stateListener);
            throw th;
        }
    }

    private void populateCache(Cache<String, String> cache, Map<String, String> map) {
        for (int i = 0; i < 10; i++) {
            String str = "key-" + i;
            String str2 = "value-" + i;
            map.put(str, str2);
            cache.put(str, str2);
        }
    }

    private void verifyEvents(boolean z, StateListener<String, String> stateListener, Map<String, String> map) {
        Assert.assertEquals(stateListener.events.size(), z ? map.size() : map.size() * 2);
        boolean z2 = true;
        for (CacheEntryEvent<String, String> cacheEntryEvent : stateListener.events) {
            if (!z) {
                z2 = !z2;
            }
            Assert.assertEquals(cacheEntryEvent.getType(), Event.Type.CACHE_ENTRY_CREATED);
            Assert.assertTrue(map.containsKey(cacheEntryEvent.getKey()));
            Assert.assertEquals(cacheEntryEvent.isPre(), !z2);
            if (z2) {
                Assert.assertEquals((String) cacheEntryEvent.getValue(), map.get(cacheEntryEvent.getKey()));
            } else {
                Assert.assertNull(cacheEntryEvent.getValue());
            }
        }
    }

    public void testCreateAfterIterationBeganButNotIteratedValueYetNonOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
        testModificationAfterIterationBeganButNotIteratedValueYet(new StateListenerClustered(), Operation.CREATE, false);
    }

    public void testCreateAfterIterationBeganButNotIteratedValueYetOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
        testModificationAfterIterationBeganButNotIteratedValueYet(new StateListenerClustered(), Operation.CREATE, true);
    }

    public void testModificationAfterIterationBeganButNotIteratedValueYetNonOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
        testModificationAfterIterationBeganButNotIteratedValueYet(new StateListenerClustered(), Operation.PUT, false);
    }

    public void testModificationAfterIterationBeganButNotIteratedValueYetOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
        testModificationAfterIterationBeganButNotIteratedValueYet(new StateListenerClustered(), Operation.PUT, true);
    }

    public void testRemoveAfterIterationBeganButNotIteratedValueYetNonOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
        testModificationAfterIterationBeganButNotIteratedValueYet(new StateListenerClustered(), Operation.REMOVE, false);
    }

    public void testRemoveAfterIterationBeganButNotIteratedValueYetOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
        testModificationAfterIterationBeganButNotIteratedValueYet(new StateListenerClustered(), Operation.REMOVE, true);
    }

    private void testModificationAfterIterationBeganButNotIteratedValueYet(StateListener<String, String> stateListener, Operation operation, boolean z) throws InterruptedException, TimeoutException, BrokenBarrierException, ExecutionException {
        HashMap hashMap = new HashMap(10);
        Cache<String, String> cache = cache(0, "DistInitialTransferListener");
        populateCache(cache, hashMap);
        CheckPoint checkPoint = new CheckPoint();
        registerBlockingPublisher(checkPoint, cache);
        checkPoint.triggerForever(Mocks.AFTER_INVOCATION);
        checkPoint.triggerForever(Mocks.AFTER_RELEASE);
        try {
            String findKeyBasedOnOwnership = findKeyBasedOnOwnership("key-to-change", cache.getAdvancedCache().getDistributionManager().getCacheTopology(), z);
            String prepareOperation = prepareOperation(operation, cache, findKeyBasedOnOwnership);
            if (prepareOperation != null) {
                hashMap.put(findKeyBasedOnOwnership, prepareOperation);
            }
            Future fork = fork(() -> {
                cache.addListener(stateListener);
                return null;
            });
            checkPoint.awaitStrict(Mocks.BEFORE_INVOCATION, 10L, TimeUnit.SECONDS);
            operation.perform(cache, findKeyBasedOnOwnership, prepareOperation);
            checkPoint.triggerForever(Mocks.BEFORE_RELEASE);
            fork.get(10L, TimeUnit.SECONDS);
            verifyEvents(isClustered(stateListener), stateListener, hashMap);
            cache.removeListener(stateListener);
        } catch (Throwable th) {
            cache.removeListener(stateListener);
            throw th;
        }
    }

    private String prepareOperation(Operation operation, Cache<String, String> cache, String str) {
        String str2;
        switch (operation) {
            case CREATE:
                str2 = "new-value";
                break;
            case PUT:
                cache.put(str, "initial-value");
                str2 = "changed-value";
                break;
            case REMOVE:
                cache.put(str, "initial-value");
                str2 = null;
                break;
            default:
                throw new IllegalArgumentException("Unsupported Operation provided " + operation);
        }
        return str2;
    }

    private void testModificationAfterIterationBeganAndCompletedSegmentValueOwner(StateListener<String, String> stateListener, Operation operation, boolean z) throws IOException, InterruptedException, TimeoutException, BrokenBarrierException, ExecutionException {
        HashMap hashMap = new HashMap(10);
        Cache<String, String> cache = cache(0, "DistInitialTransferListener");
        populateCache(cache, hashMap);
        CheckPoint checkPoint = new CheckPoint();
        registerBlockingPublisher(checkPoint, cache);
        checkPoint.triggerForever(Mocks.BEFORE_RELEASE);
        try {
            String findKeyBasedOnOwnership = findKeyBasedOnOwnership("key-to-change", cache.getAdvancedCache().getDistributionManager().getCacheTopology(), z);
            String prepareOperation = prepareOperation(operation, cache, findKeyBasedOnOwnership);
            if (cache.get(findKeyBasedOnOwnership) != null) {
                hashMap.put(findKeyBasedOnOwnership, (String) cache.get(findKeyBasedOnOwnership));
            }
            Future fork = fork(() -> {
                cache.addListener(stateListener);
                return null;
            });
            checkPoint.awaitStrict(Mocks.AFTER_INVOCATION, 10L, TimeUnit.MINUTES);
            Object perform = operation.perform(cache, findKeyBasedOnOwnership, prepareOperation);
            checkPoint.triggerForever(Mocks.AFTER_RELEASE);
            fork.get(10L, TimeUnit.MINUTES);
            boolean isClustered = isClustered(stateListener);
            Assert.assertEquals(stateListener.events.size(), isClustered ? hashMap.size() + 1 : (hashMap.size() + 1) * 2);
            boolean z2 = true;
            int i = 0;
            while (true) {
                if (i >= (isClustered ? hashMap.size() : hashMap.size() * 2)) {
                    break;
                }
                if (!isClustered) {
                    z2 = !z2;
                }
                CacheEntryEvent<String, String> cacheEntryEvent = stateListener.events.get(i);
                Assert.assertEquals(cacheEntryEvent.getType(), Event.Type.CACHE_ENTRY_CREATED);
                Assert.assertTrue(hashMap.containsKey(cacheEntryEvent.getKey()));
                Assert.assertEquals(cacheEntryEvent.isPre(), !z2);
                if (z2) {
                    Assert.assertEquals(cacheEntryEvent.getValue(), hashMap.get(cacheEntryEvent.getKey()));
                } else {
                    Assert.assertNull(cacheEntryEvent.getValue());
                }
                i++;
            }
            if (isClustered) {
                CacheEntryEvent<String, String> cacheEntryEvent2 = stateListener.events.get(i);
                Assert.assertEquals(cacheEntryEvent2.getType(), operation.getType());
                Assert.assertEquals(cacheEntryEvent2.isPre(), false);
                Assert.assertEquals((String) cacheEntryEvent2.getKey(), findKeyBasedOnOwnership);
                Assert.assertEquals((String) cacheEntryEvent2.getValue(), prepareOperation);
            } else {
                CacheEntryEvent<String, String> cacheEntryEvent3 = stateListener.events.get(i);
                Assert.assertEquals(cacheEntryEvent3.getType(), operation.getType());
                Assert.assertEquals(cacheEntryEvent3.isPre(), true);
                Assert.assertEquals((String) cacheEntryEvent3.getKey(), findKeyBasedOnOwnership);
                Assert.assertEquals(cacheEntryEvent3.getValue(), perform);
                CacheEntryEvent<String, String> cacheEntryEvent4 = stateListener.events.get(i + 1);
                Assert.assertEquals(cacheEntryEvent4.getType(), operation.getType());
                Assert.assertEquals(cacheEntryEvent4.isPre(), false);
                Assert.assertEquals((String) cacheEntryEvent4.getKey(), findKeyBasedOnOwnership);
                Assert.assertEquals((String) cacheEntryEvent4.getValue(), prepareOperation);
            }
        } finally {
            cache.removeListener(stateListener);
        }
    }

    private String findKeyBasedOnOwnership(String str, LocalizedCacheTopology localizedCacheTopology, boolean z) {
        for (int i = 0; i < 1000; i++) {
            String str2 = str + i;
            if (localizedCacheTopology.getDistribution(str2).isPrimary() == z) {
                if (z) {
                    log.debugf("Found key %s with primary owner %s, segment %d", str2, localizedCacheTopology.getLocalAddress(), Integer.valueOf(localizedCacheTopology.getSegment(str2)));
                } else {
                    log.debugf("Found key %s with primary owner != %s, segment %d", str2, localizedCacheTopology.getLocalAddress(), Integer.valueOf(localizedCacheTopology.getSegment(str2)));
                }
                return str2;
            }
        }
        throw new RuntimeException("No key could be found for owner, this may be a bug in test or really bad luck!");
    }

    public void testCreateAfterIterationBeganAndCompletedSegmentValueNonOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
        testModificationAfterIterationBeganAndCompletedSegmentValueOwner(new StateListenerClustered(), Operation.CREATE, false);
    }

    public void testCreateAfterIterationBeganAndCompletedSegmentValueOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
        testModificationAfterIterationBeganAndCompletedSegmentValueOwner(new StateListenerClustered(), Operation.CREATE, true);
    }

    public void testModificationAfterIterationBeganAndCompletedSegmentValueNonOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
        testModificationAfterIterationBeganAndCompletedSegmentValueOwner(new StateListenerClustered(), Operation.PUT, false);
    }

    public void testModificationAfterIterationBeganAndCompletedSegmentValueOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
        testModificationAfterIterationBeganAndCompletedSegmentValueOwner(new StateListenerClustered(), Operation.PUT, true);
    }

    public void testRemoveAfterIterationBeganAndCompletedSegmentValueNonOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
        testModificationAfterIterationBeganAndCompletedSegmentValueOwner(new StateListenerClustered(), Operation.REMOVE, false);
    }

    public void testRemoveAfterIterationBeganAndCompletedSegmentValueOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
        testModificationAfterIterationBeganAndCompletedSegmentValueOwner(new StateListenerClustered(), Operation.REMOVE, true);
    }

    protected void testIterationBeganAndSegmentNotComplete(StateListener<String, String> stateListener, Operation operation, boolean z) throws TimeoutException, InterruptedException, ExecutionException {
        HashMap hashMap = new HashMap(10);
        Cache<String, String> cache = cache(0, "DistInitialTransferListener");
        populateCache(cache, hashMap);
        String findKeyBasedOnOwnership = findKeyBasedOnOwnership("key-to-change-", cache.getAdvancedCache().getDistributionManager().getCacheTopology(), z);
        String prepareOperation = prepareOperation(operation, cache, findKeyBasedOnOwnership);
        if (cache.get(findKeyBasedOnOwnership) != null) {
            hashMap.put(findKeyBasedOnOwnership, (String) cache.get(findKeyBasedOnOwnership));
        }
        CheckPoint checkPoint = new CheckPoint();
        checkPoint.triggerForever(Mocks.AFTER_RELEASE);
        waitUntilClosingSegment(cache, cache.getAdvancedCache().getDistributionManager().getCacheTopology().getSegment(findKeyBasedOnOwnership), checkPoint);
        Future fork = fork(() -> {
            cache.addListener(stateListener);
            return null;
        });
        try {
            checkPoint.awaitStrict(Mocks.BEFORE_INVOCATION, 10L, TimeUnit.SECONDS);
            Object perform = operation.perform(cache, findKeyBasedOnOwnership, prepareOperation);
            checkPoint.triggerForever(Mocks.BEFORE_RELEASE);
            fork.get(10L, TimeUnit.SECONDS);
            boolean isClustered = isClustered(stateListener);
            Assert.assertEquals(stateListener.events.size(), isClustered ? hashMap.size() + 1 : (hashMap.size() + 1) * 2);
            if (isClustered) {
                CacheEntryEvent<String, String> cacheEntryEvent = null;
                boolean z2 = false;
                int size = stateListener.events.size() - 1;
                while (true) {
                    if (size < 0) {
                        break;
                    }
                    CacheEntryEvent<String, String> cacheEntryEvent2 = stateListener.events.get(size);
                    if (cacheEntryEvent2.getKey().equals(findKeyBasedOnOwnership) && operation.getType() == cacheEntryEvent2.getType()) {
                        if (cacheEntryEvent == null) {
                            cacheEntryEvent = cacheEntryEvent2;
                            stateListener.events.remove(size);
                            if (operation.getType() == Event.Type.CACHE_ENTRY_CREATED) {
                                z2 = true;
                                break;
                            }
                        } else {
                            Assert.fail("There should only be a single event in the event queue!");
                        }
                        size--;
                    } else {
                        if (cacheEntryEvent != null) {
                            boolean equals = cacheEntryEvent.getKey().equals(cacheEntryEvent2.getKey());
                            z2 = equals;
                            if (equals) {
                                break;
                            }
                        } else {
                            continue;
                        }
                        size--;
                    }
                }
                Assert.assertTrue(z2, "There was no matching create event for key " + cacheEntryEvent.getKey());
                Assert.assertEquals(cacheEntryEvent.getType(), operation.getType());
                Assert.assertEquals(cacheEntryEvent.isPre(), false);
                Assert.assertEquals(cacheEntryEvent.getValue(), prepareOperation);
            }
            boolean z3 = true;
            int i = 0;
            while (true) {
                if (i >= (isClustered ? hashMap.size() : hashMap.size() * 2)) {
                    break;
                }
                if (!isClustered) {
                    z3 = !z3;
                }
                CacheEntryEvent<String, String> cacheEntryEvent3 = stateListener.events.get(i);
                Assert.assertEquals(cacheEntryEvent3.getType(), Event.Type.CACHE_ENTRY_CREATED);
                Assert.assertTrue(hashMap.containsKey(cacheEntryEvent3.getKey()));
                Assert.assertEquals(cacheEntryEvent3.isPre(), !z3);
                if (z3) {
                    Assert.assertEquals((String) cacheEntryEvent3.getValue(), hashMap.get(cacheEntryEvent3.getKey()));
                } else {
                    Assert.assertNull(cacheEntryEvent3.getValue());
                }
                i++;
            }
            if (!isClustered) {
                CacheEntryEvent<String, String> cacheEntryEvent4 = stateListener.events.get(i);
                Assert.assertEquals(cacheEntryEvent4.getType(), operation.getType());
                Assert.assertEquals(cacheEntryEvent4.isPre(), true);
                Assert.assertEquals((String) cacheEntryEvent4.getKey(), findKeyBasedOnOwnership);
                Assert.assertEquals(cacheEntryEvent4.getValue(), perform);
                CacheEntryEvent<String, String> cacheEntryEvent5 = stateListener.events.get(i + 1);
                Assert.assertEquals(cacheEntryEvent5.getType(), operation.getType());
                Assert.assertEquals(cacheEntryEvent5.isPre(), false);
                Assert.assertEquals((String) cacheEntryEvent5.getKey(), findKeyBasedOnOwnership);
                Assert.assertEquals((String) cacheEntryEvent5.getValue(), prepareOperation);
            }
        } finally {
            cache.removeListener(stateListener);
        }
    }

    public void testCreateAfterIterationBeganAndSegmentNotCompleteValueNonOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
        testIterationBeganAndSegmentNotComplete(new StateListenerClustered(), Operation.CREATE, false);
    }

    public void testCreateAfterIterationBeganAndSegmentNotCompleteValueOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
        testIterationBeganAndSegmentNotComplete(new StateListenerClustered(), Operation.CREATE, true);
    }

    public void testModificationAfterIterationBeganAndSegmentNotCompleteValueNonOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
        testIterationBeganAndSegmentNotComplete(new StateListenerClustered(), Operation.PUT, false);
    }

    public void testModificationAfterIterationBeganAndSegmentNotCompleteValueOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
        testIterationBeganAndSegmentNotComplete(new StateListenerClustered(), Operation.PUT, true);
    }

    public void testRemoveAfterIterationBeganAndSegmentNotCompleteValueNonOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
        testIterationBeganAndSegmentNotComplete(new StateListenerClustered(), Operation.REMOVE, false);
    }

    public void testRemoveAfterIterationBeganAndSegmentNotCompleteValueOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
        testIterationBeganAndSegmentNotComplete(new StateListenerClustered(), Operation.REMOVE, true);
    }

    private boolean isClustered(StateListener stateListener) {
        return stateListener.getClass().getAnnotation(Listener.class).clustered();
    }

    private void segmentCompletionWaiter(AtomicBoolean atomicBoolean, CheckPoint checkPoint) throws TimeoutException, InterruptedException {
        if (atomicBoolean.compareAndSet(false, true)) {
            log.tracef("We were first to check segment completion", new Object[0]);
            return;
        }
        log.tracef("We were last to check segment completion, so notifying main thread", new Object[0]);
        checkPoint.trigger("pre_complete_segment_invoked");
        checkPoint.awaitStrict("pre_complete_segment_released", 10L, TimeUnit.SECONDS);
    }

    protected void waitUntilClosingSegment(Cache<?, ?> cache, int i, CheckPoint checkPoint) {
        ((ClusterPublisherManager) Mockito.doAnswer(invocationOnMock -> {
            SegmentCompletionPublisher segmentCompletionPublisher = (SegmentCompletionPublisher) invocationOnMock.callRealMethod();
            return (subscriber, intConsumer) -> {
                IntConsumer intConsumer = (IntConsumer) Mockito.mock(IntConsumer.class);
                ((IntConsumer) Mockito.doAnswer(Mocks.blockingAnswer(AdditionalAnswers.delegatesTo(intConsumer), checkPoint)).when(intConsumer)).accept(i);
                segmentCompletionPublisher.subscribe(subscriber, intConsumer);
            };
        }).when((ClusterPublisherManager) Mocks.replaceComponentWithSpy(cache, ClusterPublisherManager.class))).entryPublisher((IntSet) ArgumentMatchers.any(), (Set) ArgumentMatchers.any(), (InvocationContext) ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), (DeliveryGuarantee) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), (Function) ArgumentMatchers.any());
    }

    private static void registerBlockingPublisher(CheckPoint checkPoint, Cache<?, ?> cache) {
        ((ClusterPublisherManager) Mockito.doAnswer(invocationOnMock -> {
            return Mocks.blockingPublisher((SegmentCompletionPublisher) invocationOnMock.callRealMethod(), checkPoint);
        }).when((ClusterPublisherManager) Mocks.replaceComponentWithSpy(cache, ClusterPublisherManager.class))).entryPublisher((IntSet) ArgumentMatchers.any(), (Set) ArgumentMatchers.any(), (InvocationContext) ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), (DeliveryGuarantee) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), (Function) ArgumentMatchers.any());
    }
}
