package org.infinispan.statetransfer;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.PrimitiveIterator;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import net.jcip.annotations.GuardedBy;
import org.infinispan.Cache;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commons.IllegalLifecycleStateException;
import org.infinispan.commons.tx.TransactionImpl;
import org.infinispan.commons.tx.XidImpl;
import org.infinispan.commons.util.EnumUtil;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.commons.util.Util;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.Configurations;
import org.infinispan.conflict.impl.InternalConflictManager;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.impl.InternalDataContainer;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.InvocationContextFactory;
import org.infinispan.context.impl.LocalTxInvocationContext;
import org.infinispan.context.impl.NonTxInvocationContext;
import org.infinispan.distribution.DistributionInfo;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.TriangleOrderManager;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.executors.LimitedExecutor;
import org.infinispan.factories.KnownComponentNames;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.factories.impl.ComponentRef;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.interceptors.AsyncInterceptorChain;
import org.infinispan.metadata.impl.InternalMetadataImpl;
import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.notifications.cachelistener.annotation.DataRehashed;
import org.infinispan.notifications.cachelistener.cluster.ClusterListenerReplicateCallable;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.reactive.publisher.impl.LocalPublisherManager;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.inboundhandler.PerCacheInboundInvocationHandler;
import org.infinispan.remoting.responses.CacheNotFoundResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.impl.PassthroughSingleResponseCollector;
import org.infinispan.remoting.transport.impl.SingleResponseCollector;
import org.infinispan.topology.CacheTopology;
import org.infinispan.topology.LocalTopologyManager;
import org.infinispan.transaction.impl.LocalTransaction;
import org.infinispan.transaction.impl.RemoteTransaction;
import org.infinispan.transaction.impl.TransactionTable;
import org.infinispan.transaction.xa.CacheTransaction;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.concurrent.AggregateCompletionStage;
import org.infinispan.util.concurrent.CommandAckCollector;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.concurrent.CompletionStages;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.infinispan.xsite.statetransfer.XSiteStateTransferManager;

@Scope(Scopes.NAMED_CACHE)
/* loaded from: input_file:BOOT-INF/lib/infinispan-core-12.1.7.Final.jar:org/infinispan/statetransfer/StateConsumerImpl.class */
public class StateConsumerImpl implements StateConsumer {
    protected static final int NO_STATE_TRANSFER_IN_PROGRESS = -1;
    public static final String NO_KEY = "N/A";

    @Inject
    protected ComponentRef<Cache<Object, Object>> cache;

    @Inject
    protected LocalTopologyManager localTopologyManager;

    @Inject
    protected Configuration configuration;

    @Inject
    protected RpcManager rpcManager;

    @Inject
    protected TransactionManager transactionManager;

    @Inject
    protected CommandsFactory commandsFactory;

    @Inject
    protected TransactionTable transactionTable;

    @Inject
    protected InternalDataContainer<Object, Object> dataContainer;

    @Inject
    protected PersistenceManager persistenceManager;

    @Inject
    protected AsyncInterceptorChain interceptorChain;

    @Inject
    protected InvocationContextFactory icf;

    @Inject
    protected StateTransferLock stateTransferLock;

    @Inject
    protected CacheNotifier<?, ?> cacheNotifier;

    @Inject
    protected CommitManager commitManager;

    @ComponentName(KnownComponentNames.NON_BLOCKING_EXECUTOR)
    @Inject
    protected Executor nonBlockingExecutor;

    @Inject
    protected CommandAckCollector commandAckCollector;

    @Inject
    protected TriangleOrderManager triangleOrderManager;

    @Inject
    protected DistributionManager distributionManager;

    @Inject
    protected KeyPartitioner keyPartitioner;

    @Inject
    protected InternalConflictManager<?, ?> conflictManager;

    @Inject
    protected LocalPublisherManager<Object, Object> localPublisherManager;

    @Inject
    PerCacheInboundInvocationHandler inboundInvocationHandler;

    @Inject
    XSiteStateTransferManager xSiteStateTransferManager;
    protected String cacheName;
    protected long timeout;
    protected boolean isFetchEnabled;
    protected boolean isTransactional;
    protected boolean isInvalidationMode;
    protected volatile KeyInvalidationListener keyInvalidationListener;
    protected volatile CacheTopology cacheTopology;
    protected LimitedExecutor stateRequestExecutor;
    protected RpcOptions rpcOptions;
    private volatile boolean running;
    private static final Log log = LogFactory.getLog(StateConsumerImpl.class);
    protected static final long STATE_TRANSFER_FLAGS = EnumUtil.bitSetOf(Flag.PUT_FOR_STATE_TRANSFER, Flag.CACHE_MODE_LOCAL, Flag.IGNORE_RETURN_VALUES, Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_SHARED_CACHE_STORE, Flag.SKIP_OWNERSHIP_CHECK, Flag.SKIP_XSITE_BACKUP, Flag.SKIP_LOCKING, Flag.IRAC_STATE);
    private final int firstTopologyAsMember = Integer.MAX_VALUE;
    protected final AtomicInteger stateTransferTopologyId = new AtomicInteger(-1);
    protected final AtomicBoolean waitingForState = new AtomicBoolean(false);
    protected CompletableFuture<Void> stateTransferFuture = CompletableFutures.completedNull();
    protected final Object transferMapsLock = new Object();

    @GuardedBy("transferMapsLock")
    private final Map<Address, List<InboundTransferTask>> transfersBySource = new HashMap();

