package org.infinispan.stream.impl;

import io.reactivex.internal.subscriptions.EmptySubscription;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.PrimitiveIterator;
import java.util.Set;
import java.util.Spliterator;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.commons.util.concurrent.ConcurrentHashSet;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.impl.SingleResponseCollector;
import org.infinispan.remoting.transport.impl.VoidResponseCollector;
import org.infinispan.remoting.transport.jgroups.SuspectException;
import org.infinispan.stream.impl.ClusterStreamManager;
import org.infinispan.stream.impl.StreamRequestCommand;
import org.infinispan.stream.impl.intops.IntermediateOperation;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* JADX WARN: Classes with same name are omitted:
  input_file:m2repo/org/infinispan/infinispan-core/9.4.15.Final/infinispan-core-9.4.15.Final.jar:org/infinispan/stream/impl/ClusterStreamManagerImpl.class
 */
/* loaded from: input_file:m2repo/org/infinispan/infinispan-core/9.4.3.Final/infinispan-core-9.4.3.Final.jar:org/infinispan/stream/impl/ClusterStreamManagerImpl.class */
public class ClusterStreamManagerImpl<Original, K> implements ClusterStreamManager<Original, K> {
    /** Trackers for in-flight remote stream operations, keyed by the request id string built when the operation starts. */
    protected final Map<String, RequestTracker> currentlyRunning = new ConcurrentHashMap();
    /** Subscribers with an active remote iteration: added on subscribe, removed when the subscription is cancelled. */
    protected final Set<Subscriber> iteratorsRunning = new ConcurrentHashSet();
    /** Counter appended to the local address to build request ids (used by both terminal operations and iterators). */
    protected final AtomicInteger requestId = new AtomicInteger();

    /** RPC entry point used to send stream commands to other nodes (injected). */
    @Inject
    protected RpcManager rpc;

    /** Factory used to build the stream request / next / close commands (injected). */
    @Inject
    protected CommandsFactory factory;
    /** RPC options used for the iterator request/next/close commands; assigned in start(). */
    protected RpcOptions rpcOptions;
    /** Address of this node as reported by the RPC manager; assigned in start(). */
    protected Address localAddress;
    protected static final Log log = LogFactory.getLog(ClusterStreamManagerImpl.class);
    /** Trace-enabled flag, read once when the class is initialised. */
    protected static final boolean trace = log.isTraceEnabled();

    /* JADX WARN: Classes with same name are omitted:
      input_file:m2repo/org/infinispan/infinispan-core/9.4.15.Final/infinispan-core-9.4.15.Final.jar:org/infinispan/stream/impl/ClusterStreamManagerImpl$ClusterStreamSubscription.class
     */
    /* loaded from: input_file:m2repo/org/infinispan/infinispan-core/9.4.3.Final/infinispan-core-9.4.3.Final.jar:org/infinispan/stream/impl/ClusterStreamManagerImpl$ClusterStreamSubscription.class */
    private class ClusterStreamSubscription<V> implements Subscription {
        /** Downstream subscriber that receives the remotely retrieved values. */
        private final Subscriber<? super V> s;
        /** Publisher that created this subscription; supplies the request parameters and the next target. */
        private final ClusterStreamManagerImpl<Original, K>.RemoteIteratorPublisherImpl<V> publisher;
        /** Called with the segments a target reported as fully iterated. */
        private final Consumer<? super Supplier<PrimitiveIterator.OfInt>> onSegmentsComplete;
        /** Called with segments reported as suspected, or with all segments of a target that was suspected. */
        private final Consumer<? super Supplier<PrimitiveIterator.OfInt>> onSegmentsLost;
        /** Identifier sent with every command belonging to this iteration. */
        private final String id;
        /** Outstanding demand: increased by request(), decreased by the number of entries received. */
        private final AtomicLong requestedAmount = new AtomicLong();
        /** Set while a remote request/response cycle is in progress; request() only sends when it flips this to true. */
        private final AtomicBoolean pendingRequest = new AtomicBoolean();
        /**
         * Holder of the target currently being iterated; the held value is set to null on cancel or completion.
         * NOTE(review): the field itself is only assigned in the constructor, so it could presumably be final
         * rather than volatile - confirm.
         */
        private volatile AtomicReference<Map.Entry<Address, IntSet>> currentTarget;
        /** True once the initial request command was built for the current target; reset when moving to the next target. */
        private volatile boolean alreadyCreated;

        /**
         * Creates a subscription that starts iterating on the supplied first target.
         *
         * @param subscriber downstream subscriber receiving the values
         * @param remoteIteratorPublisherImpl publisher holding the request parameters and the supplier of further targets
         * @param consumer callback for segments that completed
         * @param consumer2 callback for segments that were lost
         * @param str identifier used for all commands of this iteration
         * @param entry first target (address and its segments) to request from
         */
        ClusterStreamSubscription(Subscriber<? super V> subscriber, ClusterStreamManagerImpl<Original, K>.RemoteIteratorPublisherImpl<V> remoteIteratorPublisherImpl, Consumer<? super Supplier<PrimitiveIterator.OfInt>> consumer, Consumer<? super Supplier<PrimitiveIterator.OfInt>> consumer2, String str, Map.Entry<Address, IntSet> entry) {
            // Identity and wiring first, callbacks next, mutable iteration state last.
            this.id = str;
            this.s = subscriber;
            this.publisher = remoteIteratorPublisherImpl;
            this.onSegmentsLost = consumer2;
            this.onSegmentsComplete = consumer;
            this.currentTarget = new AtomicReference<>(entry);
        }

