package org.infinispan.reactive.publisher.impl;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.infinispan.Cache;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.ImmortalCacheEntry;
import org.infinispan.container.entries.NullCacheEntry;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.NonTxInvocationContext;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.marshall.core.MarshallableFunctions;
import org.infinispan.reactive.publisher.PublisherReducers;
import org.infinispan.reactive.publisher.impl.SegmentPublisherSupplier;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.InCacheMode;
import org.mockito.Mockito;
import org.reactivestreams.Publisher;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@InCacheMode({CacheMode.REPL_SYNC, CacheMode.DIST_SYNC, CacheMode.SCATTERED_SYNC})
@Test(groups = {"functional"}, testName = "reactive.publisher.impl.SimpleClusterPublisherManagerTest")
/* loaded from: input_file:org/infinispan/reactive/publisher/impl/SimpleClusterPublisherManagerTest.class */
public class SimpleClusterPublisherManagerTest extends MultipleCacheManagersTest {
    @Override // org.infinispan.test.MultipleCacheManagersTest
    protected void createCacheManagers() throws Throwable {
        ConfigurationBuilder defaultClusteredCacheConfig = getDefaultClusteredCacheConfig(this.cacheMode, false);
        defaultClusteredCacheConfig.clustering().hash().numSegments(10);
        createCluster(defaultClusteredCacheConfig, 4);
        waitForClusterToForm();
    }

    private Map<Integer, String> insert(Cache<Integer, String> cache) {
        HashMap hashMap = new HashMap(24);
        HashMap hashMap2 = log.isTraceEnabled() ? new HashMap() : null;
        KeyPartitioner keyPartitioner = (KeyPartitioner) TestingUtil.extractComponent(cache, KeyPartitioner.class);
        IntStream.range(0, 24).forEach(i -> {
            hashMap.put(Integer.valueOf(i), "value-" + i);
            if (hashMap2 != null) {
                ((IntSet) hashMap2.computeIfAbsent(Integer.valueOf(keyPartitioner.getSegment(Integer.valueOf(i))), (v0) -> {
                    return IntSets.mutableEmptySet(v0);
                })).set(i);
            }
        });
        if (hashMap2 != null) {
            log.tracef("Keys by segment are: " + hashMap2, new Object[0]);
        }
        cache.putAll(hashMap);
        return hashMap;
    }

    private ClusterPublisherManager<Integer, String> cpm(Cache<Integer, String> cache) {
        return (ClusterPublisherManager) TestingUtil.extractComponent(cache, ClusterPublisherManager.class);
    }

    @DataProvider(name = "GuaranteeParallelEntry")
    public Object[][] collectionAndVersionsProvider() {
        return (Object[][]) Arrays.stream(DeliveryGuarantee.values()).flatMap(deliveryGuarantee -> {
            return Stream.of((Object[]) new Boolean[]{Boolean.TRUE, Boolean.FALSE}).flatMap(bool -> {
                return Stream.of((Object[]) new Boolean[]{Boolean.TRUE, Boolean.FALSE}).map(bool -> {
                    return new Object[]{deliveryGuarantee, bool, bool};
                });
            });
        }).toArray(i -> {
            return new Object[i];
        });
    }

    @Test(dataProvider = "GuaranteeParallelEntry")
    public void testCount(DeliveryGuarantee deliveryGuarantee, boolean z, boolean z2) {
        Cache<Integer, String> cache = cache(0);
        int size = insert(cache).size();
        ClusterPublisherManager<Integer, String> cpm = cpm(cache);
        AssertJUnit.assertEquals(size, ((Long) (z2 ? cpm.entryReduction(z, (IntSet) null, (Set) null, (InvocationContext) null, 0L, deliveryGuarantee, PublisherReducers.count(), PublisherReducers.add()) : cpm.keyReduction(z, (IntSet) null, (Set) null, (InvocationContext) null, 0L, deliveryGuarantee, PublisherReducers.count(), PublisherReducers.add())).toCompletableFuture().join()).intValue());
    }

