package org.apache.activemq.artemis.tests.integration.amqp;

import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Objects;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.qpid.jms.JmsConnection;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.class */
public class JMSMessageConsumerTest extends JMSClientTestSupport {
    protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    @Timeout(60)
    @Test
    public void testSelectorOnTopic() throws Exception {
        doTestSelector(true);
    }

    @Timeout(60)
    @Test
    public void testSelectorOnQueue() throws Exception {
        doTestSelector(false);
    }

    private void doTestSelector(boolean z) throws Exception {
        Connection createConnection = createConnection();
        try {
            Session createSession = createConnection.createSession(false, 1);
            Topic createTopic = z ? createSession.createTopic(getTopicName()) : createSession.createQueue(getQueueName());
            MessageProducer createProducer = createSession.createProducer(createTopic);
            MessageConsumer createConsumer = createSession.createConsumer(createTopic, "color = 'RED'");
            TextMessage createTextMessage = createSession.createTextMessage();
            createTextMessage.setText("msg:0");
            createProducer.send(createTextMessage);
            TextMessage createTextMessage2 = createSession.createTextMessage();
            createTextMessage2.setText("msg:1");
            createTextMessage2.setStringProperty("color", "RED");
            createProducer.send(createTextMessage2);
            createConnection.start();
            TextMessage receive = createConsumer.receive(5000L);
            Assertions.assertNotNull(receive);
            Assertions.assertEquals("msg:1", receive.getText());
            Assertions.assertEquals(receive.getStringProperty("color"), "RED");
            createConnection.close();
        } catch (Throwable th) {
            createConnection.close();
            throw th;
        }
    }

    @Timeout(30)
    @Test
    public void testSelectorsWithJMSTypeOnTopic() throws Exception {
        doTestSelectorsWithJMSType(true);
    }

    @Timeout(30)
    @Test
    public void testSelectorsWithJMSTypeOnQueue() throws Exception {
        doTestSelectorsWithJMSType(false);
    }

    private void doTestSelectorsWithJMSType(boolean z) throws Exception {
        Connection createConnection = createConnection();
        try {
            Session createSession = createConnection.createSession(false, 1);
            Topic createTopic = z ? createSession.createTopic(getTopicName()) : createSession.createQueue(getQueueName());
            MessageProducer createProducer = createSession.createProducer(createTopic);
            MessageConsumer createConsumer = createSession.createConsumer(createTopic, "JMSType = 'myJMSType'");
            TextMessage createTextMessage = createSession.createTextMessage();
            createTextMessage.setText("text");
            createProducer.send(createTextMessage, 1, 4, 0L);
            TextMessage createTextMessage2 = createSession.createTextMessage();
            createTextMessage2.setJMSType("myJMSType");
            createTextMessage2.setText("text + type");
            createProducer.send(createTextMessage2, 1, 4, 0L);
            createConnection.start();
            TextMessage receive = createConsumer.receive(2000L);
            Assertions.assertNotNull(receive);
            Assertions.assertTrue(receive instanceof TextMessage);
            Assertions.assertEquals("myJMSType", receive.getJMSType(), "Unexpected JMSType value");
            Assertions.assertEquals("text + type", receive.getText(), "Unexpected message content");
            createConnection.close();
        } catch (Throwable th) {
            createConnection.close();
            throw th;
        }
    }

    @Timeout(30)
    @Test
    public void testSelectorsWithJMSCorrelationID() throws Exception {
        Connection createConnection = createConnection();
        String uuid = UUID.randomUUID().toString();
        try {
            Session createSession = createConnection.createSession(false, 1);
            Queue createQueue = createSession.createQueue(getQueueName());
            MessageProducer createProducer = createSession.createProducer(createQueue);
            TextMessage createTextMessage = createSession.createTextMessage();
            createTextMessage.setText("text");
            createProducer.send(createTextMessage);
            TextMessage createTextMessage2 = createSession.createTextMessage();
            createTextMessage2.setJMSCorrelationID(uuid);
            createTextMessage2.setText("JMSCorrelationID");
            createProducer.send(createTextMessage2);
            Enumeration enumeration = createSession.createBrowser(createQueue).getEnumeration();
            int i = 0;
            while (enumeration.hasMoreElements()) {
                Assertions.assertTrue(((Message) enumeration.nextElement()) instanceof TextMessage);
                i++;
            }
            Assertions.assertEquals(2, i);
            TextMessage receive = createSession.createConsumer(createQueue, "JMSCorrelationID = '" + uuid + "'").receive(2000L);
            Assertions.assertNotNull(receive);
            Assertions.assertTrue(receive instanceof TextMessage);
            Assertions.assertEquals(uuid, receive.getJMSCorrelationID(), "Unexpected JMSCorrelationID value");
            Assertions.assertEquals("JMSCorrelationID", receive.getText(), "Unexpected message content");
            createConnection.close();
        } catch (Throwable th) {
            createConnection.close();
            throw th;
        }
    }

