package org.apache.activemq.broker.region;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.autoproxy.target.QuickTargetSourceCreator;

/* loaded from: input_file:WEB-INF/lib/activemq-broker-5.11.0.redhat-630415.jar:org/apache/activemq/broker/region/TopicRegion.class */
public class TopicRegion extends AbstractRegion {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) TopicRegion.class);
    protected final ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions;
    private final LongSequenceGenerator recoveredDurableSubIdGenerator;
    private final SessionId recoveredDurableSubSessionId;
    private boolean keepDurableSubsActive;
    private Timer cleanupTimer;
    private TimerTask cleanupTask;

    public TopicRegion(RegionBroker regionBroker, DestinationStatistics destinationStatistics, SystemUsage systemUsage, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
        super(regionBroker, destinationStatistics, systemUsage, taskRunnerFactory, destinationFactory);
        this.durableSubscriptions = new ConcurrentHashMap();
        this.recoveredDurableSubIdGenerator = new LongSequenceGenerator();
        this.recoveredDurableSubSessionId = new SessionId(new ConnectionId("OFFLINE"), this.recoveredDurableSubIdGenerator.getNextSequenceId());
        if (regionBroker.getBrokerService().getOfflineDurableSubscriberTaskSchedule() == -1 || regionBroker.getBrokerService().getOfflineDurableSubscriberTimeout() == -1) {
            return;
        }
        this.cleanupTimer = new Timer("ActiveMQ Durable Subscriber Cleanup Timer", true);
        this.cleanupTask = new TimerTask() { // from class: org.apache.activemq.broker.region.TopicRegion.1
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                TopicRegion.this.doCleanup();
            }
        };
        this.cleanupTimer.schedule(this.cleanupTask, regionBroker.getBrokerService().getOfflineDurableSubscriberTaskSchedule(), regionBroker.getBrokerService().getOfflineDurableSubscriberTaskSchedule());
    }

    @Override // org.apache.activemq.broker.region.AbstractRegion, org.apache.activemq.Service
    public void stop() throws Exception {
        super.stop();
        if (this.cleanupTimer != null) {
            this.cleanupTimer.cancel();
        }
    }

    public void doCleanup() {
        long currentTimeMillis = System.currentTimeMillis();
        for (Map.Entry<SubscriptionKey, DurableTopicSubscription> entry : this.durableSubscriptions.entrySet()) {
            DurableTopicSubscription value = entry.getValue();
            if (!value.isActive()) {
                long offlineTimestamp = value.getOfflineTimestamp();
                if (offlineTimestamp != -1 && currentTimeMillis - offlineTimestamp >= this.broker.getBrokerService().getOfflineDurableSubscriberTimeout()) {
                    LOG.info("Destroying durable subscriber due to inactivity: {}", value);
                    try {
                        RemoveSubscriptionInfo removeSubscriptionInfo = new RemoveSubscriptionInfo();
                        removeSubscriptionInfo.setClientId(entry.getKey().getClientId());
                        removeSubscriptionInfo.setSubscriptionName(entry.getKey().getSubscriptionName());
                        ConnectionContext connectionContext = new ConnectionContext();
                        connectionContext.setBroker(this.broker);
                        connectionContext.setClientId(entry.getKey().getClientId());
                        removeSubscription(connectionContext, removeSubscriptionInfo);
                    } catch (Exception e) {
                        LOG.error("Failed to remove inactive durable subscriber", (Throwable) e);
                    }
                }
            }
        }
    }

    @Override // org.apache.activemq.broker.region.AbstractRegion, org.apache.activemq.broker.region.Region
    public Subscription addConsumer(ConnectionContext connectionContext, ConsumerInfo consumerInfo) throws Exception {
        if (!consumerInfo.isDurable()) {
            return super.addConsumer(connectionContext, consumerInfo);
        }
        if (this.broker.getBrokerService().isRejectDurableConsumers()) {
            throw new JMSException("Durable Consumers are not allowed");
        }
        ActiveMQDestination destination = consumerInfo.getDestination();
        if (!destination.isPattern()) {
            lookup(connectionContext, destination, true);
        }
        String clientId = connectionContext.getClientId();
        String subscriptionName = consumerInfo.getSubscriptionName();
        SubscriptionKey subscriptionKey = new SubscriptionKey(clientId, subscriptionName);
        DurableTopicSubscription durableTopicSubscription = this.durableSubscriptions.get(subscriptionKey);
        if (durableTopicSubscription == null) {
            super.addConsumer(connectionContext, consumerInfo);
            durableTopicSubscription = this.durableSubscriptions.get(subscriptionKey);
            if (durableTopicSubscription == null) {
                throw new JMSException("Cannot use the same consumerId: " + consumerInfo.getConsumerId() + " for two different durable subscriptions clientID: " + subscriptionKey.getClientId() + " subscriberName: " + subscriptionKey.getSubscriptionName());
            }
        } else {
            if (!connectionContext.isAllowLinkStealing() && durableTopicSubscription.isActive()) {
                throw new JMSException("Durable consumer is in use for client: " + clientId + " and subscriptionName: " + subscriptionName);
            }
            if (hasDurableSubChanged(consumerInfo, durableTopicSubscription.getConsumerInfo())) {
                this.durableSubscriptions.remove(subscriptionKey);
                this.destinationsLock.readLock().lock();
                try {
                    for (Destination destination2 : this.destinations.values()) {
                        if (destination2 instanceof Topic) {
                            ((Topic) destination2).deleteSubscription(connectionContext, subscriptionKey);
                        }
                    }
                    super.removeConsumer(connectionContext, durableTopicSubscription.getConsumerInfo());
                    super.addConsumer(connectionContext, consumerInfo);
                    durableTopicSubscription = this.durableSubscriptions.get(subscriptionKey);
                } finally {
                    this.destinationsLock.readLock().unlock();
                }
            } else {
                if (durableTopicSubscription.getConsumerInfo().getConsumerId() != null) {
                    this.subscriptions.remove(durableTopicSubscription.getConsumerInfo().getConsumerId());
                }
                if (durableTopicSubscription.context != connectionContext || durableTopicSubscription.info != consumerInfo) {
                    durableTopicSubscription.info = consumerInfo;
                    durableTopicSubscription.context = connectionContext;
                    durableTopicSubscription.deactivate(this.keepDurableSubsActive, consumerInfo.getLastDeliveredSequenceId());
                }
                this.subscriptions.put(consumerInfo.getConsumerId(), durableTopicSubscription);
            }
        }
        durableTopicSubscription.activate(this.usageManager, connectionContext, consumerInfo, this.broker);
        return durableTopicSubscription;
    }

    @Override // org.apache.activemq.broker.region.AbstractRegion, org.apache.activemq.broker.region.Region
    public void removeConsumer(ConnectionContext connectionContext, ConsumerInfo consumerInfo) throws Exception {
        if (!consumerInfo.isDurable()) {
            super.removeConsumer(connectionContext, consumerInfo);
            return;
        }
        DurableTopicSubscription durableTopicSubscription = this.durableSubscriptions.get(new SubscriptionKey(connectionContext.getClientId(), consumerInfo.getSubscriptionName()));
        if (durableTopicSubscription == null || durableTopicSubscription.getContext() != connectionContext) {
            return;
        }
        durableTopicSubscription.deactivate(this.keepDurableSubsActive, consumerInfo.getLastDeliveredSequenceId());
    }

    @Override // org.apache.activemq.broker.region.AbstractRegion, org.apache.activemq.broker.region.Region
    public void removeSubscription(ConnectionContext connectionContext, RemoveSubscriptionInfo removeSubscriptionInfo) throws Exception {
        SubscriptionKey subscriptionKey = new SubscriptionKey(removeSubscriptionInfo.getClientId(), removeSubscriptionInfo.getSubscriptionName());
        DurableTopicSubscription durableTopicSubscription = this.durableSubscriptions.get(subscriptionKey);
        if (durableTopicSubscription == null) {
            throw new InvalidDestinationException("No durable subscription exists for clientID: " + removeSubscriptionInfo.getClientId() + " and subscriptionName: " + removeSubscriptionInfo.getSubscriptionName());
        }
        if (durableTopicSubscription.isActive()) {
            throw new JMSException("Durable consumer is in use");
        }
        this.durableSubscriptions.remove(subscriptionKey);
        this.destinationsLock.readLock().lock();
        try {
            for (Destination destination : this.destinations.values()) {
                if (destination instanceof Topic) {
                    ((Topic) destination).deleteSubscription(connectionContext, subscriptionKey);
                } else if (destination instanceof DestinationFilter) {
                    ((DestinationFilter) destination).deleteSubscription(connectionContext, subscriptionKey);
                }
            }
            if (this.subscriptions.get(durableTopicSubscription.getConsumerInfo().getConsumerId()) != null) {
                super.removeConsumer(connectionContext, durableTopicSubscription.getConsumerInfo());
            } else {
                destroySubscription(durableTopicSubscription);
            }
        } finally {
            this.destinationsLock.readLock().unlock();
        }
    }

    public String toString() {
        return "TopicRegion: destinations=" + this.destinations.size() + ", subscriptions=" + this.subscriptions.size() + ", memory=" + this.usageManager.getMemoryUsage().getPercentUsage() + QuickTargetSourceCreator.PREFIX_THREAD_LOCAL;
    }

    @Override // org.apache.activemq.broker.region.AbstractRegion
    protected List<Subscription> addSubscriptionsForDestination(ConnectionContext connectionContext, Destination destination) throws Exception {
        List<Subscription> addSubscriptionsForDestination = super.addSubscriptionsForDestination(connectionContext, destination);
        HashSet hashSet = new HashSet(addSubscriptionsForDestination);
        TopicMessageStore topicMessageStore = (TopicMessageStore) destination.getMessageStore();
        if (topicMessageStore != null) {
            for (SubscriptionInfo subscriptionInfo : topicMessageStore.getAllSubscriptions()) {
                LOG.debug("Restoring durable subscription: {}", subscriptionInfo);
                SubscriptionKey subscriptionKey = new SubscriptionKey(subscriptionInfo);
                DurableTopicSubscription durableTopicSubscription = this.durableSubscriptions.get(subscriptionKey);
                ConsumerInfo createInactiveConsumerInfo = createInactiveConsumerInfo(subscriptionInfo);
                if (durableTopicSubscription == null) {
                    ConnectionContext connectionContext2 = new ConnectionContext();
                    connectionContext2.setBroker(connectionContext.getBroker());
                    connectionContext2.setClientId(subscriptionKey.getClientId());
                    connectionContext2.setConnectionId(createInactiveConsumerInfo.getConsumerId().getParentId().getParentId());
                    durableTopicSubscription = (DurableTopicSubscription) createSubscription(connectionContext2, createInactiveConsumerInfo);
                    durableTopicSubscription.setOfflineTimestamp(System.currentTimeMillis());
                }
                if (!hashSet.contains(durableTopicSubscription)) {
                    hashSet.add(durableTopicSubscription);
                    addSubscriptionsForDestination.add(durableTopicSubscription);
                    destination.addSubscription(connectionContext, durableTopicSubscription);
                }
            }
            this.durableSubscriptions.values();
            for (DurableTopicSubscription durableTopicSubscription2 : this.durableSubscriptions.values()) {
                if (!hashSet.contains(durableTopicSubscription2) && durableTopicSubscription2.matches(destination.getActiveMQDestination())) {
                    addSubscriptionsForDestination.add(durableTopicSubscription2);
                    destination.addSubscription(connectionContext, durableTopicSubscription2);
                }
            }
        }
        return addSubscriptionsForDestination;
    }

    public ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo subscriptionInfo) {
        ConsumerInfo consumerInfo = new ConsumerInfo();
        consumerInfo.setSelector(subscriptionInfo.getSelector());
        consumerInfo.setSubscriptionName(subscriptionInfo.getSubscriptionName());
        consumerInfo.setDestination(subscriptionInfo.getSubscribedDestination());
        consumerInfo.setConsumerId(createConsumerId());
        return consumerInfo;
    }

    private ConsumerId createConsumerId() {
        return new ConsumerId(this.recoveredDurableSubSessionId, this.recoveredDurableSubIdGenerator.getNextSequenceId());
    }

    protected void configureTopic(Topic topic, ActiveMQDestination activeMQDestination) {
        PolicyEntry entryFor;
        if (this.broker.getDestinationPolicy() == null || (entryFor = this.broker.getDestinationPolicy().getEntryFor(activeMQDestination)) == null) {
            return;
        }
        entryFor.configure(this.broker, topic);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.broker.region.AbstractRegion
    public Subscription createSubscription(ConnectionContext connectionContext, ConsumerInfo consumerInfo) throws JMSException {
        PolicyEntry entryFor;
        PolicyEntry entryFor2;
        ActiveMQDestination destination = consumerInfo.getDestination();
        if (consumerInfo.isDurable()) {
            if (AdvisorySupport.isAdvisoryTopic(consumerInfo.getDestination())) {
                throw new JMSException("Cannot create a durable subscription for an advisory Topic");
            }
            SubscriptionKey subscriptionKey = new SubscriptionKey(connectionContext.getClientId(), consumerInfo.getSubscriptionName());
            if (this.durableSubscriptions.get(subscriptionKey) != null) {
                throw new JMSException("Durable subscription is already active for clientID: " + connectionContext.getClientId() + " and subscriptionName: " + consumerInfo.getSubscriptionName());
            }
            DurableTopicSubscription durableTopicSubscription = new DurableTopicSubscription(this.broker, this.usageManager, connectionContext, consumerInfo, this.keepDurableSubsActive);
            if (destination != null && this.broker.getDestinationPolicy() != null && (entryFor2 = this.broker.getDestinationPolicy().getEntryFor(destination)) != null) {
                entryFor2.configure(this.broker, this.usageManager, durableTopicSubscription);
            }
            this.durableSubscriptions.put(subscriptionKey, durableTopicSubscription);
            return durableTopicSubscription;
        }
        try {
            TopicSubscription topicSubscription = new TopicSubscription(this.broker, connectionContext, consumerInfo, this.usageManager);
            if (destination != null && this.broker.getDestinationPolicy() != null && (entryFor = this.broker.getDestinationPolicy().getEntryFor(destination)) != null) {
                entryFor.configure(this.broker, this.usageManager, topicSubscription);
            }
            topicSubscription.init();
            return topicSubscription;
        } catch (Exception e) {
            LOG.error("Failed to create TopicSubscription ", (Throwable) e);
            JMSException jMSException = new JMSException("Couldn't create TopicSubscription");
            jMSException.setLinkedException(e);
            throw jMSException;
        }
    }

    private boolean hasDurableSubChanged(ConsumerInfo consumerInfo, ConsumerInfo consumerInfo2) {
        if ((consumerInfo.getSelector() != null) ^ (consumerInfo2.getSelector() != null)) {
            return true;
        }
        return ((consumerInfo.getSelector() == null || consumerInfo.getSelector().equals(consumerInfo2.getSelector())) && consumerInfo.getDestination().equals(consumerInfo2.getDestination())) ? false : true;
    }

    @Override // org.apache.activemq.broker.region.AbstractRegion
    protected Set<ActiveMQDestination> getInactiveDestinations() {
        Set<ActiveMQDestination> inactiveDestinations = super.getInactiveDestinations();
        Iterator<ActiveMQDestination> it = inactiveDestinations.iterator();
        while (it.hasNext()) {
            if (!it.next().isTopic()) {
                it.remove();
            }
        }
        return inactiveDestinations;
    }

    public DurableTopicSubscription lookupSubscription(String str, String str2) {
        SubscriptionKey subscriptionKey = new SubscriptionKey(str2, str);
        if (this.durableSubscriptions.containsKey(subscriptionKey)) {
            return this.durableSubscriptions.get(subscriptionKey);
        }
        return null;
    }

    public List<DurableTopicSubscription> lookupSubscriptions(String str) {
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<SubscriptionKey, DurableTopicSubscription> entry : this.durableSubscriptions.entrySet()) {
            if (entry.getKey().getClientId().equals(str)) {
                arrayList.add(entry.getValue());
            }
        }
        return arrayList;
    }

    public boolean isKeepDurableSubsActive() {
        return this.keepDurableSubsActive;
    }

    public void setKeepDurableSubsActive(boolean z) {
        this.keepDurableSubsActive = z;
    }

    public boolean durableSubscriptionExists(SubscriptionKey subscriptionKey) {
        return this.durableSubscriptions.containsKey(subscriptionKey);
    }

    public DurableTopicSubscription getDurableSubscription(SubscriptionKey subscriptionKey) {
        return this.durableSubscriptions.get(subscriptionKey);
    }

    public Map<SubscriptionKey, DurableTopicSubscription> getDurableSubscriptions() {
        return this.durableSubscriptions;
    }
}
