package org.infinispan.stream.impl;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToDoubleFunction;
import java.util.function.ToIntFunction;
import java.util.function.ToLongFunction;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.infinispan.CacheStream;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.marshall.Externalizer;
import org.infinispan.commons.marshall.SerializeWith;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.impl.ReplicatedConsistentHash;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.remoting.transport.Address;
import org.infinispan.stream.impl.AbstractCacheStream;
import org.infinispan.stream.impl.KeyTrackingTerminalOperation;
import org.infinispan.stream.impl.intops.object.DistinctOperation;
import org.infinispan.stream.impl.intops.object.FilterOperation;
import org.infinispan.stream.impl.intops.object.FlatMapOperation;
import org.infinispan.stream.impl.intops.object.FlatMapToDoubleOperation;
import org.infinispan.stream.impl.intops.object.FlatMapToIntOperation;
import org.infinispan.stream.impl.intops.object.FlatMapToLongOperation;
import org.infinispan.stream.impl.intops.object.LimitOperation;
import org.infinispan.stream.impl.intops.object.MapOperation;
import org.infinispan.stream.impl.intops.object.MapToDoubleOperation;
import org.infinispan.stream.impl.intops.object.MapToIntOperation;
import org.infinispan.stream.impl.intops.object.MapToLongOperation;
import org.infinispan.stream.impl.intops.object.PeekOperation;
import org.infinispan.stream.impl.intops.object.SkipOperation;
import org.infinispan.stream.impl.intops.object.SortedComparatorOperation;
import org.infinispan.stream.impl.intops.object.SortedOperation;
import org.infinispan.stream.impl.termop.SingleRunOperation;
import org.infinispan.stream.impl.termop.object.ForEachOperation;
import org.infinispan.stream.impl.termop.object.NoMapIteratorOperation;
import org.infinispan.util.CloseableSuppliedIterator;
import org.infinispan.util.CloseableSupplier;
import org.infinispan.util.concurrent.TimeoutException;

/* loaded from: input_file:WEB-INF/lib/infinispan-embedded-8.1.4.Final.jar:org/infinispan/stream/impl/DistributedCacheStream.class */
public class DistributedCacheStream<R> extends AbstractCacheStream<R, Stream<R>, Consumer<? super R>> implements CacheStream<R> {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/infinispan-embedded-8.1.4.Final.jar:org/infinispan/stream/impl/DistributedCacheStream$HandOffConsumer.class */
    /**
     * Consumer that hands each accepted value to a reader through a shared
     * {@link BlockingQueue}.
     *
     * <p>If the queue has no room, the consumer signals the shared condition and
     * then retries with 100 ms timed offers until the value is taken or the shared
     * {@code completed} flag becomes {@code true}.
     */
    public static class HandOffConsumer<R> implements Consumer<R> {
        private final BlockingQueue<R> queue;
        private final AtomicBoolean completed;
        private final Lock nextLock;
        private final Condition nextCondition;

        HandOffConsumer(BlockingQueue<R> blockingQueue, AtomicBoolean atomicBoolean, Lock lock, Condition condition) {
            this.queue = blockingQueue;
            this.completed = atomicBoolean;
            this.nextLock = lock;
            this.nextCondition = condition;
        }

