package org.infinispan.reactive.publisher.impl;

import io.reactivex.exceptions.MissingBackpressureException;
import io.reactivex.internal.fuseable.SimplePlainQueue;
import io.reactivex.internal.queue.SpscArrayQueue;
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
import io.reactivex.internal.subscriptions.EmptySubscription;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.internal.util.BackpressureHelper;
import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import net.jcip.annotations.GuardedBy;
import org.infinispan.commons.util.IntSet;
import org.infinispan.reactive.publisher.impl.commands.batch.KeyPublisherResponse;
import org.infinispan.reactive.publisher.impl.commands.batch.PublisherResponse;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:BOOT-INF/lib/infinispan-core-10.1.5.Final.jar:org/infinispan/reactive/publisher/impl/InnerPublisherSubscription.class */
/**
 * A single-subscriber {@link Publisher} that is also the {@link Subscription} it hands to that subscriber.
 *
 * <p>Results are fetched in batches through the parent handler, one target ({@link Address} plus its
 * {@link IntSet} of segments) at a time. The first batch for a target is requested with
 * {@code sendInitialCommand}, later batches with {@code sendNextCommand}; when a response reports completion
 * the next target is taken from the supplier, and a {@code null} target completes the subscriber.
 *
 * <p>The {@link AtomicLong} this class extends holds the outstanding demand of the subscriber. The
 * {@code requestors} counter serializes the work: only the caller that moves it from zero issues
 * requests, every other caller merely records its demand and leaves.
 *
 * @param <K> key type used by the excluded-keys map
 * @param <I> intermediate type of the parent handler
 * @param <R> type of the values emitted to the subscriber
 */
