package org.infinispan.stream.impl;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.functions.Consumer;
import java.lang.invoke.MethodHandles;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.PrimitiveIterator;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.IntFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToDoubleFunction;
import java.util.function.ToIntFunction;
import java.util.function.ToLongFunction;
import java.util.stream.Collector;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import org.infinispan.BaseCacheStream;
import org.infinispan.Cache;
import org.infinispan.CacheStream;
import org.infinispan.DoubleCacheStream;
import org.infinispan.IntCacheStream;
import org.infinispan.LongCacheStream;
import org.infinispan.commons.util.AbstractIterator;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.commons.util.Util;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.context.InvocationContext;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.marshall.core.MarshallableFunctions;
import org.infinispan.reactive.RxJavaInterop;
import org.infinispan.reactive.publisher.PublisherReducers;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
import org.infinispan.reactive.publisher.impl.SegmentCompletionPublisher;
import org.infinispan.remoting.transport.Address;
import org.infinispan.stream.impl.AbstractCacheStream;
import org.infinispan.stream.impl.intops.object.DistinctOperation;
import org.infinispan.stream.impl.intops.object.FilterOperation;
import org.infinispan.stream.impl.intops.object.FlatMapOperation;
import org.infinispan.stream.impl.intops.object.FlatMapToDoubleOperation;
import org.infinispan.stream.impl.intops.object.FlatMapToIntOperation;
import org.infinispan.stream.impl.intops.object.FlatMapToLongOperation;
import org.infinispan.stream.impl.intops.object.LimitOperation;
import org.infinispan.stream.impl.intops.object.MapOperation;
import org.infinispan.stream.impl.intops.object.MapToDoubleOperation;
import org.infinispan.stream.impl.intops.object.MapToIntOperation;
import org.infinispan.stream.impl.intops.object.MapToLongOperation;
import org.infinispan.stream.impl.intops.object.PeekOperation;
import org.infinispan.util.Closeables;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

