package org.apache.activemq.broker.ft;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.InsertableMutableBrokerFilter;
import org.apache.activemq.broker.MutableBrokerFilter;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:WEB-INF/lib/activemq-web-5.3.1-fuse-00-00.jar:org/apache/activemq/broker/ft/MasterBroker.class
 */
/* loaded from: input_file:WEB-INF/lib/activemq-core-5.3.1-fuse-00-00.jar:org/apache/activemq/broker/ft/MasterBroker.class */
public class MasterBroker extends InsertableMutableBrokerFilter {
    private static final Log LOG = LogFactory.getLog(MasterBroker.class);
    private Transport slave;
    private AtomicBoolean started;
    private Map<ConsumerId, ConsumerId> consumers;

    public MasterBroker(MutableBrokerFilter mutableBrokerFilter, Transport transport) {
        super(mutableBrokerFilter);
        this.started = new AtomicBoolean(false);
        this.consumers = new ConcurrentHashMap();
        this.slave = transport;
        this.slave = new MutexTransport(this.slave);
        this.slave = new ResponseCorrelator(this.slave);
        this.slave.setTransportListener(transport.getTransportListener());
    }

    public void startProcessing() {
        this.started.set(true);
        try {
            Connection[] clients = getClients();
            ConnectionControl connectionControl = new ConnectionControl();
            connectionControl.setFaultTolerant(true);
            if (clients != null) {
                for (int i = 0; i < clients.length; i++) {
                    if (clients[i].isActive() && clients[i].isManageable()) {
                        clients[i].dispatchAsync(connectionControl);
                    }
                }
            }
        } catch (Exception e) {
            LOG.error("Failed to get Connections", e);
        }
    }

    @Override // org.apache.activemq.broker.MutableBrokerFilter, org.apache.activemq.Service
    public void stop() throws Exception {
        stopProcessing();
    }

    public void stopProcessing() {
        if (this.started.compareAndSet(true, false)) {
            remove();
        }
    }

    @Override // org.apache.activemq.broker.MutableBrokerFilter, org.apache.activemq.broker.Broker
    public void addConnection(ConnectionContext connectionContext, ConnectionInfo connectionInfo) throws Exception {
        super.addConnection(connectionContext, connectionInfo);
        sendAsyncToSlave(connectionInfo);
    }

    @Override // org.apache.activemq.broker.MutableBrokerFilter, org.apache.activemq.broker.Broker
    public void removeConnection(ConnectionContext connectionContext, ConnectionInfo connectionInfo, Throwable th) throws Exception {
        super.removeConnection(connectionContext, connectionInfo, th);
        sendAsyncToSlave(new RemoveInfo(connectionInfo.getConnectionId()));
    }

    @Override // org.apache.activemq.broker.MutableBrokerFilter, org.apache.activemq.broker.Broker
    public void addSession(ConnectionContext connectionContext, SessionInfo sessionInfo) throws Exception {
        super.addSession(connectionContext, sessionInfo);
        sendAsyncToSlave(sessionInfo);
    }

    @Override // org.apache.activemq.broker.MutableBrokerFilter, org.apache.activemq.broker.Broker
    public void removeSession(ConnectionContext connectionContext, SessionInfo sessionInfo) throws Exception {
        super.removeSession(connectionContext, sessionInfo);
        sendAsyncToSlave(new RemoveInfo(sessionInfo.getSessionId()));
    }

    @Override // org.apache.activemq.broker.MutableBrokerFilter, org.apache.activemq.broker.Broker, org.apache.activemq.broker.region.Region
    public void addProducer(ConnectionContext connectionContext, ProducerInfo producerInfo) throws Exception {
        super.addProducer(connectionContext, producerInfo);
        sendAsyncToSlave(producerInfo);
    }

    @Override // org.apache.activemq.broker.MutableBrokerFilter, org.apache.activemq.broker.Broker, org.apache.activemq.broker.region.Region
    public void removeProducer(ConnectionContext connectionContext, ProducerInfo producerInfo) throws Exception {
        super.removeProducer(connectionContext, producerInfo);
        sendAsyncToSlave(new RemoveInfo(producerInfo.getProducerId()));
    }

    @Override // org.apache.activemq.broker.MutableBrokerFilter, org.apache.activemq.broker.region.Region
    public Subscription addConsumer(ConnectionContext connectionContext, ConsumerInfo consumerInfo) throws Exception {
        sendSyncToSlave(consumerInfo);
        this.consumers.put(consumerInfo.getConsumerId(), consumerInfo.getConsumerId());
        return super.addConsumer(connectionContext, consumerInfo);
    }

    @Override // org.apache.activemq.broker.MutableBrokerFilter, org.apache.activemq.broker.region.Region
    public void removeConsumer(ConnectionContext connectionContext, ConsumerInfo consumerInfo) throws Exception {
        super.removeConsumer(connectionContext, consumerInfo);
        this.consumers.remove(consumerInfo.getConsumerId());
        sendSyncToSlave(new RemoveInfo(consumerInfo.getConsumerId()));
    }

    @Override // org.apache.activemq.broker.MutableBrokerFilter, org.apache.activemq.broker.region.Region
    public void removeSubscription(ConnectionContext connectionContext, RemoveSubscriptionInfo removeSubscriptionInfo) throws Exception {
        super.removeSubscription(connectionContext, removeSubscriptionInfo);
        sendAsyncToSlave(removeSubscriptionInfo);
    }