    @Test(dataProvider = "GuaranteeParallelEntry")
    public void testCountSegments(DeliveryGuarantee deliveryGuarantee, boolean z, boolean z2) {
        Cache<Integer, String> cache = cache(0);
        int size = insert(cache).size();
        IntSet mutableEmptySet = IntSets.mutableEmptySet();
        for (int i = 2; i <= 8; i++) {
            mutableEmptySet.set(i);
        }
        ClusterPublisherManager<Integer, String> cpm = cpm(cache);
        AssertJUnit.assertEquals(findHowManyInSegments(size, mutableEmptySet, (KeyPartitioner) TestingUtil.extractComponent(cache, KeyPartitioner.class)), ((Long) (z2 ? cpm.entryReduction(z, mutableEmptySet, (Set) null, (InvocationContext) null, 0L, deliveryGuarantee, PublisherReducers.count(), PublisherReducers.add()) : cpm.keyReduction(z, mutableEmptySet, (Set) null, (InvocationContext) null, 0L, deliveryGuarantee, PublisherReducers.count(), PublisherReducers.add())).toCompletableFuture().join()).intValue());
    }

    @Test(dataProvider = "GuaranteeParallelEntry")
    public void testCountSpecificKeys(DeliveryGuarantee deliveryGuarantee, boolean z, boolean z2) {
        Cache<Integer, String> cache = cache(0);
        int size = insert(cache).size();
        HashSet hashSet = new HashSet();
        for (int i = 0; i < size; i += 2) {
            hashSet.add(Integer.valueOf(i));
        }
        hashSet.add(Integer.valueOf(size + 1));
        ClusterPublisherManager<Integer, String> cpm = cpm(cache);
        AssertJUnit.assertEquals(size / 2, ((Long) (z2 ? cpm.entryReduction(z, (IntSet) null, hashSet, (InvocationContext) null, 0L, deliveryGuarantee, PublisherReducers.count(), PublisherReducers.add()) : cpm.keyReduction(z, (IntSet) null, hashSet, (InvocationContext) null, 0L, deliveryGuarantee, PublisherReducers.count(), PublisherReducers.add())).toCompletableFuture().join()).intValue());
    }

    @Test(dataProvider = "GuaranteeParallelEntry")
    public void testCountWithContext(DeliveryGuarantee deliveryGuarantee, boolean z, boolean z2) {
        Cache<Integer, String> cache = cache(0);
        int size = insert(cache).size();
        NonTxInvocationContext nonTxInvocationContext = new NonTxInvocationContext((Address) null);
        nonTxInvocationContext.putLookedUpEntry(0, NullCacheEntry.getInstance());
        nonTxInvocationContext.putLookedUpEntry(Integer.valueOf(size - 2), (CacheEntry) Mockito.when(Boolean.valueOf(((CacheEntry) Mockito.mock(CacheEntry.class)).isRemoved())).thenReturn(true).getMock());
        nonTxInvocationContext.putLookedUpEntry(Integer.valueOf(size + 1), new ImmortalCacheEntry(Integer.valueOf(size + 1), Integer.valueOf(size + 1)));
        ClusterPublisherManager<Integer, String> cpm = cpm(cache);
        AssertJUnit.assertEquals(size - 1, ((Long) (z2 ? cpm.entryReduction(z, (IntSet) null, (Set) null, nonTxInvocationContext, 0L, deliveryGuarantee, PublisherReducers.count(), PublisherReducers.add()) : cpm.keyReduction(z, (IntSet) null, (Set) null, nonTxInvocationContext, 0L, deliveryGuarantee, PublisherReducers.count(), PublisherReducers.add())).toCompletableFuture().join()).intValue());
    }

