/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.server.txn;

import java.util.ArrayList;
import java.util.List;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.ack.TxAck;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.NoConsumersException;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.StoreMessageOperation;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.txn.TxnBuffer;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
/**
 * A {@link TransactionalContext} that buffers work locally until {@link #commit()}.
 *
 * <p>Acknowledgements are accumulated in a single {@link TxAck} that is enlisted in a
 * {@link TxnBuffer}; publishes and requeues are recorded as deferred actions and are only
 * executed after the buffer has been committed. The message store, store context and the list
 * of returned messages are all obtained from the owning {@link AMQChannel}.
 *
 * <p>NOTE(review): this class performs no synchronisation of its own; presumably it is only
 * ever used from the thread servicing its channel - confirm against the callers.
 */
public class LocalTransactionalContext
implements TransactionalContext {
    private static final Logger _log = Logger.getLogger(LocalTransactionalContext.class);

    /** Buffer of transactional operations applied on commit or undone on rollback. */
    private final TxnBuffer _txnBuffer = new TxnBuffer();

    /** Publishes and requeues recorded during the transaction, run after a successful commit. */
    private final List<DeliveryAction> _postCommitDeliveryList = new ArrayList<DeliveryAction>();

    /** Acknowledgement operation of the current transaction; {@code null} until the first ack. */
    private TxAck _ackOp;

    /** Local view of whether a transaction has been started on the message store. */
    private boolean _inTran = false;

    /** Set once a deliver, requeue or acknowledgement has been recorded since the last commit. */
    private boolean _messageDelivered = false;

    private final AMQChannel _channel;

    /**
     * Creates a transactional context bound to the given channel.
     *
     * @param channel the channel supplying the store, store context and returned-message list
     */
    public LocalTransactionalContext(AMQChannel channel) {
        _channel = channel;
    }

    /**
     * @return the store context of the owning channel
     */
    @Override
    public StoreContext getStoreContext() {
        return _channel.getStoreContext();
    }

    /**
     * @return the owning channel's list of messages to be returned to the publisher
     */
    public List<RequiredDeliveryException> getReturnMessages() {
        return _channel.getReturnMessages();
    }

    /**
     * @return the message store of the owning channel
     */
    public MessageStore getMessageStore() {
        return _channel.getMessageStore();
    }

    /**
     * Rolls back the buffered operations, aborts the store transaction if the store reports
     * one as open, and discards every delivery queued for after the commit.
     *
     * <p>The queued deliveries are discarded even when the rollback itself fails, so that a
     * later {@link #commit()} can never publish or requeue messages that belonged to the
     * rolled-back transaction.
     *
     * @throws AMQException if rolling back the buffer or aborting the store transaction fails
     */
    @Override
    public void rollback() throws AMQException {
        // NOTE(review): _ackOp is not reset here. If TxnBuffer.rollback() drops its enlisted
        // operations, acknowledgements received afterwards would update a TxAck that is no
        // longer enlisted - confirm against TxnBuffer/TxAck before changing.
        try {
            _txnBuffer.rollback(getStoreContext());
            if (getMessageStore().inTran(getStoreContext())) {
                getMessageStore().abortTran(getStoreContext());
                _inTran = false;
            }
        }
        finally {
            _postCommitDeliveryList.clear();
        }
    }

    /**
     * Records a publish of {@code message} to {@code queue}; the enqueue itself is deferred
     * until after the transaction has been committed.
     *
     * @param queue   the destination queue
     * @param message the message to enqueue
     * @throws AMQException declared by the interface; not thrown by this implementation
     */
    @Override
    public void deliver(AMQQueue queue, AMQMessage message) throws AMQException {
        _postCommitDeliveryList.add(new PublishAction(queue, message));
        _messageDelivered = true;
    }

    /**
     * Records a requeue of {@code entry}; the requeue itself is deferred until after the
     * transaction has been committed.
     *
     * @param entry the queue entry to requeue
     * @throws AMQException declared by the interface; not thrown by this implementation
     */
    @Override
    public void requeue(QueueEntry entry) throws AMQException {
        _postCommitDeliveryList.add(new RequeueAction(entry));
        _messageDelivered = true;
    }

    /**
     * Verifies that {@code deliveryTag} is present in the unacknowledged message map.
     *
     * @throws AMQException if the tag is not known
     */
    private void checkAck(long deliveryTag, UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException {
        if (!unacknowledgedMessageMap.contains(deliveryTag)) {
            throw new AMQException("Ack with delivery tag " + deliveryTag + " not known for channel");
        }
    }

    /**
     * Adds an acknowledgement to the current transaction.
     *
     * <p>The tag is validated against {@code unacknowledgedMessageMap} unless this is a
     * "multiple" acknowledgement with a delivery tag of zero; in that case
     * {@code lastDeliveryTag} is recorded instead of {@code deliveryTag}. The {@link TxAck}
     * is created and enlisted in the transaction buffer on the first acknowledgement.
     *
     * @param deliveryTag              the tag being acknowledged (zero together with
     *                                 {@code multiple} selects {@code lastDeliveryTag})
     * @param lastDeliveryTag          the tag recorded for a multiple ack with tag zero
     * @param multiple                 whether the acknowledgement covers multiple messages
     * @param unacknowledgedMessageMap the map of outstanding deliveries for the channel
     * @throws AMQException if the tag is unknown or the store transaction cannot be started
     */
    @Override
    public void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean multiple, UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException {
        boolean ackAllOutstanding = multiple && deliveryTag == 0L;
        if (!multiple || deliveryTag > 0L) {
            checkAck(deliveryTag, unacknowledgedMessageMap);
        }
        if (_ackOp == null) {
            _ackOp = new TxAck(unacknowledgedMessageMap);
            _txnBuffer.enlist(_ackOp);
        }
        _ackOp.update(ackAllOutstanding ? lastDeliveryTag : deliveryTag, multiple);
        // Only consult the ack operation when no store transaction has been started yet;
        // presumably checkPersistent() reports whether persistent messages are involved.
        if (!_inTran && _ackOp.checkPersistent()) {
            beginTranIfNecessary();
        }
    }

    /** No action is taken by this implementation. */
    @Override
    public void messageFullyReceived(boolean persistent) throws AMQException {
    }

    /** No action is taken by this implementation. */
    @Override
    public void messageProcessed(AMQProtocolSession protocolSession) throws AMQException {
    }

    /**
     * Begins a transaction on the message store unless this context has already started one.
     *
     * @throws AMQException if the store fails to begin the transaction
     */
    @Override
    public void beginTranIfNecessary() throws AMQException {
        if (_inTran) {
            return;
        }
        if (_log.isDebugEnabled()) {
            _log.debug("Starting transaction on message store: " + this);
        }
        getMessageStore().beginTran(getStoreContext());
        _inTran = true;
    }

    /**
     * Commits the transaction and then performs the deferred deliveries.
     *
     * <p>Any pending acknowledgement operation is consolidated and detached first. If work was
     * recorded and a store transaction is open, a {@link StoreMessageOperation} is enlisted so
     * that the store takes part in the commit. Whether or not the buffer commit succeeds, the
     * delivered flag is reset and the in-transaction flag is refreshed from the store.
     * Failures during the post-commit delivery are logged and not propagated.
     *
     * @throws AMQException if committing the transaction buffer fails
     */
    @Override
    public void commit() throws AMQException {
        if (_log.isDebugEnabled()) {
            _log.debug("Committing transactional context: " + this);
        }
        if (_ackOp != null) {
            _messageDelivered = true;
            _ackOp.consolidate();
            // The op stays enlisted in the buffer; the next ack starts a fresh TxAck.
            _ackOp = null;
        }
        if (_messageDelivered && _inTran) {
            _txnBuffer.enlist(new StoreMessageOperation(getMessageStore()));
        }
        try {
            _txnBuffer.commit(getStoreContext());
        }
        finally {
            _messageDelivered = false;
            _inTran = getMessageStore().inTran(getStoreContext());
        }
        try {
            postCommitDelivery();
        }
        catch (AMQException e) {
            _log.error("Failed to deliver messages following txn commit: " + e, e);
        }
    }

    /**
     * Runs every deferred delivery action in the order it was recorded and then empties the
     * list, whether or not an action failed.
     *
     * <p>NOTE(review): the first failing action aborts the loop, so the remaining actions are
     * dropped when the list is cleared - confirm this is the intended behaviour.
     *
     * @throws AMQException the failure raised by the first action that could not be processed
     */
    private void postCommitDelivery() throws AMQException {
        if (_log.isDebugEnabled()) {
            _log.debug("Performing post commit delivery");
        }
        try {
            for (DeliveryAction action : _postCommitDeliveryList) {
                action.process();
            }
        }
        finally {
            _postCommitDeliveryList.clear();
        }
    }

    /** A unit of delivery work deferred until after the transaction has been committed. */
    private abstract class DeliveryAction {
        /**
         * Performs the deferred work.
         *
         * @throws AMQException if the work cannot be carried out
         */
        public abstract void process() throws AMQException;
    }

    /** Deferred enqueue of a message onto a queue. */
    private class PublishAction
    extends DeliveryAction {
        private final AMQQueue _queue;
        private final AMQMessage _message;

        public PublishAction(AMQQueue queue, AMQMessage message) {
            _queue = queue;
            _message = message;
        }

        /**
         * Enqueues the message while holding an extra reference to it, and records a
         * {@link NoConsumersException} in the channel's return list when the resulting entry
         * reports {@code immediateAndNotDelivered()}.
         */
        @Override
        public void process() throws AMQException {
            // Hold a reference for the duration of the enqueue; it is always released below.
            _message.incrementReference();
            try {
                QueueEntry entry = _queue.enqueue(getStoreContext(), _message);
                if (entry.immediateAndNotDelivered()) {
                    getReturnMessages().add(new NoConsumersException(_message));
                }
            }
            finally {
                _message.decrementReference(getStoreContext());
            }
        }
    }

    /** Deferred requeue of an existing queue entry. */
    private class RequeueAction
    extends DeliveryAction {
        private final QueueEntry _entry;

        public RequeueAction(QueueEntry entry) {
            _entry = entry;
        }

        /** Requeues the entry using the channel's store context. */
        @Override
        public void process() throws AMQException {
            _entry.requeue(getStoreContext());
        }
    }
}

