package org.infinispan.statetransfer;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.infinispan.Cache;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.container.DataContainer;
import org.infinispan.container.InternalEntryFactory;
import org.infinispan.distexec.DistributedCallable;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.cluster.ClusterCacheNotifier;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.topology.CacheTopology;
import org.infinispan.transaction.impl.LocalTransaction;
import org.infinispan.transaction.impl.TransactionTable;
import org.infinispan.transaction.xa.CacheTransaction;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

/**
 * {@link StateProvider} implementation that serves state-transfer requests coming from other
 * nodes: it reports the transactions touching a set of segments and starts, tracks and cancels
 * the {@link OutboundTransferTask}s created for those requests.
 *
 * <p>All bookkeeping of running transfers lives in {@link #transfersByDestination}; every read
 * or write of that map (and of the lists stored in it) happens while holding the map's monitor.
 */
@Listener
/* loaded from: input_file:WEB-INF/lib/infinispan-core-8.2.3.Final.jar:org/infinispan/statetransfer/StateProviderImpl.class */
public class StateProviderImpl implements StateProvider {
    private static final Log log = LogFactory.getLog(StateProviderImpl.class);
    // Sampled once at class initialisation; later changes of the log level are not observed.
    private static final boolean trace = log.isTraceEnabled();
    private String cacheName;
    private Configuration configuration;
    private RpcManager rpcManager;
    private CommandsFactory commandsFactory;
    private ClusterCacheNotifier clusterCacheNotifier;
    private TransactionTable transactionTable;
    private DataContainer dataContainer;
    private PersistenceManager persistenceManager;
    private ExecutorService executorService;
    private StateTransferLock stateTransferLock;
    private InternalEntryFactory entryFactory;
    /** State-transfer timeout from the configuration; passed to the topology wait as milliseconds. */
    private long timeout;
    /** State-transfer chunk size from the configuration; handed to every outbound task. */
    private int chunkSize;
    private StateConsumer stateConsumer;
    /** Registered outbound transfers grouped by destination node. Guarded by its own monitor. */
    private final Map<Address, List<OutboundTransferTask>> transfersByDestination = new HashMap<Address, List<OutboundTransferTask>>();

    /**
     * Receives the collaborators of this component and caches the cache name plus the
     * state-transfer timeout and chunk size read from {@code configuration}.
     *
     * @param executorService the executor registered as {@code org.infinispan.executors.transport};
     *                        outbound transfer tasks are executed on it
     */
    @Inject
    public void init(Cache cache, @ComponentName("org.infinispan.executors.transport") ExecutorService executorService, Configuration configuration, RpcManager rpcManager, CommandsFactory commandsFactory, ClusterCacheNotifier clusterCacheNotifier, PersistenceManager persistenceManager, DataContainer dataContainer, TransactionTable transactionTable, StateTransferLock stateTransferLock, StateConsumer stateConsumer, InternalEntryFactory internalEntryFactory) {
        this.cacheName = cache.getName();
        this.executorService = executorService;
        this.configuration = configuration;
        this.rpcManager = rpcManager;
        this.commandsFactory = commandsFactory;
        this.clusterCacheNotifier = clusterCacheNotifier;
        this.persistenceManager = persistenceManager;
        this.dataContainer = dataContainer;
        this.transactionTable = transactionTable;
        this.stateTransferLock = stateTransferLock;
        this.stateConsumer = stateConsumer;
        this.entryFactory = internalEntryFactory;
        this.timeout = configuration.clustering().stateTransfer().timeout();
        this.chunkSize = configuration.clustering().stateTransfer().chunkSize();
    }

    /**
     * @return {@code true} while at least one outbound transfer is registered
     */
    @Override // org.infinispan.statetransfer.StateProvider
    public boolean isStateTransferInProgress() {
        synchronized (transfersByDestination) {
            return !transfersByDestination.isEmpty();
        }
    }

