package org.apache.activemq.transport.tcp;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.SocketFactory;
import org.apache.activemq.Service;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportLoggerFactory;
import org.apache.activemq.transport.TransportThreadSupport;
import org.apache.activemq.transport.stomp.Stomp;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mortbay.jetty.HttpVersions;
import org.springframework.asm.Opcodes;

/* loaded from: input_file:WEB-INF/lib/activemq-core-5.3.0.8-fuse.jar:org/apache/activemq/transport/tcp/TcpTransport.class */
public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
    private static final Log LOG = LogFactory.getLog(TcpTransport.class);
    private static final ThreadPoolExecutor SOCKET_CLOSE = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { // from class: org.apache.activemq.transport.tcp.TcpTransport.2
        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable, "TcpSocketClose: " + runnable);
            thread.setPriority(10);
            thread.setDaemon(true);
            return thread;
        }
    });
    protected final URI remoteLocation;
    protected final URI localLocation;
    protected final WireFormat wireFormat;
    protected int connectionTimeout;
    protected int soTimeout;
    protected int socketBufferSize;
    protected int ioBufferSize;
    protected boolean closeAsync;
    protected Socket socket;
    protected DataOutputStream dataOut;
    protected DataInputStream dataIn;
    protected TcpBufferedOutputStream buffOut;
    protected boolean trace;
    protected String logWriterName;
    protected boolean dynamicManagement;
    protected boolean startLogging;
    protected int jmxPort;
    protected boolean useLocalHost;
    protected int minmumWireFormatVersion;
    protected SocketFactory socketFactory;
    protected final AtomicReference<CountDownLatch> stoppedLatch;
    private Map<String, Object> socketOptions;
    private Boolean keepAlive;
    private Boolean tcpNoDelay;
    private Thread runnerThread;

    public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI uri, URI uri2) throws UnknownHostException, IOException {
        this.connectionTimeout = 30000;
        this.socketBufferSize = 65536;
        this.ioBufferSize = Opcodes.ACC_ANNOTATION;
        this.closeAsync = true;
        this.buffOut = null;
        this.trace = false;
        this.logWriterName = TransportLoggerFactory.defaultLogWriterName;
        this.dynamicManagement = false;
        this.startLogging = true;
        this.jmxPort = 1099;
        this.useLocalHost = true;
        this.stoppedLatch = new AtomicReference<>();
        this.wireFormat = wireFormat;
        this.socketFactory = socketFactory;
        try {
            this.socket = socketFactory.createSocket();
        } catch (SocketException e) {
            this.socket = null;
        }
        this.remoteLocation = uri;
        this.localLocation = uri2;
        setDaemon(false);
    }

    public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
        this.connectionTimeout = 30000;
        this.socketBufferSize = 65536;
        this.ioBufferSize = Opcodes.ACC_ANNOTATION;
        this.closeAsync = true;
        this.buffOut = null;
        this.trace = false;
        this.logWriterName = TransportLoggerFactory.defaultLogWriterName;
        this.dynamicManagement = false;
        this.startLogging = true;
        this.jmxPort = 1099;
        this.useLocalHost = true;
        this.stoppedLatch = new AtomicReference<>();
        this.wireFormat = wireFormat;
        this.socket = socket;
        this.remoteLocation = null;
        this.localLocation = null;
        setDaemon(true);
    }

    @Override // org.apache.activemq.transport.Transport
    public void oneway(Object obj) throws IOException {
        checkStarted();
        this.wireFormat.marshal(obj, this.dataOut);
        this.dataOut.flush();
    }

    public String toString() {
        return "tcp://" + this.socket.getInetAddress() + Stomp.Headers.SEPERATOR + this.socket.getPort();
    }

    @Override // java.lang.Runnable
    public void run() {
        LOG.trace("TCP consumer thread for " + this + " starting");
        this.runnerThread = Thread.currentThread();
        while (!isStopped()) {
            try {
                try {
                    doRun();
                } catch (IOException e) {
                    this.stoppedLatch.get().countDown();
                    onException(e);
                    this.stoppedLatch.get().countDown();
                    return;
                } catch (Throwable th) {
                    this.stoppedLatch.get().countDown();
                    IOException iOException = new IOException("Unexpected error occured");
                    iOException.initCause(th);
                    onException(iOException);
                    this.stoppedLatch.get().countDown();
                    return;
                }
            } catch (Throwable th2) {
                this.stoppedLatch.get().countDown();
                throw th2;
            }
        }
        this.stoppedLatch.get().countDown();
    }

    protected void doRun() throws IOException {
        try {
            doConsume(readCommand());
        } catch (SocketTimeoutException e) {
        } catch (InterruptedIOException e2) {
        }
    }

    protected Object readCommand() throws IOException {
        return this.wireFormat.unmarshal(this.dataIn);
    }

    public boolean isTrace() {
        return this.trace;
    }

    public void setTrace(boolean z) {
        this.trace = z;
    }

    public String getLogWriterName() {
        return this.logWriterName;
    }

    public void setLogWriterName(String str) {
        this.logWriterName = str;
    }

    public boolean isDynamicManagement() {
        return this.dynamicManagement;
    }

    public void setDynamicManagement(boolean z) {
        this.dynamicManagement = z;
    }

    public boolean isStartLogging() {
        return this.startLogging;
    }

    public void setStartLogging(boolean z) {
        this.startLogging = z;
    }

    public int getJmxPort() {
        return this.jmxPort;
    }

    public void setJmxPort(int i) {
        this.jmxPort = i;
    }

    public int getMinmumWireFormatVersion() {
        return this.minmumWireFormatVersion;
    }

    public void setMinmumWireFormatVersion(int i) {
        this.minmumWireFormatVersion = i;
    }

    public boolean isUseLocalHost() {
        return this.useLocalHost;
    }

    public void setUseLocalHost(boolean z) {
        this.useLocalHost = z;
    }

    public int getSocketBufferSize() {
        return this.socketBufferSize;
    }

    public void setSocketBufferSize(int i) {
        this.socketBufferSize = i;
    }

    public int getSoTimeout() {
        return this.soTimeout;
    }

    public void setSoTimeout(int i) {
        this.soTimeout = i;
    }

    public int getConnectionTimeout() {
        return this.connectionTimeout;
    }

    public void setConnectionTimeout(int i) {
        this.connectionTimeout = i;
    }

    public Boolean getKeepAlive() {
        return this.keepAlive;
    }

    public void setKeepAlive(Boolean bool) {
        this.keepAlive = bool;
    }

    public Boolean getTcpNoDelay() {
        return this.tcpNoDelay;
    }

    public void setTcpNoDelay(Boolean bool) {
        this.tcpNoDelay = bool;
    }

    public int getIoBufferSize() {
        return this.ioBufferSize;
    }

    public void setIoBufferSize(int i) {
        this.ioBufferSize = i;
    }

    public boolean isCloseAsync() {
        return this.closeAsync;
    }

    public void setCloseAsync(boolean z) {
        this.closeAsync = z;
    }

    protected String resolveHostName(String str) throws UnknownHostException {
        String hostName = InetAddress.getLocalHost().getHostName();
        return (hostName != null && isUseLocalHost() && hostName.equals(str)) ? BrokerService.DEFAULT_BROKER_NAME : str;
    }

    protected void initialiseSocket(Socket socket) throws SocketException {
        if (this.socketOptions != null) {
            IntrospectionSupport.setProperties(this.socket, this.socketOptions);
        }
        try {
            socket.setReceiveBufferSize(this.socketBufferSize);
            socket.setSendBufferSize(this.socketBufferSize);
        } catch (SocketException e) {
            LOG.warn("Cannot set socket buffer size = " + this.socketBufferSize);
            LOG.debug("Cannot set socket buffer size. Reason: " + e, e);
        }
        socket.setSoTimeout(this.soTimeout);
        if (this.keepAlive != null) {
            socket.setKeepAlive(this.keepAlive.booleanValue());
        }
        if (this.tcpNoDelay != null) {
            socket.setTcpNoDelay(this.tcpNoDelay.booleanValue());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.transport.TransportThreadSupport, org.apache.activemq.util.ServiceSupport
    public void doStart() throws Exception {
        connect();
        this.stoppedLatch.set(new CountDownLatch(1));
        super.doStart();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void connect() throws Exception {
        if (this.socket == null && this.socketFactory == null) {
            throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
        }
        InetSocketAddress inetSocketAddress = null;
        InetSocketAddress inetSocketAddress2 = null;
        if (this.localLocation != null) {
            inetSocketAddress = new InetSocketAddress(InetAddress.getByName(this.localLocation.getHost()), this.localLocation.getPort());
        }
        if (this.remoteLocation != null) {
            inetSocketAddress2 = new InetSocketAddress(resolveHostName(this.remoteLocation.getHost()), this.remoteLocation.getPort());
        }
        if (this.socket != null) {
            if (inetSocketAddress != null) {
                this.socket.bind(inetSocketAddress);
            }
            if (inetSocketAddress2 != null) {
                if (this.connectionTimeout >= 0) {
                    this.socket.connect(inetSocketAddress2, this.connectionTimeout);
                } else {
                    this.socket.connect(inetSocketAddress2);
                }
            }
        } else if (inetSocketAddress != null) {
            this.socket = this.socketFactory.createSocket(inetSocketAddress2.getAddress(), inetSocketAddress2.getPort(), inetSocketAddress.getAddress(), inetSocketAddress.getPort());
        } else {
            this.socket = this.socketFactory.createSocket(inetSocketAddress2.getAddress(), inetSocketAddress2.getPort());
        }
        initialiseSocket(this.socket);
        initializeStreams();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.util.ServiceSupport
    public void doStop(ServiceStopper serviceStopper) throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Stopping transport " + this);
        }
        if (this.socket != null) {
            if (this.closeAsync) {
                final CountDownLatch countDownLatch = new CountDownLatch(1);
                SOCKET_CLOSE.execute(new Runnable() { // from class: org.apache.activemq.transport.tcp.TcpTransport.1
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            try {
                                TcpTransport.this.socket.close();
                                countDownLatch.countDown();
                            } catch (IOException e) {
                                TcpTransport.LOG.debug("Caught exception closing socket", e);
                                countDownLatch.countDown();
                            }
                        } catch (Throwable th) {
                            countDownLatch.countDown();
                            throw th;
                        }
                    }
                });
                countDownLatch.await(1L, TimeUnit.SECONDS);
            } else {
                try {
                    this.socket.close();
                } catch (IOException e) {
                    LOG.debug("Caught exception closing socket", e);
                }
            }
        }
    }

    @Override // org.apache.activemq.util.ServiceSupport, org.apache.activemq.Service
    public void stop() throws Exception {
        super.stop();
        CountDownLatch countDownLatch = this.stoppedLatch.get();
        if (countDownLatch == null || Thread.currentThread() == this.runnerThread) {
            return;
        }
        countDownLatch.await(1L, TimeUnit.SECONDS);
    }

    protected void initializeStreams() throws Exception {
        this.dataIn = new DataInputStream(new TcpBufferedInputStream(this.socket.getInputStream(), this.ioBufferSize));
        this.buffOut = new TcpBufferedOutputStream(this.socket.getOutputStream(), this.ioBufferSize);
        this.dataOut = new DataOutputStream(this.buffOut);
    }

    protected void closeStreams() throws IOException {
        if (this.dataOut != null) {
            this.dataOut.close();
        }
        if (this.dataIn != null) {
            this.dataIn.close();
        }
    }

    public void setSocketOptions(Map<String, Object> map) {
        this.socketOptions = new HashMap(map);
    }

    @Override // org.apache.activemq.transport.Transport
    public String getRemoteAddress() {
        if (this.socket != null) {
            return HttpVersions.HTTP_0_9 + this.socket.getRemoteSocketAddress();
        }
        return null;
    }

    @Override // org.apache.activemq.transport.TransportSupport, org.apache.activemq.transport.Transport
    public <T> T narrow(Class<T> cls) {
        return cls == Socket.class ? cls.cast(this.socket) : cls == TcpBufferedOutputStream.class ? cls.cast(this.buffOut) : (T) super.narrow(cls);
    }
}
