package org.infinispan.stream.impl;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Stream;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.CacheSet;
import org.infinispan.CacheStream;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.util.CollectionFactory;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.DataRehashed;
import org.infinispan.notifications.cachelistener.event.DataRehashedEvent;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.StateTransferManager;
import org.infinispan.stream.impl.KeyTrackingTerminalOperation;
import org.infinispan.topology.CacheTopology;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

@Listener(observation = Listener.Observation.POST)
/* loaded from: input_file:WEB-INF/lib/infinispan-embedded-8.0.0-SNAPSHOT.jar:org/infinispan/stream/impl/LocalStreamManagerImpl.class */
public class LocalStreamManagerImpl<K, V> implements LocalStreamManager<K> {
    private static final Log log = LogFactory.getLog(LocalStreamManagerImpl.class);
    private AdvancedCache<K, V> cache;
    private ComponentRegistry registry;
    private StateTransferManager stm;
    private RpcManager rpc;
    private CommandsFactory factory;
    private boolean hasLoader;
    private Address localAddress;
    private final ConcurrentMap<UUID, LocalStreamManagerImpl<K, V>.SegmentListener> changeListener = CollectionFactory.makeConcurrentMap();

    /* loaded from: input_file:WEB-INF/lib/infinispan-embedded-8.0.0-SNAPSHOT.jar:org/infinispan/stream/impl/LocalStreamManagerImpl$NonRehashIntermediateCollector.class */
    /**
     * Intermediate collector that forwards each partial result to the node that issued the request.
     * Every result is wrapped in a {@link StreamResponseCommand} built from the cache name, the local
     * address, the request id and a {@code false} flag (presumably "not the final response" - confirm
     * against {@code StreamResponseCommand}).
     *
     * @param <R> type of the partial response passed to {@link #sendDataResonse(Object)}
     */
    class NonRehashIntermediateCollector<R> implements KeyTrackingTerminalOperation.IntermediateCollector<R> {
        private final Address origin;
        private final UUID requestId;
        private final boolean useManagedBlocker;

        /**
         * {@link ForkJoinPool.ManagedBlocker} that performs a single response send. The send itself is
         * executed while holding the monitor of the enclosing collector, so sends issued through
         * blockers of the same collector do not overlap.
         */
        class ResponseBlocker implements ForkJoinPool.ManagedBlocker {
            private final R response;
            private boolean completed = false;

            ResponseBlocker(R r) {
                this.response = r;
            }

            /**
             * Sends the response if it has not been sent yet and marks this blocker as completed.
             *
             * @return always {@code true}, as no further blocking is required after the send
             */
            @Override // java.util.concurrent.ForkJoinPool.ManagedBlocker
            public boolean block() throws InterruptedException {
                if (!this.completed) {
                    synchronized (NonRehashIntermediateCollector.this) {
                        sendResponse(this.response);
                    }
                }
                this.completed = true;
                return this.completed;
            }

            @Override // java.util.concurrent.ForkJoinPool.ManagedBlocker
            public boolean isReleasable() {
                return this.completed;
            }
        }

        /**
         * @param address node the partial responses are sent to
         * @param uuid    identifier of the request the responses belong to
         * @param z       when {@code true}, sends go through {@link ForkJoinPool#managedBlock}
         */
        NonRehashIntermediateCollector(Address address, UUID uuid, boolean z) {
            this.origin = address;
            this.requestId = uuid;
            this.useManagedBlocker = z;
        }

        /**
         * Sends one partial response to the origin node, either directly or through a
         * {@link ResponseBlocker} when managed blocking was requested at construction time.
         *
         * @param r partial response to send
         * @throws CacheException if the thread is interrupted during the managed block; the
         *                        interrupt status of the thread is restored before throwing
         */
        @Override // org.infinispan.stream.impl.KeyTrackingTerminalOperation.IntermediateCollector
        public void sendDataResonse(R r) {
            if (!this.useManagedBlocker) {
                sendResponse(r);
                return;
            }
            try {
                ForkJoinPool.managedBlock(new ResponseBlocker(r));
            } catch (InterruptedException e) {
                // Wrapping the exception would otherwise hide the interruption from callers further up.
                Thread.currentThread().interrupt();
                throw new CacheException(e);
            }
        }

