package org.apache.activemq.artemis.tests.integration.openwire;

import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.InvalidDestinationException;
import jakarta.jms.JMSException;
import jakarta.jms.MapMessage;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.ObjectMessage;
import jakarta.jms.Queue;
import jakarta.jms.QueueReceiver;
import jakarta.jms.QueueSender;
import jakarta.jms.QueueSession;
import jakarta.jms.Session;
import jakarta.jms.StreamMessage;
import jakarta.jms.TemporaryQueue;
import jakarta.jms.TemporaryTopic;
import jakarta.jms.TextMessage;
import jakarta.jms.TopicConnection;
import jakarta.jms.TopicPublisher;
import jakarta.jms.TopicSession;
import jakarta.jms.TopicSubscriber;
import jakarta.jms.XAConnection;
import jakarta.jms.XASession;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.transaction.xa.XAException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.tests.integration.client.AutoCreateJmsDestinationTest;
import org.apache.activemq.artemis.tests.integration.management.SimpleManagementTest;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.state.SessionState;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.class */
public class SimpleOpenWireTest extends BasicOpenWireTest {
    // Class logger; the logger name is taken from the lookup class rather than a hard-coded class literal.
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    // Text value for message round-trip checks.
    private final String testString = "simple test string";
    // Property / map-entry name for message round-trip checks.
    private final String testProp = "BASE_DATE";
    // Value paired with testProp.
    private final String propValue = "2017-11-01";

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest$AsyncConsumer.class */
    /**
     * Queue consumer that collects messages through a {@code MessageListener} and lets the test
     * block until an expected number of messages has arrived.
     */
    private class AsyncConsumer {
        private List<Message> messages = new ArrayList<>();
        private CountDownLatch latch = new CountDownLatch(1);
        private int nMsgs;
        private String queueName;
        private MessageConsumer consumer;

        /**
         * Creates a non-transacted session and a listener-driven consumer on the given queue, then
         * starts the connection.
         *
         * @param queueName         name of the queue to consume from
         * @param receiveConnection connection used to create the session; started by this constructor
         * @param ackMode           acknowledge mode passed to {@code createSession}
         * @param delay             seconds to sleep after each message while fewer than
         *                          {@code expectedMsgs} have been collected
         * @param expectedMsgs      number of messages after which the latch is released
         * @throws JMSException if the session, queue, consumer or listener cannot be set up
         */
        AsyncConsumer(String queueName, Connection receiveConnection, final int ackMode, final long delay, final int expectedMsgs) throws JMSException {
            this.queueName = queueName;
            this.nMsgs = expectedMsgs;
            Session session = receiveConnection.createSession(false, ackMode);
            Queue queue = session.createQueue(queueName);
            this.consumer = session.createConsumer(queue);
            this.consumer.setMessageListener(message -> handleMessage(message, ackMode, delay, expectedMsgs));
            receiveConnection.start();
        }

        /** Listener body: records the message, optionally delays, acknowledges in client-ack mode, and trips the latch. */
        private void handleMessage(Message message, int ackMode, long delay, int expectedMsgs) {
            this.messages.add(message);

            boolean moreExpected = this.messages.size() < expectedMsgs;
            if (moreExpected) {
                // slow the consumer down between deliveries
                try {
                    TimeUnit.SECONDS.sleep(delay);
                } catch (InterruptedException interrupted) {
                    interrupted.printStackTrace();
                }
            }

            if (ackMode == Session.CLIENT_ACKNOWLEDGE) {
                try {
                    message.acknowledge();
                } catch (JMSException ackFailure) {
                    System.err.println("Failed to acknowledge " + message);
                    ackFailure.printStackTrace();
                }
            }

            if (this.messages.size() == expectedMsgs) {
                this.latch.countDown();
            }
        }

