package org.infinispan.stream.impl;

import java.util.ArrayDeque;
import java.util.PrimitiveIterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.BaseStream;
import org.infinispan.CacheStream;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.Util;
import org.infinispan.context.InvocationContext;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.partitionhandling.impl.PartitionHandlingManager;
import org.infinispan.reactive.publisher.impl.ClusterPublisherManager;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.StateTransferLock;
import org.infinispan.stream.impl.intops.IntermediateOperation;
import org.infinispan.util.KeyValuePair;
import org.infinispan.util.concurrent.CompletionStages;
import org.infinispan.util.logging.Log;
import org.reactivestreams.Publisher;

/* loaded from: input_file:BOOT-INF/lib/infinispan-core-11.0.4.Final.jar:org/infinispan/stream/impl/AbstractCacheStream.class */
/**
 * Common state and plumbing shared by the cache stream implementations in this package.
 *
 * <p>An instance collects the stream configuration (parallelism, key and segment filters, timeout,
 * close handlers) together with a queue of {@link IntermediateOperation}s, and hands them to the
 * {@link ClusterPublisherManager} when {@link #performPublisherOperation(Function, Function)} runs.
 *
 * <p>NOTE(review): this source is decompiler output. The decompiler reported that several members
 * were originally {@code protected}/package-private and that the fluent methods originally returned
 * {@code S2}; the signatures are kept as decompiled so that existing callers and subclasses keep
 * compiling.
 *
 * @param <Original> element type of the {@link CacheStream} produced by {@link #supplier}
 * @param <T>        element type of this stream
 * @param <S>        stream type as seen through {@link BaseStream}
 * @param <S2>       subtype of {@code S}; not referenced by the decompiled signatures
 */
public abstract class AbstractCacheStream<Original, T, S extends BaseStream<T, S>, S2 extends S> implements BaseStream<T, S> {
    /** Queued intermediate operations; the copy constructor shares this same queue instance. */
    protected final Queue<IntermediateOperation> intermediateOperations;
    /** Address compared against a segment's primary owner in {@link #isPrimaryOwner}. */
    protected final Address localAddress;
    /** Distribution manager handed in at construction; not used by this class itself. */
    protected final DistributionManager dm;
    /** Supplier of the underlying cache stream; not used by this class itself. */
    protected final Supplier<CacheStream<Original>> supplier;
    /** Publisher manager that runs the key/entry reductions; looked up from {@link #registry}. */
    protected final ClusterPublisherManager cpm;
    /** Executor handed in at construction; not used by this class itself. */
    protected final Executor executor;
    /** Registry used for component lookup and for injecting into intermediate operations. */
    protected final ComponentRegistry registry;
    /** Looked up from {@link #registry}; not used by this class itself. */
    protected final PartitionHandlingManager partition;
    /** Maps a key to its segment in {@link #isPrimaryOwner}; looked up from {@link #registry}. */
    protected final KeyPartitioner keyPartitioner;
    /** Looked up from {@link #registry}; not used by this class itself. */
    protected final StateTransferLock stateTransferLock;
    /** Forwarded as-is to the publisher manager reductions. */
    protected final boolean includeLoader;
    /** When {@code null} a key reduction is performed, otherwise an entry reduction. */
    protected final Function<? super Original, ?> toKeyFunction;
    /** Forwarded as-is to the publisher manager reductions. */
    protected final InvocationContext invocationContext;
    /** Composition of all handlers registered through {@link #onClose}; {@code null} if none. */
    protected Runnable closeRunnable;
    /** Never assigned by the primary constructor, so it starts out {@code null}. */
    protected Boolean parallelDistribution;
    /** Value reported by {@link #isParallel()} and forwarded to the publisher manager. */
    protected boolean parallel;
    /** Selects {@code EXACTLY_ONCE} (true) or {@code AT_MOST_ONCE} (false) delivery; defaults to true. */
    protected boolean rehashAware;
    /** Key filter forwarded to the publisher manager; may be {@code null}. */
    protected Set<?> keysToFilter;
    /** Segment filter forwarded to the publisher manager; may be {@code null}. */
    protected IntSet segmentsToFilter;
    /** Batch size handed in at construction; not used by this class itself. */
    protected int distributedBatchSize;
    /** Optional listener; only copied between instances by this class. */
    protected Consumer<Supplier<PrimitiveIterator.OfInt>> segmentCompletionListener;
    /** Defaults to {@link IteratorOperation#NO_MAP}. */
    protected IteratorOperation iteratorOperation;
    /** Timeout amount, expressed in {@link #timeoutUnit}; defaults to 30. */
    protected long timeout;
    /** Unit of {@link #timeout}; defaults to {@link TimeUnit#SECONDS}. */
    protected TimeUnit timeoutUnit;

