package org.infinispan.statetransfer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.util.CollectionFactory;
import org.infinispan.commons.util.InfinispanCollections;
import org.infinispan.container.DataContainer;
import org.infinispan.container.InternalEntryFactory;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.filter.CollectionKeyFilter;
import org.infinispan.marshall.core.MarshalledEntry;
import org.infinispan.metadata.Metadata;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.persistence.spi.AdvancedCacheLoader;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.jgroups.SuspectException;
import org.infinispan.util.ReadOnlyDataContainerBackedKeySet;
import org.infinispan.util.concurrent.WithinThreadExecutor;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

/* loaded from: input_file:WEB-INF/lib/infinispan-core-8.1.0-SNAPSHOT.jar:org/infinispan/statetransfer/OutboundTransferTask.class */
/**
 * Pushes the cache entries that belong to a set of segments to a single destination node.
 *
 * <p>When run, the task first walks the data container and then, if the persistence manager
 * exposes a state-transfer provider, the cache store. Every entry whose key maps (through the
 * read consistent hash) to one of the requested segments is buffered per segment; once
 * {@code stateTransferChunkSize} entries are buffered they are shipped to the destination in a
 * single state response command. A final flush marks every segment as finished.
 *
 * <p>NOTE(review): {@code runnableFuture} is neither volatile nor lock-guarded. If
 * {@link #cancel()} / {@link #isCancelled()} can be invoked from a thread other than the one that
 * called {@link #execute(ExecutorService)}, confirm that visibility is guaranteed by the caller.
 */
public class OutboundTransferTask implements Runnable {
    private static final Log log = LogFactory.getLog(OutboundTransferTask.class);

    /** Trace flag captured once at construction time. */
    private final boolean trace = log.isTraceEnabled();

    private final StateProviderImpl stateProvider;
    private final int topologyId;
    private final Address destination;

    /** Segments still to be transferred; shrinks when {@link #cancelSegments(Set)} is called. */
    private final Set<Integer> segments = new CopyOnWriteArraySet<Integer>();

    /** Number of buffered entries that triggers an intermediate send. */
    private final int stateTransferChunkSize;
    private final ConsistentHash readCh;
    private final DataContainer<Object, Object> dataContainer;
    private final PersistenceManager persistenceManager;
    private final RpcManager rpcManager;
    private final CommandsFactory commandsFactory;
    private final long timeout;
    private final String cacheName;
    private final RpcOptions rpcOptions;
    private final InternalEntryFactory entryFactory;

    /** Entries buffered for the next chunk, keyed by segment id. */
    private final Map<Integer, List<InternalCacheEntry>> entriesBySegment = CollectionFactory.makeConcurrentMap();

    /** Count of entries currently buffered across all segments. */
    private int accumulatedEntries;

    /** Set by {@link #execute(ExecutorService)}; {@code null} until the task has been submitted. */
    private FutureTask<Void> runnableFuture;

    /**
     * Creates a transfer task; nothing is sent until {@link #execute(ExecutorService)} is called.
     *
     * @param destination            node that receives the state; must not be {@code null}
     * @param segments               segments to transfer; must not be {@code null} or empty
     * @param stateTransferChunkSize number of buffered entries that triggers a send; must be &gt; 0
     * @param topologyId             topology id stamped on every state response command
     * @param readCh                 consistent hash used to map keys to segments
     * @param stateProvider          notified through {@code onTaskCompletion} when the task is done
     * @param dataContainer          in-memory entries to scan
     * @param persistenceManager     source of the optional cache-store state-transfer provider
     * @param rpcManager             used to send the chunks and to build the RPC options
     * @param commandsFactory        builds the state response commands
     * @param entryFactory           turns marshalled store entries into internal cache entries
     * @param timeout                timeout of each synchronous send, in milliseconds
     * @param cacheName              cache name, used in log messages and {@link #toString()}
     * @throws IllegalArgumentException if {@code segments} is null/empty, {@code destination} is
     *                                  null, or {@code stateTransferChunkSize} is not positive
     */
    public OutboundTransferTask(Address destination, Set<Integer> segments, int stateTransferChunkSize, int topologyId, ConsistentHash readCh, StateProviderImpl stateProvider, DataContainer dataContainer, PersistenceManager persistenceManager, RpcManager rpcManager, CommandsFactory commandsFactory, InternalEntryFactory entryFactory, long timeout, String cacheName) {
        if (segments == null || segments.isEmpty()) {
            throw new IllegalArgumentException("Segments must not be null or empty");
        }
        if (destination == null) {
            throw new IllegalArgumentException("Destination address cannot be null");
        }
        if (stateTransferChunkSize <= 0) {
            throw new IllegalArgumentException("stateTransferChunkSize must be greater than 0");
        }
        this.stateProvider = stateProvider;
        this.destination = destination;
        this.segments.addAll(segments);
        this.stateTransferChunkSize = stateTransferChunkSize;
        this.topologyId = topologyId;
        this.readCh = readCh;
        this.dataContainer = dataContainer;
        this.persistenceManager = persistenceManager;
        this.entryFactory = entryFactory;
        this.rpcManager = rpcManager;
        this.commandsFactory = commandsFactory;
        this.timeout = timeout;
        this.cacheName = cacheName;
        this.rpcOptions = rpcManager.getRpcOptionsBuilder(ResponseMode.SYNCHRONOUS).timeout(timeout, TimeUnit.MILLISECONDS).build();
    }