        /**
         * Waits for the expected messages, then checks the queue is empty and the collected count matches.
         *
         * @param timeout maximum time to wait, in seconds
         */
        public void waitFor(long timeout) throws TimeoutException, InterruptedException, JMSException {
            boolean completed = this.latch.await(timeout, TimeUnit.SECONDS);
            Assertions.assertTrue(completed);
            SimpleOpenWireTest.this.checkQueueEmpty(this.queueName);
            Assertions.assertEquals(this.nMsgs, this.messages.size());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Applies the superclass server configuration and then sets the address/queue scan period to 100.
     *
     * @param configuration the server configuration being prepared for this test
     */
    @Override
    public void extraServerConfig(Configuration configuration) {
        final long scanPeriod = 100L;
        super.extraServerConfig(configuration);
        configuration.setAddressQueueScanPeriod(scanPeriod);
    }

    /**
     * Per-test setup: sets the OpenWire inactivity-monitor keepAliveTime system property to "2",
     * turns on {@code realStore}, and then runs the superclass setup.
     */
    @Override // overrides setUp from the BasicOpenWireTest / OpenWireTestBase / ActiveMQTestBase hierarchy
    @BeforeEach
    public void setUp() throws Exception {
        // Both settings are applied before super.setUp() runs.
        System.setProperty("org.apache.activemq.transport.AbstractInactivityMonitor.keepAliveTime", "2");
        this.realStore = true;
        super.setUp();
    }

    /**
     * Opens ten transacted sessions on a single connection and closes the connection.
     */
    @Test
    public void testSimple() throws Exception {
        // try-with-resources releases the connection even if a createSession call fails
        try (Connection conn = this.factory.createConnection()) {
            // keep references so the sessions stay reachable until the connection is closed
            List<Session> sessions = new LinkedList<>();
            for (int i = 0; i < 10; i++) {
                sessions.add(conn.createSession(true, Session.SESSION_TRANSACTED));
            }
        }
    }

    /**
     * Creates and closes ten producers on the same temporary queue and verifies that the broker-side
     * OpenWire connection state tracks at most one temporary destination.
     */
    @Test
    public void testDuplicateTemporaryDestination() throws Exception {
        // try-with-resources so a failed assertion does not leave the connection open
        try (Connection conn = this.factory.createConnection()) {
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            TemporaryQueue tempQueue = session.createTemporaryQueue();
            for (int i = 0; i < 10; i++) {
                session.createProducer(tempQueue).close();
            }

            int tempDestinationCount = 0;
            // Iterate as Object: the remoting service may hold non-OpenWire connections too.
            for (Object candidate : this.server.getRemotingService().getConnections()) {
                if (!(candidate instanceof OpenWireConnection)) {
                    continue;
                }
                OpenWireConnection openWireConnection = (OpenWireConnection) candidate;
                if (openWireConnection.getState() != null && openWireConnection.getState().getTempDestinations() != null) {
                    tempDestinationCount += openWireConnection.getState().getTempDestinations().size();
                }
            }
            Assertions.assertTrue(tempDestinationCount <= 1, "expected at most one temporary destination but found " + tempDestinationCount);
            session.close();
        }
    }

    /**
     * Sends one text message in a transacted session, commits, and verifies that it is delivered
     * only once the connection has been started.
     */
    @Test
    public void testTransactionalSimple() throws Exception {
        try (Connection conn = this.factory.createConnection()) {
            Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
            Queue queue = session.createQueue(this.queueName);
            MessageProducer producer = session.createProducer(queue);
            MessageConsumer consumer = session.createConsumer(queue);
            producer.send(session.createTextMessage(AutoCreateJmsDestinationTest.QUEUE_NAME));
            session.commit();

            // connection not started yet, so nothing may be delivered
            Assertions.assertNull(consumer.receive(100L));
            conn.start();

            TextMessage received = (TextMessage) consumer.receive(5000L);
            // check for null first so a timeout is reported as an assertion failure, not an NPE
            Assertions.assertNotNull(received);
            Assertions.assertEquals(AutoCreateJmsDestinationTest.QUEUE_NAME, received.getText());
            received.acknowledge();
        }
    }

    /**
     * Sends a text message without a body and verifies it is received after the connection starts.
     */
    @Test
    public void testSendEmpty() throws Exception {
        try (Connection conn = this.factory.createConnection()) {
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(this.queueName);
            MessageProducer producer = session.createProducer(queue);
            MessageConsumer consumer = session.createConsumer(queue);

            producer.send(session.createTextMessage());

            // connection not started yet, so nothing may be delivered
            Assertions.assertNull(consumer.receive(100L));
            conn.start();

            TextMessage received = (TextMessage) consumer.receive(5000L);
            Assertions.assertNotNull(received);
            received.acknowledge();
        }
    }

    /**
     * Sends a map message with no entries and verifies it is received after the connection starts.
     */
    @Test
    public void testSendNullMapMessage() throws Exception {
        try (Connection conn = this.factory.createConnection()) {
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(this.queueName);
            MessageProducer producer = session.createProducer(queue);
            MessageConsumer consumer = session.createConsumer(queue);

            producer.send(session.createMapMessage());

            // connection not started yet, so nothing may be delivered
            Assertions.assertNull(consumer.receive(100L));
            conn.start();

            MapMessage received = (MapMessage) consumer.receive(5000L);
            Assertions.assertNotNull(received);
            received.acknowledge();
        }
    }

    /**
     * Sends one body-less message of each JMS message type (plain, bytes, map, object, stream, text)
     * and verifies each one is received.
     */
    @Test
    public void testSendEmptyMessages() throws Exception {
        ActiveMQQueue destination = new ActiveMQQueue(this.queueName);
        QueueSession session = this.connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        QueueSender sender = session.createSender(destination);
        sender.setDeliveryMode(1); // 1 == DeliveryMode.NON_PERSISTENT
        this.connection.start();

        // plain Message
        Message plain = session.createMessage();
        plain.setStringProperty("testName", "testSendEmptyMessages");
        sender.send(plain);
        QueueReceiver receiver = session.createReceiver(destination);
        Assertions.assertNotNull(receiver.receive(1000L), "Didn't receive message");

        // BytesMessage
        BytesMessage bytes = session.createBytesMessage();
        bytes.setStringProperty("testName", "testSendEmptyMessages");
        sender.send(bytes);
        Assertions.assertNotNull(receiver.receive(1000L), "Didn't receive message");

        // MapMessage
        MapMessage map = session.createMapMessage();
        map.setStringProperty("testName", "testSendEmptyMessages");
        sender.send(map);
        Assertions.assertNotNull(receiver.receive(1000L), "Didn't receive message");

        // ObjectMessage
        ObjectMessage object = session.createObjectMessage();
        object.setStringProperty("testName", "testSendEmptyMessages");
        sender.send(object);
        Assertions.assertNotNull(receiver.receive(1000L), "Didn't receive message");

        // StreamMessage
        StreamMessage stream = session.createStreamMessage();
        stream.setStringProperty("testName", "testSendEmptyMessages");
        sender.send(stream);
        Assertions.assertNotNull(receiver.receive(1000L), "Didn't receive message");

        // TextMessage
        TextMessage text = session.createTextMessage();
        text.setStringProperty("testName", "testSendEmptyMessages");
        sender.send(text);
        Assertions.assertNotNull(receiver.receive(1000L), "Didn't receive message");
    }

    /**
     * Opens ten XA sessions on one XA connection, starts a transaction branch on each, and closes
     * the connection.
     */
    @Test
    public void testXASimple() throws Exception {
        // try-with-resources releases the connection even if session creation or XAResource.start fails
        try (XAConnection xaConnection = this.xaFactory.createXAConnection()) {
            // keep references so the sessions stay reachable until the connection is closed
            List<Session> sessions = new LinkedList<>();
            for (int i = 0; i < 10; i++) {
                XASession xaSession = xaConnection.createXASession();
                xaSession.getXAResource().start(newXID(), 0); // 0 == XAResource.TMNOFLAGS
                sessions.add(xaSession);
            }
        }
    }

    /**
     * Sends one text message and verifies that, in a CLIENT_ACKNOWLEDGE session, it is delivered
     * after the connection starts and can be acknowledged.
     *
     * <p>Failures propagate to the test framework. There is deliberately no catch block here:
     * catching and printing {@code Throwable} would hide assertion failures and let the test pass.
     */
    @Test
    public void testClientACK() throws Exception {
        try (Connection conn = this.factory.createConnection()) {
            Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Queue queue = session.createQueue(this.queueName);
            MessageProducer producer = session.createProducer(queue);
            MessageConsumer consumer = session.createConsumer(queue);
            producer.send(session.createTextMessage(AutoCreateJmsDestinationTest.QUEUE_NAME));

            // connection not started yet, so nothing may be delivered
            Assertions.assertNull(consumer.receive(100L));
            conn.start();

            TextMessage received = (TextMessage) consumer.receive(5000L);
            Assertions.assertNotNull(received);
            received.acknowledge();
        }
        System.err.println("Done!!!");
    }

    /**
     * Verifies that closing a session removes its consumers from the broker-side session state
     * while the connection itself stays open.
     */
    @Test
    public void testSessionCloseWithOpenConnection() throws Exception {
        try (Connection conn = this.factory.createConnection()) {
            Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Queue queue = session.createQueue(this.queueName);
            session.createConsumer(queue);
            session.createConsumer(queue);
            conn.start();

            // read the client session's private "info" field to learn its session id
            Field infoField = ActiveMQSession.class.getDeclaredField("info");
            infoField.setAccessible(true);
            SessionInfo info = (SessionInfo) infoField.get(session);

            OpenWireProtocolManager protocolManager = (OpenWireProtocolManager) this.server.getRemotingService().getAcceptor("netty").getProtocolMap().get("OPENWIRE");
            List<?> brokerConnections = protocolManager.getConnections();
            Assertions.assertEquals(1, brokerConnections.size());

            OpenWireConnection brokerConnection = (OpenWireConnection) brokerConnections.get(0);
            SessionState sessionState = brokerConnection.getState().getSessionState(info.getSessionId());

            // both consumers registered
            Wait.assertEquals(2, sessionState.getConsumerIds()::size, 5000L);
            session.close();
            // and both gone once the session is closed
            Wait.assertEquals(0, sessionState.getConsumerIds()::size, 5000L);
        }
    }

    /**
     * Verifies that messages sent before a rollback are discarded and that only the message sent
     * and committed afterwards is delivered.
     */
    @Test
    public void testRollback() throws Exception {
        try (Connection conn = this.factory.createConnection()) {
            Session txSession = conn.createSession(true, Session.SESSION_TRANSACTED);
            Queue queue = txSession.createQueue(this.queueName);
            MessageProducer producer = txSession.createProducer(queue);
            MessageConsumer consumer = txSession.createConsumer(queue);

            producer.send(txSession.createTextMessage(AutoCreateJmsDestinationTest.QUEUE_NAME));
            producer.send(txSession.createTextMessage("test2"));
            conn.start();
            // uncommitted sends must not be visible
            Assertions.assertNull(consumer.receiveNoWait());
            txSession.rollback();

            producer.send(txSession.createTextMessage("test2"));
            Assertions.assertNull(consumer.receiveNoWait());
            txSession.commit();

            TextMessage delivered = (TextMessage) consumer.receive(1000L);
            Assertions.assertNotNull(delivered);
            Assertions.assertEquals("test2", delivered.getText());
        }
    }

    /**
     * Sends one text message with a string property and verifies that, in an AUTO_ACKNOWLEDGE
     * session, it is delivered only after the connection has been started.
     */
    @Test
    public void testAutoAck() throws Exception {
        // try-with-resources so a failed assertion does not leave the connection open
        try (Connection conn = this.factory.createConnection()) {
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(this.queueName);
            MessageProducer producer = session.createProducer(queue);
            MessageConsumer consumer = session.createConsumer(queue);

            TextMessage outgoing = session.createTextMessage(AutoCreateJmsDestinationTest.QUEUE_NAME);
            outgoing.setStringProperty("abc", "testAutoACK");
            producer.send(outgoing);

            // connection not started yet, so nothing may be delivered
            Assertions.assertNull(consumer.receive(100L));
            conn.start();
            Assertions.assertNotNull(consumer.receive(5000L));
        }
        System.err.println("Done!!!");
    }

    /**
     * Sends one message through a transacted session on a connection whose factory has a producer
     * window size of 65536 configured.
     */
    @Test
    public void testProducerFlowControl() throws Exception {
        ActiveMQConnectionFactory windowedFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat.cacheEnabled=true");
        windowedFactory.setProducerWindowSize(65536);
        // try-with-resources so a failed send does not leave the connection open
        try (Connection conn = windowedFactory.createConnection()) {
            Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
            Queue queue = session.createQueue(this.queueName);
            MessageProducer producer = session.createProducer(queue);
            producer.send(session.createTextMessage(AutoCreateJmsDestinationTest.QUEUE_NAME));
        }
    }

    /**
     * Sends each JMS message type from a connection with {@code jms.useCompression=true} and
     * verifies that a second connection receives the same content.
     *
     * <p>Both connections are managed by try-with-resources, so each is closed independently. The
     * earlier hand-written cleanup guarded each {@code close()} with the other connection's null
     * check, which could throw a {@code NullPointerException} and mask the real failure.
     */
    @Test
    public void testCompression() throws Exception {
        ActiveMQConnectionFactory receiveFactory = new ActiveMQConnectionFactory(SimpleManagementTest.LOCALHOST);
        ActiveMQConnectionFactory sendFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.useCompression=true");
        try (Connection receiveConn = receiveFactory.createConnection();
             Connection sendConn = sendFactory.createConnection()) {
            receiveConn.start();
            Session receiveSession = receiveConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = receiveSession.createConsumer(receiveSession.createQueue(this.queueName));

            Session sendSession = sendConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = sendSession.createProducer(sendSession.createQueue(this.queueName));
            producer.setDeliveryMode(2); // 2 == DeliveryMode.PERSISTENT

            // text
            TextMessage textMessage = sendSession.createTextMessage();
            textMessage.setText(this.testString);
            Assertions.assertEquals(this.testString, sendAndReceive(textMessage, producer, consumer).getText());

            // map
            MapMessage mapMessage = sendSession.createMapMessage();
            mapMessage.setString(this.testProp, this.propValue);
            Assertions.assertEquals(this.propValue, sendAndReceive(mapMessage, producer, consumer).getString(this.testProp));

            // object
            ObjectMessage objectMessage = sendSession.createObjectMessage();
            objectMessage.setObject(this.testString);
            Assertions.assertEquals(this.testString, (String) sendAndReceive(objectMessage, producer, consumer).getObject());

            // stream
            StreamMessage streamMessage = sendSession.createStreamMessage();
            streamMessage.writeString(this.testString);
            Assertions.assertEquals(this.testString, sendAndReceive(streamMessage, producer, consumer).readString());

            // bytes: compare the length and the decoded content
            byte[] payload = this.testString.getBytes();
            BytesMessage bytesMessage = sendSession.createBytesMessage();
            bytesMessage.writeBytes(payload);
            BytesMessage receivedBytes = sendAndReceive(bytesMessage, producer, consumer);
            long bodyLength = receivedBytes.getBodyLength();
            Assertions.assertEquals(payload.length, bodyLength, "bodylength Correct");
            byte[] receivedPayload = new byte[(int) bodyLength];
            receivedBytes.readBytes(receivedPayload);
            Assertions.assertEquals(this.testString, new String(receivedPayload));

            // plain message with no body
            sendAndReceive(sendSession.createMessage(), producer, consumer);
        }
    }

    /**
     * Stamps the message with the {@code BASE_DATE} property, sends it, receives one message from
     * the consumer and checks that the property was preserved.
     *
     * @param t               message to send
     * @param messageProducer producer used to send {@code t}
     * @param messageConsumer consumer expected to receive the message within one second
     * @param <T>             concrete message type; the received message is cast to it
     * @return the received message
     * @throws JMSException if setting the property, sending, or receiving fails
     */
    private <T extends Message> T sendAndReceive(T t, MessageProducer messageProducer, MessageConsumer messageConsumer) throws JMSException {
        t.setStringProperty("BASE_DATE", "2017-11-01");
        messageProducer.send(t);
        @SuppressWarnings("unchecked") // the consumer is expected to return the same message type that was sent
        T received = (T) messageConsumer.receive(1000L);
        // report a receive timeout as an assertion failure instead of a NullPointerException
        Assertions.assertNotNull(received, "no message received within 1000 ms");
        Assertions.assertEquals("2017-11-01", received.getStringProperty("BASE_DATE"));
        return received;
    }

    /**
     * Sends a text message to a queue and verifies it is received in order, with nothing left over.
     */
    @Test
    public void testSimpleQueue() throws Exception {
        final int messageCount = 1;
        this.connection.start();
        Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        ActiveMQQueue destination = new ActiveMQQueue(this.queueName);

        MessageProducer producer = session.createProducer(destination);
        for (int i = 0; i < messageCount; i++) {
            producer.send(session.createTextMessage("MfromAMQ-" + i));
        }

        MessageConsumer consumer = session.createConsumer(destination);
        for (int i = 0; i < messageCount; i++) {
            TextMessage received = (TextMessage) consumer.receive(5000L);
            // report a receive timeout as an assertion failure instead of a NullPointerException
            Assertions.assertNotNull(received, "message " + i + " was not received");
            Assertions.assertEquals("MfromAMQ-" + i, received.getText());
        }
        // queue must be drained
        Assertions.assertNull(consumer.receive(1000L));
        session.close();
    }

    /**
     * Exchanges ten text messages in each direction between the default connection and a
     * connection created from {@code looseFactory}, checking content and order both ways.
     */
    @Test
    public void testSendReceiveDifferentEncoding() throws Exception {
        final int messageCount = 10;
        this.connection.start();
        Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        ActiveMQQueue destination = new ActiveMQQueue(this.queueName);

        MessageProducer producer = session.createProducer(destination);
        for (int i = 0; i < messageCount; i++) {
            producer.send(session.createTextMessage("MfromAMQ-" + i));
        }

        Connection looseConnection = this.looseFactory.createConnection();
        try {
            looseConnection.start();
            Session looseSession = looseConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // default connection -> loose connection
            MessageConsumer looseConsumer = looseSession.createConsumer(destination);
            for (int i = 0; i < messageCount; i++) {
                TextMessage received = (TextMessage) looseConsumer.receive(5000L);
                // same null check as the reverse direction below
                Assertions.assertNotNull(received);
                Assertions.assertEquals("MfromAMQ-" + i, received.getText());
            }
            Assertions.assertNull(looseConsumer.receive(1000L));
            looseConsumer.close();

            // loose connection -> default connection
            MessageProducer looseProducer = looseSession.createProducer(destination);
            for (int i = 0; i < messageCount; i++) {
                looseProducer.send(looseSession.createTextMessage("MfromAMQ-" + i));
            }
            MessageConsumer consumer = session.createConsumer(destination);
            for (int i = 0; i < messageCount; i++) {
                TextMessage received = (TextMessage) consumer.receive(5000L);
                Assertions.assertNotNull(received);
                Assertions.assertEquals("MfromAMQ-" + i, received.getText());
            }
            Assertions.assertNull(consumer.receive(1000L));

            session.close();
            looseSession.close();
        } finally {
            looseConnection.close();
        }
    }

    /**
     * Currently disabled. Starts the connection, sleeps for
     * {@code AmqpConnection.DEFAULT_CLOSE_TIMEOUT} milliseconds, then creates a session.
     */
    @Disabled("ignored for now")
    @Test
    public void testKeepAlive() throws Exception {
        final long idlePeriod = AmqpConnection.DEFAULT_CLOSE_TIMEOUT;
        this.connection.start();
        Thread.sleep(idlePeriod);
        this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    /**
     * Publishes a text message to a topic with two subscribers and verifies each subscriber
     * receives it exactly once.
     */
    @Test
    public void testSimpleTopic() throws Exception {
        final int messageCount = 1;
        this.connection.start();
        Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        ActiveMQTopic destination = new ActiveMQTopic(this.topicName);

        MessageConsumer firstConsumer = session.createConsumer(destination);
        MessageConsumer secondConsumer = session.createConsumer(destination);

        MessageProducer producer = session.createProducer(destination);
        for (int i = 0; i < messageCount; i++) {
            producer.send(session.createTextMessage("MfromAMQ-" + i));
        }

        for (int i = 0; i < messageCount; i++) {
            TextMessage received = (TextMessage) firstConsumer.receive(5000L);
            // report a receive timeout as an assertion failure instead of a NullPointerException
            Assertions.assertNotNull(received, "first consumer did not receive message " + i);
            Assertions.assertEquals("MfromAMQ-" + i, received.getText());
        }
        Assertions.assertNull(firstConsumer.receive(500L));

        for (int i = 0; i < messageCount; i++) {
            TextMessage received = (TextMessage) secondConsumer.receive(5000L);
            Assertions.assertNotNull(received, "second consumer did not receive message " + i);
            Assertions.assertEquals("MfromAMQ-" + i, received.getText());
        }
        Assertions.assertNull(secondConsumer.receive(500L));
        session.close();
    }

    /**
     * Checks topic delivery for three subscribers on the same connection: one created with
     * noLocal=true, one plain, and one with the selector {@code TESTKEY = 'test'}. Messages are
     * first published from the same connection, then from a second connection.
     */
    @Test
    public void testTopicNoLocal() throws Exception {
        this.connection.start();
        Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        ActiveMQTopic destination = new ActiveMQTopic(this.topicName);

        MessageConsumer noLocalConsumer = session.createConsumer(destination, (String) null, true);
        MessageConsumer normalConsumer = session.createConsumer(destination, (String) null, false);
        MessageConsumer selectorConsumer = session.createConsumer(destination, "TESTKEY = 'test'", false);

        // publish two messages from the same connection; only the second carries TESTKEY
        MessageProducer producer = session.createProducer(destination);
        producer.send(session.createTextMessage("MfromAMQ-1"));
        TextMessage keyed = session.createTextMessage("MfromAMQ-2");
        keyed.setStringProperty("TESTKEY", AutoCreateJmsDestinationTest.QUEUE_NAME);
        producer.send(keyed);

        // the noLocal consumer gets nothing published on its own connection
        TextMessage noLocalMessage = (TextMessage) noLocalConsumer.receive(1000L);
        Assertions.assertNull(noLocalMessage, "nolocal consumer got: " + noLocalMessage);

        // the normal consumer gets both messages
        TextMessage normalFirst = (TextMessage) normalConsumer.receive(1000L);
        Assertions.assertNotNull(normalFirst);
        Assertions.assertEquals("MfromAMQ-1", normalFirst.getText());
        TextMessage normalSecond = (TextMessage) normalConsumer.receive(1000L);
        Assertions.assertNotNull(normalSecond);
        Assertions.assertEquals("MfromAMQ-2", normalSecond.getText());
        Assertions.assertNull(normalConsumer.receiveNoWait());

        // the selector consumer gets only the keyed message
        TextMessage selected = (TextMessage) selectorConsumer.receive(1000L);
        Assertions.assertNotNull(selected);
        Assertions.assertEquals("MfromAMQ-2", selected.getText());
        Assertions.assertEquals(AutoCreateJmsDestinationTest.QUEUE_NAME, selected.getStringProperty("TESTKEY"));
        Assertions.assertNull(selectorConsumer.receiveNoWait());

        // now publish from a different connection
        Connection otherConnection = this.factory.createConnection();
        try {
            otherConnection.start();
            Session otherSession = otherConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer otherProducer = otherSession.createProducer(destination);
            otherProducer.send(otherSession.createTextMessage("MfromAMQ-1"));

            Assertions.assertNotNull(normalConsumer.receive(1000L));
            Assertions.assertNull(selectorConsumer.receive(1000L));
            Assertions.assertNotNull(noLocalConsumer.receive(1000L));
        } finally {
            otherConnection.close();
        }
        session.close();
    }

    /**
     * Checks topic delivery for three durable subscribers on the same connection: one created with
     * noLocal=true, one plain, and one with the selector {@code TESTKEY = 'test'}. Messages are
     * first published from the same connection, then from a second connection.
     */
    @Test
    public void testTopicNoLocalDurable() throws Exception {
        this.connection.setClientID("forNoLocal-1");
        this.connection.start();
        TopicSession session = this.connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        ActiveMQTopic destination = new ActiveMQTopic(this.topicName);

        TopicSubscriber noLocalSubscriber = session.createDurableSubscriber(destination, "nolocal-subscriber1", "", true);
        TopicSubscriber normalSubscriber = session.createDurableSubscriber(destination, "normal-subscriber", (String) null, false);
        TopicSubscriber selectorSubscriber = session.createDurableSubscriber(destination, "selector-subscriber", "TESTKEY = 'test'", false);

        // publish two messages from the same connection; only the second carries TESTKEY
        MessageProducer producer = session.createProducer(destination);
        producer.send(session.createTextMessage("MfromAMQ-1"));
        TextMessage keyed = session.createTextMessage("MfromAMQ-2");
        keyed.setStringProperty("TESTKEY", AutoCreateJmsDestinationTest.QUEUE_NAME);
        producer.send(keyed);

        // the noLocal subscriber gets nothing published on its own connection
        TextMessage noLocalMessage = (TextMessage) noLocalSubscriber.receive(1000L);
        Assertions.assertNull(noLocalMessage, "nolocal consumer got: " + noLocalMessage);

        // the normal subscriber gets both messages
        TextMessage normalFirst = (TextMessage) normalSubscriber.receive(1000L);
        Assertions.assertNotNull(normalFirst);
        Assertions.assertEquals("MfromAMQ-1", normalFirst.getText());
        TextMessage normalSecond = (TextMessage) normalSubscriber.receive(1000L);
        Assertions.assertNotNull(normalSecond);
        Assertions.assertEquals("MfromAMQ-2", normalSecond.getText());
        Assertions.assertNull(normalSubscriber.receiveNoWait());

        // the selector subscriber gets only the keyed message
        TextMessage selected = (TextMessage) selectorSubscriber.receive(1000L);
        Assertions.assertNotNull(selected);
        Assertions.assertEquals("MfromAMQ-2", selected.getText());
        Assertions.assertEquals(AutoCreateJmsDestinationTest.QUEUE_NAME, selected.getStringProperty("TESTKEY"));
        Assertions.assertNull(selectorSubscriber.receiveNoWait());

        // now publish from a different connection
        Connection otherConnection = this.factory.createConnection();
        try {
            otherConnection.start();
            Session otherSession = otherConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer otherProducer = otherSession.createProducer(destination);
            otherProducer.send(otherSession.createTextMessage("MfromAMQ-1"));

            Assertions.assertNotNull(normalSubscriber.receive(1000L));
            Assertions.assertNull(selectorSubscriber.receive(1000L));
            Assertions.assertNotNull(noLocalSubscriber.receive(1000L));
        } finally {
            otherConnection.close();
        }
        session.close();
    }

    /**
     * Checks what a second connection may do with a temporary topic created by the fixture connection.
     * <p>
     * The second connection publishes to the temporary topic, must be refused a subscriber on it,
     * and attempts one more publish after the creating connection has been closed. A
     * {@link JMSException} from that last publish is tolerated.
     *
     * @throws Exception if a JMS operation fails unexpectedly
     */
    @Test
    public void testTempTopicDelete() throws Exception {
        this.connection.start();
        TopicSession ownerSession = this.connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TemporaryTopic tempTopic = ownerSession.createTemporaryTopic();
        ActiveMQConnection otherConnection = (ActiveMQConnection) this.factory.createConnection();
        try {
            TopicSession otherSession = otherConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            TopicPublisher publisher = otherSession.createPublisher(tempTopic);
            // Wait until the second connection tracks exactly one active temporary destination.
            Assertions.assertTrue(Wait.waitFor(() -> otherConnection.activeTempDestinations.size() == 1, 2000L, 100L));
            publisher.publish(otherSession.createTextMessage("Test Message"));
            try {
                TopicSubscriber unexpected = otherSession.createSubscriber(tempTopic);
                Assertions.fail("should have gotten exception but got consumer: " + unexpected);
            } catch (JMSException expected) {
                // expected: the non-owning connection must not get a subscriber on the temporary topic
            }
            this.connection.close();
            try {
                publisher.publish(otherSession.createMessage());
            } catch (JMSException ignored) {
                // tolerated: publishing after the creating connection was closed may be rejected
            }
        } finally {
            otherConnection.close();
        }
    }

    /**
     * Checks what a second connection may do with a temporary queue created by the fixture connection.
     * <p>
     * The second connection sends to the temporary queue, must be refused a receiver on it, and
     * attempts one more send after the creating connection has been closed. A {@link JMSException}
     * from that last send is tolerated.
     *
     * @throws Exception if a JMS operation fails unexpectedly
     */
    @Test
    public void testTempQueueDelete() throws Exception {
        this.connection.start();
        QueueSession ownerSession = this.connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        TemporaryQueue tempQueue = ownerSession.createTemporaryQueue();
        ActiveMQConnection otherConnection = (ActiveMQConnection) this.factory.createConnection();
        try {
            QueueSession otherSession = otherConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            QueueSender sender = otherSession.createSender(tempQueue);
            // The first message is created by the owning session but sent through the other connection.
            sender.send(ownerSession.createMessage());
            try {
                QueueReceiver unexpected = otherSession.createReceiver(tempQueue);
                Assertions.fail("should have gotten exception but got consumer: " + unexpected);
            } catch (JMSException expected) {
                // expected: the non-owning connection must not get a receiver on the temporary queue
            }
            this.connection.close();
            try {
                sender.send(otherSession.createMessage());
            } catch (JMSException ignored) {
                // tolerated: sending after the creating connection was closed may be rejected
            }
        } finally {
            otherConnection.close();
        }
    }

    /**
     * Publishes to a temporary topic and verifies that each of two consumers on the same session
     * receives every message exactly once.
     *
     * @throws Exception if a JMS operation fails
     */
    @Test
    public void testSimpleTempTopic() throws Exception {
        final int messageCount = 1;
        this.connection.start();
        Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TemporaryTopic tempTopic = session.createTemporaryTopic();
        MessageConsumer firstConsumer = session.createConsumer(tempTopic);
        MessageConsumer secondConsumer = session.createConsumer(tempTopic);
        MessageProducer producer = session.createProducer(tempTopic);
        for (int i = 0; i < messageCount; i++) {
            producer.send(session.createTextMessage("MfromAMQ-" + i));
        }
        // Each consumer is drained in turn: all messages in order, then nothing further.
        for (MessageConsumer consumer : new MessageConsumer[] {firstConsumer, secondConsumer}) {
            for (int i = 0; i < messageCount; i++) {
                TextMessage received = (TextMessage) consumer.receive(5000L);
                Assertions.assertNotNull(received);
                Assertions.assertEquals("MfromAMQ-" + i, received.getText());
            }
            Assertions.assertNull(consumer.receive(500L));
        }
        session.close();
    }

    /**
     * Sends to a temporary queue and verifies that a consumer on the same session receives every
     * message exactly once. Auto-creation of queues and addresses is enabled for all addresses first.
     *
     * @throws Exception if a JMS operation fails
     */
    @Test
    public void testSimpleTempQueue() throws Exception {
        final int messageCount = 1;
        AddressSettings autoCreate = new AddressSettings();
        autoCreate.setAutoCreateQueues(true);
        autoCreate.setAutoCreateAddresses(true);
        this.server.getAddressSettingsRepository().addMatch("#", autoCreate);

        this.connection.start();
        Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TemporaryQueue tempQueue = session.createTemporaryQueue();
        MessageConsumer consumer = session.createConsumer(tempQueue);
        MessageProducer producer = session.createProducer(tempQueue);
        for (int i = 0; i < messageCount; i++) {
            producer.send(session.createTextMessage("MfromAMQ-" + i));
        }
        for (int i = 0; i < messageCount; i++) {
            TextMessage received = (TextMessage) consumer.receive(5000L);
            Assertions.assertNotNull(received);
            Assertions.assertEquals("MfromAMQ-" + i, received.getText());
        }
        Assertions.assertNull(consumer.receive(500L));
        session.close();
    }

    /**
     * Expects producer creation for the queue {@code "foo"} to be rejected with a
     * {@link JMSException}.
     *
     * @throws Exception if session setup or teardown fails
     */
    @Test
    public void testInvalidDestinationExceptionWhenNoQueueExistsOnCreateProducer() throws Exception {
        this.connection.start();
        Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            Queue missingQueue = session.createQueue("foo");
            session.createProducer(missingQueue);
            Assertions.fail("Should have thrown an exception creating a producer here");
        } catch (JMSException expected) {
            // expected: the producer must not be created
        }
        session.close();
    }

    /**
     * With auto-creation of queues and addresses enabled for {@code "foo"}, sends a message through
     * an anonymous producer and verifies that a consumer on the same queue receives it.
     *
     * @throws JMSException if a JMS operation fails
     */
    @Test
    public void testAutoDestinationCreationOnProducerSend() throws JMSException {
        AddressSettings autoCreate = new AddressSettings();
        autoCreate.setAutoCreateQueues(true);
        autoCreate.setAutoCreateAddresses(true);
        this.server.getAddressSettingsRepository().addMatch("foo", autoCreate);

        this.connection.start();
        Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TextMessage sent = session.createTextMessage("bar");
        ActiveMQQueue queue = new ActiveMQQueue("foo");
        // Anonymous producer: the destination is supplied on send.
        MessageProducer anonymousProducer = session.createProducer((Destination) null);
        anonymousProducer.send(queue, sent);

        TextMessage received = (TextMessage) session.createConsumer(queue).receive(1000L);
        Assertions.assertNotNull(received);
        Assertions.assertEquals(sent.getText(), received.getText());
    }

    /**
     * With auto-create and auto-delete enabled for {@code "foo"}, verifies that creating a consumer
     * makes the queue and address appear on the server, that a message round-trips through them,
     * and that both disappear again after the consumer and the connection are closed.
     *
     * @throws Exception if a JMS or server operation fails
     */
    @Test
    public void testAutoDestinationCreationAndDeletionOnConsumer() throws Exception {
        AddressSettings autoSettings = new AddressSettings();
        autoSettings.setAutoCreateQueues(true);
        autoSettings.setAutoCreateAddresses(true);
        autoSettings.setAutoDeleteQueues(true);
        autoSettings.setAutoDeleteAddresses(true);
        this.server.getAddressSettingsRepository().addMatch("foo", autoSettings);

        this.connection.start();
        Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TextMessage sent = session.createTextMessage("bar");
        ActiveMQQueue queue = new ActiveMQQueue("foo");
        MessageConsumer consumer = session.createConsumer(queue);

        // Creating the consumer should bring both the queue and the address into existence.
        Assertions.assertTrue(Wait.waitFor(() -> this.server.locateQueue(SimpleString.of("foo")) != null, 2000L, 100L));
        Assertions.assertTrue(Wait.waitFor(() -> this.server.getAddressInfo(SimpleString.of("foo")) != null, 2000L, 100L));

        session.createProducer((Destination) null).send(queue, sent);
        TextMessage received = (TextMessage) consumer.receive(1000L);
        Assertions.assertNotNull(received);
        Assertions.assertEquals(sent.getText(), received.getText());
        Assertions.assertNotNull(this.server.locateQueue(SimpleString.of("foo")));

        consumer.close();
        this.connection.close();

        // Once nothing uses them any more, the queue and the address should be removed.
        Assertions.assertTrue(Wait.waitFor(() -> this.server.locateQueue(SimpleString.of("foo")) == null, 2000L, 100L));
        Assertions.assertTrue(Wait.waitFor(() -> this.server.getAddressInfo(SimpleString.of("foo")) == null, 2000L, 100L));
    }

    /**
     * With queue auto-creation disabled for {@code "foo"}, expects consumer creation on that queue
     * to be rejected with a {@link JMSException}.
     *
     * @throws JMSException if connection or session setup fails
     */
    @Test
    public void testAutoDestinationNoCreationOnConsumer() throws JMSException {
        AddressSettings noAutoCreate = new AddressSettings();
        noAutoCreate.setAutoCreateQueues(false);
        this.server.getAddressSettingsRepository().addMatch("foo", noAutoCreate);

        this.connection.start();
        Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            session.createConsumer(new ActiveMQQueue("foo"));
            Assertions.fail("supposed to throw an exception here");
        } catch (JMSException expected) {
            // expected: the consumer must not be created
        }
    }

