package org.apache.activemq.broker.region;

import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.transaction.Synchronization;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/apache/activemq/broker/region/TopicSubscription.class */
/**
 * Subscription that pushes messages to a single consumer while keeping the number of
 * in-flight messages ({@code dispatched - delivered}) below the consumer's prefetch size.
 *
 * <p>Messages that arrive while the prefetch window is full are parked in {@link #matched}
 * and handed out later when acknowledgements free up room. The size of that buffer is
 * governed by {@code maximumPendingMessages} together with the configured
 * {@link MessageEvictionStrategy}.
 */
public class TopicSubscription extends AbstractSubscription {
    /** Class logger; assigned once in the static initializer. */
    private static final Log log;
    /** Messages accepted by {@code add} but not yet dispatched; accessed under {@link #matchedListMutex}. */
    protected final LinkedList matched;
    /** Queue named {@code "ActiveMQ.DLQ"} created in the constructor; not read by any method in this class. */
    protected final ActiveMQDestination dlqDestination;
    /** Usage manager handed to the constructor; not read by any method in this class. */
    protected final UsageManager usageManager;
    /**
     * Running dispatch count. Incremented by {@code dispatch}, {@code removeExpiredMessages} and
     * {@code processMessageDispatchNotification}; lowered by the acknowledged message count when a
     * standard or poison ack is applied.
     */
    protected AtomicLong dispatched;
    /**
     * Count of messages the consumer has reported as delivered. Raised by delivered acks and by
     * transactional standard/poison acks; lowered (never below zero) when an ack is applied.
     */
    protected AtomicLong delivered;
    /**
     * Limit on buffered messages as interpreted by {@code add}: {@code 0} buffers nothing, a
     * positive value caps the buffer (evicting when exceeded), a negative value applies no cap.
     * The constructor sets it to {@code -1}.
     */
    private int maximumPendingMessages;
    /** Strategy asked to pick the message to drop when the buffer exceeds its positive limit. */
    private MessageEvictionStrategy messageEvictionStrategy;
    /** Number of messages evicted from {@link #matched} by {@code add}; accessed under {@link #matchedListMutex}. */
    private int discarded;
    /** Lock object guarding {@link #matched} and {@link #discarded}. */
    private final Object matchedListMutex;
    /** Number of messages offered to {@code add}. */
    private final AtomicLong enqueueCounter;
    /** Total message count of the standard/poison acks that have been applied. */
    private final AtomicLong dequeueCounter;
    /** {@code true} until {@code dispatch} sees a message from a region destination other than {@link #destination}. */
    boolean singleDestination;
    /** Region destination of the first dispatched message; {@code null} until the first dispatch. */
    Destination destination;
    /** Compiler-synthesised cache for this class's {@code Class} object, used by the static initializer. */
    static Class class$org$apache$activemq$broker$region$TopicSubscription;

    /**
     * Creates a subscription with an empty pending buffer, zeroed counters, no limit on
     * pending messages ({@code -1}) and an {@link OldestMessageEvictionStrategy}.
     *
     * @param broker            passed through to the superclass constructor
     * @param connectionContext passed through to the superclass constructor
     * @param consumerInfo      passed through to the superclass constructor
     * @param usageManager      stored in {@link #usageManager}
     * @throws InvalidSelectorException declared by the signature; presumably raised by the
     *         superclass constructor for a bad selector (TODO confirm)
     */
    public TopicSubscription(Broker broker, ConnectionContext connectionContext, ConsumerInfo consumerInfo, UsageManager usageManager) throws InvalidSelectorException {
        super(broker, connectionContext, consumerInfo);

        // Collaborators.
        this.usageManager = usageManager;
        this.dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ");
        this.messageEvictionStrategy = new OldestMessageEvictionStrategy();

        // Pending-message buffer, its lock and its limits.
        this.matchedListMutex = new Object();
        this.matched = new LinkedList();
        this.maximumPendingMessages = -1;
        this.discarded = 0;

        // Flow-control and statistics counters, all starting at zero.
        this.dispatched = new AtomicLong();
        this.delivered = new AtomicLong();
        this.enqueueCounter = new AtomicLong(0L);
        this.dequeueCounter = new AtomicLong(0L);

        // Assume a single source destination until dispatch() observes otherwise.
        this.singleDestination = true;
    }

