package org.apache.activemq.broker.region;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.transport.TransmitCallback;
import org.apache.activemq.usage.SystemUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.PropertyAccessor;

/* loaded from: input_file:activemq-broker-5.11.0.redhat-630504.jar:org/apache/activemq/broker/region/TopicSubscription.class */
public class TopicSubscription extends AbstractSubscription {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) TopicSubscription.class);
    private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0);
    protected PendingMessageCursor matched;
    protected final SystemUsage usageManager;
    boolean singleDestination;
    Destination destination;
    private final Scheduler scheduler;
    private int maximumPendingMessages;
    private MessageEvictionStrategy messageEvictionStrategy;
    private int discarded;
    private final Object matchedListMutex;
    private int memoryUsageHighWaterMark;
    protected int maxProducersToAudit;
    protected int maxAuditDepth;
    protected boolean enableAudit;
    protected ActiveMQMessageAudit audit;
    protected boolean active;
    protected boolean discarding;
    protected final Object dispatchLock;
    protected final List<MessageReference> dispatched;

    public TopicSubscription(Broker broker, ConnectionContext connectionContext, ConsumerInfo consumerInfo, SystemUsage systemUsage) throws Exception {
        super(broker, connectionContext, consumerInfo);
        this.singleDestination = true;
        this.maximumPendingMessages = -1;
        this.messageEvictionStrategy = new OldestMessageEvictionStrategy();
        this.matchedListMutex = new Object();
        this.memoryUsageHighWaterMark = 95;
        this.maxProducersToAudit = 1024;
        this.maxAuditDepth = 1000;
        this.enableAudit = false;
        this.active = false;
        this.discarding = false;
        this.dispatchLock = new Object();
        this.dispatched = new ArrayList();
        this.usageManager = systemUsage;
        String str = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + PropertyAccessor.PROPERTY_KEY_PREFIX + consumerInfo.getConsumerId().toString() + PropertyAccessor.PROPERTY_KEY_SUFFIX;
        if (consumerInfo.getDestination().isTemporary() || broker.getTempDataStore() == null) {
            this.matched = new VMPendingMessageCursor(false);
        } else {
            this.matched = new FilePendingMessageCursor(broker, str, false);
        }
        this.scheduler = broker.getScheduler();
    }

    public void init() throws Exception {
        this.matched.setSystemUsage(this.usageManager);
        this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
        this.matched.start();
        if (this.enableAudit) {
            this.audit = new ActiveMQMessageAudit(this.maxAuditDepth, this.maxProducersToAudit);
        }
        this.active = true;
    }

    @Override // org.apache.activemq.broker.region.Subscription
    public void add(MessageReference messageReference) throws Exception {
        LinkedList<MessageReference> pageInList;
        MessageReference[] evictMessages;
        if (isDuplicate(messageReference)) {
            return;
        }
        IndirectMessageReference indirectMessageReference = new IndirectMessageReference(messageReference.getMessage());
        getSubscriptionStatistics().getEnqueues().increment();
        synchronized (this.matchedListMutex) {
            if (this.discarding) {
                return;
            }
            if (isFull() || !this.matched.isEmpty()) {
                if (this.info.getPrefetchSize() > 1 && this.matched.size() > this.info.getPrefetchSize() && !isSlowConsumer()) {
                    LOG.warn("{}: has twice its prefetch limit pending, without an ack; it appears to be slow", toString());
                    setSlowConsumer(true);
                    Iterator<Destination> it = this.destinations.iterator();
                    while (it.hasNext()) {
                        it.next().slowConsumer(getContext(), this);
                    }
                }
                if (this.maximumPendingMessages != 0) {
                    boolean z = false;
                    while (this.active) {
                        while (this.matched.isFull()) {
                            if (getContext().getStopping().get()) {
                                LOG.warn("{}: stopped waiting for space in pendingMessage cursor for: {}", toString(), indirectMessageReference.getMessageId());
                                getSubscriptionStatistics().getEnqueues().decrement();
                                return;
                            } else {
                                if (!z) {
                                    LOG.info("{}: Pending message cursor [{}] is full, temp usag ({}%) or memory usage ({}%) limit reached, blocking message add() pending the release of resources.", toString(), this.matched, Integer.valueOf(this.matched.getSystemUsage().getTempUsage().getPercentUsage()), Integer.valueOf(this.matched.getSystemUsage().getMemoryUsage().getPercentUsage()));
                                    z = true;
                                }
                                this.matchedListMutex.wait(20L);
                            }
                        }
                        if (this.matched.tryAddMessageLast(indirectMessageReference, 10L)) {
                            break;
                        }
                    }
                    if (this.maximumPendingMessages > 0) {
                        int evictExpiredMessagesHighWatermark = this.messageEvictionStrategy.getEvictExpiredMessagesHighWatermark();
                        if (this.maximumPendingMessages > 0 && this.maximumPendingMessages < evictExpiredMessagesHighWatermark) {
                            evictExpiredMessagesHighWatermark = this.maximumPendingMessages;
                        }
                        if (!this.matched.isEmpty() && this.matched.size() > evictExpiredMessagesHighWatermark) {
                            removeExpiredMessages();
                        }
                        while (true) {
                            if (this.matched.isEmpty() || this.matched.size() <= this.maximumPendingMessages) {
                                break;
                            }
                            int max = Math.max(1000, this.matched.size() - this.maximumPendingMessages);
                            synchronized (this.matched) {
                                pageInList = this.matched.pageInList(max);
                                evictMessages = this.messageEvictionStrategy.evictMessages(pageInList);
                                Iterator<MessageReference> it2 = pageInList.iterator();
                                while (it2.hasNext()) {
                                    it2.next().decrementReferenceCount();
                                }
                            }
                            int i = 0;
                            if (evictMessages != null) {
                                for (MessageReference messageReference2 : evictMessages) {
                                    discard(messageReference2);
                                }
                            }
                            if (i == 0) {
                                LOG.warn("No messages to evict returned for {} from eviction strategy: {} out of {} candidates", this.destination, this.messageEvictionStrategy, Integer.valueOf(pageInList.size()));
                                break;
                            }
                        }
                    }
                    dispatchMatched();
                }
            } else {
                dispatch(indirectMessageReference);
                setSlowConsumer(false);
            }
        }
    }

    private boolean isDuplicate(MessageReference messageReference) {
        boolean z = false;
        if (this.enableAudit && this.audit != null) {
            z = this.audit.isDuplicate(messageReference);
            if (LOG.isDebugEnabled() && z) {
                LOG.debug("{}, ignoring duplicate add: {}", this, messageReference.getMessageId());
            }
        }
        return z;
    }

    /* JADX WARN: Code restructure failed: missing block: B:10:0x0053, code lost:
    
        if (r5.broker.isExpired(r0) == false) goto L11;
     */
    /* JADX WARN: Code restructure failed: missing block: B:11:0x0056, code lost:
    
        ((org.apache.activemq.broker.region.Destination) r0.getRegionDestination()).getDestinationStatistics().getExpired().increment();
        r5.broker.messageExpired(getContext(), r0, r5);
     */
    /* JADX WARN: Code restructure failed: missing block: B:9:0x002f, code lost:
    
        r5.matched.remove();
        getSubscriptionStatistics().getDispatched().increment();
        r0.decrementReferenceCount();
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    protected void removeExpiredMessages() throws java.io.IOException {
        /*
            r5 = this;
            r0 = r5
            org.apache.activemq.broker.region.cursors.PendingMessageCursor r0 = r0.matched     // Catch: java.lang.Throwable -> L8b
            r0.reset()     // Catch: java.lang.Throwable -> L8b
        L9:
            r0 = r5
            org.apache.activemq.broker.region.cursors.PendingMessageCursor r0 = r0.matched     // Catch: java.lang.Throwable -> L8b
            boolean r0 = r0.hasNext()     // Catch: java.lang.Throwable -> L8b
            if (r0 == 0) goto L7f
            r0 = r5
            org.apache.activemq.broker.region.cursors.PendingMessageCursor r0 = r0.matched     // Catch: java.lang.Throwable -> L8b
            org.apache.activemq.broker.region.MessageReference r0 = r0.next()     // Catch: java.lang.Throwable -> L8b
            r6 = r0
            r0 = r6
            int r0 = r0.decrementReferenceCount()     // Catch: java.lang.Throwable -> L8b
            r0 = r6
            boolean r0 = r0.isExpired()     // Catch: java.lang.Throwable -> L8b
            if (r0 == 0) goto L7c
            r0 = r5
            org.apache.activemq.broker.region.cursors.PendingMessageCursor r0 = r0.matched     // Catch: java.lang.Throwable -> L8b
            r0.remove()     // Catch: java.lang.Throwable -> L8b
            r0 = r5
            org.apache.activemq.broker.region.SubscriptionStatistics r0 = r0.getSubscriptionStatistics()     // Catch: java.lang.Throwable -> L8b
            org.apache.activemq.management.CountStatisticImpl r0 = r0.getDispatched()     // Catch: java.lang.Throwable -> L8b
            r0.increment()     // Catch: java.lang.Throwable -> L8b
            r0 = r6
            int r0 = r0.decrementReferenceCount()     // Catch: java.lang.Throwable -> L8b
            r0 = r5
            org.apache.activemq.broker.Broker r0 = r0.broker     // Catch: java.lang.Throwable -> L8b
            r1 = r6
            boolean r0 = r0.isExpired(r1)     // Catch: java.lang.Throwable -> L8b
            if (r0 == 0) goto L7f
            r0 = r6
            org.apache.activemq.command.Message$MessageDestination r0 = r0.getRegionDestination()     // Catch: java.lang.Throwable -> L8b
            org.apache.activemq.broker.region.Destination r0 = (org.apache.activemq.broker.region.Destination) r0     // Catch: java.lang.Throwable -> L8b
            org.apache.activemq.broker.region.DestinationStatistics r0 = r0.getDestinationStatistics()     // Catch: java.lang.Throwable -> L8b
            org.apache.activemq.management.CountStatisticImpl r0 = r0.getExpired()     // Catch: java.lang.Throwable -> L8b
            r0.increment()     // Catch: java.lang.Throwable -> L8b
            r0 = r5
            org.apache.activemq.broker.Broker r0 = r0.broker     // Catch: java.lang.Throwable -> L8b
            r1 = r5
            org.apache.activemq.broker.ConnectionContext r1 = r1.getContext()     // Catch: java.lang.Throwable -> L8b
            r2 = r6
            r3 = r5
            r0.messageExpired(r1, r2, r3)     // Catch: java.lang.Throwable -> L8b
            goto L7f
        L7c:
            goto L9
        L7f:
            r0 = r5
            org.apache.activemq.broker.region.cursors.PendingMessageCursor r0 = r0.matched
            r0.release()
            goto L97
        L8b:
            r7 = move-exception
            r0 = r5
            org.apache.activemq.broker.region.cursors.PendingMessageCursor r0 = r0.matched
            r0.release()
            r0 = r7
            throw r0
        L97:
            return
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.activemq.broker.region.TopicSubscription.removeExpiredMessages():void");
    }

    /* JADX WARN: Code restructure failed: missing block: B:11:0x003d, code lost:
    
        r0 = r4.dispatchLock;
     */
    /* JADX WARN: Code restructure failed: missing block: B:12:0x0044, code lost:
    
        monitor-enter(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:14:0x0045, code lost:
    
        r4.matched.remove();
        getSubscriptionStatistics().getDispatched().increment();
        r4.dispatched.add(r0);
        getSubscriptionStatistics().getInflightMessageSize().addSize(r0.getSize());
        r0.decrementReferenceCount();
     */
    /* JADX WARN: Code restructure failed: missing block: B:15:0x007d, code lost:
    
        monitor-exit(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:19:0x0090, code lost:
    
        r4.matched.release();
     */
    /* JADX WARN: Finally extract failed */
    @Override // org.apache.activemq.broker.region.Subscription
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void processMessageDispatchNotification(org.apache.activemq.command.MessageDispatchNotification r5) {
        /*
            r4 = this;
            r0 = r4
            java.lang.Object r0 = r0.matchedListMutex
            r1 = r0
            r6 = r1
            monitor-enter(r0)
            r0 = r4
            org.apache.activemq.broker.region.cursors.PendingMessageCursor r0 = r0.matched     // Catch: java.lang.Throwable -> L9b java.lang.Throwable -> Lae
            r0.reset()     // Catch: java.lang.Throwable -> L9b java.lang.Throwable -> Lae
        L10:
            r0 = r4
            org.apache.activemq.broker.region.cursors.PendingMessageCursor r0 = r0.matched     // Catch: java.lang.Throwable -> L9b java.lang.Throwable -> Lae
            boolean r0 = r0.hasNext()     // Catch: java.lang.Throwable -> L9b java.lang.Throwable -> Lae
            if (r0 == 0) goto L8f
            r0 = r4
            org.apache.activemq.broker.region.cursors.PendingMessageCursor r0 = r0.matched     // Catch: java.lang.Throwable -> L9b java.lang.Throwable -> Lae
            org.apache.activemq.broker.region.MessageReference r0 = r0.next()     // Catch: java.lang.Throwable -> L9b java.lang.Throwable -> Lae
            r7 = r0
            r0 = r7
            int r0 = r0.decrementReferenceCount()     // Catch: java.lang.Throwable -> L9b java.lang.Throwable -> Lae
            r0 = r7
            org.apache.activemq.command.MessageId r0 = r0.getMessageId()     // Catch: java.lang.Throwable -> L9b java.lang.Throwable -> Lae
            r1 = r5
            org.apache.activemq.command.MessageId r1 = r1.getMessageId()     // Catch: java.lang.Throwable -> L9b java.lang.Throwable -> Lae
            boolean r0 = r0.equals(r1)     // Catch: java.lang.Throwable -> L9b java.lang.Throwable -> Lae
            if (r0 == 0) goto L8c
            r0 = r4
            java.lang.Object r0 = r0.dispatchLock     // Catch: java.lang.Throwable -> L9b java.lang.Throwable -> Lae
            r1 = r0
            r8 = r1
            monitor-enter(r0)     // Catch: java.lang.Throwable -> L9b java.lang.Throwable -> Lae
            r0 = r4
            org.apache.activemq.broker.region.cursors.PendingMessageCursor r0 = r0.matched     // Catch: java.lang.Throwable -> L81 java.lang.Throwable -> L9b java.lang.Throwable -> Lae
            r0.remove()     // Catch: java.lang.Throwable -> L81 java.lang.Throwable -> L9b java.lang.Throwable -> Lae
            r0 = r4
            org.apache.activemq.broker.region.SubscriptionStatistics r0 = r0.getSubscriptionStatistics()     // Catch: java.lang.Throwable -> L81 java.lang.Throwable -> L9b java.lang.Throwable -> Lae
            org.apache.activemq.management.CountStatisticImpl r0 = r0.getDispatched()     // Catch: java.lang.Throwable -> L81 java.lang.Throwable -> L9b java.lang.Throwable -> Lae
            r0.increment()     // Catch: java.lang.Throwable -> L81 java.lang.Throwable -> L9b java.lang.Throwable -> Lae
            r0 = r4
            java.util.List<org.apache.activemq.broker.region.MessageReference> r0 = r0.dispatched     // Catch: java.lang.Throwable -> L81 java.lang.Throwable -> L9b java.lang.Throwable -> Lae
            r1 = r7
            boolean r0 = r0.add(r1)     // Catch: java.lang.Throwable -> L81 java.lang.Throwable -> L9b java.lang.Throwable -> Lae
            r0 = r4
            org.apache.activemq.broker.region.SubscriptionStatistics r0 = r0.getSubscriptionStatistics()     // Catch: java.lang.Throwable -> L81 java.lang.Throwable -> L9b java.lang.Throwable -> Lae
            org.apache.activemq.management.SizeStatisticImpl r0 = r0.getInflightMessageSize()     // Catch: java.lang.Throwable -> L81 java.lang.Throwable -> L9b java.lang.Throwable -> Lae
            r1 = r7
            int r1 = r1.getSize()     // Catch: java.lang.Throwable -> L81 java.lang.Throwable -> L9b java.lang.Throwable -> Lae
            long r1 = (long) r1     // Catch: java.lang.Throwable -> L81 java.lang.Throwable -> L9b java.lang.Throwable -> Lae
            r0.addSize(r1)     // Catch: java.lang.Throwable -> L81 java.lang.Throwable -> L9b java.lang.Throwable -> Lae
            r0 = r7
            int r0 = r0.decrementReferenceCount()     // Catch: java.lang.Throwable -> L81 java.lang.Throwable -> L9b java.lang.Throwable -> Lae
            r0 = r8
            monitor-exit(r0)     // Catch: java.lang.Throwable -> L81 java.lang.Throwable -> L9b java.lang.Throwable -> Lae
            goto L89
        L81:
            r9 = move-exception
            r0 = r8
            monitor-exit(r0)     // Catch: java.lang.Throwable -> L81 java.lang.Throwable -> L9b java.lang.Throwable -> Lae
            r0 = r9
            throw r0     // Catch: java.lang.Throwable -> L9b java.lang.Throwable -> Lae
        L89:
            goto L8f
        L8c:
            goto L10
        L8f:
            r0 = r4
            org.apache.activemq.broker.region.cursors.PendingMessageCursor r0 = r0.matched     // Catch: java.lang.Throwable -> Lae
            r0.release()     // Catch: java.lang.Throwable -> Lae
            goto La9
        L9b:
            r10 = move-exception
            r0 = r4
            org.apache.activemq.broker.region.cursors.PendingMessageCursor r0 = r0.matched     // Catch: java.lang.Throwable -> Lae
            r0.release()     // Catch: java.lang.Throwable -> Lae
            r0 = r10
            throw r0     // Catch: java.lang.Throwable -> Lae
        La9:
            r0 = r6
            monitor-exit(r0)     // Catch: java.lang.Throwable -> Lae
            goto Lb5
        Lae:
            r11 = move-exception
            r0 = r6
            monitor-exit(r0)     // Catch: java.lang.Throwable -> Lae
            r0 = r11
            throw r0
        Lb5:
            return
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.activemq.broker.region.TopicSubscription.processMessageDispatchNotification(org.apache.activemq.command.MessageDispatchNotification):void");
    }

    @Override // org.apache.activemq.broker.region.AbstractSubscription, org.apache.activemq.broker.region.Subscription
    public synchronized void acknowledge(ConnectionContext connectionContext, MessageAck messageAck) throws Exception {
        super.acknowledge(connectionContext, messageAck);
        if (messageAck.isStandardAck()) {
            updateStatsOnAck(connectionContext, messageAck);
        } else if (messageAck.isPoisonAck()) {
            if (messageAck.isInTransaction()) {
                throw new JMSException("Poison ack cannot be transacted: " + messageAck);
            }
            updateStatsOnAck(connectionContext, messageAck);
            contractPrefetchExtension(messageAck.getMessageCount());
        } else if (messageAck.isIndividualAck()) {
            updateStatsOnAck(connectionContext, messageAck);
            if (messageAck.isInTransaction()) {
                expandPrefetchExtension(1);
            }
        } else if (messageAck.isExpiredAck()) {
            updateStatsOnAck(messageAck);
            contractPrefetchExtension(messageAck.getMessageCount());
        } else {
            if (!messageAck.isDeliveredAck()) {
                if (!messageAck.isRedeliveredAck()) {
                    throw new JMSException("Invalid acknowledgment: " + messageAck);
                }
                return;
            }
            expandPrefetchExtension(messageAck.getMessageCount());
        }
        dispatchMatched();
    }

    private void updateStatsOnAck(ConnectionContext connectionContext, final MessageAck messageAck) {
        if (connectionContext.isInTransaction()) {
            connectionContext.getTransaction().addSynchronization(new Synchronization() { // from class: org.apache.activemq.broker.region.TopicSubscription.1
                @Override // org.apache.activemq.transaction.Synchronization
                public void afterRollback() {
                    TopicSubscription.this.contractPrefetchExtension(messageAck.getMessageCount());
                }

                @Override // org.apache.activemq.transaction.Synchronization
                public void afterCommit() throws Exception {
                    TopicSubscription.this.contractPrefetchExtension(messageAck.getMessageCount());
                    TopicSubscription.this.updateStatsOnAck(messageAck);
                    TopicSubscription.this.dispatchMatched();
                }
            });
        } else {
            updateStatsOnAck(messageAck);
        }
    }

    @Override // org.apache.activemq.broker.region.Subscription
    public Response pullMessage(ConnectionContext connectionContext, final MessagePull messagePull) throws Exception {
        if (getPrefetchSize() != 0) {
            return null;
        }
        final long count = getSubscriptionStatistics().getDispatched().getCount();
        this.prefetchExtension.set(messagePull.getQuantity());
        dispatchMatched();
        if (count != getSubscriptionStatistics().getDispatched().getCount() && !messagePull.isAlwaysSignalDone()) {
            return null;
        }
        if (messagePull.getTimeout() == -1) {
            dispatch(null);
            this.prefetchExtension.set(0);
        }
        if (messagePull.getTimeout() <= 0) {
            return null;
        }
        this.scheduler.executeAfterDelay(new Runnable() { // from class: org.apache.activemq.broker.region.TopicSubscription.2
            @Override // java.lang.Runnable
            public void run() {
                TopicSubscription.this.pullTimeout(count, messagePull.isAlwaysSignalDone());
            }
        }, messagePull.getTimeout());
        return null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void pullTimeout(long j, boolean z) {
        synchronized (this.matchedListMutex) {
            if (j == getSubscriptionStatistics().getDispatched().getCount() || z) {
                try {
                    try {
                        dispatch(null);
                        this.prefetchExtension.set(0);
                    } catch (Exception e) {
                        this.context.getConnection().serviceException(e);
                        this.prefetchExtension.set(0);
                    }
                } catch (Throwable th) {
                    this.prefetchExtension.set(0);
                    throw th;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void updateStatsOnAck(MessageAck messageAck) {
        synchronized (this.dispatchLock) {
            boolean z = false;
            ArrayList<MessageReference> arrayList = new ArrayList();
            for (MessageReference messageReference : this.dispatched) {
                MessageId messageId = messageReference.getMessageId();
                if (messageAck.getFirstMessageId() == null || messageAck.getFirstMessageId().equals(messageId)) {
                    z = true;
                }
                if (z) {
                    arrayList.add(messageReference);
                    if (messageAck.getLastMessageId().equals(messageId)) {
                        break;
                    }
                }
            }
            for (MessageReference messageReference2 : arrayList) {
                this.dispatched.remove(messageReference2);
                getSubscriptionStatistics().getInflightMessageSize().addSize(-messageReference2.getSize());
                getSubscriptionStatistics().getDequeues().increment();
                ((Destination) messageReference2.getRegionDestination()).getDestinationStatistics().getDequeues().increment();
                ((Destination) messageReference2.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
                if (this.info.isNetworkSubscription()) {
                    ((Destination) messageReference2.getRegionDestination()).getDestinationStatistics().getForwards().add(messageAck.getMessageCount());
                }
                if (messageAck.isExpiredAck()) {
                    this.destination.getDestinationStatistics().getExpired().add(messageAck.getMessageCount());
                }
                if (!messageAck.isInTransaction()) {
                    contractPrefetchExtension(1);
                }
            }
        }
    }

    @Override // org.apache.activemq.broker.region.Subscription
    public int getPendingQueueSize() {
        return matched();
    }

    @Override // org.apache.activemq.broker.region.Subscription
    public int getDispatchedQueueSize() {
        return (int) (getSubscriptionStatistics().getDispatched().getCount() - getSubscriptionStatistics().getDequeues().getCount());
    }

    public int getMaximumPendingMessages() {
        return this.maximumPendingMessages;
    }

    @Override // org.apache.activemq.broker.region.Subscription
    public long getDispatchedCounter() {
        return getSubscriptionStatistics().getDispatched().getCount();
    }

    @Override // org.apache.activemq.broker.region.Subscription
    public long getEnqueueCounter() {
        return getSubscriptionStatistics().getEnqueues().getCount();
    }

    @Override // org.apache.activemq.broker.region.Subscription
    public long getDequeueCounter() {
        return getSubscriptionStatistics().getDequeues().getCount();
    }

    public int discarded() {
        int i;
        synchronized (this.matchedListMutex) {
            i = this.discarded;
        }
        return i;
    }

    public int matched() {
        int size;
        synchronized (this.matchedListMutex) {
            size = this.matched.size();
        }
        return size;
    }

    public void setMaximumPendingMessages(int i) {
        this.maximumPendingMessages = i;
    }

    public MessageEvictionStrategy getMessageEvictionStrategy() {
        return this.messageEvictionStrategy;
    }

    public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
        this.messageEvictionStrategy = messageEvictionStrategy;
    }

    public int getMaxProducersToAudit() {
        return this.maxProducersToAudit;
    }

    public synchronized void setMaxProducersToAudit(int i) {
        this.maxProducersToAudit = i;
        if (this.audit != null) {
            this.audit.setMaximumNumberOfProducersToTrack(i);
        }
    }

    public int getMaxAuditDepth() {
        return this.maxAuditDepth;
    }

    public synchronized void setMaxAuditDepth(int i) {
        this.maxAuditDepth = i;
        if (this.audit != null) {
            this.audit.setAuditDepth(i);
        }
    }

    public boolean isEnableAudit() {
        return this.enableAudit;
    }

    public synchronized void setEnableAudit(boolean z) {
        this.enableAudit = z;
        if (z && this.audit == null) {
            this.audit = new ActiveMQMessageAudit(this.maxAuditDepth, this.maxProducersToAudit);
        }
    }

    @Override // org.apache.activemq.broker.region.Subscription
    public boolean isFull() {
        return getPrefetchSize() == 0 ? this.prefetchExtension.get() == 0 : getDispatchedQueueSize() - this.prefetchExtension.get() >= this.info.getPrefetchSize();
    }

    @Override // org.apache.activemq.broker.region.Subscription
    public int getInFlightSize() {
        return getDispatchedQueueSize();
    }

    @Override // org.apache.activemq.broker.region.Subscription
    public boolean isLowWaterMark() {
        return ((double) (getDispatchedQueueSize() - this.prefetchExtension.get())) <= ((double) this.info.getPrefetchSize()) * 0.4d;
    }

    @Override // org.apache.activemq.broker.region.Subscription
    public boolean isHighWaterMark() {
        return ((double) (getDispatchedQueueSize() - this.prefetchExtension.get())) >= ((double) this.info.getPrefetchSize()) * 0.9d;
    }

    public void setMemoryUsageHighWaterMark(int i) {
        this.memoryUsageHighWaterMark = i;
    }

    public int getMemoryUsageHighWaterMark() {
        return this.memoryUsageHighWaterMark;
    }

    public SystemUsage getUsageManager() {
        return this.usageManager;
    }

    public PendingMessageCursor getMatched() {
        return this.matched;
    }

    public void setMatched(PendingMessageCursor pendingMessageCursor) {
        this.matched = pendingMessageCursor;
    }

    @Override // org.apache.activemq.broker.region.Subscription
    public void updateConsumerPrefetch(int i) {
        if (this.context == null || this.context.getConnection() == null || !this.context.getConnection().isManageable()) {
            return;
        }
        ConsumerControl consumerControl = new ConsumerControl();
        consumerControl.setConsumerId(this.info.getConsumerId());
        consumerControl.setPrefetch(i);
        this.context.getConnection().dispatchAsync(consumerControl);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Finally extract failed */
    public void dispatchMatched() throws IOException {
        synchronized (this.matchedListMutex) {
            if (!this.matched.isEmpty() && !isFull()) {
                try {
                    this.matched.reset();
                    while (this.matched.hasNext() && !isFull()) {
                        MessageReference next = this.matched.next();
                        next.decrementReferenceCount();
                        this.matched.remove();
                        if (next.isExpired()) {
                            discard(next);
                        } else {
                            dispatch(next);
                        }
                    }
                    this.matched.release();
                } catch (Throwable th) {
                    this.matched.release();
                    throw th;
                }
            }
        }
    }

    private void dispatch(final MessageReference messageReference) throws IOException {
        Message message = messageReference != null ? messageReference.getMessage() : null;
        if (messageReference != null) {
            messageReference.incrementReferenceCount();
        }
        MessageDispatch messageDispatch = new MessageDispatch();
        messageDispatch.setMessage(message);
        messageDispatch.setConsumerId(this.info.getConsumerId());
        if (messageReference != null) {
            messageDispatch.setDestination(((Destination) messageReference.getRegionDestination()).getActiveMQDestination());
            synchronized (this.dispatchLock) {
                getSubscriptionStatistics().getDispatched().increment();
                this.dispatched.add(messageReference);
                getSubscriptionStatistics().getInflightMessageSize().addSize(messageReference.getSize());
            }
            if (this.singleDestination) {
                if (this.destination == null) {
                    this.destination = (Destination) messageReference.getRegionDestination();
                } else if (this.destination != messageReference.getRegionDestination()) {
                    this.singleDestination = false;
                }
            }
            if (getPrefetchSize() == 0) {
                decrementPrefetchExtension(1);
            }
        }
        if (this.info.isDispatchAsync()) {
            if (messageReference != null) {
                messageDispatch.setTransmitCallback(new TransmitCallback() { // from class: org.apache.activemq.broker.region.TopicSubscription.3
                    @Override // org.apache.activemq.transport.TransmitCallback
                    public void onSuccess() {
                        Destination destination = (Destination) messageReference.getRegionDestination();
                        destination.getDestinationStatistics().getDispatched().increment();
                        destination.getDestinationStatistics().getInflight().increment();
                        messageReference.decrementReferenceCount();
                    }

                    @Override // org.apache.activemq.transport.TransmitCallback
                    public void onFailure() {
                        Destination destination = (Destination) messageReference.getRegionDestination();
                        destination.getDestinationStatistics().getDispatched().increment();
                        destination.getDestinationStatistics().getInflight().increment();
                        messageReference.decrementReferenceCount();
                    }
                });
            }
            this.context.getConnection().dispatchAsync(messageDispatch);
            return;
        }
        this.context.getConnection().dispatchSync(messageDispatch);
        if (messageReference != null) {
            Destination destination = (Destination) messageReference.getRegionDestination();
            destination.getDestinationStatistics().getDispatched().increment();
            destination.getDestinationStatistics().getInflight().increment();
            messageReference.decrementReferenceCount();
        }
    }

    private void discard(MessageReference messageReference) {
        this.discarding = true;
        try {
            messageReference.decrementReferenceCount();
            this.matched.remove(messageReference);
            this.discarded++;
            if (this.destination != null) {
                this.destination.getDestinationStatistics().getDequeues().increment();
            }
            LOG.debug("{}, discarding message {}", this, messageReference);
            Destination destination = (Destination) messageReference.getRegionDestination();
            if (destination != null) {
                destination.messageDiscarded(getContext(), this, messageReference);
            }
            this.broker.getRoot().sendToDeadLetterQueue(getContext(), messageReference, this, new Throwable("TopicSubDiscard. ID:" + this.info.getConsumerId()));
            this.discarding = false;
        } catch (Throwable th) {
            this.discarding = false;
            throw th;
        }
    }

    public String toString() {
        return "TopicSubscription: consumer=" + this.info.getConsumerId() + ", destinations=" + this.destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered=" + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded() + ", prefetchExtension=" + this.prefetchExtension.get() + ", usePrefetchExtension=" + isUsePrefetchExtension();
    }

    @Override // org.apache.activemq.broker.region.Subscription
    public void destroy() {
        this.active = false;
        synchronized (this.matchedListMutex) {
            try {
                this.matched.destroy();
            } catch (Exception e) {
                LOG.warn("Failed to destroy cursor", (Throwable) e);
            }
        }
        setSlowConsumer(false);
        synchronized (this.dispatchLock) {
            this.dispatched.clear();
        }
    }

    @Override // org.apache.activemq.broker.region.AbstractSubscription, org.apache.activemq.broker.region.Subscription
    public int getPrefetchSize() {
        return this.info.getPrefetchSize();
    }

    @Override // org.apache.activemq.broker.region.AbstractSubscription
    public void setPrefetchSize(int i) {
        this.info.setPrefetchSize(i);
        try {
            dispatchMatched();
        } catch (Exception e) {
            LOG.trace("Caught exception on dispatch after prefetch size change.");
        }
    }
}