    @Timeout(30)
    @Test
    public void testSelectorsWithJMSPriority() throws Exception {
        Connection createConnection = createConnection();
        try {
            Session createSession = createConnection.createSession(false, 1);
            Queue createQueue = createSession.createQueue(getQueueName());
            MessageProducer createProducer = createSession.createProducer(createQueue);
            TextMessage createTextMessage = createSession.createTextMessage();
            createTextMessage.setText("hello");
            createProducer.send(createTextMessage, 2, 5, 0L);
            TextMessage createTextMessage2 = createSession.createTextMessage();
            createTextMessage2.setText("hello + 9");
            createProducer.send(createTextMessage2, 2, 9, 0L);
            Enumeration enumeration = createSession.createBrowser(createQueue).getEnumeration();
            int i = 0;
            while (enumeration.hasMoreElements()) {
                Assertions.assertTrue(((Message) enumeration.nextElement()) instanceof TextMessage);
                i++;
            }
            Assertions.assertEquals(2, i);
            TextMessage receive = createSession.createConsumer(createQueue, "JMSPriority > 8").receive(2000L);
            Assertions.assertNotNull(receive);
            Assertions.assertTrue(receive instanceof TextMessage);
            Assertions.assertEquals("hello + 9", receive.getText());
            createConnection.close();
        } catch (Throwable th) {
            createConnection.close();
            throw th;
        }
    }

    @Timeout(30)
    @Test
    public void testSelectorsWithJMSXGroupIDOnTopic() throws Exception {
        doTestSelectorsWithJMSXGroupID(true);
    }

    @Timeout(30)
    @Test
    public void testSelectorsWithJMSXGroupIDOnQueue() throws Exception {
        doTestSelectorsWithJMSXGroupID(false);
    }

    private void doTestSelectorsWithJMSXGroupID(boolean z) throws Exception {
        Connection createConnection = createConnection();
        try {
            Session createSession = createConnection.createSession(false, 1);
            Topic createTopic = z ? createSession.createTopic(getTopicName()) : createSession.createQueue(getQueueName());
            MessageProducer createProducer = createSession.createProducer(createTopic);
            MessageConsumer createConsumer = createSession.createConsumer(createTopic, "JMSXGroupID = '1'");
            TextMessage createTextMessage = createSession.createTextMessage();
            createTextMessage.setText("group 1 - 1");
            createTextMessage.setStringProperty("JMSXGroupID", "1");
            createTextMessage.setIntProperty("JMSXGroupSeq", 1);
            createProducer.send(createTextMessage);
            TextMessage createTextMessage2 = createSession.createTextMessage();
            createTextMessage2.setText("group 2");
            createTextMessage2.setStringProperty("JMSXGroupID", "2");
            createProducer.send(createTextMessage2);
            TextMessage createTextMessage3 = createSession.createTextMessage();
            createTextMessage3.setText("group 1 - 2");
            createTextMessage3.setStringProperty("JMSXGroupID", "1");
            createTextMessage3.setIntProperty("JMSXGroupSeq", -1);
            createProducer.send(createTextMessage3);
            createConnection.start();
            TextMessage receive = createConsumer.receive(2000L);
            Assertions.assertNotNull(receive);
            Assertions.assertTrue(receive instanceof TextMessage);
            Assertions.assertEquals("group 1 - 1", receive.getText());
            TextMessage receive2 = createConsumer.receive(2000L);
            Assertions.assertNotNull(receive2);
            Assertions.assertTrue(receive2 instanceof TextMessage);
            Assertions.assertEquals("group 1 - 2", receive2.getText());
            createConnection.close();
        } catch (Throwable th) {
            createConnection.close();
            throw th;
        }
    }

