package org.infinispan.stream.impl;

import io.reactivex.Flowable;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.subscribers.TestSubscriber;
import java.util.Map;
import java.util.Objects;
import java.util.PrimitiveIterator;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.SmallIntSet;
import org.infinispan.container.entries.ImmortalCacheEntry;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.remoting.transport.Address;
import org.infinispan.stream.impl.AbstractCacheStream;
import org.infinispan.util.concurrent.WithinThreadExecutor;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.reactivestreams.Publisher;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(testName = "stream.impl.CompletionRehashPublisherDecoratorTest", groups = {"functional"})
/* loaded from: input_file:org/infinispan/stream/impl/CompletionRehashPublisherDecoratorTest.class */
public class CompletionRehashPublisherDecoratorTest {
    <S> CompletionRehashPublisherDecorator<S> createDecorator(Consumer<Supplier<PrimitiveIterator.OfInt>> consumer, Consumer<Object> consumer2) {
        return new CompletionRehashPublisherDecorator<>(AbstractCacheStream.IteratorOperation.NO_MAP, (DistributionManager) null, (Address) null, consumer, supplier -> {
        }, supplier2 -> {
        }, new WithinThreadExecutor(), consumer2, obj -> {
            return ((Map.Entry) obj).getKey();
        });
    }

    <S> CompletionRehashPublisherDecorator<S> createDecorator(ConsistentHash consistentHash, Set<Integer> set, Set<Integer> set2, Consumer<Supplier<PrimitiveIterator.OfInt>> consumer, Consumer<Object> consumer2) {
        Address address = (Address) Mockito.mock(Address.class);
        if (set != null) {
            Mockito.when(consistentHash.getSegmentsForOwner((Address) ArgumentMatchers.eq(address))).thenReturn(set);
        }
        if (set2 != null) {
            Mockito.when(consistentHash.getPrimarySegmentsForOwner((Address) ArgumentMatchers.eq(address))).thenReturn(set2);
        }
        DistributionManager distributionManager = (DistributionManager) Mockito.mock(DistributionManager.class);
        Mockito.when(distributionManager.getReadConsistentHash()).thenReturn(consistentHash);
        return new CompletionRehashPublisherDecorator<>(AbstractCacheStream.IteratorOperation.NO_MAP, distributionManager, address, consumer, supplier -> {
        }, supplier2 -> {
        }, new WithinThreadExecutor(), consumer2, obj -> {
            return ((Map.Entry) obj).getKey();
        });
    }

