/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.client;

import java.io.IOException;
import java.net.ConnectException;
import java.nio.channels.UnresolvedAddressException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.naming.NamingException;
import javax.naming.Reference;
import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.AMQUnresolvedAddressException;
import org.apache.qpid.client.AMQAuthenticationException;
import org.apache.qpid.client.AMQBrokerDetails;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQConnectionURL;
import org.apache.qpid.client.AMQQueueSessionAdaptor;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopicSessionAdaptor;
import org.apache.qpid.client.Closeable;
import org.apache.qpid.client.failover.FailoverSupport;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.framing.BasicQosBody;
import org.apache.qpid.framing.BasicQosOkBody;
import org.apache.qpid.framing.ChannelOpenBody;
import org.apache.qpid.framing.ChannelOpenOkBody;
import org.apache.qpid.framing.TxSelectBody;
import org.apache.qpid.framing.TxSelectOkBody;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ChannelLimitReachedException;
import org.apache.qpid.jms.Connection;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.url.URLSyntaxException;

public class AMQConnection
extends Closeable
implements Connection,
QueueConnection,
TopicConnection,
Referenceable {
    private static final Logger _logger = Logger.getLogger(AMQConnection.class);
    private AtomicInteger _idFactory = new AtomicInteger(0);
    private final Object _failoverMutex = new Object();
    private long _maximumChannelCount;
    private long _maximumFrameSize;
    private AMQProtocolHandler _protocolHandler;
    private final Map _sessions = new LinkedHashMap();
    private String _clientName;
    private String _username;
    private String _password;
    private String _virtualHost;
    private ExceptionListener _exceptionListener;
    private ConnectionListener _connectionListener;
    private ConnectionURL _connectionURL;
    private boolean _started;
    private FailoverPolicy _failoverPolicy;
    private boolean _connected;
    private AMQException _lastAMQException = null;

    public AMQConnection(String broker, String username, String password, String clientName, String virtualHost) throws AMQException, URLSyntaxException {
        this(new AMQConnectionURL("amqp://" + username + ":" + password + "@" + clientName + virtualHost + "?brokerlist='" + broker + "'"));
    }

    public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost) throws AMQException, URLSyntaxException {
        this(host, port, false, username, password, clientName, virtualHost);
    }

    public AMQConnection(String host, int port, boolean useSSL, String username, String password, String clientName, String virtualHost) throws AMQException, URLSyntaxException {
        this(new AMQConnectionURL(useSSL ? "amqp://" + username + ":" + password + "@" + clientName + virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'" + "," + "ssl" + "='true'" : "amqp://" + username + ":" + password + "@" + clientName + virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'" + "," + "ssl" + "='false'"));
    }

    public AMQConnection(String connection) throws AMQException, URLSyntaxException {
        this(new AMQConnectionURL(connection));
    }

    public AMQConnection(ConnectionURL connectionURL) throws AMQException {
        _logger.info((Object)("Connection:" + connectionURL));
        if (connectionURL == null) {
            throw new IllegalArgumentException("Connection must be specified");
        }
        this._connectionURL = connectionURL;
        this._clientName = connectionURL.getClientName();
        this._username = connectionURL.getUsername();
        this._password = connectionURL.getPassword();
        this._virtualHost = connectionURL.getVirtualHost();
        this._failoverPolicy = new FailoverPolicy(connectionURL);
        this._protocolHandler = new AMQProtocolHandler(this);
        this._connected = false;
        Exception lastException = new Exception();
        lastException.initCause(new ConnectException());
        while (lastException != null && this.checkException(lastException) && this._failoverPolicy.failoverAllowed()) {
            try {
                this.makeBrokerConnection(this._failoverPolicy.getNextBrokerDetails());
                lastException = null;
            }
            catch (Exception e) {
                lastException = e;
                _logger.info((Object)("Unable to connect to broker at " + this._failoverPolicy.getCurrentBrokerDetails()), e.getCause());
            }
        }
        _logger.debug((Object)("Are we connected:" + this._connected));
        if (this._failoverPolicy.failoverAllowed()) {
            while (!this._connected && !this._closed.get()) {
                try {
                    _logger.debug((Object)"Sleeping.");
                    Thread.sleep(100L);
                }
                catch (InterruptedException ie) {
                    _logger.debug((Object)"Woken up.");
                }
            }
            if (!(this._failoverPolicy.failoverAllowed() && this._failoverPolicy.getCurrentBrokerDetails() != null || this._lastAMQException == null)) {
                throw this._lastAMQException;
            }
        } else {
            String message = null;
            if (lastException != null) {
                message = lastException.getCause() != null ? lastException.getCause().getMessage() : lastException.getMessage();
            }
            if (message == null || message.equals("")) {
                message = "Unable to Connect";
            }
            AMQConnectionException e = new AMQConnectionException(message);
            if (lastException != null) {
                if (lastException instanceof UnresolvedAddressException) {
                    e = new AMQUnresolvedAddressException(message, ((Object)this._failoverPolicy.getCurrentBrokerDetails()).toString());
                }
                e.initCause((Throwable)lastException);
            }
            throw e;
        }
    }

    protected boolean checkException(Throwable thrown) {
        Throwable cause = thrown.getCause();
        if (cause == null) {
            cause = thrown;
        }
        return cause instanceof ConnectException || cause instanceof UnresolvedAddressException;
    }

    protected AMQConnection(String username, String password, String clientName, String virtualHost) {
        this._clientName = clientName;
        this._username = username;
        this._password = password;
        this._virtualHost = virtualHost;
    }

    private void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException {
        try {
            TransportConnection.getInstance(brokerDetail).connect(this._protocolHandler, brokerDetail);
            this._protocolHandler.attainState(AMQState.CONNECTION_OPEN);
            this._failoverPolicy.attainedConnection();
            this._connected = true;
        }
        catch (AMQException e) {
            this._lastAMQException = e;
            throw e;
        }
    }

    public boolean attemptReconnection(String host, int port, boolean useSSL) {
        AMQBrokerDetails bd = new AMQBrokerDetails(host, port, useSSL);
        this._failoverPolicy.setBroker(bd);
        try {
            this.makeBrokerConnection(bd);
            return true;
        }
        catch (Exception e) {
            _logger.info((Object)("Unable to connect to broker at " + bd));
            this.attemptReconnection();
            return false;
        }
    }

    public boolean attemptReconnection() {
        while (this._failoverPolicy.failoverAllowed()) {
            try {
                this.makeBrokerConnection(this._failoverPolicy.getNextBrokerDetails());
                return true;
            }
            catch (Exception e) {
                if (!(e instanceof AMQException)) {
                    _logger.info((Object)("Unable to connect to broker at " + this._failoverPolicy.getCurrentBrokerDetails()), (Throwable)e);
                    continue;
                }
                _logger.info((Object)(e.getMessage() + ":Unable to connect to broker at " + this._failoverPolicy.getCurrentBrokerDetails()));
            }
        }
        return false;
    }

    public BrokerDetails getActiveBrokerDetails() {
        return this._failoverPolicy.getCurrentBrokerDetails();
    }

    public boolean failoverAllowed() {
        return this._failoverPolicy.failoverAllowed();
    }

    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
        return this.createSession(transacted, acknowledgeMode, 5000);
    }

    public org.apache.qpid.jms.Session createSession(boolean transacted, int acknowledgeMode, int prefetch) throws JMSException {
        return this.createSession(transacted, acknowledgeMode, prefetch, prefetch);
    }

    public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetchHigh, final int prefetchLow) throws JMSException {
        this.checkNotClosed();
        if (this.channelLimitReached()) {
            throw new ChannelLimitReachedException(this._maximumChannelCount);
        }
        return (org.apache.qpid.jms.Session)new FailoverSupport(){

            public Object operation() throws JMSException {
                int channelId = AMQConnection.this._idFactory.incrementAndGet();
                if (_logger.isDebugEnabled()) {
                    _logger.debug((Object)("Write channel open frame for channel id " + channelId));
                }
                AMQSession session = new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh, prefetchLow);
                AMQConnection.this._protocolHandler.addSessionByChannel(channelId, session);
                AMQConnection.this.registerSession(channelId, session);
                boolean success = false;
                try {
                    AMQConnection.this.createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
                    success = true;
                }
                catch (AMQException e) {
                    JMSException jmse = new JMSException("Error creating session: " + (Object)((Object)e));
                    jmse.setLinkedException((Exception)((Object)e));
                    throw jmse;
                }
                finally {
                    if (!success) {
                        AMQConnection.this._protocolHandler.removeSessionByChannel(channelId);
                        AMQConnection.this.deregisterSession(channelId);
                    }
                }
                if (AMQConnection.this._started) {
                    session.start();
                }
                return session;
            }
        }.execute(this);
    }

    private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException {
        this._protocolHandler.syncWrite(ChannelOpenBody.createAMQFrame((int)channelId, null), ChannelOpenOkBody.class);
        this._protocolHandler.syncWrite(BasicQosBody.createAMQFrame((int)channelId, (long)0L, (int)prefetchHigh, (boolean)false), BasicQosOkBody.class);
        if (transacted) {
            if (_logger.isDebugEnabled()) {
                _logger.debug((Object)("Issuing TxSelect for " + channelId));
            }
            this._protocolHandler.syncWrite(TxSelectBody.createAMQFrame((int)channelId), TxSelectOkBody.class);
        }
    }

    private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException {
        try {
            this.createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
        }
        catch (AMQException e) {
            this._protocolHandler.removeSessionByChannel(channelId);
            this.deregisterSession(channelId);
            throw new AMQException("Error reopening channel " + channelId + " after failover: " + (Object)((Object)e));
        }
    }

    public void setFailoverPolicy(FailoverPolicy policy) {
        this._failoverPolicy = policy;
    }

    public FailoverPolicy getFailoverPolicy() {
        return this._failoverPolicy;
    }

    public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
        return new AMQQueueSessionAdaptor(this.createSession(transacted, acknowledgeMode));
    }

    public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
        return new AMQTopicSessionAdaptor(this.createSession(transacted, acknowledgeMode));
    }

    private boolean channelLimitReached() {
        return this._maximumChannelCount != 0L && (long)this._sessions.size() == this._maximumChannelCount;
    }

    public String getClientID() throws JMSException {
        this.checkNotClosed();
        return this._clientName;
    }

    public void setClientID(String clientID) throws JMSException {
        this.checkNotClosed();
        this._clientName = clientID;
    }

    public ConnectionMetaData getMetaData() throws JMSException {
        this.checkNotClosed();
        return null;
    }

    public ExceptionListener getExceptionListener() throws JMSException {
        this.checkNotClosed();
        return this._exceptionListener;
    }

    public void setExceptionListener(ExceptionListener listener) throws JMSException {
        this.checkNotClosed();
        this._exceptionListener = listener;
    }

    public void start() throws JMSException {
        this.checkNotClosed();
        if (!this._started) {
            Iterator it = this._sessions.entrySet().iterator();
            while (it.hasNext()) {
                AMQSession s = (AMQSession)it.next().getValue();
                s.start();
            }
            this._started = true;
        }
    }

    public void stop() throws JMSException {
        this.checkNotClosed();
        if (this._started) {
            Iterator i = this._sessions.values().iterator();
            while (i.hasNext()) {
                ((AMQSession)i.next()).stop();
            }
            this._started = false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() throws JMSException {
        Object object = this.getFailoverMutex();
        synchronized (object) {
            if (!this._closed.getAndSet(true)) {
                try {
                    this.closeAllSessions(null);
                    this._protocolHandler.closeConnection();
                }
                catch (AMQException e) {
                    throw new JMSException("Error closing connection: " + (Object)((Object)e));
                }
            }
        }
    }

    private void markAllSessionsClosed() {
        LinkedList sessionCopy = new LinkedList(this._sessions.values());
        for (AMQSession session : sessionCopy) {
            session.markClosed();
        }
        this._sessions.clear();
    }

    private void closeAllSessions(Throwable cause) throws JMSException {
        LinkedList sessionCopy = new LinkedList(this._sessions.values());
        Iterator it = sessionCopy.iterator();
        JMSException sessionException = null;
        while (it.hasNext()) {
            AMQSession session = (AMQSession)it.next();
            if (cause != null) {
                session.closed(cause);
                continue;
            }
            try {
                session.close();
            }
            catch (JMSException e) {
                _logger.error((Object)("Error closing session: " + (Object)((Object)e)));
                sessionException = e;
            }
        }
        this._sessions.clear();
        if (sessionException != null) {
            throw sessionException;
        }
    }

    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
        this.checkNotClosed();
        return null;
    }

    public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
        this.checkNotClosed();
        return null;
    }

    public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
        this.checkNotClosed();
        return null;
    }

    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
        this.checkNotClosed();
        return null;
    }

    public long getMaximumChannelCount() throws JMSException {
        this.checkNotClosed();
        return this._maximumChannelCount;
    }

    public void setConnectionListener(ConnectionListener listener) {
        this._connectionListener = listener;
    }

    public ConnectionListener getConnectionListener() {
        return this._connectionListener;
    }

    public void setMaximumChannelCount(long maximumChannelCount) {
        this._maximumChannelCount = maximumChannelCount;
    }

    public void setMaximumFrameSize(long frameMax) {
        this._maximumFrameSize = frameMax;
    }

    public long getMaximumFrameSize() {
        return this._maximumFrameSize;
    }

    public Map getSessions() {
        return this._sessions;
    }

    public String getUsername() {
        return this._username;
    }

    public String getPassword() {
        return this._password;
    }

    public String getVirtualHost() {
        return this._virtualHost;
    }

    public AMQProtocolHandler getProtocolHandler() {
        return this._protocolHandler;
    }

    public void bytesSent(long writtenBytes) {
        if (this._connectionListener != null) {
            this._connectionListener.bytesSent(writtenBytes);
        }
    }

    public void bytesReceived(long receivedBytes) {
        if (this._connectionListener != null) {
            this._connectionListener.bytesReceived(receivedBytes);
        }
    }

    public boolean firePreFailover(boolean redirect) {
        boolean proceed = true;
        if (this._connectionListener != null) {
            proceed = this._connectionListener.preFailover(redirect);
        }
        return proceed;
    }

    public boolean firePreResubscribe() throws JMSException {
        if (this._connectionListener != null) {
            boolean resubscribe = this._connectionListener.preResubscribe();
            if (!resubscribe) {
                this.markAllSessionsClosed();
            }
            return resubscribe;
        }
        return true;
    }

    public void fireFailoverComplete() {
        if (this._connectionListener != null) {
            this._connectionListener.failoverComplete();
        }
    }

    public final Object getFailoverMutex() {
        return this._failoverMutex;
    }

    public void blockUntilNotFailingOver() throws InterruptedException {
        this._protocolHandler.blockUntilNotFailingOver();
    }

    public void exceptionReceived(Throwable cause) {
        JMSException je;
        _logger.debug((Object)("Connection Close done by:" + Thread.currentThread().getName()));
        _logger.debug((Object)"exceptionReceived is ", cause);
        if (cause instanceof JMSException) {
            je = (JMSException)cause;
        } else {
            je = new JMSException("Exception thrown against " + this.toString() + ": " + cause);
            if (cause instanceof Exception) {
                je.setLinkedException((Exception)cause);
            }
        }
        if (cause instanceof IOException) {
            this._closed.set(true);
        }
        if (this._exceptionListener != null) {
            this._exceptionListener.onException(je);
        }
        if (!(cause instanceof AMQUndeliveredException) && !(cause instanceof AMQAuthenticationException)) {
            try {
                _logger.info((Object)("Closing AMQConnection due to :" + cause.getMessage()));
                this._closed.set(true);
                this.closeAllSessions(cause);
            }
            catch (JMSException e) {
                _logger.error((Object)("Error closing all sessions: " + (Object)((Object)e)), (Throwable)e);
            }
        } else {
            _logger.info((Object)"Not a hard-error connection not closing.");
        }
    }

    void registerSession(int channelId, AMQSession session) {
        this._sessions.put(channelId, session);
    }

    void deregisterSession(int channelId) {
        this._sessions.remove(channelId);
    }

    public void resubscribeSessions() throws JMSException, AMQException {
        ArrayList sessions = new ArrayList(this._sessions.values());
        _logger.info((Object)MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size()));
        for (AMQSession s : sessions) {
            this._protocolHandler.addSessionByChannel(s.getChannelId(), s);
            this.reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
            s.resubscribe();
        }
    }

    public String toString() {
        StringBuffer buf = new StringBuffer("AMQConnection:\n");
        if (this._failoverPolicy.getCurrentBrokerDetails() == null) {
            buf.append("No active broker connection");
        } else {
            BrokerDetails bd = this._failoverPolicy.getCurrentBrokerDetails();
            buf.append("Host: ").append(String.valueOf(bd.getHost()));
            buf.append("\nPort: ").append(String.valueOf(bd.getPort()));
        }
        buf.append("\nVirtual Host: ").append(String.valueOf(this._virtualHost));
        buf.append("\nClient ID: ").append(String.valueOf(this._clientName));
        buf.append("\nActive session count: ").append(this._sessions == null ? 0 : this._sessions.size());
        return buf.toString();
    }

    public String toURL() {
        return this._connectionURL.toString();
    }

    public Reference getReference() throws NamingException {
        return new Reference(AMQConnection.class.getName(), new StringRefAddr(AMQConnection.class.getName(), this.toURL()), AMQConnectionFactory.class.getName(), null);
    }
}