    /**
     * Kind of mapping associated with the stream, each optionally providing a function via
     * {@link #getFunction()}.
     */
    enum IteratorOperation {
        NO_MAP,
        MAP {
            /**
             * Returns a function that casts its argument to {@link KeyValuePair} and yields the
             * pair's value.
             */
            @Override
            @SuppressWarnings("unchecked") // the caller chooses Out; the element type cannot be checked here
            public <In, Out> Function<In, Out> getFunction() {
                return obj -> ((KeyValuePair<?, Out>) obj).getValue();
            }
        },
        FLAT_MAP;

        /**
         * Returns the function associated with this operation.
         *
         * @return {@code null} unless the constant overrides this method (only {@link #MAP} does)
         */
        public <In, Out> Function<In, Out> getFunction() {
            return null;
        }
    }

    /**
     * Creates a stream with default settings: rehash aware, {@link IteratorOperation#NO_MAP}, a
     * 30 second timeout, no close handler and an empty queue of intermediate operations.
     *
     * @param address             stored as {@link #localAddress}
     * @param z                   initial value of {@link #parallel}
     * @param distributionManager stored as {@link #dm}
     * @param invocationContext   stored as {@link #invocationContext}
     * @param supplier            stored as {@link #supplier}
     * @param z2                  stored as {@link #includeLoader}
     * @param i                   stored as {@link #distributedBatchSize}
     * @param executor            stored as {@link #executor}
     * @param componentRegistry   stored as {@link #registry}; also used to look up the partition
     *                            handling manager, key partitioner, state transfer lock and cluster
     *                            publisher manager, so it must not be {@code null}
     * @param function            stored as {@link #toKeyFunction}; may be {@code null}
     */
    public AbstractCacheStream(Address address, boolean z, DistributionManager distributionManager, InvocationContext invocationContext, Supplier<CacheStream<Original>> supplier, boolean z2, int i, Executor executor, ComponentRegistry componentRegistry, Function<? super Original, ?> function) {
        // Defaults; closeRunnable and the filter fields keep their implicit null.
        this.rehashAware = true;
        this.iteratorOperation = IteratorOperation.NO_MAP;
        this.timeout = 30L;
        this.timeoutUnit = TimeUnit.SECONDS;

        // Values supplied by the caller.
        this.localAddress = address;
        this.parallel = z;
        this.dm = distributionManager;
        this.invocationContext = invocationContext;
        this.supplier = supplier;
        this.includeLoader = z2;
        this.distributedBatchSize = i;
        this.executor = executor;
        this.registry = componentRegistry;
        this.toKeyFunction = function;

        // Components resolved from the registry.
        this.partition = (PartitionHandlingManager) componentRegistry.getComponent(PartitionHandlingManager.class);
        this.keyPartitioner = (KeyPartitioner) componentRegistry.getComponent(KeyPartitioner.class);
        this.stateTransferLock = (StateTransferLock) componentRegistry.getComponent(StateTransferLock.class);
        this.cpm = (ClusterPublisherManager) componentRegistry.getComponent(ClusterPublisherManager.class);

        this.intermediateOperations = new ArrayDeque<>();
    }