    @Timeout(30)
    @Test
    public void testSelectorsWithJMSDeliveryOnQueue() throws Exception {
        Connection createConnection = createConnection();
        try {
            Session createSession = createConnection.createSession(false, 1);
            Queue createQueue = createSession.createQueue(getQueueName());
            MessageProducer createProducer = createSession.createProducer(createQueue);
            MessageConsumer createConsumer = createSession.createConsumer(createQueue, "JMSDeliveryMode = 'PERSISTENT'");
            TextMessage createTextMessage = createSession.createTextMessage();
            createTextMessage.setText("non-persistent");
            createProducer.send(createTextMessage, 1, 4, 0L);
            TextMessage createTextMessage2 = createSession.createTextMessage();
            createTextMessage2.setText("persistent");
            createProducer.send(createTextMessage2, 2, 4, 0L);
            createConnection.start();
            TextMessage receive = createConsumer.receive(2000L);
            Assertions.assertNotNull(receive);
            Assertions.assertTrue(receive instanceof TextMessage);
            Assertions.assertEquals(2, receive.getJMSDeliveryMode(), "Unexpected JMSDeliveryMode value");
            Assertions.assertEquals("persistent", receive.getText(), "Unexpected message content");
            createConnection.close();
        } catch (Throwable th) {
            createConnection.close();
            throw th;
        }
    }

    @Timeout(30)
    @Test
    public void testSelectorsWithJMSTimestampOnQueue() throws Exception {
        Connection createConnection = createConnection();
        try {
            Session createSession = createConnection.createSession(false, 1);
            Queue createQueue = createSession.createQueue(getQueueName());
            MessageProducer createProducer = createSession.createProducer(createQueue);
            TextMessage createTextMessage = createSession.createTextMessage();
            createTextMessage.setText("filtered");
            createProducer.send(createTextMessage, 2, 4, 0L);
            Thread.sleep(2L);
            TextMessage createTextMessage2 = createSession.createTextMessage();
            createTextMessage2.setText("expected");
            createProducer.send(createTextMessage2, 2, 4, 0L);
            MessageConsumer createConsumer = createSession.createConsumer(createQueue, "JMSTimestamp = " + createTextMessage2.getJMSTimestamp());
            createConnection.start();
            TextMessage receive = createConsumer.receive(2000L);
            Assertions.assertNotNull(receive);
            Assertions.assertTrue(receive instanceof TextMessage);
            Assertions.assertEquals(createTextMessage2.getJMSTimestamp(), receive.getJMSTimestamp(), "Unexpected JMSTimestamp value");
            Assertions.assertEquals("expected", receive.getText(), "Unexpected message content");
            createConnection.close();
        } catch (Throwable th) {
            createConnection.close();
            throw th;
        }
    }

    @Timeout(30)
    @Test
    public void testSelectorsWithJMSExpirationOnQueue() throws Exception {
        Connection createConnection = createConnection();
        try {
            Session createSession = createConnection.createSession(false, 1);
            Queue createQueue = createSession.createQueue(getQueueName());
            MessageProducer createProducer = createSession.createProducer(createQueue);
            TextMessage createTextMessage = createSession.createTextMessage();
            createTextMessage.setText("filtered");
            createProducer.send(createTextMessage, 2, 4, 0L);
            TextMessage createTextMessage2 = createSession.createTextMessage();
            createTextMessage2.setText("expected");
            createProducer.send(createTextMessage2, 2, 4, AmqpConnection.DEFAULT_DRAIN_TIMEOUT);
            MessageConsumer createConsumer = createSession.createConsumer(createQueue, "JMSExpiration = " + createTextMessage2.getJMSExpiration());
            createConnection.start();
            TextMessage receive = createConsumer.receive(2000L);
            Assertions.assertNotNull(receive);
            Assertions.assertTrue(receive instanceof TextMessage);
            Assertions.assertEquals(createTextMessage2.getJMSExpiration(), receive.getJMSExpiration(), "Unexpected JMSExpiration value");
            Assertions.assertEquals("expected", receive.getText(), "Unexpected message content");
            createConnection.close();
        } catch (Throwable th) {
            createConnection.close();
            throw th;
        }
    }