    /**
     * Cancels and forgets every outbound transfer whose destination is not a member of the
     * write consistent hash of the new topology.
     *
     * @param cacheTopology the topology that has just been installed
     * @param z             not used by this implementation
     */
    @Override // org.infinispan.statetransfer.StateProvider
    public void onTopologyUpdate(CacheTopology cacheTopology, boolean z) {
        Set<Address> writeMembers = new HashSet<Address>(cacheTopology.getWriteConsistentHash().getMembers());
        synchronized (transfersByDestination) {
            Iterator<Map.Entry<Address, List<OutboundTransferTask>>> entries = transfersByDestination.entrySet().iterator();
            while (entries.hasNext()) {
                Map.Entry<Address, List<OutboundTransferTask>> entry = entries.next();
                if (writeMembers.contains(entry.getKey())) {
                    continue;
                }
                // Read the value before removing: an entry must not be used once its mapping is gone.
                List<OutboundTransferTask> orphanedTasks = entry.getValue();
                entries.remove();
                for (OutboundTransferTask task : orphanedTasks) {
                    task.cancel();
                }
            }
        }
    }

    /** Nothing to do at start-up. */
    @Override // org.infinispan.statetransfer.StateProvider
    @Start(priority = 60)
    public void start() {
    }

    /**
     * Cancels and forgets every registered outbound transfer. Any failure is logged and
     * swallowed, so this method itself never throws.
     */
    @Override // org.infinispan.statetransfer.StateProvider
    @Stop(priority = 0)
    public void stop() {
        if (trace) {
            log.tracef("Shutting down StateProvider of cache %s on node %s", cacheName, rpcManager.getAddress());
        }
        try {
            synchronized (transfersByDestination) {
                Iterator<List<OutboundTransferTask>> taskLists = transfersByDestination.values().iterator();
                while (taskLists.hasNext()) {
                    List<OutboundTransferTask> tasks = taskLists.next();
                    // Drop the mapping first, then cancel what it held.
                    taskLists.remove();
                    for (OutboundTransferTask task : tasks) {
                        task.cancel();
                    }
                }
            }
        } catch (Throwable th) {
            log.errorf(th, "Failed to stop StateProvider of cache %s on node %s", cacheName, rpcManager.getAddress());
        }
    }

    /**
     * Collects the transactions that hold (backup) locks on keys of the requested segments.
     *
     * <p>The requested segments must all be owned by this node according to the read consistent
     * hash of the resolved topology. The {@code set} argument is never modified.
     *
     * @param address the node requesting the transactions
     * @param i       the topology id the requester is working with
     * @param set     the segments the requester is interested in
     * @return the matching transactions; an empty list when the cache is not transactional
     * @throws IllegalArgumentException if at least one requested segment is not owned by this node
     * @throws InterruptedException     propagated from resolving the topology, which may wait for
     *                                  topology {@code i} to be installed
     */
    @Override // org.infinispan.statetransfer.StateProvider
    public List<TransactionInfo> getTransactionsForSegments(Address address, int i, Set<Integer> set) throws InterruptedException {
        if (trace) {
            log.tracef("Received request for transactions from node %s for segments %s of cache %s with topology id %d", address, set, cacheName, i);
        }
        CacheTopology topology = getCacheTopology(i, address, true);
        Address localAddress = rpcManager.getAddress();
        Set<Integer> ownedSegments = topology.getReadConsistentHash().getSegmentsForOwner(localAddress);
        if (!ownedSegments.containsAll(set)) {
            // Work on a copy: the caller's set must not be altered just to build the message.
            Set<Integer> notOwned = new HashSet<Integer>(set);
            notOwned.removeAll(ownedSegments);
            throw new IllegalArgumentException("Segments " + notOwned + " are not owned by " + localAddress);
        }
        List<TransactionInfo> transactions = new ArrayList<TransactionInfo>();
        if (configuration.transaction().transactionMode().isTransactional()) {
            collectTransactionsToTransfer(address, transactions, transactionTable.getRemoteTransactions(), set, topology);
            collectTransactionsToTransfer(address, transactions, transactionTable.getLocalTransactions(), set, topology);
            if (trace) {
                log.tracef("Found %d transaction(s) to transfer", transactions.size());
            }
        }
        return transactions;
    }

    /**
     * @return whatever {@code ClusterCacheNotifier.retrieveClusterListenerCallablesToInstall()} returns
     */
    @Override // org.infinispan.statetransfer.StateProvider
    public Collection<DistributedCallable> getClusterListenersToInstall() {
        return clusterCacheNotifier.retrieveClusterListenerCallablesToInstall();
    }

