/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.server.queue;

import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.management.JMException;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.pool.ReadWriteRunnable;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueMBean;
import org.apache.qpid.server.queue.ExchangeBinding;
import org.apache.qpid.server.queue.ExchangeBindings;
import org.apache.qpid.server.queue.FailedDequeueException;
import org.apache.qpid.server.queue.MessageCleanupException;
import org.apache.qpid.server.queue.NotificationCheck;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueEntryIterator;
import org.apache.qpid.server.queue.QueueEntryList;
import org.apache.qpid.server.queue.QueueEntryListFactory;
import org.apache.qpid.server.queue.SimpleQueueEntryList;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.virtualhost.VirtualHost;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class SimpleAMQQueue
implements AMQQueue,
Subscription.StateListener {
    private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
    private final AMQShortString _name;
    private final AMQShortString _owner;
    private final boolean _durable;
    private final boolean _autoDelete;
    private final VirtualHost _virtualHost;
    private final ExchangeBindings _bindings = new ExchangeBindings(this);
    private final AtomicBoolean _deleted = new AtomicBoolean(false);
    private final List<AMQQueue.Task> _deleteTaskList = new CopyOnWriteArrayList<AMQQueue.Task>();
    private final AtomicInteger _atomicQueueCount = new AtomicInteger(0);
    private final AtomicLong _atomicQueueSize = new AtomicLong(0L);
    private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
    protected final SubscriptionList _subscriptionList = new SubscriptionList(this);
    private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(this._subscriptionList.getHead());
    private volatile Subscription _exclusiveSubscriber;
    protected final QueueEntryList _entries;
    private final AMQQueueMBean _managedObject;
    private final Executor _asyncDelivery;
    private final AtomicLong _totalMessagesReceived = new AtomicLong();
    public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize();
    public long _maximumMessageCount = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageCount();
    public long _maximumQueueDepth = ApplicationRegistry.getInstance().getConfiguration().getMaximumQueueDepth();
    public long _maximumMessageAge = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageAge();
    public long _minimumAlertRepeatGap = ApplicationRegistry.getInstance().getConfiguration().getMinimumAlertRepeatGap();
    private static final int MAX_ASYNC_DELIVERIES = 10;
    private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
    private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
    private AtomicReference _asynchronousRunner = new AtomicReference<Object>(null);
    private AtomicInteger _deliveredMessages = new AtomicInteger();
    private AtomicBoolean _stopped = new AtomicBoolean(false);

    protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException {
        this(name, durable, owner, autoDelete, virtualHost, new SimpleQueueEntryList.Factory());
    }

    protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost, QueueEntryListFactory entryListFactory) throws AMQException {
        if (name == null) {
            throw new IllegalArgumentException("Queue name must not be null");
        }
        if (virtualHost == null) {
            throw new IllegalArgumentException("Virtual Host must not be null");
        }
        this._name = name;
        this._durable = durable;
        this._owner = owner;
        this._autoDelete = autoDelete;
        this._virtualHost = virtualHost;
        this._entries = entryListFactory.createQueueEntryList(this);
        this._asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
        try {
            this._managedObject = new AMQQueueMBean(this);
            this._managedObject.register();
        }
        catch (JMException e) {
            throw new AMQException("AMQQueue MBean creation has failed ", (Throwable)e);
        }
        this.resetNotifications();
    }

    public void resetNotifications() {
        this.setMaximumMessageAge(this._maximumMessageAge);
        this.setMaximumMessageCount(this._maximumMessageCount);
        this.setMaximumMessageSize(this._maximumMessageSize);
        this.setMaximumQueueDepth(this._maximumQueueDepth);
    }

    @Override
    public AMQShortString getName() {
        return this._name;
    }

    @Override
    public boolean isDurable() {
        return this._durable;
    }

    @Override
    public boolean isAutoDelete() {
        return this._autoDelete;
    }

    @Override
    public AMQShortString getOwner() {
        return this._owner;
    }

    @Override
    public VirtualHost getVirtualHost() {
        return this._virtualHost;
    }

    @Override
    public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException {
        exchange.registerQueue(routingKey, this, arguments);
        if (this.isDurable() && exchange.isDurable()) {
            this._virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, arguments);
        }
        this._bindings.addBinding(routingKey, arguments, exchange);
    }

    @Override
    public void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException {
        boolean removed;
        exchange.deregisterQueue(routingKey, this, arguments);
        if (this.isDurable() && exchange.isDurable()) {
            this._virtualHost.getMessageStore().unbindQueue(exchange, routingKey, this, arguments);
        }
        if (!(removed = this._bindings.remove(routingKey, arguments, exchange))) {
            _logger.error((Object)"Mismatch between queue bindings and exchange record of bindings");
        }
    }

    @Override
    public List<ExchangeBinding> getExchangeBindings() {
        return new ArrayList<ExchangeBinding>(this._bindings.getExchangeBindings());
    }

    @Override
    public synchronized void registerSubscription(Subscription subscription, boolean exclusive) throws AMQException {
        if (this.isExclusiveSubscriber()) {
            throw new AMQQueue.ExistingExclusiveSubscription();
        }
        if (exclusive) {
            if (this.getConsumerCount() != 0) {
                throw new AMQQueue.ExistingSubscriptionPreventsExclusive();
            }
            this._exclusiveSubscriber = subscription;
        }
        this._activeSubscriberCount.incrementAndGet();
        subscription.setStateListener(this);
        subscription.setLastSeenEntry(null, this._entries.getHead());
        if (!this.isDeleted()) {
            subscription.setQueue(this);
            this._subscriptionList.add(subscription);
            if (this.isDeleted()) {
                subscription.queueDeleted(this);
            }
        }
        this.deliverAsync(subscription);
    }

    @Override
    public synchronized void unregisterSubscription(Subscription subscription) throws AMQException {
        if (subscription == null) {
            throw new NullPointerException("subscription argument is null");
        }
        boolean removed = this._subscriptionList.remove(subscription);
        if (removed) {
            QueueEntry lastSeen;
            subscription.close();
            this.setExclusiveSubscriber(null);
            while ((lastSeen = subscription.getLastSeenEntry()) != null) {
                subscription.setLastSeenEntry(lastSeen, null);
            }
            if (this._autoDelete && this.getConsumerCount() == 0) {
                if (_logger.isInfoEnabled()) {
                    _logger.info((Object)("Auto-deleteing queue:" + this));
                }
                this.delete();
                subscription.queueDeleted(this);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     */
    @Override
    public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException {
        this.incrementQueueCount();
        this.incrementQueueSize(message);
        this._totalMessagesReceived.incrementAndGet();
        exclusiveSub = this._exclusiveSubscriber;
        if (exclusiveSub != null) {
            exclusiveSub.getSendLock();
            try {
                entry = this._entries.add(message);
                this.deliverToSubscription(exclusiveSub, entry);
                if (entry.isAcquired() || entry.isDeleted()) ** GOTO lbl38
                this.deliverToSubscription(exclusiveSub, entry);
            }
            finally {
                exclusiveSub.releaseSendLock();
            }
        } else {
            entry = this._entries.add(message);
            node = this._lastSubscriptionNode.get();
            nextNode = node.getNext();
            if (nextNode == null) {
                nextNode = this._subscriptionList.getHead().getNext();
            }
            while (nextNode != null && !this._lastSubscriptionNode.compareAndSet(node, nextNode)) {
                node = this._lastSubscriptionNode.get();
                nextNode = node.getNext();
                if (nextNode != null) continue;
                nextNode = this._subscriptionList.getHead().getNext();
            }
            loops = 2;
            while (!entry.isAcquired() && !entry.isDeleted() && loops != 0) {
                if (nextNode == null) {
                    --loops;
                    nextNode = this._subscriptionList.getHead();
                } else {
                    sub = nextNode.getSubscription();
                    this.deliverToSubscription(sub, entry);
                }
                nextNode = nextNode.getNext();
            }
        }
lbl38:
        // 3 sources

        if (entry.immediateAndNotDelivered()) {
            this.dequeue(storeContext, entry);
            entry.dispose(storeContext);
        } else if (!entry.isAcquired() && !entry.isDeleted()) {
            this.checkSubscriptionsNotAheadOfDelivery(entry);
            this.deliverAsync();
        }
        this._managedObject.checkForNotification(entry.getMessage());
        return entry;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void deliverToSubscription(Subscription sub, QueueEntry entry) throws AMQException {
        sub.getSendLock();
        try {
            if (this.subscriptionReadyAndHasInterest(sub, entry) && !sub.isSuspended() && !sub.wouldSuspend(entry)) {
                if (!sub.isBrowser() && !entry.acquire(sub)) {
                    sub.restoreCredit(entry);
                } else {
                    this.deliverMessage(sub, entry);
                }
            }
        }
        finally {
            sub.releaseSendLock();
        }
    }

    protected void checkSubscriptionsNotAheadOfDelivery(QueueEntry entry) {
    }

    private void incrementQueueSize(AMQMessage message) {
        this.getAtomicQueueSize().addAndGet(message.getSize());
    }

    private void incrementQueueCount() {
        this.getAtomicQueueCount().incrementAndGet();
    }

    private void deliverMessage(Subscription sub, QueueEntry entry) throws AMQException {
        this._deliveredMessages.incrementAndGet();
        sub.send(entry);
    }

    private boolean subscriptionReadyAndHasInterest(Subscription sub, QueueEntry entry) {
        QueueEntry node = sub.getLastSeenEntry();
        while (node != null && (node.isAcquired() || node.isDeleted() || !sub.hasInterest(node))) {
            QueueEntry newNode = this._entries.next(node);
            if (newNode != null) {
                sub.setLastSeenEntry(node, newNode);
                node = sub.getLastSeenEntry();
                continue;
            }
            node = null;
            break;
        }
        return node == entry;
    }

    private void updateLastSeenEntry(Subscription sub, QueueEntry entry) {
        QueueEntry node = sub.getLastSeenEntry();
        if (node != null && entry.compareTo(node) < 0 && sub.hasInterest(entry)) {
            do {
                if (!sub.setLastSeenEntry(node, entry)) continue;
                return;
            } while ((node = sub.getLastSeenEntry()) != null && entry.compareTo(node) < 0);
        }
    }

    @Override
    public void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException {
        SubscriptionList.SubscriptionNodeIterator subscriberIter = this._subscriptionList.iterator();
        while (subscriberIter.advance()) {
            Subscription sub = subscriberIter.getNode().getSubscription();
            if (sub.isBrowser()) continue;
            this.updateLastSeenEntry(sub, entry);
        }
        this.deliverAsync();
    }

    @Override
    public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException {
        this.decrementQueueCount();
        this.decrementQueueSize(entry);
        if (entry.acquiredBySubscription()) {
            this._deliveredMessages.decrementAndGet();
        }
        try {
            AMQMessage msg = entry.getMessage();
            if (msg.isPersistent()) {
                this._virtualHost.getMessageStore().dequeueMessage(storeContext, this, msg.getMessageId());
            }
        }
        catch (MessageCleanupException e) {
            _logger.error((Object)e, (Throwable)((Object)e));
        }
        catch (AMQException e) {
            throw new FailedDequeueException(this._name.toString(), e);
        }
    }

    private void decrementQueueSize(QueueEntry entry) {
        this.getAtomicQueueSize().addAndGet(-entry.getMessage().getSize());
    }

    void decrementQueueCount() {
        this.getAtomicQueueCount().decrementAndGet();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean resend(QueueEntry entry, Subscription subscription) throws AMQException {
        subscription.getSendLock();
        try {
            if (!subscription.isClosed()) {
                this.deliverMessage(subscription, entry);
                boolean bl = true;
                return bl;
            }
            boolean bl = false;
            return bl;
        }
        finally {
            subscription.releaseSendLock();
        }
    }

    @Override
    public int getConsumerCount() {
        return this._subscriptionList.size();
    }

    @Override
    public int getActiveConsumerCount() {
        return this._activeSubscriberCount.get();
    }

    @Override
    public boolean isUnused() {
        return this.getConsumerCount() == 0;
    }

    @Override
    public boolean isEmpty() {
        return this.getMessageCount() == 0;
    }

    @Override
    public int getMessageCount() {
        return this.getAtomicQueueCount().get();
    }

    @Override
    public long getQueueDepth() {
        return this.getAtomicQueueSize().get();
    }

    @Override
    public int getUndeliveredMessageCount() {
        int count = this.getMessageCount() - this._deliveredMessages.get();
        if (count < 0) {
            return 0;
        }
        return count;
    }

    @Override
    public long getReceivedMessageCount() {
        return this._totalMessagesReceived.get();
    }

    @Override
    public long getOldestMessageArrivalTime() {
        QueueEntry entry = this.getOldestQueueEntry();
        return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime();
    }

    protected QueueEntry getOldestQueueEntry() {
        return this._entries.next(this._entries.getHead());
    }

    @Override
    public boolean isDeleted() {
        return this._deleted.get();
    }

    @Override
    public List<QueueEntry> getMessagesOnTheQueue() {
        ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
        QueueEntryIterator queueListIterator = this._entries.iterator();
        while (queueListIterator.advance()) {
            QueueEntry node = queueListIterator.getNode();
            if (node == null || node.isDeleted()) continue;
            entryList.add(node);
        }
        return entryList;
    }

    @Override
    public void stateChange(Subscription sub, Subscription.State oldState, Subscription.State newState) {
        if (oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE) {
            this._activeSubscriberCount.decrementAndGet();
        } else if (newState == Subscription.State.ACTIVE) {
            if (oldState != Subscription.State.ACTIVE) {
                this._activeSubscriberCount.incrementAndGet();
            }
            this.deliverAsync(sub);
        }
    }

    @Override
    public int compareTo(AMQQueue o) {
        return this._name.compareTo(o.getName());
    }

    public AtomicInteger getAtomicQueueCount() {
        return this._atomicQueueCount;
    }

    public AtomicLong getAtomicQueueSize() {
        return this._atomicQueueSize;
    }

    private boolean isExclusiveSubscriber() {
        return this._exclusiveSubscriber != null;
    }

    private void setExclusiveSubscriber(Subscription exclusiveSubscriber) {
        this._exclusiveSubscriber = exclusiveSubscriber;
    }

    @Override
    public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId) {
        return this.getMessagesOnTheQueue(new QueueEntryFilter(){

            public boolean accept(QueueEntry entry) {
                long messageId = entry.getMessage().getMessageId();
                return messageId >= fromMessageId && messageId <= toMessageId;
            }

            public boolean filterComplete() {
                return false;
            }
        });
    }

    @Override
    public QueueEntry getMessageOnTheQueue(final long messageId) {
        List<QueueEntry> entries = this.getMessagesOnTheQueue(new QueueEntryFilter(){
            private boolean _complete;

            public boolean accept(QueueEntry entry) {
                this._complete = entry.getMessage().getMessageId() == messageId;
                return this._complete;
            }

            public boolean filterComplete() {
                return this._complete;
            }
        });
        return entries.isEmpty() ? null : entries.get(0);
    }

    public List<QueueEntry> getMessagesOnTheQueue(QueueEntryFilter filter) {
        ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
        QueueEntryIterator queueListIterator = this._entries.iterator();
        while (queueListIterator.advance() && !filter.filterComplete()) {
            QueueEntry node = queueListIterator.getNode();
            if (node.isDeleted() || !filter.accept(node)) continue;
            entryList.add(node);
        }
        return entryList;
    }

    @Override
    public void moveMessagesToAnotherQueue(final long fromMessageId, final long toMessageId, String queueName, StoreContext storeContext) {
        AMQQueue toQueue = this.getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
        MessageStore store = this.getVirtualHost().getMessageStore();
        List<QueueEntry> entries = this.getMessagesOnTheQueue(new QueueEntryFilter(){

            public boolean accept(QueueEntry entry) {
                long messageId = entry.getMessage().getMessageId();
                return messageId >= fromMessageId && messageId <= toMessageId && entry.acquire();
            }

            public boolean filterComplete() {
                return false;
            }
        });
        try {
            store.beginTran(storeContext);
            for (QueueEntry entry : entries) {
                AMQMessage message = entry.getMessage();
                if (message.isPersistent() && toQueue.isDurable()) {
                    store.enqueueMessage(storeContext, toQueue, message.getMessageId());
                }
                entry.dequeue(storeContext);
            }
            try {
                store.commitTran(storeContext);
            }
            catch (AMQException e) {
                throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
            }
        }
        catch (AMQException e) {
            try {
                store.abortTran(storeContext);
            }
            catch (AMQException rollbackEx) {
                _logger.error((Object)"Failed to rollback transaction when error occured moving messages", (Throwable)rollbackEx);
            }
            throw new RuntimeException(e);
        }
        try {
            for (QueueEntry entry : entries) {
                toQueue.enqueue(storeContext, entry.getMessage());
            }
        }
        catch (MessageCleanupException e) {
            throw new RuntimeException((Throwable)((Object)e));
        }
        catch (AMQException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void copyMessagesToAnotherQueue(final long fromMessageId, final long toMessageId, String queueName, StoreContext storeContext) {
        AMQQueue toQueue = this.getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
        MessageStore store = this.getVirtualHost().getMessageStore();
        List<QueueEntry> entries = this.getMessagesOnTheQueue(new QueueEntryFilter(){

            public boolean accept(QueueEntry entry) {
                long messageId = entry.getMessage().getMessageId();
                if (messageId >= fromMessageId && messageId <= toMessageId && !entry.isDeleted()) {
                    return entry.getMessage().incrementReference();
                }
                return false;
            }

            public boolean filterComplete() {
                return false;
            }
        });
        try {
            store.beginTran(storeContext);
            for (QueueEntry entry : entries) {
                AMQMessage message = entry.getMessage();
                if (!message.isReferenced() || !message.isPersistent() || !toQueue.isDurable()) continue;
                store.enqueueMessage(storeContext, toQueue, message.getMessageId());
            }
            try {
                store.commitTran(storeContext);
            }
            catch (AMQException e) {
                throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
            }
        }
        catch (AMQException e) {
            try {
                store.abortTran(storeContext);
            }
            catch (AMQException rollbackEx) {
                _logger.error((Object)"Failed to rollback transaction when error occured moving messages", (Throwable)rollbackEx);
            }
            throw new RuntimeException(e);
        }
        try {
            for (QueueEntry entry : entries) {
                if (!entry.getMessage().isReferenced()) continue;
                toQueue.enqueue(storeContext, entry.getMessage());
            }
        }
        catch (MessageCleanupException e) {
            throw new RuntimeException((Throwable)((Object)e));
        }
        catch (AMQException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext) {
        try {
            QueueEntryIterator queueListIterator = this._entries.iterator();
            while (queueListIterator.advance()) {
                QueueEntry node = queueListIterator.getNode();
                long messageId = node.getMessage().getMessageId();
                if (messageId < fromMessageId || messageId > toMessageId || node.isDeleted() || !node.acquire()) continue;
                node.discard(storeContext);
            }
        }
        catch (AMQException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void deleteMessageFromTop(StoreContext storeContext) throws AMQException {
        QueueEntryIterator queueListIterator = this._entries.iterator();
        boolean noDeletes = true;
        while (noDeletes && queueListIterator.advance()) {
            QueueEntry node = queueListIterator.getNode();
            if (node.isDeleted() || !node.acquire()) continue;
            node.discard(storeContext);
            noDeletes = false;
        }
    }

    @Override
    public long clearQueue(StoreContext storeContext) throws AMQException {
        QueueEntryIterator queueListIterator = this._entries.iterator();
        long count = 0L;
        while (queueListIterator.advance()) {
            QueueEntry node = queueListIterator.getNode();
            if (node.isDeleted() || !node.acquire()) continue;
            node.discard(storeContext);
            ++count;
        }
        return count;
    }

    @Override
    public void addQueueDeleteTask(AMQQueue.Task task) {
        this._deleteTaskList.add(task);
    }

    @Override
    public int delete() throws AMQException {
        if (!this._deleted.getAndSet(true)) {
            SubscriptionList.SubscriptionNodeIterator subscriptionIter = this._subscriptionList.iterator();
            while (subscriptionIter.advance()) {
                Subscription s = subscriptionIter.getNode().getSubscription();
                if (s == null) continue;
                s.queueDeleted(this);
            }
            this._bindings.deregister();
            this._virtualHost.getQueueRegistry().unregisterQueue(this._name);
            this._managedObject.unregister();
            for (AMQQueue.Task task : this._deleteTaskList) {
                task.doTask(this);
            }
            this._deleteTaskList.clear();
            this.stop();
        }
        return this.getMessageCount();
    }

    @Override
    public void stop() {
        if (!this._stopped.getAndSet(true)) {
            ReferenceCountingExecutorService.getInstance().releaseExecutorService();
        }
    }

    @Override
    public void deliverAsync() {
        this._stateChangeCount.incrementAndGet();
        Runner runner = new Runner();
        if (this._asynchronousRunner.compareAndSet(null, runner)) {
            this._asyncDelivery.execute((Runnable)((Object)runner));
        }
    }

    @Override
    public void deliverAsync(Subscription sub) {
        this._asyncDelivery.execute((Runnable)((Object)new SubFlushRunner(sub)));
    }

    @Override
    public void flushSubscription(Subscription sub) throws AMQException {
        this.flushSubscription(sub, Long.MAX_VALUE);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean flushSubscription(Subscription sub, Long iterations) throws AMQException {
        boolean atTail = false;
        while (!sub.isSuspended() && !atTail && iterations != 0L) {
            try {
                sub.getSendLock();
                atTail = this.attemptDelivery(sub);
                if (atTail && sub.isAutoClose()) {
                    this.unregisterSubscription(sub);
                    ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
                    converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
                    continue;
                }
                if (atTail) continue;
                Long l = iterations;
                Long l2 = iterations = Long.valueOf(iterations - 1L);
            }
            finally {
                sub.releaseSendLock();
            }
        }
        if (!this.isExclusiveSubscriber()) {
            this.advanceAllSubscriptions();
        }
        return atTail;
    }

    private boolean attemptDelivery(Subscription sub) throws AMQException {
        boolean atTail = false;
        boolean advanced = false;
        boolean subActive = sub.isActive();
        if (subActive) {
            QueueEntry node = this.moveSubscriptionToNextNode(sub);
            if (!(node.isAcquired() || node.isDeleted() || sub.isSuspended())) {
                if (sub.hasInterest(node)) {
                    if (!sub.wouldSuspend(node)) {
                        if (!sub.isBrowser() && !node.acquire(sub)) {
                            sub.restoreCredit(node);
                        } else {
                            QueueEntry newNode;
                            this.deliverMessage(sub, node);
                            if (sub.isBrowser() && (newNode = this._entries.next(node)) != null) {
                                advanced = true;
                                sub.setLastSeenEntry(node, newNode);
                                node = sub.getLastSeenEntry();
                            }
                        }
                    } else {
                        subActive = false;
                        node.addStateChangeListener(new QueueEntryListener(sub, node));
                    }
                } else {
                    QueueEntry newNode = this._entries.next(node);
                    if (newNode != null) {
                        sub.setLastSeenEntry(node, newNode);
                    }
                }
            }
            atTail = this._entries.next(node) == null && !advanced;
        }
        return atTail || !subActive;
    }

    protected void advanceAllSubscriptions() throws AMQException {
        SubscriptionList.SubscriptionNodeIterator subscriberIter = this._subscriptionList.iterator();
        while (subscriberIter.advance()) {
            SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode();
            Subscription sub = subNode.getSubscription();
            this.moveSubscriptionToNextNode(sub);
        }
    }

    private QueueEntry moveSubscriptionToNextNode(Subscription sub) throws AMQException {
        QueueEntry node = sub.getLastSeenEntry();
        while (node != null && (node.isAcquired() || node.isDeleted() || node.expired())) {
            QueueEntry newNode;
            if (!node.isAcquired() && !node.isDeleted() && node.expired() && node.acquire()) {
                StoreContext reapingStoreContext = new StoreContext();
                node.discard(reapingStoreContext);
            }
            if ((newNode = this._entries.next(node)) == null) break;
            sub.setLastSeenEntry(node, newNode);
            node = sub.getLastSeenEntry();
        }
        return node;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processQueue(Runnable runner) throws AMQException {
        long stateChangeCount;
        long previousStateChangeCount = Long.MIN_VALUE;
        boolean deliveryIncomplete = true;
        int extraLoops = 1;
        Long iterations = new Long(10L);
        this._asynchronousRunner.compareAndSet(runner, null);
        while (iterations != 0L && (previousStateChangeCount != (stateChangeCount = this._stateChangeCount.get()) || deliveryIncomplete) && this._asynchronousRunner.compareAndSet(null, runner)) {
            if (previousStateChangeCount != stateChangeCount) {
                extraLoops = 1;
            }
            previousStateChangeCount = stateChangeCount;
            deliveryIncomplete = this._subscriptionList.size() != 0;
            boolean done = true;
            SubscriptionList.SubscriptionNodeIterator subscriptionIter = this._subscriptionList.iterator();
            while (subscriptionIter.advance()) {
                boolean closeConsumer = false;
                Subscription sub = subscriptionIter.getNode().getSubscription();
                sub.getSendLock();
                try {
                    QueueEntry node;
                    if (sub != null && (node = this.moveSubscriptionToNextNode(sub)) != null) {
                        done = this.attemptDelivery(sub);
                    }
                    if (done) {
                        if (extraLoops == 0) {
                            deliveryIncomplete = false;
                            if (!sub.isAutoClose()) continue;
                            this.unregisterSubscription(sub);
                            ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
                            converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
                            continue;
                        }
                        --extraLoops;
                        continue;
                    }
                    Long l = iterations;
                    Long l2 = iterations = Long.valueOf(iterations - 1L);
                    extraLoops = 1;
                }
                finally {
                    sub.releaseSendLock();
                }
            }
            this._asynchronousRunner.set(null);
        }
        if (iterations == 0L && this._asynchronousRunner.compareAndSet(null, runner)) {
            this._asyncDelivery.execute(runner);
        }
    }

    @Override
    public void checkMessageStatus() throws AMQException {
        StoreContext storeContext = new StoreContext();
        QueueEntryIterator queueListIterator = this._entries.iterator();
        while (queueListIterator.advance()) {
            QueueEntry node = queueListIterator.getNode();
            if (!node.isDeleted() && node.expired() && node.acquire()) {
                node.discard(storeContext);
                continue;
            }
            this._managedObject.checkForNotification(node.getMessage());
        }
    }

    @Override
    public long getMinimumAlertRepeatGap() {
        return this._minimumAlertRepeatGap;
    }

    @Override
    public void setMinimumAlertRepeatGap(long minimumAlertRepeatGap) {
        this._minimumAlertRepeatGap = minimumAlertRepeatGap;
    }

    @Override
    public long getMaximumMessageAge() {
        return this._maximumMessageAge;
    }

    @Override
    public void setMaximumMessageAge(long maximumMessageAge) {
        this._maximumMessageAge = maximumMessageAge;
        if (maximumMessageAge == 0L) {
            this._notificationChecks.remove((Object)NotificationCheck.MESSAGE_AGE_ALERT);
        } else {
            this._notificationChecks.add(NotificationCheck.MESSAGE_AGE_ALERT);
        }
    }

    @Override
    public long getMaximumMessageCount() {
        return this._maximumMessageCount;
    }

    @Override
    public void setMaximumMessageCount(long maximumMessageCount) {
        this._maximumMessageCount = maximumMessageCount;
        if (maximumMessageCount == 0L) {
            this._notificationChecks.remove((Object)NotificationCheck.MESSAGE_COUNT_ALERT);
        } else {
            this._notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT);
        }
    }

    @Override
    public long getMaximumQueueDepth() {
        return this._maximumQueueDepth;
    }

    @Override
    public void setMaximumQueueDepth(long maximumQueueDepth) {
        this._maximumQueueDepth = maximumQueueDepth;
        if (maximumQueueDepth == 0L) {
            this._notificationChecks.remove((Object)NotificationCheck.QUEUE_DEPTH_ALERT);
        } else {
            this._notificationChecks.add(NotificationCheck.QUEUE_DEPTH_ALERT);
        }
    }

    @Override
    public long getMaximumMessageSize() {
        return this._maximumMessageSize;
    }

    @Override
    public void setMaximumMessageSize(long maximumMessageSize) {
        this._maximumMessageSize = maximumMessageSize;
        if (maximumMessageSize == 0L) {
            this._notificationChecks.remove((Object)NotificationCheck.MESSAGE_SIZE_ALERT);
        } else {
            this._notificationChecks.add(NotificationCheck.MESSAGE_SIZE_ALERT);
        }
    }

    @Override
    public Set<NotificationCheck> getNotificationChecks() {
        return this._notificationChecks;
    }

    @Override
    public ManagedObject getManagedObject() {
        return this._managedObject;
    }

    @Override
    public List<Long> getMessagesOnTheQueue(int num) {
        return this.getMessagesOnTheQueue(num, 0);
    }

    @Override
    public List<Long> getMessagesOnTheQueue(int num, int offset) {
        int i;
        ArrayList<Long> ids = new ArrayList<Long>(num);
        QueueEntryIterator it = this._entries.iterator();
        for (i = 0; i < offset; ++i) {
            it.advance();
        }
        for (i = 0; i < num && !it.atTail(); ++i) {
            it.advance();
            ids.add(it.getNode().getMessage().getMessageId());
        }
        return ids;
    }

    @Override
    public void configure(QueueConfiguration config) {
        if (config != null) {
            this.setMaximumMessageAge(config.getMaximumMessageAge());
            this.setMaximumQueueDepth(config.getMaximumQueueDepth());
            this.setMaximumMessageSize(config.getMaximumMessageSize());
            this.setMaximumMessageCount(config.getMaximumMessageCount());
            this.setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap());
        }
    }

    private final class QueueEntryListener
    implements QueueEntry.StateChangeListener {
        private final QueueEntry _entry;
        private final Subscription _sub;

        public QueueEntryListener(Subscription sub, QueueEntry entry) {
            this._entry = entry;
            this._sub = sub;
        }

        public boolean equals(Object o) {
            return this._entry == ((QueueEntryListener)o)._entry && this._sub == ((QueueEntryListener)o)._sub;
        }

        public int hashCode() {
            return System.identityHashCode(this._entry) ^ System.identityHashCode(this._sub);
        }

        public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, QueueEntry.State newState) {
            entry.removeStateChangeListener(this);
            SimpleAMQQueue.this.deliverAsync(this._sub);
        }
    }

    private class SubFlushRunner
    implements ReadWriteRunnable {
        private final Subscription _sub;

        public SubFlushRunner(Subscription sub) {
            this._sub = sub;
        }

        public void run() {
            boolean complete = false;
            try {
                complete = SimpleAMQQueue.this.flushSubscription(this._sub, new Long(10L));
            }
            catch (AMQException e) {
                _logger.error((Object)e);
            }
            if (!complete && !this._sub.isSuspended()) {
                SimpleAMQQueue.this._asyncDelivery.execute((Runnable)((Object)this));
            }
        }

        public boolean isRead() {
            return false;
        }

        public boolean isWrite() {
            return true;
        }
    }

    private class Runner
    implements ReadWriteRunnable {
        private Runner() {
        }

        public void run() {
            try {
                SimpleAMQQueue.this.processQueue((Runnable)((Object)this));
            }
            catch (AMQException e) {
                _logger.error((Object)e);
            }
        }

        public boolean isRead() {
            return false;
        }

        public boolean isWrite() {
            return true;
        }
    }

    public static interface QueueEntryFilter {
        public boolean accept(QueueEntry var1);

        public boolean filterComplete();
    }
}