    @GuardedBy("transferMapsLock")
    protected final Map<Integer, List<InboundTransferTask>> transfersBySegment = new HashMap();
    private volatile boolean ownsData = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/infinispan-core-12.1.7.Final.jar:org/infinispan/statetransfer/StateConsumerImpl$ApplyStateTransaction.class */
    public static class ApplyStateTransaction extends TransactionImpl {
        static final int FORMAT_ID = 2;
        AtomicLong id = new AtomicLong(0);

        ApplyStateTransaction() {
            byte[] bArr = new byte[8];
            Util.longToBytes(this.id.incrementAndGet(), bArr, 0);
            setXid(XidImpl.create(2, bArr, bArr));
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/infinispan-core-12.1.7.Final.jar:org/infinispan/statetransfer/StateConsumerImpl$KeyInvalidationListener.class */
    public interface KeyInvalidationListener {
        void beforeInvalidation(IntSet intSet, IntSet intSet2);
    }

    @Override // org.infinispan.statetransfer.StateConsumer
    public void stopApplyingState(int i) {
        if (log.isTraceEnabled()) {
            log.tracef("Stop keeping track of changed keys for state transfer in topology %d", i);
        }
        this.commitManager.stopTrack(Flag.PUT_FOR_STATE_TRANSFER);
    }

    public boolean hasActiveTransfers() {
        boolean z;
        synchronized (this.transferMapsLock) {
            z = !this.transfersBySource.isEmpty();
        }
        return z;
    }

    @Override // org.infinispan.statetransfer.StateConsumer
    public boolean isStateTransferInProgress() {
        return this.stateTransferTopologyId.get() != -1;
    }

    @Override // org.infinispan.statetransfer.StateConsumer
    public boolean isStateTransferInProgressForKey(Object obj) {
        if (this.isInvalidationMode) {
            return false;
        }
        DistributionInfo distribution = this.distributionManager.getCacheTopology().getDistribution(obj);
        return distribution.isWriteOwner() && !distribution.isReadOwner();
    }

    @Override // org.infinispan.statetransfer.StateConsumer
    public boolean ownsData() {
        return this.ownsData;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.infinispan.statetransfer.StateConsumer
    public CompletionStage<CompletionStage<Void>> onTopologyUpdate(CacheTopology cacheTopology, boolean z) {
        ConsistentHash writeConsistentHash = cacheTopology.getWriteConsistentHash();
        CacheTopology cacheTopology2 = this.cacheTopology;
        ConsistentHash currentCH = cacheTopology2 != null ? cacheTopology2.getCurrentCH() : null;
        ConsistentHash writeConsistentHash2 = cacheTopology2 != null ? cacheTopology2.getWriteConsistentHash() : null;
        IntSet ownedSegments = getOwnedSegments(writeConsistentHash);
        boolean contains = cacheTopology.getMembers().contains(this.rpcManager.getAddress());
        boolean z2 = writeConsistentHash2 != null && writeConsistentHash2.getMembers().contains(this.rpcManager.getAddress());
        if (log.isTraceEnabled()) {
            log.tracef("Received new topology for cache %s, isRebalance = %b, isMember = %b, topology = %s", this.cacheName, Boolean.valueOf(z), Boolean.valueOf(contains), cacheTopology);
        }
        if (!this.ownsData && contains) {
            this.ownsData = true;
        } else if (this.ownsData && !contains) {
            this.ownsData = false;
        }
        boolean z3 = cacheTopology.getPendingCH() != null && z2 && cacheTopology2.getPendingCH() == null;
        boolean z4 = !z && cacheTopology.getPhase() == CacheTopology.Phase.CONFLICT_RESOLUTION;
        boolean z5 = z || (z3 && !z4);
        if (z5 && !z && log.isTraceEnabled()) {
            log.tracef("Forcing startRebalance = true", new Object[0]);
        }
        CompletionStage completedNull = CompletableFutures.completedNull();
        if (z5) {
            this.stateTransferTopologyId.compareAndSet(-1, cacheTopology.getTopologyId());
            this.conflictManager.cancelVersionRequests();
            if (this.cacheNotifier.hasListener(DataRehashed.class)) {
                completedNull = this.cacheNotifier.notifyDataRehashed(cacheTopology.getCurrentCH(), cacheTopology.getPendingCH(), cacheTopology.getUnionCH(), cacheTopology.getTopologyId(), true);
            }
        }
        return CompletionStages.handleAndCompose(completedNull.thenCompose(r13 -> {
            if (z4) {
                this.stateTransferTopologyId.set(-1);
            }
            this.waitingForState.set(false);
            this.stateTransferFuture = new CompletableFuture<>();
            beforeTopologyInstalled(cacheTopology.getTopologyId(), z5, writeConsistentHash2, writeConsistentHash);
            if (this.configuration.clustering().cacheMode().isInvalidation()) {
                return CompletableFutures.completedNull();
            }
            this.dataContainer.addSegments(ownedSegments);
            return CompletionStages.ignoreValue(this.persistenceManager.addSegments(ownedSegments));
        }).thenCompose(r10 -> {
            if (z5 || z4) {
                if (log.isTraceEnabled()) {
                    log.tracef("Start keeping track of keys for rebalance", new Object[0]);
                }
                this.commitManager.stopTrack(Flag.PUT_FOR_STATE_TRANSFER);
                this.commitManager.startTrack(Flag.PUT_FOR_STATE_TRANSFER);
            }
            this.stateTransferLock.acquireExclusiveTopologyLock();
            try {
                this.cacheTopology = cacheTopology;
                this.distributionManager.setCacheTopology(cacheTopology);
                this.stateTransferLock.releaseExclusiveTopologyLock();
                this.stateTransferLock.notifyTopologyInstalled(cacheTopology.getTopologyId());
                this.inboundInvocationHandler.checkForReadyTasks();
                this.xSiteStateTransferManager.onTopologyUpdated(cacheTopology, isStateTransferInProgress());
                return (z2 || !contains) ? CompletableFutures.completedNull() : fetchClusterListeners(cacheTopology);
            } catch (Throwable th) {
                this.stateTransferLock.releaseExclusiveTopologyLock();
                throw th;
            }
        }).thenCompose(r132 -> {
            IntSet mutableCopyFrom;
            IntSet mutableCopyFrom2;
            if (z4 || !(this.isTransactional || this.isFetchEnabled)) {
                return CompletableFutures.completedNull();
            }
            if (writeConsistentHash2 == null) {
                mutableCopyFrom = IntSets.immutableEmptySet();
                mutableCopyFrom2 = IntSets.immutableEmptySet();
                if (log.isTraceEnabled()) {
                    log.tracef("On cache %s we have: added segments: %s", this.cacheName, mutableCopyFrom2);
                }
            } else {
                IntSet ownedSegments2 = getOwnedSegments(writeConsistentHash2);
                if (ownedSegments.size() == writeConsistentHash.getNumSegments()) {
                    mutableCopyFrom = IntSets.immutableEmptySet();
                } else {
                    mutableCopyFrom = IntSets.mutableCopyFrom(ownedSegments2);
                    mutableCopyFrom.removeAll(ownedSegments);
                }
                mutableCopyFrom2 = IntSets.mutableCopyFrom(ownedSegments);
                mutableCopyFrom2.removeAll(ownedSegments2);
                if (log.isTraceEnabled()) {
                    log.tracef("On cache %s we have: new segments: %s; old segments: %s", this.cacheName, ownedSegments, ownedSegments2);
                    log.tracef("On cache %s we have: added segments: %s; removed segments: %s", this.cacheName, mutableCopyFrom2, mutableCopyFrom);
                }
                cancelTransfers(mutableCopyFrom);
                if (!z5 && !mutableCopyFrom2.isEmpty() && !this.configuration.clustering().cacheMode().isScattered()) {
                    log.debugf("Not requesting segments %s because the last owner left the cluster", mutableCopyFrom2);
                    mutableCopyFrom2.clear();
                }
                restartBrokenTransfers(cacheTopology, mutableCopyFrom2);
            }
            return handleSegments(z5, mutableCopyFrom2, mutableCopyFrom);
        }).thenCompose(r12 -> {
            int i = this.stateTransferTopologyId.get();
            if (log.isTraceEnabled()) {
                log.tracef("Topology update processed, stateTransferTopologyId = %d, startRebalance = %s, pending CH = %s", Integer.valueOf(i), Boolean.valueOf(z5), cacheTopology.getPendingCH());
            }
            if (i != -1 && !z5 && !cacheTopology.getPhase().isRebalance() && this.stateTransferTopologyId.compareAndSet(i, -1)) {
                stopApplyingState(i);
                ConsistentHash pendingCH = cacheTopology.getPendingCH();
                if (pendingCH == null) {
                    pendingCH = cacheTopology.getCurrentCH();
                }
                if (this.cacheNotifier.hasListener(DataRehashed.class)) {
                    return this.cacheNotifier.notifyDataRehashed(currentCH, pendingCH, writeConsistentHash2, cacheTopology.getTopologyId(), false);
                }
            }
            return CompletableFutures.completedNull();
        }), (r9, th) -> {
            if (log.isTraceEnabled()) {
                log.tracef("Unlock State Transfer in Progress for topology ID %s", cacheTopology.getTopologyId());
            }
            this.stateTransferLock.notifyTransactionDataReceived(cacheTopology.getTopologyId());
            this.inboundInvocationHandler.checkForReadyTasks();
            if (this.stateTransferTopologyId.get() != -1 && contains) {
                this.waitingForState.set(true);
            }
            notifyEndOfStateTransferIfNeeded();
            try {
                if (this.transactionTable != null) {
                    this.transactionTable.cleanupLeaverTransactions(this.rpcManager.getTransport().getMembers());
                }
            } catch (Exception e) {
                log.transactionCleanupError(e);
            }
            this.commandAckCollector.onMembersChange(writeConsistentHash.getMembers());
            switch (cacheTopology.getPhase()) {
                case READ_ALL_WRITE_ALL:
                case READ_NEW_WRITE_ALL:
                    this.stateTransferFuture.complete(null);
                    break;
            }
            if ((!contains && !z2) || cacheTopology.getPhase() != CacheTopology.Phase.NO_REBALANCE) {
                CompletableFutures.rethrowExceptionIfPresent(th);
                return CompletableFuture.completedFuture(this.stateTransferFuture);
            }
            int numSegments = writeConsistentHash.getNumSegments();
            IntSet mutableEmptySet = IntSets.mutableEmptySet(numSegments);
            IntSet ownedSegments2 = getOwnedSegments(writeConsistentHash);
            for (int i = 0; i < numSegments; i++) {
                if (!ownedSegments2.contains(i)) {
                    mutableEmptySet.add(i);
                }
            }
            return removeStaleData(mutableEmptySet).thenApply(r4 -> {
                this.conflictManager.restartVersionRequests();
                CompletableFutures.rethrowExceptionIfPresent(th);
                return this.stateTransferFuture;
            });
        });
    }

    private CompletionStage<Void> fetchClusterListeners(CacheTopology cacheTopology) {
        return (this.configuration.clustering().cacheMode().isDistributed() || this.configuration.clustering().cacheMode().isScattered()) ? getClusterListeners(cacheTopology.getTopologyId(), cacheTopology.getReadConsistentHash().getMembers()).thenAccept(collection -> {
            Cache<Object, Object> wired = this.cache.wired();
            Iterator it = collection.iterator();
            while (it.hasNext()) {
                try {
                    ((ClusterListenerReplicateCallable) it.next()).accept(wired.getCacheManager(), (Cache) wired);
                } catch (Exception e) {
                    log.clusterListenerInstallationFailure(e);
                }
            }
        }) : CompletableFutures.completedNull();
    }

    protected void beforeTopologyInstalled(int i, boolean z, ConsistentHash consistentHash, ConsistentHash consistentHash2) {
    }

    protected CompletionStage<Void> handleSegments(boolean z, IntSet intSet, IntSet intSet2) {
        return intSet.isEmpty() ? CompletableFutures.completedNull() : addTransfers(intSet);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean notifyEndOfStateTransferIfNeeded() {
        if (!this.waitingForState.get()) {
            if (!log.isTraceEnabled()) {
                return true;
            }
            log.tracef("No end of state transfer notification, waitingForState already set to false by another thread", new Object[0]);
            return true;
        }
        if (hasActiveTransfers()) {
            if (!log.isTraceEnabled()) {
                return false;
            }
            log.tracef("No end of state transfer notification, active transfers still exist", new Object[0]);
            return false;
        }
        if (this.waitingForState.compareAndSet(true, false)) {
            int i = this.stateTransferTopologyId.get();
            log.debugf("Finished receiving of segments for cache %s for topology %d.", this.cacheName, Integer.valueOf(i));
            stopApplyingState(i);
            this.stateTransferFuture.complete(null);
        }
        if (!log.isTraceEnabled()) {
            return false;
        }
        log.tracef("No end of state transfer notification, waitingForState already set to false by another thread", new Object[0]);
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public IntSet getOwnedSegments(ConsistentHash consistentHash) {
        Address address = this.rpcManager.getAddress();
        return consistentHash.getMembers().contains(address) ? IntSets.from(consistentHash.getSegmentsForOwner(address)) : IntSets.immutableEmptySet();
    }

    @Override // org.infinispan.statetransfer.StateConsumer
    public CompletionStage<?> applyState(Address address, int i, boolean z, Collection<StateChunk> collection) {
        ConsistentHash writeConsistentHash = this.cacheTopology.getWriteConsistentHash();
        if (!writeConsistentHash.getMembers().contains(this.rpcManager.getAddress())) {
            if (log.isTraceEnabled()) {
                log.tracef("Ignoring received state because we are no longer a member of cache %s", this.cacheName);
            }
            return CompletableFutures.completedNull();
        }
        int i2 = this.stateTransferTopologyId.get();
        if (i2 == -1 && !z) {
            log.debugf("Discarding state response with topology id %d for cache %s, we don't have a state transfer in progress", i, (Object) this.cacheName);
            return CompletableFutures.completedNull();
        }
        if (i < i2) {
            log.debugf("Discarding state response with old topology id %d for cache %s, state transfer request topology was %b", i, (Object) this.cacheName, (Object) this.waitingForState);
            return CompletableFutures.completedNull();
        }
        if (log.isTraceEnabled()) {
            log.tracef("Before applying the received state the data container of cache %s has %d keys", this.cacheName, Integer.valueOf(this.dataContainer.sizeIncludingExpired()));
        }
        return applyStateIteration(address, z, IntSets.from(writeConsistentHash.getSegmentsForOwner(this.rpcManager.getAddress())), collection.iterator()).whenComplete((obj, th) -> {
            if (log.isTraceEnabled()) {
                log.tracef("After applying the received state the data container of cache %s has %d keys", this.cacheName, Integer.valueOf(this.dataContainer.sizeIncludingExpired()));
                synchronized (this.transferMapsLock) {
                    log.tracef("Segments not received yet for cache %s: %s", this.cacheName, this.transfersBySource);
                }
            }
        });
    }

    private CompletionStage<?> applyStateIteration(Address address, boolean z, IntSet intSet, Iterator<StateChunk> it) {
        CompletionStage<?> completionStage;
        CompletionStage<?> completedNull = CompletableFutures.completedNull();
        while (true) {
            completionStage = completedNull;
            if (!it.hasNext() || !CompletionStages.isCompletedSuccessfully(completionStage)) {
                break;
            }
            StateChunk next = it.next();
            completedNull = z ? doApplyState(address, next.getSegmentId(), next.getCacheEntries()) : applyChunk(address, intSet, next);
        }
        return !it.hasNext() ? completionStage : completionStage.thenCompose(obj -> {
            return applyStateIteration(address, z, intSet, it);
        });
    }

    private CompletionStage<Void> applyChunk(Address address, IntSet intSet, StateChunk stateChunk) {
        InboundTransferTask orElse;
        if (!intSet.contains(stateChunk.getSegmentId())) {
            log.debugf("Discarding received cache entries for segment %d of cache %s because they do not belong to this node.", stateChunk.getSegmentId(), (Object) this.cacheName);
            return CompletableFutures.completedNull();
        }
        synchronized (this.transferMapsLock) {
            List<InboundTransferTask> list = this.transfersBySegment.get(Integer.valueOf(stateChunk.getSegmentId()));
            orElse = list != null ? list.stream().filter(inboundTransferTask -> {
                return inboundTransferTask.getSource().equals(address);
            }).findFirst().orElse(null) : null;
        }
        if (orElse != null) {
            InboundTransferTask inboundTransferTask2 = orElse;
            return doApplyState(address, stateChunk.getSegmentId(), stateChunk.getCacheEntries()).thenAccept(obj -> {
                inboundTransferTask2.onStateReceived(stateChunk.getSegmentId(), stateChunk.isLastChunk());
            });
        }
        if (this.cache.wired().getStatus().allowInvocations()) {
            log.ignoringUnsolicitedState(address, stateChunk.getSegmentId(), this.cacheName);
        }
        return CompletableFutures.completedNull();
    }

    private CompletionStage<?> doApplyState(Address address, int i, Collection<InternalCacheEntry<?, ?>> collection) {
        if (collection == null || collection.isEmpty()) {
            return CompletableFutures.completedNull();
        }
        if (log.isTraceEnabled()) {
            log.tracef("Applying new state chunk for segment %d of cache %s from node %s: received %d cache entries", Integer.valueOf(i), this.cacheName, address, Integer.valueOf(collection.size()));
        }
        if (!(this.transactionManager != null)) {
            AggregateCompletionStage<Void> aggregateCompletionStage = CompletionStages.aggregateCompletionStage();
            for (InternalCacheEntry<?, ?> internalCacheEntry : collection) {
                aggregateCompletionStage.dependsOn(invokePut(i, this.icf.createSingleKeyNonTxInvocationContext(), internalCacheEntry).exceptionally(th -> {
                    logApplyException(th, internalCacheEntry.getKey());
                    return null;
                }));
            }
            return aggregateCompletionStage.freeze();
        }
        Object obj = NO_KEY;
        InvocationContext createInvocationContext = this.icf.createInvocationContext((Transaction) new ApplyStateTransaction(), false);
        LocalTransaction cacheTransaction = ((LocalTxInvocationContext) createInvocationContext).getCacheTransaction();
        try {
            cacheTransaction.setStateTransferFlag(Flag.PUT_FOR_STATE_TRANSFER);
            for (InternalCacheEntry<?, ?> internalCacheEntry2 : collection) {
                obj = internalCacheEntry2.getKey();
                if (!invokePut(i, createInvocationContext, internalCacheEntry2).isDone()) {
                    throw new IllegalStateException("State transfer in-tx put should always be synchronous");
                }
            }
            return invoke1PCPrepare(cacheTransaction).whenComplete((obj2, th2) -> {
                this.transactionTable.removeLocalTransaction(cacheTransaction);
                if (th2 != null) {
                    logApplyException(th2, NO_KEY);
                }
            });
        } catch (Throwable th3) {
            logApplyException(th3, obj);
            return invokeRollback(cacheTransaction).handle((obj3, th4) -> {
                this.transactionTable.removeLocalTransaction(cacheTransaction);
                if (th4 == null) {
                    return null;
                }
                th3.addSuppressed(th4);
                return null;
            });
        }
    }

    private CompletionStage<?> invoke1PCPrepare(LocalTransaction localTransaction) {
        return this.interceptorChain.invokeAsync(this.icf.createTxInvocationContext(localTransaction), Configurations.isTxVersioned(this.configuration) ? this.commandsFactory.buildVersionedPrepareCommand(localTransaction.getGlobalTransaction(), localTransaction.getModifications(), true) : this.commandsFactory.buildPrepareCommand(localTransaction.getGlobalTransaction(), localTransaction.getModifications(), true));
    }

    private CompletionStage<?> invokeRollback(LocalTransaction localTransaction) {
        RollbackCommand buildRollbackCommand = this.commandsFactory.buildRollbackCommand(localTransaction.getGlobalTransaction());
        return this.interceptorChain.invokeAsync(this.icf.createTxInvocationContext(localTransaction), buildRollbackCommand);
    }

    private CompletableFuture<?> invokePut(int i, InvocationContext invocationContext, InternalCacheEntry<?, ?> internalCacheEntry) {
        PutKeyValueCommand buildPutKeyValueCommand = this.commandsFactory.buildPutKeyValueCommand(internalCacheEntry.getKey(), internalCacheEntry.getValue(), i, new InternalMetadataImpl(internalCacheEntry), STATE_TRANSFER_FLAGS);
        buildPutKeyValueCommand.setInternalMetadata(internalCacheEntry.getInternalMetadata());
        invocationContext.setLockOwner(buildPutKeyValueCommand.getKeyLockOwner());
        return this.interceptorChain.invokeAsync(invocationContext, buildPutKeyValueCommand);
    }

    private void logApplyException(Throwable th, Object obj) {
        if (this.cache.wired().getStatus().allowInvocations()) {
            log.problemApplyingStateForKey(th.getMessage(), obj, th);
        } else {
            log.tracef("Cache %s is shutting down, stopping state transfer", this.cacheName);
        }
    }

    private void applyTransactions(Address address, Collection<TransactionInfo> collection, int i) {
        log.debugf("Applying %d transactions for cache %s transferred from node %s", collection.size(), (Object) this.cacheName, (Object) address);
        if (this.isTransactional) {
            for (TransactionInfo transactionInfo : collection) {
                GlobalTransaction globalTransaction = transactionInfo.getGlobalTransaction();
                if (!this.rpcManager.getAddress().equals(globalTransaction.getAddress())) {
                    globalTransaction.setRemote(true);
                    CacheTransaction localTransaction = this.transactionTable.getLocalTransaction(globalTransaction);
                    if (localTransaction == null) {
                        localTransaction = this.transactionTable.getRemoteTransaction(globalTransaction);
                        if (localTransaction == null) {
                            try {
                                localTransaction = this.transactionTable.getOrCreateRemoteTransaction(globalTransaction, transactionInfo.getModifications(), i - 1);
                                ((RemoteTransaction) localTransaction).setLookedUpEntriesTopology(i - 1);
                            } catch (Throwable th) {
                                if (log.isTraceEnabled()) {
                                    log.tracef(th, "Failed to create remote transaction %s", globalTransaction);
                                }
                            }
                        }
                    }
                    if (localTransaction != null) {
                        Set<Object> lockedKeys = transactionInfo.getLockedKeys();
                        CacheTransaction cacheTransaction = localTransaction;
                        Objects.requireNonNull(cacheTransaction);
                        lockedKeys.forEach(cacheTransaction::addBackupLockForKey);
                    }
                }
            }
        }
    }

    @Start(priority = 20)
    public void start() {
        this.cacheName = this.cache.wired().getName();
        this.isInvalidationMode = this.configuration.clustering().cacheMode().isInvalidation();
        this.isTransactional = this.configuration.transaction().transactionMode().isTransactional();
        this.timeout = this.configuration.clustering().stateTransfer().timeout();
        this.isFetchEnabled = this.configuration.clustering().cacheMode().needsStateTransfer() && (this.configuration.clustering().stateTransfer().fetchInMemoryState() || this.configuration.persistence().fetchPersistentState().booleanValue());
        this.rpcOptions = new RpcOptions(DeliverOrder.NONE, this.timeout, TimeUnit.MILLISECONDS);
        this.stateRequestExecutor = new LimitedExecutor("StateRequest-" + this.cacheName, this.nonBlockingExecutor, 1);
        this.running = true;
    }

    @Override // org.infinispan.statetransfer.StateConsumer
    @Stop(priority = 0)
    public void stop() {
        if (log.isTraceEnabled()) {
            log.tracef("Shutting down StateConsumer of cache %s on node %s", this.cacheName, this.rpcManager.getAddress());
        }
        this.running = false;
        try {
            synchronized (this.transferMapsLock) {
                ArrayList arrayList = new ArrayList(this.transfersBySource.values());
                this.transfersBySource.clear();
                this.transfersBySegment.clear();
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    ((List) it.next()).forEach((v0) -> {
                        v0.cancel();
                    });
                }
            }
            this.stateRequestExecutor.shutdownNow();
        } catch (Throwable th) {
            log.errorf(th, "Failed to stop StateConsumer of cache %s on node %s", this.cacheName, this.rpcManager.getAddress());
        }
    }

    public void setKeyInvalidationListener(KeyInvalidationListener keyInvalidationListener) {
        this.keyInvalidationListener = keyInvalidationListener;
    }

    private CompletionStage<Void> addTransfers(IntSet intSet) {
        log.debugf("Adding inbound state transfer for segments %s", intSet);
        HashSet hashSet = new HashSet();
        HashMap hashMap = new HashMap();
        CompletionStage<Void> completedNull = CompletableFutures.completedNull();
        if (this.isTransactional) {
            completedNull = requestTransactions(intSet, hashMap, hashSet);
        }
        if (this.isFetchEnabled) {
            completedNull = completedNull.thenRun(() -> {
                requestSegments(intSet, hashMap, hashSet);
            });
        }
        return completedNull;
    }

    private void findSources(IntSet intSet, Map<Address, IntSet> map, Set<Address> set) {
        if (this.cache.wired().getStatus().isTerminated()) {
            return;
        }
        int numSegments = this.configuration.clustering().hash().numSegments();
        IntSet mutableEmptySet = IntSets.mutableEmptySet(numSegments);
        PrimitiveIterator.OfInt it = intSet.iterator();
        while (it.hasNext()) {
            int nextInt = it.nextInt();
            Address findSource = findSource(nextInt, set);
            if (findSource != null) {
                map.computeIfAbsent(findSource, address -> {
                    return IntSets.mutableEmptySet(numSegments);
                }).set(nextInt);
            } else {
                mutableEmptySet.set(nextInt);
            }
        }
        if (mutableEmptySet.isEmpty()) {
            return;
        }
        log.noLiveOwnersFoundForSegments(mutableEmptySet, this.cacheName, set);
    }

    private Address findSource(int i, Set<Address> set) {
        List<Address> locateOwnersForSegment = this.cacheTopology.getReadConsistentHash().locateOwnersForSegment(i);
        if (locateOwnersForSegment.contains(this.rpcManager.getAddress())) {
            return null;
        }
        for (Address address : locateOwnersForSegment) {
            if (!address.equals(this.rpcManager.getAddress()) && !set.contains(address)) {
                return address;
            }
        }
        return null;
    }

    private CompletionStage<Void> requestTransactions(IntSet intSet, Map<Address, IntSet> map, Set<Address> set) {
        findSources(intSet, map, set);
        AggregateCompletionStage<Void> aggregateCompletionStage = CompletionStages.aggregateCompletionStage();
        IntSet concurrentSet = IntSets.concurrentSet(this.configuration.clustering().hash().numSegments());
        ConcurrentHashMap.KeySetView newKeySet = ConcurrentHashMap.newKeySet();
        int topologyId = this.cacheTopology.getTopologyId();
        for (Map.Entry<Address, IntSet> entry : map.entrySet()) {
            Address key = entry.getKey();
            IntSet value = entry.getValue();
            aggregateCompletionStage.dependsOn(getTransactions(key, value, topologyId).whenComplete((response, th) -> {
                processTransactionsResponse(intSet, map, concurrentSet, newKeySet, topologyId, key, value, response, th);
            }));
        }
        return aggregateCompletionStage.freeze().thenCompose(r11 -> {
            if (concurrentSet.isEmpty()) {
                return CompletableFutures.completedNull();
            }
            set.addAll(newKeySet);
            map.clear();
            findSources(concurrentSet, map, set);
            return requestTransactions(intSet, map, set);
        });
    }

    private void processTransactionsResponse(IntSet intSet, Map<Address, IntSet> map, IntSet intSet2, Set<Address> set, int i, Address address, IntSet intSet3, Response response, Throwable th) {
        boolean z = false;
        boolean z2 = false;
        if (th != null) {
            if (this.cache.wired().getStatus().isTerminated()) {
                log.debugf("Cache %s has stopped while requesting transactions", this.cacheName);
                map.clear();
                return;
            } else {
                log.failedToRetrieveTransactionsForSegments(this.cacheName, address, intSet, th);
                z = true;
            }
        }
        if (response instanceof SuccessfulResponse) {
            applyTransactions(address, (List) ((SuccessfulResponse) response).getResponseValue(), i);
        } else if (response instanceof CacheNotFoundResponse) {
            log.debugf("Cache %s was stopped on node %s before sending transaction information", this.cacheName, address);
            z = true;
            z2 = true;
        } else {
            log.unsuccessfulResponseRetrievingTransactionsForSegments(address, response);
            z = true;
        }
        if (z) {
            intSet2.addAll(intSet3);
        }
        if (z2) {
            set.add(address);
        }
    }

    private CompletionStage<Collection<ClusterListenerReplicateCallable<Object, Object>>> getClusterListeners(int i, List<Address> list) {
        if (list.isEmpty()) {
            if (log.isTraceEnabled()) {
                log.trace("Unable to acquire cluster listeners from other members, assuming none are present");
            }
            return CompletableFuture.completedFuture(Collections.emptySet());
        }
        Address address = list.get(0);
        if (list.get(0).equals(this.rpcManager.getAddress())) {
            return getClusterListeners(i, list.subList(1, list.size()));
        }
        if (log.isTraceEnabled()) {
            log.tracef("Requesting cluster listeners of cache %s from node %s", this.cacheName, list);
        }
        return CompletionStages.handleAndCompose(this.rpcManager.invokeCommand(address, this.commandsFactory.buildStateTransferGetListenersCommand(i), SingleResponseCollector.validOnly(), this.rpcOptions), (validResponse, th) -> {
            if (th != null) {
                log.exceptionDuringClusterListenerRetrieval(address, th);
            }
            if (validResponse instanceof SuccessfulResponse) {
                return CompletableFuture.completedFuture((Collection) validResponse.getResponseValue());
            }
            log.unsuccessfulResponseForClusterListeners(address, validResponse);
            return getClusterListeners(i, list.subList(1, list.size()));
        });
    }

    private CompletionStage<Response> getTransactions(Address address, IntSet intSet, int i) {
        if (log.isTraceEnabled()) {
            log.tracef("Requesting transactions from node %s for segments %s", address, intSet);
        }
        return this.rpcManager.invokeCommand(address, this.commandsFactory.buildStateTransferGetTransactionsCommand(i, intSet), PassthroughSingleResponseCollector.INSTANCE, this.rpcOptions);
    }

    private void requestSegments(IntSet intSet, Map<Address, IntSet> map, Set<Address> set) {
        if (map.isEmpty()) {
            findSources(intSet, map, set);
        }
        for (Map.Entry<Address, IntSet> entry : map.entrySet()) {
            addTransfer(entry.getKey(), entry.getValue());
        }
        if (log.isTraceEnabled()) {
            log.tracef("Finished adding inbound state transfer for segments %s", intSet, this.cacheName);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void cancelTransfers(IntSet intSet) {
        synchronized (this.transferMapsLock) {
            ArrayList arrayList = new ArrayList(intSet);
            while (!arrayList.isEmpty()) {
                List<InboundTransferTask> list = this.transfersBySegment.get(Integer.valueOf(((Integer) arrayList.remove(0)).intValue()));
                if (list != null) {
                    for (InboundTransferTask inboundTransferTask : list) {
                        IntSet mutableCopyFrom = IntSets.mutableCopyFrom(intSet);
                        mutableCopyFrom.retainAll(inboundTransferTask.getSegments());
                        arrayList.removeAll(mutableCopyFrom);
                        this.transfersBySegment.keySet().removeAll(mutableCopyFrom);
                        inboundTransferTask.cancelSegments(mutableCopyFrom);
                        if (inboundTransferTask.isCancelled()) {
                            removeTransfer(inboundTransferTask);
                        }
                    }
                }
            }
        }
    }

    protected CompletionStage<Void> removeStaleData(IntSet intSet) {
        if (this.configuration.clustering().cacheMode().isInvalidation()) {
            return CompletableFutures.completedNull();
        }
        log.debugf("Removing no longer owned entries for cache %s", this.cacheName);
        if (this.keyInvalidationListener != null) {
            this.keyInvalidationListener.beforeInvalidation(intSet, IntSets.immutableEmptySet());
        }
        this.localPublisherManager.segmentsLost(intSet);
        this.dataContainer.removeSegments(intSet);
        return intSet.isEmpty() ? CompletableFutures.completedNull() : this.persistenceManager.removeSegments(intSet).thenCompose(bool -> {
            return invalidateStaleEntries(intSet, bool);
        });
    }

    private CompletionStage<Void> invalidateStaleEntries(IntSet intSet, Boolean bool) {
        if (bool.booleanValue()) {
            return CompletableFutures.completedNull();
        }
        AtomicLong atomicLong = new AtomicLong();
        return Flowable.fromPublisher(this.persistenceManager.publishKeys(obj -> {
            return intSet.contains(getSegment(obj));
        }, PersistenceManager.AccessMode.PRIVATE)).onErrorResumeNext(th -> {
            Log.PERSISTENCE.failedLoadingKeysFromCacheStore(th);
            return Flowable.empty();
        }).buffer(this.configuration.clustering().stateTransfer().chunkSize()).concatMapCompletable(list -> {
            atomicLong.addAndGet(list.size());
            return Completable.fromCompletionStage(invalidateBatch(list));
        }).toCompletionStage(null).thenRun(() -> {
            if (log.isTraceEnabled()) {
                log.tracef("Removed %d keys, data container now has %d keys", atomicLong.get(), this.dataContainer.sizeIncludingExpired());
            }
        });
    }

    protected CompletionStage<Void> invalidateBatch(Collection<Object> collection) {
        InvalidateCommand buildInvalidateCommand = this.commandsFactory.buildInvalidateCommand(EnumUtil.bitSetOf(Flag.CACHE_MODE_LOCAL, Flag.SKIP_LOCKING), collection.toArray());
        NonTxInvocationContext createNonTxInvocationContext = this.icf.createNonTxInvocationContext();
        createNonTxInvocationContext.setLockOwner(buildInvalidateCommand.getKeyLockOwner());
        return this.interceptorChain.invokeAsync(createNonTxInvocationContext, buildInvalidateCommand).handle((obj, th) -> {
            if ((th instanceof IllegalLifecycleStateException) || th == null) {
                return null;
            }
            log.failedToInvalidateKeys(th);
            return null;
        });
    }

    private void restartBrokenTransfers(CacheTopology cacheTopology, IntSet intSet) {
        HashSet hashSet = new HashSet(cacheTopology.getReadConsistentHash().getMembers());
        synchronized (this.transferMapsLock) {
            Iterator<Map.Entry<Address, List<InboundTransferTask>>> it = this.transfersBySource.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Address, List<InboundTransferTask>> next = it.next();
                Address key = next.getKey();
                if (!hashSet.contains(key)) {
                    if (log.isTraceEnabled()) {
                        log.tracef("Removing inbound transfers from source %s for cache %s", key, this.cacheName);
                    }
                    List<InboundTransferTask> value = next.getValue();
                    it.remove();
                    for (InboundTransferTask inboundTransferTask : value) {
                        if (log.isTraceEnabled()) {
                            log.tracef("Removing inbound transfers from node %s for segments %s", key, inboundTransferTask.getSegments());
                        }
                        IntSet unfinishedSegments = inboundTransferTask.getUnfinishedSegments();
                        inboundTransferTask.cancel();
                        intSet.addAll(unfinishedSegments);
                        this.transfersBySegment.keySet().removeAll(unfinishedSegments);
                    }
                }
            }
            intSet.removeAll(this.transfersBySegment.keySet());
        }
    }

    private int getSegment(Object obj) {
        return this.keyPartitioner.getSegment(obj);
    }

    private InboundTransferTask addTransfer(Address address, IntSet intSet) {
        synchronized (this.transferMapsLock) {
            if (log.isTraceEnabled()) {
                log.tracef("Adding transfer from %s for segments %s", address, intSet);
            }
            intSet.removeAll(this.transfersBySegment.keySet());
            if (intSet.isEmpty()) {
                if (log.isTraceEnabled()) {
                    log.tracef("All segments are already in progress, skipping", new Object[0]);
                }
                return null;
            }
            InboundTransferTask inboundTransferTask = new InboundTransferTask(intSet, address, this.cacheTopology.getTopologyId(), this.rpcManager, this.commandsFactory, this.timeout, this.cacheName, true);
            addTransfer(inboundTransferTask, intSet);
            this.stateRequestExecutor.executeAsync(() -> {
                return inboundTransferTask.requestSegments().whenComplete((r5, th) -> {
                    onTaskCompletion(inboundTransferTask);
                });
            });
            return inboundTransferTask;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @GuardedBy("transferMapsLock")
    public void addTransfer(InboundTransferTask inboundTransferTask, IntSet intSet) {
        if (!this.running) {
            throw new IllegalLifecycleStateException("State consumer is not running for cache " + this.cacheName);
        }
        PrimitiveIterator.OfInt it = intSet.iterator();
        while (it.hasNext()) {
            this.transfersBySegment.computeIfAbsent(Integer.valueOf(it.nextInt()), num -> {
                return new ArrayList();
            }).add(inboundTransferTask);
        }
        this.transfersBySource.computeIfAbsent(inboundTransferTask.getSource(), address -> {
            return new ArrayList();
        }).add(inboundTransferTask);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean removeTransfer(InboundTransferTask inboundTransferTask) {
        boolean z = false;
        synchronized (this.transferMapsLock) {
            if (log.isTraceEnabled()) {
                log.tracef("Removing inbound transfers from node %s for segments %s", inboundTransferTask.getSegments(), inboundTransferTask.getSource(), this.cacheName);
            }
            List<InboundTransferTask> list = this.transfersBySource.get(inboundTransferTask.getSource());
            if (list != null) {
                boolean remove = list.remove(inboundTransferTask);
                z = remove;
                if (remove && list.isEmpty()) {
                    this.transfersBySource.remove(inboundTransferTask.getSource());
                }
            }
            PrimitiveIterator.OfInt it = inboundTransferTask.getSegments().iterator();
            while (it.hasNext()) {
                Integer num = (Integer) it.next();
                List<InboundTransferTask> list2 = this.transfersBySegment.get(num);
                if (list2 != null && list2.remove(inboundTransferTask) && list2.isEmpty()) {
                    this.transfersBySegment.remove(num);
                }
            }
        }
        return z;
    }

    protected void onTaskCompletion(InboundTransferTask inboundTransferTask) {
        if (log.isTraceEnabled()) {
            log.tracef("Inbound transfer finished: %s", inboundTransferTask);
        }
        if (inboundTransferTask.isCompletedSuccessfully()) {
            removeTransfer(inboundTransferTask);
            notifyEndOfStateTransferIfNeeded();
        }
    }
}