    /**
     * Submits this task to the given executor. The task is wrapped in a {@link FutureTask} whose
     * completion hook (which also fires on cancellation) notifies the state provider.
     *
     * @param executorService executor that will run the transfer
     * @throws IllegalStateException if the task has already been submitted
     */
    public void execute(ExecutorService executorService) {
        if (runnableFuture != null) {
            throw new IllegalStateException("This task was already submitted");
        }
        runnableFuture = new FutureTask<Void>(this, null) {
            @Override
            protected void done() {
                stateProvider.onTaskCompletion(OutboundTransferTask.this);
            }
        };
        executorService.submit(runnableFuture);
    }

    /** @return the node this task sends state to */
    public Address getDestination() {
        return destination;
    }

    /** @return the live set of segments still being transferred (not a copy) */
    public Set<Integer> getSegments() {
        return segments;
    }

    /** @return the topology id this transfer was started for */
    public int getTopologyId() {
        return topologyId;
    }

    /**
     * Performs the transfer: in-memory entries first, then cache-store entries, then a final
     * flush. Any failure is logged rather than propagated; if the task was cancelled in the
     * meantime the failure is only reported at debug level.
     */
    @Override
    public void run() {
        try {
            sendDataContainerEntries();
            sendCacheStoreEntries();
            // Flush whatever is still buffered and mark every segment as finished.
            sendEntries(true);
        } catch (Throwable t) {
            if (isCancelled()) {
                log.debugf("Transfer of segments %s of cache %s to node %s cancelled", segments, cacheName, destination);
            } else {
                log.failedOutBoundTransferExecution(t);
            }
        }
        if (trace) {
            log.tracef("Outbound transfer of segments %s of cache %s to node %s is complete", segments, cacheName, destination);
        }
    }

    /** Buffers every data-container entry whose key maps to one of the requested segments. */
    private void sendDataContainerEntries() {
        for (InternalCacheEntry<Object, Object> entry : dataContainer) {
            int segmentId = readCh.getSegment(entry.getKey());
            if (segments.contains(segmentId)) {
                sendEntry(entry, segmentId);
            }
        }
    }

    /**
     * Buffers the cache-store entries that belong to the requested segments, if a state-transfer
     * provider is available. Store failures are logged and do not abort the transfer.
     */
    private void sendCacheStoreEntries() {
        AdvancedCacheLoader stateTransferProvider = persistenceManager.getStateTransferProvider();
        if (stateTransferProvider == null) {
            return;
        }
        try {
            // The filter is built over the data container's key set; presumably this keeps keys
            // that were already sent from memory from being sent again - confirm against
            // CollectionKeyFilter's semantics.
            CollectionKeyFilter filter = new CollectionKeyFilter(new ReadOnlyDataContainerBackedKeySet(dataContainer));
            AdvancedCacheLoader.CacheLoaderTask task = new AdvancedCacheLoader.CacheLoaderTask() {
                @Override
                public void processEntry(MarshalledEntry marshalledEntry, AdvancedCacheLoader.TaskContext taskContext) throws InterruptedException {
                    int segmentId = readCh.getSegment(marshalledEntry.getKey());
                    if (!segments.contains(segmentId)) {
                        return;
                    }
                    try {
                        // The key is passed as-is. Casting it to InternalEntryFactory (as the
                        // decompiled code did) would throw ClassCastException for every real key
                        // and prevent store entries from ever being transferred.
                        InternalCacheEntry entry = entryFactory.create(marshalledEntry.getKey(), marshalledEntry.getValue(), (Metadata) marshalledEntry.getMetadata());
                        sendEntry(entry, segmentId);
                    } catch (CacheException e) {
                        log.failedLoadingValueFromCacheStore(marshalledEntry.getKey(), e);
                    }
                }
            };
            stateTransferProvider.process(filter, task, new WithinThreadExecutor(), true, true);
        } catch (CacheException e) {
            log.failedLoadingKeysFromCacheStore(e);
        }
    }