    /**
     * Resolves the topology to answer a request with.
     *
     * <ul>
     *   <li>Request older than the local topology: logged at debug level, the local topology is used.</li>
     *   <li>Request newer than the local topology: waits up to {@link #timeout} milliseconds on the
     *       state transfer lock for that topology, then re-reads the topology from the consumer.</li>
     *   <li>Same id: the local topology is used as is.</li>
     * </ul>
     *
     * <p>A missing local topology is treated as id {@code -1}; in that case the result can be
     * {@code null} when the requested id is not greater than {@code -1}.
     *
     * @param requestTopologyId topology id sent by the requester
     * @param requester         node that sent the request (used for logging only)
     * @param forTransactions   {@code true} for a transactions request, {@code false} for a
     *                          segments request (selects the log wording only)
     */
    private CacheTopology getCacheTopology(int requestTopologyId, Address requester, boolean forTransactions) throws InterruptedException {
        CacheTopology topology = stateConsumer.getCacheTopology();
        int localTopologyId = topology != null ? topology.getTopologyId() : -1;
        if (requestTopologyId < localTopologyId) {
            if (forTransactions) {
                log.debugf("Transactions were requested by node %s with topology %d, older than the local topology (%d)", requester, requestTopologyId, localTopologyId);
            } else {
                log.debugf("Segments were requested by node %s with topology %d, older than the local topology (%d)", requester, requestTopologyId, localTopologyId);
            }
            return topology;
        }
        if (requestTopologyId > localTopologyId) {
            if (trace) {
                log.tracef("%s were requested by node %s with topology %d, greater than the local topology (%d). Waiting for topology %d to be installed locally.",
                        forTransactions ? "Transactions" : "Segments", requester, requestTopologyId, localTopologyId, requestTopologyId);
            }
            stateTransferLock.waitForTopology(requestTopologyId, timeout, TimeUnit.MILLISECONDS);
            topology = stateConsumer.getCacheTopology();
        }
        return topology;
    }

    /**
     * Appends to {@code collected} a {@link TransactionInfo} for every transaction in
     * {@code transactions} that
     * <ol>
     *   <li>was not started in the topology {@code topology},</li>
     *   <li>was originated by a node that is still a member of {@code topology}, and</li>
     *   <li>has at least one locked or backup-locked key in {@code segments}.</li>
     * </ol>
     * For a {@link LocalTransaction} the requester is additionally passed to
     * {@code locksAcquired} before the info is added.
     */
    private void collectTransactionsToTransfer(Address requester, List<TransactionInfo> collected, Collection<? extends CacheTransaction> transactions, Set<Integer> segments, CacheTopology topology) {
        int topologyId = topology.getTopologyId();
        List<Address> members = topology.getMembers();
        ConsistentHash readCh = topology.getReadConsistentHash();
        for (CacheTransaction tx : transactions) {
            if (tx.getTopologyId() == topologyId || !members.contains(tx.getGlobalTransaction().getAddress())) {
                if (trace) {
                    log.tracef("Skipping transaction %s as it was started in the current topology or by a leaver", tx);
                }
                continue;
            }

            Set<Object> keysToTransfer = new HashSet<Object>();
            addKeysInSegments(tx.getLockedKeys(), segments, readCh, keysToTransfer);
            addKeysInSegments(tx.getBackupLockedKeys(), segments, readCh, keysToTransfer);
            if (keysToTransfer.isEmpty()) {
                if (trace) {
                    log.tracef("Skipping transaction %s because the state requestor %s doesn't own any key", tx, requester);
                }
                continue;
            }

            if (trace) {
                log.tracef("Sending transaction %s to new owner %s", tx, requester);
            }
            List<WriteCommand> modifications = tx.getModifications();
            // An empty modification list is transferred as null rather than as an empty array.
            WriteCommand[] modificationArray = modifications.isEmpty() ? null : modifications.toArray(new WriteCommand[modifications.size()]);
            if (tx instanceof LocalTransaction) {
                ((LocalTransaction) tx).locksAcquired(Collections.singleton(requester));
                if (trace) {
                    log.tracef("Adding affected node %s to transferred transaction %s (keys %s)", requester, tx.getGlobalTransaction(), keysToTransfer);
                }
            }
            collected.add(new TransactionInfo(tx.getGlobalTransaction(), tx.getTopologyId(), modificationArray, keysToTransfer));
        }
    }

