package org.infinispan.reactive.publisher.impl;

import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.functions.LongConsumer;
import io.reactivex.rxjava3.processors.FlowableProcessor;
import io.reactivex.rxjava3.processors.UnicastProcessor;
import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.infinispan.commons.reactive.RxJavaInterop;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.reactive.publisher.impl.PublisherHandler;
import org.infinispan.reactive.publisher.impl.SegmentPublisherSupplier;
import org.infinispan.reactive.publisher.impl.commands.batch.KeyPublisherResponse;
import org.infinispan.reactive.publisher.impl.commands.batch.PublisherResponse;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Publisher;

/* loaded from: input_file:org/infinispan/reactive/publisher/impl/InnerPublisherSubscription.class */
public class InnerPublisherSubscription<K, I, R, E> implements LongConsumer, Action {
    protected static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());
    private final InnerPublisherSubscriptionBuilder<K, I, R> builder;
    private final FlowableProcessor<E> flowableProcessor;
    private final AtomicLong requestedAmount = new AtomicLong();
    private volatile Map.Entry<Address, IntSet> currentTarget;
    private volatile boolean cancelled;
    private volatile boolean alreadyCreated;

    /* loaded from: input_file:org/infinispan/reactive/publisher/impl/InnerPublisherSubscription$InnerPublisherSubscriptionBuilder.class */
    public static class InnerPublisherSubscriptionBuilder<K, I, R> {
        private final ClusterPublisherManagerImpl<K, ?>.SubscriberHandler<I, R> parent;
        private final int batchSize;
        private final Supplier<Map.Entry<Address, IntSet>> supplier;
        private final Map<Address, Set<K>> excludedKeys;
        private final int topologyId;

        public InnerPublisherSubscriptionBuilder(ClusterPublisherManagerImpl<K, ?>.SubscriberHandler<I, R> subscriberHandler, int i, Supplier<Map.Entry<Address, IntSet>> supplier, Map<Address, Set<K>> map, int i2) {
            this.parent = subscriberHandler;
            this.batchSize = i;
            this.supplier = supplier;
            this.excludedKeys = map;
            this.topologyId = i2;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Publisher<R> createValuePublisher(Map.Entry<Address, IntSet> entry) {
            final UnicastProcessor create = UnicastProcessor.create(this.batchSize);
            InnerPublisherSubscription<K, I, R, R> innerPublisherSubscription = new InnerPublisherSubscription<K, I, R, R>(this, create, entry) { // from class: org.infinispan.reactive.publisher.impl.InnerPublisherSubscription.InnerPublisherSubscriptionBuilder.1
                @Override // org.infinispan.reactive.publisher.impl.InnerPublisherSubscription
                protected void doOnValue(R r, int i) {
                    create.onNext(r);
                }
            };
            return create.doOnLifecycle(RxJavaInterop.emptyConsumer(), innerPublisherSubscription, innerPublisherSubscription);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Publisher<SegmentPublisherSupplier.Notification<R>> createNotificationPublisher(Map.Entry<Address, IntSet> entry) {
            final UnicastProcessor create = UnicastProcessor.create(this.batchSize);
            InnerPublisherSubscription<K, I, R, SegmentPublisherSupplier.Notification<R>> innerPublisherSubscription = new InnerPublisherSubscription<K, I, R, SegmentPublisherSupplier.Notification<R>>(this, create, entry) { // from class: org.infinispan.reactive.publisher.impl.InnerPublisherSubscription.InnerPublisherSubscriptionBuilder.2
                @Override // org.infinispan.reactive.publisher.impl.InnerPublisherSubscription
                protected void doOnValue(R r, int i) {
                    create.onNext(Notifications.value(r, i));
                }

                @Override // org.infinispan.reactive.publisher.impl.InnerPublisherSubscription
                protected void doOnSegmentComplete(int i) {
                    create.onNext(Notifications.segmentComplete(i));
                }
            };
            return create.doOnLifecycle(RxJavaInterop.emptyConsumer(), innerPublisherSubscription, innerPublisherSubscription);
        }
    }

    private InnerPublisherSubscription(InnerPublisherSubscriptionBuilder<K, I, R> innerPublisherSubscriptionBuilder, FlowableProcessor<E> flowableProcessor, Map.Entry<Address, IntSet> entry) {
        this.builder = innerPublisherSubscriptionBuilder;
        this.flowableProcessor = flowableProcessor;
        this.currentTarget = entry;
    }

    @Override // io.reactivex.rxjava3.functions.Action
    public void run() {
        Map.Entry<Address, IntSet> entry;
        this.cancelled = true;
        if (!this.alreadyCreated || (entry = this.currentTarget) == null) {
            return;
        }
        ((InnerPublisherSubscriptionBuilder) this.builder).parent.sendCancelCommand(entry.getKey());
    }

    @Override // io.reactivex.rxjava3.functions.LongConsumer
    public void accept(long j) {
        CompletionStage<PublisherResponse> sendInitialCommand;
        if (!shouldSubmit(j) || checkCancelled()) {
            return;
        }
        Map.Entry<Address, IntSet> entry = this.currentTarget;
        if (entry == null) {
            this.alreadyCreated = false;
            entry = ((InnerPublisherSubscriptionBuilder) this.builder).supplier.get();
            if (entry == null) {
                if (log.isTraceEnabled()) {
                    log.tracef("Completing processor %s", this.flowableProcessor);
                }
                this.flowableProcessor.onComplete();
                return;
            }
            this.currentTarget = entry;
        }
        ClusterPublisherManagerImpl<K, ?>.SubscriberHandler<I, R> subscriberHandler = ((InnerPublisherSubscriptionBuilder) this.builder).parent;
        Address key = entry.getKey();
        IntSet value = entry.getValue();
        try {
            if (this.alreadyCreated) {
                sendInitialCommand = subscriberHandler.sendNextCommand(key, ((InnerPublisherSubscriptionBuilder) this.builder).topologyId);
            } else {
                this.alreadyCreated = true;
                sendInitialCommand = subscriberHandler.sendInitialCommand(key, value, ((InnerPublisherSubscriptionBuilder) this.builder).batchSize, ((InnerPublisherSubscriptionBuilder) this.builder).excludedKeys.remove(key), ((InnerPublisherSubscriptionBuilder) this.builder).topologyId);
            }
            sendInitialCommand.whenComplete((publisherResponse, th) -> {
                KeyPublisherResponse keyPublisherResponse;
                int extraSize;
                if (th != null) {
                    handleThrowableInResponse(CompletableFutures.extractException(th), key, value);
                    return;
                }
                try {
                    if (log.isTraceEnabled()) {
                        log.tracef("Received %s for id %s from %s", publisherResponse, subscriberHandler.requestId, key);
                    }
                    IntSet completedSegments = publisherResponse.getCompletedSegments();
                    if (completedSegments != null) {
                        if (log.isTraceEnabled()) {
                            log.tracef("Completed segments %s for id %s from %s", completedSegments, subscriberHandler.requestId, key);
                        }
                        Objects.requireNonNull(subscriberHandler);
                        completedSegments.forEach(subscriberHandler::completeSegment);
                        Objects.requireNonNull(value);
                        completedSegments.forEach(value::remove);
                    }
                    IntSet lostSegments = publisherResponse.getLostSegments();
                    if (lostSegments != null) {
                        if (log.isTraceEnabled()) {
                            log.tracef("Lost segments %s for id %s from %s", completedSegments, subscriberHandler.requestId, key);
                        }
                        Objects.requireNonNull(value);
                        lostSegments.forEach(value::remove);
                    }
                    if (publisherResponse.isComplete()) {
                        this.currentTarget = null;
                    } else {
                        publisherResponse.keysForNonCompletedSegments(subscriberHandler);
                    }
                    R[] results = publisherResponse.getResults();
                    if ((publisherResponse instanceof KeyPublisherResponse) && (extraSize = (keyPublisherResponse = (KeyPublisherResponse) publisherResponse).getExtraSize()) > 0) {
                        int length = results.length;
                        ?? r0 = new Object[length + extraSize];
                        System.arraycopy(results, 0, r0, 0, length);
                        System.arraycopy(keyPublisherResponse.getExtraObjects(), 0, r0, length, extraSize);
                        results = r0;
                    }
                    int i = 0;
                    for (PublisherHandler.SegmentResult segmentResult : publisherResponse.getSegmentResults()) {
                        if (checkCancelled()) {
                            return;
                        }
                        int segment = segmentResult.getSegment();
                        for (int i2 = 0; i2 < segmentResult.getEntryCount(); i2++) {
                            int i3 = i;
                            i++;
                            doOnValue(results[i3], segment);
                        }
                        if (completedSegments != null && completedSegments.remove(segment)) {
                            doOnSegmentComplete(segment);
                        }
                    }
                    if (completedSegments != null) {
                        completedSegments.forEach(this::doOnSegmentComplete);
                    }
                    accept(-i);
                } catch (Throwable th) {
                    handleThrowableInResponse(th, key, value);
                }
            });
        } catch (Throwable th2) {
            handleThrowableInResponse(th2, key, value);
        }
    }

    protected void doOnValue(R r, int i) {
    }

    protected void doOnSegmentComplete(int i) {
    }

    private boolean shouldSubmit(long j) {
        long j2;
        long j3;
        do {
            j2 = this.requestedAmount.get();
            j3 = j2 + j;
        } while (!this.requestedAmount.compareAndSet(j2, j3));
        return j3 > 0 && (j2 <= 0 || j <= 0);
    }

    private void handleThrowableInResponse(Throwable th, Address address, IntSet intSet) {
        if (this.cancelled) {
            log.tracef("Encountered exception after subscription was cancelled, this can most likely ignored, message is %s", th.getMessage());
        } else if (!((InnerPublisherSubscriptionBuilder) this.builder).parent.handleThrowable(th, address, intSet)) {
            this.flowableProcessor.onError(th);
        } else {
            this.currentTarget = null;
            accept(0L);
        }
    }

    private boolean checkCancelled() {
        if (!this.cancelled) {
            return false;
        }
        if (!log.isTraceEnabled()) {
            return true;
        }
        log.tracef("Subscription %s was cancelled, terminating early", this);
        return true;
    }
}