class InnerPublisherSubscription<K, I, R> extends AtomicLong implements Publisher<R>, Subscription {
    protected static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());
    protected static final boolean trace = log.isTraceEnabled();

    private final ClusterPublisherManagerImpl<K, ?>.SubscriberHandler<I, R> parent;
    /** Values received from a response that exceeded the demand known at that time. */
    private final SimplePlainQueue<R> queue;
    /** Provides the next target to query; {@code null} means there is nothing left to retrieve. */
    private final Supplier<Map.Entry<Address, IntSet>> supplier;
    private final int batchSize;
    private final Map<Address, Set<K>> excludedKeys;
    private final int topologyId;
    private final AtomicReference<Subscriber<? super R>> subscriber = new AtomicReference<>();
    /** Work-in-progress counter; the caller that increments it from zero owns the request loop. */
    private final AtomicInteger requestors = new AtomicInteger();
    /** Target currently being queried, or {@code null} when the next one must be taken from the supplier. */
    private volatile Map.Entry<Address, IntSet> currentTarget;
    private volatile boolean cancelled;
    /** Whether the initial command has already been sent to {@link #currentTarget}. */
    private volatile boolean alreadyCreated;

    /**
     * Creates a subscription whose first target is taken from {@code supplier} on the first request.
     *
     * @param parent       handler used to send commands and to report segments and failures
     * @param batchSize    batch size passed to the initial command and used to size the internal queue
     * @param supplier     source of further targets; returns {@code null} when exhausted
     * @param excludedKeys per-address keys handed (and removed from this map) when the initial command is sent
     * @param topologyId   topology id passed along with every command
     */
    public InnerPublisherSubscription(ClusterPublisherManagerImpl<K, ?>.SubscriberHandler<I, R> parent, int batchSize, Supplier<Map.Entry<Address, IntSet>> supplier, Map<Address, Set<K>> excludedKeys, int topologyId) {
        this.parent = parent;
        this.queue = queueToUse(parent.publisher.shouldTrackKeys, batchSize);
        this.supplier = supplier;
        this.batchSize = batchSize;
        this.excludedKeys = excludedKeys;
        this.topologyId = topologyId;
    }

    /**
     * Creates a subscription that starts with the given target instead of asking {@code supplier} first.
     *
     * @param firstTarget the target to query first
     * @throws NullPointerException if {@code firstTarget} is {@code null}
     */
    public InnerPublisherSubscription(ClusterPublisherManagerImpl<K, ?>.SubscriberHandler<I, R> parent, int batchSize, Supplier<Map.Entry<Address, IntSet>> supplier, Map<Address, Set<K>> excludedKeys, int topologyId, Map.Entry<Address, IntSet> firstTarget) {
        this(parent, batchSize, supplier, excludedKeys, topologyId);
        this.currentTarget = Objects.requireNonNull(firstTarget);
    }

    /**
     * Picks the queue implementation: a linked-array queue when keys are tracked, a plain array queue
     * otherwise, both created with {@code batchSize} as their size argument.
     */
    private static <R> SimplePlainQueue<R> queueToUse(boolean shouldTrackKeys, int batchSize) {
        if (shouldTrackKeys) {
            // NOTE(review): presumably chosen because key tracking may deliver more than batchSize
            // values in one response (see the extra objects handling) - confirm.
            return new SpscLinkedArrayQueue<>(batchSize);
        }
        return new SpscArrayQueue<>(batchSize);
    }

    /**
     * Registers the one and only subscriber and hands it this object as its subscription. Any further
     * subscriber is signalled an {@link IllegalStateException}.
     *
     * @throws NullPointerException if {@code subscriber} is {@code null}
     */
    @Override // org.reactivestreams.Publisher
    public void subscribe(Subscriber<? super R> subscriber) {
        Objects.requireNonNull(subscriber, "subscriber");
        if (!this.subscriber.compareAndSet(null, subscriber)) {
            EmptySubscription.error(new IllegalStateException("This processor allows only a single Subscriber"), subscriber);
            return;
        }
        if (trace) {
            log.tracef("Subscribed to %s via %s", this.parent.requestId, subscriber);
        }
        subscriber.onSubscribe(this);
    }

    /**
     * Adds {@code j} to the outstanding demand and, when no other caller is currently working, starts
     * sending requests for the accumulated demand.
     */
    @Override // org.reactivestreams.Subscription
    public void request(long j) {
        if (!SubscriptionHelper.validate(j)) {
            return;
        }
        BackpressureHelper.add(this, j);
        // Only the caller that takes the counter from zero does the work; the owner will pick up
        // demand added by everybody else before it gives the counter back.
        if (this.requestors.getAndIncrement() == 0) {
            sendRequest(get());
        }
    }

    /**
     * Marks the subscription as cancelled and, when an initial command has already been sent to the
     * current target, tells that target to cancel as well.
     */
    @Override // org.reactivestreams.Subscription
    public void cancel() {
        this.cancelled = true;
        if (!this.alreadyCreated) {
            return;
        }
        Map.Entry<Address, IntSet> target = this.currentTarget;
        if (target != null) {
            this.parent.sendCancelCommand(target.getKey());
        }
    }

    /**
     * Serves {@code remaining} demand: first from the internal queue, then by sending a command to the
     * current (or next) target. Returns without sending when demand runs out, when cancelled, or after
     * completing the subscriber because the supplier has no further target.
     *
     * @param remaining demand to satisfy, must be positive
     */
    @GuardedBy("requestors")
    private void sendRequest(long remaining) {
        assert remaining > 0;

        while (!this.queue.isEmpty()) {
            long produced = 0;
            Subscriber<? super R> downstream = this.subscriber.get();
            R queued;
            while (produced < remaining && (queued = this.queue.poll()) != null) {
                downstream.onNext(queued);
                produced++;
            }
            remaining = BackpressureHelper.produced(this, produced);
            if (remaining == 0) {
                // Demand is exhausted; either release ownership or pick up demand added meanwhile.
                remaining = continueWithRemaining(0L);
                if (remaining == 0) {
                    return;
                }
            }
        }

        assert remaining > 0;

        if (checkCancelled()) {
            return;
        }

        Map.Entry<Address, IntSet> target = this.currentTarget;
        if (target == null) {
            this.alreadyCreated = false;
            target = this.supplier.get();
            if (target == null) {
                if (trace) {
                    log.tracef("Completing subscription %s", this);
                }
                this.subscriber.get().onComplete();
                return;
            }
            this.currentTarget = target;
        }

        Address address = target.getKey();
        IntSet segments = target.getValue();

        CompletionStage<PublisherResponse> stage;
        if (this.alreadyCreated) {
            stage = this.parent.sendNextCommand(address, this.topologyId);
        } else {
            this.alreadyCreated = true;
            stage = this.parent.sendInitialCommand(address, segments, this.batchSize, this.excludedKeys.remove(address), this.topologyId);
        }
        stage.whenComplete((response, failure) -> handleResponse(response, failure, address, segments));
    }

    /**
     * Processes the outcome of a command: updates segment bookkeeping, emits or queues the returned
     * values and then continues with whatever demand is left.
     *
     * @param response the response, meaningful only when {@code failure} is {@code null}
     * @param failure  the failure of the command, or {@code null} on success
     * @param address  the target the command was sent to
     * @param segments the segments still outstanding for that target; finished and lost ones are removed
     */
    private void handleResponse(PublisherResponse response, Throwable failure, Address address, IntSet segments) {
        if (failure != null) {
            handleThrowableInResponse(failure, address, segments);
            return;
        }
        try {
            if (trace) {
                log.tracef("Received %s for id %s from %s", response, this.parent.requestId, address);
            }

            IntSet completedSegments = response.getCompletedSegments();
            if (completedSegments != null) {
                if (trace) {
                    log.tracef("Completed segments %s for id %s from %s", completedSegments, this.parent.requestId, address);
                }
                // Explicit int parameters select the primitive forEach overload unambiguously.
                completedSegments.forEach((int segment) -> this.parent.completeSegment(segment));
                completedSegments.forEach((int segment) -> segments.remove(segment));
            }

            IntSet lostSegments = response.getLostSegments();
            if (lostSegments != null) {
                if (trace) {
                    log.tracef("Lost segments %s for id %s from %s", lostSegments, this.parent.requestId, address);
                }
                lostSegments.forEach((int segment) -> segments.remove(segment));
            }

            if (response.isComplete()) {
                // Forces the next request to take a fresh target from the supplier.
                this.currentTarget = null;
            } else {
                // Reports values against the first segment that is still outstanding.
                response.forEachSegmentValue(this.parent, segments.iterator().nextInt());
            }

            long requested = get();
            assert requested > 0;

            long produced = 0;
            R lastValue = null;
            Subscriber<? super R> downstream = this.subscriber.get();
            for (Object result : resultsOf(response)) {
                if (result == null) {
                    // A null marks the end of the used part of the array.
                    break;
                }
                if (checkCancelled()) {
                    return;
                }
                @SuppressWarnings("unchecked") // responses carry values of the subscriber's type
                R value = (R) result;
                if (produced < requested) {
                    downstream.onNext(value);
                    produced++;
                } else if (!this.queue.offer(value)) {
                    throw new MissingBackpressureException("Inner queue full?!");
                }
                lastValue = value;
            }

            if (completedSegments != null) {
                this.parent.notifySegmentsComplete(completedSegments, lastValue);
            }
            trySendRequest(produced);
        } catch (Throwable t) {
            // NOTE(review): values already emitted above are not subtracted from the demand on this
            // path - confirm this is intended when the parent decides to retry.
            handleThrowableInResponse(t, address, segments);
        }
    }

    /**
     * Returns the values of a response; for a {@link KeyPublisherResponse} with extra objects the result
     * is a new array holding the regular results followed by the extra objects.
     */
    private static Object[] resultsOf(PublisherResponse response) {
        Object[] results = response.getResults();
        if (!(response instanceof KeyPublisherResponse)) {
            return results;
        }
        KeyPublisherResponse keyResponse = (KeyPublisherResponse) response;
        int extraSize = keyResponse.getExtraSize();
        if (extraSize <= 0) {
            return results;
        }
        // NOTE(review): assumes the regular results contain no null when extras exist, otherwise
        // the consumer stops at that null before reaching the extras - confirm.
        int length = results.length;
        Object[] merged = new Object[length + extraSize];
        System.arraycopy(results, 0, merged, 0, length);
        System.arraycopy(keyResponse.getExtraObjects(), 0, merged, length, extraSize);
        return merged;
    }

    /** Returns whether the subscription has been cancelled, tracing the early termination if so. */
    private boolean checkCancelled() {
        if (!this.cancelled) {
            return false;
        }
        if (trace) {
            log.tracef("Subscription %s was cancelled, terminating early", this);
        }
        return true;
    }

    /**
     * Passes a failure to the parent; when the parent returns {@code true} the current target is dropped
     * and processing continues with the next one.
     */
    @GuardedBy("requestors")
    private void handleThrowableInResponse(Throwable th, Address address, IntSet intSet) {
        if (this.parent.handleThrowable(th, address, intSet)) {
            this.currentTarget = null;
            trySendRequest(0L);
        }
    }

    /** Accounts for {@code j} emitted values and sends another request when demand is left. */
    @GuardedBy("requestors")
    private void trySendRequest(long j) {
        long remaining = continueWithRemaining(j);
        if (remaining > 0) {
            sendRequest(remaining);
        }
    }

    /**
     * Subtracts {@code j} emitted values from the demand and decides whether the caller keeps working.
     *
     * @return the demand still to serve, or {@code 0} when ownership of the request loop was released
     */
    @GuardedBy("requestors")
    private long continueWithRemaining(long j) {
        long remaining = BackpressureHelper.produced(this, j);
        if (remaining > 0) {
            return remaining;
        }
        // A non-zero counter means request() was called concurrently; re-read the demand those calls
        // added before giving up ownership.
        while (this.requestors.decrementAndGet() != 0) {
            long demand = get();
            if (demand != 0) {
                return demand;
            }
        }
        return 0L;
    }

    @Override // java.util.concurrent.atomic.AtomicLong
    public String toString() {
        return "InnerPublisher-" + System.identityHashCode(this) + "{requestId=" + this.parent.requestId + ", topologyId=" + this.topologyId + '}';
    }
}
