package io.vertx.core.eventbus.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Closeable;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.eventbus.DeliveryContext;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.MessageProducer;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.utils.ConcurrentCyclicSequence;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.spi.metrics.EventBusMetrics;
import io.vertx.core.spi.metrics.MetricsProvider;
import io.vertx.core.spi.metrics.VertxMetrics;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;

/* loaded from: input_file:lib/vertx-core.jar:io/vertx/core/eventbus/impl/EventBusImpl.class */
public class EventBusImpl implements EventBus, MetricsProvider {
    /** Class-wide logger; the nested delivery contexts use it to report interceptor failures. */
    private static final Logger log = LoggerFactory.getLogger((Class<?>) EventBusImpl.class);
    /** Vert.x instance passed to the constructor; used to obtain contexts and to schedule completion callbacks. */
    protected final VertxInternal vertx;
    /** Event-bus metrics, or {@code null} when the Vert.x instance returned no metrics SPI at construction. */
    protected final EventBusMetrics metrics;
    /** Set to {@code true} by {@code start}; consulted by {@code checkStarted} and {@code close}. */
    protected volatile boolean started;
    /** Interceptors walked by {@code OutboundDeliveryContext} before a message is sent or published. */
    private final List<Handler<DeliveryContext>> sendInterceptors = new CopyOnWriteArrayList();
    /** Interceptors walked by {@code InboundDeliveryContext} before a message reaches its handler. */
    private final List<Handler<DeliveryContext>> receiveInterceptors = new CopyOnWriteArrayList();
    /** Counter used by {@code generateReplyAddress} to build unique reply addresses. */
    private final AtomicLong replySequence = new AtomicLong(0);
    /** Registered handlers, keyed by address. */
    protected final ConcurrentMap<String, ConcurrentCyclicSequence<HandlerHolder>> handlerMap = new ConcurrentHashMap();
    /** Codec registry used to pick the codec for each created message. */
    protected final CodecManager codecManager = new CodecManager();

    /* loaded from: input_file:lib/vertx-core.jar:io/vertx/core/eventbus/impl/EventBusImpl$HandlerEntry.class */
    /**
     * Pairs an address with a handler registration so the pair can be installed as a
     * context close hook; closing the entry unregisters the handler.
     *
     * <p>Equality is defined on the (address, handler) pair. Both {@link #equals(Object)}
     * and {@link #hashCode()} tolerate {@code null} fields, so the two stay consistent.
     */
    public class HandlerEntry<T> implements Closeable {
        final String address;
        final HandlerRegistration<T> handler;

        /**
         * @param str                 the address the handler is registered on
         * @param handlerRegistration the registration to unregister on close
         */
        public HandlerEntry(String str, HandlerRegistration<T> handlerRegistration) {
            this.address = str;
            this.handler = handlerRegistration;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            HandlerEntry<?> other = (HandlerEntry<?>) obj;
            // Null-safe comparison: hashCode() already accepts null fields, equals() must too.
            return Objects.equals(this.address, other.address) && Objects.equals(this.handler, other.handler);
        }

        @Override
        public int hashCode() {
            // Objects.hashCode(null) is 0, matching the original null handling.
            return (31 * Objects.hashCode(this.address)) + Objects.hashCode(this.handler);
        }

