/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.Command;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class AMQ5212Test {
    BrokerService brokerService;
    @Parameterized.Parameter(value=0)
    public boolean concurrentStoreAndDispatchQ = true;

    @Parameterized.Parameters(name="concurrentStoreAndDispatch={0}")
    public static Iterable<Object[]> getTestParameters() {
        return Arrays.asList({Boolean.TRUE}, {Boolean.FALSE});
    }

    @Before
    public void setUp() throws Exception {
        this.start(true);
    }

    public void start(boolean deleteAllMessages) throws Exception {
        this.brokerService = new BrokerService();
        if (deleteAllMessages) {
            this.brokerService.deleteAllMessages();
        }
        ((KahaDBPersistenceAdapter)this.brokerService.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(this.concurrentStoreAndDispatchQ);
        this.brokerService.addConnector("tcp://localhost:0");
        this.brokerService.setAdvisorySupport(false);
        this.brokerService.start();
    }

    @After
    public void tearDown() throws Exception {
        this.brokerService.stop();
    }

    @Test
    public void verifyDuplicateSuppressionWithConsumer() throws Exception {
        this.doVerifyDuplicateSuppression(100, 100, true);
    }

    @Test
    public void verifyDuplicateSuppression() throws Exception {
        this.doVerifyDuplicateSuppression(100, 100, false);
    }

    public void doVerifyDuplicateSuppression(int numToSend, final int expectedTotalEnqueue, final boolean demand) throws Exception {
        final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(((TransportConnector)this.brokerService.getTransportConnectors().get(0)).getPublishableConnectString());
        connectionFactory.setCopyMessageOnSend(false);
        connectionFactory.setWatchTopicAdvisories(false);
        int concurrency = 40;
        final AtomicInteger workCount = new AtomicInteger(numToSend);
        ExecutorService executorService = Executors.newFixedThreadPool(40);
        for (int i = 0; i < 40; ++i) {
            executorService.execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        int i;
                        while ((i = workCount.getAndDecrement()) > 0) {
                            ActiveMQConnection activeMQConnection = (ActiveMQConnection)connectionFactory.createConnection();
                            activeMQConnection.start();
                            ActiveMQSession activeMQSession = (ActiveMQSession)activeMQConnection.createSession(false, 2);
                            ActiveMQQueue dest = new ActiveMQQueue("queue-" + i + "-" + AMQ5212Test.class.getSimpleName());
                            ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer)activeMQSession.createProducer((Destination)dest);
                            if (demand) {
                                activeMQSession.createConsumer((Destination)dest);
                            }
                            ActiveMQTextMessage message = new ActiveMQTextMessage();
                            message.setDestination((ActiveMQDestination)dest);
                            activeMQMessageProducer.send((Message)message, null);
                            activeMQConnection.syncSendPacket((Command)message);
                            activeMQConnection.close();
                        }
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        TimeUnit.SECONDS.sleep(1L);
        executorService.shutdown();
        executorService.awaitTermination(5L, TimeUnit.MINUTES);
        Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return (long)expectedTotalEnqueue == AMQ5212Test.this.brokerService.getAdminView().getTotalEnqueueCount();
            }
        });
        Assert.assertEquals((String)"total enqueue as expected", (long)expectedTotalEnqueue, (long)this.brokerService.getAdminView().getTotalEnqueueCount());
    }

    @Test
    public void verifyConsumptionOnDuplicate() throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(((TransportConnector)this.brokerService.getTransportConnectors().get(0)).getPublishableConnectString());
        connectionFactory.setCopyMessageOnSend(false);
        connectionFactory.setWatchTopicAdvisories(false);
        ActiveMQConnection activeMQConnection = (ActiveMQConnection)connectionFactory.createConnection();
        activeMQConnection.start();
        ActiveMQSession activeMQSession = (ActiveMQSession)activeMQConnection.createSession(false, 1);
        ActiveMQQueue dest = new ActiveMQQueue("Q");
        ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer)activeMQSession.createProducer((Destination)dest);
        ActiveMQTextMessage message = new ActiveMQTextMessage();
        message.setDestination((ActiveMQDestination)dest);
        activeMQMessageProducer.send((Message)message, null);
        activeMQConnection.syncSendPacket((Command)message);
        activeMQConnection.close();
        this.brokerService.stop();
        this.brokerService.start(false);
        connectionFactory = new ActiveMQConnectionFactory(((TransportConnector)this.brokerService.getTransportConnectors().get(0)).getPublishableConnectString());
        connectionFactory.setCopyMessageOnSend(false);
        connectionFactory.setWatchTopicAdvisories(false);
        activeMQConnection = (ActiveMQConnection)connectionFactory.createConnection();
        activeMQConnection.start();
        activeMQSession = (ActiveMQSession)activeMQConnection.createSession(false, 1);
        MessageConsumer messageConsumer = activeMQSession.createConsumer((Destination)dest);
        Message received = messageConsumer.receive(4000L);
        Assert.assertNotNull((String)"Got message", (Object)received);
        Assert.assertEquals((String)"match", (Object)message.getJMSMessageID(), (Object)received.getJMSMessageID());
        activeMQConnection.close();
    }

    @Test
    public void verifyClientAckConsumptionOnDuplicate() throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(((TransportConnector)this.brokerService.getTransportConnectors().get(0)).getPublishableConnectString());
        connectionFactory.setCopyMessageOnSend(false);
        connectionFactory.setWatchTopicAdvisories(false);
        ActiveMQConnection activeMQConnection = (ActiveMQConnection)connectionFactory.createConnection();
        activeMQConnection.start();
        ActiveMQSession activeMQSession = (ActiveMQSession)activeMQConnection.createSession(false, 2);
        ActiveMQQueue dest = new ActiveMQQueue("Q");
        MessageConsumer messageConsumer = activeMQSession.createConsumer((Destination)dest);
        ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer)activeMQSession.createProducer((Destination)dest);
        ActiveMQTextMessage message = new ActiveMQTextMessage();
        message.setDestination((ActiveMQDestination)dest);
        activeMQMessageProducer.send((Message)message, null);
        activeMQConnection.syncSendPacket((Command)message);
        Message received = messageConsumer.receive(4000L);
        Assert.assertNotNull((String)"Got message", (Object)received);
        Assert.assertEquals((String)"match", (Object)message.getJMSMessageID(), (Object)received.getJMSMessageID());
        messageConsumer.close();
        messageConsumer = activeMQSession.createConsumer((Destination)dest);
        received = messageConsumer.receive(4000L);
        Assert.assertNotNull((String)"Got message", (Object)received);
        Assert.assertEquals((String)"match", (Object)message.getJMSMessageID(), (Object)received.getJMSMessageID());
        received.acknowledge();
        activeMQConnection.close();
    }
}

