package org.apache.activemq.broker.virtual;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.DestinationInfo;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/broker/virtual/AMQ7088Test.class */
public class AMQ7088Test {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ7088Test.class);
    BrokerService brokerService;
    ConnectionFactory connectionFactory;

    @Before
    public void createBroker() throws Exception {
        createBroker(true);
    }

    public void createBroker(boolean z) throws Exception {
        this.brokerService = new BrokerService();
        this.brokerService.setDeleteAllMessagesOnStartup(z);
        this.brokerService.setAdvisorySupport(false);
        this.brokerService.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        this.brokerService.start();
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(this.brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString());
        ActiveMQPrefetchPolicy activeMQPrefetchPolicy = new ActiveMQPrefetchPolicy();
        activeMQPrefetchPolicy.setAll(0);
        activeMQConnectionFactory.setPrefetchPolicy(activeMQPrefetchPolicy);
        activeMQConnectionFactory.setWatchTopicAdvisories(false);
        this.connectionFactory = activeMQConnectionFactory;
    }

    @After
    public void stopBroker() throws Exception {
        this.brokerService.stop();
    }

    @Test
    public void testDeadlockOnAddRemoveDest() throws Exception {
        final AtomicInteger atomicInteger = new AtomicInteger(100);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(100);
        Runnable runnable = new Runnable() { // from class: org.apache.activemq.broker.virtual.AMQ7088Test.1
            @Override // java.lang.Runnable
            public void run() {
                do {
                    try {
                        int decrementAndGet = atomicInteger.decrementAndGet();
                        if (decrementAndGet >= 0) {
                            ActiveMQConnection createConnection = AMQ7088Test.this.connectionFactory.createConnection();
                            createConnection.start();
                            Session createSession = createConnection.createSession(false, 1);
                            ActiveMQQueue activeMQQueue = new ActiveMQQueue("Consumer." + decrementAndGet + ".VirtualTopic.TEST.*");
                            createSession.createConsumer(activeMQQueue).close();
                            ActiveMQConnection activeMQConnection = createConnection;
                            DestinationInfo destinationInfo = new DestinationInfo();
                            destinationInfo.setConnectionId(activeMQConnection.getConnectionInfo().getConnectionId());
                            destinationInfo.setDestination(activeMQQueue);
                            destinationInfo.setOperationType((byte) 1);
                            activeMQConnection.getTransport().request(destinationInfo);
                            createConnection.close();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        return;
                    }
                } while (atomicInteger.get() > 0);
            }
        };
        for (int i = 0; i < 100; i++) {
            newFixedThreadPool.execute(runnable);
        }
        LOG.info("Letting it run to completion...");
        newFixedThreadPool.shutdown();
        Assert.assertTrue("all done", newFixedThreadPool.awaitTermination(5L, TimeUnit.MINUTES));
    }
}