    /**
     * Offers a message to this subscription.
     *
     * <p>The message is counted as enqueued and a reference is taken on it. If the prefetch
     * window has room ({@code isFull()} is false) and {@code isSlaveBroker()} is false, it is
     * dispatched straight away. Otherwise it is handled according to
     * {@code maximumPendingMessages}:
     * <ul>
     *   <li>{@code 0}: the message is not retained and its reference is released;</li>
     *   <li>negative: the message is appended to the pending buffer without any cap;</li>
     *   <li>positive: the message is appended; once the buffer grows past the smaller of the
     *       limit and the strategy's expired-message high watermark, expired messages are
     *       purged, and then the eviction strategy drops messages until the buffer fits the
     *       limit. Each dropped message is released and counted in {@code discarded}.</li>
     * </ul>
     *
     * @param messageReference the message being offered
     * @throws InterruptedException declared by the overridden signature
     * @throws IOException if dispatching or purging expired messages fails
     */
    @Override // org.apache.activemq.broker.region.Subscription
    public void add(MessageReference messageReference) throws InterruptedException, IOException {
        this.enqueueCounter.incrementAndGet();
        // Hold a reference for as long as this subscription owns the message.
        // Every path below must hand it on or release it.
        messageReference.incrementReferenceCount();

        if (!isFull() && !isSlaveBroker()) {
            optimizePrefetch();
            dispatch(messageReference); // dispatch() releases the reference after hand-off
            return;
        }

        // Read the limit once so the decisions below are made against a single value.
        final int maxPending = this.maximumPendingMessages;
        if (maxPending == 0) {
            // Buffering is disabled, so the message is dropped here. Release the reference
            // taken above; otherwise it would be held forever.
            messageReference.decrementReferenceCount();
            return;
        }

        synchronized (this.matchedListMutex) {
            this.matched.addLast(messageReference);
            if (maxPending < 0) {
                return; // negative limit: no cap on the pending buffer
            }

            // Start purging expired messages at the lower of the strategy's watermark and the hard limit.
            int expiryHighWatermark = Math.min(
                    this.messageEvictionStrategy.getEvictExpiredMessagesHighWatermark(), maxPending);
            if (this.matched.size() > expiryHighWatermark) {
                removeExpiredMessages(this.matched);
            }

            // Still over the limit: let the strategy choose what to drop.
            while (this.matched.size() > maxPending) {
                MessageReference evicted = this.messageEvictionStrategy.evictMessage(this.matched);
                evicted.decrementReferenceCount();
                this.discarded++;
                if (log.isDebugEnabled()) {
                    log.debug("Discarding message " + evicted);
                }
            }
        }
    }

    /**
     * Removes the first expired message found in {@link #matched}, if any.
     *
     * <p>At most one message is removed per call. The removed message bumps the
     * {@code dispatched} counter and has its reference released. This method takes no lock
     * itself; the caller in {@code add} invokes it while holding {@code matchedListMutex}.
     *
     * <p>NOTE(review): the {@code linkedList} argument is never consulted. The scan always
     * runs over {@link #matched}. Confirm whether that is intended.
     *
     * @param linkedList unused
     * @throws IOException declared by the signature
     */
    protected void removeExpiredMessages(LinkedList linkedList) throws IOException {
        for (Iterator pending = this.matched.iterator(); pending.hasNext();) {
            MessageReference candidate = (MessageReference) pending.next();
            if (!candidate.isExpired()) {
                continue;
            }
            pending.remove();
            this.dispatched.incrementAndGet();
            candidate.decrementReferenceCount();
            return; // stop after the first expired message
        }
    }

    /**
     * Handles a notification that a message has been dispatched.
     *
     * <p>Under {@code matchedListMutex}, looks through the pending buffer for the first message
     * whose id equals the notification's message id. If found, it is removed from the buffer,
     * {@code dispatched} is incremented and the message's reference is released. Nothing
     * happens when no pending message matches.
     *
     * @param messageDispatchNotification notification carrying the id of the dispatched message
     */
    @Override // org.apache.activemq.broker.region.Subscription
    public void processMessageDispatchNotification(MessageDispatchNotification messageDispatchNotification) {
        synchronized (this.matchedListMutex) {
            for (Iterator pending = this.matched.iterator(); pending.hasNext();) {
                MessageReference candidate = (MessageReference) pending.next();
                if (!candidate.getMessageId().equals(messageDispatchNotification.getMessageId())) {
                    continue;
                }
                pending.remove();
                this.dispatched.incrementAndGet();
                candidate.decrementReferenceCount();
                break; // only the first match is processed
            }
        }
    }

