package org.infinispan.statetransfer;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commons.IllegalLifecycleStateException;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.reactive.publisher.impl.SegmentPublisherSupplier;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.impl.SingleResponseCollector;
import org.infinispan.remoting.transport.jgroups.SuspectException;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

/* loaded from: input_file:BOOT-INF/lib/infinispan-core-14.0.5.Final.jar:org/infinispan/statetransfer/OutboundTransferTask.class */
/**
 * Pushes the cache entries of a set of segments to a single destination node.
 *
 * <p>The entries arrive as a {@link Flowable} of segment notifications (see
 * {@link #execute(Flowable)}). They are grouped into batches of at most {@code chunkSize}
 * notifications, converted to one {@link StateChunk} per segment and sent to the destination
 * with a state response command. The task stops consuming notifications once it has been
 * cancelled, either explicitly or because sending a batch failed.
 */
public class OutboundTransferTask {
    private static final Log log = LogFactory.getLog(OutboundTransferTask.class);

    /** Callback invoked with the chunks of a batch after that batch was sent successfully. */
    private final Consumer<Collection<StateChunk>> onChunkReplicated;
    private final int topologyId;
    private final Address destination;
    /** Private concurrent copy of the segments handed to the constructor. */
    private final IntSet segments;
    /** Maximum number of notifications buffered into one batch. */
    private final int chunkSize;
    private final RpcManager rpcManager;
    private final CommandsFactory commandsFactory;
    /** RPC timeout in milliseconds (also used to build {@link #rpcOptions}). */
    private final long timeout;
    private final String cacheName;
    private final boolean applyState;
    private final boolean pushTransfer;
    private final RpcOptions rpcOptions;
    /** Set once by {@link #cancel()}; read by the notification pipeline and the logging code. */
    private volatile boolean cancelled;

    /**
     * Creates a new outbound transfer task.
     *
     * @param address          destination node; must not be {@code null}
     * @param intSet           segments to transfer; must not be {@code null} or empty. A concurrent
     *                         copy is taken, so later changes to the argument are not observed
     * @param i                second argument of {@code IntSets.concurrentCopyFrom}
     *                         (presumably the total number of segments of the cache - TODO confirm)
     * @param i2               chunk size, i.e. the maximum number of notifications per batch;
     *                         must be greater than 0
     * @param i3               topology id stamped on every state response command
     * @param consumer         callback invoked with the chunks of each successfully sent batch
     * @param rpcManager       used to invoke the state response command on the destination
     * @param commandsFactory  used to build the state response command
     * @param j                RPC timeout in milliseconds
     * @param str              cache name, used in log messages and {@link #toString()}
     * @param z                {@code applyState} flag forwarded to the state response command
     * @param z2               {@code pushTransfer} flag forwarded to the state response command
     * @throws IllegalArgumentException if {@code intSet} is {@code null} or empty, {@code address}
     *                                  is {@code null}, or {@code i2} is not positive
     */
    public OutboundTransferTask(Address address, IntSet intSet, int i, int i2, int i3, Consumer<Collection<StateChunk>> consumer, RpcManager rpcManager, CommandsFactory commandsFactory, long j, String str, boolean z, boolean z2) {
        if (intSet == null || intSet.isEmpty()) {
            throw new IllegalArgumentException("Segments must not be null or empty");
        }
        if (address == null) {
            throw new IllegalArgumentException("Destination address cannot be null");
        }
        if (i2 <= 0) {
            throw new IllegalArgumentException("chunkSize must be greater than 0");
        }
        this.onChunkReplicated = consumer;
        this.destination = address;
        // Own copy: cancelSegments() removes elements from it.
        this.segments = IntSets.concurrentCopyFrom(intSet, i);
        this.chunkSize = i2;
        this.topologyId = i3;
        this.rpcManager = rpcManager;
        this.commandsFactory = commandsFactory;
        this.timeout = j;
        this.cacheName = str;
        this.applyState = z;
        this.pushTransfer = z2;
        this.rpcOptions = new RpcOptions(DeliverOrder.NONE, j, TimeUnit.MILLISECONDS);
    }

    /**
     * @return the node the state is being sent to
     */
    public Address getDestination() {
        return this.destination;
    }

    /**
     * @return the live set of segments still covered by this task; segments removed through
     *         {@link #cancelSegments(IntSet)} no longer appear in it
     */
    public IntSet getSegments() {
        return this.segments;
    }

    /**
     * @return the topology id this task was created with
     */
    public int getTopologyId() {
        return this.topologyId;
    }