    @Override // org.apache.activemq.broker.MutableBrokerFilter, org.apache.activemq.broker.Broker
    public void addDestinationInfo(ConnectionContext connectionContext, DestinationInfo destinationInfo) throws Exception {
        super.addDestinationInfo(connectionContext, destinationInfo);
        if (destinationInfo.getDestination().isTemporary()) {
            sendAsyncToSlave(destinationInfo);
        }
    }

    @Override // org.apache.activemq.broker.MutableBrokerFilter, org.apache.activemq.broker.Broker
    public void removeDestinationInfo(ConnectionContext connectionContext, DestinationInfo destinationInfo) throws Exception {
        super.removeDestinationInfo(connectionContext, destinationInfo);
        if (destinationInfo.getDestination().isTemporary()) {
            sendAsyncToSlave(destinationInfo);
        }
    }

    @Override // org.apache.activemq.broker.MutableBrokerFilter, org.apache.activemq.broker.Broker
    public void beginTransaction(ConnectionContext connectionContext, TransactionId transactionId) throws Exception {
        sendAsyncToSlave(new TransactionInfo(connectionContext.getConnectionId(), transactionId, (byte) 0));
        super.beginTransaction(connectionContext, transactionId);
    }

    @Override // org.apache.activemq.broker.MutableBrokerFilter, org.apache.activemq.broker.Broker
    public int prepareTransaction(ConnectionContext connectionContext, TransactionId transactionId) throws Exception {
        sendSyncToSlave(new TransactionInfo(connectionContext.getConnectionId(), transactionId, (byte) 1));
        return super.prepareTransaction(connectionContext, transactionId);
    }

    @Override // org.apache.activemq.broker.MutableBrokerFilter, org.apache.activemq.broker.Broker
    public void rollbackTransaction(ConnectionContext connectionContext, TransactionId transactionId) throws Exception {
        sendAsyncToSlave(new TransactionInfo(connectionContext.getConnectionId(), transactionId, (byte) 4));
        super.rollbackTransaction(connectionContext, transactionId);
    }

    @Override // org.apache.activemq.broker.MutableBrokerFilter, org.apache.activemq.broker.Broker
    public void commitTransaction(ConnectionContext connectionContext, TransactionId transactionId, boolean z) throws Exception {
        sendSyncToSlave(new TransactionInfo(connectionContext.getConnectionId(), transactionId, (byte) 2));
        super.commitTransaction(connectionContext, transactionId, z);
    }

    @Override // org.apache.activemq.broker.MutableBrokerFilter, org.apache.activemq.broker.Broker
    public void forgetTransaction(ConnectionContext connectionContext, TransactionId transactionId) throws Exception {
        sendAsyncToSlave(new TransactionInfo(connectionContext.getConnectionId(), transactionId, (byte) 6));
        super.forgetTransaction(connectionContext, transactionId);
    }

    @Override // org.apache.activemq.broker.MutableBrokerFilter, org.apache.activemq.broker.Broker
    public void preProcessDispatch(MessageDispatch messageDispatch) {
        super.preProcessDispatch(messageDispatch);
        MessageDispatchNotification messageDispatchNotification = new MessageDispatchNotification();
        messageDispatchNotification.setConsumerId(messageDispatch.getConsumerId());
        messageDispatchNotification.setDeliverySequenceId(messageDispatch.getDeliverySequenceId());
        messageDispatchNotification.setDestination(messageDispatch.getDestination());
        if (messageDispatch.getMessage() != null) {
            messageDispatchNotification.setMessageId(messageDispatch.getMessage().getMessageId());
            if (this.consumers.containsKey(messageDispatch.getConsumerId())) {
                sendSyncToSlave(messageDispatchNotification);
            }
        }
    }

    @Override // org.apache.activemq.broker.MutableBrokerFilter, org.apache.activemq.broker.region.Region
    public void send(ProducerBrokerExchange producerBrokerExchange, Message message) throws Exception {
        sendSyncToSlave(message);
        super.send(producerBrokerExchange, message);
    }

    @Override // org.apache.activemq.broker.MutableBrokerFilter, org.apache.activemq.broker.region.Region
    public void acknowledge(ConsumerBrokerExchange consumerBrokerExchange, MessageAck messageAck) throws Exception {
        sendToSlave(messageAck);
        super.acknowledge(consumerBrokerExchange, messageAck);
    }

    @Override // org.apache.activemq.broker.MutableBrokerFilter, org.apache.activemq.broker.Broker
    public boolean isFaultTolerantConfiguration() {
        return true;
    }

    protected void sendToSlave(Message message) {
        if (message.isResponseRequired()) {
            sendSyncToSlave(message);
        } else {
            sendAsyncToSlave(message);
        }
    }

    protected void sendToSlave(MessageAck messageAck) {
        if (messageAck.isResponseRequired()) {
            sendAsyncToSlave(messageAck);
        } else {
            sendSyncToSlave(messageAck);
        }
    }

    protected void sendAsyncToSlave(Command command) {
        try {
            this.slave.oneway(command);
        } catch (Throwable th) {
            LOG.error("Slave Failed", th);
            stopProcessing();
        }
    }

    protected void sendSyncToSlave(Command command) {
        try {
            Response response = (Response) this.slave.request(command);
            if (response.isException()) {
                LOG.error("Slave Failed", ((ExceptionResponse) response).getException());
            }
        } catch (Throwable th) {
            LOG.error("Slave Failed", th);
        }
    }
}
