package org.apache.activemq.artemis.protocol.amqp.connect;

import io.netty.channel.ChannelHandler;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.remoting.CertificateUtil;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.BrokerConnection;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerAggregation;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.protocol.amqp.proton.SenderController;
import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL;
import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.jboss.logging.Logger;

/* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.class */
public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, ActiveMQServerQueuePlugin, BrokerConnection {
    private final AMQPBrokerConnectConfiguration brokerConnectConfiguration;
    private final ProtonProtocolManager protonProtocolManager;
    private final ActiveMQServer server;
    private final NettyConnector bridgesConnector;
    private NettyConnection connection;
    private Session session;
    private AMQPSessionContext sessionContext;
    private ActiveMQProtonRemotingConnection protonRemotingConnection;
    private final AMQPBrokerConnectionManager bridgeManager;
    private volatile ScheduledFuture reconnectFuture;
    final Executor connectExecutor;
    final ScheduledExecutorService scheduledExecutorService;
    String host;
    int port;
    private static final String EXTERNAL = "EXTERNAL";
    private static final String PLAIN = "PLAIN";
    private static final String ANONYMOUS = "ANONYMOUS";
    private static final Logger logger = Logger.getLogger(AMQPBrokerConnection.class);
    private static final byte[] EMPTY = new byte[0];
    private volatile boolean started = false;
    private int retryCounter = 0;
    private boolean connecting = false;
    private Set<Queue> senders = new HashSet();
    private Set<Queue> receivers = new HashSet();

    /* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection$AMQPOutgoingController.class */
    private class AMQPOutgoingController implements SenderController {
        final Queue queue;
        final Sender sender;
        final AMQPSessionCallback sessionSPI;

        AMQPOutgoingController(Queue queue, Sender sender, AMQPSessionCallback aMQPSessionCallback) {
            this.queue = queue;
            this.sessionSPI = aMQPSessionCallback;
            this.sender = sender;
        }

        @Override // org.apache.activemq.artemis.protocol.amqp.proton.SenderController
        public Consumer init(ProtonServerSenderContext protonServerSenderContext) throws Exception {
            return (Consumer) this.sessionSPI.createSender(protonServerSenderContext, this.queue.getName(), null, false);
        }

        @Override // org.apache.activemq.artemis.protocol.amqp.proton.SenderController
        public void close() throws Exception {
        }
    }

    /* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection$AnonymousSASLMechanism.class */
    private static class AnonymousSASLMechanism implements ClientSASL {
        private AnonymousSASLMechanism() {
        }

        @Override // org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL
        public String getName() {
            return "ANONYMOUS";
        }

        @Override // org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL
        public byte[] getInitialResponse() {
            return AMQPBrokerConnection.EMPTY;
        }

        @Override // org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL
        public byte[] getResponse(byte[] bArr) {
            return AMQPBrokerConnection.EMPTY;
        }
    }

    /* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection$ExternalSASLMechanism.class */
    private static class ExternalSASLMechanism implements ClientSASL {
        private ExternalSASLMechanism() {
        }

        @Override // org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL
        public String getName() {
            return "EXTERNAL";
        }

        @Override // org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL
        public byte[] getInitialResponse() {
            return AMQPBrokerConnection.EMPTY;
        }

        @Override // org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL
        public byte[] getResponse(byte[] bArr) {
            return AMQPBrokerConnection.EMPTY;
        }

        public static boolean isApplicable(NettyConnection nettyConnection) {
            return CertificateUtil.getLocalPrincipalFromConnection(nettyConnection) != null;
        }
    }

    /* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection$PlainSASLMechanism.class */
    private static class PlainSASLMechanism implements ClientSASL {
        private final byte[] initialResponse;

        PlainSASLMechanism(String str, String str2) {
            byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
            byte[] bytes2 = str2.getBytes(StandardCharsets.UTF_8);
            byte[] bArr = new byte[bytes.length + bytes2.length + 2];
            System.arraycopy(bytes, 0, bArr, 1, bytes.length);
            System.arraycopy(bytes2, 0, bArr, bytes.length + 2, bytes2.length);
            this.initialResponse = bArr;
        }

        @Override // org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL
        public String getName() {
            return "PLAIN";
        }

        @Override // org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL
        public byte[] getInitialResponse() {
            return this.initialResponse;
        }

        @Override // org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL
        public byte[] getResponse(byte[] bArr) {
            return AMQPBrokerConnection.EMPTY;
        }

        public static boolean isApplicable(String str, String str2) {
            return str != null && str.length() > 0 && str2 != null && str2.length() > 0;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection$SaslFactory.class */
    public static final class SaslFactory implements ClientSASLFactory {
        private final NettyConnection connection;
        private final AMQPBrokerConnectConfiguration brokerConnectConfiguration;

        SaslFactory(NettyConnection nettyConnection, AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration) {
            this.connection = nettyConnection;
            this.brokerConnectConfiguration = aMQPBrokerConnectConfiguration;
        }

        @Override // org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory
        public ClientSASL chooseMechanism(String[] strArr) {
            List emptyList = strArr == null ? Collections.emptyList() : Arrays.asList(strArr);
            if (emptyList.contains("EXTERNAL") && ExternalSASLMechanism.isApplicable(this.connection)) {
                return new ExternalSASLMechanism();
            }
            if (emptyList.contains("PLAIN") && PlainSASLMechanism.isApplicable(this.brokerConnectConfiguration.getUser(), this.brokerConnectConfiguration.getPassword())) {
                return new PlainSASLMechanism(this.brokerConnectConfiguration.getUser(), this.brokerConnectConfiguration.getPassword());
            }
            if (emptyList.contains("ANONYMOUS")) {
                return new AnonymousSASLMechanism();
            }
            return null;
        }
    }

    public AMQPBrokerConnection(AMQPBrokerConnectionManager aMQPBrokerConnectionManager, AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration, ProtonProtocolManager protonProtocolManager, ActiveMQServer activeMQServer, NettyConnector nettyConnector) {
        this.bridgeManager = aMQPBrokerConnectionManager;
        this.brokerConnectConfiguration = aMQPBrokerConnectConfiguration;
        this.protonProtocolManager = protonProtocolManager;
        this.server = activeMQServer;
        this.bridgesConnector = nettyConnector;
        this.connectExecutor = activeMQServer.getExecutorFactory().getExecutor();
        this.scheduledExecutorService = activeMQServer.getScheduledPool();
    }

    public String getName() {
        return this.brokerConnectConfiguration.getName();
    }

    public String getProtocol() {
        return ProtonProtocolManagerFactory.AMQP_PROTOCOL_NAME;
    }

    public boolean isStarted() {
        return this.started;
    }

    public void stop() {
        if (this.started) {
            this.started = false;
            if (this.protonRemotingConnection != null) {
                this.protonRemotingConnection.fail(new ActiveMQException("Stopping Broker Connection"));
                this.protonRemotingConnection = null;
                this.connection = null;
            }
            ScheduledFuture scheduledFuture = this.reconnectFuture;
            this.reconnectFuture = null;
            if (scheduledFuture != null) {
                scheduledFuture.cancel(true);
            }
        }
    }

    public void start() throws Exception {
        if (this.started) {
            return;
        }
        this.started = true;
        this.server.getConfiguration().registerBrokerPlugin(this);
        try {
            if (this.brokerConnectConfiguration != null && this.brokerConnectConfiguration.getConnectionElements() != null) {
                for (AMQPMirrorBrokerConnectionElement aMQPMirrorBrokerConnectionElement : this.brokerConnectConfiguration.getConnectionElements()) {
                    if (aMQPMirrorBrokerConnectionElement.getType() == AMQPBrokerConnectionAddressType.MIRROR) {
                        installMirrorController(this, aMQPMirrorBrokerConnectionElement, this.server);
                    }
                }
            }
            this.connectExecutor.execute(() -> {
                doConnect();
            });
        } catch (Throwable th) {
            logger.warn(th.getMessage(), th);
        }
    }

    public NettyConnection getConnection() {
        return this.connection;
    }

    public void afterCreateQueue(Queue queue) {
        this.connectExecutor.execute(() -> {
            Iterator it = this.brokerConnectConfiguration.getConnectionElements().iterator();
            while (it.hasNext()) {
                validateMatching(queue, (AMQPBrokerConnectionElement) it.next());
            }
        });
    }

    public void validateMatching(Queue queue, AMQPBrokerConnectionElement aMQPBrokerConnectionElement) {
        if (aMQPBrokerConnectionElement.getType() != AMQPBrokerConnectionAddressType.MIRROR) {
            if (aMQPBrokerConnectionElement.getQueueName() != null) {
                if (queue.getName().equals(aMQPBrokerConnectionElement.getQueueName())) {
                    createLink(queue, aMQPBrokerConnectionElement);
                }
            } else if (aMQPBrokerConnectionElement.match(queue.getAddress(), this.server.getConfiguration().getWildcardConfiguration())) {
                createLink(queue, aMQPBrokerConnectionElement);
            }
        }
    }

    public void createLink(Queue queue, AMQPBrokerConnectionElement aMQPBrokerConnectionElement) {
        if (aMQPBrokerConnectionElement.getType() == AMQPBrokerConnectionAddressType.PEER) {
            connectSender(queue, queue.getAddress().toString(), Symbol.valueOf("qd.waypoint"));
            connectReceiver(this.protonRemotingConnection, this.session, this.sessionContext, queue, Symbol.valueOf("qd.waypoint"));
            return;
        }
        if (aMQPBrokerConnectionElement.getType() == AMQPBrokerConnectionAddressType.SENDER) {
            connectSender(queue, queue.getAddress().toString(), new Symbol[0]);
        }
        if (aMQPBrokerConnectionElement.getType() == AMQPBrokerConnectionAddressType.RECEIVER) {
            connectReceiver(this.protonRemotingConnection, this.session, this.sessionContext, queue, new Symbol[0]);
        }
    }

    private void doConnect() {
        try {
            this.connecting = true;
            TransportConfiguration transportConfiguration = (TransportConfiguration) this.brokerConnectConfiguration.getTransportConfigurations().get(0);
            String stringProperty = ConfigurationHelper.getStringProperty("host", "localhost", transportConfiguration.getParams());
            int intProperty = ConfigurationHelper.getIntProperty("port", 61616, transportConfiguration.getParams());
            this.host = stringProperty;
            this.port = intProperty;
            this.connection = this.bridgesConnector.createConnection((java.util.function.Consumer) null, stringProperty, intProperty);
            if (this.connection == null) {
                retryConnection();
                return;
            }
            this.reconnectFuture = null;
            this.retryCounter = 0;
            this.senders.clear();
            this.receivers.clear();
            ConnectionEntry createOutgoingConnectionEntry = this.protonProtocolManager.createOutgoingConnectionEntry(this.connection, new SaslFactory(this.connection, this.brokerConnectConfiguration));
            this.server.getRemotingService().addConnectionEntry(this.connection, createOutgoingConnectionEntry);
            this.protonRemotingConnection = createOutgoingConnectionEntry.connection;
            this.connection.getChannel().pipeline().addLast(new ChannelHandler[]{new AMQPBrokerConnectionChannelHandler(this.bridgesConnector.getChannelGroup(), this.protonRemotingConnection.getAmqpConnection().getHandler(), this, this.server.getExecutorFactory().getExecutor())});
            this.session = this.protonRemotingConnection.getAmqpConnection().getHandler().getConnection().session();
            this.sessionContext = this.protonRemotingConnection.getAmqpConnection().getSessionExtension(this.session);
            this.protonRemotingConnection.getAmqpConnection().runLater(() -> {
                this.protonRemotingConnection.getAmqpConnection().open();
                this.session.open();
                this.protonRemotingConnection.getAmqpConnection().flush();
            });
            if (this.brokerConnectConfiguration.getConnectionElements() != null) {
                this.server.getPostOffice().getAllBindings().forEach(binding -> {
                    if (binding instanceof QueueBinding) {
                        Queue queue = ((QueueBinding) binding).getQueue();
                        Iterator it = this.brokerConnectConfiguration.getConnectionElements().iterator();
                        while (it.hasNext()) {
                            validateMatching(queue, (AMQPBrokerConnectionElement) it.next());
                        }
                    }
                });
                for (AMQPMirrorBrokerConnectionElement aMQPMirrorBrokerConnectionElement : this.brokerConnectConfiguration.getConnectionElements()) {
                    if (aMQPMirrorBrokerConnectionElement.getType() == AMQPBrokerConnectionAddressType.MIRROR) {
                        connectSender(this.server.locateQueue(aMQPMirrorBrokerConnectionElement.getSourceMirrorAddress()), ProtonProtocolManager.MIRROR_ADDRESS, new Symbol[0]);
                    }
                }
            }
            this.protonRemotingConnection.getAmqpConnection().flush();
            this.bridgeManager.connected(this.connection, this);
            this.connecting = false;
        } catch (Throwable th) {
            error(th);
        }
    }

    public void retryConnection() {
        if (this.bridgeManager.isStarted() && this.started) {
            if (this.brokerConnectConfiguration.getReconnectAttempts() < 0 || this.retryCounter < this.brokerConnectConfiguration.getReconnectAttempts()) {
                this.retryCounter++;
                ActiveMQAMQPProtocolLogger.LOGGER.retryConnection(this.brokerConnectConfiguration.getName(), this.host + ":" + this.port, this.retryCounter, this.brokerConnectConfiguration.getReconnectAttempts());
                if (logger.isDebugEnabled()) {
                    logger.debug("Reconnecting in " + this.brokerConnectConfiguration.getRetryInterval() + ", this is the " + this.retryCounter + " of " + this.brokerConnectConfiguration.getReconnectAttempts());
                }
                this.reconnectFuture = this.scheduledExecutorService.schedule(() -> {
                    this.connectExecutor.execute(() -> {
                        doConnect();
                    });
                }, this.brokerConnectConfiguration.getRetryInterval(), TimeUnit.MILLISECONDS);
                return;
            }
            this.connecting = false;
            ActiveMQAMQPProtocolLogger.LOGGER.retryConnectionFailed(this.brokerConnectConfiguration.getName(), this.host + ":" + this.port, this.retryCounter);
            if (logger.isDebugEnabled()) {
                logger.debug("no more reconnections as the retry counter reached " + this.retryCounter + " out of " + this.brokerConnectConfiguration.getReconnectAttempts());
            }
        }
    }

    private static void uninstallMirrorController(AMQPMirrorBrokerConnectionElement aMQPMirrorBrokerConnectionElement, ActiveMQServer activeMQServer) {
    }

    private static Queue installMirrorController(AMQPBrokerConnection aMQPBrokerConnection, AMQPMirrorBrokerConnectionElement aMQPMirrorBrokerConnectionElement, ActiveMQServer activeMQServer) throws Exception {
        MirrorController mirrorController = activeMQServer.getMirrorController();
        if (mirrorController != null && (mirrorController instanceof AMQPMirrorControllerSource)) {
            Queue checkCurrentMirror = checkCurrentMirror(aMQPBrokerConnection, (AMQPMirrorControllerSource) mirrorController);
            if (checkCurrentMirror != null) {
                return checkCurrentMirror;
            }
        } else if (mirrorController != null && (mirrorController instanceof AMQPMirrorControllerAggregation)) {
            Iterator<AMQPMirrorControllerSource> it = ((AMQPMirrorControllerAggregation) mirrorController).getPartitions().iterator();
            while (it.hasNext()) {
                Queue checkCurrentMirror2 = checkCurrentMirror(aMQPBrokerConnection, it.next());
                if (checkCurrentMirror2 != null) {
                    return checkCurrentMirror2;
                }
            }
        }
        AddressInfo addressInfo = activeMQServer.getAddressInfo(aMQPMirrorBrokerConnectionElement.getSourceMirrorAddress());
        if (addressInfo == null) {
            addressInfo = new AddressInfo(aMQPMirrorBrokerConnectionElement.getSourceMirrorAddress()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false).setTemporary(!aMQPMirrorBrokerConnectionElement.isDurable());
            activeMQServer.addAddressInfo(addressInfo);
        }
        if (addressInfo.getRoutingType() != RoutingType.ANYCAST) {
            throw new IllegalArgumentException("sourceMirrorAddress is not ANYCAST");
        }
        Queue locateQueue = activeMQServer.locateQueue(aMQPMirrorBrokerConnectionElement.getSourceMirrorAddress());
        if (locateQueue == null) {
            locateQueue = activeMQServer.createQueue(new QueueConfiguration(aMQPMirrorBrokerConnectionElement.getSourceMirrorAddress()).setAddress(aMQPMirrorBrokerConnectionElement.getSourceMirrorAddress()).setRoutingType(RoutingType.ANYCAST).setDurable(Boolean.valueOf(aMQPMirrorBrokerConnectionElement.isDurable())), true);
        }
        locateQueue.setMirrorController(true);
        QueueBinding binding = activeMQServer.getPostOffice().getBinding(aMQPMirrorBrokerConnectionElement.getSourceMirrorAddress());
        if (binding == null) {
            logger.warn("Queue does not exist even after creation! " + aMQPMirrorBrokerConnectionElement);
            throw new IllegalAccessException("Cannot start replica");
        }
        Queue queue = binding.getQueue();
        if (!queue.getAddress().equals(aMQPMirrorBrokerConnectionElement.getSourceMirrorAddress())) {
            logger.warn("Queue " + queue + " belong to a different address (" + queue.getAddress() + "), while we expected it to be " + addressInfo.getName());
            throw new IllegalAccessException("Cannot start replica");
        }
        AMQPMirrorControllerSource aMQPMirrorControllerSource = new AMQPMirrorControllerSource(queue, activeMQServer, aMQPMirrorBrokerConnectionElement.isMessageAcknowledgements(), aMQPMirrorBrokerConnectionElement.isQueueCreation(), aMQPMirrorBrokerConnectionElement.isQueueRemoval(), aMQPBrokerConnection);
        activeMQServer.scanAddresses(aMQPMirrorControllerSource);
        if (mirrorController == null) {
            activeMQServer.installMirrorController(aMQPMirrorControllerSource);
        } else {
            if (mirrorController instanceof AMQPMirrorControllerSource) {
                AMQPMirrorControllerAggregation aMQPMirrorControllerAggregation = new AMQPMirrorControllerAggregation();
                aMQPMirrorControllerAggregation.addPartition((AMQPMirrorControllerSource) mirrorController);
                mirrorController = aMQPMirrorControllerAggregation;
                activeMQServer.installMirrorController(aMQPMirrorControllerAggregation);
            }
            ((AMQPMirrorControllerAggregation) mirrorController).addPartition(aMQPMirrorControllerSource);
        }
        return queue;
    }

    private static Queue checkCurrentMirror(AMQPBrokerConnection aMQPBrokerConnection, AMQPMirrorControllerSource aMQPMirrorControllerSource) {
        if (aMQPMirrorControllerSource.getBrokerConnection() == aMQPBrokerConnection) {
            return aMQPMirrorControllerSource.getSnfQueue();
        }
        return null;
    }

    private void connectReceiver(ActiveMQProtonRemotingConnection activeMQProtonRemotingConnection, Session session, AMQPSessionContext aMQPSessionContext, Queue queue, Symbol... symbolArr) {
        if (logger.isDebugEnabled()) {
            logger.debug("Connecting inbound for " + queue);
        }
        if (session == null) {
            logger.debug("session is null");
        } else {
            activeMQProtonRemotingConnection.getAmqpConnection().runLater(() -> {
                if (this.receivers.contains(queue)) {
                    logger.debug("Receiver for queue " + queue + " already exists, just giving up");
                    return;
                }
                this.receivers.add(queue);
                Receiver receiver = session.receiver(queue.getAddress().toString() + ":" + UUIDGenerator.getInstance().generateStringUUID());
                receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
                receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
                Target target = new Target();
                target.setAddress(queue.getAddress().toString());
                receiver.setTarget(target);
                Source source = new Source();
                source.setAddress(queue.getAddress().toString());
                receiver.setSource(source);
                if (symbolArr != null) {
                    source.setCapabilities(symbolArr);
                }
                receiver.open();
                activeMQProtonRemotingConnection.getAmqpConnection().flush();
                try {
                    aMQPSessionContext.addReceiver(receiver);
                } catch (Exception e) {
                    error(e);
                }
            });
        }
    }

    private void connectSender(Queue queue, String str, Symbol... symbolArr) {
        if (logger.isDebugEnabled()) {
            logger.debug("Connecting outbound for " + queue);
        }
        if (this.session == null) {
            logger.debug("Session is null");
        } else {
            this.protonRemotingConnection.getAmqpConnection().runLater(() -> {
                try {
                } catch (Exception e) {
                    error(e);
                }
                if (this.senders.contains(queue)) {
                    logger.debug("Sender for queue " + queue + " already exists, just giving up");
                    return;
                }
                this.senders.add(queue);
                Sender sender = this.session.sender(str + ":" + UUIDGenerator.getInstance().generateStringUUID());
                sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
                sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
                Target target = new Target();
                target.setAddress(str);
                if (symbolArr != null) {
                    target.setCapabilities(symbolArr);
                }
                sender.setTarget(target);
                Source source = new Source();
                source.setAddress(queue.getAddress().toString());
                sender.setSource(source);
                this.sessionContext.addSender(sender, new ProtonServerSenderContext(this.protonRemotingConnection.getAmqpConnection(), sender, this.sessionContext, this.sessionContext.getSessionSPI(), new AMQPOutgoingController(queue, sender, this.sessionContext.getSessionSPI())));
                this.protonRemotingConnection.getAmqpConnection().flush();
            });
        }
    }

    protected void error(Throwable th) {
        this.connecting = false;
        logger.warn(th.getMessage(), th);
        redoConnection();
    }

    public void disconnect() throws Exception {
        redoConnection();
    }

    public void connectionCreated(ActiveMQComponent activeMQComponent, Connection connection, ClientProtocolManager clientProtocolManager) {
    }

    public void connectionDestroyed(Object obj) {
        this.server.getRemotingService().removeConnection(obj);
        redoConnection();
    }

    public void connectionException(Object obj, ActiveMQException activeMQException) {
        redoConnection();
    }

    private void redoConnection() {
        this.connectExecutor.execute(() -> {
            if (this.connecting) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Broker connection " + getName() + " was already in retry mode, exception or retry not captured");
                    return;
                }
                return;
            }
            this.connecting = true;
            try {
                if (this.protonRemotingConnection != null) {
                    this.protonRemotingConnection.fail(new ActiveMQException("Connection being recreated"));
                    this.connection = null;
                    this.protonRemotingConnection = null;
                }
            } catch (Throwable th) {
                logger.warn(th.getMessage(), th);
            }
            retryConnection();
        });
    }

    public void connectionReadyForWrites(Object obj, boolean z) {
        this.protonRemotingConnection.flush();
    }
}