    /**
     * Copy constructor: takes over every field of {@code abstractCacheStream}.
     *
     * <p>The copy is shallow. In particular the queue of intermediate operations is the same
     * instance as the source's, so operations added through either stream are visible to both.
     *
     * @param abstractCacheStream stream to copy from; must not be {@code null}
     */
    public AbstractCacheStream(AbstractCacheStream<Original, T, S, S2> abstractCacheStream) {
        this.intermediateOperations = abstractCacheStream.intermediateOperations;
        this.localAddress = abstractCacheStream.localAddress;
        this.dm = abstractCacheStream.dm;
        this.invocationContext = abstractCacheStream.invocationContext;
        this.supplier = abstractCacheStream.supplier;
        this.includeLoader = abstractCacheStream.includeLoader;
        this.executor = abstractCacheStream.executor;
        this.registry = abstractCacheStream.registry;
        this.toKeyFunction = abstractCacheStream.toKeyFunction;
        this.partition = abstractCacheStream.partition;
        this.keyPartitioner = abstractCacheStream.keyPartitioner;
        this.stateTransferLock = abstractCacheStream.stateTransferLock;
        this.cpm = abstractCacheStream.cpm;
        this.closeRunnable = abstractCacheStream.closeRunnable;
        this.parallel = abstractCacheStream.parallel;
        this.parallelDistribution = abstractCacheStream.parallelDistribution;
        this.rehashAware = abstractCacheStream.rehashAware;
        this.keysToFilter = abstractCacheStream.keysToFilter;
        this.segmentsToFilter = abstractCacheStream.segmentsToFilter;
        this.distributedBatchSize = abstractCacheStream.distributedBatchSize;
        this.segmentCompletionListener = abstractCacheStream.segmentCompletionListener;
        this.iteratorOperation = abstractCacheStream.iteratorOperation;
        this.timeout = abstractCacheStream.timeout;
        this.timeoutUnit = abstractCacheStream.timeoutUnit;
    }

    /**
     * @return the logger of the concrete stream implementation
     */
    protected abstract Log getLog();

    /**
     * Injects components into the operation, queues it and returns {@link #unwrap()}.
     *
     * @param intermediateOperation operation to append
     * @return the result of {@link #unwrap()}
     */
    public BaseStream addIntermediateOperation(IntermediateOperation intermediateOperation) {
        intermediateOperation.handleInjection(this.registry);
        addIntermediateOperation(this.intermediateOperations, intermediateOperation);
        return unwrap();
    }

    /**
     * Injects components into the operation and queues it, without returning a stream.
     *
     * @param intermediateOperation operation to append
     */
    public void addIntermediateOperationMap(IntermediateOperation<T, S, ?, ?> intermediateOperation) {
        intermediateOperation.handleInjection(this.registry);
        addIntermediateOperation(this.intermediateOperations, intermediateOperation);
    }

    /**
     * Appends the operation to the given queue. Both public {@code addIntermediateOperation*}
     * methods route through here, so overriding this method affects both.
     *
     * @param queue                 queue to append to
     * @param intermediateOperation operation to append
     */
    protected void addIntermediateOperation(Queue<IntermediateOperation> queue, IntermediateOperation<T, S, ?, ?> intermediateOperation) {
        queue.add(intermediateOperation);
    }

    /**
     * @return the stream object that the fluent methods of this class hand back to the caller
     */
    protected abstract BaseStream unwrap();

    /**
     * @return the current value of the {@link #parallel} flag
     */
    @Override
    public boolean isParallel() {
        return this.parallel;
    }

    /**
     * Clears the {@link #parallel} flag on this instance.
     *
     * @return the result of {@link #unwrap()}
     */
    @Override
    public BaseStream sequential() {
        this.parallel = false;
        return unwrap();
    }

    /**
     * Sets the {@link #parallel} flag on this instance.
     *
     * @return the result of {@link #unwrap()}
     */
    @Override
    public BaseStream parallel() {
        this.parallel = true;
        return unwrap();
    }

    /**
     * Changes no state.
     *
     * @return the result of {@link #unwrap()}
     */
    @Override
    public BaseStream unordered() {
        return unwrap();
    }

    /**
     * Registers a close handler. The first handler is stored directly; every later one is
     * combined with the existing handler through {@code Util.composeWithExceptions}.
     *
     * @param runnable handler to run from {@link #close()}
     * @return the result of {@link #unwrap()}
     */
    @Override
    public BaseStream onClose(Runnable runnable) {
        this.closeRunnable = this.closeRunnable == null
                ? runnable
                : Util.composeWithExceptions(this.closeRunnable, runnable);
        return unwrap();
    }