    /**
     * Verifies that a client connected through the failover transport keeps working across a
     * server restart: a message is sent and received, the server is stopped and started again,
     * and a second message is sent and received on the same session.
     *
     * @throws Exception if a JMS or server operation fails
     */
    @Test
    public void testFailoverTransportReconnect() throws Exception {
        ActiveMQConnectionFactory failoverFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)");
        ActiveMQQueue queue = new ActiveMQQueue(this.durableQueueName);
        // The connection is closed exactly once, whether the test passes or fails.
        try (Connection failoverConnection = failoverFactory.createConnection()) {
            failoverConnection.start();
            Session session = failoverConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(queue);
            producer.send(session.createTextMessage("Test"));
            MessageConsumer consumer = session.createConsumer(queue);
            Assertions.assertNotNull(consumer.receive(5000L));

            // Restart the server while the client stays connected.
            this.server.stop();
            Thread.sleep(3000L);
            this.server.start();
            this.server.waitForActivation(10L, TimeUnit.SECONDS);

            producer.send(session.createTextMessage("Test2"));
            Assertions.assertNotNull(consumer.receive(5000L));
        }
    }

    /**
     * Minimal send/receive round trip with the OpenWire client on the durable test queue.
     *
     * @throws Exception if a JMS or server operation fails
     */
    @Test
    public void testOpenWireExample() throws Exception {
        // NOTE(review): "exampleQueue" is created here but the test sends to durableQueueName - confirm intent.
        this.server.createQueue(QueueConfiguration.of("exampleQueue").setRoutingType(RoutingType.ANYCAST));
        ActiveMQQueue queue = new ActiveMQQueue(this.durableQueueName);
        try (Connection openWireConnection = new ActiveMQConnectionFactory().createConnection()) {
            openWireConnection.start();
            Session session = openWireConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            session.createProducer(queue).send(session.createTextMessage("This is a text message"));

            TextMessage received = (TextMessage) session.createConsumer(queue).receive(5000L);
            Assertions.assertNotNull(received);
            Assertions.assertEquals("This is a text message", received.getText());
        }
    }

