package com.mongodb.reactivestreams.client.internal;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/mongodb/reactivestreams/client/internal/BatchCursorFlux.class */
class BatchCursorFlux<T> implements Publisher<T> {
    private final BatchCursorPublisher<T> batchCursorPublisher;
    private final AtomicBoolean inProgress = new AtomicBoolean(false);
    private final AtomicLong demandDelta = new AtomicLong(0);
    private volatile BatchCursor<T> batchCursor;
    private FluxSink<T> sink;

    /* JADX INFO: Access modifiers changed from: package-private */
    public BatchCursorFlux(BatchCursorPublisher<T> batchCursorPublisher) {
        this.batchCursorPublisher = batchCursorPublisher;
    }

    @Override // org.reactivestreams.Publisher
    public void subscribe(Subscriber<? super T> subscriber) {
        Flux.create(fluxSink -> {
            this.sink = fluxSink;
            fluxSink.onRequest(j -> {
                if (calculateDemand(j) <= 0 || !this.inProgress.compareAndSet(false, true)) {
                    return;
                }
                if (this.batchCursor != null) {
                    this.inProgress.set(false);
                    recurseCursor();
                    return;
                }
                Mono<BatchCursor<T>> batchCursor = this.batchCursorPublisher.batchCursor(calculateBatchSize(fluxSink.requestedFromDownstream()));
                Consumer<? super BatchCursor<T>> consumer = batchCursor2 -> {
                    this.batchCursor = batchCursor2;
                    this.inProgress.set(false);
                    if (fluxSink.isCancelled()) {
                        closeCursor();
                    } else {
                        recurseCursor();
                    }
                };
                Objects.requireNonNull(fluxSink);
                batchCursor.subscribe(consumer, fluxSink::error);
            });
            fluxSink.onCancel(this::closeCursor);
            fluxSink.onDispose(this::closeCursor);
        }, FluxSink.OverflowStrategy.BUFFER).subscribe(subscriber);
    }

    private void closeCursor() {
        if (this.batchCursor != null) {
            this.batchCursor.close();
        }
    }

    private void recurseCursor() {
        if (this.sink.isCancelled() || this.sink.requestedFromDownstream() <= 0 || !this.inProgress.compareAndSet(false, true)) {
            return;
        }
        if (this.batchCursor.isClosed()) {
            this.sink.complete();
        } else {
            this.batchCursor.setBatchSize(calculateBatchSize(this.sink.requestedFromDownstream()));
            Mono.from(this.batchCursor.next()).doOnCancel(this::closeCursor).doOnError(th -> {
                try {
                    closeCursor();
                } finally {
                    this.sink.error(th);
                }
            }).doOnSuccess(list -> {
                if (list != null) {
                    FluxSink<T> fluxSink = this.sink;
                    Objects.requireNonNull(fluxSink);
                    list.forEach(fluxSink::next);
                    calculateDemand(-list.size());
                }
                if (this.batchCursor.isClosed()) {
                    this.sink.complete();
                } else {
                    this.inProgress.set(false);
                    recurseCursor();
                }
            }).subscribe();
        }
    }

    long calculateDemand(long j) {
        return this.demandDelta.accumulateAndGet(j, (j2, j3) -> {
            long j2 = j2 + j3;
            if (j3 <= 0 || j2 >= j2) {
                return j2;
            }
            return Long.MAX_VALUE;
        });
    }

    int calculateBatchSize(long j) {
        Integer batchSize = this.batchCursorPublisher.getBatchSize();
        if (batchSize != null) {
            return batchSize.intValue();
        }
        if (j > 2147483647L) {
            return Integer.MAX_VALUE;
        }
        return Math.max(2, (int) j);
    }
}
