/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.server.txn;

import java.util.List;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.NoConsumersException;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.TransactionalContext;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
/**
 * A {@link TransactionalContext} whose {@link #commit()} and {@link #rollback()} do nothing.
 *
 * <p>Store transactions opened through {@link #beginTranIfNecessary()} are committed by this
 * class itself: at the end of {@link #acknowledgeMessage} when one is open, and in
 * {@link #messageFullyReceived(boolean)} for persistent messages.
 */
public class NonTransactionalContext implements TransactionalContext {
    private static final Logger _log = Logger.getLogger(NonTransactionalContext.class);

    private final MessageStore _messageStore;
    private final StoreContext _storeContext;
    private final AMQChannel _channel;
    private final List<RequiredDeliveryException> _returnMessages;

    /** Set once {@link #beginTranIfNecessary()} has opened a store transaction; cleared when this class commits. */
    private boolean _inTran;

    /**
     * @param messageStore   store on which transactions are begun and committed
     * @param storeContext   context handed to every store, queue and entry operation
     * @param channel        channel used for its id in diagnostics and for processing returns
     * @param returnMessages list that receives a {@link NoConsumersException} for each
     *                       enqueued entry reporting {@code immediateAndNotDelivered()}
     */
    public NonTransactionalContext(MessageStore messageStore, StoreContext storeContext, AMQChannel channel, List<RequiredDeliveryException> returnMessages) {
        _messageStore = messageStore;
        _storeContext = storeContext;
        _channel = channel;
        _returnMessages = returnMessages;
    }

    /** @return the store context supplied at construction */
    @Override
    public StoreContext getStoreContext() {
        return _storeContext;
    }

    /**
     * Opens a transaction on the message store unless this context already opened one.
     *
     * @throws AMQException propagated from the message store
     */
    @Override
    public void beginTranIfNecessary() throws AMQException {
        if (_inTran) {
            return;
        }
        _messageStore.beginTran(_storeContext);
        // Only flagged after beginTran returned normally.
        _inTran = true;
    }

    /** No-op in this context. */
    @Override
    public void commit() throws AMQException {
    }

    /** No-op in this context. */
    @Override
    public void rollback() throws AMQException {
    }

    /**
     * Enqueues {@code message} on {@code queue}; when the resulting entry reports
     * {@code immediateAndNotDelivered()}, a {@link NoConsumersException} for it is appended
     * to the return-message list.
     *
     * @throws AMQException propagated from the queue
     */
    @Override
    public void deliver(AMQQueue queue, AMQMessage message) throws AMQException {
        final QueueEntry queued = queue.enqueue(_storeContext, message);
        if (!queued.immediateAndNotDelivered()) {
            return;
        }
        _returnMessages.add(new NoConsumersException(queued.getMessage()));
    }

    /**
     * Requeues {@code entry} using this context's store context.
     *
     * @throws AMQException propagated from the entry
     */
    @Override
    public void requeue(QueueEntry entry) throws AMQException {
        entry.requeue(_storeContext);
    }

    /**
     * Acknowledges one or more unacknowledged entries and then commits the store
     * transaction if one was opened along the way.
     *
     * <ul>
     *   <li>{@code multiple} with {@code deliveryTag == 0}: every entry in the map is
     *       discarded and the map is cleared once the visit completes.</li>
     *   <li>{@code multiple} with a non-zero tag: the tag must be present in the map,
     *       which is then drained up to that tag via {@code drainTo}.</li>
     *   <li>otherwise: the single entry for the tag is discarded and removed.</li>
     * </ul>
     *
     * @param deliveryTag              tag being acknowledged
     * @param lastDeliveryTag          not used by this implementation
     * @param multiple                 whether the acknowledgement covers more than one entry
     * @param unacknowledgedMessageMap map of outstanding entries to update
     * @throws AMQException if the tag is unknown (non-zero multiple ack, or single ack),
     *                      or propagated from the store / entries
     */
    @Override
    public void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean multiple, final UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException {
        // Sampled once so that every log decision in this call is consistent.
        final boolean debug = _log.isDebugEnabled();

        if (!multiple) {
            acknowledgeSingle(deliveryTag, unacknowledgedMessageMap, debug);
        } else if (deliveryTag != 0L) {
            acknowledgeUpTo(deliveryTag, unacknowledgedMessageMap);
        } else {
            acknowledgeAll(unacknowledgedMessageMap, debug);
        }

        commitIfInTran();
    }

    /** Discards every entry in {@code unacked} and clears the map when the visit finishes. */
    private void acknowledgeAll(final UnacknowledgedMessageMap unacked, final boolean debug) throws AMQException {
        _log.info("Multiple ack on delivery tag 0. ACKing all messages. Current count:" + unacked.size());
        unacked.visit(new UnacknowledgedMessageMap.Visitor() {

            public boolean callback(long tag, QueueEntry entry) throws AMQException {
                NonTransactionalContext.this.discardAcknowledged(entry, debug);
                // NOTE(review): false presumably means "keep visiting" - confirm against the Visitor contract.
                return false;
            }

            public void visitComplete() {
                unacked.clear();
            }
        });
    }

    /** Drains {@code unacked} up to {@code deliveryTag}, rejecting tags the map does not contain. */
    private void acknowledgeUpTo(long deliveryTag, UnacknowledgedMessageMap unacked) throws AMQException {
        if (unacked.contains(deliveryTag)) {
            unacked.drainTo(deliveryTag, _storeContext);
            return;
        }
        throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
    }

    /** Discards and removes the single entry registered under {@code deliveryTag}. */
    private void acknowledgeSingle(long deliveryTag, UnacknowledgedMessageMap unacked, boolean debug) throws AMQException {
        final QueueEntry entry = unacked.get(deliveryTag);
        if (entry == null) {
            final String problem = "Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channel.getChannelId();
            _log.info(problem);
            throw new AMQException(problem);
        }

        discardAcknowledged(entry, debug);
        unacked.remove(deliveryTag);

        if (debug) {
            _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " + entry.getMessage().getMessageId());
        }
    }

    /**
     * Discards one acknowledged entry, opening a store transaction first when its
     * message is persistent.
     */
    private void discardAcknowledged(QueueEntry entry, boolean debug) throws AMQException {
        if (debug) {
            _log.debug("Discarding message: " + entry.getMessage().getMessageId());
        }
        if (entry.getMessage().isPersistent()) {
            beginTranIfNecessary();
        }
        entry.discard(_storeContext);
    }

    /** Commits and clears the in-transaction flag when a transaction is open. */
    private void commitIfInTran() throws AMQException {
        if (!_inTran) {
            return;
        }
        _messageStore.commitTran(_storeContext);
        _inTran = false;
    }

    /**
     * For a persistent message, commits the store transaction and clears the
     * in-transaction flag; does nothing for a non-persistent message.
     *
     * <p>The commit is issued regardless of the current value of the flag.
     *
     * @param persistent whether the received message is persistent
     * @throws AMQException propagated from the message store
     */
    @Override
    public void messageFullyReceived(boolean persistent) throws AMQException {
        if (!persistent) {
            return;
        }
        _messageStore.commitTran(_storeContext);
        _inTran = false;
    }

    /**
     * Asks the channel to process its returns.
     *
     * @param protocolSession not used by this implementation
     * @throws AMQException propagated from the channel
     */
    @Override
    public void messageProcessed(AMQProtocolSession protocolSession) throws AMQException {
        _channel.processReturns();
    }
}

