package org.apache.activemq;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.TransactionRolledBackException;
import org.apache.activemq.blob.BlobDownloader;
import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.management.JMSConsumerStatsImpl;
import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:activemq-core-5.4.0-fusesource-SNAPSHOT.jar:org/apache/activemq/ActiveMQMessageConsumer.class */
public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher {
    private static final Log LOG = LogFactory.getLog(ActiveMQMessageConsumer.class);
    protected final Scheduler scheduler;
    protected final ActiveMQSession session;
    protected final ConsumerInfo info;
    protected final MessageDispatchChannel unconsumedMessages;
    private PreviouslyDeliveredMap<MessageId, Boolean> previouslyDeliveredMessages;
    private int deliveredCounter;
    private int additionalWindowSize;
    private long redeliveryDelay;
    private int ackCounter;
    private int dispatchedCount;
    private final JMSConsumerStatsImpl stats;
    private final String selector;
    private boolean synchronizationRegistered;
    private MessageAvailableListener availableListener;
    private RedeliveryPolicy redeliveryPolicy;
    private boolean optimizeAcknowledge;
    private ExecutorService executorService;
    private MessageTransformer transformer;
    private boolean clearDispatchList;
    boolean inProgressClearRequiredFlag;
    private MessageAck pendingAck;
    private long lastDeliveredSequenceId;
    private IOException failureError;
    private long failoverRedeliveryWaitPeriod;
    private final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<>();
    private final AtomicReference<MessageListener> messageListener = new AtomicReference<>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
    private long optimizeAckTimestamp = System.currentTimeMillis();
    private final long optimizeAckTimeout = 300;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:activemq-core-5.4.0-fusesource-SNAPSHOT.jar:org/apache/activemq/ActiveMQMessageConsumer$PreviouslyDeliveredMap.class */
    public class PreviouslyDeliveredMap<K, V> extends HashMap<K, V> {
        final TransactionId transactionId;

        public PreviouslyDeliveredMap(TransactionId transactionId) {
            this.transactionId = transactionId;
        }
    }

    public ActiveMQMessageConsumer(ActiveMQSession activeMQSession, ConsumerId consumerId, ActiveMQDestination activeMQDestination, String str, String str2, int i, int i2, boolean z, boolean z2, boolean z3, MessageListener messageListener) throws JMSException {
        this.failoverRedeliveryWaitPeriod = 0L;
        if (activeMQDestination == null) {
            throw new InvalidDestinationException("Don't understand null destinations");
        }
        if (activeMQDestination.getPhysicalName() == null) {
            throw new InvalidDestinationException("The destination object was not given a physical name.");
        }
        if (activeMQDestination.isTemporary()) {
            String physicalName = activeMQDestination.getPhysicalName();
            if (physicalName == null) {
                throw new IllegalArgumentException("Physical name of Destination should be valid: " + activeMQDestination);
            }
            if (physicalName.indexOf(activeMQSession.connection.getConnectionInfo().getConnectionId().getValue()) < 0) {
                throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
            }
            if (activeMQSession.connection.isDeleted(activeMQDestination)) {
                throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
            }
            if (i < 0) {
                throw new JMSException("Cannot have a prefetch size less than zero");
            }
        }
        if (activeMQSession.connection.isMessagePrioritySupported()) {
            this.unconsumedMessages = new SimplePriorityMessageDispatchChannel();
        } else {
            this.unconsumedMessages = new FifoMessageDispatchChannel();
        }
        this.session = activeMQSession;
        this.scheduler = activeMQSession.getScheduler();
        this.redeliveryPolicy = activeMQSession.connection.getRedeliveryPolicy();
        setTransformer(activeMQSession.getTransformer());
        this.info = new ConsumerInfo(consumerId);
        this.info.setExclusive(this.session.connection.isExclusiveConsumer());
        this.info.setSubscriptionName(str);
        this.info.setPrefetchSize(i);
        this.info.setCurrentPrefetchSize(i);
        this.info.setMaximumPendingMessageLimit(i2);
        this.info.setNoLocal(z);
        this.info.setDispatchAsync(z3);
        this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer());
        this.info.setSelector(null);
        if (activeMQDestination.getOptions() != null) {
            IntrospectionSupport.setProperties(this.info, new HashMap(activeMQDestination.getOptions()), "consumer.");
        }
        this.info.setDestination(activeMQDestination);
        this.info.setBrowser(z2);
        if (str2 != null && str2.trim().length() != 0) {
            SelectorParser.parse(str2);
            this.info.setSelector(str2);
            this.selector = str2;
        } else if (this.info.getSelector() != null) {
            SelectorParser.parse(this.info.getSelector());
            this.selector = this.info.getSelector();
        } else {
            this.selector = null;
        }
        this.stats = new JMSConsumerStatsImpl(activeMQSession.getSessionStats(), activeMQDestination);
        this.optimizeAcknowledge = activeMQSession.connection.isOptimizeAcknowledge() && activeMQSession.isAutoAcknowledge() && !this.info.isBrowser();
        this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
        this.failoverRedeliveryWaitPeriod = activeMQSession.connection.getConsumerFailoverRedeliveryWaitPeriod();
        if (messageListener != null) {
            setMessageListener(messageListener);
        }
        try {
            this.session.addConsumer(this);
            this.session.syncSendPacket(this.info);
            if (activeMQSession.connection.isStarted()) {
                start();
            }
        } catch (JMSException e) {
            this.session.removeConsumer(this);
            throw e;
        }
    }

    private boolean isAutoAcknowledgeEach() {
        return this.session.isAutoAcknowledge() || (this.session.isDupsOkAcknowledge() && getDestination().isQueue());
    }

    private boolean isAutoAcknowledgeBatch() {
        return this.session.isDupsOkAcknowledge() && !getDestination().isQueue();
    }

    @Override // org.apache.activemq.management.StatsCapable
    public StatsImpl getStats() {
        return this.stats;
    }

    public JMSConsumerStatsImpl getConsumerStats() {
        return this.stats;
    }

    public RedeliveryPolicy getRedeliveryPolicy() {
        return this.redeliveryPolicy;
    }

    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
        this.redeliveryPolicy = redeliveryPolicy;
    }

    public MessageTransformer getTransformer() {
        return this.transformer;
    }

    public void setTransformer(MessageTransformer messageTransformer) {
        this.transformer = messageTransformer;
    }

    public ConsumerId getConsumerId() {
        return this.info.getConsumerId();
    }

    public String getConsumerName() {
        return this.info.getSubscriptionName();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isNoLocal() {
        return this.info.isNoLocal();
    }

    protected boolean isBrowser() {
        return this.info.isBrowser();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ActiveMQDestination getDestination() {
        return this.info.getDestination();
    }

    public int getPrefetchNumber() {
        return this.info.getPrefetchSize();
    }

    public boolean isDurableSubscriber() {
        return this.info.getSubscriptionName() != null && this.info.getDestination().isTopic();
    }

    public String getMessageSelector() throws JMSException {
        checkClosed();
        return this.selector;
    }

    public MessageListener getMessageListener() throws JMSException {
        checkClosed();
        return this.messageListener.get();
    }

    public void setMessageListener(MessageListener messageListener) throws JMSException {
        checkClosed();
        if (this.info.getPrefetchSize() == 0) {
            throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
        }
        if (messageListener == null) {
            this.messageListener.set(null);
            return;
        }
        boolean isRunning = this.session.isRunning();
        if (isRunning) {
            this.session.stop();
        }
        this.messageListener.set(messageListener);
        this.session.redispatch(this, this.unconsumedMessages);
        if (isRunning) {
            this.session.start();
        }
    }

    @Override // org.apache.activemq.MessageAvailableConsumer
    public MessageAvailableListener getAvailableListener() {
        return this.availableListener;
    }

    @Override // org.apache.activemq.MessageAvailableConsumer
    public void setAvailableListener(MessageAvailableListener messageAvailableListener) {
        this.availableListener = messageAvailableListener;
    }

    /* JADX WARN: Code restructure failed: missing block: B:21:0x0042, code lost:
    
        if (r5.failureError == null) goto L16;
     */
    /* JADX WARN: Code restructure failed: missing block: B:23:0x004c, code lost:
    
        throw org.apache.activemq.util.JMSExceptionSupport.create((java.lang.Exception) r5.failureError);
     */
    /* JADX WARN: Code restructure failed: missing block: B:24:0x004d, code lost:
    
        return null;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private org.apache.activemq.command.MessageDispatch dequeue(long r6) throws javax.jms.JMSException {
        /*
            Method dump skipped, instructions count: 242
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.activemq.ActiveMQMessageConsumer.dequeue(long):org.apache.activemq.command.MessageDispatch");
    }

    public javax.jms.Message receive() throws JMSException {
        checkClosed();
        checkMessageListener();
        sendPullCommand(0L);
        MessageDispatch dequeue = dequeue(-1L);
        if (dequeue == null) {
            return null;
        }
        beforeMessageIsConsumed(dequeue);
        afterMessageIsConsumed(dequeue, false);
        return createActiveMQMessage(dequeue);
    }

    private ActiveMQMessage createActiveMQMessage(final MessageDispatch messageDispatch) throws JMSException {
        javax.jms.Message consumerTransform;
        ActiveMQMessage activeMQMessage = (ActiveMQMessage) messageDispatch.getMessage().copy();
        if (activeMQMessage.getDataStructureType() == 29) {
            ((ActiveMQBlobMessage) activeMQMessage).setBlobDownloader(new BlobDownloader(this.session.getBlobTransferPolicy()));
        }
        if (this.transformer != null && (consumerTransform = this.transformer.consumerTransform(this.session, this, activeMQMessage)) != null) {
            activeMQMessage = ActiveMQMessageTransformation.transformMessage(consumerTransform, this.session.connection);
        }
        if (this.session.isClientAcknowledge()) {
            activeMQMessage.setAcknowledgeCallback(new Callback() { // from class: org.apache.activemq.ActiveMQMessageConsumer.1
                @Override // org.apache.activemq.util.Callback
                public void execute() throws Exception {
                    ActiveMQMessageConsumer.this.session.checkClosed();
                    ActiveMQMessageConsumer.this.session.acknowledge();
                }
            });
        } else if (this.session.isIndividualAcknowledge()) {
            activeMQMessage.setAcknowledgeCallback(new Callback() { // from class: org.apache.activemq.ActiveMQMessageConsumer.2
                @Override // org.apache.activemq.util.Callback
                public void execute() throws Exception {
                    ActiveMQMessageConsumer.this.session.checkClosed();
                    ActiveMQMessageConsumer.this.acknowledge(messageDispatch);
                }
            });
        }
        return activeMQMessage;
    }

    public javax.jms.Message receive(long j) throws JMSException {
        checkClosed();
        checkMessageListener();
        if (j == 0) {
            return receive();
        }
        sendPullCommand(j);
        if (j <= 0) {
            return null;
        }
        MessageDispatch dequeue = this.info.getPrefetchSize() == 0 ? dequeue(-1L) : dequeue(j);
        if (dequeue == null) {
            return null;
        }
        beforeMessageIsConsumed(dequeue);
        afterMessageIsConsumed(dequeue, false);
        return createActiveMQMessage(dequeue);
    }

    public javax.jms.Message receiveNoWait() throws JMSException {
        checkClosed();
        checkMessageListener();
        sendPullCommand(-1L);
        MessageDispatch dequeue = this.info.getPrefetchSize() == 0 ? dequeue(-1L) : dequeue(0L);
        if (dequeue == null) {
            return null;
        }
        beforeMessageIsConsumed(dequeue);
        afterMessageIsConsumed(dequeue, false);
        return createActiveMQMessage(dequeue);
    }

    public void close() throws JMSException {
        if (this.unconsumedMessages.isClosed()) {
            return;
        }
        if (this.session.getTransactionContext().isInTransaction()) {
            this.session.getTransactionContext().addSynchronization(new Synchronization() { // from class: org.apache.activemq.ActiveMQMessageConsumer.3
                @Override // org.apache.activemq.transaction.Synchronization
                public void afterCommit() throws Exception {
                    ActiveMQMessageConsumer.this.doClose();
                }

                @Override // org.apache.activemq.transaction.Synchronization
                public void afterRollback() throws Exception {
                    ActiveMQMessageConsumer.this.doClose();
                }
            });
        } else {
            doClose();
        }
    }

    void doClose() throws JMSException {
        dispose();
        RemoveInfo createRemoveCommand = this.info.createRemoveCommand();
        if (LOG.isDebugEnabled()) {
            LOG.debug("remove: " + getConsumerId() + ", lastDeliveredSequenceId:" + this.lastDeliveredSequenceId);
        }
        createRemoveCommand.setLastDeliveredSequenceId(this.lastDeliveredSequenceId);
        this.session.asyncSendPacket(createRemoveCommand);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void inProgressClearRequired() {
        this.inProgressClearRequiredFlag = true;
        this.clearDispatchList = true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void clearMessagesInProgress() {
        if (this.inProgressClearRequiredFlag) {
            synchronized (this.unconsumedMessages.getMutex()) {
                if (this.inProgressClearRequiredFlag) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(getConsumerId() + " clearing dispatched list (" + this.unconsumedMessages.size() + ") on transport interrupt");
                    }
                    List<MessageDispatch> removeAll = this.unconsumedMessages.removeAll();
                    if (!this.info.isBrowser()) {
                        Iterator<MessageDispatch> it = removeAll.iterator();
                        while (it.hasNext()) {
                            this.session.connection.rollbackDuplicate(this, it.next().getMessage());
                        }
                    }
                    this.session.connection.transportInterruptionProcessingComplete();
                    this.inProgressClearRequiredFlag = false;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void deliverAcks() {
        MessageAck messageAck = null;
        if (this.deliveryingAcknowledgements.compareAndSet(false, true)) {
            if (isAutoAcknowledgeEach()) {
                synchronized (this.deliveredMessages) {
                    messageAck = makeAckForAllDeliveredMessages((byte) 2);
                    if (messageAck != null) {
                        this.deliveredMessages.clear();
                        this.ackCounter = 0;
                    } else {
                        messageAck = this.pendingAck;
                        this.pendingAck = null;
                    }
                }
            } else if (this.pendingAck != null && this.pendingAck.isStandardAck()) {
                messageAck = this.pendingAck;
                this.pendingAck = null;
            }
            if (messageAck == null) {
                this.deliveryingAcknowledgements.set(false);
                return;
            }
            final MessageAck messageAck2 = messageAck;
            if (this.executorService == null) {
                this.executorService = Executors.newSingleThreadExecutor();
            }
            this.executorService.submit(new Runnable() { // from class: org.apache.activemq.ActiveMQMessageConsumer.4
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        try {
                            ActiveMQMessageConsumer.this.session.sendAck(messageAck2, true);
                            ActiveMQMessageConsumer.this.deliveryingAcknowledgements.set(false);
                        } catch (JMSException e) {
                            ActiveMQMessageConsumer.LOG.error(ActiveMQMessageConsumer.this.getConsumerId() + " failed to delivered acknowledgements", e);
                            ActiveMQMessageConsumer.this.deliveryingAcknowledgements.set(false);
                        }
                    } catch (Throwable th) {
                        ActiveMQMessageConsumer.this.deliveryingAcknowledgements.set(false);
                        throw th;
                    }
                }
            });
        }
    }

    public void dispose() throws JMSException {
        ArrayList arrayList;
        if (this.unconsumedMessages.isClosed()) {
            return;
        }
        if (!this.session.getTransacted()) {
            deliverAcks();
            if (isAutoAcknowledgeBatch()) {
                acknowledge();
            }
        }
        if (this.executorService != null) {
            this.executorService.shutdown();
            try {
                this.executorService.awaitTermination(60L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        if (this.session.isClientAcknowledge() && !this.info.isBrowser()) {
            synchronized (this.deliveredMessages) {
                arrayList = new ArrayList(this.deliveredMessages);
            }
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                this.session.connection.rollbackDuplicate(this, ((MessageDispatch) it.next()).getMessage());
            }
            arrayList.clear();
        }
        if (!this.session.isTransacted()) {
            synchronized (this.deliveredMessages) {
                this.deliveredMessages.clear();
            }
        }
        this.unconsumedMessages.close();
        this.session.removeConsumer(this);
        List<MessageDispatch> removeAll = this.unconsumedMessages.removeAll();
        if (this.info.isBrowser()) {
            return;
        }
        Iterator<MessageDispatch> it2 = removeAll.iterator();
        while (it2.hasNext()) {
            this.session.connection.rollbackDuplicate(this, it2.next().getMessage());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void checkClosed() throws IllegalStateException {
        if (this.unconsumedMessages.isClosed()) {
            throw new IllegalStateException("The Consumer is closed");
        }
    }

    protected void sendPullCommand(long j) throws JMSException {
        clearDispatchList();
        if (this.info.getPrefetchSize() == 0 && this.unconsumedMessages.isEmpty()) {
            MessagePull messagePull = new MessagePull();
            messagePull.configure(this.info);
            messagePull.setTimeout(j);
            this.session.asyncSendPacket(messagePull);
        }
    }

    protected void checkMessageListener() throws JMSException {
        this.session.checkMessageListener();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setOptimizeAcknowledge(boolean z) {
        if (this.optimizeAcknowledge && !z) {
            deliverAcks();
        }
        this.optimizeAcknowledge = z;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setPrefetchSize(int i) {
        deliverAcks();
        this.info.setCurrentPrefetchSize(i);
    }

    private void beforeMessageIsConsumed(MessageDispatch messageDispatch) throws JMSException {
        messageDispatch.setDeliverySequenceId(this.session.getNextDeliveryId());
        this.lastDeliveredSequenceId = messageDispatch.getMessage().getMessageId().getBrokerSequenceId();
        if (isAutoAcknowledgeBatch()) {
            return;
        }
        synchronized (this.deliveredMessages) {
            this.deliveredMessages.addFirst(messageDispatch);
        }
        if (this.session.getTransacted()) {
            ackLater(messageDispatch, (byte) 0);
        }
    }

    private void afterMessageIsConsumed(MessageDispatch messageDispatch, boolean z) throws JMSException {
        boolean contains;
        if (this.unconsumedMessages.isClosed()) {
            return;
        }
        if (z) {
            synchronized (this.deliveredMessages) {
                this.deliveredMessages.remove(messageDispatch);
            }
            this.stats.getExpiredMessageCount().increment();
            ackLater(messageDispatch, (byte) 0);
            return;
        }
        this.stats.onMessage();
        if (this.session.getTransacted()) {
            return;
        }
        if (!isAutoAcknowledgeEach()) {
            if (isAutoAcknowledgeBatch()) {
                ackLater(messageDispatch, (byte) 2);
                return;
            }
            if (!this.session.isClientAcknowledge() && !this.session.isIndividualAcknowledge()) {
                throw new IllegalStateException("Invalid session state.");
            }
            synchronized (this.deliveredMessages) {
                contains = this.deliveredMessages.contains(messageDispatch);
            }
            if (contains) {
                ackLater(messageDispatch, (byte) 0);
                return;
            }
            return;
        }
        if (this.deliveryingAcknowledgements.compareAndSet(false, true)) {
            synchronized (this.deliveredMessages) {
                if (!this.deliveredMessages.isEmpty()) {
                    if (this.optimizeAcknowledge) {
                        this.ackCounter++;
                        if (this.ackCounter >= this.info.getPrefetchSize() * 0.65d || System.currentTimeMillis() >= this.optimizeAckTimestamp + 300) {
                            MessageAck makeAckForAllDeliveredMessages = makeAckForAllDeliveredMessages((byte) 2);
                            if (makeAckForAllDeliveredMessages != null) {
                                this.deliveredMessages.clear();
                                this.ackCounter = 0;
                                this.session.sendAck(makeAckForAllDeliveredMessages);
                                this.optimizeAckTimestamp = System.currentTimeMillis();
                            }
                        }
                    } else {
                        MessageAck makeAckForAllDeliveredMessages2 = makeAckForAllDeliveredMessages((byte) 2);
                        if (makeAckForAllDeliveredMessages2 != null) {
                            this.deliveredMessages.clear();
                            this.session.sendAck(makeAckForAllDeliveredMessages2);
                        }
                    }
                }
            }
            this.deliveryingAcknowledgements.set(false);
        }
    }

    private MessageAck makeAckForAllDeliveredMessages(byte b) {
        synchronized (this.deliveredMessages) {
            if (this.deliveredMessages.isEmpty()) {
                return null;
            }
            MessageAck messageAck = new MessageAck(this.deliveredMessages.getFirst(), b, this.deliveredMessages.size());
            messageAck.setFirstMessageId(this.deliveredMessages.getLast().getMessage().getMessageId());
            return messageAck;
        }
    }

    private void ackLater(MessageDispatch messageDispatch, byte b) throws JMSException {
        if (this.session.getTransacted()) {
            this.session.doStartTransaction();
            if (!this.synchronizationRegistered) {
                this.synchronizationRegistered = true;
                this.session.getTransactionContext().addSynchronization(new Synchronization() { // from class: org.apache.activemq.ActiveMQMessageConsumer.5
                    @Override // org.apache.activemq.transaction.Synchronization
                    public void beforeEnd() throws Exception {
                        ActiveMQMessageConsumer.this.acknowledge();
                        ActiveMQMessageConsumer.this.synchronizationRegistered = false;
                    }

                    @Override // org.apache.activemq.transaction.Synchronization
                    public void afterCommit() throws Exception {
                        ActiveMQMessageConsumer.this.commit();
                        ActiveMQMessageConsumer.this.synchronizationRegistered = false;
                    }

                    @Override // org.apache.activemq.transaction.Synchronization
                    public void afterRollback() throws Exception {
                        ActiveMQMessageConsumer.this.rollback();
                        ActiveMQMessageConsumer.this.synchronizationRegistered = false;
                    }
                });
            }
        }
        this.deliveredCounter++;
        MessageAck messageAck = this.pendingAck;
        this.pendingAck = new MessageAck(messageDispatch, b, this.deliveredCounter);
        this.pendingAck.setTransactionId(this.session.getTransactionContext().getTransactionId());
        if (messageAck == null) {
            this.pendingAck.setFirstMessageId(this.pendingAck.getLastMessageId());
        } else if (messageAck.getAckType() == this.pendingAck.getAckType()) {
            this.pendingAck.setFirstMessageId(messageAck.getFirstMessageId());
        } else if (!messageAck.isDeliveredAck()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sending old pending ack " + messageAck + ", new pending: " + this.pendingAck);
            }
            this.session.sendAck(messageAck);
        } else if (LOG.isDebugEnabled()) {
            LOG.debug("dropping old pending ack " + messageAck + ", new pending: " + this.pendingAck);
        }
        if (0.5d * this.info.getPrefetchSize() <= this.deliveredCounter - this.additionalWindowSize) {
            this.session.sendAck(this.pendingAck);
            this.pendingAck = null;
            this.deliveredCounter = 0;
            this.additionalWindowSize = 0;
        }
    }

    public void acknowledge() throws JMSException {
        clearDispatchList();
        waitForRedeliveries();
        synchronized (this.deliveredMessages) {
            MessageAck makeAckForAllDeliveredMessages = makeAckForAllDeliveredMessages((byte) 2);
            if (makeAckForAllDeliveredMessages == null) {
                return;
            }
            if (this.session.getTransacted()) {
                rollbackOnFailedRecoveryRedelivery();
                this.session.doStartTransaction();
                makeAckForAllDeliveredMessages.setTransactionId(this.session.getTransactionContext().getTransactionId());
            }
            this.session.sendAck(makeAckForAllDeliveredMessages);
            this.pendingAck = null;
            this.deliveredCounter = Math.max(0, this.deliveredCounter - this.deliveredMessages.size());
            this.additionalWindowSize = Math.max(0, this.additionalWindowSize - this.deliveredMessages.size());
            if (!this.session.getTransacted()) {
                this.deliveredMessages.clear();
            }
        }
    }

    private void waitForRedeliveries() {
        if (this.failoverRedeliveryWaitPeriod <= 0 || this.previouslyDeliveredMessages == null) {
            return;
        }
        long currentTimeMillis = System.currentTimeMillis() + this.failoverRedeliveryWaitPeriod;
        do {
            int i = 0;
            synchronized (this.deliveredMessages) {
                if (this.previouslyDeliveredMessages != null) {
                    Iterator<Map.Entry<MessageId, Boolean>> it = this.previouslyDeliveredMessages.entrySet().iterator();
                    while (it.hasNext()) {
                        if (!it.next().getValue().booleanValue()) {
                            i++;
                        }
                    }
                }
            }
            if (i > 0) {
                LOG.info("waiting for redelivery of " + i + " in transaction: " + this.previouslyDeliveredMessages.transactionId + ", to consumer :" + getConsumerId());
                try {
                    Thread.sleep(Math.max(500L, this.failoverRedeliveryWaitPeriod / 4));
                } catch (InterruptedException e) {
                    return;
                }
            }
            if (i <= 0) {
                return;
            }
        } while (currentTimeMillis < System.currentTimeMillis());
    }

    private void rollbackOnFailedRecoveryRedelivery() throws JMSException {
        if (this.previouslyDeliveredMessages != null) {
            int i = 0;
            for (Map.Entry<MessageId, Boolean> entry : this.previouslyDeliveredMessages.entrySet()) {
                if (!entry.getValue().booleanValue()) {
                    i++;
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("previously delivered message has not been replayed in transaction: " + this.previouslyDeliveredMessages.transactionId + " , messageId: " + entry.getKey());
                    }
                }
            }
            if (i > 0) {
                String str = "rolling back transaction (" + this.previouslyDeliveredMessages.transactionId + ") post failover recovery. " + i + " previously delivered message(s) not replayed to consumer: " + getConsumerId();
                LOG.warn(str);
                throw new TransactionRolledBackException(str);
            }
        }
    }

    void acknowledge(MessageDispatch messageDispatch) throws JMSException {
        this.session.sendAck(new MessageAck(messageDispatch, (byte) 4, 1));
        synchronized (this.deliveredMessages) {
            this.deliveredMessages.remove(messageDispatch);
        }
    }

    public void commit() throws JMSException {
        synchronized (this.deliveredMessages) {
            this.deliveredMessages.clear();
            clearPreviouslyDelivered();
        }
        this.redeliveryDelay = 0L;
    }

    public void rollback() throws JMSException {
        synchronized (this.unconsumedMessages.getMutex()) {
            if (this.optimizeAcknowledge && !this.info.isBrowser()) {
                synchronized (this.deliveredMessages) {
                    for (int i = 0; i < this.deliveredMessages.size() && i < this.ackCounter; i++) {
                        this.session.connection.rollbackDuplicate(this, this.deliveredMessages.removeLast().getMessage());
                    }
                }
            }
            synchronized (this.deliveredMessages) {
                rollbackPreviouslyDeliveredAndNotRedelivered();
                if (this.deliveredMessages.isEmpty()) {
                    return;
                }
                MessageDispatch first = this.deliveredMessages.getFirst();
                int redeliveryCounter = first.getMessage().getRedeliveryCounter();
                if (redeliveryCounter > 0) {
                    this.redeliveryDelay = this.redeliveryPolicy.getNextRedeliveryDelay(this.redeliveryDelay);
                } else {
                    this.redeliveryDelay = this.redeliveryPolicy.getInitialRedeliveryDelay();
                }
                MessageId messageId = this.deliveredMessages.getLast().getMessage().getMessageId();
                Iterator<MessageDispatch> it = this.deliveredMessages.iterator();
                while (it.hasNext()) {
                    MessageDispatch next = it.next();
                    next.getMessage().onMessageRolledBack();
                    this.session.connection.rollbackDuplicate(this, next.getMessage());
                }
                if (this.redeliveryPolicy.getMaximumRedeliveries() == -1 || first.getMessage().getRedeliveryCounter() <= this.redeliveryPolicy.getMaximumRedeliveries()) {
                    if (redeliveryCounter > 0) {
                        MessageAck messageAck = new MessageAck(first, (byte) 3, this.deliveredMessages.size());
                        messageAck.setFirstMessageId(messageId);
                        this.session.sendAck(messageAck, true);
                    }
                    this.unconsumedMessages.stop();
                    Iterator<MessageDispatch> it2 = this.deliveredMessages.iterator();
                    while (it2.hasNext()) {
                        this.unconsumedMessages.enqueueFirst(it2.next());
                    }
                    if (this.redeliveryDelay <= 0 || this.unconsumedMessages.isClosed()) {
                        start();
                    } else {
                        this.scheduler.executeAfterDelay(new Runnable() { // from class: org.apache.activemq.ActiveMQMessageConsumer.6
                            @Override // java.lang.Runnable
                            public void run() {
                                try {
                                    if (ActiveMQMessageConsumer.this.started.get()) {
                                        ActiveMQMessageConsumer.this.start();
                                    }
                                } catch (JMSException e) {
                                    ActiveMQMessageConsumer.this.session.connection.onAsyncException(e);
                                }
                            }
                        }, this.redeliveryDelay);
                    }
                } else {
                    MessageAck messageAck2 = new MessageAck(first, (byte) 1, this.deliveredMessages.size());
                    messageAck2.setFirstMessageId(messageId);
                    this.session.sendAck(messageAck2, true);
                    this.additionalWindowSize = Math.max(0, this.additionalWindowSize - this.deliveredMessages.size());
                    this.redeliveryDelay = 0L;
                }
                this.deliveredCounter -= this.deliveredMessages.size();
                this.deliveredMessages.clear();
                if (this.messageListener.get() != null) {
                    this.session.redispatch(this, this.unconsumedMessages);
                }
            }
        }
    }

    private void rollbackPreviouslyDeliveredAndNotRedelivered() {
        if (this.previouslyDeliveredMessages != null) {
            for (Map.Entry<MessageId, Boolean> entry : this.previouslyDeliveredMessages.entrySet()) {
                if (!entry.getValue().booleanValue()) {
                    removeFromDeliveredMessages(entry.getKey());
                }
            }
            clearPreviouslyDelivered();
        }
    }

    private void removeFromDeliveredMessages(MessageId messageId) {
        Iterator<MessageDispatch> it = this.deliveredMessages.iterator();
        while (it.hasNext()) {
            MessageDispatch next = it.next();
            if (messageId.equals(next.getMessage().getMessageId())) {
                this.session.connection.rollbackDuplicate(this, next.getMessage());
                it.remove();
                return;
            }
        }
    }

    private void clearPreviouslyDelivered() {
        if (this.previouslyDeliveredMessages != null) {
            this.previouslyDeliveredMessages.clear();
            this.previouslyDeliveredMessages = null;
        }
    }

    @Override // org.apache.activemq.ActiveMQDispatcher
    public void dispatch(MessageDispatch messageDispatch) {
        MessageListener messageListener = this.messageListener.get();
        try {
            clearMessagesInProgress();
            clearDispatchList();
            synchronized (this.unconsumedMessages.getMutex()) {
                if (!this.unconsumedMessages.isClosed()) {
                    if (this.info.isBrowser() || !this.session.connection.isDuplicate(this, messageDispatch.getMessage())) {
                        if (messageListener == null || !this.unconsumedMessages.isRunning()) {
                            if (!this.unconsumedMessages.isRunning()) {
                                this.session.connection.rollbackDuplicate(this, messageDispatch.getMessage());
                            }
                            this.unconsumedMessages.enqueue(messageDispatch);
                            if (this.availableListener != null) {
                                this.availableListener.onMessageAvailable(this);
                            }
                        } else {
                            ActiveMQMessage createActiveMQMessage = createActiveMQMessage(messageDispatch);
                            beforeMessageIsConsumed(messageDispatch);
                            try {
                                boolean isExpired = createActiveMQMessage.isExpired();
                                if (!isExpired) {
                                    messageListener.onMessage(createActiveMQMessage);
                                }
                                afterMessageIsConsumed(messageDispatch, isExpired);
                            } catch (RuntimeException e) {
                                LOG.error(getConsumerId() + " Exception while processing message: " + messageDispatch.getMessage().getMessageId(), e);
                                if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || this.session.isIndividualAcknowledge()) {
                                    rollback();
                                } else {
                                    afterMessageIsConsumed(messageDispatch, false);
                                }
                            }
                        }
                    } else if (this.session.isTransacted()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(getConsumerId() + " tracking transacted redlivery of duplicate: " + messageDispatch.getMessage());
                        }
                        boolean z = false;
                        synchronized (this.deliveredMessages) {
                            if (this.previouslyDeliveredMessages != null) {
                                this.previouslyDeliveredMessages.put(messageDispatch.getMessage().getMessageId(), true);
                            } else {
                                z = true;
                            }
                        }
                        if (z) {
                            LOG.warn("acking duplicate delivery as poison, redelivery must be pending to another consumer on this connection, failoverRedeliveryWaitPeriod=" + this.failoverRedeliveryWaitPeriod + ". Message: " + messageDispatch);
                            MessageAck messageAck = new MessageAck(messageDispatch, (byte) 1, 1);
                            messageAck.setFirstMessageId(messageDispatch.getMessage().getMessageId());
                            this.session.sendAck(messageAck);
                        } else {
                            ackLater(messageDispatch, (byte) 0);
                        }
                    } else {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(getConsumerId() + " ignoring (auto acking) duplicate: " + messageDispatch.getMessage());
                        }
                        this.session.sendAck(new MessageAck(messageDispatch, (byte) 2, 1));
                    }
                }
            }
            int i = this.dispatchedCount + 1;
            this.dispatchedCount = i;
            if (i % 1000 == 0) {
                this.dispatchedCount = 0;
                Thread.yield();
            }
        } catch (Exception e2) {
            this.session.connection.onClientInternalException(e2);
        }
    }

    private void clearDispatchList() {
        if (this.clearDispatchList) {
            synchronized (this.deliveredMessages) {
                if (this.clearDispatchList) {
                    if (!this.deliveredMessages.isEmpty()) {
                        if (this.session.isTransacted()) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug(getConsumerId() + " tracking existing transacted delivered list (" + this.deliveredMessages.size() + ") on transport interrupt");
                            }
                            if (this.previouslyDeliveredMessages == null) {
                                this.previouslyDeliveredMessages = new PreviouslyDeliveredMap<>(this.session.getTransactionContext().getTransactionId());
                            }
                            Iterator<MessageDispatch> it = this.deliveredMessages.iterator();
                            while (it.hasNext()) {
                                this.previouslyDeliveredMessages.put(it.next().getMessage().getMessageId(), false);
                            }
                        } else {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug(getConsumerId() + " clearing delivered list (" + this.deliveredMessages.size() + ") on transport interrupt");
                            }
                            this.deliveredMessages.clear();
                            this.pendingAck = null;
                        }
                    }
                    this.clearDispatchList = false;
                }
            }
        }
    }

    public int getMessageSize() {
        return this.unconsumedMessages.size();
    }

    public void start() throws JMSException {
        if (this.unconsumedMessages.isClosed()) {
            return;
        }
        this.started.set(true);
        this.unconsumedMessages.start();
        this.session.executor.wakeup();
    }

    public void stop() {
        this.started.set(false);
        this.unconsumedMessages.stop();
    }

    public String toString() {
        return "ActiveMQMessageConsumer { value=" + this.info.getConsumerId() + ", started=" + this.started.get() + " }";
    }

    public boolean iterate() {
        MessageDispatch dequeueNoWait;
        if (this.messageListener.get() == null || (dequeueNoWait = this.unconsumedMessages.dequeueNoWait()) == null) {
            return false;
        }
        dispatch(dequeueNoWait);
        return true;
    }

    public boolean isInUse(ActiveMQTempDestination activeMQTempDestination) {
        return this.info.getDestination().equals(activeMQTempDestination);
    }

    public long getLastDeliveredSequenceId() {
        return this.lastDeliveredSequenceId;
    }

    public IOException getFailureError() {
        return this.failureError;
    }

    public void setFailureError(IOException iOException) {
        this.failureError = iOException;
    }
}
