package org.apache.qpid.server.queue;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;

/* loaded from: input_file:org/apache/qpid/server/queue/ConcurrentDeliveryManager.class */
/**
 * {@link DeliveryManager} that either hands a message straight to a subscriber or
 * parks it on an in-memory queue that is later drained by {@link #processAsync(Executor)}.
 *
 * <p>Queue-head decisions in {@link #deliver(String, AMQMessage)} and in the asynchronous
 * runner are made while holding {@code _lock}.
 *
 * <p>NOTE(review): {@link #getMessages()}, {@link #removeAMessageFromTop()} and
 * {@link #clearAllMessages()} synchronize on {@code this} rather than on {@code _lock};
 * whether that is intentional cannot be determined from this file alone.
 */
public class ConcurrentDeliveryManager implements DeliveryManager {
    private static final Logger _log = Logger.getLogger(ConcurrentDeliveryManager.class);

    /**
     * When {@code true}, {@code reduceBufferToFit()} is called on every content body of a
     * message before it is placed on the queue. Populated by
     * {@link Configurator#configure(Object)} in the constructor.
     */
    @Configured(path = "advanced.compressBufferOnQueue", defaultValue = "false")
    public boolean compressBufferOnQueue;

    /** Source of subscribers that messages are delivered to. */
    private final SubscriptionManager _subscriptions;

    /** The queue on whose behalf messages are sent and dequeued. */
    private final AMQQueue _queue;

    /** Messages waiting for delivery, in arrival order. */
    private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueAtomicSize();

    /** Set while an asynchronous delivery run is scheduled or executing. */
    private final AtomicBoolean _processing = new AtomicBoolean();

    /** Guards the "are we queueing?" decision against concurrent deliveries. */
    private final ReentrantLock _lock = new ReentrantLock();

    /** The single reusable task submitted by {@link #processAsync(Executor)}. */
    Runner asyncDelivery = new Runner();

    /**
     * Asynchronous drain task: repeatedly processes the queue until there is nothing
     * left to deliver or nobody to deliver to, then clears the processing flag.
     */
    private class Runner implements Runnable {
        private Runner() {
        }

        @Override // java.lang.Runnable
        public void run() {
            boolean keepRunning = true;
            while (keepRunning) {
                processQueue();
                // Re-check under the lock so the flag is cleared atomically with respect
                // to deliver(), which adds to the queue while holding the same lock.
                _lock.lock();
                try {
                    if (!hasQueuedMessages() || !_subscriptions.hasActiveSubscribers()) {
                        keepRunning = false;
                        _processing.set(false);
                    }
                } finally {
                    _lock.unlock();
                }
            }
        }
    }

    /**
     * Creates a delivery manager for the given queue.
     *
     * <p>Applies {@code @Configured} values to this instance before the collaborators
     * are stored, and logs at info level when buffer compression is enabled.
     *
     * @param subscriptionManager supplies the subscribers to deliver to
     * @param aMQQueue the queue that messages are sent for and dequeued from
     */
    public ConcurrentDeliveryManager(SubscriptionManager subscriptionManager, AMQQueue aMQQueue) {
        Configurator.configure(this);
        if (this.compressBufferOnQueue) {
            _log.info("Compressing Buffers on queue.");
        }
        this._subscriptions = subscriptionManager;
        this._queue = aMQQueue;
    }

    /**
     * @return {@code true} if messages are currently being held on the queue
     */
    private boolean queueing() {
        return hasQueuedMessages();
    }

    /**
     * Adds the message to the queue if, and only if, the manager is already queueing.
     *
     * @param aMQMessage the message to enqueue
     * @return {@code true} if the message was queued; {@code false} if it is an immediate
     *         message or the queue is currently empty
     */
    private boolean enqueue(AMQMessage aMQMessage) {
        if (aMQMessage.isImmediate()) {
            return false;
        }
        _lock.lock();
        try {
            // The lock is released exactly once, in the finally block. Releasing it a
            // second time would throw IllegalMonitorStateException.
            return queueing() && addMessageToQueue(aMQMessage);
        } finally {
            _lock.unlock();
        }
    }

    /**
     * Places the message on the queue unless it is an immediate message.
     *
     * @param aMQMessage the message to queue
     */
    private void startQueueing(AMQMessage aMQMessage) {
        if (aMQMessage.isImmediate()) {
            return;
        }
        addMessageToQueue(aMQMessage);
    }