        /** Performs the actual RPC that delivers {@code r} to the origin node. */
        private void sendResponse(R r) {
            LocalStreamManagerImpl.this.rpc.invokeRemotely(Collections.singleton(this.origin), new StreamResponseCommand(LocalStreamManagerImpl.this.cache.getName(), LocalStreamManagerImpl.this.localAddress, this.requestId, false, r), LocalStreamManagerImpl.this.rpc.getDefaultRpcOptions(true));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/infinispan-embedded-8.0.0-SNAPSHOT.jar:org/infinispan/stream/impl/LocalStreamManagerImpl$SegmentListener.class */
    /**
     * Tracks, for one request, which of the requested segments could not be processed locally and
     * relays segment loss to the associated {@link SegmentAwareOperation}.
     * <p>
     * NOTE(review): {@code segmentsLost} is a plain {@link HashSet}, while {@link #lostSegments(Set)}
     * is reachable both from the rehash listener callback and from the stream supplier - confirm
     * these cannot run concurrently for the same listener.
     */
    public class SegmentListener {
        /** Private copy of the segments the request asked for. */
        private final Set<Integer> segments;
        private final SegmentAwareOperation op;
        /** Requested segments recorded as lost or not processable. */
        private final Set<Integer> segmentsLost = new HashSet<>();

        /**
         * @param set                   segments requested; copied so later changes to {@code set} are not seen
         * @param segmentAwareOperation operation to notify when a requested segment is lost
         */
        SegmentListener(Set<Integer> set, SegmentAwareOperation segmentAwareOperation) {
            this.segments = new HashSet<>(set);
            this.op = segmentAwareOperation;
        }

        /**
         * Records every requested segment that is absent from {@code set} as lost. The operation is
         * not notified by this method.
         *
         * @param set segments that are available locally
         */
        public void localSegments(Set<Integer> set) {
            this.segments.forEach(num -> {
                if (set.contains(num)) {
                    return;
                }
                LocalStreamManagerImpl.log.tracef("Could not process segment %s", num);
                this.segmentsLost.add(num);
            });
        }

        /**
         * Handles segments that are no longer available. For each lost segment that was requested the
         * operation is notified with {@code lostSegment(false)}; when that call returns {@code true}
         * the segment is recorded, and once every requested segment has been recorded the operation
         * is notified again with {@code lostSegment(true)}.
         *
         * @param set segments that were lost
         */
        public void lostSegments(Set<Integer> set) {
            for (Integer num : set) {
                if (this.segments.contains(num)) {
                    LocalStreamManagerImpl.log.tracef("Lost segment %s", num);
                    if (this.op.lostSegment(false) && this.segmentsLost.add(num) && this.segmentsLost.size() == this.segments.size()) {
                        LocalStreamManagerImpl.log.tracef("All segments %s are now lost", this.segments);
                        this.op.lostSegment(true);
                    }
                }
            }
        }
    }

    /**
     * Receives the collaborating components. The cache is stored with {@link Flag#CACHE_MODE_LOCAL}
     * applied, and the configuration is consulted once to remember whether stores are in use.
     */
    @Inject
    public void inject(Cache<K, V> cache, ComponentRegistry componentRegistry, StateTransferManager stateTransferManager, RpcManager rpcManager, Configuration configuration, CommandsFactory commandsFactory) {
        AdvancedCache<K, V> localOnlyCache = cache.getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL);
        this.cache = localOnlyCache;
        registry = componentRegistry;
        stm = stateTransferManager;
        rpc = rpcManager;
        factory = commandsFactory;
        boolean storesConfigured = configuration.persistence().usingStores();
        hasLoader = storesConfigured;
    }

    /**
     * Captures the local address from the RPC manager and then registers this instance as a
     * listener on the cache.
     */
    @Start
    public void start() {
        Address self = rpc.getAddress();
        localAddress = self;
        // Registered only after the address is known, since the rehash callback reads it.
        cache.addListener(this);
    }

