package org.apache.activemq.bugs;

import java.util.Iterator;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.JmsMultipleClientsTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.usecases.VerifyNetworkConsumersDisconnectTest;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.BlockJUnit4ClassRunner;

/**
 * Regression test for AMQ-2910: many producers send concurrently to queues whose pending
 * messages are kept by a {@link FilePendingQueueMessageStoragePolicy}, under deliberately
 * small memory limits, and every message must afterwards be delivered to consumers.
 *
 * <p>The helpers {@code sendMessages}, {@code startConsumers}, {@code dumpAllThreads},
 * {@code getName} and the fields {@code broker} and {@code allMessagesList} are not declared
 * in this class; they are presumably inherited from {@link JmsMultipleClientsTestSupport}.
 */
@RunWith(BlockJUnit4ClassRunner.class)
public class AMQ2910Test extends JmsMultipleClientsTestSupport {
    /** Number of queues, and therefore of concurrent producer / consumer tasks. */
    final int maxConcurrency = 60;
    /** Number of messages sent to each queue. */
    final int msgCount = 200;
    /** Failures captured on worker threads; {@link Vector} is used because it is synchronized. */
    final Vector<Throwable> exceptions = new Vector<>();

    /**
     * Builds the broker under test: a fresh store, one auto-assigned transport connector, a
     * file-backed pending-queue policy as the default destination policy, producer flow
     * control disabled, and small per-destination and system memory limits.
     *
     * @return the configured broker; this method does not start it
     * @throws Exception if the transport connector cannot be added
     */
    @Override // org.apache.activemq.JmsMultipleClientsTestSupport
    public BrokerService createBroker() throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setDeleteAllMessagesOnStartup(true);
        brokerService.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);

        PolicyEntry defaultEntry = new PolicyEntry();
        defaultEntry.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy());
        // NOTE(review): presumably a percentage of the memory limit - confirm against PolicyEntry.
        defaultEntry.setCursorMemoryHighWaterMark(50);
        defaultEntry.setMemoryLimit(500L * 1024);
        defaultEntry.setProducerFlowControl(false);

        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(defaultEntry);
        brokerService.setDestinationPolicy(policyMap);

        brokerService.getSystemUsage().getMemoryUsage().setLimit(1000L * 1024);
        return brokerService;
    }

    /**
     * Sends {@link #msgCount} messages to each of {@link #maxConcurrency} queues from
     * concurrent tasks, then starts consumers on every queue concurrently and verifies that
     * {@code maxConcurrency * msgCount} messages arrive and that no worker task threw.
     *
     * <p>NOTE(review): the JUnit timeout (60 s) is shorter than the sum of the internal waits
     * below (60 s + 60 s + the 120 s maximum duration), so it is the effective upper bound.
     *
     * @throws Exception if the connect URI cannot be obtained or the test thread is interrupted
     */
    @Test(timeout = 60000)
    public void testConcurrentSendToPendingCursor() throws Exception {
        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
                ((TransportConnector) this.broker.getTransportConnectors().get(0)).getConnectUri());
        factory.setCloseTimeout(VerifyNetworkConsumersDisconnectTest.TIMEOUT);

        // Phase 1: one producer task per queue, all running concurrently.
        ExecutorService producers = Executors.newCachedThreadPool();
        for (int i = 0; i < maxConcurrency; i++) {
            final ActiveMQQueue queue = new ActiveMQQueue("Queue-" + i);
            producers.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        sendMessages(factory.createConnection(), queue, msgCount);
                    } catch (Throwable t) {
                        // Recorded here and asserted on the test thread.
                        exceptions.add(t);
                    }
                }
            });
        }
        producers.shutdown();
        Assert.assertTrue("send completed", producers.awaitTermination(60L, TimeUnit.SECONDS));
        assertNoExceptions();

        // Phase 2: one consumer-starting task per queue, all running concurrently.
        ExecutorService consumers = Executors.newCachedThreadPool();
        for (int i = 0; i < maxConcurrency; i++) {
            final ActiveMQQueue queue = new ActiveMQQueue("Queue-" + i);
            consumers.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        startConsumers(factory, queue);
                    } catch (Throwable t) {
                        exceptions.add(t);
                    }
                }
            });
        }
        consumers.shutdown();
        Assert.assertTrue("consumers completed", consumers.awaitTermination(60L, TimeUnit.SECONDS));

        // Derived from the constants so the loops and the expectation cannot drift apart.
        final int expectedMessages = maxConcurrency * msgCount;
        this.allMessagesList.setMaximumDuration(120000L);
        this.allMessagesList.waitForMessagesToArrive(expectedMessages);
        if (this.allMessagesList.getMessageCount() != expectedMessages) {
            // Capture thread state for diagnosis before the assertion below fails the test.
            dumpAllThreads(getName());
        }
        this.allMessagesList.assertMessagesReceivedNoWait(expectedMessages);
        assertNoExceptions();
    }

    /**
     * Prints the stack trace of every failure recorded by a worker task, then fails the test
     * if any were recorded.
     */
    private void assertNoExceptions() {
        for (Throwable failure : this.exceptions) {
            failure.printStackTrace();
        }
        Assert.assertTrue("no exceptions: " + this.exceptions, this.exceptions.isEmpty());
    }
}
