package org.infinispan.stream.impl;

import java.util.Collection;
import java.util.Map;
import java.util.PrimitiveIterator;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Stream;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.CacheSet;
import org.infinispan.CacheStream;
import org.infinispan.cache.impl.AbstractDelegatingCache;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.util.CollectionFactory;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.configuration.cache.ClusteringConfiguration;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.DataRehashed;
import org.infinispan.notifications.cachelistener.event.DataRehashedEvent;
import org.infinispan.remoting.responses.ValidResponse;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.impl.SingleResponseCollector;
import org.infinispan.stream.impl.IteratorHandler;
import org.infinispan.stream.impl.IteratorResponses;
import org.infinispan.stream.impl.KeyTrackingTerminalOperation;
import org.infinispan.stream.impl.intops.IntermediateOperation;
import org.infinispan.util.ByteString;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

@Listener(observation = Listener.Observation.POST)
/* loaded from: input_file:BOOT-INF/lib/infinispan-core-9.4.19.Final.jar:org/infinispan/stream/impl/LocalStreamManagerImpl.class */
public class LocalStreamManagerImpl<Original, K, V> implements LocalStreamManager<Original, K> {
    private static final Log log = LogFactory.getLog(LocalStreamManagerImpl.class);
    // Evaluated once when the class is initialised; later changes of the log level are not reflected here.
    private static final boolean trace = log.isTraceEnabled();
    // Unwrapped cache carrying the CACHE_MODE_LOCAL and REMOTE_ITERATION flags; assigned in inject(Cache).
    private AdvancedCache<K, V> cache;

    // Handed to every terminal operation through handleInjection(...) before the operation is performed.
    @Inject
    private ComponentRegistry registry;

    // Source of the current cache topology (current / pending consistent hash, local read segments).
    @Inject
    private DistributionManager dm;

    // Used to send stream responses back to the requesting node and to obtain the local address.
    @Inject
    private RpcManager rpc;

    // Builds the final (complete) stream response commands.
    @Inject
    private CommandsFactory factory;

    // Creates and looks up the iterators backing startIterator / continueIterator.
    @Inject
    private IteratorHandler iteratorHandler;
    // True when the cache configuration reports persistence stores in use; set in start().
    private boolean hasLoader;
    // True when the cache mode is replicated; set in start(). No rehash listener is registered on the cache in that case.
    private boolean isReplicated;
    // Holds the configured number of hash segments (numSegments()); used to size the per-request lost-segment sets.
    private int maxSegment;
    // Address of this node as reported by the RpcManager; set in start().
    private Address localAddress;
    // Request id -> listener tracking the segments that request lost; consulted by dataRehashed(...).
    private final ConcurrentMap<Object, LocalStreamManagerImpl<Original, K, V>.SegmentListener> changeListener = CollectionFactory.makeConcurrentMap();
    // Cache name in ByteString form, used when constructing intermediate StreamResponseCommand instances; set in start().
    private ByteString cacheName;

    /* loaded from: input_file:BOOT-INF/lib/infinispan-core-9.4.19.Final.jar:org/infinispan/stream/impl/LocalStreamManagerImpl$NonRehashIntermediateCollector.class */
    /**
     * Intermediate collector used by the key tracking operations of this manager. Every partial
     * result passed to {@code sendDataResonse} is sent synchronously to the originating node as a
     * non-complete {@code StreamResponseCommand}.
     *
     * @param <R> type of the partial response being sent
     */
    class NonRehashIntermediateCollector<R> implements KeyTrackingTerminalOperation.IntermediateCollector<R> {
        private final Address origin;
        private final Object requestId;
        private final boolean useManagedBlocker;

        /**
         * Wraps a single blocking send in a {@link ForkJoinPool.ManagedBlocker} so that it can be run
         * through {@link ForkJoinPool#managedBlock(ForkJoinPool.ManagedBlocker)}.
         */
        class ResponseBlocker implements ForkJoinPool.ManagedBlocker {
            private final R response;
            private boolean completed = false;