    @Timeout(60)
    @Test
    public void testJMSSelectorFiltersJMSMessageIDOnTopic() throws Exception {
        Connection createConnection = createConnection();
        try {
            Session createSession = createConnection.createSession(false, 1);
            Queue createQueue = createSession.createQueue(getQueueName());
            MessageProducer createProducer = createSession.createProducer(createQueue);
            TextMessage createTextMessage = createSession.createTextMessage();
            createProducer.send(createTextMessage);
            createProducer.send(createSession.createTextMessage());
            createConnection.start();
            MessageConsumer createConsumer = createSession.createConsumer(createQueue, "JMSMessageID = '" + createTextMessage.getJMSMessageID() + "'");
            TextMessage receive = createConsumer.receive(5000L);
            Assertions.assertNotNull(receive);
            Assertions.assertEquals(createTextMessage.getJMSMessageID(), receive.getJMSMessageID());
            Assertions.assertNull(createConsumer.receive(1000L));
            createConnection.close();
        } catch (Throwable th) {
            createConnection.close();
            throw th;
        }
    }

    @Timeout(60)
    @Test
    public void testZeroPrefetchWithTwoConsumers() throws Exception {
        JmsConnection createConnection = createConnection();
        createConnection.getPrefetchPolicy().setAll(0);
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        Queue createQueue = createSession.createQueue(getQueueName());
        MessageProducer createProducer = createSession.createProducer(createQueue);
        createProducer.send(createSession.createTextMessage("Msg1"));
        createProducer.send(createSession.createTextMessage("Msg2"));
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        MessageConsumer createConsumer2 = createSession.createConsumer(createQueue);
        TextMessage receive = createConsumer.receive(5000L);
        Assertions.assertNotNull(receive);
        Assertions.assertEquals(receive.getText(), "Msg1", "Should have received a message!");
        TextMessage receive2 = createConsumer2.receive(5000L);
        Assertions.assertNotNull(receive2);
        Assertions.assertEquals(receive2.getText(), "Msg2", "Should have received a message!");
        Assertions.assertNull(createConsumer2.receiveNoWait(), "Should have not received a message!");
    }

    @Timeout(30)
    @Test
    public void testProduceAndConsumeLargeNumbersOfTopicMessagesClientAck() throws Exception {
        doTestProduceAndConsumeLargeNumbersOfMessages(true, 2);
    }

    @Timeout(30)
    @Test
    public void testProduceAndConsumeLargeNumbersOfQueueMessagesClientAck() throws Exception {
        doTestProduceAndConsumeLargeNumbersOfMessages(false, 2);
    }

    @Timeout(30)
    @Test
    public void testProduceAndConsumeLargeNumbersOfTopicMessagesAutoAck() throws Exception {
        doTestProduceAndConsumeLargeNumbersOfMessages(true, 1);
    }

    @Timeout(30)
    @Test
    public void testProduceAndConsumeLargeNumbersOfQueueMessagesAutoAck() throws Exception {
        doTestProduceAndConsumeLargeNumbersOfMessages(false, 1);
    }

