/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleClientsTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;

/**
 * Concurrency test that sends to, and then consumes from, many queues in parallel
 * against a broker configured with a file-based pending queue cursor, small memory
 * limits and producer flow control disabled.
 *
 * <p>The test passes when every send and consumer start-up task finishes within its
 * timeout, no worker thread records a {@link Throwable}, and the expected total
 * number of messages ({@code maxConcurrency * msgCount}) is received.
 *
 * <p>NOTE(review): the class name suggests this is the regression test for the
 * AMQ-2910 issue; the issue text is not visible here - confirm before relying on it.
 */
public class AMQ2910Test
extends JmsMultipleClientsTestSupport {
    /** Number of queues; one producer task and one consumer task is started per queue. */
    final int maxConcurrency = 60;
    /** Number of messages sent to each queue. */
    final int msgCount = 200;
    /** Failures caught on worker threads; {@link Vector} is used because it is synchronized. */
    final Vector<Throwable> exceptions = new Vector<Throwable>();

    /**
     * Builds the broker under test.
     *
     * <p>The default destination policy uses a {@link FilePendingQueueMessageStoragePolicy},
     * a cursor memory high water mark of 50, a per-destination memory limit of 512000 and
     * no producer flow control; the broker-wide memory usage limit is 1024000.
     * (Units are presumably percent and bytes respectively - verify against the
     * {@code PolicyEntry} / usage documentation.)
     *
     * @return the configured, not yet started broker
     * @throws Exception if the transport connector cannot be added
     */
    @Override
    protected BrokerService createBroker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.setDeleteAllMessagesOnStartup(true);
        // Port 0: the actual connect URI is read back from the connector in the test.
        broker.addConnector("tcp://localhost:0");

        PolicyEntry defaultEntry = new PolicyEntry();
        defaultEntry.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy());
        defaultEntry.setCursorMemoryHighWaterMark(50);
        defaultEntry.setMemoryLimit(512000L);
        defaultEntry.setProducerFlowControl(false);

        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(defaultEntry);
        broker.setDestinationPolicy(policyMap);

        broker.getSystemUsage().getMemoryUsage().setLimit(1024000L);
        return broker;
    }

    /**
     * Sends {@code msgCount} messages to each of {@code maxConcurrency} queues from
     * concurrent tasks, then starts consumers for every queue concurrently and checks
     * that all messages arrive and that no task failed.
     *
     * @throws Exception if the connect URI cannot be obtained or a wait is interrupted
     */
    public void testConcurrentSendToPendingCursor() throws Exception {
        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
                ((TransportConnector)this.broker.getTransportConnectors().get(0)).getConnectUri());
        factory.setCloseTimeout(30000);

        // Phase 1: one producer task per queue.
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < maxConcurrency; ++i) {
            // Declared final inside the loop so each Runnable captures its own queue.
            final ActiveMQQueue dest = new ActiveMQQueue("Queue-" + i);
            executor.execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        sendMessages(factory.createConnection(), dest, msgCount);
                    }
                    catch (Throwable t) {
                        // Collected here and asserted on the test thread.
                        exceptions.add(t);
                    }
                }
            });
        }
        executor.shutdown();
        assertTrue("send completed", executor.awaitTermination(60L, TimeUnit.SECONDS));
        assertNoExceptions();

        // Phase 2: start the consumers for every queue, again concurrently.
        executor = Executors.newCachedThreadPool();
        for (int i = 0; i < maxConcurrency; ++i) {
            final ActiveMQQueue dest = new ActiveMQQueue("Queue-" + i);
            executor.execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        startConsumers(factory, dest);
                    }
                    catch (Throwable t) {
                        exceptions.add(t);
                    }
                }
            });
        }
        executor.shutdown();
        assertTrue("consumers completed", executor.awaitTermination(60L, TimeUnit.SECONDS));

        // Phase 3: wait for everything that was sent to be received.
        allMessagesList.setMaximumDuration(120000L);
        final int numExpected = maxConcurrency * msgCount;
        allMessagesList.waitForMessagesToArrive(numExpected);
        if (allMessagesList.getMessageCount() != numExpected) {
            // Dump thread state before the assertion below fails, to aid diagnosis.
            dumpAllThreads(getName());
        }
        allMessagesList.assertMessagesReceivedNoWait(numExpected);
        assertNoExceptions();
    }

    /**
     * Fails the test if any worker thread recorded a failure, printing each
     * recorded stack trace first so the cause is visible in the test output.
     */
    private void assertNoExceptions() {
        for (Throwable t : exceptions) {
            t.printStackTrace();
        }
        assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
    }
}

