/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.rabbitmq.impl;

import com.rabbitmq.client.BasicProperties;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.impl.InboundBuffer;
import io.vertx.rabbitmq.RabbitMQClient;
import io.vertx.rabbitmq.RabbitMQConfirmation;
import io.vertx.rabbitmq.RabbitMQPublisher;
import io.vertx.rabbitmq.RabbitMQPublisherConfirmation;
import io.vertx.rabbitmq.RabbitMQPublisherOptions;
import io.vertx.rabbitmq.impl.RabbitMQClientImpl;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

/**
 * Publisher that queues outgoing messages, sends them one at a time through a
 * {@link RabbitMQClient}, and exposes the broker's publish confirmations as a
 * {@link ReadStream} of {@link RabbitMQPublisherConfirmation}.
 *
 * <p>Messages are written to an internal send queue. The queue is paused while a
 * message is in flight and resumed once its publish has succeeded, so at most
 * one publish is outstanding at any time. A failed publish triggers
 * {@code restartConnect} on the client and the same message is sent again.
 *
 * <p>Every sent message is kept in {@code pendingAcks} until a confirmation with
 * a matching delivery tag arrives. All access to {@code pendingAcks} is guarded
 * by synchronizing on the deque itself.
 */
public class RabbitMQPublisherImpl implements RabbitMQPublisher, ReadStream<RabbitMQPublisherConfirmation> {
    private static final Logger log = LoggerFactory.getLogger(RabbitMQPublisherImpl.class);

    private final RabbitMQClient client;
    private final InboundBuffer<RabbitMQPublisherConfirmation> confirmations;
    private final Context context;
    private final RabbitMQPublisherOptions options;
    /** Messages handed to the client for which no confirmation has been processed yet. */
    private final Deque<MessageDetails> pendingAcks = new ArrayDeque<>();
    private final InboundBuffer<MessageDetails> sendQueue;
    /** Channel instance seen at the last connection-established callback; 0 means none recorded yet. */
    private long lastChannelInstance = 0L;
    private volatile boolean stopped = false;

    /**
     * Creates a publisher bound to the given client.
     *
     * <p>Registers a connection-established callback on the client that re-adds the
     * confirm listener and, when the client's channel instance has changed, discards
     * the pending (unconfirmed) messages.
     *
     * @param vertx   used to obtain the context that the internal buffers run on
     * @param client  client used to publish and to (re)connect
     * @param options publisher options; the max internal queue size is passed to the confirm listener
     */
    public RabbitMQPublisherImpl(Vertx vertx, RabbitMQClient client, RabbitMQPublisherOptions options) {
        this.client = client;
        this.context = vertx.getOrCreateContext();
        this.confirmations = new InboundBuffer<>(this.context);
        this.sendQueue = new InboundBuffer<>(this.context);
        this.sendQueue.handler(this::handleMessageSend);
        this.options = options;
        this.client.addConnectionEstablishedCallback(p -> {
            addConfirmListener(client, options, p);
            resetPendingAcksOnNewChannel();
        });
    }

    /**
     * Records the client's current channel instance and, if it differs from the one
     * previously recorded, clears {@code pendingAcks}. Does nothing unless the client
     * is a {@link RabbitMQClientImpl}.
     */
    private void resetPendingAcksOnNewChannel() {
        if (!(client instanceof RabbitMQClientImpl)) {
            return;
        }
        long currentInstance = ((RabbitMQClientImpl) client).getChannelInstance();
        if (lastChannelInstance != 0L && lastChannelInstance != currentInstance) {
            // Take the same lock as every other access to pendingAcks.
            synchronized (pendingAcks) {
                pendingAcks.clear();
            }
        }
        lastChannelInstance = currentInstance;
    }

    /**
     * Adds the confirm listener and reports the outcome to {@code resultHandler}.
     *
     * @param resultHandler notified when adding the confirm listener has succeeded or failed
     */
    @Override
    public void start(Handler<AsyncResult<Void>> resultHandler) {
        startForPromise().future().onComplete(resultHandler);
    }