    public void doTestProduceAndConsumeLargeNumbersOfMessages(boolean z, int i) throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1000);
        JmsConnection createConnection = createConnection();
        createConnection.setForceAsyncSend(true);
        createConnection.start();
        Session createSession = createConnection.createSession(false, i);
        Topic createTopic = z ? createSession.createTopic(getTopicName()) : createSession.createQueue(getQueueName());
        createSession.createConsumer(createTopic).setMessageListener(message -> {
            try {
                message.acknowledge();
                countDownLatch.countDown();
            } catch (JMSException e) {
                logger.debug("Caught exception.", e);
            }
        });
        MessageProducer createProducer = createSession.createProducer(createTopic);
        TextMessage createTextMessage = createSession.createTextMessage();
        createTextMessage.setText("messageText");
        for (int i2 = 0; i2 < 1000; i2++) {
            createProducer.send(createTextMessage);
        }
        Assertions.assertTrue(countDownLatch.await(15L, TimeUnit.SECONDS), "Did not receive all messages: 1000");
    }

    @Timeout(60)
    @Test
    public void testPrefetchedMessagesAreNotConsumedOnConsumerClose() throws Exception {
        Connection createConnection = createConnection();
        try {
            Session createSession = createConnection.createSession(false, 1);
            Queue createQueue = createSession.createQueue(getQueueName());
            MessageProducer createProducer = createSession.createProducer(createQueue);
            new Random().nextBytes(new byte[2048]);
            for (int i = 0; i < 10; i++) {
                TextMessage createTextMessage = createSession.createTextMessage();
                createTextMessage.setText("msg:" + i);
                createProducer.send(createTextMessage);
            }
            createConnection.close();
            org.apache.activemq.artemis.core.server.Queue proxyToQueue = getProxyToQueue(getQueueName());
            Objects.requireNonNull(proxyToQueue);
            Wait.assertEquals(10L, proxyToQueue::getMessageCount);
            createConnection = createConnection();
            MessageConsumer createConsumer = createConnection.createSession(false, 1).createConsumer(createQueue);
            Thread.sleep(100L);
            createConsumer.close();
            createConnection.close();
            Objects.requireNonNull(proxyToQueue);
            Wait.assertEquals(10L, proxyToQueue::getMessageCount);
            createConnection.close();
        } catch (Throwable th) {
            createConnection.close();
            throw th;
        }
    }

    @Timeout(60)
    @Test
    public void testMessagesReceivedInParallel() throws Throwable {
        long currentTimeMillis = System.currentTimeMillis();
        ArrayList arrayList = new ArrayList();
        Thread thread = new Thread(() -> {
            Connection connection = null;
            try {
                try {
                    connection = createConnection();
                    connection.start();
                    Session createSession = connection.createSession(false, 1);
                    MessageConsumer createConsumer = createSession.createConsumer(createSession.createQueue(getQueueName()));
                    long j = 0;
                    for (int i = 50000; i > 0; i--) {
                        try {
                            long j2 = j + 1;
                            j = j2;
                            if (j2 % 1000 == 0) {
                                logger.debug("received {} messages", Long.valueOf(j));
                            }
                            Assertions.assertNotNull(createConsumer.receive(5000L), "Could not receive message count=" + i + " on consumer");
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                    try {
                        connection.close();
                    } catch (Throwable th) {
                    }
                } catch (Throwable th2) {
                    arrayList.add(th2);
                    th2.printStackTrace();
                    try {
                        connection.close();
                    } catch (Throwable th3) {
                    }
                }
            } catch (Throwable th4) {
                try {
                    connection.close();
                } catch (Throwable th5) {
                }
                throw th4;
            }
        });
        Connection createConnection = createConnection();
        Session createSession = createConnection.createSession(false, 1);
        Queue createQueue = createSession.createQueue(getQueueName());
        thread.start();
        MessageProducer createProducer = createSession.createProducer(createQueue);
        createProducer.setDeliveryMode(1);
        for (int i = 0; i < 50000; i++) {
            BytesMessage createBytesMessage = createSession.createBytesMessage();
            createBytesMessage.writeUTF("Hello world!!!!" + i);
            createBytesMessage.setIntProperty("count", i);
            createProducer.send(createBytesMessage);
        }
        thread.join();
        if (!arrayList.isEmpty()) {
            throw ((Throwable) arrayList.get(0));
        }
        org.apache.activemq.artemis.core.server.Queue proxyToQueue = getProxyToQueue(getQueueName());
        createConnection.close();
        Objects.requireNonNull(proxyToQueue);
        Wait.assertEquals(0L, proxyToQueue::getMessageCount);
        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
        logger.debug("Microbenchamrk ran in {} milliseconds, sending/receiving {}", Long.valueOf(currentTimeMillis2), 50000);
        logger.debug("{} messages per second", Integer.valueOf((int) ((50000.0d / currentTimeMillis2) * 1000.0d)));
    }

    @Timeout(60)
    @Test
    public void testClientAckMessages() throws Exception {
        Connection createConnection = createConnection();
        try {
            System.currentTimeMillis();
            Session createSession = createConnection.createSession(false, 1);
            Queue createQueue = createSession.createQueue(getQueueName());
            MessageProducer createProducer = createSession.createProducer(createQueue);
            new Random().nextBytes(new byte[2048]);
            for (int i = 0; i < 10; i++) {
                TextMessage createTextMessage = createSession.createTextMessage();
                createTextMessage.setText("msg:" + i);
                createProducer.send(createTextMessage);
            }
            createConnection.close();
            org.apache.activemq.artemis.core.server.Queue proxyToQueue = getProxyToQueue(getQueueName());
            Objects.requireNonNull(proxyToQueue);
            Wait.assertEquals(10L, proxyToQueue::getMessageCount);
            createConnection = createConnection();
            MessageConsumer createConsumer = createConnection.createSession(false, 2).createConsumer(createQueue);
            for (int i2 = 0; i2 < 10; i2++) {
                TextMessage receive = createConsumer.receive(5000L);
                Assertions.assertNotNull(receive, i2);
                Assertions.assertTrue(receive instanceof TextMessage, receive);
                Assertions.assertEquals(receive.getText(), "msg:" + i2);
                receive.acknowledge();
            }
            createConsumer.close();
            createConnection.close();
            Objects.requireNonNull(proxyToQueue);
            Wait.assertEquals(0L, proxyToQueue::getMessageCount);
            createConnection.close();
        } catch (Throwable th) {
            createConnection.close();
            throw th;
        }
    }

    @Timeout(30)
    @Test
    public void testTimedOutWaitingForWriteLogOnConsumer() throws Throwable {
        this.server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoDeleteQueues(false));
        Connection createConnection = createConnection();
        try {
            Session createSession = createConnection.createSession(false, 1);
            MessageProducer createProducer = createSession.createProducer(createSession.createQueue("exampleQueue1"));
            for (int i = 0; i < 40; i++) {
                TextMessage createTextMessage = createSession.createTextMessage();
                createTextMessage.setText("Message temporary");
                createProducer.send(createTextMessage);
            }
            createProducer.close();
            createSession.close();
            for (int i2 = 0; i2 < 40; i2++) {
                Session createSession2 = createConnection.createSession(false, 1);
                Assertions.assertNotNull(createSession2.createConsumer(createSession2.createQueue("exampleQueue1")).receive(1000L));
                createSession2.close();
            }
            Session createSession3 = createConnection.createSession(false, 1);
            MessageConsumer createConsumer = createSession3.createConsumer(createSession3.createQueue("exampleQueue1"));
            for (int i3 = 0; i3 < 40; i3++) {
                Assertions.assertNull(createConsumer.receive(1L));
            }
            createProducer.close();
            createSession3.close();
            createConnection.close();
        } catch (Throwable th) {
            createConnection.close();
            throw th;
        }
    }

    @Test
    public void testConcurrentSharedConsumerConnections() throws Exception {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(20);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        for (int i = 1; i < 20; i++) {
            newFixedThreadPool.submit(() -> {
                try {
                    Connection createConnection = createConnection();
                    try {
                        createConnection.start();
                        Session createSession = createConnection.createSession();
                        createSession.createSharedConsumer(createSession.createTopic("topics.foo"), "MY_SUB");
                        Thread.sleep(100L);
                        if (createConnection != null) {
                            createConnection.close();
                        }
                    } finally {
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    atomicBoolean.set(true);
                }
            });
        }
        newFixedThreadPool.shutdown();
        newFixedThreadPool.awaitTermination(30L, TimeUnit.SECONDS);
        Assertions.assertFalse(atomicBoolean.get());
    }

    @Timeout(30)
    @Test
    public void testBrokerRestartAMQPProducerAMQPConsumer() throws Exception {
        testBrokerRestart(createFailoverConnection(), createFailoverConnection());
    }

    private void testBrokerRestart(Connection connection, Connection connection2) throws Exception {
        try {
            Session createSession = connection.createSession(false, 1);
            Session createSession2 = connection2.createSession(false, 1);
            Queue createQueue = createSession.createQueue(getQueueName());
            MessageConsumer createConsumer = createSession2.createConsumer(createSession2.createQueue(getQueueName()));
            MessageProducer createProducer = createSession.createProducer(createQueue);
            createProducer.setDeliveryMode(2);
            connection.start();
            TextMessage createTextMessage = createSession.createTextMessage();
            createTextMessage.setText("hello");
            createProducer.send(createTextMessage);
            Message receive = createConsumer.receive(100L);
            Assertions.assertNotNull(receive, "Should have received a message by now.");
            Assertions.assertTrue(receive instanceof TextMessage, "Should be an instance of TextMessage");
            Assertions.assertEquals(2, receive.getJMSDeliveryMode());
            this.server.stop();
            Wait.waitFor(() -> {
                return !this.server.isStarted();
            }, 1000L);
            this.server.start();
            TextMessage createTextMessage2 = createSession.createTextMessage();
            createTextMessage2.setText("hello");
            createProducer.send(createTextMessage2);
            Message receive2 = createConsumer.receive(100L);
            Assertions.assertNotNull(receive2, "Should have received a message by now.");
            Assertions.assertTrue(receive2 instanceof TextMessage, "Should be an instance of TextMessage");
            Assertions.assertEquals(2, receive2.getJMSDeliveryMode());
            connection.close();
            connection2.close();
        } catch (Throwable th) {
            connection.close();
            connection2.close();
            throw th;
        }
    }
}