    /**
     * Runs the registered close handler, if any. The handler is not cleared afterwards, so
     * calling this method again runs it again.
     */
    @Override
    public void close() {
        if (this.closeRunnable != null) {
            this.closeRunnable.run();
        }
    }

    /**
     * Runs a reduction through the {@link ClusterPublisherManager} and blocks until its result is
     * available.
     *
     * <p>If intermediate operations are queued, {@code function} is wrapped in a
     * {@code CacheStreamIntermediateReducer} over that queue. A key reduction is used when
     * {@link #toKeyFunction} is {@code null}, an entry reduction otherwise. The delivery guarantee
     * is {@code EXACTLY_ONCE} when {@link #rehashAware} is set and {@code AT_MOST_ONCE} otherwise.
     *
     * @param function  first transformer passed to the reduction, from a publisher of {@code T} to
     *                  a stage of {@code R}
     * @param function2 second transformer passed to the reduction, from a publisher of {@code R}
     *                  to a stage of {@code R}
     * @param <R>       result type
     * @return the value obtained by joining the reduction's completion stage
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // cpm and the reducer are raw in the decompiled source
    public <R> R performPublisherOperation(Function<? super Publisher<T>, ? extends CompletionStage<R>> function, Function<? super Publisher<R>, ? extends CompletionStage<R>> function2) {
        Function<? super Publisher<T>, ? extends CompletionStage<R>> transformer;
        if (this.intermediateOperations.isEmpty()) {
            transformer = function;
        } else {
            transformer = new CacheStreamIntermediateReducer(this.intermediateOperations, function);
        }

        DeliveryGuarantee deliveryGuarantee = this.rehashAware ? DeliveryGuarantee.EXACTLY_ONCE : DeliveryGuarantee.AT_MOST_ONCE;

        CompletionStage<R> stage;
        if (this.toKeyFunction == null) {
            stage = this.cpm.keyReduction(this.parallel, this.segmentsToFilter, this.keysToFilter, this.invocationContext, this.includeLoader, deliveryGuarantee, transformer, function2);
        } else {
            stage = this.cpm.entryReduction(this.parallel, this.segmentsToFilter, this.keysToFilter, this.invocationContext, this.includeLoader, deliveryGuarantee, transformer, function2);
        }
        return (R) CompletionStages.join(stage);
    }

    /**
     * Tells whether {@link #localAddress} is the primary owner of the segment the key maps to.
     *
     * @param consistentHash hash used to locate the segment's primary owner
     * @param obj            key whose segment is computed with {@link #keyPartitioner}
     * @return {@code true} if the local address equals the located primary owner
     */
    protected boolean isPrimaryOwner(ConsistentHash consistentHash, Object obj) {
        return this.localAddress.equals(consistentHash.locatePrimaryOwnerForSegment(this.keyPartitioner.getSegment(obj)));
    }

    /**
     * Chains two consumers so that the second one runs exactly once, whether or not the first
     * one fails.
     *
     * <ul>
     *   <li>If {@code consumer} completes normally, {@code consumer2} runs and any exception it
     *       throws propagates unchanged.</li>
     *   <li>If {@code consumer} throws, {@code consumer2} still runs; an exception thrown by it is
     *       attached to the first one as suppressed, and the first exception is rethrown.</li>
     * </ul>
     *
     * @param consumer  consumer invoked first
     * @param consumer2 consumer invoked second, regardless of the outcome of the first
     * @return the composed consumer
     */
    public static Consumer<Supplier<PrimitiveIterator.OfInt>> composeWithExceptions(Consumer<Supplier<PrimitiveIterator.OfInt>> consumer, Consumer<Supplier<PrimitiveIterator.OfInt>> consumer2) {
        return segments -> {
            try {
                consumer.accept(segments);
            } catch (Throwable primary) {
                try {
                    consumer2.accept(segments);
                } catch (Throwable secondary) {
                    try {
                        primary.addSuppressed(secondary);
                    } catch (Throwable ignored) {
                        // Best effort: addSuppressed can itself fail (e.g. when both consumers
                        // threw the same instance); the primary failure must still propagate.
                    }
                }
                throw primary;
            }
            // Kept outside the try so that a failure here does not trigger a second invocation.
            consumer2.accept(segments);
        };
    }
}
