package org.apache.activemq.broker.region;

import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import java.io.IOException;
import java.util.Iterator;
import javax.jms.InvalidSelectorException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.util.SubscriptionKey;

/* loaded from: input_file:WEB-INF/lib/activemq-core-fuse-4.1.2.4.jar:org/apache/activemq/broker/region/DurableTopicSubscription.class */
/**
 * A {@link PrefetchSubscription} identified by a {@link SubscriptionKey}
 * (client id plus subscription name) that can be switched between an active
 * and an inactive state via {@link #activate} and {@link #deactivate}.
 *
 * <p>While inactive the subscription reports itself as full and refuses to
 * dispatch. Messages that were dispatched but still outstanding when the
 * subscription is deactivated have a redelivery count recorded, which is
 * copied onto the {@link MessageDispatch} the next time they are sent.
 */
public class DurableTopicSubscription extends PrefetchSubscription {
    /** Redelivery counts ({@code Integer}) keyed by message id. */
    private final ConcurrentHashMap redeliveredMessages = new ConcurrentHashMap();

    /** Destinations added to this subscription, keyed by their ActiveMQDestination. */
    private final ConcurrentHashMap destinations = new ConcurrentHashMap();

    /** Client id / subscription name pair built once in the constructor. */
    private final SubscriptionKey subscriptionKey;

    /**
     * When {@code true}, destinations are activated as soon as they are added
     * and messages are accepted even while {@link #active} is {@code false}.
     */
    private final boolean keepDurableSubsActive;

    /** Whether {@link #activate} has been called without a later {@link #deactivate}. */
    private boolean active;

    /**
     * Creates an initially inactive subscription.
     *
     * @param broker               passed through to the superclass
     * @param connectionContext    passed through to the superclass; also supplies the client id
     * @param consumerInfo         passed through to the superclass; also supplies the subscription name
     * @param z                    stored as the {@code keepDurableSubsActive} flag
     * @param pendingMessageCursor passed through to the superclass
     * @throws InvalidSelectorException declared by the superclass constructor
     */
    public DurableTopicSubscription(Broker broker, ConnectionContext connectionContext, ConsumerInfo consumerInfo, boolean z, PendingMessageCursor pendingMessageCursor) throws InvalidSelectorException {
        super(broker, connectionContext, consumerInfo, pendingMessageCursor);
        this.keepDurableSubsActive = z;
        this.subscriptionKey = new SubscriptionKey(connectionContext.getClientId(), consumerInfo.getSubscriptionName());
    }

    /**
     * @return {@code true} between a call to {@link #activate} and the next {@link #deactivate}
     */
    public synchronized boolean isActive() {
        return this.active;
    }

    /**
     * Reports the subscription as full whenever it is inactive; otherwise
     * defers to the superclass.
     *
     * <p>NOTE(review): reads {@code active} without taking the monitor used by
     * {@link #isActive()}; presumably callers already hold it - confirm.
     */
    @Override // org.apache.activemq.broker.region.PrefetchSubscription
    protected boolean isFull() {
        return !this.active || super.isFull();
    }

    /** Does nothing for this subscription type. */
    @Override // org.apache.activemq.broker.region.AbstractSubscription, org.apache.activemq.broker.region.Subscription
    public synchronized void gc() {
    }

    /**
     * Registers a destination with this subscription and, if the subscription
     * is active or {@code keepDurableSubsActive} is set, activates the
     * subscription on that destination. Finishes by calling
     * {@code dispatchMatched()}.
     *
     * @param connectionContext context forwarded to the superclass and to the topic
     * @param destination       destination to add; cast to {@link Topic} when activated
     * @throws Exception propagated from the superclass, the topic or dispatching
     */
    @Override // org.apache.activemq.broker.region.PrefetchSubscription, org.apache.activemq.broker.region.AbstractSubscription, org.apache.activemq.broker.region.Subscription
    public synchronized void add(ConnectionContext connectionContext, Destination destination) throws Exception {
        super.add(connectionContext, destination);
        this.destinations.put(destination.getActiveMQDestination(), destination);
        if (this.active || this.keepDurableSubsActive) {
            ((Topic) destination).activate(connectionContext, this);
        }
        dispatchMatched();
    }