        /**
         * Unregisters the wrapped handler, forwarding the completion handler to it.
         *
         * @param handler notified by the registration once unregistration completes
         */
        @Override // io.vertx.core.Closeable
        public void close(Handler<AsyncResult<Void>> handler) {
            this.handler.unregister(handler);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:lib/vertx-core.jar:io/vertx/core/eventbus/impl/EventBusImpl$InboundDeliveryContext.class */
    public class InboundDeliveryContext<T> implements DeliveryContext<T> {
        private final MessageImpl message;
        private final Iterator<Handler<DeliveryContext>> iter;
        private final HandlerHolder<T> holder;

        private InboundDeliveryContext(MessageImpl messageImpl, HandlerHolder<T> handlerHolder) {
            this.message = messageImpl;
            this.holder = handlerHolder;
            this.iter = EventBusImpl.this.receiveInterceptors.iterator();
        }

        @Override // io.vertx.core.eventbus.DeliveryContext
        public Message<T> message() {
            return this.message;
        }

        @Override // io.vertx.core.eventbus.DeliveryContext
        public void next() {
            if (!this.iter.hasNext()) {
                this.holder.getHandler().handle((Message) this.message);
                return;
            }
            try {
                Handler<DeliveryContext> next = this.iter.next();
                if (next != null) {
                    next.handle(this);
                } else {
                    next();
                }
            } catch (Throwable th) {
                EventBusImpl.log.error("Failure in interceptor", th);
            }
        }

        @Override // io.vertx.core.eventbus.DeliveryContext
        public boolean send() {
            return this.message.isSend();
        }

        @Override // io.vertx.core.eventbus.DeliveryContext
        public Object body() {
            return this.message.receivedBody;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:lib/vertx-core.jar:io/vertx/core/eventbus/impl/EventBusImpl$LocalRegistrationResult.class */
    /**
     * Result of a local registration: the holder that was created and whether the
     * registration introduced a new address in the handler map.
     */
    public static class LocalRegistrationResult<T> {
        /** Holder created for the registration. */
        final HandlerHolder<T> holder;
        /** {@code true} when no sequence existed for the address before this registration. */
        final boolean newAddress;

        LocalRegistrationResult(HandlerHolder<T> createdHolder, boolean addressIsNew) {
            this.newAddress = addressIsNew;
            this.holder = createdHolder;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:lib/vertx-core.jar:io/vertx/core/eventbus/impl/EventBusImpl$OutboundDeliveryContext.class */
    public class OutboundDeliveryContext<T> implements DeliveryContext<T> {
        public final MessageImpl message;
        public final DeliveryOptions options;
        public final Iterator<Handler<DeliveryContext>> iter;
        private final HandlerRegistration<T> handlerRegistration;
        private final MessageImpl replierMessage;

        private OutboundDeliveryContext(EventBusImpl eventBusImpl, MessageImpl messageImpl, DeliveryOptions deliveryOptions, HandlerRegistration<T> handlerRegistration) {
            this(messageImpl, deliveryOptions, handlerRegistration, (MessageImpl) null);
        }

        private OutboundDeliveryContext(MessageImpl messageImpl, DeliveryOptions deliveryOptions, HandlerRegistration<T> handlerRegistration, MessageImpl messageImpl2) {
            this.message = messageImpl;
            this.options = deliveryOptions;
            this.handlerRegistration = handlerRegistration;
            this.iter = EventBusImpl.this.sendInterceptors.iterator();
            this.replierMessage = messageImpl2;
        }

        @Override // io.vertx.core.eventbus.DeliveryContext
        public Message<T> message() {
            return this.message;
        }

        @Override // io.vertx.core.eventbus.DeliveryContext
        public void next() {
            if (!this.iter.hasNext()) {
                if (this.replierMessage == null) {
                    EventBusImpl.this.sendOrPub(this);
                    return;
                } else {
                    EventBusImpl.this.sendReply(this, this.replierMessage);
                    return;
                }
            }
            Handler<DeliveryContext> next = this.iter.next();
            try {
                if (next != null) {
                    next.handle(this);
                } else {
                    next();
                }
            } catch (Throwable th) {
                EventBusImpl.log.error("Failure in interceptor", th);
            }
        }

        @Override // io.vertx.core.eventbus.DeliveryContext
        public boolean send() {
            return this.message.isSend();
        }

        @Override // io.vertx.core.eventbus.DeliveryContext
        public Object body() {
            return this.message.sentBody;
        }
    }

    public EventBusImpl(VertxInternal vertxInternal) {
        VertxMetrics metricsSPI = vertxInternal.metricsSPI();
        this.vertx = vertxInternal;
        this.metrics = metricsSPI != null ? metricsSPI.createEventBusMetrics() : null;
    }

    /**
     * Appends an interceptor to the chain run by outbound delivery contexts.
     *
     * @param handler the interceptor to add
     * @return this event bus, for chaining
     */
    @Override // io.vertx.core.eventbus.EventBus
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <T> EventBus addOutboundInterceptor(Handler<DeliveryContext<T>> handler) {
        // The list stores raw DeliveryContext handlers; Handler<DeliveryContext<T>> is not a
        // subtype of Handler<DeliveryContext>, so an explicit raw cast is required.
        this.sendInterceptors.add((Handler) handler);
        return this;
    }

    /**
     * Appends an interceptor to the chain run by inbound delivery contexts.
     *
     * @param handler the interceptor to add
     * @return this event bus, for chaining
     */
    @Override // io.vertx.core.eventbus.EventBus
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <T> EventBus addInboundInterceptor(Handler<DeliveryContext<T>> handler) {
        // The list stores raw DeliveryContext handlers; Handler<DeliveryContext<T>> is not a
        // subtype of Handler<DeliveryContext>, so an explicit raw cast is required.
        this.receiveInterceptors.add((Handler) handler);
        return this;
    }

    /**
     * Removes an interceptor from the outbound chain, if present.
     *
     * @param handler the interceptor to remove
     * @return this event bus, for chaining
     */
    @Override // io.vertx.core.eventbus.EventBus
    public <T> EventBus removeOutboundInterceptor(Handler<DeliveryContext<T>> handler) {
        List<Handler<DeliveryContext>> outbound = this.sendInterceptors;
        outbound.remove(handler);
        return this;
    }

    /**
     * Removes an interceptor from the inbound chain, if present.
     *
     * @param handler the interceptor to remove
     * @return this event bus, for chaining
     */
    @Override // io.vertx.core.eventbus.EventBus
    public <T> EventBus removeInboundInterceptor(Handler<DeliveryContext<T>> handler) {
        List<Handler<DeliveryContext>> inbound = this.receiveInterceptors;
        inbound.remove(handler);
        return this;
    }

    /**
     * Marks the event bus as started and reports success to the completion handler.
     *
     * @param handler notified with a succeeded result; may be {@code null}, in which case
     *                no notification is sent (consistent with {@code close})
     * @throws IllegalStateException if the bus has already been started
     */
    @Override // io.vertx.core.eventbus.EventBus
    public synchronized void start(Handler<AsyncResult<Void>> handler) {
        if (this.started) {
            throw new IllegalStateException("Already started");
        }
        this.started = true;
        // Guard against a null handler: otherwise the bus ends up started while the caller
        // sees a NullPointerException.
        if (handler != null) {
            handler.handle(Future.succeededFuture());
        }
    }

    /**
     * Sends a message with default delivery options and no reply handler.
     *
     * @param str the target address
     * @param obj the message body
     * @return this event bus, for chaining
     */
    @Override // io.vertx.core.eventbus.EventBus
    public EventBus send(String str, Object obj) {
        DeliveryOptions defaults = new DeliveryOptions();
        return send(str, obj, defaults, null);
    }

    /**
     * Sends a message with default delivery options and a reply handler.
     *
     * @param str     the target address
     * @param obj     the message body
     * @param handler receives the reply or the failure
     * @return this event bus, for chaining
     */
    @Override // io.vertx.core.eventbus.EventBus
    public <T> EventBus send(String str, Object obj, Handler<AsyncResult<Message<T>>> handler) {
        DeliveryOptions defaults = new DeliveryOptions();
        return send(str, obj, defaults, handler);
    }

    /**
     * Sends a message with the given delivery options and no reply handler.
     *
     * @param str             the target address
     * @param obj             the message body
     * @param deliveryOptions options for this send
     * @return this event bus, for chaining
     */
    @Override // io.vertx.core.eventbus.EventBus
    public EventBus send(String str, Object obj, DeliveryOptions deliveryOptions) {
        EventBus self = send(str, obj, deliveryOptions, null);
        return self;
    }

    /**
     * Builds a send-type message from the body, headers and codec name in the options,
     * then routes it through {@code sendOrPubInternal}.
     *
     * @param str             the target address
     * @param obj             the message body
     * @param deliveryOptions supplies headers and codec name, and is passed on for delivery
     * @param handler         receives the reply or the failure; may be {@code null}
     * @return this event bus, for chaining
     */
    @Override // io.vertx.core.eventbus.EventBus
    public <T> EventBus send(String str, Object obj, DeliveryOptions deliveryOptions, Handler<AsyncResult<Message<T>>> handler) {
        MessageImpl outgoing = createMessage(true, str, deliveryOptions.getHeaders(), obj, deliveryOptions.getCodecName(), null);
        sendOrPubInternal(outgoing, deliveryOptions, handler);
        return this;
    }

    /**
     * Creates a producer for the address in send mode with default delivery options.
     *
     * @param str the target address; must not be {@code null}
     * @return the new producer
     * @throws NullPointerException if {@code str} is {@code null}
     */
    @Override // io.vertx.core.eventbus.EventBus
    public <T> MessageProducer<T> sender(String str) {
        Objects.requireNonNull(str, "address");
        DeliveryOptions defaults = new DeliveryOptions();
        return new MessageProducerImpl(this.vertx, str, true, defaults);
    }

    /**
     * Creates a producer for the address in send mode with the given delivery options.
     *
     * @param str             the target address; must not be {@code null}
     * @param deliveryOptions options used by the producer; must not be {@code null}
     * @return the new producer
     * @throws NullPointerException if either argument is {@code null}
     */
    @Override // io.vertx.core.eventbus.EventBus
    public <T> MessageProducer<T> sender(String str, DeliveryOptions deliveryOptions) {
        Objects.requireNonNull(str, "address");
        Objects.requireNonNull(deliveryOptions, "options");
        MessageProducer<T> producer = new MessageProducerImpl(this.vertx, str, true, deliveryOptions);
        return producer;
    }

    /**
     * Creates a producer for the address in publish mode with default delivery options.
     *
     * @param str the target address; must not be {@code null}
     * @return the new producer
     * @throws NullPointerException if {@code str} is {@code null}
     */
    @Override // io.vertx.core.eventbus.EventBus
    public <T> MessageProducer<T> publisher(String str) {
        Objects.requireNonNull(str, "address");
        DeliveryOptions defaults = new DeliveryOptions();
        return new MessageProducerImpl(this.vertx, str, false, defaults);
    }

    /**
     * Creates a producer for the address in publish mode with the given delivery options.
     *
     * @param str             the target address; must not be {@code null}
     * @param deliveryOptions options used by the producer; must not be {@code null}
     * @return the new producer
     * @throws NullPointerException if either argument is {@code null}
     */
    @Override // io.vertx.core.eventbus.EventBus
    public <T> MessageProducer<T> publisher(String str, DeliveryOptions deliveryOptions) {
        Objects.requireNonNull(str, "address");
        Objects.requireNonNull(deliveryOptions, "options");
        MessageProducer<T> producer = new MessageProducerImpl(this.vertx, str, false, deliveryOptions);
        return producer;
    }

    /**
     * Publishes a message with default delivery options.
     *
     * @param str the target address
     * @param obj the message body
     * @return this event bus, for chaining
     */
    @Override // io.vertx.core.eventbus.EventBus
    public EventBus publish(String str, Object obj) {
        DeliveryOptions defaults = new DeliveryOptions();
        return publish(str, obj, defaults);
    }

    /**
     * Builds a publish-type message from the body, headers and codec name in the options,
     * then routes it through {@code sendOrPubInternal} without a reply handler.
     *
     * @param str             the target address
     * @param obj             the message body
     * @param deliveryOptions supplies headers and codec name, and is passed on for delivery
     * @return this event bus, for chaining
     */
    @Override // io.vertx.core.eventbus.EventBus
    public EventBus publish(String str, Object obj, DeliveryOptions deliveryOptions) {
        MessageImpl outgoing = createMessage(false, str, deliveryOptions.getHeaders(), obj, deliveryOptions.getCodecName(), null);
        sendOrPubInternal(outgoing, deliveryOptions, null);
        return this;
    }

    /**
     * Creates a consumer for the address without attaching a handler yet.
     *
     * @param str the address to consume from; must not be {@code null}
     * @return a new registration (the flag that {@code localConsumer} sets to {@code true}
     *         is {@code false} here)
     * @throws IllegalStateException if the event bus is not started
     * @throws NullPointerException  if {@code str} is {@code null}
     */
    @Override // io.vertx.core.eventbus.EventBus
    public <T> MessageConsumer<T> consumer(String str) {
        checkStarted();
        Objects.requireNonNull(str, "address");
        MessageConsumer<T> registration = new HandlerRegistration(this.vertx, this.metrics, this, str, null, false, null, -1L);
        return registration;
    }

    /**
     * Creates a consumer for the address and attaches the given message handler.
     *
     * @param str     the address to consume from
     * @param handler the message handler; must not be {@code null}
     * @return the consumer with the handler set
     * @throws NullPointerException if {@code handler} is {@code null}
     */
    @Override // io.vertx.core.eventbus.EventBus
    public <T> MessageConsumer<T> consumer(String str, Handler<Message<T>> handler) {
        // The handler is validated before the consumer is created.
        Objects.requireNonNull(handler, "handler");
        MessageConsumer<T> created = consumer(str);
        created.handler2((Handler) handler);
        return created;
    }

    /**
     * Creates a local consumer for the address without attaching a handler yet.
     *
     * @param str the address to consume from; must not be {@code null}
     * @return a new registration (the flag that {@code consumer} sets to {@code false}
     *         is {@code true} here)
     * @throws IllegalStateException if the event bus is not started
     * @throws NullPointerException  if {@code str} is {@code null}
     */
    @Override // io.vertx.core.eventbus.EventBus
    public <T> MessageConsumer<T> localConsumer(String str) {
        checkStarted();
        Objects.requireNonNull(str, "address");
        MessageConsumer<T> registration = new HandlerRegistration(this.vertx, this.metrics, this, str, null, true, null, -1L);
        return registration;
    }

    /**
     * Creates a local consumer for the address and attaches the given message handler.
     *
     * @param str     the address to consume from
     * @param handler the message handler; must not be {@code null}
     * @return the consumer with the handler set
     * @throws NullPointerException if {@code handler} is {@code null}
     */
    @Override // io.vertx.core.eventbus.EventBus
    public <T> MessageConsumer<T> localConsumer(String str, Handler<Message<T>> handler) {
        // The handler is validated before the consumer is created.
        Objects.requireNonNull(handler, "handler");
        MessageConsumer<T> created = localConsumer(str);
        created.handler2((Handler) handler);
        return created;
    }

    /**
     * Registers a message codec with this bus's codec manager.
     *
     * @param messageCodec the codec to register
     * @return this event bus, for chaining
     */
    @Override // io.vertx.core.eventbus.EventBus
    public EventBus registerCodec(MessageCodec messageCodec) {
        CodecManager codecs = this.codecManager;
        codecs.registerCodec(messageCodec);
        return this;
    }

    /**
     * Unregisters the codec identified by the given string from the codec manager.
     *
     * @param str the codec identifier passed to the codec manager
     * @return this event bus, for chaining
     */
    @Override // io.vertx.core.eventbus.EventBus
    public EventBus unregisterCodec(String str) {
        CodecManager codecs = this.codecManager;
        codecs.unregisterCodec(str);
        return this;
    }

    /**
     * Registers a default codec for a class with this bus's codec manager.
     *
     * @param cls          the class the codec applies to
     * @param messageCodec the codec to register
     * @return this event bus, for chaining
     */
    @Override // io.vertx.core.eventbus.EventBus
    public <T> EventBus registerDefaultCodec(Class<T> cls, MessageCodec<T, ?> messageCodec) {
        CodecManager codecs = this.codecManager;
        codecs.registerDefaultCodec(cls, messageCodec);
        return this;
    }

    /**
     * Unregisters the default codec for a class from the codec manager.
     *
     * @param cls the class whose default codec is removed
     * @return this event bus, for chaining
     */
    @Override // io.vertx.core.eventbus.EventBus
    public EventBus unregisterDefaultCodec(Class cls) {
        CodecManager codecs = this.codecManager;
        codecs.unregisterDefaultCodec(cls);
        return this;
    }

    /**
     * Closes the event bus. When the bus was started, every handler is unregistered and
     * the metrics (if any) are closed first. In all cases a non-null completion handler
     * is then notified with a succeeded result.
     *
     * @param handler completion handler; may be {@code null}
     */
    @Override // io.vertx.core.eventbus.EventBus
    public void close(Handler<AsyncResult<Void>> handler) {
        if (this.started) {
            unregisterAll();
            if (this.metrics != null) {
                this.metrics.close();
            }
        }
        if (handler == null) {
            return;
        }
        handler.handle(Future.succeededFuture());
    }

    /** @return {@code true} when a metrics object was created for this bus */
    @Override // io.vertx.core.metrics.Measured
    public boolean isMetricsEnabled() {
        boolean hasMetrics = this.metrics != null;
        return hasMetrics;
    }

    /** @return the metrics object of this bus, or {@code null} when none was created */
    @Override // io.vertx.core.spi.metrics.MetricsProvider
    public EventBusMetrics<?> getMetrics() {
        EventBusMetrics<?> current = this.metrics;
        return current;
    }

    /**
     * Creates a message for the given address, resolving the codec through the codec manager.
     *
     * @param z        {@code true} when called from {@code send}, {@code false} from {@code publish}
     * @param str      the target address; must not be {@code null}
     * @param multiMap the message headers
     * @param obj      the message body
     * @param str2     the codec name handed to the codec lookup
     * @param handler  write handler stored on the message; may be {@code null}
     * @return the new message, created without a reply address
     * @throws NullPointerException if {@code str} is {@code null}
     */
    public MessageImpl createMessage(boolean z, String str, MultiMap multiMap, Object obj, String str2, Handler<AsyncResult<Void>> handler) {
        Objects.requireNonNull(str, "no null address accepted");
        MessageImpl created = new MessageImpl(str, null, multiMap, obj, this.codecManager.lookupCodec(obj, str2), z, this, handler);
        return created;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Registers a handler locally, then runs the overridable registration step, which
     * reports its outcome to the registration's {@code setResult}.
     *
     * @param str                 the address to register on
     * @param handlerRegistration the registration; its handler must not be {@code null}
     * @param z                   first flag forwarded to the handler holder
     * @param z2                  second flag forwarded to the handler holder
     * @return the holder created for the registration
     * @throws NullPointerException if the registration has no handler
     */
    public <T> HandlerHolder<T> addRegistration(String str, HandlerRegistration<T> handlerRegistration, boolean z, boolean z2) {
        Objects.requireNonNull(handlerRegistration.getHandler(), "handler");
        LocalRegistrationResult<T> local = addLocalRegistration(str, handlerRegistration, z, z2);
        addRegistration(local.newAddress, str, z, z2, handlerRegistration::setResult);
        return local.holder;
    }

    /**
     * Registration step that follows the local registration. This implementation ignores
     * every argument except the handler and completes it successfully at once.
     *
     * @param z       whether the address was newly added to the handler map
     * @param str     the address
     * @param z2      first flag given at registration
     * @param z3      second flag given at registration
     * @param handler notified with a succeeded result
     */
    protected <T> void addRegistration(boolean z, String str, boolean z2, boolean z3, Handler<AsyncResult<Void>> handler) {
        Future<Void> done = Future.succeededFuture();
        handler.handle(done);
    }

    /**
     * Adds a handler to the local handler map and, when the current context belongs to a
     * deployment, installs a close hook that unregisters it.
     *
     * @param str                 the address to register on; must not be {@code null}
     * @param handlerRegistration the registration to add
     * @param z                   first flag forwarded to the handler holder
     * @param z2                  second flag forwarded to the handler holder
     * @return the created holder and whether the address was new to the map
     * @throws NullPointerException if {@code str} is {@code null}
     */
    private <T> LocalRegistrationResult<T> addLocalRegistration(String str, HandlerRegistration<T> handlerRegistration, boolean z, boolean z2) {
        Objects.requireNonNull(str, "address");
        ContextInternal context = this.vertx.getOrCreateContext();
        handlerRegistration.setHandlerContext(context);
        HandlerHolder<T> holder = new HandlerHolder<>(handlerRegistration, z, z2, context);
        // Sequences hold HandlerHolder elements, matching the value type of handlerMap.
        ConcurrentCyclicSequence<HandlerHolder> candidate = new ConcurrentCyclicSequence<HandlerHolder>().add(holder);
        // merge() returns the candidate itself when the address had no mapping; otherwise
        // it returns the existing sequence extended with the new holder.
        ConcurrentCyclicSequence<HandlerHolder> actual = this.handlerMap.merge(str, candidate, (existing, added) -> existing.add(added.first()));
        if (context.deploymentID() != null) {
            context.addCloseHook(new HandlerEntry<>(str, handlerRegistration));
        }
        return new LocalRegistrationResult<>(holder, candidate == actual);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Removes a holder from the local handler map, then runs the overridable removal step.
     * That step receives the holder only when the removal emptied the address; otherwise
     * it receives {@code null}.
     *
     * @param handlerHolder the holder to remove
     * @param handler       completion handler forwarded to the removal step
     */
    public <T> void removeRegistration(HandlerHolder<T> handlerHolder, Handler<AsyncResult<Void>> handler) {
        boolean addressEmptied = removeLocalRegistration(handlerHolder);
        HandlerHolder<T> lastHolder = addressEmptied ? handlerHolder : null;
        String address = handlerHolder.getHandler().address();
        removeRegistration(lastHolder, address, handler);
    }

    /**
     * Removal step that follows the local removal. This implementation ignores the holder
     * and the address and only schedules the completion handler.
     *
     * @param handlerHolder the removed holder, or {@code null}
     * @param str           the address the holder was registered on
     * @param handler       completion handler; may be {@code null}
     */
    protected <T> void removeRegistration(HandlerHolder<T> handlerHolder, String str, Handler<AsyncResult<Void>> handler) {
        this.callCompletionHandlerAsync(handler);
    }

    /**
     * Removes a holder from the handler map. The address mapping is dropped when its
     * sequence becomes empty. When the holder is newly marked as removed and its context
     * belongs to a deployment, the matching close hook is removed as well.
     *
     * @param handlerHolder the holder to remove
     * @return {@code true} when the address has no mapping after the call
     */
    private <T> boolean removeLocalRegistration(HandlerHolder<T> handlerHolder) {
        String address = handlerHolder.getHandler().address();
        ConcurrentCyclicSequence<HandlerHolder> remaining = this.handlerMap.compute(address, (key, current) -> {
            if (current == null) {
                return null;
            }
            ConcurrentCyclicSequence<HandlerHolder> pruned = current.remove(handlerHolder);
            // Returning null from compute() removes the mapping.
            return pruned.size() == 0 ? null : pruned;
        });
        boolean addressEmptied = remaining == null;
        if (handlerHolder.setRemoved() && handlerHolder.getContext().deploymentID() != null) {
            handlerHolder.getContext().removeCloseHook(new HandlerEntry(address, handlerHolder.getHandler()));
        }
        return addressEmptied;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Sends a reply: builds an outbound delivery context carrying the replier message and
     * starts its interceptor chain.
     *
     * @param messageImpl     the reply message; its address must be set
     * @param messageImpl2    the message being replied to
     * @param deliveryOptions options for the reply
     * @param handler         handler for a reply to this reply; may be {@code null}
     * @throws IllegalStateException if the reply message has no address
     */
    public <T> void sendReply(MessageImpl messageImpl, MessageImpl messageImpl2, DeliveryOptions deliveryOptions, Handler<AsyncResult<Message<T>>> handler) {
        if (messageImpl.address() == null) {
            throw new IllegalStateException("address not specified");
        }
        HandlerRegistration<T> replyRegistration = createReplyHandlerRegistration(messageImpl, deliveryOptions, handler);
        OutboundDeliveryContext<T> context = new OutboundDeliveryContext<>(messageImpl, deliveryOptions, replyRegistration, messageImpl2);
        context.next();
    }

    /**
     * Final dispatch step for a reply. This implementation ignores the replier message
     * and delegates to {@code sendOrPub}.
     *
     * @param outboundDeliveryContext the context carrying the reply
     * @param messageImpl             the message being replied to
     */
    protected <T> void sendReply(OutboundDeliveryContext<T> outboundDeliveryContext, MessageImpl messageImpl) {
        this.sendOrPub(outboundDeliveryContext);
    }

    /**
     * Final dispatch step for a send or publish: records the message with the metrics
     * (when enabled) and delivers it to the locally registered handlers.
     *
     * @param outboundDeliveryContext the context carrying the message
     */
    protected <T> void sendOrPub(OutboundDeliveryContext<T> outboundDeliveryContext) {
        if (this.metrics != null) {
            MessageImpl outgoing = outboundDeliveryContext.message;
            this.metrics.messageSent(outgoing.address(), !outgoing.isSend(), true, false);
        }
        deliverMessageLocally(outboundDeliveryContext);
    }

    /**
     * Adapts an async-result reply handler into a plain message handler. A reply whose body
     * is a {@link ReplyException} is reported as a failure (and counted in the metrics when
     * enabled); any other reply is reported as a success carrying the message.
     *
     * @param handler the async-result handler to wrap
     * @return a message handler that forwards to {@code handler}
     */
    protected <T> Handler<Message<T>> convertHandler(Handler<AsyncResult<Message<T>>> handler) {
        return reply -> {
            if (!(reply.body() instanceof ReplyException)) {
                handler.handle(Future.succeededFuture(reply));
                return;
            }
            ReplyException failure = (ReplyException) reply.body();
            if (this.metrics != null) {
                this.metrics.replyFailure(reply.address(), failure.failureType());
            }
            handler.handle(Future.failedFuture(failure));
        };
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Schedules a succeeded result for the completion handler through
     * {@code vertx.runOnContext}. Does nothing when the handler is {@code null}.
     *
     * @param handler the completion handler; may be {@code null}
     */
    public void callCompletionHandlerAsync(Handler<AsyncResult<Void>> handler) {
        if (handler == null) {
            return;
        }
        this.vertx.runOnContext(ignored -> handler.handle(Future.succeededFuture()));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Delivers the context's message locally. When delivery reports a failure, a
     * {@code NO_HANDLERS} reply failure is recorded in the metrics (when enabled) and the
     * failure is passed to the reply handler registration, if there is one.
     *
     * @param outboundDeliveryContext the context carrying the message
     */
    public <T> void deliverMessageLocally(OutboundDeliveryContext<T> outboundDeliveryContext) {
        ReplyException failure = deliverMessageLocally(outboundDeliveryContext.message);
        if (failure == null) {
            return;
        }
        if (this.metrics != null) {
            this.metrics.replyFailure(outboundDeliveryContext.message.address, ReplyFailure.NO_HANDLERS);
        }
        HandlerRegistration<T> replyRegistration = outboundDeliveryContext.handlerRegistration;
        if (replyRegistration != null) {
            replyRegistration.sendAsyncResultFailure(failure);
        }
    }

    /**
     * Reports whether a message counts as local in the metrics calls made by
     * {@code deliverMessageLocally}. This implementation always returns {@code true}.
     *
     * @param messageImpl the message being delivered (unused here)
     * @return always {@code true}
     */
    protected boolean isMessageLocal(MessageImpl messageImpl) {
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Delivers a message to the handlers registered locally on its address.
     *
     * <ul>
     *   <li>No sequence for the address: the message's write handler (if any) is failed
     *       with a {@code NO_HANDLERS} exception, and that exception is returned.</li>
     *   <li>Publish ({@code isSend()} is false): every handler in the sequence receives
     *       the message.</li>
     *   <li>Send: only the handler returned by the sequence's {@code next()} receives it.
     *       If {@code next()} yields {@code null}, nothing is delivered and the write
     *       handler is not notified.</li>
     * </ul>
     * After a delivery, the write handler (if any) is completed successfully. Metrics are
     * updated on every path when enabled.
     *
     * @param messageImpl the message to deliver
     * @return the {@code NO_HANDLERS} exception when the address has no handlers,
     *         otherwise {@code null}
     */
    public ReplyException deliverMessageLocally(MessageImpl messageImpl) {
        ConcurrentCyclicSequence<HandlerHolder> handlers = this.handlerMap.get(messageImpl.address());
        Handler<AsyncResult<Void>> writeHandler;
        if (handlers == null) {
            if (this.metrics != null) {
                this.metrics.messageReceived(messageImpl.address(), !messageImpl.isSend(), isMessageLocal(messageImpl), 0);
            }
            ReplyException noHandlers = new ReplyException(ReplyFailure.NO_HANDLERS, "No handlers for address " + messageImpl.address);
            writeHandler = messageImpl.writeHandler;
            if (writeHandler != null) {
                writeHandler.handle(Future.failedFuture(noHandlers));
            }
            return noHandlers;
        }
        if (messageImpl.isSend()) {
            // Point-to-point: a single handler picked by the sequence.
            HandlerHolder chosen = handlers.next();
            if (this.metrics != null) {
                this.metrics.messageReceived(messageImpl.address(), !messageImpl.isSend(), isMessageLocal(messageImpl), chosen == null ? 0 : 1);
            }
            if (chosen == null) {
                return null;
            }
            deliverToHandler(messageImpl, chosen);
        } else {
            // Publish: every registered handler.
            if (this.metrics != null) {
                this.metrics.messageReceived(messageImpl.address(), !messageImpl.isSend(), isMessageLocal(messageImpl), handlers.size());
            }
            for (Iterator<HandlerHolder> holders = handlers.iterator(); holders.hasNext(); ) {
                deliverToHandler(messageImpl, holders.next());
            }
        }
        writeHandler = messageImpl.writeHandler;
        if (writeHandler != null) {
            writeHandler.handle(Future.succeededFuture());
        }
        return null;
    }

    /**
     * Verifies that the event bus has been started.
     *
     * @throws IllegalStateException if {@code started} is {@code false}
     */
    protected void checkStarted() {
        if (this.started) {
            return;
        }
        throw new IllegalStateException("Event Bus is not started");
    }

    /**
     * Generates a reply address by appending the next value of the reply sequence to the
     * {@code "__vertx.reply."} prefix.
     *
     * @return the generated reply address
     */
    protected String generateReplyAddress() {
        long sequence = this.replySequence.incrementAndGet();
        return "__vertx.reply." + sequence;
    }

    /**
     * Creates the registration that will receive the reply to a message. A fresh reply
     * address is generated and stored on the message, and the async-result handler is
     * wrapped by {@code convertHandler} before being attached to the registration.
     *
     * @param messageImpl     the outgoing message; its reply address is set here
     * @param deliveryOptions supplies the send timeout passed to the registration
     * @param handler         the caller's reply handler; may be {@code null}
     * @return the reply registration, or {@code null} when no reply handler was given
     */
    private <T> HandlerRegistration<T> createReplyHandlerRegistration(MessageImpl messageImpl, DeliveryOptions deliveryOptions, Handler<AsyncResult<Message<T>>> handler) {
        if (handler == null) {
            return null;
        }
        long timeout = deliveryOptions.getSendTimeout();
        String replyAddress = generateReplyAddress();
        messageImpl.setReplyAddress(replyAddress);
        Handler<Message<T>> messageHandler = convertHandler(handler);
        HandlerRegistration<T> registration = new HandlerRegistration<>(this.vertx, this.metrics, this, replyAddress, messageImpl.address, true, handler, timeout);
        registration.handler2((Handler) messageHandler);
        return registration;
    }

    /**
     * Entry point shared by send and publish: checks that the bus is started, prepares the
     * reply registration (when a reply handler is given) and starts the outbound
     * interceptor chain.
     *
     * @param messageImpl     the message to send or publish
     * @param deliveryOptions options for the delivery
     * @param handler         reply handler; may be {@code null}
     * @throws IllegalStateException if the event bus is not started
     */
    public <T> void sendOrPubInternal(MessageImpl messageImpl, DeliveryOptions deliveryOptions, Handler<AsyncResult<Message<T>>> handler) {
        checkStarted();
        HandlerRegistration<T> replyRegistration = createReplyHandlerRegistration(messageImpl, deliveryOptions, handler);
        OutboundDeliveryContext<T> context = new OutboundDeliveryContext<>(messageImpl, deliveryOptions, replyRegistration);
        context.next();
    }

    /** Unregisters every handler currently present in the handler map. */
    private void unregisterAll() {
        for (ConcurrentCyclicSequence<HandlerHolder> handlers : this.handlerMap.values()) {
            for (Iterator<HandlerHolder> holders = handlers.iterator(); holders.hasNext(); ) {
                holders.next().getHandler().unregister();
            }
        }
    }

    /**
     * Schedules delivery of a copy of the message to one handler, on that handler's
     * context. The inbound interceptor chain runs first. A holder that reports itself as a
     * reply handler is unregistered once the chain returns, even if it threw.
     *
     * @param messageImpl   the message to deliver; {@code copyBeforeReceive()} is delivered
     * @param handlerHolder the target handler holder
     */
    private <T> void deliverToHandler(MessageImpl messageImpl, HandlerHolder<T> handlerHolder) {
        // The copy is taken before scheduling, on the calling thread.
        InboundDeliveryContext delivery = new InboundDeliveryContext(messageImpl.copyBeforeReceive(), handlerHolder);
        if (this.metrics != null) {
            this.metrics.scheduleMessage(handlerHolder.getHandler().getMetric(), messageImpl.isLocal());
        }
        handlerHolder.getContext().runOnContext(ignored -> {
            try {
                delivery.next();
            } finally {
                if (handlerHolder.isReplyHandler()) {
                    handlerHolder.getHandler().unregister();
                }
            }
        });
    }

    /**
     * Closes the event bus when the instance is finalized, so handlers and metrics are
     * released even if {@code close} was never called explicitly.
     *
     * @throws Throwable any failure raised by {@code close} or by the superclass finalizer
     */
    @Override
    protected void finalize() throws Throwable {
        try {
            close(asyncResult -> {
            });
        } finally {
            // Always chain to the superclass finalizer, even when close() throws.
            super.finalize();
        }
    }
}
