package org.apache.qpid.server.queue;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/qpid/server/queue/SynchronizedDeliveryManager.class */
/**
 * A {@link DeliveryManager} that guards its pending-message queue with this object's monitor.
 *
 * <p>While not in "queueing" mode, {@link #deliver} tries to hand each message straight to a
 * subscriber. When no subscriber is available the manager switches to queueing mode and
 * buffers non-immediate messages until {@link #processQueue()} drains them. Queueing mode is
 * switched off again the next time {@link #hasQueuedMessages()} observes an empty buffer.
 */
public class SynchronizedDeliveryManager implements DeliveryManager {
    // Log under this class's own category (previously ConcurrentDeliveryManager by mistake).
    private static final Logger _log = Logger.getLogger(SynchronizedDeliveryManager.class);

    /** Messages waiting for a subscriber; every access is made while holding {@code this}. */
    private final Queue<AMQMessage> _messages = new LinkedList<AMQMessage>();

    /** Set to true while a {@link Runner} has been handed to an executor and has not finished. */
    private final AtomicBoolean _processing = new AtomicBoolean();

    private final SubscriptionManager _subscriptions;

    /** True while incoming non-immediate messages are buffered instead of delivered directly. */
    private volatile boolean _queueing;

    private final AMQQueue _queue;

    /** Task submitted by {@link #processAsync(Executor)}; simply drains the queue. */
    private class Runner implements Runnable {
        private Runner() {
        }

        @Override // java.lang.Runnable
        public void run() {
            SynchronizedDeliveryManager.this.processQueue();
        }
    }

    /**
     * @param subscriptionManager source of subscribers for messages handled by this manager
     * @param aMQQueue            the queue passed along on every send/dequeue call
     */
    public SynchronizedDeliveryManager(SubscriptionManager subscriptionManager, AMQQueue aMQQueue) {
        this._subscriptions = subscriptionManager;
        this._queue = aMQQueue;
    }

    /**
     * Buffers the message if, and only if, the manager is in queueing mode and the message is
     * not flagged immediate.
     *
     * @return {@code true} if the message was added to the buffer, {@code false} otherwise
     */
    private synchronized boolean enqueue(AMQMessage message) {
        if (message.isImmediate() || !this._queueing) {
            return false;
        }
        this._messages.offer(message);
        return true;
    }

    /**
     * Switches to queueing mode and then attempts to buffer the message. An immediate message
     * still turns queueing mode on but is not buffered, because {@link #enqueue} rejects it.
     */
    private synchronized void startQueueing(AMQMessage message) {
        this._queueing = true;
        enqueue(message);
    }

    /**
     * Reports whether any messages are buffered.
     *
     * <p>Side effect: when the buffer is empty, queueing mode is switched off.
     */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public synchronized boolean hasQueuedMessages() {
        boolean isEmpty = this._messages.isEmpty();
        if (isEmpty) {
            this._queueing = false;
        }
        return !isEmpty;
    }

    /** @return the number of buffered messages */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public synchronized int getQueueMessageCount() {
        return this._messages.size();
    }

    /** @return a new list holding a snapshot of the buffered messages, in queue order */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public synchronized List<AMQMessage> getMessages() {
        return new ArrayList<AMQMessage>(this._messages);
    }

    /**
     * Removes the head of the buffer, if any, and calls {@code dequeue} on it.
     *
     * @throws AMQException if the message's {@code dequeue} call fails
     */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public synchronized void removeAMessageFromTop() throws AMQException {
        AMQMessage head = poll();
        if (head != null) {
            head.dequeue(this._queue);
        }
    }

    /**
     * Removes every buffered message, calling {@code dequeue} on each in queue order.
     *
     * @throws AMQException if a {@code dequeue} call fails; the failing message has already
     *                      been removed from the buffer and later messages stay buffered
     */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public synchronized void clearAllMessages() throws AMQException {
        AMQMessage head;
        while ((head = poll()) != null) {
            head.dequeue(this._queue);
        }
    }

    /**
     * Drains buffered messages to subscribers until the buffer is empty or no subscriber will
     * take the head message. Always clears the "processing" flag on exit so that
     * {@link #processAsync(Executor)} can schedule another run.
     *
     * <p>Kept public (originally private per the decompiler note) so existing references compile.
     */
    public void processQueue() {
        try {
            boolean hasActiveSubscribers = this._subscriptions.hasActiveSubscribers();
            // hasQueuedMessages() is evaluated first on purpose: it also resets queueing mode.
            while (hasQueuedMessages() && hasActiveSubscribers) {
                AMQMessage message;
                Subscription subscriber = null;
                // Choose the subscriber and remove the head in one critical section so that a
                // concurrent removeAMessageFromTop()/clearAllMessages() cannot slip in between
                // peek and poll and leave us sending null or a different message.
                synchronized (this) {
                    message = peek();
                    if (message != null) {
                        subscriber = this._subscriptions.nextSubscriber(message);
                        if (subscriber != null) {
                            poll();
                        }
                    }
                }
                if (message == null) {
                    // Buffer was emptied concurrently; the loop condition will now end the run.
                    continue;
                }
                if (subscriber == null) {
                    hasActiveSubscribers = false;
                    continue;
                }
                try {
                    // Send outside the monitor, as before, so producers are not blocked on I/O.
                    subscriber.send(message, this._queue);
                } catch (AMQException e) {
                    // The message has already been removed from the buffer; it is only logged.
                    _log.error("Unable to deliver message: " + e, e);
                }
            }
        } finally {
            this._processing.set(false);
        }
    }

    /** @return the head of the buffer without removing it, or {@code null} if empty */
    private synchronized AMQMessage peek() {
        return this._messages.peek();
    }

    /** @return the head of the buffer after removing it, or {@code null} if empty */
    private synchronized AMQMessage poll() {
        return this._messages.poll();
    }

    /**
     * Schedules {@link #processQueue()} on the executor when there are buffered messages and
     * active subscribers, and no other run is already pending or in progress.
     *
     * @param executor the executor that will run the drain task
     */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public void processAsync(Executor executor) {
        if (hasQueuedMessages() && this._subscriptions.hasActiveSubscribers() && this._processing.compareAndSet(false, true)) {
            executor.execute(new Runner());
        }
    }

    /**
     * Accepts a message for delivery. If the manager is in queueing mode the message is
     * buffered; otherwise it is sent to the next subscriber, or, when there is none, queueing
     * mode is entered and the message is buffered (unless it is immediate).
     *
     * @param str        not used by this implementation
     * @param aMQMessage the message to deliver
     * @throws FailedDequeueException declared by the interface; this method contains no
     *                                explicit {@code throw}, so it can only come from the calls made here
     */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public void deliver(String str, AMQMessage aMQMessage) throws FailedDequeueException {
        if (enqueue(aMQMessage)) {
            return;
        }
        synchronized (this) {
            Subscription nextSubscriber = this._subscriptions.nextSubscriber(aMQMessage);
            if (nextSubscriber == null) {
                startQueueing(aMQMessage);
            } else {
                nextSubscriber.send(aMQMessage, this._queue);
                aMQMessage.setDeliveredToConsumer();
            }
        }
    }
}
