package dev.snowdrop.vertx.http.common;

import io.vertx.core.streams.WriteStream;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ObjectUtils;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.MonoSink;

/* loaded from: input_file:dev/snowdrop/vertx/http/common/WriteStreamSubscriber.class */
public class WriteStreamSubscriber<T extends WriteStream<?>, U> extends BaseSubscriber<U> {
    private final Logger logger;
    private final T writeStream;
    private final BiConsumer<T, U> nextHandler;
    private final MonoSink<Void> endHook;
    private final long requestLimit;
    private AtomicLong pendingCount;
    private final String logPrefix;

    /* loaded from: input_file:dev/snowdrop/vertx/http/common/WriteStreamSubscriber$Builder.class */
    public static class Builder<T extends WriteStream<?>, U> {
        private T writeStream;
        private BiConsumer<T, U> nextHandler;
        private MonoSink<Void> endHook;
        private long requestLimit = 1;

        public Builder<T, U> writeStream(T t) {
            this.writeStream = t;
            return this;
        }

        public Builder<T, U> nextHandler(BiConsumer<T, U> biConsumer) {
            this.nextHandler = biConsumer;
            return this;
        }

        public Builder<T, U> endHook(MonoSink<Void> monoSink) {
            this.endHook = monoSink;
            return this;
        }

        public Builder<T, U> requestLimit(long j) {
            this.requestLimit = j;
            return this;
        }

        public WriteStreamSubscriber<T, U> build() {
            Objects.requireNonNull(this.writeStream, "Write stream is required");
            Objects.requireNonNull(this.nextHandler, "Next handler is required");
            Objects.requireNonNull(this.endHook, "End hook is required");
            return new WriteStreamSubscriber<>(this.writeStream, this.nextHandler, this.endHook, this.requestLimit);
        }
    }

    private WriteStreamSubscriber(T t, BiConsumer<T, U> biConsumer, MonoSink<Void> monoSink, long j) {
        this.logger = LoggerFactory.getLogger(getClass());
        this.pendingCount = new AtomicLong();
        this.writeStream = t;
        this.nextHandler = biConsumer;
        this.endHook = monoSink;
        this.requestLimit = j;
        this.logPrefix = "[" + ObjectUtils.getIdentityHexString(t) + "] ";
        t.exceptionHandler(this::exceptionHandler);
    }

    protected void hookOnSubscribe(Subscription subscription) {
        this.logger.debug("{}{} subscribed", this.logPrefix, this.writeStream);
        requestIfNotFull();
        this.writeStream.drainHandler(r3 -> {
            requestIfNotFull();
        });
    }

    protected void hookOnNext(U u) {
        this.logger.debug("{}Next: {}", this.logPrefix, u);
        this.nextHandler.accept(this.writeStream, u);
        this.pendingCount.decrementAndGet();
        requestIfNotFull();
    }

    protected void hookOnComplete() {
        this.logger.debug("{}Completed", this.logPrefix);
        this.endHook.success();
    }

    protected void hookOnCancel() {
        this.logger.debug("{}Canceled", this.logPrefix);
        this.endHook.success();
    }

    protected void hookOnError(Throwable th) {
        this.logger.debug("{}Error: {}", this.logPrefix, th);
        this.endHook.error(th);
    }

    private void exceptionHandler(Throwable th) {
        cancel();
    }

    private void requestIfNotFull() {
        if (this.writeStream.writeQueueFull() || this.pendingCount.get() >= this.requestLimit) {
            return;
        }
        this.logger.debug("{}Requesting more data", this.logPrefix);
        request(this.requestLimit - this.pendingCount.getAndSet(this.requestLimit));
    }
}
