package org.apache.qpid.server.txn;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.ack.TxAck;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.NoConsumersException;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;

/* loaded from: input_file:org/apache/qpid/server/txn/LocalTransactionalContext.class */
/**
 * {@link TransactionalContext} implementation bound to a single {@link AMQChannel}.
 *
 * <p>Acknowledgements are accumulated in a {@link TxAck} that is enlisted in a
 * {@link TxnBuffer}; publishes and requeues are recorded as {@link DeliveryAction}s
 * and only processed after the buffer has been committed. The store context,
 * message store and returned-message list are all obtained from the channel.
 */
public class LocalTransactionalContext implements TransactionalContext {
    private static final Logger _log = Logger.getLogger(LocalTransactionalContext.class);

    /** Acknowledgement operation for the current unit of work; {@code null} until the first ack. */
    private TxAck _ackOp;
    private final AMQChannel _channel;
    private final TxnBuffer _txnBuffer = new TxnBuffer();
    /** Actions recorded by {@link #deliver} / {@link #requeue}, run by {@link #postCommitDelivery()}. */
    private final List<DeliveryAction> _postCommitDeliveryList = new ArrayList<DeliveryAction>();
    /** Set once {@link #beginTranIfNecessary()} has started a message-store transaction. */
    private boolean _inTran = false;
    /** Set when a delivery or requeue has been recorded, or when acks are pending at commit time. */
    private boolean _messageDelivered = false;

    /**
     * A unit of work that is deferred until after the transaction buffer has committed.
     * The private constructor restricts subclassing to classes nested in this file.
     */
    public abstract class DeliveryAction {
        private DeliveryAction() {
        }

        /**
         * Performs the deferred work.
         *
         * @throws AMQException if the action fails
         */
        public abstract void process() throws AMQException;
    }

    /** Deferred enqueue of a message onto a queue. */
    private class PublishAction extends DeliveryAction {
        private final AMQQueue _queue;
        private final AMQMessage _message;

        public PublishAction(AMQQueue aMQQueue, AMQMessage aMQMessage) {
            super();
            this._queue = aMQQueue;
            this._message = aMQMessage;
        }

        /**
         * Enqueues the message. If the resulting entry reports
         * {@code immediateAndNotDelivered()}, a {@link NoConsumersException} is appended
         * to the channel's return-message list.
         *
         * @throws AMQException if the enqueue or the reference release fails
         */
        @Override // org.apache.qpid.server.txn.LocalTransactionalContext.DeliveryAction
        public void process() throws AMQException {
            // Hold a reference for the duration of the enqueue; it is always released below.
            this._message.incrementReference();
            try {
                QueueEntry enqueued = this._queue.enqueue(getStoreContext(), this._message);
                if (enqueued.immediateAndNotDelivered()) {
                    getReturnMessages().add(new NoConsumersException(this._message));
                }
            } finally {
                this._message.decrementReference(getStoreContext());
            }
        }
    }

    /** Deferred requeue of an existing queue entry. */
    private class RequeueAction extends DeliveryAction {
        private final QueueEntry _entry;

        public RequeueAction(QueueEntry queueEntry) {
            super();
            this._entry = queueEntry;
        }

        /**
         * Requeues the entry using the channel's store context.
         *
         * @throws AMQException if the requeue fails
         */
        @Override // org.apache.qpid.server.txn.LocalTransactionalContext.DeliveryAction
        public void process() throws AMQException {
            this._entry.requeue(getStoreContext());
        }
    }

    /**
     * Creates a context that takes its store context, message store and
     * return-message list from the given channel.
     *
     * @param aMQChannel the channel this context belongs to
     */
    public LocalTransactionalContext(AMQChannel aMQChannel) {
        this._channel = aMQChannel;
    }

    /** @return the channel's store context */
    @Override // org.apache.qpid.server.txn.TransactionalContext
    public StoreContext getStoreContext() {
        return this._channel.getStoreContext();
    }

    /** @return the channel's list of return messages (the list instance supplied by the channel) */
    public List<RequiredDeliveryException> getReturnMessages() {
        return this._channel.getReturnMessages();
    }

    /** @return the channel's message store */
    public MessageStore getMessageStore() {
        return this._channel.getMessageStore();
    }

    /**
     * Rolls back the transaction buffer, aborts the message-store transaction if the
     * store reports one in progress, and discards all pending post-commit actions.
     *
     * @throws AMQException if the buffer rollback or the store abort fails
     */
    @Override // org.apache.qpid.server.txn.TransactionalContext
    public void rollback() throws AMQException {
        this._txnBuffer.rollback(getStoreContext());
        if (getMessageStore().inTran(getStoreContext())) {
            getMessageStore().abortTran(getStoreContext());
            this._inTran = false;
        }
        // NOTE(review): _ackOp and _messageDelivered are not reset here. If TxnBuffer.rollback
        // drops its enlisted operations, a later acknowledgeMessage() would update a TxAck that
        // is no longer enlisted - confirm against TxnBuffer before changing.
        this._postCommitDeliveryList.clear();
    }

