package org.apache.activemq.broker.region;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.JMSException;
import org.apache.activemq.DestinationDoesNotExistException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.virtual.CompositeDestinationFilter;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.filter.DestinationMap;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/activemq-broker-5.11.0.redhat-630495.jar:org/apache/activemq/broker/region/AbstractRegion.class */
/**
 * Base {@link Region} implementation that tracks the destinations and consumer
 * subscriptions of one region and routes broker operations to them.
 *
 * <p>Locking as visible in this class: {@link #destinationMap} is only touched through
 * its {@code unsynchronized*} methods while {@link #destinationsLock} is held (write lock
 * for put/remove, read lock for lookups). {@link #consumerChangeMutexMap} is a plain
 * {@link HashMap} and is only accessed inside {@code synchronized (consumerChangeMutexMap)}.
 */
public abstract class AbstractRegion implements Region {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractRegion.class);
    protected final SystemUsage usageManager;
    protected final DestinationFactory destinationFactory;
    protected final DestinationStatistics destinationStatistics;
    protected final RegionBroker broker;
    protected final TaskRunnerFactory taskRunnerFactory;
    /** Set to {@code true} by {@link #start()} and to {@code false} by {@link #stop()}. */
    protected boolean started;
    /** Exact-name index of the destinations currently held by this region. */
    protected final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<>();
    /** Index used to resolve a destination name to the set of matching destinations. */
    protected final DestinationMap destinationMap = new DestinationMap();
    /** Subscriptions registered through {@link #addConsumer}, keyed by consumer id. */
    protected final Map<ConsumerId, Subscription> subscriptions = new ConcurrentHashMap<>();
    protected boolean autoCreateDestinations = true;
    protected final ReentrantReadWriteLock destinationsLock = new ReentrantReadWriteLock();
    /** Per-consumer monitor objects that serialize {@link #addConsumer} for one consumer id. */
    protected final Map<ConsumerId, Object> consumerChangeMutexMap = new HashMap<>();

    /**
     * Creates a region.
     *
     * @param regionBroker owning broker; must not be {@code null}
     * @param destinationStatistics statistics object handed to newly created destinations
     * @param systemUsage stored in {@link #usageManager}
     * @param taskRunnerFactory stored in {@link #taskRunnerFactory}
     * @param destinationFactory factory used to create/remove destinations; must not be {@code null}
     * @throws IllegalArgumentException if {@code regionBroker} or {@code destinationFactory} is {@code null}
     */
    public AbstractRegion(RegionBroker regionBroker, DestinationStatistics destinationStatistics, SystemUsage systemUsage, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
        if (regionBroker == null) {
            throw new IllegalArgumentException("null broker");
        }
        this.broker = regionBroker;
        this.destinationStatistics = destinationStatistics;
        this.usageManager = systemUsage;
        this.taskRunnerFactory = taskRunnerFactory;
        if (destinationFactory == null) {
            throw new IllegalArgumentException("null destinationFactory");
        }
        this.destinationFactory = destinationFactory;
    }

    /**
     * Marks the region started, adds every destination the factory knows about that is not
     * yet held by this region (through the broker, using the broker security context), and
     * then starts all held destinations.
     *
     * @throws Exception if adding or starting a destination fails
     */
    @Override
    public final void start() throws Exception {
        this.started = true;
        for (ActiveMQDestination inactive : getInactiveDestinations()) {
            ConnectionContext brokerContext = new ConnectionContext();
            brokerContext.setBroker(this.broker.getBrokerService().getBroker());
            brokerContext.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
            brokerContext.getBroker().addDestination(brokerContext, inactive, false);
        }
        this.destinationsLock.readLock().lock();
        try {
            for (Destination destination : this.destinations.values()) {
                destination.start();
            }
        } finally {
            this.destinationsLock.readLock().unlock();
        }
    }

    /**
     * Marks the region stopped, stops every held destination and empties {@link #destinations}.
     *
     * @throws Exception if stopping a destination fails; in that case the map is not cleared
     */
    @Override
    public void stop() throws Exception {
        this.started = false;
        this.destinationsLock.readLock().lock();
        try {
            for (Destination destination : this.destinations.values()) {
                destination.stop();
            }
            // NOTE(review): the map is mutated while only the read lock is held and
            // destinationMap is left untouched; kept as-is to preserve behaviour.
            this.destinations.clear();
        } finally {
            this.destinationsLock.readLock().unlock();
        }
    }

    /**
     * Returns the destination registered under the given name, creating, intercepting,
     * starting and registering it first when it does not exist yet.
     *
     * @param connectionContext context passed to destination creation and subscription wiring
     * @param activeMQDestination name of the destination
     * @param z when {@code true}, a missing temporary destination is created as well;
     *          when {@code false}, a missing temporary destination is an error
     * @return the existing or newly created destination, never {@code null}
     * @throws DestinationDoesNotExistException if the destination is temporary, missing and {@code z} is {@code false}
     * @throws Exception if creating or starting the destination, or attaching subscriptions, fails
     */
    @Override
    public Destination addDestination(ConnectionContext connectionContext, ActiveMQDestination activeMQDestination, boolean z) throws Exception {
        this.destinationsLock.writeLock().lock();
        try {
            Destination existing = this.destinations.get(activeMQDestination);
            if (existing != null) {
                return existing;
            }
            if (activeMQDestination.isTemporary() && !z) {
                throw new DestinationDoesNotExistException(activeMQDestination.getQualifiedName());
            }
            LOG.debug("{} adding destination: {}", this.broker.getBrokerName(), activeMQDestination);
            Destination created = createDestination(connectionContext, activeMQDestination);
            DestinationInterceptor interceptor = this.broker.getDestinationInterceptor();
            if (interceptor != null) {
                created = interceptor.intercept(created);
            }
            created.start();
            // Subscriptions are attached before the destination becomes visible in the maps.
            addSubscriptionsForDestination(connectionContext, created);
            this.destinations.put(activeMQDestination, created);
            this.destinationMap.unsynchronizedPut(activeMQDestination, created);
            return created;
        } finally {
            this.destinationsLock.writeLock().unlock();
        }
    }

    /**
     * @return the live map of registered subscriptions (not a copy)
     */
    public Map<ConsumerId, Subscription> getSubscriptions() {
        return this.subscriptions;
    }

    /**
     * Attaches every registered subscription that matches the destination to it.
     *
     * <p>The subscription's own context is used when it has one, otherwise
     * {@code connectionContext}. A {@link SecurityException} raised for a wildcard
     * subscription is logged at debug level and skipped; for any other subscription it
     * is rethrown.
     *
     * @param connectionContext fallback context for subscriptions without their own
     * @param destination destination to attach the subscriptions to
     * @return the subscriptions that were attached
     * @throws Exception if attaching a subscription fails
     */
    public List<Subscription> addSubscriptionsForDestination(ConnectionContext connectionContext, Destination destination) throws Exception {
        List<Subscription> attached = new ArrayList<>();
        for (Subscription subscription : this.subscriptions.values()) {
            if (!subscription.matches(destination.getActiveMQDestination())) {
                continue;
            }
            try {
                ConnectionContext ownContext = subscription.getContext();
                destination.addSubscription(ownContext != null ? ownContext : connectionContext, subscription);
                attached.add(subscription);
            } catch (SecurityException e) {
                if (!subscription.isWildcard()) {
                    throw e;
                }
                LOG.debug("Subscription denied for {} to destination {}: {}", subscription, destination.getActiveMQDestination(), e.getMessage());
            }
        }
        return attached;
    }

    /**
     * Unregisters a destination, detaches matching subscriptions from it and disposes it.
     * A destination that is not registered is only logged at debug level.
     *
     * @param connectionContext context passed to subscription removal and disposal
     * @param activeMQDestination name of the destination to remove
     * @param j timeout; when exactly {@code 0} the removal is refused while any registered
     *          subscription matches the destination. Any other value skips that check and
     *          is not otherwise used by this method.
     * @throws JMSException if {@code j == 0} and a matching subscription exists
     * @throws Exception if detaching subscriptions or disposing the destination fails
     */
    @Override
    public void removeDestination(ConnectionContext connectionContext, ActiveMQDestination activeMQDestination, long j) throws Exception {
        if (j == 0) {
            for (Subscription subscription : this.subscriptions.values()) {
                if (subscription.matches(activeMQDestination)) {
                    throw new JMSException("Destination: " + activeMQDestination + " still has an active subscription: " + subscription);
                }
            }
        }
        LOG.debug("{} removing destination: {}", this.broker.getBrokerName(), activeMQDestination);
        this.destinationsLock.writeLock().lock();
        try {
            Destination removed = this.destinations.remove(activeMQDestination);
            if (removed == null) {
                LOG.debug("Cannot remove a destination that doesn't exist: {}", activeMQDestination);
                return;
            }
            for (Subscription subscription : this.subscriptions.values()) {
                if (subscription.matches(activeMQDestination)) {
                    removed.removeSubscription(connectionContext, subscription, 0L);
                }
            }
            this.destinationMap.unsynchronizedRemove(activeMQDestination, removed);
            dispose(connectionContext, removed);
            DestinationInterceptor interceptor = this.broker.getDestinationInterceptor();
            if (interceptor != null) {
                interceptor.remove(removed);
            }
        } finally {
            this.destinationsLock.writeLock().unlock();
        }
    }

    /**
     * Resolves a destination name against {@link #destinationMap} under the read lock.
     *
     * @param activeMQDestination name to resolve
     * @return whatever {@code destinationMap.unsynchronizedGet} returns for the name
     */
    @Override
    @SuppressWarnings("unchecked")
    public Set<Destination> getDestinations(ActiveMQDestination activeMQDestination) {
        this.destinationsLock.readLock().lock();
        try {
            return this.destinationMap.unsynchronizedGet(activeMQDestination);
        } finally {
            this.destinationsLock.readLock().unlock();
        }
    }

    /**
     * @return the live exact-name map of destinations (not a copy)
     */
    @Override
    public Map<ActiveMQDestination, Destination> getDestinationMap() {
        return this.destinations;
    }

    /**
     * Registers a consumer: creates its subscription and attaches it to every destination
     * currently matching the consumer's destination.
     *
     * <p>If a subscription already exists for the consumer id, a warning is logged and the
     * existing subscription is returned. If attaching to a destination raises a
     * {@link SecurityException} and the subscription is not a wildcard, the subscription is
     * detached from the destinations it was already attached to, unregistered, and the
     * exception is rethrown; for wildcard subscriptions the destination is skipped.
     *
     * @param connectionContext context of the requesting connection
     * @param consumerInfo description of the consumer
     * @return the new subscription, or the already registered one for a duplicate id
     * @throws Exception if destination lookup, subscription creation or attachment fails
     */
    @Override
    public Subscription addConsumer(ConnectionContext connectionContext, ConsumerInfo consumerInfo) throws Exception {
        LOG.debug("{} adding consumer: {} for destination: {}", this.broker.getBrokerName(), consumerInfo.getConsumerId(), consumerInfo.getDestination());
        ActiveMQDestination target = consumerInfo.getDestination();
        if (target != null && !target.isPattern() && !target.isComposite()) {
            // Return value unused: the call is made so a concrete destination exists
            // (or the lookup fails) before the subscription is created.
            lookup(connectionContext, target, true);
        }

        Object consumerMutex;
        synchronized (this.consumerChangeMutexMap) {
            consumerMutex = this.consumerChangeMutexMap.get(consumerInfo.getConsumerId());
            if (consumerMutex == null) {
                consumerMutex = new Object();
                this.consumerChangeMutexMap.put(consumerInfo.getConsumerId(), consumerMutex);
            }
        }

        synchronized (consumerMutex) {
            Subscription existing = this.subscriptions.get(consumerInfo.getConsumerId());
            if (existing != null) {
                LOG.warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this.");
                return existing;
            }

            // Result is discarded — presumably called so that an unparsable destination
            // fails here, before a subscription is created. TODO confirm.
            org.apache.activemq.filter.DestinationFilter.parseFilter(consumerInfo.getDestination());
            Subscription subscription = createSubscription(connectionContext, consumerInfo);

            // Snapshot the matching destinations and publish the subscription while holding
            // the read lock; the lock is released exactly once, before any destination is
            // called, so a later failure cannot trigger a second unlock.
            List<Destination> matches = new ArrayList<>();
            this.destinationsLock.readLock().lock();
            try {
                for (Object match : this.destinationMap.unsynchronizedGet(consumerInfo.getDestination())) {
                    matches.add((Destination) match);
                }
                this.subscriptions.put(consumerInfo.getConsumerId(), subscription);
            } finally {
                this.destinationsLock.readLock().unlock();
            }

            List<Destination> attached = new ArrayList<>();
            for (Destination destination : matches) {
                try {
                    destination.addSubscription(connectionContext, subscription);
                    attached.add(destination);
                } catch (SecurityException e) {
                    if (subscription.isWildcard()) {
                        LOG.debug("Subscription denied for {} to destination {}: {}", subscription, destination.getActiveMQDestination(), e.getMessage());
                        continue;
                    }
                    // Roll back the partial registration; rollback failures are logged only.
                    for (Destination partial : attached) {
                        try {
                            partial.removeSubscription(connectionContext, subscription, consumerInfo.getLastDeliveredSequenceId());
                        } catch (Exception rollbackFailure) {
                            LOG.error("Error unsubscribing {} from {}: {}", subscription, partial, rollbackFailure.getMessage(), rollbackFailure);
                        }
                    }
                    this.subscriptions.remove(consumerInfo.getConsumerId());
                    attached.clear();
                    throw e;
                }
            }
            attached.clear();

            if (consumerInfo.isBrowser()) {
                ((QueueBrowserSubscription) subscription).destinationsAdded();
            }
            return subscription;
        }
    }

    /**
     * @return the destinations reported by the destination factory
     */
    public Set getDurableDestinations() {
        return this.destinationFactory.getDestinations();
    }

    /**
     * Returns the factory's destinations minus those already held by this region.
     * The set returned by the factory is modified in place.
     *
     * @return the factory's destination set with the held destinations removed
     */
    public Set<ActiveMQDestination> getInactiveDestinations() {
        Set<ActiveMQDestination> inactive = this.destinationFactory.getDestinations();
        this.destinationsLock.readLock().lock();
        try {
            inactive.removeAll(this.destinations.keySet());
        } finally {
            this.destinationsLock.readLock().unlock();
        }
        return inactive;
    }

    /**
     * Unregisters a consumer: detaches its subscription from every destination currently
     * matching the consumer's destination, destroys the subscription, and drops the
     * consumer's mutex entry. An unknown consumer id only has its mutex entry dropped.
     *
     * @param connectionContext context of the requesting connection
     * @param consumerInfo description of the consumer being removed
     * @throws Exception if detaching from a destination fails
     */
    @Override
    public void removeConsumer(ConnectionContext connectionContext, ConsumerInfo consumerInfo) throws Exception {
        LOG.debug("{} removing consumer: {} for destination: {}", this.broker.getBrokerName(), consumerInfo.getConsumerId(), consumerInfo.getDestination());
        Subscription removed = this.subscriptions.remove(consumerInfo.getConsumerId());
        if (removed != null) {
            this.destinationsLock.readLock().lock();
            try {
                // Copy first, then detach, so the detach loop does not iterate the map's own set.
                List<Destination> matches = new ArrayList<>();
                for (Object match : this.destinationMap.unsynchronizedGet(consumerInfo.getDestination())) {
                    matches.add((Destination) match);
                }
                for (Destination destination : matches) {
                    destination.removeSubscription(connectionContext, removed, consumerInfo.getLastDeliveredSequenceId());
                }
                destroySubscription(removed);
            } finally {
                this.destinationsLock.readLock().unlock();
            }
        }
        synchronized (this.consumerChangeMutexMap) {
            this.consumerChangeMutexMap.remove(consumerInfo.getConsumerId());
        }
    }

    /**
     * Destroys a subscription by delegating to {@link Subscription#destroy()}.
     *
     * @param subscription subscription to destroy
     */
    public void destroySubscription(Subscription subscription) {
        subscription.destroy();
    }

    /**
     * Not supported by this base class.
     *
     * @throws JMSException always
     */
    @Override
    public void removeSubscription(ConnectionContext connectionContext, RemoveSubscriptionInfo removeSubscriptionInfo) throws Exception {
        throw new JMSException("Invalid operation.");
    }

    /**
     * Sends a message to its region destination and bumps the producer's sent counter.
     *
     * <p>The destination is (re)resolved via {@link #lookup} when the exchange is mutable or
     * has no region destination yet.
     *
     * @param producerBrokerExchange exchange carrying the producer state and cached destination
     * @param message message to send
     * @throws Exception if the lookup or the send fails
     */
    @Override
    public void send(ProducerBrokerExchange producerBrokerExchange, Message message) throws Exception {
        ConnectionContext connectionContext = producerBrokerExchange.getConnectionContext();
        if (producerBrokerExchange.isMutable() || producerBrokerExchange.getRegionDestination() == null) {
            producerBrokerExchange.setRegionDestination(lookup(connectionContext, message.getDestination(), false));
        }
        producerBrokerExchange.getRegionDestination().send(producerBrokerExchange, message);
        if (producerBrokerExchange.getProducerState() != null && producerBrokerExchange.getProducerState().getInfo() != null) {
            producerBrokerExchange.getProducerState().getInfo().incrementSentCount();
        }
    }

    /**
     * Forwards an acknowledgement to the consumer's subscription, resolving and caching the
     * subscription on the exchange when it is not set yet.
     *
     * @param consumerBrokerExchange exchange carrying the (possibly unset) subscription
     * @param messageAck acknowledgement to process
     * @throws IllegalArgumentException if no subscription exists for the consumer and the
     *         connection is not in recovery mode (in recovery mode the ack is dropped)
     * @throws Exception if the subscription fails to process the acknowledgement
     */
    @Override
    public void acknowledge(ConsumerBrokerExchange consumerBrokerExchange, MessageAck messageAck) throws Exception {
        Subscription subscription = consumerBrokerExchange.getSubscription();
        if (subscription == null) {
            subscription = this.subscriptions.get(messageAck.getConsumerId());
            if (subscription == null) {
                if (consumerBrokerExchange.getConnectionContext().isInRecoveryMode()) {
                    LOG.debug("Ack for non existent subscription in recovery, ack: {}", messageAck);
                    return;
                }
                LOG.warn("Ack for non existent subscription, ack: {}", messageAck);
                throw new IllegalArgumentException("The subscription does not exist: " + messageAck.getConsumerId());
            }
            consumerBrokerExchange.setSubscription(subscription);
        }
        subscription.acknowledge(consumerBrokerExchange.getConnectionContext(), messageAck);
    }

    /**
     * Forwards a pull request to the consumer's subscription.
     *
     * @param connectionContext context of the requesting connection
     * @param messagePull pull request
     * @return the response produced by the subscription
     * @throws IllegalArgumentException if no subscription exists for the consumer
     * @throws Exception if the subscription fails to process the pull
     */
    @Override
    public Response messagePull(ConnectionContext connectionContext, MessagePull messagePull) throws Exception {
        Subscription subscription = this.subscriptions.get(messagePull.getConsumerId());
        if (subscription == null) {
            throw new IllegalArgumentException("The subscription does not exist: " + messagePull.getConsumerId());
        }
        return subscription.pullMessage(connectionContext, messagePull);
    }

    /**
     * Finds the destination registered under the given name, asking the broker to add it
     * when it is missing and auto-creation is enabled.
     *
     * @param connectionContext context used for auto-creation
     * @param activeMQDestination name of the destination
     * @param z forwarded to the broker's {@code addDestination} as its boolean argument
     * @return the destination, never {@code null}
     * @throws JMSException if the destination is missing and was not created
     * @throws Exception if auto-creation fails
     */
    public Destination lookup(ConnectionContext connectionContext, ActiveMQDestination activeMQDestination, boolean z) throws Exception {
        Destination destination;
        // The read lock covers only the map access and is released exactly once; the
        // auto-create path below runs without it.
        this.destinationsLock.readLock().lock();
        try {
            destination = this.destinations.get(activeMQDestination);
        } finally {
            this.destinationsLock.readLock().unlock();
        }
        if (destination != null) {
            return destination;
        }
        if (isAutoCreateDestinations()) {
            destination = connectionContext.getBroker().addDestination(connectionContext, activeMQDestination, z);
        }
        if (destination == null) {
            throw new JMSException("The destination " + activeMQDestination + " does not exist.");
        }
        return destination;
    }

    /**
     * Forwards a dispatch notification to the consumer's subscription.
     *
     * @param messageDispatchNotification notification to process
     * @throws JMSException if no subscription exists for the consumer
     * @throws Exception if the subscription fails to process the notification
     */
    @Override
    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
        Subscription subscription = this.subscriptions.get(messageDispatchNotification.getConsumerId());
        if (subscription == null) {
            throw new JMSException("Slave broker out of sync with master - Subscription: " + messageDispatchNotification.getConsumerId() + " on " + messageDispatchNotification.getDestination() + " does not exist for dispatch of message: " + messageDispatchNotification.getMessageId());
        }
        subscription.processMessageDispatchNotification(messageDispatchNotification);
    }

    /**
     * Forwards a dispatch notification to the destination it names instead of to the
     * consumer's subscription.
     *
     * @param messageDispatchNotification notification to process
     * @throws JMSException if the destination is not registered
     * @throws Exception if the destination fails to process the notification
     */
    public void processDispatchNotificationViaDestination(MessageDispatchNotification messageDispatchNotification) throws Exception {
        Destination destination;
        // Released exactly once, before the destination is called.
        this.destinationsLock.readLock().lock();
        try {
            destination = this.destinations.get(messageDispatchNotification.getDestination());
        } finally {
            this.destinationsLock.readLock().unlock();
        }
        if (destination == null) {
            throw new JMSException("Slave broker out of sync with master - Destination: " + messageDispatchNotification.getDestination() + " does not exist for consumer " + messageDispatchNotification.getConsumerId() + " with message: " + messageDispatchNotification.getMessageId());
        }
        destination.processDispatchNotification(messageDispatchNotification);
    }

    /**
     * Calls {@code gc()} on every subscription and then, under the read lock, on every
     * destination.
     */
    @Override
    public void gc() {
        for (Subscription subscription : this.subscriptions.values()) {
            subscription.gc();
        }
        this.destinationsLock.readLock().lock();
        try {
            for (Destination destination : this.destinations.values()) {
                destination.gc();
            }
        } finally {
            this.destinationsLock.readLock().unlock();
        }
    }

    /**
     * Creates the subscription object for a consumer; called by {@link #addConsumer}.
     *
     * @param connectionContext context of the requesting connection
     * @param consumerInfo description of the consumer
     * @return the new subscription
     * @throws Exception if the subscription cannot be created
     */
    protected abstract Subscription createSubscription(ConnectionContext connectionContext, ConsumerInfo consumerInfo) throws Exception;

    /**
     * Creates a destination through the destination factory, passing this region's
     * statistics object.
     *
     * @param connectionContext context of the requesting connection
     * @param activeMQDestination name of the destination to create
     * @return the destination produced by the factory
     * @throws Exception if the factory fails
     */
    public Destination createDestination(ConnectionContext connectionContext, ActiveMQDestination activeMQDestination) throws Exception {
        return this.destinationFactory.createDestination(connectionContext, activeMQDestination, this.destinationStatistics);
    }

    /**
     * @return whether {@link #lookup} may ask the broker to add a missing destination
     */
    public boolean isAutoCreateDestinations() {
        return this.autoCreateDestinations;
    }

    /**
     * @param z whether {@link #lookup} may ask the broker to add a missing destination
     */
    public void setAutoCreateDestinations(boolean z) {
        this.autoCreateDestinations = z;
    }

    /**
     * Registers a producer with every destination currently matching its destination.
     *
     * @param connectionContext context of the requesting connection
     * @param producerInfo description of the producer
     * @throws Exception if a destination rejects the producer
     */
    @Override
    public void addProducer(ConnectionContext connectionContext, ProducerInfo producerInfo) throws Exception {
        this.destinationsLock.readLock().lock();
        try {
            for (Object match : this.destinationMap.unsynchronizedGet(producerInfo.getDestination())) {
                ((Destination) match).addProducer(connectionContext, producerInfo);
            }
        } finally {
            this.destinationsLock.readLock().unlock();
        }
    }

    /**
     * Unregisters a producer from every destination currently matching its destination.
     *
     * @param connectionContext context of the requesting connection
     * @param producerInfo description of the producer
     * @throws Exception if a destination fails to remove the producer
     */
    @Override
    public void removeProducer(ConnectionContext connectionContext, ProducerInfo producerInfo) throws Exception {
        this.destinationsLock.readLock().lock();
        try {
            for (Object match : this.destinationMap.unsynchronizedGet(producerInfo.getDestination())) {
                ((Destination) match).removeProducer(connectionContext, producerInfo);
            }
        } finally {
            this.destinationsLock.readLock().unlock();
        }
    }

    /**
     * Disposes and stops a destination, then removes it from the destination factory.
     *
     * @param connectionContext context passed to {@code Destination.dispose}
     * @param destination destination to tear down
     * @throws Exception if any of the three steps fails
     */
    protected void dispose(ConnectionContext connectionContext, Destination destination) throws Exception {
        destination.dispose(connectionContext);
        destination.stop();
        this.destinationFactory.removeDestination(destination);
    }

    /**
     * Applies a prefetch change to the consumer's subscription and wakes up the destination.
     *
     * <p>Nothing happens unless the consumer's subscription exists and is an
     * {@link AbstractSubscription}. After setting the requested prefetch, the destination
     * policy entry (if any) is given the chance to reconfigure it. A failure to look up or
     * wake the destination is logged as a warning and not propagated.
     *
     * @param consumerBrokerExchange exchange supplying the connection context
     * @param consumerControl control command carrying consumer id, destination and prefetch
     */
    @Override
    public void processConsumerControl(ConsumerBrokerExchange consumerBrokerExchange, ConsumerControl consumerControl) {
        Subscription subscription = this.subscriptions.get(consumerControl.getConsumerId());
        // instanceof is false for null, so this also covers an unknown consumer id.
        if (!(subscription instanceof AbstractSubscription)) {
            return;
        }
        ((AbstractSubscription) subscription).setPrefetchSize(consumerControl.getPrefetch());
        if (this.broker.getDestinationPolicy() != null) {
            PolicyEntry policyEntry = this.broker.getDestinationPolicy().getEntryFor(consumerControl.getDestination());
            if (policyEntry != null) {
                policyEntry.configurePrefetch(subscription);
            }
        }
        LOG.debug("setting prefetch: {}, on subscription: {}; resulting value: {}", consumerControl.getPrefetch(), consumerControl.getConsumerId(), subscription.getConsumerInfo().getPrefetchSize());
        try {
            lookup(consumerBrokerExchange.getConnectionContext(), consumerControl.getDestination(), false).wakeup();
        } catch (Exception e) {
            LOG.warn("failed to deliver post consumerControl dispatch-wakeup, to destination: {}", consumerControl.getDestination(), e);
        }
    }

    /**
     * Re-wraps every registered destination with the broker's current destination
     * interceptor. A {@link CompositeDestinationFilter} is first unwrapped to its
     * {@code next} destination.
     */
    @Override
    public void reapplyInterceptor() {
        this.destinationsLock.writeLock().lock();
        try {
            DestinationInterceptor interceptor = this.broker.getDestinationInterceptor();
            Map<ActiveMQDestination, Destination> registered = getDestinationMap();
            for (ActiveMQDestination name : registered.keySet()) {
                Destination destination = registered.get(name);
                if (destination instanceof CompositeDestinationFilter) {
                    destination = ((CompositeDestinationFilter) destination).next;
                }
                if (interceptor != null) {
                    destination = interceptor.intercept(destination);
                }
                // Both puts are kept: getDestinationMap() is overridable, so the two
                // targets are only guaranteed to be the same map in this base class.
                getDestinationMap().put(name, destination);
                this.destinations.put(name, destination);
            }
        } finally {
            this.destinationsLock.writeLock().unlock();
        }
    }
}