        /**
         * Offers {@code r} to the queue, waiting for space while the hand-off is not completed.
         *
         * @param r value to hand off; silently dropped if the hand-off completes before it fits
         * @throws CacheException if the thread is interrupted while waiting for space
         *         (the interrupt status is restored before throwing)
         */
        @Override // java.util.function.Consumer
        public void accept(R r) {
            if (this.queue.offer(r) || this.completed.get()) {
                return;
            }
            // Wake up anyone waiting on the condition. The lock is needed only for the
            // signal itself: holding it across the retry loop below would block every
            // other user of the lock (e.g. a close() that sets `completed`), so the
            // loop could never observe completion.
            this.nextLock.lock();
            try {
                this.nextCondition.signalAll();
            } finally {
                this.nextLock.unlock();
            }
            // Keep retrying until the value is accepted or the hand-off is completed.
            while (!this.completed.get()) {
                try {
                    if (this.queue.offer(r, 100L, TimeUnit.MILLISECONDS)) {
                        return;
                    }
                } catch (InterruptedException e) {
                    // Preserve the interrupt for code further up the stack.
                    Thread.currentThread().interrupt();
                    throw new CacheException(e);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Collector wrapper that reuses the supplier, accumulator and combiner of another
     * collector but reports {@link Collector.Characteristics#IDENTITY_FINISH}, so the
     * accumulation container itself is the result.
     */
    @SerializeWith(IdentityFinishCollectorExternalizer.class)
    public static final class IdentifyFinishCollector<T, A> implements Collector<T, A, A> {
        private final Collector<T, A, ?> realCollector;

        /** Externalizer that writes/reads only the wrapped collector. */
        public static final class IdentityFinishCollectorExternalizer implements Externalizer<IdentifyFinishCollector> {
            @Override // org.infinispan.commons.marshall.Externalizer
            public void writeObject(ObjectOutput objectOutput, IdentifyFinishCollector identifyFinishCollector) throws IOException {
                objectOutput.writeObject(identifyFinishCollector.realCollector);
            }

            // NOTE(review): the "2" suffix looks like a decompiler rename of readObject
            // (see the JADX markers below) - confirm against the Externalizer interface.
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // org.infinispan.commons.marshall.Externalizer
            /* renamed from: readObject */
            public IdentifyFinishCollector readObject2(ObjectInput objectInput) throws IOException, ClassNotFoundException {
                return new IdentifyFinishCollector((Collector) objectInput.readObject());
            }
        }

        IdentifyFinishCollector(Collector<T, A, ?> collector) {
            this.realCollector = collector;
        }

        @Override // java.util.stream.Collector
        public Supplier<A> supplier() {
            return this.realCollector.supplier();
        }

        @Override // java.util.stream.Collector
        public BiConsumer<A, T> accumulator() {
            return this.realCollector.accumulator();
        }

        @Override // java.util.stream.Collector
        public BinaryOperator<A> combiner() {
            return this.realCollector.combiner();
        }

        /**
         * Returns the identity function. The collector advertises IDENTITY_FINISH, so
         * a non-null identity finisher keeps it usable by callers that invoke the
         * finisher regardless of the characteristics.
         */
        @Override // java.util.stream.Collector
        public Function<A, A> finisher() {
            return Function.identity();
        }

        /**
         * @return the wrapped collector's characteristics plus IDENTITY_FINISH
         */
        @Override // java.util.stream.Collector
        public Set<Collector.Characteristics> characteristics() {
            // Start from IDENTITY_FINISH and add the delegate's flags; this also covers
            // an empty delegate set, which EnumSet.copyOf cannot handle for non-EnumSet input.
            EnumSet<Collector.Characteristics> result = EnumSet.of(Collector.Characteristics.IDENTITY_FINISH);
            result.addAll(this.realCollector.characteristics());
            return result;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/infinispan-embedded-8.1.4.Final.jar:org/infinispan/stream/impl/DistributedCacheStream$IteratorSupplier.class */
    /**
     * Reader-side supplier that takes values from a shared {@link BlockingQueue} and
     * reports the end of the data (or a failure) once the shared {@code completed}
     * flag has been set through {@link #close()} / {@link #close(CacheException)}.
     */
    public static class IteratorSupplier<R> implements CloseableSupplier<R> {
        private final BlockingQueue<R> queue;
        private final AtomicBoolean completed;
        private final Lock nextLock;
        private final Condition nextCondition;
        private final ClusterStreamManager<?> clusterStreamManager;
        // Written by close(CacheException) and read by get(), both while holding nextLock.
        CacheException exception;
        // Identifier handed to clusterStreamManager.forgetOperation on close, if set.
        volatile Object pending;
        private Consumer<R> consumer;

        IteratorSupplier(BlockingQueue<R> blockingQueue, AtomicBoolean atomicBoolean, Lock lock, Condition condition, ClusterStreamManager<?> clusterStreamManager) {
            this.queue = blockingQueue;
            this.completed = atomicBoolean;
            this.nextLock = lock;
            this.nextCondition = condition;
            this.clusterStreamManager = clusterStreamManager;
        }

        /** Completes the supplier without recording a failure. */
        @Override // org.infinispan.util.CloseableSupplier, java.lang.AutoCloseable
        public void close() {
            close(null);
        }

        /**
         * Marks the supplier completed, forgets any pending operation and wakes up waiters.
         *
         * @param cacheException failure to rethrow from {@link #get()}; recorded only by the
         *        call that actually flips {@code completed} to {@code true}, may be {@code null}
         */
        public void close(CacheException cacheException) {
            this.nextLock.lock();
            try {
                if (!this.completed.getAndSet(true) && cacheException != null) {
                    this.exception = cacheException;
                }
                if (this.pending != null) {
                    this.clusterStreamManager.forgetOperation(this.pending);
                    this.pending = null;
                }
                this.nextCondition.signalAll();
            } finally {
                this.nextLock.unlock();
            }
        }

        /**
         * Returns the next queued value, waiting for one while the supplier is not completed.
         *
         * @return the next value, or {@code null} once completed and the queue is empty
         * @throws CacheException the failure recorded by {@link #close(CacheException)}, if any
         */
        @Override // java.util.function.Supplier
        public R get() {
            R entry = this.queue.poll();
            if (entry == null) {
                boolean interrupted = false;
                // The lock is taken exactly once and released exactly once in the finally
                // block; await() requires it to be held for every iteration of the loop.
                this.nextLock.lock();
                try {
                    while ((entry = this.queue.poll()) == null) {
                        if (this.completed.get()) {
                            // A value may have been offered between the poll above and
                            // completion; look once more before reporting the end.
                            entry = this.queue.poll();
                            break;
                        }
                        try {
                            this.nextCondition.await(100L, TimeUnit.MILLISECONDS);
                        } catch (InterruptedException e) {
                            // Keep waiting; the interrupt is restored on the way out.
                            interrupted = true;
                        }
                    }
                    if (entry == null) {
                        // Read under the same lock close() holds while writing the field.
                        if (this.exception != null) {
                            throw this.exception;
                        }
                        return null;
                    }
                } finally {
                    this.nextLock.unlock();
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
            if (this.consumer != null) {
                this.consumer.accept(entry);
            }
            return entry;
        }

        /** Sets the callback invoked with every non-null value returned by {@link #get()}. */
        public void setConsumer(Consumer<R> consumer) {
            this.consumer = consumer;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/infinispan-embedded-8.1.4.Final.jar:org/infinispan/stream/impl/DistributedCacheStream$SegmentListenerNotifier.class */
    /**
     * Consumer that forwards segment completions to a
     * {@link CacheStream.SegmentCompletionListener}. Segments are registered against an
     * object (matched by identity) and reported when that same object is accepted.
     */
    public static class SegmentListenerNotifier<T> implements Consumer<T> {
        // Identity-keyed: lookups match on the exact object instance, not equals().
        private final Map<T, Set<Integer>> segmentsByObject = new IdentityHashMap<>();
        private final CacheStream.SegmentCompletionListener listener;

        SegmentListenerNotifier(CacheStream.SegmentCompletionListener segmentCompletionListener) {
            this.listener = segmentCompletionListener;
        }

        /**
         * Reports and discards the segments registered for {@code t}; does nothing when
         * no segments are registered for it.
         */
        @Override // java.util.function.Consumer
        public void accept(T t) {
            Set<Integer> completedSegments = this.segmentsByObject.remove(t);
            if (completedSegments == null) {
                return;
            }
            this.listener.segmentCompleted(completedSegments);
        }

        /** Registers {@code set} to be reported once {@code t} is accepted. */
        public void addSegmentsForObject(T t, Set<Integer> set) {
            this.segmentsByObject.put(t, set);
        }

        /** Reports {@code set} to the listener immediately. */
        public void completeSegmentsNoResults(Set<Integer> set) {
            this.listener.segmentCompleted(set);
        }
    }

    /**
     * Re-types a raw supplier as a supplier of {@code CacheStream<CacheEntry>}.
     * The same instance is returned; no conversion or check takes place.
     *
     * @param supplier supplier to view under the new type
     * @return {@code supplier} itself
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected static Supplier<CacheStream<CacheEntry>> supplierStreamCast(Supplier supplier) {
        Supplier<CacheStream<CacheEntry>> typedView = (Supplier<CacheStream<CacheEntry>>) supplier;
        return typedView;
    }

    /**
     * Creates a stream by handing every argument to the superclass constructor; the
     * supplier is first re-typed through {@code supplierStreamCast}.
     *
     * NOTE(review): {@code z}, {@code z2} and {@code i} are decompiler-generated names;
     * their meaning is defined by the AbstractCacheStream constructor - confirm there.
     */
    public <K, V> DistributedCacheStream(Address address, boolean z, DistributionManager distributionManager, Supplier<CacheStream<CacheEntry<K, V>>> supplier, ClusterStreamManager clusterStreamManager, boolean z2, int i, Executor executor, ComponentRegistry componentRegistry) {
        super(address, z, distributionManager, supplierStreamCast(supplier), clusterStreamManager, z2, i, executor, componentRegistry);
    }

    public <K, V> DistributedCacheStream(Address address, boolean z, DistributionManager distributionManager, Supplier<CacheStream<CacheEntry<K, V>>> supplier, ClusterStreamManager clusterStreamManager, boolean z2, int i, Executor executor, ComponentRegistry componentRegistry, Function<? super CacheEntry<K, V>, R> function) {
        super(address, z, distributionManager, supplierStreamCast(supplier), clusterStreamManager, z2, i, executor, componentRegistry);
        this.intermediateOperations.add(new MapOperation(function));
        this.iteratorOperation = AbstractCacheStream.IteratorOperation.MAP;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a stream from another stream by delegating to the superclass
     * constructor that takes an {@code AbstractCacheStream}.
     *
     * @param abstractCacheStream stream passed to the superclass constructor
     */
    public DistributedCacheStream(AbstractCacheStream abstractCacheStream) {
        super(abstractCacheStream);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns this stream itself.
     *
     * NOTE(review): the "2" suffix appears to be a decompiler rename of {@code unwrap}
     * (see the marker below) - confirm against AbstractCacheStream.
     */
    @Override // org.infinispan.stream.impl.AbstractCacheStream
    /* renamed from: unwrap */
    public Stream<R> unwrap2() {
        return this;
    }

    /**
     * Registers a {@link FilterOperation} built from {@code predicate} as the next
     * intermediate operation.
     *
     * @param predicate test handed to the {@code FilterOperation}
     * @return the result of {@code addIntermediateOperation} for the new operation
     */
    @Override // java.util.stream.Stream
    public Stream<R> filter(Predicate<? super R> predicate) {
        FilterOperation filterOp = new FilterOperation(predicate);
        return addIntermediateOperation(filterOp);
    }

    /**
     * Registers a {@link MapOperation} built from {@code function}. The iterator
     * operation is switched to {@code MAP} unless it is already {@code FLAT_MAP}.
     *
     * @param function mapping handed to the {@code MapOperation}
     * @return the result of {@code addIntermediateOperationMap} with this stream as target
     */
    @Override // java.util.stream.Stream
    public <R1> Stream<R1> map(Function<? super R, ? extends R1> function) {
        boolean alreadyFlatMapped = this.iteratorOperation == AbstractCacheStream.IteratorOperation.FLAT_MAP;
        if (!alreadyFlatMapped) {
            this.iteratorOperation = AbstractCacheStream.IteratorOperation.MAP;
        }
        MapOperation mapOp = new MapOperation(function);
        return (Stream) addIntermediateOperationMap(mapOp, this);
    }

    /**
     * Registers a {@link MapToIntOperation} built from {@code toIntFunction}. The
     * iterator operation is switched to {@code MAP} unless it is already {@code FLAT_MAP}.
     *
     * @param toIntFunction mapping handed to the {@code MapToIntOperation}
     * @return the result of {@code addIntermediateOperationMap} targeting {@code intCacheStream()}
     */
    @Override // java.util.stream.Stream
    public IntStream mapToInt(ToIntFunction<? super R> toIntFunction) {
        boolean alreadyFlatMapped = this.iteratorOperation == AbstractCacheStream.IteratorOperation.FLAT_MAP;
        if (!alreadyFlatMapped) {
            this.iteratorOperation = AbstractCacheStream.IteratorOperation.MAP;
        }
        MapToIntOperation mapOp = new MapToIntOperation(toIntFunction);
        return (IntStream) addIntermediateOperationMap(mapOp, intCacheStream());
    }

    /**
     * Registers a {@link MapToLongOperation} built from {@code toLongFunction}. The
     * iterator operation is switched to {@code MAP} unless it is already {@code FLAT_MAP}.
     *
     * @param toLongFunction mapping handed to the {@code MapToLongOperation}
     * @return the result of {@code addIntermediateOperationMap} targeting {@code longCacheStream()}
     */
    @Override // java.util.stream.Stream
    public LongStream mapToLong(ToLongFunction<? super R> toLongFunction) {
        boolean alreadyFlatMapped = this.iteratorOperation == AbstractCacheStream.IteratorOperation.FLAT_MAP;
        if (!alreadyFlatMapped) {
            this.iteratorOperation = AbstractCacheStream.IteratorOperation.MAP;
        }
        MapToLongOperation mapOp = new MapToLongOperation(toLongFunction);
        return (LongStream) addIntermediateOperationMap(mapOp, longCacheStream());
    }

    /**
     * Registers a {@link MapToDoubleOperation} built from {@code toDoubleFunction}. The
     * iterator operation is switched to {@code MAP} unless it is already {@code FLAT_MAP}.
     *
     * @param toDoubleFunction mapping handed to the {@code MapToDoubleOperation}
     * @return the result of {@code addIntermediateOperationMap} targeting {@code doubleCacheStream()}
     */
    @Override // java.util.stream.Stream
    public DoubleStream mapToDouble(ToDoubleFunction<? super R> toDoubleFunction) {
        boolean alreadyFlatMapped = this.iteratorOperation == AbstractCacheStream.IteratorOperation.FLAT_MAP;
        if (!alreadyFlatMapped) {
            this.iteratorOperation = AbstractCacheStream.IteratorOperation.MAP;
        }
        MapToDoubleOperation mapOp = new MapToDoubleOperation(toDoubleFunction);
        return (DoubleStream) addIntermediateOperationMap(mapOp, doubleCacheStream());
    }

    /**
     * Sets the iterator operation to {@code FLAT_MAP} and registers a
     * {@link FlatMapOperation} built from {@code function}.
     *
     * @param function mapping handed to the {@code FlatMapOperation}
     * @return the result of {@code addIntermediateOperationMap} with this stream as target
     */
    @Override // java.util.stream.Stream
    public <R1> Stream<R1> flatMap(Function<? super R, ? extends Stream<? extends R1>> function) {
        this.iteratorOperation = AbstractCacheStream.IteratorOperation.FLAT_MAP;
        FlatMapOperation flatMapOp = new FlatMapOperation(function);
        return (Stream) addIntermediateOperationMap(flatMapOp, this);
    }

    /**
     * Sets the iterator operation to {@code FLAT_MAP} and registers a
     * {@link FlatMapToIntOperation} built from {@code function}.
     *
     * @param function mapping handed to the {@code FlatMapToIntOperation}
     * @return the result of {@code addIntermediateOperationMap} targeting {@code intCacheStream()}
     */
    @Override // java.util.stream.Stream
    public IntStream flatMapToInt(Function<? super R, ? extends IntStream> function) {
        this.iteratorOperation = AbstractCacheStream.IteratorOperation.FLAT_MAP;
        FlatMapToIntOperation flatMapOp = new FlatMapToIntOperation(function);
        return (IntStream) addIntermediateOperationMap(flatMapOp, intCacheStream());
    }

    /**
     * Sets the iterator operation to {@code FLAT_MAP} and registers a
     * {@link FlatMapToLongOperation} built from {@code function}.
     *
     * @param function mapping handed to the {@code FlatMapToLongOperation}
     * @return the result of {@code addIntermediateOperationMap} targeting {@code longCacheStream()}
     */
    @Override // java.util.stream.Stream
    public LongStream flatMapToLong(Function<? super R, ? extends LongStream> function) {
        this.iteratorOperation = AbstractCacheStream.IteratorOperation.FLAT_MAP;
        FlatMapToLongOperation flatMapOp = new FlatMapToLongOperation(function);
        return (LongStream) addIntermediateOperationMap(flatMapOp, longCacheStream());
    }

    /**
     * Sets the iterator operation to {@code FLAT_MAP} and registers a
     * {@link FlatMapToDoubleOperation} built from {@code function}.
     *
     * @param function mapping handed to the {@code FlatMapToDoubleOperation}
     * @return the result of {@code addIntermediateOperationMap} targeting {@code doubleCacheStream()}
     */
    @Override // java.util.stream.Stream
    public DoubleStream flatMapToDouble(Function<? super R, ? extends DoubleStream> function) {
        this.iteratorOperation = AbstractCacheStream.IteratorOperation.FLAT_MAP;
        FlatMapToDoubleOperation flatMapOp = new FlatMapToDoubleOperation(function);
        return (DoubleStream) addIntermediateOperationMap(flatMapOp, doubleCacheStream());
    }

    /**
     * Passes the shared {@link DistinctOperation} instance to {@code markDistinct}
     * (with intermediate type {@code OBJ}) and then registers it as an intermediate operation.
     *
     * @return the result of {@code addIntermediateOperation} for the distinct operation
     */
    @Override // org.infinispan.CacheStream, java.util.stream.Stream
    public Stream<R> distinct() {
        DistinctOperation distinctOp = DistinctOperation.getInstance();
        // Same instance is used for both calls.
        markDistinct(distinctOp, AbstractCacheStream.IntermediateType.OBJ);
        return addIntermediateOperation(distinctOp);
    }

    /**
     * Calls {@code markSorted} with intermediate type {@code OBJ} and then registers
     * the shared {@link SortedOperation} instance as an intermediate operation.
     *
     * @return the result of {@code addIntermediateOperation} for the sort operation
     */
    @Override // org.infinispan.CacheStream, java.util.stream.Stream
    public Stream<R> sorted() {
        markSorted(AbstractCacheStream.IntermediateType.OBJ);
        SortedOperation sortOp = SortedOperation.getInstance();
        return addIntermediateOperation(sortOp);
    }

    /**
     * Calls {@code markSorted} with intermediate type {@code OBJ} and then registers a
     * {@link SortedComparatorOperation} built from {@code comparator}.
     *
     * @param comparator ordering handed to the {@code SortedComparatorOperation}
     * @return the result of {@code addIntermediateOperation} for the sort operation
     */
    @Override // org.infinispan.CacheStream, java.util.stream.Stream
    public Stream<R> sorted(Comparator<? super R> comparator) {
        markSorted(AbstractCacheStream.IntermediateType.OBJ);
        SortedComparatorOperation sortOp = new SortedComparatorOperation(comparator);
        return addIntermediateOperation(sortOp);
    }

    /**
     * Registers a {@link PeekOperation} built from {@code consumer} as the next
     * intermediate operation.
     *
     * @param consumer action handed to the {@code PeekOperation}
     * @return the result of {@code addIntermediateOperation} for the new operation
     */
    @Override // java.util.stream.Stream
    public Stream<R> peek(Consumer<? super R> consumer) {
        PeekOperation peekOp = new PeekOperation(consumer);
        return addIntermediateOperation(peekOp);
    }

    /**
     * Creates a {@link LimitOperation} for {@code j}, passes it to {@code markDistinct}
     * (with intermediate type {@code OBJ}) and registers it as an intermediate operation.
     *
     * @param j value handed to the {@code LimitOperation}
     * @return the result of {@code addIntermediateOperation} for the limit operation
     */
    @Override // org.infinispan.CacheStream, java.util.stream.Stream
    public Stream<R> limit(long j) {
        LimitOperation limitOp = new LimitOperation(j);
        // The limit goes through the same marking call that distinct() uses.
        markDistinct(limitOp, AbstractCacheStream.IntermediateType.OBJ);
        return addIntermediateOperation(limitOp);
    }

    /**
     * Creates a {@link SkipOperation} for {@code j}, calls {@code markSkip} with
     * intermediate type {@code OBJ} and registers the operation.
     *
     * @param j value handed to the {@code SkipOperation}
     * @return the result of {@code addIntermediateOperation} for the skip operation
     */
    @Override // org.infinispan.CacheStream, java.util.stream.Stream
    public Stream<R> skip(long j) {
        // The operation is constructed before the stream is marked.
        SkipOperation skipOp = new SkipOperation(j);
        markSkip(AbstractCacheStream.IntermediateType.OBJ);
        return addIntermediateOperation(skipOp);
    }

    /**
     * Runs {@code performOperation} with the reduce function built from the identity
     * and operator, using {@code binaryOperator} again as the third argument.
     *
     * @param r identity value handed to {@code TerminalFunctions.reduceFunction}
     * @param binaryOperator reduction operator
     * @return the value produced by {@code performOperation}
     */
    @Override // java.util.stream.Stream
    public R reduce(R r, BinaryOperator<R> binaryOperator) {
        return (R) performOperation(
                TerminalFunctions.reduceFunction(r, binaryOperator),
                true,
                binaryOperator,
                null);
    }

    /**
     * Runs {@code performOperation} with the identity-less reduce function and wraps
     * the outcome in an {@link Optional}.
     *
     * <p>The combining lambda tolerates {@code null} operands: a {@code null} left
     * operand yields the right one, a {@code null} right operand yields the left one,
     * and only two non-null operands reach {@code binaryOperator}.
     *
     * @param binaryOperator reduction operator
     * @return the result, empty when {@code performOperation} yields {@code null}
     */
    @Override // java.util.stream.Stream
    public Optional<R> reduce(BinaryOperator<R> binaryOperator) {
        return Optional.ofNullable(performOperation(TerminalFunctions.reduceFunction(binaryOperator), true, (left, right) -> {
            if (left == null) {
                return right;
            }
            if (right == null) {
                return left;
            }
            return binaryOperator.apply(left, right);
        }, null));
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Runs {@code performOperation} with the reduce function built from the identity,
     * accumulator and combiner, using {@code binaryOperator} again as the third argument.
     *
     * @param u identity value
     * @param biFunction accumulator handed to {@code TerminalFunctions.reduceFunction}
     * @param binaryOperator combiner
     * @return the value produced by {@code performOperation}
     */
    @Override // java.util.stream.Stream
    public <U> U reduce(U u, BiFunction<U, ? super R, U> biFunction, BinaryOperator<U> binaryOperator) {
        return (U) performOperation(
                TerminalFunctions.reduceFunction(u, biFunction, binaryOperator),
                true,
                binaryOperator,
                null);
    }

    /**
     * Runs {@code performOperation} with the collect function built from the three
     * arguments. The combining lambda applies {@code biConsumer2} to both containers
     * and returns the first one.
     *
     * @param supplier container factory
     * @param biConsumer accumulator
     * @param biConsumer2 combiner that folds the second container into the first
     * @return the value produced by {@code performOperation}
     */
    @Override // java.util.stream.Stream
    public <R1> R1 collect(Supplier<R1> supplier, BiConsumer<R1, ? super R> biConsumer, BiConsumer<R1, R1> biConsumer2) {
        return (R1) performOperation(TerminalFunctions.collectFunction(supplier, biConsumer, biConsumer2), true, (target, other) -> {
            // Fold `other` into `target` and keep `target` as the combined container.
            biConsumer2.accept(target, other);
            return target;
        }, null);
    }

    /**
     * Collects through {@code performOperation}.
     *
     * <p>If the stream is currently flagged sorted, the flag is kept only when the
     * collector is not UNORDERED. An IDENTITY_FINISH collector is passed on as-is;
     * any other collector is wrapped in an {@link IdentifyFinishCollector} and its
     * own finisher is applied here to the value {@code performOperation} returns.
     *
     * @param collector collector to run
     * @return the collected (and, where needed, finished) result
     */
    @Override // org.infinispan.CacheStream, java.util.stream.Stream
    public <R1, A> R1 collect(Collector<? super R, A, R1> collector) {
        if (this.sorted) {
            boolean unordered = collector.characteristics().contains(Collector.Characteristics.UNORDERED);
            this.sorted = !unordered;
        }
        boolean identityFinish = collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH);
        if (!identityFinish) {
            // Run with the wrapper, then apply the real finisher to the outcome.
            return (R1) collector.finisher().apply(performOperation(TerminalFunctions.collectorFunction(new IdentifyFinishCollector(collector)), true, collector.combiner(), null, false));
        }
        return (R1) performOperation(TerminalFunctions.collectorFunction(collector), true, collector.combiner(), null, false);
    }

    /**
     * Runs {@code performOperation} with the min function for {@code comparator} and
     * wraps the outcome in an {@link Optional}.
     *
     * <p>The combining lambda ignores {@code null} operands and otherwise keeps the
     * left operand unless the comparator ranks it strictly greater than the right one.
     *
     * @param comparator ordering used to pick the minimum
     * @return the result, empty when {@code performOperation} yields {@code null}
     */
    @Override // java.util.stream.Stream
    public Optional<R> min(Comparator<? super R> comparator) {
        return Optional.ofNullable(performOperation(TerminalFunctions.minFunction(comparator), false, (current, candidate) -> {
            if (current == null) {
                return candidate;
            }
            if (candidate == null) {
                return current;
            }
            return comparator.compare(current, candidate) > 0 ? candidate : current;
        }, null));
    }

    /**
     * Runs {@code performOperation} with the max function for {@code comparator} and
     * wraps the outcome in an {@link Optional}.
     *
     * <p>The combining lambda ignores {@code null} operands and otherwise picks the
     * right operand whenever the comparator ranks the left one less than or equal to it.
     *
     * @param comparator ordering used to pick the maximum
     * @return the result, empty when {@code performOperation} yields {@code null}
     */
    @Override // java.util.stream.Stream
    public Optional<R> max(Comparator<? super R> comparator) {
        return Optional.ofNullable(performOperation(TerminalFunctions.maxFunction(comparator), false, (current, candidate) -> {
            if (current == null) {
                return candidate;
            }
            if (candidate == null) {
                return current;
            }
            return comparator.compare(current, candidate) <= 0 ? candidate : current;
        }, null));
    }

    /**
     * Runs {@code performOperation} with the any-match function; partial results are
     * combined with logical OR.
     *
     * NOTE(review): the last lambda (true for a {@code true} result) is presumably an
     * early-termination test - confirm against {@code performOperation}.
     *
     * @param predicate test handed to {@code TerminalFunctions.anyMatchFunction}
     * @return the unboxed result of {@code performOperation}
     */
    @Override // java.util.stream.Stream
    public boolean anyMatch(Predicate<? super R> predicate) {
        Boolean outcome = (Boolean) performOperation(TerminalFunctions.anyMatchFunction(predicate), false,
                (left, right) -> Boolean.logicalOr(left, right),
                partial -> partial.booleanValue());
        return outcome.booleanValue();
    }

    /**
     * Runs {@code performOperation} with the all-match function; partial results are
     * combined with logical AND.
     *
     * NOTE(review): the last lambda (true for a {@code false} result) is presumably an
     * early-termination test - confirm against {@code performOperation}.
     *
     * @param predicate test handed to {@code TerminalFunctions.allMatchFunction}
     * @return the unboxed result of {@code performOperation}
     */
    @Override // java.util.stream.Stream
    public boolean allMatch(Predicate<? super R> predicate) {
        Boolean outcome = (Boolean) performOperation(TerminalFunctions.allMatchFunction(predicate), false,
                (left, right) -> Boolean.logicalAnd(left, right),
                partial -> !partial.booleanValue());
        return outcome.booleanValue();
    }

    /**
     * Runs {@code performOperation} with the none-match function; partial results are
     * combined with logical AND.
     *
     * NOTE(review): the last lambda (true for a {@code false} result) is presumably an
     * early-termination test - confirm against {@code performOperation}.
     *
     * @param predicate test handed to {@code TerminalFunctions.noneMatchFunction}
     * @return the unboxed result of {@code performOperation}
     */
    @Override // java.util.stream.Stream
    public boolean noneMatch(Predicate<? super R> predicate) {
        Boolean outcome = (Boolean) performOperation(TerminalFunctions.noneMatchFunction(predicate), false,
                (left, right) -> Boolean.logicalAnd(left, right),
                partial -> !partial.booleanValue());
        return outcome.booleanValue();
    }

    /**
     * Finds the first element.
     *
     * <p>When {@code intermediateType.shouldUseIntermediate(sorted, distinct)} is
     * false this is simply {@link #findAny()}. Otherwise the elements of
     * {@link #iterator()} are wrapped in a stream (spliterator flagged CONCURRENT and
     * NONNULL) and {@code findFirst} is evaluated through a {@link SingleRunOperation}
     * over the local intermediate operations.
     *
     * @return the first element, if any
     */
    @Override // java.util.stream.Stream
    public Optional<R> findFirst() {
        boolean needsIntermediate = this.intermediateType.shouldUseIntermediate(this.sorted, this.distinct);
        if (!needsIntermediate) {
            return findAny();
        }
        Iterator<R> elements = iterator();
        return (Optional) new SingleRunOperation(this.localIntermediateOperations,
                () -> StreamSupport.stream(
                        Spliterators.spliteratorUnknownSize(elements, Spliterator.CONCURRENT | Spliterator.NONNULL),
                        this.parallel),
                stream -> stream.findFirst()).performOperation();
    }

    /**
     * Runs {@code performOperation} with the find-any function and wraps the outcome
     * in an {@link Optional}. The combining lambda keeps the left operand unless it is
     * {@code null}.
     *
     * NOTE(review): {@code Objects::nonNull} is presumably an early-termination test -
     * confirm against {@code performOperation}.
     *
     * @return the result, empty when {@code performOperation} yields {@code null}
     */
    @Override // java.util.stream.Stream
    public Optional<R> findAny() {
        return Optional.ofNullable(performOperation(
                TerminalFunctions.findAnyFunction(),
                false,
                (first, second) -> first != null ? first : second,
                Objects::nonNull));
    }

    /**
     * Runs {@code performOperation} with the count function; partial counts are added
     * together.
     *
     * @return the unboxed total produced by {@code performOperation}
     */
    @Override // java.util.stream.Stream
    public long count() {
        Long total = (Long) performOperation(TerminalFunctions.countFunction(), true,
                (left, right) -> Long.valueOf(left.longValue() + right.longValue()),
                null);
        return total.longValue();
    }

    /**
     * Returns an iterator over this stream.
     *
     * <p>When {@code intermediateType.shouldUseIntermediate(sorted, distinct)} is true
     * the iterator comes from {@code performIntermediateRemoteOperation}; otherwise
     * from {@link #remoteIterator()}.
     */
    @Override // java.util.stream.BaseStream, org.infinispan.CacheStream
    public Iterator<R> iterator() {
        if (this.intermediateType.shouldUseIntermediate(this.sorted, this.distinct)) {
            return (Iterator) performIntermediateRemoteOperation(stream -> stream.iterator());
        }
        return remoteIterator();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds an iterator fed through a bounded hand-off queue.
     *
     * <p>A {@link HandOffConsumer} and an {@link IteratorSupplier} are created over
     * the same queue (capacity {@code distributedBatchSize}), completion flag, lock
     * and condition. Iteration is then started through either
     * {@code rehashAwareIteration} or {@code ignoreRehashIteration}, depending on
     * {@code rehashAware}. The supplier is closed when this stream is closed.
     *
     * @return iterator reading from the supplier
     */
    public Iterator<R> remoteIterator() {
        // State shared by the producing and the consuming side.
        ArrayBlockingQueue<R> handOffQueue = new ArrayBlockingQueue<>(this.distributedBatchSize);
        AtomicBoolean completed = new AtomicBoolean();
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();

        HandOffConsumer<R> producerSide = new HandOffConsumer<>(handOffQueue, completed, lock, condition);
        IteratorSupplier<R> consumerSide = new IteratorSupplier<>(handOffQueue, completed, lock, condition, this.csm);

        // An unset parallelDistribution counts as false.
        boolean parallelDist = this.parallelDistribution != null && this.parallelDistribution.booleanValue();
        if (!this.rehashAware) {
            ignoreRehashIteration(producerSide, consumerSide, parallelDist);
        } else {
            rehashAwareIteration(completed, producerSide, consumerSide, parallelDist);
        }

        CloseableSuppliedIterator result = new CloseableSuppliedIterator(consumerSide);
        onClose(() -> consumerSide.close());
        return result;
    }

    /**
     * Starts an iteration on the executor that uses a single consistent-hash snapshot.
     *
     * <p>If this node is a member, a segment filter is set and this node owns every
     * filtered segment, only the local operation runs. Otherwise a remote stream
     * operation is submitted first, the local operation runs, and the task then waits
     * for the remote operation (bounded by {@code timeout}/{@code timeoutUnit}). The
     * supplier is closed at the end, with the failure if one occurred.
     *
     * @param consumer receives the values (wrapped in a {@code CollectionConsumer})
     * @param iteratorSupplier supplier to close when done; its {@code pending} field is
     *        set to the remote operation id while that operation is awaited
     * @param z first argument of {@code csm.remoteStreamOperation}
     *        (decompiler-generated name; see that method for its meaning)
     */
    private void ignoreRehashIteration(Consumer<R> consumer, IteratorSupplier<R> iteratorSupplier, boolean z) {
        AbstractCacheStream.CollectionConsumer collectionConsumer = new AbstractCacheStream.CollectionConsumer(consumer);
        ConsistentHash consistentHash = this.dm.getConsistentHash();
        // True when every filtered segment is owned by this node.
        boolean z2 = consistentHash.getMembers().contains(this.localAddress) && this.segmentsToFilter != null && consistentHash.getSegmentsForOwner(this.localAddress).containsAll(this.segmentsToFilter);
        NoMapIteratorOperation noMapIteratorOperation = new NoMapIteratorOperation(this.intermediateOperations, supplierForSegments(consistentHash, this.segmentsToFilter, null, !z2), this.distributedBatchSize);
        Thread currentThread = Thread.currentThread();
        this.executor.execute(() -> {
            try {
                this.log.tracef("Thread %s submitted iterator request for stream", currentThread);
                if (z2) {
                    collectionConsumer.onCompletion((Address) null, Collections.emptySet(), (Collection) noMapIteratorOperation.performOperation2((KeyTrackingTerminalOperation.IntermediateCollector) collectionConsumer));
                } else {
                    Object remoteStreamOperation = this.csm.remoteStreamOperation(z, this.parallel, consistentHash, this.segmentsToFilter, this.keysToFilter, Collections.emptyMap(), this.includeLoader, noMapIteratorOperation, collectionConsumer);
                    try {
                        // The local part runs after the remote request has been submitted.
                        collectionConsumer.onCompletion((Address) null, Collections.emptySet(), (Collection) noMapIteratorOperation.performOperation2((KeyTrackingTerminalOperation.IntermediateCollector) collectionConsumer));
                        if (remoteStreamOperation != null) {
                            iteratorSupplier.pending = remoteStreamOperation;
                            try {
                                if (!this.csm.awaitCompletion(remoteStreamOperation, this.timeout, this.timeoutUnit)) {
                                    throw new TimeoutException();
                                }
                            } catch (InterruptedException e) {
                                // Preserve the interrupt for the executor thread.
                                Thread.currentThread().interrupt();
                                throw new CacheException(e);
                            }
                        }
                    } finally {
                        // Forget the remote operation on every exit path, including a
                        // failure of the local part before `pending` was set; otherwise
                        // nothing would ever forget it.
                        if (remoteStreamOperation != null) {
                            this.csm.forgetOperation(remoteStreamOperation);
                        }
                    }
                }
                iteratorSupplier.close();
            } catch (CacheException e2) {
                this.log.trace("Encountered local cache exception for stream", e2);
                iteratorSupplier.close(e2);
            } catch (Throwable th2) {
                this.log.trace("Encountered local throwable for stream", th2);
                iteratorSupplier.close(new CacheException(th2));
            }
        });
    }

    /**
     * Starts an iteration on the executor that re-reads the consistent hash on every
     * pass and keeps looping until {@code atomicBoolean} is set.
     *
     * <p>Each pass works on the set of segments still to process, which starts as
     * {@code segmentsToFilter} (or all segments of the initial hash when no filter is
     * set) and is replaced by the result of {@code segmentsToProcess} after the pass.
     * Any failure closes {@code iteratorSupplier} with a {@code CacheException}.
     *
     * @param atomicBoolean flag that ends the loop once it reads {@code true}
     * @param consumer receives the values (wrapped via {@code iteratorOperation.wrapConsumer})
     * @param iteratorSupplier supplier whose {@code pending} field tracks the current
     *        remote operation id and which is closed on failure
     * @param z first argument of {@code csm.remoteStreamOperationRehashAware}
     *        (decompiler-generated name; see that method for its meaning)
     */
    private void rehashAwareIteration(AtomicBoolean atomicBoolean, Consumer<R> consumer, IteratorSupplier<R> iteratorSupplier, boolean z) {
        SegmentListenerNotifier segmentListenerNotifier;
        ConsistentHash readConsistentHash = this.dm.getReadConsistentHash();
        // Only wire up a notifier when a segment completion listener is registered.
        if (this.segmentCompletionListener != null) {
            segmentListenerNotifier = new SegmentListenerNotifier(this.segmentCompletionListener);
            iteratorSupplier.setConsumer(segmentListenerNotifier);
        } else {
            segmentListenerNotifier = null;
        }
        AbstractCacheStream.KeyTrackingConsumer keyTrackingConsumer = new AbstractCacheStream.KeyTrackingConsumer(readConsistentHash, this.iteratorOperation.wrapConsumer(consumer), this.iteratorOperation.getFunction(), segmentListenerNotifier, this.keyEquivalence);
        Thread currentThread = Thread.currentThread();
        this.executor.execute(() -> {
            Set<Integer> set;
            Set<Object> emptySet;
            try {
                this.log.tracef("Thread %s submitted iterator request for stream", currentThread);
                // Segments still to process: the explicit filter, or every segment of the initial hash.
                Set<Integer> rangeSet = this.segmentsToFilter == null ? new ReplicatedConsistentHash.RangeSet(readConsistentHash.getNumSegments()) : this.segmentsToFilter;
                do {
                    // Fresh hash for this pass.
                    ConsistentHash readConsistentHash2 = this.dm.getReadConsistentHash();
                    boolean contains = readConsistentHash2.getMembers().contains(this.localAddress);
                    // z2: a filter is set and this node owns every filtered segment.
                    boolean z2 = false;
                    if (contains) {
                        Set<Integer> segmentsForOwner = readConsistentHash2.getSegmentsForOwner(this.localAddress);
                        z2 = this.segmentsToFilter != null && segmentsForOwner.containsAll(this.segmentsToFilter);
                        set = z2 ? segmentsForOwner : readConsistentHash2.getPrimarySegmentsForOwner(this.localAddress);
                        // Restrict the local segments to those still to process.
                        set.retainAll(rangeSet);
                        // Union of the key sets the tracking consumer holds for those segments
                        // (the variable name "emptySet" is a decompiler artifact).
                        emptySet = (Set) set.stream().flatMap(num -> {
                            return ((Set) keyTrackingConsumer.referenceArray.get(num.intValue())).stream();
                        }).collect(Collectors.toSet());
                    } else {
                        set = null;
                        emptySet = Collections.emptySet();
                    }
                    KeyTrackingTerminalOperation operation = this.iteratorOperation.getOperation(this.intermediateOperations, supplierForSegments(readConsistentHash2, rangeSet, emptySet, !z2), this.distributedBatchSize);
                    if (z2) {
                        // Purely local pass: no remote operation id is involved.
                        performLocalRehashAwareOperation(keyTrackingConsumer, rangeSet, readConsistentHash2, set, operation, () -> {
                            return readConsistentHash2.getSegmentsForOwner(this.localAddress);
                        }, null);
                        rangeSet = segmentsToProcess(iteratorSupplier, keyTrackingConsumer, rangeSet, null);
                    } else {
                        Object remoteStreamOperationRehashAware = this.csm.remoteStreamOperationRehashAware(z, this.parallel, readConsistentHash2, rangeSet, this.keysToFilter, new AbstractCacheStream.AtomicReferenceArrayToMap(keyTrackingConsumer.referenceArray), this.includeLoader, operation, keyTrackingConsumer);
                        // Record the id right away so IteratorSupplier.close can forget it.
                        if (remoteStreamOperationRehashAware != null) {
                            iteratorSupplier.pending = remoteStreamOperationRehashAware;
                        }
                        if (contains) {
                            try {
                                performLocalRehashAwareOperation(keyTrackingConsumer, rangeSet, readConsistentHash2, set, operation, () -> {
                                    return readConsistentHash2.getPrimarySegmentsForOwner(this.localAddress);
                                }, remoteStreamOperationRehashAware);
                            } catch (Throwable th) {
                                // Local part failed: forget the remote operation before propagating.
                                this.csm.forgetOperation(remoteStreamOperationRehashAware);
                                throw th;
                            }
                        }
                        if (remoteStreamOperationRehashAware != null) {
                            try {
                                // A false return from awaitCompletion is reported as a timeout.
                                if (!this.csm.awaitCompletion(remoteStreamOperationRehashAware, this.timeout, this.timeoutUnit)) {
                                    throw new TimeoutException();
                                }
                            } catch (InterruptedException e) {
                                throw new CacheException(e);
                            }
                        }
                        rangeSet = segmentsToProcess(iteratorSupplier, keyTrackingConsumer, rangeSet, remoteStreamOperationRehashAware);
                        this.csm.forgetOperation(remoteStreamOperationRehashAware);
                    }
                } while (!atomicBoolean.get());
            } catch (CacheException e2) {
                this.log.trace("Encountered local cache exception for stream", e2);
                iteratorSupplier.close(e2);
            } catch (Throwable th2) {
                this.log.trace("Encountered local throwable for stream", th2);
                iteratorSupplier.close(new CacheException(th2));
            }
        });
    }

    /**
     * Determines which segments must be processed on the next pass of a rehash-aware operation.
     * <p>
     * When the consumer has recorded no lost segments, the iterator supplier is closed and the
     * supplied {@code set} is returned unchanged. Otherwise the lost segments are copied into a
     * new set, the consumer's record of lost segments is cleared, and the copy is returned.
     *
     * @param iteratorSupplier    supplier that is closed once no segments were lost
     * @param keyTrackingConsumer consumer whose {@code lostSegments} are inspected and cleared
     * @param set                 segments returned as-is when nothing was lost
     * @param obj                 identifier used only in trace messages; {@code null} is logged as "local"
     * @return the segments to process next
     */
    private Set<Integer> segmentsToProcess(IteratorSupplier<R> iteratorSupplier, AbstractCacheStream<R, Stream<R>, Consumer<? super R>>.KeyTrackingConsumer<Object, R> keyTrackingConsumer, Set<Integer> set, Object obj) {
        final String operationLabel = (obj != null) ? obj.toString() : "local";
        if (!keyTrackingConsumer.lostSegments.isEmpty()) {
            // Snapshot before clearing so the caller gets a stable copy of what was lost.
            Set<Integer> lost = new HashSet<>(keyTrackingConsumer.lostSegments);
            keyTrackingConsumer.lostSegments.clear();
            this.log.tracef("Found %s lost segments for %s", lost, operationLabel);
            return lost;
        }
        iteratorSupplier.close();
        this.log.tracef("Finished rehash aware operation for %s", operationLabel);
        return set;
    }

    /**
     * Runs the terminal operation on this node and reports the outcome to the consumer.
     * <p>
     * After the operation has run, the current read consistent hash is compared with
     * {@code consistentHash}. If they are equal, the results are handed to
     * {@code onCompletion} with a {@code null} address and {@code set2}. If they differ, the
     * segments obtained from {@code supplier}, narrowed to those also in {@code set}, are
     * reported through {@code onSegmentsLost}.
     *
     * @param keyTrackingConsumer          consumer notified of completion or of lost segments
     * @param set                          segments used to narrow the suspect set
     * @param consistentHash               hash that was current when the operation was prepared
     * @param set2                         segments passed to {@code onCompletion}
     * @param keyTrackingTerminalOperation operation to execute locally
     * @param supplier                     provides the candidate suspect segments when the hash changed
     * @param obj                          identifier used only in trace messages
     */
    private void performLocalRehashAwareOperation(AbstractCacheStream<R, Stream<R>, Consumer<? super R>>.KeyTrackingConsumer<Object, R> keyTrackingConsumer, Set<Integer> set, ConsistentHash consistentHash, Set<Integer> set2, KeyTrackingTerminalOperation<Object, R, Object> keyTrackingTerminalOperation, Supplier<Set<Integer>> supplier, Object obj) {
        Collection<CacheEntry<Object, Object>> localResults = keyTrackingTerminalOperation.performOperationRehashAware(keyTrackingConsumer);
        boolean hashUnchanged = this.dm.getReadConsistentHash().equals(consistentHash);
        if (!hashUnchanged) {
            Set<Integer> suspectSegments = supplier.get();
            // retainAll mutates the set handed back by the supplier.
            suspectSegments.retainAll(set);
            this.log.tracef("CH changed - making %s segments suspect for identifier %s", suspectSegments, obj);
            keyTrackingConsumer.onSegmentsLost(suspectSegments);
            return;
        }
        this.log.tracef("Found local values %s for id %s", localResults.size(), obj);
        keyTrackingConsumer.onCompletion((Address) null, set2, localResults);
    }

    /**
     * Wraps {@link #iterator()} in a spliterator that reports {@code Long.MAX_VALUE} as its
     * size and the {@link Spliterator#CONCURRENT} characteristic.
     *
     * @return a spliterator over this stream's elements
     */
    @Override // java.util.stream.BaseStream, org.infinispan.CacheStream
    public Spliterator<R> spliterator() {
        final Iterator<R> source = iterator();
        return Spliterators.spliterator(source, Long.MAX_VALUE, Spliterator.CONCURRENT);
    }

    /**
     * Applies {@code consumer} to the elements of this stream.
     * <p>
     * When rehash awareness is enabled the work is delegated to {@code performRehashForEach};
     * otherwise it is run through {@code performOperation} with a combiner that always yields
     * {@code null}.
     *
     * @param consumer action to apply to each element
     */
    @Override // org.infinispan.CacheStream, java.util.stream.Stream
    public void forEach(Consumer<? super R> consumer) {
        if (!this.rehashAware) {
            performOperation(TerminalFunctions.forEachFunction(consumer), false, (left, right) -> null, null);
            return;
        }
        performRehashForEach(consumer);
    }

    /**
     * Builds the key-tracking for-each operation for this stream from its intermediate
     * operations, the given supplier, the configured distributed batch size and the consumer.
     *
     * @param consumer action the operation applies
     * @param supplier source of the cache-entry stream the operation runs over
     * @return a new {@code ForEachOperation}
     */
    KeyTrackingTerminalOperation getForEach(Consumer<? super R> consumer, Supplier<Stream<CacheEntry>> supplier) {
        KeyTrackingTerminalOperation forEachOperation = new ForEachOperation(this.intermediateOperations, supplier, this.distributedBatchSize, consumer);
        return forEachOperation;
    }

    /**
     * Applies {@code consumer} to the elements of this stream, honouring encounter order when
     * the stream is flagged as sorted.
     * <p>
     * An unsorted stream simply delegates to {@link #forEach(Consumer)}. A sorted stream is
     * consumed through {@link #iterator()}: the iterator is wrapped in a stream whose
     * spliterator is {@code CONCURRENT | NONNULL}, and {@code forEachOrdered} is run on it
     * via a {@code SingleRunOperation} built from the local intermediate operations.
     *
     * @param consumer action to apply to each element
     */
    @Override // java.util.stream.Stream
    public void forEachOrdered(Consumer<? super R> consumer) {
        if (this.sorted) {
            final Iterator<R> sortedIterator = iterator();
            final int characteristics = Spliterator.CONCURRENT | Spliterator.NONNULL;
            new SingleRunOperation(this.localIntermediateOperations,
                    () -> StreamSupport.stream(Spliterators.spliteratorUnknownSize(sortedIterator, characteristics), this.parallel),
                    stream -> {
                        stream.forEachOrdered(consumer);
                        return null;
                    }).performOperation();
            return;
        }
        forEach(consumer);
    }

    /**
     * Collects the elements of this stream into an array.
     * <p>
     * Delegates to {@code performOperation}; partial arrays are combined by appending the
     * second array to a copy of the first.
     *
     * @return an array holding the stream's elements
     */
    @Override // java.util.stream.Stream
    public Object[] toArray() {
        return (Object[]) performOperation(TerminalFunctions.toArrayFunction(), false, (head, tail) -> {
            final int headLength = head.length;
            final int tailLength = tail.length;
            // copyOf keeps the first array's contents and leaves room for the second.
            Object[] joined = Arrays.copyOf(head, headLength + tailLength);
            System.arraycopy(tail, 0, joined, headLength, tailLength);
            return joined;
        }, null, false);
    }

    /**
     * Collects the elements of this stream into an array allocated by {@code intFunction}.
     * <p>
     * Delegates to {@code performOperation}; two partial arrays are combined by asking
     * {@code intFunction} for an array of their combined length and copying the first and
     * then the second array into it.
     *
     * @param intFunction allocates an array of the requested length
     * @param <A>         component type of the resulting array
     * @return an array holding the stream's elements
     */
    @Override // java.util.stream.Stream
    public <A> A[] toArray(IntFunction<A[]> intFunction) {
        return (A[]) ((Object[]) performOperation(TerminalFunctions.toArrayFunction(intFunction), false, (objArr, objArr2) -> {
            // The merged array needs its own name: a local cannot redeclare the lambda
            // parameter, and the copies below must read from the two inputs.
            A[] merged = intFunction.apply(objArr.length + objArr2.length);
            System.arraycopy(objArr, 0, merged, 0, objArr.length);
            System.arraycopy(objArr2, 0, merged, objArr.length, objArr2.length);
            return merged;
        }, null, false));
    }

    /**
     * Clears the {@code parallelDistribution} flag on this stream.
     *
     * @return this stream
     */
    @Override // org.infinispan.CacheStream
    public CacheStream<R> sequentialDistribution() {
        parallelDistribution = false;
        return this;
    }

    /**
     * Sets the {@code parallelDistribution} flag on this stream.
     *
     * @return this stream
     */
    @Override // org.infinispan.CacheStream
    public CacheStream<R> parallelDistribution() {
        parallelDistribution = true;
        return this;
    }

    /**
     * Records the segments this stream should be filtered to.
     *
     * @param set segments to filter by; stored by reference, not copied
     * @return this stream
     */
    @Override // org.infinispan.CacheStream
    public CacheStream<R> filterKeySegments(Set<Integer> set) {
        segmentsToFilter = set;
        return this;
    }

    /**
     * Records the keys this stream should be filtered to.
     *
     * @param set keys to filter by; stored by reference, not copied
     * @return this stream
     */
    @Override // org.infinispan.CacheStream
    public CacheStream<R> filterKeys(Set<?> set) {
        keysToFilter = set;
        return this;
    }

    /**
     * Stores the batch size used by distributed operations of this stream.
     *
     * @param i the batch size; stored without validation
     * @return this stream
     */
    @Override // org.infinispan.CacheStream
    public CacheStream<R> distributedBatchSize(int i) {
        distributedBatchSize = i;
        return this;
    }

    /**
     * Registers a segment completion listener on this stream.
     * <p>
     * The first listener registered is stored directly. Each later listener is combined with
     * the one already stored through {@code composeWithExceptions}.
     *
     * @param segmentCompletionListener listener to add
     * @return this stream
     */
    @Override // org.infinispan.CacheStream
    public CacheStream<R> segmentCompletionListener(CacheStream.SegmentCompletionListener segmentCompletionListener) {
        final CacheStream.SegmentCompletionListener existing = this.segmentCompletionListener;
        this.segmentCompletionListener = (existing != null)
                ? composeWithExceptions(existing, segmentCompletionListener)
                : segmentCompletionListener;
        return this;
    }

    /**
     * Clears the {@code rehashAware} flag on this stream.
     *
     * @return this stream
     */
    @Override // org.infinispan.CacheStream
    public CacheStream<R> disableRehashAware() {
        rehashAware = false;
        return this;
    }

    /**
     * Stores the timeout value and unit for this stream.
     *
     * @param j        timeout value; must be strictly positive
     * @param timeUnit unit of {@code j}
     * @return this stream
     * @throws IllegalArgumentException if {@code j} is zero or negative
     */
    @Override // org.infinispan.CacheStream
    public CacheStream<R> timeout(long j, TimeUnit timeUnit) {
        final boolean positive = j > 0;
        if (!positive) {
            throw new IllegalArgumentException("Timeout must be greater than 0");
        }
        timeout = j;
        timeoutUnit = timeUnit;
        return this;
    }

    /**
     * Creates a {@code DistributedIntCacheStream} constructed from this stream.
     *
     * @return the new int cache stream
     */
    protected DistributedIntCacheStream intCacheStream() {
        final DistributedIntCacheStream intView = new DistributedIntCacheStream(this);
        return intView;
    }

    /**
     * Creates a {@code DistributedDoubleCacheStream} constructed from this stream.
     *
     * @return the new double cache stream
     */
    protected DistributedDoubleCacheStream doubleCacheStream() {
        final DistributedDoubleCacheStream doubleView = new DistributedDoubleCacheStream(this);
        return doubleView;
    }

    /**
     * Creates a {@code DistributedLongCacheStream} constructed from this stream.
     *
     * @return the new long cache stream
     */
    protected DistributedLongCacheStream longCacheStream() {
        final DistributedLongCacheStream longView = new DistributedLongCacheStream(this);
        return longView;
    }

    /**
     * Combines two listeners into one that notifies both, even if the first one throws.
     * <p>
     * The first listener is invoked first. If it throws, the second listener is still
     * invoked; any failure of the second is attached to the first failure as a suppressed
     * exception, and the first failure is rethrown. If the first listener completes normally,
     * the second is invoked and any failure it raises propagates as-is. Each listener is
     * invoked exactly once per notification.
     *
     * @param segmentCompletionListener  listener notified first
     * @param segmentCompletionListener2 listener notified second
     * @return a listener that delegates to both
     */
    protected static CacheStream.SegmentCompletionListener composeWithExceptions(CacheStream.SegmentCompletionListener segmentCompletionListener, CacheStream.SegmentCompletionListener segmentCompletionListener2) {
        return set -> {
            try {
                segmentCompletionListener.segmentCompleted(set);
            } catch (Throwable th) {
                try {
                    segmentCompletionListener2.segmentCompleted(set);
                } catch (Throwable th2) {
                    try {
                        th.addSuppressed(th2);
                    } catch (Throwable ignored) {
                        // Best effort only: failing to record the secondary failure must not
                        // mask the primary one, which is rethrown below.
                    }
                }
                throw th;
            }
            // Kept outside the try so a failure here does not trigger a second invocation
            // of the same listener from the catch block above.
            segmentCompletionListener2.segmentCompleted(set);
        };
    }

    /**
     * Untyped bridge to {@link #getForEach(Consumer, Supplier)}: casts the arguments and
     * forwards them unchanged.
     *
     * @param obj      the consumer, passed as {@code Object}
     * @param supplier the cache-entry stream supplier, passed as a raw {@code Supplier}
     * @return the operation produced by the typed overload
     */
    @Override // org.infinispan.stream.impl.AbstractCacheStream
    /* bridge */ /* synthetic */ KeyTrackingTerminalOperation getForEach(Object obj, Supplier supplier) {
        Consumer typedConsumer = (Consumer) obj;
        Supplier<Stream<CacheEntry>> entryStreamSupplier = (Supplier<Stream<CacheEntry>>) supplier;
        return getForEach(typedConsumer, entryStreamSupplier);
    }
}
