package org.infinispan.stream.impl.tx;

import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.infinispan.CacheStream;
import org.infinispan.commons.util.IntSet;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.context.impl.LocalTxInvocationContext;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.remoting.transport.Address;
import org.infinispan.stream.impl.AbstractCacheStream;
import org.infinispan.stream.impl.DistributedCacheStream;
import org.infinispan.stream.impl.DistributedDoubleCacheStream;
import org.infinispan.stream.impl.DistributedIntCacheStream;
import org.infinispan.stream.impl.DistributedLongCacheStream;

/* loaded from: input_file:BOOT-INF/lib/infinispan-core-9.4.19.Final.jar:org/infinispan/stream/impl/tx/TxDistributedCacheStream.class */
public class TxDistributedCacheStream<Original, R, K, V> extends DistributedCacheStream<Original, R> {
    private final Address localAddress;
    private final LocalTxInvocationContext ctx;
    private final ConsistentHash hash;
    private final Function<? super CacheEntry<K, V>, ? extends Original> toOriginalFunction;

    public TxDistributedCacheStream(Address address, boolean z, DistributionManager distributionManager, Supplier<CacheStream<R>> supplier, TxClusterStreamManager<Original, K> txClusterStreamManager, boolean z2, int i, Executor executor, ComponentRegistry componentRegistry, LocalTxInvocationContext localTxInvocationContext, Function<? super Original, ?> function, Function<? super CacheEntry<K, V>, ? extends Original> function2) {
        super(address, z, distributionManager, supplier, txClusterStreamManager, z2, i, executor, componentRegistry, function);
        this.localAddress = address;
        this.hash = distributionManager.getWriteConsistentHash();
        this.ctx = localTxInvocationContext;
        this.toOriginalFunction = function2;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public TxDistributedCacheStream(AbstractCacheStream abstractCacheStream, Address address, ConsistentHash consistentHash, LocalTxInvocationContext localTxInvocationContext, Function<? super CacheEntry<K, V>, ? extends Original> function) {
        super(abstractCacheStream);
        this.localAddress = address;
        this.hash = consistentHash;
        this.ctx = localTxInvocationContext;
        this.toOriginalFunction = function;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.infinispan.stream.impl.AbstractCacheStream
    public Supplier<Stream<Original>> supplierForSegments(ConsistentHash consistentHash, IntSet intSet, Set<Object> set, boolean z) {
        return () -> {
            Supplier<Stream<Original>> supplierForSegments = super.supplierForSegments(consistentHash, intSet, set, z);
            ArrayList arrayList = new ArrayList();
            this.ctx.forEachValue((obj, cacheEntry) -> {
                if (isPrimaryOwner(consistentHash, obj)) {
                    return;
                }
                arrayList.add(this.toOriginalFunction.apply(cacheEntry));
            });
            Stream<Original> stream = supplierForSegments.get();
            return !arrayList.isEmpty() ? Stream.concat(arrayList.stream(), stream) : stream;
        };
    }

    @Override // org.infinispan.stream.impl.DistributedCacheStream
    protected DistributedDoubleCacheStream<Original> doubleCacheStream() {
        return new TxDistributedDoubleCacheStream(this, this.localAddress, this.hash, this.ctx, this.toOriginalFunction);
    }

    @Override // org.infinispan.stream.impl.DistributedCacheStream
    protected DistributedLongCacheStream<Original> longCacheStream() {
        return new TxDistributedLongCacheStream(this, this.localAddress, this.hash, this.ctx, this.toOriginalFunction);
    }

    @Override // org.infinispan.stream.impl.DistributedCacheStream
    protected DistributedIntCacheStream<Original> intCacheStream() {
        return new TxDistributedIntCacheStream(this, this.hash, this.ctx, this.toOriginalFunction);
    }
}