            ResponseBlocker(R r) {
                this.response = r;
            }

            /**
             * Sends the response unless it was already sent, then marks this blocker as completed.
             *
             * @return always {@code true}: no further blocking is required once the send returned
             */
            @Override
            public boolean block() throws InterruptedException {
                if (!completed) {
                    // Sends issued through blockers of the same collector are serialized on the collector instance.
                    synchronized (NonRehashIntermediateCollector.this) {
                        sendBlocking(response);
                    }
                }
                completed = true;
                return completed;
            }

            @Override
            public boolean isReleasable() {
                return completed;
            }
        }

        /**
         * @param address requesting node that receives the intermediate responses
         * @param obj     identifier of the request the responses belong to
         * @param z       whether sends must go through {@link ForkJoinPool#managedBlock}; the callers in
         *                this file pass their parallel-stream flag here
         */
        NonRehashIntermediateCollector(Address address, Object obj, boolean z) {
            this.origin = address;
            this.requestId = obj;
            this.useManagedBlocker = z;
        }

        /**
         * Sends one intermediate response to the origin and waits for the reply.
         *
         * @param r partial result to send
         * @throws CacheException if the thread is interrupted while blocking; the interrupt status of
         *                        the thread is restored before the exception is thrown
         */
        @Override
        public void sendDataResonse(R r) {
            if (!useManagedBlocker) {
                sendBlocking(r);
                return;
            }
            try {
                ForkJoinPool.managedBlock(new ResponseBlocker(r));
            } catch (InterruptedException e) {
                // Re-assert the interrupt so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new CacheException(e);
            }
        }