    /**
     * Buffers one entry for the given segment. If the buffer has already reached
     * {@code stateTransferChunkSize}, the buffered entries are sent first and the counter is reset.
     * Invoked from {@link #run()} and from the cache-store callback.
     *
     * @param internalCacheEntry entry to buffer
     * @param segmentId          segment the entry's key maps to
     */
    public void sendEntry(InternalCacheEntry internalCacheEntry, int segmentId) {
        if (accumulatedEntries >= stateTransferChunkSize) {
            sendEntries(false);
            accumulatedEntries = 0;
        }
        List<InternalCacheEntry> buffered = entriesBySegment.get(segmentId);
        if (buffered == null) {
            buffered = new ArrayList<InternalCacheEntry>();
            entriesBySegment.put(segmentId, buffered);
        }
        buffered.add(internalCacheEntry);
        accumulatedEntries++;
    }

    /**
     * Sends the buffered entries to the destination as one state response command and empties
     * the buffers.
     *
     * <p>A {@link SuspectException} cancels the whole task. Any other exception is logged (at
     * debug level if the task is already cancelled, otherwise as an error) and not rethrown.
     *
     * @param isLast {@code true} for the final flush: every requested segment then gets a chunk
     *               flagged as last, even if it has no entries
     */
    private void sendEntries(boolean isLast) {
        List<StateChunk> chunks = new ArrayList<StateChunk>();
        for (Map.Entry<Integer, List<InternalCacheEntry>> bySegment : entriesBySegment.entrySet()) {
            List<InternalCacheEntry> buffered = bySegment.getValue();
            if (isLast || !buffered.isEmpty()) {
                // Copy before clearing: the chunk must not share the buffer that is reused below.
                chunks.add(new StateChunk(bySegment.getKey(), new ArrayList<InternalCacheEntry>(buffered), isLast));
                buffered.clear();
            }
        }
        if (isLast) {
            // Segments that never produced an entry still need an (empty) last chunk.
            for (Integer segmentId : segments) {
                if (entriesBySegment.get(segmentId) == null) {
                    chunks.add(new StateChunk(segmentId, InfinispanCollections.<InternalCacheEntry>emptyList(), true));
                }
            }
        }
        if (chunks.isEmpty()) {
            return;
        }
        if (trace) {
            if (isLast) {
                log.tracef("Sending last chunk containing %d cache entries from segments %s of cache %s to node %s", accumulatedEntries, segments, cacheName, destination);
            } else {
                log.tracef("Sending %d cache entries from segments %s of cache %s to node %s", accumulatedEntries, entriesBySegment.keySet(), cacheName, destination);
            }
        }
        try {
            rpcManager.invokeRemotely(Collections.singleton(destination), commandsFactory.buildStateResponseCommand(rpcManager.getAddress(), topologyId, chunks), rpcOptions);
        } catch (SuspectException e) {
            log.debugf("Node %s left cache %s while we were sending state to it, cancelling transfer.", destination, cacheName);
            cancel();
        } catch (Exception e) {
            if (isCancelled()) {
                log.debugf("Stopping cancelled transfer of segments %s of cache %s to node %s", segments, cacheName, destination);
            } else {
                log.errorf(e, "Failed to send entries to node %s : %s", destination, e.getMessage());
            }
        }
    }

    /**
     * Stops transferring the given segments. Entries already buffered for them are discarded, and
     * the whole task is cancelled once no segment is left.
     *
     * @param cancelledSegments segments that no longer need to be sent
     */
    public void cancelSegments(Set<Integer> cancelledSegments) {
        if (!segments.removeAll(cancelledSegments)) {
            // None of the given segments was part of this transfer.
            return;
        }
        if (trace) {
            log.tracef("Cancelling outbound transfer of segments %s of cache %s to node %s (remaining segments %s)", cancelledSegments, cacheName, destination, segments);
        }
        entriesBySegment.keySet().removeAll(cancelledSegments);
        if (segments.isEmpty()) {
            cancel();
        }
    }

    /**
     * Cancels the submitted task, interrupting it if it is running. Does nothing if the task was
     * never submitted or is already cancelled.
     */
    public void cancel() {
        if (runnableFuture == null || runnableFuture.isCancelled()) {
            return;
        }
        log.debugf("Cancelling outbound transfer of segments %s of cache %s to node %s", segments, cacheName, destination);
        runnableFuture.cancel(true);
    }

    /** @return {@code true} if the task has been submitted and subsequently cancelled */
    public boolean isCancelled() {
        return runnableFuture != null && runnableFuture.isCancelled();
    }

    @Override
    public String toString() {
        return "OutboundTransferTask{topologyId=" + topologyId + ", destination=" + destination + ", segments=" + segments + ", stateTransferChunkSize=" + stateTransferChunkSize + ", timeout=" + timeout + ", cacheName='" + cacheName + "'}";
    }
}