    /**
     * Sends one text message to the durable test queue and receives it again with the OpenWire client.
     * <p>
     * NOTE(review): despite the name, only a single consumer is created here - confirm whether
     * additional consumers were intended.
     *
     * @throws Exception if a JMS or server operation fails
     */
    @Test
    public void testMultipleConsumers() throws Exception {
        this.server.createQueue(QueueConfiguration.of("exampleQueue").setRoutingType(RoutingType.ANYCAST));
        ActiveMQQueue queue = new ActiveMQQueue(this.durableQueueName);
        try (Connection openWireConnection = new ActiveMQConnectionFactory().createConnection()) {
            openWireConnection.start();
            Session session = openWireConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(queue);
            producer.send(session.createTextMessage("This is a text message"));

            MessageConsumer consumer = session.createConsumer(queue);
            TextMessage received = (TextMessage) consumer.receive(5000L);
            Assertions.assertNotNull(received);
            Assertions.assertEquals("This is a text message", received.getText());
        }
    }

    /**
     * Cross-client round trip: a message sent with the OpenWire client is received with the
     * Artemis JMS client ({@code org.apache.activemq.artemis.jms.client}).
     *
     * @throws Exception if a JMS or server operation fails
     */
    @Test
    public void testMixedOpenWireExample() throws Exception {
        this.server.createQueue(QueueConfiguration.of("exampleQueue").setRoutingType(RoutingType.ANYCAST));
        ActiveMQQueue openWireQueue = new ActiveMQQueue("exampleQueue");
        // Both connections are closed even when an assertion fails.
        try (Connection openWireConnection = new ActiveMQConnectionFactory().createConnection()) {
            openWireConnection.start();
            Session openWireSession = openWireConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            openWireSession.createProducer(openWireQueue).send(openWireSession.createTextMessage("This is a text message"));

            try (Connection artemisConnection = new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory().createConnection()) {
                Session artemisSession = artemisConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                artemisConnection.start();
                MessageConsumer consumer = artemisSession.createConsumer(artemisSession.createQueue("exampleQueue"));
                TextMessage received = (TextMessage) consumer.receive(5000L);
                Assertions.assertNotNull(received);
                Assertions.assertEquals("This is a text message", received.getText());
            }
        }
    }

