package org.apache.activemq.transport.vm;

import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.bugs.embedded.ThreadExplorer;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.DurableConduitBridge;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.perf.NetworkedSyncTest;
import org.apache.activemq.util.Wait;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.filter.AbstractFilter;
import org.apache.logging.log4j.core.layout.MessageLayout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.class */
public class VmTransportNetworkBrokerTest extends TestCase {
    private static final Logger LOG = LoggerFactory.getLogger(VmTransportNetworkBrokerTest.class);
    private static final String VM_BROKER_URI = "vm://localhost?create=false";

    public void testNoThreadLeak() throws Exception {
        final int length = filterDaemonThreads(ThreadExplorer.listThreads()).length;
        LOG.debug(ThreadExplorer.show("threads at beginning"));
        BrokerService brokerService = new BrokerService();
        brokerService.setDedicatedTaskRunner(true);
        brokerService.setPersistent(false);
        brokerService.addConnector(NetworkedSyncTest.broker1URL);
        brokerService.addNetworkConnector("static:(tcp://wrongHostname1:61617,tcp://wrongHostname2:61618)?useExponentialBackOff=false").setDuplex(true);
        brokerService.start();
        Connection createConnection = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI)).createConnection("system", "manager");
        createConnection.start();
        TimeUnit.SECONDS.sleep(5L);
        int activeCount = Thread.activeCount();
        TimeUnit.SECONDS.sleep(20L);
        int activeCount2 = Thread.activeCount();
        assertTrue("Threads are leaking: " + ThreadExplorer.show("active sleep") + ", threadCount=" + activeCount + " threadCountAfterSleep=" + activeCount2, activeCount2 < 2 * activeCount);
        createConnection.close();
        brokerService.stop();
        brokerService.waitUntilStopped();
        BrokerService brokerService2 = new BrokerService();
        brokerService2.setSchedulerSupport(true);
        brokerService2.setDedicatedTaskRunner(true);
        brokerService2.setPersistent(false);
        brokerService2.addConnector("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000");
        brokerService2.start();
        Connection createConnection2 = new ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000").createConnection("system", "manager");
        createConnection2.start();
        createConnection2.close();
        brokerService2.stop();
        brokerService2.waitUntilStopped();
        final AtomicInteger atomicInteger = new AtomicInteger();
        boolean waitFor = Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.transport.vm.VmTransportNetworkBrokerTest.1
            public boolean isSatisified() throws Exception {
                VmTransportNetworkBrokerTest.LOG.info(ThreadExplorer.show("active after stop"));
                atomicInteger.set(VmTransportNetworkBrokerTest.this.filterDaemonThreads(ThreadExplorer.listThreads()).length);
                return atomicInteger.get() <= length;
            }
        });
        LOG.info("originalThreadCount=" + length + " threadCountAfterStop=" + atomicInteger);
        assertTrue("Threads are leaking: " + ThreadExplorer.show("active after stop") + ". originalThreadCount=" + length + " threadCountAfterStop=" + atomicInteger.get(), waitFor);
    }

    public void testInvalidClientIdAndDurableSubs() throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setUseJmx(false);
        brokerService.setDedicatedTaskRunner(true);
        brokerService.setPersistent(false);
        brokerService.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        brokerService.start();
        Connection createConnection = new ActiveMQConnectionFactory(((TransportConnector) brokerService.getTransportConnectors().get(0)).getPublishableConnectString()).createConnection("system", "manager");
        createConnection.setClientID("F1_forwarder_outbound");
        createConnection.start();
        BrokerService brokerService2 = new BrokerService();
        brokerService2.setBrokerName("forwarder");
        brokerService2.setPersistent(false);
        brokerService2.setUseJmx(false);
        brokerService2.start();
        Connection createConnection2 = new ActiveMQConnectionFactory("vm://forwarder").createConnection("system", "manager");
        createConnection2.setClientID("vm_local");
        createConnection2.start();
        Session createSession = createConnection2.createSession(false, 1);
        for (int i = 0; i < 5000; i++) {
            createSession.createDurableSubscriber(new ActiveMQTopic("T" + i), i);
        }
        createConnection2.close();
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        org.apache.logging.log4j.core.Logger logger = (org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.core.Logger.class.cast(LogManager.getLogger(DurableConduitBridge.class));
        Appender appender = new AbstractAppender("testAppender", new AbstractFilter() { // from class: org.apache.activemq.transport.vm.VmTransportNetworkBrokerTest.2
        }, new MessageLayout(), false, new Property[0]) { // from class: org.apache.activemq.transport.vm.VmTransportNetworkBrokerTest.3
            public void append(LogEvent logEvent) {
                if (Level.ERROR.equals(logEvent.getLevel())) {
                    atomicInteger.incrementAndGet();
                }
            }
        };
        appender.start();
        logger.get().addAppender(appender, Level.DEBUG, new AbstractFilter() { // from class: org.apache.activemq.transport.vm.VmTransportNetworkBrokerTest.4
        });
        logger.addAppender(appender);
        try {
            NetworkConnector addNetworkConnector = brokerService2.addNetworkConnector("static:(" + ((TransportConnector) brokerService.getTransportConnectors().get(0)).getPublishableConnectString() + ")");
            addNetworkConnector.setName("F1");
            brokerService2.addNetworkConnector(addNetworkConnector);
            brokerService2.startAllConnectors();
            TimeUnit.SECONDS.sleep(1L);
            createConnection.close();
            brokerService2.stop();
            brokerService.stop();
            assertEquals("no errors", 0, atomicInteger.get());
            logger.removeAppender(appender);
        } catch (Throwable th) {
            logger.removeAppender(appender);
            throw th;
        }
    }

    public Thread[] filterDaemonThreads(Thread[] threadArr) throws Exception {
        ArrayList arrayList = new ArrayList(Arrays.asList(threadArr));
        for (int i = 0; i < arrayList.size(); i++) {
            Thread thread = (Thread) arrayList.get(i);
            LOG.debug("Inspecting thread " + thread.getName());
            if (thread.isDaemon() && !thread.getName().contains("ActiveMQ")) {
                LOG.debug("Removing deamon thread.");
                arrayList.remove(thread);
                Thread.sleep(100L);
            }
        }
        LOG.debug("Converting list back to Array");
        return (Thread[]) arrayList.toArray(new Thread[0]);
    }
}
