package org.infinispan.statetransfer;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import net.jcip.annotations.GuardedBy;
import org.infinispan.Cache;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.util.InfinispanCollections;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.InvocationContextFactory;
import org.infinispan.context.impl.NonTxInvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.distexec.DistributedCallable;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.executors.SemaphoreCompletionService;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.filter.KeyFilter;
import org.infinispan.interceptors.InterceptorChain;
import org.infinispan.marshall.core.MarshalledEntry;
import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.persistence.spi.AdvancedCacheLoader;
import org.infinispan.remoting.responses.CacheNotFoundResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.jgroups.SuspectException;
import org.infinispan.statetransfer.StateRequestCommand;
import org.infinispan.topology.CacheTopology;
import org.infinispan.transaction.impl.RemoteTransaction;
import org.infinispan.transaction.impl.TransactionTable;
import org.infinispan.transaction.totalorder.TotalOrderLatch;
import org.infinispan.transaction.totalorder.TotalOrderManager;
import org.infinispan.transaction.xa.CacheTransaction;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.concurrent.BlockingTaskAwareExecutorService;
import org.infinispan.util.concurrent.ConcurrentHashSet;
import org.infinispan.util.concurrent.TimeoutException;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

/* loaded from: input_file:WEB-INF/lib/infinispan-embedded-8.1.3.Final.jar:org/infinispan/statetransfer/StateConsumerImpl.class */
public class StateConsumerImpl implements StateConsumer {
    private static final Log log = LogFactory.getLog(StateConsumerImpl.class);
    private static final boolean trace = log.isTraceEnabled();
    public static final int NO_REBALANCE_IN_PROGRESS = -1;
    private Cache cache;
    private StateTransferManager stateTransferManager;
    private String cacheName;
    private Configuration configuration;
    private RpcManager rpcManager;
    private TransactionManager transactionManager;
    private CommandsFactory commandsFactory;
    private TransactionTable transactionTable;
    private DataContainer<Object, Object> dataContainer;
    private PersistenceManager persistenceManager;
    private InterceptorChain interceptorChain;
    private InvocationContextFactory icf;
    private StateTransferLock stateTransferLock;
    private CacheNotifier cacheNotifier;
    private TotalOrderManager totalOrderManager;
    private BlockingTaskAwareExecutorService remoteCommandsExecutor;
    private long timeout;
    private boolean isFetchEnabled;
    private boolean isTransactional;
    private boolean isInvalidationMode;
    private boolean isTotalOrder;
    private volatile KeyInvalidationListener keyInvalidationListener;
    private CommitManager commitManager;
    private ExecutorService stateTransferExecutor;
    private volatile CacheTopology cacheTopology;
    private SemaphoreCompletionService<Void> stateRequestCompletionService;
    private RpcOptions rpcOptions;
    private final AtomicInteger stateTransferTopologyId = new AtomicInteger(-1);
    private final AtomicBoolean waitingForState = new AtomicBoolean(false);
    private final Object transferMapsLock = new Object();

    @GuardedBy("transferMapsLock")
    private final Map<Address, List<InboundTransferTask>> transfersBySource = new HashMap();

    @GuardedBy("transferMapsLock")
    private final Map<Integer, InboundTransferTask> transfersBySegment = new HashMap();
    private volatile boolean ownsData = false;

    /* loaded from: input_file:WEB-INF/lib/infinispan-embedded-8.1.3.Final.jar:org/infinispan/statetransfer/StateConsumerImpl$KeyInvalidationListener.class */
    public interface KeyInvalidationListener {
        void beforeInvalidation(Set<Integer> set, Set<Integer> set2);
    }

    @Override // org.infinispan.statetransfer.StateConsumer
    public void stopApplyingState() {
        if (trace) {
            log.tracef("Stop keeping track of changed keys for state transfer", new Object[0]);
        }
        this.commitManager.stopTrack(Flag.PUT_FOR_STATE_TRANSFER);
    }

    @Inject
    public void init(Cache cache, @ComponentName("org.infinispan.executors.stateTransferExecutor") ExecutorService executorService, StateTransferManager stateTransferManager, InterceptorChain interceptorChain, InvocationContextFactory invocationContextFactory, Configuration configuration, RpcManager rpcManager, TransactionManager transactionManager, CommandsFactory commandsFactory, PersistenceManager persistenceManager, DataContainer<Object, Object> dataContainer, TransactionTable transactionTable, StateTransferLock stateTransferLock, CacheNotifier cacheNotifier, TotalOrderManager totalOrderManager, @ComponentName("org.infinispan.executors.remote") BlockingTaskAwareExecutorService blockingTaskAwareExecutorService, CommitManager commitManager) {
        this.cache = cache;
        this.cacheName = cache.getName();
        this.stateTransferExecutor = executorService;
        this.stateTransferManager = stateTransferManager;
        this.interceptorChain = interceptorChain;
        this.icf = invocationContextFactory;
        this.configuration = configuration;
        this.rpcManager = rpcManager;
        this.transactionManager = transactionManager;
        this.commandsFactory = commandsFactory;
        this.persistenceManager = persistenceManager;
        this.dataContainer = dataContainer;
        this.transactionTable = transactionTable;
        this.stateTransferLock = stateTransferLock;
        this.cacheNotifier = cacheNotifier;
        this.totalOrderManager = totalOrderManager;
        this.remoteCommandsExecutor = blockingTaskAwareExecutorService;
        this.commitManager = commitManager;
        this.isInvalidationMode = configuration.clustering().cacheMode().isInvalidation();
        this.isTransactional = configuration.transaction().transactionMode().isTransactional();
        this.isTotalOrder = configuration.transaction().transactionProtocol().isTotalOrder();
        this.timeout = configuration.clustering().stateTransfer().timeout();
        this.stateRequestCompletionService = new SemaphoreCompletionService<>(executorService, 1);
    }