    /**
     * Cross-client round trip in the opposite direction: ten messages sent with the Artemis JMS
     * client ({@code org.apache.activemq.artemis.jms.client}) are received with the OpenWire client.
     *
     * @throws Exception if a JMS or server operation fails
     */
    @Test
    public void testMixedOpenWireExample2() throws Exception {
        final int messageCount = 10;
        this.server.createQueue(QueueConfiguration.of("exampleQueue").setRoutingType(RoutingType.ANYCAST));
        Queue artemisQueue = ActiveMQJMSClient.createQueue("exampleQueue");
        // Both connections are closed even when an assertion fails.
        try (Connection artemisConnection = new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory().createConnection()) {
            artemisConnection.start();
            Session artemisSession = artemisConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = artemisSession.createProducer(artemisQueue);
            for (int i = 0; i < messageCount; i++) {
                producer.send(artemisSession.createTextMessage("This is a text message"));
            }

            try (Connection openWireConnection = new ActiveMQConnectionFactory().createConnection()) {
                Session openWireSession = openWireConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                openWireConnection.start();
                MessageConsumer consumer = openWireSession.createConsumer(openWireSession.createQueue("exampleQueue"));
                for (int i = 0; i < messageCount; i++) {
                    TextMessage received = (TextMessage) consumer.receive(5000L);
                    Assertions.assertNotNull(received);
                    Assertions.assertEquals("This is a text message", received.getText());
                }
            }
        }
    }