    /**
     * Rehash callback. Computes the segments the local address owned in the start hash but no longer
     * owns in the end hash and hands them to every registered {@link SegmentListener}. Nothing is
     * done when either hash is missing, when no listener is registered, or when no segment was removed.
     *
     * @param dataRehashedEvent event carrying the consistent hashes before and after the rehash
     */
    @DataRehashed
    public void dataRehashed(DataRehashedEvent<K, V> dataRehashedEvent) {
        ConsistentHash consistentHashAtStart = dataRehashedEvent.getConsistentHashAtStart();
        ConsistentHash consistentHashAtEnd = dataRehashedEvent.getConsistentHashAtEnd();
        boolean isTraceEnabled = log.isTraceEnabled();
        if (consistentHashAtStart == null || consistentHashAtEnd == null) {
            return;
        }
        log.tracef("Data rehash occurred startHash: %s and endHash: %s with new topology %s and was pre %s", consistentHashAtStart, consistentHashAtEnd, Integer.valueOf(dataRehashedEvent.getNewTopologyId()), Boolean.valueOf(dataRehashedEvent.isPre()));
        if (this.changeListener.isEmpty()) {
            log.tracef("No change listeners present!", new Object[0]);
            return;
        }
        if (isTraceEnabled) {
            log.tracef("Previous segments %s ", consistentHashAtStart.getSegmentsForOwner(this.localAddress));
            log.tracef("After segments %s ", consistentHashAtEnd.getSegmentsForOwner(this.localAddress));
        }
        // Start from a copy of the previously owned segments and drop those still owned afterwards.
        Set<Integer> removedSegments = new HashSet<>(consistentHashAtStart.getSegmentsForOwner(this.localAddress));
        removedSegments.removeAll(consistentHashAtEnd.getSegmentsForOwner(this.localAddress));
        if (removedSegments.isEmpty()) {
            if (isTraceEnabled) {
                log.tracef("No segments have been removed from data rehash, no notification required", new Object[0]);
            }
        } else {
            for (Map.Entry<UUID, LocalStreamManagerImpl<K, V>.SegmentListener> entry : this.changeListener.entrySet()) {
                if (isTraceEnabled) {
                    log.tracef("Notifying %s through SegmentChangeListener", entry.getKey());
                }
                entry.getValue().lostSegments(removedSegments);
            }
        }
    }

    /**
     * Chooses the cache view to read from.
     *
     * @param z when {@code true} the stored cache is returned unchanged
     * @return the stored cache, or a view of it with {@link Flag#SKIP_CACHE_LOAD} when stores are
     *         configured and {@code z} is {@code false}
     */
    private AdvancedCache<K, V> getCacheRespectingLoader(boolean z) {
        if (hasLoader && !z) {
            return cache.getAdvancedCache().withFlags(Flag.SKIP_CACHE_LOAD);
        }
        return cache;
    }

    /**
     * Builds the entry stream for a request.
     *
     * @param cacheSet entry set the stream is created from
     * @param z        {@code true} for a parallel stream, {@code false} for a sequential one
     * @param set      segments passed to {@code filterKeySegments}
     * @param set2     keys passed to {@code filterKeys}
     * @param set3     keys whose entries are filtered out; no extra filter is added when empty
     * @return the configured stream
     */
    private Stream<CacheEntry<K, V>> getStream(CacheSet<CacheEntry<K, V>> cacheSet, boolean z, Set<Integer> set, Set<K> set2, Set<K> set3) {
        CacheStream<CacheEntry<K, V>> base;
        if (z) {
            base = cacheSet.parallelStream();
        } else {
            base = cacheSet.stream();
        }
        CacheStream<CacheEntry<K, V>> restricted = base.filterKeys(set2).filterKeySegments(set);
        if (set3.isEmpty()) {
            return restricted;
        }
        return restricted.filter(entry -> !set3.contains(entry.getKey()));
    }

