/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.client;

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.Closeable;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.BasicCancelBody;
import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.jms.MessageConsumer;

public class BasicMessageConsumer
extends Closeable
implements MessageConsumer {
    private static final Logger _logger = Logger.getLogger(BasicMessageConsumer.class);
    private AMQConnection _connection;
    private String _messageSelector;
    private boolean _noLocal;
    private AMQDestination _destination;
    private final AtomicBoolean _receiving = new AtomicBoolean(false);
    private final AtomicReference _messageListener = new AtomicReference();
    private String _consumerTag;
    private int _channelId;
    private final SynchronousQueue _synchronousQueue = new SynchronousQueue(true);
    private MessageFactoryRegistry _messageFactory;
    private AMQSession _session;
    private AMQProtocolHandler _protocolHandler;
    private FieldTable _rawSelectorFieldTable;
    private int _prefetchHigh;
    private int _prefetchLow;
    private boolean _exclusive;
    private int _acknowledgeMode;
    private int _outstanding;
    private long _lastDeliveryTag;
    private boolean _dups_ok_acknowledge_send;

    protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode) {
        this._channelId = channelId;
        this._connection = connection;
        this._messageSelector = messageSelector;
        this._noLocal = noLocal;
        this._destination = destination;
        this._messageFactory = messageFactory;
        this._session = session;
        this._protocolHandler = protocolHandler;
        this._rawSelectorFieldTable = rawSelectorFieldTable;
        this._prefetchHigh = prefetchHigh;
        this._prefetchLow = prefetchLow;
        this._exclusive = exclusive;
        this._acknowledgeMode = acknowledgeMode;
    }

    public AMQDestination getDestination() {
        return this._destination;
    }

    public String getMessageSelector() throws JMSException {
        return this._messageSelector;
    }

    public MessageListener getMessageListener() throws JMSException {
        return (MessageListener)this._messageListener.get();
    }

    public int getAcknowledgeMode() {
        return this._acknowledgeMode;
    }

    private boolean isMessageListenerSet() {
        return this._messageListener.get() != null;
    }

    public void setMessageListener(MessageListener messageListener) throws JMSException {
        this.checkNotClosed();
        if (this._session.isStopped()) {
            this._messageListener.set(messageListener);
            _logger.debug((Object)("Message listener set for destination " + this._destination));
        } else {
            Object msg;
            if (this._receiving.get()) {
                throw new IllegalStateException("Another thread is already receiving synchronously.");
            }
            if (!this._messageListener.compareAndSet(null, messageListener)) {
                throw new IllegalStateException("Attempt to alter listener while session is started.");
            }
            _logger.debug((Object)("Message listener set for destination " + this._destination));
            if (messageListener != null && (msg = this._synchronousQueue.poll()) != null) {
                AbstractJMSMessage jmsMsg = (AbstractJMSMessage)msg;
                messageListener.onMessage((Message)jmsMsg);
                this.postDeliver(jmsMsg);
            }
        }
    }

    private void acquireReceiving() throws JMSException {
        if (!this._receiving.compareAndSet(false, true)) {
            throw new IllegalStateException("Another thread is already receiving.");
        }
        if (this.isMessageListenerSet()) {
            throw new IllegalStateException("A listener has already been set.");
        }
    }

    private void releaseReceiving() {
        this._receiving.set(false);
    }

    public FieldTable getRawSelectorFieldTable() {
        return this._rawSelectorFieldTable;
    }

    public int getPrefetch() {
        return this._prefetchHigh;
    }

    public int getPrefetchHigh() {
        return this._prefetchHigh;
    }

    public int getPrefetchLow() {
        return this._prefetchLow;
    }

    public boolean isNoLocal() {
        return this._noLocal;
    }

    public boolean isExclusive() {
        return this._exclusive;
    }

    public Message receive() throws JMSException {
        return this.receive(0L);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Message receive(long l) throws JMSException {
        this.checkNotClosed();
        this.acquireReceiving();
        try {
            Object o = null;
            o = l > 0L ? this._synchronousQueue.poll(l, TimeUnit.MILLISECONDS) : this._synchronousQueue.take();
            AbstractJMSMessage m = this.returnMessageOrThrow(o);
            if (m != null) {
                this.postDeliver(m);
            }
            AbstractJMSMessage abstractJMSMessage = m;
            return abstractJMSMessage;
        }
        catch (InterruptedException e) {
            Message message = null;
            return message;
        }
        finally {
            this.releaseReceiving();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Message receiveNoWait() throws JMSException {
        this.checkNotClosed();
        this.acquireReceiving();
        try {
            Object o = this._synchronousQueue.poll();
            AbstractJMSMessage m = this.returnMessageOrThrow(o);
            if (m != null) {
                this.postDeliver(m);
            }
            AbstractJMSMessage abstractJMSMessage = m;
            return abstractJMSMessage;
        }
        finally {
            this.releaseReceiving();
        }
    }

    private AbstractJMSMessage returnMessageOrThrow(Object o) throws JMSException {
        if (o instanceof Throwable) {
            JMSException e = new JMSException("Message consumer forcibly closed due to error: " + o);
            if (o instanceof Exception) {
                e.setLinkedException((Exception)o);
            }
            throw e;
        }
        return (AbstractJMSMessage)o;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() throws JMSException {
        Object object = this._connection.getFailoverMutex();
        synchronized (object) {
            if (!this._closed.getAndSet(true)) {
                AMQFrame cancelFrame = BasicCancelBody.createAMQFrame((int)this._channelId, (String)this._consumerTag, (boolean)false);
                try {
                    this._protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
                }
                catch (AMQException e) {
                    _logger.error((Object)("Error closing consumer: " + (Object)((Object)e)), (Throwable)e);
                    throw new JMSException("Error closing consumer: " + (Object)((Object)e));
                }
                this.deregisterConsumer();
            }
        }
    }

    void markClosed() {
        this._closed.set(true);
        this.deregisterConsumer();
    }

    void notifyMessage(UnprocessedMessage messageFrame, int channelId) {
        if (_logger.isDebugEnabled()) {
            _logger.debug((Object)("notifyMessage called with message number " + messageFrame.deliverBody.deliveryTag));
        }
        try {
            AbstractJMSMessage jmsMessage = this._messageFactory.createMessage(messageFrame.deliverBody.deliveryTag, messageFrame.deliverBody.redelivered, messageFrame.contentHeader, messageFrame.bodies);
            _logger.debug((Object)("Message is of type: " + jmsMessage.getClass().getName()));
            this.preDeliver(jmsMessage);
            if (this.isMessageListenerSet()) {
                this.getMessageListener().onMessage((Message)jmsMessage);
                this.postDeliver(jmsMessage);
            } else {
                this._synchronousQueue.put(jmsMessage);
            }
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                _logger.info((Object)"SynchronousQueue.put interupted. Usually result of connection closing");
            }
            _logger.error((Object)"Caught exception (dump follows) - ignoring...", (Throwable)e);
        }
    }

    private void preDeliver(AbstractJMSMessage msg) {
        switch (this._acknowledgeMode) {
            case 258: {
                this._session.acknowledgeMessage(msg.getDeliveryTag(), false);
                break;
            }
            case 2: {
                msg.setAMQSession(this._session);
            }
        }
    }

    private void postDeliver(AbstractJMSMessage msg) {
        switch (this._acknowledgeMode) {
            case 3: {
                if (++this._outstanding >= this._prefetchHigh) {
                    this._dups_ok_acknowledge_send = true;
                }
                if (this._outstanding <= this._prefetchLow) {
                    this._dups_ok_acknowledge_send = false;
                }
                if (!this._dups_ok_acknowledge_send) break;
                this._session.acknowledgeMessage(msg.getDeliveryTag(), true);
                break;
            }
            case 1: {
                this._session.acknowledgeMessage(msg.getDeliveryTag(), false);
                break;
            }
            case 0: {
                this._lastDeliveryTag = msg.getDeliveryTag();
            }
        }
    }

    void acknowledgeLastDelivered() {
        if (this._lastDeliveryTag > 0L) {
            this._session.acknowledgeMessage(this._lastDeliveryTag, true);
            this._lastDeliveryTag = -1L;
        }
    }

    void notifyError(Throwable cause) {
        this._closed.set(true);
        if (!this.isMessageListenerSet() && this._synchronousQueue.offer(cause)) {
            _logger.debug((Object)"Passed exception to synchronous queue for propagation to receive()");
        }
        this.deregisterConsumer();
    }

    private void deregisterConsumer() {
        this._session.deregisterConsumer(this._consumerTag);
    }

    public String getConsumerTag() {
        return this._consumerTag;
    }

    public void setConsumerTag(String consumerTag) {
        this._consumerTag = consumerTag;
    }
}

