/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.server;

import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.ConsumerTagNotUniqueException;
import org.apache.qpid.server.ExtractResendAndRequeue;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.UnauthorizedAccessException;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.txn.LocalTransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
/**
 * Server-side state for a single channel of an {@link AMQProtocolSession}.
 *
 * <p>The channel assembles an incoming message from its publish, content-header and
 * content-body frames, keeps the consumer-tag to {@link Subscription} map, records delivered but
 * unacknowledged messages, and delegates acknowledgement, commit and rollback to the current
 * {@link TransactionalContext}.
 *
 * <p>NOTE(review): nothing in this class synchronises access to its mutable fields (only the
 * suspended flag is atomic) - confirm the threading expectations with the callers.
 */
public class AMQChannel {
    public static final int DEFAULT_PREFETCH = 5000;
    private static final Logger _log = Logger.getLogger(AMQChannel.class);
    private final int _channelId;
    private final Pre0_10CreditManager _creditManager = new Pre0_10CreditManager(0L, 0L);
    /** Last delivery tag handed out by {@link #getNextDeliveryTag()}. */
    private long _deliveryTag = 0L;
    private AMQQueue _defaultQueue;
    /** Last number handed out by {@link #getNextConsumerTag()}. */
    private int _consumerTag;
    /** Message currently being assembled from publish/header/body frames; null between messages. */
    private IncomingMessage _currentMessage;
    protected final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>();
    private final MessageStore _messageStore;
    // Same value as the previously hard-coded 5000; presumably the decompiler inlined the constant.
    private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
    private final AtomicBoolean _suspended = new AtomicBoolean(false);
    private TransactionalContext _txnContext;
    private final StoreContext _storeContext;
    /** Undeliverable messages collected for {@link #processReturns()}. */
    private final List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>();
    private MessageHandleFactory _messageHandleFactory = new MessageHandleFactory();
    private final AMQProtocolSession _session;
    private boolean _closing;
    /** Identity-hash suffix used by {@link #debugIdentity()}. */
    private final String id = "(" + System.identityHashCode(this) + ")";
    /** Writes a delivery for this channel through the session's protocol output converter. */
    private final ClientDeliveryMethod _clientDeliveryMethod = new ClientDeliveryMethod(){

        public void deliverToClient(Subscription sub, QueueEntry entry, long deliveryTag) throws AMQException {
            AMQChannel.this.getProtocolSession().getProtocolOutputConverter().writeDeliver(entry.getMessage(), AMQChannel.this.getChannelId(), deliveryTag, sub.getConsumerTag());
        }
    };
    /** Records a delivery in this channel's unacknowledged-message map. */
    private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod(){

        public void recordMessageDelivery(Subscription sub, QueueEntry entry, long deliveryTag) {
            AMQChannel.this.addUnacknowledgedMessage(entry, deliveryTag, sub);
        }
    };