    /**
     * Adds the confirm listener.
     *
     * @return a future completed when adding the confirm listener has succeeded or failed
     */
    @Override
    public Future<Void> start() {
        return startForPromise().future();
    }

    /**
     * Marks the publisher as stopped and signals {@code resultHandler} once the send
     * queue is empty: immediately if it is already empty, otherwise through the
     * queue's empty handler. The queue is resumed afterwards.
     *
     * @param resultHandler receives a succeeded result once the send queue is empty
     */
    @Override
    public void stop(Handler<AsyncResult<Void>> resultHandler) {
        stopped = true;
        sendQueue.pause();
        if (sendQueue.isEmpty()) {
            resultHandler.handle(Future.succeededFuture());
        } else {
            sendQueue.emptyHandler(v -> resultHandler.handle(Future.succeededFuture()));
        }
        sendQueue.resume();
    }

    /**
     * Future-returning variant of {@link #stop(Handler)}.
     *
     * @return a future completed once the send queue is empty
     */
    @Override
    public Future<Void> stop() {
        Promise<Void> promise = Promise.promise();
        stop(promise);
        return promise.future();
    }

    /**
     * Clears the stopped flag so that publish calls are accepted again, and removes
     * any empty handler installed by {@link #stop(Handler)}.
     */
    @Override
    public void restart() {
        stopped = false;
        sendQueue.pause();
        sendQueue.emptyHandler(null);
        sendQueue.resume();
    }

    /** Adds the confirm listener and returns the promise that tracks the outcome. */
    private Promise<Void> startForPromise() {
        Promise<Void> promise = Promise.promise();
        addConfirmListener(client, options, promise);
        return promise;
    }

    /**
     * On this publisher's context, asks the client for a confirm listener and routes
     * every confirmation it emits to {@link #handleConfirmation}.
     *
     * @param client1  client to add the confirm listener to
     * @param options1 source of the max internal queue size passed to the client
     * @param promise  completed on success, failed with the cause otherwise
     */
    protected final void addConfirmListener(RabbitMQClient client1, RabbitMQPublisherOptions options1, Promise<Void> promise) {
        context.runOnContext(unused -> client1.addConfirmListener(options1.getMaxInternalQueueSize()).onComplete(ar -> {
            if (ar.succeeded()) {
                ar.result().handler(this::handleConfirmation);
                promise.complete();
            } else {
                log.error("Failed to add confirmListener: ", ar.cause());
                promise.fail(ar.cause());
            }
        }));
    }

    /** Returns this publisher, which is itself the stream of confirmations. */
    @Override
    public ReadStream<RabbitMQPublisherConfirmation> getConfirmationStream() {
        return this;
    }

    /** Returns the number of messages currently held in the send queue. */
    @Override
    public int queueSize() {
        return sendQueue.size();
    }

    /**
     * Send-queue handler: pauses the queue so no further message is delivered while
     * this one is in flight, then tracks and sends it.
     */
    private void handleMessageSend(MessageDetails md) {
        sendQueue.pause();
        trackAndSend(md);
    }

    /** Registers the message as awaiting confirmation, then hands it to the client. */
    private void trackAndSend(MessageDetails md) {
        synchronized (pendingAcks) {
            pendingAcks.add(md);
        }
        doSend(md);
    }

    /**
     * Publishes the message through the client. On success the publish handler (if
     * any) is notified and the send queue is resumed; on failure, or if the client
     * throws, the message is retried after a reconnect.
     */
    private void doSend(MessageDetails md) {
        try {
            client.basicPublishWithDeliveryTag(md.exchange, md.routingKey, md.properties, md.message,
                dt -> md.setDeliveryTag(dt),
                publishResult -> {
                    if (publishResult.succeeded()) {
                        if (md.publishHandler != null) {
                            try {
                                md.publishHandler.handle(publishResult);
                            } catch (Throwable ex) {
                                // A misbehaving user handler must not stall the send queue.
                                log.warn("Failed to handle publish result", ex);
                            }
                        }
                        sendQueue.resume();
                    } else {
                        log.info("Failed to publish message: " + publishResult.cause().toString());
                        retryAfterReconnect(md);
                    }
                });
        } catch (Throwable ex) {
            log.warn("Failed to publish message", ex);
            retryAfterReconnect(md);
        }
    }