    /**
     * Builds the entry stream for a rehash-aware request after pruning {@code set} against the
     * current cache topology. Note that {@code set} is modified in place.
     * <ul>
     * <li>With a pending consistent hash: every requested segment that the local address does not
     * own in both the current and the pending hash is removed from {@code set} and reported through
     * {@link SegmentListener#lostSegments(Set)}.</li>
     * <li>Without one: {@code set} is reduced to the segments owned in the current hash, and if that
     * removed anything the listener is told which segments are local.</li>
     * </ul>
     *
     * @param cacheSet        entry set the stream is created from
     * @param uuid            request identifier, used for tracing only
     * @param segmentListener listener informed about segments that cannot be served
     * @param z               {@code true} for a parallel stream
     * @param set             requested segments; pruned in place
     * @param set2            keys passed on to {@code getStream} for inclusion filtering
     * @param set3            keys passed on to {@code getStream} for exclusion filtering
     * @return the stream restricted to the remaining segments
     */
    private Stream<CacheEntry<K, V>> getRehashStream(CacheSet<CacheEntry<K, V>> cacheSet, UUID uuid, LocalStreamManagerImpl<K, V>.SegmentListener segmentListener, boolean z, Set<Integer> set, Set<K> set2, Set<K> set3) {
        CacheTopology cacheTopology = this.stm.getCacheTopology();
        log.tracef("Topology for supplier is %s for id %s", cacheTopology, uuid);
        ConsistentHash currentCH = cacheTopology.getCurrentCH();
        ConsistentHash pendingCH = cacheTopology.getPendingCH();
        if (pendingCH != null) {
            Set<Integer> lostSegments = new HashSet<>();
            // Explicit iterator because segments are removed from the set while walking it.
            Iterator<Integer> it = set.iterator();
            while (it.hasNext()) {
                Integer segment = it.next();
                if (!pendingCH.locateOwnersForSegment(segment).contains(this.localAddress) || !currentCH.locateOwnersForSegment(segment).contains(this.localAddress)) {
                    it.remove();
                    lostSegments.add(segment);
                }
            }
            if (lostSegments.isEmpty()) {
                log.tracef("Currently in the middle of a rehash for id %s", uuid);
            } else {
                log.tracef("Lost segments %s during rehash for id %s", lostSegments, uuid);
                segmentListener.lostSegments(lostSegments);
            }
        } else {
            Set<Integer> segmentsForOwner = currentCH.getSegmentsForOwner(this.localAddress);
            // retainAll returns true only when it actually removed a requested segment.
            if (set.retainAll(segmentsForOwner)) {
                log.tracef("We found to be missing some segments requested for id %s", uuid);
                segmentListener.localSegments(segmentsForOwner);
            } else {
                log.tracef("Hash %s for id %s", currentCH, uuid);
            }
        }
        return getStream(cacheSet, z, set, set2, set3);
    }

    /**
     * Runs a terminal operation over the local entries without rehash tracking and sends its result
     * to {@code address} in a stream response command carrying an empty segment set.
     *
     * @param uuid              request identifier
     * @param address           node the response is sent to
     * @param z                 {@code true} to use a parallel stream
     * @param set               segments to restrict the stream to
     * @param set2              keys passed to the stream's key filter
     * @param set3              keys whose entries are excluded
     * @param z2                passed to {@code getCacheRespectingLoader} to select the cache view
     * @param terminalOperation operation to execute
     */
    @Override // org.infinispan.stream.impl.LocalStreamManager
    public <R> void streamOperation(UUID uuid, Address address, boolean z, Set<Integer> set, Set<K> set2, Set<K> set3, boolean z2, TerminalOperation<R> terminalOperation) {
        log.tracef("Received operation request for id %s from %s for segments %s", uuid, address, set);
        AdvancedCache<K, V> sourceCache = getCacheRespectingLoader(z2);
        CacheSet<CacheEntry<K, V>> entries = sourceCache.cacheEntrySet();
        terminalOperation.setSupplier(() -> getStream(entries, z, set, set2, set3));
        terminalOperation.handleInjection(registry);
        R result = terminalOperation.performOperation();
        rpc.invokeRemotely(
                Collections.singleton(address),
                factory.buildStreamResponseCommand(uuid, true, Collections.emptySet(), result),
                rpc.getDefaultRpcOptions(true));
    }