    /**
     * Appends the message to the queue, first shrinking its content buffers when
     * {@link #compressBufferOnQueue} is set.
     *
     * @param aMQMessage the message to append
     * @return always {@code true}
     */
    private boolean addMessageToQueue(AMQMessage aMQMessage) {
        if (this.compressBufferOnQueue) {
            Iterator<ContentBody> bodies = aMQMessage.getContentBodies().iterator();
            while (bodies.hasNext()) {
                bodies.next().reduceBufferToFit();
            }
        }
        _messages.offer(aMQMessage);
        return true;
    }

    /**
     * @return {@code true} if at least one message is waiting on the queue; evaluated
     *         while holding {@code _lock}
     */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public boolean hasQueuedMessages() {
        _lock.lock();
        try {
            return !_messages.isEmpty();
        } finally {
            _lock.unlock();
        }
    }

    /**
     * @return the number of messages currently on the queue
     */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public int getQueueMessageCount() {
        return getMessageCount();
    }

    /**
     * @return the size reported by the underlying message queue
     */
    private int getMessageCount() {
        return _messages.size();
    }

    /**
     * @return a new list containing the queued messages in queue order; changes to the
     *         returned list do not affect the queue
     */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public synchronized List<AMQMessage> getMessages() {
        return new ArrayList<AMQMessage>(_messages);
    }

    /**
     * Removes the message at the head of the queue, if any, and dequeues it.
     *
     * @throws AMQException if dequeuing the removed message fails
     */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public synchronized void removeAMessageFromTop() throws AMQException {
        AMQMessage head = poll();
        if (head != null) {
            head.dequeue(_queue);
        }
    }

    /**
     * Removes and dequeues every message until the queue is empty.
     *
     * @throws AMQException if dequeuing a message fails; messages still on the queue at
     *         that point are left in place
     */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public synchronized void clearAllMessages() throws AMQException {
        for (AMQMessage next = poll(); next != null; next = poll()) {
            next.dequeue(_queue);
        }
    }

    /**
     * Delivers queued messages, head first, for as long as a subscriber can be found.
     *
     * <p>A message is only removed from the queue after it has been sent. A
     * {@link FailedDequeueException} ends the run and is logged rather than propagated.
     * The end-of-run debug line is written on every exit path.
     */
    public void processQueue() {
        try {
            boolean hasActiveSubscribers = _subscriptions.hasActiveSubscribers();
            AMQMessage head = peek();
            while (head != null && hasActiveSubscribers) {
                Subscription subscriber = _subscriptions.nextSubscriber(head);
                if (subscriber != null) {
                    subscriber.send(head, _queue);
                    poll();
                    head = peek();
                } else {
                    // Nobody can take the head message right now; stop this run.
                    hasActiveSubscribers = false;
                }
            }
        } catch (FailedDequeueException e) {
            _log.error("Unable to deliver message as dequeue failed: " + e, e);
        } finally {
            _log.debug("End of processQueue: (" + getQueueMessageCount() + ") subscribers:" + _subscriptions.hasActiveSubscribers());
        }
    }

    /**
     * @return the head of the queue without removing it, or {@code null} if empty
     */
    private AMQMessage peek() {
        return _messages.peek();
    }

    /**
     * @return the head of the queue after removing it, or {@code null} if empty
     */
    private AMQMessage poll() {
        return _messages.poll();
    }

    /**
     * Schedules an asynchronous drain of the queue on the given executor when there are
     * queued messages, active subscribers, and no drain already scheduled or running.
     *
     * @param executor the executor that will run the drain task
     */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public void processAsync(Executor executor) {
        _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ") Active:" + _subscriptions.hasActiveSubscribers() + " Processing:" + _processing.get());
        // compareAndSet ensures only one caller wins and submits the task.
        if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers() && _processing.compareAndSet(false, true)) {
            executor.execute(this.asyncDelivery);
        }
    }

    /**
     * Delivers a message: it is queued behind any messages already waiting, otherwise
     * sent directly to the next subscriber, otherwise queued if it is not immediate.
     *
     * @param str not used by this implementation
     * @param aMQMessage the message to deliver
     * @throws FailedDequeueException propagated from sending the message to a subscriber
     */
    @Override // org.apache.qpid.server.queue.DeliveryManager
    public void deliver(String str, AMQMessage aMQMessage) throws FailedDequeueException {
        if (enqueue(aMQMessage)) {
            return;
        }
        _lock.lock();
        try {
            Subscription subscriber = _subscriptions.nextSubscriber(aMQMessage);
            if (subscriber != null) {
                subscriber.send(aMQMessage, _queue);
                aMQMessage.setDeliveredToConsumer();
            } else if (!aMQMessage.isImmediate()) {
                // No subscriber available: start holding messages on the queue.
                startQueueing(aMQMessage);
            }
        } finally {
            _lock.unlock();
        }
    }
}