    /**
     * Drops the message from {@code pendingAcks}, asks the client to reconnect and
     * then sends the message again.
     *
     * <p>The resend goes through {@link #trackAndSend} rather than {@link #doSend}
     * so that the message is registered in {@code pendingAcks} again; otherwise the
     * confirmation for the resent message would find no matching entry and its
     * confirm handler would never be called.
     */
    private void retryAfterReconnect(MessageDetails md) {
        synchronized (pendingAcks) {
            pendingAcks.remove(md);
        }
        client.restartConnect(0, rcRt -> trackAndSend(md));
    }

    /**
     * Matches a raw confirmation against the pending messages.
     *
     * <p>For a "multiple" confirmation, messages are consumed from the head of
     * {@code pendingAcks} while their delivery tag is less than or equal to the
     * confirmed tag; iteration stops at the first message with a greater tag. For a
     * single confirmation, only the first message with an equal delivery tag is
     * consumed.
     */
    private void handleConfirmation(RabbitMQConfirmation rawConfirmation) {
        synchronized (pendingAcks) {
            long confirmedTag = rawConfirmation.getDeliveryTag();
            Iterator<MessageDetails> iter = pendingAcks.iterator();
            if (rawConfirmation.isMultiple()) {
                while (iter.hasNext()) {
                    MessageDetails md = iter.next();
                    if (md.deliveryTag > confirmedTag) {
                        break;
                    }
                    deliverConfirmation(md, rawConfirmation);
                    iter.remove();
                }
            } else {
                while (iter.hasNext()) {
                    MessageDetails md = iter.next();
                    if (md.deliveryTag == confirmedTag) {
                        deliverConfirmation(md, rawConfirmation);
                        iter.remove();
                        break;
                    }
                }
            }
        }
    }

    /**
     * Writes a confirmation for the message to the confirmation stream and notifies
     * the message's confirm handler, if it has one.
     */
    private void deliverConfirmation(MessageDetails md, RabbitMQConfirmation rawConfirmation) {
        String messageId = md.properties == null ? null : md.properties.getMessageId();
        // NOTE(review): the stream entry carries the confirmation's tag, not md.deliveryTag, so
        // every message covered by a "multiple" confirmation reports the same tag - confirm intended.
        confirmations.write(new RabbitMQPublisherConfirmation(messageId, rawConfirmation.getDeliveryTag(), rawConfirmation.isSucceeded()));
        if (md.confirmHandler == null) {
            return;
        }
        try {
            if (rawConfirmation.isSucceeded()) {
                md.confirmHandler.handle(Future.succeededFuture(md.deliveryTag));
            } else {
                md.confirmHandler.handle(Future.failedFuture("Message publish nacked by the broker"));
            }
        } catch (Throwable ex) {
            log.warn("Failed to handle publish confirm", ex);
        }
    }

    /**
     * Queues a message for publishing. The call is silently ignored while the
     * publisher is stopped.
     *
     * @param resultHandler notified once the message has been published successfully; may be null
     */
    @Override
    public void publish(String exchange, String routingKey, BasicProperties properties, Buffer body, Handler<AsyncResult<Void>> resultHandler) {
        if (!stopped) {
            context.runOnContext(e -> sendQueue.write(new MessageDetails(exchange, routingKey, properties, body, resultHandler, ar -> {})));
        }
    }