        /**
         * Adds {@code j} to the outstanding demand and, if no remote request is currently in flight, sends one
         * for the accumulated amount.
         *
         * <p>Calls made after the subscription was cancelled or completed are ignored.
         *
         * @param j number of additional elements requested
         * @throws IllegalArgumentException if {@code j} is not positive while the subscription is still active
         */
        @Override
        public void request(long j) {
            // The AtomicReference field itself is never null; it is the *held target* that is cleared by
            // cancel() and on completion, so that is what has to be inspected here.
            if (this.currentTarget.get() == null) {
                return;
            }
            if (j <= 0) {
                throw new IllegalArgumentException("request amount must be greater than 0");
            }
            this.requestedAmount.addAndGet(j);
            // Only the caller that flips pendingRequest from false to true sends; the response callback
            // picks up any demand added while a request is in flight.
            if (this.pendingRequest.getAndSet(true)) {
                return;
            }
            sendRequest(this.requestedAmount.get());
        }

        /**
         * Builds the command to send to the current target: the full iterator request the first time it is
         * called for a target, and a plain "next" command afterwards.
         *
         * @param intSet segments requested from the current target
         * @param j batch size to ask for
         * @return the command to invoke remotely
         */
        StreamIteratorNextCommand getCommand(IntSet intSet, long j) {
            CommandsFactory commands = ClusterStreamManagerImpl.this.factory;
            if (!this.alreadyCreated) {
                // Flag is flipped before building so that subsequent calls use the lightweight "next" command.
                this.alreadyCreated = true;
                RemoteIteratorPublisherImpl source = (RemoteIteratorPublisherImpl) this.publisher;
                return commands.buildStreamIteratorRequestCommand(
                        this.id,
                        source.parallelStream,
                        intSet,
                        source.keysToInclude,
                        ClusterStreamManagerImpl.this.determineExcludedKeys(source.keysToExclude, intSet),
                        source.includeLoader,
                        source.entryStream,
                        source.intermediateOperations,
                        j);
            }
            return commands.buildStreamIteratorNextCommand(this.id, j);
        }