    void simpleAssert(Publisher<Object> publisher, PublishProcessor<Object> publishProcessor, Consumer<Supplier<PrimitiveIterator.OfInt>> consumer, CompletionRehashPublisherDecorator<Object> completionRehashPublisherDecorator, Consumer<IntSet> consumer2, IntSet intSet) {
        TestSubscriber test = Flowable.fromPublisher(publisher).test();
        test.assertNotComplete();
        ImmortalCacheEntry immortalCacheEntry = new ImmortalCacheEntry(1, 1);
        publishProcessor.onNext(immortalCacheEntry);
        ImmortalCacheEntry immortalCacheEntry2 = new ImmortalCacheEntry(2, 2);
        publishProcessor.onNext(immortalCacheEntry2);
        ImmortalCacheEntry immortalCacheEntry3 = new ImmortalCacheEntry(3, 3);
        publishProcessor.onNext(immortalCacheEntry3);
        test.assertValueCount(3);
        test.assertNotComplete();
        ((Consumer) Mockito.verify(consumer, Mockito.never())).accept((Supplier) Mockito.any());
        publishProcessor.onComplete();
        test.assertComplete();
        ((Consumer) Mockito.verify(consumer, Mockito.never())).accept((Supplier) Mockito.any());
        completionRehashPublisherDecorator.valueIterated(immortalCacheEntry);
        ((Consumer) Mockito.verify(consumer, Mockito.never())).accept((Supplier) Mockito.any());
        completionRehashPublisherDecorator.valueIterated(immortalCacheEntry2);
        ((Consumer) Mockito.verify(consumer, Mockito.never())).accept((Supplier) Mockito.any());
        completionRehashPublisherDecorator.valueIterated(immortalCacheEntry3);
        consumer2.accept(intSet);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Supplier.class);
        ((Consumer) Mockito.verify(consumer, Mockito.times(1))).accept((Supplier) forClass.capture());
        AssertJUnit.assertEquals(intSet, SmallIntSet.of((PrimitiveIterator.OfInt) ((Supplier) forClass.getValue()).get()));
    }

    public void testLocalOnlyStreamCompletes() {
        SmallIntSet of = SmallIntSet.of(1, 4);
        Consumer<Supplier<PrimitiveIterator.OfInt>> consumer = (Consumer) Mockito.mock(Consumer.class);
        Consumer<Object> consumer2 = (Consumer) Mockito.mock(Consumer.class);
        ConsistentHash consistentHash = (ConsistentHash) Mockito.mock(ConsistentHash.class);
        CompletionRehashPublisherDecorator<Object> createDecorator = createDecorator(consistentHash, of, null, consumer, consumer2);
        PublishProcessor<Object> create = PublishProcessor.create();
        simpleAssert(createDecorator.decorateLocal(consistentHash, true, of, create), create, consumer, createDecorator, intSet -> {
        }, of);
    }

    public void testRemoteOnlyStreamCompletes() {
        SmallIntSet of = SmallIntSet.of(1, 4);
        Consumer<Supplier<PrimitiveIterator.OfInt>> consumer = (Consumer) Mockito.mock(Consumer.class);
        CompletionRehashPublisherDecorator<Object> createDecorator = createDecorator(consumer, (Consumer) Mockito.mock(Consumer.class));
        TestRemoteIteratorPublisher testRemoteIteratorPublisher = new TestRemoteIteratorPublisher();
        simpleAssert(createDecorator.decorateRemote(testRemoteIteratorPublisher), testRemoteIteratorPublisher.publishProcessor(), consumer, createDecorator, intSet -> {
            Objects.requireNonNull(intSet);
            testRemoteIteratorPublisher.notifyCompleted(intSet::iterator);
        }, of);
    }

    public void testRemoteAndLocal() {
        SmallIntSet of = SmallIntSet.of(1, 4);
        Consumer<Supplier<PrimitiveIterator.OfInt>> consumer = (Consumer) Mockito.mock(Consumer.class);
        Consumer<Object> consumer2 = (Consumer) Mockito.mock(Consumer.class);
        ConsistentHash consistentHash = (ConsistentHash) Mockito.mock(ConsistentHash.class);
        CompletionRehashPublisherDecorator<Object> createDecorator = createDecorator(consistentHash, of, null, consumer, consumer2);
        PublishProcessor<Object> create = PublishProcessor.create();
        simpleAssert(createDecorator.decorateLocal(consistentHash, true, of, create), create, consumer, createDecorator, intSet -> {
        }, of);
        Mockito.reset(new Consumer[]{consumer});
        TestRemoteIteratorPublisher testRemoteIteratorPublisher = new TestRemoteIteratorPublisher();
        simpleAssert(createDecorator.decorateRemote(testRemoteIteratorPublisher), testRemoteIteratorPublisher.publishProcessor(), consumer, createDecorator, intSet2 -> {
            Objects.requireNonNull(intSet2);
            testRemoteIteratorPublisher.notifyCompleted(intSet2::iterator);
        }, of);
    }

    public void testRemoteAndLocalCompleteSameTime() {
        SmallIntSet of = SmallIntSet.of(1, 4);
        SmallIntSet of2 = SmallIntSet.of(2, 3);
        Consumer<Supplier<PrimitiveIterator.OfInt>> consumer = (Consumer) Mockito.mock(Consumer.class);
        Consumer<Object> consumer2 = (Consumer) Mockito.mock(Consumer.class);
        ConsistentHash consistentHash = (ConsistentHash) Mockito.mock(ConsistentHash.class);
        CompletionRehashPublisherDecorator createDecorator = createDecorator(consistentHash, of, null, consumer, consumer2);
        PublishProcessor create = PublishProcessor.create();
        TestSubscriber test = Flowable.fromPublisher(createDecorator.decorateLocal(consistentHash, true, of, create)).test();
        TestRemoteIteratorPublisher testRemoteIteratorPublisher = new TestRemoteIteratorPublisher();
        PublishProcessor publishProcessor = testRemoteIteratorPublisher.publishProcessor();
        TestSubscriber test2 = Flowable.fromPublisher(createDecorator.decorateRemote(testRemoteIteratorPublisher)).test();
        test.assertNotComplete();
        test2.assertNotComplete();
        ImmortalCacheEntry immortalCacheEntry = new ImmortalCacheEntry(1, 1);
        create.onNext(immortalCacheEntry);
        ImmortalCacheEntry immortalCacheEntry2 = new ImmortalCacheEntry(2, 2);
        publishProcessor.onNext(immortalCacheEntry2);
        ImmortalCacheEntry immortalCacheEntry3 = new ImmortalCacheEntry(3, 3);
        publishProcessor.onNext(immortalCacheEntry3);
        ImmortalCacheEntry immortalCacheEntry4 = new ImmortalCacheEntry(4, 4);
        create.onNext(immortalCacheEntry4);
        ImmortalCacheEntry immortalCacheEntry5 = new ImmortalCacheEntry(5, 5);
        publishProcessor.onNext(immortalCacheEntry5);
        test.assertValueCount(2);
        test2.assertValueCount(3);
        test.assertNotComplete();
        test2.assertNotComplete();
        create.onComplete();
        Objects.requireNonNull(of2);
        testRemoteIteratorPublisher.notifyCompleted(of2::iterator);
        publishProcessor.onComplete();
        test.assertComplete();
        test2.assertComplete();
        ((Consumer) Mockito.verify(consumer, Mockito.never())).accept((Supplier) ArgumentMatchers.any());
        createDecorator.valueIterated(immortalCacheEntry);
        createDecorator.valueIterated(immortalCacheEntry2);
        createDecorator.valueIterated(immortalCacheEntry3);
        createDecorator.valueIterated(immortalCacheEntry5);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Supplier.class);
        ((Consumer) Mockito.verify(consumer, Mockito.times(1))).accept((Supplier) forClass.capture());
        AssertJUnit.assertEquals(of2, SmallIntSet.of((PrimitiveIterator.OfInt) ((Supplier) forClass.getValue()).get()));
        Mockito.reset(new Consumer[]{consumer});
        createDecorator.valueIterated(immortalCacheEntry4);
        ((Consumer) Mockito.verify(consumer, Mockito.times(1))).accept((Supplier) forClass.capture());
        AssertJUnit.assertEquals(of, SmallIntSet.of((PrimitiveIterator.OfInt) ((Supplier) forClass.getValue()).get()));
    }
}