    /**
     * Queues a message for publishing.
     *
     * @return a future completed once the message has been published successfully, or
     *         a failed future if the publisher is stopped (so callers never wait on a
     *         future that cannot complete)
     */
    @Override
    public Future<Void> publish(String exchange, String routingKey, BasicProperties properties, Buffer body) {
        if (stopped) {
            return Future.failedFuture("RabbitMQ publisher is stopped");
        }
        Promise<Void> promise = Promise.promise();
        context.runOnContext(e -> sendQueue.write(new MessageDetails(exchange, routingKey, properties, body, promise, null)));
        return promise.future();
    }

    /**
     * Queues a message for publishing and reports the broker's confirmation. The call
     * is silently ignored while the publisher is stopped.
     *
     * @param resultHandler receives the delivery tag when the broker acks the message,
     *                      or a failure when it nacks it; may be null
     */
    @Override
    public void publishConfirm(String exchange, String routingKey, BasicProperties properties, Buffer body, Handler<AsyncResult<Long>> resultHandler) {
        if (!stopped) {
            context.runOnContext(e -> sendQueue.write(new MessageDetails(exchange, routingKey, properties, body, null, resultHandler)));
        }
    }

    /**
     * Future-returning variant of
     * {@link #publishConfirm(String, String, BasicProperties, Buffer, Handler)}.
     *
     * @return a future carrying the delivery tag once the broker acks the message,
     *         failed if the broker nacks it or if the publisher is stopped
     */
    @Override
    public Future<Long> publishConfirm(String exchange, String routingKey, BasicProperties properties, Buffer body) {
        if (stopped) {
            return Future.failedFuture("RabbitMQ publisher is stopped");
        }
        return Future.future(p -> publishConfirm(exchange, routingKey, properties, body, p));
    }

    /** Sets the exception handler of the confirmation stream. */
    @Override
    public RabbitMQPublisherImpl exceptionHandler(Handler<Throwable> hndlr) {
        confirmations.exceptionHandler(hndlr);
        return this;
    }

    /** Sets the handler that receives publisher confirmations. */
    @Override
    public RabbitMQPublisherImpl handler(Handler<RabbitMQPublisherConfirmation> hndlr) {
        confirmations.handler(hndlr);
        return this;
    }

    /** Pauses the confirmation stream. */
    @Override
    public RabbitMQPublisherImpl pause() {
        confirmations.pause();
        return this;
    }

    /** Resumes the confirmation stream. */
    @Override
    public RabbitMQPublisherImpl resume() {
        confirmations.resume();
        return this;
    }

    /** Requests {@code l} more confirmations from the confirmation stream. */
    @Override
    public RabbitMQPublisherImpl fetch(long l) {
        confirmations.fetch(l);
        return this;
    }

    /** Accepts an end handler but does not retain it; this class never invokes it. */
    @Override
    public RabbitMQPublisherImpl endHandler(Handler<Void> hndlr) {
        return this;
    }

    /**
     * A message waiting to be sent or confirmed, together with the handlers to notify
     * and the delivery tag assigned when it is published.
     */
    static class MessageDetails {
        private final String exchange;
        private final String routingKey;
        private final BasicProperties properties;
        private final Buffer message;
        /** Notified after a successful publish; may be null. */
        private final Handler<AsyncResult<Void>> publishHandler;
        /** Notified when a matching confirmation is processed; may be null. */
        private final Handler<AsyncResult<Long>> confirmHandler;
        /** Set through the delivery-tag callback passed to the client; 0 until then. */
        private volatile long deliveryTag;

        MessageDetails(String exchange, String routingKey, BasicProperties properties, Buffer message, Handler<AsyncResult<Void>> publishHandler, Handler<AsyncResult<Long>> confirmHandler) {
            this.exchange = exchange;
            this.routingKey = routingKey;
            this.properties = properties;
            this.message = message;
            this.publishHandler = publishHandler;
            this.confirmHandler = confirmHandler;
        }

        public void setDeliveryTag(long deliveryTag) {
            this.deliveryTag = deliveryTag;
        }
    }
}