    /**
     * Runs a terminal operation with rehash tracking. A {@link SegmentListener} is registered under
     * {@code uuid} for the duration of the operation and unregistered once it completes or fails.
     * The response sent to {@code address} carries the segments recorded as lost. If the cache is no
     * longer running when the operation finishes, all remaining segments are marked lost and the
     * result is replaced with {@code null}.
     *
     * @param uuid              request identifier
     * @param address           node the response is sent to
     * @param z                 {@code true} to use a parallel stream
     * @param set               requested segments
     * @param set2              keys passed to the stream's key filter
     * @param set3              keys whose entries are excluded
     * @param z2                passed to {@code getCacheRespectingLoader} to select the cache view
     * @param terminalOperation operation to execute
     */
    @Override // org.infinispan.stream.impl.LocalStreamManager
    public <R> void streamOperationRehashAware(UUID uuid, Address address, boolean z, Set<Integer> set, Set<K> set2, Set<K> set3, boolean z2, TerminalOperation<R> terminalOperation) {
        log.tracef("Received rehash aware operation request for id %s from %s for segments %s", uuid, address, set);
        AdvancedCache<K, V> sourceCache = getCacheRespectingLoader(z2);
        CacheSet<CacheEntry<K, V>> entries = sourceCache.cacheEntrySet();
        LocalStreamManagerImpl<K, V>.SegmentListener listener = new SegmentListener(set, terminalOperation);
        terminalOperation.handleInjection(registry);
        changeListener.put(uuid, listener);
        log.tracef("Registered change listener for %s", uuid);
        try {
            terminalOperation.setSupplier(() -> getRehashStream(entries, uuid, listener, z, set, set2, set3));
            R result = terminalOperation.performOperation();
            log.tracef("Request %s completed for segments %s with %s suspected segments", uuid, set, listener.segmentsLost);
            // The operation is done, so rehash notifications are no longer needed.
            changeListener.remove(uuid);
            log.tracef("UnRegistered change listener for %s", uuid);
            if (ComponentStatus.RUNNING != cache.getStatus()) {
                if (log.isTraceEnabled()) {
                    log.tracef("Cache status is no longer running, all segments are now suspect for %s", uuid);
                }
                listener.segmentsLost.addAll(set);
                result = null;
            }
            log.tracef("Sending response for %s", uuid);
            rpc.invokeRemotely(
                    Collections.singleton(address),
                    factory.buildStreamResponseCommand(uuid, true, listener.segmentsLost, result),
                    rpc.getDefaultRpcOptions(true));
            log.tracef("Sent response for %s", uuid);
        } catch (Throwable failure) {
            // Covers failures anywhere above, including after the listener was already removed.
            changeListener.remove(uuid);
            log.tracef("UnRegistered change listener for %s", uuid);
            throw failure;
        }
    }