    /**
     * Verifies that messages consumed inside a rolled-back XA transaction are redelivered.
     * <p>
     * Ten messages are sent, five are consumed in an XA transaction that is then rolled back, and
     * a plain auto-acknowledge consumer must afterwards receive ten messages and no extras.
     *
     * @throws Exception if a JMS or XA operation fails
     */
    @Test
    public void testXAConsumer() throws Exception {
        Queue queue;
        try (Session producerSession = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)) {
            queue = producerSession.createQueue(this.queueName);
            MessageProducer producer = producerSession.createProducer(queue);
            for (int i = 0; i < 10; i++) {
                TextMessage message = producerSession.createTextMessage("test" + i);
                message.setStringProperty("myobj", "test" + i);
                producer.send(message);
            }
        }

        try (XAConnection xaConnection = this.xaFactory.createXAConnection()) {
            XidImpl xid = newXID();
            XASession xaSession = xaConnection.createXASession();
            xaSession.getXAResource().start(xid, javax.transaction.xa.XAResource.TMNOFLAGS);
            MessageConsumer xaConsumer = xaSession.createConsumer(queue);
            xaConnection.start();
            for (int i = 0; i < 5; i++) {
                TextMessage received = (TextMessage) xaConsumer.receive(5000L);
                Assertions.assertNotNull(received);
                Assertions.assertEquals("test" + i, received.getText());
            }
            // Roll back so the five consumed messages go back to the queue.
            xaSession.getXAResource().end(xid, javax.transaction.xa.XAResource.TMSUCCESS);
            xaSession.getXAResource().rollback(xid);
            xaConsumer.close();
        }

