package org.apache.activemq.artemis.tests.integration.persistence.metrics;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.ToLongFunction;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
import org.apache.activemq.artemis.junit.Wait;
import org.apache.activemq.artemis.tests.integration.client.AutoCreateJmsDestinationTest;
import org.apache.activemq.artemis.tests.integration.persistence.XmlImportExportTest;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.commons.lang.StringUtils;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPendingMessageTest.class */
public class JournalPendingMessageTest extends AbstractPersistentStatTestSupport {
    /** Name of the queue address used by the queue tests; registered and created in {@link #setupAddresses()}. */
    protected String defaultQueueName = "test.queue";
    /** Name of the topic address used by the topic and durable-subscription tests. */
    protected String defaultTopicName = "test.topic";
    /** Logger for this test class. */
    protected static final Logger LOG = LoggerFactory.getLogger(JournalPendingMessageTest.class);
    /**
     * Size argument handed to the message-creation and publishing helpers by this class.
     * NOTE(review): presumably an upper bound on the generated payload size - confirm against the helpers.
     */
    protected static int maxMessageSize = 1000;

    /**
     * Callback that reports a message size on demand.
     *
     * <p>{@link #verifySize} polls an implementation of this interface until the reported value
     * satisfies the expected condition. The interface has a single abstract method, so it can be
     * implemented with a lambda.
     *
     * <p>Decompiler note: the access modifier of this type was widened from {@code protected}.
     */
    @FunctionalInterface
    public interface MessageSizeCalculator {
        /**
         * Computes the size to be checked.
         *
         * @return the current size value
         * @throws Exception if the size cannot be computed
         */
        long getMessageSize() throws Exception;
    }

    /**
     * Registers the default queue address as an anycast address and creates a queue with the same
     * name on it before every test.
     *
     * @throws Exception if the address or the queue cannot be created
     */
    @Before
    public void setupAddresses() throws Exception {
        final SimpleString queueAddress = SimpleString.toSimpleString(defaultQueueName);
        final AddressInfo anycastAddress = new AddressInfo(queueAddress, RoutingType.ANYCAST);
        server.getPostOffice().addAddressInfo(anycastAddress);
        // The explicit (SimpleString) cast on null selects the intended createQueue overload.
        // NOTE(review): the trailing flags (true, false) are presumably "durable" and "temporary" - confirm.
        server.createQueue(queueAddress, RoutingType.ANYCAST, queueAddress, (SimpleString) null, true, false);
    }

    /**
     * Starts from the parent's default configuration and sets its global max size to 100000.
     *
     * @param z forwarded unchanged to the parent implementation
     * @return the adjusted configuration
     * @throws Exception if the parent fails to build the configuration
     */
    @Override // org.apache.activemq.artemis.tests.util.JMSTestBase
    public Configuration createDefaultConfig(boolean z) throws Exception {
        final Configuration config = super.createDefaultConfig(z);
        config.setGlobalMaxSize(100000L);
        return config;
    }

    /**
     * Publishes 200 messages to the default queue and verifies the pending and durable-pending
     * statistics, then repeats the verification after the server has been killed and restarted.
     */
    @Test
    public void testQueueMessageSize() throws Exception {
        final int messageCount = 200;
        // Handed to the publishing helper; presumably accumulates the published size - see the helper.
        final AtomicLong publishedMessageSize = new AtomicLong();

        publishTestQueueMessages(messageCount, publishedMessageSize);
        verifyPendingStats(defaultQueueName, messageCount, publishedMessageSize.get());
        verifyPendingDurableStats(defaultQueueName, messageCount, publishedMessageSize.get());

        // The same statistics are expected once the server has been restarted.
        killServer();
        restartServer();
        verifyPendingStats(defaultQueueName, messageCount, publishedMessageSize.get());
        verifyPendingDurableStats(defaultQueueName, messageCount, publishedMessageSize.get());
    }

    /**
     * Same scenario as the non-transactional queue test, but the 200 messages are published
     * through {@code publishTestQueueMessagesTx}. Statistics are checked before and after a
     * server kill/restart cycle.
     */
    @Test
    public void testQueueMessageSizeTx() throws Exception {
        final int messageCount = 200;
        final AtomicLong publishedMessageSize = new AtomicLong();

        publishTestQueueMessagesTx(messageCount, publishedMessageSize);
        verifyPendingStats(defaultQueueName, messageCount, publishedMessageSize.get());
        verifyPendingDurableStats(defaultQueueName, messageCount, publishedMessageSize.get());

        // The same statistics are expected once the server has been restarted.
        killServer();
        restartServer();
        verifyPendingStats(defaultQueueName, messageCount, publishedMessageSize.get());
        verifyPendingDurableStats(defaultQueueName, messageCount, publishedMessageSize.get());
    }