    /**
     * Applies a consumer acknowledgement to this subscription's flow-control counters.
     *
     * <ul>
     *   <li><b>Standard or poison ack, no transaction:</b> the acknowledged count is applied
     *       immediately (dequeue statistics, {@code dequeueCounter}, {@code dispatched},
     *       {@code delivered}).</li>
     *   <li><b>Standard or poison ack inside a transaction:</b> {@code delivered} is raised
     *       now, and the same bookkeeping as above is deferred to the transaction's
     *       after-commit callback.</li>
     *   <li><b>Delivered ack:</b> only {@code delivered} is raised.</li>
     *   <li><b>Anything else:</b> rejected with a {@link JMSException}.</li>
     * </ul>
     *
     * <p>If the subscription was full on entry and is no longer full afterwards, buffered
     * messages are dispatched.
     *
     * @param connectionContext context used to detect and reach the current transaction
     * @param messageAck        the acknowledgement to apply
     * @throws JMSException if the ack is neither standard, poison nor delivered
     * @throws Exception    declared by the overridden signature
     */
    @Override // org.apache.activemq.broker.region.Subscription
    public synchronized void acknowledge(ConnectionContext connectionContext, final MessageAck messageAck) throws Exception {
        // Remember the state on entry: only a full -> not-full transition triggers a flush.
        final boolean wasFull = isFull();

        if (messageAck.isStandardAck() || messageAck.isPoisonAck()) {
            if (connectionContext.isInTransaction()) {
                this.delivered.addAndGet(messageAck.getMessageCount());
                // The remaining bookkeeping must wait until the transaction commits.
                connectionContext.getTransaction().addSynchronization(new Synchronization() {
                    @Override // org.apache.activemq.transaction.Synchronization
                    public void afterCommit() throws Exception {
                        settleAcknowledged(messageAck.getMessageCount());
                    }
                });
            } else {
                settleAcknowledged(messageAck.getMessageCount());
            }
        } else if (messageAck.isDeliveredAck()) {
            this.delivered.addAndGet(messageAck.getMessageCount());
        } else {
            throw new JMSException("Invalid acknowledgment: " + messageAck);
        }

        if (wasFull && !isFull()) {
            dispatchMatched();
        }
    }

    /**
     * Records {@code count} messages as consumed: updates the destination's dequeue statistic
     * (only while all dispatches came from a single destination), then adjusts
     * {@code dequeueCounter}, {@code dispatched} and {@code delivered} (floored at zero).
     */
    private void settleAcknowledged(int count) {
        // The after-commit callback is not invoked from the synchronized acknowledge() call,
        // so take this subscription's monitor explicitly for the single-destination check.
        synchronized (this) {
            if (this.singleDestination) {
                this.destination.getDestinationStatistics().getDequeues().add(count);
            }
        }
        this.dequeueCounter.addAndGet(count);
        this.dispatched.addAndGet(-count);
        this.delivered.set(Math.max(0L, this.delivered.get() - count));
    }

    /**
     * @return the number of messages currently buffered, as reported by {@link #matched()}
     */
    @Override // org.apache.activemq.broker.region.Subscription
    public int getPendingQueueSize() {
        final int pending = matched();
        return pending;
    }

    /**
     * @return {@code dispatched - delivered}, narrowed to {@code int}
     */
    @Override // org.apache.activemq.broker.region.Subscription
    public int getDispatchedQueueSize() {
        final long inFlight = dispatched.get() - delivered.get();
        return (int) inFlight;
    }

    /**
     * @return the configured pending-message limit; {@code add} treats {@code 0} as "buffer
     *         nothing", a positive value as a cap and a negative value as "no cap"
     */
    public int getMaximumPendingMessages() {
        return maximumPendingMessages;
    }

    /**
     * @return the current value of the {@code dispatched} counter
     */
    @Override // org.apache.activemq.broker.region.Subscription
    public long getDispatchedCounter() {
        final long current = dispatched.get();
        return current;
    }