    public boolean hasActiveTransfers() {
        boolean z;
        synchronized (this.transferMapsLock) {
            z = !this.transfersBySource.isEmpty();
        }
        return z;
    }

    @Override // org.infinispan.statetransfer.StateConsumer
    public boolean isStateTransferInProgress() {
        return this.stateTransferTopologyId.get() != -1;
    }

    @Override // org.infinispan.statetransfer.StateConsumer
    public boolean isStateTransferInProgressForKey(Object obj) {
        CacheTopology cacheTopology;
        if (this.isInvalidationMode || (cacheTopology = this.cacheTopology) == null || cacheTopology.getPendingCH() == null) {
            return false;
        }
        Address address = this.rpcManager.getAddress();
        return cacheTopology.getPendingCH().isKeyLocalToNode(address, obj) && !cacheTopology.getCurrentCH().isKeyLocalToNode(address, obj);
    }

    @Override // org.infinispan.statetransfer.StateConsumer
    public boolean ownsData() {
        return this.ownsData;
    }

    @Override // org.infinispan.statetransfer.StateConsumer
    public void onTopologyUpdate(CacheTopology cacheTopology, boolean z) {
        Set<Integer> hashSet;
        Set<Integer> hashSet2;
        boolean contains = cacheTopology.getMembers().contains(this.rpcManager.getAddress());
        if (trace) {
            log.tracef("Received new topology for cache %s, isRebalance = %b, isMember = %b, topology = %s", this.cacheName, Boolean.valueOf(z), Boolean.valueOf(contains), cacheTopology);
        }
        if (!this.ownsData && contains) {
            this.ownsData = true;
        } else if (this.ownsData && !contains) {
            this.ownsData = false;
        }
        if (z) {
            this.stateTransferTopologyId.compareAndSet(-1, cacheTopology.getTopologyId());
            this.cacheNotifier.notifyDataRehashed(cacheTopology.getCurrentCH(), cacheTopology.getPendingCH(), cacheTopology.getUnionCH(), cacheTopology.getTopologyId(), true);
        }
        awaitTotalOrderTransactions(cacheTopology, z);
        this.waitingForState.set(false);
        ConsistentHash writeConsistentHash = cacheTopology.getWriteConsistentHash();
        ConsistentHash readConsistentHash = this.cacheTopology != null ? this.cacheTopology.getReadConsistentHash() : null;
        ConsistentHash writeConsistentHash2 = this.cacheTopology != null ? this.cacheTopology.getWriteConsistentHash() : null;
        this.stateTransferLock.acquireExclusiveTopologyLock();
        this.cacheTopology = cacheTopology;
        if (z) {
            if (trace) {
                log.tracef("Start keeping track of keys for rebalance", new Object[0]);
            }
            this.commitManager.stopTrack(Flag.PUT_FOR_STATE_TRANSFER);
            this.commitManager.startTrack(Flag.PUT_FOR_STATE_TRANSFER);
        }
        this.stateTransferLock.releaseExclusiveTopologyLock();
        this.stateTransferLock.notifyTopologyInstalled(cacheTopology.getTopologyId());
        this.remoteCommandsExecutor.checkForReadyTasks();
        try {
            if (this.isTransactional || this.isFetchEnabled) {
                if (writeConsistentHash2 == null) {
                    hashSet2 = getOwnedSegments(writeConsistentHash);
                    if (this.configuration.clustering().cacheMode().isDistributed()) {
                        for (DistributedCallable distributedCallable : getClusterListeners(cacheTopology)) {
                            distributedCallable.setEnvironment(this.cache, null);
                            try {
                                distributedCallable.call();
                            } catch (Exception e) {
                                log.clusterListenerInstallationFailure(e);
                            }
                        }
                    }
                    if (trace) {
                        log.tracef("On cache %s we have: added segments: %s", this.cacheName, hashSet2);
                    }
                } else {
                    Set<Integer> ownedSegments = getOwnedSegments(writeConsistentHash2);
                    Set<Integer> ownedSegments2 = getOwnedSegments(writeConsistentHash);
                    if (ownedSegments2.size() == writeConsistentHash.getNumSegments()) {
                        hashSet = InfinispanCollections.emptySet();
                    } else {
                        hashSet = new HashSet(ownedSegments);
                        hashSet.removeAll(ownedSegments2);
                    }
                    hashSet2 = new HashSet(ownedSegments2);
                    hashSet2.removeAll(ownedSegments);
                    if (trace) {
                        log.tracef("On cache %s we have: new segments: %s; old segments: %s", this.cacheName, ownedSegments2, ownedSegments);
                        log.tracef("On cache %s we have: added segments: %s; removed segments: %s", this.cacheName, hashSet2, hashSet);
                    }
                    cancelTransfers(hashSet);
                    restartBrokenTransfers(cacheTopology, hashSet2);
                }
                if (!hashSet2.isEmpty()) {
                    addTransfers(hashSet2);
                }
            }
            int i = this.stateTransferTopologyId.get();
            if (trace) {
                log.tracef("Topology update processed, stateTransferTopologyId = %d, isRebalance = %s, pending CH = %s", Integer.valueOf(i), Boolean.valueOf(z), cacheTopology.getPendingCH());
            }
            if (i != -1 && !z && cacheTopology.getPendingCH() == null && this.stateTransferTopologyId.compareAndSet(i, -1)) {
                stopApplyingState();
                this.cacheNotifier.notifyDataRehashed(readConsistentHash, cacheTopology.getCurrentCH(), writeConsistentHash2, cacheTopology.getTopologyId(), false);
                if (trace) {
                    log.tracef("Unlock State Transfer in Progress for topology ID %s", cacheTopology.getTopologyId());
                }
                if (this.isTotalOrder) {
                    this.totalOrderManager.notifyStateTransferEnd();
                }
            }
            this.stateTransferLock.notifyTransactionDataReceived(cacheTopology.getTopologyId());
            this.remoteCommandsExecutor.checkForReadyTasks();
            if (this.stateTransferTopologyId.get() != -1 && contains) {
                this.waitingForState.set(true);
            }
            notifyEndOfRebalanceIfNeeded(cacheTopology.getTopologyId(), cacheTopology.getRebalanceId());
            if (this.transactionTable != null) {
                this.transactionTable.cleanupLeaverTransactions(this.rpcManager.getTransport().getMembers());
            }
            boolean z2 = writeConsistentHash2 != null && writeConsistentHash2.getMembers().contains(this.rpcManager.getAddress());
            if (contains || z2) {
                HashSet hashSet3 = new HashSet(writeConsistentHash.getNumSegments());
                for (int i2 = 0; i2 < writeConsistentHash.getNumSegments(); i2++) {
                    hashSet3.add(Integer.valueOf(i2));
                }
                hashSet3.removeAll(getOwnedSegments(writeConsistentHash));
                try {
                    removeStaleData(hashSet3);
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                    throw new CacheException(e2);
                }
            }
        } catch (Throwable th) {
            this.stateTransferLock.notifyTransactionDataReceived(cacheTopology.getTopologyId());
            this.remoteCommandsExecutor.checkForReadyTasks();
            if (this.stateTransferTopologyId.get() != -1 && contains) {
                this.waitingForState.set(true);
            }
            notifyEndOfRebalanceIfNeeded(cacheTopology.getTopologyId(), cacheTopology.getRebalanceId());
            if (this.transactionTable != null) {
                this.transactionTable.cleanupLeaverTransactions(this.rpcManager.getTransport().getMembers());
            }
            boolean z3 = writeConsistentHash2 != null && writeConsistentHash2.getMembers().contains(this.rpcManager.getAddress());
            if (contains || z3) {
                HashSet hashSet4 = new HashSet(writeConsistentHash.getNumSegments());
                for (int i3 = 0; i3 < writeConsistentHash.getNumSegments(); i3++) {
                    hashSet4.add(Integer.valueOf(i3));
                }
                hashSet4.removeAll(getOwnedSegments(writeConsistentHash));
                try {
                    removeStaleData(hashSet4);
                } catch (InterruptedException e3) {
                    Thread.currentThread().interrupt();
                    throw new CacheException(e3);
                }
            }
            throw th;
        }
    }