        /** Sends {@code response} to the origin as a non-complete stream response and blocks until it is acknowledged. */
        private void sendBlocking(R response) {
            rpc.blocking(rpc.invokeCommand(
                    origin,
                    new StreamResponseCommand<>(cacheName, localAddress, requestId, false, response),
                    SingleResponseCollector.validOnly(),
                    rpc.getSyncRpcOptions()));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/infinispan-core-9.4.19.Final.jar:org/infinispan/stream/impl/LocalStreamManagerImpl$SegmentListener.class */
    /**
     * Per-request bookkeeping of segments that could not be (fully) processed locally. It keeps a
     * private copy of the requested segments, the operation to notify, and a concurrent set that
     * accumulates the segments considered lost.
     */
    public class SegmentListener {
        private final IntSet segments;
        private final SegmentAwareOperation op;
        private final IntSet segmentsLost;

        /**
         * @param intSet                segments requested; copied, so later changes to the argument are not seen here
         * @param segmentAwareOperation operation that is told about lost segments
         */
        SegmentListener(IntSet intSet, SegmentAwareOperation segmentAwareOperation) {
            segments = IntSets.mutableCopyFrom(intSet);
            op = segmentAwareOperation;
            segmentsLost = IntSets.concurrentSet(maxSegment);
        }

        /**
         * Marks as lost every requested segment that is absent from the given set. The operation is
         * not notified by this method.
         *
         * @param intSet segments that are available locally
         */
        public void localSegments(IntSet intSet) {
            for (PrimitiveIterator.OfInt requested = segments.iterator(); requested.hasNext(); ) {
                final int segment = requested.nextInt();
                if (intSet.contains(segment)) {
                    continue;
                }
                if (trace) {
                    log.tracef("Could not process segment %s", segment);
                }
                segmentsLost.add(segment);
            }
        }

        /**
         * Processes segments that were lost. For each one that belongs to this request the operation
         * is notified with {@code lostSegment(false)}; if that call returns {@code true} the segment is
         * recorded, and once the number of recorded segments equals the number of requested segments
         * the operation is notified again with {@code lostSegment(true)}.
         *
         * @param intSet segments that were lost
         */
        public void lostSegments(IntSet intSet) {
            for (PrimitiveIterator.OfInt lost = intSet.iterator(); lost.hasNext(); ) {
                final int segment = lost.nextInt();
                if (!segments.contains(segment)) {
                    continue;
                }
                if (trace) {
                    log.tracef("Lost segment %s", segment);
                }
                if (!op.lostSegment(false)) {
                    continue;
                }
                final boolean newlyLost = segmentsLost.add(segment);
                if (newlyLost && segmentsLost.size() == segments.size()) {
                    if (trace) {
                        log.tracef("All segments %s are now lost", segments);
                    }
                    op.lostSegment(true);
                }
            }
        }
    }

    /**
     * Stores the cache this manager streams over: the given cache is unwrapped and the
     * {@code CACHE_MODE_LOCAL} and {@code REMOTE_ITERATION} flags are applied to it.
     *
     * @param cache cache injected by the component registry
     */
    @Inject
    public void inject(Cache<K, V> cache) {
        Cache<K, V> unwrapped = AbstractDelegatingCache.unwrapCache(cache);
        AdvancedCache<K, V> advanced = unwrapped.getAdvancedCache();
        this.cache = advanced.withFlags(Flag.CACHE_MODE_LOCAL, Flag.REMOTE_ITERATION);
    }

    /**
     * Caches configuration derived values (cache name, store usage, segment count, local address,
     * replicated mode) and, unless the cache is replicated, registers this instance as a cache
     * listener so that {@code dataRehashed} is invoked.
     */
    @Start
    public void start() {
        cacheName = ByteString.fromString(cache.getName());
        hasLoader = cache.getCacheConfiguration().persistence().usingStores();

        ClusteringConfiguration clusteringConfig = cache.getCacheConfiguration().clustering();
        maxSegment = clusteringConfig.hash().numSegments();
        localAddress = rpc.getAddress();
        isReplicated = clusteringConfig.cacheMode().isReplicated();

        if (!isReplicated) {
            cache.addListener(this);
        }
    }

    /**
     * Rehash callback. Computes the segments this node owned in the start hash but no longer owns
     * in the end hash and reports them to every registered {@code SegmentListener}. Nothing happens
     * when either hash is missing, when no listener is registered, or when no segment was removed.
     *
     * @param dataRehashedEvent event describing the rehash
     */
    @DataRehashed
    public void dataRehashed(DataRehashedEvent<K, V> dataRehashedEvent) {
        final ConsistentHash startHash = dataRehashedEvent.getConsistentHashAtStart();
        final ConsistentHash endHash = dataRehashedEvent.getConsistentHashAtEnd();
        final boolean tracing = log.isTraceEnabled();

        if (startHash == null || endHash == null) {
            return;
        }
        if (tracing) {
            log.tracef("Data rehash occurred startHash: %s and endHash: %s with new topology %s and was pre %s", startHash, endHash, Integer.valueOf(dataRehashedEvent.getNewTopologyId()), Boolean.valueOf(dataRehashedEvent.isPre()));
        }

        if (changeListener.isEmpty()) {
            if (tracing) {
                log.tracef("No change listeners present!");
            }
            return;
        }

        if (tracing) {
            log.tracef("Previous segments %s ", startHash.getSegmentsForOwner(localAddress));
            log.tracef("After segments %s ", endHash.getSegmentsForOwner(localAddress));
        }

        // Segments owned before the rehash minus the ones still owned afterwards.
        IntSet removedSegments = IntSets.mutableFrom(startHash.getSegmentsForOwner(localAddress));
        removedSegments.removeAll(endHash.getSegmentsForOwner(localAddress));

        if (removedSegments.isEmpty()) {
            if (tracing) {
                log.tracef("No segments have been removed from data rehash, no notification required");
            }
            return;
        }

        for (Map.Entry<Object, SegmentListener> registration : changeListener.entrySet()) {
            if (tracing) {
                log.tracef("Notifying %s through SegmentChangeListener", registration.getKey());
            }
            registration.getValue().lostSegments(removedSegments);
        }
    }

    /**
     * Returns the cache to stream over. When stores are in use and {@code z} is {@code false} a view
     * with {@code SKIP_CACHE_LOAD} is returned; otherwise the cache is returned unchanged.
     *
     * @param z whether data from the loader should be included
     */
    private AdvancedCache<K, V> getCacheRespectingLoader(boolean z) {
        if (hasLoader && !z) {
            return cache.getAdvancedCache().withFlags(Flag.SKIP_CACHE_LOAD);
        }
        return cache;
    }

    /**
     * Returns the set that stream operations start from: the cache entry set when {@code z2} is
     * {@code true}, otherwise the key set.
     *
     * @param z  whether data from the loader should be included (see {@code getCacheRespectingLoader})
     * @param z2 {@code true} to stream over entries, {@code false} to stream over keys
     */
    // Unchecked by design: Original is only known to the caller, which is expected to ask for the
    // set (entries or keys) whose element type matches it. Without the casts the two branches are
    // not assignable to CacheSet<Original>.
    @SuppressWarnings("unchecked")
    private CacheSet<Original> toOriginalSet(boolean z, boolean z2) {
        AdvancedCache<K, V> source = getCacheRespectingLoader(z);
        if (z2) {
            return (CacheSet<Original>) source.cacheEntrySet();
        }
        return (CacheSet<Original>) source.keySet();
    }

    /**
     * Builds the stream for a request: sequential or parallel, restricted to the given keys and
     * segments, and with excluded keys filtered out when any are supplied.
     *
     * @param cacheSet set to stream over
     * @param z        {@code true} for a parallel stream
     * @param z2       {@code true} when elements are cache entries (the key is extracted for exclusion),
     *                 {@code false} when the elements are the keys themselves
     * @param intSet   segments to restrict the stream to
     * @param set      keys passed to {@code filterKeys}
     * @param set2     keys to exclude; must not be {@code null}
     */
    private Stream<Original> getStream(CacheSet<Original> cacheSet, boolean z, boolean z2, IntSet intSet, Set<K> set, Set<K> set2) {
        CacheStream<Original> base;
        if (z) {
            base = cacheSet.parallelStream();
        } else {
            base = cacheSet.stream();
        }
        CacheStream<Original> restricted = base.filterKeys((Set<?>) set).filterKeySegments(intSet);

        if (set2.isEmpty()) {
            return restricted;
        }
        if (z2) {
            return restricted.filter(entry -> !set2.contains(((CacheEntry<?, ?>) entry).getKey()));
        }
        return restricted.filter(key -> !set2.contains(key));
    }

    /**
     * Rehash aware variant of {@code getStream}: when a listener is supplied, the requested segments
     * are first checked against the current topology (which may shrink {@code intSet} and record lost
     * segments on the listener) before the stream is created.
     *
     * @param obj             request identifier, used for tracing
     * @param segmentListener listener for the request, or {@code null} to skip the topology check
     */
    private Stream<Original> getRehashStream(CacheSet<Original> cacheSet, Object obj, LocalStreamManagerImpl<Original, K, V>.SegmentListener segmentListener, boolean z, boolean z2, IntSet intSet, Set<K> set, Set<K> set2) {
        final boolean checkTopology = segmentListener != null;
        if (checkTopology) {
            handleSuspectSegmentsBeforeStream(obj, segmentListener, intSet);
        }
        Stream<Original> stream = getStream(cacheSet, z, z2, intSet, set, set2);
        return stream;
    }

    /**
     * Removes from {@code intSet} the segments this node cannot serve under the current topology and
     * reports them to the listener.
     * <ul>
     *   <li>Without a pending hash, {@code intSet} is intersected with the local read segments; if
     *       anything was dropped the listener is told which segments are local.</li>
     *   <li>With a pending hash, every segment that is not local in both the pending and the current
     *       hash is removed from {@code intSet} and reported to the listener as lost.</li>
     * </ul>
     *
     * @param obj             request identifier, used for tracing
     * @param segmentListener listener of the request
     * @param intSet          requested segments; modified in place
     */
    private void handleSuspectSegmentsBeforeStream(Object obj, LocalStreamManagerImpl<Original, K, V>.SegmentListener segmentListener, IntSet intSet) {
        final LocalizedCacheTopology topology = dm.getCacheTopology();
        if (trace) {
            log.tracef("Topology for supplier is %s for id %s", topology, obj);
        }
        final ConsistentHash currentHash = topology.getCurrentCH();
        final ConsistentHash pendingHash = topology.getPendingCH();

        if (pendingHash == null) {
            final IntSet readableSegments = topology.getLocalReadSegments();
            final boolean droppedSome = intSet.retainAll(readableSegments);
            if (droppedSome) {
                if (trace) {
                    log.tracef("We found to be missing some segments requested for id %s", obj);
                }
                segmentListener.localSegments(readableSegments);
            } else if (trace) {
                log.tracef("Hash %s for id %s", currentHash, obj);
            }
            return;
        }

        final IntSet lostDuringRehash = IntSets.mutableEmptySet(topology.getCurrentCH().getNumSegments());
        for (PrimitiveIterator.OfInt requested = intSet.iterator(); requested.hasNext(); ) {
            final int segment = requested.nextInt();
            final boolean localInBothHashes = pendingHash.isSegmentLocalToNode(localAddress, segment)
                    && currentHash.isSegmentLocalToNode(localAddress, segment);
            if (!localInBothHashes) {
                requested.remove();
                lostDuringRehash.set(segment);
            }
        }

        if (!lostDuringRehash.isEmpty()) {
            if (trace) {
                log.tracef("Lost segments %s during rehash for id %s", lostDuringRehash, obj);
            }
            segmentListener.lostSegments(lostDuringRehash);
        } else if (trace) {
            log.tracef("Currently in the middle of a rehash for id %s", obj);
        }
    }

    /**
     * When tracing is enabled, attaches a callback that logs the outcome of a response send
     * (exception, success, or unsuccessful response). Does nothing otherwise.
     *
     * @param completionStage stage of the response send
     * @param obj             request identifier
     * @param address         node the response was sent to
     */
    private void handleResponseError(CompletionStage<ValidResponse> completionStage, Object obj, Address address) {
        if (!trace) {
            return;
        }
        completionStage.whenComplete((response, failure) -> {
            if (failure != null) {
                log.tracef(failure, "Encountered exception for %s sending response to %s", obj, address);
                return;
            }
            // A missing response is treated the same way as a successful one.
            final boolean delivered = response == null || response.isSuccessful();
            if (delivered) {
                log.tracef("Response successfully sent for %s", obj);
            } else {
                log.tracef("Unsuccessful response for %s sending response to %s", obj, address);
            }
        });
    }

    /**
     * Runs a terminal operation that is not rehash aware and sends its result to the requesting
     * node as a complete response with no lost segments.
     *
     * @param obj               request identifier
     * @param address           node that issued the request and receives the response
     * @param z                 {@code true} to run on a parallel stream
     * @param intSet            segments to operate on
     * @param set               keys passed to {@code filterKeys}
     * @param set2              keys to exclude
     * @param z2                whether data from the loader should be included
     * @param z3                {@code true} to stream over entries, {@code false} over keys
     * @param terminalOperation operation to perform
     */
    @Override // org.infinispan.stream.impl.LocalStreamManager
    public <R> void streamOperation(Object obj, Address address, boolean z, IntSet intSet, Set<K> set, Set<K> set2, boolean z2, boolean z3, TerminalOperation<Original, R> terminalOperation) {
        if (trace) {
            log.tracef("Received operation request for id %s from %s for segments %s", obj, address, intSet);
        }
        final CacheSet<Original> source = toOriginalSet(z2, z3);
        terminalOperation.setSupplier(() -> getStream(source, z, z3, intSet, set, set2));
        terminalOperation.handleInjection(registry);

        final R result = terminalOperation.performOperation();
        final CompletionStage<ValidResponse> pending = rpc.invokeCommand(
                address,
                factory.buildStreamResponseCommand(obj, true, IntSets.immutableEmptySet(), result),
                SingleResponseCollector.validOnly(),
                rpc.getSyncRpcOptions());
        handleResponseError(pending, obj, address);
    }

    /**
     * Runs a rehash aware terminal operation. Unless the cache is replicated, a
     * {@code SegmentListener} is registered under the request id for the duration of the operation
     * so that segments lost meanwhile are recorded; they are sent back together with the result. The
     * listener is always unregistered afterwards.
     *
     * @param obj               request identifier
     * @param address           node that issued the request and receives the response
     * @param z                 {@code true} to run on a parallel stream
     * @param intSet            segments to operate on
     * @param set               keys passed to {@code filterKeys}
     * @param set2              keys to exclude
     * @param z2                whether data from the loader should be included
     * @param z3                {@code true} to stream over entries, {@code false} over keys
     * @param terminalOperation operation to perform
     */
    @Override // org.infinispan.stream.impl.LocalStreamManager
    public <R> void streamOperationRehashAware(Object obj, Address address, boolean z, IntSet intSet, Set<K> set, Set<K> set2, boolean z2, boolean z3, TerminalOperation<Original, R> terminalOperation) {
        if (trace) {
            log.tracef("Received rehash aware operation request for id %s from %s for segments %s", obj, address, intSet);
        }
        final CacheSet<Original> source = toOriginalSet(z2, z3);
        terminalOperation.handleInjection(registry);

        final SegmentListener listener = isReplicated ? null : new SegmentListener(intSet, terminalOperation);
        if (listener != null) {
            changeListener.put(obj, listener);
            if (trace) {
                log.tracef("Registered change listener for %s", obj);
            }
        }

        try {
            terminalOperation.setSupplier(() -> getRehashStream(source, obj, listener, z, z3, intSet, set, set2));
            final R result = terminalOperation.performOperation();

            final IntSet suspected;
            if (listener == null) {
                suspected = IntSets.immutableEmptySet();
            } else {
                suspected = listener.segmentsLost;
            }
            if (trace) {
                log.tracef("Request %s completed for segments %s with %s suspected segments", obj, intSet, suspected);
            }
            sendRehashAwareResponse(obj, address, suspected, intSet, result);
        } finally {
            if (listener != null) {
                changeListener.remove(obj);
                if (trace) {
                    log.tracef("UnRegistered change listener for %s", obj);
                }
            }
        }
    }

    /**
     * Runs a key tracking operation that is not rehash aware. Intermediate results are pushed to
     * the requesting node through a {@code NonRehashIntermediateCollector}; the value returned by
     * the operation is sent as the complete response with no lost segments.
     *
     * @param obj                          request identifier
     * @param address                      node that issued the request and receives the responses
     * @param z                            {@code true} to run on a parallel stream; also makes the
     *                                     collector send through a managed blocker
     * @param intSet                       segments to operate on
     * @param set                          keys passed to {@code filterKeys}
     * @param set2                         keys to exclude
     * @param z2                           whether data from the loader should be included
     * @param z3                           {@code true} to stream over entries, {@code false} over keys
     * @param keyTrackingTerminalOperation operation to perform
     */
    @Override // org.infinispan.stream.impl.LocalStreamManager
    public <R> void streamOperation(Object obj, Address address, boolean z, IntSet intSet, Set<K> set, Set<K> set2, boolean z2, boolean z3, KeyTrackingTerminalOperation<Original, K, R> keyTrackingTerminalOperation) {
        if (trace) {
            log.tracef("Received key aware operation request for id %s from %s for segments %s", obj, address, intSet);
        }
        final CacheSet<Original> source = toOriginalSet(z2, z3);
        keyTrackingTerminalOperation.setSupplier(() -> getStream(source, z, z3, intSet, set, set2));
        keyTrackingTerminalOperation.handleInjection(registry);

        // NOTE(review): "performOperation2" looks like a decompiler-renamed overload - confirm against the interface.
        final CompletionStage<ValidResponse> pending = rpc.invokeCommand(
                address,
                factory.buildStreamResponseCommand(
                        obj,
                        true,
                        IntSets.immutableEmptySet(),
                        keyTrackingTerminalOperation.performOperation2(new NonRehashIntermediateCollector<>(address, obj, z))),
                SingleResponseCollector.validOnly(),
                rpc.getSyncRpcOptions());
        handleResponseError(pending, obj, address);
    }

    /**
     * Runs a rehash aware key tracking (for-each) operation. Unless the cache is replicated, a
     * {@code SegmentListener} is registered under the request id while the operation runs; the
     * segments it recorded as lost are sent back together with the collection returned by the
     * operation. The listener is always unregistered afterwards.
     *
     * @param obj                          request identifier
     * @param address                      node that issued the request and receives the responses
     * @param z                            {@code true} to run on a parallel stream; also makes the
     *                                     collector send through a managed blocker
     * @param intSet                       segments to operate on
     * @param set                          keys passed to {@code filterKeys}
     * @param set2                         keys to exclude
     * @param z2                           whether data from the loader should be included
     * @param z3                           {@code true} to stream over entries, {@code false} over keys
     * @param keyTrackingTerminalOperation operation to perform
     */
    @Override // org.infinispan.stream.impl.LocalStreamManager
    public void streamOperationRehashAware(Object obj, Address address, boolean z, IntSet intSet, Set<K> set, Set<K> set2, boolean z2, boolean z3, KeyTrackingTerminalOperation<Original, K, ?> keyTrackingTerminalOperation) {
        if (trace) {
            log.tracef("Received key rehash aware operation request for id %s from %s for segments %s", obj, address, intSet);
        }
        final CacheSet<Original> source = toOriginalSet(z2, z3);
        keyTrackingTerminalOperation.handleInjection(registry);

        final SegmentListener listener = isReplicated ? null : new SegmentListener(intSet, keyTrackingTerminalOperation);
        if (listener != null) {
            changeListener.put(obj, listener);
            if (trace) {
                log.tracef("Registered change listener for %s", obj);
            }
        }

        try {
            keyTrackingTerminalOperation.setSupplier(() -> getRehashStream(source, obj, listener, z, z3, intSet, set, set2));
            final Collection<K> processed = keyTrackingTerminalOperation.performForEachOperation(new NonRehashIntermediateCollector<>(address, obj, z));

            final IntSet suspected;
            if (listener == null) {
                suspected = IntSets.immutableEmptySet();
            } else {
                suspected = listener.segmentsLost;
            }
            if (trace) {
                log.tracef("Request %s completed segments %s with %s suspected segments", obj, intSet, suspected);
            }
            sendRehashAwareResponse(obj, address, suspected, intSet, processed);
        } finally {
            if (listener != null) {
                changeListener.remove(obj);
                if (trace) {
                    log.tracef("UnRegistered change listener for %s", obj);
                }
            }
        }
    }

    /**
     * Sends the complete response of a rehash aware operation. If the cache is neither
     * {@code RUNNING} nor {@code INITIALIZING}, all requested segments are reported as suspect and
     * the result is replaced by {@code null}.
     *
     * @param obj     request identifier
     * @param address node receiving the response
     * @param intSet  segments recorded as lost; the requested segments are added to it when it is
     *                non-empty and the cache is not running
     * @param intSet2 segments that were requested
     * @param r       result of the operation
     */
    private <R> void sendRehashAwareResponse(Object obj, Address address, IntSet intSet, IntSet intSet2, R r) {
        IntSet suspects = intSet;
        R payload = r;

        final boolean cacheUsable = cache.getStatus() == ComponentStatus.RUNNING
                || cache.getStatus() == ComponentStatus.INITIALIZING;
        if (!cacheUsable) {
            if (suspects.isEmpty()) {
                suspects = intSet2;
            } else {
                suspects.addAll(intSet2);
            }
            if (trace) {
                log.tracef("Cache status is no longer running, all segments are now suspect for %s", obj);
            }
            payload = null;
        }

        final CompletionStage<ValidResponse> pending = rpc.invokeCommand(
                address,
                factory.buildStreamResponseCommand(obj, true, suspects, payload),
                SingleResponseCollector.validOnly(),
                rpc.getSyncRpcOptions());
        handleResponseError(pending, obj, address);
    }

    /**
     * Starts a remote iterator for the given request and returns its first response.
     * <p>
     * A {@code SegmentListener} is registered under the request id before the iterator is created
     * and is removed again when the iterator is closed. If creating the iterator (or registering its
     * close callback) fails, the listener registration is rolled back so that the id does not stay
     * occupied and a later attempt with the same id is not rejected.
     *
     * @param obj      request identifier; must not already have an iterator
     * @param address  node that issued the request
     * @param intSet   segments to iterate
     * @param set      keys passed to {@code filterKeys}
     * @param set2     keys to exclude
     * @param z        whether data from the loader should be included
     * @param z2       {@code true} to iterate entries, {@code false} to iterate keys
     * @param iterable intermediate operations handed to the iterator handler
     * @param j        batch size passed on to the response
     * @return response wrapping the iterator and the live set of segments lost so far
     * @throws IllegalStateException if an iterator is already registered for {@code obj}
     */
    @Override // org.infinispan.stream.impl.LocalStreamManager
    public IteratorResponse startIterator(Object obj, Address address, IntSet intSet, Set<K> set, Set<K> set2, boolean z, boolean z2, Iterable<IntermediateOperation> iterable, long j) {
        if (trace) {
            log.tracef("Received rehash aware operation request to start iterator for id %s from %s for segments %s", obj, address, intSet);
        }
        CacheSet<Original> originalSet = toOriginalSet(z, z2);
        // The listener only records lost segments; the placeholder operation accepts every notification.
        SegmentListener segmentListener = new SegmentListener(intSet, ignored -> true);

        Runnable onClose = () -> {
            changeListener.remove(obj);
            if (cache.getStatus() == ComponentStatus.RUNNING || cache.getStatus() == ComponentStatus.INITIALIZING) {
                return;
            }
            if (trace) {
                log.tracef("Cache status is no longer running after completing iterator, all segments are now suspect for %s", obj);
            }
            segmentListener.segmentsLost.addAll(intSet);
        };

        if (changeListener.putIfAbsent(obj, segmentListener) != null) {
            throw new IllegalStateException("Iterator was already created for id " + obj);
        }
        if (trace) {
            log.tracef("Registered change listener for %s", obj);
        }

        IteratorHandler.OnCloseIterator iterator;
        try {
            iterator = iteratorHandler.start(address, () -> {
                return getRehashStream(originalSet, obj, isReplicated ? null : segmentListener, false, z2, intSet, set, set2);
            }, iterable, obj);
            iterator.onClose(onClose);
        } catch (RuntimeException | Error e) {
            // Nothing will ever run the close callback now, so undo the registration made above.
            // The two-argument remove only drops the entry if it is still our listener.
            changeListener.remove(obj, segmentListener);
            throw e;
        }
        return new IteratorResponses.RemoteResponse(iterator, segmentListener.segmentsLost, j);
    }

    /**
     * Returns the next response for an iterator previously created by {@code startIterator}.
     *
     * @param obj request identifier the iterator was started with
     * @param j   batch size passed on to the response
     * @return response wrapping the iterator and the live set of segments lost so far
     * @throws IllegalStateException if no listener is registered for {@code obj}, i.e. the iterator
     *                               was never started here or has already been closed
     */
    @Override // org.infinispan.stream.impl.LocalStreamManager
    public IteratorResponse continueIterator(Object obj, long j) {
        SegmentListener listener = changeListener.get(obj);
        if (listener == null) {
            // The close callback removes the listener, so a missing entry means there is nothing to continue.
            throw new IllegalStateException("No iterator is registered for id " + obj);
        }
        return new IteratorResponses.RemoteResponse(iteratorHandler.getIterator(obj), listener.segmentsLost, j);
    }
}