    /**
     * @return how many messages have been offered to {@code add} so far
     */
    @Override // org.apache.activemq.broker.region.Subscription
    public long getEnqueueCounter() {
        final long offered = enqueueCounter.get();
        return offered;
    }

    /**
     * @return the accumulated message count of the standard/poison acks applied so far
     */
    @Override // org.apache.activemq.broker.region.Subscription
    public long getDequeueCounter() {
        final long consumed = dequeueCounter.get();
        return consumed;
    }

    /**
     * @return the number of buffered messages evicted so far, read under {@code matchedListMutex}
     */
    public int discarded() {
        synchronized (matchedListMutex) {
            return discarded;
        }
    }

    /**
     * @return the current size of the pending buffer, read under {@code matchedListMutex}
     */
    public int matched() {
        synchronized (matchedListMutex) {
            return matched.size();
        }
    }

    /**
     * Sets the pending-message limit consulted by {@code add}.
     *
     * @param i {@code 0} to buffer nothing, a positive value to cap the buffer, a negative
     *          value for no cap
     */
    public void setMaximumPendingMessages(int i) {
        maximumPendingMessages = i;
    }

    /**
     * @return the strategy {@code add} uses to choose which buffered message to drop
     */
    public MessageEvictionStrategy getMessageEvictionStrategy() {
        return messageEvictionStrategy;
    }