    private void awaitTotalOrderTransactions(CacheTopology cacheTopology, boolean z) {
        if (this.isTotalOrder) {
            if (trace) {
                log.trace("State Transfer in Total Order cache. Waiting for remote transactions to finish");
            }
            try {
                Iterator<TotalOrderLatch> it = this.totalOrderManager.notifyStateTransferStart(cacheTopology.getTopologyId(), z).iterator();
                while (it.hasNext()) {
                    it.next().awaitUntilUnBlock();
                }
                if (trace) {
                    log.trace("State Transfer in Total Order cache. All remote transactions are finished. Moving on...");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CacheException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void notifyEndOfRebalanceIfNeeded(int i, int i2) {
        if (this.waitingForState.get() && !hasActiveTransfers() && this.waitingForState.compareAndSet(true, false)) {
            log.debugf("Finished receiving of segments for cache %s for topology %d.", this.cacheName, Integer.valueOf(i));
            stopApplyingState();
            this.stateTransferManager.notifyEndOfRebalance(i, i2);
            boolean z = false;
            Iterator<? extends Future<Void>> it = this.stateRequestCompletionService.drainCompletionQueue().iterator();
            while (it.hasNext()) {
                try {
                    it.next().get();
                } catch (InterruptedException e) {
                    z = true;
                } catch (ExecutionException e2) {
                    log.topologyUpdateError(i, e2.getCause());
                }
            }
            if (z) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private Set<Integer> getOwnedSegments(ConsistentHash consistentHash) {
        Address address = this.rpcManager.getAddress();
        return consistentHash.getMembers().contains(address) ? consistentHash.getSegmentsForOwner(address) : InfinispanCollections.emptySet();
    }

    @Override // org.infinispan.statetransfer.StateConsumer
    public void applyState(final Address address, int i, Collection<StateChunk> collection) {
        ConsistentHash writeConsistentHash = this.cacheTopology.getWriteConsistentHash();
        if (!writeConsistentHash.getMembers().contains(this.rpcManager.getAddress())) {
            if (trace) {
                log.tracef("Ignoring received state because we are no longer a member of cache %s", this.cacheName);
                return;
            }
            return;
        }
        int i2 = this.stateTransferTopologyId.get();
        if (i2 == -1) {
            log.debugf("Discarding state response with topology id %d for cache %s, we don't have a state transfer in progress", i, (Object) this.cacheName);
            return;
        }
        if (i < i2) {
            log.debugf("Discarding state response with old topology id %d for cache %s, state transfer request topology was %b", i, (Object) this.cacheName, (Object) this.waitingForState);
            return;
        }
        if (trace) {
            log.tracef("Before applying the received state the data container of cache %s has %d keys", this.cacheName, Integer.valueOf(this.dataContainer.size()));
        }
        final Set<Integer> segmentsForOwner = writeConsistentHash.getSegmentsForOwner(this.rpcManager.getAddress());
        final CountDownLatch countDownLatch = new CountDownLatch(collection.size());
        for (final StateChunk stateChunk : collection) {
            this.stateTransferExecutor.submit(new Runnable() { // from class: org.infinispan.statetransfer.StateConsumerImpl.1
                @Override // java.lang.Runnable
                public void run() {
                    StateConsumerImpl.this.applyChunk(address, segmentsForOwner, stateChunk);
                    countDownLatch.countDown();
                }
            });
        }
        try {
            if (!countDownLatch.await(this.timeout, TimeUnit.MILLISECONDS)) {
                throw new TimeoutException("Timed out applying state");
            }
            if (trace) {
                log.tracef("After applying the received state the data container of cache %s has %d keys", this.cacheName, Integer.valueOf(this.dataContainer.size()));
                synchronized (this.transferMapsLock) {
                    log.tracef("Segments not received yet for cache %s: %s", this.cacheName, this.transfersBySource);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new CacheException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void applyChunk(Address address, Set<Integer> set, StateChunk stateChunk) {
        InboundTransferTask inboundTransferTask;
        if (!set.contains(Integer.valueOf(stateChunk.getSegmentId()))) {
            log.warnf("Discarding received cache entries for segment %d of cache %s because they do not belong to this node.", Integer.valueOf(stateChunk.getSegmentId()), this.cacheName);
            return;
        }
        synchronized (this.transferMapsLock) {
            inboundTransferTask = this.transfersBySegment.get(Integer.valueOf(stateChunk.getSegmentId()));
        }
        if (inboundTransferTask != null) {
            if (stateChunk.getCacheEntries() != null) {
                doApplyState(address, stateChunk.getSegmentId(), stateChunk.getCacheEntries());
            }
            inboundTransferTask.onStateReceived(stateChunk.getSegmentId(), stateChunk.isLastChunk());
        } else if (this.cache.getStatus().allowInvocations()) {
            log.ignoringUnsolicitedState(address, stateChunk.getSegmentId(), this.cacheName);
        }
    }

    private void doApplyState(Address address, int i, Collection<InternalCacheEntry> collection) {
        InvocationContext createInvocationContext;
        if (trace) {
            log.tracef("Applying new state chunk for segment %d of cache %s from node %s: received %d cache entries", Integer.valueOf(i), this.cacheName, address, Integer.valueOf(collection.size()));
        }
        EnumSet of = EnumSet.of(Flag.PUT_FOR_STATE_TRANSFER, Flag.CACHE_MODE_LOCAL, Flag.IGNORE_RETURN_VALUES, Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_SHARED_CACHE_STORE, Flag.SKIP_OWNERSHIP_CHECK, Flag.SKIP_XSITE_BACKUP);
        boolean z = this.transactionManager != null;
        Iterator<InternalCacheEntry> it = collection.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            InternalCacheEntry next = it.next();
            if (z) {
                try {
                    try {
                        this.transactionManager.begin();
                        createInvocationContext = this.icf.createInvocationContext(this.transactionManager.getTransaction(), true);
                        ((TxInvocationContext) createInvocationContext).getCacheTransaction().setStateTransferFlag(Flag.PUT_FOR_STATE_TRANSFER);
                    } catch (Exception e) {
                        if (this.cache.getStatus().allowInvocations()) {
                            log.problemApplyingStateForKey(e.getMessage(), next.getKey(), e);
                            if (z) {
                                try {
                                    if (this.transactionManager.getTransaction() != null) {
                                        this.transactionManager.rollback();
                                    }
                                } catch (SystemException e2) {
                                }
                            }
                        } else {
                            log.debugf("Cache %s is shutting down, stopping state transfer", this.cacheName);
                            if (z) {
                                try {
                                    if (this.transactionManager.getTransaction() != null) {
                                        this.transactionManager.rollback();
                                    }
                                } catch (SystemException e3) {
                                }
                            }
                        }
                    }
                } catch (Throwable th) {
                    if (z) {
                        try {
                            if (this.transactionManager.getTransaction() != null) {
                                this.transactionManager.rollback();
                            }
                        } catch (SystemException e4) {
                            throw th;
                        }
                    }
                    throw th;
                }
            } else {
                createInvocationContext = this.icf.createSingleKeyNonTxInvocationContext();
            }
            PutKeyValueCommand buildPutKeyValueCommand = this.commandsFactory.buildPutKeyValueCommand(next.getKey(), next.getValue(), next.getMetadata(), of);
            createInvocationContext.setLockOwner(buildPutKeyValueCommand.getKeyLockOwner());
            this.interceptorChain.invoke(createInvocationContext, buildPutKeyValueCommand);
            if (this.transactionManager != null) {
                this.transactionManager.commit();
            }
            if (z) {
                try {
                    if (this.transactionManager.getTransaction() != null) {
                        this.transactionManager.rollback();
                    }
                } catch (SystemException e5) {
                }
            }
        }
        if (trace) {
            log.tracef("Finished applying chunk of segment %d of cache %s", i, (Object) this.cacheName);
        }
    }

    private void applyTransactions(Address address, Collection<TransactionInfo> collection, int i) {
        log.debugf("Applying %d transactions for cache %s transferred from node %s", collection.size(), (Object) this.cacheName, (Object) address);
        if (this.isTransactional) {
            for (TransactionInfo transactionInfo : collection) {
                GlobalTransaction globalTransaction = transactionInfo.getGlobalTransaction();
                if (!this.rpcManager.getAddress().equals(globalTransaction.getAddress())) {
                    globalTransaction.setRemote(true);
                    CacheTransaction localTransaction = this.transactionTable.getLocalTransaction(globalTransaction);
                    if (localTransaction == null) {
                        localTransaction = this.transactionTable.getRemoteTransaction(globalTransaction);
                        if (localTransaction == null) {
                            localTransaction = this.transactionTable.getOrCreateRemoteTransaction(globalTransaction, transactionInfo.getModifications());
                            ((RemoteTransaction) localTransaction).setLookedUpEntriesTopology(i - 1);
                        }
                    }
                    Set<Object> lockedKeys = transactionInfo.getLockedKeys();
                    CacheTransaction cacheTransaction = localTransaction;
                    cacheTransaction.getClass();
                    lockedKeys.forEach(cacheTransaction::addBackupLockForKey);
                }
            }
        }
    }

    @Start(priority = 20)
    public void start() {
        CacheMode cacheMode = this.configuration.clustering().cacheMode();
        this.isFetchEnabled = (cacheMode.isDistributed() || cacheMode.isReplicated()) && (this.configuration.clustering().stateTransfer().fetchInMemoryState() || this.configuration.persistence().fetchPersistentState().booleanValue());
        this.rpcOptions = this.rpcManager.getRpcOptionsBuilder(ResponseMode.SYNCHRONOUS).timeout(this.timeout, TimeUnit.MILLISECONDS).build();
    }

    @Override // org.infinispan.statetransfer.StateConsumer
    @Stop(priority = 20)
    public void stop() {
        if (trace) {
            log.tracef("Shutting down StateConsumer of cache %s on node %s", this.cacheName, this.rpcManager.getAddress());
        }
        try {
            synchronized (this.transferMapsLock) {
                this.stateRequestCompletionService.cancelQueuedTasks();
                this.stateRequestCompletionService.drainCompletionQueue();
                Iterator<List<InboundTransferTask>> it = this.transfersBySource.values().iterator();
                while (it.hasNext()) {
                    List<InboundTransferTask> next = it.next();
                    it.remove();
                    Iterator<InboundTransferTask> it2 = next.iterator();
                    while (it2.hasNext()) {
                        it2.next().cancel();
                    }
                }
                this.transfersBySource.clear();
                this.transfersBySegment.clear();
            }
        } catch (Throwable th) {
            log.errorf(th, "Failed to stop StateConsumer of cache %s on node %s", this.cacheName, this.rpcManager.getAddress());
        }
    }

    @Override // org.infinispan.statetransfer.StateConsumer
    public CacheTopology getCacheTopology() {
        return this.cacheTopology;
    }

    public void setKeyInvalidationListener(KeyInvalidationListener keyInvalidationListener) {
        this.keyInvalidationListener = keyInvalidationListener;
    }

    private void addTransfers(Set<Integer> set) {
        log.debugf("Adding inbound state transfer for segments %s of cache %s", set, this.cacheName);
        HashSet hashSet = new HashSet();
        HashMap hashMap = new HashMap();
        if (this.isTransactional && !this.isTotalOrder) {
            requestTransactions(set, hashMap, hashSet);
        }
        if (this.isFetchEnabled) {
            requestSegments(set, hashMap, hashSet);
        }
        if (trace) {
            log.tracef("Finished adding inbound state transfer for segments %s of cache %s", set, this.cacheName);
        }
    }

    private void findSources(Set<Integer> set, Map<Address, Set<Integer>> map, Set<Address> set2) {
        for (Integer num : set) {
            Address findSource = findSource(num.intValue(), set2);
            if (findSource != null) {
                Set<Integer> set3 = map.get(findSource);
                if (set3 == null) {
                    set3 = new HashSet();
                    map.put(findSource, set3);
                }
                set3.add(num);
            }
        }
    }

    private Address findSource(int i, Set<Address> set) {
        List<Address> locateOwnersForSegment = this.cacheTopology.getReadConsistentHash().locateOwnersForSegment(i);
        if (locateOwnersForSegment.contains(this.rpcManager.getAddress())) {
            return null;
        }
        for (int i2 = 0; i2 < locateOwnersForSegment.size(); i2++) {
            Address address = locateOwnersForSegment.get(i2);
            if (!address.equals(this.rpcManager.getAddress()) && !set.contains(address)) {
                return address;
            }
        }
        log.noLiveOwnersFoundForSegment(i, this.cacheName, locateOwnersForSegment, set);
        return null;
    }

    private void requestTransactions(Set<Integer> set, Map<Address, Set<Integer>> map, Set<Address> set2) {
        findSources(set, map, set2);
        boolean z = false;
        while (true) {
            HashSet hashSet = new HashSet();
            int topologyId = this.cacheTopology.getTopologyId();
            for (Map.Entry<Address, Set<Integer>> entry : map.entrySet()) {
                Address key = entry.getKey();
                Set<Integer> value = entry.getValue();
                boolean z2 = false;
                boolean z3 = false;
                try {
                    Response transactions = getTransactions(key, value, topologyId);
                    if (transactions instanceof SuccessfulResponse) {
                        applyTransactions(key, (List) ((SuccessfulResponse) transactions).getResponseValue(), topologyId);
                    } else if (transactions instanceof CacheNotFoundResponse) {
                        log.debugf("Cache %s was stopped on node %s before sending transaction information", this.cacheName, key);
                        z2 = true;
                        z3 = true;
                    } else {
                        log.unsuccessfulResponseRetrievingTransactionsForSegments(key, transactions);
                        z2 = true;
                    }
                } catch (SuspectException e) {
                    log.debugf("Node %s left the cluster before sending transaction information", key);
                    z2 = true;
                    z3 = true;
                } catch (Exception e2) {
                    log.failedToRetrieveTransactionsForSegments(set, this.cacheName, key, e2);
                    z2 = true;
                }
                if (z2) {
                    hashSet.addAll(value);
                }
                if (z3) {
                    set2.add(key);
                }
            }
            if (hashSet.isEmpty()) {
                break;
            }
            z = true;
            map.clear();
            findSources(hashSet, map, set2);
        }
        if (z) {
            map.clear();
        }
    }

    private Collection<DistributedCallable> getClusterListeners(CacheTopology cacheTopology) {
        for (Address address : cacheTopology.getMembers()) {
            if (!address.equals(this.rpcManager.getAddress())) {
                if (trace) {
                    log.tracef("Requesting cluster listeners of cache %s from node %s", this.cacheName, address);
                }
                try {
                    Response response = this.rpcManager.invokeRemotely(Collections.singleton(address), this.commandsFactory.buildStateRequestCommand(StateRequestCommand.Type.GET_CACHE_LISTENERS, this.rpcManager.getAddress(), cacheTopology.getTopologyId(), null), this.rpcOptions).get(address);
                    if (response instanceof SuccessfulResponse) {
                        return (Collection) ((SuccessfulResponse) response).getResponseValue();
                    }
                    log.unsuccessfulResponseForClusterListeners(address, response);
                } catch (CacheException e) {
                    log.exceptionDuringClusterListenerRetrieval(address, e);
                }
            }
        }
        if (trace) {
            log.trace("Unable to acquire cluster listeners from other members, assuming none are present");
        }
        return Collections.emptySet();
    }

    private Response getTransactions(Address address, Set<Integer> set, int i) {
        if (trace) {
            log.tracef("Requesting transactions for segments %s of cache %s from node %s", set, this.cacheName, address);
        }
        return this.rpcManager.invokeRemotely(Collections.singleton(address), this.commandsFactory.buildStateRequestCommand(StateRequestCommand.Type.GET_TRANSACTIONS, this.rpcManager.getAddress(), i, set), this.rpcOptions).get(address);
    }

    private void requestSegments(Set<Integer> set, Map<Address, Set<Integer>> map, Set<Address> set2) {
        if (map.isEmpty()) {
            findSources(set, map, set2);
        }
        for (Map.Entry<Address, Set<Integer>> entry : map.entrySet()) {
            addTransfer(entry.getKey(), entry.getValue());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void retryTransferTask(InboundTransferTask inboundTransferTask) {
        if (trace) {
            log.tracef("Retrying failed task: %s", inboundTransferTask);
        }
        inboundTransferTask.cancel();
        synchronized (this.transferMapsLock) {
            HashSet hashSet = new HashSet();
            HashSet hashSet2 = new HashSet();
            if (removeTransfer(inboundTransferTask)) {
                hashSet2.add(inboundTransferTask.getSource());
                hashSet.addAll(inboundTransferTask.getSegments());
            }
            hashSet.retainAll(getOwnedSegments(this.cacheTopology.getWriteConsistentHash()));
            HashMap hashMap = new HashMap();
            findSources(hashSet, hashMap, hashSet2);
            for (Map.Entry<Address, Set<Integer>> entry : hashMap.entrySet()) {
                addTransfer(entry.getKey(), entry.getValue());
            }
        }
    }

    private void cancelTransfers(Set<Integer> set) {
        synchronized (this.transferMapsLock) {
            ArrayList arrayList = new ArrayList(set);
            while (!arrayList.isEmpty()) {
                InboundTransferTask inboundTransferTask = this.transfersBySegment.get(Integer.valueOf(((Integer) arrayList.remove(0)).intValue()));
                if (inboundTransferTask != null) {
                    HashSet hashSet = new HashSet(set);
                    hashSet.retainAll(inboundTransferTask.getSegments());
                    arrayList.removeAll(hashSet);
                    this.transfersBySegment.keySet().removeAll(hashSet);
                    inboundTransferTask.cancelSegments(hashSet);
                    if (inboundTransferTask.isCancelled()) {
                        removeTransfer(inboundTransferTask);
                    }
                }
            }
        }
    }

    private void removeStaleData(final Set<Integer> set) throws InterruptedException {
        log.debugf("Removing no longer owned entries for cache %s", this.cacheName);
        if (this.keyInvalidationListener != null) {
            this.keyInvalidationListener.beforeInvalidation(set, InfinispanCollections.emptySet());
        }
        if (set.isEmpty()) {
            return;
        }
        final ConcurrentHashSet concurrentHashSet = new ConcurrentHashSet();
        this.dataContainer.executeTask(KeyFilter.ACCEPT_ALL_FILTER, (obj, internalCacheEntry) -> {
            Object key = internalCacheEntry.getKey();
            if (set.contains(Integer.valueOf(getSegment(key)))) {
                concurrentHashSet.add(key);
            }
        });
        if (!set.isEmpty()) {
            try {
                this.persistenceManager.processOnAllStores(new KeyFilter() { // from class: org.infinispan.statetransfer.StateConsumerImpl.2
                    @Override // org.infinispan.filter.KeyFilter
                    public boolean accept(Object obj2) {
                        if (StateConsumerImpl.this.dataContainer.containsKey(obj2)) {
                            return false;
                        }
                        return set.contains(Integer.valueOf(StateConsumerImpl.this.getSegment(obj2)));
                    }
                }, new AdvancedCacheLoader.CacheLoaderTask() { // from class: org.infinispan.statetransfer.StateConsumerImpl.3
                    @Override // org.infinispan.persistence.spi.AdvancedCacheLoader.CacheLoaderTask
                    public void processEntry(MarshalledEntry marshalledEntry, AdvancedCacheLoader.TaskContext taskContext) throws InterruptedException {
                        concurrentHashSet.add(marshalledEntry.getKey());
                    }
                }, false, false, PersistenceManager.AccessMode.PRIVATE);
            } catch (CacheException e) {
                log.failedLoadingKeysFromCacheStore(e);
            }
        }
        if (concurrentHashSet.isEmpty()) {
            return;
        }
        try {
            InvalidateCommand buildInvalidateCommand = this.commandsFactory.buildInvalidateCommand(EnumSet.of(Flag.CACHE_MODE_LOCAL, Flag.SKIP_LOCKING), concurrentHashSet.toArray());
            NonTxInvocationContext createNonTxInvocationContext = this.icf.createNonTxInvocationContext();
            createNonTxInvocationContext.setLockOwner(buildInvalidateCommand.getKeyLockOwner());
            this.interceptorChain.invoke(createNonTxInvocationContext, buildInvalidateCommand);
            if (trace) {
                log.tracef("Removed %d keys, data container now has %d keys", concurrentHashSet.size(), this.dataContainer.size());
            }
        } catch (CacheException e2) {
            log.failedToInvalidateKeys(e2);
        }
    }

    private void restartBrokenTransfers(CacheTopology cacheTopology, Set<Integer> set) {
        HashSet hashSet = new HashSet(cacheTopology.getReadConsistentHash().getMembers());
        synchronized (this.transferMapsLock) {
            Iterator<Map.Entry<Address, List<InboundTransferTask>>> it = this.transfersBySource.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Address, List<InboundTransferTask>> next = it.next();
                Address key = next.getKey();
                if (!hashSet.contains(key)) {
                    if (trace) {
                        log.tracef("Removing inbound transfers from source %s for cache %s", key, this.cacheName);
                    }
                    List<InboundTransferTask> value = next.getValue();
                    it.remove();
                    for (InboundTransferTask inboundTransferTask : value) {
                        if (trace) {
                            log.tracef("Removing inbound transfers for segments %s from source %s for cache %s", inboundTransferTask.getSegments(), key, this.cacheName);
                        }
                        inboundTransferTask.cancel();
                        this.transfersBySegment.keySet().removeAll(inboundTransferTask.getSegments());
                        set.addAll(inboundTransferTask.getUnfinishedSegments());
                    }
                }
            }
            set.removeAll(this.transfersBySegment.keySet());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public int getSegment(Object obj) {
        return this.cacheTopology.getReadConsistentHash().getSegment(obj);
    }

    private InboundTransferTask addTransfer(Address address, Set<Integer> set) {
        synchronized (this.transferMapsLock) {
            if (trace) {
                log.tracef("Adding transfer from %s for segments %s", address, set);
            }
            set.removeAll(this.transfersBySegment.keySet());
            if (set.isEmpty()) {
                if (trace) {
                    log.tracef("All segments are already in progress, skipping", new Object[0]);
                }
                return null;
            }
            final InboundTransferTask inboundTransferTask = new InboundTransferTask(set, address, this.cacheTopology.getTopologyId(), this, this.rpcManager, this.commandsFactory, this.timeout, this.cacheName);
            Iterator<Integer> it = set.iterator();
            while (it.hasNext()) {
                this.transfersBySegment.put(Integer.valueOf(it.next().intValue()), inboundTransferTask);
            }
            List<InboundTransferTask> list = this.transfersBySource.get(inboundTransferTask.getSource());
            if (list == null) {
                list = new ArrayList();
                this.transfersBySource.put(inboundTransferTask.getSource(), list);
            }
            list.add(inboundTransferTask);
            this.stateRequestCompletionService.submit(new Callable<Void>() { // from class: org.infinispan.statetransfer.StateConsumerImpl.4
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Void call() throws Exception {
                    inboundTransferTask.requestSegments();
                    if (StateConsumerImpl.trace) {
                        StateConsumerImpl.log.tracef("Waiting for inbound transfer to finish: %s", inboundTransferTask);
                    }
                    StateConsumerImpl.this.stateRequestCompletionService.continueTaskInBackground();
                    return null;
                }
            });
            return inboundTransferTask;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean removeTransfer(InboundTransferTask inboundTransferTask) {
        synchronized (this.transferMapsLock) {
            if (trace) {
                log.tracef("Removing inbound transfers for segments %s from source %s for cache %s", inboundTransferTask.getSegments(), inboundTransferTask.getSource(), this.cacheName);
            }
            List<InboundTransferTask> list = this.transfersBySource.get(inboundTransferTask.getSource());
            if (list == null || !list.remove(inboundTransferTask)) {
                return false;
            }
            if (list.isEmpty()) {
                this.transfersBySource.remove(inboundTransferTask.getSource());
            }
            this.transfersBySegment.keySet().removeAll(inboundTransferTask.getSegments());
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onTaskCompletion(final InboundTransferTask inboundTransferTask) {
        this.stateRequestCompletionService.backgroundTaskFinished(new Callable<Void>() { // from class: org.infinispan.statetransfer.StateConsumerImpl.5
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Void call() throws Exception {
                StateConsumerImpl.this.removeTransfer(inboundTransferTask);
                if (!inboundTransferTask.isCompletedSuccessfully() && !inboundTransferTask.isCancelled()) {
                    StateConsumerImpl.this.retryTransferTask(inboundTransferTask);
                    return null;
                }
                if (StateConsumerImpl.trace) {
                    StateConsumerImpl.log.tracef("Inbound transfer finished: %s", inboundTransferTask);
                }
                StateConsumerImpl.this.notifyEndOfRebalanceIfNeeded(StateConsumerImpl.this.cacheTopology.getTopologyId(), StateConsumerImpl.this.cacheTopology.getRebalanceId());
                return null;
            }
        });
    }
}