        /**
         * Sends a request for up to {@code j} entries to the current target and processes the response
         * asynchronously: entries are pushed to the subscriber, segment callbacks are fired when the target
         * reports completion, the next target is selected, and another request is sent while demand remains.
         * Does nothing when there is no current target (the subscription was cancelled or has completed).
         *
         * @param j number of entries to ask the target for
         */
        private void sendRequest(long j) {
            Map.Entry<Address, IntSet> entry = this.currentTarget.get();
            if (entry != null) {
                IntSet value = entry.getValue();
                if (ClusterStreamManagerImpl.trace) {
                    ClusterStreamManagerImpl.log.tracef("Request: %s is requesting %d more entries from %s in segments %s", this.id, Long.valueOf(j), entry, value);
                }
                Address key = entry.getKey();
                // First call for a target builds the full request, later calls build a "next" command.
                StreamIteratorNextCommand command = getCommand(value, j);
                command.setTopologyId(ClusterStreamManagerImpl.this.rpc.getTopologyId());
                ClusterStreamManagerImpl.this.rpc.invokeCommand(key, command, SingleResponseCollector.validOnly(), ClusterStreamManagerImpl.this.rpcOptions).whenComplete((validResponse, th) -> {
                    if (th != null) {
                        // RPC failure: handleThrowable cancels and either reports lost segments or signals onError.
                        handleThrowable(th, entry);
                        return;
                    }
                    try {
                        if (validResponse instanceof SuccessfulResponse) {
                            IteratorResponse iteratorResponse = (IteratorResponse) validResponse.getResponseValue();
                            if (ClusterStreamManagerImpl.trace) {
                                ClusterStreamManagerImpl.log.tracef("Received valid response %s for id %s from node %s", iteratorResponse, this.id, entry.getKey());
                            }
                            Spliterator<V> spliterator = iteratorResponse.getSpliterator();
                            // NOTE(review): getExactSizeIfKnown() returns -1 for a non-SIZED spliterator, which
                            // would increase requestedAmount below - presumably the response is always SIZED; confirm.
                            long exactSizeIfKnown = spliterator.getExactSizeIfKnown();
                            if (ClusterStreamManagerImpl.trace) {
                                ClusterStreamManagerImpl.log.tracef("Received %d entries for id %s from %s", exactSizeIfKnown, this.id, key);
                            }
                            Subscriber<? super V> subscriber = this.s;
                            // Looks like the decompiler's rendering of the implicit null check of a bound method reference.
                            subscriber.getClass();
                            // Deliver every received entry downstream.
                            spliterator.forEachRemaining(subscriber::onNext);
                            if (iteratorResponse.isComplete()) {
                                // The target is done with its segments: report which completed and which were lost.
                                IntSet suspectedSegments = iteratorResponse.getSuspectedSegments();
                                if (suspectedSegments.isEmpty()) {
                                    Consumer<? super Supplier<PrimitiveIterator.OfInt>> consumer = this.onSegmentsComplete;
                                    value.getClass();
                                    consumer.accept(value::iterator);
                                } else {
                                    Consumer<? super Supplier<PrimitiveIterator.OfInt>> consumer2 = this.onSegmentsLost;
                                    suspectedSegments.getClass();
                                    consumer2.accept(suspectedSegments::iterator);
                                    // Only some of the requested segments were suspected: the rest completed.
                                    if (suspectedSegments.size() != value.size()) {
                                        this.onSegmentsComplete.accept(() -> {
                                            return value.intStream().filter(i -> {
                                                return !suspectedSegments.contains(i);
                                            }).iterator();
                                        });
                                    }
                                }
                                // Ask the publisher's supplier for the next target; null means nothing is left.
                                Map.Entry<Address, IntSet> entry2 = (Map.Entry) ((RemoteIteratorPublisherImpl) this.publisher).targets.get();
                                if (entry2 == null) {
                                    this.currentTarget.set(null);
                                    completed();
                                    return;
                                } else {
                                    // New target needs a fresh full request; CAS so a concurrent cancel() (which
                                    // sets the value to null) is not overwritten.
                                    this.alreadyCreated = false;
                                    this.currentTarget.compareAndSet(entry, entry2);
                                }
                            }
                            // Account for what was delivered and keep requesting while demand remains.
                            long addAndGet = this.requestedAmount.addAndGet(-exactSizeIfKnown);
                            if (addAndGet > 0) {
                                sendRequest(addAndGet);
                            } else {
                                // Release the in-flight flag, then re-check demand: request() may have added to
                                // requestedAmount while the flag was still set and therefore not sent anything.
                                this.pendingRequest.set(false);
                                long j2 = this.requestedAmount.get();
                                if (j2 > 0 && !this.pendingRequest.getAndSet(true)) {
                                    sendRequest(j2);
                                }
                            }
                        } else {
                            handleThrowable(new IllegalArgumentException("Unsupported response received: " + validResponse), entry);
                        }
                    // NOTE(review): this catch parameter reuses the lambda parameter name `th`, which javac rejects;
                    // it looks like a decompiler artifact - confirm before compiling.
                    } catch (Throwable th) {
                        // Any failure while processing the response ends the iteration with an error.
                        cancel();
                        this.s.onError(th);
                    }
                });
            }
        }

        /**
         * Terminates the subscription after a failed request. A {@link SuspectException} (directly or as the
         * immediate cause) reports all segments of the target as lost and completes the subscriber; any other
         * failure is forwarded to the subscriber as an error.
         *
         * @param th failure that occurred
         * @param entry target (address and segments) the failed request was sent to
         */
        private void handleThrowable(Throwable th, Map.Entry<Address, IntSet> entry) {
            cancel();
            boolean ownerSuspected = th instanceof SuspectException || th.getCause() instanceof SuspectException;
            if (ownerSuspected) {
                if (trace) {
                    log.tracef("Received suspect exception for id %s from node %s when requesting segments %s", this.id, entry.getKey(), entry.getValue());
                }
                // Supplier is evaluated lazily by the consumer, as before.
                this.onSegmentsLost.accept(() -> {
                    return entry.getValue().iterator();
                });
                this.s.onComplete();
                return;
            }
            if (trace) {
                log.tracef(th, "Received exception for id %s from node %s when requesting segments %s", this.id, entry.getKey(), entry.getValue());
            }
            this.s.onError(th);
        }

        /**
         * Cancels the subscription: clears the current target so no further requests are sent, asks the remote
         * node to close its iterator if a request had already been issued to it, and unregisters the subscriber
         * from the running iterators. Calling it again is harmless because the target is already cleared.
         */
        @Override
        public void cancel() {
            Map.Entry<Address, IntSet> target = this.currentTarget.getAndSet(null);
            // A close command is only needed when the remote side has actually been asked to create an iterator.
            if (target != null && this.alreadyCreated) {
                Address key = target.getKey();
                CompletionStage<?> closeStage = ClusterStreamManagerImpl.this.rpc.invokeCommand(key, ClusterStreamManagerImpl.this.factory.buildStreamIteratorCloseCommand(this.id), SingleResponseCollector.validOnly(), ClusterStreamManagerImpl.this.rpcOptions);
                if (ClusterStreamManagerImpl.trace) {
                    closeStage.exceptionally(th -> {
                        // Log this iteration's own id; the outer requestId field is only the id counter.
                        ClusterStreamManagerImpl.log.tracef(th, "Unable to close iterator on %s for requestId %s", key, this.id);
                        return null;
                    });
                }
            }
            ClusterStreamManagerImpl.this.iteratorsRunning.remove(this.s);
        }

