package org.apache.activemq.broker.region;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.EmptyBroker;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.store.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.BrokerSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/activemq-broker-5.11-SNAPSHOT.jar:org/apache/activemq/broker/region/RegionBroker.class */
public class RegionBroker extends EmptyBroker {
    /** Message property name; stampAsExpired stores the message's expiration under a property with this same name. */
    public static final String ORIGINAL_EXPIRATION = "originalExpiration";
    private static final Logger LOG = LoggerFactory.getLogger(RegionBroker.class);
    /** Source of generated broker ids, used lazily by getBrokerId(). */
    private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
    /** Factory handed to every region; also answers getDurableDestinations(). */
    protected DestinationFactory destinationFactory;
    // The four regions; getRegion(...) selects one from a destination's type code.
    private final Region queueRegion;
    private final Region topicRegion;
    private final Region tempQueueRegion;
    private final Region tempTopicRegion;
    protected final BrokerService brokerService;
    /** Set by start(), cleared by stop(); isStopped() reports its negation. */
    private boolean started;
    /** Mirrors the value last passed to setKeepDurableSubsActive(boolean). */
    private boolean keepDurableSubsActive;
    /** Lazily generated by getBrokerId() when not set explicitly. */
    private BrokerId brokerId;
    /** Lazily derived from the local host name by getBrokerName() when not set explicitly. */
    private String brokerName;
    /** May be null; addConsumer checks for null before using it. */
    private final DestinationInterceptor destinationInterceptor;
    private ConnectionContext adminConnectionContext;
    /** Runs purgeInactiveDestinationsTask periodically while the broker is started. */
    private final Scheduler scheduler;
    // Assigned in the constructor; no use is visible in this part of the file.
    private final ThreadPoolExecutor executor;
    // NOTE(review): presumably backs isAllowTempAutoCreationOnSend(), which is defined outside this excerpt - confirm.
    private boolean allowTempAutoCreationOnSend;
    /** Statistics instance passed to each region at construction time. */
    protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
    /** Synchronized map exposed directly through getConnectionStates(). */
    protected final Map<ConnectionId, ConnectionState> connectionStates = Collections.synchronizedMap(new HashMap());
    /** Connections registered by addConnection and dropped by removeConnection. */
    private final CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<>();
    /** Monitor and wait/notify gate used by addDestination. */
    private final Map<ActiveMQDestination, ActiveMQDestination> destinationGate = new HashMap();
    /** Destinations already created through addDestination, keyed by name. */
    private final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap();
    /** Peer broker infos keyed by broker id; addBroker/removeBroker/getPeerBrokerInfos are synchronized on this broker. */
    private final Map<BrokerId, BrokerInfo> brokerInfos = new HashMap();
    /** Seeded in the constructor from destinationFactory.getLastMessageBrokerSequenceId(). */
    private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
    /** Client id to connection context; addConnection/removeConnection synchronize on this map. */
    private final Map<String, ConnectionContext> clientIdSet = new HashMap();
    /**
     * Read lock is held by the producer/consumer/subscription add and remove operations.
     * NOTE(review): the write side is presumably taken by purgeInactiveDestinations(), which is outside this excerpt - confirm.
     */
    private final ReentrantReadWriteLock inactiveDestinationsPurgeLock = new ReentrantReadWriteLock();
    /** Task scheduled by start() and cancelled by stop(); simply invokes purgeInactiveDestinations(). */
    private final Runnable purgeInactiveDestinationsTask = new Runnable() { // from class: org.apache.activemq.broker.region.RegionBroker.1
        @Override // java.lang.Runnable
        public void run() {
            RegionBroker.this.purgeInactiveDestinations();
        }
    };

