package org.apache.activemq.broker.region;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:activemq-broker-5.11.0.redhat-630310-07.jar:org/apache/activemq/broker/region/DurableTopicSubscription.class */
/**
 * Prefetch subscription backing a durable topic consumer.
 *
 * <p>The subscription is identified by a {@link SubscriptionKey} built from the connection's
 * client id and the consumer's subscription name. It tracks whether a consumer is currently
 * attached ({@link #isActive()}), when it was last deactivated
 * ({@link #getOfflineTimestamp()}), the topics it is bound to, and a per-message redelivery
 * count for messages that were still in flight when the subscription was deactivated.
 *
 * <p>While active, the subscription is registered as a {@link UsageListener} on the memory
 * usage of its usage manager (see {@link #activate} and {@link #deactivate}).
 */
public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener {
    private static final Logger LOG = LoggerFactory.getLogger(DurableTopicSubscription.class);

    /** A drop in usage from at least this percentage triggers a dispatch attempt. */
    private static final int HIGH_USAGE_PERCENT = 90;

    /** Redelivery count per message id, applied to dispatches created after a reactivation. */
    private final ConcurrentMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<>();
    /** Destinations this subscription has been added to, keyed by their ActiveMQ destination. */
    private final ConcurrentMap<ActiveMQDestination, Destination> durableDestinations = new ConcurrentHashMap<>();
    private final SubscriptionKey subscriptionKey;
    private final boolean keepDurableSubsActive;
    private final AtomicBoolean active = new AtomicBoolean();
    /** Wall-clock time of the last deactivation, or -1 if none is recorded. */
    private final AtomicLong offlineTimestamp = new AtomicLong(-1L);

    /**
     * Creates the subscription with a {@link StoreDurableSubscriberCursor} as its pending cursor.
     *
     * @param broker broker passed to the superclass and to the pending cursor
     * @param systemUsage usage manager passed to the superclass and installed on the cursor
     * @param connectionContext supplies the client id used for the cursor and subscription key
     * @param consumerInfo supplies the subscription name and prefetch size
     * @param keepDurableSubsActive value returned by {@link #isKeepDurableSubsActive()}; when
     *        {@code true} the subscription keeps accepting messages while no consumer is attached
     * @throws JMSException declared for callers; propagated from the superclass constructor
     */
    public DurableTopicSubscription(Broker broker, SystemUsage systemUsage, ConnectionContext connectionContext, ConsumerInfo consumerInfo, boolean keepDurableSubsActive) throws JMSException {
        super(broker, systemUsage, connectionContext, consumerInfo);
        final String clientId = connectionContext.getClientId();
        final String subscriptionName = consumerInfo.getSubscriptionName();
        this.pending = new StoreDurableSubscriberCursor(broker, clientId, subscriptionName, consumerInfo.getPrefetchSize(), this);
        this.pending.setSystemUsage(systemUsage);
        this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
        this.keepDurableSubsActive = keepDurableSubsActive;
        this.subscriptionKey = new SubscriptionKey(connectionContext.getClientId(), subscriptionName);
    }

    /** @return {@code true} between a completed {@link #activate} and the next {@link #deactivate} */
    public final boolean isActive() {
        return active.get();
    }

    /**
     * @return the recorded offline timestamp: -1 initially and after activation, the
     *         {@link System#currentTimeMillis()} value captured by {@link #deactivate}, or
     *         whatever was last passed to {@link #setOfflineTimestamp(long)}
     */
    public final long getOfflineTimestamp() {
        return offlineTimestamp.get();
    }

    /** Overwrites the recorded offline timestamp. */
    public void setOfflineTimestamp(long timestamp) {
        offlineTimestamp.set(timestamp);
    }

    /** Reports full whenever the subscription is inactive; otherwise defers to the superclass. */
    @Override
    public boolean isFull() {
        return !active.get() || super.isFull();
    }

    /** Intentionally does nothing for this subscription type. */
    @Override
    public void gc() {
    }

    /**
     * Acknowledges, on this subscription's behalf, a message that did not match it.
     *
     * @param messageReference the unmatched message; its region destination receives the ack
     * @throws IOException if the destination fails to process the acknowledgement
     */
    @Override
    public void unmatched(MessageReference messageReference) throws IOException {
        MessageAck ack = new MessageAck();
        // NOTE(review): 5 is presumably MessageAck.UNMATCHED_ACK_TYPE inlined by the compiler - confirm.
        ack.setAckType((byte) 5);
        ack.setMessageID(messageReference.getMessageId());
        Destination regionDestination = (Destination) messageReference.getRegionDestination();
        regionDestination.acknowledge(getContext(), this, ack, messageReference);
    }

    /** Intentionally ignores the requested batch size; the cursor is left untouched. */
    @Override
    protected void setPendingBatchSize(PendingMessageCursor pendingMessageCursor, int batchSize) {
    }

    /**
     * Binds this subscription to a destination.
     *
     * <p>The destination is handed to the superclass unless it is already in
     * {@code destinations}. The first time a given destination is seen it is also recorded as a
     * durable destination and the enqueue statistic is seeded: from the pending cursor size
     * after activating the topic when the subscription is active or kept active, otherwise from
     * the message store's count for this subscription (if the destination has a store).
     * A dispatch attempt follows. Repeated calls for an already recorded destination return
     * before any of that.
     *
     * @param connectionContext context used to activate the topic
     * @param destination destination to bind; cast to {@link Topic} when it must be activated
     * @throws Exception a {@link JMSException} if the store count cannot be read, or whatever
     *         the superclass, the topic or dispatching raises
     */
    @Override
    public void add(ConnectionContext connectionContext, Destination destination) throws Exception {
        if (!destinations.contains(destination)) {
            super.add(connectionContext, destination);
        }
        final ActiveMQDestination key = destination.getActiveMQDestination();
        if (durableDestinations.containsKey(key)) {
            return;
        }
        durableDestinations.put(key, destination);
        if (active.get() || keepDurableSubsActive) {
            ((Topic) destination).activate(connectionContext, this);
            getSubscriptionStatistics().getEnqueues().add(pending.size());
        } else if (destination.getMessageStore() != null) {
            TopicMessageStore store = (TopicMessageStore) destination.getMessageStore();
            try {
                getSubscriptionStatistics().getEnqueues().add(store.getMessageCount(subscriptionKey.getClientId(), subscriptionKey.getSubscriptionName()));
            } catch (IOException e) {
                JMSException failure = new JMSException("Failed to retrieve enqueueCount from store " + e);
                failure.setLinkedException(e);
                // Also chain as the Java cause so the original stack trace is not lost.
                failure.initCause(e);
                throw failure;
            }
        }
        dispatchPending();
    }

    /** @return the pending cursor's {@code isEmpty(topic)} result for the given topic */
    public boolean isEmpty(Topic topic) {
        return pending.isEmpty(topic);
    }

    /**
     * Attaches a consumer to this subscription; does nothing if it is already active.
     *
     * <p>The context and consumer info are replaced with the supplied ones. Unless the
     * subscription is kept active while offline, every durable destination is re-added and
     * activated, and the destination policy entry for the consumer's destination (if any) is
     * applied. Under the pending lock the cursor is configured and started (skipped only when
     * the subscription is kept active and the cursor is already started), and retroactive
     * messages are recovered from topics that are always retroactive or when the consumer asks
     * for it. Finally the subscription is flagged active, the offline timestamp is reset to -1,
     * a dispatch is attempted and this object is registered as a memory usage listener.
     *
     * @param systemUsage usage manager installed on the pending cursor
     * @param connectionContext context of the attaching connection
     * @param consumerInfo descriptor of the attaching consumer
     * @param regionBroker source of the destination policy
     * @throws Exception if adding/activating a topic, starting the cursor, recovery or
     *         dispatching fails
     */
    public void activate(SystemUsage systemUsage, ConnectionContext connectionContext, ConsumerInfo consumerInfo, RegionBroker regionBroker) throws Exception {
        if (active.get()) {
            return;
        }
        this.context = connectionContext;
        this.info = consumerInfo;
        LOG.debug("Activating {}", this);
        if (!keepDurableSubsActive) {
            for (Destination durable : durableDestinations.values()) {
                Topic topic = (Topic) durable;
                add(connectionContext, topic);
                topic.activate(connectionContext, this);
            }
            ActiveMQDestination subscribedDestination = this.info.getDestination();
            if (subscribedDestination != null && regionBroker.getDestinationPolicy() != null) {
                PolicyEntry policy = regionBroker.getDestinationPolicy().getEntryFor(subscribedDestination);
                if (policy != null) {
                    policy.configure(broker, usageManager, this);
                }
            }
        }
        synchronized (pendingLock) {
            boolean cursorStarted = ((AbstractPendingMessageCursor) pending).isStarted();
            if (!cursorStarted || !keepDurableSubsActive) {
                pending.setSystemUsage(systemUsage);
                pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
                pending.setMaxAuditDepth(getMaxAuditDepth());
                pending.setMaxProducersToAudit(getMaxProducersToAudit());
                pending.start();
            }
            for (Destination durable : durableDestinations.values()) {
                Topic topic = (Topic) durable;
                if (topic.isAlwaysRetroactive() || consumerInfo.isRetroactive()) {
                    topic.recoverRetroactiveMessages(connectionContext, this);
                }
            }
        }
        active.set(true);
        offlineTimestamp.set(-1L);
        dispatchPending();
        usageManager.getMemoryUsage().addUsageListener(this);
    }

    /**
     * Detaches the consumer from this subscription.
     *
     * <p>The subscription is flagged inactive, the offline timestamp is set to the current
     * time and the memory usage listener is removed. In-flight (dispatched) messages are then
     * released: their redelivery count is bumped when {@code lastDeliveredSequenceId} is 0 or
     * when it is positive and not smaller than the message's broker sequence id; when kept
     * active with a transient cursor they are put back at the front of the cursor; and their
     * reference count is decremented. When not kept active, the cursor is stopped, a transient
     * cursor is drained, and every durable topic is deactivated with a snapshot of the
     * messages that were in flight. The prefetch extension is reset to 0 at the end.
     *
     * @param keepActive whether the subscription keeps receiving messages while offline
     * @param lastDeliveredSequenceId broker sequence id bound used to decide which in-flight
     *        messages count as redelivered; negative values mark none
     * @throws Exception if the cursor or a topic fails while being stopped or deactivated
     */
    public void deactivate(boolean keepActive, long lastDeliveredSequenceId) throws Exception {
        LOG.debug("Deactivating keepActive={}, {}", keepActive, this);
        active.set(false);
        offlineTimestamp.set(System.currentTimeMillis());
        usageManager.getMemoryUsage().removeUsageListener(this);

        ArrayList<Topic> topicsToDeactivate = new ArrayList<>();
        ArrayList<MessageReference> dispatchedSnapshot = null;

        synchronized (pendingLock) {
            if (!keepActive) {
                pending.stop();
            }
            synchronized (dispatchLock) {
                for (Destination durable : durableDestinations.values()) {
                    Topic topic = (Topic) durable;
                    if (keepActive) {
                        topic.getDestinationStatistics().getInflight().subtract(dispatched.size());
                    } else {
                        topicsToDeactivate.add(topic);
                    }
                }

                // Walk the in-flight list back to front so that successive addMessageFirst
                // calls (assumed to prepend) leave the cursor in the original dispatch order.
                Collections.reverse(dispatched);
                for (MessageReference node : dispatched) {
                    boolean countsAsRedelivered = lastDeliveredSequenceId == 0
                            || (lastDeliveredSequenceId > 0 && node.getMessageId().getBrokerSequenceId() <= lastDeliveredSequenceId);
                    if (countsAsRedelivered) {
                        incrementRedeliveryCount(node.getMessageId());
                    }
                    if (keepActive && pending.isTransient()) {
                        pending.addMessageFirst(node);
                        pending.rollback(node.getMessageId());
                    }
                    // Balances the increment done in createMessageDispatch.
                    node.decrementReferenceCount();
                }

                if (!topicsToDeactivate.isEmpty()) {
                    dispatchedSnapshot = new ArrayList<>(dispatched);
                }
                dispatched.clear();
                getSubscriptionStatistics().getInflightMessageSize().reset();
            }
            if (!keepActive && pending.isTransient()) {
                try {
                    pending.reset();
                    while (pending.hasNext()) {
                        pending.next().decrementReferenceCount();
                        pending.remove();
                    }
                } finally {
                    // Runs exactly once, whether or not draining the cursor failed.
                    pending.release();
                }
            }
        }
        // Topics are deactivated after both locks have been released.
        for (Topic topic : topicsToDeactivate) {
            topic.deactivate(this.context, this, dispatchedSnapshot);
        }
        prefetchExtension.set(0);
    }

    /** Adds one to the redelivery count recorded for the message id, starting at 1. */
    private void incrementRedeliveryCount(MessageId messageId) {
        Integer previous = redeliveredMessages.get(messageId);
        redeliveredMessages.put(messageId, previous == null ? 1 : previous + 1);
    }

    /**
     * Builds the dispatch via the superclass and, for anything but the
     * {@link QueueMessageReference#NULL_MESSAGE} marker, takes a reference on the message and
     * copies its recorded redelivery count (if any) onto the dispatch.
     */
    @Override
    protected MessageDispatch createMessageDispatch(MessageReference messageReference, Message message) {
        MessageDispatch dispatch = super.createMessageDispatch(messageReference, message);
        if (messageReference == QueueMessageReference.NULL_MESSAGE) {
            return dispatch;
        }
        messageReference.incrementReferenceCount();
        Integer redeliveries = redeliveredMessages.get(messageReference.getMessageId());
        if (redeliveries != null) {
            dispatch.setRedeliveryCounter(redeliveries.intValue());
        }
        return dispatch;
    }

    /** Accepts the message only while active or kept active; otherwise it is ignored here. */
    @Override
    public void add(MessageReference messageReference) throws Exception {
        if (active.get() || keepDurableSubsActive) {
            super.add(messageReference);
        }
    }

    /** Dispatches pending messages only while the subscription is active. */
    @Override
    public void dispatchPending() throws IOException {
        if (isActive()) {
            super.dispatchPending();
        }
    }

    /**
     * Removes the given message from the pending cursor.
     *
     * @throws IOException declared for callers; propagated from the cursor if raised
     */
    public void removePending(MessageReference messageReference) throws IOException {
        pending.remove(messageReference);
    }

    /** Hands a recovered message to the pending cursor while holding the cursor's monitor. */
    @Override
    protected void doAddRecoveredMessage(MessageReference messageReference) throws Exception {
        // NOTE(review): this locks the cursor object, whereas the rest of this class guards the
        // cursor with pendingLock - confirm the difference is intended before changing it.
        synchronized (pending) {
            pending.addRecoveredMessage(messageReference);
        }
    }

    /** @return the superclass's pending size while active or kept active, otherwise 0 */
    @Override
    public int getPendingQueueSize() {
        return (active.get() || keepDurableSubsActive) ? super.getPendingQueueSize() : 0;
    }

    /**
     * Always rejected for this subscription type.
     *
     * @throws UnsupportedOperationException on every call
     */
    @Override
    public void setSelector(String selector) throws InvalidSelectorException {
        throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
    }

    /** @return always {@code true} */
    @Override
    protected boolean canDispatch(MessageReference messageReference) {
        return true;
    }

    /**
     * Forwards the acknowledgement to the message's region destination, then forgets the
     * message's redelivery count, drops the reference taken at dispatch, and updates the
     * destination's dequeue statistic (plus the forward statistic for network subscriptions).
     *
     * @throws IOException if the destination fails to process the acknowledgement
     */
    @Override
    protected void acknowledge(ConnectionContext connectionContext, MessageAck messageAck, MessageReference messageReference) throws IOException {
        setTimeOfLastMessageAck(System.currentTimeMillis());
        ((Destination) messageReference.getRegionDestination()).acknowledge(connectionContext, this, messageAck, messageReference);
        redeliveredMessages.remove(messageReference.getMessageId());
        messageReference.decrementReferenceCount();
        ((Destination) messageReference.getRegionDestination()).getDestinationStatistics().getDequeues().increment();
        if (info.isNetworkSubscription()) {
            ((Destination) messageReference.getRegionDestination()).getDestinationStatistics().getForwards().add(messageAck.getMessageCount());
        }
    }

    /** @return a one-line summary of the subscription's key, state and counters */
    public synchronized String toString() {
        return "DurableTopicSubscription-" + getSubscriptionKey()
                + ", id=" + info.getConsumerId()
                + ", active=" + isActive()
                + ", destinations=" + durableDestinations.size()
                + ", total=" + getSubscriptionStatistics().getEnqueues().getCount()
                + ", pending=" + getPendingQueueSize()
                + ", dispatched=" + getSubscriptionStatistics().getDispatched().getCount()
                + ", inflight=" + dispatched.size()
                + ", prefetchExtension=" + getPrefetchExtension();
    }

    /** @return the key built from the client id and subscription name at construction */
    public SubscriptionKey getSubscriptionKey() {
        return subscriptionKey;
    }

    /**
     * Releases every message reference still held by this subscription: all messages in the
     * pending cursor (which is then released and cleared, even if iterating it fails) and all
     * in-flight messages. The slow-consumer flag is cleared afterwards.
     */
    @Override
    public void destroy() {
        synchronized (pendingLock) {
            try {
                pending.reset();
                while (pending.hasNext()) {
                    pending.next().decrementReferenceCount();
                }
            } finally {
                // Runs exactly once, whether or not walking the cursor failed.
                pending.release();
                pending.clear();
            }
        }
        synchronized (dispatchLock) {
            for (MessageReference node : dispatched) {
                node.decrementReferenceCount();
            }
            dispatched.clear();
        }
        setSlowConsumer(false);
    }

    /**
     * Retries dispatching when usage has dropped from a level of at least
     * {@value #HIGH_USAGE_PERCENT} percent. An {@link IOException} from dispatching is logged
     * and not propagated.
     *
     * @param usage the usage that changed (not consulted)
     * @param oldPercentUsage usage percentage before the change
     * @param newPercentUsage usage percentage after the change
     */
    @Override
    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
        if (oldPercentUsage > newPercentUsage && oldPercentUsage >= HIGH_USAGE_PERCENT) {
            try {
                dispatchPending();
            } catch (IOException e) {
                LOG.warn("problem calling dispatchMatched", e);
            }
        }
    }

    /** @return always {@code false} */
    @Override
    protected boolean isDropped(MessageReference messageReference) {
        return false;
    }

    /** @return the keep-active flag supplied at construction */
    public boolean isKeepDurableSubsActive() {
        return keepDurableSubsActive;
    }
}