        /** Finishes the iteration normally: releases the subscription via cancel() and signals onComplete. */
        private void completed() {
            if (trace) {
                log.tracef("Processor completed for request: %s", id);
            }
            cancel();
            s.onComplete();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:m2repo/org/infinispan/infinispan-core/9.4.15.Final/infinispan-core-9.4.15.Final.jar:org/infinispan/stream/impl/ClusterStreamManagerImpl$RemoteIteratorPublisherImpl.class
     */
    /* loaded from: input_file:m2repo/org/infinispan/infinispan-core/9.4.3.Final/infinispan-core-9.4.3.Final.jar:org/infinispan/stream/impl/ClusterStreamManagerImpl$RemoteIteratorPublisherImpl.class */
    /**
     * Publisher of remotely iterated values. It only stores the parameters of the iteration; each
     * subscription walks the targets handed out by the {@code targets} supplier one at a time.
     */
    public class RemoteIteratorPublisherImpl<V> implements ClusterStreamManager.RemoteIteratorPublisher<V> {
        private final boolean parallelStream;
        /** Hands out the next (address, segments) pair to iterate; a null result means nothing is left. */
        private final Supplier<Map.Entry<Address, IntSet>> targets;
        private final Set<K> keysToInclude;
        /** Maps a segment to the keys to exclude for it; may be null. */
        private final IntFunction<Set<K>> keysToExclude;
        private final boolean includeLoader;
        private final boolean entryStream;
        private final Iterable<IntermediateOperation> intermediateOperations;

        RemoteIteratorPublisherImpl(boolean z, Supplier<Map.Entry<Address, IntSet>> supplier, Set<K> set, IntFunction<Set<K>> intFunction, boolean z2, boolean z3, Iterable<IntermediateOperation> iterable) {
            // Where to go and what to ask for ...
            this.targets = supplier;
            this.keysToInclude = set;
            this.keysToExclude = intFunction;
            this.intermediateOperations = iterable;
            // ... and the flags forwarded verbatim to the request command.
            this.parallelStream = z;
            this.includeLoader = z2;
            this.entryStream = z3;
        }

        /**
         * Subscribes to the remote iteration. When the supplier has no target at all the subscriber is
         * completed immediately; otherwise the subscriber is registered as running and handed a new
         * subscription starting at the first target.
         *
         * @param subscriber receiver of the values
         * @param consumer callback for completed segments
         * @param consumer2 callback for lost segments
         */
        @Override
        public void subscribe(Subscriber<? super V> subscriber, Consumer<? super Supplier<PrimitiveIterator.OfInt>> consumer, Consumer<? super Supplier<PrimitiveIterator.OfInt>> consumer2) {
            Map.Entry<Address, IntSet> firstTarget = this.targets.get();
            if (firstTarget != null) {
                String iterationId = localAddress.toString() + "-" + requestId.getAndIncrement();
                if (trace) {
                    log.tracef("Starting request: %s", iterationId);
                }
                iteratorsRunning.add(subscriber);
                subscriber.onSubscribe(new ClusterStreamSubscription(subscriber, this, consumer, consumer2, iterationId, firstTarget));
                return;
            }
            EmptySubscription.complete(subscriber);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:m2repo/org/infinispan/infinispan-core/9.4.15.Final/infinispan-core-9.4.15.Final.jar:org/infinispan/stream/impl/ClusterStreamManagerImpl$RequestTracker.class
     */
    /* loaded from: input_file:m2repo/org/infinispan/infinispan-core/9.4.3.Final/infinispan-core-9.4.3.Final.jar:org/infinispan/stream/impl/ClusterStreamManagerImpl$RequestTracker.class */
    /**
     * Book-keeping for one remote terminal operation: which nodes still owe a final response, which
     * segments were reported missing, the failure (if any) and the lock/condition waiters block on.
     */
    public static class RequestTracker<R> {
        final ClusterStreamManager.ResultsCallback<R> callback;
        /** Nodes that have not sent their final response yet, with the segments requested from each. */
        final Map<Address, IntSet> awaitingResponse;
        final Lock completionLock = new ReentrantLock();
        final Condition completionCondition = completionLock.newCondition();
        /** Optional predicate; when it accepts a final result all remaining nodes are dropped. */
        final Predicate<? super R> earlyTerminatePredicate;
        /** Union of all segments reported missing so far; null until the first report. */
        IntSet missingSegments;
        volatile Throwable throwable;

        RequestTracker(ClusterStreamManager.ResultsCallback<R> resultsCallback, Map<Address, IntSet> map, Predicate<? super R> predicate) {
            this.awaitingResponse = map;
            this.callback = resultsCallback;
            this.earlyTerminatePredicate = predicate;
        }

        /** Forwards a non-final result from {@code address} to the callback. */
        public void intermediateResults(Address address, R r) {
            callback.onIntermediateResult(address, r);
        }

        /**
         * Processes the final result of {@code address}: reports the completed segments (requested minus
         * missing) to the callback and stops waiting for that node - or for every node when the early
         * termination predicate accepts the result.
         *
         * @return true when no node is awaited any more
         */
        public boolean lastResult(Address address, R r) {
            IntSet completedSegments = awaitingResponse.get(address);
            if (missingSegments != null) {
                completedSegments.removeAll(missingSegments);
            }
            callback.onCompletion(address, completedSegments, r);
            synchronized (this) {
                boolean terminateEarly = earlyTerminatePredicate != null && r != null && earlyTerminatePredicate.test(r);
                if (terminateEarly) {
                    awaitingResponse.clear();
                } else {
                    awaitingResponse.remove(address);
                }
                return awaitingResponse.isEmpty();
            }
        }

        /** Records additional missing segments and notifies the callback (outside the monitor). */
        public void missingSegments(IntSet intSet) {
            synchronized (this) {
                if (missingSegments != null) {
                    missingSegments.addAll(intSet);
                } else {
                    missingSegments = IntSets.mutableFrom(intSet);
                }
            }
            callback.onSegmentsLost(intSet);
        }
    }

    /**
     * Component start hook: captures the local address from the RPC manager and prepares the RPC options
     * (no ordering, Long.MAX_VALUE days timeout) used for iterator commands.
     */
    @Start
    public void start() {
        Address self = this.rpc.getAddress();
        this.localAddress = self;
        RpcOptions iteratorOptions = new RpcOptions(DeliverOrder.NONE, Long.MAX_VALUE, TimeUnit.DAYS);
        this.rpcOptions = iteratorOptions;
    }

    /**
     * Starts a remote terminal operation (command type {@code TERMINAL}).
     *
     * @return the request id, or null when no remote node had to be contacted
     */
    @Override
    public <R> Object remoteStreamOperation(boolean z, boolean z2, ConsistentHash consistentHash, IntSet intSet, Set<K> set, Map<Integer, Set<K>> map, boolean z3, boolean z4, TerminalOperation<Original, R> terminalOperation, ClusterStreamManager.ResultsCallback<R> resultsCallback, Predicate<? super R> predicate) {
        final StreamRequestCommand.Type commandType = StreamRequestCommand.Type.TERMINAL;
        return commonRemoteStreamOperation(z, z2, consistentHash, intSet, set, map, z3, z4,
                terminalOperation, resultsCallback, commandType, predicate);
    }

    /**
     * Starts a remote terminal operation using command type {@code TERMINAL_REHASH}.
     *
     * @return the request id, or null when no remote node had to be contacted
     */
    @Override
    public <R> Object remoteStreamOperationRehashAware(boolean z, boolean z2, ConsistentHash consistentHash, IntSet intSet, Set<K> set, Map<Integer, Set<K>> map, boolean z3, boolean z4, TerminalOperation<Original, R> terminalOperation, ClusterStreamManager.ResultsCallback<R> resultsCallback, Predicate<? super R> predicate) {
        final StreamRequestCommand.Type commandType = StreamRequestCommand.Type.TERMINAL_REHASH;
        return commonRemoteStreamOperation(z, z2, consistentHash, intSet, set, map, z3, z4,
                terminalOperation, resultsCallback, commandType, predicate);
    }

    /**
     * Shared implementation of the remote terminal operations: determines the remote primary owners of the
     * requested segments, registers a tracker under a new request id and sends one stream request command
     * per target.
     *
     * @param z when true the commands are sent asynchronously to all targets; otherwise each target is
     *          invoked with a blocking call, one after the other
     * @param type command type placed in every request
     * @param predicate optional early-termination predicate handed to the tracker
     * @return the request id, or null when there is no remote target
     */
    private <R> Object commonRemoteStreamOperation(boolean z, boolean z2, ConsistentHash consistentHash, IntSet intSet, Set<K> set, Map<Integer, Set<K>> map, boolean z3, boolean z4, SegmentAwareOperation segmentAwareOperation, ClusterStreamManager.ResultsCallback<R> resultsCallback, StreamRequestCommand.Type type, Predicate<? super R> predicate) {
        Map<Address, IntSet> targets = determineTargets(consistentHash, intSet, resultsCallback);
        if (targets.isEmpty()) {
            log.tracef("Not performing remote operation for request as no valid targets for segments %s found", intSet);
            return null;
        }

        String id = this.localAddress.toString() + "#" + this.requestId.getAndIncrement();
        log.tracef("Performing remote operations %s for id %s", targets, id);
        // Register before sending so responses always find their tracker.
        this.currentlyRunning.put(id, new RequestTracker<>(resultsCallback, targets, predicate));

        if (z) {
            submitAsyncTasks(id, targets, map, z2, set, z3, z4, type, segmentAwareOperation);
            return id;
        }

        for (Map.Entry<Address, IntSet> target : targets.entrySet()) {
            IntSet segments = target.getValue();
            Set<K> excludedKeys = determineExcludedKeys(map, segments);
            StreamRequestCommand<K> command = this.factory.buildStreamRequestCommand(id, z2, type, segments, set, excludedKeys, z3, z4, segmentAwareOperation);
            command.setTopologyId(this.rpc.getTopologyId());
            this.rpc.blocking(this.rpc.invokeCommand(target.getKey(), command, VoidResponseCollector.validOnly(), this.rpc.getSyncRpcOptions()));
        }
        return id;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Starts a remote key-tracking terminal operation (command type {@code TERMINAL_KEY}); no early
     * termination predicate is used.
     *
     * @return the request id, or null when no remote node had to be contacted
     */
    @Override
    public <R> Object remoteStreamOperation(boolean z, boolean z2, ConsistentHash consistentHash, IntSet intSet, Set<K> set, Map<Integer, Set<K>> map, boolean z3, boolean z4, KeyTrackingTerminalOperation<Original, K, R> keyTrackingTerminalOperation, ClusterStreamManager.ResultsCallback<Collection<R>> resultsCallback) {
        final StreamRequestCommand.Type commandType = StreamRequestCommand.Type.TERMINAL_KEY;
        return commonRemoteStreamOperation(z, z2, consistentHash, intSet, set, map, z3, z4,
                keyTrackingTerminalOperation, resultsCallback, commandType, null);
    }

    /**
     * Starts a remote key-tracking operation using command type {@code TERMINAL_KEY_REHASH}. In the blocking
     * mode an unsuccessful response, or a failure containing a {@link SuspectException}, marks all segments
     * of that target as suspected instead of failing the whole operation; other failures are rethrown.
     *
     * @param z when true the commands are sent asynchronously; otherwise each target is invoked in turn with
     *          a blocking call
     * @return the request id, or null when there is no remote target
     */
    @Override
    public Object remoteStreamOperationRehashAware(boolean z, boolean z2, ConsistentHash consistentHash, IntSet intSet, Set<K> set, Map<Integer, Set<K>> map, boolean z3, boolean z4, KeyTrackingTerminalOperation<Original, K, ?> keyTrackingTerminalOperation, ClusterStreamManager.ResultsCallback<Collection<K>> resultsCallback) {
        Map<Address, IntSet> targets = determineTargets(consistentHash, intSet, resultsCallback);
        if (targets.isEmpty()) {
            log.tracef("Not performing remote rehash key aware operation for request as no valid targets for segments %s found", intSet);
            return null;
        }

        String id = this.localAddress.toString() + "-" + this.requestId.getAndIncrement();
        log.tracef("Performing remote rehash key aware operations %s for id %s", targets, id);
        this.currentlyRunning.put(id, new RequestTracker<>(resultsCallback, targets, null));

        if (z) {
            submitAsyncTasks(id, targets, map, z2, set, z3, z4, StreamRequestCommand.Type.TERMINAL_KEY_REHASH, keyTrackingTerminalOperation);
            return id;
        }

        for (Map.Entry<Address, IntSet> target : targets.entrySet()) {
            Address node = target.getKey();
            IntSet segments = target.getValue();
            try {
                Set<K> excludedKeys = determineExcludedKeys(map, segments);
                log.tracef("Submitting task to %s for %s excluding keys %s", node, id, excludedKeys);
                StreamRequestCommand<K> command = this.factory.buildStreamRequestCommand(id, z2, StreamRequestCommand.Type.TERMINAL_KEY_REHASH, segments, set, excludedKeys, z3, z4, keyTrackingTerminalOperation);
                command.setTopologyId(this.rpc.getTopologyId());
                Response response = (Response) this.rpc.blocking(this.rpc.invokeCommand(node, command, SingleResponseCollector.validOnly(), this.rpc.getSyncRpcOptions()));
                if (!response.isSuccessful()) {
                    log.tracef("Unsuccessful response for %s from %s - making segments %s suspect", id, node, segments);
                    receiveResponse(id, node, true, segments, null);
                }
            } catch (Exception e) {
                if (containedSuspectException(e)) {
                    log.tracef("Exception from %s contained a SuspectException, making all segments %s suspect", node, segments);
                    receiveResponse(id, node, true, segments, null);
                } else {
                    log.tracef(e, "Encountered exception for %s from %s", id, node);
                    throw e;
                }
            }
        }
        return id;
    }

    /**
     * Sends one stream request command to every target without blocking and attaches a completion handler
     * to each invocation:
     * <ul>
     *   <li>an unsuccessful response, or a failure containing a {@link SuspectException}, is turned into a
     *       completed response in which all segments of that target are suspected;</li>
     *   <li>any other failure is recorded on the request's tracker, or logged as a warning (including the
     *       exception) when the tracker no longer exists.</li>
     * </ul>
     *
     * @param str request id
     * @param map targets and the segments requested from each
     * @param map2 keys to exclude, per segment
     * @param z forwarded as the second argument of the request command (presumably the parallel-stream flag)
     * @param set keys to include
     * @param z2 forwarded to the request command
     * @param z3 forwarded to the request command
     * @param type command type
     * @param obj the operation to execute remotely
     */
    private void submitAsyncTasks(String str, Map<Address, IntSet> map, Map<Integer, Set<K>> map2, boolean z, Set<K> set, boolean z2, boolean z3, StreamRequestCommand.Type type, Object obj) {
        for (Map.Entry<Address, IntSet> entry : map.entrySet()) {
            IntSet segments = entry.getValue();
            Set<K> excludedKeys = determineExcludedKeys(map2, segments);
            Address target = entry.getKey();
            log.tracef("Submitting async task to %s for %s excluding keys %s", target, str, excludedKeys);
            StreamRequestCommand<K> command = this.factory.buildStreamRequestCommand(str, z, type, segments, set, excludedKeys, z2, z3, obj);
            command.setTopologyId(this.rpc.getTopologyId());
            this.rpc.invokeCommand(target, command, SingleResponseCollector.validOnly(), this.rpc.getSyncRpcOptions()).whenComplete((validResponse, th) -> {
                if (th == null) {
                    if (validResponse != null && !validResponse.isSuccessful()) {
                        log.tracef("Unsuccessful response for %s from %s - making segments suspect", str, target);
                        receiveResponse(str, target, true, segments, null);
                    }
                    return;
                }
                if (containedSuspectException(th)) {
                    log.tracef("Exception contained a SuspectException, making all segments %s suspect", segments);
                    receiveResponse(str, target, true, segments, null);
                    return;
                }
                log.tracef(th, "Encountered exception for %s from %s", str, target);
                RequestTracker<?> tracker = this.currentlyRunning.get(str);
                if (tracker != null) {
                    markTrackerWithException(tracker, target, th, str);
                } else {
                    // Use the Throwable overload: passing the exception as a format argument to a message
                    // without placeholders silently dropped it and its stack trace.
                    log.warnf(th, "Unhandled remote stream exception encountered");
                }
            });
        }
    }

    /**
     * Tells whether the throwable itself or any throwable in its cause chain is a {@link SuspectException}.
     *
     * @param th throwable to inspect
     * @return true if a SuspectException was found in the chain
     */
    private boolean containedSuspectException(Throwable th) {
        Throwable current = th;
        do {
            if (current instanceof SuspectException) {
                return true;
            }
            current = current.getCause();
        } while (current != null);
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Records a failure on the tracker and, when the request is thereby finished, wakes every thread waiting
     * on the tracker's completion condition. The request counts as finished when no address is given, or
     * when registering a (null) final result for the address leaves no node awaited.
     *
     * @param requestTracker tracker of the failed request
     * @param address node the failure is attributed to, or null for a failure from outside
     * @param th the failure
     * @param obj request id, only used for logging; may be null
     */
    public static void markTrackerWithException(RequestTracker<?> requestTracker, Address address, Throwable th, Object obj) {
        log.tracef("Marking tracker to have exception");
        requestTracker.throwable = th;

        boolean finished = address == null || requestTracker.lastResult(address, null);
        if (!finished) {
            return;
        }

        if (obj == null) {
            log.trace("Tracker completed due to outside cause, waking sleepers! ");
        } else {
            log.tracef("Tracker %s completed with exception, waking sleepers!", obj);
        }
        requestTracker.completionLock.lock();
        try {
            requestTracker.completionCondition.signalAll();
        } finally {
            requestTracker.completionLock.unlock();
        }
    }

    /**
     * Collects the keys to exclude for the given segments from a per-segment map.
     *
     * @param map keys to exclude, keyed by segment
     * @param intSet segments of interest
     * @return an empty set when the map is empty, otherwise a new set with the keys of all given segments
     */
    private Set<K> determineExcludedKeys(Map<Integer, Set<K>> map, IntSet intSet) {
        if (map.isEmpty()) {
            return Collections.emptySet();
        }
        Set<K> excluded = new HashSet<>();
        for (PrimitiveIterator.OfInt segments = intSet.iterator(); segments.hasNext(); ) {
            Set<K> keysOfSegment = map.get(segments.nextInt());
            if (keysOfSegment != null) {
                excluded.addAll(keysOfSegment);
            }
        }
        return excluded;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Collects the keys to exclude for the given segments from a per-segment function.
     *
     * @param intFunction maps a segment to its excluded keys (may return null); may itself be null
     * @param intSet segments of interest
     * @return an empty set when no function is given, otherwise the union of the keys of all given segments
     */
    public Set<K> determineExcludedKeys(IntFunction<Set<K>> intFunction, IntSet intSet) {
        if (intFunction == null) {
            return Collections.emptySet();
        }
        return intSet.intStream()
                .mapToObj(segment -> {
                    Set<K> keysOfSegment = intFunction.apply(segment);
                    // A null stream is treated as empty by flatMap.
                    return keysOfSegment != null ? keysOfSegment.stream() : null;
                })
                .flatMap(Function.identity())
                .collect(Collectors.toSet());
    }

    /**
     * Groups the requested segments by their remote primary owner. Segments without a primary owner are
     * reported to the callback as lost; segments owned by the local node are skipped.
     *
     * @param consistentHash hash used to locate primary owners
     * @param intSet segments to consider, or null for all segments of the hash
     * @param resultsCallback notified about segments that have no primary owner
     * @return remote address to segments mapping (possibly empty)
     */
    private Map<Address, IntSet> determineTargets(ConsistentHash consistentHash, IntSet intSet, ClusterStreamManager.ResultsCallback<?> resultsCallback) {
        IntSet segments = intSet != null ? intSet : IntSets.immutableRangeSet(consistentHash.getNumSegments());
        Map<Address, IntSet> targets = new ConcurrentHashMap<>();
        int numSegments = consistentHash.getNumSegments();
        for (PrimitiveIterator.OfInt it = segments.iterator(); it.hasNext(); ) {
            int segment = it.nextInt();
            Address primaryOwner = consistentHash.locatePrimaryOwnerForSegment(segment);
            if (primaryOwner == null) {
                resultsCallback.onSegmentsLost(IntSets.immutableSet(segment));
                continue;
            }
            if (primaryOwner.equals(this.localAddress)) {
                continue;
            }
            targets.computeIfAbsent(primaryOwner, owner -> IntSets.mutableEmptySet(numSegments)).set(segment);
        }
        return targets;
    }

    @Override // org.infinispan.stream.impl.ClusterStreamManager
    public boolean isComplete(Object obj) {
        return !this.currentlyRunning.containsKey(obj);
    }

    @Override // org.infinispan.stream.impl.ClusterStreamManager
    public boolean awaitCompletion(Object obj, long j, TimeUnit timeUnit) throws InterruptedException {
        if (j <= 0) {
            throw new IllegalArgumentException("Time must be greater than 0");
        }
        Objects.requireNonNull(obj, "Identifier must be non null");
        log.tracef("Awaiting completion of %s", obj);
        boolean z = false;
        long nanoTime = System.nanoTime() + timeUnit.toNanos(j);
        Throwable th = null;
        while (true) {
            if (nanoTime - System.nanoTime() <= 0) {
                break;
            }
            RequestTracker requestTracker = this.currentlyRunning.get(obj);
            if (requestTracker == null) {
                z = true;
                break;
            }
            Throwable th2 = requestTracker.throwable;
            th = th2;
            if (th2 != null) {
                break;
            }
            requestTracker.completionLock.lock();
            try {
                if (!this.currentlyRunning.containsKey(obj)) {
                    z = true;
                    th = requestTracker.throwable;
                    requestTracker.completionLock.unlock();
                    break;
                }
                if (!requestTracker.completionCondition.await(nanoTime - System.nanoTime(), TimeUnit.NANOSECONDS)) {
                    th = requestTracker.throwable;
                    z = false;
                    requestTracker.completionLock.unlock();
                    break;
                }
                requestTracker.completionLock.unlock();
            } catch (Throwable th3) {
                requestTracker.completionLock.unlock();
                throw th3;
            }
        }
        log.tracef("Returning back to caller due to %s being completed: %s", obj, Boolean.valueOf(z));
        if (th == null) {
            return z;
        }
        if (th instanceof RuntimeException) {
            throw ((RuntimeException) th);
        }
        throw new CacheException(th);
    }

    @Override // org.infinispan.stream.impl.ClusterStreamManager
    public void forgetOperation(Object obj) {
        RequestTracker remove;
        if (obj == null || (remove = this.currentlyRunning.remove(obj)) == null) {
            return;
        }
        remove.completionLock.lock();
        try {
            remove.completionCondition.signalAll();
        } finally {
            remove.completionLock.unlock();
        }
    }

    @Override // org.infinispan.stream.impl.ClusterStreamManager
    public <R1> boolean receiveResponse(Object obj, Address address, boolean z, IntSet intSet, R1 r1) {
        log.tracef("Received response from %s with a completed response %s for id %s with %s suspected segments.", address, Boolean.valueOf(z), obj, intSet);
        RequestTracker requestTracker = this.currentlyRunning.get(obj);
        if (requestTracker == null) {
            log.tracef("Ignoring response as we already received a completed response for %s from %s", obj, address);
            return false;
        }
        boolean z2 = false;
        synchronized (requestTracker) {
            if (requestTracker.awaitingResponse.containsKey(address)) {
                if (!intSet.isEmpty()) {
                    requestTracker.missingSegments(intSet);
                }
                if (z) {
                    z2 = requestTracker.lastResult(address, r1);
                } else {
                    requestTracker.intermediateResults(address, r1);
                }
            }
        }
        if (z2) {
            log.tracef("Marking %s as completed!", obj);
            requestTracker.completionLock.lock();
            try {
                this.currentlyRunning.remove(obj);
                requestTracker.completionCondition.signalAll();
                requestTracker.completionLock.unlock();
            } catch (Throwable th) {
                requestTracker.completionLock.unlock();
                throw th;
            }
        }
        return !z2;
    }

    @Override // org.infinispan.stream.impl.ClusterStreamManager
    public <E> ClusterStreamManager.RemoteIteratorPublisher<E> remoteIterationPublisher(boolean z, Supplier<Map.Entry<Address, IntSet>> supplier, Set<K> set, IntFunction<Set<K>> intFunction, boolean z2, boolean z3, Iterable<IntermediateOperation> iterable) {
        return new RemoteIteratorPublisherImpl(z, supplier, set, intFunction, z2, z3, iterable);
    }
}