    /**
     * Replaces the strategy {@code add} uses to choose which buffered message to drop.
     *
     * @param messageEvictionStrategy the new strategy; stored as-is, no null check is performed
     */
    public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
        this.messageEvictionStrategy = messageEvictionStrategy;
    }

    /**
     * @return {@code true} when the in-flight count ({@code dispatched - delivered}) has
     *         reached or exceeded the consumer's prefetch size
     */
    private boolean isFull() {
        final long inFlight = dispatched.get() - delivered.get();
        final long prefetch = info.getPrefetchSize();
        return inFlight >= prefetch;
    }

    /**
     * @return {@code true} when the in-flight count ({@code dispatched - delivered}) is at
     *         most 40% of the consumer's prefetch size
     */
    @Override // org.apache.activemq.broker.region.Subscription
    public boolean isLowWaterMark() {
        final double inFlight = (double) (dispatched.get() - delivered.get());
        final double threshold = ((double) info.getPrefetchSize()) * 0.4d;
        return inFlight <= threshold;
    }

    /**
     * @return {@code true} when the in-flight count ({@code dispatched - delivered}) is at
     *         least 90% of the consumer's prefetch size
     */
    @Override // org.apache.activemq.broker.region.Subscription
    public boolean isHighWaterMark() {
        final double inFlight = (double) (dispatched.get() - delivered.get());
        final double threshold = ((double) info.getPrefetchSize()) * 0.9d;
        return inFlight >= threshold;
    }

    /**
     * Asks the consumer to switch to a new prefetch value.
     *
     * <p>A {@link ConsumerControl} command carrying this consumer's id and the given prefetch
     * is dispatched asynchronously on the connection. Nothing is sent when there is no
     * context, no connection, or the connection reports that it is not manageable.
     *
     * @param i the prefetch value to place in the control command
     */
    @Override // org.apache.activemq.broker.region.Subscription
    public void updateConsumerPrefetch(int i) {
        final boolean manageable = this.context != null
                && this.context.getConnection() != null
                && this.context.getConnection().isManageable();
        if (manageable) {
            ConsumerControl prefetchUpdate = new ConsumerControl();
            prefetchUpdate.setConsumerId(this.info.getConsumerId());
            prefetchUpdate.setPrefetch(i);
            this.context.getConnection().dispatchAsync(prefetchUpdate);
        }
    }

    /**
     * Hook invoked by {@code add} just before an immediate dispatch.
     * This implementation has an empty body and therefore does nothing.
     */
    @Override // org.apache.activemq.broker.region.Subscription
    public void optimizePrefetch() {
    }

    /**
     * Drains the pending buffer in order, dispatching each message, until the buffer is
     * empty or the prefetch window is full again. Runs under {@code matchedListMutex}.
     *
     * @throws IOException if a dispatch fails
     */
    private void dispatchMatched() throws IOException {
        synchronized (this.matchedListMutex) {
            for (Iterator pending = this.matched.iterator(); pending.hasNext() && !isFull();) {
                MessageReference next = (MessageReference) pending.next();
                pending.remove(); // take it out of the buffer before handing it to dispatch()
                dispatch(next);
            }
        }
    }

    /**
     * Sends one message to the consumer and updates the dispatch bookkeeping.
     *
     * <p>Builds a {@link MessageDispatch} for the message, increments {@code dispatched}, and
     * tracks whether every dispatched message has come from the same region destination.
     * Delivery then follows the consumer's dispatch mode:
     * <ul>
     *   <li><b>async:</b> the dispatch carries a callback that bumps the destination's
     *       dispatched statistic and releases the message reference, then it is handed to
     *       {@code dispatchAsync} (presumably the connection runs the callback once the
     *       dispatch has been processed; confirm);</li>
     *   <li><b>sync:</b> the dispatch is sent with {@code dispatchSync}, after which the
     *       statistic is bumped and the reference released inline.</li>
     * </ul>
     *
     * @param messageReference the message to send; it is cast to {@link Message}, so any
     *        other implementation causes a {@code ClassCastException}
     * @throws IOException declared by the signature
     */
    private void dispatch(final MessageReference messageReference) throws IOException {
        MessageDispatch messageDispatch = new MessageDispatch();
        messageDispatch.setMessage((Message) messageReference);
        messageDispatch.setConsumerId(this.info.getConsumerId());
        messageDispatch.setDestination(messageReference.getRegionDestination().getActiveMQDestination());
        this.dispatched.incrementAndGet();

        // Remember the first destination; once a different one shows up the flag stays false.
        if (this.singleDestination) {
            if (this.destination == null) {
                this.destination = messageReference.getRegionDestination();
            } else if (this.destination != messageReference.getRegionDestination()) {
                this.singleDestination = false;
            }
        }

        if (this.info.isDispatchAsync()) {
            // Defer the statistics update and reference release to the dispatch callback.
            messageDispatch.setConsumer(new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    messageReference.getRegionDestination().getDestinationStatistics().getDispatched().increment();
                    messageReference.decrementReferenceCount();
                }
            });
            this.context.getConnection().dispatchAsync(messageDispatch);
        } else {
            this.context.getConnection().dispatchSync(messageDispatch);
            messageReference.getRegionDestination().getDestinationStatistics().getDispatched().increment();
            messageReference.decrementReferenceCount();
        }
    }

    /**
     * @return a one-line summary: consumer id, number of destinations, in-flight count,
     *         dequeue counter (labelled {@code delivered}), buffered count and discard count
     */
    public String toString() {
        return "TopicSubscription: consumer=" + this.info.getConsumerId()
                + ", destinations=" + this.destinations.size()
                + ", dispatched=" + getDispatchedQueueSize()
                + ", delivered=" + getDequeueCounter()
                + ", matched=" + matched()
                + ", discarded=" + discarded();
    }

    /**
     * Releases everything still waiting in the pending buffer: under
     * {@code matchedListMutex}, each buffered message has its reference count decremented
     * and the buffer is then emptied.
     */
    @Override // org.apache.activemq.broker.region.Subscription
    public void destroy() {
        synchronized (this.matchedListMutex) {
            for (Iterator pending = this.matched.iterator(); pending.hasNext();) {
                MessageReference held = (MessageReference) pending.next();
                held.decrementReferenceCount();
            }
            this.matched.clear();
        }
    }

    /**
     * Compiler-synthesised helper that resolves a class by name, as emitted for class
     * literals by pre-Java-5 compilers.
     *
     * @param str fully qualified class name
     * @return the loaded class
     * @throws NoClassDefFoundError if the class cannot be found; the original
     *         {@link ClassNotFoundException} is attached as the cause
     */
    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            // initCause() is typed to return Throwable, which cannot be thrown from a method
            // without a throws clause. Throw the error instance itself instead.
            NoClassDefFoundError error = new NoClassDefFoundError();
            error.initCause(e);
            throw error;
        }
    }

    static {
        Class cls;
        if (class$org$apache$activemq$broker$region$TopicSubscription == null) {
            cls = class$("org.apache.activemq.broker.region.TopicSubscription");
            class$org$apache$activemq$broker$region$TopicSubscription = cls;
        } else {
            cls = class$org$apache$activemq$broker$region$TopicSubscription;
        }
        log = LogFactory.getLog(cls);
    }
}
