package org.apache.activemq.statistics;

import java.util.Arrays;
import java.util.Collection;
import java.util.Random;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runners.Parameterized;

/* loaded from: input_file:org/apache/activemq/statistics/AbstractInflightMessageSizeTest.class */
public abstract class AbstractInflightMessageSizeTest {
    protected BrokerService brokerService;
    protected Connection connection;
    protected String brokerUrlString;
    protected Session session;
    protected Destination dest;
    protected org.apache.activemq.broker.region.Destination amqDestination;
    protected MessageConsumer consumer;
    protected final int ackType;
    protected final boolean optimizeAcknowledge;
    protected int prefetch = 100;
    protected final String destName = "testDest";

    @Parameterized.Parameters
    public static Collection<Object[]> data() {
        return Arrays.asList(new Object[]{0, true}, new Object[]{1, true}, new Object[]{4, true}, new Object[]{2, true}, new Object[]{0, false}, new Object[]{1, false}, new Object[]{4, false}, new Object[]{2, false});
    }

    public AbstractInflightMessageSizeTest(int i, boolean z) {
        this.ackType = i;
        this.optimizeAcknowledge = z;
    }

    @Before
    public void setUp() throws Exception {
        this.brokerService = new BrokerService();
        this.brokerService.setDeleteAllMessagesOnStartup(true);
        TransportConnector addConnector = this.brokerService.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        this.brokerService.start();
        this.brokerUrlString = addConnector.getPublishableConnectString() + (this.optimizeAcknowledge ? "?jms.optimizeAcknowledge=true&jms.optimizedAckScheduledAckInterval=2000" : "");
        this.connection = createConnectionFactory().createConnection();
        this.connection.setClientID("client1");
        this.connection.start();
        this.session = this.connection.createSession(this.ackType == 0, this.ackType);
        this.dest = mo771getDestination();
        this.consumer = getMessageConsumer();
        this.amqDestination = TestSupport.getDestination(this.brokerService, getActiveMQDestination());
    }

    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(this.brokerUrlString);
        ActiveMQPrefetchPolicy activeMQPrefetchPolicy = new ActiveMQPrefetchPolicy();
        activeMQPrefetchPolicy.setTopicPrefetch(this.prefetch);
        activeMQPrefetchPolicy.setQueuePrefetch(this.prefetch);
        activeMQPrefetchPolicy.setOptimizeDurableTopicPrefetch(this.prefetch);
        activeMQConnectionFactory.setPrefetchPolicy(activeMQPrefetchPolicy);
        return activeMQConnectionFactory;
    }

    @After
    public void tearDown() throws Exception {
        if (this.connection != null) {
            this.connection.close();
        }
        this.brokerService.stop();
    }

    @Test(timeout = 15000)
    public void testInflightMessageSize() throws Exception {
        final long sendMessages = sendMessages(10);
        Assert.assertTrue("Inflight message size should be greater than the content length sent", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.statistics.AbstractInflightMessageSizeTest.1
            public boolean isSatisified() throws Exception {
                return AbstractInflightMessageSizeTest.this.getSubscription().getInFlightMessageSize() > sendMessages;
            }
        }));
        receiveMessages(10);
        Assert.assertTrue("Inflight message size should be 0", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.statistics.AbstractInflightMessageSizeTest.2
            public boolean isSatisified() throws Exception {
                return AbstractInflightMessageSizeTest.this.getSubscription().getInFlightMessageSize() == 0;
            }
        }));
    }

    @Test(timeout = 15000)
    public void testInflightMessageSizePrefetchFilled() throws Exception {
        final long sendMessages = sendMessages(this.prefetch);
        Assert.assertTrue("Inflight message size should be greater than content length", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.statistics.AbstractInflightMessageSizeTest.3
            public boolean isSatisified() throws Exception {
                return AbstractInflightMessageSizeTest.this.getSubscription().getInFlightMessageSize() > sendMessages;
            }
        }));
        long inFlightMessageSize = getSubscription().getInFlightMessageSize();
        sendMessages(10);
        Assert.assertEquals("Inflight message size should not change", inFlightMessageSize, getSubscription().getInFlightMessageSize());
        receiveMessages(this.prefetch + 10);
        Assert.assertTrue("Inflight message size should be 0", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.statistics.AbstractInflightMessageSizeTest.4
            public boolean isSatisified() throws Exception {
                return AbstractInflightMessageSizeTest.this.getSubscription().getInFlightMessageSize() == 0;
            }
        }));
    }

    @Test(timeout = 15000)
    public void testInflightMessageSizePrefetchNotFilled() throws Exception {
        final long sendMessages = sendMessages(this.prefetch - 10);
        Assert.assertTrue("Inflight message size should be greater than content length", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.statistics.AbstractInflightMessageSizeTest.5
            public boolean isSatisified() throws Exception {
                return AbstractInflightMessageSizeTest.this.getSubscription().getInFlightMessageSize() > sendMessages;
            }
        }));
        final long inFlightMessageSize = getSubscription().getInFlightMessageSize();
        sendMessages(10);
        Assert.assertTrue("Inflight message size should be greater than previous inlight size", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.statistics.AbstractInflightMessageSizeTest.6
            public boolean isSatisified() throws Exception {
                return AbstractInflightMessageSizeTest.this.getSubscription().getInFlightMessageSize() > inFlightMessageSize;
            }
        }));
        receiveMessages(this.prefetch);
        Assert.assertTrue("Inflight message size should be 0", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.statistics.AbstractInflightMessageSizeTest.7
            public boolean isSatisified() throws Exception {
                return AbstractInflightMessageSizeTest.this.getSubscription().getInFlightMessageSize() == 0;
            }
        }));
    }

    @Test(timeout = 15000)
    public void testInflightMessageSizeRollback() throws Exception {
        Assume.assumeTrue(this.ackType == 0);
        final long sendMessages = sendMessages(10);
        Assert.assertTrue("Inflight message size should be greater than the content length sent", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.statistics.AbstractInflightMessageSizeTest.8
            public boolean isSatisified() throws Exception {
                return AbstractInflightMessageSizeTest.this.getSubscription().getInFlightMessageSize() > sendMessages;
            }
        }));
        long inFlightMessageSize = getSubscription().getInFlightMessageSize();
        for (int i = 0; i < 10; i++) {
            this.consumer.receive();
        }
        this.session.rollback();
        Assert.assertEquals("Inflight message size should not change on rollback", inFlightMessageSize, getSubscription().getInFlightMessageSize());
    }

    protected long sendMessages(int i) throws JMSException {
        MessageProducer createProducer = this.session.createProducer(this.dest);
        long j = 0;
        for (int i2 = 0; i2 < i; i2++) {
            Random random = new Random();
            int nextInt = random.nextInt(150000);
            j += nextInt;
            byte[] bArr = new byte[nextInt > 0 ? nextInt : 1];
            random.nextBytes(bArr);
            BytesMessage createBytesMessage = this.session.createBytesMessage();
            createBytesMessage.writeBytes(bArr);
            createProducer.send(createBytesMessage);
        }
        if (this.session.getTransacted()) {
            this.session.commit();
        }
        return j;
    }

    protected void receiveMessages(int i) throws JMSException {
        for (int i2 = 0; i2 < i; i2++) {
            Message receive = this.consumer.receive();
            if (this.ackType == 0) {
                this.session.commit();
            } else if (this.ackType != 1) {
                receive.acknowledge();
            }
        }
    }

    protected abstract Subscription getSubscription();

    protected abstract ActiveMQDestination getActiveMQDestination();

    protected abstract MessageConsumer getMessageConsumer() throws JMSException;

    /* renamed from: getDestination */
    protected abstract Destination mo771getDestination() throws JMSException;
}
