package org.apache.activemq.broker.region;

import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.InvalidSelectorException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:WEB-INF/lib/activemq-core-5.0.0-fuse-SNAPSHOT.jar:org/apache/activemq/broker/region/DurableTopicSubscription.class */
public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener {
    private static final Log LOG = LogFactory.getLog(PrefetchSubscription.class);
    private final ConcurrentHashMap<MessageId, Integer> redeliveredMessages;
    private final ConcurrentHashMap<ActiveMQDestination, Destination> destinations;
    private final SubscriptionKey subscriptionKey;
    private final boolean keepDurableSubsActive;
    private final SystemUsage usageManager;
    private boolean active;

    public DurableTopicSubscription(Broker broker, SystemUsage systemUsage, ConnectionContext connectionContext, ConsumerInfo consumerInfo, boolean z) throws InvalidSelectorException {
        super(broker, connectionContext, consumerInfo);
        this.redeliveredMessages = new ConcurrentHashMap<>();
        this.destinations = new ConcurrentHashMap<>();
        this.pending = new StoreDurableSubscriberCursor(connectionContext.getClientId(), consumerInfo.getSubscriptionName(), broker.getTempDataStore(), consumerInfo.getPrefetchSize(), this);
        this.usageManager = systemUsage;
        this.keepDurableSubsActive = z;
        this.subscriptionKey = new SubscriptionKey(connectionContext.getClientId(), consumerInfo.getSubscriptionName());
    }

    public synchronized boolean isActive() {
        return this.active;
    }

    @Override // org.apache.activemq.broker.region.PrefetchSubscription
    protected synchronized boolean isFull() {
        return !this.active || super.isFull();
    }

    @Override // org.apache.activemq.broker.region.AbstractSubscription, org.apache.activemq.broker.region.Subscription
    public synchronized void gc() {
    }

    @Override // org.apache.activemq.broker.region.PrefetchSubscription, org.apache.activemq.broker.region.AbstractSubscription, org.apache.activemq.broker.region.Subscription
    public synchronized void add(ConnectionContext connectionContext, Destination destination) throws Exception {
        super.add(connectionContext, destination);
        this.destinations.put(destination.getActiveMQDestination(), destination);
        if (this.active || this.keepDurableSubsActive) {
            Topic topic = (Topic) destination;
            topic.activate(connectionContext, this);
            if (this.pending.isEmpty(topic)) {
                topic.recoverRetroactiveMessages(connectionContext, this);
            }
        }
        dispatchMatched();
    }

    public synchronized void activate(SystemUsage systemUsage, ConnectionContext connectionContext, ConsumerInfo consumerInfo) throws Exception {
        LOG.debug("Activating " + this);
        if (this.active) {
            return;
        }
        this.active = true;
        this.context = connectionContext;
        this.info = consumerInfo;
        if (!this.keepDurableSubsActive) {
            Iterator<Destination> it = this.destinations.values().iterator();
            while (it.hasNext()) {
                ((Topic) it.next()).activate(connectionContext, this);
            }
        }
        this.pending.setSystemUsage(systemUsage);
        this.pending.start();
        if (this.pending.isEmpty()) {
            Iterator<Destination> it2 = this.destinations.values().iterator();
            while (it2.hasNext()) {
                ((Topic) it2.next()).recoverRetroactiveMessages(connectionContext, this);
            }
        }
        dispatchMatched();
        this.usageManager.getMemoryUsage().addUsageListener(this);
    }

    /* JADX WARN: Finally extract failed */
    public synchronized void deactivate(boolean z) throws Exception {
        this.active = false;
        this.usageManager.getMemoryUsage().removeUsageListener(this);
        synchronized (this.pending) {
            this.pending.stop();
        }
        if (!z) {
            Iterator<Destination> it = this.destinations.values().iterator();
            while (it.hasNext()) {
                ((Topic) it.next()).deactivate(this.context, this);
            }
        }
        Iterator<MessageReference> it2 = this.dispatched.iterator();
        while (it2.hasNext()) {
            MessageReference next = it2.next();
            Integer num = this.redeliveredMessages.get(next.getMessageId());
            if (num != null) {
                this.redeliveredMessages.put(next.getMessageId(), Integer.valueOf(num.intValue() + 1));
            } else {
                this.redeliveredMessages.put(next.getMessageId(), 1);
            }
            if (z && this.pending.isTransient()) {
                synchronized (this.pending) {
                    this.pending.addMessageFirst(next);
                }
            } else {
                next.decrementReferenceCount();
            }
            it2.remove();
        }
        if (!z && this.pending.isTransient()) {
            synchronized (this.pending) {
                try {
                    this.pending.reset();
                    while (this.pending.hasNext()) {
                        this.pending.next().decrementReferenceCount();
                        this.pending.remove();
                    }
                    this.pending.release();
                } catch (Throwable th) {
                    this.pending.release();
                    throw th;
                }
            }
        }
        this.prefetchExtension = 0;
    }

    @Override // org.apache.activemq.broker.region.PrefetchSubscription
    protected MessageDispatch createMessageDispatch(MessageReference messageReference, Message message) {
        MessageDispatch createMessageDispatch = super.createMessageDispatch(messageReference, message);
        Integer num = this.redeliveredMessages.get(messageReference.getMessageId());
        if (num != null) {
            createMessageDispatch.setRedeliveryCounter(num.intValue());
        }
        return createMessageDispatch;
    }

    @Override // org.apache.activemq.broker.region.PrefetchSubscription, org.apache.activemq.broker.region.Subscription
    public synchronized void add(MessageReference messageReference) throws Exception {
        if (this.active || this.keepDurableSubsActive) {
            messageReference.incrementReferenceCount();
            super.add(messageReference);
        }
    }

    @Override // org.apache.activemq.broker.region.AbstractSubscription
    protected synchronized void doAddRecoveredMessage(MessageReference messageReference) throws Exception {
        this.pending.addRecoveredMessage(messageReference);
    }

    @Override // org.apache.activemq.broker.region.PrefetchSubscription, org.apache.activemq.broker.region.Subscription
    public synchronized int getPendingQueueSize() {
        if (this.active || this.keepDurableSubsActive) {
            return super.getPendingQueueSize();
        }
        return 0;
    }

    @Override // org.apache.activemq.broker.region.AbstractSubscription, org.apache.activemq.broker.region.Subscription
    public void setSelector(String str) throws InvalidSelectorException {
        throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
    }

    @Override // org.apache.activemq.broker.region.PrefetchSubscription
    protected synchronized boolean canDispatch(MessageReference messageReference) {
        return this.active;
    }

    @Override // org.apache.activemq.broker.region.PrefetchSubscription
    protected synchronized void acknowledge(ConnectionContext connectionContext, MessageAck messageAck, MessageReference messageReference) throws IOException {
        messageReference.getRegionDestination().acknowledge(connectionContext, this, messageAck, messageReference);
        this.redeliveredMessages.remove(messageReference.getMessageId());
        messageReference.decrementReferenceCount();
    }

    public String getSubscriptionName() {
        return this.subscriptionKey.getSubscriptionName();
    }

    public synchronized String toString() {
        return "DurableTopicSubscription: consumer=" + this.info.getConsumerId() + ", destinations=" + this.destinations.size() + ", total=" + this.enqueueCounter + ", pending=" + getPendingQueueSize() + ", dispatched=" + this.dispatchCounter + ", inflight=" + this.dispatched.size() + ", prefetchExtension=" + this.prefetchExtension;
    }

    public String getClientId() {
        return this.subscriptionKey.getClientId();
    }

    public SubscriptionKey getSubscriptionKey() {
        return this.subscriptionKey;
    }

    @Override // org.apache.activemq.broker.region.Subscription
    public synchronized void destroy() {
        try {
            synchronized (this.pending) {
                this.pending.reset();
                while (this.pending.hasNext()) {
                    this.pending.next().decrementReferenceCount();
                }
            }
            Iterator<MessageReference> it = this.dispatched.iterator();
            while (it.hasNext()) {
                it.next().decrementReferenceCount();
            }
            this.dispatched.clear();
        } finally {
            this.pending.release();
            this.pending.clear();
        }
    }

    @Override // org.apache.activemq.usage.UsageListener
    public void onUsageChanged(Usage usage, int i, int i2) {
        if (i <= i2 || i < 90) {
            return;
        }
        try {
            dispatchMatched();
        } catch (IOException e) {
            LOG.warn("problem calling dispatchMatched", e);
        }
    }
}