    /**
     * Creates the broker and its queue, topic, temp-queue and temp-topic regions.
     *
     * <p>The region instances come from the overridable {@code create*Region} factory methods,
     * which are invoked from this constructor in the order queue, topic, temp queue, temp topic.
     *
     * @param brokerService the owning broker service
     * @param taskRunnerFactory passed through to each region factory method
     * @param systemUsage passed through to each region factory method
     * @param destinationFactory required; also supplies the initial message sequence id
     * @param destinationInterceptor stored as-is; may be null (addConsumer null-checks it)
     * @param scheduler used by start()/stop() for the periodic destination purge
     * @param threadPoolExecutor stored in the {@code executor} field
     * @throws IllegalArgumentException if {@code destinationFactory} is null
     * @throws IOException declared by this constructor; presumably raised when the last sequence id is read - confirm
     */
    public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage systemUsage, DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor, Scheduler scheduler, ThreadPoolExecutor threadPoolExecutor) throws IOException {
        this.brokerService = brokerService;
        this.executor = threadPoolExecutor;
        this.scheduler = scheduler;
        if (destinationFactory == null) {
            throw new IllegalArgumentException("null destinationFactory");
        }
        // Continue numbering from the last sequence id reported by the factory.
        this.sequenceGenerator.setLastSequenceId(destinationFactory.getLastMessageBrokerSequenceId());
        this.destinationFactory = destinationFactory;
        this.queueRegion = createQueueRegion(systemUsage, taskRunnerFactory, destinationFactory);
        this.topicRegion = createTopicRegion(systemUsage, taskRunnerFactory, destinationFactory);
        // The interceptor is assigned before the temporary regions are created.
        this.destinationInterceptor = destinationInterceptor;
        this.tempQueueRegion = createTempQueueRegion(systemUsage, taskRunnerFactory, destinationFactory);
        this.tempTopicRegion = createTempTopicRegion(systemUsage, taskRunnerFactory, destinationFactory);
    }

    /**
     * Returns a new map combining the destinations of the queue region and the topic region.
     * Entries from the topic region overwrite queue-region entries with an equal key.
     * The temporary regions are not consulted.
     *
     * @return a freshly allocated map; changes to it do not affect the regions
     */
    @Override
    public Map<ActiveMQDestination, Destination> getDestinationMap() {
        Map<ActiveMQDestination, Destination> combined = new HashMap<>();
        combined.putAll(getQueueRegion().getDestinationMap());
        combined.putAll(getTopicRegion().getDestinationMap());
        return combined;
    }

    /**
     * Returns the destination map of the region that owns the given destination's type.
     *
     * @param activeMQDestination destination whose type selects the region
     * @return that region's destination map, or an empty map when a JMSException is raised
     *         (for example because the destination type is not recognised)
     */
    @Override
    public Map<ActiveMQDestination, Destination> getDestinationMap(ActiveMQDestination activeMQDestination) {
        Map<ActiveMQDestination, Destination> result;
        try {
            result = getRegion(activeMQDestination).getDestinationMap();
        } catch (JMSException unsupported) {
            result = Collections.emptyMap();
        }
        return result;
    }

    /**
     * Asks the region responsible for the given destination's type for its matching destinations.
     *
     * @param activeMQDestination destination to look up
     * @return the region's answer, or an empty set when a JMSException is raised
     */
    @Override
    public Set<Destination> getDestinations(ActiveMQDestination activeMQDestination) {
        Set<Destination> matches;
        try {
            matches = getRegion(activeMQDestination).getDestinations(activeMQDestination);
        } catch (JMSException unsupported) {
            matches = Collections.emptySet();
        }
        return matches;
    }

    /**
     * Returns this broker when it is an instance of the requested type.
     *
     * @param cls the type being asked for
     * @return {@code this} if it is an instance of {@code cls}, otherwise {@code null}
     */
    @Override
    public Broker getAdaptor(Class cls) {
        return cls.isInstance(this) ? this : null;
    }

    /** @return the region created by createQueueRegion in the constructor */
    public Region getQueueRegion() {
        return queueRegion;
    }

    /** @return the region created by createTempQueueRegion in the constructor */
    public Region getTempQueueRegion() {
        return tempQueueRegion;
    }

    /** @return the region created by createTempTopicRegion in the constructor */
    public Region getTempTopicRegion() {
        return tempTopicRegion;
    }

    /** @return the region created by createTopicRegion in the constructor */
    public Region getTopicRegion() {
        return topicRegion;
    }

    /**
     * Factory hook for the temporary-topic region; called from the constructor.
     *
     * @return a new {@code TempTopicRegion} bound to this broker and its shared statistics
     */
    protected Region createTempTopicRegion(SystemUsage systemUsage, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
        return new TempTopicRegion(
                this,
                destinationStatistics,
                systemUsage,
                taskRunnerFactory,
                destinationFactory);
    }

    /**
     * Factory hook for the temporary-queue region; called from the constructor.
     *
     * @return a new {@code TempQueueRegion} bound to this broker and its shared statistics
     */
    protected Region createTempQueueRegion(SystemUsage systemUsage, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
        return new TempQueueRegion(
                this,
                destinationStatistics,
                systemUsage,
                taskRunnerFactory,
                destinationFactory);
    }

    /**
     * Factory hook for the topic region; called from the constructor.
     *
     * @return a new {@code TopicRegion} bound to this broker and its shared statistics
     */
    protected Region createTopicRegion(SystemUsage systemUsage, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
        return new TopicRegion(
                this,
                destinationStatistics,
                systemUsage,
                taskRunnerFactory,
                destinationFactory);
    }

    /**
     * Factory hook for the queue region; called from the constructor.
     *
     * @return a new {@code QueueRegion} bound to this broker and its shared statistics
     */
    protected Region createQueueRegion(SystemUsage systemUsage, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
        return new QueueRegion(
                this,
                destinationStatistics,
                systemUsage,
                taskRunnerFactory,
                destinationFactory);
    }

    /**
     * Marks the broker as started, starts the four regions in order (queue, topic, temp queue,
     * temp topic) and, when the broker service reports a positive purge period, schedules the
     * periodic inactive-destination purge.
     *
     * @throws Exception if a region fails to start
     */
    @Override
    public void start() throws Exception {
        started = true;
        queueRegion.start();
        topicRegion.start();
        tempQueueRegion.start();
        tempTopicRegion.start();

        final int purgePeriod = brokerService.getSchedulePeriodForDestinationPurge();
        if (purgePeriod <= 0) {
            // Non-positive period: no purge task is scheduled.
            return;
        }
        scheduler.executePeriodically(purgeInactiveDestinationsTask, purgePeriod);
    }

    /**
     * Marks the broker as stopped, cancels the purge task, stops the regions through
     * {@link #doStop(ServiceStopper)} and then clears the client-id, connection, destination
     * and peer-broker bookkeeping.
     *
     * @throws Exception the first exception collected while stopping the regions; in that case
     *         the bookkeeping collections are not cleared
     */
    @Override
    public void stop() throws Exception {
        started = false;
        scheduler.cancel(purgeInactiveDestinationsTask);

        ServiceStopper stopper = new ServiceStopper();
        doStop(stopper);
        stopper.throwFirstException();

        clientIdSet.clear();
        connections.clear();
        destinations.clear();
        brokerInfos.clear();
    }

    /**
     * @return the destination policy of the broker service, or {@code null} when no broker
     *         service is set
     */
    public PolicyMap getDestinationPolicy() {
        return brokerService != null ? brokerService.getDestinationPolicy() : null;
    }

    /**
     * Looks up the connection context registered for a client id.
     *
     * <p>NOTE(review): this read does not take the {@code clientIdSet} monitor that
     * addConnection/removeConnection hold while mutating the map.
     *
     * @param str the client id
     * @return the registered context, or {@code null} if none is registered
     */
    public ConnectionContext getConnectionContext(String str) {
        ConnectionContext registered = clientIdSet.get(str);
        return registered;
    }

    /**
     * Registers a connection under its client id and adds it to the list of known connections.
     *
     * <p>If the client id is already registered and the new context does not allow link
     * stealing, the request is rejected. If link stealing is allowed, the new context takes
     * over the client id and the previous connection is stopped (asynchronously when it is a
     * {@code TransportConnection}).
     *
     * @param connectionContext context of the connection being added
     * @param connectionInfo carries the client id
     * @throws InvalidClientIDException if no client id is supplied, or if the id is already in
     *         use and link stealing is not allowed
     * @throws Exception if stopping the previous connection fails
     */
    @Override // org.apache.activemq.broker.EmptyBroker, org.apache.activemq.broker.Broker
    public void addConnection(ConnectionContext connectionContext, ConnectionInfo connectionInfo) throws Exception {
        String clientId = connectionInfo.getClientId();
        if (clientId == null) {
            throw new InvalidClientIDException("No clientID specified for connection request");
        }
        synchronized (this.clientIdSet) {
            ConnectionContext existingContext = this.clientIdSet.get(clientId);
            if (existingContext == null) {
                this.clientIdSet.put(clientId, connectionContext);
            } else {
                Connection existingConnection = existingContext.getConnection();
                if (!connectionContext.isAllowLinkStealing()) {
                    // The existing context may have no connection (the stealing branch below
                    // handles that case too), so do not dereference it blindly.
                    throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from "
                            + (existingConnection != null ? existingConnection.getRemoteAddress() : "unknown"));
                }
                // Hand the client id over to the new context. Merely removing the old entry
                // would leave the id unregistered, so the next connection using the same id
                // would be accepted without any duplicate check.
                this.clientIdSet.put(clientId, connectionContext);
                if (existingConnection != null) {
                    LOG.warn("Stealing link for clientId {} From Connection {}", clientId, existingConnection);
                    if (existingConnection instanceof TransportConnection) {
                        ((TransportConnection) existingConnection).stopAsync();
                    } else {
                        existingConnection.stop();
                    }
                } else {
                    LOG.error("Not Connection for {}", existingContext);
                }
            }
        }
        this.connections.add(connectionContext.getConnection());
    }

    /**
     * Unregisters a connection. The client-id entry is removed only when it still refers to
     * this very context and the connection ids match; the connection is always removed from
     * the list of known connections.
     *
     * @param connectionContext context of the connection being removed
     * @param connectionInfo carries the client id and connection id
     * @param th not used by this method
     * @throws InvalidClientIDException if no client id is supplied
     */
    @Override
    public void removeConnection(ConnectionContext connectionContext, ConnectionInfo connectionInfo, Throwable th) throws Exception {
        final String clientId = connectionInfo.getClientId();
        if (clientId == null) {
            throw new InvalidClientIDException("No clientID specified for connection disconnect request");
        }
        synchronized (clientIdSet) {
            ConnectionContext registered = clientIdSet.get(clientId);
            boolean sameContext = registered == connectionContext;
            if (sameContext && isEqual(registered.getConnectionId(), connectionInfo.getConnectionId())) {
                clientIdSet.remove(clientId);
            }
        }
        connections.remove(connectionContext.getConnection());
    }

    /**
     * Null-safe equality for connection ids.
     *
     * @return {@code true} when both references are identical (including both {@code null}) or
     *         when the first is non-null and {@code equals} the second
     */
    protected boolean isEqual(ConnectionId connectionId, ConnectionId connectionId2) {
        if (connectionId == connectionId2) {
            return true;
        }
        if (connectionId == null) {
            return false;
        }
        return connectionId.equals(connectionId2);
    }

    /**
     * @return a new array holding a snapshot of the currently registered connections
     */
    @Override
    public Connection[] getClients() throws Exception {
        // The copy-on-write list hands out a consistent snapshot in one call.
        return connections.toArray(new Connection[0]);
    }

    /**
     * Returns the destination registered under the given name, creating it in the owning
     * region when it is not yet known to this broker.
     *
     * <p>A lock-free lookup in {@code destinations} is tried first; on a miss the lookup is
     * repeated while holding the {@code destinationGate} monitor, and the region call is made
     * while that monitor is still held.
     *
     * <p>NOTE(review): an entry is only put into {@code destinationGate} after waiting on it,
     * and the wait is only entered when an entry already exists; nothing visible in this
     * method populates the gate first. Confirm whether another part of the class does.
     *
     * @param connectionContext context passed through to the region
     * @param activeMQDestination the destination to look up or create
     * @param z forwarded to the region as its create flag for temporary destinations; for
     *        non-temporary destinations {@code true} is always forwarded
     * @return the existing or newly added destination
     * @throws Exception whatever the region throws; the gate entry is removed and waiters are
     *         notified before the exception propagates
     */
    @Override // org.apache.activemq.broker.EmptyBroker, org.apache.activemq.broker.region.Region
    public Destination addDestination(ConnectionContext connectionContext, ActiveMQDestination activeMQDestination, boolean z) throws Exception {
        // Fast path: already known.
        Destination destination = this.destinations.get(activeMQDestination);
        if (destination != null) {
            return destination;
        }
        synchronized (this.destinationGate) {
            // Re-check under the monitor in case another thread added it meanwhile.
            Destination destination2 = this.destinations.get(activeMQDestination);
            if (destination2 != null) {
                return destination2;
            }
            if (this.destinationGate.get(activeMQDestination) != null) {
                // Loop guards against wakeups that happen while the gate entry is still present.
                while (this.destinationGate.containsKey(activeMQDestination)) {
                    this.destinationGate.wait();
                }
                Destination destination3 = this.destinations.get(activeMQDestination);
                if (destination3 != null) {
                    return destination3;
                }
                // Still missing after the wait: claim the gate and create it ourselves.
                this.destinationGate.put(activeMQDestination, activeMQDestination);
            }
            try {
                boolean z2 = true;
                if (activeMQDestination.isTemporary()) {
                    z2 = z;
                }
                Destination addDestination = getRegion(activeMQDestination).addDestination(connectionContext, activeMQDestination, z2);
                this.destinations.put(activeMQDestination, addDestination);
                // Success path: release the gate and wake any waiters.
                synchronized (this.destinationGate) {
                    this.destinationGate.remove(activeMQDestination);
                    this.destinationGate.notifyAll();
                }
                return addDestination;
            } catch (Throwable th) {
                // Failure path: same gate release, then rethrow unchanged.
                synchronized (this.destinationGate) {
                    this.destinationGate.remove(activeMQDestination);
                    this.destinationGate.notifyAll();
                    throw th;
                }
            }
        }
    }

    /**
     * Removes a destination from its region and from this broker's destination map. Does
     * nothing when the destination is not in that map.
     *
     * @param connectionContext context passed through to the region
     * @param activeMQDestination destination to remove
     * @param j timeout value passed through to the region
     * @throws Exception if the region fails; the map entry is then left in place
     */
    @Override
    public void removeDestination(ConnectionContext connectionContext, ActiveMQDestination activeMQDestination, long j) throws Exception {
        if (!destinations.containsKey(activeMQDestination)) {
            return;
        }
        getRegion(activeMQDestination).removeDestination(connectionContext, activeMQDestination, j);
        destinations.remove(activeMQDestination);
    }

    /**
     * Adds the destination named by the given info, always passing {@code true} as the
     * temp-creation flag to {@code addDestination}.
     */
    @Override
    public void addDestinationInfo(ConnectionContext connectionContext, DestinationInfo destinationInfo) throws Exception {
        ActiveMQDestination target = destinationInfo.getDestination();
        addDestination(connectionContext, target, true);
    }

    /**
     * Removes the destination named by the given info, using the info's timeout.
     */
    @Override
    public void removeDestinationInfo(ConnectionContext connectionContext, DestinationInfo destinationInfo) throws Exception {
        ActiveMQDestination target = destinationInfo.getDestination();
        long timeout = destinationInfo.getTimeout();
        removeDestination(connectionContext, target, timeout);
    }

    /**
     * @return a new array with the keys of {@link #getDestinationMap()}
     */
    @Override
    public ActiveMQDestination[] getDestinations() throws Exception {
        ArrayList<ActiveMQDestination> names = new ArrayList<>(getDestinationMap().keySet());
        return names.toArray(new ActiveMQDestination[names.size()]);
    }

    /**
     * Registers a producer with the region of its destination, first making sure the
     * destination exists. Producers without a destination are ignored.
     *
     * <p>The read side of the inactive-destination purge lock is held for the duration.
     */
    @Override
    public void addProducer(ConnectionContext connectionContext, ProducerInfo producerInfo) throws Exception {
        final ActiveMQDestination target = producerInfo.getDestination();
        if (target == null) {
            return;
        }
        inactiveDestinationsPurgeLock.readLock().lock();
        try {
            connectionContext.getBroker().addDestination(connectionContext, target, isAllowTempAutoCreationOnSend());
            getRegion(target).addProducer(connectionContext, producerInfo);
        } finally {
            inactiveDestinationsPurgeLock.readLock().unlock();
        }
    }

    /**
     * Removes a producer from the region of its destination. Producers without a destination
     * are ignored.
     *
     * <p>The read side of the inactive-destination purge lock is held for the duration.
     */
    @Override
    public void removeProducer(ConnectionContext connectionContext, ProducerInfo producerInfo) throws Exception {
        final ActiveMQDestination target = producerInfo.getDestination();
        if (target == null) {
            return;
        }
        inactiveDestinationsPurgeLock.readLock().lock();
        try {
            getRegion(target).removeProducer(connectionContext, producerInfo);
        } finally {
            inactiveDestinationsPurgeLock.readLock().unlock();
        }
    }

    /**
     * Adds a consumer to the region of its destination. When a destination interceptor is
     * configured, its {@code create} hook is invoked first, before the purge read lock is taken.
     *
     * @return the subscription created by the region
     */
    @Override
    public Subscription addConsumer(ConnectionContext connectionContext, ConsumerInfo consumerInfo) throws Exception {
        final ActiveMQDestination target = consumerInfo.getDestination();
        if (destinationInterceptor != null) {
            destinationInterceptor.create(this, connectionContext, target);
        }
        inactiveDestinationsPurgeLock.readLock().lock();
        try {
            return getRegion(target).addConsumer(connectionContext, consumerInfo);
        } finally {
            inactiveDestinationsPurgeLock.readLock().unlock();
        }
    }

    /**
     * Removes a consumer from the region of its destination while holding the purge read lock.
     */
    @Override
    public void removeConsumer(ConnectionContext connectionContext, ConsumerInfo consumerInfo) throws Exception {
        final ActiveMQDestination target = consumerInfo.getDestination();
        inactiveDestinationsPurgeLock.readLock().lock();
        try {
            getRegion(target).removeConsumer(connectionContext, consumerInfo);
        } finally {
            inactiveDestinationsPurgeLock.readLock().unlock();
        }
    }

    /**
     * Forwards a subscription removal to the topic region while holding the purge read lock.
     */
    @Override
    public void removeSubscription(ConnectionContext connectionContext, RemoveSubscriptionInfo removeSubscriptionInfo) throws Exception {
        inactiveDestinationsPurgeLock.readLock().lock();
        try {
            topicRegion.removeSubscription(connectionContext, removeSubscriptionInfo);
        } finally {
            inactiveDestinationsPurgeLock.readLock().unlock();
        }
    }

    /**
     * Stamps the message with the broker-in time and hands it to the region stored on the
     * exchange.
     *
     * <p>The region is (re)resolved, after ensuring the destination exists, when the exchange
     * is mutable, has no region yet, or refers to a disposed region destination. For a mutable
     * exchange the region and region destination are cleared again after the send.
     */
    @Override
    public void send(ProducerBrokerExchange producerBrokerExchange, Message message) throws Exception {
        final ActiveMQDestination target = message.getDestination();
        message.setBrokerInTime(System.currentTimeMillis());

        boolean mustResolve = producerBrokerExchange.isMutable()
                || producerBrokerExchange.getRegion() == null
                || (producerBrokerExchange.getRegionDestination() != null
                        && producerBrokerExchange.getRegionDestination().isDisposed());
        if (mustResolve) {
            ConnectionContext context = producerBrokerExchange.getConnectionContext();
            context.getBroker().addDestination(context, target, isAllowTempAutoCreationOnSend());
            producerBrokerExchange.setRegion(getRegion(target));
            producerBrokerExchange.setRegionDestination(null);
        }

        producerBrokerExchange.getRegion().send(producerBrokerExchange, message);

        if (producerBrokerExchange.isMutable()) {
            producerBrokerExchange.setRegionDestination(null);
            producerBrokerExchange.setRegion(null);
        }
    }

    /**
     * Passes an acknowledgement to the exchange's region, resolving the region from the ack's
     * destination first when the exchange is a wildcard one or has no region yet.
     */
    @Override
    public void acknowledge(ConsumerBrokerExchange consumerBrokerExchange, MessageAck messageAck) throws Exception {
        boolean mustResolve = consumerBrokerExchange.isWildcard() || consumerBrokerExchange.getRegion() == null;
        if (mustResolve) {
            Region resolved = getRegion(messageAck.getDestination());
            consumerBrokerExchange.setRegion(resolved);
        }
        consumerBrokerExchange.getRegion().acknowledge(consumerBrokerExchange, messageAck);
    }

    /**
     * Selects the region for a destination from its numeric destination type: 1 maps to the
     * queue region, 2 to the topic region, 5 to the temp-queue region and 6 to the temp-topic
     * region.
     *
     * @param activeMQDestination destination whose type is inspected
     * @return the matching region
     * @throws JMSException for any other type value
     */
    protected Region getRegion(ActiveMQDestination activeMQDestination) throws JMSException {
        switch (activeMQDestination.getDestinationType()) {
            case 1:
                return queueRegion;
            case 2:
                return topicRegion;
            case 5:
                return tempQueueRegion;
            case 6:
                return tempTopicRegion;
            default:
                throw createUnknownDestinationTypeException(activeMQDestination);
        }
    }

    /**
     * Forwards a pull request to the region owning the request's destination.
     *
     * @return the region's response
     */
    @Override
    public Response messagePull(ConnectionContext connectionContext, MessagePull messagePull) throws Exception {
        Region owner = getRegion(messagePull.getDestination());
        return owner.messagePull(connectionContext, messagePull);
    }

    /**
     * Not supported by this broker.
     *
     * @throws IllegalAccessException always
     */
    @Override // org.apache.activemq.broker.EmptyBroker, org.apache.activemq.broker.Broker
    public TransactionId[] getPreparedTransactions(ConnectionContext connectionContext) throws Exception {
        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
    }

    /**
     * Not supported by this broker.
     *
     * @throws IllegalAccessException always
     */
    @Override // org.apache.activemq.broker.EmptyBroker, org.apache.activemq.broker.Broker
    public void beginTransaction(ConnectionContext connectionContext, TransactionId transactionId) throws Exception {
        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
    }

    /**
     * Not supported by this broker.
     *
     * @throws IllegalAccessException always
     */
    @Override // org.apache.activemq.broker.EmptyBroker, org.apache.activemq.broker.Broker
    public int prepareTransaction(ConnectionContext connectionContext, TransactionId transactionId) throws Exception {
        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
    }

    /**
     * Not supported by this broker.
     *
     * @throws IllegalAccessException always
     */
    @Override // org.apache.activemq.broker.EmptyBroker, org.apache.activemq.broker.Broker
    public void rollbackTransaction(ConnectionContext connectionContext, TransactionId transactionId) throws Exception {
        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
    }

    /**
     * Not supported by this broker.
     *
     * @throws IllegalAccessException always
     */
    @Override // org.apache.activemq.broker.EmptyBroker, org.apache.activemq.broker.Broker
    public void commitTransaction(ConnectionContext connectionContext, TransactionId transactionId, boolean z) throws Exception {
        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
    }

    /**
     * Not supported by this broker.
     *
     * @throws IllegalAccessException always
     */
    @Override // org.apache.activemq.broker.EmptyBroker, org.apache.activemq.broker.Broker
    public void forgetTransaction(ConnectionContext connectionContext, TransactionId transactionId) throws Exception {
        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
    }

    /**
     * Runs {@code gc()} on the queue region and then the topic region. The temporary regions
     * are not touched.
     */
    @Override
    public void gc() {
        queueRegion.gc();
        topicRegion.gc();
    }

    /**
     * @return the broker id, generating and remembering a new one on first use when none has
     *         been set
     */
    @Override
    public BrokerId getBrokerId() {
        if (brokerId != null) {
            return brokerId;
        }
        brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId());
        return brokerId;
    }

    /**
     * Sets the broker id, replacing any previously set or generated value.
     *
     * @param brokerId the new id; {@code null} makes getBrokerId() generate one on its next call
     */
    public void setBrokerId(BrokerId brokerId) {
        this.brokerId = brokerId;
    }

    /**
     * @return the broker name; when none has been set it is initialised from the lower-cased
     *         local host name, falling back to {@code BrokerService.DEFAULT_BROKER_NAME} if
     *         the host name lookup throws
     */
    @Override
    public String getBrokerName() {
        if (brokerName != null) {
            return brokerName;
        }
        try {
            brokerName = InetAddressUtil.getLocalHostName().toLowerCase(Locale.ENGLISH);
        } catch (Exception lookupFailure) {
            brokerName = BrokerService.DEFAULT_BROKER_NAME;
        }
        return brokerName;
    }

    /**
     * Sets the broker name.
     *
     * @param str the new name; {@code null} makes getBrokerName() derive one on its next call
     */
    public void setBrokerName(String str) {
        brokerName = str;
    }

    /** @return the statistics instance shared with all regions of this broker */
    public DestinationStatistics getDestinationStatistics() {
        return destinationStatistics;
    }

    /**
     * Builds (but does not throw) the exception reported for an unrecognised destination type.
     *
     * @return a JMSException whose message contains the numeric type value
     */
    protected JMSException createUnknownDestinationTypeException(ActiveMQDestination activeMQDestination) {
        int typeCode = (int) activeMQDestination.getDestinationType();
        return new JMSException("Unknown destination type: " + typeCode);
    }

    /**
     * Records a peer broker. The first time a broker id is seen, a copy of the info (with its
     * peer list cleared) is stored; every call increments that stored copy's reference count
     * and then triggers the cluster update for the added broker.
     *
     * @param connection not used by this method
     * @param brokerInfo info describing the peer
     */
    @Override
    public synchronized void addBroker(Connection connection, BrokerInfo brokerInfo) {
        BrokerInfo tracked = brokerInfos.get(brokerInfo.getBrokerId());
        if (tracked == null) {
            tracked = brokerInfo.copy();
            tracked.setPeerBrokerInfos(null);
            brokerInfos.put(brokerInfo.getBrokerId(), tracked);
        }
        tracked.incrementRefCount();
        LOG.debug("{} addBroker: {} brokerInfo size: {}", getBrokerName(), brokerInfo.getBrokerName(), brokerInfos.size());
        addBrokerInClusterUpdate(brokerInfo);
    }

    /**
     * Releases one reference to a peer broker and drops its entry when the count reaches zero.
     * Unless the broker service is stopping, the cluster update for the removed broker is then
     * triggered. A {@code null} info is ignored.
     *
     * @param connection not used by this method
     * @param brokerInfo info describing the peer; may be {@code null}
     */
    @Override
    public synchronized void removeBroker(Connection connection, BrokerInfo brokerInfo) {
        if (brokerInfo == null) {
            return;
        }
        BrokerInfo tracked = brokerInfos.get(brokerInfo.getBrokerId());
        if (tracked != null && tracked.decrementRefCount() == 0) {
            brokerInfos.remove(brokerInfo.getBrokerId());
        }
        LOG.debug("{} removeBroker: {} brokerInfo size: {}", getBrokerName(), brokerInfo.getBrokerName(), brokerInfos.size());
        if (!brokerService.isStopping()) {
            removeBrokerInClusterUpdate(brokerInfo);
        }
    }

    /** @return a new array containing the currently tracked peer broker infos */
    @Override
    public synchronized BrokerInfo[] getPeerBrokerInfos() {
        BrokerInfo[] snapshot = new BrokerInfo[brokerInfos.size()];
        return brokerInfos.values().toArray(snapshot);
    }

    /**
     * Prepares a message for dispatch: stamps the broker-out time, records the in-broker
     * processing time when statistics are enabled and, for persistent, not-yet-redelivered
     * messages on destinations that persist the JMSRedelivered flag, writes the message to
     * the store with an incremented redelivery counter.
     *
     * <p>The in-memory redelivery counter is always restored to its previous value afterwards.
     * A store {@code IOException} is logged and not propagated. Dispatches without a message
     * are ignored.
     */
    @Override
    public void preProcessDispatch(MessageDispatch messageDispatch) {
        final Message message = messageDispatch.getMessage();
        if (message == null) {
            return;
        }
        final long now = System.currentTimeMillis();
        message.setBrokerOutTime(now);
        if (getBrokerService().isEnableStatistics()) {
            long elapsed = now - message.getBrokerInTime();
            ((Destination) message.getRegionDestination()).getDestinationStatistics().getProcessTime().addTime(elapsed);
        }
        if (((BaseDestination) message.getRegionDestination()).isPersistJMSRedelivered() && !message.isRedelivered() && message.isPersistent()) {
            final int previousCounter = message.getRedeliveryCounter();
            message.incrementRedeliveryCounter();
            try {
                ((BaseDestination) message.getRegionDestination()).getMessageStore().updateMessage(message);
            } catch (IOException e) {
                LOG.error("Failed to persist JMSRedeliveryFlag on {} in {}", message.getMessageId(), message.getDestination(), e);
            } finally {
                // Only the stored copy keeps the bumped counter.
                message.setRedeliveryCounter(previousCounter);
            }
        }
    }

    /**
     * Hook invoked after a dispatch; this implementation does nothing.
     */
    @Override // org.apache.activemq.broker.EmptyBroker, org.apache.activemq.broker.Broker
    public void postProcessDispatch(MessageDispatch messageDispatch) {
    }

    /**
     * Forwards a dispatch notification to the region owning the notification's destination.
     */
    @Override
    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
        Region owner = getRegion(messageDispatchNotification.getDestination());
        owner.processDispatchNotification(messageDispatchNotification);
    }

    /** @return {@code true} unless start() has been called more recently than stop() */
    @Override
    public boolean isStopped() {
        return !started;
    }

    /** @return the destinations reported by the destination factory */
    @Override
    public Set<ActiveMQDestination> getDurableDestinations() {
        return destinationFactory.getDestinations();
    }

    /**
     * Stops the four regions through the given stopper, in the order queue, topic, temp queue,
     * temp topic.
     *
     * @param serviceStopper the stopper each region is handed to
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public void doStop(ServiceStopper serviceStopper) {
        serviceStopper.stop(queueRegion);
        serviceStopper.stop(topicRegion);
        serviceStopper.stop(tempQueueRegion);
        serviceStopper.stop(tempTopicRegion);
    }

    /** @return the value last passed to {@link #setKeepDurableSubsActive(boolean)} */
    public boolean isKeepDurableSubsActive() {
        return keepDurableSubsActive;
    }

    /**
     * Stores the flag and forwards it to the topic region, which is cast to
     * {@code TopicRegion} for that purpose.
     *
     * @param z the new flag value
     */
    public void setKeepDurableSubsActive(boolean z) {
        keepDurableSubsActive = z;
        TopicRegion topics = (TopicRegion) topicRegion;
        topics.setKeepDurableSubsActive(z);
    }

    /** @return the interceptor supplied at construction time; may be {@code null} */
    public DestinationInterceptor getDestinationInterceptor() {
        return destinationInterceptor;
    }

    /** @return the admin connection context, or {@code null} if none has been set */
    @Override
    public ConnectionContext getAdminConnectionContext() {
        return adminConnectionContext;
    }

    /**
     * Stores the admin connection context.
     *
     * @param connectionContext the context later returned by getAdminConnectionContext()
     */
    @Override
    public void setAdminConnectionContext(ConnectionContext connectionContext) {
        adminConnectionContext = connectionContext;
    }

    /** @return the live (synchronized) connection-state map, not a copy */
    public Map<ConnectionId, ConnectionState> getConnectionStates() {
        return connectionStates;
    }

    /** @return the temp data store of the broker service */
    @Override
    public PListStore getTempDataStore() {
        return brokerService.getTempDataStore();
    }

    /**
     * @return the VM connector URI obtained from the owning broker service
     */
    @Override // org.apache.activemq.broker.EmptyBroker, org.apache.activemq.broker.Broker
    public URI getVmConnectorURI() {
        return brokerService.getVmConnectorURI();
    }

    /**
     * Callback with an empty body; this class takes no action here.
     */
    @Override // org.apache.activemq.broker.EmptyBroker, org.apache.activemq.broker.Broker
    public void brokerServiceStarted() {
        // nothing to do
    }

    /**
     * @return the broker service this broker holds a reference to
     */
    @Override // org.apache.activemq.broker.EmptyBroker, org.apache.activemq.broker.Broker
    public BrokerService getBrokerService() {
        return brokerService;
    }

    /**
     * Decides whether an expired reference should be treated as newly expired.
     * <p>
     * When the reference reports itself as expired, the underlying message is
     * stamped (while holding the message's monitor) and the stamping result is
     * returned, so only the first caller to stamp a given message sees
     * {@code true}.
     *
     * @param messageReference the reference to inspect
     * @return {@code true} only if the reference is expired and this call
     *         performed the stamping; {@code false} otherwise, including when
     *         an {@link IOException} is raised along the way
     */
    @Override // org.apache.activemq.broker.EmptyBroker, org.apache.activemq.broker.Broker
    public boolean isExpired(MessageReference messageReference) {
        if (!messageReference.isExpired()) {
            return false;
        }
        try {
            Message expiredMessage = messageReference.getMessage();
            synchronized (expiredMessage) {
                return stampAsExpired(expiredMessage);
            }
        } catch (IOException ioFailure) {
            LOG.warn("unexpected exception on message expiry determination for: {}", messageReference, ioFailure);
            return false;
        }
    }

    /**
     * Records the message's expiration under the {@code "originalExpiration"}
     * property, unless that property is already present.
     *
     * @param message the message to stamp
     * @return {@code true} if the property was absent and has now been set,
     *         {@code false} if it was already present
     * @throws IOException if reading or writing the message property fails
     */
    private boolean stampAsExpired(Message message) throws IOException {
        if (message.getProperty("originalExpiration") != null) {
            // Already stamped earlier; leave the recorded value alone.
            return false;
        }
        // Long.valueOf replaces the deprecated Long(long) constructor.
        message.setProperty("originalExpiration", Long.valueOf(message.getExpiration()));
        return true;
    }

    /**
     * Handles an expired message: logs it at debug level and asks the root
     * broker to send it to the dead letter queue, with a {@link Throwable}
     * whose message carries the reference's expiration value.
     *
     * @param connectionContext the context passed on to the root broker
     * @param messageReference  the expired message reference
     * @param subscription      the subscription passed on to the root broker
     */
    @Override // org.apache.activemq.broker.EmptyBroker, org.apache.activemq.broker.Broker
    public void messageExpired(ConnectionContext connectionContext, MessageReference messageReference, Subscription subscription) {
        LOG.debug("Message expired {}", messageReference);
        getRoot().sendToDeadLetterQueue(
                connectionContext,
                messageReference,
                subscription,
                new Throwable("Message Expired. Expiration:" + messageReference.getExpiration()));
    }

    /**
     * Attempts to forward a copy of the referenced message to a dead letter queue.
     * <p>
     * Nothing is sent when the reference is {@code null}, when it has no message
     * or no region destination, when the destination has no dead letter strategy,
     * or when the strategy declines the message. Otherwise a copy is stamped with
     * its original expiration, has its expiration cleared, is forced persistent
     * (recording {@code "originalDeliveryMode"} when it was not), optionally
     * carries the failure cause, and is resent to the destination chosen by the
     * strategy.
     *
     * @param connectionContext the caller's context; reused for the resend only
     *                          when its security context is a broker context
     * @param messageReference  the reference whose message should be dead-lettered
     * @param subscription      passed to the strategy when choosing the queue
     * @param th                optional cause; its {@code toString()} is stored on the copy
     * @return {@code true} if the copy was resent, {@code false} otherwise
     *         (including when any exception is caught and logged)
     */
    @Override // org.apache.activemq.broker.EmptyBroker, org.apache.activemq.broker.Broker
    public boolean sendToDeadLetterQueue(ConnectionContext connectionContext, MessageReference messageReference, Subscription subscription, Throwable th) {
        if (messageReference == null) {
            return false;
        }
        try {
            Message original = messageReference.getMessage();
            if (original == null || messageReference.getRegionDestination() == null) {
                return false;
            }

            DeadLetterStrategy strategy = ((Destination) messageReference.getRegionDestination()).getDeadLetterStrategy();
            if (strategy == null) {
                LOG.debug("Dead Letter message with no DLQ strategy in place, message id: {}, destination: {}", original.getMessageId(), original.getDestination());
                return false;
            }
            if (!strategy.isSendToDeadLetterQueue(original)) {
                return false;
            }

            // Work on a copy so the original message is left untouched.
            Message dlqCopy = original.copy();
            stampAsExpired(dlqCopy);
            dlqCopy.setExpiration(0L);
            if (!dlqCopy.isPersistent()) {
                dlqCopy.setPersistent(true);
                dlqCopy.setProperty("originalDeliveryMode", "NON_PERSISTENT");
            }
            if (th != null) {
                dlqCopy.setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, th.toString());
            }

            ActiveMQDestination dlqDestination = strategy.getDeadLetterQueueFor(dlqCopy, subscription);

            // Keep the caller's context only if it is already a broker context.
            boolean callerIsBroker = connectionContext.getSecurityContext() != null
                    && connectionContext.getSecurityContext().isBrokerContext();
            ConnectionContext sendContext = callerIsBroker
                    ? connectionContext
                    : BrokerSupport.getConnectionContext(this);

            BrokerSupport.resendNoCopy(sendContext, dlqCopy, dlqDestination);
            return true;
        } catch (Exception failure) {
            LOG.warn("Caught an exception sending to DLQ: {}", messageReference, failure);
            return false;
        }
    }

    /**
     * Returns the broker obtained from the broker service.
     *
     * @return the result of {@code getBrokerService().getBroker()}
     * @throws RuntimeException if obtaining the broker fails; the original
     *         exception is attached as the cause
     */
    @Override // org.apache.activemq.broker.EmptyBroker, org.apache.activemq.broker.Broker
    public Broker getRoot() {
        try {
            return getBrokerService().getBroker();
        } catch (Exception e) {
            LOG.error("Trying to get Root Broker", e);
            // Chain the original exception so callers can see why the lookup failed.
            throw new RuntimeException("The broker from the BrokerService should not throw an exception", e);
        }
    }

    /**
     * Returns the next sequence id from the sequence generator, taken while
     * holding the generator's monitor.
     *
     * @return the next sequence id
     */
    @Override // org.apache.activemq.broker.EmptyBroker, org.apache.activemq.broker.Broker
    public long getBrokerSequenceId() {
        synchronized (sequenceGenerator) {
            return sequenceGenerator.getNextSequenceId();
        }
    }

    /**
     * @return the scheduler held by this broker
     */
    @Override // org.apache.activemq.broker.EmptyBroker, org.apache.activemq.broker.Broker
    public Scheduler getScheduler() {
        return scheduler;
    }

    /**
     * @return the thread pool executor held by this broker
     */
    @Override // org.apache.activemq.broker.EmptyBroker, org.apache.activemq.broker.Broker
    public ThreadPoolExecutor getExecutor() {
        return executor;
    }

    /**
     * Forwards a consumer control command to the region that owns the
     * command's destination.
     * <p>
     * A {@code JMSException} raised while resolving the region or processing
     * the command is logged at warn level and not propagated.
     *
     * @param consumerBrokerExchange the exchange passed on to the region
     * @param consumerControl        the control command to process
     */
    @Override // org.apache.activemq.broker.EmptyBroker, org.apache.activemq.broker.region.Region
    public void processConsumerControl(ConsumerBrokerExchange consumerBrokerExchange, ConsumerControl consumerControl) {
        ActiveMQDestination destination = consumerControl.getDestination();
        try {
            getRegion(destination).processConsumerControl(consumerBrokerExchange, consumerControl);
        } catch (JMSException e) {
            // Pass the exception as the trailing argument so its details are logged,
            // consistent with the other catch blocks in this class.
            LOG.warn("unmatched destination: {}, in consumerControl: {}", destination, consumerControl, e);
        }
    }

    /**
     * Registers a peer broker with every transport connector that has
     * cluster-client updates enabled, then asks that connector to update its
     * clients' cluster info.
     *
     * @param brokerInfo the peer broker to add
     */
    protected void addBrokerInClusterUpdate(BrokerInfo brokerInfo) {
        for (TransportConnector connector : brokerService.getTransportConnectors()) {
            if (!connector.isUpdateClusterClients()) {
                continue;
            }
            connector.addPeerBroker(brokerInfo);
            connector.updateClientClusterInfo();
        }
    }

    /**
     * Removes a peer broker from every transport connector that has both
     * cluster-client updates and update-on-remove enabled, then asks that
     * connector to update its clients' cluster info.
     *
     * @param brokerInfo the peer broker to remove
     */
    protected void removeBrokerInClusterUpdate(BrokerInfo brokerInfo) {
        for (TransportConnector connector : brokerService.getTransportConnectors()) {
            if (!connector.isUpdateClusterClients() || !connector.isUpdateClusterClientsOnRemove()) {
                continue;
            }
            connector.removePeerBroker(brokerInfo);
            connector.updateClientClusterInfo();
        }
    }

    /**
     * Sweeps the known destinations and removes those that report themselves
     * as collectable.
     * <p>
     * The whole sweep runs under the write lock of
     * {@code inactiveDestinationsPurgeLock}. Every destination visited is first
     * marked with the current time via {@code markForGC}; those for which
     * {@code canGC()} is true are collected, up to
     * {@code brokerService.getMaxPurgedDestinationsPerSweep()} entries when that
     * limit is positive. Each collected destination is then removed through the
     * root broker; a failure to remove one destination is logged and does not
     * stop the remaining removals.
     */
    protected void purgeInactiveDestinations() {
        this.inactiveDestinationsPurgeLock.writeLock().lock();
        try {
            // Explicit type argument instead of the raw ArrayList the original used.
            ArrayList<Destination> purgeCandidates = new ArrayList<Destination>();
            // NOTE(review): putAll below assumes getDestinationMap() returns a map that is
            // safe to add to (e.g. a copy) - confirm against its implementation.
            Map<ActiveMQDestination, Destination> destinationMap = getDestinationMap();
            if (isAllowTempAutoCreationOnSend()) {
                destinationMap.putAll(this.tempQueueRegion.getDestinationMap());
                destinationMap.putAll(this.tempTopicRegion.getDestinationMap());
            }
            long maxPurgedDestinationsPerSweep = this.brokerService.getMaxPurgedDestinationsPerSweep();
            long now = System.currentTimeMillis();
            for (Destination candidate : destinationMap.values()) {
                candidate.markForGC(now);
                if (candidate.canGC()) {
                    purgeCandidates.add(candidate);
                    // A non-positive limit means the sweep is unbounded.
                    if (maxPurgedDestinationsPerSweep > 0 && purgeCandidates.size() == maxPurgedDestinationsPerSweep) {
                        break;
                    }
                }
            }
            if (!purgeCandidates.isEmpty()) {
                ConnectionContext connectionContext = BrokerSupport.getConnectionContext(this);
                connectionContext.setBroker(this);
                for (Destination inactive : purgeCandidates) {
                    // Prefer the destination's own logger when it exposes one.
                    Logger logger = LOG;
                    if (inactive instanceof BaseDestination) {
                        logger = ((BaseDestination) inactive).getLog();
                    }
                    logger.info("{} Inactive for longer than {} ms - removing ...", inactive.getName(), Long.valueOf(inactive.getInactiveTimoutBeforeGC()));
                    try {
                        getRoot().removeDestination(connectionContext, inactive.getActiveMQDestination(), isAllowTempAutoCreationOnSend() ? 1L : 0L);
                    } catch (Exception e) {
                        LOG.error("Failed to remove inactive destination {}", inactive, e);
                    }
                }
            }
        } finally {
            this.inactiveDestinationsPurgeLock.writeLock().unlock();
        }
    }

    /**
     * @return the current value of the {@code allowTempAutoCreationOnSend} flag
     */
    public boolean isAllowTempAutoCreationOnSend() {
        return allowTempAutoCreationOnSend;
    }

    /**
     * Sets the {@code allowTempAutoCreationOnSend} flag.
     *
     * @param z the new flag value
     */
    public void setAllowTempAutoCreationOnSend(boolean z) {
        allowTempAutoCreationOnSend = z;
    }

    /**
     * Asks each of the four regions (queue, topic, temporary queue, temporary
     * topic, in that order) to reapply its interceptor.
     */
    @Override // org.apache.activemq.broker.EmptyBroker, org.apache.activemq.broker.region.Region
    public void reapplyInterceptor() {
        queueRegion.reapplyInterceptor();
        topicRegion.reapplyInterceptor();
        tempQueueRegion.reapplyInterceptor();
        tempTopicRegion.reapplyInterceptor();
    }
}