    @Test(dataProvider = "GuaranteeParallelEntry")
    public void testCountWithContextSegments(DeliveryGuarantee deliveryGuarantee, boolean z, boolean z2) {
        Cache<Integer, String> cache = cache(0);
        int size = insert(cache).size();
        IntSet mutableEmptySet = IntSets.mutableEmptySet();
        for (int i = 2; i <= 8; i++) {
            mutableEmptySet.set(i);
        }
        KeyPartitioner keyPartitioner = (KeyPartitioner) TestingUtil.extractComponent(cache, KeyPartitioner.class);
        int findHowManyInSegments = findHowManyInSegments(size, mutableEmptySet, keyPartitioner);
        AtomicInteger atomicInteger = new AtomicInteger();
        NonTxInvocationContext nonTxInvocationContext = new NonTxInvocationContext((Address) null);
        nonTxInvocationContext.putLookedUpEntry(0, NullCacheEntry.getInstance());
        nonTxInvocationContext.putLookedUpEntry(Integer.valueOf(size - 2), (CacheEntry) Mockito.when(Boolean.valueOf(((CacheEntry) Mockito.mock(CacheEntry.class)).isRemoved())).thenReturn(true).getMock());
        nonTxInvocationContext.putLookedUpEntry(Integer.valueOf(size + 1), new ImmortalCacheEntry(Integer.valueOf(size + 1), Integer.valueOf(size + 1)));
        nonTxInvocationContext.forEachEntry((obj, cacheEntry) -> {
            if (mutableEmptySet.contains(keyPartitioner.getSegment(obj))) {
                atomicInteger.addAndGet((cacheEntry.isRemoved() || cacheEntry.isNull()) ? -1 : 1);
            }
        });
        ClusterPublisherManager<Integer, String> cpm = cpm(cache);
        AssertJUnit.assertEquals(findHowManyInSegments + atomicInteger.get(), ((Long) (z2 ? cpm.entryReduction(z, mutableEmptySet, (Set) null, nonTxInvocationContext, 0L, deliveryGuarantee, PublisherReducers.count(), PublisherReducers.add()) : cpm.keyReduction(z, mutableEmptySet, (Set) null, nonTxInvocationContext, 0L, deliveryGuarantee, PublisherReducers.count(), PublisherReducers.add())).toCompletableFuture().join()).intValue());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static int findHowManyInSegments(int i, IntSet intSet, KeyPartitioner keyPartitioner) {
        int i2 = 0;
        for (int i3 = 0; i3 < i; i3++) {
            if (intSet.contains(keyPartitioner.getSegment(Integer.valueOf(i3)))) {
                i2++;
            }
        }
        return i2;
    }

    @AfterMethod
    public void verifyNoDanglingRequests() {
        for (Cache cache : caches()) {
            eventuallyEquals(0, () -> {
                return Integer.valueOf(((PublisherHandler) TestingUtil.extractComponent(cache, PublisherHandler.class)).openPublishers());
            });
        }
    }

    @DataProvider(name = "GuaranteeEntry")
    public Object[][] guaranteesEntryType() {
        return (Object[][]) Arrays.stream(DeliveryGuarantee.values()).flatMap(deliveryGuarantee -> {
            return Stream.of((Object[]) new Boolean[]{Boolean.TRUE, Boolean.FALSE}).map(bool -> {
                return new Object[]{deliveryGuarantee, bool};
            });
        }).toArray(i -> {
            return new Object[i];
        });
    }

    private <I> void performPublisherOperation(DeliveryGuarantee deliveryGuarantee, boolean z, IntSet intSet, Set<Integer> set, InvocationContext invocationContext, Map<Integer, String> map) {
        SegmentPublisherSupplier keyPublisher;
        Consumer consumer;
        ClusterPublisherManager<Integer, String> cpm = cpm(cache(0));
        if (z) {
            keyPublisher = cpm.entryPublisher(intSet, set, invocationContext, 0L, deliveryGuarantee, 10, MarshallableFunctions.identity());
            consumer = obj -> {
                Map.Entry entry = (Map.Entry) obj;
                AssertJUnit.assertEquals(map.get(entry.getKey()), entry.getValue());
            };
        } else {
            keyPublisher = cpm.keyPublisher(intSet, set, invocationContext, 0L, deliveryGuarantee, 10, MarshallableFunctions.identity());
            consumer = obj2 -> {
                AssertJUnit.assertTrue(map.containsKey(obj2));
            };
        }
        int size = map.size();
        List list = (List) Flowable.fromPublisher(keyPublisher.publisherWithoutSegments()).toList(size).blockingGet();
        if (size != list.size()) {
            log.fatal("SIZE MISMATCH expected: " + map + " was: " + list);
        }
        AssertJUnit.assertEquals(size, list.size());
        list.forEach(consumer);
    }

    @Test(dataProvider = "GuaranteeEntry")
    public void testSimpleIteration(DeliveryGuarantee deliveryGuarantee, boolean z) {
        performPublisherOperation(deliveryGuarantee, z, null, null, null, insert(cache(0)));
    }

    @Test(dataProvider = "GuaranteeEntry")
    public void testIterationSegments(DeliveryGuarantee deliveryGuarantee, boolean z) {
        Cache<Integer, String> cache = cache(0);
        Map<Integer, String> insert = insert(cache);
        IntSet mutableEmptySet = IntSets.mutableEmptySet();
        for (int i = 2; i <= 7; i++) {
            mutableEmptySet.set(i);
        }
        removeEntriesNotInSegment(insert, (KeyPartitioner) TestingUtil.extractComponent(cache, KeyPartitioner.class), mutableEmptySet);
        performPublisherOperation(deliveryGuarantee, z, mutableEmptySet, null, null, insert);
    }

    @Test(dataProvider = "GuaranteeEntry")
    public void testContextIteration(DeliveryGuarantee deliveryGuarantee, boolean z) {
        Map<Integer, String> insert = insert(cache(0));
        NonTxInvocationContext nonTxInvocationContext = new NonTxInvocationContext((Address) null);
        nonTxInvocationContext.putLookedUpEntry(0, NullCacheEntry.getInstance());
        insert.remove(0);
        nonTxInvocationContext.putLookedUpEntry(7, (CacheEntry) Mockito.when(Boolean.valueOf(((CacheEntry) Mockito.mock(CacheEntry.class)).isRemoved())).thenReturn(true).getMock());
        insert.remove(7);
        nonTxInvocationContext.putLookedUpEntry(156, new ImmortalCacheEntry(156, "value-156"));
        insert.put(156, "value-156");
        performPublisherOperation(deliveryGuarantee, z, null, null, nonTxInvocationContext, insert);
    }

    @Test(dataProvider = "GuaranteeEntry")
    public void testSpecificKeyIteration(DeliveryGuarantee deliveryGuarantee, boolean z) {
        Map<Integer, String> insert = insert(cache(0));
        HashSet hashSet = new HashSet();
        hashSet.add(1);
        hashSet.add(4);
        hashSet.add(7);
        hashSet.add(123);
        insert.entrySet().removeIf(entry -> {
            return !hashSet.contains(entry.getKey());
        });
        performPublisherOperation(deliveryGuarantee, z, null, hashSet, null, insert);
    }

    @Test(dataProvider = "GuaranteeEntry")
    public void testMapIteration(DeliveryGuarantee deliveryGuarantee, boolean z) {
        List list;
        SegmentPublisherSupplier keyPublisher;
        Cache<Integer, String> cache = cache(0);
        ClusterPublisherManager<Integer, String> cpm = cpm(cache(0));
        Map<Integer, String> insert = insert(cache);
        if (z) {
            list = (List) insert.entrySet().stream().map((v0) -> {
                return v0.getValue();
            }).map((v0) -> {
                return String.valueOf(v0);
            }).collect(Collectors.toList());
            keyPublisher = cpm.entryPublisher((IntSet) null, (Set) null, (InvocationContext) null, 0L, deliveryGuarantee, 10, publisher -> {
                return Flowable.fromPublisher(publisher).map((v0) -> {
                    return v0.getValue();
                }).map((v0) -> {
                    return String.valueOf(v0);
                });
            });
        } else {
            list = (List) insert.keySet().stream().map((v0) -> {
                return String.valueOf(v0);
            }).collect(Collectors.toList());
            keyPublisher = cpm.keyPublisher((IntSet) null, (Set) null, (InvocationContext) null, 0L, deliveryGuarantee, 10, publisher2 -> {
                return Flowable.fromPublisher(publisher2).map((v0) -> {
                    return String.valueOf(v0);
                });
            });
        }
        performFunctionPublisherOperation(keyPublisher.publisherWithoutSegments(), list);
    }

    @Test(dataProvider = "GuaranteeEntry")
    public void testEmptySegmentNotification(DeliveryGuarantee deliveryGuarantee, boolean z) throws InterruptedException {
        performSegmentPublisherOperation(deliveryGuarantee, z, null, null, null, null);
    }

    private <I, R> void performSegmentPublisherOperation(DeliveryGuarantee deliveryGuarantee, boolean z, IntSet intSet, Set<Integer> set, InvocationContext invocationContext, Map<Integer, String> map) throws InterruptedException {
        ClusterPublisherManager<Integer, String> cpm = cpm(cache(0));
        SegmentPublisherSupplier entryPublisher = z ? cpm.entryPublisher(intSet, set, invocationContext, 0L, deliveryGuarantee, 10, MarshallableFunctions.identity()) : cpm.keyPublisher(intSet, set, invocationContext, 0L, deliveryGuarantee, 10, MarshallableFunctions.identity());
        IntSet concurrentSet = IntSets.concurrentSet(10);
        TestSubscriber create = TestSubscriber.create();
        entryPublisher.publisherWithSegments().subscribe(create);
        create.await(10L, TimeUnit.SECONDS);
        for (SegmentPublisherSupplier.Notification notification : create.values()) {
            if (notification.isSegmentComplete()) {
                concurrentSet.set(notification.completedSegment());
            }
        }
        AssertJUnit.assertEquals(IntSets.immutableRangeSet(10), concurrentSet);
    }

    private <I, R> void performFunctionPublisherOperation(Publisher<R> publisher, Collection<R> collection) {
        int size = collection.size();
        List list = (List) Flowable.fromPublisher(publisher).toList(size).blockingGet();
        if (size != list.size()) {
            log.fatal("SIZE MISMATCH was: " + list.size());
        }
        AssertJUnit.assertEquals(size, list.size());
        list.forEach(obj -> {
            AssertJUnit.assertTrue(collection.contains(obj));
        });
    }

    private void removeEntriesNotInSegment(Map<?, ?> map, KeyPartitioner keyPartitioner, IntSet intSet) {
        Iterator<?> it = map.keySet().iterator();
        while (it.hasNext()) {
            if (!intSet.contains(keyPartitioner.getSegment(it.next()))) {
                it.remove();
            }
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 614549645:
                if (implMethodName.equals("lambda$testMapIteration$9d72b6c5$1")) {
                    z = false;
                    break;
                }
                break;
            case 614549646:
                if (implMethodName.equals("lambda$testMapIteration$9d72b6c5$2")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/infinispan/util/function/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/infinispan/reactive/publisher/impl/SimpleClusterPublisherManagerTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/reactivestreams/Publisher;)Lorg/reactivestreams/Publisher;")) {
                    return publisher -> {
                        return Flowable.fromPublisher(publisher).map((v0) -> {
                            return v0.getValue();
                        }).map((v0) -> {
                            return String.valueOf(v0);
                        });
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/infinispan/util/function/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/infinispan/reactive/publisher/impl/SimpleClusterPublisherManagerTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/reactivestreams/Publisher;)Lorg/reactivestreams/Publisher;")) {
                    return publisher2 -> {
                        return Flowable.fromPublisher(publisher2).map((v0) -> {
                            return String.valueOf(v0);
                        });
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
