/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.proton.hawtdispatch.impl;

import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedList;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Endpoint;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.impl.ConnectionImpl;
import org.apache.qpid.proton.engine.impl.DeliveryImpl;
import org.apache.qpid.proton.engine.impl.ProtocolTracer;
import org.apache.qpid.proton.engine.impl.TransportImpl;
import org.apache.qpid.proton.hawtdispatch.api.AmqpConnectOptions;
import org.apache.qpid.proton.hawtdispatch.api.Callback;
import org.apache.qpid.proton.hawtdispatch.api.ChainedCallback;
import org.apache.qpid.proton.hawtdispatch.api.TransportState;
import org.apache.qpid.proton.hawtdispatch.impl.AmqpHeader;
import org.apache.qpid.proton.hawtdispatch.impl.AmqpListener;
import org.apache.qpid.proton.hawtdispatch.impl.AmqpProtocolCodec;
import org.apache.qpid.proton.hawtdispatch.impl.Defer;
import org.apache.qpid.proton.hawtdispatch.impl.EndpointContext;
import org.apache.qpid.proton.hawtdispatch.impl.Support;
import org.apache.qpid.proton.hawtdispatch.impl.Watch;
import org.apache.qpid.proton.hawtdispatch.impl.WatchBase;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.DataByteArrayOutputStream;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.hawtdispatch.CustomDispatchSource;
import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.hawtdispatch.DispatchQueue;
import org.fusesource.hawtdispatch.EventAggregator;
import org.fusesource.hawtdispatch.EventAggregators;
import org.fusesource.hawtdispatch.Task;
import org.fusesource.hawtdispatch.transport.DefaultTransportListener;
import org.fusesource.hawtdispatch.transport.ProtocolCodec;
import org.fusesource.hawtdispatch.transport.SslTransport;
import org.fusesource.hawtdispatch.transport.TcpTransport;
import org.fusesource.hawtdispatch.transport.Transport;
import org.fusesource.hawtdispatch.transport.TransportListener;

