package org.infinispan.client.hotrod.impl.iteration;

import io.netty.channel.Channel;
import java.lang.invoke.MethodHandles;
import java.net.ConnectException;
import java.net.SocketAddress;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.infinispan.client.hotrod.exceptions.RemoteIllegalLifecycleStateException;
import org.infinispan.client.hotrod.exceptions.TransportException;
import org.infinispan.client.hotrod.impl.operations.IterationNextResponse;
import org.infinispan.client.hotrod.impl.operations.IterationStartResponse;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
import org.infinispan.commons.reactive.AbstractAsyncPublisherHandler;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.logging.TraceException;

/* loaded from: input_file:WEB-INF/lib/infinispan-client-hotrod-14.0.27.Final.jar:org/infinispan/client/hotrod/impl/iteration/RemoteInnerPublisherHandler.class */
/**
 * Publisher handler that drives a remote iteration against one target at a time, where a target
 * is a (server address, segment set) pair.
 *
 * <p>The handler remembers the {@link Channel} and iteration id handed back by the start
 * operation and reuses them for every subsequent "next" and "cancel" operation. All remote
 * operations are delegated to the owning {@link RemotePublisher}.
 *
 * @param <K> key type of the published entries
 * @param <E> value type of the published entries
 */
class RemoteInnerPublisherHandler<K, E> extends AbstractAsyncPublisherHandler<Map.Entry<SocketAddress, IntSet>, Map.Entry<K, E>, IterationStartResponse, IterationNextResponse<K, E>> {
    private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());

    /** Owning publisher; every remote operation and segment bookkeeping call goes through it. */
    protected final RemotePublisher<K, E> publisher;

    /** Channel returned by the start operation; {@code null} until {@link #handleInitialResponse} ran. */
    protected volatile Channel channel;

    /** Iteration id returned by the start operation; {@code null} until {@link #handleInitialResponse} ran. */
    private volatile byte[] iterationId;

    /** Set by the first {@link #sendCancel} call so that later calls become no-ops. */
    private final AtomicBoolean cancelled = new AtomicBoolean();

    /**
     * Creates a handler bound to the given publisher.
     *
     * @param remotePublisher publisher that creates the remote operations and tracks segments
     * @param i               forwarded unchanged to the superclass constructor
     *                        (presumably the batch size — confirm against the superclass)
     * @param supplier        forwarded unchanged to the superclass constructor
     * @param entry           forwarded unchanged to the superclass constructor
     */
    public RemoteInnerPublisherHandler(RemotePublisher<K, E> remotePublisher, int i, Supplier<Map.Entry<SocketAddress, IntSet>> supplier, Map.Entry<SocketAddress, IntSet> entry) {
        super(i, supplier, entry);
        this.publisher = remotePublisher;
    }

    /** Returns the printable form of the current iteration id, as formatted by the publisher. */
    private String iterationId() {
        return this.publisher.iterationId(this.iterationId);
    }

    /**
     * Requests cancellation of the remote iteration. Only the first invocation has any effect;
     * subsequent invocations return immediately.
     *
     * @param entry the target being cancelled (not used by this implementation)
     */
    @Override // org.infinispan.commons.reactive.AbstractAsyncPublisherHandler
    public void sendCancel(Map.Entry<SocketAddress, IntSet> entry) {
        if (this.cancelled.getAndSet(true)) {
            return;
        }
        actualCancel();
    }

    /**
     * Sends the cancel operation through the publisher, but only once both the iteration id and
     * the channel are known. When they are not yet known this is a no-op;
     * {@link #handleInitialResponse} re-checks the cancelled flag once they are set.
     */
    private void actualCancel() {
        // Read each volatile field once so the null check and the call see the same values.
        final byte[] id = this.iterationId;
        final Channel ch = this.channel;
        if (id == null || ch == null) {
            return;
        }
        this.publisher.sendCancel(id, ch);
    }

    /**
     * Starts a new remote iteration for the given target.
     *
     * @param entry target whose key is the server address and whose value is the segment set
     * @param i     forwarded unchanged to the publisher's start operation
     * @return stage completing with the start response
     */
    @Override // org.infinispan.commons.reactive.AbstractAsyncPublisherHandler
    public CompletionStage<IterationStartResponse> sendInitialCommand(Map.Entry<SocketAddress, IntSet> entry, int i) {
        SocketAddress address = entry.getKey();
        IntSet segments = entry.getValue();
        log.tracef("Starting iteration with segments %s", segments);
        return this.publisher.newIteratorStartOperation(address, segments, i);
    }

    /**
     * Requests the next batch of the iteration that was started by {@link #sendInitialCommand},
     * using the stored iteration id and channel.
     *
     * @param entry current target (not used by this implementation)
     * @param i     not used by this implementation
     * @return stage completing with the next response
     */
    @Override // org.infinispan.commons.reactive.AbstractAsyncPublisherHandler
    public CompletionStage<IterationNextResponse<K, E>> sendNextCommand(Map.Entry<SocketAddress, IntSet> entry, int i) {
        return this.publisher.newIteratorNextOperation(this.iterationId, this.channel);
    }

    /**
     * Records the channel and iteration id from the start response. If {@link #sendCancel} was
     * already called (at which point it could not send anything because the id and channel were
     * unknown), the cancel is sent now.
     *
     * @param iterationStartResponse response of the start operation
     * @param entry                  current target (not used by this implementation)
     * @return always {@code 0}
     */
    @Override // org.infinispan.commons.reactive.AbstractAsyncPublisherHandler
    public long handleInitialResponse(IterationStartResponse iterationStartResponse, Map.Entry<SocketAddress, IntSet> entry) {
        this.channel = iterationStartResponse.getChannel();
        this.iterationId = iterationStartResponse.getIterationId();
        if (log.isDebugEnabled()) {
            log.iterationTransportObtained(this.channel.remoteAddress(), iterationId());
            log.startedIteration(iterationId());
        }
        if (this.cancelled.get()) {
            actualCancel();
        }
        return 0L;
    }

    /**
     * Processes one batch of iteration results: finishes the target when the server reports no
     * more data, reports completed segments to the publisher, and emits the entries downstream
     * until {@code onNext} returns {@code false}.
     *
     * @param iterationNextResponse response of the next operation
     * @param entry                 current target (server address and segment set)
     * @return the number of entries contained in the response, regardless of how many were
     *         actually emitted
     */
    @Override // org.infinispan.commons.reactive.AbstractAsyncPublisherHandler
    public long handleNextResponse(IterationNextResponse<K, E> iterationNextResponse, Map.Entry<SocketAddress, IntSet> entry) {
        if (!iterationNextResponse.hasMore()) {
            sendCancel(entry);
            this.publisher.completeSegments(entry.getValue());
            targetComplete();
        }
        IntSet completedSegments = iterationNextResponse.getCompletedSegments();
        // NOTE(review): the target's segment set is only pruned when trace logging is enabled;
        // the only reader of the pruned set visible in this class is the trace message in
        // handleThrowableInResponse — confirm nothing else depends on it.
        if (completedSegments != null && log.isTraceEnabled()) {
            IntSet targetSegments = entry.getValue();
            if (targetSegments != null) {
                targetSegments.removeAll(completedSegments);
            }
        }
        this.publisher.completeSegments(completedSegments);
        Collection<Map.Entry<K, E>> entries = iterationNextResponse.getEntries();
        for (Map.Entry<K, E> item : entries) {
            if (!onNext(item)) {
                break;
            }
        }
        return entries.size();
    }

    /**
     * Handles a failure of a remote operation.
     *
     * <p>{@link TransportException}, {@link RemoteIllegalLifecycleStateException} and
     * {@link ConnectException} are logged, the server is reported to the publisher as errored,
     * the current target is completed and {@code accept(0L)} is invoked. Any other throwable gets
     * a {@link TraceException} attached as suppressed and is passed to the superclass handler.
     *
     * @param th    the failure
     * @param entry the target that was being processed when the failure occurred
     */
    @Override // org.infinispan.commons.reactive.AbstractAsyncPublisherHandler
    public void handleThrowableInResponse(Throwable th, Map.Entry<SocketAddress, IntSet> entry) {
        boolean serverFailure = th instanceof TransportException
                || th instanceof RemoteIllegalLifecycleStateException
                || th instanceof ConnectException;
        if (!serverFailure) {
            th.addSuppressed(new TraceException());
            super.handleThrowableInResponse(th, entry);
            return;
        }
        log.throwableDuringPublisher(th);
        if (log.isTraceEnabled()) {
            IntSet outstanding = entry.getValue();
            if (outstanding != null) {
                log.tracef("There are still outstanding segments %s that will need to be retried", outstanding);
            }
        }
        this.publisher.erroredServer(entry.getKey());
        targetComplete();
        accept(0L);
    }
}