    /**
     * Sends one text message whose body exceeds the configured minimum large-message size (1000)
     * and verifies the pending and durable-pending statistics before and after a server
     * kill/restart cycle.
     *
     * <p>The connection is closed in a {@code finally} block so that a failed verification does
     * not leak it.
     */
    @Test
    public void testQueueLargeMessageSize() throws Exception {
        this.cf.setMinLargeMessageSize(1000);
        ActiveMQTextMessage largeMessage;
        Connection connection = this.cf.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Session.createTextMessage is declared to return javax.jms.TextMessage, so an explicit
            // cast is required to reach the Artemis-specific getCoreMessage().
            // NOTE(review): the body length reuses XmlImportExportTest.CONSUMER_TIMEOUT, which looks
            // like a decompiler constant-matching artifact - confirm and replace with a dedicated size.
            largeMessage = (ActiveMQTextMessage) session.createTextMessage(StringUtils.repeat("t", XmlImportExportTest.CONSUMER_TIMEOUT));
            session.createProducer(session.createQueue(this.defaultQueueName)).send(largeMessage);
            verifyPendingStats(this.defaultQueueName, 1, largeMessage.getCoreMessage().getPersistentSize());
            verifyPendingDurableStats(this.defaultQueueName, 1, largeMessage.getCoreMessage().getPersistentSize());
        } finally {
            connection.close();
        }