    /**
     * Copies into {@code target} every key of {@code keys} whose segment (according to
     * {@code ch}) is contained in {@code segments}. The iteration holds the monitor of
     * {@code keys}.
     */
    private static void addKeysInSegments(Set<Object> keys, Set<Integer> segments, ConsistentHash ch, Set<Object> target) {
        synchronized (keys) {
            for (Object key : keys) {
                if (segments.contains(ch.getSegment(key))) {
                    target.add(key);
                }
            }
        }
    }

    /**
     * Creates an {@link OutboundTransferTask} for the requested segments, registers it and
     * hands it to the executor. If handing it over fails with a runtime exception the task is
     * deregistered again before the exception is rethrown, so a transfer that never started
     * does not keep {@link #isStateTransferInProgress()} reporting {@code true}.
     *
     * @param address the destination node
     * @param i       the topology id the requester is working with
     * @param set     the segments to transfer
     * @throws InterruptedException propagated from resolving the topology, which may wait for
     *                              topology {@code i} to be installed
     */
    @Override // org.infinispan.statetransfer.StateProvider
    public void startOutboundTransfer(Address address, int i, Set<Integer> set) throws InterruptedException {
        if (trace) {
            log.tracef("Starting outbound transfer of segments %s to node %s with topology id %d for cache %s", set, address, i, cacheName);
        }
        ConsistentHash readCh = getCacheTopology(i, address, false).getReadConsistentHash();
        OutboundTransferTask task = new OutboundTransferTask(address, set, chunkSize, i, readCh, this, dataContainer, persistenceManager, rpcManager, commandsFactory, entryFactory, timeout, cacheName);
        addTransfer(task);
        try {
            task.execute(executorService);
        } catch (RuntimeException e) {
            // Presumably raised when the executor refuses the task; either way it is not running.
            removeTransfer(task);
            throw e;
        }
    }

    /** Registers {@code task} under its destination, creating the per-destination list on demand. */
    private void addTransfer(OutboundTransferTask task) {
        if (trace) {
            log.tracef("Adding outbound transfer of segments %s to %s", task.getSegments(), task.getDestination());
        }
        synchronized (transfersByDestination) {
            Address destination = task.getDestination();
            List<OutboundTransferTask> tasks = transfersByDestination.get(destination);
            if (tasks == null) {
                tasks = new ArrayList<OutboundTransferTask>();
                transfersByDestination.put(destination, tasks);
            }
            tasks.add(task);
        }
    }

    /**
     * Asks every transfer to {@code address} that was started for topology {@code i} to cancel
     * the given segments. Does nothing when no transfer to that node is registered.
     *
     * @param address the destination node of the transfers
     * @param i       the topology id the transfers were started with
     * @param set     the segments to cancel
     */
    @Override // org.infinispan.statetransfer.StateProvider
    public void cancelOutboundTransfer(Address address, int i, Set<Integer> set) {
        if (trace) {
            log.tracef("Cancelling outbound transfer of segments %s to node %s with topology id %d for cache %s", set, address, i, cacheName);
        }
        synchronized (transfersByDestination) {
            List<OutboundTransferTask> tasks = transfersByDestination.get(address);
            if (tasks == null) {
                return;
            }
            // Iterate over a snapshot so the registered list may change while segments are cancelled.
            for (OutboundTransferTask task : new ArrayList<OutboundTransferTask>(tasks)) {
                if (task.getTopologyId() == i) {
                    task.cancelSegments(set);
                }
            }
        }
    }

    /**
     * Deregisters {@code task}; the destination's mapping is dropped once its list is empty.
     * Calling this for a task that is not registered is a no-op.
     */
    private void removeTransfer(OutboundTransferTask task) {
        synchronized (transfersByDestination) {
            Address destination = task.getDestination();
            List<OutboundTransferTask> tasks = transfersByDestination.get(destination);
            if (tasks == null) {
                return;
            }
            tasks.remove(task);
            if (tasks.isEmpty()) {
                transfersByDestination.remove(destination);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Deregisters a task that has finished, whether it completed or was cancelled.
     *
     * @param outboundTransferTask the finished task
     */
    public void onTaskCompletion(OutboundTransferTask outboundTransferTask) {
        if (trace) {
            log.tracef("Removing %s outbound transfer of segments %s to %s for cache %s",
                    outboundTransferTask.isCancelled() ? "cancelled" : "completed",
                    outboundTransferTask.getSegments(), outboundTransferTask.getDestination(), cacheName);
        }
        removeTransfer(outboundTransferTask);
    }
}
