package org.apache.activemq.artemis.tests.integration.scheduling;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/scheduling/ScheduledMessageTest.class */
public class ScheduledMessageTest extends ActiveMQTestBase {
    // Name the tests pass to QueueConfiguration.of(...), createProducer(...) and createConsumer(...).
    private final SimpleString atestq = SimpleString.of("ascheduledtestq");
    // Name of a second queue; the multi-consumer tests create it with setAddress(atestq).
    private final SimpleString atestq2 = SimpleString.of("ascheduledtestq2");
    // Configuration built in startServer() and passed again to createServer(...) when a test re-creates the server.
    private Configuration configuration;
    // Server under test; reassigned whenever a test stops and re-creates it.
    private ActiveMQServer server;
    // Locator obtained from createInVMNonHALocator() in startServer().
    private ServerLocator locator;

    /**
     * Runs the set-up inherited from {@code ActiveMQTestBase} and then starts the
     * server used by the tests via {@link #startServer()}.
     *
     * @throws Exception if the base set-up or the server start fails
     */
    @Override
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        this.startServer();
    }

    /**
     * Builds a default in-VM configuration, creates and starts a server from it,
     * and creates a fresh in-VM non-HA locator. The results are stored in the
     * {@code configuration}, {@code server} and {@code locator} fields.
     *
     * @throws Exception if the server cannot be created or started
     */
    protected void startServer() throws Exception {
        Configuration inVmConfig = createDefaultInVMConfig();
        this.configuration = inVmConfig;

        ActiveMQServer broker = createServer(true, inVmConfig);
        this.server = broker;
        broker.start();

        this.locator = createInVMNonHALocator();
    }

    /**
     * Runs {@link #testMessageDeliveredCorrectly(boolean)} with the flag set, i.e.
     * with the server stopped and re-created between send and receive.
     */
    @Test
    public void testRecoveredMessageDeliveredCorrectly() throws Exception {
        final boolean restartBeforeConsume = true;
        testMessageDeliveredCorrectly(restartBeforeConsume);
    }

    /**
     * Runs {@link #testMessageDeliveredCorrectly(boolean)} with the flag cleared, i.e.
     * without restarting the server between send and receive.
     */
    @Test
    public void testMessageDeliveredCorrectly() throws Exception {
        final boolean restartBeforeConsume = false;
        testMessageDeliveredCorrectly(restartBeforeConsume);
    }

    /**
     * Runs {@link #testScheduledMessagesDeliveredCorrectly(boolean)} without a
     * server restart between send and receive.
     */
    @Test
    public void testScheduledMessagesDeliveredCorrectly() throws Exception {
        final boolean restartBeforeConsume = false;
        testScheduledMessagesDeliveredCorrectly(restartBeforeConsume);
    }

    /**
     * Runs {@link #testScheduledMessagesDeliveredCorrectly(boolean)} with the server
     * stopped and re-created between send and receive.
     */
    @Test
    public void testRecoveredScheduledMessagesDeliveredCorrectly() throws Exception {
        final boolean restartBeforeConsume = true;
        testScheduledMessagesDeliveredCorrectly(restartBeforeConsume);
    }

    /**
     * Runs {@link #testScheduledMessagesDeliveredCorrectlyDifferentOrder(boolean)}
     * without a server restart between send and receive.
     */
    @Test
    public void testScheduledMessagesDeliveredCorrectlyDifferentOrder() throws Exception {
        final boolean restartBeforeConsume = false;
        testScheduledMessagesDeliveredCorrectlyDifferentOrder(restartBeforeConsume);
    }

    /**
     * Runs {@link #testScheduledMessagesDeliveredCorrectlyDifferentOrder(boolean)}
     * with the server stopped and re-created between send and receive.
     */
    @Test
    public void testRecoveredScheduledMessagesDeliveredCorrectlyDifferentOrder() throws Exception {
        final boolean restartBeforeConsume = true;
        testScheduledMessagesDeliveredCorrectlyDifferentOrder(restartBeforeConsume);
    }

    /**
     * Runs {@link #testScheduledAndNormalMessagesDeliveredCorrectly(boolean)}
     * without a server restart between send and receive.
     */
    @Test
    public void testScheduledAndNormalMessagesDeliveredCorrectly() throws Exception {
        final boolean restartBeforeConsume = false;
        testScheduledAndNormalMessagesDeliveredCorrectly(restartBeforeConsume);
    }

    /**
     * Runs {@link #testScheduledAndNormalMessagesDeliveredCorrectly(boolean)} with
     * the server stopped and re-created between send and receive.
     */
    @Test
    public void testRecoveredScheduledAndNormalMessagesDeliveredCorrectly() throws Exception {
        final boolean restartBeforeConsume = true;
        testScheduledAndNormalMessagesDeliveredCorrectly(restartBeforeConsume);
    }

    /**
     * Runs {@link #testTxMessageDeliveredCorrectly(boolean)} without a server
     * restart between prepare and commit.
     */
    @Test
    public void testTxMessageDeliveredCorrectly() throws Exception {
        final boolean restartAfterPrepare = false;
        testTxMessageDeliveredCorrectly(restartAfterPrepare);
    }

    /**
     * Runs {@link #testTxMessageDeliveredCorrectly(boolean)} with the server
     * stopped and re-created between prepare and commit.
     */
    @Test
    public void testRecoveredTxMessageDeliveredCorrectly() throws Exception {
        final boolean restartAfterPrepare = true;
        testTxMessageDeliveredCorrectly(restartAfterPrepare);
    }

    /**
     * Sends one durable message scheduled ten seconds ahead, then checks that the
     * consumer receives it no earlier than the scheduled time and that the queue
     * is empty afterwards.
     *
     * @throws Exception if any client or server operation fails
     */
    @Test
    public void testPagedMessageDeliveredCorrectly() throws Exception {
        ClientSession session = createSessionFactory(this.locator).createSession(false, true, false);
        session.createQueue(QueueConfiguration.of(this.atestq));

        ClientProducer producer = session.createProducer(this.atestq);
        ClientMessage scheduled = createDurableMessage(session, "m1");
        long deliveryTime = System.currentTimeMillis() + 10000;
        scheduled.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, deliveryTime);
        producer.send(scheduled);
        producer.close();

        ClientConsumer consumer = session.createConsumer(this.atestq);
        session.start();
        ClientMessage received = consumer.receive(10250L);
        // Fail with an assertion rather than a NullPointerException when nothing arrives in time.
        Assertions.assertNotNull(received, "scheduled message was not received within the timeout");
        Assertions.assertTrue(System.currentTimeMillis() >= deliveryTime);
        Assertions.assertEquals("m1", received.getBodyBuffer().readString());
        received.acknowledge();
        consumer.close();

        // A fresh consumer must find nothing left on the queue.
        Assertions.assertNull(session.createConsumer(this.atestq).receiveImmediate());
        session.close();
    }

    /**
     * With a 5000 ms redelivery delay configured for the address, sends one message
     * to two queues bound to that address, consumes and acknowledges it on both,
     * rolls the session back, and checks that the redelivered copies arrive no
     * earlier than five seconds after the rollback.
     *
     * @throws Exception if any client or server operation fails
     */
    @Test
    public void testPagedMessageDeliveredMultipleConsumersCorrectly() throws Exception {
        this.server.getAddressSettingsRepository().addMatch(this.atestq.toString(), new AddressSettings().setRedeliveryDelay(5000L));
        ClientSession session = createSessionFactory(this.locator).createSession(false, true, false);
        session.createQueue(QueueConfiguration.of(this.atestq));
        session.createQueue(QueueConfiguration.of(this.atestq2).setAddress(this.atestq));

        ClientProducer producer = session.createProducer(this.atestq);
        producer.send(createDurableMessage(session, "m1"));
        producer.close();

        ClientConsumer firstConsumer = session.createConsumer(this.atestq);
        ClientConsumer secondConsumer = session.createConsumer(this.atestq2);
        session.start();

        // Null checks turn a missed delivery into an assertion failure instead of a NullPointerException.
        ClientMessage firstCopy = firstConsumer.receive(1000L);
        Assertions.assertNotNull(firstCopy, "no message received from the first queue");
        firstCopy.acknowledge();
        ClientMessage secondCopy = secondConsumer.receive(1000L);
        Assertions.assertNotNull(secondCopy, "no message received from the second queue");
        secondCopy.acknowledge();
        Assertions.assertEquals("m1", firstCopy.getBodyBuffer().readString());
        Assertions.assertEquals("m1", secondCopy.getBodyBuffer().readString());

        // Reference point for the redelivery-delay check below.
        long beforeRollback = System.currentTimeMillis();
        firstConsumer.close();
        secondConsumer.close();
        session.rollback();

        ClientConsumer firstRedeliveryConsumer = session.createConsumer(this.atestq);
        ClientConsumer secondRedeliveryConsumer = session.createConsumer(this.atestq2);
        ClientMessage firstRedelivered = firstRedeliveryConsumer.receive(5250L);
        Assertions.assertNotNull(firstRedelivered, "no redelivery received from the first queue");
        ClientMessage secondRedelivered = secondRedeliveryConsumer.receive(1000L);
        Assertions.assertNotNull(secondRedelivered, "no redelivery received from the second queue");
        Assertions.assertTrue(System.currentTimeMillis() >= beforeRollback + 5000);
        Assertions.assertEquals("m1", firstRedelivered.getBodyBuffer().readString());
        Assertions.assertEquals("m1", secondRedelivered.getBodyBuffer().readString());
        secondRedelivered.acknowledge();
        firstRedelivered.acknowledge();
        firstRedeliveryConsumer.close();
        secondRedeliveryConsumer.close();

        Assertions.assertNull(session.createConsumer(this.atestq).receiveImmediate());
        session.close();
    }

    /**
     * Same scenario as the multi-consumer redelivery test, but the server is
     * stopped and re-created from the stored configuration after the rollback.
     * The redelivered copies must still arrive no earlier than five seconds after
     * the timestamp taken before the rollback.
     *
     * @throws Exception if any client or server operation fails
     */
    @Test
    public void testPagedMessageDeliveredMultipleConsumersAfterRecoverCorrectly() throws Exception {
        AddressSettings delayedRedelivery = new AddressSettings().setRedeliveryDelay(5000L);
        this.server.getAddressSettingsRepository().addMatch(this.atestq.toString(), delayedRedelivery);

        ClientSession firstSession = createSessionFactory(this.locator).createSession(false, true, false);
        firstSession.createQueue(QueueConfiguration.of(this.atestq));
        firstSession.createQueue(QueueConfiguration.of(this.atestq2).setAddress(this.atestq));

        ClientProducer producer = firstSession.createProducer(this.atestq);
        producer.send(createDurableMessage(firstSession, "m1"));
        producer.close();

        ClientConsumer queueOneConsumer = firstSession.createConsumer(this.atestq);
        ClientConsumer queueTwoConsumer = firstSession.createConsumer(this.atestq2);
        firstSession.start();

        ClientMessage copyOne = queueOneConsumer.receive(1000L);
        Assertions.assertNotNull(copyOne);
        copyOne.acknowledge();
        ClientMessage copyTwo = queueTwoConsumer.receive(1000L);
        Assertions.assertNotNull(copyTwo);
        copyTwo.acknowledge();
        Assertions.assertEquals("m1", copyOne.getBodyBuffer().readString());
        Assertions.assertEquals("m1", copyTwo.getBodyBuffer().readString());

        // Reference point for the redelivery-delay check after the restart.
        long beforeRollback = System.currentTimeMillis();
        queueOneConsumer.close();
        queueTwoConsumer.close();
        firstSession.rollback();
        // The producer was already closed above; this second close is kept as in the original flow.
        producer.close();
        firstSession.close();

        // Stop the server and bring up a new instance from the same configuration.
        this.server.stop();
        this.server = null;
        this.server = createServer(true, this.configuration);
        this.server.start();

        ClientSession recoveredSession = createSessionFactory(this.locator).createSession(false, true, true);
        ClientConsumer recoveredQueueOne = recoveredSession.createConsumer(this.atestq);
        ClientConsumer recoveredQueueTwo = recoveredSession.createConsumer(this.atestq2);
        recoveredSession.start();

        ClientMessage redeliveredOne = recoveredQueueOne.receive(5250L);
        Assertions.assertNotNull(redeliveredOne);
        ClientMessage redeliveredTwo = recoveredQueueTwo.receive(1000L);
        Assertions.assertNotNull(redeliveredTwo);
        Assertions.assertTrue(System.currentTimeMillis() >= beforeRollback + 5000);
        Assertions.assertEquals("m1", redeliveredOne.getBodyBuffer().readString());
        Assertions.assertEquals("m1", redeliveredTwo.getBodyBuffer().readString());
        redeliveredTwo.acknowledge();
        redeliveredOne.acknowledge();
        recoveredQueueOne.close();
        recoveredQueueTwo.close();

        Assertions.assertNull(recoveredSession.createConsumer(this.atestq).receiveImmediate());
        recoveredSession.close();
    }

    /**
     * Sends one durable message scheduled ten seconds ahead and checks that it is
     * received no earlier than the scheduled time and that nothing else remains on
     * the queue.
     *
     * @param z when {@code true}, the producer and session are closed and the server is
     *          stopped and re-created from the stored configuration before consuming
     * @throws Exception if any client or server operation fails
     */
    public void testMessageDeliveredCorrectly(boolean z) throws Exception {
        ClientSession session = createSessionFactory(this.locator).createSession(false, true, false);
        session.createQueue(QueueConfiguration.of(this.atestq));
        ClientProducer producer = session.createProducer(this.atestq);

        ClientMessage scheduled = session.createMessage((byte) 3, false, 0L, System.currentTimeMillis(), (byte) 1);
        scheduled.getBodyBuffer().writeString("testINVMCoreClient");
        scheduled.setDurable(true);
        long deliveryTime = System.currentTimeMillis() + 10000;
        scheduled.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, deliveryTime);
        producer.send(scheduled);

        if (z) {
            // Restart path: drop the client resources, replace the server, open a new session.
            producer.close();
            session.close();
            this.server.stop();
            this.server = null;
            this.server = createServer(true, this.configuration);
            this.server.start();
            session = createSessionFactory(this.locator).createSession(false, true, true);
        }

        ClientConsumer consumer = session.createConsumer(this.atestq);
        session.start();
        ClientMessage received = consumer.receive(11000L);
        // Fail with an assertion rather than a NullPointerException when nothing arrives in time.
        Assertions.assertNotNull(received, "scheduled message was not received within the timeout");
        Assertions.assertTrue(System.currentTimeMillis() >= deliveryTime);
        Assertions.assertEquals("testINVMCoreClient", received.getBodyBuffer().readString());
        received.acknowledge();
        consumer.close();

        Assertions.assertNull(session.createConsumer(this.atestq).receiveImmediate());
        session.close();
    }

    /**
     * Sends five durable messages ("m1".."m5") scheduled at one-second intervals
     * starting ten seconds ahead, then checks that they are received in that order
     * and that each is received no earlier than its scheduled time.
     *
     * @param z when {@code true}, the producer and session are closed and the server is
     *          stopped and re-created from the stored configuration before consuming
     * @throws Exception if any client or server operation fails
     */
    public void testScheduledMessagesDeliveredCorrectly(boolean z) throws Exception {
        final String[] bodies = {"m1", "m2", "m3", "m4", "m5"};

        ClientSession session = createSessionFactory(this.locator).createSession(false, true, false);
        session.createQueue(QueueConfiguration.of(this.atestq));
        ClientProducer producer = session.createProducer(this.atestq);

        ClientMessage[] outgoing = new ClientMessage[bodies.length];
        for (int i = 0; i < bodies.length; i++) {
            outgoing[i] = createDurableMessage(session, bodies[i]);
        }

        // Message i is scheduled for firstDeliveryTime + i seconds.
        final long firstDeliveryTime = System.currentTimeMillis() + 10000;
        for (int i = 0; i < outgoing.length; i++) {
            outgoing[i].putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, firstDeliveryTime + i * 1000L);
            producer.send(outgoing[i]);
        }

        if (z) {
            // Restart path: drop the client resources, replace the server, open a new session.
            producer.close();
            session.close();
            this.server.stop();
            this.server = null;
            this.server = createServer(true, this.configuration);
            this.server.start();
            session = createSessionFactory(this.locator).createSession(false, true, true);
        }

        ClientConsumer consumer = session.createConsumer(this.atestq);
        session.start();
        for (int i = 0; i < bodies.length; i++) {
            // The first receive has to cover the initial ten-second delay; later ones only the one-second gap.
            long timeout = (i == 0) ? 11000L : 1250L;
            ClientMessage received = consumer.receive(timeout);
            // Fail with an assertion rather than a NullPointerException when nothing arrives in time.
            Assertions.assertNotNull(received, "message " + bodies[i] + " was not received within the timeout");
            Assertions.assertTrue(System.currentTimeMillis() >= firstDeliveryTime + i * 1000L);
            Assertions.assertEquals(bodies[i], received.getBodyBuffer().readString());
            received.acknowledge();
        }
        consumer.close();

        Assertions.assertNull(session.createConsumer(this.atestq).receiveImmediate());
        session.close();
    }

    /**
     * Sends five durable messages whose scheduled times do not follow the send
     * order (m1 +0s, m2 +3s, m3 +1s, m4 +4s, m5 +2s relative to a base ten seconds
     * ahead), then checks that they are received in schedule order
     * (m1, m3, m5, m2, m4), each no earlier than its scheduled time.
     *
     * @param z when {@code true}, the producer and session are closed and the server is
     *          stopped and re-created from the stored configuration before consuming
     * @throws Exception if any client or server operation fails
     */
    public void testScheduledMessagesDeliveredCorrectlyDifferentOrder(boolean z) throws Exception {
        final String[] sendOrder = {"m1", "m2", "m3", "m4", "m5"};
        // Offset (ms) from the base delivery time, indexed like sendOrder.
        final long[] sendOffsets = {0L, 3000L, 1000L, 4000L, 2000L};
        // Bodies in the order their schedules fall due, one second apart.
        final String[] receiveOrder = {"m1", "m3", "m5", "m2", "m4"};

        ClientSession session = createSessionFactory(this.locator).createSession(false, true, false);
        session.createQueue(QueueConfiguration.of(this.atestq));
        ClientProducer producer = session.createProducer(this.atestq);

        ClientMessage[] outgoing = new ClientMessage[sendOrder.length];
        for (int i = 0; i < sendOrder.length; i++) {
            outgoing[i] = createDurableMessage(session, sendOrder[i]);
        }

        final long baseDeliveryTime = System.currentTimeMillis() + 10000;
        for (int i = 0; i < outgoing.length; i++) {
            outgoing[i].putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, baseDeliveryTime + sendOffsets[i]);
            producer.send(outgoing[i]);
        }

        if (z) {
            // Restart path: drop the client resources, replace the server, open a new session.
            producer.close();
            session.close();
            this.server.stop();
            this.server = null;
            this.server = createServer(true, this.configuration);
            this.server.start();
            session = createSessionFactory(this.locator).createSession(false, true, true);
        }

        ClientConsumer consumer = session.createConsumer(this.atestq);
        session.start();
        for (int i = 0; i < receiveOrder.length; i++) {
            // The first receive has to cover the initial ten-second delay; later ones only the one-second gap.
            long timeout = (i == 0) ? 10250L : 1250L;
            ClientMessage received = consumer.receive(timeout);
            // Fail with an assertion rather than a NullPointerException when nothing arrives in time.
            Assertions.assertNotNull(received, "message " + receiveOrder[i] + " was not received within the timeout");
            Assertions.assertTrue(System.currentTimeMillis() >= baseDeliveryTime + i * 1000L);
            Assertions.assertEquals(receiveOrder[i], received.getBodyBuffer().readString());
            received.acknowledge();
        }
        consumer.close();

        Assertions.assertNull(session.createConsumer(this.atestq).receiveImmediate());
        session.close();
    }

    /**
     * Sends ten durable messages that all carry the same scheduled delivery time
     * (one second ahead), each tagged with an int property "value", and checks
     * that they are received with "value" running 0..9 in order.
     *
     * @throws Exception if any client or server operation fails
     */
    @Test
    public void testManyMessagesSameTime() throws Exception {
        final int messageCount = 10;

        ClientSession session = createSessionFactory(this.locator).createSession(false, false, false);
        session.createQueue(QueueConfiguration.of(this.atestq));
        ClientProducer producer = session.createProducer(this.atestq);

        final long sharedDeliveryTime = System.currentTimeMillis() + 1000;
        int sent = 0;
        while (sent < messageCount) {
            ClientMessage outgoing = session.createMessage(true);
            outgoing.putIntProperty("value", sent);
            outgoing.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, sharedDeliveryTime);
            producer.send(outgoing);
            sent++;
        }
        session.commit();

        session.start();
        ClientConsumer consumer = session.createConsumer(this.atestq);
        for (int expected = 0; expected < messageCount; expected++) {
            ClientMessage incoming = consumer.receive(AmqpSender.DEFAULT_SEND_TIMEOUT);
            Assertions.assertNotNull(incoming);
            incoming.acknowledge();
            Assertions.assertEquals(expected, incoming.getIntProperty("value").intValue());
        }
        session.commit();

        Assertions.assertNull(consumer.receiveImmediate());
        session.close();
    }

    /**
     * Schedules a message far in the future, confirms the consumer does not see it,
     * then asks the queue's management control to deliver it by message id and
     * checks that the consumer now receives it.
     *
     * @throws Exception if any client, server or management operation fails
     */
    @Test
    public void testManagementDeliveryById() throws Exception {
        ClientSession session = createSessionFactory(this.locator).createSession(false, false, false);
        session.createQueue(QueueConfiguration.of(this.atestq));
        ClientProducer producer = session.createProducer(this.atestq);

        final long farFuture = System.currentTimeMillis() + 999999999;
        ClientMessage scheduled = session.createMessage(true);
        scheduled.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, farFuture);
        producer.send(scheduled);
        session.commit();

        session.start();
        ClientConsumer consumer = session.createConsumer(this.atestq);
        // Nothing may be delivered while the message is still scheduled.
        Assertions.assertNull(consumer.receive(500L));

        Object control = this.server.getManagementService().getResource("queue." + this.atestq);
        QueueControl queueControl = (QueueControl) control;
        Object rawMessageId = queueControl.listScheduledMessages()[0].get("messageID");
        long messageId = ((Long) rawMessageId).longValue();
        queueControl.deliverScheduledMessage(messageId);

        ClientMessage delivered = consumer.receive(500L);
        Assertions.assertNotNull(delivered);
        delivered.acknowledge();
        session.commit();

        Assertions.assertNull(consumer.receiveImmediate());
        session.close();
    }

    /**
     * Schedules a message far in the future, closes the session factory, and then
     * checks through the queue's management control that removing the message by
     * id takes both the message count and the scheduled count from 1 to 0.
     *
     * @throws Exception if any client, server or management operation fails
     */
    @Test
    public void testManagementDeleteById() throws Exception {
        // try-with-resources closes the factory exactly once, whether or not sending fails;
        // the management assertions run afterwards, outside the resource scope.
        try (ClientSessionFactory sessionFactory = createSessionFactory(this.locator)) {
            ClientSession session = sessionFactory.createSession(false, false, false);
            session.createQueue(QueueConfiguration.of(this.atestq));
            ClientProducer producer = session.createProducer(this.atestq);
            ClientMessage scheduled = session.createMessage(true);
            scheduled.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + 999999999);
            producer.send(scheduled);
            session.commit();
        }

        QueueControl queueControl = (QueueControl) this.server.getManagementService().getResource("queue." + this.atestq);
        Assertions.assertEquals(1L, queueControl.getMessageCount());
        Assertions.assertEquals(1L, queueControl.getScheduledCount());

        long messageId = ((Long) queueControl.listScheduledMessages()[0].get("messageID")).longValue();
        Assertions.assertTrue(queueControl.removeMessage(messageId));

        Assertions.assertEquals(0L, queueControl.getMessageCount());
        Assertions.assertEquals(0L, queueControl.getScheduledCount());
    }

    /**
     * Schedules two messages far in the future, both carrying the same random
     * string property, confirms the consumer does not see them, then asks the
     * queue's management control to deliver scheduled messages matching a filter
     * on that property and checks that both are received.
     *
     * @throws Exception if any client, server or management operation fails
     */
    @Test
    public void testManagementDeliveryByFilter() throws Exception {
        final int scheduledCount = 2;
        String propertyValue = RandomUtil.randomString();
        String propertyName = "X" + RandomUtil.randomString().replace("-", "");

        ClientSession session = createSessionFactory(this.locator).createSession(false, false, false);
        session.createQueue(QueueConfiguration.of(this.atestq));
        ClientProducer producer = session.createProducer(this.atestq);

        final long farFuture = System.currentTimeMillis() + 999999999;
        for (int i = 0; i < scheduledCount; i++) {
            ClientMessage scheduled = session.createMessage(true);
            scheduled.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, farFuture);
            scheduled.putStringProperty(propertyName, propertyValue);
            producer.send(scheduled);
        }
        session.commit();

        session.start();
        ClientConsumer consumer = session.createConsumer(this.atestq);
        // Nothing may be delivered while the messages are still scheduled.
        Assertions.assertNull(consumer.receive(500L));

        String filter = propertyName + " = '" + propertyValue + "'";
        QueueControl queueControl = (QueueControl) this.server.getManagementService().getResource("queue." + this.atestq);
        queueControl.deliverScheduledMessages(filter);

        for (int i = 0; i < scheduledCount; i++) {
            ClientMessage delivered = consumer.receive(500L);
            Assertions.assertNotNull(delivered);
            delivered.acknowledge();
        }
        session.commit();

        Assertions.assertNull(consumer.receiveImmediate());
        session.close();
    }

    /**
     * Interleaves scheduled and unscheduled durable messages: m1, m3 and m5 are
     * scheduled at +0s, +1s and +2s relative to a base ten seconds ahead, while m2
     * and m4 carry no schedule. Checks that m2 and m4 are received first, followed
     * by m1, m3 and m5, each no earlier than its scheduled time.
     *
     * @param z when {@code true}, the producer and session are closed and the server is
     *          stopped and re-created from the stored configuration before consuming
     * @throws Exception if any client or server operation fails
     */
    public void testScheduledAndNormalMessagesDeliveredCorrectly(boolean z) throws Exception {
        final String[] unscheduledBodies = {"m2", "m4"};
        final String[] scheduledBodies = {"m1", "m3", "m5"};

        ClientSession session = createSessionFactory(this.locator).createSession(false, true, false);
        session.createQueue(QueueConfiguration.of(this.atestq));
        ClientProducer producer = session.createProducer(this.atestq);

        ClientMessage m1 = createDurableMessage(session, "m1");
        ClientMessage m2 = createDurableMessage(session, "m2");
        ClientMessage m3 = createDurableMessage(session, "m3");
        ClientMessage m4 = createDurableMessage(session, "m4");
        ClientMessage m5 = createDurableMessage(session, "m5");

        final long baseDeliveryTime = System.currentTimeMillis() + 10000;
        m1.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, baseDeliveryTime);
        producer.send(m1);
        producer.send(m2);
        m3.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, baseDeliveryTime + 1000);
        producer.send(m3);
        producer.send(m4);
        m5.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, baseDeliveryTime + 2000);
        producer.send(m5);

        if (z) {
            // Restart path: drop the client resources, replace the server, open a new session.
            producer.close();
            session.close();
            this.server.stop();
            this.server = null;
            this.server = createServer(true, this.configuration);
            this.server.start();
            session = createSessionFactory(this.locator).createSession(false, true, true);
        }

        ClientConsumer consumer = session.createConsumer(this.atestq);
        session.start();

        // Unscheduled messages are expected first; no timing constraint applies to them.
        for (String expectedBody : unscheduledBodies) {
            ClientMessage received = consumer.receive(1000L);
            // Fail with an assertion rather than a NullPointerException when nothing arrives in time.
            Assertions.assertNotNull(received, "message " + expectedBody + " was not received within the timeout");
            Assertions.assertEquals(expectedBody, received.getBodyBuffer().readString());
            received.acknowledge();
        }

        for (int i = 0; i < scheduledBodies.length; i++) {
            // The first scheduled receive has to cover the initial ten-second delay.
            long timeout = (i == 0) ? 10250L : 1250L;
            ClientMessage received = consumer.receive(timeout);
            Assertions.assertNotNull(received, "message " + scheduledBodies[i] + " was not received within the timeout");
            Assertions.assertTrue(System.currentTimeMillis() >= baseDeliveryTime + i * 1000L);
            Assertions.assertEquals(scheduledBodies[i], received.getBodyBuffer().readString());
            received.acknowledge();
        }
        consumer.close();

        Assertions.assertNull(session.createConsumer(this.atestq).receiveImmediate());
        session.close();
    }

    /**
     * Sends one scheduled durable message inside an XA transaction, prepares and
     * commits that transaction, then consumes and acknowledges the message inside
     * a second XA transaction. The message must be received no earlier than its
     * scheduled time (one second ahead of the send).
     *
     * @param z when {@code true}, the producer and session are closed and the server is
     *          stopped and re-created from the stored configuration between the
     *          prepare and the commit of the sending transaction
     * @throws Exception if any client, server or XA operation fails
     */
    public void testTxMessageDeliveredCorrectly(boolean z) throws Exception {
        XidImpl sendXid = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
        XidImpl receiveXid = new XidImpl("xa2".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());

        ClientSession session = createSessionFactory(this.locator).createSession(true, false, false);
        session.createQueue(QueueConfiguration.of(this.atestq));

        // Sending transaction: start, send, end, prepare.
        session.start(sendXid, XAResource.TMNOFLAGS);
        ClientProducer producer = session.createProducer(this.atestq);
        ClientMessage scheduled = createDurableMessage(session, "testINVMCoreClient");
        long deliveryTime = System.currentTimeMillis() + 1000;
        scheduled.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, deliveryTime);
        producer.send(scheduled);
        session.end(sendXid, XAResource.TMSUCCESS);
        session.prepare(sendXid);

        if (z) {
            // Restart path: the prepared transaction is committed from a new session on a new server.
            producer.close();
            session.close();
            this.server.stop();
            this.server = null;
            this.server = createServer(true, this.configuration);
            this.server.start();
            session = createSessionFactory(this.locator).createSession(true, false, false);
        }
        session.commit(sendXid, false);

        // Receiving transaction: consume and acknowledge under the second Xid.
        ClientConsumer consumer = session.createConsumer(this.atestq);
        session.start();
        session.start(receiveXid, XAResource.TMNOFLAGS);
        ClientMessage received = consumer.receive(11000L);
        // Check for a missing message first so the failure reason is unambiguous.
        Assertions.assertNotNull(received);
        Assertions.assertTrue(System.currentTimeMillis() >= deliveryTime);
        Assertions.assertEquals("testINVMCoreClient", received.getBodyBuffer().readString());
        received.acknowledge();
        session.end(receiveXid, XAResource.TMSUCCESS);
        session.prepare(receiveXid);
        session.commit(receiveXid, false);
        consumer.close();

        Assertions.assertNull(session.createConsumer(this.atestq).receiveImmediate());
        session.close();
    }

    /**
     * Sends 100 durable messages scheduled one second ahead, then consumes each
     * one in its own XA transaction that is prepared but never committed. After
     * the factory, locator and server are shut down and the server is started
     * again, a fresh consumer must receive nothing within one second.
     *
     * @throws Exception if any client, server or XA operation fails
     */
    @Test
    public void testPendingACKOnPrepared() throws Exception {
        final int messageCount = 100;

        ClientSessionFactory sessionFactory = createSessionFactory(this.locator);
        ClientSession sendSession = sessionFactory.createSession(true, false, false);
        sendSession.createQueue(QueueConfiguration.of(this.atestq));
        ClientProducer producer = sendSession.createProducer(this.atestq);

        final long deliveryTime = System.currentTimeMillis() + 1000;
        for (int i = 0; i < messageCount; i++) {
            ClientMessage outgoing = sendSession.createMessage(true);
            outgoing.putIntProperty("value", i);
            outgoing.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, deliveryTime);
            producer.send(outgoing);
        }
        sendSession.close();

        // One session and one prepared (uncommitted) XA transaction per message.
        for (int i = 0; i < messageCount; i++) {
            XidImpl xid = newXID();
            ClientSession xaSession = sessionFactory.createSession(true, false, false);
            ClientConsumer consumer = xaSession.createConsumer(this.atestq);
            xaSession.start();
            xaSession.start(xid, XAResource.TMNOFLAGS);
            ClientMessage received = consumer.receive(5000L);
            Assertions.assertNotNull(received);
            received.acknowledge();
            xaSession.end(xid, XAResource.TMSUCCESS);
            xaSession.prepare(xid);
            xaSession.close();
        }

        sessionFactory.close();
        this.locator.close();
        this.server.stop();
        startServer();

        ClientSessionFactory restartedFactory = createSessionFactory(this.locator);
        ClientSession verifySession = restartedFactory.createSession(false, false);
        ClientConsumer verifyConsumer = verifySession.createConsumer(this.atestq);
        verifySession.start();
        Assertions.assertNull(verifyConsumer.receive(1000L));
        verifySession.close();
        restartedFactory.close();
    }

    /**
     * Runs the shared {@code scheduledDelivery} scenario with the flag set, i.e. with the
     * session created in XA mode and the sends/receives wrapped in XA transactions.
     */
    @Test
    public void testScheduledDeliveryTX() throws Exception {
        final boolean useXaSession = true;
        scheduledDelivery(useXaSession);
    }

    /**
     * Runs the shared {@code scheduledDelivery} scenario with the flag cleared, i.e. with a
     * non-XA session whose sends are committed with a plain {@code commit()}.
     */
    @Test
    public void testScheduledDeliveryNoTX() throws Exception {
        final boolean useXaSession = false;
        scheduledDelivery(useXaSession);
    }

    /**
     * Checks that every message is received again after the prepared XA transactions that
     * acknowledged them are rolled back, with a 5000 ms redelivery delay configured for
     * the queue address.
     *
     * <p>Flow: send and commit 100 durable messages; acknowledge each one in its own XA
     * transaction that is ended and prepared; roll back the first transaction; stop the
     * server and start a new one created from {@code createDefaultInVMConfig()} with the
     * same address setting; roll back the remaining prepared transactions while a
     * background thread receives, acknowledges and commits 100 messages. The test passes
     * when the background thread counted exactly 100 messages.
     */
    @Test
    public void testRedeliveryAfterPrepare() throws Exception {
        final int messageCount = 100;

        AddressSettings redeliveryDelay = new AddressSettings().setRedeliveryDelay(5000L);
        this.server.getAddressSettingsRepository().addMatch(this.atestq.toString(), redeliveryDelay);

        ClientSessionFactory factory = createSessionFactory(this.locator);
        ClientSession sendSession = factory.createSession(false, false, false);
        sendSession.createQueue(QueueConfiguration.of(this.atestq));
        ClientProducer producer = sendSession.createProducer(this.atestq);
        for (int i = 0; i < messageCount; i++) {
            ClientMessage message = sendSession.createMessage(true);
            message.putIntProperty("key", i);
            producer.send(message);
            sendSession.commit();
        }
        sendSession.close();

        ClientSession xaSession = factory.createSession(true, false, false);
        ClientConsumer xaConsumer = xaSession.createConsumer(this.atestq);
        // Typed list so the Xids can be read back without unchecked casts.
        ArrayList<Xid> preparedXids = new ArrayList<>();
        xaSession.start();
        for (int i = 0; i < messageCount; i++) {
            XidImpl xid = newXID();
            // XA flags: 0 is XAResource.TMNOFLAGS, 67108864 (0x4000000) is XAResource.TMSUCCESS.
            xaSession.start(xid, 0);
            ClientMessage prepared = xaConsumer.receive(5000L);
            Assertions.assertNotNull(prepared);
            prepared.acknowledge();
            xaSession.end(xid, 67108864);
            xaSession.prepare(xid);
            preparedXids.add(xid);
        }

        // The first transaction is rolled back before the server is stopped; its slot is
        // nulled so the later loop skips it.
        xaSession.rollback(preparedXids.get(0));
        preparedXids.set(0, null);
        xaSession.close();

        this.server.stop();
        this.configuration = createDefaultInVMConfig().addAddressSetting(this.atestq.toString(), redeliveryDelay);
        this.server = createServer(true, this.configuration);
        this.server.start();
        this.locator = createInVMNonHALocator();

        // Counts messages seen by the background consumer; set to -1 if that thread fails.
        AtomicInteger received = new AtomicInteger(0);
        Thread consumerThread = new Thread(() -> {
            try {
                ClientSessionFactory consumerFactory = createSessionFactory(this.locator);
                ClientSession consumerSession = consumerFactory.createSession(false, false);
                consumerSession.start();
                ClientConsumer redeliveryConsumer = consumerSession.createConsumer(this.atestq);
                for (int n = 0; n < messageCount; n++) {
                    ClientMessage redelivered = redeliveryConsumer.receive(100000L);
                    Assertions.assertNotNull(redelivered);
                    received.incrementAndGet();
                    redelivered.acknowledge();
                    consumerSession.commit();
                }
                consumerSession.close();
                consumerFactory.close();
            } catch (Throwable failure) {
                // Any failure (including assertion errors) is surfaced to the main thread
                // through the sentinel value checked by the final assertion.
                failure.printStackTrace();
                received.set(-1);
            }
        });
        consumerThread.start();

        ClientSession rollbackSession = createSessionFactory(this.locator).createSession(true, false, false);
        for (Xid pending : preparedXids) {
            if (pending != null) {
                rollbackSession.rollback(pending);
            }
        }
        rollbackSession.close();

        consumerThread.join();
        Assertions.assertEquals(messageCount, received.get());
    }

    /**
     * Shared scenario for the scheduled-delivery tests: sends nine durable text messages
     * with a mix of delivery times and asserts the order and earliest time at which each
     * one is received.
     *
     * <p>Messages 2, 3 and 4 carry no scheduled time and message 9 carries the delivery
     * time {@code -3}; these four are expected first, each within 250 ms. Messages 7, 6,
     * 5, 8 and 1 are scheduled 3, 4, 5, 6 and 7 seconds after the start timestamp and are
     * expected in that order, none earlier than its delay.
     *
     * @param z {@code true} to create the session in XA mode and wrap the send phase and
     *          the receive phase in XA transactions; {@code false} to use a plain
     *          {@code commit()} after sending
     * @throws Exception if any client operation fails
     */
    private void scheduledDelivery(boolean z) throws Exception {
        ActiveMQTestBase.forceGC();
        XidImpl xid = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
        // XA flag values: 0 is XAResource.TMNOFLAGS, 67108864 (0x4000000) is XAResource.TMSUCCESS.
        final int xaNoFlags = 0;
        final int xaSuccess = 67108864;

        ClientSessionFactory factory = createSessionFactory(this.locator);
        ClientSession session = factory.createSession(z, false, false);
        session.createQueue(QueueConfiguration.of(this.atestq));
        ClientProducer producer = session.createProducer(this.atestq);
        ClientConsumer consumer = session.createConsumer(this.atestq);
        session.start();
        if (z) {
            session.start(xid, xaNoFlags);
        }

        // Reference point for all scheduled delivery times and elapsed-time assertions.
        long sendStart = System.currentTimeMillis();

        ClientMessage first = createDurableMessage(session, "testScheduled1");
        first.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, sendStart + 7000);
        producer.send(first);

        // Three messages without any scheduled delivery time.
        producer.send(createDurableMessage(session, "testScheduled2"));
        producer.send(createDurableMessage(session, "testScheduled3"));
        producer.send(createDurableMessage(session, "testScheduled4"));

        ClientMessage fifth = createDurableMessage(session, "testScheduled5");
        fifth.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, sendStart + 5000);
        producer.send(fifth);

        ClientMessage sixth = createDurableMessage(session, "testScheduled6");
        sixth.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, sendStart + 4000);
        producer.send(sixth);

        ClientMessage seventh = createDurableMessage(session, "testScheduled7");
        seventh.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, sendStart + 3000);
        producer.send(seventh);

        ClientMessage eighth = createDurableMessage(session, "testScheduled8");
        eighth.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, sendStart + 6000);
        producer.send(eighth);

        // Absolute delivery time of -3, not an offset from sendStart.
        ClientMessage ninth = createDurableMessage(session, "testScheduled9");
        ninth.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, -3L);
        producer.send(ninth);

        if (z) {
            // Finish the XA transaction that covered the sends, then open the one for the receives.
            session.end(xid, xaSuccess);
            session.prepare(xid);
            session.commit(xid, false);
            session.start(xid, xaNoFlags);
        } else {
            session.commit();
        }

        // Expected first, in send order, each within 250 ms.
        final String[] immediateBodies = {"testScheduled2", "testScheduled3", "testScheduled4", "testScheduled9"};
        for (String expectedBody : immediateBodies) {
            ClientMessage delivered = consumer.receive(250L);
            Assertions.assertNotNull(delivered);
            Assertions.assertEquals(expectedBody, delivered.getBodyBuffer().readString());
        }

        // Expected afterwards in order of scheduled time; each row is body / receive timeout /
        // minimum elapsed time since sendStart.
        final String[] scheduledBodies = {"testScheduled7", "testScheduled6", "testScheduled5", "testScheduled8", "testScheduled1"};
        final long[] receiveTimeouts = {3250L, 1250L, 1250L, 1250L, 1250L};
        final long[] minimumElapsed = {3000L, 4000L, 5000L, 6000L, 7000L};
        for (int k = 0; k < scheduledBodies.length; k++) {
            ClientMessage delivered = consumer.receive(receiveTimeouts[k]);
            Assertions.assertNotNull(delivered);
            Assertions.assertEquals(scheduledBodies[k], delivered.getBodyBuffer().readString());
            Assertions.assertTrue(System.currentTimeMillis() - sendStart >= minimumElapsed[k]);
        }

        if (z) {
            session.end(xid, xaSuccess);
            session.prepare(xid);
            session.commit(xid, false);
        }
        session.close();
        factory.close();
    }

    /**
     * Builds a durable message on the given session and writes {@code str} into its body
     * as a string.
     *
     * <p>The message is created with type byte 3, expiration 0, the current wall-clock
     * time as timestamp and priority 1 (argument roles per the
     * {@code ClientSession.createMessage(byte, boolean, long, long, byte)} signature).
     *
     * @param clientSession session used to create the message
     * @param str           text written to the message body
     * @return the newly created message
     */
    private ClientMessage createDurableMessage(ClientSession clientSession, String str) {
        final byte messageType = (byte) 3;
        final boolean durable = true;
        final long expiration = 0L;
        final byte priority = (byte) 1;
        ClientMessage message = clientSession.createMessage(messageType, durable, expiration, System.currentTimeMillis(), priority);
        message.getBodyBuffer().writeString(str);
        return message;
    }
}
