package org.apache.activemq.bugs;

import java.util.ArrayList;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ4083Test.class */
public class AMQ4083Test {
    private static BrokerService brokerService;
    private final int messageCount = 100;
    private String connectionUri;
    private String[] data;
    private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3992Test.class);
    private static String BROKER_ADDRESS = JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT;
    private static String TEST_QUEUE = "testQueue";
    private static ActiveMQQueue queue = new ActiveMQQueue(TEST_QUEUE);

    @Before
    public void setUp() throws Exception {
        brokerService = new BrokerService();
        brokerService.setPersistent(false);
        brokerService.setUseJmx(true);
        brokerService.setDeleteAllMessagesOnStartup(true);
        this.connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString();
        brokerService.start();
        brokerService.waitUntilStarted();
        this.data = new String[100];
        for (int i = 0; i < 100; i++) {
            this.data[i] = "Text for message: " + i + " at " + new Date();
        }
    }

    @After
    public void tearDown() throws Exception {
        brokerService.stop();
        brokerService.waitUntilStopped();
    }

    @Test
    public void testExpiredMsgsBeforeNonExpired() throws Exception {
        ActiveMQConnection createConnection = new ActiveMQConnectionFactory(this.connectionUri).createConnection();
        createConnection.getPrefetchPolicy().setQueuePrefetch(DurableSubProcessWithRestartTest.CARGO_SIZE);
        Session createSession = createConnection.createSession(false, 2);
        createConnection.start();
        MessageProducer createProducer = createSession.createProducer(queue);
        MessageConsumer createConsumer = createSession.createConsumer(queue);
        for (int i = 0; i < 100; i++) {
            createProducer.send(createSession.createTextMessage(), 2, 4, 4000L);
        }
        createProducer.send(createSession.createTextMessage());
        TimeUnit.SECONDS.sleep(5L);
        final QueueViewMBean proxyToQueueViewMBean = getProxyToQueueViewMBean();
        Assert.assertEquals(101L, proxyToQueueViewMBean.getInFlightCount());
        createConsumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.bugs.AMQ4083Test.1
            public void onMessage(Message message) {
                try {
                    message.acknowledge();
                } catch (JMSException e) {
                }
            }
        });
        TimeUnit.SECONDS.sleep(5L);
        Assert.assertEquals(0L, proxyToQueueViewMBean.getInFlightCount());
        for (int i2 = 0; i2 < 200; i2++) {
            createProducer.send(createSession.createTextMessage());
        }
        Assert.assertTrue("Inflight count should reach zero, currently: " + proxyToQueueViewMBean.getInFlightCount(), Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ4083Test.2
            @Override // org.apache.activemq.util.Wait.Condition
            public boolean isSatisified() throws Exception {
                return proxyToQueueViewMBean.getInFlightCount() == 0;
            }
        }));
        LOG.info("Dequeued Count: {}", Long.valueOf(proxyToQueueViewMBean.getDequeueCount()));
        LOG.info("Dispatch Count: {}", Long.valueOf(proxyToQueueViewMBean.getDispatchCount()));
        LOG.info("Enqueue Count: {}", Long.valueOf(proxyToQueueViewMBean.getEnqueueCount()));
        LOG.info("Expired Count: {}", Long.valueOf(proxyToQueueViewMBean.getExpiredCount()));
        LOG.info("InFlight Count: {}", Long.valueOf(proxyToQueueViewMBean.getInFlightCount()));
    }

    @Test
    public void testExpiredMsgsBeforeNonExpiredWithTX() throws Exception {
        ActiveMQConnection createConnection = new ActiveMQConnectionFactory(this.connectionUri).createConnection();
        createConnection.getPrefetchPolicy().setQueuePrefetch(DurableSubProcessWithRestartTest.CARGO_SIZE);
        final Session createSession = createConnection.createSession(true, 0);
        createConnection.start();
        MessageProducer createProducer = createSession.createProducer(queue);
        MessageConsumer createConsumer = createSession.createConsumer(queue);
        for (int i = 0; i < 100; i++) {
            createProducer.send(createSession.createTextMessage(), 2, 4, 4000L);
        }
        createProducer.send(createSession.createTextMessage());
        createSession.commit();
        TimeUnit.SECONDS.sleep(5L);
        final QueueViewMBean proxyToQueueViewMBean = getProxyToQueueViewMBean();
        Assert.assertEquals(101L, proxyToQueueViewMBean.getInFlightCount());
        createConsumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.bugs.AMQ4083Test.3
            public void onMessage(Message message) {
                try {
                    createSession.commit();
                } catch (JMSException e) {
                }
            }
        });
        TimeUnit.SECONDS.sleep(5L);
        Assert.assertEquals(0L, proxyToQueueViewMBean.getInFlightCount());
        for (int i2 = 0; i2 < 200; i2++) {
            createProducer.send(createSession.createTextMessage());
        }
        createSession.commit();
        Assert.assertTrue("Inflight count should reach zero, currently: " + proxyToQueueViewMBean.getInFlightCount(), Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ4083Test.4
            @Override // org.apache.activemq.util.Wait.Condition
            public boolean isSatisified() throws Exception {
                return proxyToQueueViewMBean.getInFlightCount() == 0;
            }
        }));
        LOG.info("Dequeued Count: {}", Long.valueOf(proxyToQueueViewMBean.getDequeueCount()));
        LOG.info("Dispatch Count: {}", Long.valueOf(proxyToQueueViewMBean.getDispatchCount()));
        LOG.info("Enqueue Count: {}", Long.valueOf(proxyToQueueViewMBean.getEnqueueCount()));
        LOG.info("Expired Count: {}", Long.valueOf(proxyToQueueViewMBean.getExpiredCount()));
        LOG.info("InFlight Count: {}", Long.valueOf(proxyToQueueViewMBean.getInFlightCount()));
    }

    @Test
    public void testExpiredMsgsInterleavedWithNonExpired() throws Exception {
        ActiveMQConnection createConnection = new ActiveMQConnectionFactory(this.connectionUri).createConnection();
        createConnection.getPrefetchPolicy().setQueuePrefetch(DurableSubProcessWithRestartTest.CARGO_SIZE);
        Session createSession = createConnection.createSession(false, 2);
        createConnection.start();
        MessageProducer createProducer = createSession.createProducer(queue);
        MessageConsumer createConsumer = createSession.createConsumer(queue);
        for (int i = 0; i < 200; i++) {
            if (i % 2 == 0) {
                createProducer.send(createSession.createTextMessage(), 2, 4, 4000L);
            } else {
                createProducer.send(createSession.createTextMessage());
            }
        }
        TimeUnit.SECONDS.sleep(5L);
        final QueueViewMBean proxyToQueueViewMBean = getProxyToQueueViewMBean();
        Assert.assertEquals(200L, proxyToQueueViewMBean.getInFlightCount());
        createConsumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.bugs.AMQ4083Test.5
            public void onMessage(Message message) {
                try {
                    AMQ4083Test.LOG.debug("Acking message: {}", message);
                    message.acknowledge();
                } catch (JMSException e) {
                }
            }
        });
        TimeUnit.SECONDS.sleep(5L);
        Assert.assertTrue("Inflight count should reach zero, currently: " + proxyToQueueViewMBean.getInFlightCount(), Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ4083Test.6
            @Override // org.apache.activemq.util.Wait.Condition
            public boolean isSatisified() throws Exception {
                return proxyToQueueViewMBean.getInFlightCount() == 0;
            }
        }));
        for (int i2 = 0; i2 < 200; i2++) {
            createProducer.send(createSession.createTextMessage());
        }
        Assert.assertTrue("Inflight count should reach zero, currently: " + proxyToQueueViewMBean.getInFlightCount(), Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ4083Test.7
            @Override // org.apache.activemq.util.Wait.Condition
            public boolean isSatisified() throws Exception {
                return proxyToQueueViewMBean.getInFlightCount() == 0;
            }
        }));
        LOG.info("Dequeued Count: {}", Long.valueOf(proxyToQueueViewMBean.getDequeueCount()));
        LOG.info("Dispatch Count: {}", Long.valueOf(proxyToQueueViewMBean.getDispatchCount()));
        LOG.info("Enqueue Count: {}", Long.valueOf(proxyToQueueViewMBean.getEnqueueCount()));
        LOG.info("Expired Count: {}", Long.valueOf(proxyToQueueViewMBean.getExpiredCount()));
        LOG.info("InFlight Count: {}", Long.valueOf(proxyToQueueViewMBean.getInFlightCount()));
    }

    @Test
    public void testExpiredMsgsInterleavedWithNonExpiredCumulativeAck() throws Exception {
        ActiveMQConnection createConnection = new ActiveMQConnectionFactory(this.connectionUri).createConnection();
        createConnection.getPrefetchPolicy().setQueuePrefetch(DurableSubProcessWithRestartTest.CARGO_SIZE);
        Session createSession = createConnection.createSession(false, 2);
        createConnection.start();
        MessageProducer createProducer = createSession.createProducer(queue);
        MessageConsumer createConsumer = createSession.createConsumer(queue);
        for (int i = 0; i < 200; i++) {
            if (i % 2 == 0) {
                createProducer.send(createSession.createTextMessage(), 2, 4, 4000L);
            } else {
                createProducer.send(createSession.createTextMessage());
            }
        }
        TimeUnit.SECONDS.sleep(5L);
        final QueueViewMBean proxyToQueueViewMBean = getProxyToQueueViewMBean();
        Assert.assertEquals(200L, proxyToQueueViewMBean.getInFlightCount());
        final AtomicInteger atomicInteger = new AtomicInteger();
        createConsumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.bugs.AMQ4083Test.8
            public void onMessage(Message message) {
                try {
                    if (atomicInteger.incrementAndGet() == 100) {
                        AMQ4083Test.LOG.debug("Acking message: {}", message);
                        message.acknowledge();
                    }
                } catch (JMSException e) {
                }
            }
        });
        TimeUnit.SECONDS.sleep(5L);
        Assert.assertTrue("Inflight count should reach zero, currently: " + proxyToQueueViewMBean.getInFlightCount(), Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ4083Test.9
            @Override // org.apache.activemq.util.Wait.Condition
            public boolean isSatisified() throws Exception {
                return proxyToQueueViewMBean.getInFlightCount() == 0;
            }
        }));
        createConsumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.bugs.AMQ4083Test.10
            public void onMessage(Message message) {
                try {
                    AMQ4083Test.LOG.debug("Acking message: {}", message);
                    message.acknowledge();
                } catch (JMSException e) {
                }
            }
        });
        for (int i2 = 0; i2 < 200; i2++) {
            createProducer.send(createSession.createTextMessage());
        }
        Assert.assertTrue("Inflight count should reach zero, currently: " + proxyToQueueViewMBean.getInFlightCount(), Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ4083Test.11
            @Override // org.apache.activemq.util.Wait.Condition
            public boolean isSatisified() throws Exception {
                return proxyToQueueViewMBean.getInFlightCount() == 0;
            }
        }));
        LOG.info("Dequeued Count: {}", Long.valueOf(proxyToQueueViewMBean.getDequeueCount()));
        LOG.info("Dispatch Count: {}", Long.valueOf(proxyToQueueViewMBean.getDispatchCount()));
        LOG.info("Enqueue Count: {}", Long.valueOf(proxyToQueueViewMBean.getEnqueueCount()));
        LOG.info("Expired Count: {}", Long.valueOf(proxyToQueueViewMBean.getExpiredCount()));
        LOG.info("InFlight Count: {}", Long.valueOf(proxyToQueueViewMBean.getInFlightCount()));
    }

    @Test
    public void testExpiredBatchBetweenNonExpiredMessages() throws Exception {
        ActiveMQConnection createConnection = new ActiveMQConnectionFactory(this.connectionUri).createConnection();
        createConnection.getPrefetchPolicy().setQueuePrefetch(DurableSubProcessWithRestartTest.CARGO_SIZE);
        Session createSession = createConnection.createSession(false, 2);
        createConnection.start();
        MessageProducer createProducer = createSession.createProducer(queue);
        MessageConsumer createConsumer = createSession.createConsumer(queue);
        createProducer.send(createSession.createTextMessage());
        for (int i = 0; i < 100; i++) {
            createProducer.send(createSession.createTextMessage(), 2, 4, 4000L);
        }
        createProducer.send(createSession.createTextMessage());
        TimeUnit.SECONDS.sleep(5L);
        final QueueViewMBean proxyToQueueViewMBean = getProxyToQueueViewMBean();
        Assert.assertEquals(102L, proxyToQueueViewMBean.getInFlightCount());
        createConsumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.bugs.AMQ4083Test.12
            public void onMessage(Message message) {
                try {
                    message.acknowledge();
                } catch (JMSException e) {
                }
            }
        });
        TimeUnit.SECONDS.sleep(5L);
        Assert.assertTrue("Inflight count should reach zero, currently: " + proxyToQueueViewMBean.getInFlightCount(), Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ4083Test.13
            @Override // org.apache.activemq.util.Wait.Condition
            public boolean isSatisified() throws Exception {
                return proxyToQueueViewMBean.getInFlightCount() == 0;
            }
        }));
        for (int i2 = 0; i2 < 200; i2++) {
            createProducer.send(createSession.createTextMessage());
        }
        Assert.assertTrue("Inflight count should reach zero, currently: " + proxyToQueueViewMBean.getInFlightCount(), Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ4083Test.14
            @Override // org.apache.activemq.util.Wait.Condition
            public boolean isSatisified() throws Exception {
                return proxyToQueueViewMBean.getInFlightCount() == 0;
            }
        }));
        LOG.info("Dequeued Count: {}", Long.valueOf(proxyToQueueViewMBean.getDequeueCount()));
        LOG.info("Dispatch Count: {}", Long.valueOf(proxyToQueueViewMBean.getDispatchCount()));
        LOG.info("Enqueue Count: {}", Long.valueOf(proxyToQueueViewMBean.getEnqueueCount()));
        LOG.info("Expired Count: {}", Long.valueOf(proxyToQueueViewMBean.getExpiredCount()));
        LOG.info("InFlight Count: {}", Long.valueOf(proxyToQueueViewMBean.getInFlightCount()));
    }

    @Test
    public void testConsumeExpiredQueueAndDlq() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(this.connectionUri);
        Connection createConnection = activeMQConnectionFactory.createConnection();
        Session createSession = createConnection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(queue);
        MessageProducer createProducer2 = createSession.createProducer(queue);
        createProducer2.setTimeToLive(500L);
        MessageConsumer createConsumer = createSession.createConsumer(createSession.createQueue("ActiveMQ.DLQ"));
        createConnection.start();
        ActiveMQConnection createConnection2 = activeMQConnectionFactory.createConnection();
        ActiveMQPrefetchPolicy activeMQPrefetchPolicy = new ActiveMQPrefetchPolicy();
        activeMQPrefetchPolicy.setAll(10);
        createConnection2.setPrefetchPolicy(activeMQPrefetchPolicy);
        MessageConsumer createConsumer2 = createConnection2.createSession(false, 2).createConsumer(queue);
        createConnection2.start();
        String str = new String(new byte[20480]);
        for (int i = 0; i < this.data.length; i++) {
            createProducer2.send(queue, createSession.createTextMessage(str));
        }
        for (int i2 = 0; i2 < this.data.length; i2++) {
            createProducer.send(queue, createSession.createTextMessage(str));
        }
        ArrayList arrayList = new ArrayList();
        while (true) {
            Message receive = createConsumer2.receive(1000L);
            if (receive == null) {
                break;
            }
            arrayList.add(receive);
            if (arrayList.size() == 1) {
                TimeUnit.SECONDS.sleep(1L);
            }
            receive.acknowledge();
        }
        Assert.assertEquals("got messages", 101L, arrayList.size());
        ArrayList arrayList2 = new ArrayList();
        while (true) {
            Message receive2 = createConsumer.receive(1000L);
            if (receive2 == null) {
                Assert.assertEquals("got dlq messages", this.data.length - 1, arrayList2.size());
                QueueViewMBean proxyToQueueViewMBean = getProxyToQueueViewMBean();
                LOG.info("Dequeued Count: {}", Long.valueOf(proxyToQueueViewMBean.getDequeueCount()));
                LOG.info("Dispatch Count: {}", Long.valueOf(proxyToQueueViewMBean.getDispatchCount()));
                LOG.info("Enqueue Count: {}", Long.valueOf(proxyToQueueViewMBean.getEnqueueCount()));
                LOG.info("Expired Count: {}", Long.valueOf(proxyToQueueViewMBean.getExpiredCount()));
                LOG.info("InFlight Count: {}", Long.valueOf(proxyToQueueViewMBean.getInFlightCount()));
                return;
            }
            arrayList2.add(receive2);
        }
    }

    private QueueViewMBean getProxyToQueueViewMBean() throws Exception {
        return (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(new ObjectName("org.apache.activemq:Type=Queue,Destination=" + queue.getQueueName() + ",BrokerName=localhost"), QueueViewMBean.class, true);
    }
}
