package org.apache.activemq.transport.nio;

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/transport/nio/NIOSSThreadLeakTest.class */
/**
 * Checks that repeatedly opening and closing client connections over the
 * {@code nio+ssl} transport does not make the number of selector worker
 * threads grow beyond a fixed bound.
 *
 * <p>The test starts an embedded, non-persistent broker with a single
 * {@code nio+ssl} connector on an ephemeral port, sets the
 * {@code org.apache.activemq.transport.nio.SelectorManager.maxChannelsPerWorker}
 * system property to {@code 2}, and then runs several batches of concurrent
 * short-lived connections. Selector threads are identified by the
 * {@value #SELECTOR_THREAD_NAME_MARKER} fragment in their thread name.
 */
public class NIOSSThreadLeakTest {
    private static final Logger LOG = LoggerFactory.getLogger(NIOSSThreadLeakTest.class);

    public static final String KEYSTORE_TYPE = "jks";
    public static final String PASSWORD = "password";
    public static final String SERVER_KEYSTORE = "src/test/resources/server.keystore";
    public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore";

    /** Number of client connections opened concurrently in every batch (also the pool size). */
    private static final int CONNECTIONS_PER_BATCH = 10;
    /** Number of batches executed after the first one. */
    private static final int FOLLOW_UP_BATCHES = 4;
    /** Upper bound on live selector worker threads tolerated by the test. */
    private static final int MAX_SELECTOR_THREADS = 10;
    /** How long each client keeps its connection open before closing it. */
    private static final long CONNECTION_HOLD_MILLIS = 600L;
    /** Maximum time to wait for one batch of clients to finish. */
    private static final long BATCH_TIMEOUT_MINUTES = 5L;
    /** Thread-name fragment used to recognise selector worker threads. */
    private static final String SELECTOR_THREAD_NAME_MARKER = "Selector Worker:";

    BrokerService broker;
    TransportConnector connector;

    /**
     * Configures the JSSE key/trust store system properties and starts the embedded broker.
     *
     * @throws Exception if the broker cannot be configured or started
     */
    @Before
    public void setUp() throws Exception {
        System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
        System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
        System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
        System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
        System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
        System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
        System.setProperty("org.apache.activemq.transport.nio.SelectorManager.maxChannelsPerWorker", "2");

        this.broker = new BrokerService();
        this.broker.setPersistent(false);
        this.broker.setUseJmx(false);
        // Port 0 lets the OS pick a free port; the test reads it back from the connector.
        this.connector = this.broker.addConnector("nio+ssl://localhost:0?transport.needClientAuth=true");
        this.broker.start();
        this.broker.waitUntilStarted();
    }

    /**
     * Stops the embedded broker, if one was created.
     *
     * @throws Exception if the broker fails to stop
     */
    @After
    public void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
        }
    }

    /**
     * Runs one batch of concurrent connections, checks the selector thread count, runs
     * {@link #FOLLOW_UP_BATCHES} further batches, checks the count again and finally
     * verifies that no client reported an error.
     *
     * @throws Exception if the connect URI cannot be resolved or the test thread is interrupted
     */
    @Test(timeout = 360000)
    public void testThreadUsage() throws Exception {
        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
                "nio+ssl://localhost:" + this.connector.getConnectUri().getPort());
        final AtomicInteger errors = new AtomicInteger(0);
        ExecutorService executor = Executors.newFixedThreadPool(CONNECTIONS_PER_BATCH);
        try {
            runBatch(executor, factory, errors);
            LOG.info("First batch...");
            assertSelectorThreadsWithinLimit();

            for (int batch = 0; batch < FOLLOW_UP_BATCHES; batch++) {
                LOG.info("batch...{}", batch + 2);
                runBatch(executor, factory, errors);
            }
            assertSelectorThreadsWithinLimit();

            LOG.info("errors {}", errors.get());
            Assert.assertEquals(0L, errors.get());
        } finally {
            // Always release the pool threads, including when an assertion above fails.
            executor.shutdownNow();
        }
    }

    /**
     * Submits {@link #CONNECTIONS_PER_BATCH} client tasks and waits for all of them to finish.
     * Each batch uses its own latch so a straggler from one batch can never count down the
     * latch of another.
     *
     * @param executor pool that runs the client tasks
     * @param factory  factory used by every client to open its connection
     * @param errors   counter incremented once for each client task that fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private static void runBatch(ExecutorService executor, final ActiveMQConnectionFactory factory,
            final AtomicInteger errors) throws InterruptedException {
        final CountDownLatch finished = new CountDownLatch(CONNECTIONS_PER_BATCH);
        Runnable client = new Runnable() {
            @Override
            public void run() {
                Connection connection = null;
                try {
                    connection = factory.createConnection();
                    connection.start();
                    // 1 is the value of javax.jms.Session.AUTO_ACKNOWLEDGE.
                    connection.createSession(false, 1);
                    TimeUnit.MILLISECONDS.sleep(CONNECTION_HOLD_MILLIS);
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the pool while still recording the failure.
                    Thread.currentThread().interrupt();
                    LOG.error("Client interrupted while holding its connection", e);
                    errors.incrementAndGet();
                } catch (Exception e) {
                    LOG.error("Client connection failed", e);
                    errors.incrementAndGet();
                } finally {
                    closeQuietly(connection);
                    finished.countDown();
                }
            }
        };
        for (int i = 0; i < CONNECTIONS_PER_BATCH; i++) {
            executor.submit(client);
        }
        // A timed-out batch must fail the test rather than be silently ignored.
        Assert.assertTrue("Timed out waiting for a batch of client connections to finish",
                finished.await(BATCH_TIMEOUT_MINUTES, TimeUnit.MINUTES));
    }

    /**
     * Closes the connection on a best-effort basis; a close failure is logged, not propagated.
     *
     * @param connection connection to close, may be {@code null} if it was never created
     */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (Exception ignored) {
            LOG.debug("Ignoring failure while closing client connection", ignored);
        }
    }

    /** Fails the test if more than {@link #MAX_SELECTOR_THREADS} selector threads are alive. */
    private void assertSelectorThreadsWithinLimit() {
        int selectorThreads = countSelectorThreads();
        boolean withinLimit = selectorThreads <= MAX_SELECTOR_THREADS;
        if (!withinLimit) {
            LOG.error("too many threads: " + selectorThreads);
        }
        Assert.assertTrue("too many threads: " + selectorThreads, withinLimit);
    }

    /**
     * Counts the live threads whose name contains {@link #SELECTOR_THREAD_NAME_MARKER},
     * logging the name of each match.
     *
     * @return number of matching threads found in this JVM
     */
    private int countSelectorThreads() {
        int count = 0;
        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
        for (long threadId : threadBean.getAllThreadIds()) {
            // getThreadInfo returns null for a thread that has already exited, so look it up
            // exactly once and reuse the result.
            ThreadInfo info = threadBean.getThreadInfo(threadId);
            if (info == null) {
                continue;
            }
            String threadName = info.getThreadName();
            if (threadName.contains(SELECTOR_THREAD_NAME_MARKER)) {
                LOG.info(threadName);
                count++;
            }
        }
        return count;
    }
}
