package org.apache.activemq.broker.region;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.locks.ReentrantLock;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.ResourceAllocationException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
import org.apache.activemq.broker.region.group.MessageGroupMap;
import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.DeterministicTaskRunner;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.BrokerSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:WEB-INF/lib/activemq-core-5.1.0.1-fuse.jar:org/apache/activemq/broker/region/Queue.class */
public class Queue extends BaseDestination implements Task {
    /** Factory passed to the constructor; initialize() uses it to create the task runner when optimized dispatch is on. */
    protected TaskRunnerFactory taskFactory;
    /** Runner created in initialize() and shut down in stop(). */
    protected TaskRunner taskRunner;
    /** Subscriptions attached to this queue; add/removeSubscription synchronize on this list while changing it. */
    protected final List<Subscription> consumers;
    /** Pending-message cursor; created in initialize() unless one was supplied through setMessages(). */
    protected PendingMessageCursor messages;
    /** References keyed by message id; readers in this class synchronize on the map before copying its values. */
    private final LinkedHashMap<MessageId, QueueMessageReference> pagedInMessages;
    // NOTE(review): presumably references paged in but not yet dispatched — usage is outside this view.
    private List<QueueMessageReference> pagedInPendingDispatch;
    /** Created lazily by getMessageGroupOwners(). */
    private MessageGroupMap messageGroupOwners;
    private DispatchPolicy dispatchPolicy;
    /** Supplies the map returned by getMessageGroupOwners(). */
    private MessageGroupMapFactory messageGroupMapFactory;
    /** Monitor held by doMessageSend() while a persistent message is written to the store. */
    private final Object sendLock;
    /** Single-thread executor created in initialize() when optimized dispatch is off; shutdownNow() is called in stop(). */
    private ExecutorService executor;
    /** Deferred sends queued by send() while memory usage is full. */
    protected final LinkedList<Runnable> messagesWaitingForSpace;
    /** Held for the whole of addSubscription() and the locked part of removeSubscription(). */
    private final ReentrantLock dispatchLock;
    private boolean useConsumerPriority;
    private boolean strictOrderDispatch;
    /** Tracks the exclusive consumer selected in add/removeSubscription. */
    private QueueDispatchSelector dispatchSelector;
    /** Chooses between the factory-created task runner (true) and the dedicated executor (false) in initialize(). */
    private boolean optimizedDispatch;
    /** Calls wakeup(); send() registers it with memoryUsage.notifyCallbackWhenNotFull. */
    private final Runnable sendMessagesWaitingForSpaceTask;
    // NOTE(review): lock object; its users are outside this view.
    private final Object iteratingMutex;
    /** RecoveryDispatch entries appended by addSubscription(). */
    LinkedList<RecoveryDispatch> recoveries;
    protected static final Log LOG = LogFactory.getLog(Queue.class);
    /**
     * Orders subscriptions so that a higher consumer priority sorts first.
     * Uses {@link Integer#compare(int, int)} instead of subtraction so the result can never
     * be distorted by arithmetic overflow, whatever the numeric type of the priority is.
     */
    private static final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() {
        @Override
        public int compare(Subscription subscription, Subscription subscription2) {
            // Arguments are reversed on purpose: descending priority order.
            return Integer.compare(subscription2.getConsumerInfo().getPriority(), subscription.getConsumerInfo().getPriority());
        }
    };

    /**
     * Pairs a subscription with a snapshot of message references.
     * addSubscription() fills one of these with a copy of the paged-in messages and appends it
     * to {@code recoveries}.
     */
    public class RecoveryDispatch {
        /** Snapshot of message references taken when the entry was created. */
        public ArrayList<QueueMessageReference> messages;
        /** Subscription the snapshot was taken for. */
        public Subscription subscription;

        RecoveryDispatch() {
        }
    }

    /**
     * Creates the queue and its in-memory bookkeeping. No cursor or task runner is created
     * here; that happens in {@link #initialize()}.
     *
     * @param brokerService         owning broker service, forwarded to the superclass
     * @param activeMQDestination   destination this queue represents
     * @param messageStore          backing store, forwarded to the superclass
     * @param destinationStatistics statistics holder, forwarded to the superclass
     * @param taskRunnerFactory     factory used later to create the task runner when optimized dispatch is enabled
     * @throws Exception if the superclass constructor fails
     */
    public Queue(BrokerService brokerService, ActiveMQDestination activeMQDestination, MessageStore messageStore, DestinationStatistics destinationStatistics, TaskRunnerFactory taskRunnerFactory) throws Exception {
        super(brokerService, messageStore, activeMQDestination, destinationStatistics);
        // Generic instantiation instead of raw types, so the assignments are checked by the compiler.
        this.consumers = new ArrayList<>(50);
        this.pagedInMessages = new LinkedHashMap<>();
        this.pagedInPendingDispatch = new ArrayList<>(100);
        this.dispatchPolicy = new RoundRobinDispatchPolicy();
        this.messageGroupMapFactory = new MessageGroupHashBucketFactory();
        this.sendLock = new Object();
        this.messagesWaitingForSpace = new LinkedList<>();
        this.dispatchLock = new ReentrantLock();
        this.useConsumerPriority = true;
        this.strictOrderDispatch = false;
        this.optimizedDispatch = false;
        // Callback handed to the memory usage tracker by send(); it only wakes the queue up.
        this.sendMessagesWaitingForSpaceTask = new Runnable() {
            @Override
            public void run() {
                Queue.this.wakeup();
            }
        };
        this.iteratingMutex = new Object() {
        };
        this.recoveries = new LinkedList<>();
        this.taskFactory = taskRunnerFactory;
        this.dispatchSelector = new QueueDispatchSelector(activeMQDestination);
    }

