package org.infinispan.stream.impl;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commons.CacheException;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.impl.ReplicatedConsistentHash;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.jgroups.SuspectException;
import org.infinispan.stream.impl.ClusterStreamManager;
import org.infinispan.stream.impl.StreamRequestCommand;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

/* loaded from: input_file:WEB-INF/lib/infinispan-embedded-8.4.0.ER6-redhat-1.jar:org/infinispan/stream/impl/ClusterStreamManagerImpl.class */
public class ClusterStreamManagerImpl<K> implements ClusterStreamManager<K> {
    // Trackers of the requests currently in flight, keyed by the request id string generated in this class.
    protected final Map<String, RequestTracker> currentlyRunning = new ConcurrentHashMap();
    // Counter whose value is appended to the local address to form each new request id.
    protected final AtomicInteger requestId = new AtomicInteger();
    // Used to send stream request commands to other nodes; supplied through inject().
    protected RpcManager rpc;
    // Builds the stream request commands that are sent through rpc; supplied through inject().
    protected CommandsFactory factory;
    // Address of this node as reported by rpc.getAddress(); assigned in start().
    protected Address localAddress;
    protected static final Log log = LogFactory.getLog(ClusterStreamManagerImpl.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/infinispan-embedded-8.4.0.ER6-redhat-1.jar:org/infinispan/stream/impl/ClusterStreamManagerImpl$RequestTracker.class */
    /**
     * Book-keeping for one remote stream request: which addresses still owe a final response, which
     * segments have been reported lost so far, and the lock/condition that waiters block on until the
     * request finishes.
     *
     * @param <R> type of the (intermediate and final) results delivered to the callback
     */
    public static class RequestTracker<R> {
        final ClusterStreamManager.ResultsCallback<R> callback;
        /** Segments requested from each address that has not yet delivered its final result. */
        final Map<Address, Set<Integer>> awaitingResponse;
        final Lock completionLock = new ReentrantLock();
        final Condition completionCondition = this.completionLock.newCondition();
        /** Optional predicate; when it accepts a final result all outstanding responses are dropped. */
        final Predicate<? super R> earlyTerminatePredicate;
        /** Union of all segments reported lost so far; {@code null} until the first report. */
        Set<Integer> missingSegments;
        volatile Throwable throwable;

        RequestTracker(ClusterStreamManager.ResultsCallback<R> resultsCallback, Map<Address, Set<Integer>> map, Predicate<? super R> predicate) {
            this.callback = resultsCallback;
            this.awaitingResponse = map;
            this.earlyTerminatePredicate = predicate;
        }

        /**
         * Forwards a non-final result from {@code address} to the callback.
         */
        public void intermediateResults(Address address, R r) {
            this.callback.onIntermediateResult(address, r);
        }

        /**
         * Handles the final result of {@code address}: removes any segments reported lost from that
         * address' segment set, hands the remainder to the callback and stops waiting for the address.
         *
         * @param address node that produced the result
         * @param r       the final result, may be {@code null}
         * @return {@code true} when no address is awaited any longer
         */
        public boolean lastResult(Address address, R r) {
            Set<Integer> completedSegments = this.awaitingResponse.get(address);
            // The address may already have been removed (e.g. a late failure notification after its
            // final response); do not dereference a missing segment set.
            if (completedSegments != null && this.missingSegments != null) {
                completedSegments.removeAll(this.missingSegments);
            }
            this.callback.onCompletion(address, completedSegments, r);
            synchronized (this) {
                if (this.earlyTerminatePredicate != null && this.earlyTerminatePredicate.test(r)) {
                    // Early termination: nothing else needs to be waited for.
                    this.awaitingResponse.clear();
                } else {
                    this.awaitingResponse.remove(address);
                }
                return this.awaitingResponse.isEmpty();
            }
        }

        /**
         * Records {@code set} as lost segments and notifies the callback.
         *
         * @param set segments reported lost; never modified by this tracker
         */
        public void missingSegments(Set<Integer> set) {
            synchronized (this) {
                if (this.missingSegments == null) {
                    // Copy instead of keeping the caller's instance: later reports are merged with
                    // addAll, which must not mutate a set owned by the caller (it can be the very set
                    // stored in awaitingResponse) and must not fail on an unmodifiable set.
                    this.missingSegments = new HashSet<>(set);
                } else {
                    this.missingSegments.addAll(set);
                }
            }
            this.callback.onSegmentsLost(set);
        }
    }

    /**
     * Receives the collaborators this manager needs.
     *
     * @param rpcManager      used to send commands to other nodes
     * @param commandsFactory used to build the stream request commands
     */
    @Inject
    public void inject(RpcManager rpcManager, CommandsFactory commandsFactory) {
        factory = commandsFactory;
        rpc = rpcManager;
    }

    /**
     * Caches the address reported by the RPC manager as this node's local address.
     */
    @Start
    public void start() {
        final Address self = rpc.getAddress();
        localAddress = self;
    }

    /**
     * Starts a remote terminal operation by delegating to {@code commonRemoteStreamOperation} with the
     * {@code TERMINAL} command type.
     *
     * @return the id of the started request, or {@code null} when there was no remote target
     */
    @Override
    public <R> Object remoteStreamOperation(boolean z, boolean z2, ConsistentHash consistentHash, Set<Integer> set, Set<K> set2, Map<Integer, Set<K>> map, boolean z3, TerminalOperation<R> terminalOperation, ClusterStreamManager.ResultsCallback<R> resultsCallback, Predicate<? super R> predicate) {
        final StreamRequestCommand.Type commandType = StreamRequestCommand.Type.TERMINAL;
        return commonRemoteStreamOperation(
                z, z2, consistentHash, set, set2, map, z3,
                terminalOperation, resultsCallback, commandType, predicate);
    }

    /**
     * Starts a remote terminal operation by delegating to {@code commonRemoteStreamOperation} with the
     * {@code TERMINAL_REHASH} command type.
     *
     * @return the id of the started request, or {@code null} when there was no remote target
     */
    @Override
    public <R> Object remoteStreamOperationRehashAware(boolean z, boolean z2, ConsistentHash consistentHash, Set<Integer> set, Set<K> set2, Map<Integer, Set<K>> map, boolean z3, TerminalOperation<R> terminalOperation, ClusterStreamManager.ResultsCallback<R> resultsCallback, Predicate<? super R> predicate) {
        final StreamRequestCommand.Type commandType = StreamRequestCommand.Type.TERMINAL_REHASH;
        return commonRemoteStreamOperation(
                z, z2, consistentHash, set, set2, map, z3,
                terminalOperation, resultsCallback, commandType, predicate);
    }

    /**
     * Shared implementation of the non key-tracking remote operations: works out which remote nodes
     * own the requested segments, registers a tracker for the request and sends one stream request
     * command per target.
     *
     * @param z                     when {@code true} the commands are sent through {@code submitAsyncTasks};
     *                              otherwise each target is invoked in turn with {@code rpc.invokeRemotely}
     * @param z2                    forwarded unchanged to the command factory
     * @param consistentHash        used to find the primary owner of each segment
     * @param set                   segments to query, or {@code null} for every segment of the hash
     * @param set2                  forwarded unchanged to the command factory
     * @param map                   keys to exclude, grouped by segment
     * @param z3                    forwarded unchanged to the command factory
     * @param segmentAwareOperation the operation shipped inside the command
     * @param resultsCallback       receives results and lost-segment notifications
     * @param type                  command type to send
     * @param predicate             optional early-termination predicate handed to the tracker
     * @return the id of the started request, or {@code null} when there was no remote target
     */
    private <R> Object commonRemoteStreamOperation(boolean z, boolean z2, ConsistentHash consistentHash, Set<Integer> set, Set<K> set2, Map<Integer, Set<K>> map, boolean z3, SegmentAwareOperation segmentAwareOperation, ClusterStreamManager.ResultsCallback<R> resultsCallback, StreamRequestCommand.Type type, Predicate<? super R> predicate) {
        Map<Address, Set<Integer>> targets = determineTargets(consistentHash, set);
        if (targets.isEmpty()) {
            log.tracef("Not performing remote operation for request as no valid targets found", new Object[0]);
            return null;
        }
        String id = this.localAddress.toString() + this.requestId.getAndIncrement();
        log.tracef("Performing remote operations %s for id %s", targets, id);
        this.currentlyRunning.put(id, new RequestTracker<>(resultsCallback, targets, predicate));
        try {
            if (z) {
                submitAsyncTasks(id, targets, map, z2, set2, z3, type, segmentAwareOperation);
            } else {
                for (Map.Entry<Address, Set<Integer>> target : targets.entrySet()) {
                    Set<Integer> targetSegments = target.getValue();
                    Set<K> excludedKeys = determineExcludedKeys(map, targetSegments);
                    this.rpc.invokeRemotely(Collections.singleton(target.getKey()), this.factory.buildStreamRequestCommand(id, z2, type, targetSegments, set2, excludedKeys, z3, segmentAwareOperation), this.rpc.getDefaultRpcOptions(true));
                }
            }
        } catch (RuntimeException | Error e) {
            // The caller never learns the id when dispatch fails, so it cannot call forgetOperation;
            // drop the tracker here so it does not stay registered forever.
            this.currentlyRunning.remove(id);
            throw e;
        }
        return id;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Starts a remote key-tracking terminal operation by delegating to
     * {@code commonRemoteStreamOperation} with the {@code TERMINAL_KEY} command type and no
     * early-termination predicate.
     *
     * @return the id of the started request, or {@code null} when there was no remote target
     */
    @Override
    public <R> Object remoteStreamOperation(boolean z, boolean z2, ConsistentHash consistentHash, Set<Integer> set, Set<K> set2, Map<Integer, Set<K>> map, boolean z3, KeyTrackingTerminalOperation<K, R, ?> keyTrackingTerminalOperation, ClusterStreamManager.ResultsCallback<Collection<R>> resultsCallback) {
        final StreamRequestCommand.Type commandType = StreamRequestCommand.Type.TERMINAL_KEY;
        return commonRemoteStreamOperation(
                z, z2, consistentHash, set, set2, map, z3,
                keyTrackingTerminalOperation, resultsCallback, commandType, null);
    }

    /**
     * Starts a remote rehash-aware key-tracking operation ({@code TERMINAL_KEY_REHASH}).
     *
     * <p>In synchronous mode ({@code z == false}) each target is invoked in turn. An unsuccessful
     * response, or an exception whose cause chain contains a {@code SuspectException}, is turned into
     * a completed response that reports all of that target's segments as suspect. Any other exception
     * is rethrown to the caller.
     *
     * @param z                            when {@code true} the commands are sent through {@code submitAsyncTasks}
     * @param z2                           forwarded unchanged to the command factory
     * @param consistentHash               used to find the primary owner of each segment
     * @param set                          segments to query, or {@code null} for every segment of the hash
     * @param set2                         forwarded unchanged to the command factory
     * @param map                          keys to exclude, grouped by segment
     * @param z3                           forwarded unchanged to the command factory
     * @param keyTrackingTerminalOperation the operation shipped inside the command
     * @param resultsCallback              receives results and lost-segment notifications
     * @return the id of the started request, or {@code null} when there was no remote target
     */
    @Override
    public <R2> Object remoteStreamOperationRehashAware(boolean z, boolean z2, ConsistentHash consistentHash, Set<Integer> set, Set<K> set2, Map<Integer, Set<K>> map, boolean z3, KeyTrackingTerminalOperation<K, ?, R2> keyTrackingTerminalOperation, ClusterStreamManager.ResultsCallback<Map<K, R2>> resultsCallback) {
        Map<Address, Set<Integer>> targets = determineTargets(consistentHash, set);
        if (targets.isEmpty()) {
            log.tracef("Not performing remote rehash key aware operation for request as no valid targets found", new Object[0]);
            return null;
        }
        String id = this.localAddress.toString() + this.requestId.getAndIncrement();
        log.tracef("Performing remote rehash key aware operations %s for id %s", targets, id);
        this.currentlyRunning.put(id, new RequestTracker<>(resultsCallback, targets, null));
        try {
            if (z) {
                submitAsyncTasks(id, targets, map, z2, set2, z3, StreamRequestCommand.Type.TERMINAL_KEY_REHASH, keyTrackingTerminalOperation);
            } else {
                for (Map.Entry<Address, Set<Integer>> target : targets.entrySet()) {
                    Address recipient = target.getKey();
                    Set<Integer> targetSegments = target.getValue();
                    try {
                        Set<K> excludedKeys = determineExcludedKeys(map, targetSegments);
                        log.tracef("Submitting task to %s for %s excluding keys %s", recipient, id, excludedKeys);
                        Response response = this.rpc.invokeRemotely(Collections.singleton(recipient), this.factory.buildStreamRequestCommand(id, z2, StreamRequestCommand.Type.TERMINAL_KEY_REHASH, targetSegments, set2, excludedKeys, z3, keyTrackingTerminalOperation), this.rpc.getDefaultRpcOptions(true)).values().iterator().next();
                        if (!response.isSuccessful()) {
                            log.tracef("Unsuccessful response for %s from %s - making segments %s suspect", id, recipient, targetSegments);
                            receiveResponse(id, recipient, true, targetSegments, null);
                        }
                    } catch (Exception e) {
                        if (!containedSuspectException(e)) {
                            log.tracef(e, "Encounted exception for %s from %s", id, recipient);
                            throw e;
                        }
                        log.tracef("Exception from %s contained a SuspectException, making all segments %s suspect", recipient, targetSegments);
                        receiveResponse(id, recipient, true, targetSegments, null);
                    }
                }
            }
        } catch (RuntimeException | Error e) {
            // The caller never learns the id when dispatch fails, so it cannot call forgetOperation;
            // drop the tracker here so it does not stay registered forever.
            this.currentlyRunning.remove(id);
            throw e;
        }
        return id;
    }

    /**
     * Sends one stream request command to every target asynchronously and registers a completion
     * handler per target.
     *
     * <p>The handler does nothing on a successful response. An unsuccessful response, or a failure
     * whose cause chain contains a {@code SuspectException}, is turned into a completed response that
     * reports all of the target's segments as suspect. Any other failure is recorded on the request's
     * tracker, or logged as a warning when the tracker is already gone.
     *
     * @param str  id of the request
     * @param map  target address to the segments requested from it
     * @param map2 keys to exclude, grouped by segment
     * @param z    forwarded unchanged to the command factory
     * @param set  forwarded unchanged to the command factory
     * @param z2   forwarded unchanged to the command factory
     * @param type command type to send
     * @param obj  the operation shipped inside the command
     */
    private void submitAsyncTasks(String str, Map<Address, Set<Integer>> map, Map<Integer, Set<K>> map2, boolean z, Set<K> set, boolean z2, StreamRequestCommand.Type type, Object obj) {
        for (Map.Entry<Address, Set<Integer>> entry : map.entrySet()) {
            Set<Integer> value = entry.getValue();
            Set<K> determineExcludedKeys = determineExcludedKeys(map2, value);
            Address key = entry.getKey();
            log.tracef("Submitting async task to %s for %s excluding keys %s", key, str, determineExcludedKeys);
            this.rpc.invokeRemotelyAsync(Collections.singleton(key), this.factory.buildStreamRequestCommand(str, z, type, value, set, determineExcludedKeys, z2, obj), this.rpc.getDefaultRpcOptions(true)).whenComplete((responses, failure) -> {
                if (responses != null) {
                    if (!responses.values().iterator().next().isSuccessful()) {
                        log.tracef("Unsuccessful response for %s from %s - making segments suspect", str, key);
                        receiveResponse(str, key, true, value, null);
                    }
                    return;
                }
                if (failure == null) {
                    return;
                }
                if (containedSuspectException(failure)) {
                    log.tracef("Exception contained a SuspectException, making all segments %s suspect", value);
                    receiveResponse(str, key, true, value, null);
                    return;
                }
                log.tracef(failure, "Encounted exception for %s from %s", str, key);
                RequestTracker tracker = this.currentlyRunning.get(str);
                if (tracker != null) {
                    markTrackerWithException(tracker, key, failure, str);
                } else {
                    // Throwable goes first so it is logged as the cause; passed after the message it
                    // would be treated as a format argument and dropped (the message has no placeholder).
                    log.warnf(failure, "Unhandled remote stream exception encountered");
                }
            });
        }
    }

    /**
     * Tells whether {@code th} or any exception in its cause chain is a {@code SuspectException}.
     *
     * @param th the throwable to inspect; must not be {@code null}
     * @return {@code true} if a {@code SuspectException} was found in the chain
     */
    private boolean containedSuspectException(Throwable th) {
        Throwable current = th;
        do {
            if (current instanceof SuspectException) {
                return true;
            }
            current = current.getCause();
        } while (current != null);
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stores {@code th} on the tracker and, when the request is thereby finished, wakes every thread
     * waiting on the tracker's completion condition.
     *
     * <p>The request counts as finished when {@code address} is {@code null}, or when treating the
     * failure as the last result of {@code address} leaves no address awaited.
     *
     * @param requestTracker tracker of the failed request
     * @param address        node the failure is attributed to, or {@code null}
     * @param th             the failure to record
     * @param obj            request id, used for logging only; may be {@code null}
     */
    public static void markTrackerWithException(RequestTracker<?> requestTracker, Address address, Throwable th, Object obj) {
        log.tracef("Marking tracker to have exception", new Object[0]);
        requestTracker.throwable = th;
        final boolean finished = address == null || requestTracker.lastResult(address, null);
        if (!finished) {
            return;
        }
        if (obj == null) {
            log.trace("Tracker completed due to outside cause, waking sleepers! ");
        } else {
            log.tracef("Tracker %s completed with exception, waking sleepers!", obj);
        }
        final Lock lock = requestTracker.completionLock;
        lock.lock();
        try {
            requestTracker.completionCondition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Collects the excluded keys of all given segments into a single set.
     *
     * @param map excluded keys grouped by segment
     * @param set segments whose excluded keys are wanted
     * @return the union of the key sets mapped to the given segments; segments without a mapping
     *         contribute nothing
     */
    private Set<K> determineExcludedKeys(Map<Integer, Set<K>> map, Set<Integer> set) {
        if (map.isEmpty()) {
            return Collections.emptySet();
        }
        return set.stream()
                .map(map::get)
                .filter(Objects::nonNull)
                .flatMap(Set::stream)
                .collect(Collectors.toSet());
    }

    /**
     * Groups the requested segments by their primary owner, leaving out segments whose primary owner
     * is the local node.
     *
     * @param consistentHash used to look up the primary owner of each segment
     * @param set            segments to consider, or {@code null} for every segment of the hash
     * @return remote owner address to the segments it is primary owner of
     */
    private Map<Address, Set<Integer>> determineTargets(ConsistentHash consistentHash, Set<Integer> set) {
        Set<Integer> segments = set;
        if (segments == null) {
            segments = new ReplicatedConsistentHash.RangeSet(consistentHash.getNumSegments());
        }
        Map<Address, Set<Integer>> targets = new ConcurrentHashMap<>();
        for (Integer segment : segments) {
            Address owner = consistentHash.locatePrimaryOwnerForSegment(segment.intValue());
            if (owner.equals(this.localAddress)) {
                continue;
            }
            targets.computeIfAbsent(owner, ignored -> new HashSet<>()).add(segment);
        }
        return targets;
    }

    /**
     * Reports whether the request with the given id is no longer tracked by this manager.
     *
     * @param obj id of the request
     * @return {@code true} when no tracker is registered under {@code obj}
     */
    @Override
    public boolean isComplete(Object obj) {
        final RequestTracker tracker = this.currentlyRunning.get(obj);
        return tracker == null;
    }

    /**
     * Blocks until the request identified by {@code obj} is no longer tracked, a failure has been
     * recorded on its tracker, or the timeout elapses.
     *
     * @param obj      id of the request; must not be {@code null}
     * @param j        maximum time to wait; must be greater than zero
     * @param timeUnit unit of {@code j}
     * @return {@code true} if the request was seen to be finished, {@code false} on timeout
     * @throws IllegalArgumentException if {@code j} is not positive
     * @throws NullPointerException     if {@code obj} is {@code null}
     * @throws InterruptedException     if the thread is interrupted while waiting
     * @throws CacheException           wrapping a recorded failure that is not a RuntimeException;
     *                                  recorded RuntimeExceptions are rethrown as they are
     */
    @Override
    public boolean awaitCompletion(Object obj, long j, TimeUnit timeUnit) throws InterruptedException {
        if (j <= 0) {
            throw new IllegalArgumentException("Time must be greater than 0");
        }
        Objects.requireNonNull(obj, "Identifier must be non null");
        log.tracef("Awaiting completion of %s", obj);

        final long deadline = System.nanoTime() + timeUnit.toNanos(j);
        boolean completed = false;
        Throwable failure = null;
        while (deadline - System.nanoTime() > 0) {
            RequestTracker tracker = this.currentlyRunning.get(obj);
            if (tracker == null) {
                completed = true;
                break;
            }
            failure = tracker.throwable;
            if (failure != null) {
                break;
            }
            tracker.completionLock.lock();
            try {
                // Re-check under the lock so a completion signalled just before we wait is not missed.
                if (!this.currentlyRunning.containsKey(obj)) {
                    completed = true;
                    failure = tracker.throwable;
                    break;
                }
                boolean signalled = tracker.completionCondition.await(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
                if (!signalled) {
                    // Timed out; pick up a failure that may have been recorded meanwhile.
                    failure = tracker.throwable;
                    break;
                }
            } finally {
                tracker.completionLock.unlock();
            }
        }

        log.tracef("Returning back to caller due to %s being completed: %s", obj, Boolean.valueOf(completed));
        if (failure instanceof RuntimeException) {
            throw (RuntimeException) failure;
        }
        if (failure != null) {
            throw new CacheException(failure);
        }
        return completed;
    }

    /**
     * Stops tracking the request with the given id and wakes every thread waiting for it.
     * Does nothing when {@code obj} is {@code null} or no such request is tracked.
     *
     * @param obj id of the request to forget
     */
    @Override
    public void forgetOperation(Object obj) {
        if (obj == null) {
            return;
        }
        final RequestTracker tracker = this.currentlyRunning.remove(obj);
        if (tracker == null) {
            return;
        }
        final Lock lock = tracker.completionLock;
        lock.lock();
        try {
            tracker.completionCondition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Feeds a response from {@code address} into the tracker of request {@code obj}.
     *
     * <p>Responses for unknown requests are ignored, and a response from an address that is not
     * awaited has no effect. When the response finishes the request, the tracker is unregistered and
     * waiting threads are woken.
     *
     * @param obj     id of the request
     * @param address node the response came from
     * @param z       {@code true} if this is the final response of {@code address}
     * @param set     segments reported as suspect with this response
     * @param r1      the response payload
     * @return {@code false} when the request is unknown or has just been completed by this response,
     *         {@code true} otherwise
     */
    @Override
    public <R1> boolean receiveResponse(Object obj, Address address, boolean z, Set<Integer> set, R1 r1) {
        log.tracef("Received response from %s with a completed response %s for id %s with %s suspected segments.", address, Boolean.valueOf(z), obj, set);
        final RequestTracker tracker = this.currentlyRunning.get(obj);
        if (tracker == null) {
            log.tracef("Ignoring response as we already received a completed response for %s from %s", obj, address);
            return false;
        }

        boolean requestFinished = false;
        synchronized (tracker) {
            if (tracker.awaitingResponse.containsKey(address)) {
                if (!set.isEmpty()) {
                    tracker.missingSegments(set);
                }
                if (!z) {
                    tracker.intermediateResults(address, r1);
                } else {
                    requestFinished = tracker.lastResult(address, r1);
                }
            }
        }
        if (!requestFinished) {
            return true;
        }

        log.tracef("Marking %s as completed!", obj);
        tracker.completionLock.lock();
        try {
            // Unregister under the completion lock so waiters re-checking the map see the removal.
            this.currentlyRunning.remove(obj);
            tracker.completionCondition.signalAll();
        } finally {
            tracker.completionLock.unlock();
        }
        return false;
    }
}
