package org.apache.activemq.artemis.tests.integration.server;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Integration tests for the server-side expiry of messages.
 *
 * <p>Each test starts an in-VM server configured with an expiry scan period of
 * {@link #EXPIRY_SCAN_PERIOD_MS}, publishes messages carrying an expiration time and then
 * checks the queue counters (and, where relevant, the contents of the expiry queue) after
 * waiting for a scan to have taken place.
 */
public class ExpiryRunnerTest extends ActiveMQTestBase {
    /** Number of messages published per producer in the fixed-size tests. */
    private static final int NUM_MESSAGES = 100;

    /** Value passed to {@code setMessageExpiryScanPeriod} when the server is configured. */
    private static final long EXPIRY_SCAN_PERIOD_MS = 1000L;

    /** Pause used after publishing already-expired messages, longer than one scan period. */
    private static final long SCAN_WAIT_MS = 1600L;

    /** Pause used when the messages only expire one second after being published. */
    private static final long DELAYED_EXPIRY_WAIT_MS = 2100L;

    private ActiveMQServer server;
    private ClientSession clientSession;
    private final SimpleString qName = new SimpleString("ExpiryRunnerTestQ");
    private final SimpleString qName2 = new SimpleString("ExpiryRunnerTestQ2");
    private SimpleString expiryQueue;
    private SimpleString expiryAddress;
    private ServerLocator locator;

    /**
     * Slow consumer run on its own thread: it acknowledges every message it receives, records
     * the body, and pauses briefly between messages. It stops when no message arrives within
     * the receive timeout or when any call fails, and always releases the latch on exit.
     */
    private static class DummyMessageHandler implements Runnable {
        /** Bodies of the messages consumed so far, in arrival order. */
        final List<String> payloads = new ArrayList<>();

        /** First failure hit by the consumer loop, or {@code null} if it ended normally. */
        volatile Exception failure;

        private final ClientConsumer consumer;
        private final CountDownLatch latch;

        private DummyMessageHandler(ClientConsumer clientConsumer, CountDownLatch countDownLatch) {
            this.consumer = clientConsumer;
            this.latch = countDownLatch;
        }

        @Override
        public void run() {
            try {
                ClientMessage message;
                while ((message = this.consumer.receive(5000L)) != null) {
                    message.acknowledge();
                    this.payloads.add(message.getBodyBuffer().readString());
                    // Consume slower than the producer sends so that some messages expire first.
                    Thread.sleep(110L);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.failure = e;
            } catch (Exception e) {
                // Stop instead of retrying: a failing receive would otherwise loop forever
                // and the latch would never be released.
                e.printStackTrace();
                this.failure = e;
            } finally {
                this.latch.countDown();
            }
        }
    }

    /** All messages are published already expired; the queue must be empty after a scan. */
    @Test
    public void testBasicExpire() throws Exception {
        ClientProducer producer = this.clientSession.createProducer(this.qName);
        long expiration = System.currentTimeMillis();
        for (int i = 0; i < NUM_MESSAGES; i++) {
            sendExpiring(producer, "m" + i, expiration);
        }
        Thread.sleep(SCAN_WAIT_MS);
        assertQueueCounts(this.qName, 0L);
    }

    /** Already-expired messages published to two different queues must be removed from both. */
    @Test
    public void testExpireFromMultipleQueues() throws Exception {
        ClientProducer producer = this.clientSession.createProducer(this.qName);
        this.clientSession.createQueue(this.qName2, RoutingType.MULTICAST, this.qName2, (SimpleString) null, false);
        this.server.getAddressSettingsRepository().addMatch(this.qName2.toString(), new AddressSettings().setExpiryAddress(this.expiryAddress));
        ClientProducer producer2 = this.clientSession.createProducer(this.qName2);
        long expiration = System.currentTimeMillis();
        for (int i = 0; i < NUM_MESSAGES; i++) {
            sendExpiring(producer, "m" + i, expiration);
            sendExpiring(producer2, "m" + i, expiration);
        }
        Thread.sleep(SCAN_WAIT_MS);
        assertQueueCounts(this.qName, 0L);
        // The second queue is the point of this test, so verify it as well.
        assertQueueCounts(this.qName2, 0L);
    }

    /** Only every other message carries an expiration; the remaining half must stay queued. */
    @Test
    public void testExpireHalf() throws Exception {
        ClientProducer producer = this.clientSession.createProducer(this.qName);
        long expiration = System.currentTimeMillis();
        for (int i = 0; i < NUM_MESSAGES; i++) {
            if (i % 2 == 0) {
                sendExpiring(producer, "m" + i, expiration);
            } else {
                producer.send(createTextMessage(this.clientSession, "m" + i));
            }
        }
        Thread.sleep(SCAN_WAIT_MS);
        assertQueueCounts(this.qName, NUM_MESSAGES / 2);
    }

    /**
     * Messages expire one second after publishing; half are consumed in order before that,
     * and the rest must be gone from the queue once the expiration has passed.
     */
    @Test
    public void testExpireConsumeHalf() throws Exception {
        ClientProducer producer = this.clientSession.createProducer(this.qName);
        long expiration = System.currentTimeMillis() + 1000;
        for (int i = 0; i < NUM_MESSAGES; i++) {
            sendExpiring(producer, "m" + i, expiration);
        }
        ClientConsumer consumer = this.clientSession.createConsumer(this.qName);
        this.clientSession.start();
        for (int i = 0; i < NUM_MESSAGES / 2; i++) {
            ClientMessage message = consumer.receive(500L);
            Assert.assertNotNull("message not received " + i, message);
            message.acknowledge();
            Assert.assertEquals("m" + i, message.getBodyBuffer().readString());
        }
        consumer.close();
        Thread.sleep(DELAYED_EXPIRY_WAIT_MS);
        assertQueueCounts(this.qName, 0L);
    }

    /**
     * Two queues are bound to the same address, so every published message is expired once
     * per queue and twice as many messages are expected on the expiry queue.
     */
    @Test
    public void testExpireToExpiryQueue() throws Exception {
        this.server.getAddressSettingsRepository().addMatch(this.qName2.toString(), new AddressSettings().setExpiryAddress(this.expiryAddress));
        this.clientSession.deleteQueue(this.qName);
        this.clientSession.createQueue(this.qName, RoutingType.MULTICAST, this.qName, (SimpleString) null, false);
        this.clientSession.createQueue(this.qName, RoutingType.MULTICAST, this.qName2, (SimpleString) null, false);
        ClientProducer producer = this.clientSession.createProducer(this.qName);
        long expiration = System.currentTimeMillis();
        for (int i = 0; i < NUM_MESSAGES; i++) {
            sendExpiring(producer, "m" + i, expiration);
        }
        Thread.sleep(SCAN_WAIT_MS);
        assertQueueCounts(this.qName, 0L);

        ClientConsumer expiryConsumer = this.clientSession.createConsumer(this.expiryQueue);
        this.clientSession.start();
        // One expired copy per bound queue.
        for (int i = 0; i < 2 * NUM_MESSAGES; i++) {
            Assert.assertNotNull(expiryConsumer.receive(500L));
        }
        Assert.assertEquals(100L, this.server.locateQueue(this.qName).getMessagesExpired());
        expiryConsumer.close();
    }

    /**
     * Publishes messages for about two seconds while a slow consumer drains the queue, then
     * checks that no message shows up both at the consumer and on the expiry queue, and that
     * the combined set contains the published bodies starting from {@code m0}.
     */
    @Test
    public void testExpireWhilstConsumingMessagesStillInOrder() throws Exception {
        ClientProducer producer = this.clientSession.createProducer(this.qName);
        ClientConsumer consumer = this.clientSession.createConsumer(this.qName);
        CountDownLatch consumerDone = new CountDownLatch(1);
        DummyMessageHandler handler = new DummyMessageHandler(consumer, consumerDone);
        this.clientSession.start();
        Thread consumerThread = new Thread(handler);
        consumerThread.start();

        long expiration = System.currentTimeMillis() + 1000;
        long sendDeadline = System.currentTimeMillis() + 2000;
        int sent = 0;
        do {
            sendExpiring(producer, "m" + sent, expiration);
            sent++;
            Thread.sleep(100L);
        } while (System.currentTimeMillis() < sendDeadline);

        Assert.assertTrue(consumerDone.await(10000L, TimeUnit.MILLISECONDS));
        // The latch is released as the last step of run(), so this returns promptly.
        consumerThread.join();
        Assert.assertNull("consumer thread failed: " + handler.failure, handler.failure);
        consumer.close();

        ClientConsumer expiryConsumer = this.clientSession.createConsumer(this.expiryQueue);
        ClientMessage expired;
        while ((expired = expiryConsumer.receive(2000L)) != null) {
            String body = expired.getBodyBuffer().readString();
            expired.acknowledge();
            // A message must never be both consumed and expired.
            Assert.assertFalse(handler.payloads.contains(body));
            handler.payloads.add(body);
        }
        for (int i = 0; i < sent && !handler.payloads.isEmpty(); i++) {
            Assert.assertTrue("m" + i, handler.payloads.remove("m" + i));
        }
        expiryConsumer.close();
    }

    /**
     * Starts the server with the expiry scan period set, opens the client session, creates
     * the test queue and the expiry queue, and points both test addresses at the expiry address.
     */
    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig().setMessageExpiryScanPeriod(EXPIRY_SCAN_PERIOD_MS), false));
        this.server.start();
        this.locator = createInVMNonHALocator().setBlockOnAcknowledge(true);
        this.clientSession = createSessionFactory(this.locator).createSession(false, true, true);
        this.clientSession.createQueue(this.qName, RoutingType.MULTICAST, this.qName, (SimpleString) null, false);
        this.expiryAddress = new SimpleString("EA");
        this.expiryQueue = new SimpleString("expiryQ");
        AddressSettings settings = new AddressSettings().setExpiryAddress(this.expiryAddress);
        this.server.getAddressSettingsRepository().addMatch(this.qName.toString(), settings);
        this.server.getAddressSettingsRepository().addMatch(this.qName2.toString(), settings);
        this.clientSession.createQueue(this.expiryAddress, RoutingType.MULTICAST, this.expiryQueue, (SimpleString) null, false);
    }

    /** Publishes one text message with the given body and absolute expiration time. */
    private void sendExpiring(ClientProducer producer, String body, long expiration) throws Exception {
        ClientMessage message = createTextMessage(this.clientSession, body);
        message.setExpiration(expiration);
        producer.send(message);
    }

    /**
     * Asserts that the named queue exists, holds {@code expectedMessages} messages and has
     * nothing in delivery.
     */
    private void assertQueueCounts(SimpleString queueName, long expectedMessages) {
        Assert.assertNotNull("queue not found: " + queueName, this.server.locateQueue(queueName));
        Assert.assertEquals(expectedMessages, this.server.locateQueue(queueName).getMessageCount());
        Assert.assertEquals(0L, this.server.locateQueue(queueName).getDeliveringCount());
    }
}