    /**
     * Runs a key-tracking terminal operation without rehash tracking. Partial results are forwarded
     * to {@code address} through a {@link NonRehashIntermediateCollector}, and the value returned by
     * the operation is sent in a stream response command carrying an empty segment set.
     *
     * @param uuid                         request identifier
     * @param address                      node the responses are sent to
     * @param z                            {@code true} to use a parallel stream; also passed to the
     *                                     collector as its managed-blocker switch
     * @param set                          segments to restrict the stream to
     * @param set2                         keys passed to the stream's key filter
     * @param set3                         keys whose entries are excluded
     * @param z2                           passed to {@code getCacheRespectingLoader} to select the cache view
     * @param keyTrackingTerminalOperation operation to execute
     */
    @Override // org.infinispan.stream.impl.LocalStreamManager
    public <R> void streamOperation(UUID uuid, Address address, boolean z, Set<Integer> set, Set<K> set2, Set<K> set3, boolean z2, KeyTrackingTerminalOperation<K, R, ?> keyTrackingTerminalOperation) {
        log.tracef("Received key aware operation request for id %s from %s for segments %s", uuid, address, set);
        AdvancedCache<K, V> sourceCache = getCacheRespectingLoader(z2);
        CacheSet<CacheEntry<K, V>> entries = sourceCache.cacheEntrySet();
        keyTrackingTerminalOperation.setSupplier(() -> getStream(entries, z, set, set2, set3));
        keyTrackingTerminalOperation.handleInjection(registry);
        rpc.invokeRemotely(
                Collections.singleton(address),
                factory.buildStreamResponseCommand(uuid, true, Collections.emptySet(), keyTrackingTerminalOperation.performOperation2(new NonRehashIntermediateCollector(address, uuid, z))),
                rpc.getDefaultRpcOptions(true));
    }

    /**
     * Runs a key-tracking terminal operation with rehash tracking. A {@link SegmentListener} is
     * registered under {@code uuid} while the operation runs and unregistered once it completes or
     * fails. Partial results go to {@code address} through a {@link NonRehashIntermediateCollector}.
     * The final response carries the segments recorded as lost. If the cache is no longer running
     * when the operation finishes, all remaining segments are marked lost and the result is replaced
     * with {@code null}.
     *
     * @param uuid                         request identifier
     * @param address                      node the responses are sent to
     * @param z                            {@code true} to use a parallel stream; also passed to the
     *                                     collector as its managed-blocker switch
     * @param set                          requested segments
     * @param set2                         keys passed to the stream's key filter
     * @param set3                         keys whose entries are excluded
     * @param z2                           passed to {@code getCacheRespectingLoader} to select the cache view
     * @param keyTrackingTerminalOperation operation to execute
     */
    @Override // org.infinispan.stream.impl.LocalStreamManager
    public <R2> void streamOperationRehashAware(UUID uuid, Address address, boolean z, Set<Integer> set, Set<K> set2, Set<K> set3, boolean z2, KeyTrackingTerminalOperation<K, ?, R2> keyTrackingTerminalOperation) {
        log.tracef("Received key rehash aware operation request for id %s from %s for segments %s", uuid, address, set);
        AdvancedCache<K, V> sourceCache = getCacheRespectingLoader(z2);
        CacheSet<CacheEntry<K, V>> entries = sourceCache.cacheEntrySet();
        LocalStreamManagerImpl<K, V>.SegmentListener listener = new SegmentListener(set, keyTrackingTerminalOperation);
        keyTrackingTerminalOperation.handleInjection(registry);
        changeListener.put(uuid, listener);
        log.tracef("Registered change listener for %s", uuid);
        try {
            keyTrackingTerminalOperation.setSupplier(() -> getRehashStream(entries, uuid, listener, z, set, set2, set3));
            Collection<CacheEntry<K, R2>> results = keyTrackingTerminalOperation.performOperationRehashAware(new NonRehashIntermediateCollector(address, uuid, z));
            log.tracef("Request %s completed segments %s with %s suspected segments", uuid, set, listener.segmentsLost);
            // The operation is done, so rehash notifications are no longer needed.
            changeListener.remove(uuid);
            log.tracef("UnRegistered change listener for %s", uuid);
            if (ComponentStatus.RUNNING != cache.getStatus()) {
                if (log.isTraceEnabled()) {
                    log.tracef("Cache status is no longer running, all segments are now suspect for %s", uuid);
                }
                listener.segmentsLost.addAll(set);
                results = null;
            }
            rpc.invokeRemotely(
                    Collections.singleton(address),
                    factory.buildStreamResponseCommand(uuid, true, listener.segmentsLost, results),
                    rpc.getDefaultRpcOptions(true));
        } catch (Throwable failure) {
            // Covers failures anywhere above, including after the listener was already removed.
            changeListener.remove(uuid);
            log.tracef("UnRegistered change listener for %s", uuid);
            throw failure;
        }
    }
}
