package org.apache.activemq;

import java.util.concurrent.TimeUnit;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/OptimizedAckTest.class */
public class OptimizedAckTest extends TestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(OptimizedAckTest.class);
    private ActiveMQConnection connection;

    protected void setUp() throws Exception {
        super.setUp();
        this.connection = createConnection();
        this.connection.setOptimizeAcknowledge(true);
        this.connection.setOptimizeAcknowledgeTimeOut(0L);
        ActiveMQPrefetchPolicy activeMQPrefetchPolicy = new ActiveMQPrefetchPolicy();
        activeMQPrefetchPolicy.setAll(10);
        this.connection.setPrefetchPolicy(activeMQPrefetchPolicy);
    }

    protected void tearDown() throws Exception {
        this.connection.close();
        super.tearDown();
    }

    public void testReceivedMessageStillInflight() throws Exception {
        this.connection.start();
        Session createSession = this.connection.createSession(false, 1);
        Queue createQueue = createSession.createQueue("test");
        MessageProducer createProducer = createSession.createProducer(createQueue);
        for (int i = 0; i < 10; i++) {
            createProducer.send(createSession.createTextMessage("Hello" + i));
        }
        final RegionBroker regionBroker = BrokerRegistry.getInstance().findFirst().getRegionBroker();
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.OptimizedAckTest.1
            public boolean isSatisified() throws Exception {
                OptimizedAckTest.LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount());
                return 10 == regionBroker.getDestinationStatistics().getInflight().getCount();
            }
        }));
        for (int i2 = 0; i2 < 6; i2++) {
            assertNotNull(createConsumer.receive(4000L));
            assertEquals("all prefetch is still in flight: " + i2, 10L, regionBroker.getDestinationStatistics().getInflight().getCount());
        }
        for (int i3 = 6; i3 < 10; i3++) {
            assertNotNull(createConsumer.receive(4000L));
            assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.OptimizedAckTest.2
                public boolean isSatisified() throws Exception {
                    return 3 == regionBroker.getDestinationStatistics().getInflight().getCount();
                }
            }));
        }
    }

    public void testVerySlowReceivedMessageStillInflight() throws Exception {
        this.connection.start();
        Session createSession = this.connection.createSession(false, 1);
        Queue createQueue = createSession.createQueue("test");
        MessageProducer createProducer = createSession.createProducer(createQueue);
        for (int i = 0; i < 10; i++) {
            createProducer.send(createSession.createTextMessage("Hello" + i));
        }
        final RegionBroker regionBroker = BrokerRegistry.getInstance().findFirst().getRegionBroker();
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.OptimizedAckTest.3
            public boolean isSatisified() throws Exception {
                OptimizedAckTest.LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount());
                return 10 == regionBroker.getDestinationStatistics().getInflight().getCount();
            }
        }));
        for (int i2 = 0; i2 < 6; i2++) {
            Thread.sleep(400L);
            assertNotNull(createConsumer.receive(4000L));
            assertEquals("all prefetch is still in flight: " + i2, 10L, regionBroker.getDestinationStatistics().getInflight().getCount());
        }
        for (int i3 = 6; i3 < 10; i3++) {
            Thread.sleep(400L);
            assertNotNull(createConsumer.receive(4000L));
            assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.OptimizedAckTest.4
                public boolean isSatisified() throws Exception {
                    return 3 == regionBroker.getDestinationStatistics().getInflight().getCount();
                }
            }));
        }
    }

    public void testReceivedMessageNotInFlightAfterScheduledAckFires() throws Exception {
        this.connection.setOptimizedAckScheduledAckInterval(TimeUnit.SECONDS.toMillis(10L));
        this.connection.start();
        Session createSession = this.connection.createSession(false, 1);
        Queue createQueue = createSession.createQueue("test");
        MessageProducer createProducer = createSession.createProducer(createQueue);
        for (int i = 0; i < 10; i++) {
            createProducer.send(createSession.createTextMessage("Hello" + i));
        }
        final RegionBroker regionBroker = BrokerRegistry.getInstance().findFirst().getRegionBroker();
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.OptimizedAckTest.5
            public boolean isSatisified() throws Exception {
                OptimizedAckTest.LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount());
                return 10 == regionBroker.getDestinationStatistics().getInflight().getCount();
            }
        }));
        for (int i2 = 0; i2 < 6; i2++) {
            assertNotNull(createConsumer.receive(4000L));
            assertEquals("all prefetch is still in flight: " + i2, 10L, regionBroker.getDestinationStatistics().getInflight().getCount());
        }
        for (int i3 = 6; i3 < 10; i3++) {
            assertNotNull(createConsumer.receive(4000L));
            assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.OptimizedAckTest.6
                public boolean isSatisified() throws Exception {
                    return 3 == regionBroker.getDestinationStatistics().getInflight().getCount();
                }
            }));
        }
        assertTrue("After delay the scheduled ack should ack all inflight.", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.OptimizedAckTest.7
            public boolean isSatisified() throws Exception {
                OptimizedAckTest.LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount());
                return 0 == regionBroker.getDestinationStatistics().getInflight().getCount();
            }
        }));
    }
}