    /**
     * Creates a channel that starts out with a {@link NonTransactionalContext}.
     *
     * @param session the protocol session that owns this channel
     * @param channelId the channel number within the session
     * @param messageStore the store used for message ids and for the transactional context
     * @throws AMQException declared by the original signature
     */
    public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) throws AMQException {
        this._session = session;
        this._channelId = channelId;
        this._storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId);
        this._messageStore = messageStore;
        this._txnContext = new NonTransactionalContext(this._messageStore, this._storeContext, this, this._returnMessages);
    }

    /** Replaces the current transactional context with a {@link LocalTransactionalContext}. */
    public void setLocalTransactional() {
        this._txnContext = new LocalTransactionalContext(this);
    }

    /**
     * @return true when the current context is anything other than a {@link NonTransactionalContext}
     */
    public boolean isTransactional() {
        return !(this._txnContext instanceof NonTransactionalContext);
    }

    public int getChannelId() {
        return this._channelId;
    }

    /**
     * Starts a new incoming message for the given publish information, replacing any message
     * that was still being assembled.
     *
     * @param info the publish information for the new message
     * @param e the exchange set on the new message
     * @throws AMQException if the message cannot be created
     */
    public void setPublishFrame(MessagePublishInfo info, Exchange e) throws AMQException {
        this._currentMessage = new IncomingMessage(this._messageStore.getNewMessageId(), info, this._txnContext, this._session);
        this._currentMessage.setMessageStore(this._messageStore);
        this._currentMessage.setExchange(e);
    }

    /**
     * Attaches the content header to the current message, routes it and, if no further content
     * is expected, delivers it.
     *
     * @param contentHeaderBody the received content header
     * @throws AMQException if no publish frame preceded this header, or if routing/delivery fails
     */
    public void publishContentHeader(ContentHeaderBody contentHeaderBody) throws AMQException {
        if (this._currentMessage == null) {
            throw new AMQException("Received content header without previously receiving a BasicPublish frame");
        }
        if (_log.isDebugEnabled()) {
            _log.debug("Content header received on channel " + this._channelId);
        }
        this._currentMessage.setContentHeaderBody(contentHeaderBody);
        this._currentMessage.setExpiration();
        this.routeCurrentMessage();
        this._currentMessage.routingComplete(this._messageStore, this._messageHandleFactory);
        this.deliverCurrentMessageIfComplete();
    }

    /**
     * Delivers the current message to its queues once all content has been received.
     *
     * <p>Routing and authorisation failures are queued for {@link #processReturns()} rather than
     * propagated. Whatever the outcome, the transactional context is told the message has been
     * processed and the current message is cleared.
     *
     * <p>NOTE: the decompiler flagged the try/catch reconstruction of this method as a possible
     * behaviour change relative to the original bytecode.
     */
    private void deliverCurrentMessageIfComplete() throws AMQException {
        if (!this._currentMessage.allContentReceived()) {
            return;
        }
        try {
            this._currentMessage.deliverToQueues();
        }
        catch (NoRouteException e) {
            this._returnMessages.add(e);
        }
        catch (UnauthorizedAccessException ex) {
            this._returnMessages.add(ex);
        }
        finally {
            this._txnContext.messageProcessed(this._session);
            this._currentMessage = null;
        }
    }

    /**
     * Adds a content body frame to the current message and delivers the message if it is now
     * complete.
     *
     * @param contentBody the received content body
     * @throws AMQException if no message is being assembled, or if adding/delivering fails; in
     *         the latter case the current message is discarded before the exception is rethrown
     */
    public void publishContentBody(ContentBody contentBody) throws AMQException {
        if (this._currentMessage == null) {
            throw new AMQException("Received content body without previously receiving a JmsPublishBody");
        }
        if (_log.isDebugEnabled()) {
            _log.debug(this.debugIdentity() + "Content body received on channel " + this._channelId);
        }
        try {
            this._currentMessage.addContentBodyFrame(this._session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk((AMQBody)contentBody));
            this.deliverCurrentMessageIfComplete();
        }
        catch (AMQException e) {
            // Drop the partially assembled message so later frames are not appended to it.
            this._currentMessage = null;
            throw e;
        }
    }

    /**
     * Routes the current message; a {@link NoRouteException} is queued for
     * {@link #processReturns()} instead of being propagated.
     */
    protected void routeCurrentMessage() throws AMQException {
        try {
            this._currentMessage.route();
        }
        catch (NoRouteException e) {
            this._returnMessages.add(e);
        }
    }

    /** @return the next delivery tag; the first value returned is 1 */
    public long getNextDeliveryTag() {
        return ++this._deliveryTag;
    }

    /** @return the next consumer tag number; the first value returned is 1 */
    public int getNextConsumerTag() {
        return ++this._consumerTag;
    }

    /**
     * Creates a subscription for the given consumer tag and registers it with the queue.
     *
     * @param tag the consumer tag, or null to generate one of the form {@code sgen_<n>}
     * @param queue the queue to subscribe to
     * @param acks passed through to the subscription factory
     * @param filters passed through to the subscription factory
     * @param noLocal passed through to the subscription factory
     * @param exclusive passed through to the queue registration
     * @return the consumer tag actually used
     * @throws ConsumerTagNotUniqueException if the tag is already registered on this channel
     * @throws AMQException if the queue rejects the registration; the tag is then removed again
     */
    public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, boolean acks, FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException {
        if (tag == null) {
            tag = new AMQShortString("sgen_" + this.getNextConsumerTag());
        }
        if (this._tag2SubscriptionMap.containsKey(tag)) {
            throw new ConsumerTagNotUniqueException();
        }
        Subscription subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(this._channelId, this._session, tag, acks, filters, noLocal, this._creditManager);
        this._tag2SubscriptionMap.put(tag, subscription);
        try {
            queue.registerSubscription(subscription, exclusive);
        }
        catch (AMQException e) {
            // Registration failed: do not leave a dangling entry behind.
            this._tag2SubscriptionMap.remove(tag);
            throw e;
        }
        return tag;
    }

    /**
     * Removes the subscription registered under the given tag and unregisters it from its queue.
     *
     * @param consumerTag the tag of the consumer to remove
     * @return true if a subscription was registered under the tag, false otherwise
     * @throws AMQException if unregistering from the queue fails
     */
    public boolean unsubscribeConsumer(AMQShortString consumerTag) throws AMQException {
        Subscription sub = this._tag2SubscriptionMap.remove(consumerTag);
        if (sub == null) {
            _log.warn("Attempt to unsubscribe consumer with tag '" + consumerTag + "' which is not registered.");
            return false;
        }
        this.unregisterWithSendLock(sub);
        return true;
    }

    /**
     * Unregisters a subscription from its queue between {@code getSendLock()} and
     * {@code releaseSendLock()}.
     *
     * <p>The lock is taken before the {@code try} so that a failure to acquire it does not
     * trigger a release of a lock that was never held.
     */
    private void unregisterWithSendLock(Subscription sub) throws AMQException {
        sub.getSendLock();
        try {
            sub.getQueue().unregisterSubscription(sub);
        }
        finally {
            sub.releaseSendLock();
        }
    }

    /**
     * Rolls back the transactional context, unsubscribes all consumers, requeues unacknowledged
     * messages and finally marks the channel as closing.
     *
     * <p>An {@link AMQException} raised while requeuing is logged and does not prevent the
     * channel from being marked as closing.
     *
     * @throws AMQException if the rollback or the unsubscription fails
     */
    public void close() throws AMQException {
        this._txnContext.rollback();
        this.unsubscribeAllConsumers();
        try {
            this.requeue();
        }
        catch (AMQException e) {
            // Pass the exception as the Throwable too so the stack trace is logged.
            _log.error("Caught AMQException whilst attempting to reque:" + e, e);
        }
        this.setClosing(true);
    }

    private void setClosing(boolean closing) {
        this._closing = closing;
    }

    /**
     * Unregisters every subscription on this channel from its queue, then clears the tag map.
     * If unregistering any subscription throws, the map is left as it was.
     */
    private void unsubscribeAllConsumers() throws AMQException {
        if (_log.isInfoEnabled()) {
            if (!this._tag2SubscriptionMap.isEmpty()) {
                _log.info("Unsubscribing all consumers on channel " + this.toString());
            } else {
                _log.info("No consumers to unsubscribe on channel " + this.toString());
            }
        }
        for (Map.Entry<AMQShortString, Subscription> me : this._tag2SubscriptionMap.entrySet()) {
            if (_log.isInfoEnabled()) {
                _log.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + this.toString());
            }
            this.unregisterWithSendLock(me.getValue());
        }
        this._tag2SubscriptionMap.clear();
    }

    /**
     * Records a delivered message as unacknowledged under the given delivery tag.
     *
     * @param entry the delivered queue entry
     * @param deliveryTag the tag the entry was delivered with
     * @param subscription the receiving subscription (used for debug logging only)
     */
    public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, Subscription subscription) {
        if (_log.isDebugEnabled()) {
            if (entry.getQueue() == null) {
                _log.debug("Adding unacked message with a null queue:" + entry.debugIdentity());
            } else {
                _log.debug(this.debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag + ") with a queue(" + entry.getQueue() + ") for " + subscription);
            }
        }
        this._unacknowledgedMessageMap.add(deliveryTag, entry);
    }

    /** @return the channel id followed by this object's identity hash in parentheses */
    public String debugIdentity() {
        return this._channelId + this.id;
    }

    /**
     * Returns the context used for requeuing: the channel's own context when it is
     * non-transactional, otherwise a fresh {@link NonTransactionalContext}.
     */
    private TransactionalContext requeueContext() throws AMQException {
        if (this._txnContext instanceof NonTransactionalContext) {
            return this._txnContext;
        }
        return new NonTransactionalContext(this._messageStore, this._storeContext, this, this._returnMessages);
    }

    /**
     * Cancels all unacknowledged messages and requeues them, marked as redelivered. Entries
     * whose queue has been deleted are discarded instead.
     *
     * @throws AMQException if requeuing or discarding fails
     */
    public void requeue() throws AMQException {
        Collection<QueueEntry> messagesToBeDelivered = this._unacknowledgedMessageMap.cancelAllMessages();
        if (messagesToBeDelivered.isEmpty()) {
            return;
        }
        if (_log.isInfoEnabled()) {
            _log.info("Requeuing " + messagesToBeDelivered.size() + " unacked messages. for " + this.toString());
        }
        TransactionalContext deliveryContext = this.requeueContext();
        for (QueueEntry unacked : messagesToBeDelivered) {
            if (unacked.isQueueDeleted()) {
                unacked.discard(this._storeContext);
                continue;
            }
            unacked.getMessage().setRedelivered(true);
            unacked.release();
            deliveryContext.requeue(unacked);
        }
    }

    /**
     * Requeues the single unacknowledged message with the given delivery tag, marked as
     * redelivered. If its queue has been deleted the message is discarded with a warning; an
     * unknown tag is only logged.
     *
     * @param deliveryTag the delivery tag of the message to requeue
     * @throws AMQException if requeuing or discarding fails
     */
    public void requeue(long deliveryTag) throws AMQException {
        QueueEntry unacked = this._unacknowledgedMessageMap.remove(deliveryTag);
        if (unacked == null) {
            _log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists." + this._unacknowledgedMessageMap.size());
            return;
        }
        unacked.getMessage().setRedelivered(true);
        if (!unacked.isQueueDeleted()) {
            unacked.release();
        }
        TransactionalContext deliveryContext = this.requeueContext();
        if (!unacked.isQueueDeleted()) {
            deliveryContext.requeue(unacked);
        } else {
            _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.getMessage().debugIdentity() + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
            unacked.discard(this._storeContext);
        }
    }

    /**
     * Splits the unacknowledged messages into "resend" and "requeue" sets using
     * {@link ExtractResendAndRequeue}, resends what it can to the recorded subscription and
     * requeues the rest.
     *
     * <p>A message is moved to the requeue set when it has no recorded subscription or when the
     * queue reports that the resend did not happen. Requeued messages are removed from the
     * unacknowledged map.
     *
     * @param requeue passed through to {@link ExtractResendAndRequeue}
     * @throws AMQException if resending or requeuing fails
     */
    public void resend(boolean requeue) throws AMQException {
        LinkedHashMap<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
        LinkedHashMap<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
        if (_log.isDebugEnabled()) {
            _log.debug("unacked map Size:" + this._unacknowledgedMessageMap.size());
        }
        this._unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(this._unacknowledgedMessageMap, msgToRequeue, msgToResend, requeue, this._storeContext));
        if (_log.isDebugEnabled()) {
            if (!msgToResend.isEmpty()) {
                _log.debug("Preparing (" + msgToResend.size() + ") message to resend.");
            } else {
                _log.debug("No message to resend.");
            }
        }
        for (Map.Entry<Long, QueueEntry> entry : msgToResend.entrySet()) {
            QueueEntry message = entry.getValue();
            long deliveryTag = entry.getKey();
            AMQMessage msg = message.getMessage();
            AMQQueue queue = message.getQueue();
            msg.setRedelivered(true);
            Subscription sub = message.getDeliveredSubscription();
            if (sub == null) {
                if (_log.isInfoEnabled()) {
                    _log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString() + ")to prevent loss");
                }
                msgToRequeue.put(deliveryTag, message);
            } else if (!queue.resend(message, sub)) {
                // The queue did not resend the message: fall back to requeuing it.
                msgToRequeue.put(deliveryTag, message);
            }
        }
        if (_log.isInfoEnabled() && !msgToRequeue.isEmpty()) {
            _log.info("Preparing (" + msgToRequeue.size() + ") message to requeue to.");
        }
        TransactionalContext deliveryContext = this.requeueContext();
        for (Map.Entry<Long, QueueEntry> entry : msgToRequeue.entrySet()) {
            QueueEntry message = entry.getValue();
            long deliveryTag = entry.getKey();
            message.release();
            message.setRedelivered(true);
            deliveryContext.requeue(message);
            this._unacknowledgedMessageMap.remove(deliveryTag);
        }
    }

    /**
     * Acknowledges the message(s) identified by the delivery tag within the current
     * transactional context.
     *
     * @param deliveryTag the delivery tag being acknowledged
     * @param multiple passed through to the unacknowledged-message map
     * @throws AMQException if the acknowledgement fails
     */
    public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException {
        this._unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, this._txnContext);
    }

    public UnacknowledgedMessageMap getUnacknowledgedMessageMap() {
        return this._unacknowledgedMessageMap;
    }

    /**
     * Sets the suspended flag. When the channel goes from suspended to not suspended, an
     * asynchronous delivery is requested on the queue of every subscription.
     *
     * @param suspended the new suspended state
     */
    public void setSuspended(boolean suspended) {
        boolean wasSuspended = this._suspended.getAndSet(suspended);
        if (wasSuspended && !suspended) {
            for (Subscription s : this._tag2SubscriptionMap.values()) {
                s.getQueue().deliverAsync(s);
            }
        }
    }

    public boolean isSuspended() {
        return this._suspended.get();
    }

    /**
     * Commits the current transactional context.
     *
     * @throws AMQException if the channel is not transactional, or if the commit fails
     */
    public void commit() throws AMQException {
        if (!this.isTransactional()) {
            throw new AMQException("Fatal error: commit called on non-transactional channel");
        }
        this._txnContext.commit();
    }

    /**
     * Rolls back the current transactional context.
     *
     * @throws AMQException if the rollback fails
     */
    public void rollback() throws AMQException {
        this._txnContext.rollback();
    }

    /** @return {@code [<session>:<channelId>]} */
    @Override
    public String toString() {
        return "[" + this._session.toString() + ":" + this._channelId + "]";
    }

    public void setDefaultQueue(AMQQueue queue) {
        this._defaultQueue = queue;
    }

    public AMQQueue getDefaultQueue() {
        return this._defaultQueue;
    }

    public StoreContext getStoreContext() {
        return this._storeContext;
    }

    /**
     * Writes a return frame for every queued undeliverable message, decrements each message's
     * reference and then empties the queue of returns.
     *
     * @throws AMQException if writing a return or decrementing a reference fails; the list is
     *         not cleared in that case
     */
    public void processReturns() throws AMQException {
        if (this._returnMessages.isEmpty()) {
            return;
        }
        for (RequiredDeliveryException bouncedMessage : this._returnMessages) {
            AMQMessage message = bouncedMessage.getAMQMessage();
            this._session.getProtocolOutputConverter().writeReturn(message, this._channelId, bouncedMessage.getReplyCode().getCode(), new AMQShortString(bouncedMessage.getMessage()));
            message.decrementReference(this._storeContext);
        }
        this._returnMessages.clear();
    }

    public TransactionalContext getTransactionalContext() {
        return this._txnContext;
    }

    /** @return true once {@link #close()} has run to completion */
    public boolean isClosing() {
        return this._closing;
    }

    public AMQProtocolSession getProtocolSession() {
        return this._session;
    }

    public FlowCreditManager getCreditManager() {
        return this._creditManager;
    }

    /**
     * Updates the limits of this channel's credit manager.
     *
     * @param prefetchSize first argument to {@code setCreditLimits}
     * @param prefetchCount second argument to {@code setCreditLimits}
     */
    public void setCredit(long prefetchSize, int prefetchCount) {
        this._creditManager.setCreditLimits(prefetchSize, prefetchCount);
    }

    /** @return the live (not copied) list of messages awaiting return */
    public List<RequiredDeliveryException> getReturnMessages() {
        return this._returnMessages;
    }

    public MessageStore getMessageStore() {
        return this._messageStore;
    }

    public ClientDeliveryMethod getClientDeliveryMethod() {
        return this._clientDeliveryMethod;
    }

    public RecordDeliveryMethod getRecordDeliveryMethod() {
        return this._recordDeliveryMethod;
    }
}