    /**
     * Consumes the given notifications and sends them to the destination in batches.
     *
     * <p>Notifications are buffered into lists of at most {@code chunkSize} elements. Each list is
     * turned into one {@link StateChunk} per segment and sent before the next list is processed.
     * The cancellation flag is evaluated through {@code takeUntil} for every buffered list.
     *
     * @param flowable value and segment-complete notifications for the entries to transfer
     * @return a stage that completes with {@code null} when the pipeline has terminated
     */
    public CompletionStage<Void> execute(Flowable<SegmentPublisherSupplier.Notification<InternalCacheEntry<?, ?>>> flowable) {
        return flowable
                .buffer(this.chunkSize)
                .takeUntil(batch -> this.cancelled)
                .concatMapCompletable(batch -> {
                    Map<Integer, StateChunk> chunks = new HashMap<>();
                    for (SegmentPublisherSupplier.Notification<InternalCacheEntry<?, ?>> notification : batch) {
                        if (notification.isValue()) {
                            // Entries of the same segment are accumulated in a single, not-yet-last chunk.
                            StateChunk chunk = chunks.computeIfAbsent(notification.valueSegment(),
                                    segment -> new StateChunk(segment, new ArrayList<>(), false));
                            chunk.getCacheEntries().add(notification.value());
                        }
                        if (notification.isSegmentComplete()) {
                            int completedSegment = notification.completedSegment();
                            // Re-create the chunk with the "last chunk" flag set, keeping any entries
                            // already collected for the segment in this batch.
                            chunks.compute(completedSegment, (segment, previous) -> previous == null
                                    ? new StateChunk(completedSegment, Collections.emptyList(), true)
                                    : new StateChunk(completedSegment, previous.getCacheEntries(), true));
                        }
                    }
                    return Completable.fromCompletionStage(sendChunks(chunks));
                }, 1) // second argument is the prefetch amount
                .toCompletionStage(null);
    }

    /**
     * Sends one batch of chunks to the destination.
     *
     * <p>A failure never propagates to the returned stage: it is logged (except for
     * {@link IllegalLifecycleStateException} thrown synchronously, which is not logged) and the
     * task is cancelled instead.
     *
     * @param map chunks to send, keyed by segment id
     * @return a stage completed with {@code null} once the batch was handled; already completed
     *         when {@code map} is empty or the invocation failed synchronously
     */
    private CompletionStage<Void> sendChunks(Map<Integer, StateChunk> map) {
        if (map.isEmpty()) {
            return CompletableFutures.completedNull();
        }
        if (log.isTraceEnabled()) {
            long entryCount = map.values().stream()
                    .mapToInt(chunk -> chunk.getCacheEntries().size())
                    .sum();
            log.tracef("Sending to node %s %d cache entries from segments %s", this.destination, entryCount, map.keySet());
        }
        try {
            return this.rpcManager
                    .invokeCommand(this.destination,
                            this.commandsFactory.buildStateResponseCommand(this.topologyId, map.values(), this.applyState, this.pushTransfer),
                            SingleResponseCollector.validOnly(),
                            this.rpcOptions)
                    .handle((response, throwable) -> {
                        if (throwable == null) {
                            this.onChunkReplicated.accept(map.values());
                            return null;
                        }
                        logSendException(throwable);
                        cancel();
                        return null;
                    });
        } catch (IllegalLifecycleStateException e) {
            // Deliberately not logged; the transfer is simply abandoned.
            cancel();
            return CompletableFutures.completedNull();
        } catch (Exception e) {
            logSendException(e);
            cancel();
            return CompletableFutures.completedNull();
        }
    }

    /**
     * Logs a failure to send a batch. A {@link SuspectException} or a failure on an already
     * cancelled task is logged at debug level; anything else is logged as an error.
     *
     * @param th the failure, possibly wrapped; it is unwrapped with
     *           {@code CompletableFutures.extractException} before being inspected
     */
    private void logSendException(Throwable th) {
        Throwable cause = CompletableFutures.extractException(th);
        if (cause instanceof SuspectException) {
            log.debugf("Node %s left cache %s while we were sending state to it, cancelling transfer.", this.destination, this.cacheName);
        } else if (isCancelled()) {
            log.debugf("Stopping cancelled transfer to node %s, segments %s", this.destination, this.segments);
        } else {
            log.errorf(cause, "Failed to send entries to node %s: %s", this.destination, cause.getMessage());
        }
    }

    /**
     * Removes the given segments from this task and cancels the task when no segment is left.
     * Does nothing if none of the given segments belonged to this task.
     *
     * @param intSet segments that should no longer be transferred
     */
    public void cancelSegments(IntSet intSet) {
        if (this.segments.removeAll(intSet)) {
            if (log.isTraceEnabled()) {
                log.tracef("Cancelling outbound transfer to node %s, segments %s (remaining segments %s)", this.destination, intSet, this.segments);
            }
            if (this.segments.isEmpty()) {
                cancel();
            }
        }
    }

    /**
     * Marks this task as cancelled. Calling it again on an already cancelled task has no effect.
     * The flag is picked up by the pipeline built in {@link #execute(Flowable)}.
     */
    public void cancel() {
        if (this.cancelled) {
            return;
        }
        log.debugf("Cancelling outbound transfer to node %s, segments %s", this.destination, this.segments);
        this.cancelled = true;
    }

    /**
     * @return {@code true} once {@link #cancel()} has been called
     */
    public boolean isCancelled() {
        return this.cancelled;
    }

    /**
     * @return a diagnostic description containing the topology id, destination, remaining
     *         segments, chunk size, timeout and cache name
     */
    @Override
    public String toString() {
        return "OutboundTransferTask{topologyId=" + this.topologyId
                + ", destination=" + this.destination
                + ", segments=" + this.segments
                + ", chunkSize=" + this.chunkSize
                + ", timeout=" + this.timeout
                + ", cacheName='" + this.cacheName + "'}";
    }
}
