package io.vertx.core.eventbus.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.MessageProducer;
import io.vertx.core.streams.StreamBase;
import io.vertx.core.streams.WriteStream;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.UUID;

/* loaded from: input_file:lib/vertx-core.jar:io/vertx/core/eventbus/impl/MessageProducerImpl.class */
public class MessageProducerImpl<T> implements MessageProducer<T> {
    public static final String CREDIT_ADDRESS_HEADER_NAME = "__vertx.credit";
    private final Vertx vertx;
    private final EventBusImpl bus;
    private final boolean send;
    private final String address;
    private final MessageConsumer<Integer> creditConsumer;
    private DeliveryOptions options;
    private Handler<Void> drainHandler;
    private final Queue<MessageImpl<T, ?>> pending = new ArrayDeque();
    private int maxSize = 1000;
    private int credits = 1000;

    public MessageProducerImpl(Vertx vertx, String str, boolean z, DeliveryOptions deliveryOptions) {
        this.vertx = vertx;
        this.bus = (EventBusImpl) vertx.eventBus();
        this.address = str;
        this.send = z;
        this.options = deliveryOptions;
        if (!z) {
            this.creditConsumer = null;
            return;
        }
        String str2 = UUID.randomUUID().toString() + "-credit";
        this.creditConsumer = this.bus.consumer(str2, message -> {
            doReceiveCredit(((Integer) message.body()).intValue());
        });
        deliveryOptions.addHeader(CREDIT_ADDRESS_HEADER_NAME, str2);
    }

    @Override // io.vertx.core.eventbus.MessageProducer
    public synchronized MessageProducer<T> deliveryOptions(DeliveryOptions deliveryOptions) {
        if (this.creditConsumer != null) {
            deliveryOptions = new DeliveryOptions(deliveryOptions);
            deliveryOptions.addHeader(CREDIT_ADDRESS_HEADER_NAME, this.options.getHeaders().get(CREDIT_ADDRESS_HEADER_NAME));
        }
        this.options = deliveryOptions;
        return this;
    }

    @Override // io.vertx.core.eventbus.MessageProducer
    public MessageProducer<T> send(T t) {
        doSend(t, null, null);
        return this;
    }

    @Override // io.vertx.core.eventbus.MessageProducer
    public <R> MessageProducer<T> send(T t, Handler<AsyncResult<Message<R>>> handler) {
        doSend(t, handler, null);
        return this;
    }

    @Override // io.vertx.core.eventbus.MessageProducer, io.vertx.core.streams.WriteStream, io.vertx.core.streams.StreamBase
    public MessageProducer<T> exceptionHandler(Handler<Throwable> handler) {
        return this;
    }

    @Override // io.vertx.core.eventbus.MessageProducer, io.vertx.core.streams.WriteStream
    /* renamed from: setWriteQueueMaxSize */
    public synchronized MessageProducer<T> setWriteQueueMaxSize2(int i) {
        int i2 = i - this.maxSize;
        this.maxSize = i;
        this.credits += i2;
        return this;
    }

    @Override // io.vertx.core.eventbus.MessageProducer, io.vertx.core.streams.WriteStream
    public synchronized MessageProducer<T> write(T t) {
        return write((MessageProducerImpl<T>) t, (Handler<AsyncResult<Void>>) null);
    }

    @Override // io.vertx.core.eventbus.MessageProducer, io.vertx.core.streams.WriteStream
    public MessageProducer<T> write(T t, Handler<AsyncResult<Void>> handler) {
        if (this.send) {
            doSend(t, null, handler);
        } else {
            MessageImpl createMessage = this.bus.createMessage(false, this.address, this.options.getHeaders(), t, this.options.getCodecName(), handler);
            createMessage.writeHandler = handler;
            this.bus.sendOrPubInternal(createMessage, this.options, null);
        }
        return this;
    }

    @Override // io.vertx.core.streams.WriteStream
    public synchronized boolean writeQueueFull() {
        return this.credits == 0;
    }

    @Override // io.vertx.core.eventbus.MessageProducer, io.vertx.core.streams.WriteStream
    public synchronized MessageProducer<T> drainHandler(Handler<Void> handler) {
        this.drainHandler = handler;
        if (handler != null) {
            checkDrained();
        }
        return this;
    }

    private void checkDrained() {
        Handler<Void> handler = this.drainHandler;
        if (handler == null || this.credits < this.maxSize / 2) {
            return;
        }
        this.drainHandler = null;
        this.vertx.runOnContext(r4 -> {
            handler.handle(null);
        });
    }

    @Override // io.vertx.core.eventbus.MessageProducer
    public String address() {
        return this.address;
    }

    @Override // io.vertx.core.eventbus.MessageProducer, io.vertx.core.streams.WriteStream
    public void end() {
        close();
    }

    @Override // io.vertx.core.eventbus.MessageProducer, io.vertx.core.streams.WriteStream
    public void end(Handler<AsyncResult<Void>> handler) {
        close(null);
    }

    @Override // io.vertx.core.eventbus.MessageProducer
    public void close() {
        close(null);
    }

    @Override // io.vertx.core.eventbus.MessageProducer
    public void close(Handler<AsyncResult<Void>> handler) {
        if (this.creditConsumer != null) {
            this.creditConsumer.unregister(handler);
        } else {
            this.vertx.runOnContext(r4 -> {
                if (handler != null) {
                    handler.handle(Future.succeededFuture());
                }
            });
        }
    }

    protected void finalize() throws Throwable {
        close();
        super.finalize();
    }

    private synchronized <R> void doSend(T t, Handler<AsyncResult<Message<R>>> handler, Handler<AsyncResult<Void>> handler2) {
        MessageImpl<T, ?> createMessage = this.bus.createMessage(true, this.address, this.options.getHeaders(), t, this.options.getCodecName(), handler2);
        if (this.credits <= 0) {
            this.pending.add(createMessage);
        } else {
            this.credits--;
            this.bus.sendOrPubInternal(createMessage, this.options, handler);
        }
    }

    private synchronized void doReceiveCredit(int i) {
        MessageImpl<T, ?> poll;
        this.credits += i;
        while (this.credits > 0 && (poll = this.pending.poll()) != null) {
            this.credits--;
            this.bus.sendOrPubInternal(poll, this.options, null);
        }
        checkDrained();
    }

    @Override // io.vertx.core.eventbus.MessageProducer, io.vertx.core.streams.WriteStream
    public /* bridge */ /* synthetic */ WriteStream drainHandler(Handler handler) {
        return drainHandler((Handler<Void>) handler);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // io.vertx.core.eventbus.MessageProducer, io.vertx.core.streams.WriteStream
    public /* bridge */ /* synthetic */ WriteStream write(Object obj, Handler handler) {
        return write((MessageProducerImpl<T>) obj, (Handler<AsyncResult<Void>>) handler);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // io.vertx.core.eventbus.MessageProducer, io.vertx.core.streams.WriteStream
    public /* bridge */ /* synthetic */ WriteStream write(Object obj) {
        return write((MessageProducerImpl<T>) obj);
    }

    @Override // io.vertx.core.eventbus.MessageProducer, io.vertx.core.streams.WriteStream, io.vertx.core.streams.StreamBase
    public /* bridge */ /* synthetic */ WriteStream exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }

    @Override // io.vertx.core.eventbus.MessageProducer, io.vertx.core.streams.WriteStream, io.vertx.core.streams.StreamBase
    public /* bridge */ /* synthetic */ StreamBase exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