    /**
     * Prepares the queue for use: picks a cursor if none was injected, creates the task runner,
     * runs the superclass initialization and, when a store is present, configures the cursor
     * and either recovers messages from the store or seeds the message count from it.
     *
     * @throws Exception if the superclass initialization or store recovery fails
     */
    @Override
    public void initialize() throws Exception {
        if (messages == null) {
            // A store-backed cursor needs a non-temporary destination, a broker and a store.
            final boolean storeBacked = !destination.isTemporary() && broker != null && store != null;
            if (storeBacked) {
                messages = new StoreQueueCursor(broker, this);
            } else {
                messages = new VMPendingMessageCursor();
            }
        }
        if (messages instanceof VMPendingMessageCursor) {
            systemUsage = brokerService.getSystemUsage();
            memoryUsage.setParent(systemUsage.getMemoryUsage());
        }
        if (!isOptimizedDispatch()) {
            // Dedicated daemon thread per queue.
            executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
                @Override
                public Thread newThread(Runnable runnable) {
                    final Thread queueThread = new Thread(runnable, "QueueThread:" + Queue.this.destination);
                    queueThread.setDaemon(true);
                    queueThread.setPriority(Thread.NORM_PRIORITY);
                    return queueThread;
                }
            });
            taskRunner = new DeterministicTaskRunner(executor, this);
        } else {
            taskRunner = taskFactory.createTaskRunner(this, "TempQueue:  " + destination.getPhysicalName());
        }
        super.initialize();
        if (store == null) {
            return;
        }
        messages.setSystemUsage(systemUsage);
        messages.setEnableAudit(isEnableAudit());
        messages.setMaxAuditDepth(getMaxAuditDepth());
        messages.setMaxProducersToAudit(getMaxProducersToAudit());
        messages.setUseCache(isUseCache());
        if (!messages.isRecoveryRequired()) {
            // No replay needed: just take the count from the store.
            destinationStatistics.getMessages().setCount(store.getMessageCount());
            return;
        }
        store.recover(new MessageRecoveryListener() {
            @Override
            public boolean recoverMessage(Message message) {
                if (Queue.this.broker.isExpired(message)) {
                    Queue.this.messageExpired(Queue.this.createConnectionContext(), message);
                    return true;
                }
                if (!hasSpace()) {
                    return false;
                }
                message.setRegionDestination(Queue.this);
                synchronized (Queue.this.messages) {
                    try {
                        Queue.this.messages.addMessageLast(message);
                    } catch (Exception e) {
                        Queue.LOG.fatal("Failed to add message to cursor", e);
                    }
                }
                Queue.this.destinationStatistics.getMessages().increment();
                return true;
            }

            @Override
            public boolean recoverMessageReference(MessageId messageId) throws Exception {
                throw new RuntimeException("Should not be called.");
            }

            @Override
            public boolean hasSpace() {
                return true;
            }
        });
    }

    /**
     * Attaches a subscription to this queue while holding {@code dispatchLock}.
     * Registers the consumer, updates the exclusive consumer if the new subscription is
     * exclusive, pages messages in, records a {@link RecoveryDispatch} snapshot of the paged-in
     * messages for the subscription, and wakes the queue up.
     *
     * @param connectionContext context of the connection adding the subscription
     * @param subscription      subscription to attach
     * @throws Exception if the subscription or the page-in fails
     */
    @Override
    public void addSubscription(ConnectionContext connectionContext, Subscription subscription) throws Exception {
        dispatchLock.lock();
        try {
            subscription.add(connectionContext, this);
            destinationStatistics.getConsumers().increment();
            synchronized (consumers) {
                addToConsumerList(subscription);
                if (subscription.getConsumerInfo().isExclusive()) {
                    // The new subscription wins when there is no exclusive consumer yet
                    // or when it has a strictly higher priority than the current one.
                    Subscription exclusive = dispatchSelector.getExclusiveConsumer();
                    if (exclusive == null
                            || subscription.getConsumerInfo().getPriority() > exclusive.getConsumerInfo().getPriority()) {
                        exclusive = subscription;
                    }
                    dispatchSelector.setExclusiveConsumer(exclusive);
                }
            }
            doPageIn(false);
            synchronized (pagedInMessages) {
                RecoveryDispatch recoveryDispatch = new RecoveryDispatch();
                recoveryDispatch.messages = new ArrayList<>(pagedInMessages.values());
                recoveryDispatch.subscription = subscription;
                recoveries.addLast(recoveryDispatch);
            }
            if (subscription instanceof QueueBrowserSubscription) {
                ((QueueBrowserSubscription) subscription).incrementQueueRef();
            }
            // The original tested !optimizedDispatch and then optimizedDispatch in two separate
            // ifs. Together they always wake up, unless the unsynchronized flag flips between
            // the two reads, in which case the wakeup was skipped. Wake up unconditionally.
            wakeup();
        } finally {
            dispatchLock.unlock();
        }
    }

    /**
     * Detaches a subscription from this queue.
     * Under {@code dispatchLock} it removes the consumer, elects a replacement exclusive
     * consumer if needed, releases the consumer's message groups, and re-dispatches the
     * messages the subscription hands back when other consumers remain.
     *
     * @param connectionContext context of the connection removing the subscription
     * @param subscription      subscription to detach
     * @throws Exception if removal or re-dispatch fails
     */
    @Override
    public void removeSubscription(ConnectionContext connectionContext, Subscription subscription) throws Exception {
        destinationStatistics.getConsumers().decrement();
        dispatchLock.lock();
        try {
            synchronized (consumers) {
                removeFromConsumerList(subscription);
                if (subscription.getConsumerInfo().isExclusive() && dispatchSelector.getExclusiveConsumer() == subscription) {
                    // Elect the remaining exclusive consumer with the highest priority (or none).
                    Subscription replacement = null;
                    for (Subscription candidate : consumers) {
                        if (candidate.getConsumerInfo().isExclusive()
                                && (replacement == null
                                        || candidate.getConsumerInfo().getPriority() > replacement.getConsumerInfo().getPriority())) {
                            replacement = candidate;
                        }
                    }
                    dispatchSelector.setExclusiveConsumer(replacement);
                }
                getMessageGroupOwners().removeConsumer(subscription.getConsumerInfo().getConsumerId());
                ArrayList<QueueMessageReference> redeliver = new ArrayList<>();
                Iterator<MessageReference> returned = subscription.remove(connectionContext, this).iterator();
                while (returned.hasNext()) {
                    QueueMessageReference reference = (QueueMessageReference) returned.next();
                    // One removal is one failed delivery: bump the counter exactly once.
                    // The original bumped it a second time for references locked by this
                    // subscription, so those messages looked redelivered twice.
                    reference.incrementRedeliveryCounter();
                    if (reference.getLockOwner() == subscription) {
                        reference.unlock();
                    }
                    redeliver.add(reference);
                }
                if (!redeliver.isEmpty() && !consumers.isEmpty()) {
                    doDispatch(redeliver);
                }
            }
            if (consumers.isEmpty()) {
                messages.gc();
            }
            // Single unconditional wakeup replaces the two complementary optimizedDispatch checks,
            // which could skip the wakeup if the flag changed between the two reads.
            wakeup();
        } finally {
            dispatchLock.unlock();
        }
    }

    /**
     * Accepts a message from a producer.
     * Expired messages are reported to the root broker and not stored. When memory usage is
     * full and producer flow control applies, the send either fails, is queued for later in
     * {@code messagesWaitingForSpace}, or blocks until space is available. Otherwise the
     * message goes straight to {@link #doMessageSend}.
     *
     * @param producerBrokerExchange exchange describing the producer and its connection
     * @param message                message to send
     * @throws Exception ResourceAllocationException when configured to fail on no space,
     *                   IOException when the connection stops while waiting, or any failure
     *                   from the underlying send
     */
    @Override
    public void send(final ProducerBrokerExchange producerBrokerExchange, final Message message) throws Exception {
        final ConnectionContext context = producerBrokerExchange.getConnectionContext();
        message.setRegionDestination(this);
        final ProducerInfo producerInfo = producerBrokerExchange.getProducerState().getInfo();
        // A ProducerAck is sent only for window-based producers that did not ask for a response
        // and are not in recovery mode.
        final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 && !context.isInRecoveryMode();
        if (message.isExpired()) {
            broker.getRoot().messageExpired(context, message);
            if (sendProducerAck) {
                context.getConnection().dispatchAsync(new ProducerAck(producerInfo.getProducerId(), message.getSize()));
            }
            return;
        }
        if (memoryUsage.isFull()) {
            isFull(context, memoryUsage);
            fastProducer(context, producerInfo);
            if (isProducerFlowControl() && context.isProducerFlowControl()) {
                if (systemUsage.isSendFailIfNoSpace()) {
                    throw new ResourceAllocationException("SystemUsage memory limit reached");
                }
                if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
                    // Defer the send until memory frees up; the reply is sent by the deferred task.
                    synchronized (messagesWaitingForSpace) {
                        messagesWaitingForSpace.add(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    if (message.isExpired()) {
                                        Queue.this.broker.messageExpired(context, message);
                                    } else {
                                        Queue.this.doMessageSend(producerBrokerExchange, message);
                                    }
                                    if (sendProducerAck) {
                                        context.getConnection().dispatchAsync(new ProducerAck(producerInfo.getProducerId(), message.getSize()));
                                    } else {
                                        Response ok = new Response();
                                        ok.setCorrelationId(message.getCommandId());
                                        context.getConnection().dispatchAsync(ok);
                                    }
                                } catch (Exception e) {
                                    // Only report the failure when the producer is waiting for a response.
                                    if (!sendProducerAck && !context.isInRecoveryMode()) {
                                        ExceptionResponse failure = new ExceptionResponse(e);
                                        failure.setCorrelationId(message.getCommandId());
                                        context.getConnection().dispatchAsync(failure);
                                    }
                                }
                            }
                        });
                        if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
                            // Callback was not registered: run the task right away.
                            sendMessagesWaitingForSpaceTask.run();
                        }
                        context.setDontSendReponse(true);
                    }
                    return;
                }
                // Blocking path: poll once a second so a stopping connection can abort.
                while (!memoryUsage.waitForSpace(1000L)) {
                    if (context.getStopping().get()) {
                        throw new IOException("Connection closed, send aborted.");
                    }
                }
                if (message.isExpired()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Expired message: " + message);
                    }
                    broker.getRoot().messageExpired(context, message);
                    return;
                }
            }
        }
        doMessageSend(producerBrokerExchange, message);
        if (sendProducerAck) {
            context.getConnection().dispatchAsync(new ProducerAck(producerInfo.getProducerId(), message.getSize()));
        }
    }

    /**
     * Persists the message when a store is present and the message is persistent, then either
     * sends it immediately or, inside a transaction, registers a synchronization that sends it
     * after commit.
     *
     * @param producerBrokerExchange exchange supplying the connection context
     * @param message                message to store and send
     * @throws IOException if the connection stops while waiting for store space
     * @throws Exception   ResourceAllocationException when the store is full and sends are
     *                     configured to fail, or any failure from the store or the send
     */
    void doMessageSend(ProducerBrokerExchange producerBrokerExchange, final Message message) throws IOException, Exception {
        final ConnectionContext context = producerBrokerExchange.getConnectionContext();
        synchronized (sendLock) {
            if (store != null && message.isPersistent()) {
                final boolean failOnFullStore = isProducerFlowControl() && context.isProducerFlowControl()
                        && systemUsage.isSendFailIfNoSpace() && systemUsage.getStoreUsage().isFull();
                if (failOnFullStore) {
                    throw new ResourceAllocationException("Usage Manager Store is Full");
                }
                // Poll once a second so a stopping connection can abort the wait.
                while (!systemUsage.getStoreUsage().waitForSpace(1000L)) {
                    if (context.getStopping().get()) {
                        throw new IOException("Connection closed, send aborted.");
                    }
                }
                message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
                store.addMessage(context, message);
            }
        }
        if (!context.isInTransaction()) {
            sendMessage(context, message);
            return;
        }
        // Keep the message referenced until the transaction outcome is known.
        message.incrementReferenceCount();
        context.getTransaction().addSynchronization(new Synchronization() {
            @Override
            public void afterCommit() throws Exception {
                try {
                    if (Queue.this.broker.isExpired(message)) {
                        Queue.this.broker.messageExpired(context, message);
                    } else {
                        Queue.this.sendMessage(context, message);
                    }
                } finally {
                    message.decrementReferenceCount();
                }
            }

            @Override
            public void afterRollback() throws Exception {
                message.decrementReferenceCount();
            }
        });
    }

    /**
     * Disposes the destination through the superclass and then removes every message from the
     * store, when a store is present.
     *
     * @param connectionContext context used for the superclass dispose and the store removal
     * @throws IOException if the superclass dispose or the store removal fails
     */
    @Override
    public void dispose(ConnectionContext connectionContext) throws IOException {
        super.dispose(connectionContext);
        if (store == null) {
            return;
        }
        store.removeAllMessages(connectionContext);
    }

    /**
     * No-op in this implementation. purge() calls it once the queue has been drained.
     */
    @Override // org.apache.activemq.broker.region.Destination
    public void gc() {
    }

    /**
     * Marks a message as consumed and, for persistent messages backed by a store, removes it
     * from the store. An ack whose message count is positive is copied and narrowed to the
     * single referenced message before being handed to the store.
     *
     * @param connectionContext context of the acknowledging connection
     * @param subscription      acknowledging subscription (not used by this method)
     * @param messageAck        acknowledgement received from the consumer
     * @param messageReference  reference to the message being acknowledged
     * @throws IOException if the store removal fails
     */
    @Override
    public void acknowledge(ConnectionContext connectionContext, Subscription subscription, MessageAck messageAck, MessageReference messageReference) throws IOException {
        messageConsumed(connectionContext, messageReference);
        if (store != null && messageReference.isPersistent()) {
            MessageAck storeAck = messageAck;
            if (messageAck.getMessageCount() > 0) {
                // Work on a copy so the caller's ack is left untouched.
                storeAck = new MessageAck();
                messageAck.copy(storeAck);
                storeAck.setFirstMessageId(messageReference.getMessageId());
                storeAck.setLastMessageId(messageReference.getMessageId());
                storeAck.setMessageCount(1);
            }
            store.removeMessage(connectionContext, storeAck);
        }
    }

    /**
     * Loads a message from the store and, if found, binds it to this queue as its region
     * destination.
     *
     * @param messageId id of the message to load
     * @return the loaded message, or {@code null} when the store returns none
     * @throws IOException if the store lookup fails
     */
    Message loadMessage(MessageId messageId) throws IOException {
        final Message loaded = store.getMessage(messageId);
        if (loaded == null) {
            return null;
        }
        loaded.setRegionDestination(this);
        return loaded;
    }

    /**
     * Describes the queue: physical name, subscription count, memory usage percentage, cursor
     * size (read while synchronized on the cursor) and the current message group map.
     */
    @Override
    public String toString() {
        final int pending;
        synchronized (messages) {
            pending = messages.size();
        }
        return "Queue: destination=" + destination.getPhysicalName()
                + ", subscriptions=" + consumers.size()
                + ", memory=" + memoryUsage.getPercentUsage()
                + "%, size=" + pending
                + ", in flight groups=" + messageGroupOwners;
    }

    /**
     * Starts memory usage tracking (when present) and the cursor, then pages messages in.
     *
     * @throws Exception if any of the started components or the page-in fails
     */
    @Override
    public void start() throws Exception {
        if (null != memoryUsage) {
            memoryUsage.start();
        }
        messages.start();
        doPageIn(false);
    }

    /**
     * Shuts down, in this order and skipping any that are absent: the task runner, the
     * executor, the cursor and memory usage tracking.
     *
     * @throws Exception if any component fails to stop
     */
    @Override
    public void stop() throws Exception {
        if (null != taskRunner) {
            taskRunner.shutdown();
        }
        if (null != executor) {
            executor.shutdownNow();
        }
        if (null != messages) {
            messages.stop();
        }
        if (null != memoryUsage) {
            memoryUsage.stop();
        }
    }

    /**
     * @return the destination this queue represents
     */
    @Override
    public ActiveMQDestination getActiveMQDestination() {
        return destination;
    }

    /**
     * Returns the message group map, creating it from the configured factory on first use.
     *
     * @return the message group map for this queue
     */
    public MessageGroupMap getMessageGroupOwners() {
        if (messageGroupOwners != null) {
            return messageGroupOwners;
        }
        messageGroupOwners = getMessageGroupMapFactory().createMessageGroupMap();
        return messageGroupOwners;
    }

    /**
     * @return the dispatch policy currently configured for this queue
     */
    public DispatchPolicy getDispatchPolicy() {
        return dispatchPolicy;
    }

    /**
     * Replaces the dispatch policy.
     *
     * @param dispatchPolicy policy to use from now on
     */
    public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
        this.dispatchPolicy = dispatchPolicy;
    }

    /**
     * @return the factory used by {@link #getMessageGroupOwners()} to create the group map
     */
    public MessageGroupMapFactory getMessageGroupMapFactory() {
        return messageGroupMapFactory;
    }

    /**
     * Replaces the factory used to create the message group map. A map that
     * getMessageGroupOwners() has already created is not replaced by this call.
     *
     * @param messageGroupMapFactory factory to use for later map creation
     */
    public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) {
        this.messageGroupMapFactory = messageGroupMapFactory;
    }

    /**
     * @return the pending-message cursor, or {@code null} if none has been set or created yet
     */
    public PendingMessageCursor getMessages() {
        return messages;
    }

    /**
     * Supplies the pending-message cursor. initialize() only creates its own cursor when
     * none has been set.
     *
     * @param pendingMessageCursor cursor to use
     */
    public void setMessages(PendingMessageCursor pendingMessageCursor) {
        messages = pendingMessageCursor;
    }

    /**
     * @return the current value of the use-consumer-priority flag (true by default)
     */
    public boolean isUseConsumerPriority() {
        return useConsumerPriority;
    }

    /**
     * @param z new value of the use-consumer-priority flag
     */
    public void setUseConsumerPriority(boolean z) {
        useConsumerPriority = z;
    }

    /**
     * @return the current value of the strict-order-dispatch flag (false by default)
     */
    public boolean isStrictOrderDispatch() {
        return strictOrderDispatch;
    }

    /**
     * @param z new value of the strict-order-dispatch flag
     */
    public void setStrictOrderDispatch(boolean z) {
        strictOrderDispatch = z;
    }

    /**
     * @return whether optimized dispatch is enabled; initialize() uses this to choose between
     *         the factory-created task runner and a dedicated executor thread
     */
    public boolean isOptimizedDispatch() {
        return optimizedDispatch;
    }

    /**
     * @param z new value of the optimized-dispatch flag
     */
    public void setOptimizedDispatch(boolean z) {
        optimizedDispatch = z;
    }

    /**
     * Wraps a message in a new {@link IndirectMessageReference}.
     *
     * @param message message to wrap
     * @return a fresh reference to the message
     */
    private QueueMessageReference createMessageReference(Message message) {
        final QueueMessageReference reference = new IndirectMessageReference(message);
        return reference;
    }

    /**
     * Returns the messages currently visible on this queue: first those held in
     * {@code pagedInMessages}, then those still in the cursor. Failures while paging in or
     * while reading an individual message are logged and the affected message is skipped.
     *
     * @return the browsed messages; never {@code null}
     */
    @Override
    public Message[] browse() {
        final List<Message> browsed = new ArrayList<>();
        try {
            doPageIn(true);
        } catch (Exception e) {
            LOG.error("caught an exception browsing " + this, e);
        }
        synchronized (pagedInMessages) {
            for (QueueMessageReference reference : pagedInMessages.values()) {
                reference.incrementReferenceCount();
                try {
                    Message message = reference.getMessage();
                    if (message != null) {
                        browsed.add(message);
                    }
                } catch (IOException e) {
                    LOG.error("caught an exception browsing " + this, e);
                } finally {
                    // Exactly one decrement per increment; the decompiled form released the
                    // reference twice when getMessage() failed with an IOException.
                    reference.decrementReferenceCount();
                }
            }
        }
        synchronized (messages) {
            try {
                messages.reset();
                while (messages.hasNext()) {
                    try {
                        MessageReference reference = messages.next();
                        reference.incrementReferenceCount();
                        try {
                            Message message = reference.getMessage();
                            if (message != null) {
                                browsed.add(message);
                            }
                        } finally {
                            reference.decrementReferenceCount();
                        }
                    } catch (IOException e) {
                        LOG.error("caught an exception brwsing " + this, e);
                    }
                }
            } finally {
                // Always hand the cursor back, whether iteration finished or failed.
                messages.release();
            }
        }
        return browsed.toArray(new Message[browsed.size()]);
    }

    /**
     * Scans the cursor for a message whose id, rendered as a string, equals {@code str}.
     * The cursor is reset before the scan and released afterwards, whatever the outcome.
     * An IOException while reading a message is logged and the scan continues.
     *
     * @param str textual message id to look for
     * @return the matching message, or {@code null} when no matching reference yields one
     */
    public Message getMessage(String str) {
        synchronized (messages) {
            try {
                messages.reset();
                while (messages.hasNext()) {
                    try {
                        MessageReference candidate = messages.next();
                        if (str.equals(candidate.getMessageId().toString())) {
                            candidate.incrementReferenceCount();
                            try {
                                Message found = candidate.getMessage();
                                if (found != null) {
                                    return found;
                                }
                            } finally {
                                // Single decrement per increment; the decompiled form released
                                // the reference twice when the message turned out to be null.
                                candidate.decrementReferenceCount();
                            }
                        }
                    } catch (IOException e) {
                        // Keep the cause in the log; it was dropped before.
                        LOG.error("got an exception retrieving message " + str, e);
                    }
                }
                return null;
            } finally {
                messages.release();
            }
        }
    }

    /**
     * Removes every message from the queue. Repeatedly pages messages in, snapshots
     * {@code pagedInMessages} and removes each snapshot entry, until no paged-in messages
     * remain and the message count statistic is no longer positive; then calls {@link #gc()}.
     * A failure to remove a single message is logged and does not stop the purge.
     *
     * @throws Exception if paging in fails or removal fails with anything other than IOException
     */
    public void purge() throws Exception {
        final ConnectionContext context = createConnectionContext();
        do {
            pageInMessages();
            final List<QueueMessageReference> snapshot;
            synchronized (pagedInMessages) {
                snapshot = new ArrayList<>(pagedInMessages.values());
            }
            for (QueueMessageReference reference : snapshot) {
                try {
                    removeMessage(context, (IndirectMessageReference) reference);
                } catch (IOException e) {
                    // Best effort: keep purging, but do not hide the failure as the
                    // original empty catch block did.
                    LOG.warn("Failed to remove message " + reference.getMessageId() + " while purging " + destination.getPhysicalName(), e);
                }
            }
        } while (!pagedInMessages.isEmpty() || destinationStatistics.getMessages().getCount() > 0);
        gc();
    }

    /**
     * Removes at most one message matched by the message-id filter built from {@code str}.
     *
     * @param str value passed to {@code createMessageIdFilter}
     * @return {@code true} when a message was removed
     * @throws Exception if the removal fails
     */
    public boolean removeMessage(String str) throws Exception {
        final MessageReferenceFilter byId = createMessageIdFilter(str);
        final int removed = removeMatchingMessages(byId, 1);
        return removed > 0;
    }

    /**
     * Removes every message matched by {@code str}, with no upper bound on the count.
     *
     * @param str value forwarded to {@link #removeMatchingMessages(String, int)}
     * @return number of messages removed
     * @throws Exception if the removal fails
     */
    public int removeMatchingMessages(String str) throws Exception {
        final int noLimit = -1;
        return removeMatchingMessages(str, noLimit);
    }

    /**
     * Removes messages matched by the selector filter built from {@code str}.
     *
     * @param str value passed to {@code createSelectorFilter}
     * @param i   maximum number of messages to remove; values of zero or less mean no limit
     * @return number of messages removed
     * @throws Exception if the filter cannot be created or the removal fails
     */
    public int removeMatchingMessages(String str, int i) throws Exception {
        final MessageReferenceFilter bySelector = createSelectorFilter(str);
        return removeMatchingMessages(bySelector, i);
    }

    /**
     * Removes messages accepted by the filter. Each round pages messages in, adds the
     * paged-in references to a working set and evaluates every reference in that set;
     * matches are removed from the queue and from the set. Rounds repeat while the set is
     * smaller than the message count statistic.
     *
     * @param messageReferenceFilter filter deciding which messages to remove
     * @param i                      maximum number of messages to remove; zero or less means no limit
     * @return number of messages removed
     * @throws Exception if paging in, filter evaluation or removal fails
     */
    public int removeMatchingMessages(MessageReferenceFilter messageReferenceFilter, int i) throws Exception {
        int removed = 0;
        final CopyOnWriteArraySet<QueueMessageReference> examined = new CopyOnWriteArraySet<QueueMessageReference>();
        final ConnectionContext context = createConnectionContext();
        do {
            pageInMessages();
            synchronized (pagedInMessages) {
                examined.addAll(pagedInMessages.values());
            }
            // Iterate over a copy because matches are removed from the working set.
            for (QueueMessageReference reference : new ArrayList<QueueMessageReference>(examined)) {
                final IndirectMessageReference candidate = (IndirectMessageReference) reference;
                if (!messageReferenceFilter.evaluate(context, candidate)) {
                    continue;
                }
                removeMessage(context, candidate);
                examined.remove(candidate);
                removed++;
                if (removed >= i && i > 0) {
                    return removed;
                }
            }
        } while (examined.size() < destinationStatistics.getMessages().getCount());
        return removed;
    }

    /**
     * Copies at most one message matched by the message-id filter built from {@code str}.
     *
     * @param connectionContext   context used for the resend
     * @param str                 value passed to {@code createMessageIdFilter}
     * @param activeMQDestination destination to copy to
     * @return {@code true} when a message was copied
     * @throws Exception if the copy fails
     */
    public boolean copyMessageTo(ConnectionContext connectionContext, String str, ActiveMQDestination activeMQDestination) throws Exception {
        final MessageReferenceFilter byId = createMessageIdFilter(str);
        final int copied = copyMatchingMessages(connectionContext, byId, activeMQDestination, 1);
        return copied > 0;
    }

    /**
     * Copies every message matched by {@code str}, with no upper bound on the count.
     *
     * @param connectionContext   context used for the resend
     * @param str                 value forwarded to the four-argument overload
     * @param activeMQDestination destination to copy to
     * @return number of messages copied
     * @throws Exception if the copy fails
     */
    public int copyMatchingMessagesTo(ConnectionContext connectionContext, String str, ActiveMQDestination activeMQDestination) throws Exception {
        final int noLimit = -1;
        return copyMatchingMessagesTo(connectionContext, str, activeMQDestination, noLimit);
    }

    /**
     * Copies messages matched by the selector filter built from {@code str}.
     *
     * @param connectionContext   context used for the resend
     * @param str                 value passed to {@code createSelectorFilter}
     * @param activeMQDestination destination to copy to
     * @param i                   maximum number of messages to copy; zero or less means no limit
     * @return number of messages copied
     * @throws Exception if the filter cannot be created or the copy fails
     */
    public int copyMatchingMessagesTo(ConnectionContext connectionContext, String str, ActiveMQDestination activeMQDestination, int i) throws Exception {
        final MessageReferenceFilter bySelector = createSelectorFilter(str);
        return copyMatchingMessages(connectionContext, bySelector, activeMQDestination, i);
    }

    /**
     * Resends a copy of every paged-in message accepted by the filter to another
     * destination, leaving the originals on this queue.
     * <p>
     * Before each page-in the maximum page size is temporarily set to the current
     * message count (presumably so one page-in can reach every queued message) and is
     * restored afterwards, even when paging in fails.
     * <p>
     * Each matching reference has its reference count incremented for the duration of
     * the resend and decremented exactly once afterwards, on both the normal and the
     * early-return path.
     *
     * @param connectionContext      context used to evaluate the filter and resend messages
     * @param messageReferenceFilter predicate selecting the messages to copy
     * @param activeMQDestination    destination that receives the copies
     * @param i                      upper bound on copies; zero or a negative value means no limit
     * @return the number of messages copied
     * @throws Exception if paging in, filter evaluation or resending fails
     */
    public int copyMatchingMessages(ConnectionContext connectionContext, MessageReferenceFilter messageReferenceFilter, ActiveMQDestination activeMQDestination, int i) throws Exception {
        int copied = 0;
        int visited = 0;
        CopyOnWriteArraySet<MessageReference> seen = new CopyOnWriteArraySet<MessageReference>();
        do {
            int previousMaxPageSize = getMaxPageSize();
            setMaxPageSize((int) this.destinationStatistics.getMessages().getCount());
            try {
                pageInMessages();
            } finally {
                // Never leave the enlarged page size behind if paging in throws.
                setMaxPageSize(previousMaxPageSize);
            }
            synchronized (this.pagedInMessages) {
                seen.addAll(this.pagedInMessages.values());
            }
            for (MessageReference candidate : new ArrayList<MessageReference>(seen)) {
                IndirectMessageReference reference = (IndirectMessageReference) candidate;
                if (messageReferenceFilter.evaluate(connectionContext, reference)) {
                    reference.incrementReferenceCount();
                    try {
                        BrokerSupport.resend(connectionContext, reference.getMessage(), activeMQDestination);
                        copied++;
                        if (copied >= i && i > 0) {
                            return copied;
                        }
                    } finally {
                        // Single release point; the previous code also decremented inside the try.
                        reference.decrementReferenceCount();
                    }
                }
                visited++;
            }
        } while (visited < this.destinationStatistics.getMessages().getCount());
        return copied;
    }

    /**
     * Moves the message whose id equals {@code str} to another destination.
     *
     * @param connectionContext   context used to evaluate the filter, resend and remove the message
     * @param str                 textual message id to look for
     * @param activeMQDestination destination that receives the message
     * @return {@code true} if at least one message was moved
     * @throws Exception if paging in, resending or removal fails
     */
    public boolean moveMessageTo(ConnectionContext connectionContext, String str, ActiveMQDestination activeMQDestination) throws Exception {
        final MessageReferenceFilter idFilter = createMessageIdFilter(str);
        final int moved = moveMatchingMessagesTo(connectionContext, idFilter, activeMQDestination, 1);
        return moved > 0;
    }

    /**
     * Moves every message matching the selector to another destination, without a limit.
     *
     * @param connectionContext   context used to evaluate the selector, resend and remove messages
     * @param str                 selector text
     * @param activeMQDestination destination that receives the messages
     * @return the number of messages moved
     * @throws Exception if the selector is invalid or moving fails
     */
    public int moveMatchingMessagesTo(ConnectionContext connectionContext, String str, ActiveMQDestination activeMQDestination) throws Exception {
        // A non-positive limit disables the early exit in the filter-based overload.
        final int noLimit = -1;
        return moveMatchingMessagesTo(connectionContext, str, activeMQDestination, noLimit);
    }

    /**
     * Moves messages matching the selector to another destination.
     *
     * @param connectionContext   context used to evaluate the selector, resend and remove messages
     * @param str                 selector text, compiled via {@link #createSelectorFilter(String)}
     * @param activeMQDestination destination that receives the messages
     * @param i                   upper bound on moves; zero or a negative value means no limit
     * @return the number of messages moved
     * @throws Exception if the selector is invalid or moving fails
     */
    public int moveMatchingMessagesTo(ConnectionContext connectionContext, String str, ActiveMQDestination activeMQDestination, int i) throws Exception {
        final MessageReferenceFilter selectorFilter = createSelectorFilter(str);
        return moveMatchingMessagesTo(connectionContext, selectorFilter, activeMQDestination, i);
    }

    /**
     * Resends every paged-in message accepted by the filter to another destination and
     * then removes it from this queue.
     * <p>
     * Each pass pages in more messages, adds the current contents of
     * {@code pagedInMessages} to a working set and evaluates the filter against a
     * snapshot of that set. The passes stop once the working set is at least as large
     * as the destination's message count, or once the move limit is hit.
     * <p>
     * Each matching reference has its reference count incremented for the duration of
     * the resend/remove and decremented exactly once afterwards, on both the normal
     * and the early-return path.
     *
     * @param connectionContext      context used to evaluate the filter, resend and remove messages
     * @param messageReferenceFilter predicate selecting the messages to move
     * @param activeMQDestination    destination that receives the messages
     * @param i                      upper bound on moves; zero or a negative value means no limit
     * @return the number of messages moved
     * @throws Exception if paging in, filter evaluation, resending or removal fails
     */
    public int moveMatchingMessagesTo(ConnectionContext connectionContext, MessageReferenceFilter messageReferenceFilter, ActiveMQDestination activeMQDestination, int i) throws Exception {
        int moved = 0;
        CopyOnWriteArraySet<MessageReference> seen = new CopyOnWriteArraySet<MessageReference>();
        do {
            pageInMessages();
            synchronized (this.pagedInMessages) {
                seen.addAll(this.pagedInMessages.values());
            }
            // Walk a snapshot so the working set can shrink while we iterate.
            for (MessageReference candidate : new ArrayList<MessageReference>(seen)) {
                IndirectMessageReference reference = (IndirectMessageReference) candidate;
                if (messageReferenceFilter.evaluate(connectionContext, reference)) {
                    reference.incrementReferenceCount();
                    try {
                        BrokerSupport.resend(connectionContext, reference.getMessage(), activeMQDestination);
                        removeMessage(connectionContext, reference);
                        seen.remove(reference);
                        moved++;
                        if (moved >= i && i > 0) {
                            return moved;
                        }
                    } finally {
                        // Single release point; the previous code also decremented inside the try.
                        reference.decrementReferenceCount();
                    }
                }
            }
        } while (seen.size() < this.destinationStatistics.getMessages().getCount());
        return moved;
    }

    /**
     * Takes the oldest pending recovery dispatch off the {@code recoveries} list.
     * The lookup and removal happen while holding the {@code pagedInMessages} monitor.
     *
     * @return the first queued recovery dispatch, or {@code null} when none is pending
     */
    RecoveryDispatch getNextRecoveryDispatch() {
        synchronized (this.pagedInMessages) {
            return this.recoveries.isEmpty() ? null : this.recoveries.removeFirst();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reports whether any recovery dispatch is still queued.
     * The check is made while holding the {@code pagedInMessages} monitor.
     *
     * @return {@code true} when the {@code recoveries} list is empty
     */
    public boolean isRecoveryDispatchEmpty() {
        synchronized (this.pagedInMessages) {
            return this.recoveries.isEmpty();
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:100:0x0134, code lost:
    
        throw r12;
     */
    /* JADX WARN: Code restructure failed: missing block: B:102:0x00f4, code lost:
    
        r0 = false;
     */
    /* JADX WARN: Code restructure failed: missing block: B:50:0x00da, code lost:
    
        r0 = r4.messages;
     */
    /* JADX WARN: Code restructure failed: missing block: B:51:0x00e3, code lost:
    
        monitor-enter(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:54:0x00ed, code lost:
    
        if (r4.messages.isEmpty() != false) goto L39;
     */
    /* JADX WARN: Code restructure failed: missing block: B:55:0x00f0, code lost:
    
        r0 = true;
     */
    /* JADX WARN: Code restructure failed: missing block: B:56:0x00f5, code lost:
    
        r7 = r0;
     */
    /* JADX WARN: Code restructure failed: missing block: B:57:0x00f8, code lost:
    
        monitor-exit(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:59:0x0104, code lost:
    
        r4.dispatchLock.lock();
     */
    /* JADX WARN: Code restructure failed: missing block: B:62:0x0115, code lost:
    
        if (r4.pagedInPendingDispatch.isEmpty() != false) goto L52;
     */
    /* JADX WARN: Code restructure failed: missing block: B:63:0x0118, code lost:
    
        r1 = true;
     */
    /* JADX WARN: Code restructure failed: missing block: B:64:0x011d, code lost:
    
        r0 = r7 | r1;
     */
    /* JADX WARN: Code restructure failed: missing block: B:65:0x0120, code lost:
    
        r4.dispatchLock.unlock();
     */
    /* JADX WARN: Code restructure failed: missing block: B:67:0x0136, code lost:
    
        if (r0 == false) goto L64;
     */
    /* JADX WARN: Code restructure failed: missing block: B:91:0x0139, code lost:
    
        pageInMessages(false);
     */
    /* JADX WARN: Code restructure failed: missing block: B:93:0x0141, code lost:
    
        r8 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:94:0x0143, code lost:
    
        org.apache.activemq.broker.region.Queue.LOG.error("Failed to page in more queue messages ", r8);
     */
    /* JADX WARN: Code restructure failed: missing block: B:96:0x011c, code lost:
    
        r1 = false;
     */
    /* JADX WARN: Code restructure failed: missing block: B:97:0x0129, code lost:
    
        r12 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:99:0x012c, code lost:
    
        r4.dispatchLock.unlock();
     */
    /**
     * Task callback of this queue.
     * <p>
     * WARNING: this body is a decompiler placeholder, not the real implementation. The
     * decompiler could not reconstruct the method, so as written it unconditionally
     * throws {@link UnsupportedOperationException}. Note that {@code wakeup()} calls
     * this method directly when {@code optimizedDispatch} is set or the broker is a
     * slave, so that path fails with this placeholder in place.
     * <p>
     * NOTE(review): the decompiler fragments preceding this method suggest the original
     * checked whether {@code messages} or {@code pagedInPendingDispatch} were non-empty
     * and then called {@code pageInMessages(false)}, logging "Failed to page in more
     * queue messages" on failure. Confirm against the original source or bytecode
     * before relying on this.
     *
     * @return never returns normally in this placeholder form
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.activemq.thread.Task
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public boolean iterate() {
        /*
            Method dump skipped, instructions count: 411
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.activemq.broker.region.Queue.iterate():boolean");
    }

    /**
     * Builds a filter that accepts a message reference only when the string form of
     * its message id equals the given text.
     *
     * @param str message id text to compare against; must not be {@code null}, since
     *            {@code equals} is invoked on it
     * @return a filter matching on message id
     */
    protected MessageReferenceFilter createMessageIdFilter(final String str) {
        return new MessageReferenceFilter() {
            @Override
            public boolean evaluate(ConnectionContext connectionContext, MessageReference messageReference) {
                final String candidateId = messageReference.getMessageId().toString();
                return str.equals(candidateId);
            }
        };
    }

    /**
     * Builds a filter that accepts message references matching a selector expression.
     * <p>
     * The selector is parsed once, up front. On each evaluation the connection's
     * message evaluation context is pointed at the candidate reference, and its
     * destination is set to this queue's destination if none is set yet.
     *
     * @param str selector text handed to {@link SelectorParser}
     * @return a filter that applies the parsed selector
     * @throws InvalidSelectorException if the selector text cannot be parsed
     */
    protected MessageReferenceFilter createSelectorFilter(String str) throws InvalidSelectorException {
        final SelectorParser parser = new SelectorParser();
        final BooleanExpression selector = parser.parse(str);
        return new MessageReferenceFilter() {
            @Override
            public boolean evaluate(ConnectionContext connectionContext, MessageReference messageReference) throws JMSException {
                final MessageEvaluationContext evaluation = connectionContext.getMessageEvaluationContext();
                evaluation.setMessageReference(messageReference);
                final boolean destinationMissing = evaluation.getDestination() == null;
                if (destinationMissing) {
                    evaluation.setDestination(Queue.this.getActiveMQDestination());
                }
                return selector.matches(evaluation);
            }
        };
    }

    /**
     * Removes a message from this queue on behalf of no particular subscription.
     * Delegates to the three-argument overload with a {@code null} subscription.
     *
     * @param connectionContext     context under which the removal is acknowledged
     * @param queueMessageReference reference to the message being removed
     * @throws IOException if the delegated removal fails
     */
    protected void removeMessage(ConnectionContext connectionContext, QueueMessageReference queueMessageReference) throws IOException {
        removeMessage(connectionContext, null, queueMessageReference);
    }

    /**
     * Removes a message by synthesising an acknowledgement for it and delegating to the
     * four-argument overload.
     *
     * @param connectionContext     context under which the removal is acknowledged
     * @param subscription          subscription the removal is attributed to; may be {@code null}
     * @param queueMessageReference reference to the message being removed
     * @throws IOException if the delegated removal fails
     */
    protected void removeMessage(ConnectionContext connectionContext, Subscription subscription, QueueMessageReference queueMessageReference) throws IOException {
        final MessageId removedId = queueMessageReference.getMessageId();
        final MessageAck syntheticAck = new MessageAck();
        // NOTE(review): ack type 2 presumably corresponds to MessageAck.STANDARD_ACK_TYPE; confirm.
        syntheticAck.setAckType((byte) 2);
        syntheticAck.setDestination(this.destination);
        syntheticAck.setMessageID(removedId);
        removeMessage(connectionContext, subscription, queueMessageReference, syntheticAck);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Acknowledges a message and drops it from this queue.
     * <p>
     * The reference is marked acked and the acknowledgement is processed immediately.
     * The actual drop (dropping the reference, decrementing the message counter,
     * removing it from {@code pagedInMessages} and waking the queue up) happens right
     * away when the ack is not transactional. Otherwise it is deferred to the
     * transaction's commit, and a rollback clears the acked flag again.
     *
     * @param connectionContext     context supplying the transaction, if any
     * @param subscription          subscription the removal is attributed to; may be {@code null}
     * @param queueMessageReference reference to the message being removed
     * @param messageAck            acknowledgement describing the removal
     * @throws IOException if acknowledging the message fails
     */
    public void removeMessage(ConnectionContext connectionContext, Subscription subscription, final QueueMessageReference queueMessageReference, MessageAck messageAck) throws IOException {
        queueMessageReference.setAcked(true);
        acknowledge(connectionContext, subscription, messageAck, queueMessageReference);
        if (!messageAck.isInTransaction()) {
            // Non-transactional: the message is gone as of now.
            queueMessageReference.drop();
            this.destinationStatistics.getMessages().decrement();
            synchronized (this.pagedInMessages) {
                this.pagedInMessages.remove(queueMessageReference.getMessageId());
            }
            wakeup();
        } else {
            // Transactional: defer the drop until the outcome of the transaction is known.
            connectionContext.getTransaction().addSynchronization(new Synchronization() {
                @Override
                public void afterCommit() throws Exception {
                    queueMessageReference.drop();
                    Queue.this.destinationStatistics.getMessages().decrement();
                    synchronized (Queue.this.pagedInMessages) {
                        Queue.this.pagedInMessages.remove(queueMessageReference.getMessageId());
                    }
                    Queue.this.wakeup();
                }

                @Override
                public void afterRollback() throws Exception {
                    queueMessageReference.setAcked(false);
                }
            });
        }
    }

    /**
     * Handles expiry of a message on behalf of no particular subscription.
     * Delegates to the three-argument overload with a {@code null} subscription.
     *
     * @param connectionContext context under which the expiry is processed
     * @param messageReference  reference to the expired message
     */
    public void messageExpired(ConnectionContext connectionContext, MessageReference messageReference) {
        messageExpired(connectionContext, null, messageReference);
    }

    /**
     * Handles expiry of a message: notifies the broker, adjusts the dequeue and
     * in-flight counters, removes the message, and wakes the queue up.
     * <p>
     * A failure to remove the message is logged rather than propagated. The reference
     * is removed from {@code pagedInMessages} afterwards either way.
     *
     * @param connectionContext context under which the expiry is processed
     * @param subscription      subscription the expiry is attributed to; may be {@code null}
     * @param messageReference  reference to the expired message; cast to
     *                          {@link QueueMessageReference} for removal
     */
    @Override
    public void messageExpired(ConnectionContext connectionContext, Subscription subscription, MessageReference messageReference) {
        this.broker.messageExpired(connectionContext, messageReference);
        this.destinationStatistics.getDequeues().increment();
        this.destinationStatistics.getInflight().decrement();
        try {
            final QueueMessageReference expired = (QueueMessageReference) messageReference;
            removeMessage(connectionContext, subscription, expired);
        } catch (IOException e) {
            LOG.error("Failed to remove expired Message from the store ", e);
        }
        // Make sure the reference is gone from the paged-in map even if removal failed.
        synchronized (this.pagedInMessages) {
            this.pagedInMessages.remove(messageReference.getMessageId());
        }
        wakeup();
    }

    /**
     * Creates a fresh connection context for broker-internal operations on this queue.
     * The context is backed by a {@link NonCachedMessageEvaluationContext}, bound to
     * this queue's broker, and has its evaluation destination set to this queue's
     * destination.
     *
     * @return the newly created context
     */
    protected ConnectionContext createConnectionContext() {
        final NonCachedMessageEvaluationContext evaluationContext = new NonCachedMessageEvaluationContext();
        final ConnectionContext context = new ConnectionContext(evaluationContext);
        context.setBroker(this.broker);
        context.getMessageEvaluationContext().setDestination(getActiveMQDestination());
        return context;
    }

    /**
     * Appends a message to the pending cursor and updates the queue's bookkeeping.
     * <p>
     * For a non-persistent message, when the cursor has a system usage configured,
     * the call first waits for temp-usage space. After the message is added the
     * enqueue and message counters are incremented, {@code messageDelivered} is
     * invoked, and the queue is woken up.
     *
     * @param connectionContext context of the sender
     * @param message           message to enqueue
     * @throws Exception if waiting for space or adding the message fails
     */
    final void sendMessage(ConnectionContext connectionContext, Message message) throws Exception {
        final boolean needsTempSpace = !message.isPersistent() && this.messages.getSystemUsage() != null;
        if (needsTempSpace) {
            this.messages.getSystemUsage().getTempUsage().waitForSpace();
        }
        synchronized (this.messages) {
            this.messages.addMessageLast(message);
        }
        this.destinationStatistics.getEnqueues().increment();
        this.destinationStatistics.getMessages().increment();
        messageDelivered(connectionContext, message);
        wakeup();
    }

    /**
     * Triggers another round of queue processing.
     * <p>
     * When {@code optimizedDispatch} is enabled or the broker is a slave,
     * {@link #iterate()} is run directly on the calling thread. Otherwise the task
     * runner is woken up instead. An interruption while waking the task runner is
     * logged as a warning and otherwise ignored.
     */
    @Override
    public void wakeup() {
        if (!this.optimizedDispatch && !isSlave()) {
            try {
                this.taskRunner.wakeup();
            } catch (InterruptedException e) {
                LOG.warn("Task Runner failed to wakeup ", e);
            }
        } else {
            iterate();
        }
    }

    /**
     * Tells whether the broker service behind this queue's broker reports itself as a slave.
     *
     * @return the slave flag of the owning broker service
     */
    private boolean isSlave() {
        final BrokerService brokerService = this.broker.getBrokerService();
        return brokerService.isSlave();
    }

    /**
     * Pages messages from the pending cursor into {@code pagedInMessages}.
     * <p>
     * Runs under {@code dispatchLock}. The batch size is the maximum page size plus
     * the in-flight count minus the number of already paged-in messages, capped at the
     * maximum page size. With lazy dispatch and no forcing it is further capped at
     * what the consumers can take before becoming full. Expired messages are handed
     * to {@code messageExpired} and do not count towards the batch.
     * <p>
     * The cursor is released in a {@code finally} block so that it is released exactly
     * once, whether or not iteration fails.
     *
     * @param z when {@code true}, page in even if there are no consumers and ignore the
     *          lazy-dispatch cap
     * @return the references paged in by this call, or {@code null} when nothing was
     *         attempted (no consumers and not forced, or a non-positive batch size)
     * @throws Exception if reading from the cursor or handling a message fails
     */
    private List<QueueMessageReference> doPageIn(boolean z) throws Exception {
        List<QueueMessageReference> pagedIn = null;
        this.dispatchLock.lock();
        try {
            int toPageIn = Math.min((getMaxPageSize() + ((int) this.destinationStatistics.getInflight().getCount())) - this.pagedInMessages.size(), getMaxPageSize());
            if (isLazyDispatch() && !z) {
                // Only page in what can be dispatched straight away.
                toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
            }
            if ((z || !this.consumers.isEmpty()) && toPageIn > 0) {
                this.messages.setMaxBatchSize(toPageIn);
                int count = 0;
                pagedIn = new ArrayList<QueueMessageReference>(toPageIn);
                synchronized (this.messages) {
                    try {
                        this.messages.reset();
                        while (this.messages.hasNext() && count < toPageIn) {
                            MessageReference next = this.messages.next();
                            next.incrementReferenceCount();
                            this.messages.remove();
                            QueueMessageReference reference = createMessageReference(next.getMessage());
                            if (this.broker.isExpired(next)) {
                                messageExpired(createConnectionContext(), reference);
                            } else {
                                pagedIn.add(reference);
                                count++;
                            }
                        }
                    } finally {
                        this.messages.release();
                    }
                }
                synchronized (this.pagedInMessages) {
                    for (QueueMessageReference reference : pagedIn) {
                        this.pagedInMessages.put(reference.getMessageId(), reference);
                    }
                }
            }
            return pagedIn;
        } finally {
            this.dispatchLock.unlock();
        }
    }

    /**
     * Dispatches the backlog of pending references, then the newly paged-in ones.
     * <p>
     * Runs under {@code dispatchLock}. The existing {@code pagedInPendingDispatch}
     * backlog is retried first and replaced by whatever could not be delivered. New
     * references are dispatched only if the backlog is empty afterwards, and then only
     * the undelivered remainder is queued. Otherwise they are all appended to the
     * backlog without a dispatch attempt.
     *
     * @param list newly paged-in references; may be {@code null} or empty
     * @throws Exception if dispatching to a subscription fails
     */
    private void doDispatch(List<QueueMessageReference> list) throws Exception {
        this.dispatchLock.lock();
        try {
            if (!this.pagedInPendingDispatch.isEmpty()) {
                this.pagedInPendingDispatch = doActualDispatch(this.pagedInPendingDispatch);
            }
            if (list == null || list.isEmpty()) {
                return;
            }
            final List<QueueMessageReference> toQueue =
                    this.pagedInPendingDispatch.isEmpty() ? doActualDispatch(list) : list;
            this.pagedInPendingDispatch.addAll(toQueue);
        } finally {
            this.dispatchLock.unlock();
        }
    }

    /**
     * Tries to hand each reference to the first consumer that can select it and is not full.
     * <p>
     * A consumer found full is remembered and skipped for every later reference in this
     * call. A reference that at least one consumer could select but nobody accepted is
     * returned for a later retry; a reference no consumer can select is not returned.
     * After a successful dispatch the receiving consumer is removed from and re-added
     * to the consumer list, unless strict ordering is on, there is only one consumer,
     * or the consumer is exclusive. This presumably rotates consumers round-robin.
     *
     * @param list references to dispatch
     * @return the references that had an interested consumer but could not be delivered
     * @throws Exception if a subscription fails while selecting or accepting a reference
     */
    private List<QueueMessageReference> doActualDispatch(List<QueueMessageReference> list) throws Exception {
        final ArrayList<QueueMessageReference> undelivered = new ArrayList<QueueMessageReference>(list.size());
        final HashSet<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());
        List<Subscription> snapshot;
        synchronized (this.consumers) {
            snapshot = new ArrayList<Subscription>(this.consumers);
        }
        for (QueueMessageReference reference : list) {
            Subscription target = null;
            int interested = 0;
            for (Subscription candidate : snapshot) {
                if (!this.dispatchSelector.canSelect(candidate, reference)) {
                    continue;
                }
                if (!fullConsumers.contains(candidate)) {
                    if (!candidate.isFull()) {
                        candidate.add(reference);
                        target = candidate;
                        break;
                    }
                    // Stop offering further references to a consumer that is full.
                    fullConsumers.add(candidate);
                }
                interested++;
            }
            if (target == null && interested > 0) {
                undelivered.add(reference);
            }
            final boolean rotate = target != null
                    && !this.strictOrderDispatch
                    && snapshot.size() > 1
                    && !this.dispatchSelector.isExclusiveConsumer(target);
            if (rotate) {
                synchronized (this.consumers) {
                    if (removeFromConsumerList(target)) {
                        addToConsumerList(target);
                        snapshot = new ArrayList<Subscription>(this.consumers);
                    }
                }
            }
        }
        return undelivered;
    }

    /**
     * Pages in and dispatches messages in forced mode.
     *
     * @throws Exception if paging in or dispatching fails
     */
    private void pageInMessages() throws Exception {
        final boolean force = true;
        pageInMessages(force);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Pages messages in from the pending cursor and dispatches them.
     *
     * @param z when {@code true}, page in even if there are no consumers and ignore the
     *          lazy-dispatch cap
     * @throws Exception if paging in or dispatching fails
     */
    public void pageInMessages(boolean z) throws Exception {
        final List<QueueMessageReference> pagedIn = doPageIn(z);
        doDispatch(pagedIn);
    }

    /**
     * Appends a subscription to the consumer list, re-sorting the list with
     * {@code orderedCompare} when consumer priority is in use.
     *
     * @param subscription subscription to add
     */
    private void addToConsumerList(Subscription subscription) {
        this.consumers.add(subscription);
        if (this.useConsumerPriority) {
            Collections.sort(this.consumers, orderedCompare);
        }
    }

    /**
     * Removes a subscription from the consumer list.
     *
     * @param subscription subscription to remove
     * @return {@code true} if the list contained the subscription
     */
    private boolean removeFromConsumerList(Subscription subscription) {
        final boolean wasPresent = this.consumers.remove(subscription);
        return wasPresent;
    }

    /**
     * Sums how many more messages the current consumers can take before they are full.
     * <p>
     * If that sum is zero but at least one consumer has a prefetch size of zero, the
     * result is 1 instead of 0.
     *
     * @return the combined remaining capacity of all consumers, or 1 in the
     *         zero-prefetch case described above
     * @throws Exception declared by the method; nothing in this body throws a checked
     *         exception explicitly
     */
    private int getConsumerMessageCountBeforeFull() throws Exception {
        int capacity = 0;
        boolean sawZeroPrefetch = false;
        synchronized (this.consumers) {
            for (Subscription consumer : this.consumers) {
                if (consumer.getPrefetchSize() == 0) {
                    sawZeroPrefetch = true;
                }
                capacity += consumer.countBeforeFull();
            }
        }
        return (capacity == 0 && sawZeroPrefetch) ? 1 : capacity;
    }
}
