package org.apache.activemq.bugs;

import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.AsyncCallback;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/activemq/bugs/AMQ5212Test.class */
public class AMQ5212Test {
    BrokerService brokerService;

    @Parameterized.Parameter(0)
    public boolean concurrentStoreAndDispatchQ = true;

    @Parameterized.Parameters(name = "concurrentStoreAndDispatch={0}")
    public static Iterable<Object[]> getTestParameters() {
        return Arrays.asList(new Object[]{Boolean.TRUE}, new Object[]{Boolean.FALSE});
    }

    @Before
    public void setUp() throws Exception {
        start(true);
    }

    public void start(boolean z) throws Exception {
        this.brokerService = new BrokerService();
        if (z) {
            this.brokerService.deleteAllMessages();
        }
        this.brokerService.getPersistenceAdapter().setConcurrentStoreAndDispatchQueues(this.concurrentStoreAndDispatchQ);
        this.brokerService.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        this.brokerService.setAdvisorySupport(false);
        this.brokerService.getManagementContext().setCreateConnector(false);
        this.brokerService.start();
    }

    @After
    public void tearDown() throws Exception {
        this.brokerService.stop();
    }

    @Test
    public void verifyDuplicateSuppressionWithConsumer() throws Exception {
        doVerifyDuplicateSuppression(100, 100, true);
    }

    @Test
    public void verifyDuplicateSuppression() throws Exception {
        doVerifyDuplicateSuppression(100, 100, false);
    }

    public void doVerifyDuplicateSuppression(int i, final int i2, final boolean z) throws Exception {
        final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(((TransportConnector) this.brokerService.getTransportConnectors().get(0)).getPublishableConnectString());
        activeMQConnectionFactory.setCopyMessageOnSend(false);
        activeMQConnectionFactory.setWatchTopicAdvisories(false);
        final AtomicInteger atomicInteger = new AtomicInteger(i);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(40);
        for (int i3 = 0; i3 < 40; i3++) {
            newFixedThreadPool.execute(new Runnable() { // from class: org.apache.activemq.bugs.AMQ5212Test.1
                @Override // java.lang.Runnable
                public void run() {
                    while (true) {
                        try {
                            int andDecrement = atomicInteger.getAndDecrement();
                            if (andDecrement <= 0) {
                                return;
                            }
                            ActiveMQConnection createConnection = activeMQConnectionFactory.createConnection();
                            createConnection.start();
                            ActiveMQSession createSession = createConnection.createSession(false, 2);
                            ActiveMQQueue activeMQQueue = new ActiveMQQueue("queue-" + andDecrement + "-" + AMQ5212Test.class.getSimpleName());
                            ActiveMQMessageProducer createProducer = createSession.createProducer(activeMQQueue);
                            if (z) {
                                createSession.createConsumer(activeMQQueue);
                            }
                            ActiveMQTextMessage activeMQTextMessage = new ActiveMQTextMessage();
                            activeMQTextMessage.setDestination(activeMQQueue);
                            createProducer.send(activeMQTextMessage, (AsyncCallback) null);
                            createConnection.syncSendPacket(activeMQTextMessage);
                            createConnection.close();
                        } catch (Exception e) {
                            e.printStackTrace();
                            return;
                        }
                    }
                }
            });
        }
        TimeUnit.SECONDS.sleep(1L);
        newFixedThreadPool.shutdown();
        newFixedThreadPool.awaitTermination(5L, TimeUnit.MINUTES);
        Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ5212Test.2
            public boolean isSatisified() throws Exception {
                return ((long) i2) == AMQ5212Test.this.brokerService.getAdminView().getTotalEnqueueCount();
            }
        });
        Assert.assertEquals("total enqueue as expected", i2, this.brokerService.getAdminView().getTotalEnqueueCount());
    }

    @Test
    public void verifyConsumptionOnDuplicate() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(((TransportConnector) this.brokerService.getTransportConnectors().get(0)).getPublishableConnectString());
        activeMQConnectionFactory.setCopyMessageOnSend(false);
        activeMQConnectionFactory.setWatchTopicAdvisories(false);
        ActiveMQConnection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        ActiveMQSession createSession = createConnection.createSession(false, 1);
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("Q");
        ActiveMQMessageProducer createProducer = createSession.createProducer(activeMQQueue);
        ActiveMQTextMessage activeMQTextMessage = new ActiveMQTextMessage();
        activeMQTextMessage.setDestination(activeMQQueue);
        createProducer.send(activeMQTextMessage, (AsyncCallback) null);
        createConnection.syncSendPacket(activeMQTextMessage);
        createConnection.close();
        this.brokerService.stop();
        this.brokerService.start(false);
        ActiveMQConnectionFactory activeMQConnectionFactory2 = new ActiveMQConnectionFactory(((TransportConnector) this.brokerService.getTransportConnectors().get(0)).getPublishableConnectString());
        activeMQConnectionFactory2.setCopyMessageOnSend(false);
        activeMQConnectionFactory2.setWatchTopicAdvisories(false);
        ActiveMQConnection createConnection2 = activeMQConnectionFactory2.createConnection();
        createConnection2.start();
        Message receive = createConnection2.createSession(false, 1).createConsumer(activeMQQueue).receive(4000L);
        Assert.assertNotNull("Got message", receive);
        Assert.assertEquals("match", activeMQTextMessage.getJMSMessageID(), receive.getJMSMessageID());
        createConnection2.close();
    }

    @Test
    public void verifyClientAckConsumptionOnDuplicate() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(((TransportConnector) this.brokerService.getTransportConnectors().get(0)).getPublishableConnectString());
        activeMQConnectionFactory.setCopyMessageOnSend(false);
        activeMQConnectionFactory.setWatchTopicAdvisories(false);
        ActiveMQConnection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        ActiveMQSession createSession = createConnection.createSession(false, 2);
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("Q");
        MessageConsumer createConsumer = createSession.createConsumer(activeMQQueue);
        ActiveMQMessageProducer createProducer = createSession.createProducer(activeMQQueue);
        ActiveMQTextMessage activeMQTextMessage = new ActiveMQTextMessage();
        activeMQTextMessage.setDestination(activeMQQueue);
        createProducer.send(activeMQTextMessage, (AsyncCallback) null);
        createConnection.syncSendPacket(activeMQTextMessage);
        Message receive = createConsumer.receive(4000L);
        Assert.assertNotNull("Got message", receive);
        Assert.assertEquals("match", activeMQTextMessage.getJMSMessageID(), receive.getJMSMessageID());
        createConsumer.close();
        Message receive2 = createSession.createConsumer(activeMQQueue).receive(4000L);
        Assert.assertNotNull("Got message", receive2);
        Assert.assertEquals("match", activeMQTextMessage.getJMSMessageID(), receive2.getJMSMessageID());
        receive2.acknowledge();
        createConnection.close();
    }
}