/* loaded from: input_file:org/infinispan/stream/impl/DistributedCacheStream.class */
public class DistributedCacheStream<Original, R> extends AbstractCacheStream<Original, R, Stream<R>, CacheStream<R>> implements CacheStream<R> {
    private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());
    private final int maxSegment;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/stream/impl/DistributedCacheStream$CompletionSegmentTracker.class */
    public class CompletionSegmentTracker implements IntConsumer, Consumer<Object> {
        private final java.util.function.Consumer<Supplier<PrimitiveIterator.OfInt>> listener;
        private final Map<Object, IntSet> awaitingNotification;
        volatile IntSet completedSegments;

        private CompletionSegmentTracker(java.util.function.Consumer<Supplier<PrimitiveIterator.OfInt>> consumer) {
            this.listener = (java.util.function.Consumer) Objects.requireNonNull(consumer);
            this.awaitingNotification = new HashMap();
            this.completedSegments = IntSets.concurrentSet(DistributedCacheStream.this.maxSegment);
        }

        @Override // java.util.function.IntConsumer
        public void accept(int i) {
            this.completedSegments.set(i);
        }

        public void accept(Object obj) {
            if (this.completedSegments.isEmpty()) {
                return;
            }
            DistributedCacheStream.log.tracef("Going to complete segments %s when %s is iterated upon", this.completedSegments, Util.toStr(obj));
            this.awaitingNotification.put(obj, this.completedSegments);
            this.completedSegments = IntSets.concurrentSet(DistributedCacheStream.this.maxSegment);
        }

        public void returningObject(Object obj) {
            IntSet remove = this.awaitingNotification.remove(obj);
            if (remove != null) {
                DistributedCacheStream.log.tracef("Notifying listeners of segments %s complete now that %s is returning", remove, Util.toStr(obj));
                java.util.function.Consumer<Supplier<PrimitiveIterator.OfInt>> consumer = this.listener;
                Objects.requireNonNull(remove);
                consumer.accept(remove::iterator);
            }
        }

        public void onComplete() {
            DistributedCacheStream.log.tracef("Completing last segments of: %s", this.completedSegments);
            java.util.function.Consumer<Supplier<PrimitiveIterator.OfInt>> consumer = this.listener;
            IntSet intSet = this.completedSegments;
            Objects.requireNonNull(intSet);
            consumer.accept(intSet::iterator);
            this.completedSegments.clear();
        }
    }

    protected static <R> Supplier<CacheStream<R>> supplierStreamCast(Supplier supplier) {
        return supplier;
    }

    public DistributedCacheStream(Address address, boolean z, DistributionManager distributionManager, InvocationContext invocationContext, Supplier<CacheStream<R>> supplier, boolean z2, int i, Executor executor, ComponentRegistry componentRegistry, Function<? super Original, ?> function) {
        super(address, z, distributionManager, invocationContext, supplierStreamCast(supplier), z2, i, executor, componentRegistry, function);
        this.maxSegment = ((Configuration) componentRegistry.getComponent(Configuration.class)).clustering().hash().numSegments();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public DistributedCacheStream(AbstractCacheStream abstractCacheStream) {
        super(abstractCacheStream);
        this.maxSegment = ((Configuration) this.registry.getComponent(Configuration.class)).clustering().hash().numSegments();
    }

    @Override // org.infinispan.stream.impl.AbstractCacheStream
    protected Log getLog() {
        return log;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.infinispan.stream.impl.AbstractCacheStream
    public CacheStream<R> unwrap() {
        return this;
    }

    @Override // org.infinispan.CacheStream, java.util.stream.Stream
    public CacheStream<R> filter(Predicate<? super R> predicate) {
        return (CacheStream) addIntermediateOperation(new FilterOperation(predicate));
    }

    @Override // org.infinispan.CacheStream, java.util.stream.Stream
    public <R1> CacheStream<R1> map(Function<? super R, ? extends R1> function) {
        if (this.iteratorOperation != AbstractCacheStream.IteratorOperation.FLAT_MAP) {
            this.iteratorOperation = AbstractCacheStream.IteratorOperation.MAP;
        }
        addIntermediateOperationMap(new MapOperation(function));
        return this;
    }

    @Override // org.infinispan.CacheStream, java.util.stream.Stream
    public IntCacheStream mapToInt(ToIntFunction<? super R> toIntFunction) {
        if (this.iteratorOperation != AbstractCacheStream.IteratorOperation.FLAT_MAP) {
            this.iteratorOperation = AbstractCacheStream.IteratorOperation.MAP;
        }
        addIntermediateOperationMap(new MapToIntOperation(toIntFunction));
        return intCacheStream();
    }

    @Override // org.infinispan.CacheStream, java.util.stream.Stream
    public LongCacheStream mapToLong(ToLongFunction<? super R> toLongFunction) {
        if (this.iteratorOperation != AbstractCacheStream.IteratorOperation.FLAT_MAP) {
            this.iteratorOperation = AbstractCacheStream.IteratorOperation.MAP;
        }
        addIntermediateOperationMap(new MapToLongOperation(toLongFunction));
        return longCacheStream();
    }

    @Override // org.infinispan.CacheStream, java.util.stream.Stream
    public DoubleCacheStream mapToDouble(ToDoubleFunction<? super R> toDoubleFunction) {
        if (this.iteratorOperation != AbstractCacheStream.IteratorOperation.FLAT_MAP) {
            this.iteratorOperation = AbstractCacheStream.IteratorOperation.MAP;
        }
        addIntermediateOperationMap(new MapToDoubleOperation(toDoubleFunction));
        return doubleCacheStream();
    }

    @Override // org.infinispan.CacheStream, java.util.stream.Stream
    public <R1> CacheStream<R1> flatMap(Function<? super R, ? extends Stream<? extends R1>> function) {
        this.iteratorOperation = AbstractCacheStream.IteratorOperation.FLAT_MAP;
        addIntermediateOperationMap(new FlatMapOperation(function));
        return this;
    }

    @Override // org.infinispan.CacheStream, java.util.stream.Stream
    public IntCacheStream flatMapToInt(Function<? super R, ? extends IntStream> function) {
        this.iteratorOperation = AbstractCacheStream.IteratorOperation.FLAT_MAP;
        addIntermediateOperationMap(new FlatMapToIntOperation(function));
        return intCacheStream();
    }

    @Override // org.infinispan.CacheStream, java.util.stream.Stream
    public LongCacheStream flatMapToLong(Function<? super R, ? extends LongStream> function) {
        this.iteratorOperation = AbstractCacheStream.IteratorOperation.FLAT_MAP;
        addIntermediateOperationMap(new FlatMapToLongOperation(function));
        return longCacheStream();
    }

    @Override // org.infinispan.CacheStream, java.util.stream.Stream
    public DoubleCacheStream flatMapToDouble(Function<? super R, ? extends DoubleStream> function) {
        this.iteratorOperation = AbstractCacheStream.IteratorOperation.FLAT_MAP;
        addIntermediateOperationMap(new FlatMapToDoubleOperation(function));
        return doubleCacheStream();
    }

    @Override // org.infinispan.CacheStream, java.util.stream.Stream
    public CacheStream<R> distinct() {
        addIntermediateOperation(DistinctOperation.getInstance());
        return new IntermediateCacheStream(this).distinct();
    }

    @Override // org.infinispan.CacheStream, java.util.stream.Stream
    public CacheStream<R> sorted() {
        return new IntermediateCacheStream(this).sorted();
    }

    @Override // org.infinispan.CacheStream, java.util.stream.Stream
    public CacheStream<R> sorted(Comparator<? super R> comparator) {
        return new IntermediateCacheStream(this).sorted((Comparator) comparator);
    }

    @Override // org.infinispan.CacheStream, java.util.stream.Stream
    public CacheStream<R> peek(java.util.function.Consumer<? super R> consumer) {
        return (CacheStream) addIntermediateOperation(new PeekOperation(consumer));
    }

    @Override // org.infinispan.CacheStream, java.util.stream.Stream
    public CacheStream<R> limit(long j) {
        addIntermediateOperation(new LimitOperation(j));
        return new IntermediateCacheStream(this).limit(j);
    }

    @Override // org.infinispan.CacheStream, java.util.stream.Stream
    public CacheStream<R> skip(long j) {
        return new IntermediateCacheStream(this).skip(j);
    }

    @Override // java.util.stream.Stream
    public R reduce(R r, BinaryOperator<R> binaryOperator) {
        return (R) performPublisherOperation(PublisherReducers.reduce(r, binaryOperator), PublisherReducers.reduce(binaryOperator));
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // java.util.stream.Stream
    public Optional<R> reduce(BinaryOperator<R> binaryOperator) {
        Function reduce = PublisherReducers.reduce(binaryOperator);
        return Optional.ofNullable(performPublisherOperation(reduce, reduce));
    }

    @Override // java.util.stream.Stream
    public <U> U reduce(U u, BiFunction<U, ? super R, U> biFunction, BinaryOperator<U> binaryOperator) {
        return (U) performPublisherOperation(PublisherReducers.reduce(u, biFunction), PublisherReducers.reduce(binaryOperator));
    }

    @Override // java.util.stream.Stream
    public <R1> R1 collect(Supplier<R1> supplier, BiConsumer<R1, ? super R> biConsumer, BiConsumer<R1, R1> biConsumer2) {
        return (R1) performPublisherOperation(PublisherReducers.collect(supplier, biConsumer), PublisherReducers.accumulate(biConsumer2));
    }

    @Override // org.infinispan.CacheStream, java.util.stream.Stream
    public <R1, A> R1 collect(Collector<? super R, A, R1> collector) {
        R1 r1 = (R1) performPublisherOperation(PublisherReducers.collectorReducer(collector), PublisherReducers.collectorFinalizer(collector));
        return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH) ? r1 : (R1) collector.finisher().apply(r1);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // java.util.stream.Stream
    public Optional<R> min(Comparator<? super R> comparator) {
        Function min = PublisherReducers.min(comparator);
        return Optional.ofNullable(performPublisherOperation(min, min));
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // java.util.stream.Stream
    public Optional<R> max(Comparator<? super R> comparator) {
        Function max = PublisherReducers.max(comparator);
        return Optional.ofNullable(performPublisherOperation(max, max));
    }

    @Override // java.util.stream.Stream
    public boolean anyMatch(Predicate<? super R> predicate) {
        return ((Boolean) performPublisherOperation(PublisherReducers.anyMatch(predicate), PublisherReducers.or())).booleanValue();
    }

    @Override // java.util.stream.Stream
    public boolean allMatch(Predicate<? super R> predicate) {
        return ((Boolean) performPublisherOperation(PublisherReducers.allMatch(predicate), PublisherReducers.and())).booleanValue();
    }

    @Override // java.util.stream.Stream
    public boolean noneMatch(Predicate<? super R> predicate) {
        return ((Boolean) performPublisherOperation(PublisherReducers.noneMatch(predicate), PublisherReducers.and())).booleanValue();
    }

    @Override // java.util.stream.Stream
    public Optional<R> findFirst() {
        return findAny();
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // java.util.stream.Stream
    public Optional<R> findAny() {
        Function findFirst = PublisherReducers.findFirst();
        return Optional.ofNullable(performPublisherOperation(findFirst, findFirst));
    }

    @Override // java.util.stream.Stream
    public long count() {
        return ((Long) performPublisherOperation(PublisherReducers.count(), PublisherReducers.add())).longValue();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v36, types: [java.util.function.Function] */
    @Override // java.util.stream.BaseStream, org.infinispan.CacheStream
    public Iterator<R> iterator() {
        CompletionSegmentTracker completionSegmentTracker;
        SegmentCompletionPublisher<R> segmentCompletionPublisher;
        log.tracef("Distributed iterator invoked with rehash: %s", Boolean.valueOf(this.rehashAware));
        CacheIntermediatePublisher identity = this.intermediateOperations.isEmpty() ? MarshallableFunctions.identity() : new CacheIntermediatePublisher(this.intermediateOperations);
        DeliveryGuarantee deliveryGuarantee = this.rehashAware ? DeliveryGuarantee.EXACTLY_ONCE : DeliveryGuarantee.AT_MOST_ONCE;
        SegmentCompletionPublisher<R> keyPublisher = this.toKeyFunction == null ? this.cpm.keyPublisher(this.segmentsToFilter, this.keysToFilter, null, this.includeLoader, deliveryGuarantee, this.distributedBatchSize, identity) : this.cpm.entryPublisher(this.segmentsToFilter, this.keysToFilter, null, this.includeLoader, deliveryGuarantee, this.distributedBatchSize, identity);
        if (this.segmentCompletionListener != null) {
            completionSegmentTracker = new CompletionSegmentTracker(this.segmentCompletionListener);
            SegmentCompletionPublisher<R> segmentCompletionPublisher2 = keyPublisher;
            segmentCompletionPublisher = Flowable.fromPublisher(subscriber -> {
                segmentCompletionPublisher2.subscribe(subscriber, completionSegmentTracker);
            }).doOnNext(completionSegmentTracker);
        } else {
            completionSegmentTracker = null;
            segmentCompletionPublisher = keyPublisher;
        }
        final CloseableIterator it = Closeables.iterator(Flowable.fromPublisher(segmentCompletionPublisher).onErrorResumeNext(RxJavaInterop.cacheExceptionWrapper()), this.distributedBatchSize);
        Objects.requireNonNull(it);
        onClose(it::close);
        if (completionSegmentTracker == null) {
            return it;
        }
        final CompletionSegmentTracker completionSegmentTracker2 = completionSegmentTracker;
        return new AbstractIterator<R>() { // from class: org.infinispan.stream.impl.DistributedCacheStream.1
            protected R getNext() {
                if (!it.hasNext()) {
                    completionSegmentTracker2.onComplete();
                    return null;
                }
                R r = (R) it.next();
                completionSegmentTracker2.returningObject(r);
                return r;
            }
        };
    }

    @Override // java.util.stream.BaseStream, org.infinispan.CacheStream
    public Spliterator<R> spliterator() {
        return Spliterators.spliterator(iterator(), Long.MAX_VALUE, 4096);
    }

    @Override // org.infinispan.CacheStream, java.util.stream.Stream
    public void forEach(java.util.function.Consumer<? super R> consumer) {
        peek((java.util.function.Consumer) consumer).iterator().forEachRemaining(obj -> {
        });
    }

    @Override // org.infinispan.CacheStream
    public <K, V> void forEach(BiConsumer<Cache<K, V>, ? super R> biConsumer) {
        peek((java.util.function.Consumer) CacheBiConsumers.objectConsumer(biConsumer)).iterator().forEachRemaining(obj -> {
        });
    }

    @Override // java.util.stream.Stream
    public void forEachOrdered(java.util.function.Consumer<? super R> consumer) {
        forEach(consumer);
    }

    @Override // java.util.stream.Stream
    public Object[] toArray() {
        return (Object[]) performPublisherOperation(PublisherReducers.toArrayReducer(), PublisherReducers.toArrayFinalizer());
    }

    @Override // java.util.stream.Stream
    public <A> A[] toArray(IntFunction<A[]> intFunction) {
        return (A[]) ((Object[]) performPublisherOperation(PublisherReducers.toArrayReducer(intFunction), PublisherReducers.toArrayFinalizer(intFunction)));
    }

    @Override // org.infinispan.CacheStream, org.infinispan.BaseCacheStream
    public CacheStream<R> sequentialDistribution() {
        this.parallelDistribution = false;
        return this;
    }

    @Override // org.infinispan.CacheStream, org.infinispan.BaseCacheStream
    public CacheStream<R> parallelDistribution() {
        this.parallelDistribution = true;
        return this;
    }

    @Override // org.infinispan.CacheStream, org.infinispan.BaseCacheStream
    public CacheStream<R> filterKeySegments(Set<Integer> set) {
        this.segmentsToFilter = IntSets.from(set);
        return this;
    }

    @Override // org.infinispan.CacheStream, org.infinispan.BaseCacheStream
    public CacheStream<R> filterKeySegments(IntSet intSet) {
        this.segmentsToFilter = intSet;
        return this;
    }

    @Override // org.infinispan.CacheStream, org.infinispan.BaseCacheStream
    public CacheStream<R> filterKeys(Set<?> set) {
        this.keysToFilter = set;
        return this;
    }

    @Override // org.infinispan.CacheStream, org.infinispan.BaseCacheStream
    public CacheStream<R> distributedBatchSize(int i) {
        this.distributedBatchSize = i;
        return this;
    }

    @Override // org.infinispan.CacheStream, org.infinispan.BaseCacheStream
    public CacheStream<R> segmentCompletionListener(BaseCacheStream.SegmentCompletionListener segmentCompletionListener) {
        if (this.segmentCompletionListener == null) {
            this.segmentCompletionListener = segmentCompletionListener;
        } else {
            this.segmentCompletionListener = composeWithExceptions(this.segmentCompletionListener, segmentCompletionListener);
        }
        return this;
    }

    @Override // org.infinispan.CacheStream, org.infinispan.BaseCacheStream
    public CacheStream<R> disableRehashAware() {
        this.rehashAware = false;
        return this;
    }

    @Override // org.infinispan.CacheStream, org.infinispan.BaseCacheStream
    public CacheStream<R> timeout(long j, TimeUnit timeUnit) {
        if (j <= 0) {
            throw new IllegalArgumentException("Timeout must be greater than 0");
        }
        this.timeout = j;
        this.timeoutUnit = timeUnit;
        return this;
    }

    protected DistributedIntCacheStream intCacheStream() {
        return new DistributedIntCacheStream(this);
    }

    protected DistributedDoubleCacheStream doubleCacheStream() {
        return new DistributedDoubleCacheStream(this);
    }

    protected DistributedLongCacheStream longCacheStream() {
        return new DistributedLongCacheStream(this);
    }

    @Override // org.infinispan.stream.impl.AbstractCacheStream, java.util.stream.BaseStream
    public /* bridge */ /* synthetic */ CacheStream onClose(Runnable runnable) {
        return (CacheStream) super.onClose(runnable);
    }

    @Override // org.infinispan.stream.impl.AbstractCacheStream, java.util.stream.BaseStream
    public /* bridge */ /* synthetic */ CacheStream unordered() {
        return (CacheStream) super.unordered();
    }

    @Override // org.infinispan.stream.impl.AbstractCacheStream, java.util.stream.BaseStream
    public /* bridge */ /* synthetic */ CacheStream sequential() {
        return (CacheStream) super.sequential();
    }

    @Override // org.infinispan.stream.impl.AbstractCacheStream, java.util.stream.BaseStream
    public /* bridge */ /* synthetic */ CacheStream parallel() {
        return (CacheStream) super.parallel();
    }

    @Override // org.infinispan.CacheStream, org.infinispan.BaseCacheStream
    public /* bridge */ /* synthetic */ BaseCacheStream filterKeys(Set set) {
        return filterKeys((Set<?>) set);
    }

    @Override // org.infinispan.CacheStream, org.infinispan.BaseCacheStream
    public /* bridge */ /* synthetic */ BaseCacheStream filterKeySegments(Set set) {
        return filterKeySegments((Set<Integer>) set);
    }
}
