/*
 * Decompiled with CFR 0.152.
 */
package org.mobicents.protocols.sctp;

import com.sun.nio.sctp.MessageInfo;
import com.sun.nio.sctp.SctpChannel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import javolution.util.FastList;
import javolution.xml.XMLFormat;
import javolution.xml.stream.XMLStreamException;
import org.apache.log4j.Logger;
import org.mobicents.protocols.api.Association;
import org.mobicents.protocols.api.AssociationListener;
import org.mobicents.protocols.api.PayloadData;
import org.mobicents.protocols.sctp.AssociationHandler;
import org.mobicents.protocols.sctp.AssociationType;
import org.mobicents.protocols.sctp.ChangeRequest;
import org.mobicents.protocols.sctp.ManagementImpl;
import org.mobicents.protocols.sctp.Worker;

public class AssociationImpl
implements Association {
    private static final Logger logger = Logger.getLogger((String)AssociationImpl.class.getName());
    private static final String NAME = "name";
    private static final String SERVER_NAME = "serverName";
    private static final String HOST_ADDRESS = "hostAddress";
    private static final String HOST_PORT = "hostPort";
    private static final String PEER_ADDRESS = "peerAddress";
    private static final String PEER_PORT = "peerPort";
    private static final String ASSOCIATION_TYPE = "assoctype";
    private String hostAddress;
    private int hostPort;
    private String peerAddress;
    private int peerPort;
    private String serverName;
    private String name;
    private SctpChannel socketChannel;
    private AssociationType type;
    private AssociationListener associationListener = null;
    private final AssociationHandler associationHandler = new AssociationHandler();
    private volatile boolean started = false;
    private static final int MAX_SLS = 32;
    private int[] slsTable = new int[32];
    private int[] workerThreadTable = null;
    private ConcurrentLinkedQueue<PayloadData> txQueue = new ConcurrentLinkedQueue();
    private ManagementImpl management;
    private ByteBuffer rxBuffer = ByteBuffer.allocateDirect(8192);
    private ByteBuffer txBuffer = ByteBuffer.allocateDirect(8192);
    private volatile MessageInfo msgInfo;
    private volatile int ioErrors = 0;
    protected static final XMLFormat<AssociationImpl> ASSOCIATION_XML = new XMLFormat<AssociationImpl>(AssociationImpl.class){

        public void read(XMLFormat.InputElement xml, AssociationImpl association) throws XMLStreamException {
            association.name = xml.getAttribute(AssociationImpl.NAME, "");
            association.type = AssociationType.getAssociationType(xml.getAttribute(AssociationImpl.ASSOCIATION_TYPE, ""));
            association.hostAddress = xml.getAttribute(AssociationImpl.HOST_ADDRESS, "");
            association.hostPort = xml.getAttribute(AssociationImpl.HOST_PORT, 0);
            association.peerAddress = xml.getAttribute(AssociationImpl.PEER_ADDRESS, "");
            association.peerPort = xml.getAttribute(AssociationImpl.PEER_PORT, 0);
            association.serverName = xml.getAttribute(AssociationImpl.SERVER_NAME, "");
        }

        public void write(AssociationImpl association, XMLFormat.OutputElement xml) throws XMLStreamException {
            xml.setAttribute(AssociationImpl.NAME, association.name);
            xml.setAttribute(AssociationImpl.ASSOCIATION_TYPE, association.type.getType());
            xml.setAttribute(AssociationImpl.HOST_ADDRESS, association.hostAddress);
            xml.setAttribute(AssociationImpl.HOST_PORT, association.hostPort);
            xml.setAttribute(AssociationImpl.PEER_ADDRESS, association.peerAddress);
            xml.setAttribute(AssociationImpl.PEER_PORT, association.peerPort);
            xml.setAttribute(AssociationImpl.SERVER_NAME, association.serverName);
        }
    };

    public AssociationImpl() {
        this.txBuffer.clear();
        this.txBuffer.rewind();
        this.txBuffer.flip();
        this.rxBuffer.clear();
        this.rxBuffer.rewind();
        this.rxBuffer.flip();
    }

    public AssociationImpl(String hostAddress, int hostport, String peerAddress, int peerPort, String assocName) throws IOException {
        this();
        this.hostAddress = hostAddress;
        this.hostPort = hostport;
        this.peerAddress = peerAddress;
        this.peerPort = peerPort;
        this.name = assocName;
        this.type = AssociationType.CLIENT;
    }

    public AssociationImpl(String peerAddress, int peerPort, String serverName, String assocName) {
        this();
        this.peerAddress = peerAddress;
        this.peerPort = peerPort;
        this.serverName = serverName;
        this.name = assocName;
        this.type = AssociationType.SERVER;
    }

    protected void start() throws Exception {
        if (this.associationListener == null) {
            throw new NullPointerException(String.format("AssociationListener is null for Associatoion=%s", this.name));
        }
        if (this.type == AssociationType.CLIENT) {
            this.scheduleConnect();
        }
        this.started = true;
        if (logger.isInfoEnabled()) {
            logger.info((Object)String.format("Started Association=%s", this));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void stop() throws Exception {
        this.started = false;
        if (this.socketChannel != null && this.socketChannel.isOpen()) {
            FastList<ChangeRequest> pendingChanges;
            FastList<ChangeRequest> fastList = pendingChanges = this.management.getPendingChanges();
            synchronized (fastList) {
                pendingChanges.add((Object)new ChangeRequest(this.socketChannel, this, 4, -1));
            }
            this.management.getSocketSelector().wakeup();
        }
    }

    public AssociationListener getAssociationListener() {
        return this.associationListener;
    }

    public void setAssociationListener(AssociationListener associationListener) {
        this.associationListener = associationListener;
    }

    public String getName() {
        return this.name;
    }

    public AssociationType getType() {
        return this.type;
    }

    public boolean isStarted() {
        return this.started;
    }

    public String getHostAddress() {
        return this.hostAddress;
    }

    public int getHostPort() {
        return this.hostPort;
    }

    public String getPeerAddress() {
        return this.peerAddress;
    }

    public int getPeerPort() {
        return this.peerPort;
    }

    public String getServerName() {
        return this.serverName;
    }

    protected void setSocketChannel(SctpChannel socketChannel) {
        this.socketChannel = socketChannel;
    }

    protected void setManagement(ManagementImpl management) {
        this.management = management;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void send(PayloadData payloadData) throws Exception {
        FastList<ChangeRequest> pendingChanges;
        if (!this.started || this.socketChannel == null || !this.socketChannel.isOpen() || this.socketChannel.association() == null) {
            throw new Exception(String.format("Underlying sctp channel doesn't have association for Association=%s", this.name));
        }
        FastList<ChangeRequest> fastList = pendingChanges = this.management.getPendingChanges();
        synchronized (fastList) {
            pendingChanges.add((Object)new ChangeRequest(this.socketChannel, this, 2, 4));
            this.txQueue.add(payloadData);
        }
        this.management.getSocketSelector().wakeup();
    }

    protected void read() {
        block14: {
            try {
                this.rxBuffer.clear();
                MessageInfo messageInfo = this.socketChannel.receive(this.rxBuffer, this, this.associationHandler);
                if (messageInfo == null) {
                    if (logger.isDebugEnabled()) {
                        logger.debug((Object)String.format(" messageInfo is null for Association=%s", this.name));
                    }
                    return;
                }
                int len = messageInfo.bytes();
                if (len == -1) {
                    logger.error((Object)String.format("Rx -1 while trying to read from underlying socket for Association=%s ", this.name));
                    this.close();
                    this.scheduleConnect();
                    return;
                }
                this.rxBuffer.flip();
                byte[] data = new byte[len];
                this.rxBuffer.get(data);
                this.rxBuffer.clear();
                PayloadData payload = new PayloadData(len, data, messageInfo.isComplete(), messageInfo.isUnordered(), messageInfo.payloadProtocolID(), messageInfo.streamNumber());
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)String.format("Rx : %s", payload));
                }
                if (this.management.isSingleThread()) {
                    try {
                        this.associationListener.onPayload((Association)this, payload);
                    }
                    catch (Exception e) {
                        logger.error((Object)String.format("Error while calling Listener for Association=%s.Payload=%s", this.name, payload), (Throwable)e);
                    }
                } else {
                    Worker worker = new Worker(this, this.associationListener, payload);
                    System.out.println("payload.getStreamNumber()=" + payload.getStreamNumber() + " this.workerThreadTable[payload.getStreamNumber()]" + this.workerThreadTable[payload.getStreamNumber()]);
                    ExecutorService executorService = this.management.getExecutorService(this.workerThreadTable[payload.getStreamNumber()]);
                    try {
                        executorService.execute(worker);
                    }
                    catch (RejectedExecutionException e) {
                        logger.error((Object)String.format("Rejected %s as Executors is shutdown", payload), (Throwable)e);
                    }
                    catch (NullPointerException e) {
                        logger.error((Object)String.format("NullPointerException while submitting %s", payload), (Throwable)e);
                    }
                    catch (Exception e) {
                        logger.error((Object)String.format("Exception while submitting %s", payload), (Throwable)e);
                    }
                }
            }
            catch (IOException e) {
                ++this.ioErrors;
                logger.error((Object)String.format("IOException while trying to read from underlying socket for Association=%s IOError count=%d", this.name, this.ioErrors), (Throwable)e);
                if (this.ioErrors <= this.management.getMaxIOErrors()) break block14;
                this.close();
                this.scheduleConnect();
            }
        }
    }

    protected void write(SelectionKey key) {
        block7: {
            try {
                if (this.txBuffer.hasRemaining()) {
                    this.socketChannel.send(this.txBuffer, this.msgInfo);
                }
                if (!this.txQueue.isEmpty() && !this.txBuffer.hasRemaining()) {
                    while (!this.txQueue.isEmpty()) {
                        this.txBuffer.clear();
                        PayloadData payloadData = this.txQueue.poll();
                        if (logger.isDebugEnabled()) {
                            logger.debug((Object)String.format("Tx : %s", payloadData));
                        }
                        this.txBuffer.put(payloadData.getData());
                        int seqControl = payloadData.getStreamNumber();
                        this.msgInfo = MessageInfo.createOutgoing(null, this.slsTable[seqControl &= 0x1F]);
                        this.msgInfo.payloadProtocolID(payloadData.getPayloadProtocolId());
                        this.msgInfo.complete(payloadData.isComplete());
                        this.msgInfo.unordered(payloadData.isUnordered());
                        this.txBuffer.flip();
                        int sent = this.socketChannel.send(this.txBuffer, this.msgInfo);
                        if (!this.txBuffer.hasRemaining()) continue;
                        return;
                    }
                }
                if (this.txQueue.isEmpty()) {
                    key.interestOps(1);
                }
            }
            catch (IOException e) {
                ++this.ioErrors;
                logger.error((Object)String.format("IOException while trying to write to underlying socket for Association=%s IOError count=%d", this.name, this.ioErrors), (Throwable)e);
                if (this.ioErrors <= this.management.getMaxIOErrors()) break block7;
                this.close();
                this.scheduleConnect();
            }
        }
    }

    protected void close() {
        if (this.socketChannel != null) {
            try {
                this.socketChannel.close();
            }
            catch (Exception e) {
                logger.error((Object)String.format("Exception while closing the SctpScoket for Association=%s", this.name), (Throwable)e);
            }
        }
        try {
            this.associationListener.onCommunicationShutdown((Association)this);
        }
        catch (Exception e) {
            logger.error((Object)String.format("Exception while calling onCommunicationShutdown on AssociationListener for Association=%s", this.name), (Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void scheduleConnect() {
        FastList<ChangeRequest> pendingChanges;
        FastList<ChangeRequest> fastList = pendingChanges = this.management.getPendingChanges();
        synchronized (fastList) {
            pendingChanges.add((Object)new ChangeRequest(this, 3, System.currentTimeMillis() + (long)this.management.getConnectDelay()));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void initiateConnection() throws IOException {
        FastList<ChangeRequest> pendingChanges;
        if (!this.started) {
            return;
        }
        if (this.socketChannel != null) {
            try {
                this.socketChannel.close();
            }
            catch (Exception e) {
                logger.error((Object)String.format("Exception while trying to close existing sctp socket and initiate new socket for Association=%s", this.name), (Throwable)e);
            }
        }
        this.socketChannel = SctpChannel.open();
        this.socketChannel.configureBlocking(false);
        this.socketChannel.bind(new InetSocketAddress(this.hostAddress, this.hostPort));
        this.socketChannel.connect(new InetSocketAddress(this.peerAddress, this.peerPort), 32, 32);
        this.ioErrors = 0;
        FastList<ChangeRequest> fastList = pendingChanges = this.management.getPendingChanges();
        synchronized (fastList) {
            pendingChanges.add((Object)new ChangeRequest(this.socketChannel, this, 1, 8));
        }
        this.management.getSocketSelector().wakeup();
    }

    protected void createSLSTable(int minimumBoundStream) {
        int stream = 1;
        for (int i = 0; i < 32; ++i) {
            if (stream > minimumBoundStream) {
                stream = 1;
            }
            this.slsTable[i] = stream++;
        }
    }

    protected void createworkerThreadTable(int maximumBooundStream) {
        this.workerThreadTable = new int[maximumBooundStream];
        this.management.populateWorkerThread(this.workerThreadTable);
    }

    public String toString() {
        return "Association [hostAddress=" + this.hostAddress + ", hostPort=" + this.hostPort + ", peerAddress=" + this.peerAddress + ", peerPort=" + this.peerPort + ", serverName=" + this.serverName + ", assocName=" + this.name + ", associationType=" + (Object)((Object)this.type) + ", started=" + this.started + "]";
    }
}