    /**
     * Marks the subscription active, adopts the given context and consumer
     * info, starts the pending cursor and dispatches. Does nothing if the
     * subscription is already active.
     *
     * <p>Topics are only activated here when {@code keepDurableSubsActive} is
     * {@code false}; otherwise they were already activated when added.
     *
     * @param connectionContext context that replaces the current one
     * @param consumerInfo      consumer info that replaces the current one
     * @throws Exception propagated from topic activation, the cursor or dispatching
     */
    public synchronized void activate(ConnectionContext connectionContext, ConsumerInfo consumerInfo) throws Exception {
        if (this.active) {
            return;
        }
        this.active = true;
        this.context = connectionContext;
        this.info = consumerInfo;
        if (!this.keepDurableSubsActive) {
            for (Object topic : this.destinations.values()) {
                ((Topic) topic).activate(connectionContext, this);
            }
        }
        this.pending.start();
        dispatchMatched();
    }

    /**
     * Marks the subscription inactive and stops the pending cursor.
     *
     * <p>Every message still in {@code dispatched} has its redelivery count
     * incremented and is removed from {@code dispatched}. What happens next
     * depends on {@code z}:
     * <ul>
     *   <li>{@code true}: each such message is pushed back to the front of the
     *       pending cursor; topics are left untouched.</li>
     *   <li>{@code false}: the subscription is deactivated on every known
     *       topic, the reference count of each dispatched message is
     *       decremented, and the pending cursor is drained with each entry's
     *       reference count decremented.</li>
     * </ul>
     * The prefetch extension is reset to zero in both cases.
     *
     * @param z whether to keep topic registrations and pending messages;
     *          presumably the broker's keep-durable-subs-active setting - confirm against callers
     * @throws Exception propagated from the cursor or from topic deactivation
     */
    public synchronized void deactivate(boolean z) throws Exception {
        // Readable alias for the opaque parameter name.
        final boolean keepState = z;

        this.active = false;
        this.pending.stop();
        if (!keepState) {
            for (Object topic : this.destinations.values()) {
                ((Topic) topic).deactivate(this.context, this);
            }
        }

        // Explicit iterator: entries are removed while walking the list.
        for (Iterator it = this.dispatched.iterator(); it.hasNext();) {
            MessageReference reference = (MessageReference) it.next();
            recordRedelivery(reference);
            if (keepState) {
                synchronized (this.pending) {
                    this.pending.addMessageFirst(reference);
                }
            } else {
                reference.decrementReferenceCount();
            }
            it.remove();
        }

        if (!keepState) {
            synchronized (this.pending) {
                this.pending.reset();
                while (this.pending.hasNext()) {
                    this.pending.next().decrementReferenceCount();
                    this.pending.remove();
                }
            }
        }
        this.prefetchExtension = 0;
    }

    /**
     * Increments the stored redelivery count for the given message, starting
     * at 1 when no count has been stored yet.
     */
    private void recordRedelivery(MessageReference reference) {
        Object messageId = reference.getMessageId();
        Integer previous = (Integer) this.redeliveredMessages.get(messageId);
        int count = (previous == null) ? 1 : previous.intValue() + 1;
        this.redeliveredMessages.put(messageId, Integer.valueOf(count));
    }

    /**
     * Builds the dispatch via the superclass and, when a redelivery count is
     * stored for the message, copies it onto the dispatch.
     *
     * @param messageReference reference whose message id is used to look up the count
     * @param message          message forwarded to the superclass
     * @return the dispatch created by the superclass, possibly with its redelivery counter set
     */
    @Override // org.apache.activemq.broker.region.PrefetchSubscription
    protected MessageDispatch createMessageDispatch(MessageReference messageReference, Message message) {
        MessageDispatch dispatch = super.createMessageDispatch(messageReference, message);
        Integer redeliveries = (Integer) this.redeliveredMessages.get(messageReference.getMessageId());
        if (redeliveries != null) {
            dispatch.setRedeliveryCounter(redeliveries.intValue());
        }
        return dispatch;
    }

