package org.apache.activemq.broker.region;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.BrokerSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:activemq-core-fuse-4.1.0.10.jar:org/apache/activemq/broker/region/PrefetchSubscription.class */
public abstract class PrefetchSubscription extends AbstractSubscription {
    private static final Log log;
    protected final LinkedList pending;
    protected final LinkedList dispatched;
    protected int prefetchExtension;
    boolean dispatching;
    long enqueueCounter;
    long dispatchCounter;
    long dequeueCounter;
    static Class class$org$apache$activemq$broker$region$PrefetchSubscription;

    public PrefetchSubscription(Broker broker, ConnectionContext connectionContext, ConsumerInfo consumerInfo) throws InvalidSelectorException {
        super(broker, connectionContext, consumerInfo);
        this.pending = new LinkedList();
        this.dispatched = new LinkedList();
        this.prefetchExtension = 0;
        this.dispatching = false;
    }

    @Override // org.apache.activemq.broker.region.Subscription
    public synchronized void add(MessageReference messageReference) throws Exception {
        this.enqueueCounter++;
        if (!isFull()) {
            dispatch(messageReference);
            return;
        }
        optimizePrefetch();
        synchronized (this.pending) {
            if (this.pending.isEmpty()) {
                log.debug("Prefetch limit.");
            }
            this.pending.addLast(messageReference);
        }
    }