        // The same statistics are expected once the server has been restarted.
        killServer();
        restartServer();
        verifyPendingStats(this.defaultQueueName, 1, largeMessage.getCoreMessage().getPersistentSize());
        verifyPendingDurableStats(this.defaultQueueName, 1, largeMessage.getCoreMessage().getPersistentSize());
    }

    /**
     * Sends ten large text messages (2000 characters, minimum large-message size 1000) in a
     * transacted session and verifies that nothing is pending before the commit, that all ten
     * messages are pending after the commit, and that they are still pending after a server
     * kill/restart cycle.
     *
     * <p>The post-restart check uses the same ten-message minimum size as the post-commit check;
     * the connection is closed in a {@code finally} block so a failed verification does not leak it.
     */
    @Test
    public void testQueueLargeMessageSizeTX() throws Exception {
        final int messageCount = 10;
        this.cf.setMinLargeMessageSize(1000);
        ActiveMQTextMessage largeMessage;
        Connection connection = this.cf.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            String body = StringUtils.repeat("t", 2000);
            MessageProducer producer = session.createProducer(session.createQueue(this.defaultQueueName));
            // Session.createTextMessage is declared to return javax.jms.TextMessage, so an explicit
            // cast is required to reach the Artemis-specific getCoreMessage().
            largeMessage = (ActiveMQTextMessage) session.createTextMessage(body);
            for (int sent = 0; sent < messageCount; sent++) {
                producer.send(largeMessage);
            }

            // Nothing has been committed yet; with an expected count of 0 the size must be 0 as well.
            verifyPendingStats(this.defaultQueueName, 0, largeMessage.getCoreMessage().getPersistentSize() * messageCount);
            verifyPendingDurableStats(this.defaultQueueName, 0, largeMessage.getCoreMessage().getPersistentSize() * messageCount);

            session.commit();
            verifyPendingStats(this.defaultQueueName, messageCount, largeMessage.getCoreMessage().getPersistentSize() * messageCount);
            verifyPendingDurableStats(this.defaultQueueName, messageCount, largeMessage.getCoreMessage().getPersistentSize() * messageCount);
        } finally {
            connection.close();
        }

        killServer();
        restartServer();
        // All ten messages must survive the restart, so the minimum size is again ten messages' worth.
        verifyPendingStats(this.defaultQueueName, messageCount, largeMessage.getCoreMessage().getPersistentSize() * messageCount);
        verifyPendingDurableStats(this.defaultQueueName, messageCount, largeMessage.getCoreMessage().getPersistentSize() * messageCount);
    }

    /**
     * Publishes 200 messages, browses the default queue and verifies that all 200 messages are
     * still reported as pending (and durable-pending) afterwards.
     */
    @Test
    public void testQueueBrowserMessageSize() throws Exception {
        final int messageCount = 200;
        final AtomicLong publishedMessageSize = new AtomicLong();

        publishTestQueueMessages(messageCount, publishedMessageSize);
        browseTestQueueMessages(defaultQueueName);

        verifyPendingStats(defaultQueueName, messageCount, publishedMessageSize.get());
        verifyPendingDurableStats(defaultQueueName, messageCount, publishedMessageSize.get());
    }

    /**
     * Publishes 200 messages with mode 1 and verifies that they are counted as pending while the
     * durable-pending statistics remain at zero.
     */
    @Test
    public void testQueueMessageSizeNonPersistent() throws Exception {
        final int messageCount = 200;
        // 1 is the value of javax.jms.DeliveryMode.NON_PERSISTENT;
        // presumably the helper uses the argument as the delivery mode - confirm.
        final int nonPersistentMode = 1;
        final AtomicLong publishedMessageSize = new AtomicLong();

        publishTestQueueMessages(messageCount, nonPersistentMode, publishedMessageSize);

        verifyPendingStats(defaultQueueName, messageCount, publishedMessageSize.get());
        verifyPendingDurableStats(defaultQueueName, 0, 0L);
    }

    /**
     * Publishes 100 messages with mode 2 followed by 100 messages with mode 1 and verifies that
     * all 200 are pending while only the first 100 count towards the durable-pending statistics.
     */
    @Test
    public void testQueueMessageSizePersistentAndNonPersistent() throws Exception {
        final int batchSize = 100;
        final AtomicLong nonPersistentSize = new AtomicLong();
        final AtomicLong persistentSize = new AtomicLong();

        // 2 and 1 are the values of javax.jms.DeliveryMode.PERSISTENT and NON_PERSISTENT;
        // presumably the helper uses the argument as the delivery mode - confirm.
        publishTestQueueMessages(batchSize, 2, persistentSize);
        publishTestQueueMessages(batchSize, 1, nonPersistentSize);

        verifyPendingStats(defaultQueueName, 200, persistentSize.get() + nonPersistentSize.get());
        verifyPendingDurableStats(defaultQueueName, batchSize, persistentSize.get());
    }

    /**
     * Publishes 200 messages, verifies they are pending, consumes all of them and verifies that
     * the pending and durable-pending statistics drop back to zero.
     */
    @Test
    public void testQueueMessageSizeAfterConsumption() throws Exception {
        final int messageCount = 200;
        final AtomicLong publishedMessageSize = new AtomicLong();

        publishTestQueueMessages(messageCount, publishedMessageSize);
        verifyPendingStats(defaultQueueName, messageCount, publishedMessageSize.get());
        verifyPendingDurableStats(defaultQueueName, messageCount, publishedMessageSize.get());

        consumeTestQueueMessages(messageCount);
        verifyPendingStats(defaultQueueName, 0, 0L);
        verifyPendingDurableStats(defaultQueueName, 0, 0L);
    }

    /**
     * Sends one message with a delivery delay of 2000 and verifies that it is reported in the
     * pending, durable-pending and scheduled statistics; after the message has been consumed all
     * of those statistics must be back at zero.
     *
     * <p>The connection is closed in a {@code finally} block so that a failed verification does
     * not leak it.
     */
    @Test
    public void testScheduledStats() throws Exception {
        // Only a non-zero size is required here: verifySize waits for "size > minimum".
        final long minimumSize = 0L;
        Connection connection = this.cf.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(session.createQueue(this.defaultQueueName));
            producer.setDeliveryDelay(2000L);
            // NOTE(review): the body reuses AutoCreateJmsDestinationTest.QUEUE_NAME; this looks like a
            // decompiler constant-matching artifact for a plain string literal - confirm.
            producer.send(session.createTextMessage(AutoCreateJmsDestinationTest.QUEUE_NAME));

            verifyPendingStats(this.defaultQueueName, 1, minimumSize);
            verifyPendingDurableStats(this.defaultQueueName, 1, minimumSize);
            verifyScheduledStats(this.defaultQueueName, 1, minimumSize);

            consumeTestQueueMessages(1);
            verifyPendingStats(this.defaultQueueName, 0, 0L);
            verifyPendingDurableStats(this.defaultQueueName, 0, 0L);
            verifyScheduledStats(this.defaultQueueName, 0, 0L);
        } finally {
            connection.close();
        }
    }

    /**
     * Sends one message on a client-acknowledge session and verifies the delivering statistics:
     * zero before the message is received, one while it is received but not yet acknowledged, and
     * zero again (together with the pending statistics) once it has been acknowledged.
     *
     * <p>The connection is closed in a {@code finally} block so that a failed verification does
     * not leak it.
     */
    @Test
    public void testDeliveringStats() throws Exception {
        // Only a non-zero size is required here: verifySize waits for "size > minimum".
        final long minimumSize = 0L;
        Connection connection = this.cf.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            // NOTE(review): the body reuses AutoCreateJmsDestinationTest.QUEUE_NAME; this looks like a
            // decompiler constant-matching artifact for a plain string literal - confirm.
            session.createProducer(session.createQueue(this.defaultQueueName)).send(session.createTextMessage(AutoCreateJmsDestinationTest.QUEUE_NAME));

            verifyPendingStats(this.defaultQueueName, 1, minimumSize);
            verifyPendingDurableStats(this.defaultQueueName, 1, minimumSize);
            verifyDeliveringStats(this.defaultQueueName, 0, 0L);

            // Received but not yet acknowledged: the message must be counted as delivering.
            Message received = session.createConsumer(session.createQueue(this.defaultQueueName)).receive();
            verifyDeliveringStats(this.defaultQueueName, 1, minimumSize);

            received.acknowledge();
            verifyPendingStats(this.defaultQueueName, 0, 0L);
            verifyPendingDurableStats(this.defaultQueueName, 0, 0L);
            verifyDeliveringStats(this.defaultQueueName, 0, 0L);
        } finally {
            connection.close();
        }
    }

    /**
     * Publishes 200 messages with mode 1, verifies they are pending, consumes all of them and
     * verifies that the pending and durable-pending statistics are zero afterwards.
     */
    @Test
    public void testQueueMessageSizeAfterConsumptionNonPersistent() throws Exception {
        final int messageCount = 200;
        // 1 is the value of javax.jms.DeliveryMode.NON_PERSISTENT;
        // presumably the helper uses the argument as the delivery mode - confirm.
        final int nonPersistentMode = 1;
        final AtomicLong publishedMessageSize = new AtomicLong();

        publishTestQueueMessages(messageCount, nonPersistentMode, publishedMessageSize);
        verifyPendingStats(defaultQueueName, messageCount, publishedMessageSize.get());

        consumeTestQueueMessages(messageCount);
        verifyPendingStats(defaultQueueName, 0, 0L);
        verifyPendingDurableStats(defaultQueueName, 0, 0L);
    }

    /**
     * Subscribes a plain (non-shared, non-durable) consumer to the default topic, publishes 200
     * messages and verifies that they are pending on the topic while the topic's durable-pending
     * statistics stay at zero; after consumption everything must be zero.
     *
     * <p>The durable statistics are checked on the topic address, as in
     * {@code testTopicMessageSizeShared}; checking the unrelated default queue would make those
     * assertions vacuous. The connection is closed in a {@code finally} block so that a failed
     * verification does not leak it.
     */
    @Test
    public void testTopicMessageSize() throws Exception {
        final int messageCount = 200;
        final AtomicLong publishedMessageSize = new AtomicLong();
        final Connection connection = this.cf.createConnection();
        try {
            connection.setClientID("clientId");
            connection.start();
            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final MessageConsumer consumer = session.createConsumer(session.createTopic(this.defaultTopicName));

            publishTestTopicMessages(messageCount, publishedMessageSize);
            verifyPendingStats(this.defaultTopicName, messageCount, publishedMessageSize.get());
            verifyPendingDurableStats(this.defaultTopicName, 0, 0L);

            consumeTestMessages(consumer, messageCount);
            verifyPendingStats(this.defaultTopicName, 0, 0L);
            verifyPendingDurableStats(this.defaultTopicName, 0, 0L);
        } finally {
            connection.close();
        }
    }

    /**
     * Creates two shared consumers on subscription "sub1" of the default topic, publishes 200
     * messages and verifies that 200 messages are pending with zero durable-pending messages. One
     * consumer is then closed and the other consumes all messages, after which the statistics
     * must be zero.
     *
     * <p>The connection is closed in a {@code finally} block so that a failed verification does
     * not leak it.
     */
    @Test
    public void testTopicMessageSizeShared() throws Exception {
        final int messageCount = 200;
        final AtomicLong publishedMessageSize = new AtomicLong();
        final Connection connection = this.cf.createConnection();
        try {
            connection.setClientID("clientId");
            connection.start();
            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final MessageConsumer firstConsumer = session.createSharedConsumer(session.createTopic(this.defaultTopicName), "sub1");
            final MessageConsumer secondConsumer = session.createSharedConsumer(session.createTopic(this.defaultTopicName), "sub1");

            publishTestTopicMessages(messageCount, publishedMessageSize);
            verifyPendingStats(this.defaultTopicName, messageCount, publishedMessageSize.get());
            verifyPendingDurableStats(this.defaultTopicName, 0, 0L);

            // Close one of the two shared consumers so the remaining one receives every message.
            secondConsumer.close();
            consumeTestMessages(firstConsumer, messageCount);
            verifyPendingStats(this.defaultTopicName, 0, 0L);
            verifyPendingDurableStats(this.defaultTopicName, 0, 0L);
        } finally {
            connection.close();
        }
    }

    /**
     * Subscribes a consumer to the default topic, publishes 200 messages with delivery mode 1
     * (the value of {@code DeliveryMode.NON_PERSISTENT}) and verifies the pending statistics
     * before and after the messages are consumed.
     *
     * <p>The connection is closed in a {@code finally} block so that a failed verification does
     * not leak it.
     */
    @Test
    public void testTopicNonPersistentMessageSize() throws Exception {
        final int messageCount = 200;
        final AtomicLong publishedMessageSize = new AtomicLong();
        final Connection connection = this.cf.createConnection();
        try {
            connection.setClientID("clientId");
            connection.start();
            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final MessageConsumer consumer = session.createConsumer(session.createTopic(this.defaultTopicName));

            publishTestTopicMessages(messageCount, 1, publishedMessageSize);
            verifyPendingStats(this.defaultTopicName, messageCount, publishedMessageSize.get());

            consumeTestMessages(consumer, messageCount);
            verifyPendingStats(this.defaultTopicName, 0, 0L);
        } finally {
            connection.close();
        }
    }

    /**
     * Subscribes a consumer to the default topic, publishes 100 messages with delivery mode 1
     * followed by 100 messages with delivery mode 2 (the values of
     * {@code DeliveryMode.NON_PERSISTENT} and {@code DeliveryMode.PERSISTENT}) and verifies that
     * all 200 are pending; after consumption the pending statistics must be zero.
     *
     * <p>The connection is closed in a {@code finally} block so that a failed verification does
     * not leak it.
     */
    @Test
    public void testTopicPersistentAndNonPersistentMessageSize() throws Exception {
        final int batchSize = 100;
        final AtomicLong persistentSize = new AtomicLong();
        final AtomicLong nonPersistentSize = new AtomicLong();
        final Connection connection = this.cf.createConnection();
        try {
            connection.setClientID("clientId");
            connection.start();
            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final MessageConsumer consumer = session.createConsumer(session.createTopic(this.defaultTopicName));

            publishTestTopicMessages(batchSize, 1, nonPersistentSize);
            publishTestTopicMessages(batchSize, 2, persistentSize);
            verifyPendingStats(this.defaultTopicName, 200, persistentSize.get() + nonPersistentSize.get());

            consumeTestMessages(consumer, 200);
            verifyPendingStats(this.defaultTopicName, 0, 0L);
        } finally {
            connection.close();
        }
    }

    /**
     * Publishes 200 messages for the durable subscription "sub1" on the default topic, verifies
     * the pending and durable-pending statistics, consumes all 200 messages and verifies that the
     * statistics drop to zero.
     *
     * <p>The connection is closed in a {@code finally} block so that a failed verification does
     * not leak it.
     */
    @Test
    public void testMessageSizeOneDurable() throws Exception {
        final int messageCount = 200;
        final AtomicLong publishedMessageSize = new AtomicLong();
        final Connection connection = this.cf.createConnection();
        try {
            connection.setClientID("clientId");
            connection.start();

            publishTestMessagesDurable(connection, new String[]{"sub1"}, messageCount, publishedMessageSize, 2, false);
            verifyPendingStats(this.defaultTopicName, messageCount, publishedMessageSize.get());
            verifyPendingDurableStats(this.defaultTopicName, messageCount, publishedMessageSize.get());

            consumeDurableTestMessages(connection, "sub1", messageCount, publishedMessageSize);
            verifyPendingStats(this.defaultTopicName, 0, 0L);
            verifyPendingDurableStats(this.defaultTopicName, 0, 0L);
        } finally {
            connection.close();
        }
    }

    /**
     * Publishes 200 messages for the durable subscription "sub1", consumes 50 of them and
     * verifies that 150 messages remain pending.
     *
     * <p>The connection is closed in a {@code finally} block so that a failed verification does
     * not leak it.
     */
    @Test
    public void testMessageSizeOneDurablePartialConsumption() throws Exception {
        final AtomicLong publishedMessageSize = new AtomicLong();
        final Connection connection = this.cf.createConnection();
        try {
            connection.setClientID("clientId");
            connection.start();

            publishTestMessagesDurable(connection, new String[]{"sub1"}, 200, publishedMessageSize, 2, false);
            verifyPendingStats(this.defaultTopicName, 200, publishedMessageSize.get());
            verifyPendingDurableStats(this.defaultTopicName, 200, publishedMessageSize.get());

            // NOTE(review): the size counter is handed to the consuming helper, which presumably
            // reduces it by the size of the consumed messages - confirm against the helper.
            consumeDurableTestMessages(connection, "sub1", 50, publishedMessageSize);
            verifyPendingStats(this.defaultTopicName, 150, publishedMessageSize.get());
            // The durable minimum is deliberately relaxed to 75% of the counter value.
            verifyPendingDurableStats(this.defaultTopicName, 150, (long) (0.75d * publishedMessageSize.get()));
        } finally {
            connection.close();
        }
    }

    /**
     * Publishes 200 messages for two durable subscriptions ("sub1" and "sub2"), verifies that 400
     * messages are pending, consumes the 200 messages of "sub1" and verifies that 200 remain -
     * both before and after a server kill/restart cycle.
     *
     * <p>The connection is closed in a {@code finally} block (before the server is killed) so
     * that a failed verification does not leak it.
     */
    @Test
    public void testMessageSizeTwoDurables() throws Exception {
        final AtomicLong publishedMessageSize = new AtomicLong();
        final Connection connection = this.cf.createConnection();
        try {
            connection.setClientID("clientId");
            connection.start();

            publishTestMessagesDurable(connection, new String[]{"sub1", "sub2"}, 200, publishedMessageSize, 2, false);
            // Every message is held once per subscription, hence twice the count and size.
            verifyPendingStats(this.defaultTopicName, 400, 2 * publishedMessageSize.get());
            verifyPendingDurableStats(this.defaultTopicName, 400, 2 * publishedMessageSize.get());

            consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
            verifyPendingStats(this.defaultTopicName, 200, publishedMessageSize.get());
            verifyPendingDurableStats(this.defaultTopicName, 200, publishedMessageSize.get());
        } finally {
            connection.close();
        }

        // The remaining subscription's messages must still be reported after a restart.
        killServer();
        restartServer();
        verifyPendingStats(this.defaultTopicName, 200, publishedMessageSize.get());
        verifyPendingDurableStats(this.defaultTopicName, 200, publishedMessageSize.get());
    }

    /**
     * Creates a shared durable consumer on subscription "sub1", publishes 200 messages, verifies
     * the pending and durable-pending statistics, closes that consumer, consumes the 200 messages
     * through the durable-consumption helper and verifies that the statistics drop to zero.
     *
     * <p>The connection is closed in a {@code finally} block so that a failed verification does
     * not leak it.
     */
    @Test
    public void testMessageSizeSharedDurable() throws Exception {
        final int messageCount = 200;
        final AtomicLong publishedMessageSize = new AtomicLong();
        final Connection connection = this.cf.createConnection();
        try {
            connection.setClientID("clientId");
            connection.start();
            final Session session = connection.createSession();
            final MessageConsumer sharedDurableConsumer = session.createSharedDurableConsumer(session.createTopic(this.defaultTopicName), "sub1");

            publishTestMessagesDurable(connection, new String[]{"sub1"}, messageCount, publishedMessageSize, 2, true);
            verifyPendingStats(this.defaultTopicName, messageCount, publishedMessageSize.get());
            verifyPendingDurableStats(this.defaultTopicName, messageCount, publishedMessageSize.get());

            sharedDurableConsumer.close();
            consumeDurableTestMessages(connection, "sub1", messageCount, publishedMessageSize);
            // With an expected count of 0 the size argument is ignored (the size must be exactly 0),
            // so 0L is passed, matching the other tests.
            verifyPendingStats(this.defaultTopicName, 0, 0L);
            verifyPendingDurableStats(this.defaultTopicName, 0, 0L);
        } finally {
            connection.close();
        }
    }

    /**
     * Collects the queues of all local-queue bindings that are directly bound to an address.
     *
     * @param str the address name
     * @return the queues of every direct binding whose type is {@link BindingType#LOCAL_QUEUE};
     *         empty if there is none
     * @throws Exception if the bindings cannot be looked up
     */
    protected List<Queue> getQueues(String str) throws Exception {
        final List<Queue> queues = new ArrayList<>();
        // Iterate over the general binding type and narrow only after the type check; the
        // collection is not restricted to LocalQueueBinding instances.
        for (org.apache.activemq.artemis.core.postoffice.Binding binding
                : this.server.getPostOffice().getDirectBindings(SimpleString.toSimpleString(str)).getBindings()) {
            if (binding.getType() == BindingType.LOCAL_QUEUE) {
                queues.add(((LocalQueueBinding) binding).getQueue());
            }
        }
        return queues;
    }

    /**
     * Verifies the delivering count/size and then the durable delivering count/size of the queues
     * bound to an address, using the same expectations for both.
     *
     * @param str address name
     * @param i   expected count
     * @param j   minimum size, see {@link #verifySize}
     * @throws Exception if the statistics cannot be read
     */
    protected void verifyDeliveringStats(String str, int i, long j) throws Exception {
        verifyStats(str, i, j, Queue::getDeliveringCount, Queue::getDeliveringSize);
        verifyStats(str, i, j, Queue::getDurableDeliveringCount, Queue::getDurableDeliveringSize);
    }

    /**
     * Verifies the scheduled count/size and then the durable scheduled count/size of the queues
     * bound to an address, using the same expectations for both.
     *
     * @param str address name
     * @param i   expected count
     * @param j   minimum size, see {@link #verifySize}
     * @throws Exception if the statistics cannot be read
     */
    protected void verifyScheduledStats(String str, int i, long j) throws Exception {
        verifyStats(str, i, j, Queue::getScheduledCount, Queue::getScheduledSize);
        verifyStats(str, i, j, Queue::getDurableScheduledCount, Queue::getDurableScheduledSize);
    }

    /**
     * Verifies the message count and persistent size of the queues bound to an address.
     *
     * @param str address name
     * @param i   expected message count
     * @param j   minimum persistent size, see {@link #verifySize}
     * @throws Exception if the statistics cannot be read
     */
    protected void verifyPendingStats(String str, int i, long j) throws Exception {
        verifyStats(str, i, j, Queue::getMessageCount, Queue::getPersistentSize);
    }

    /**
     * Verifies the durable message count and durable persistent size of the queues bound to an
     * address.
     *
     * @param str address name
     * @param i   expected durable message count
     * @param j   minimum durable persistent size, see {@link #verifySize}
     * @throws Exception if the statistics cannot be read
     */
    protected void verifyPendingDurableStats(String str, int i, long j) throws Exception {
        verifyStats(str, i, j, Queue::getDurableMessageCount, Queue::getDurablePersistentSize);
    }

    /**
     * Waits until the summed count of all queues bound to an address equals the expected count,
     * then checks the summed size via {@link #verifySize}.
     *
     * @param str             address whose queues are inspected (resolved once via {@link #getQueues})
     * @param i               expected total count
     * @param j               minimum total size, forwarded to {@link #verifySize}
     * @param toLongFunction  extracts the count from a queue
     * @param toLongFunction2 extracts the size from a queue
     * @throws Exception if the queues cannot be resolved or a statistic cannot be read
     */
    protected void verifyStats(String str, final int i, long j, final ToLongFunction<Queue> toLongFunction, final ToLongFunction<Queue> toLongFunction2) throws Exception {
        final List<Queue> targetQueues = getQueues(str);
        final long expectedCount = i;

        // Poll until the summed count matches; the assertion fails if the wait gives up first.
        assertTrue(Wait.waitFor(() -> targetQueues.stream().mapToLong(toLongFunction).sum() == expectedCount));

        verifySize(i, () -> targetQueues.stream().mapToLong(toLongFunction2).sum(), j);
    }

    /**
     * Waits for the calculated size to match the expectation implied by the count: with a
     * positive count the size must become strictly greater than the given minimum, otherwise it
     * must become exactly zero (the minimum is ignored in that case).
     *
     * @param i                     expected count that selects the condition
     * @param messageSizeCalculator supplies the current size on every poll
     * @param j                     exclusive lower bound for the size when the count is positive
     * @throws Exception if the size cannot be calculated
     */
    protected void verifySize(int i, final MessageSizeCalculator messageSizeCalculator, final long j) throws Exception {
        if (i <= 0) {
            // No messages expected: the reported size has to drop to zero.
            assertTrue(org.apache.activemq.artemis.tests.util.Wait.waitFor(() -> messageSizeCalculator.getMessageSize() == 0));
            return;
        }
        assertTrue(org.apache.activemq.artemis.tests.util.Wait.waitFor(() -> messageSizeCalculator.getMessageSize() > j));
    }

    /**
     * Receives the given number of messages from the consumer, passing the default topic name as
     * the (unused) destination argument.
     *
     * @param messageConsumer consumer to receive from
     * @param i               number of messages to receive
     * @throws Exception if receiving fails
     */
    protected void consumeTestMessages(MessageConsumer messageConsumer, int i) throws Exception {
        consumeTestMessages(messageConsumer, i, defaultTopicName);
    }

    /**
     * Calls the blocking {@code receive()} on the consumer the given number of times; the
     * received messages are discarded.
     *
     * @param messageConsumer consumer to receive from
     * @param i               number of messages to receive; nothing is received when not positive
     * @param str             not used by this implementation
     * @throws Exception if receiving fails
     */
    protected void consumeTestMessages(MessageConsumer messageConsumer, int i, String str) throws Exception {
        int remaining = i;
        while (remaining > 0) {
            messageConsumer.receive();
            remaining--;
        }
    }

    /**
     * Delegates to the five-argument {@code consumeDurableTestMessages} overload, supplying the
     * default topic name.
     *
     * @param connection connection forwarded to the overload
     * @param str        subscription name forwarded to the overload
     * @param i          number of messages forwarded to the overload
     * @param atomicLong size counter forwarded to the overload
     * @throws Exception if the delegate fails
     */
    protected void consumeDurableTestMessages(Connection connection, String str, int i, AtomicLong atomicLong) throws Exception {
        final String topicName = defaultTopicName;
        consumeDurableTestMessages(connection, str, i, topicName, atomicLong);
    }

    /**
     * Delegates to the ten-argument {@code publishTestMessagesDurable} overload, supplying the
     * default topic name, {@code 0}, the inherited default message size and {@code false} for the
     * arguments this convenience method does not expose.
     *
     * @param connection connection forwarded to the overload
     * @param strArr     subscription names forwarded to the overload
     * @param i          number of messages forwarded to the overload
     * @param atomicLong size counter forwarded to the overload
     * @param i2         forwarded unchanged; callers in this class always pass 2
     * @param z          forwarded unchanged; {@code true} only in the shared-durable test
     * @throws Exception if the delegate fails
     */
    protected void publishTestMessagesDurable(Connection connection, String[] strArr, int i, AtomicLong atomicLong, int i2, boolean z) throws Exception {
        final String topicName = defaultTopicName;
        final int messageSize = AbstractPersistentStatTestSupport.defaultMessageSize;
        publishTestMessagesDurable(connection, strArr, topicName, i, 0, messageSize, atomicLong, false, i2, z);
    }

    /**
     * Publishes the given number of messages to the default topic with delivery mode 2 (the value
     * of {@code DeliveryMode.PERSISTENT}).
     *
     * @param i          number of messages to publish
     * @param atomicLong size counter handed to {@code createMessage} for every message
     * @throws Exception if publishing fails
     */
    protected void publishTestTopicMessages(int i, AtomicLong atomicLong) throws Exception {
        publishTestTopicMessages(i, 2, atomicLong);
    }

    /**
     * Publishes the given number of messages to the default topic over a dedicated connection
     * with client id "clientId2".
     *
     * <p>Everything after the connection has been created runs inside the {@code try} block, so
     * the connection is closed even when setting the client id, starting the connection or
     * creating the session fails.
     *
     * @param i          number of messages to publish
     * @param i2         JMS delivery mode set on the producer
     * @param atomicLong size counter handed to {@code createMessage} for every message
     * @throws Exception if publishing fails
     */
    protected void publishTestTopicMessages(int i, int i2, AtomicLong atomicLong) throws Exception {
        Connection connection = this.cf.createConnection();
        try {
            connection.setClientID("clientId2");
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(session.createTopic(this.defaultTopicName));
            producer.setDeliveryMode(i2);
            for (int index = 0; index < i; index++) {
                producer.send(createMessage(index, session, maxMessageSize, atomicLong));
            }
        } finally {
            connection.close();
        }
    }

    /**
     * Delegates to the six-argument {@code publishTestQueueMessages} helper for the default queue
     * with mode 2, {@code maxMessageSize} and the final flag set to {@code true}.
     * NOTE(review): the flag presumably selects a transacted session - confirm against the helper.
     *
     * @param i          number of messages to publish
     * @param atomicLong size counter forwarded to the helper
     * @throws Exception if the delegate fails
     */
    protected void publishTestQueueMessagesTx(int i, AtomicLong atomicLong) throws Exception {
        final String queueName = defaultQueueName;
        publishTestQueueMessages(i, queueName, 2, maxMessageSize, atomicLong, true);
    }

    /**
     * Delegates to the six-argument {@code publishTestQueueMessages} helper for the default queue
     * with mode 2, {@code maxMessageSize} and the final flag set to {@code false}.
     *
     * @param i          number of messages to publish
     * @param atomicLong size counter forwarded to the helper
     * @throws Exception if the delegate fails
     */
    protected void publishTestQueueMessages(int i, AtomicLong atomicLong) throws Exception {
        final String queueName = defaultQueueName;
        publishTestQueueMessages(i, queueName, 2, maxMessageSize, atomicLong, false);
    }

    /**
     * Delegates to the six-argument {@code publishTestQueueMessages} helper for the default queue
     * with a caller-supplied mode, {@code maxMessageSize} and the final flag set to {@code false}.
     *
     * @param i          number of messages to publish
     * @param i2         forwarded unchanged; tests in this class pass 1 or 2
     *                   (presumably the JMS delivery mode - confirm against the helper)
     * @param atomicLong size counter forwarded to the helper
     * @throws Exception if the delegate fails
     */
    protected void publishTestQueueMessages(int i, int i2, AtomicLong atomicLong) throws Exception {
        final String queueName = defaultQueueName;
        publishTestQueueMessages(i, queueName, i2, maxMessageSize, atomicLong, false);
    }

    /**
     * Delegates to the two-argument {@code consumeTestQueueMessages} helper for the default queue.
     *
     * @param i number of messages forwarded to the helper
     * @throws Exception if the delegate fails
     */
    protected void consumeTestQueueMessages(int i) throws Exception {
        final String queueName = defaultQueueName;
        consumeTestQueueMessages(queueName, i);
    }
}