        try (Session consumerSession = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
            this.connection.start();
            MessageConsumer consumer = consumerSession.createConsumer(queue);
            for (int i = 0; i < 10; i++) {
                Assertions.assertNotNull(consumer.receive(5000L));
            }
            checkDuplicate(consumer);
        }
        System.err.println("Done!!!");
    }

    /**
     * Verifies redelivery to the same consumer after an XA rollback.
     * <p>
     * Ten messages are sent; an XA consumer receives five and rolls the transaction back, then in
     * a second XA transaction the same consumer must receive all ten in order, with no extras,
     * before committing in one phase.
     *
     * @throws Exception if a JMS or XA operation fails
     */
    @Test
    public void testXASameConsumerRollback() throws Exception {
        Queue queue;
        try (Session producerSession = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)) {
            queue = producerSession.createQueue(this.queueName);
            MessageProducer producer = producerSession.createProducer(queue);
            for (int i = 0; i < 10; i++) {
                TextMessage message = producerSession.createTextMessage("test" + i);
                message.setStringProperty("myobj", "test" + i);
                producer.send(message);
            }
        }

        try (XAConnection xaConnection = this.xaFactory.createXAConnection()) {
            XidImpl firstXid = newXID();
            XASession xaSession = xaConnection.createXASession();
            xaSession.getXAResource().start(firstXid, javax.transaction.xa.XAResource.TMNOFLAGS);
            MessageConsumer xaConsumer = xaSession.createConsumer(queue);
            xaConnection.start();
            for (int i = 0; i < 5; i++) {
                TextMessage received = (TextMessage) xaConsumer.receive(5000L);
                Assertions.assertNotNull(received);
                Assertions.assertEquals("test" + i, received.getText());
            }
            xaSession.getXAResource().end(firstXid, javax.transaction.xa.XAResource.TMSUCCESS);
            xaSession.getXAResource().rollback(firstXid);

            // Second transaction on the same consumer: everything must be delivered again from the start.
            XidImpl secondXid = newXID();
            xaSession.getXAResource().start(secondXid, javax.transaction.xa.XAResource.TMNOFLAGS);
            for (int i = 0; i < 10; i++) {
                TextMessage redelivered = (TextMessage) xaConsumer.receive(5000L);
                Assertions.assertNotNull(redelivered);
                Assertions.assertEquals("test" + i, redelivered.getText());
            }
            checkDuplicate(xaConsumer);
            xaSession.getXAResource().end(secondXid, javax.transaction.xa.XAResource.TMSUCCESS);
            xaSession.getXAResource().commit(secondXid, true);
        }
    }

    /**
     * Sends two messages inside an XA transaction and prepares it.
     * <p>
     * Any failure now propagates and fails the test instead of being printed and swallowed, and
     * the XA connection is closed on every path. The transaction is left prepared; it is neither
     * committed nor rolled back here.
     *
     * @throws Exception if a JMS or XA operation fails
     */
    @Test
    public void testXAPrepare() throws Exception {
        try (XAConnection xaConnection = this.xaFactory.createXAConnection()) {
            XASession xaSession = xaConnection.createXASession();
            XidImpl xid = newXID();
            xaSession.getXAResource().start(xid, javax.transaction.xa.XAResource.TMNOFLAGS);
            MessageProducer producer = xaSession.createProducer(xaSession.createQueue(this.queueName));
            producer.send(xaSession.createTextMessage("hello"));
            producer.send(xaSession.createTextMessage("hello"));
            xaSession.getXAResource().end(xid, javax.transaction.xa.XAResource.TMSUCCESS);
            xaSession.getXAResource().prepare(xid);
        }
        System.err.println("Done!!!");
    }

    /**
     * Sends ten text messages on an auto-acknowledge session and verifies that a consumer created
     * before the send receives them in order.
     *
     * @throws Exception if a JMS operation fails
     */
    @Test
    public void testAutoSend() throws Exception {
        this.connection.start();
        Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(this.queueName);
        MessageConsumer consumer = session.createConsumer(queue);
        MessageProducer producer = session.createProducer(queue);
        for (int i = 0; i < 10; i++) {
            producer.send(session.createTextMessage("testXX" + i));
        }
        this.connection.start();
        for (int i = 0; i < 10; i++) {
            TextMessage received = (TextMessage) consumer.receive(5000L);
            Assertions.assertNotNull(received);
            Assertions.assertEquals("testXX" + i, received.getText());
        }
    }

    /**
     * Sends ten messages over a failover connection configured with a wire-format inactivity
     * timeout and waits for an {@code AsyncConsumer} on a second connection to finish.
     * <p>
     * NOTE(review): the meaning of the {@code AsyncConsumer} constructor arguments
     * ({@code 2, 2L, 10}) and of {@code waitFor(40L)} is not visible here - confirm against that class.
     *
     * @throws Exception if a JMS operation fails or the consumer does not complete
     */
    @Test
    public void testSendReceiveUsingTtl() throws Exception {
        String brokerUrl = "failover://tcp://localhost:61616?wireFormat.maxInactivityDuration=5000&wireFormat.maxInactivityDurationInitalDelay=1000";
        ActiveMQConnectionFactory ttlFactory = new ActiveMQConnectionFactory(brokerUrl);
        Connection producerConnection = ttlFactory.createConnection();
        Connection consumerConnection = ttlFactory.createConnection();
        try {
            AsyncConsumer asyncConsumer = new AsyncConsumer(this.queueName, consumerConnection, 2, 2L, 10);
            Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(session.createQueue(this.queueName));
            for (int i = 0; i < 10; i++) {
                producer.send(session.createTextMessage("testXX" + i));
            }
            asyncConsumer.waitFor(40L);
        } finally {
            // Close both connections even if closing the first one fails.
            try {
                producerConnection.close();
            } finally {
                consumerConnection.close();
            }
        }
    }

    /**
     * Runs the shared commit scenario with the flag set to {@code true}, i.e. the first consumer
     * is closed before the session commit.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    public void testCommitCloseConsumerBefore() throws Exception {
        final boolean closeBeforeCommit = true;
        testCommitCloseConsumer(closeBeforeCommit);
    }

    /**
     * Runs the shared commit scenario with the flag set to {@code false}, i.e. the first consumer
     * is closed after the session commit.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    public void testCommitCloseConsumerAfter() throws Exception {
        final boolean closeBeforeCommit = false;
        testCommitCloseConsumer(closeBeforeCommit);
    }

    /**
     * Shared scenario: sends ten messages in a transacted session, consumes the first five,
     * commits, and verifies that a fresh consumer receives exactly the remaining five.
     *
     * @param closeConsumerBeforeCommit {@code true} to close the first consumer before the commit,
     *                                  {@code false} to close it after the commit
     * @throws Exception if a JMS operation fails
     */
    private void testCommitCloseConsumer(boolean closeConsumerBeforeCommit) throws Exception {
        this.connection.start();
        Session session = this.connection.createSession(true, Session.SESSION_TRANSACTED);
        Queue queue = session.createQueue(this.queueName);
        MessageConsumer firstConsumer = session.createConsumer(queue);
        MessageProducer producer = session.createProducer(queue);
        for (int i = 0; i < 10; i++) {
            TextMessage message = session.createTextMessage("testXX" + i);
            message.setStringProperty("count", "str " + i);
            producer.send(message);
        }
        session.commit();
        this.connection.start();

        for (int i = 0; i < 5; i++) {
            TextMessage received = (TextMessage) firstConsumer.receive(5000L);
            Assertions.assertNotNull(received);
            Assertions.assertEquals("testXX" + i, received.getText());
        }

        // The only difference between the two variants is whether close happens before or after commit.
        if (closeConsumerBeforeCommit) {
            firstConsumer.close();
            session.commit();
        } else {
            session.commit();
            firstConsumer.close();
        }

        MessageConsumer secondConsumer = session.createConsumer(queue);
        for (int i = 5; i < 10; i++) {
            TextMessage received = (TextMessage) secondConsumer.receive(5000L);
            Assertions.assertNotNull(received);
            Assertions.assertEquals("testXX" + i, received.getText());
        }
        Assertions.assertNull(secondConsumer.receiveNoWait());
    }

    /**
     * Verifies redelivery after a local transaction rollback when the consumer is replaced:
     * five of ten messages are consumed and rolled back, the consumer is closed, and a new
     * consumer must receive ten messages and no extras.
     *
     * @throws Exception if a JMS operation fails
     */
    @Test
    public void testRollbackWithAcked() throws Exception {
        this.connection.start();
        Session session = this.connection.createSession(true, Session.SESSION_TRANSACTED);
        Queue queue = session.createQueue(this.queueName);
        MessageConsumer firstConsumer = session.createConsumer(queue);
        MessageProducer producer = session.createProducer(queue);
        for (int i = 0; i < 10; i++) {
            TextMessage message = session.createTextMessage("testXX" + i);
            message.setStringProperty("count", "str " + i);
            producer.send(message);
        }
        session.commit();
        this.connection.start();

        for (int i = 0; i < 5; i++) {
            TextMessage received = (TextMessage) firstConsumer.receive(5000L);
            Assertions.assertNotNull(received);
            Assertions.assertEquals("testXX" + i, received.getText());
        }
        session.rollback();
        firstConsumer.close();

        MessageConsumer secondConsumer = session.createConsumer(queue);
        for (int i = 0; i < 10; i++) {
            Assertions.assertNotNull(secondConsumer.receive(5000L));
        }
        session.commit();
        checkDuplicate(secondConsumer);
    }

    /**
     * Verifies redelivery to the same consumer after a local transaction rollback: five of ten
     * messages are consumed and rolled back, then the same consumer must receive all ten in
     * order and no extras.
     *
     * @throws Exception if a JMS operation fails
     */
    @Test
    public void testRollbackLocal() throws Exception {
        this.connection.start();
        Session session = this.connection.createSession(true, Session.SESSION_TRANSACTED);
        Queue queue = session.createQueue(this.queueName);
        MessageConsumer consumer = session.createConsumer(queue);
        MessageProducer producer = session.createProducer(queue);
        for (int i = 0; i < 10; i++) {
            TextMessage message = session.createTextMessage("testXX" + i);
            message.setStringProperty("count", "str " + i);
            producer.send(message);
        }
        session.commit();
        this.connection.start();

        for (int i = 0; i < 5; i++) {
            TextMessage received = (TextMessage) consumer.receive(500L);
            Assertions.assertNotNull(received);
            Assertions.assertEquals("testXX" + i, received.getText());
        }
        session.rollback();

        for (int i = 0; i < 10; i++) {
            TextMessage redelivered = (TextMessage) consumer.receive(5000L);
            Assertions.assertNotNull(redelivered);
            Assertions.assertEquals("testXX" + i, redelivered.getText());
        }
        checkDuplicate(consumer);
        session.commit();
    }

    /**
     * Drains the consumer without waiting and fails if anything was still available.
     * <p>
     * Every leftover message is logged before the assertion, so all unexpected messages are
     * reported rather than only the first.
     *
     * @param messageConsumer consumer expected to have no further messages
     * @throws JMSException if receiving or reading a message fails
     */
    private void checkDuplicate(MessageConsumer messageConsumer) throws JMSException {
        boolean duplicateSeen = false;
        for (Message extra = messageConsumer.receiveNoWait(); extra != null; extra = messageConsumer.receiveNoWait()) {
            duplicateSeen = true;
            logger.warn("received in duplicate:{}", ((TextMessage) extra).getText());
        }
        Assertions.assertFalse(duplicateSeen, "received messages in duplicate");
    }

    /**
     * Verifies individual acknowledgement: of the first five messages only the fifth is
     * acknowledged, so after the consumer is replaced the first four are delivered again,
     * followed by the five that were never delivered.
     *
     * @throws Exception if a JMS operation fails
     */
    @Test
    public void testIndividualAck() throws Exception {
        this.connection.start();
        Session session = this.connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
        Queue queue = session.createQueue(this.queueName);
        MessageConsumer firstConsumer = session.createConsumer(queue);
        MessageProducer producer = session.createProducer(queue);
        for (int i = 0; i < 10; i++) {
            TextMessage message = session.createTextMessage("testXX" + i);
            message.setStringProperty("count", "str " + i);
            producer.send(message);
        }
        this.connection.start();

        for (int i = 0; i < 5; i++) {
            TextMessage received = (TextMessage) firstConsumer.receive(5000L);
            Assertions.assertNotNull(received);
            // Only the last of the five is acknowledged.
            if (i == 4) {
                received.acknowledge();
            }
            Assertions.assertEquals("testXX" + i, received.getText());
        }
        firstConsumer.close();

        MessageConsumer secondConsumer = session.createConsumer(queue);
        // Messages 0-3 were never acknowledged and come back first.
        for (int i = 0; i < 4; i++) {
            TextMessage redelivered = (TextMessage) secondConsumer.receive(5000L);
            Assertions.assertNotNull(redelivered);
            redelivered.acknowledge();
            Assertions.assertEquals("testXX" + i, redelivered.getText());
        }
        // Message 4 was acknowledged, so delivery continues with 5-9.
        for (int i = 5; i < 10; i++) {
            TextMessage remaining = (TextMessage) secondConsumer.receive(5000L);
            Assertions.assertNotNull(remaining);
            remaining.acknowledge();
            Assertions.assertEquals("testXX" + i, remaining.getText());
        }
        checkDuplicate(secondConsumer);
        Assertions.assertNull(secondConsumer.receiveNoWait());
    }

    /**
     * Verifies that messages consumed in a committed XA transaction are gone while the rest stay
     * available: ten messages are sent, an XA consumer takes the first five and is closed before
     * the transaction is prepared and committed, and a new transacted consumer must then receive
     * messages five to nine.
     *
     * @throws Exception if a JMS or XA operation fails
     */
    @Test
    public void testCommitCloseConsumeXA() throws Exception {
        this.connection.start();
        Session producerSession = this.connection.createSession(true, Session.SESSION_TRANSACTED);
        Queue queue = producerSession.createQueue(this.queueName);
        MessageProducer producer = producerSession.createProducer(queue);
        for (int i = 0; i < 10; i++) {
            TextMessage message = producerSession.createTextMessage("testXX" + i);
            message.setStringProperty("count", "str " + i);
            producer.send(message);
        }
        producerSession.commit();

        try (XAConnection xaConnection = this.xaFactory.createXAConnection()) {
            xaConnection.start();
            XASession xaSession = xaConnection.createXASession();
            XidImpl xid = newXID();
            xaSession.getXAResource().start(xid, javax.transaction.xa.XAResource.TMNOFLAGS);
            MessageConsumer xaConsumer = xaSession.createConsumer(queue);
            for (int i = 0; i < 5; i++) {
                TextMessage received = (TextMessage) xaConsumer.receive(5000L);
                Assertions.assertNotNull(received);
                Assertions.assertEquals("testXX" + i, received.getText());
            }
            // The consumer is closed before the transaction ends; two-phase commit follows.
            xaConsumer.close();
            xaSession.getXAResource().end(xid, javax.transaction.xa.XAResource.TMSUCCESS);
            xaSession.getXAResource().prepare(xid);
            xaSession.getXAResource().commit(xid, false);
        }

        this.connection.start();
        Session consumerSession = this.connection.createSession(true, Session.SESSION_TRANSACTED);
        try (MessageConsumer consumer = consumerSession.createConsumer(queue)) {
            for (int i = 5; i < 10; i++) {
                TextMessage received = (TextMessage) consumer.receive(5000L);
                Assertions.assertNotNull(received);
                Assertions.assertEquals("testXX" + i, received.getText());
            }
        }
    }

    /**
     * Checks that a temporary queue stops being usable once the connection that created it is
     * closed: a producer on a second connection can send to it while the owner is open, and gets an
     * {@link InvalidDestinationException} after the owner has been closed.
     *
     * @throws Exception if a JMS operation or the binding wait fails unexpectedly
     */
    @Test
    public void testTempQueueSendAfterConnectionClose() throws Exception {
        // Resources are closed in reverse order: the owner connection first, then the producer one.
        try (Connection producerConnection = this.factory.createConnection();
             Connection ownerConnection = this.factory.createConnection()) {
            ownerConnection.start();
            producerConnection.start();

            Session ownerSession = ownerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            TemporaryQueue tempQueue = ownerSession.createTemporaryQueue();
            Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = producerSession.createProducer(tempQueue);

            // Wait until the producer connection reports exactly one active temp destination.
            Assertions.assertTrue(Wait.waitFor(
                () -> ((ActiveMQConnection) producerConnection).activeTempDestinations.size() == 1, 2000L, 100L));

            producer.setDeliveryMode(1); // 1 == DeliveryMode.NON_PERSISTENT
            TextMessage outbound = producerSession.createTextMessage("Hello temp queue");
            producer.send(outbound);

            // receive(long) is declared to return Message, so the narrowing cast is required.
            TextMessage inbound = (TextMessage) ownerSession.createConsumer(tempQueue).receive(5000L);
            Assertions.assertNotNull(inbound);
            Assertions.assertEquals("Hello temp queue", inbound.getText());

            // Closing the owner is the action under test; the implicit close at the end of the
            // try-with-resources block then closes it a second time, as the original code did.
            ownerConnection.close();
            Assertions.assertTrue(Wait.waitFor(
                () -> ((ActiveMQConnection) producerConnection).activeTempDestinations.size() == 0, 2000L, 100L));
            waitForBindings(this.server, tempQueue.getQueueName(), true, 0, 0, 5000L);

            try {
                producer.send(outbound);
                Assertions.fail("Send should fail since temp destination should not exist anymore.");
            } catch (InvalidDestinationException expected) {
                // This is the outcome the test is looking for.
            }
        }
    }

    /**
     * Subscribes to the {@code activemq.notifications} topic and checks that every message
     * delivered to the listener carries a non-null {@code _AMQ_NotifType} string property.
     *
     * @throws Exception if a JMS operation fails unexpectedly
     */
    @Test
    public void testNotificationProperties() throws Exception {
        try (TopicConnection topicConnection = this.factory.createTopicConnection()) {
            TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            TopicSubscriber subscriber =
                topicSession.createSubscriber(topicSession.createTopic("activemq.notifications"));

            // The listener appends from another thread while this thread reads the list, hence
            // the concurrent list implementation.
            List<Message> received = new CopyOnWriteArrayList<>();
            subscriber.setMessageListener(received::add);
            topicConnection.start();

            Wait.waitFor(() -> received.size() > 0);
            Assertions.assertTrue(received.size() > 0);

            for (Message notification : received) {
                Assertions.assertNotNull(notification);
                Assertions.assertNotNull(notification.getStringProperty("_AMQ_NotifType"));
            }
        }
    }

    /**
     * Starts and ends an XA branch, suspends the matching server-side transaction, then attempts a
     * one-phase commit. Whatever the outcome, the resource manager must still hold the transaction.
     *
     * @throws Exception if anything other than an {@link XAException} is raised
     */
    @Test
    public void testXAResourceCommitSuspendedNotRemoved() throws Exception {
        XidImpl xid = newXID();
        // try-with-resources closes the connection on every path, including the XAException one.
        try (XAConnection xaConnection = this.xaFactory.createXAConnection()) {
            XASession xaSession = xaConnection.createXASession();
            xaSession.createQueue(this.queueName);
            xaSession.getXAResource().start(xid, 0); // 0 == XAResource.TMNOFLAGS
            xaSession.getXAResource().end(xid, 33554432); // 0x2000000 == XAResource.TMSUSPEND
            // Suspend the server-side transaction directly before attempting the commit.
            this.server.getResourceManager().getTransaction(new XidImpl(xid)).suspend();
            xaSession.getXAResource().commit(xid, true);
        } catch (XAException tolerated) {
            // An XAException raised inside the block is tolerated; the check below still runs.
        } finally {
            Assertions.assertNotNull(this.server.getResourceManager().getTransaction(new XidImpl(xid)));
        }
    }

    /**
     * Starts and ends an XA branch, suspends the matching server-side transaction, then attempts a
     * rollback. Whatever the outcome, the resource manager must still hold the transaction.
     *
     * @throws Exception if anything other than an {@link XAException} is raised
     */
    @Test
    public void testXAResourceRolledBackSuspendedNotRemoved() throws Exception {
        XidImpl xid = newXID();
        // try-with-resources closes the connection on every path, including the XAException one.
        try (XAConnection xaConnection = this.xaFactory.createXAConnection()) {
            XASession xaSession = xaConnection.createXASession();
            xaSession.createQueue(this.queueName);
            xaSession.getXAResource().start(xid, 0); // 0 == XAResource.TMNOFLAGS
            xaSession.getXAResource().end(xid, 33554432); // 0x2000000 == XAResource.TMSUSPEND
            // Suspend the server-side transaction directly before attempting the rollback.
            this.server.getResourceManager().getTransaction(new XidImpl(xid)).suspend();
            xaSession.getXAResource().rollback(xid);
        } catch (XAException tolerated) {
            // An XAException raised inside the block is tolerated; the check below still runs.
        } finally {
            Assertions.assertNotNull(this.server.getResourceManager().getTransaction(new XidImpl(xid)));
        }
    }

    /**
     * Sends one message inside an XA branch, commits it in one phase, and checks that the resource
     * manager no longer holds a transaction for that Xid once the connection is closed.
     *
     * @throws Exception if a JMS or XA operation fails unexpectedly
     */
    @Test
    public void testXAResourceCommittedRemoved() throws Exception {
        XidImpl xid = newXID();
        try (XAConnection xaConnection = this.xaFactory.createXAConnection()) {
            XASession xaSession = xaConnection.createXASession();
            Queue queue = xaSession.createQueue(this.queueName);
            xaSession.getXAResource().start(xid, 0); // 0 == XAResource.TMNOFLAGS
            xaSession.createProducer(queue).send(xaSession.createTextMessage("xa message"));
            xaSession.getXAResource().end(xid, 67108864); // 0x4000000 == XAResource.TMSUCCESS
            xaSession.getXAResource().commit(xid, true);
        }
        // Checked after the connection is closed, matching the original ordering.
        Assertions.assertNull(this.server.getResourceManager().getTransaction(new XidImpl(xid)));
    }

    /**
     * Sends one message inside an XA branch, rolls the branch back, and checks that the resource
     * manager no longer holds a transaction for that Xid once the connection is closed.
     *
     * @throws Exception if a JMS or XA operation fails unexpectedly
     */
    @Test
    public void testXAResourceRolledBackRemoved() throws Exception {
        XidImpl xid = newXID();
        try (XAConnection xaConnection = this.xaFactory.createXAConnection()) {
            XASession xaSession = xaConnection.createXASession();
            Queue queue = xaSession.createQueue(this.queueName);
            xaSession.getXAResource().start(xid, 0); // 0 == XAResource.TMNOFLAGS
            xaSession.createProducer(queue).send(xaSession.createTextMessage("xa message"));
            xaSession.getXAResource().end(xid, 67108864); // 0x4000000 == XAResource.TMSUCCESS
            xaSession.getXAResource().rollback(xid);
        }
        // Checked after the connection is closed, matching the original ordering.
        Assertions.assertNull(this.server.getResourceManager().getTransaction(new XidImpl(xid)));
    }

    /**
     * Sends a text message carrying the {@code __HDR_BROKER_PATH}, {@code __HDR_CLUSTER} and
     * {@code __HDR_USER_ID} string properties and checks that a consumer on the same queue
     * receives a message within five seconds.
     *
     * @throws Exception if a JMS operation fails unexpectedly
     */
    @Test
    public void testPropertyConversions() throws Exception {
        final String brokerPath = RandomUtil.randomString();
        final String cluster = RandomUtil.randomString();
        final String userId = RandomUtil.randomString();

        try (Connection conn = this.factory.createConnection()) {
            final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final Queue destination = session.createQueue(this.queueName);
            final MessageProducer sender = session.createProducer(destination);

            final TextMessage outbound = session.createTextMessage("This is a text message");
            outbound.setStringProperty("__HDR_BROKER_PATH", brokerPath);
            outbound.setStringProperty("__HDR_CLUSTER", cluster);
            outbound.setStringProperty("__HDR_USER_ID", userId);
            sender.send(outbound);

            // The consumer is created before the connection is started, as in the original flow.
            final MessageConsumer receiver = session.createConsumer(destination);
            conn.start();

            final Message delivered = receiver.receive(5000L);
            Assertions.assertNotNull(delivered);
        }
    }

    /**
     * Asserts that the queue bound under the given name holds no messages, after pausing for one
     * second.
     *
     * @param str unique name of the queue binding to look up in the server's post office
     */
    private void checkQueueEmpty(String str) {
        // getBinding(...) is declared with the general binding type, so narrow it explicitly.
        LocalQueueBinding binding =
            (LocalQueueBinding) this.server.getPostOffice().getBinding(SimpleString.of(str));
        try {
            // NOTE(review): presumably gives in-flight acknowledgements time to settle — confirm.
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently dropping the interruption.
            Thread.currentThread().interrupt();
        }
        Assertions.assertEquals(0L, binding.getQueue().getMessageCount());
    }
}