    @Override // org.apache.activemq.broker.region.Subscription
    public synchronized void processMessageDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
        synchronized (this.pending) {
            Iterator it = this.pending.iterator();
            while (it.hasNext()) {
                MessageReference messageReference = (MessageReference) it.next();
                if (messageReference.getMessageId().equals(messageDispatchNotification.getMessageId())) {
                    it.remove();
                    createMessageDispatch(messageReference, messageReference.getMessage());
                    this.dispatched.addLast(messageReference);
                }
            }
            throw new JMSException(new StringBuffer().append("Slave broker out of sync with master: Dispatched message (").append(messageDispatchNotification.getMessageId()).append(") was not in the pending list: ").append(this.pending).toString());
        }
    }

    @Override // org.apache.activemq.broker.region.Subscription
    public synchronized void acknowledge(ConnectionContext connectionContext, MessageAck messageAck) throws Exception {
        if (messageAck.isStandardAck()) {
            int i = 0;
            boolean z = false;
            Iterator it = this.dispatched.iterator();
            while (it.hasNext()) {
                MessageReference messageReference = (MessageReference) it.next();
                MessageId messageId = messageReference.getMessageId();
                if (messageAck.getFirstMessageId() == null || messageAck.getFirstMessageId().equals(messageId)) {
                    z = true;
                }
                if (z) {
                    if (connectionContext.isInTransaction()) {
                        connectionContext.getTransaction().addSynchronization(new Synchronization(this, messageReference) { // from class: org.apache.activemq.broker.region.PrefetchSubscription.1
                            private final MessageReference val$node;
                            private final PrefetchSubscription this$0;

                            {
                                this.this$0 = this;
                                this.val$node = messageReference;
                            }

                            @Override // org.apache.activemq.transaction.Synchronization
                            public void afterCommit() throws Exception {
                                synchronized (this.this$0) {
                                    this.this$0.dequeueCounter++;
                                    this.this$0.dispatched.remove(this.val$node);
                                    this.val$node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
                                    this.this$0.prefetchExtension--;
                                }
                            }

                            @Override // org.apache.activemq.transaction.Synchronization
                            public void afterRollback() throws Exception {
                                super.afterRollback();
                            }
                        });
                    } else {
                        this.dequeueCounter++;
                        messageReference.getRegionDestination().getDestinationStatistics().getDequeues().increment();
                        it.remove();
                    }
                    i++;
                    acknowledge(connectionContext, messageAck, messageReference);
                    if (messageAck.getLastMessageId().equals(messageId)) {
                        if (connectionContext.isInTransaction()) {
                            this.prefetchExtension = Math.max(this.prefetchExtension, i + 1);
                        } else {
                            this.prefetchExtension = Math.max(0, this.prefetchExtension - (i + 1));
                        }
                        dispatchMatched();
                        return;
                    }
                }
            }
        } else if (messageAck.isDeliveredAck()) {
            int i2 = 0;
            Iterator it2 = this.dispatched.iterator();
            while (it2.hasNext()) {
                if (messageAck.getLastMessageId().equals(((MessageReference) it2.next()).getMessageId())) {
                    this.prefetchExtension = Math.max(this.prefetchExtension, i2 + 1);
                    dispatchMatched();
                    return;
                }
                i2++;
            }
        } else if (messageAck.isPoisonAck()) {
            if (messageAck.isInTransaction()) {
                throw new JMSException(new StringBuffer().append("Poison ack cannot be transacted: ").append(messageAck).toString());
            }
            int i3 = 0;
            boolean z2 = false;
            Iterator it3 = this.dispatched.iterator();
            while (it3.hasNext()) {
                MessageReference messageReference2 = (MessageReference) it3.next();
                MessageId messageId2 = messageReference2.getMessageId();
                if (messageAck.getFirstMessageId() == null || messageAck.getFirstMessageId().equals(messageId2)) {
                    z2 = true;
                }
                if (z2) {
                    sendToDLQ(connectionContext, messageReference2);
                    messageReference2.getRegionDestination().getDestinationStatistics().getDequeues().increment();
                    it3.remove();
                    this.dequeueCounter++;
                    i3++;
                    acknowledge(connectionContext, messageAck, messageReference2);
                    if (messageAck.getLastMessageId().equals(messageId2)) {
                        this.prefetchExtension = Math.max(0, this.prefetchExtension - (i3 + 1));
                        dispatchMatched();
                        return;
                    }
                }
            }
        }
        if (isSlaveBroker()) {
            throw new JMSException(new StringBuffer().append("Slave broker out of sync with master: Acknowledgment (").append(messageAck).append(") was not in the dispatch list: ").append(this.dispatched).toString());
        }
        log.debug(new StringBuffer().append("Acknowledgment out of sync (Normally occurs when failover connection reconnects): ").append(messageAck).toString());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendToDLQ(ConnectionContext connectionContext, MessageReference messageReference) throws IOException, Exception {
        Message message = messageReference.getMessage();
        if (message != null) {
            BrokerSupport.resend(connectionContext, message, messageReference.getRegionDestination().getDeadLetterStrategy().getDeadLetterQueueFor(message.getDestination()));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isFull() {
        return isSlaveBroker() || this.dispatched.size() - this.prefetchExtension >= this.info.getPrefetchSize();
    }

    @Override // org.apache.activemq.broker.region.Subscription
    public boolean isLowWaterMark() {
        return ((double) (this.dispatched.size() - this.prefetchExtension)) <= ((double) this.info.getPrefetchSize()) * 0.4d;
    }

    @Override // org.apache.activemq.broker.region.Subscription
    public boolean isHighWaterMark() {
        return ((double) (this.dispatched.size() - this.prefetchExtension)) >= ((double) this.info.getPrefetchSize()) * 0.9d;
    }

    @Override // org.apache.activemq.broker.region.Subscription
    public int getPendingQueueSize() {
        int size;
        synchronized (this.pending) {
            size = this.pending.size();
        }
        return size;
    }

    @Override // org.apache.activemq.broker.region.Subscription
    public synchronized int getDispatchedQueueSize() {
        return this.dispatched.size();
    }

    @Override // org.apache.activemq.broker.region.Subscription
    public synchronized long getDequeueCounter() {
        return this.dequeueCounter;
    }

    @Override // org.apache.activemq.broker.region.Subscription
    public synchronized long getDispatchedCounter() {
        return this.dispatchCounter;
    }

    @Override // org.apache.activemq.broker.region.Subscription
    public synchronized long getEnqueueCounter() {
        return this.enqueueCounter;
    }

    @Override // org.apache.activemq.broker.region.Subscription
    public void optimizePrefetch() {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void dispatchMatched() throws IOException {
        if (this.dispatching) {
            return;
        }
        this.dispatching = true;
        try {
            Iterator it = this.pending.iterator();
            while (it.hasNext() && !isFull()) {
                MessageReference messageReference = (MessageReference) it.next();
                it.remove();
                dispatch(messageReference);
            }
        } finally {
            this.dispatching = false;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean dispatch(MessageReference messageReference) throws IOException {
        Message message = messageReference.getMessage();
        if (message == null || !canDispatch(messageReference) || isSlaveBroker()) {
            return false;
        }
        this.dispatchCounter++;
        MessageDispatch createMessageDispatch = createMessageDispatch(messageReference, message);
        this.dispatched.addLast(messageReference);
        if (this.info.isDispatchAsync()) {
            createMessageDispatch.setConsumer(new Runnable(this, messageReference, message) { // from class: org.apache.activemq.broker.region.PrefetchSubscription.2
                private final MessageReference val$node;
                private final Message val$message;
                private final PrefetchSubscription this$0;

                {
                    this.this$0 = this;
                    this.val$node = messageReference;
                    this.val$message = message;
                }

                @Override // java.lang.Runnable
                public void run() {
                    this.this$0.onDispatch(this.val$node, this.val$message);
                }
            });
            this.context.getConnection().dispatchAsync(createMessageDispatch);
            return true;
        }
        this.context.getConnection().dispatchSync(createMessageDispatch);
        onDispatch(messageReference, message);
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized void onDispatch(MessageReference messageReference, Message message) {
        if (messageReference.getRegionDestination() != null) {
            messageReference.getRegionDestination().getDestinationStatistics().getDispatched().increment();
            try {
                dispatchMatched();
            } catch (IOException e) {
                this.context.getConnection().serviceExceptionAsync(e);
            }
        }
    }

    @Override // org.apache.activemq.broker.region.Subscription
    public void updateConsumerPrefetch(int i) {
        if (this.context == null || this.context.getConnection() == null || !this.context.getConnection().isManageable()) {
            return;
        }
        ConsumerControl consumerControl = new ConsumerControl();
        consumerControl.setConsumerId(this.info.getConsumerId());
        consumerControl.setPrefetch(i);
        this.context.getConnection().dispatchAsync(consumerControl);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public MessageDispatch createMessageDispatch(MessageReference messageReference, Message message) {
        MessageDispatch messageDispatch = new MessageDispatch();
        messageDispatch.setConsumerId(this.info.getConsumerId());
        messageDispatch.setDestination(messageReference.getRegionDestination().getActiveMQDestination());
        messageDispatch.setMessage(message);
        messageDispatch.setRedeliveryCounter(messageReference.getRedeliveryCounter());
        return messageDispatch;
    }

    protected abstract boolean canDispatch(MessageReference messageReference) throws IOException;

    protected void acknowledge(ConnectionContext connectionContext, MessageAck messageAck, MessageReference messageReference) throws IOException {
    }

    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            throw new NoClassDefFoundError().initCause(e);
        }
    }

    static {
        Class cls;
        if (class$org$apache$activemq$broker$region$PrefetchSubscription == null) {
            cls = class$("org.apache.activemq.broker.region.PrefetchSubscription");
            class$org$apache$activemq$broker$region$PrefetchSubscription = cls;
        } else {
            cls = class$org$apache$activemq$broker$region$PrefetchSubscription;
        }
        log = LogFactory.getLog(cls);
    }
}
