/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.client.protocol;

import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import org.apache.log4j.Logger;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoFilter;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.filter.SSLFilter;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.failover.FailoverHandler;
import org.apache.qpid.client.failover.FailoverState;
import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.protocol.AMQMethodListener;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
import org.apache.qpid.client.protocol.HeartbeatDiagnostics;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.ConnectionCloseOkBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.ssl.BogusSSLContextFactory;

public class AMQProtocolHandler
extends IoHandlerAdapter {
    private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class);
    private AMQConnection _connection;
    private boolean _useSSL;
    private volatile AMQProtocolSession _protocolSession;
    private AMQStateManager _stateManager = new AMQStateManager();
    private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet();
    private FailoverHandler _failoverHandler;
    private FailoverState _failoverState = FailoverState.NOT_STARTED;
    private CountDownLatch _failoverLatch;
    private static int _messageReceivedCount;
    private static int _messagesOut;

    public AMQProtocolHandler(AMQConnection con) {
        this._connection = con;
        this._frameListeners.add(new AMQMethodListener(){

            public boolean methodReceived(AMQMethodEvent evt) throws AMQException {
                return AMQProtocolHandler.this._stateManager.methodReceived(evt);
            }

            public void error(Exception e) {
                AMQProtocolHandler.this._stateManager.error(e);
            }
        });
    }

    public boolean isUseSSL() {
        return this._useSSL;
    }

    public void setUseSSL(boolean useSSL) {
        this._useSSL = useSSL;
    }

    public void sessionCreated(IoSession session) throws Exception {
        _logger.debug((Object)("Protocol session created for session " + System.identityHashCode(session)));
        this._failoverHandler = new FailoverHandler(this, session);
        ProtocolCodecFilter pcf = new ProtocolCodecFilter((ProtocolCodecFactory)new AMQCodecFactory(false));
        if (Boolean.getBoolean("amqj.shared_read_write_pool")) {
            session.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", (IoFilter)pcf);
        } else {
            session.getFilterChain().addLast("protocolFilter", (IoFilter)pcf);
        }
        if (this._useSSL) {
            SSLFilter sslFilter = new SSLFilter(BogusSSLContextFactory.getInstance((boolean)false));
            sslFilter.setUseClientMode(true);
            session.getFilterChain().addBefore("protocolFilter", "ssl", (IoFilter)sslFilter);
        }
        this._protocolSession = new AMQProtocolSession(this, session, this._connection);
        this._protocolSession.init();
    }

    public void sessionOpened(IoSession session) throws Exception {
        System.setProperty("foo", "bar");
    }

    public void sessionClosed(IoSession session) throws Exception {
        if (this._connection.isClosed()) {
            _logger.info((Object)"Session closed called by client");
        } else {
            _logger.info((Object)("Session closed called with failover state currently " + this._failoverState));
            if (this._failoverState != FailoverState.IN_PROGRESS && this._connection.failoverAllowed()) {
                _logger.info((Object)"FAILOVER STARTING");
                if (this._failoverState == FailoverState.NOT_STARTED) {
                    this._failoverState = FailoverState.IN_PROGRESS;
                    this.startFailoverThread();
                } else {
                    _logger.info((Object)("Not starting failover as state currently " + this._failoverState));
                }
            } else {
                _logger.info((Object)"Failover not allowed by policy.");
                if (_logger.isDebugEnabled()) {
                    _logger.debug((Object)this._connection.getFailoverPolicy().toString());
                }
                if (this._failoverState != FailoverState.IN_PROGRESS) {
                    _logger.info((Object)"sessionClose() not allowed to failover");
                    this._connection.exceptionReceived((Throwable)new AMQDisconnectedException("Server closed connection and reconnection not permitted."));
                } else {
                    _logger.info((Object)"sessionClose() failover in progress");
                }
            }
        }
        _logger.info((Object)("Protocol Session [" + (Object)((Object)this) + "] closed"));
    }

    private void startFailoverThread() {
        Thread failoverThread = new Thread(this._failoverHandler);
        failoverThread.setName("Failover");
        failoverThread.setDaemon(false);
        failoverThread.start();
    }

    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        _logger.debug((Object)("Protocol Session [" + (Object)((Object)this) + ":" + session + "] idle: " + status));
        if (IdleStatus.WRITER_IDLE.equals(status)) {
            _logger.debug((Object)"Sent heartbeat");
            session.write((Object)HeartbeatBody.FRAME);
            HeartbeatDiagnostics.sent();
        } else if (IdleStatus.READER_IDLE.equals(status)) {
            HeartbeatDiagnostics.timeout();
            _logger.warn((Object)"Timed out while waiting for heartbeat from peer.");
            session.close();
        }
    }

    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        if (this._failoverState == FailoverState.NOT_STARTED) {
            if (cause instanceof AMQConnectionClosedException) {
                _logger.info((Object)("Exception caught therefore going to attempt failover: " + cause), cause);
                this.sessionClosed(session);
            }
        } else if (this._failoverState == FailoverState.FAILED) {
            _logger.error((Object)("Exception caught by protocol handler: " + cause), cause);
            AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
            this.propagateExceptionToWaiters((Exception)((Object)amqe));
            this._connection.exceptionReceived(cause);
        }
    }

    public void propagateExceptionToWaiters(Exception e) {
        this._stateManager.error(e);
        for (AMQMethodListener ml : this._frameListeners) {
            ml.error(e);
        }
    }

    public void messageReceived(IoSession session, Object message) throws Exception {
        if (_messageReceivedCount++ % 1000 == 0) {
            _logger.debug((Object)("Received " + _messageReceivedCount + " protocol messages"));
        }
        Iterator it = this._frameListeners.iterator();
        AMQFrame frame = (AMQFrame)message;
        HeartbeatDiagnostics.received(frame.bodyFrame instanceof HeartbeatBody);
        if (frame.bodyFrame instanceof AMQMethodBody) {
            if (_logger.isDebugEnabled()) {
                _logger.debug((Object)("Method frame received: " + frame));
            }
            AMQMethodEvent evt = new AMQMethodEvent(frame.channel, (AMQMethodBody)frame.bodyFrame, this._protocolSession);
            try {
                boolean wasAnyoneInterested = false;
                while (it.hasNext()) {
                    AMQMethodListener listener = (AMQMethodListener)it.next();
                    wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
                }
                if (!wasAnyoneInterested) {
                    throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
                }
            }
            catch (AMQException e) {
                for (AMQMethodListener listener : this._frameListeners) {
                    listener.error((Exception)((Object)e));
                }
                this.exceptionCaught(session, e);
            }
        } else if (frame.bodyFrame instanceof ContentHeaderBody) {
            this._protocolSession.messageContentHeaderReceived(frame.channel, (ContentHeaderBody)frame.bodyFrame);
        } else if (frame.bodyFrame instanceof ContentBody) {
            this._protocolSession.messageContentBodyReceived(frame.channel, (ContentBody)frame.bodyFrame);
        } else if (frame.bodyFrame instanceof HeartbeatBody) {
            _logger.debug((Object)"Received heartbeat");
        }
        this._connection.bytesReceived(this._protocolSession.getIoSession().getReadBytes());
    }

    public void messageSent(IoSession session, Object message) throws Exception {
        if (_messagesOut++ % 1000 == 0) {
            _logger.debug((Object)("Sent " + _messagesOut + " protocol messages"));
        }
        this._connection.bytesSent(session.getWrittenBytes());
        if (_logger.isDebugEnabled()) {
            _logger.debug((Object)("Sent frame " + message));
        }
    }

    public void addFrameListener(AMQMethodListener listener) {
        this._frameListeners.add(listener);
    }

    public void removeFrameListener(AMQMethodListener listener) {
        this._frameListeners.remove(listener);
    }

    public void attainState(AMQState s) throws AMQException {
        this._stateManager.attainState(s);
    }

    public void writeFrame(AMQDataBlock frame) {
        this._protocolSession.writeFrame(frame);
    }

    public void writeFrame(AMQDataBlock frame, boolean wait) {
        this._protocolSession.writeFrame(frame, wait);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener) throws AMQException {
        try {
            AMQMethodEvent e;
            this._frameListeners.add(listener);
            this._protocolSession.writeFrame((AMQDataBlock)frame);
            AMQMethodEvent aMQMethodEvent = e = listener.blockForFrame();
            return aMQMethodEvent;
        }
        finally {
            this._frameListeners.remove(listener);
        }
    }

    public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException {
        return this.writeCommandFrameAndWaitForReply(frame, new SpecificMethodFrameListener(frame.channel, responseClass));
    }

    public void addSessionByChannel(int channelId, AMQSession session) {
        this._protocolSession.addSessionByChannel(channelId, session);
    }

    public void removeSessionByChannel(int channelId) {
        this._protocolSession.removeSessionByChannel(channelId);
    }

    public void closeSession(AMQSession session) throws AMQException {
        this._protocolSession.closeSession(session);
    }

    public void closeConnection() throws AMQException {
        this._stateManager.changeState(AMQState.CONNECTION_CLOSING);
        AMQFrame frame = ConnectionCloseBody.createAMQFrame((int)0, (int)AMQConstant.REPLY_SUCCESS.getCode(), (String)"JMS client is closing the connection.", (int)0, (int)0);
        this.syncWrite(frame, ConnectionCloseOkBody.class);
        this._protocolSession.closeProtocolSession();
    }

    public long getReadBytes() {
        return this._protocolSession.getIoSession().getReadBytes();
    }

    public long getWrittenBytes() {
        return this._protocolSession.getIoSession().getWrittenBytes();
    }

    public void failover(String host, int port) {
        this._failoverHandler.setHost(host);
        this._failoverHandler.setPort(port);
        this.startFailoverThread();
    }

    public void blockUntilNotFailingOver() throws InterruptedException {
        if (this._failoverLatch != null) {
            this._failoverLatch.await();
        }
    }

    public String generateQueueName() {
        return this._protocolSession.generateQueueName();
    }

    public CountDownLatch getFailoverLatch() {
        return this._failoverLatch;
    }

    public void setFailoverLatch(CountDownLatch failoverLatch) {
        this._failoverLatch = failoverLatch;
    }

    public AMQConnection getConnection() {
        return this._connection;
    }

    public AMQStateManager getStateManager() {
        return this._stateManager;
    }

    public void setStateManager(AMQStateManager stateManager) {
        this._stateManager = stateManager;
    }

    FailoverState getFailoverState() {
        return this._failoverState;
    }

    public void setFailoverState(FailoverState failoverState) {
        this._failoverState = failoverState;
    }
}

