package org.vertx.java.core.eventbus.impl;

import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.vertx.java.core.AsyncResult;
import org.vertx.java.core.AsyncResultHandler;
import org.vertx.java.core.Handler;
import org.vertx.java.core.VoidHandler;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.eventbus.EventBus;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.eventbus.ReplyException;
import org.vertx.java.core.eventbus.ReplyFailure;
import org.vertx.java.core.impl.Closeable;
import org.vertx.java.core.impl.DefaultContext;
import org.vertx.java.core.impl.DefaultFutureResult;
import org.vertx.java.core.impl.VertxInternal;
import org.vertx.java.core.impl.management.ManagementRegistry;
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;
import org.vertx.java.core.logging.Logger;
import org.vertx.java.core.logging.impl.LoggerFactory;
import org.vertx.java.core.net.NetClient;
import org.vertx.java.core.net.NetServer;
import org.vertx.java.core.net.NetSocket;
import org.vertx.java.core.net.impl.ServerID;
import org.vertx.java.core.parsetools.RecordParser;
import org.vertx.java.core.spi.cluster.AsyncMultiMap;
import org.vertx.java.core.spi.cluster.ChoosableIterable;
import org.vertx.java.core.spi.cluster.ClusterManager;

/* loaded from: input_file:org/vertx/java/core/eventbus/impl/DefaultEventBus.class */
public class DefaultEventBus implements EventBus {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) DefaultEventBus.class);
    private static final Buffer PONG = new Buffer(new byte[]{1});
    private static final long PING_INTERVAL = 20000;
    private static final long PING_REPLY_INTERVAL = 20000;
    private final VertxInternal vertx;
    private ServerID serverID;
    private NetServer server;
    private AsyncMultiMap<String, ServerID> subs;
    private long defaultReplyTimeout;
    private final ConcurrentMap<ServerID, ConnectionHolder> connections;
    private final ConcurrentMap<String, Handlers> handlerMap;
    private final ClusterManager clusterMgr;
    private final AtomicLong replySequence;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/vertx/java/core/eventbus/impl/DefaultEventBus$ConnectionHolder.class */
    public class ConnectionHolder {
        final NetClient client;
        volatile NetSocket socket;
        final Queue<BaseMessage> pending;
        volatile boolean connected;
        long timeoutID;
        long pingTimeoutID;
        ServerID theServerID;

        private ConnectionHolder(NetClient netClient) {
            this.pending = new ConcurrentLinkedQueue();
            this.timeoutID = -1L;
            this.pingTimeoutID = -1L;
            this.client = netClient;
        }

        void writeMessage(BaseMessage baseMessage) {
            if (this.connected) {
                baseMessage.write(this.socket);
                return;
            }
            synchronized (this) {
                if (this.connected) {
                    baseMessage.write(this.socket);
                } else {
                    this.pending.add(baseMessage);
                }
            }
        }

        synchronized void connected(final ServerID serverID, NetSocket netSocket) {
            this.socket = netSocket;
            this.theServerID = serverID;
            this.connected = true;
            netSocket.exceptionHandler(new Handler<Throwable>() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.ConnectionHolder.1
                @Override // org.vertx.java.core.Handler
                public void handle(Throwable th) {
                    DefaultEventBus.this.cleanupConnection(serverID, ConnectionHolder.this, true);
                }
            });
            netSocket.closeHandler(new VoidHandler() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.ConnectionHolder.2
                @Override // org.vertx.java.core.VoidHandler
                public void handle() {
                    DefaultEventBus.this.cleanupConnection(serverID, ConnectionHolder.this, false);
                }
            });
            netSocket.dataHandler(new Handler<Buffer>() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.ConnectionHolder.3
                @Override // org.vertx.java.core.Handler
                public void handle(Buffer buffer) {
                    DefaultEventBus.this.vertx.cancelTimer(ConnectionHolder.this.timeoutID);
                    DefaultEventBus.this.schedulePing(ConnectionHolder.this);
                }
            });
            DefaultEventBus.this.schedulePing(this);
            Iterator<BaseMessage> it = this.pending.iterator();
            while (it.hasNext()) {
                it.next().write(netSocket);
            }
            this.pending.clear();
        }

        void connect(NetClient netClient, final ServerID serverID) {
            netClient.connect(serverID.port, serverID.host, new AsyncResultHandler<NetSocket>() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.ConnectionHolder.4
                @Override // org.vertx.java.core.Handler
                public void handle(AsyncResult<NetSocket> asyncResult) {
                    if (asyncResult.succeeded()) {
                        ConnectionHolder.this.connected(serverID, asyncResult.result());
                    } else {
                        DefaultEventBus.this.cleanupConnection(serverID, ConnectionHolder.this, true);
                    }
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/vertx/java/core/eventbus/impl/DefaultEventBus$HandlerEntry.class */
    public class HandlerEntry implements Closeable {
        final String address;
        final Handler<? extends Message> handler;

        private HandlerEntry(String str, Handler<? extends Message> handler) {
            this.address = str;
            this.handler = handler;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (getClass() != obj.getClass()) {
                return false;
            }
            HandlerEntry handlerEntry = (HandlerEntry) obj;
            return this.address.equals(handlerEntry.address) && this.handler.equals(handlerEntry.handler);
        }

        public int hashCode() {
            return (31 * (this.address != null ? this.address.hashCode() : 0)) + (this.handler != null ? this.handler.hashCode() : 0);
        }

        @Override // org.vertx.java.core.impl.Closeable
        public void close(Handler<AsyncResult<Void>> handler) {
            DefaultEventBus.this.unregisterHandler(this.address, this.handler);
            handler.handle(new DefaultFutureResult((Void) null));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/vertx/java/core/eventbus/impl/DefaultEventBus$HandlerHolder.class */
    public static class HandlerHolder<T> {
        final DefaultContext context;
        final Handler<Message<T>> handler;
        final boolean replyHandler;
        final boolean localOnly;
        final long timeoutID;
        boolean removed;

        HandlerHolder(Handler<Message<T>> handler, boolean z, boolean z2, DefaultContext defaultContext, long j) {
            this.context = defaultContext;
            this.handler = handler;
            this.replyHandler = z;
            this.localOnly = z2;
            this.timeoutID = j;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            return this.handler.equals(((HandlerHolder) obj).handler);
        }

        public int hashCode() {
            return this.handler.hashCode();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/vertx/java/core/eventbus/impl/DefaultEventBus$Handlers.class */
    public static class Handlers {
        final List<HandlerHolder> list;
        final AtomicInteger pos;

        private Handlers() {
            this.list = new CopyOnWriteArrayList();
            this.pos = new AtomicInteger(0);
        }

        HandlerHolder choose() {
            while (true) {
                int size = this.list.size();
                if (size == 0) {
                    return null;
                }
                int andIncrement = this.pos.getAndIncrement();
                if (andIncrement >= size - 1) {
                    this.pos.set(0);
                }
                try {
                    return this.list.get(andIncrement);
                } catch (IndexOutOfBoundsException e) {
                    this.pos.set(0);
                }
            }
        }
    }

    public DefaultEventBus(VertxInternal vertxInternal) {
        this.defaultReplyTimeout = -1L;
        this.connections = new ConcurrentHashMap();
        this.handlerMap = new ConcurrentHashMap();
        this.replySequence = new AtomicLong(0L);
        this.vertx = vertxInternal;
        this.serverID = new ServerID(-1, "localhost");
        this.server = null;
        this.subs = null;
        this.clusterMgr = null;
        ManagementRegistry.registerEventBus(this.serverID);
    }

    public DefaultEventBus(VertxInternal vertxInternal, int i, String str, ClusterManager clusterManager) {
        this(vertxInternal, i, str, clusterManager, null);
    }

    public DefaultEventBus(VertxInternal vertxInternal, int i, String str, ClusterManager clusterManager, Handler<AsyncResult<Void>> handler) {
        this.defaultReplyTimeout = -1L;
        this.connections = new ConcurrentHashMap();
        this.handlerMap = new ConcurrentHashMap();
        this.replySequence = new AtomicLong(0L);
        this.vertx = vertxInternal;
        this.clusterMgr = clusterManager;
        this.subs = this.clusterMgr.getAsyncMultiMap("subs");
        this.server = setServer(i, str, handler);
        ManagementRegistry.registerEventBus(this.serverID);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus send(String str, Object obj, final Handler<Message> handler) {
        sendOrPub(createMessage(true, str, obj), handler == null ? null : new Handler<Message<String>>() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.1
            /* renamed from: handle, reason: avoid collision after fix types in other method */
            public void handle2(Message message) {
                handler.handle(message);
            }

            @Override // org.vertx.java.core.Handler
            public /* bridge */ /* synthetic */ void handle(Message<String> message) {
                handle2((Message) message);
            }
        });
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public <T> EventBus sendWithTimeout(String str, Object obj, long j, Handler<AsyncResult<Message<T>>> handler) {
        sendOrPubWithTimeout(createMessage(true, str, obj), handler, j);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus send(String str, Object obj) {
        sendOrPub(createMessage(true, str, obj), null);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public <T> EventBus send(String str, JsonObject jsonObject, Handler<Message<T>> handler) {
        sendOrPub(new JsonObjectMessage(true, str, jsonObject), handler);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public <T> EventBus sendWithTimeout(String str, JsonObject jsonObject, long j, Handler<AsyncResult<Message<T>>> handler) {
        sendOrPubWithTimeout(createMessage(true, str, jsonObject), handler, j);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus send(String str, JsonObject jsonObject) {
        sendOrPub(new JsonObjectMessage(true, str, jsonObject), null);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public <T> EventBus send(String str, JsonArray jsonArray, Handler<Message<T>> handler) {
        sendOrPub(new JsonArrayMessage(true, str, jsonArray), handler);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public <T> EventBus sendWithTimeout(String str, JsonArray jsonArray, long j, Handler<AsyncResult<Message<T>>> handler) {
        sendOrPubWithTimeout(createMessage(true, str, jsonArray), handler, j);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus send(String str, JsonArray jsonArray) {
        sendOrPub(new JsonArrayMessage(true, str, jsonArray), null);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public <T> EventBus send(String str, Buffer buffer, Handler<Message<T>> handler) {
        sendOrPub(new BufferMessage(true, str, buffer), handler);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public <T> EventBus sendWithTimeout(String str, Buffer buffer, long j, Handler<AsyncResult<Message<T>>> handler) {
        sendOrPubWithTimeout(createMessage(true, str, buffer), handler, j);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus send(String str, Buffer buffer) {
        sendOrPub(new BufferMessage(true, str, buffer), null);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public <T> EventBus send(String str, byte[] bArr, Handler<Message<T>> handler) {
        sendOrPub(new ByteArrayMessage(true, str, bArr), handler);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public <T> EventBus sendWithTimeout(String str, byte[] bArr, long j, Handler<AsyncResult<Message<T>>> handler) {
        sendOrPubWithTimeout(createMessage(true, str, bArr), handler, j);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus send(String str, byte[] bArr) {
        sendOrPub(new ByteArrayMessage(true, str, bArr), null);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public <T> EventBus send(String str, String str2, Handler<Message<T>> handler) {
        sendOrPub(new StringMessage(true, str, str2), handler);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public <T> EventBus sendWithTimeout(String str, String str2, long j, Handler<AsyncResult<Message<T>>> handler) {
        sendOrPubWithTimeout(createMessage(true, str, str2), handler, j);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus send(String str, String str2) {
        sendOrPub(new StringMessage(true, str, str2), null);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public <T> EventBus send(String str, Integer num, Handler<Message<T>> handler) {
        sendOrPub(new IntMessage(true, str, num), handler);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public <T> EventBus sendWithTimeout(String str, Integer num, long j, Handler<AsyncResult<Message<T>>> handler) {
        sendOrPubWithTimeout(createMessage(true, str, num), handler, j);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus send(String str, Integer num) {
        sendOrPub(new IntMessage(true, str, num), null);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public <T> EventBus send(String str, Long l, Handler<Message<T>> handler) {
        sendOrPub(new LongMessage(true, str, l), handler);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public <T> EventBus sendWithTimeout(String str, Long l, long j, Handler<AsyncResult<Message<T>>> handler) {
        sendOrPubWithTimeout(createMessage(true, str, l), handler, j);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus send(String str, Long l) {
        sendOrPub(new LongMessage(true, str, l), null);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public <T> EventBus send(String str, Float f, Handler<Message<T>> handler) {
        sendOrPub(new FloatMessage(true, str, f), handler);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public <T> EventBus sendWithTimeout(String str, Float f, long j, Handler<AsyncResult<Message<T>>> handler) {
        sendOrPubWithTimeout(createMessage(true, str, f), handler, j);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus send(String str, Float f) {
        sendOrPub(new FloatMessage(true, str, f), null);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public <T> EventBus send(String str, Double d, Handler<Message<T>> handler) {
        sendOrPub(new DoubleMessage(true, str, d), handler);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public <T> EventBus sendWithTimeout(String str, Double d, long j, Handler<AsyncResult<Message<T>>> handler) {
        sendOrPubWithTimeout(createMessage(true, str, d), handler, j);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus send(String str, Double d) {
        sendOrPub(new DoubleMessage(true, str, d), null);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public <T> EventBus send(String str, Boolean bool, Handler<Message<T>> handler) {
        sendOrPub(new BooleanMessage(true, str, bool), handler);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public <T> EventBus sendWithTimeout(String str, Boolean bool, long j, Handler<AsyncResult<Message<T>>> handler) {
        sendOrPubWithTimeout(createMessage(true, str, bool), handler, j);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus send(String str, Boolean bool) {
        sendOrPub(new BooleanMessage(true, str, bool), null);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public <T> EventBus send(String str, Short sh, Handler<Message<T>> handler) {
        sendOrPub(new ShortMessage(true, str, sh), handler);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public <T> EventBus sendWithTimeout(String str, Short sh, long j, Handler<AsyncResult<Message<T>>> handler) {
        sendOrPubWithTimeout(createMessage(true, str, sh), handler, j);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus send(String str, Short sh) {
        sendOrPub(new ShortMessage(true, str, sh), null);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public <T> EventBus send(String str, Character ch, Handler<Message<T>> handler) {
        sendOrPub(new CharacterMessage(true, str, ch), handler);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public <T> EventBus sendWithTimeout(String str, Character ch, long j, Handler<AsyncResult<Message<T>>> handler) {
        sendOrPubWithTimeout(createMessage(true, str, ch), handler, j);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus send(String str, Character ch) {
        sendOrPub(new CharacterMessage(true, str, ch), null);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public <T> EventBus send(String str, Byte b, Handler<Message<T>> handler) {
        sendOrPub(new ByteMessage(true, str, b), handler);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public <T> EventBus sendWithTimeout(String str, Byte b, long j, Handler<AsyncResult<Message<T>>> handler) {
        sendOrPubWithTimeout(createMessage(true, str, b), handler, j);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus send(String str, Byte b) {
        sendOrPub(new ByteMessage(true, str, b), null);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus publish(String str, Object obj) {
        sendOrPub(createMessage(false, str, obj), null);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus publish(String str, JsonObject jsonObject) {
        sendOrPub(new JsonObjectMessage(false, str, jsonObject), null);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus publish(String str, JsonArray jsonArray) {
        sendOrPub(new JsonArrayMessage(false, str, jsonArray), null);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus publish(String str, Buffer buffer) {
        sendOrPub(new BufferMessage(false, str, buffer), null);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus publish(String str, byte[] bArr) {
        sendOrPub(new ByteArrayMessage(false, str, bArr), null);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus publish(String str, String str2) {
        sendOrPub(new StringMessage(false, str, str2), null);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus publish(String str, Integer num) {
        sendOrPub(new IntMessage(false, str, num), null);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus publish(String str, Long l) {
        sendOrPub(new LongMessage(false, str, l), null);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus publish(String str, Float f) {
        sendOrPub(new FloatMessage(false, str, f), null);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus publish(String str, Double d) {
        sendOrPub(new DoubleMessage(false, str, d), null);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus publish(String str, Boolean bool) {
        sendOrPub(new BooleanMessage(false, str, bool), null);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus publish(String str, Short sh) {
        sendOrPub(new ShortMessage(false, str, sh), null);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus publish(String str, Character ch) {
        sendOrPub(new CharacterMessage(false, str, ch), null);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus publish(String str, Byte b) {
        sendOrPub(new ByteMessage(false, str, b), null);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus registerHandler(String str, Handler<? extends Message> handler, Handler<AsyncResult<Void>> handler2) {
        registerHandler(str, handler, handler2, false, false, -1L);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus registerHandler(String str, Handler<? extends Message> handler) {
        registerHandler(str, handler, null);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus registerLocalHandler(String str, Handler<? extends Message> handler) {
        registerHandler(str, handler, null, false, true, -1L);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus unregisterHandler(String str, Handler<? extends Message> handler, Handler<AsyncResult<Void>> handler2) {
        checkStarted();
        Handlers handlers = this.handlerMap.get(str);
        if (handlers != null) {
            synchronized (handlers) {
                int size = handlers.list.size();
                for (int i = 0; i < size; i++) {
                    HandlerHolder handlerHolder = handlers.list.get(i);
                    if (handlerHolder.handler == handler) {
                        if (handlerHolder.timeoutID != -1) {
                            this.vertx.cancelTimer(handlerHolder.timeoutID);
                        }
                        handlers.list.remove(i);
                        handlerHolder.removed = true;
                        if (handlers.list.isEmpty()) {
                            this.handlerMap.remove(str);
                            if (this.subs != null && !handlerHolder.localOnly) {
                                removeSub(str, this.serverID, handler2);
                            } else if (handler2 != null) {
                                callCompletionHandler(handler2);
                            }
                        } else if (handler2 != null) {
                            callCompletionHandler(handler2);
                        }
                        handlerHolder.context.removeCloseHook(new HandlerEntry(str, handler));
                        return this;
                    }
                }
            }
        }
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus unregisterHandler(String str, Handler<? extends Message> handler) {
        unregisterHandler(str, handler, null);
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void close(Handler<AsyncResult<Void>> handler) {
        if (this.clusterMgr != null) {
            this.clusterMgr.leave();
        }
        if (this.server != null) {
            this.server.close(handler);
        }
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public EventBus setDefaultReplyTimeout(long j) {
        this.defaultReplyTimeout = j;
        return this;
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public long getDefaultReplyTimeout() {
        return this.defaultReplyTimeout;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <T, U> void sendReply(ServerID serverID, BaseMessage<U> baseMessage, Handler<Message<T>> handler) {
        sendOrPub(serverID, baseMessage, handler, -1L);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <T, U> void sendReplyWithTimeout(ServerID serverID, BaseMessage<U> baseMessage, long j, Handler<AsyncResult<Message<T>>> handler) {
        if (baseMessage.address == null) {
            sendNoHandlersFailure(handler);
        } else {
            sendOrPub(serverID, baseMessage, convertHandler(handler), handler, j);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Multi-variable type inference failed */
    public static <U> BaseMessage<U> createMessage(boolean z, String str, U u) {
        BaseMessage stringMessage;
        if (u instanceof String) {
            stringMessage = new StringMessage(z, str, (String) u);
        } else if (u instanceof Buffer) {
            stringMessage = new BufferMessage(z, str, (Buffer) u);
        } else if (u instanceof JsonObject) {
            stringMessage = new JsonObjectMessage(z, str, (JsonObject) u);
        } else if (u instanceof JsonArray) {
            stringMessage = new JsonArrayMessage(z, str, (JsonArray) u);
        } else if (u instanceof byte[]) {
            stringMessage = new ByteArrayMessage(z, str, (byte[]) u);
        } else if (u instanceof Integer) {
            stringMessage = new IntMessage(z, str, (Integer) u);
        } else if (u instanceof Long) {
            stringMessage = new LongMessage(z, str, (Long) u);
        } else if (u instanceof Float) {
            stringMessage = new FloatMessage(z, str, (Float) u);
        } else if (u instanceof Double) {
            stringMessage = new DoubleMessage(z, str, (Double) u);
        } else if (u instanceof Boolean) {
            stringMessage = new BooleanMessage(z, str, (Boolean) u);
        } else if (u instanceof Short) {
            stringMessage = new ShortMessage(z, str, (Short) u);
        } else if (u instanceof Character) {
            stringMessage = new CharacterMessage(z, str, (Character) u);
        } else if (u instanceof Byte) {
            stringMessage = new ByteMessage(z, str, (Byte) u);
        } else {
            if (u != 0) {
                throw new IllegalArgumentException("Cannot send object of class " + u.getClass() + " on the event bus: " + u);
            }
            stringMessage = new StringMessage(z, str, null);
        }
        return stringMessage;
    }

    private NetServer setServer(int i, final String str, final Handler<AsyncResult<Void>> handler) {
        final NetServer connectHandler = this.vertx.createNetServer().connectHandler(new Handler<NetSocket>() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.2
            @Override // org.vertx.java.core.Handler
            public void handle(final NetSocket netSocket) {
                final RecordParser newFixed = RecordParser.newFixed(4, null);
                newFixed.setOutput(new Handler<Buffer>() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.2.1
                    int size = -1;

                    @Override // org.vertx.java.core.Handler
                    public void handle(Buffer buffer) {
                        if (this.size == -1) {
                            this.size = buffer.getInt(0);
                            newFixed.fixedSizeMode(this.size);
                            return;
                        }
                        BaseMessage read = MessageFactory.read(buffer);
                        if (read.type() == 0) {
                            netSocket.write2(DefaultEventBus.PONG);
                        } else {
                            DefaultEventBus.this.receiveMessage(read, -1L, null, null);
                        }
                        newFixed.fixedSizeMode(4);
                        this.size = -1;
                    }
                });
                netSocket.dataHandler(newFixed);
            }
        });
        connectHandler.listen(i, str, new AsyncResultHandler<NetServer>() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.3
            @Override // org.vertx.java.core.Handler
            public void handle(AsyncResult<NetServer> asyncResult) {
                if (asyncResult.succeeded()) {
                    int intValue = Integer.getInteger("vertx.cluster.public.port", -1).intValue();
                    String property = System.getProperty("vertx.cluster.public.host", null);
                    int port = intValue == -1 ? connectHandler.port() : intValue;
                    DefaultEventBus.this.serverID = new ServerID(port, property == null ? str : property);
                }
                if (handler == null) {
                    if (asyncResult.failed()) {
                        DefaultEventBus.log.error("Failed to listen", asyncResult.cause());
                    }
                } else if (asyncResult.succeeded()) {
                    handler.handle(new DefaultFutureResult((Void) null));
                } else {
                    handler.handle(new DefaultFutureResult(asyncResult.cause()));
                }
            }
        });
        return connectHandler;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <T> void sendToSubs(ChoosableIterable<ServerID> choosableIterable, BaseMessage baseMessage, long j, Handler<AsyncResult<Message<T>>> handler, Handler<Message<T>> handler2) {
        if (baseMessage.send) {
            ServerID choose = choosableIterable.choose();
            if (choose.equals(this.serverID)) {
                receiveMessage(baseMessage, j, handler, handler2);
                return;
            } else {
                sendRemote(choose, baseMessage);
                return;
            }
        }
        for (ServerID serverID : choosableIterable) {
            if (serverID.equals(this.serverID)) {
                receiveMessage(baseMessage, j, null, handler2);
            } else {
                sendRemote(serverID, baseMessage);
            }
        }
    }

    private <T, U> void sendOrPubWithTimeout(BaseMessage<U> baseMessage, Handler<AsyncResult<Message<T>>> handler, long j) {
        sendOrPub(null, baseMessage, convertHandler(handler), handler, j);
    }

    private <T, U> void sendOrPub(BaseMessage<U> baseMessage, Handler<Message<T>> handler) {
        sendOrPub(null, baseMessage, handler, -1L);
    }

    private <T, U> void sendOrPub(ServerID serverID, BaseMessage<U> baseMessage, Handler<Message<T>> handler, long j) {
        sendOrPub(serverID, baseMessage, handler, null, j);
    }

    private String generateReplyAddress() {
        return this.clusterMgr != null ? UUID.randomUUID().toString() : Long.toString(this.replySequence.incrementAndGet());
    }

    private <T, U> void sendOrPub(ServerID serverID, final BaseMessage<U> baseMessage, final Handler<Message<T>> handler, final Handler<AsyncResult<Message<T>>> handler2, long j) {
        checkStarted();
        DefaultContext orCreateContext = this.vertx.getOrCreateContext();
        if (j == -1) {
            j = this.defaultReplyTimeout;
        }
        try {
            baseMessage.sender = this.serverID;
            long j2 = -1;
            if (handler != null) {
                baseMessage.replyAddress = generateReplyAddress();
                if (j != -1) {
                    j2 = this.vertx.setTimer(j, new Handler<Long>() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.4
                        @Override // org.vertx.java.core.Handler
                        public void handle(Long l) {
                            DefaultEventBus.log.warn("Message reply handler timed out as no reply was received - it will be removed");
                            DefaultEventBus.this.unregisterHandler(baseMessage.replyAddress, handler);
                            if (handler2 != null) {
                                handler2.handle(new DefaultFutureResult((Throwable) new ReplyException(ReplyFailure.TIMEOUT, "Timed out waiting for reply")));
                            }
                        }
                    });
                }
                registerHandler(baseMessage.replyAddress, handler, null, true, true, j2);
            }
            if (serverID != null) {
                if (serverID.equals(this.serverID)) {
                    receiveMessage(baseMessage, j2, handler2, handler);
                } else {
                    sendRemote(serverID, baseMessage);
                }
            } else if (this.subs != null) {
                final long j3 = j2;
                this.subs.get(baseMessage.address, new AsyncResultHandler<ChoosableIterable<ServerID>>() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.5
                    @Override // org.vertx.java.core.Handler
                    public void handle(AsyncResult<ChoosableIterable<ServerID>> asyncResult) {
                        if (!asyncResult.succeeded()) {
                            DefaultEventBus.log.error("Failed to send message", asyncResult.cause());
                            return;
                        }
                        ChoosableIterable<ServerID> result = asyncResult.result();
                        if (result == null || result.isEmpty()) {
                            DefaultEventBus.this.receiveMessage(baseMessage, j3, handler2, handler);
                        } else {
                            DefaultEventBus.this.sendToSubs(result, baseMessage, j3, handler2, handler);
                        }
                    }
                });
            } else {
                receiveMessage(baseMessage, j2, handler2, handler);
            }
        } finally {
            if (orCreateContext != null) {
                this.vertx.setContext(orCreateContext);
            }
        }
    }

    private <T> Handler<Message<T>> convertHandler(final Handler<AsyncResult<Message<T>>> handler) {
        return new Handler<Message<T>>() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.6
            @Override // org.vertx.java.core.Handler
            public void handle(Message<T> message) {
                handler.handle(message.body() instanceof ReplyException ? new DefaultFutureResult((Throwable) message.body()) : new DefaultFutureResult(message));
            }
        };
    }

    private void registerHandler(String str, Handler<? extends Message> handler, Handler<AsyncResult<Void>> handler2, boolean z, boolean z2, long j) {
        checkStarted();
        if (str == null) {
            throw new NullPointerException("address");
        }
        DefaultContext context = this.vertx.getContext();
        boolean z3 = context != null;
        if (!z3) {
            context = this.vertx.createEventLoopContext();
        }
        Handlers handlers = this.handlerMap.get(str);
        if (handlers == null) {
            Handlers handlers2 = new Handlers();
            Handlers putIfAbsent = this.handlerMap.putIfAbsent(str, handlers2);
            if (putIfAbsent != null) {
                handlers2 = putIfAbsent;
            }
            if (handler2 == null) {
                handler2 = new Handler<AsyncResult<Void>>() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.7
                    @Override // org.vertx.java.core.Handler
                    public void handle(AsyncResult<Void> asyncResult) {
                        if (asyncResult.failed()) {
                            DefaultEventBus.log.error("Failed to remove entry", asyncResult.cause());
                        }
                    }
                };
            }
            handlers2.list.add(new HandlerHolder(handler, z, z2, context, j));
            if (this.subs == null || z || z2) {
                callCompletionHandler(handler2);
            } else {
                this.subs.add(str, this.serverID, handler2);
            }
        } else {
            handlers.list.add(new HandlerHolder(handler, z, z2, context, j));
            if (handler2 != null) {
                callCompletionHandler(handler2);
            }
        }
        if (z3) {
            context.addCloseHook(new HandlerEntry(str, handler));
        }
    }

    private void callCompletionHandler(Handler<AsyncResult<Void>> handler) {
        handler.handle(new DefaultFutureResult((Void) null));
    }

    private void cleanSubsForServerID(ServerID serverID) {
        if (this.subs != null) {
            this.subs.removeAllForValue(serverID, new Handler<AsyncResult<Void>>() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.8
                @Override // org.vertx.java.core.Handler
                public void handle(AsyncResult<Void> asyncResult) {
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void cleanupConnection(ServerID serverID, ConnectionHolder connectionHolder, boolean z) {
        if (connectionHolder.timeoutID != -1) {
            this.vertx.cancelTimer(connectionHolder.timeoutID);
        }
        if (connectionHolder.pingTimeoutID != -1) {
            this.vertx.cancelTimer(connectionHolder.pingTimeoutID);
        }
        try {
            connectionHolder.socket.close();
        } catch (Exception e) {
        }
        if (this.connections.remove(serverID, connectionHolder)) {
            log.debug("Cluster connection closed: " + serverID + " holder " + connectionHolder);
            if (z) {
                cleanSubsForServerID(serverID);
            }
        }
    }

    private void sendRemote(ServerID serverID, BaseMessage baseMessage) {
        ConnectionHolder connectionHolder = this.connections.get(serverID);
        if (connectionHolder == null) {
            NetClient createNetClient = this.vertx.createNetClient();
            createNetClient.setConnectTimeout(60000);
            connectionHolder = new ConnectionHolder(createNetClient);
            ConnectionHolder putIfAbsent = this.connections.putIfAbsent(serverID, connectionHolder);
            if (putIfAbsent != null) {
                connectionHolder = putIfAbsent;
            } else {
                connectionHolder.connect(createNetClient, serverID);
            }
        }
        connectionHolder.writeMessage(baseMessage);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void schedulePing(final ConnectionHolder connectionHolder) {
        connectionHolder.pingTimeoutID = this.vertx.setTimer(20000L, new Handler<Long>() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.9
            @Override // org.vertx.java.core.Handler
            public void handle(Long l) {
                connectionHolder.timeoutID = DefaultEventBus.this.vertx.setTimer(20000L, new Handler<Long>() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.9.1
                    @Override // org.vertx.java.core.Handler
                    public void handle(Long l2) {
                        DefaultEventBus.log.warn("No pong from server " + DefaultEventBus.this.serverID + " - will consider it dead, timerID: " + l2 + " holder " + connectionHolder);
                        DefaultEventBus.this.cleanupConnection(connectionHolder.theServerID, connectionHolder, true);
                    }
                });
                new PingMessage(DefaultEventBus.this.serverID).write(connectionHolder.socket);
            }
        });
    }

    private void removeSub(String str, ServerID serverID, Handler<AsyncResult<Void>> handler) {
        this.subs.remove(str, serverID, handler);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <T> void receiveMessage(BaseMessage baseMessage, long j, Handler<AsyncResult<Message<T>>> handler, Handler<Message<T>> handler2) {
        baseMessage.bus = this;
        Handlers handlers = this.handlerMap.get(baseMessage.address);
        if (handlers == null) {
            if (handler != null) {
                sendNoHandlersFailure(handler);
                if (j != -1) {
                    this.vertx.cancelTimer(j);
                }
                if (handler2 != null) {
                    unregisterHandler(baseMessage.replyAddress, handler2);
                    return;
                }
                return;
            }
            return;
        }
        if (!baseMessage.send) {
            Iterator<HandlerHolder> it = handlers.list.iterator();
            while (it.hasNext()) {
                doReceive(baseMessage, it.next());
            }
        } else {
            HandlerHolder choose = handlers.choose();
            if (choose != null) {
                doReceive(baseMessage, choose);
            }
        }
    }

    private <T> void sendNoHandlersFailure(final Handler<AsyncResult<Message<T>>> handler) {
        this.vertx.runOnContext(new Handler<Void>() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.10
            @Override // org.vertx.java.core.Handler
            public void handle(Void r8) {
                handler.handle(new DefaultFutureResult((Throwable) new ReplyException(ReplyFailure.NO_HANDLERS)));
            }
        });
    }

    private <T> void doReceive(final BaseMessage<T> baseMessage, final HandlerHolder<T> handlerHolder) {
        final Message<T> copy = baseMessage.copy();
        handlerHolder.context.execute(new Runnable() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.11
            @Override // java.lang.Runnable
            public void run() {
                try {
                    if (!handlerHolder.removed) {
                        handlerHolder.handler.handle(copy);
                    }
                } finally {
                    if (handlerHolder.replyHandler) {
                        DefaultEventBus.this.unregisterHandler(baseMessage.address, handlerHolder.handler);
                    }
                }
            }
        });
    }

    private void checkStarted() {
        if (this.serverID == null) {
            throw new IllegalStateException("Event Bus is not started");
        }
    }
}
