package org.apache.activemq.broker;

import java.io.EOFException;
import java.io.IOException;
import java.net.SocketException;
import java.net.URI;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ControlCommand;
import org.apache.activemq.command.DataArrayResponse;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.FlushCommand;
import org.apache.activemq.command.IntegerResponse;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.network.DemandForwardingBridge;
import org.apache.activemq.network.MBeanNetworkListener;
import org.apache.activemq.network.NetworkBridgeConfiguration;
import org.apache.activemq.network.NetworkBridgeFactory;
import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.state.ConsumerState;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.state.SessionState;
import org.apache.activemq.state.TransactionState;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.TransmitCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.stomp.Stomp;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.MarshallingSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:org/apache/activemq/broker/TransportConnection.class */
public class TransportConnection implements Connection, Task, CommandVisitor {
    // Loggers: a general one plus two children named "<class>.Transport" and "<class>.Service".
    private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class);
    private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport");
    private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service");
    // Collaborators supplied to the constructor.
    protected final Broker broker;
    protected final TransportConnector connector;
    // Connection-state map obtained from the RegionBroker adaptor in the constructor.
    protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
    protected BrokerInfo brokerInfo;
    protected TaskRunner taskRunner;
    private final Transport transport;
    // Copied from the connector in the constructor when a connector is supplied.
    private MessageAuthorizationPolicy messageAuthorizationPolicy;
    // Last WireFormatInfo received through processWireFormat.
    private WireFormatInfo wireFormatInfo;
    // Re-entrancy guard used by serviceException.
    private boolean inServiceException;
    private boolean manageable;
    private boolean slow;
    private boolean markedCandidate;
    private boolean blockedCandidate;
    private boolean blocked;
    // Set to true at the end of the constructor.
    private boolean connected;
    private boolean active;
    private boolean starting;
    // While set, service() answers commands with an ExceptionResponse instead of visiting them.
    private boolean pendingStop;
    private long timeStamp;
    // Context selected by the command currently being serviced; cleared at the end of service().
    private ConnectionContext context;
    private boolean networkConnection;
    private boolean faultTolerantConnection;
    private DemandForwardingBridge duplexBridge;
    private final TaskRunnerFactory taskRunnerFactory;
    private final TaskRunnerFactory stopTaskRunnerFactory;
    private String duplexNetworkConnectorId;
    // Access is synchronized on the list itself (see getDispatchQueueSize).
    protected final List<Command> dispatchQueue = new LinkedList();
    // Most recent failure recorded by serviceTransportException / serviceException.
    protected final AtomicReference<Throwable> transportException = new AtomicReference<>();
    protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
    private final ConnectionStatistics statistics = new ConnectionStatistics();
    private final AtomicBoolean stopping = new AtomicBoolean(false);
    private final CountDownLatch stopped = new CountDownLatch(1);
    // Ensures serviceExceptionAsync spawns its handler thread at most once.
    private final AtomicBoolean asyncException = new AtomicBoolean(false);
    private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap();
    private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap();
    private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
    // Initialised to 10 and overwritten with the peer's version in processWireFormat.
    private final AtomicInteger protocolVersion = new AtomicInteger(10);
    private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
    // The transport listener holds the read lock while servicing commands and transport exceptions.
    private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();

    /**
     * Creates a connection bound to the given transport and installs the listener that routes
     * inbound commands and transport failures into this connection.
     *
     * @param transportConnector owning connector; may be {@code null}, in which case no parent
     *                           statistics or message authorization policy are inherited
     * @param transport          transport whose commands and exceptions are serviced here
     * @param broker             broker the commands are applied to; must expose a {@link RegionBroker} adaptor
     * @param taskRunnerFactory  stored as the connection's task runner factory
     * @param taskRunnerFactory2 stored as the task runner factory used for stopping
     */
    public TransportConnection(TransportConnector transportConnector, Transport transport, Broker broker, TaskRunnerFactory taskRunnerFactory, TaskRunnerFactory taskRunnerFactory2) {
        this.connector = transportConnector;
        this.broker = broker;
        this.brokerConnectionStates = ((RegionBroker) broker.getAdaptor(RegionBroker.class)).getConnectionStates();
        if (transportConnector != null) {
            this.statistics.setParent(transportConnector.getStatistics());
            this.messageAuthorizationPolicy = transportConnector.getMessageAuthorizationPolicy();
        }
        this.taskRunnerFactory = taskRunnerFactory;
        this.stopTaskRunnerFactory = taskRunnerFactory2;
        this.transport = transport;
        final BrokerService brokerService = this.broker.getBrokerService();
        if (this.transport instanceof BrokerServiceAware) {
            ((BrokerServiceAware) this.transport).setBrokerService(brokerService);
        }
        this.transport.setTransportListener(new DefaultTransportListener() {
            @Override
            public void onCommand(Object obj) {
                TransportConnection.this.serviceLock.readLock().lock();
                try {
                    if (!(obj instanceof Command)) {
                        // Concatenation is null-safe; obj.toString() would raise an NPE for a
                        // null payload and hide the real protocol violation.
                        throw new RuntimeException("Protocol violation - Command corrupted: " + obj);
                    }
                    Command command = (Command) obj;
                    if (brokerService.isStopping()) {
                        throw new BrokerStoppedException("Broker " + brokerService + " is being stopped");
                    }
                    Response response = TransportConnection.this.service(command);
                    // Re-check: the broker may have begun stopping while the command was serviced.
                    if (response != null && !brokerService.isStopping()) {
                        TransportConnection.this.dispatchSync(response);
                    }
                } finally {
                    TransportConnection.this.serviceLock.readLock().unlock();
                }
            }

            @Override
            public void onException(IOException iOException) {
                TransportConnection.this.serviceLock.readLock().lock();
                try {
                    TransportConnection.this.serviceTransportException(iOException);
                } finally {
                    TransportConnection.this.serviceLock.readLock().unlock();
                }
            }
        });
        this.connected = true;
    }

    /**
     * Returns the number of commands currently held in the dispatch queue.
     *
     * @return the queue size, read while holding the queue's monitor
     */
    @Override
    public int getDispatchQueueSize() {
        synchronized (this.dispatchQueue) {
            return this.dispatchQueue.size();
        }
    }

    /**
     * Reacts to an I/O failure on the transport.
     * <p>
     * If the broker is configured to shut down on slave failure and the peer identified itself
     * as a slave broker, this connection and the broker are stopped. Then, unless a stop is
     * already in progress or pending, the failure is recorded, logged and the connection is
     * stopped asynchronously.
     *
     * @param iOException the transport failure
     */
    public void serviceTransportException(IOException iOException) {
        final BrokerService owningBroker = this.connector.getBrokerService();
        final boolean slaveFailure = owningBroker.isShutdownOnSlaveFailure()
                && this.brokerInfo != null
                && this.brokerInfo.isSlaveBroker();
        if (slaveFailure) {
            LOG.error("Slave has exception: {} shutting down master now.", iOException.getMessage(), iOException);
            try {
                doStop();
                owningBroker.stop();
            } catch (Exception stopFailure) {
                LOG.warn("Failed to stop the master", stopFailure);
            }
        }
        if (!this.stopping.get() && !this.pendingStop) {
            this.transportException.set(iOException);
            if (TRANSPORTLOG.isDebugEnabled()) {
                TRANSPORTLOG.debug(this + " failed: " + iOException, iOException);
            } else if (TRANSPORTLOG.isWarnEnabled() && !expected(iOException)) {
                // Expected disconnects are only reported at debug level.
                TRANSPORTLOG.warn(this + " failed: " + iOException);
            }
            stopAsync(iOException);
        }
    }

    /**
     * Tells whether a transport failure is a routine disconnect that does not deserve a warning:
     * on a STOMP connector, either an {@link EOFException} or a {@link SocketException} whose
     * message mentions "reset".
     *
     * @param iOException the transport failure to classify
     * @return {@code true} if the failure is considered expected
     */
    private boolean expected(IOException iOException) {
        if (!isStomp()) {
            return false;
        }
        if (iOException instanceof EOFException) {
            return true;
        }
        if (iOException instanceof SocketException) {
            // getMessage() may legitimately be null (e.g. new SocketException()).
            final String message = iOException.getMessage();
            return message != null && message.indexOf("reset") != -1;
        }
        return false;
    }

    /**
     * Checks whether the connector's URI scheme contains "stomp".
     *
     * @return {@code true} when the connector URI and its scheme are present and the scheme contains "stomp"
     */
    private boolean isStomp() {
        final URI connectorUri = this.connector.getUri();
        if (connectorUri == null) {
            return false;
        }
        final String scheme = connectorUri.getScheme();
        return scheme != null && scheme.indexOf("stomp") != -1;
    }

    /**
     * Hands the exception to {@link #serviceException(Throwable)} on a dedicated thread.
     * Only the first call has any effect; later calls are ignored.
     *
     * @param iOException the failure to service
     */
    @Override
    public void serviceExceptionAsync(final IOException iOException) {
        if (!this.asyncException.compareAndSet(false, true)) {
            return;
        }
        final Thread handler = new Thread("Async Exception Handler") {
            @Override
            public void run() {
                TransportConnection.this.serviceException(iOException);
            }
        };
        handler.start();
    }

    /**
     * Handles an error raised while servicing this connection.
     * <ul>
     * <li>{@link IOException}: delegated to {@link #serviceTransportException(IOException)}.</li>
     * <li>Exactly {@code BrokerStoppedException}: unless already stopping, the client is sent a
     * {@link ConnectionError} synchronously and the connection is stopped asynchronously.</li>
     * <li>Anything else: unless stopping or already inside this handler, the error is logged and
     * a {@link ConnectionError} is dispatched (synchronously if a stop is pending).</li>
     * </ul>
     *
     * @param th the error to handle
     */
    @Override
    public void serviceException(Throwable th) {
        if (th instanceof IOException) {
            serviceTransportException((IOException) th);
        } else if (th.getClass() == BrokerStoppedException.class) {
            if (!this.stopping.get()) {
                SERVICELOG.debug("Broker has been stopped.  Notifying client and closing his connection.");
                final ConnectionError stoppedError = new ConnectionError();
                stoppedError.setException(th);
                dispatchSync(stoppedError);
                this.transportException.set(th);
                // Short pause before stopping — presumably to let the error reach the client.
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                }
                stopAsync();
            }
        } else if (!this.stopping.get() && !this.inServiceException) {
            this.inServiceException = true;
            try {
                SERVICELOG.warn("Async error occurred: ", th);
                final ConnectionError asyncError = new ConnectionError();
                asyncError.setException(th);
                if (this.pendingStop) {
                    dispatchSync(asyncError);
                } else {
                    dispatchAsync(asyncError);
                }
            } finally {
                this.inServiceException = false;
            }
        }
    }

    /**
     * Services one command by dispatching it to the matching {@code processXxx} visitor method
     * and builds the response to send back, if any.
     * <p>
     * While a stop is pending the command is not visited; an {@link ExceptionResponse} carrying
     * the recorded transport exception is produced instead. Failures of commands that require a
     * response are returned as an {@link ExceptionResponse}; other failures go to
     * {@link #serviceException(Throwable)}. The MDC key {@code activemq.connector} is set for the
     * duration of the call and always removed, even if servicing fails unexpectedly.
     *
     * @param command the command to service
     * @return the response to send, or {@code null} when none is required or the context asked
     *         for the response to be suppressed
     */
    @Override
    public Response service(Command command) {
        MDC.put("activemq.connector", this.connector.getUri().toString());
        try {
            Response response = null;
            boolean isResponseRequired = command.isResponseRequired();
            int commandId = command.getCommandId();
            try {
                response = !this.pendingStop ? command.visit(this) : new ExceptionResponse(this.transportException.get());
            } catch (Throwable th) {
                if (SERVICELOG.isDebugEnabled() && th.getClass() != BrokerStoppedException.class) {
                    SERVICELOG.debug("Error occured while processing " + (isResponseRequired ? "sync" : "async") + " command: " + command + ", exception: " + th, th);
                }
                if ((th instanceof SuppressReplyException) || (th.getCause() instanceof SuppressReplyException)) {
                    LOG.info("Suppressing reply to: " + command + " on: " + th + ", cause: " + th.getCause());
                    isResponseRequired = false;
                }
                if (isResponseRequired) {
                    if ((th instanceof SecurityException) || (th.getCause() instanceof SecurityException)) {
                        SERVICELOG.warn("Security Error occurred: {}", th.getMessage());
                    }
                    response = new ExceptionResponse(th);
                } else {
                    serviceException(th);
                }
            }
            if (isResponseRequired) {
                if (response == null) {
                    response = new Response();
                }
                response.setCorrelationId(commandId);
            }
            if (this.context != null) {
                // A handler may ask (via the context) that no response be sent for this command.
                if (this.context.isDontSendReponse()) {
                    this.context.setDontSendReponse(false);
                    response = null;
                }
                this.context = null;
            }
            return response;
        } finally {
            // Always clear the MDC entry so it cannot leak onto later work on this thread.
            MDC.remove("activemq.connector");
        }
    }

    /**
     * Keep-alive commands need no processing here.
     *
     * @param keepAliveInfo the keep-alive command (unused)
     * @return always {@code null}
     */
    @Override // org.apache.activemq.state.CommandVisitor
    public Response processKeepAlive(KeepAliveInfo keepAliveInfo) throws Exception {
        return null;
    }

    /**
     * Asks the broker to remove the subscription described by the command, using the context of
     * the connection the command names.
     *
     * @param removeSubscriptionInfo the subscription removal request
     * @return always {@code null}
     * @throws Exception if the lookup or the broker call fails
     */
    @Override
    public Response processRemoveSubscription(RemoveSubscriptionInfo removeSubscriptionInfo) throws Exception {
        final TransportConnectionState connectionState = lookupConnectionState(removeSubscriptionInfo.getConnectionId());
        final ConnectionContext subscriptionContext = connectionState.getContext();
        this.broker.removeSubscription(subscriptionContext, removeSubscriptionInfo);
        return null;
    }

    /**
     * Remembers the peer's wire format info and adopts its protocol version.
     *
     * @param wireFormatInfo the wire format announced by the peer
     * @return always {@code null}
     */
    @Override
    public Response processWireFormat(WireFormatInfo wireFormatInfo) throws Exception {
        this.wireFormatInfo = wireFormatInfo;
        final int peerVersion = wireFormatInfo.getVersion();
        this.protocolVersion.set(peerVersion);
        return null;
    }

    /**
     * Handles a shutdown command by stopping this connection asynchronously.
     *
     * @param shutdownInfo the shutdown command (unused)
     * @return always {@code null}
     */
    @Override // org.apache.activemq.state.CommandVisitor
    public Response processShutdown(ShutdownInfo shutdownInfo) throws Exception {
        stopAsync();
        return null;
    }

    /**
     * Flush commands need no processing here.
     *
     * @param flushCommand the flush command (unused)
     * @return always {@code null}
     */
    @Override // org.apache.activemq.state.CommandVisitor
    public Response processFlush(FlushCommand flushCommand) throws Exception {
        return null;
    }

    /**
     * Begins a transaction on the connection named by the command. A transaction id that is
     * already tracked for that connection is ignored, so a repeated begin has no effect.
     *
     * @param transactionInfo identifies the connection and the transaction to begin
     * @return always {@code null}
     * @throws NullPointerException if no connection state is found
     * @throws Exception            if the broker fails to begin the transaction
     */
    @Override
    public Response processBeginTransaction(TransactionInfo transactionInfo) throws Exception {
        final TransportConnectionState connectionState = lookupConnectionState(transactionInfo.getConnectionId());
        this.context = null;
        if (connectionState == null) {
            throw new NullPointerException("Context is null");
        }
        this.context = connectionState.getContext();
        final boolean alreadyTracked = connectionState.getTransactionState(transactionInfo.getTransactionId()) != null;
        if (!alreadyTracked) {
            connectionState.addTransactionState(transactionInfo.getTransactionId());
            this.broker.beginTransaction(this.context, transactionInfo.getTransactionId());
        }
        return null;
    }

    /**
     * Counts the transaction states tracked across all registered connection states.
     *
     * @return the total number of tracked transactions
     */
    @Override
    public int getActiveTransactionCount() {
        int total = 0;
        for (TransportConnectionState connectionState : this.connectionStateRegister.listConnectionStates()) {
            for (TransactionState ignored : connectionState.getTransactionStates()) {
                total++;
            }
        }
        return total;
    }

    /**
     * Reports how long the oldest tracked transaction has been open.
     * <p>
     * The oldest transaction is the one with the smallest creation timestamp across all
     * registered connection states.
     *
     * @return elapsed milliseconds since the oldest transaction was created, or {@code null}
     *         when no transaction is tracked
     */
    @Override
    public Long getOldestActiveTransactionDuration() {
        TransactionState oldest = null;
        for (TransportConnectionState connectionState : this.connectionStateRegister.listConnectionStates()) {
            for (TransactionState candidate : connectionState.getTransactionStates()) {
                // Earlier creation time means older transaction.
                if (oldest == null || candidate.getCreatedAt() < oldest.getCreatedAt()) {
                    oldest = candidate;
                }
            }
        }
        if (oldest == null) {
            return null;
        }
        return Long.valueOf(System.currentTimeMillis() - oldest.getCreatedAt());
    }

    /**
     * End-transaction commands need no processing here.
     *
     * @param transactionInfo the transaction command (unused)
     * @return always {@code null}
     */
    @Override // org.apache.activemq.state.CommandVisitor
    public Response processEndTransaction(TransactionInfo transactionInfo) throws Exception {
        return null;
    }

    /**
     * Prepares a transaction and reports the broker's prepare result. A transaction already
     * marked as prepared returns its stored result without calling the broker again.
     *
     * @param transactionInfo identifies the connection and the transaction to prepare
     * @return an {@link IntegerResponse} carrying the prepare result
     * @throws NullPointerException  if no connection state is found
     * @throws IllegalStateException if the transaction is not tracked for the connection
     * @throws Exception             if the broker fails to prepare the transaction
     */
    @Override
    public Response processPrepareTransaction(TransactionInfo transactionInfo) throws Exception {
        final TransportConnectionState connectionState = lookupConnectionState(transactionInfo.getConnectionId());
        this.context = null;
        if (connectionState == null) {
            throw new NullPointerException("Context is null");
        }
        this.context = connectionState.getContext();
        final TransactionState txState = connectionState.getTransactionState(transactionInfo.getTransactionId());
        if (txState == null) {
            throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: " + transactionInfo.getTransactionId());
        }
        if (!txState.isPrepared()) {
            // Mark as prepared before calling the broker, exactly as the state machine expects.
            txState.setPrepared(true);
            final int prepareResult = this.broker.prepareTransaction(this.context, transactionInfo.getTransactionId());
            txState.setPreparedResult(prepareResult);
            // 3 is presumably XAResource.XA_RDONLY (cf. the message above); such a
            // transaction is no longer tracked.
            if (prepareResult == 3) {
                connectionState.removeTransactionState(transactionInfo.getTransactionId());
            }
            return new IntegerResponse(prepareResult);
        }
        return new IntegerResponse(txState.getPreparedResult());
    }

    /**
     * Commits a transaction in one phase: the transaction is dropped from the connection state
     * and the broker is asked to commit with the one-phase flag set.
     *
     * @param transactionInfo identifies the connection and the transaction to commit
     * @return always {@code null}
     * @throws Exception if the lookup or the broker commit fails
     */
    @Override
    public Response processCommitTransactionOnePhase(TransactionInfo transactionInfo) throws Exception {
        final TransportConnectionState connectionState = lookupConnectionState(transactionInfo.getConnectionId());
        this.context = connectionState.getContext();
        connectionState.removeTransactionState(transactionInfo.getTransactionId());
        final boolean onePhase = true;
        this.broker.commitTransaction(this.context, transactionInfo.getTransactionId(), onePhase);
        return null;
    }

    /**
     * Commits a transaction in two-phase mode: the transaction is dropped from the connection
     * state and the broker is asked to commit with the one-phase flag cleared.
     *
     * @param transactionInfo identifies the connection and the transaction to commit
     * @return always {@code null}
     * @throws Exception if the lookup or the broker commit fails
     */
    @Override
    public Response processCommitTransactionTwoPhase(TransactionInfo transactionInfo) throws Exception {
        final TransportConnectionState connectionState = lookupConnectionState(transactionInfo.getConnectionId());
        this.context = connectionState.getContext();
        connectionState.removeTransactionState(transactionInfo.getTransactionId());
        final boolean onePhase = false;
        this.broker.commitTransaction(this.context, transactionInfo.getTransactionId(), onePhase);
        return null;
    }

    /**
     * Rolls back a transaction: it is dropped from the connection state and the broker is asked
     * to roll it back.
     *
     * @param transactionInfo identifies the connection and the transaction to roll back
     * @return always {@code null}
     * @throws Exception if the lookup or the broker rollback fails
     */
    @Override
    public Response processRollbackTransaction(TransactionInfo transactionInfo) throws Exception {
        final TransportConnectionState connectionState = lookupConnectionState(transactionInfo.getConnectionId());
        this.context = connectionState.getContext();
        // Stop tracking the transaction before asking the broker to undo it.
        connectionState.removeTransactionState(transactionInfo.getTransactionId());
        this.broker.rollbackTransaction(this.context, transactionInfo.getTransactionId());
        return null;
    }

    /**
     * Asks the broker to forget the given transaction, using the context of the connection the
     * command names.
     *
     * @param transactionInfo identifies the connection and the transaction to forget
     * @return always {@code null}
     * @throws Exception if the lookup or the broker call fails
     */
    @Override
    public Response processForgetTransaction(TransactionInfo transactionInfo) throws Exception {
        final TransportConnectionState connectionState = lookupConnectionState(transactionInfo.getConnectionId());
        this.context = connectionState.getContext();
        this.broker.forgetTransaction(this.context, transactionInfo.getTransactionId());
        return null;
    }

    /**
     * Returns the broker's prepared transactions for the context of the connection the command
     * names.
     *
     * @param transactionInfo identifies the connection
     * @return a {@link DataArrayResponse} wrapping the prepared transactions reported by the broker
     * @throws Exception if the lookup or the broker call fails
     */
    @Override
    public Response processRecoverTransactions(TransactionInfo transactionInfo) throws Exception {
        final TransportConnectionState connectionState = lookupConnectionState(transactionInfo.getConnectionId());
        this.context = connectionState.getContext();
        return new DataArrayResponse(this.broker.getPreparedTransactions(this.context));
    }

    /**
     * Sends a produced message to the broker, provided the producer's exchange allows the
     * message to be dispatched.
     *
     * @param message the message to send
     * @return always {@code null}
     * @throws Exception if the exchange lookup or the broker send fails
     */
    @Override
    public Response processMessage(Message message) throws Exception {
        final ProducerBrokerExchange exchange = getProducerBrokerExchange(message.getProducerId());
        if (exchange.canDispatch(message)) {
            this.broker.send(exchange, message);
        }
        return null;
    }

    /**
     * Forwards a message acknowledgement to the broker through the consumer's exchange.
     * <p>
     * When no exchange exists for the consumer the ack is dropped; a warning naming the ack is
     * logged only if the ack belongs to a transaction.
     *
     * @param messageAck the acknowledgement to process
     * @return always {@code null}
     * @throws Exception if the broker fails to acknowledge
     */
    @Override
    public Response processMessageAck(MessageAck messageAck) throws Exception {
        final ConsumerBrokerExchange exchange = getConsumerBrokerExchange(messageAck.getConsumerId());
        if (exchange != null) {
            this.broker.acknowledge(exchange, messageAck);
        } else if (messageAck.isInTransaction()) {
            // Log the ack itself: the exchange is known to be null on this path.
            LOG.warn("no matching consumer, ignoring ack {}", messageAck);
        }
        return null;
    }

    /**
     * Delegates a message pull to the broker, using the context of the connection state found
     * for the pulling consumer.
     *
     * @param messagePull the pull request
     * @return whatever the broker returns for the pull
     * @throws Exception if the lookup or the broker call fails
     */
    @Override
    public Response processMessagePull(MessagePull messagePull) throws Exception {
        final ConnectionContext pullContext = lookupConnectionState(messagePull.getConsumerId()).getContext();
        return this.broker.messagePull(pullContext, messagePull);
    }

    /**
     * Passes a dispatch notification straight through to the broker.
     *
     * @param messageDispatchNotification the notification to forward
     * @return always {@code null}
     * @throws Exception if the broker call fails
     */
    @Override // org.apache.activemq.state.CommandVisitor
    public Response processMessageDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
        this.broker.processDispatchNotification(messageDispatchNotification);
        return null;
    }

    /**
     * Registers a destination with the broker and, when the destination is temporary, also
     * records it on the connection state.
     *
     * @param destinationInfo describes the destination to add
     * @return always {@code null}
     * @throws Exception if the lookup or the broker call fails
     */
    @Override
    public Response processAddDestination(DestinationInfo destinationInfo) throws Exception {
        final TransportConnectionState connectionState = lookupConnectionState(destinationInfo.getConnectionId());
        this.broker.addDestinationInfo(connectionState.getContext(), destinationInfo);
        if (destinationInfo.getDestination().isTemporary()) {
            connectionState.addTempDestination(destinationInfo);
        }
        return null;
    }

    /**
     * Removes a destination from the broker and, when the destination is temporary, also drops
     * it from the connection state.
     *
     * @param destinationInfo describes the destination to remove
     * @return always {@code null}
     * @throws Exception if the lookup or the broker call fails
     */
    @Override
    public Response processRemoveDestination(DestinationInfo destinationInfo) throws Exception {
        final TransportConnectionState connectionState = lookupConnectionState(destinationInfo.getConnectionId());
        this.broker.removeDestinationInfo(connectionState.getContext(), destinationInfo);
        if (destinationInfo.getDestination().isTemporary()) {
            connectionState.removeTempDestination(destinationInfo.getDestination());
        }
        return null;
    }

    /**
     * Registers a producer with the broker and the owning session state.
     * <p>
     * A producer id already known to the session is ignored. For non-advisory destinations the
     * connector's per-connection producer limit is enforced first.
     *
     * @param producerInfo describes the producer to add
     * @return always {@code null}
     * @throws IllegalStateException if the connection or session is not registered, or the
     *                               producer limit has been reached
     * @throws Exception             if the broker call fails
     */
    @Override
    public Response processAddProducer(ProducerInfo producerInfo) throws Exception {
        final SessionId sessionId = producerInfo.getProducerId().getParentId();
        final ConnectionId connectionId = sessionId.getParentId();
        final TransportConnectionState connectionState = lookupConnectionState(connectionId);
        if (connectionState == null) {
            throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: " + connectionId);
        }
        final SessionState session = connectionState.getSessionState(sessionId);
        if (session == null) {
            throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " + sessionId);
        }
        if (!session.getProducerIds().contains(producerInfo.getProducerId())) {
            final ActiveMQDestination target = producerInfo.getDestination();
            if (target != null && !AdvisorySupport.isAdvisoryTopic(target)) {
                if (getProducerCount(connectionId) >= this.connector.getMaximumProducersAllowedPerConnection()) {
                    throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + this.connector.getMaximumProducersAllowedPerConnection());
                }
            }
            this.broker.addProducer(connectionState.getContext(), producerInfo);
            try {
                session.addProducer(producerInfo);
            } catch (IllegalStateException rejected) {
                // The session refused the producer: undo the broker-side registration.
                this.broker.removeProducer(connectionState.getContext(), producerInfo);
            }
        }
        return null;
    }

    /**
     * Unregisters a producer from its session state, drops its broker exchange and removes it
     * from the broker.
     *
     * @param producerId the producer to remove
     * @return always {@code null}
     * @throws IllegalStateException if the session or the producer is not registered
     * @throws Exception             if the broker call fails
     */
    @Override
    public Response processRemoveProducer(ProducerId producerId) throws Exception {
        final SessionId sessionId = producerId.getParentId();
        final TransportConnectionState connectionState = lookupConnectionState(sessionId.getParentId());
        final SessionState session = connectionState.getSessionState(sessionId);
        if (session == null) {
            throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: " + sessionId);
        }
        final ProducerState removed = session.removeProducer(producerId);
        if (removed == null) {
            throw new IllegalStateException("Cannot remove a producer that had not been registered: " + producerId);
        }
        removeProducerBrokerExchange(producerId);
        this.broker.removeProducer(connectionState.getContext(), removed.getInfo());
        return null;
    }

    /**
     * Registers a consumer with the broker, the owning session state and the consumer exchange
     * table.
     * <p>
     * A consumer id already known to the session is ignored. For non-advisory destinations the
     * connector's per-connection consumer limit is enforced first.
     *
     * @param consumerInfo describes the consumer to add
     * @return always {@code null}
     * @throws IllegalStateException if the connection or session is not registered, or the
     *                               consumer limit has been reached
     * @throws Exception             if the broker call fails
     */
    @Override
    public Response processAddConsumer(ConsumerInfo consumerInfo) throws Exception {
        final SessionId sessionId = consumerInfo.getConsumerId().getParentId();
        final ConnectionId connectionId = sessionId.getParentId();
        final TransportConnectionState connectionState = lookupConnectionState(connectionId);
        if (connectionState == null) {
            throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: " + connectionId);
        }
        final SessionState session = connectionState.getSessionState(sessionId);
        if (session == null) {
            throw new IllegalStateException(this.broker.getBrokerName() + " Cannot add a consumer to a session that had not been registered: " + sessionId);
        }
        if (!session.getConsumerIds().contains(consumerInfo.getConsumerId())) {
            final ActiveMQDestination target = consumerInfo.getDestination();
            if (target != null && !AdvisorySupport.isAdvisoryTopic(target)) {
                if (getConsumerCount(connectionId) >= this.connector.getMaximumConsumersAllowedPerConnection()) {
                    throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + this.connector.getMaximumConsumersAllowedPerConnection());
                }
            }
            this.broker.addConsumer(connectionState.getContext(), consumerInfo);
            try {
                session.addConsumer(consumerInfo);
                addConsumerBrokerExchange(consumerInfo.getConsumerId());
            } catch (IllegalStateException rejected) {
                // Registration could not be completed locally: undo the broker-side registration.
                this.broker.removeConsumer(connectionState.getContext(), consumerInfo);
            }
        }
        return null;
    }

    /**
     * Unregisters a consumer: records the last delivered sequence id on its info, removes it
     * from the broker and drops its broker exchange.
     *
     * @param consumerId the consumer to remove
     * @param j          last delivered sequence id to store on the consumer's info
     * @return always {@code null}
     * @throws IllegalStateException if the connection, session or consumer is not registered
     * @throws Exception             if the broker call fails
     */
    @Override
    public Response processRemoveConsumer(ConsumerId consumerId, long j) throws Exception {
        final SessionId sessionId = consumerId.getParentId();
        final ConnectionId connectionId = sessionId.getParentId();
        final TransportConnectionState connectionState = lookupConnectionState(connectionId);
        if (connectionState == null) {
            throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: " + connectionId);
        }
        final SessionState session = connectionState.getSessionState(sessionId);
        if (session == null) {
            throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: " + sessionId);
        }
        final ConsumerState removed = session.removeConsumer(consumerId);
        if (removed == null) {
            throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + consumerId);
        }
        removed.getInfo().setLastDeliveredSequenceId(j);
        this.broker.removeConsumer(connectionState.getContext(), removed.getInfo());
        removeConsumerBrokerExchange(consumerId);
        return null;
    }

    /**
     * Registers a session with the broker and the connection state.
     * <p>
     * Nothing happens when the connection state is missing or already knows the session id. If
     * the connection state rejects the session, the failure is logged and the broker-side
     * registration is rolled back.
     *
     * @param sessionInfo describes the session to add
     * @return always {@code null}
     * @throws Exception if a broker call fails
     */
    @Override
    public Response processAddSession(SessionInfo sessionInfo) throws Exception {
        final TransportConnectionState connectionState = lookupConnectionState(sessionInfo.getSessionId().getParentId());
        if (connectionState == null || connectionState.getSessionIds().contains(sessionInfo.getSessionId())) {
            return null;
        }
        this.broker.addSession(connectionState.getContext(), sessionInfo);
        try {
            connectionState.addSession(sessionInfo);
        } catch (IllegalStateException e) {
            // Report through the class logger rather than stderr, then undo the broker-side add.
            LOG.warn("Failed to add session: {}", sessionInfo.getSessionId(), e);
            this.broker.removeSession(connectionState.getContext(), sessionInfo);
        }
        return null;
    }

    /**
     * Removes a session: shuts its state down, removes each of its consumers and producers
     * (failures are logged and do not abort the removal), then drops the session from the
     * connection state and the broker.
     *
     * @param sessionId the session to remove
     * @param j         last delivered sequence id passed on to each consumer removal
     * @return always {@code null}
     * @throws IllegalStateException if the connection or session is not registered
     * @throws Exception             if the final broker call fails
     */
    @Override
    public Response processRemoveSession(SessionId sessionId, long j) throws Exception {
        final ConnectionId connectionId = sessionId.getParentId();
        final TransportConnectionState connectionState = lookupConnectionState(connectionId);
        if (connectionState == null) {
            throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId);
        }
        final SessionState session = connectionState.getSessionState(sessionId);
        if (session == null) {
            throw new IllegalStateException("Cannot remove session that had not been registered: " + sessionId);
        }
        session.shutdown();
        // Consumers first, then producers; each removal is best-effort.
        for (ConsumerId consumer : session.getConsumerIds()) {
            try {
                processRemoveConsumer(consumer, j);
            } catch (Throwable failure) {
                LOG.warn("Failed to remove consumer: {}", consumer, failure);
            }
        }
        for (ProducerId producer : session.getProducerIds()) {
            try {
                processRemoveProducer(producer);
            } catch (Throwable failure) {
                LOG.warn("Failed to remove producer: {}", producer, failure);
            }
        }
        connectionState.removeSession(sessionId);
        this.broker.removeSession(connectionState.getContext(), session.getInfo());
        return null;
    }

    /**
     * Registers a client connection with this transport and with the broker.
     * <p>
     * The method obtains (or creates) the {@link TransportConnectionState} stored in
     * {@code brokerConnectionStates} for the connection id, takes it over from another
     * connection object if the state is currently bound to one, builds a fresh
     * {@link ConnectionContext} and finally calls {@code broker.addConnection}. For
     * manageable connections the connector's {@link ConnectionControl} is dispatched
     * back asynchronously.
     *
     * @param connectionInfo the connection being added; its client-master flag and client
     *                       IP may be filled in by this method
     * @return always {@code null}
     * @throws Exception rethrown from {@code broker.addConnection} (and the follow-up
     *                   dispatch) after the registration has been rolled back
     */
    @Override // org.apache.activemq.state.CommandVisitor
    public Response processAddConnection(ConnectionInfo connectionInfo) throws Exception {
        TransportConnectionState transportConnectionState;
        // Wire format versions up to 2 are always flagged as client master.
        if (this.wireFormatInfo != null && this.wireFormatInfo.getVersion() <= 2) {
            connectionInfo.setClientMaster(true);
        }
        // Look up or create the state for this connection id and count this reference.
        synchronized (this.brokerConnectionStates) {
            transportConnectionState = (TransportConnectionState) this.brokerConnectionStates.get(connectionInfo.getConnectionId());
            if (transportConnectionState == null) {
                transportConnectionState = new TransportConnectionState(connectionInfo, this);
                this.brokerConnectionStates.put(connectionInfo.getConnectionId(), transportConnectionState);
            }
            transportConnectionState.incrementReference();
        }
        // If the state is bound to a different connection object, stop that one and take over.
        synchronized (transportConnectionState.getConnectionMutex()) {
            if (transportConnectionState.getConnection() != this) {
                LOG.debug("Killing previous stale connection: {}", transportConnectionState.getConnection().getRemoteAddress());
                transportConnectionState.getConnection().stop();
                LOG.debug("Connection {} taking over previous connection: {}", getRemoteAddress(), transportConnectionState.getConnection().getRemoteAddress());
                transportConnectionState.setConnection(this);
                transportConnectionState.reset(connectionInfo);
            }
        }
        registerConnectionState(connectionInfo.getConnectionId(), transportConnectionState);
        LOG.debug("Setting up new connection id: {}, address: {}, info: {}", connectionInfo.getConnectionId(), getRemoteAddress(), connectionInfo);
        this.faultTolerantConnection = connectionInfo.isFaultTolerant();
        String clientId = connectionInfo.getClientId();
        // Build the context that is handed to the broker for this connection.
        this.context = new ConnectionContext();
        this.context.setBroker(this.broker);
        this.context.setClientId(clientId);
        this.context.setClientMaster(connectionInfo.isClientMaster());
        this.context.setConnection(this);
        this.context.setConnectionId(connectionInfo.getConnectionId());
        this.context.setConnector(this.connector);
        this.context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
        this.context.setNetworkConnection(this.networkConnection);
        this.context.setFaultTolerant(this.faultTolerantConnection);
        this.context.setTransactions(new ConcurrentHashMap<>());
        this.context.setUserName(connectionInfo.getUserName());
        this.context.setWireFormatInfo(this.wireFormatInfo);
        this.context.setReconnect(connectionInfo.isFailoverReconnect());
        this.manageable = connectionInfo.isManageable();
        this.context.setConnectionState(transportConnectionState);
        transportConnectionState.setContext(this.context);
        transportConnectionState.setConnection(this);
        // Default the client IP to the transport's remote address when the client sent none.
        if (connectionInfo.getClientIp() == null) {
            connectionInfo.setClientIp(getRemoteAddress());
        }
        try {
            this.broker.addConnection(this.context, connectionInfo);
            if (!connectionInfo.isManageable()) {
                return null;
            }
            // Manageable clients receive the connector's connection control information.
            ConnectionControl connectionControl = this.connector.getConnectionControl();
            connectionControl.setFaultTolerant(this.broker.isFaultTolerantConfiguration());
            if (connectionInfo.isFailoverReconnect()) {
                connectionControl.setRebalanceConnection(false);
            }
            dispatchAsync(connectionControl);
            return null;
        } catch (Exception e) {
            // Roll back the registration done above, then rethrow to the caller.
            synchronized (this.brokerConnectionStates) {
                this.brokerConnectionStates.remove(connectionInfo.getConnectionId());
                unregisterConnectionState(connectionInfo.getConnectionId());
                LOG.warn("Failed to add Connection {}", connectionInfo.getConnectionId(), e);
                if (e instanceof SecurityException) {
                    // Security failures additionally schedule a stop of this connection after 2000 ms.
                    delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e);
                }
                throw e;
            }
        }
    }

    /**
     * Tears down a registered connection: shuts the state down, removes its sessions
     * and temporary destinations, removes the connection from the broker and finally
     * unregisters the state. Every individual step is best-effort; failures are
     * logged to {@code SERVICELOG} and do not abort the remaining steps.
     *
     * @param connectionId the connection to remove
     * @param j value forwarded to {@code processRemoveSession} for every session
     * @return always {@code null}
     * @throws InterruptedException declared by the signature
     */
    @Override // org.apache.activemq.state.CommandVisitor
    public synchronized Response processRemoveConnection(ConnectionId connectionId, long j) throws InterruptedException {
        LOG.debug("remove connection id: {}", connectionId);
        final TransportConnectionState state = lookupConnectionState(connectionId);
        if (state == null) {
            return null;
        }

        state.shutdown();

        for (SessionId session : state.getSessionIds()) {
            try {
                processRemoveSession(session, j);
            } catch (Throwable failure) {
                SERVICELOG.warn("Failed to remove session {}", session, failure);
            }
        }

        for (Iterator<DestinationInfo> tempDests = state.getTempDestinations().iterator(); tempDests.hasNext();) {
            DestinationInfo tempDest = tempDests.next();
            try {
                this.broker.removeDestination(state.getContext(), tempDest.getDestination(), 0L);
            } catch (Throwable failure) {
                SERVICELOG.warn("Failed to remove tmp destination {}", tempDest.getDestination(), failure);
            }
            // The entry is dropped whether or not the broker removal succeeded.
            tempDests.remove();
        }

        try {
            this.broker.removeConnection(state.getContext(), state.getInfo(), this.transportException.get());
        } catch (Throwable failure) {
            SERVICELOG.warn("Failed to remove connection {}", state.getInfo(), failure);
        }

        final TransportConnectionState released = unregisterConnectionState(connectionId);
        if (released != null) {
            synchronized (this.brokerConnectionStates) {
                // Only forget the shared state once the last reference is gone.
                if (released.decrementReference() == 0) {
                    this.brokerConnectionStates.remove(connectionId);
                }
            }
        }
        return null;
    }

    /**
     * Producer acknowledgements are not acted upon by this connection.
     *
     * @param producerAck the received command (unused)
     * @return always {@code null}
     */
    @Override // org.apache.activemq.state.CommandVisitor
    public Response processProducerAck(ProducerAck producerAck) throws Exception {
        // Intentionally a no-op.
        return null;
    }

    /**
     * @return the connector this connection holds
     */
    @Override // org.apache.activemq.broker.Connection
    public Connector getConnector() {
        return connector;
    }

    /**
     * Dispatches a command on the calling thread. An {@link IOException} raised while
     * dispatching is not propagated; it is handed to {@code serviceExceptionAsync}.
     *
     * @param command the command to send
     */
    @Override // org.apache.activemq.broker.Connection
    public void dispatchSync(Command command) {
        try {
            processDispatch(command);
        } catch (IOException ioFailure) {
            serviceExceptionAsync(ioFailure);
        }
    }

    /**
     * Queues a command for dispatch by the task runner.
     * <p>
     * While the connection is stopping nothing is queued: a message dispatch is
     * post-processed and its transmit callback (if any) is told it failed, any other
     * command is dropped. Without a task runner the command is dispatched
     * synchronously on the calling thread.
     *
     * @param command the command to send
     */
    @Override // org.apache.activemq.broker.Connection
    public void dispatchAsync(Command command) {
        if (this.stopping.get()) {
            if (command.isMessageDispatch()) {
                MessageDispatch dispatch = (MessageDispatch) command;
                TransmitCallback callback = dispatch.getTransmitCallback();
                this.broker.postProcessDispatch(dispatch);
                if (callback != null) {
                    callback.onFailure();
                }
            }
            return;
        }

        if (this.taskRunner == null) {
            dispatchSync(command);
            return;
        }

        synchronized (this.dispatchQueue) {
            this.dispatchQueue.add(command);
        }
        try {
            this.taskRunner.wakeup();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Sends a command over the transport and runs the broker's dispatch hooks around it.
     * <p>
     * For a {@link MessageDispatch} the broker's {@code preProcessDispatch} is invoked
     * before sending and {@code postProcessDispatch} afterwards, followed by exactly one
     * notification of the transmit callback (if present): {@code onFailure()} when an
     * {@link IOException} occurred, {@code onSuccess()} otherwise. Nothing is sent while
     * the connection is stopping, but the post-processing still runs.
     * <p>
     * An {@link IOException} for a command that is not a message dispatch is swallowed.
     *
     * @param command the command to send
     * @throws IOException if sending a message dispatch failed
     */
    protected void processDispatch(Command command) throws IOException {
        MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null);
        // Set once the failure path has post-processed the dispatch, so that the
        // finally block does not post-process it again and report success on top.
        boolean failureHandled = false;
        try {
            if (!this.stopping.get()) {
                if (messageDispatch != null) {
                    this.broker.preProcessDispatch(messageDispatch);
                }
                dispatch(command);
            }
        } catch (IOException e) {
            if (messageDispatch != null) {
                failureHandled = true;
                TransmitCallback failureCallback = messageDispatch.getTransmitCallback();
                this.broker.postProcessDispatch(messageDispatch);
                if (failureCallback != null) {
                    failureCallback.onFailure();
                }
                throw e;
            }
            // Not a message dispatch: the I/O failure is deliberately not propagated.
        } finally {
            if (messageDispatch != null && !failureHandled) {
                TransmitCallback successCallback = messageDispatch.getTransmitCallback();
                this.broker.postProcessDispatch(messageDispatch);
                if (successCallback != null) {
                    successCallback.onSuccess();
                }
            }
        }
    }

    /**
     * One step of the dispatcher task: sends at most one queued command.
     * <p>
     * When a stop is pending or in progress, the first call marks dispatching as
     * stopped, tries to send a {@code ShutdownInfo} (only if no transport exception
     * was recorded) and releases {@code dispatchStoppedLatch}. An {@link IOException}
     * while dispatching also stops dispatching and is handed to
     * {@code serviceExceptionAsync}.
     *
     * @return {@code true} if a command was dispatched, so the task should be run
     *         again; {@code false} otherwise
     */
    @Override // org.apache.activemq.thread.Task
    public boolean iterate() {
        try {
            if (this.pendingStop || this.stopping.get()) {
                if (this.dispatchStopped.compareAndSet(false, true)) {
                    if (this.transportException.get() == null) {
                        try {
                            dispatch(new ShutdownInfo());
                        } catch (Throwable ignored) {
                            // Best effort only: the peer may already be gone.
                            LOG.debug("Failed to send ShutdownInfo while stopping dispatch. This exception is ignored.", ignored);
                        }
                    }
                    this.dispatchStoppedLatch.countDown();
                }
                return false;
            }
            if (this.dispatchStopped.get()) {
                return false;
            }
            Command next;
            synchronized (this.dispatchQueue) {
                if (this.dispatchQueue.isEmpty()) {
                    return false;
                }
                next = this.dispatchQueue.remove(0);
            }
            // Send outside the queue lock so that threads enqueuing commands are not
            // blocked behind transport I/O.
            processDispatch(next);
            return true;
        } catch (IOException e) {
            if (this.dispatchStopped.compareAndSet(false, true)) {
                this.dispatchStoppedLatch.countDown();
            }
            serviceExceptionAsync(e);
            return false;
        }
    }

    /**
     * @return the statistics object held by this connection
     */
    @Override // org.apache.activemq.broker.Connection
    public ConnectionStatistics getStatistics() {
        return statistics;
    }

    /**
     * @return the message authorization policy currently set on this connection
     */
    public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
        return messageAuthorizationPolicy;
    }

    /**
     * Replaces the message authorization policy of this connection.
     *
     * @param messageAuthorizationPolicy the policy to store
     */
    public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
        // Parameter shadows the field, so the qualifier is required here.
        this.messageAuthorizationPolicy = messageAuthorizationPolicy;
    }

    /**
     * @return the current value of the {@code manageable} flag
     */
    @Override // org.apache.activemq.broker.Connection
    public boolean isManageable() {
        return manageable;
    }

    /**
     * Starts the connection: creates the dispatcher task runner (when a factory is
     * configured), starts the transport, sends a copy of the connector's
     * {@link BrokerInfo} to the peer and notifies the connector.
     * <p>
     * A stop requested while starting is deferred; once starting is finished (or has
     * failed, which itself marks a stop as pending) {@link #stop()} is invoked.
     *
     * @throws Exception if start-up fails, or if the deferred stop fails
     */
    @Override // org.apache.activemq.Service
    public void start() throws Exception {
        try {
            synchronized (this) {
                starting = true;
                taskRunner = taskRunnerFactory != null
                        ? taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: " + getRemoteAddress())
                        : null;
                transport.start();
                active = true;

                BrokerInfo info = connector.getBrokerInfo().copy();
                // Peer broker details are only shared when cluster-client updates are enabled.
                info.setPeerBrokerInfos(connector.isUpdateClusterClients() ? broker.getPeerBrokerInfos() : null);
                dispatchAsync(info);
                connector.onStarted(this);
            }
        } catch (Exception startFailure) {
            pendingStop = true;
            throw startFailure;
        } finally {
            setStarting(false);
            if (isPendingStop()) {
                LOG.debug("Calling the delayed stop() after start() {}", this);
                stop();
            }
        }
    }

    /**
     * Requests an asynchronous stop and blocks until the {@code stopped} latch is
     * released, logging an informational message each time a five second wait
     * elapses without completion.
     *
     * @throws Exception if the waiting thread is interrupted
     */
    @Override // org.apache.activemq.Service
    public void stop() throws Exception {
        stopAsync();
        boolean finished = stopped.await(5L, TimeUnit.SECONDS);
        while (!finished) {
            LOG.info("The connection to '{}' is taking a long time to shutdown.", transport.getRemoteAddress());
            finished = stopped.await(5L, TimeUnit.SECONDS);
        }
    }

    /**
     * Marks a stop as pending, records the cause and schedules {@link #stopAsync()}
     * to run on {@code stopTaskRunnerFactory} after a delay. Does nothing when the
     * delay is not positive.
     *
     * @param i   delay in milliseconds before the stop is triggered
     * @param str reason, used in the log message written when the stop is triggered
     * @param th  cause stored as this connection's transport exception
     */
    public void delayedStop(final int i, final String str, Throwable th) {
        if (i > 0) {
            synchronized (this) {
                this.pendingStop = true;
                this.transportException.set(th);
            }
            try {
                this.stopTaskRunnerFactory.execute(new Runnable() {
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            Thread.sleep(i);
                            TransportConnection.this.stopAsync();
                            TransportConnection.LOG.info("Stopping {} because {}", TransportConnection.this.transport.getRemoteAddress(), str);
                        } catch (InterruptedException e) {
                            // Keep the interrupt visible to the executing thread instead of swallowing it.
                            Thread.currentThread().interrupt();
                        }
                    }
                });
            } catch (Throwable th2) {
                LOG.warn("Cannot create stopAsync. This exception will be ignored.", th2);
            }
        }
    }

    /**
     * Records the given cause as this connection's transport exception, then
     * requests an asynchronous stop.
     *
     * @param th the cause to record
     */
    public void stopAsync(Throwable th) {
        transportException.set(th);
        stopAsync();
    }

    /**
     * Requests that this connection be stopped without waiting for completion.
     * <p>
     * The request is only recorded (as pending) while {@link #start()} is still in
     * progress. Otherwise, the first caller flags every registered connection
     * context as stopping and schedules {@link #doStop()} on
     * {@code stopTaskRunnerFactory}; that task runs under the service write lock and
     * always releases the {@code stopped} latch. If the task cannot be scheduled,
     * the latch is released immediately.
     */
    public void stopAsync() {
        synchronized (this) {
            this.pendingStop = true;
            if (this.starting) {
                LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes..");
                return;
            }
            if (this.stopping.compareAndSet(false, true)) {
                for (TransportConnectionState connectionState : listConnectionStates()) {
                    ConnectionContext context = connectionState.getContext();
                    if (context != null) {
                        context.getStopping().set(true);
                    }
                }
                try {
                    this.stopTaskRunnerFactory.execute(new Runnable() {
                        @Override // java.lang.Runnable
                        public void run() {
                            TransportConnection.this.serviceLock.writeLock().lock();
                            try {
                                TransportConnection.this.doStop();
                            } catch (Throwable e) {
                                // Log the connection itself; a bare `this` here would be the Runnable.
                                TransportConnection.LOG.debug("Error occurred while shutting down a connection {}", TransportConnection.this, e);
                            } finally {
                                TransportConnection.this.stopped.countDown();
                                TransportConnection.this.serviceLock.writeLock().unlock();
                            }
                        }
                    });
                } catch (Throwable th) {
                    LOG.warn("Cannot create async transport stopper thread. This exception is ignored. Not waiting for stop to complete", th);
                    this.stopped.countDown();
                }
            }
        }
    }

    /**
     * @return a description of this connection that includes the transport's remote address
     */
    @Override
    public String toString() {
        return "Transport Connection to: " + this.transport.getRemoteAddress();
    }

    /**
     * Performs the actual shutdown: notifies the connector, stops the duplex bridge
     * (if any) and the transport, shuts the task runner down, fails every command
     * still waiting in the dispatch queue and, unless the broker itself is stopped,
     * removes every connection state registered on this transport.
     * <p>
     * Failures while stopping the bridge or the transport, and failures while
     * removing a connection, are logged and ignored.
     *
     * @throws Exception propagated from the connector notification or the task runner shutdown
     */
    protected void doStop() throws Exception {
        LOG.debug("Stopping connection: {}", this.transport.getRemoteAddress());
        this.connector.onStopped(this);
        try {
            synchronized (this) {
                if (this.duplexBridge != null) {
                    this.duplexBridge.stop();
                }
            }
        } catch (Exception e) {
            LOG.trace("Exception caught stopping. This exception is ignored.", e);
        }
        try {
            this.transport.stop();
            LOG.debug("Stopped transport: {}", this.transport.getRemoteAddress());
        } catch (Exception e2) {
            LOG.debug("Could not stop transport to {}. This exception is ignored.", this.transport.getRemoteAddress(), e2);
        }
        if (this.taskRunner != null) {
            this.taskRunner.shutdown(1L);
            this.taskRunner = null;
        }
        this.active = false;
        // Anything still queued will never be sent: post-process it and report failure.
        synchronized (this.dispatchQueue) {
            for (Command command : this.dispatchQueue) {
                if (command.isMessageDispatch()) {
                    MessageDispatch messageDispatch = (MessageDispatch) command;
                    TransmitCallback transmitCallback = messageDispatch.getTransmitCallback();
                    this.broker.postProcessDispatch(messageDispatch);
                    if (transmitCallback != null) {
                        transmitCallback.onFailure();
                    }
                }
            }
            this.dispatchQueue.clear();
        }
        if (!this.broker.isStopped()) {
            for (TransportConnectionState transportConnectionState : listConnectionStates()) {
                // Same null guard as stopAsync(): a state may not have a context yet.
                ConnectionContext stateContext = transportConnectionState.getContext();
                if (stateContext != null) {
                    stateContext.getStopping().set(true);
                }
                try {
                    LOG.debug("Cleaning up connection resources: {}", getRemoteAddress());
                    processRemoveConnection(transportConnectionState.getInfo().getConnectionId(), 0L);
                } catch (Throwable th) {
                    LOG.debug("Exception caught removing connection {}. This exception is ignored.", transportConnectionState.getInfo().getConnectionId(), th);
                }
            }
        }
        LOG.debug("Connection Stopped: {}", getRemoteAddress());
    }

    /**
     * @return the current value of the {@code blockedCandidate} flag
     */
    public boolean isBlockedCandidate() {
        return blockedCandidate;
    }

    /**
     * @param z new value of the {@code blockedCandidate} flag
     */
    public void setBlockedCandidate(boolean z) {
        blockedCandidate = z;
    }

    /**
     * @return the current value of the {@code markedCandidate} flag
     */
    public boolean isMarkedCandidate() {
        return markedCandidate;
    }

    /**
     * Sets the {@code markedCandidate} flag. Clearing it also resets the mark
     * timestamp to zero and clears the {@code blockedCandidate} flag.
     *
     * @param z new value of the flag
     */
    public void setMarkedCandidate(boolean z) {
        markedCandidate = z;
        if (!z) {
            timeStamp = 0L;
            blockedCandidate = false;
        }
    }

    /**
     * @param z new value of the {@code slow} flag
     */
    public void setSlow(boolean z) {
        slow = z;
    }

    /**
     * @return the current value of the {@code slow} flag
     */
    @Override // org.apache.activemq.broker.Connection
    public boolean isSlow() {
        return slow;
    }

    /**
     * Reports the {@code markedCandidate} flag, i.e. the same value as
     * {@link #isMarkedCandidate()}.
     *
     * @return the current value of the {@code markedCandidate} flag
     */
    public boolean isMarkedBlockedCandidate() {
        return markedCandidate;
    }

    /**
     * Stamps the current time into {@code timeStamp}, unless a non-zero stamp is
     * already present.
     */
    public void doMark() {
        if (timeStamp != 0) {
            return;
        }
        timeStamp = System.currentTimeMillis();
    }

    /**
     * @return the current value of the {@code blocked} flag
     */
    @Override // org.apache.activemq.broker.Connection
    public boolean isBlocked() {
        return blocked;
    }

    /**
     * @return the current value of the {@code connected} flag
     */
    @Override // org.apache.activemq.broker.Connection
    public boolean isConnected() {
        return connected;
    }

    /**
     * @param z new value of the {@code blocked} flag
     */
    public void setBlocked(boolean z) {
        blocked = z;
    }

    /**
     * @param z new value of the {@code connected} flag
     */
    public void setConnected(boolean z) {
        connected = z;
    }

    /**
     * @return the current value of the {@code active} flag
     */
    @Override // org.apache.activemq.broker.Connection
    public boolean isActive() {
        return active;
    }

    /**
     * @param z new value of the {@code active} flag
     */
    public void setActive(boolean z) {
        active = z;
    }

    /**
     * Reads the {@code starting} flag under this connection's monitor.
     *
     * @return the current value of the {@code starting} flag
     */
    public synchronized boolean isStarting() {
        return starting;
    }

    /**
     * Reads the {@code networkConnection} flag under this connection's monitor.
     *
     * @return the current value of the {@code networkConnection} flag
     */
    @Override // org.apache.activemq.broker.Connection
    public synchronized boolean isNetworkConnection() {
        return networkConnection;
    }

    /**
     * @return the current value of the {@code faultTolerantConnection} flag
     */
    @Override // org.apache.activemq.broker.Connection
    public boolean isFaultTolerantConnection() {
        return faultTolerantConnection;
    }

    /**
     * Writes the {@code starting} flag under this connection's monitor.
     *
     * @param z new value of the flag
     */
    protected synchronized void setStarting(boolean z) {
        starting = z;
    }

    /**
     * Reads the {@code pendingStop} flag under this connection's monitor.
     *
     * @return the current value of the {@code pendingStop} flag
     */
    public synchronized boolean isPendingStop() {
        return pendingStop;
    }

    /**
     * Writes the {@code pendingStop} flag under this connection's monitor.
     *
     * @param z new value of the flag
     */
    protected synchronized void setPendingStop(boolean z) {
        pendingStop = z;
    }

    /**
     * Handles a {@link BrokerInfo} command received from the peer.
     * <p>
     * Three cases are distinguished:
     * <ul>
     * <li>the peer announces itself as a slave broker: an error is logged, then the
     *     command is recorded like an ordinary broker info;</li>
     * <li>the peer requests a duplex network connection: the responder end of the
     *     bridge is created and started, after stopping any other connection of the
     *     same connector that carries the same duplex connector id. This branch
     *     returns without recording the command; failures are logged, not thrown;</li>
     * <li>otherwise: the command is stored and this connection, together with every
     *     registered connection context, is flagged as a network connection.</li>
     * </ul>
     *
     * @param brokerInfo the received command
     * @return always {@code null}
     */
    @Override // org.apache.activemq.state.CommandVisitor
    public Response processBrokerInfo(BrokerInfo brokerInfo) {
        if (brokerInfo.isSlaveBroker()) {
            LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: {}", brokerInfo.getBrokerName());
        } else if (brokerInfo.isNetworkConnection() && brokerInfo.isDuplexConnection()) {
            try {
                // Configure the bridge from the properties carried by the command.
                HashMap<String, String> createMap = createMap(MarshallingSupport.stringToProperties(brokerInfo.getNetworkProperties()));
                NetworkBridgeConfiguration networkBridgeConfiguration = new NetworkBridgeConfiguration();
                IntrospectionSupport.setProperties(networkBridgeConfiguration, createMap, Stomp.EMPTY);
                networkBridgeConfiguration.setBrokerName(this.broker.getBrokerName());
                String str = networkBridgeConfiguration.getName() + "@" + brokerInfo.getBrokerId();
                CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
                synchronized (connections) {
                    Iterator<TransportConnection> it = connections.iterator();
                    while (it.hasNext()) {
                        TransportConnection next = it.next();
                        if (next != this && str.equals(next.getDuplexNetworkConnectorId())) {
                            LOG.warn("Stopping an existing active duplex connection [{}] for network connector ({}).", next, str);
                            next.stopAsync();
                            // Give the old connection up to one second to finish stopping.
                            next.getStopped().await(1L, TimeUnit.SECONDS);
                        }
                    }
                    setDuplexNetworkConnectorId(str);
                }
                Transport createLocalTransport = NetworkBridgeFactory.createLocalTransport(this.broker);
                Transport transport = this.transport;
                if (!(transport instanceof ResponseCorrelator)) {
                    transport = new ResponseCorrelator(transport);
                }
                // Only the part of the local transport's description from the last '#' on is
                // used for the management object name.
                String obj = createLocalTransport.toString();
                if (obj.contains("#")) {
                    obj = obj.substring(obj.lastIndexOf("#"));
                }
                MBeanNetworkListener mBeanNetworkListener = new MBeanNetworkListener(this.broker.getBrokerService(), networkBridgeConfiguration, this.broker.getBrokerService().createDuplexNetworkConnectorObjectName(obj));
                mBeanNetworkListener.setCreatedByDuplex(true);
                this.duplexBridge = NetworkBridgeFactory.createBridge(networkBridgeConfiguration, createLocalTransport, transport, mBeanNetworkListener);
                this.duplexBridge.setBrokerService(this.broker.getBrokerService());
                brokerInfo.setDuplexConnection(false);
                this.duplexBridge.setCreatedByDuplex(true);
                this.duplexBridge.duplexStart(this, this.brokerInfo, brokerInfo);
                LOG.info("Started responder end of duplex bridge {}", str);
                return null;
            } catch (TransportDisposedIOException e) {
                LOG.warn("Duplex bridge {} was stopped before it was correctly started.", this.duplexNetworkConnectorId);
                return null;
            } catch (Exception e2) {
                if (e2 instanceof InterruptedException) {
                    // The wait above was interrupted: keep the interrupt visible to the caller.
                    Thread.currentThread().interrupt();
                }
                LOG.error("Failed to create responder end of duplex network bridge {}", this.duplexNetworkConnectorId, e2);
                return null;
            }
        }
        if (this.brokerInfo != null) {
            LOG.warn("Unexpected extra broker info command received: {}", brokerInfo);
        }
        this.brokerInfo = brokerInfo;
        this.networkConnection = true;
        Iterator<TransportConnectionState> it2 = listConnectionStates().iterator();
        while (it2.hasNext()) {
            it2.next().getContext().setNetworkConnection(true);
        }
        return null;
    }

    /**
     * Copies the entries of a {@link Properties} object into a string-to-string map.
     * Keys and values are converted with {@link String#valueOf(Object)}, which leaves
     * string entries unchanged.
     *
     * @param properties the properties to copy; {@code null} yields an empty map
     * @return a new, independent map holding the copied entries
     */
    private HashMap<String, String> createMap(Properties properties) {
        HashMap<String, String> result = new HashMap<>();
        if (properties != null) {
            // Properties is a Map<Object, Object>, so it cannot be handed directly to a
            // HashMap<String, String> constructor; copy entry by entry instead.
            for (Map.Entry<Object, Object> entry : properties.entrySet()) {
                result.put(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
            }
        }
        return result;
    }

    /**
     * Sends a command one-way over the transport. The {@code markedCandidate} flag
     * is raised for the duration of the send and cleared again afterwards, whether
     * or not the send succeeded.
     *
     * @param command the command to send
     * @throws IOException if the transport fails to send the command
     */
    protected void dispatch(Command command) throws IOException {
        try {
            setMarkedCandidate(true);
            transport.oneway(command);
        } finally {
            setMarkedCandidate(false);
        }
    }

    /**
     * @return the remote address reported by the underlying transport
     */
    @Override // org.apache.activemq.broker.Connection
    public String getRemoteAddress() {
        return transport.getRemoteAddress();
    }

    /**
     * @return the transport this connection is using
     */
    public Transport getTransport() {
        return transport;
    }

    /**
     * Returns an identifier taken from the first registered connection state: its
     * client id when one is set, otherwise the string form of its connection id.
     *
     * @return the identifier, or {@code null} when no connection state is registered
     */
    @Override // org.apache.activemq.broker.Connection
    public String getConnectionId() {
        Iterator<TransportConnectionState> states = listConnectionStates().iterator();
        if (states.hasNext()) {
            TransportConnectionState first = states.next();
            if (first.getInfo().getClientId() == null) {
                return first.getInfo().getConnectionId().toString();
            }
            return first.getInfo().getClientId();
        }
        return null;
    }

    /**
     * Forwards a {@link ConnectionControl} to the client, but only when this
     * connection is active, not blocked, fault tolerant, and negotiated a wire
     * format of version 6 or later.
     *
     * @param connectionControl the control command to forward
     */
    @Override // org.apache.activemq.broker.Connection
    public void updateClient(ConnectionControl connectionControl) {
        boolean eligible = isActive()
                && !isBlocked()
                && isFaultTolerantConnection()
                && this.wireFormatInfo != null
                && this.wireFormatInfo.getVersion() >= 6;
        if (eligible) {
            dispatchAsync(connectionControl);
        }
    }

    /**
     * Looks up the exchange already stored for a producer, without creating one.
     *
     * @param producerInfo the producer to look up; may be {@code null}
     * @return the stored exchange, or {@code null} when the argument or its producer
     *         id is {@code null} or no exchange is stored
     */
    public ProducerBrokerExchange getProducerBrokerExchangeIfExists(ProducerInfo producerInfo) {
        if (producerInfo == null || producerInfo.getProducerId() == null) {
            return null;
        }
        synchronized (this.producerExchanges) {
            return this.producerExchanges.get(producerInfo.getProducerId());
        }
    }

    /**
     * Returns the exchange for a producer, creating and storing it on first use.
     * As a side effect {@code this.context} is pointed at the connection context
     * belonging to that producer.
     * <p>
     * A newly created exchange gets the last stored producer sequence id when the
     * context is a reconnect, or a network connection on a connector that audits
     * network producers. It is marked mutable when the producer has no destination
     * or a composite one.
     *
     * @param producerId the producer whose exchange is wanted
     * @return the existing or newly created exchange
     * @throws IOException propagated from reading the last producer sequence id
     */
    private ProducerBrokerExchange getProducerBrokerExchange(ProducerId producerId) throws IOException {
        ProducerBrokerExchange exchange = this.producerExchanges.get(producerId);
        if (exchange != null) {
            this.context = exchange.getConnectionContext();
            return exchange;
        }

        synchronized (this.producerExchanges) {
            exchange = new ProducerBrokerExchange();
            TransportConnectionState connState = lookupConnectionState(producerId);
            this.context = connState.getContext();
            exchange.setConnectionContext(this.context);

            boolean auditedNetworkProducer = false;
            if (!this.context.isReconnect()) {
                auditedNetworkProducer = this.context.isNetworkConnection() && this.connector.isAuditNetworkProducers();
            }
            if (this.context.isReconnect() || auditedNetworkProducer) {
                exchange.setLastStoredSequenceId(this.broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(producerId));
            }

            SessionState session = connState.getSessionState(producerId.getParentId());
            if (session != null) {
                exchange.setProducerState(session.getProducerState(producerId));
                ProducerState state = session.getProducerState(producerId);
                if (state != null && state.getInfo() != null) {
                    ProducerInfo producer = state.getInfo();
                    boolean mutable = producer.getDestination() == null || producer.getDestination().isComposite();
                    exchange.setMutable(mutable);
                }
            }
            this.producerExchanges.put(producerId, exchange);
        }
        return exchange;
    }

    /**
     * Forgets the exchange stored for a producer, if any.
     *
     * @param producerId the producer whose exchange is dropped
     */
    private void removeProducerBrokerExchange(ProducerId producerId) {
        synchronized (producerExchanges) {
            producerExchanges.remove(producerId);
        }
    }

    /**
     * Looks up the exchange stored for a consumer, without creating one.
     *
     * @param consumerId the consumer to look up
     * @return the value stored in {@code consumerExchanges} for that id
     */
    private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId consumerId) {
        return consumerExchanges.get(consumerId);
    }

    /**
     * Returns the exchange for a consumer, creating and storing it when none exists.
     * Creating one also points {@code this.context} at the consumer's connection
     * context, and flags the exchange as wildcard when the consumer's destination is
     * a pattern.
     *
     * @param consumerId the consumer whose exchange is wanted
     * @return the existing or newly created exchange
     */
    private ConsumerBrokerExchange addConsumerBrokerExchange(ConsumerId consumerId) {
        ConsumerBrokerExchange exchange = this.consumerExchanges.get(consumerId);
        if (exchange != null) {
            return exchange;
        }

        synchronized (this.consumerExchanges) {
            exchange = new ConsumerBrokerExchange();
            TransportConnectionState connState = lookupConnectionState(consumerId);
            this.context = connState.getContext();
            exchange.setConnectionContext(this.context);

            SessionState session = connState.getSessionState(consumerId.getParentId());
            if (session != null) {
                ConsumerState state = session.getConsumerState(consumerId);
                if (state != null) {
                    ConsumerInfo consumer = state.getInfo();
                    if (consumer != null && consumer.getDestination() != null && consumer.getDestination().isPattern()) {
                        exchange.setWildcard(true);
                    }
                }
            }
            this.consumerExchanges.put(consumerId, exchange);
        }
        return exchange;
    }

    /**
     * Forgets the exchange stored for a consumer, if any.
     *
     * @param consumerId the consumer whose exchange is dropped
     */
    private void removeConsumerBrokerExchange(ConsumerId consumerId) {
        synchronized (consumerExchanges) {
            consumerExchanges.remove(consumerId);
        }
    }

    /**
     * @return the value currently held by {@code protocolVersion}
     */
    public int getProtocolVersion() {
        return protocolVersion.get();
    }

    /**
     * Control commands are not acted upon by this connection.
     *
     * @param controlCommand the received command (unused)
     * @return always {@code null}
     */
    @Override // org.apache.activemq.state.CommandVisitor
    public Response processControlCommand(ControlCommand controlCommand) throws Exception {
        // Intentionally a no-op.
        return null;
    }

    /**
     * Incoming message dispatch commands are not acted upon by this connection.
     *
     * @param messageDispatch the received command (unused)
     * @return always {@code null}
     */
    @Override // org.apache.activemq.state.CommandVisitor
    public Response processMessageDispatch(MessageDispatch messageDispatch) throws Exception {
        // Intentionally a no-op.
        return null;
    }

    /**
     * Adopts the fault-tolerant setting carried by a {@link ConnectionControl}.
     * A {@code null} command is ignored.
     *
     * @param connectionControl the received command; may be {@code null}
     * @return always {@code null}
     */
    @Override // org.apache.activemq.state.CommandVisitor
    public Response processConnectionControl(ConnectionControl connectionControl) throws Exception {
        if (connectionControl != null) {
            faultTolerantConnection = connectionControl.isFaultTolerant();
        }
        return null;
    }

    /**
     * Visitor callback for {@link ConnectionError}; this connection does nothing with it.
     *
     * <p>Declared by {@code org.apache.activemq.state.CommandVisitor}.
     *
     * @param connectionError the received command (not inspected)
     * @return always {@code null}
     */
    @Override
    public Response processConnectionError(ConnectionError connectionError) throws Exception {
        final Response noReply = null;
        return noReply;
    }

    /**
     * Visitor callback for {@link ConsumerControl}.
     *
     * <p>Looks up the exchange stored for the command's consumer id and hands both the
     * exchange and the command to the broker. Declared by
     * {@code org.apache.activemq.state.CommandVisitor}.
     *
     * @param consumerControl the received command
     * @return always {@code null}
     * @throws Exception whatever the broker call raises
     */
    @Override
    public Response processConsumerControl(ConsumerControl consumerControl) throws Exception {
        final ConsumerBrokerExchange exchange = getConsumerBrokerExchange(consumerControl.getConsumerId());
        broker.processConsumerControl(exchange, consumerControl);
        return null;
    }

    /**
     * Registers a connection state under the given connection id.
     *
     * <p>If the current register is non-empty and reports that it does not handle multiple
     * connection states, it is first swapped for a new
     * {@code MapTransportConnectionStateRegister} that has been handed the current register
     * through its {@code intialize} method. The registration itself is then delegated to
     * whichever register is in place. Runs synchronized on this connection.
     *
     * @param connectionId id to register the state under
     * @param transportConnectionState the state to register
     * @return the value returned by the register's {@code registerConnectionState}
     */
    protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId, TransportConnectionState transportConnectionState) {
        final boolean mustSwitchRegister =
                !(connectionStateRegister.isEmpty() || connectionStateRegister.doesHandleMultipleConnectionStates());
        if (mustSwitchRegister) {
            final MapTransportConnectionStateRegister replacement = new MapTransportConnectionStateRegister();
            replacement.intialize(connectionStateRegister);
            connectionStateRegister = replacement;
        }
        return connectionStateRegister.registerConnectionState(connectionId, transportConnectionState);
    }

    /**
     * Removes the connection state registered under the given id by delegating to the
     * current register. Runs synchronized on this connection.
     *
     * @param connectionId id whose state should be unregistered
     * @return the value returned by the register's {@code unregisterConnectionState}
     */
    protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) {
        final TransportConnectionState removed = connectionStateRegister.unregisterConnectionState(connectionId);
        return removed;
    }

    /**
     * Lists the connection states known to the current register.
     * Runs synchronized on this connection.
     *
     * @return the list returned by the register's {@code listConnectionStates}
     */
    protected synchronized List<TransportConnectionState> listConnectionStates() {
        final List<TransportConnectionState> states = connectionStateRegister.listConnectionStates();
        return states;
    }

    /**
     * Finds a connection state by string key, delegating to the current register.
     * Runs synchronized on this connection.
     *
     * @param str key passed through to the register's {@code String} lookup
     * @return the value returned by the register
     */
    protected synchronized TransportConnectionState lookupConnectionState(String str) {
        final TransportConnectionState found = connectionStateRegister.lookupConnectionState(str);
        return found;
    }

    /**
     * Finds the connection state for a consumer id, delegating to the current register.
     * Runs synchronized on this connection.
     *
     * @param consumerId consumer id passed through to the register
     * @return the value returned by the register
     */
    protected synchronized TransportConnectionState lookupConnectionState(ConsumerId consumerId) {
        final TransportConnectionState found = connectionStateRegister.lookupConnectionState(consumerId);
        return found;
    }

    /**
     * Finds the connection state for a producer id, delegating to the current register.
     * Runs synchronized on this connection.
     *
     * @param producerId producer id passed through to the register
     * @return the value returned by the register
     */
    protected synchronized TransportConnectionState lookupConnectionState(ProducerId producerId) {
        final TransportConnectionState found = connectionStateRegister.lookupConnectionState(producerId);
        return found;
    }

    /**
     * Finds the connection state for a session id, delegating to the current register.
     * Runs synchronized on this connection.
     *
     * @param sessionId session id passed through to the register
     * @return the value returned by the register
     */
    protected synchronized TransportConnectionState lookupConnectionState(SessionId sessionId) {
        final TransportConnectionState found = connectionStateRegister.lookupConnectionState(sessionId);
        return found;
    }

    /**
     * Finds the connection state for a connection id, delegating to the current register.
     * Runs synchronized on this connection.
     *
     * @param connectionId connection id passed through to the register
     * @return the value returned by the register
     */
    public synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
        final TransportConnectionState found = connectionStateRegister.lookupConnectionState(connectionId);
        return found;
    }

    /**
     * Stores the duplex network connector id on this connection.
     * Runs synchronized on this connection.
     *
     * @param str the value to store in {@code duplexNetworkConnectorId}
     */
    protected synchronized void setDuplexNetworkConnectorId(String str) {
        duplexNetworkConnectorId = str;
    }

    /**
     * Reads the duplex network connector id stored on this connection.
     * Runs synchronized on this connection.
     *
     * @return the current value of {@code duplexNetworkConnectorId}
     */
    protected synchronized String getDuplexNetworkConnectorId() {
        final String connectorId = duplexNetworkConnectorId;
        return connectorId;
    }

    /**
     * Reports the current value of the {@code stopping} flag.
     *
     * @return the value read from {@code stopping}
     */
    public boolean isStopping() {
        final boolean stoppingNow = stopping.get();
        return stoppingNow;
    }

    /**
     * Exposes the {@code stopped} latch of this connection.
     *
     * @return the latch held in {@code stopped}
     */
    protected CountDownLatch getStopped() {
        final CountDownLatch latch = stopped;
        return latch;
    }

    /**
     * Counts the producers across all sessions of one connection.
     *
     * <p>Sums the size of {@code getProducerIds()} for every session id listed by the
     * connection state; session ids for which no session state is found are skipped.
     *
     * @param connectionId id of the connection to inspect
     * @return the summed producer count, or {@code 0} when no connection state is found
     */
    private int getProducerCount(ConnectionId connectionId) {
        final TransportConnectionState connectionState = lookupConnectionState(connectionId);
        if (connectionState == null) {
            return 0;
        }
        int producers = 0;
        for (SessionId sessionId : connectionState.getSessionIds()) {
            final SessionState session = connectionState.getSessionState(sessionId);
            if (session == null) {
                continue;
            }
            producers += session.getProducerIds().size();
        }
        return producers;
    }

    /**
     * Counts the consumers across all sessions of one connection.
     *
     * <p>Sums the size of {@code getConsumerIds()} for every session id listed by the
     * connection state; session ids for which no session state is found are skipped.
     *
     * @param connectionId id of the connection to inspect
     * @return the summed consumer count, or {@code 0} when no connection state is found
     */
    private int getConsumerCount(ConnectionId connectionId) {
        final TransportConnectionState connectionState = lookupConnectionState(connectionId);
        if (connectionState == null) {
            return 0;
        }
        int consumers = 0;
        for (SessionId sessionId : connectionState.getSessionIds()) {
            final SessionState session = connectionState.getSessionState(sessionId);
            if (session == null) {
                continue;
            }
            consumers += session.getConsumerIds().size();
        }
        return consumers;
    }

    /**
     * Exposes the wire format info held by this connection.
     *
     * @return the current value of {@code wireFormatInfo}
     */
    public WireFormatInfo getRemoteWireFormatInfo() {
        final WireFormatInfo remoteInfo = wireFormatInfo;
        return remoteInfo;
    }
}