public class AmqpTransport
extends WatchBase {
    /** Lifecycle state of this transport; a new instance starts in {@code CREATED}. */
    private TransportState state = TransportState.CREATED;
    /** Dispatch queue supplied at construction; the defer source targets it and {@code assertExecuting()} checks against it. */
    final DispatchQueue queue;
    /** Proton connection that {@code bind} attaches to the proton transport. */
    final ConnectionImpl connection = new ConnectionImpl();
    /** Underlying HawtDispatch transport; assigned in {@code bind} and reset to {@code null} when a disconnect completes. */
    Transport hawtdispatchTransport;
    /** Proton engine transport; created in {@code bind} and reset to {@code null} when a disconnect completes. */
    TransportImpl protonTransport;
    /** Most recently recorded failure, or {@code null} when none has been recorded. */
    Throwable failure;
    /** Dispatch source that aggregates {@link Defer} tasks into a linked list; its handler runs each of them. */
    CustomDispatchSource<Defer, LinkedList<Defer>> defers;
    /** Every {@link EndpointState}; passed as both filters when walking the connection's sessions and links. */
    public static final EnumSet<EndpointState> ALL_SET = EnumSet.allOf(EndpointState.class);
    /** Reusable deferred task that runs {@code doPumpOut()}; scheduled by {@code pumpOut()}. */
    private Defer deferedPumpOut = new Defer(){

        public void run() {
            AmqpTransport.this.doPumpOut();
        }
    };
    /**
     * SASL layer handed to the listener by {@code fireListenerEvents()} while non-null.
     * NOTE(review): nothing in this class assigns it, so it is presumably set by external code - confirm.
     */
    public Sasl sasl;
    /** Receiver of transport, SASL, endpoint and delivery events; defaults to a plain {@link AmqpListener}. */
    AmqpListener listener = new AmqpListener();

    /**
     * Creates a transport whose deferred tasks are dispatched on the given queue.
     *
     * <p>A custom dispatch source is created that aggregates merged {@link Defer}
     * instances into a linked list. Each time the source fires, every collected
     * task has its {@code defered} flag cleared and is then run. The source is
     * resumed before the constructor returns.
     *
     * @param queue the dispatch queue the defer source targets
     */
    private AmqpTransport(DispatchQueue queue) {
        this.queue = queue;
        // Typed aggregator keeps the source generic instead of relying on raw casts.
        this.defers = Dispatch.createSource(EventAggregators.<Defer>linkedList(), this.queue);
        this.defers.setEventHandler(new Task(){

            @Override
            public void run() {
                for (Defer defer : AmqpTransport.this.defers.getData()) {
                    // Clear the flag before running so that defer(...) accepts the
                    // task again if it is re-scheduled from inside run().
                    defer.defered = false;
                    defer.run();
                }
            }
        });
        this.defers.resume();
    }

    /**
     * Starts an outbound connection described by the given options.
     *
     * <p>The options are cloned first, so the caller's instance is not modified.
     * On the clone, a missing dispatch queue is replaced with a freshly created
     * one and a missing blocking executor with the shared blocking thread pool.
     *
     * @param options connection settings
     * @return a new transport on which the connection attempt has been started
     */
    public static AmqpTransport connect(AmqpConnectOptions options) {
        final AmqpConnectOptions effective = options.clone();
        if (effective.getDispatchQueue() == null) {
            effective.setDispatchQueue(Dispatch.createQueue());
        }
        if (effective.getBlockingExecutor() == null) {
            effective.setBlockingExecutor(AmqpConnectOptions.getBlockingThreadPool());
        }
        AmqpTransport created = new AmqpTransport(effective.getDispatchQueue());
        return created.connecting(effective);
    }

    /**
     * Moves this transport from {@code CREATED} to {@code CONNECTING} and starts
     * the connection attempt.
     *
     * <p>Container ids and the hostname are copied from the options onto the
     * proton connection. A completion callback is built and, when a user name is
     * configured, wrapped in a {@link SaslClientHandler} so that the SASL exchange
     * runs first. Any {@link Throwable} raised while setting up is stored in
     * {@code failure} rather than propagated. Watches are fired before returning.
     *
     * @param options connection settings (already defaulted by {@code connect})
     * @return this transport
     */
    private AmqpTransport connecting(AmqpConnectOptions options) {
        assert (this.state == TransportState.CREATED);
        try {
            this.state = TransportState.CONNECTING;
            if (options.getLocalContainerId() != null) {
                this.connection.setLocalContainerId(options.getLocalContainerId());
            }
            if (options.getRemoteContainerId() != null) {
                this.connection.setContainer(options.getRemoteContainerId());
            }
            this.connection.setHostname(options.getHost().getHost());
            Callback<Void> onConnect = new Callback<Void>(){

                @Override
                public void onSuccess(Void value) {
                    if (AmqpTransport.this.state == TransportState.CONNECTED) {
                        AmqpTransport.this.hawtdispatchTransport.setTransportListener((TransportListener)new AmqpTransportListener());
                        AmqpTransport.this.fireWatches();
                    }
                }

                @Override
                public void onFailure(Throwable value) {
                    // The connect listener reports socket failures while the state is
                    // still CONNECTING, and the SASL handler reports failures once it
                    // is CONNECTED. Both must be recorded, otherwise a failed connect
                    // is silently dropped and connection watches never complete.
                    TransportState current = AmqpTransport.this.state;
                    if (current == TransportState.CONNECTING || current == TransportState.CONNECTED) {
                        AmqpTransport.this.failure = value;
                        AmqpTransport.this.disconnect();
                        AmqpTransport.this.fireWatches();
                    }
                }
            };
            if (options.getUser() != null) {
                onConnect = new SaslClientHandler(options, onConnect);
            }
            this.createTransport(options, onConnect);
        }
        catch (Throwable e) {
            // Setup errors are surfaced through the failure field and the watches.
            this.failure = e;
        }
        this.fireWatches();
        return this;
    }

    /**
     * @return the current lifecycle state of this transport
     */
    public TransportState getState() {
        return state;
    }

    /**
     * Creates, configures and starts the socket transport to the AMQP peer.
     *
     * <p>An {@link SslTransport} is used when the options carry an SSL context,
     * otherwise a plain {@link TcpTransport}. When the host URI has no explicit
     * port, the IANA-registered default is applied: 5671 for TLS ({@code amqps})
     * and 5672 for plain TCP ({@code amqp}). The temporary listener installed here
     * flips the state to {@code CONNECTED} and notifies {@code onConnect} once the
     * socket connects, or forwards the error while still {@code CONNECTING}.
     *
     * @param options   connection settings
     * @param onConnect callback notified of the connect outcome
     * @throws Exception if the URI cannot be built or the transport rejects the setup
     */
    void createTransport(AmqpConnectOptions options, final Callback<Void> onConnect) throws Exception {
        final boolean secure = options.getSslContext() != null;
        final TcpTransport transport;
        if (secure) {
            SslTransport ssl = new SslTransport();
            ssl.setSSLContext(options.getSslContext());
            transport = ssl;
        } else {
            transport = new TcpTransport();
        }
        URI host = options.getHost();
        if (host.getPort() == -1) {
            // amqps (TLS) defaults to 5671, amqp (plain) to 5672.
            int defaultPort = secure ? 5671 : 5672;
            host = new URI(host.getScheme() + "://" + host.getHost() + ":" + defaultPort);
        }
        transport.setBlockingExecutor(options.getBlockingExecutor());
        transport.setDispatchQueue(options.getDispatchQueue());
        transport.setMaxReadRate(options.getMaxReadRate());
        transport.setMaxWriteRate(options.getMaxWriteRate());
        transport.setReceiveBufferSize(options.getReceiveBufferSize());
        transport.setSendBufferSize(options.getSendBufferSize());
        transport.setTrafficClass(options.getTrafficClass());
        transport.setUseLocalHost(options.isUseLocalHost());
        transport.setTransportListener((TransportListener)new DefaultTransportListener(){

            @Override
            public void onTransportConnected() {
                if (AmqpTransport.this.state == TransportState.CONNECTING) {
                    AmqpTransport.this.state = TransportState.CONNECTED;
                    onConnect.onSuccess(null);
                    transport.resumeRead();
                }
            }

            @Override
            public void onTransportFailure(IOException error) {
                if (AmqpTransport.this.state == TransportState.CONNECTING) {
                    onConnect.onFailure(error);
                }
            }
        });
        // connecting(...) was previously invoked twice with identical arguments;
        // a single call is sufficient to register the remote and local addresses.
        transport.connecting(host, options.getLocalAddress());
        this.bind((Transport)transport);
        transport.start(Dispatch.NOOP);
    }

    /**
     * Wraps an already accepted (server-side) transport.
     *
     * @param transport the accepted transport; its dispatch queue becomes this
     *                  AMQP transport's queue
     * @return a new AMQP transport bound to {@code transport}
     */
    public static AmqpTransport accept(Transport transport) {
        AmqpTransport amqp = new AmqpTransport(transport.getDispatchQueue());
        return amqp.accepted(transport);
    }

    /**
     * Marks this transport as {@code CONNECTED}, binds it to the accepted
     * transport and installs a {@link SaslServerListener} as its listener.
     *
     * @param transport the accepted transport
     * @return this transport
     */
    private AmqpTransport accepted(Transport transport) {
        state = TransportState.CONNECTED;
        bind(transport);
        TransportListener serverListener = new SaslServerListener();
        hawtdispatchTransport.setTransportListener(serverListener);
        return this;
    }

    /**
     * Associates this object with the given transport and creates a fresh proton
     * transport bound to the proton connection. When the transport has no
     * protocol codec yet, an {@link AmqpProtocolCodec} is installed.
     *
     * @param transport the transport to bind to
     * @throws RuntimeException wrapping any exception raised while installing the codec
     */
    private void bind(Transport transport) {
        hawtdispatchTransport = transport;
        protonTransport = new TransportImpl();
        protonTransport.bind((Connection)connection);
        if (transport.getProtocolCodec() != null) {
            // A codec is already configured; leave it untouched.
            return;
        }
        try {
            transport.setProtocolCodec((ProtocolCodec)new AmqpProtocolCodec());
        }
        catch (Exception cause) {
            throw new RuntimeException(cause);
        }
    }

    /**
     * Schedules a task to run on the next firing of the defer source.
     *
     * <p>A task whose {@code defered} flag is already set is ignored, so it is
     * queued at most once until the source's handler clears the flag.
     *
     * @param defer the task to schedule
     */
    public void defer(Defer defer) {
        if (defer.defered) {
            return;
        }
        defer.defered = true;
        // Pass the task with its own type; the source is typed on Defer.
        this.defers.merge(defer);
    }

    /**
     * Requests that pending proton output be written to the transport. The work
     * itself is deferred; this method only asserts it runs on the transport's
     * queue and schedules the shared pump task.
     */
    public void pumpOut() {
        assertExecuting();
        defer(deferedPumpOut);
    }

    /**
     * Copies output produced by the proton transport into the HawtDispatch
     * transport until either proton has nothing more to write or the transport
     * reports it is full. Does nothing unless the state is {@code CONNECTING}
     * or {@code CONNECTED}. If anything was written and the transport still has
     * room afterwards, the listener is told it may refill.
     */
    private void doPumpOut() {
        if (state != TransportState.CONNECTING && state != TransportState.CONNECTED) {
            return;
        }
        final int capacity = hawtdispatchTransport.getProtocolCodec().getWriteBufferSize();
        final byte[] scratch = new byte[capacity];
        int totalWritten = 0;
        while (!hawtdispatchTransport.full()) {
            int produced = protonTransport.output(scratch, 0, capacity);
            if (produced <= 0) {
                // Proton has no more pending output.
                break;
            }
            totalWritten += produced;
            boolean taken = hawtdispatchTransport.offer((Object)new Buffer(scratch, 0, produced));
            assert (taken) : "Should be accepted since the transport was not full";
        }
        if (totalWritten > 0 && !hawtdispatchTransport.full()) {
            listener.processRefill();
        }
    }

    /**
     * Fires watches and then dispatches pending events to the listener, in this
     * order: the SASL layer (while {@code sasl} is non-null), the connection,
     * every session, every link, every delivery on the connection's work list,
     * and finally a refill notification.
     *
     * <p>When the listener's SASL processing returns {@code null}, the codec is
     * told to read the protocol header again.
     */
    public void fireListenerEvents() {
        fireWatches();
        if (sasl != null) {
            sasl = listener.processSaslEvent(sasl);
            if (sasl == null) {
                AmqpProtocolCodec codec = (AmqpProtocolCodec)hawtdispatchTransport.getProtocolCodec();
                codec.readProtocolHeader();
            }
        }
        context(connection).fireListenerEvents(listener);

        Session currentSession = connection.sessionHead(ALL_SET, ALL_SET);
        while (currentSession != null) {
            context(currentSession).fireListenerEvents(listener);
            currentSession = currentSession.next(ALL_SET, ALL_SET);
        }

        Link currentLink = connection.linkHead(ALL_SET, ALL_SET);
        while (currentLink != null) {
            context(currentLink).fireListenerEvents(listener);
            currentLink = currentLink.next(ALL_SET, ALL_SET);
        }

        DeliveryImpl work = connection.getWorkHead();
        while (work != null) {
            listener.processDelivery((Delivery)work);
            work = work.getWorkNext();
        }
        listener.processRefill();
    }

    /**
     * @return the proton connection owned by this transport
     */
    public ConnectionImpl connection() {
        return connection;
    }

    /**
     * @return the listener currently receiving this transport's events
     */
    public AmqpListener getListener() {
        return listener;
    }

    /**
     * Replaces the listener that receives this transport's events.
     *
     * @param listener the new listener
     */
    public void setListener(AmqpListener listener) {
        AmqpTransport.this.listener = listener;
    }

    /**
     * Returns the {@link EndpointContext} attached to the endpoint, creating and
     * attaching one on first use.
     *
     * @param endpoint the endpoint whose context is wanted
     * @return the existing or newly attached context
     */
    public EndpointContext context(Endpoint endpoint) {
        EndpointContext existing = (EndpointContext)endpoint.getContext();
        if (existing != null) {
            return existing;
        }
        EndpointContext created = new EndpointContext(this, endpoint);
        endpoint.setContext((Object)created);
        return created;
    }

    /**
     * Begins an orderly shutdown. Must be called on the transport's queue.
     *
     * <p>Only acts while the state is {@code CONNECTING} or {@code CONNECTED}.
     * The state becomes {@code DISCONNECTING} and the underlying transport is
     * asked to stop; when the stop completes the state becomes
     * {@code DISCONNECTED}, both transport references are cleared and watches
     * are fired.
     *
     * <p>When no underlying transport was ever bound (for example because
     * transport creation threw before {@code bind} was reached), there is
     * nothing to stop, so the transition to {@code DISCONNECTED} happens
     * immediately instead of leaving the state stuck in {@code DISCONNECTING}.
     */
    public void disconnect() {
        this.assertExecuting();
        if (this.state != TransportState.CONNECTING && this.state != TransportState.CONNECTED) {
            return;
        }
        this.state = TransportState.DISCONNECTING;
        if (this.hawtdispatchTransport == null) {
            // Nothing to stop: complete the disconnect right away so that
            // disconnect watches are not left waiting forever.
            this.state = TransportState.DISCONNECTED;
            this.protonTransport = null;
            this.fireWatches();
            return;
        }
        this.hawtdispatchTransport.stop(new Task(){

            @Override
            public void run() {
                AmqpTransport.this.state = TransportState.DISCONNECTED;
                AmqpTransport.this.hawtdispatchTransport = null;
                AmqpTransport.this.protonTransport = null;
                AmqpTransport.this.fireWatches();
            }
        });
    }

    /**
     * @return the dispatch queue this transport was created with
     */
    public DispatchQueue queue() {
        return queue;
    }

    /**
     * Delegates to the {@code assertExecuting()} check of the queue returned by
     * {@link #queue()}.
     */
    public void assertExecuting() {
        DispatchQueue owner = queue();
        owner.assertExecuting();
    }

    /**
     * Registers a watch that completes the callback once the connection attempt
     * has been resolved: {@code onFailure} when a failure has been recorded,
     * otherwise {@code onSuccess(null)} as soon as the state is no longer
     * {@code CONNECTING}.
     *
     * @param cb callback to notify
     */
    public void onTransportConnected(final Callback<Void> cb) {
        Watch connectedWatch = new Watch(){

            @Override
            public boolean execute() {
                Throwable recorded = AmqpTransport.this.failure;
                if (recorded != null) {
                    cb.onFailure(recorded);
                    return true;
                }
                if (AmqpTransport.this.state == TransportState.CONNECTING) {
                    // Still connecting; keep the watch registered.
                    return false;
                }
                cb.onSuccess(null);
                return true;
            }
        };
        addWatch(connectedWatch);
    }

    /**
     * Registers a watch that calls {@code cb.onSuccess(null)} once the state is
     * {@code DISCONNECTED}.
     *
     * @param cb callback to notify
     */
    public void onTransportDisconnected(final Callback<Void> cb) {
        Watch disconnectedWatch = new Watch(){

            @Override
            public boolean execute() {
                if (AmqpTransport.this.state != TransportState.DISCONNECTED) {
                    return false;
                }
                cb.onSuccess(null);
                return true;
            }
        };
        addWatch(disconnectedWatch);
    }

    /**
     * Registers a watch that hands the recorded failure to the callback. Note
     * that the failure is delivered through {@code cb.onSuccess(...)}: the
     * callback's "result" is the {@link Throwable} itself.
     *
     * @param cb callback that receives the failure
     */
    public void onTransportFailure(final Callback<Throwable> cb) {
        Watch failureWatch = new Watch(){

            @Override
            public boolean execute() {
                Throwable recorded = AmqpTransport.this.failure;
                if (recorded == null) {
                    return false;
                }
                cb.onSuccess(recorded);
                return true;
            }
        };
        addWatch(failureWatch);
    }

    /**
     * @return the most recently recorded failure, or {@code null} if none
     */
    public Throwable getFailure() {
        return failure;
    }

    /**
     * Installs a protocol tracer on the proton transport. The proton transport
     * only exists between {@code bind} and the completion of a disconnect;
     * outside that window this call fails with a {@link NullPointerException}.
     *
     * @param protocolTracer the tracer to install
     */
    public void setProtocolTracer(ProtocolTracer protocolTracer) {
        protonTransport.setProtocolTracer(protocolTracer);
    }

    /**
     * Returns the protocol tracer of the proton transport. As with the setter,
     * this dereferences the proton transport and therefore requires it to be bound.
     *
     * @return the tracer reported by the proton transport
     */
    public ProtocolTracer getProtocolTracer() {
        return protonTransport.getProtocolTracer();
    }

    class AmqpTransportListener
    extends DefaultTransportListener {
        AmqpTransportListener() {
        }

        public void onTransportConnected() {
            if (AmqpTransport.this.listener != null) {
                AmqpTransport.this.listener.processTransportConnected();
            }
        }

        public void onRefill() {
            if (AmqpTransport.this.listener != null) {
                AmqpTransport.this.listener.processRefill();
            }
        }

        public void onTransportCommand(Object command) {
            if (AmqpTransport.this.state != TransportState.CONNECTED) {
                return;
            }
            try {
                Buffer buffer = command.getClass() == AmqpHeader.class ? ((AmqpHeader)command).getBuffer() : (Buffer)command;
                AmqpTransport.this.protonTransport.input(buffer.data, buffer.offset, buffer.length);
                this.process();
                AmqpTransport.this.pumpOut();
            }
            catch (Exception e) {
                this.onFailure(e);
            }
        }

        void process() {
            AmqpTransport.this.fireListenerEvents();
        }

        public void onTransportFailure(IOException error) {
            if (AmqpTransport.this.state == TransportState.CONNECTED) {
                AmqpTransport.this.failure = error;
                if (AmqpTransport.this.listener != null) {
                    AmqpTransport.this.listener.processTransportFailure(error);
                    AmqpTransport.this.fireWatches();
                }
            }
        }

        void onFailure(Throwable error) {
            AmqpTransport.this.failure = error;
            if (AmqpTransport.this.listener != null) {
                AmqpTransport.this.listener.processFailure(error);
                AmqpTransport.this.fireWatches();
            }
        }
    }

    /**
     * Listener installed on accepted transports. It inspects the first protocol
     * header to decide whether the peer wants SASL, drives the server-side SASL
     * exchange through the {@link AmqpListener}, and then hands over to a plain
     * {@link AmqpTransportListener}.
     */
    class SaslServerListener
    extends AmqpTransportListener {
        /** Active SASL layer, or {@code null} when no SASL exchange is in progress. */
        Sasl sasl;

        /**
         * Handles a decoded command. For an {@link AmqpHeader} with protocol id
         * 3 and a listener present, SASL is started via the listener and the
         * header's buffer is then processed normally. For any other header (or
         * when no listener is set) a plain {@link AmqpTransportListener} takes
         * over and receives the command. Non-header commands go straight to the
         * inherited handling.
         *
         * <p>If starting SASL throws, the failure is reported and the command is
         * not processed any further.
         *
         * @param command the decoded command
         */
        @Override
        public void onTransportCommand(Object command) {
            try {
                if (command.getClass() == AmqpHeader.class) {
                    AmqpHeader header = (AmqpHeader)command;
                    switch (header.getProtocolId()) {
                        case 3: {
                            if (AmqpTransport.this.listener != null) {
                                this.sasl = AmqpTransport.this.listener.processSaslConnect(AmqpTransport.this.protonTransport);
                                break;
                            }
                            // No listener to run SASL with: intentionally falls
                            // through to the non-SASL handling below.
                        }
                        default: {
                            AmqpTransportListener listener = new AmqpTransportListener();
                            AmqpTransport.this.hawtdispatchTransport.setTransportListener((TransportListener)listener);
                            listener.onTransportCommand(command);
                            return;
                        }
                    }
                    command = header.getBuffer();
                }
            }
            catch (Exception e) {
                this.onFailure(e);
                // Stop here: continuing would feed the command to the proton
                // transport and, with no SASL layer set, switch to the plain
                // listener as though the handshake had completed.
                return;
            }
            super.onTransportCommand(command);
        }

        /**
         * Advances the SASL exchange through the listener. Once the listener
         * returns {@code null} (or no SASL layer is active), the codec is told
         * to read the protocol header again and a plain
         * {@link AmqpTransportListener} is installed.
         */
        @Override
        void process() {
            if (this.sasl != null) {
                this.sasl = AmqpTransport.this.listener.processSaslEvent(this.sasl);
            }
            if (this.sasl == null) {
                ((AmqpProtocolCodec)AmqpTransport.this.hawtdispatchTransport.getProtocolCodec()).readProtocolHeader();
                AmqpTransport.this.hawtdispatchTransport.setTransportListener((TransportListener)new AmqpTransportListener());
            }
        }
    }

    /**
     * Callback stage that runs a client-side SASL exchange using the PLAIN
     * mechanism before passing the connect result on to the next callback.
     */
    class SaslClientHandler
    extends ChainedCallback<Void, Void> {
        /** Connection options supplying the user name and password. */
        private final AmqpConnectOptions options;

        /**
         * @param options connection options holding the credentials
         * @param next    callback notified when the SASL exchange succeeds or fails
         */
        public SaslClientHandler(AmqpConnectOptions options, Callback<Void> next) {
            super(next);
            this.options = options;
        }

        /**
         * Invoked when the socket is connected: puts the proton SASL layer into
         * client mode, schedules an output pump and installs a listener that
         * drives the exchange as data arrives.
         */
        @Override
        public void onSuccess(Void value) {
            final Sasl clientSasl = AmqpTransport.this.protonTransport.sasl();
            clientSasl.client();
            AmqpTransport.this.pumpOut();
            AmqpTransport.this.hawtdispatchTransport.setTransportListener((TransportListener)new AmqpTransportListener(){
                /** SASL layer while the exchange is running; {@code null} once it has ended. */
                Sasl sasl = clientSasl;
                /** Whether the PLAIN response has already been sent. */
                boolean authSent = false;

                /** Advances the exchange; re-reads the protocol header once it has ended. */
                @Override
                void process() {
                    if (this.sasl == null) {
                        return;
                    }
                    this.sasl = this.processSaslEvent(this.sasl);
                    if (this.sasl == null) {
                        AmqpProtocolCodec codec = (AmqpProtocolCodec)AmqpTransport.this.hawtdispatchTransport.getProtocolCodec();
                        codec.readProtocolHeader();
                    }
                }

                /** Transport failures during the exchange go to the chained callback. */
                @Override
                public void onTransportFailure(IOException error) {
                    SaslClientHandler.this.next.onFailure(error);
                }

                /** Processing failures during the exchange go to the chained callback. */
                @Override
                void onFailure(Throwable error) {
                    SaslClientHandler.this.next.onFailure(error);
                }

                /**
                 * Performs one step of the exchange.
                 *
                 * @return the SASL layer while the exchange continues, or
                 *         {@code null} once it has succeeded or been rejected
                 */
                private Sasl processSaslEvent(Sasl sasl) {
                    if (sasl.getOutcome() == Sasl.SaslOutcome.PN_SASL_OK) {
                        SaslClientHandler.this.next.onSuccess(null);
                        return null;
                    }
                    HashSet<String> offered = new HashSet<String>(Arrays.asList(sasl.getRemoteMechanisms()));
                    if (this.authSent || offered.isEmpty()) {
                        // Either already answered, or the peer has not advertised
                        // its mechanisms yet; keep waiting.
                        return sasl;
                    }
                    if (!offered.contains("PLAIN")) {
                        SaslClientHandler.this.next.onFailure(Support.illegalState("Remote does not support plain password authentication."));
                        return null;
                    }
                    this.authSent = true;
                    Buffer response = this.plainResponse();
                    sasl.setMechanisms(new String[]{"PLAIN"});
                    sasl.send(response.data, response.offset, response.length);
                    return sasl;
                }

                /**
                 * Builds the PLAIN response: a zero byte, the user name, a zero
                 * byte, then the password when one is configured.
                 */
                private Buffer plainResponse() {
                    DataByteArrayOutputStream out = new DataByteArrayOutputStream();
                    try {
                        out.writeByte(0);
                        out.write((Buffer)new UTF8Buffer(SaslClientHandler.this.options.getUser()));
                        out.writeByte(0);
                        if (SaslClientHandler.this.options.getPassword() != null) {
                            out.write((Buffer)new UTF8Buffer(SaslClientHandler.this.options.getPassword()));
                        }
                    }
                    catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    return out.toBuffer();
                }
            });
        }
    }
}

