/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.proton.hawtdispatch.api;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Endpoint;
import org.apache.qpid.proton.engine.EndpointError;
import org.apache.qpid.proton.engine.impl.ConnectionImpl;
import org.apache.qpid.proton.engine.impl.ProtocolTracer;
import org.apache.qpid.proton.engine.impl.SessionImpl;
import org.apache.qpid.proton.hawtdispatch.api.AmqpConnectOptions;
import org.apache.qpid.proton.hawtdispatch.api.AmqpEndpointBase;
import org.apache.qpid.proton.hawtdispatch.api.AmqpLink;
import org.apache.qpid.proton.hawtdispatch.api.AmqpSender;
import org.apache.qpid.proton.hawtdispatch.api.AmqpSession;
import org.apache.qpid.proton.hawtdispatch.api.Callback;
import org.apache.qpid.proton.hawtdispatch.api.Future;
import org.apache.qpid.proton.hawtdispatch.api.Promise;
import org.apache.qpid.proton.hawtdispatch.api.TransportState;
import org.apache.qpid.proton.hawtdispatch.impl.AmqpListener;
import org.apache.qpid.proton.hawtdispatch.impl.AmqpTransport;
import org.fusesource.hawtdispatch.DispatchQueue;
import org.fusesource.hawtdispatch.Task;

/**
 * Connection-level AMQP endpoint layered on top of an {@link AmqpTransport}.
 *
 * <p>Constructing an instance connects the transport with the supplied options,
 * installs a listener that routes deliveries to their links and re-pumps the
 * registered senders on refill, opens the underlying {@link ConnectionImpl} and
 * attaches this endpoint. Instances are obtained through
 * {@link #connect(AmqpConnectOptions)}.
 */
