/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.transport.tcp;

import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.HandshakeCompletedEvent;
import javax.net.ssl.HandshakeCompletedListener;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.activemq.util.Wait;
import org.apache.log4j.Appender;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TcpTransportInactiveDuringHandshakeTest {
    private static final Logger LOG = LoggerFactory.getLogger(TcpTransportInactiveDuringHandshakeTest.class);
    public static final String KEYSTORE_TYPE = "jks";
    public static final String PASSWORD = "password";
    public static final String SERVER_KEYSTORE = "src/test/resources/server.keystore";
    public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore";
    private BrokerService brokerService;
    private DefaultTestAppender appender;
    CountDownLatch inactivityMonitorFired;
    CountDownLatch handShakeComplete;

    @Before
    public void before() throws Exception {
        this.brokerService = new BrokerService();
        this.brokerService.setPersistent(false);
        this.brokerService.setUseJmx(false);
        this.inactivityMonitorFired = new CountDownLatch(1);
        this.handShakeComplete = new CountDownLatch(1);
        this.appender = new DefaultTestAppender(){

            public void doAppend(LoggingEvent event) {
                if (event.getLevel().equals((Object)Level.WARN) && event.getRenderedMessage().contains("InactivityIOException")) {
                    TcpTransportInactiveDuringHandshakeTest.this.inactivityMonitorFired.countDown();
                }
            }
        };
        org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getRootLogger();
        rootLogger.addAppender((Appender)this.appender);
    }

    @After
    public void after() throws Exception {
        org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getRootLogger();
        rootLogger.removeAppender((Appender)this.appender);
        if (this.brokerService != null) {
            this.brokerService.stop();
            this.brokerService.waitUntilStopped();
        }
    }

    @Test
    public void testInactivityMonitorThreadCompletesWhenFiringDuringStart() throws Exception {
        this.brokerService.addConnector("mqtt+nio+ssl://localhost:0?transport.connectAttemptTimeout=1000&transport.closeAsync=false");
        this.brokerService.start();
        this.brokerService.waitUntilStarted();
        TransportConnector transportConnector = (TransportConnector)this.brokerService.getTransportConnectors().get(0);
        URI uri = transportConnector.getPublishableConnectURI();
        final CountDownLatch blockHandShakeCompletion = new CountDownLatch(1);
        TrustManager[] trustManagers = new TrustManager[]{new X509TrustManager(){

            @Override
            public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
            }

            @Override
            public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
                LOG.info("Check Server Trusted: " + s, new Throwable("HERE"));
                try {
                    blockHandShakeCompletion.await(20L, TimeUnit.SECONDS);
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
                LOG.info("Check Server Trusted done!");
            }

            @Override
            public X509Certificate[] getAcceptedIssuers() {
                return new X509Certificate[0];
            }
        }};
        SSLContext sslContext = SSLContext.getInstance("TLS");
        sslContext.init(null, trustManagers, new SecureRandom());
        final SSLSocket sslSocket = (SSLSocket)sslContext.getSocketFactory().createSocket("127.0.0.1", uri.getPort());
        sslSocket.addHandshakeCompletedListener(new HandshakeCompletedListener(){

            @Override
            public void handshakeCompleted(HandshakeCompletedEvent handshakeCompletedEvent) {
                TcpTransportInactiveDuringHandshakeTest.this.handShakeComplete.countDown();
            }
        });
        Executors.newCachedThreadPool().submit(new Runnable(){

            @Override
            public void run() {
                try {
                    sslSocket.startHandshake();
                    Assert.assertTrue((String)"Socket connected", (boolean)sslSocket.isConnected());
                }
                catch (IOException oops) {
                    oops.printStackTrace();
                }
            }
        });
        Assert.assertTrue((String)"inactivity fired", (boolean)this.inactivityMonitorFired.await(10L, TimeUnit.SECONDS));
        Assert.assertTrue((String)"Found non blocked inactivity monitor thread - done its work", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                Thread[] threads = new Thread[20];
                int activeCount = Thread.currentThread().getThreadGroup().enumerate(threads);
                for (int i = 0; i < activeCount; ++i) {
                    Thread thread = threads[i];
                    LOG.info("T[" + i + "]: " + thread);
                    if (!thread.getName().contains("InactivityMonitor") || !thread.getState().equals((Object)Thread.State.TIMED_WAITING)) continue;
                    LOG.info("Found inactivity monitor in timed-wait");
                    return true;
                }
                return false;
            }
        }));
        blockHandShakeCompletion.countDown();
        final OutputStream socketOutPutStream = sslSocket.getOutputStream();
        Assert.assertTrue((String)"Handshake complete", (boolean)this.handShakeComplete.await(10L, TimeUnit.SECONDS));
        Assert.assertTrue((String)"socket error", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                LOG.info("Expecting socket to error from remote close: " + sslSocket);
                try {
                    socketOutPutStream.write(2);
                    socketOutPutStream.flush();
                }
                catch (IOException expected) {
                    return true;
                }
                return false;
            }
        }));
        LOG.info("Socket at end: " + sslSocket);
        sslSocket.close();
    }

    static {
        System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
        System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
        System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
        System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
        System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
        System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
    }
}

