package org.apache.qpid.server.txn;

import java.util.List;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.NoConsumersException;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;

/* loaded from: input_file:org/apache/qpid/server/txn/NonTransactionalContext.class */
/**
 * {@link TransactionalContext} implementation whose {@link #commit()} and {@link #rollback()}
 * are no-ops.
 *
 * <p>A message-store transaction is opened on demand by {@link #beginTranIfNecessary()} and is
 * committed either at the end of {@link #acknowledgeMessage} or by
 * {@link #messageFullyReceived(boolean)}.
 */
public class NonTransactionalContext implements TransactionalContext {
    private static final Logger _log = Logger.getLogger(NonTransactionalContext.class);

    /** Supplies the channel id for log/error text and handles returns in {@link #messageProcessed}. */
    private final AMQChannel _channel;

    /** Receives a {@link NoConsumersException} for each entry flagged by {@link #deliver}. */
    private final List<RequiredDeliveryException> _returnMessages;

    private final MessageStore _messageStore;

    private final StoreContext _storeContext;

    /** Set by {@link #beginTranIfNecessary()}, cleared again whenever this class commits. */
    private boolean _inTran;

    /**
     * Creates a context bound to the given store, store context, channel and return list.
     *
     * @param messageStore   store on which transactions are begun and committed
     * @param storeContext   context handed to every store, queue and queue-entry call
     * @param channel        channel this context belongs to
     * @param returnMessages list that {@link #deliver} appends to
     */
    public NonTransactionalContext(MessageStore messageStore, StoreContext storeContext, AMQChannel channel, List<RequiredDeliveryException> returnMessages) {
        _messageStore = messageStore;
        _storeContext = storeContext;
        _channel = channel;
        _returnMessages = returnMessages;
    }

    /**
     * @return the store context supplied at construction
     */
    @Override
    public StoreContext getStoreContext() {
        return _storeContext;
    }

    /**
     * Begins a store transaction unless this context has already begun one that is still open.
     *
     * @throws AMQException if the store fails to begin the transaction
     */
    @Override
    public void beginTranIfNecessary() throws AMQException {
        if (!_inTran) {
            _messageStore.beginTran(_storeContext);
            _inTran = true;
        }
    }

    /** Intentionally empty in this context. */
    @Override
    public void commit() throws AMQException {
    }

    /** Intentionally empty in this context. */
    @Override
    public void rollback() throws AMQException {
    }

    /**
     * Enqueues the message on the queue. If the resulting entry reports
     * {@code immediateAndNotDelivered()}, a {@link NoConsumersException} wrapping the entry's
     * message is appended to the return list.
     *
     * @param queue   queue to enqueue on
     * @param message message to enqueue
     * @throws AMQException if enqueueing fails
     */
    @Override
    public void deliver(AMQQueue queue, AMQMessage message) throws AMQException {
        QueueEntry entry = queue.enqueue(_storeContext, message);
        if (!entry.immediateAndNotDelivered()) {
            return;
        }
        _returnMessages.add(new NoConsumersException(entry.getMessage()));
    }

    /**
     * Requeues the entry using this context's store context.
     *
     * @param queueEntry entry to requeue
     * @throws AMQException if the requeue fails
     */
    @Override
    public void requeue(QueueEntry queueEntry) throws AMQException {
        queueEntry.requeue(_storeContext);
    }

    /**
     * Acknowledges one or more unacknowledged messages and then commits the store transaction
     * if one is open.
     *
     * <ul>
     *   <li>{@code multiple == false}: the entry for {@code deliveryTag} is discarded and
     *       removed from the map.</li>
     *   <li>{@code multiple == true} and {@code deliveryTag == 0}: every entry in the map is
     *       discarded, then the map is cleared.</li>
     *   <li>{@code multiple == true} otherwise: the map is drained up to {@code deliveryTag}.</li>
     * </ul>
     *
     * @param deliveryTag              tag being acknowledged
     * @param ignored                  not read by this implementation
     * @param multiple                 selects between the single and multiple ack modes above
     * @param unacknowledgedMessageMap map of outstanding entries for the channel
     * @throws AMQException if the tag is not present in the map (single mode, or multiple mode
     *                      with a non-zero tag), or if any delegated call fails
     */
    @Override
    public void acknowledgeMessage(long deliveryTag, long ignored, boolean multiple, final UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException {
        final boolean debug = _log.isDebugEnabled();

        if (multiple && deliveryTag == 0) {
            discardAll(unacknowledgedMessageMap, debug);
        } else if (multiple) {
            drainUpTo(deliveryTag, unacknowledgedMessageMap);
        } else {
            discardSingle(deliveryTag, unacknowledgedMessageMap, debug);
        }

        // Any transaction opened while discarding is completed here; nothing is committed
        // when one of the branches above throws.
        if (_inTran) {
            _messageStore.commitTran(_storeContext);
            _inTran = false;
        }
    }

    /** Single ack: discards the one entry for {@code deliveryTag} and removes it from the map. */
    private void discardSingle(long deliveryTag, UnacknowledgedMessageMap unacked, boolean debug) throws AMQException {
        QueueEntry entry = unacked.get(deliveryTag);
        if (entry == null) {
            String problem = "Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channel.getChannelId();
            _log.info(problem);
            throw new AMQException(problem);
        }

        discardEntry(entry, debug);
        unacked.remove(deliveryTag);

        if (debug) {
            _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " + entry.getMessage().getMessageId());
        }
    }

    /** Multiple ack with tag 0: discards every entry the map visits, then clears the map. */
    private void discardAll(final UnacknowledgedMessageMap unacked, final boolean debug) throws AMQException {
        _log.info("Multiple ack on delivery tag 0. ACKing all messages. Current count:" + unacked.size());
        unacked.visit(new UnacknowledgedMessageMap.Visitor() {
            @Override
            public boolean callback(long tag, QueueEntry entry) throws AMQException {
                discardEntry(entry, debug);
                return false;
            }

            @Override
            public void visitComplete() {
                unacked.clear();
            }
        });
    }

    /** Multiple ack with a non-zero tag: the tag must be present, then the map drains up to it. */
    private void drainUpTo(long deliveryTag, UnacknowledgedMessageMap unacked) throws AMQException {
        if (!unacked.contains(deliveryTag)) {
            throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
        }
        unacked.drainTo(deliveryTag, _storeContext);
    }

    /** Discards one entry, beginning a store transaction first when its message is persistent. */
    private void discardEntry(QueueEntry entry, boolean debug) throws AMQException {
        if (debug) {
            _log.debug("Discarding message: " + entry.getMessage().getMessageId());
        }
        if (entry.getMessage().isPersistent()) {
            beginTranIfNecessary();
        }
        entry.discard(_storeContext);
    }

    /**
     * Commits the store transaction and clears the in-transaction flag when the argument is
     * {@code true}; does nothing otherwise. The commit is issued without consulting the flag.
     *
     * @param shouldCommit whether to commit
     * @throws AMQException if the commit fails
     */
    @Override
    public void messageFullyReceived(boolean shouldCommit) throws AMQException {
        if (!shouldCommit) {
            return;
        }
        _messageStore.commitTran(_storeContext);
        _inTran = false;
    }

    /**
     * Asks the channel to process its returns.
     *
     * @param protocolSession not read by this implementation
     * @throws AMQException if the channel fails while processing returns
     */
    @Override
    public void messageProcessed(AMQProtocolSession protocolSession) throws AMQException {
        _channel.processReturns();
    }
}
