/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.client;

import java.io.Serializable;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQNoConsumersException;
import org.apache.qpid.client.AMQNoRouteException;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQTemporaryQueue;
import org.apache.qpid.client.AMQTemporaryTopic;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.client.BasicMessageConsumer;
import org.apache.qpid.client.BasicMessageProducer;
import org.apache.qpid.client.Closeable;
import org.apache.qpid.client.QueueReceiverAdaptor;
import org.apache.qpid.client.TopicSubscriberAdaptor;
import org.apache.qpid.client.failover.FailoverSupport;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.BasicAckBody;
import org.apache.qpid.framing.BasicConsumeBody;
import org.apache.qpid.framing.BasicRecoverBody;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.framing.ChannelFlowBody;
import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.ExchangeDeclareOkBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.QueueBindBody;
import org.apache.qpid.framing.QueueDeclareBody;
import org.apache.qpid.framing.QueueDeleteBody;
import org.apache.qpid.framing.TxCommitBody;
import org.apache.qpid.framing.TxCommitOkBody;
import org.apache.qpid.framing.TxRollbackBody;
import org.apache.qpid.framing.TxRollbackOkBody;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.MessageProducer;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.URLSyntaxException;

public class AMQSession
extends Closeable
implements Session,
QueueSession,
TopicSession {
    /** Class-wide log4j logger. */
    private static final Logger _logger = Logger.getLogger(AMQSession.class);
    /** Prefetch high mark applied by the constructors that take no explicit prefetch value. */
    public static final int DEFAULT_PREFETCH_HIGH_MARK = 5000;
    /** Prefetch low mark applied by the constructors that take no explicit prefetch value. */
    public static final int DEFAULT_PREFETCH_LOW_MARK = 2500;
    /** Connection that owns this session; source of the failover mutex and protocol handler. */
    private AMQConnection _connection;
    /** Whether the session was created as transacted. */
    private boolean _transacted;
    /** Acknowledge mode; the primary constructor forces it to 0 when the session is transacted. */
    private int _acknowledgeMode;
    /** Channel id placed in every frame this session creates. */
    private int _channelId;
    /** High mark handed to the incoming-message queue and to consumers created without explicit prefetch. */
    private int _defaultPrefetchHighMark = 5000;
    /** Low mark handed to the incoming-message queue and to consumers created without explicit prefetch. */
    private int _defaultPrefetchLowMark = 2500;
    /** Counter used by consumeFromQueue to generate consumer tags; starts at 1. */
    private int _nextTag = 1;
    /** Queue that messageReceived() adds incoming messages to. */
    private final FlowControllingBlockingQueue _queue;
    /** Dispatcher thread created by start(); null until start() has been called. */
    private Dispatcher _dispatcher;
    /** Registry used to create messages by MIME type. */
    private MessageFactoryRegistry _messageFactoryRegistry;
    /** Producers keyed by boxed producer id (see registerProducer / deregisterProducer). */
    private Map _producers = new ConcurrentHashMap();
    /** Consumers keyed by consumer tag (see registerConsumer / deregisterConsumer). */
    private Map _consumers = new ConcurrentHashMap();
    /** Value of the "immediate" flag used by createProducer(Destination). */
    protected static final boolean DEFAULT_IMMEDIATE = false;
    /** Value of the "mandatory" flag used by the createProducer overloads that do not take one. */
    protected static final boolean DEFAULT_MANDATORY = true;
    /** Last producer id handed out by getNextProducerId(). */
    private long _nextProducerId;
    /**
     * Stopped flag; initially true and set to true again by stop().
     * NOTE(review): the reference is never reassigned in the visible code, so {@code final}
     * would be the usual choice over {@code volatile} — confirm against the rest of the class.
     */
    private volatile AtomicBoolean _stopped = new AtomicBoolean(true);

    /**
     * Creates a session that uses the default prefetch high and low marks.
     *
     * @param con                    owning connection
     * @param channelId              channel id used in frames sent by this session
     * @param transacted             whether the session is transacted
     * @param acknowledgeMode        requested acknowledge mode
     * @param messageFactoryRegistry registry used to create messages
     */
    AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry) {
        this(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, DEFAULT_PREFETCH_HIGH_MARK, DEFAULT_PREFETCH_LOW_MARK);
    }

    /**
     * Creates a session whose prefetch high and low marks are both set to {@code defaultPrefetch}.
     *
     * @param con                    owning connection
     * @param channelId              channel id used in frames sent by this session
     * @param transacted             whether the session is transacted
     * @param acknowledgeMode        requested acknowledge mode
     * @param messageFactoryRegistry registry used to create messages
     * @param defaultPrefetch        value used for both the high and the low mark
     */
    AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetch) {
        this(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetch, defaultPrefetch);
    }

    /**
     * Primary constructor: stores the session settings and builds the incoming-message queue.
     *
     * <p>A transacted session always records acknowledge mode 0, whatever was requested.
     * When the resulting mode is 257 the queue is given a threshold listener that suspends
     * the channel above the high mark and unsuspends it below the low mark; for every other
     * mode the queue gets no listener.
     * NOTE(review): 257 is presumably Qpid's NO_ACKNOWLEDGE constant — confirm.
     *
     * @param con                     owning connection
     * @param channelId               channel id used in frames sent by this session
     * @param transacted              whether the session is transacted
     * @param acknowledgeMode         requested acknowledge mode (ignored when transacted)
     * @param messageFactoryRegistry  registry used to create messages
     * @param defaultPrefetchHighMark prefetch high mark
     * @param defaultPrefetchLowMark  prefetch low mark
     */
    AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) {
        _connection = con;
        _transacted = transacted;
        if (transacted) {
            _acknowledgeMode = 0;
        } else {
            _acknowledgeMode = acknowledgeMode;
        }
        _channelId = channelId;
        _messageFactoryRegistry = messageFactoryRegistry;
        _defaultPrefetchHighMark = defaultPrefetchHighMark;
        _defaultPrefetchLowMark = defaultPrefetchLowMark;

        if (_acknowledgeMode == 257) {
            FlowControllingBlockingQueue.ThresholdListener flowListener = new FlowControllingBlockingQueue.ThresholdListener() {

                public void aboveThreshold(int currentValue) {
                    if (AMQSession.this._acknowledgeMode != 257) {
                        return;
                    }
                    _logger.warn("Above threshold(" + AMQSession.this._defaultPrefetchHighMark + ") so suspending channel. Current value is " + currentValue);
                    AMQSession.this.suspendChannel();
                }

                public void underThreshold(int currentValue) {
                    if (AMQSession.this._acknowledgeMode != 257) {
                        return;
                    }
                    _logger.warn("Below threshold(" + AMQSession.this._defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + currentValue);
                    AMQSession.this.unsuspendChannel();
                }
            };
            _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark, flowListener);
        } else {
            _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, null);
        }
    }

    /**
     * Creates a session with a registry obtained from
     * {@code MessageFactoryRegistry.newDefaultRegistry()} and the default prefetch marks.
     *
     * @param con             owning connection
     * @param channelId       channel id used in frames sent by this session
     * @param transacted      whether the session is transacted
     * @param acknowledgeMode requested acknowledge mode
     */
    AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode) {
        this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry());
    }

    /**
     * Creates a session with a registry obtained from
     * {@code MessageFactoryRegistry.newDefaultRegistry()} and a single prefetch value
     * that is used for both the high and the low mark.
     *
     * @param con             owning connection
     * @param channelId       channel id used in frames sent by this session
     * @param transacted      whether the session is transacted
     * @param acknowledgeMode requested acknowledge mode
     * @param defaultPrefetch value used for both prefetch marks
     */
    AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetch) {
        this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetch);
    }

    /**
     * Creates a session with a registry obtained from
     * {@code MessageFactoryRegistry.newDefaultRegistry()} and explicit prefetch marks.
     *
     * @param con                 owning connection
     * @param channelId           channel id used in frames sent by this session
     * @param transacted          whether the session is transacted
     * @param acknowledgeMode     requested acknowledge mode
     * @param defaultPrefetchHigh prefetch high mark
     * @param defaultPrefetchLow  prefetch low mark
     */
    AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) {
        this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow);
    }

    /**
     * @return the connection this session was created on
     */
    AMQConnection getAMQConnection() {
        return _connection;
    }

    /**
     * Creates a bytes message through the factory registry ("application/octet-stream").
     *
     * <p>Runs while holding the connection's failover mutex and after
     * {@code checkNotClosed()}.
     *
     * @return the newly created message
     * @throws JMSException whatever {@code checkNotClosed()} raises, or a wrapper around the
     *                      registry's AMQException; the wrapper carries the original as both
     *                      linked exception and cause so the stack trace is not lost
     */
    public BytesMessage createBytesMessage() throws JMSException {
        synchronized (_connection.getFailoverMutex()) {
            checkNotClosed();
            try {
                return (BytesMessage) _messageFactoryRegistry.createMessage("application/octet-stream");
            } catch (AMQException e) {
                JMSException jmse = new JMSException("Unable to create message: " + e);
                jmse.setLinkedException(e);
                jmse.initCause(e);
                throw jmse;
            }
        }
    }

    /**
     * Creates a map message through the factory registry ("jms/map-message").
     *
     * <p>Runs while holding the connection's failover mutex and after
     * {@code checkNotClosed()}.
     *
     * @return the newly created message
     * @throws JMSException whatever {@code checkNotClosed()} raises, or a wrapper around the
     *                      registry's AMQException; the wrapper carries the original as both
     *                      linked exception and cause so the stack trace is not lost
     */
    public MapMessage createMapMessage() throws JMSException {
        synchronized (_connection.getFailoverMutex()) {
            checkNotClosed();
            try {
                return (MapMessage) _messageFactoryRegistry.createMessage("jms/map-message");
            } catch (AMQException e) {
                JMSException jmse = new JMSException("Unable to create message: " + e);
                jmse.setLinkedException(e);
                jmse.initCause(e);
                throw jmse;
            }
        }
    }

    /**
     * Creates a generic message; the implementation hands back a bytes message
     * ("application/octet-stream") from the factory registry.
     *
     * <p>Runs while holding the connection's failover mutex and after
     * {@code checkNotClosed()}.
     *
     * @return the newly created message
     * @throws JMSException whatever {@code checkNotClosed()} raises, or a wrapper around the
     *                      registry's AMQException; the wrapper carries the original as both
     *                      linked exception and cause so the stack trace is not lost
     */
    public Message createMessage() throws JMSException {
        synchronized (_connection.getFailoverMutex()) {
            checkNotClosed();
            try {
                return (BytesMessage) _messageFactoryRegistry.createMessage("application/octet-stream");
            } catch (AMQException e) {
                JMSException jmse = new JMSException("Unable to create message: " + e);
                jmse.setLinkedException(e);
                jmse.initCause(e);
                throw jmse;
            }
        }
    }

    /**
     * Creates an object message through the factory registry
     * ("application/java-object-stream") without setting a payload.
     *
     * <p>Runs while holding the connection's failover mutex and after
     * {@code checkNotClosed()}.
     *
     * @return the newly created message
     * @throws JMSException whatever {@code checkNotClosed()} raises, or a wrapper around the
     *                      registry's AMQException; the wrapper carries the original as both
     *                      linked exception and cause so the stack trace is not lost
     */
    public ObjectMessage createObjectMessage() throws JMSException {
        synchronized (_connection.getFailoverMutex()) {
            checkNotClosed();
            try {
                return (ObjectMessage) _messageFactoryRegistry.createMessage("application/java-object-stream");
            } catch (AMQException e) {
                JMSException jmse = new JMSException("Unable to create message: " + e);
                jmse.setLinkedException(e);
                jmse.initCause(e);
                throw jmse;
            }
        }
    }

    /**
     * Creates an object message through the factory registry
     * ("application/java-object-stream") and sets {@code object} as its payload.
     *
     * <p>Runs while holding the connection's failover mutex and after
     * {@code checkNotClosed()}.
     *
     * @param object payload passed to {@code ObjectMessage.setObject}
     * @return the newly created, populated message
     * @throws JMSException whatever {@code checkNotClosed()} or {@code setObject} raises, or a
     *                      wrapper around the registry's AMQException; the wrapper carries the
     *                      original as both linked exception and cause
     */
    public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
        synchronized (_connection.getFailoverMutex()) {
            checkNotClosed();
            try {
                ObjectMessage msg = (ObjectMessage) _messageFactoryRegistry.createMessage("application/java-object-stream");
                msg.setObject(object);
                return msg;
            } catch (AMQException e) {
                JMSException jmse = new JMSException("Unable to create message: " + e);
                jmse.setLinkedException(e);
                jmse.initCause(e);
                throw jmse;
            }
        }
    }

    /**
     * Stream messages are not implemented: after the closed check this always throws.
     *
     * @return never returns normally
     * @throws JMSException                  whatever {@code checkNotClosed()} raises
     * @throws UnsupportedOperationException always, once the closed check has passed
     */
    public StreamMessage createStreamMessage() throws JMSException {
        checkNotClosed();
        throw new UnsupportedOperationException("Stream messages not supported");
    }

    /**
     * Creates a text message through the factory registry ("text/plain") without setting
     * any text.
     *
     * <p>Runs while holding the connection's failover mutex and after
     * {@code checkNotClosed()}.
     *
     * @return the newly created message
     * @throws JMSException whatever {@code checkNotClosed()} raises, or a wrapper around the
     *                      registry's AMQException; the wrapper carries the original as both
     *                      linked exception and cause so the stack trace is not lost
     */
    public TextMessage createTextMessage() throws JMSException {
        synchronized (_connection.getFailoverMutex()) {
            checkNotClosed();
            try {
                return (TextMessage) _messageFactoryRegistry.createMessage("text/plain");
            } catch (AMQException e) {
                JMSException jmse = new JMSException("Unable to create text message: " + e);
                jmse.setLinkedException(e);
                jmse.initCause(e);
                throw jmse;
            }
        }
    }

    /**
     * Creates a text message through the factory registry ("text/plain") and sets its text.
     *
     * <p>Runs while holding the connection's failover mutex and after
     * {@code checkNotClosed()}.
     *
     * @param text value passed to {@code TextMessage.setText}
     * @return the newly created, populated message
     * @throws JMSException whatever {@code checkNotClosed()} or {@code setText} raises, or a
     *                      wrapper around the registry's AMQException; the wrapper carries the
     *                      original as both linked exception and cause
     */
    public TextMessage createTextMessage(String text) throws JMSException {
        synchronized (_connection.getFailoverMutex()) {
            checkNotClosed();
            try {
                TextMessage msg = (TextMessage) _messageFactoryRegistry.createMessage("text/plain");
                msg.setText(text);
                return msg;
            } catch (AMQException e) {
                JMSException jmse = new JMSException("Unable to create text message: " + e);
                jmse.setLinkedException(e);
                jmse.initCause(e);
                throw jmse;
            }
        }
    }

    /**
     * @return the transacted flag given at construction
     * @throws JMSException whatever {@code checkNotClosed()} raises
     */
    public boolean getTransacted() throws JMSException {
        checkNotClosed();
        return _transacted;
    }

    /**
     * @return the acknowledge mode recorded by the constructor (0 for transacted sessions)
     * @throws JMSException whatever {@code checkNotClosed()} raises
     */
    public int getAcknowledgeMode() throws JMSException {
        checkNotClosed();
        return _acknowledgeMode;
    }

    /**
     * Commits the current transaction: every registered consumer first acknowledges its last
     * delivered message, then a tx-commit frame is written synchronously, awaiting the
     * commit-ok reply.
     *
     * @throws JMSException if the session is not transacted (see {@code checkTransacted()}),
     *                      or if the protocol layer fails, in which case the AMQException is
     *                      attached as the linked exception
     */
    public void commit() throws JMSException {
        checkTransacted();
        try {
            for (Iterator it = _consumers.values().iterator(); it.hasNext();) {
                BasicMessageConsumer consumer = (BasicMessageConsumer) it.next();
                consumer.acknowledgeLastDelivered();
            }
            AMQProtocolHandler handler = _connection.getProtocolHandler();
            handler.syncWrite(TxCommitBody.createAMQFrame(_channelId), TxCommitOkBody.class);
        } catch (AMQException e) {
            JMSException failure = new JMSException("Failed to commit: " + e.getMessage());
            failure.setLinkedException(e);
            throw failure;
        }
    }

    /**
     * Rolls back the current transaction by writing a tx-rollback frame synchronously and
     * awaiting the rollback-ok reply.
     *
     * @throws JMSException if the session is not transacted (see {@code checkTransacted()}),
     *                      or if the protocol layer fails; the AMQException is attached both
     *                      as the cause and, matching {@code commit()}, as the JMS linked
     *                      exception
     */
    public void rollback() throws JMSException {
        checkTransacted();
        try {
            AMQProtocolHandler handler = _connection.getProtocolHandler();
            handler.syncWrite(TxRollbackBody.createAMQFrame(_channelId), TxRollbackOkBody.class);
        } catch (AMQException e) {
            JMSException failure = new JMSException("Failed to rollback: " + e);
            failure.setLinkedException(e);
            failure.initCause(e);
            throw failure;
        }
    }

    /**
     * Closes the session while holding the connection's failover mutex.
     *
     * <p>Steps, in order: mark the session closed, close producers and consumers, ask the
     * protocol handler to close the session, write a channel-close frame synchronously
     * (awaiting close-ok), and finally deregister the channel from the connection. The
     * deregistration happens even when the protocol exchange fails.
     *
     * @throws JMSException if the protocol layer fails; the AMQException is attached as both
     *                      linked exception and cause so the failure can be diagnosed
     */
    public void close() throws JMSException {
        synchronized (_connection.getFailoverMutex()) {
            _closed.set(true);
            closeProducersAndConsumers(null);
            try {
                _connection.getProtocolHandler().closeSession(this);
                AMQFrame frame = ChannelCloseBody.createAMQFrame(getChannelId(), AMQConstant.REPLY_SUCCESS.getCode(), "JMS client closing channel", 0, 0);
                _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class);
            } catch (AMQException e) {
                JMSException jmse = new JMSException("Error closing session: " + e);
                jmse.setLinkedException(e);
                jmse.initCause(e);
                throw jmse;
            } finally {
                _connection.deregisterSession(_channelId);
            }
        }
    }

    /**
     * Closes all producers, then all consumers. A JMSException from either step is logged
     * and does not prevent the other step from running.
     *
     * @param amqe error handed to {@code closeConsumers}; may be null
     */
    private void closeProducersAndConsumers(AMQException amqe) {
        try {
            closeProducers();
        } catch (JMSException producerFailure) {
            _logger.error("Error closing session: " + producerFailure, producerFailure);
        }

        try {
            closeConsumers(amqe);
        } catch (JMSException consumerFailure) {
            _logger.error("Error closing session: " + consumerFailure, consumerFailure);
        }
    }

    /**
     * Handles a forced close: under the failover mutex, marks the session closed,
     * deregisters the channel from the connection and closes producers and consumers,
     * passing the error on to {@code closeProducersAndConsumers}.
     *
     * @param e the reason for the close; used as-is when it already is an AMQException,
     *          otherwise wrapped in a new AMQException("Closing session forcibly")
     */
    public void closed(Throwable e) {
        synchronized (_connection.getFailoverMutex()) {
            _closed.set(true);
            AMQException amqe;
            if (e instanceof AMQException) {
                amqe = (AMQException) e;
            } else {
                amqe = new AMQException(_logger, "Closing session forcibly", e);
            }
            _connection.deregisterSession(_channelId);
            closeProducersAndConsumers(amqe);
        }
    }

    /**
     * Marks the session closed, deregisters its channel from the connection and delegates
     * to {@code markClosedProducersAndConsumers()}. No frames are written by this method
     * itself.
     */
    void markClosed() {
        _closed.set(true);
        _connection.deregisterSession(_channelId);
        markClosedProducersAndConsumers();
    }

    /**
     * Closes all producers, then marks all consumers closed. A JMSException from either
     * step is logged and does not prevent the other step from running.
     */
    private void markClosedProducersAndConsumers() {
        try {
            closeProducers();
        } catch (JMSException producerFailure) {
            _logger.error("Error closing session: " + producerFailure, producerFailure);
        }

        try {
            markClosedConsumers();
        } catch (JMSException consumerFailure) {
            _logger.error("Error closing session: " + consumerFailure, consumerFailure);
        }
    }

    /**
     * Closes every registered producer.
     *
     * <p>Iterates over a snapshot of the registry rather than the live map. The registry is
     * a raw map, so elements are read as {@code Object} and cast; a typed enhanced-for over
     * a raw list does not compile.
     *
     * @throws JMSException if a producer fails to close; remaining producers are then skipped
     */
    private void closeProducers() throws JMSException {
        ArrayList snapshot = new ArrayList(_producers.values());
        for (Object entry : snapshot) {
            ((BasicMessageProducer) entry).close();
        }
    }

    /**
     * Stops the dispatcher (if one was started) and then either notifies every consumer of
     * {@code error} or, when no error is given, closes every consumer.
     *
     * <p>Iterates over a snapshot of the registry rather than the live map. The registry is
     * a raw map, so elements are read as {@code Object} and cast; a typed enhanced-for over
     * a raw list does not compile.
     *
     * @param error the failure to report to consumers, or null for a normal close
     * @throws JMSException if a consumer fails to close; remaining consumers are then skipped
     */
    private void closeConsumers(Throwable error) throws JMSException {
        if (_dispatcher != null) {
            _dispatcher.stopDispatcher();
        }
        ArrayList snapshot = new ArrayList(_consumers.values());
        for (Object entry : snapshot) {
            BasicMessageConsumer consumer = (BasicMessageConsumer) entry;
            if (error != null) {
                consumer.notifyError(error);
            } else {
                consumer.close();
            }
        }
    }

    /**
     * Stops the dispatcher (if one was started) and calls {@code markClosed()} on every
     * registered consumer.
     *
     * <p>Iterates over a snapshot of the registry rather than the live map. The registry is
     * a raw map, so elements are read as {@code Object} and cast; a typed enhanced-for over
     * a raw list does not compile.
     *
     * @throws JMSException if a consumer's {@code markClosed()} fails
     */
    private void markClosedConsumers() throws JMSException {
        if (_dispatcher != null) {
            _dispatcher.stopDispatcher();
        }
        ArrayList snapshot = new ArrayList(_consumers.values());
        for (Object entry : snapshot) {
            ((BasicMessageConsumer) entry).markClosed();
        }
    }

    /**
     * Writes a basic-recover frame (second argument {@code false}) for this channel without
     * waiting for a reply.
     *
     * @throws JMSException whatever {@code checkNotClosed()} raises, or if the session is
     *                      transacted (see {@code checkNotTransacted()})
     */
    public void recover() throws JMSException {
        checkNotClosed();
        checkNotTransacted();
        AMQDataBlock recoverFrame = BasicRecoverBody.createAMQFrame(_channelId, false);
        _connection.getProtocolHandler().writeFrame(recoverFrame);
    }

    /**
     * Session-level message listeners are not implemented.
     *
     * @return never returns normally
     * @throws JMSException                  whatever {@code checkNotClosed()} raises
     * @throws UnsupportedOperationException always, once the closed check has passed
     */
    public MessageListener getMessageListener() throws JMSException {
        checkNotClosed();
        throw new UnsupportedOperationException("MessageListener interface not supported");
    }

    /**
     * Session-level message listeners are not implemented.
     *
     * @param listener ignored
     * @throws JMSException                  whatever {@code checkNotClosed()} raises
     * @throws UnsupportedOperationException always, once the closed check has passed
     */
    public void setMessageListener(MessageListener listener) throws JMSException {
        checkNotClosed();
        throw new UnsupportedOperationException("MessageListener interface not supported");
    }

    /**
     * Not implemented.
     *
     * @throws UnsupportedOperationException always
     */
    public void run() {
        throw new UnsupportedOperationException();
    }

    /**
     * Creates a producer with every delivery flag given explicitly.
     *
     * @param destination   destination the producer is created for
     * @param mandatory     mandatory flag passed to the producer
     * @param immediate     immediate flag passed to the producer
     * @param waitUntilSent wait-until-sent flag passed to the producer
     * @return the new producer
     * @throws JMSException if producer creation fails
     */
    public javax.jms.MessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException {
        MessageProducer producer = createProducerImpl(destination, mandatory, immediate, waitUntilSent);
        return producer;
    }

    /**
     * Creates a producer with explicit mandatory and immediate flags.
     *
     * @param destination destination the producer is created for
     * @param mandatory   mandatory flag passed to the producer
     * @param immediate   immediate flag passed to the producer
     * @return the new producer
     * @throws JMSException if producer creation fails
     */
    public javax.jms.MessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate) throws JMSException {
        MessageProducer producer = createProducerImpl(destination, mandatory, immediate);
        return producer;
    }

    /**
     * Creates a producer with the default mandatory flag and an explicit immediate flag.
     *
     * @param destination destination the producer is created for
     * @param immediate   immediate flag passed to the producer
     * @return the new producer
     * @throws JMSException if producer creation fails
     */
    public javax.jms.MessageProducer createProducer(Destination destination, boolean immediate) throws JMSException {
        return createProducerImpl(destination, DEFAULT_MANDATORY, immediate);
    }

    /**
     * Creates a producer with the default mandatory and immediate flags.
     *
     * @param destination destination the producer is created for
     * @return the new producer
     * @throws JMSException if producer creation fails
     */
    public javax.jms.MessageProducer createProducer(Destination destination) throws JMSException {
        return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
    }

    /**
     * Creates a producer with the wait-until-sent flag set to false.
     *
     * @param destination destination the producer is created for
     * @param mandatory   mandatory flag passed to the producer
     * @param immediate   immediate flag passed to the producer
     * @return the new producer
     * @throws JMSException if producer creation fails
     */
    private MessageProducer createProducerImpl(Destination destination, boolean mandatory, boolean immediate) throws JMSException {
        final boolean waitUntilSent = false;
        return createProducerImpl(destination, mandatory, immediate, waitUntilSent);
    }

    /**
     * Creates a producer inside a {@code FailoverSupport} operation and registers it with
     * this session.
     *
     * <p>Registration matters because {@code closeProducers()} and
     * {@code resubscribeProducers()} act on the producer registry only; a producer that is
     * never registered would be skipped by both.
     * NOTE(review): assumes BasicMessageProducer does not register itself by another route —
     * {@code registerProducer} is private and had no caller in the visible code; confirm.
     *
     * @param destination   destination the producer is created for (cast to AMQDestination)
     * @param mandatory     mandatory flag passed to the producer
     * @param immediate     immediate flag passed to the producer
     * @param waitUntilSent wait-until-sent flag passed to the producer
     * @return the new, registered producer
     * @throws JMSException whatever {@code checkNotClosed()} or producer construction raises
     */
    private MessageProducer createProducerImpl(final Destination destination, final boolean mandatory, final boolean immediate, final boolean waitUntilSent) throws JMSException {
        return (MessageProducer)new FailoverSupport(){

            public Object operation() throws JMSException {
                AMQSession.this.checkNotClosed();
                long producerId = AMQSession.this.getNextProducerId();
                BasicMessageProducer producer = new BasicMessageProducer(AMQSession.this._connection, (AMQDestination)destination, AMQSession.this._transacted, AMQSession.this._channelId, AMQSession.this, AMQSession.this._connection.getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
                // Track the producer so session close and failover resubscription reach it.
                AMQSession.this.registerProducer(producerId, producer);
                return producer;
            }
        }.execute(this._connection);
    }

    /**
     * Creates a consumer for the destination and wraps it in a queue receiver.
     *
     * @param destination must be an AMQQueue; the cast happens before the consumer is created
     * @return a receiver adapting the new consumer
     * @throws JMSException if consumer creation fails
     */
    public QueueReceiver createQueueReceiver(Destination destination) throws JMSException {
        AMQQueue queue = (AMQQueue) destination;
        javax.jms.MessageConsumer created = createConsumer(destination);
        return new QueueReceiverAdaptor(queue, (BasicMessageConsumer) created);
    }

    /**
     * Creates a selector-filtered consumer for the destination and wraps it in a queue
     * receiver.
     *
     * @param destination     must be an AMQQueue; the cast happens before the consumer is created
     * @param messageSelector selector passed to the consumer
     * @return a receiver adapting the new consumer
     * @throws JMSException if consumer creation fails
     */
    public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException {
        AMQQueue queue = (AMQQueue) destination;
        javax.jms.MessageConsumer created = createConsumer(destination, messageSelector);
        return new QueueReceiverAdaptor(queue, (BasicMessageConsumer) created);
    }

    /**
     * Creates a consumer with the session's default prefetch marks, no-local and exclusive
     * both false, and no selector.
     *
     * @param destination destination to consume from
     * @return the new consumer
     * @throws JMSException if consumer creation fails
     */
    public javax.jms.MessageConsumer createConsumer(Destination destination) throws JMSException {
        final String noSelector = null;
        return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, noSelector);
    }

    /**
     * Creates a consumer with the session's default prefetch marks, no-local and exclusive
     * both false, and the given selector.
     *
     * @param destination     destination to consume from
     * @param messageSelector selector passed to the consumer
     * @return the new consumer
     * @throws JMSException if consumer creation fails
     */
    public javax.jms.MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
        int high = _defaultPrefetchHighMark;
        int low = _defaultPrefetchLowMark;
        return createConsumer(destination, high, low, false, false, messageSelector);
    }

    /**
     * Creates a non-exclusive consumer with the session's default prefetch marks.
     *
     * @param destination     destination to consume from
     * @param messageSelector selector passed to the consumer
     * @param noLocal         no-local flag passed to the consumer
     * @return the new consumer
     * @throws JMSException if consumer creation fails
     */
    public javax.jms.MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
        int high = _defaultPrefetchHighMark;
        int low = _defaultPrefetchLowMark;
        return createConsumer(destination, high, low, noLocal, false, messageSelector);
    }

    /**
     * Creates a consumer whose prefetch high and low marks are both {@code prefetch} and
     * which has no raw selector table.
     *
     * @param destination destination to consume from
     * @param prefetch    value used for both prefetch marks
     * @param noLocal     no-local flag passed to the consumer
     * @param exclusive   exclusive flag passed to the consumer
     * @param selector    selector passed to the consumer
     * @return the new consumer
     * @throws JMSException if consumer creation fails
     */
    public javax.jms.MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive, String selector) throws JMSException {
        final FieldTable noRawSelector = null;
        return createConsumer(destination, prefetch, prefetch, noLocal, exclusive, selector, noRawSelector);
    }

    /**
     * Creates a consumer with explicit prefetch marks and no raw selector table.
     *
     * @param destination  destination to consume from
     * @param prefetchHigh prefetch high mark
     * @param prefetchLow  prefetch low mark
     * @param noLocal      no-local flag passed to the consumer
     * @param exclusive    exclusive flag passed to the consumer
     * @param selector     selector passed to the consumer
     * @return the new consumer
     * @throws JMSException if consumer creation fails
     */
    public javax.jms.MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, boolean exclusive, String selector) throws JMSException {
        final FieldTable noRawSelector = null;
        return createConsumer(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, noRawSelector);
    }

    /**
     * Creates a consumer whose prefetch high and low marks are both {@code prefetch}.
     *
     * @param destination destination to consume from
     * @param prefetch    value used for both prefetch marks
     * @param noLocal     no-local flag passed to the consumer
     * @param exclusive   exclusive flag passed to the consumer
     * @param selector    selector passed to the consumer
     * @param rawSelector extra entries copied into the consumer's field table; may be null
     * @return the new consumer
     * @throws JMSException if consumer creation fails
     */
    public javax.jms.MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive, String selector, FieldTable rawSelector) throws JMSException {
        javax.jms.MessageConsumer consumer = createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, rawSelector);
        return consumer;
    }

    /**
     * Creates a consumer with every option given explicitly.
     *
     * @param destination  destination to consume from
     * @param prefetchHigh prefetch high mark
     * @param prefetchLow  prefetch low mark
     * @param noLocal      no-local flag passed to the consumer
     * @param exclusive    exclusive flag passed to the consumer
     * @param selector     selector passed to the consumer
     * @param rawSelector  extra entries copied into the consumer's field table; may be null
     * @return the new consumer
     * @throws JMSException if consumer creation fails
     */
    public javax.jms.MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, boolean exclusive, String selector, FieldTable rawSelector) throws JMSException {
        javax.jms.MessageConsumer consumer = createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector);
        return consumer;
    }

    /**
     * Builds and registers a consumer inside a {@code FailoverSupport} operation.
     *
     * <p>The operation checks the session is open, copies {@code rawSelector} (when given)
     * into a fresh field table, constructs the consumer with this session's acknowledge
     * mode, and registers it via {@code registerConsumer}.
     *
     * @param destination  destination to consume from (cast to AMQDestination)
     * @param prefetchHigh prefetch high mark
     * @param prefetchLow  prefetch low mark
     * @param noLocal      no-local flag passed to the consumer
     * @param exclusive    exclusive flag passed to the consumer
     * @param selector     selector passed to the consumer
     * @param rawSelector  extra entries copied into the consumer's field table; may be null
     * @return the registered consumer
     * @throws JMSException whatever {@code checkNotClosed()} raises, or a wrapper (with
     *                      linked exception) around an AMQException from registration
     */
    protected javax.jms.MessageConsumer createConsumerImpl(final Destination destination, final int prefetchHigh, final int prefetchLow, final boolean noLocal, final boolean exclusive, final String selector, final FieldTable rawSelector) throws JMSException {
        FailoverSupport creation = new FailoverSupport() {

            public Object operation() throws JMSException {
                AMQSession.this.checkNotClosed();
                AMQDestination target = (AMQDestination) destination;
                AMQProtocolHandler handler = AMQSession.this._connection.getProtocolHandler();
                FieldTable arguments = new FieldTable();
                if (rawSelector != null) {
                    arguments.putAll((Map) rawSelector);
                }
                BasicMessageConsumer created = new BasicMessageConsumer(AMQSession.this._channelId, AMQSession.this._connection, target, selector, noLocal, AMQSession.this._messageFactoryRegistry, AMQSession.this, handler, arguments, prefetchHigh, prefetchLow, exclusive, AMQSession.this._acknowledgeMode);
                try {
                    AMQSession.this.registerConsumer(created);
                } catch (AMQException e) {
                    JMSException failure = new JMSException("Error registering consumer: " + e);
                    failure.setLinkedException(e);
                    throw failure;
                }
                return created;
            }
        };
        return (MessageConsumer) creation.execute(_connection);
    }

    /**
     * Declares an exchange using the connection's current protocol handler; the frame is
     * written without waiting for a reply.
     *
     * @param name exchange name
     * @param type exchange type
     */
    public void declareExchange(String name, String type) {
        AMQProtocolHandler handler = _connection.getProtocolHandler();
        declareExchange(name, type, handler);
    }

    /**
     * Declares an exchange and waits for the declare-ok reply.
     *
     * @param name exchange name
     * @param type exchange type
     * @throws AMQException if the synchronous write fails
     */
    public void declareExchangeSynch(String name, String type) throws AMQException {
        // All five boolean options are false here, unlike the asynchronous declareExchange.
        AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId, 0, name, type, false, false, false, false, false, null);
        AMQProtocolHandler handler = _connection.getProtocolHandler();
        handler.syncWrite(declare, ExchangeDeclareOkBody.class);
    }

    /**
     * Declares the exchange a destination refers to, using its exchange name and class.
     *
     * @param amqd            destination supplying exchange name and class
     * @param protocolHandler handler used to write the frame
     */
    private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler) {
        String exchangeName = amqd.getExchangeName();
        String exchangeClass = amqd.getExchangeClass();
        declareExchange(exchangeName, exchangeClass, protocolHandler);
    }

    /**
     * Writes an exchange-declare frame without waiting for a reply.
     *
     * @param name            exchange name
     * @param type            exchange type
     * @param protocolHandler handler used to write the frame
     */
    private void declareExchange(String name, String type, AMQProtocolHandler protocolHandler) {
        // The last boolean is true here and false in declareExchangeSynch
        // (presumably the "nowait" flag — confirm against ExchangeDeclareBody).
        AMQDataBlock declare = ExchangeDeclareBody.createAMQFrame(_channelId, 0, name, type, false, false, false, false, true, null);
        protocolHandler.writeFrame(declare);
    }

    /**
     * Writes a queue-declare frame for the destination without waiting for a reply.
     *
     * <p>When the destination reports that a name is required, a name generated by the
     * protocol handler is stored on the destination first.
     *
     * @param amqd            destination supplying queue name and the durable, exclusive and
     *                        auto-delete flags
     * @param protocolHandler handler used to generate names and write the frame
     * @return the destination's queue name after the declaration
     * @throws AMQException declared by the signature; not thrown by the visible code
     */
    private String declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException {
        if (amqd.isNameRequired()) {
            String generated = protocolHandler.generateQueueName();
            amqd.setQueueName(generated);
        }
        AMQDataBlock declare = QueueDeclareBody.createAMQFrame(_channelId, 0, amqd.getQueueName(), false, amqd.isDurable(), amqd.isExclusive(), amqd.isAutoDelete(), true, null);
        protocolHandler.writeFrame(declare);
        return amqd.getQueueName();
    }

    /**
     * Writes a queue-bind frame binding {@code queueName} to the destination's exchange
     * under the destination's routing key, without waiting for a reply.
     *
     * @param amqd            destination supplying exchange name and routing key
     * @param queueName       queue to bind
     * @param protocolHandler handler used to write the frame
     * @param ft              field table passed as the bind arguments
     * @throws AMQException declared by the signature; not thrown by the visible code
     */
    private void bindQueue(AMQDestination amqd, String queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException {
        AMQDataBlock bind = QueueBindBody.createAMQFrame(_channelId, 0, queueName, amqd.getExchangeName(), amqd.getRoutingKey(), true, ft);
        protocolHandler.writeFrame(bind);
    }

    /**
     * Writes a basic-consume frame for the queue, without waiting for a reply, and returns
     * the consumer tag that was generated for it.
     *
     * <p>The tag is the decimal form of the session's tag counter, which is incremented on
     * every call. {@code prefetchHigh} and {@code prefetchLow} are not used by this method.
     *
     * @param queueName       queue to consume from
     * @param protocolHandler handler used to write the frame
     * @param prefetchHigh    unused here
     * @param prefetchLow     unused here
     * @param noLocal         no-local flag placed in the frame
     * @param exclusive       exclusive flag placed in the frame
     * @param acknowledgeMode acknowledge mode; only its equality with 257 is placed in the frame
     * @return the generated consumer tag
     * @throws AMQException declared by the signature; not thrown by the visible code
     */
    private String consumeFromQueue(String queueName, AMQProtocolHandler protocolHandler, int prefetchHigh, int prefetchLow, boolean noLocal, boolean exclusive, int acknowledgeMode) throws AMQException {
        String consumerTag = Integer.toString(_nextTag++);
        // presumably the frame's "no-ack" flag, with 257 being NO_ACKNOWLEDGE — confirm
        boolean modeIs257 = acknowledgeMode == 257;
        AMQDataBlock consume = BasicConsumeBody.createAMQFrame(_channelId, 0, queueName, consumerTag, noLocal, modeIs257, exclusive, true);
        protocolHandler.writeFrame(consume);
        return consumerTag;
    }

    /**
     * Creates a queue destination from a plain name or, when the argument contains a
     * {@code '/'}, from a binding URL.
     *
     * @param queueName queue name or binding URL
     * @return the queue destination
     * @throws JMSException if the binding URL cannot be parsed; the message is the URL
     *                      exception's reason and the exception itself is linked
     */
    public Queue createQueue(String queueName) throws JMSException {
        boolean looksLikeUrl = queueName.indexOf('/') != -1;
        if (!looksLikeUrl) {
            return new AMQQueue(queueName);
        }
        try {
            BindingURL url = new AMQBindingURL(queueName);
            return new AMQQueue(url);
        } catch (URLSyntaxException urlse) {
            JMSException failure = new JMSException(urlse.getReason());
            failure.setLinkedException(urlse);
            throw failure;
        }
    }

    /**
     * Creates a consumer for the queue and wraps it in a queue receiver.
     *
     * @param queue must be an AMQQueue
     * @return a receiver adapting the new consumer
     * @throws JMSException if consumer creation fails
     */
    public QueueReceiver createReceiver(Queue queue) throws JMSException {
        AMQQueue amqQueue = (AMQQueue) queue;
        javax.jms.MessageConsumer created = createConsumer(amqQueue);
        return new QueueReceiverAdaptor(amqQueue, (BasicMessageConsumer) created);
    }

    /**
     * Creates a selector-filtered consumer for the queue and wraps it in a queue receiver.
     *
     * @param queue           must be an AMQQueue
     * @param messageSelector selector passed to the consumer
     * @return a receiver adapting the new consumer
     * @throws JMSException if consumer creation fails
     */
    public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
        AMQQueue amqQueue = (AMQQueue) queue;
        javax.jms.MessageConsumer created = createConsumer(amqQueue, messageSelector);
        return new QueueReceiverAdaptor(amqQueue, (BasicMessageConsumer) created);
    }

    /**
     * Creates a producer for the queue and returns it as a queue sender.
     *
     * @param queue queue to send to
     * @return the new producer, cast to QueueSender
     * @throws JMSException if producer creation fails
     */
    public QueueSender createSender(Queue queue) throws JMSException {
        javax.jms.MessageProducer producer = createProducer(queue);
        return (QueueSender) producer;
    }

    /**
     * Creates a topic destination from a plain name or, when the argument contains a
     * {@code '/'}, from a binding URL.
     *
     * @param topicName topic name or binding URL
     * @return the topic destination
     * @throws JMSException if the binding URL cannot be parsed; the message is the URL
     *                      exception's reason and the exception itself is linked
     */
    public Topic createTopic(String topicName) throws JMSException {
        boolean looksLikeUrl = topicName.indexOf('/') != -1;
        if (!looksLikeUrl) {
            return new AMQTopic(topicName);
        }
        try {
            BindingURL url = new AMQBindingURL(topicName);
            return new AMQTopic(url);
        } catch (URLSyntaxException urlse) {
            JMSException failure = new JMSException(urlse.getReason());
            failure.setLinkedException(urlse);
            throw failure;
        }
    }

    /**
     * Creates a subscriber on a new AMQTopic built from the given topic's name.
     *
     * @param topic topic whose name is used
     * @return a subscriber adapting the new consumer
     * @throws JMSException if the topic name cannot be read or consumer creation fails
     */
    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
        AMQTopic amqTopic = new AMQTopic(topic.getTopicName());
        BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(amqTopic);
        return new TopicSubscriberAdaptor(amqTopic, consumer);
    }

    /**
     * Creates a selector-filtered subscriber on a new AMQTopic built from the given topic's
     * name.
     *
     * @param topic           topic whose name is used
     * @param messageSelector selector passed to the consumer
     * @param noLocal         no-local flag passed to the consumer
     * @return a subscriber adapting the new consumer
     * @throws JMSException if the topic name cannot be read or consumer creation fails
     */
    public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
        AMQTopic amqTopic = new AMQTopic(topic.getTopicName());
        BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(amqTopic, messageSelector, noLocal);
        return new TopicSubscriberAdaptor(amqTopic, consumer);
    }

    /**
     * Creates a subscriber on an AMQTopic derived from the given topic, the connection's
     * client id and the subscription name.
     *
     * @param topic must be an AMQTopic
     * @param name  subscription name
     * @return a subscriber adapting the new consumer
     * @throws JMSException if consumer creation fails
     */
    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
        AMQTopic durableTopic = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name);
        BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(durableTopic);
        return new TopicSubscriberAdaptor(durableTopic, consumer);
    }

    /**
     * Creates a selector-filtered subscriber on an AMQTopic derived from the given topic,
     * the connection's client id and the subscription name.
     *
     * @param topic           must be an AMQTopic
     * @param name            subscription name
     * @param messageSelector selector passed to the consumer
     * @param noLocal         no-local flag passed to the consumer
     * @return a subscriber adapting the new consumer
     * @throws JMSException if consumer creation fails
     */
    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
        AMQTopic durableTopic = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name);
        javax.jms.MessageConsumer created = createConsumer(durableTopic, messageSelector, noLocal);
        return new TopicSubscriberAdaptor(durableTopic, (BasicMessageConsumer) created);
    }

    /**
     * Creates a producer for the topic and returns it as a topic publisher.
     *
     * @param topic topic to publish to
     * @return the new producer, cast to TopicPublisher
     * @throws JMSException if producer creation fails
     */
    public TopicPublisher createPublisher(Topic topic) throws JMSException {
        javax.jms.MessageProducer producer = createProducer(topic);
        return (TopicPublisher) producer;
    }

    /**
     * Queue browsing is not implemented.
     *
     * @param queue ignored
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    public QueueBrowser createBrowser(Queue queue) throws JMSException {
        String reason = "Queue browsing not supported";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Queue browsing is not implemented.
     *
     * @param queue           ignored
     * @param messageSelector ignored
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
        String reason = "Queue browsing not supported";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * @return a new AMQTemporaryQueue; no frame is written by this method
     */
    public TemporaryQueue createTemporaryQueue() throws JMSException {
        TemporaryQueue temporary = new AMQTemporaryQueue();
        return temporary;
    }

    /**
     * @return a new AMQTemporaryTopic; no frame is written by this method
     */
    public TemporaryTopic createTemporaryTopic() throws JMSException {
        TemporaryTopic temporary = new AMQTemporaryTopic();
        return temporary;
    }

    /**
     * Writes a queue-delete frame for the queue named {@code <clientID>:<name>} without
     * waiting for a reply.
     *
     * @param name subscription name
     * @throws JMSException if the connection's client id cannot be read
     */
    public void unsubscribe(String name) throws JMSException {
        String subscriptionQueue = _connection.getClientID() + ":" + name;
        AMQDataBlock delete = QueueDeleteBody.createAMQFrame(_channelId, 0, subscriptionQueue, false, false, true);
        _connection.getProtocolHandler().writeFrame(delete);
    }

    /**
     * Guard for operations that require a transacted session.
     *
     * @throws JMSException a javax.jms.IllegalStateException when the session is not
     *                      transacted, or whatever {@code getTransacted()} raises
     */
    private void checkTransacted() throws JMSException {
        boolean transacted = getTransacted();
        if (transacted) {
            return;
        }
        throw new IllegalStateException("Session is not transacted");
    }

    /**
     * Guard for operations that are not allowed on a transacted session.
     *
     * @throws JMSException a javax.jms.IllegalStateException when the session is
     *                      transacted, or whatever {@code getTransacted()} raises
     */
    private void checkNotTransacted() throws JMSException {
        boolean transacted = getTransacted();
        if (!transacted) {
            return;
        }
        throw new IllegalStateException("Session is transacted");
    }

    /**
     * Adds an incoming message to this session's queue, logging at debug level first.
     *
     * @param message the message received for this channel
     */
    public void messageReceived(UnprocessedMessage message) {
        boolean debug = _logger.isDebugEnabled();
        if (debug) {
            _logger.debug("Message received in session with channel id " + _channelId);
        }
        _queue.add(message);
    }

    /**
     * Writes a basic-ack frame for the delivery tag without waiting for a reply.
     *
     * @param deliveryTag delivery tag placed in the frame
     * @param multiple    "multiple" flag placed in the frame
     */
    public void acknowledgeMessage(long deliveryTag, boolean multiple) {
        AMQDataBlock ack = BasicAckBody.createAMQFrame(_channelId, deliveryTag, multiple);
        boolean debug = _logger.isDebugEnabled();
        if (debug) {
            _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
        }
        _connection.getProtocolHandler().writeFrame(ack);
    }

    /**
     * @return the default prefetch high mark (same value as {@code getDefaultPrefetchHigh()})
     */
    public int getDefaultPrefetch() {
        return _defaultPrefetchHighMark;
    }

    /**
     * @return the default prefetch high mark
     */
    public int getDefaultPrefetchHigh() {
        return _defaultPrefetchHighMark;
    }

    /**
     * @return the default prefetch low mark
     */
    public int getDefaultPrefetchLow() {
        return _defaultPrefetchLowMark;
    }

    /**
     * @return the channel id this session was created with
     */
    public int getChannelId() {
        return _channelId;
    }

    /**
     * Starts message dispatching with a new daemon Dispatcher thread.
     *
     * <p>If a dispatcher already exists, a channel-flow "unsuspend" frame is sent first. A
     * new Dispatcher always replaces the previous reference.
     * NOTE(review): the old dispatcher thread is not stopped here; presumably it exits once
     * {@code stop()} has set the stopped flag — confirm in Dispatcher.run().
     */
    void start() {
        boolean restarting = _dispatcher != null;
        if (restarting) {
            unsuspendChannel();
        }
        Dispatcher dispatcher = new Dispatcher();
        _dispatcher = dispatcher;
        dispatcher.setDaemon(true);
        dispatcher.start();
    }

    /**
     * Sends a channel-flow "suspend" frame and then sets the stopped flag.
     */
    void stop() {
        suspendChannel();
        _stopped.set(true);
    }

    /**
     * @return the current value of the stopped flag
     */
    boolean isStopped() {
        boolean stopped = _stopped.get();
        return stopped;
    }

    /**
     * Sets up a consumer and records it in the consumer registry.
     *
     * <p>In order: declare the destination's exchange, declare its queue, bind the queue
     * (using the consumer's raw selector field table), write the consume frame, store the
     * generated tag on the consumer, and register the consumer under that tag.
     *
     * @param consumer consumer to register
     * @throws AMQException if one of the declare, bind or consume helpers fails
     */
    void registerConsumer(BasicMessageConsumer consumer) throws AMQException {
        AMQDestination destination = consumer.getDestination();
        AMQProtocolHandler handler = _connection.getProtocolHandler();

        declareExchange(destination, handler);
        String queue = declareQueue(destination, handler);
        bindQueue(destination, queue, handler, consumer.getRawSelectorFieldTable());

        String tag = consumeFromQueue(queue, handler, consumer.getPrefetchHigh(), consumer.getPrefetchLow(), consumer.isNoLocal(), consumer.isExclusive(), consumer.getAcknowledgeMode());
        consumer.setConsumerTag(tag);
        _consumers.put(tag, consumer);
    }

    /**
     * Removes the consumer registered under the given tag from this session's
     * consumer map.
     *
     * @param consumerTag the tag the consumer was registered under
     */
    void deregisterConsumer(String consumerTag) {
        _consumers.remove(consumerTag);
    }

    /**
     * Records a producer in this session's producer map, keyed by its id.
     *
     * @param producerId the id to register the producer under
     * @param producer   the producer to record
     */
    private void registerProducer(long producerId, javax.jms.MessageProducer producer) {
        // Long.valueOf replaces the deprecated Long(long) constructor; the key
        // has identical equals/hashCode semantics.
        this._producers.put(Long.valueOf(producerId), producer);
    }

    /**
     * Removes the producer registered under the given id from this session's
     * producer map.
     *
     * @param producerId the id the producer was registered under
     */
    void deregisterProducer(long producerId) {
        // Long.valueOf replaces the deprecated Long(long) constructor; the key
        // has identical equals/hashCode semantics.
        this._producers.remove(Long.valueOf(producerId));
    }

    /**
     * Increments the session's producer id counter and returns the new value.
     *
     * @return the counter value after the increment
     */
    private long getNextProducerId() {
        this._nextProducerId += 1;
        return this._nextProducerId;
    }

    /**
     * Resubscribes everything registered with this session: producers first,
     * then consumers.
     *
     * @throws AMQException propagated from either resubscription step
     */
    void resubscribe() throws AMQException {
        resubscribeProducers();
        resubscribeConsumers();
    }

    /**
     * Calls {@code resubscribe()} on every producer currently held in this
     * session's producer map.
     *
     * <p>The loop runs over a snapshot copy of the map's values, not over the
     * map itself.
     *
     * @throws AMQException propagated from a producer's {@code resubscribe()}
     */
    private void resubscribeProducers() throws AMQException {
        // Typed as Object with an explicit cast: iterating a raw ArrayList with
        // a BasicMessageProducer loop variable does not compile.
        final ArrayList<Object> producers = new ArrayList<Object>(this._producers.values());
        _logger.info((Object)MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size()));
        for (Object entry : producers) {
            ((BasicMessageProducer)entry).resubscribe();
        }
    }

    /**
     * Re-registers every consumer currently held in this session's consumer
     * map.
     *
     * <p>A snapshot of the consumers is taken and the map is cleared before
     * re-registration, because {@code registerConsumer} stores each consumer
     * again under the tag returned by the consume step.
     *
     * @throws AMQException propagated from {@code registerConsumer}
     */
    private void resubscribeConsumers() throws AMQException {
        // Typed as Object with an explicit cast: iterating a raw ArrayList with
        // a BasicMessageConsumer loop variable does not compile.
        final ArrayList<Object> consumers = new ArrayList<Object>(this._consumers.values());
        this._consumers.clear();
        for (Object entry : consumers) {
            this.registerConsumer((BasicMessageConsumer)entry);
        }
    }

    /**
     * Logs a warning, then writes a channel-flow frame for this session's
     * channel with the flag set to {@code false} through the connection's
     * protocol handler.
     */
    private void suspendChannel() {
        _logger.warn("Suspending channel");
        final AMQFrame flowOff = ChannelFlowBody.createAMQFrame(this._channelId, false);
        final AMQProtocolHandler handler = this._connection.getProtocolHandler();
        handler.writeFrame((AMQDataBlock)flowOff);
    }

    /**
     * Logs a warning, then writes a channel-flow frame for this session's
     * channel with the flag set to {@code true} through the connection's
     * protocol handler.
     */
    private void unsuspendChannel() {
        _logger.warn("Unsuspending channel");
        final AMQFrame flowOn = ChannelFlowBody.createAMQFrame(this._channelId, true);
        final AMQProtocolHandler handler = this._connection.getProtocolHandler();
        handler.writeFrame((AMQDataBlock)flowOn);
    }

    private class Dispatcher
    extends Thread {
        public Dispatcher() {
            super("Dispatcher-Channel-" + AMQSession.this._channelId);
        }

        public void run() {
            AMQSession.this._stopped.set(false);
            try {
                UnprocessedMessage message;
                while (!AMQSession.this._stopped.get() && (message = (UnprocessedMessage)AMQSession.this._queue.take()) != null) {
                    this.dispatchMessage(message);
                }
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            _logger.info((Object)("Dispatcher thread terminating for channel " + AMQSession.this._channelId));
        }

        private void dispatchMessage(UnprocessedMessage message) {
            if (message.deliverBody != null) {
                BasicMessageConsumer consumer = (BasicMessageConsumer)AMQSession.this._consumers.get(message.deliverBody.consumerTag);
                if (consumer == null) {
                    _logger.warn((Object)("Received a message from queue " + message.deliverBody.consumerTag + " without a handler - ignoring..."));
                } else {
                    consumer.notifyMessage(message, AMQSession.this._channelId);
                }
            } else {
                try {
                    AbstractJMSMessage bouncedMessage = AMQSession.this._messageFactoryRegistry.createMessage(0L, false, message.contentHeader, message.bodies);
                    int errorCode = message.bounceBody.replyCode;
                    String reason = message.bounceBody.replyText;
                    _logger.debug((Object)("Message returned with error code " + errorCode + " (" + reason + ")"));
                    if (errorCode == AMQConstant.NO_CONSUMERS.getCode()) {
                        AMQSession.this._connection.exceptionReceived((Throwable)((Object)new AMQNoConsumersException("Error: " + reason, bouncedMessage)));
                    } else if (errorCode == AMQConstant.NO_ROUTE.getCode()) {
                        AMQSession.this._connection.exceptionReceived((Throwable)((Object)new AMQNoRouteException("Error: " + reason, bouncedMessage)));
                    } else {
                        AMQSession.this._connection.exceptionReceived((Throwable)new AMQUndeliveredException(errorCode, "Error: " + reason, (Object)bouncedMessage));
                    }
                }
                catch (Exception e) {
                    _logger.error((Object)"Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", (Throwable)e);
                }
            }
        }

        public void stopDispatcher() {
            AMQSession.this._stopped.set(true);
            this.interrupt();
        }
    }
}