public class AmqpConnection
extends AmqpEndpointBase {
    /** Transport that carries this connection; created in the constructor. */
    AmqpTransport transport;
    /** Protocol-engine connection obtained from {@link #transport}. */
    ConnectionImpl connection;
    /** Senders that are pumped whenever the transport reports a refill. */
    HashSet<AmqpSender> senders = new HashSet<AmqpSender>();
    /** Set to {@code true} once {@link #disconnect()} has been called; not read within this class. */
    boolean closing = false;

    /**
     * Creates a connection using the given options.
     *
     * @param options connect options handed to {@link AmqpTransport#connect}
     * @return the newly created connection
     */
    public static AmqpConnection connect(AmqpConnectOptions options) {
        return new AmqpConnection(options);
    }

    private AmqpConnection(AmqpConnectOptions options) {
        transport = AmqpTransport.connect(options);
        transport.setListener(new AmqpListener(){

            @Override
            public void processDelivery(Delivery delivery) {
                // Look up the API-level link attached to the delivery's engine link and hand it the delivery.
                AmqpEndpointBase.Attachment attachment = (AmqpEndpointBase.Attachment)((Object)AmqpConnection.this.getTransport().context((Endpoint)delivery.getLink()).getAttachment());
                AmqpLink link = (AmqpLink)attachment.endpoint();
                link.processDelivery(delivery);
            }

            @Override
            public void processRefill() {
                // Iterate over a snapshot so the sender set may change while deliveries are pumped.
                for (AmqpSender sender : new ArrayList<AmqpSender>(AmqpConnection.this.senders)) {
                    sender.pumpDeliveries();
                }
                AmqpConnection.this.pumpOut();
            }

            @Override
            public void processTransportFailure(IOException e) {
                // Intentionally a no-op at this level. Callers observe failures through
                // getTransportFailure() / onTransportFailure(...).
                // NOTE(review): presumably the transport records the failure itself - confirm.
            }
        });
        connection = transport.connection();
        connection.open();
        attach();
    }

    /**
     * Waits on {@link #getConnectedFuture()}.
     *
     * <p>Calls {@code assertNotOnDispatchQueue()} first; presumably this rejects
     * invocation from a dispatch-queue thread - confirm against the base class.
     *
     * @throws Exception whatever the future's {@code await()} raises
     */
    public void waitForConnected() throws Exception {
        AmqpConnection.assertNotOnDispatchQueue();
        getConnectedFuture().await();
    }

    /**
     * Returns a future that is registered, on this connection's queue, as a
     * transport-connected callback.
     *
     * @return the future handed to {@link #onConnected(Callback)}
     */
    public Future<Void> getConnectedFuture() {
        final Promise<Void> promise = new Promise<Void>();
        queue().execute(new Task(){

            @Override
            public void run() {
                AmqpConnection.this.onConnected(promise);
            }
        });
        return promise;
    }

    /**
     * Registers a callback with the transport's connected notification.
     *
     * @param cb callback forwarded to {@code transport.onTransportConnected}
     */
    public void onConnected(Callback<Void> cb) {
        transport.onTransportConnected(cb);
    }

    /** Returns the underlying engine connection. */
    @Override
    protected Endpoint getEndpoint() {
        return connection;
    }

    /** A connection is its own connection. */
    @Override
    protected AmqpConnection getConnection() {
        return this;
    }

    /** A connection has no parent endpoint, so this always returns {@code null}. */
    @Override
    protected AmqpEndpointBase getParent() {
        return null;
    }

    /**
     * Opens a new session on this connection and pumps the resulting output.
     *
     * <p>Calls {@code assertExecuting()} first; presumably this requires the caller
     * to be running on the connection's dispatch queue - confirm against the base class.
     *
     * @return an {@link AmqpSession} wrapping the newly opened engine session
     */
    public AmqpSession createSession() {
        assertExecuting();
        SessionImpl session = connection.session();
        session.open();
        pumpOut();
        return new AmqpSession(this, session);
    }

    /**
     * Returns the engine connection's maximum channel count.
     *
     * @return the value of {@code connection.getMaxChannels()}
     */
    public int getMaxSessions() {
        return connection.getMaxChannels();
    }

    /** Marks this connection as closing and disconnects the transport. */
    public void disconnect() {
        closing = true;
        transport.disconnect();
    }

    /**
     * Waits on {@link #getDisconnectedFuture()}.
     *
     * <p>Calls {@code assertNotOnDispatchQueue()} first; presumably this rejects
     * invocation from a dispatch-queue thread - confirm against the base class.
     *
     * @throws Exception whatever the future's {@code await()} raises
     */
    public void waitForDisconnected() throws Exception {
        AmqpConnection.assertNotOnDispatchQueue();
        getDisconnectedFuture().await();
    }

    /**
     * Returns a future that is registered, on this connection's queue, as a
     * transport-disconnected callback.
     *
     * @return the future handed to {@link #onDisconnected(Callback)}
     */
    public Future<Void> getDisconnectedFuture() {
        final Promise<Void> promise = new Promise<Void>();
        queue().execute(new Task(){

            @Override
            public void run() {
                AmqpConnection.this.onDisconnected(promise);
            }
        });
        return promise;
    }

    /**
     * Registers a callback with the transport's disconnected notification.
     *
     * @param cb callback forwarded to {@code transport.onTransportDisconnected}
     */
    public void onDisconnected(Callback<Void> cb) {
        transport.onTransportDisconnected(cb);
    }

    /** Returns the transport's current state. */
    public TransportState getTransportState() {
        return transport.getState();
    }

    /** Returns the failure reported by the transport's {@code getFailure()}. */
    public Throwable getTransportFailure() {
        return transport.getFailure();
    }

    /**
     * Returns a future that is registered, on this connection's queue, as a
     * transport-failure callback.
     *
     * @return the future handed to {@link #onTransportFailure(Callback)}
     */
    public Future<Throwable> getTransportFailureFuture() {
        final Promise<Throwable> promise = new Promise<Throwable>();
        queue().execute(new Task(){

            @Override
            public void run() {
                AmqpConnection.this.onTransportFailure(promise);
            }
        });
        return promise;
    }

    /**
     * Registers a callback with the transport's failure notification.
     *
     * @param cb callback forwarded to {@code transport.onTransportFailure}
     */
    public void onTransportFailure(Callback<Throwable> cb) {
        transport.onTransportFailure(cb);
    }

    /** Re-declares the inherited {@code queue()} as public; delegates to the superclass. */
    @Override
    public DispatchQueue queue() {
        return super.queue();
    }

    /**
     * Installs a protocol tracer on the transport.
     *
     * @param protocolTracer tracer forwarded to the transport
     */
    public void setProtocolTracer(ProtocolTracer protocolTracer) {
        transport.setProtocolTracer(protocolTracer);
    }

    /** Returns the protocol tracer currently held by the transport. */
    public ProtocolTracer getProtocolTracer() {
        return transport.getProtocolTracer();
    }

    /**
     * Closes this endpoint via the superclass, then registers a remote-close
     * callback that disconnects the transport whether the callback reports
     * success or failure.
     */
    @Override
    public void close() {
        super.close();
        // Raw cast kept as-is: the parameter type of onRemoteClose is not visible here.
        onRemoteClose((Callback)new Callback<EndpointError>(){

            @Override
            public void onSuccess(EndpointError value) {
                AmqpConnection.this.disconnect();
            }

            @Override
            public void onFailure(Throwable value) {
                AmqpConnection.this.disconnect();
            }
        });
    }
}