    /**
     * Records a publish of the message to the queue; the enqueue itself happens
     * during post-commit delivery.
     *
     * @param aMQQueue   destination queue
     * @param aMQMessage message to enqueue
     */
    @Override // org.apache.qpid.server.txn.TransactionalContext
    public void deliver(AMQQueue aMQQueue, AMQMessage aMQMessage) throws AMQException {
        this._postCommitDeliveryList.add(new PublishAction(aMQQueue, aMQMessage));
        this._messageDelivered = true;
    }

    /**
     * Records a requeue of the entry; the requeue itself happens during post-commit delivery.
     *
     * @param queueEntry entry to requeue
     */
    @Override // org.apache.qpid.server.txn.TransactionalContext
    public void requeue(QueueEntry queueEntry) throws AMQException {
        this._postCommitDeliveryList.add(new RequeueAction(queueEntry));
        this._messageDelivered = true;
    }

    /**
     * Verifies that the delivery tag is present in the unacknowledged-message map.
     *
     * @throws AMQException if the map does not contain the tag
     */
    private void checkAck(long j, UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException {
        if (!unacknowledgedMessageMap.contains(j)) {
            throw new AMQException("Ack with delivery tag " + j + " not known for channel");
        }
    }

    /**
     * Adds an acknowledgement to the pending {@link TxAck}, creating and enlisting
     * it in the transaction buffer on first use.
     *
     * @param j  delivery tag being acknowledged; validated against the map unless
     *           {@code z} is set and the tag is not positive
     * @param j2 tag used in place of {@code j} when {@code z} is set and {@code j} is 0
     * @param z  flag forwarded to {@link TxAck#update}; presumably the AMQP
     *           "multiple" flag - confirm against the interface
     * @param unacknowledgedMessageMap map of outstanding deliveries for the channel
     * @throws AMQException if the tag is unknown or a store transaction cannot be started
     */
    @Override // org.apache.qpid.server.txn.TransactionalContext
    public void acknowledgeMessage(long j, long j2, boolean z, UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException {
        if (!z || j > 0) {
            checkAck(j, unacknowledgedMessageMap);
        }
        if (this._ackOp == null) {
            this._ackOp = new TxAck(unacknowledgedMessageMap);
            this._txnBuffer.enlist(this._ackOp);
        }
        long effectiveTag = (z && j == 0) ? j2 : j;
        this._ackOp.update(effectiveTag, z);
        // Start a store transaction only when none is open and the ack op reports persistence.
        if (!this._inTran && this._ackOp.checkPersistent()) {
            beginTranIfNecessary();
        }
    }

    /** No-op in this implementation. */
    @Override // org.apache.qpid.server.txn.TransactionalContext
    public void messageFullyReceived(boolean z) throws AMQException {
    }

    /** No-op in this implementation. */
    @Override // org.apache.qpid.server.txn.TransactionalContext
    public void messageProcessed(AMQProtocolSession aMQProtocolSession) throws AMQException {
    }

    /**
     * Begins a message-store transaction unless this context has already started one.
     *
     * @throws AMQException if the store fails to begin the transaction
     */
    @Override // org.apache.qpid.server.txn.TransactionalContext
    public void beginTranIfNecessary() throws AMQException {
        if (this._inTran) {
            return;
        }
        if (_log.isDebugEnabled()) {
            _log.debug("Starting transaction on message store: " + this);
        }
        getMessageStore().beginTran(getStoreContext());
        this._inTran = true;
    }

    /**
     * Commits the transaction buffer and then runs the post-commit actions.
     *
     * <p>Pending acks are consolidated first. If work was recorded and a store
     * transaction is open, a {@link StoreMessageOperation} is enlisted before the
     * buffer commit. Whether or not the commit succeeds, {@code _messageDelivered}
     * is cleared and {@code _inTran} is refreshed from the message store. A failure
     * during post-commit delivery is logged and not propagated.
     *
     * @throws AMQException if the buffer commit fails
     */
    @Override // org.apache.qpid.server.txn.TransactionalContext
    public void commit() throws AMQException {
        if (_log.isDebugEnabled()) {
            _log.debug("Committing transactional context: " + this);
        }
        if (this._ackOp != null) {
            this._messageDelivered = true;
            this._ackOp.consolidate();
            this._ackOp = null;
        }
        if (this._messageDelivered && this._inTran) {
            this._txnBuffer.enlist(new StoreMessageOperation(getMessageStore()));
        }
        try {
            this._txnBuffer.commit(getStoreContext());
        } finally {
            // Reset per-transaction state on both the success and the failure path.
            this._messageDelivered = false;
            this._inTran = getMessageStore().inTran(getStoreContext());
        }
        try {
            postCommitDelivery();
        } catch (AMQException e) {
            // The commit has already happened, so a delivery failure is logged, not rethrown.
            _log.error("Failed to deliver messages following txn commit: " + e, e);
        }
    }

    /**
     * Processes every recorded {@link DeliveryAction} in insertion order. The list is
     * cleared afterwards even if an action throws, in which case the remaining
     * actions are not processed.
     *
     * @throws AMQException the first failure raised by an action
     */
    private void postCommitDelivery() throws AMQException {
        if (_log.isDebugEnabled()) {
            _log.debug("Performing post commit delivery");
        }
        try {
            for (DeliveryAction action : this._postCommitDeliveryList) {
                action.process();
            }
        } finally {
            this._postCommitDeliveryList.clear();
        }
    }
}