    /**
     * Accepts a message only while the subscription is active or
     * {@code keepDurableSubsActive} is set; otherwise the message is ignored.
     * An accepted message has its reference count incremented before being
     * handed to the superclass.
     *
     * @param messageReference message offered to this subscription
     * @throws Exception propagated from the superclass
     */
    @Override // org.apache.activemq.broker.region.PrefetchSubscription, org.apache.activemq.broker.region.Subscription
    public synchronized void add(MessageReference messageReference) throws Exception {
        if (this.active || this.keepDurableSubsActive) {
            messageReference.incrementReferenceCount();
            super.add(messageReference);
        }
    }

    /**
     * @return the superclass's pending queue size while active or while
     *         {@code keepDurableSubsActive} is set; otherwise 0
     */
    @Override // org.apache.activemq.broker.region.PrefetchSubscription, org.apache.activemq.broker.region.Subscription
    public int getPendingQueueSize() {
        if (this.active || this.keepDurableSubsActive) {
            return super.getPendingQueueSize();
        }
        return 0;
    }

    /**
     * Always rejects the call: the selector of this subscription type cannot
     * be changed after creation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.activemq.broker.region.AbstractSubscription, org.apache.activemq.broker.region.Subscription
    public void setSelector(String str) throws InvalidSelectorException {
        throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
    }

    /**
     * @return {@code true} only while the subscription is active; the message itself is not inspected
     */
    @Override // org.apache.activemq.broker.region.PrefetchSubscription
    protected boolean canDispatch(MessageReference messageReference) {
        return this.active;
    }

    /**
     * Forwards the acknowledgement to the message's region destination, then
     * forgets the stored redelivery count and decrements the reference count.
     *
     * @throws IOException propagated from the destination's acknowledge call
     */
    @Override // org.apache.activemq.broker.region.PrefetchSubscription
    protected void acknowledge(ConnectionContext connectionContext, MessageAck messageAck, MessageReference messageReference) throws IOException {
        messageReference.getRegionDestination().acknowledge(connectionContext, this, messageAck, messageReference);
        this.redeliveredMessages.remove(messageReference.getMessageId());
        messageReference.decrementReferenceCount();
    }

    /** @return the subscription name held by the subscription key */
    public String getSubscriptionName() {
        return this.subscriptionKey.getSubscriptionName();
    }

    /**
     * @return a one-line summary: consumer id, number of destinations, number
     *         of dispatched messages, prefetch extension and pending queue size
     */
    public String toString() {
        return "DurableTopicSubscription: consumer=" + this.info.getConsumerId()
                + ", destinations=" + this.destinations.size()
                + ", dispatched=" + this.dispatched.size()
                + ", delivered=" + this.prefetchExtension
                + ", pending=" + getPendingQueueSize();
    }

    /** @return the client id held by the subscription key */
    public String getClientId() {
        return this.subscriptionKey.getClientId();
    }

    /** @return the key built from client id and subscription name at construction time */
    public SubscriptionKey getSubscriptionKey() {
        return this.subscriptionKey;
    }

    /**
     * Decrements the reference count of every pending and every dispatched
     * message, then empties both collections.
     */
    @Override // org.apache.activemq.broker.region.Subscription
    public synchronized void destroy() {
        synchronized (this.pending) {
            this.pending.reset();
            while (this.pending.hasNext()) {
                this.pending.next().decrementReferenceCount();
            }
            this.pending.clear();
        }
        for (Object reference : this.dispatched) {
            ((MessageReference) reference).decrementReferenceCount();
        }
        this.dispatched.clear();
    }
}
