package org.apache.activemq.artemis.tests.integration.amqp;

import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Objects;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.qpid.jms.JmsConnection;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.class */
public class JMSMessageConsumerTest extends JMSClientTestSupport {
    protected static final Logger LOG = LoggerFactory.getLogger(JMSMessageConsumerTest.class);

    @Override // org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport
    protected String getConfiguredProtocols() {
        return "AMQP,OPENWIRE,CORE";
    }

    @Test(timeout = 30000)
    public void testDeliveryModeAMQPProducerCoreConsumer() throws Exception {
        testDeliveryMode(createConnection(), createCoreConnection());
    }

    @Test(timeout = 30000)
    public void testDeliveryModeAMQPProducerAMQPConsumer() throws Exception {
        testDeliveryMode(createConnection(), createConnection());
    }

    @Test(timeout = 30000)
    public void testDeliveryModeCoreProducerAMQPConsumer() throws Exception {
        testDeliveryMode(createCoreConnection(), createConnection());
    }

    @Test(timeout = 30000)
    public void testDeliveryModeCoreProducerCoreConsumer() throws Exception {
        testDeliveryMode(createCoreConnection(), createCoreConnection());
    }

    private void testDeliveryMode(Connection connection, Connection connection2) throws JMSException {
        try {
            Session createSession = connection.createSession(false, 1);
            Session createSession2 = connection2.createSession(false, 1);
            Queue createQueue = createSession.createQueue(getQueueName());
            MessageConsumer createConsumer = createSession2.createConsumer(createSession2.createQueue(getQueueName()));
            MessageProducer createProducer = createSession.createProducer(createQueue);
            createProducer.setDeliveryMode(2);
            connection.start();
            TextMessage createTextMessage = createSession.createTextMessage();
            createTextMessage.setText("hello");
            createProducer.send(createTextMessage);
            Message receive = createConsumer.receive(100L);
            assertNotNull("Should have received a message by now.", receive);
            assertTrue("Should be an instance of TextMessage", receive instanceof TextMessage);
            assertEquals(2L, receive.getJMSDeliveryMode());
            connection.close();
            connection2.close();
        } catch (Throwable th) {
            connection.close();
            connection2.close();
            throw th;
        }
    }

    @Test(timeout = 30000)
    public void testPriorityAMQPProducerCoreConsumer() throws Exception {
        testPriority(createConnection(), createCoreConnection());
    }

    @Test(timeout = 30000)
    public void testPriorityAMQPProducerAMQPConsumer() throws Exception {
        testPriority(createConnection(), createConnection());
    }

    @Test(timeout = 30000)
    public void testPriorityModeCoreProducerAMQPConsumer() throws Exception {
        testPriority(createCoreConnection(), createConnection());
    }

    @Test(timeout = 30000)
    public void testPriorityCoreProducerCoreConsumer() throws Exception {
        testPriority(createCoreConnection(), createCoreConnection());
    }

    private void testPriority(Connection connection, Connection connection2) throws JMSException {
        try {
            Session createSession = connection.createSession(false, 1);
            Session createSession2 = connection2.createSession(false, 1);
            Queue createQueue = createSession.createQueue(getQueueName());
            MessageConsumer createConsumer = createSession2.createConsumer(createSession2.createQueue(getQueueName()));
            MessageProducer createProducer = createSession.createProducer(createQueue);
            createProducer.setPriority(2);
            connection.start();
            TextMessage createTextMessage = createSession.createTextMessage();
            createTextMessage.setText("hello");
            createProducer.send(createTextMessage);
            Message receive = createConsumer.receive(100L);
            assertNotNull("Should have received a message by now.", receive);
            assertTrue("Should be an instance of TextMessage", receive instanceof TextMessage);
            assertEquals(2L, receive.getJMSPriority());
            connection.close();
            connection2.close();
        } catch (Throwable th) {
            connection.close();
            connection2.close();
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testSelectorOnTopic() throws Exception {
        doTestSelector(true);
    }

    @Test(timeout = 60000)
    public void testSelectorOnQueue() throws Exception {
        doTestSelector(false);
    }

    private void doTestSelector(boolean z) throws Exception {
        Connection createConnection = createConnection();
        try {
            Session createSession = createConnection.createSession(false, 1);
            Topic createTopic = z ? createSession.createTopic(getTopicName()) : createSession.createQueue(getQueueName());
            MessageProducer createProducer = createSession.createProducer(createTopic);
            MessageConsumer createConsumer = createSession.createConsumer(createTopic, "color = 'RED'");
            TextMessage createTextMessage = createSession.createTextMessage();
            createTextMessage.setText("msg:0");
            createProducer.send(createTextMessage);
            TextMessage createTextMessage2 = createSession.createTextMessage();
            createTextMessage2.setText("msg:1");
            createTextMessage2.setStringProperty("color", "RED");
            createProducer.send(createTextMessage2);
            createConnection.start();
            TextMessage receive = createConsumer.receive(5000L);
            assertNotNull(receive);
            assertEquals("msg:1", receive.getText());
            assertEquals(receive.getStringProperty("color"), "RED");
            createConnection.close();
        } catch (Throwable th) {
            createConnection.close();
            throw th;
        }
    }

    @Test(timeout = 30000)
    public void testSelectorsWithJMSTypeOnTopic() throws Exception {
        doTestSelectorsWithJMSType(true);
    }

    @Test(timeout = 30000)
    public void testSelectorsWithJMSTypeOnQueue() throws Exception {
        doTestSelectorsWithJMSType(false);
    }

    private void doTestSelectorsWithJMSType(boolean z) throws Exception {
        Connection createConnection = createConnection();
        try {
            Session createSession = createConnection.createSession(false, 1);
            Topic createTopic = z ? createSession.createTopic(getTopicName()) : createSession.createQueue(getQueueName());
            MessageProducer createProducer = createSession.createProducer(createTopic);
            MessageConsumer createConsumer = createSession.createConsumer(createTopic, "JMSType = 'myJMSType'");
            TextMessage createTextMessage = createSession.createTextMessage();
            createTextMessage.setText("text");
            createProducer.send(createTextMessage, 1, 4, 0L);
            TextMessage createTextMessage2 = createSession.createTextMessage();
            createTextMessage2.setJMSType("myJMSType");
            createTextMessage2.setText("text + type");
            createProducer.send(createTextMessage2, 1, 4, 0L);
            createConnection.start();
            TextMessage receive = createConsumer.receive(2000L);
            assertNotNull(receive);
            assertTrue(receive instanceof TextMessage);
            assertEquals("Unexpected JMSType value", "myJMSType", receive.getJMSType());
            assertEquals("Unexpected message content", "text + type", receive.getText());
            createConnection.close();
        } catch (Throwable th) {
            createConnection.close();
            throw th;
        }
    }

    @Test(timeout = 30000)
    public void testSelectorsWithJMSCorrelationID() throws Exception {
        Connection createConnection = createConnection();
        String uuid = UUID.randomUUID().toString();
        try {
            Session createSession = createConnection.createSession(false, 1);
            Queue createQueue = createSession.createQueue(getQueueName());
            MessageProducer createProducer = createSession.createProducer(createQueue);
            TextMessage createTextMessage = createSession.createTextMessage();
            createTextMessage.setText("text");
            createProducer.send(createTextMessage);
            TextMessage createTextMessage2 = createSession.createTextMessage();
            createTextMessage2.setJMSCorrelationID(uuid);
            createTextMessage2.setText("JMSCorrelationID");
            createProducer.send(createTextMessage2);
            Enumeration enumeration = createSession.createBrowser(createQueue).getEnumeration();
            int i = 0;
            while (enumeration.hasMoreElements()) {
                assertTrue(((Message) enumeration.nextElement()) instanceof TextMessage);
                i++;
            }
            assertEquals(2L, i);
            TextMessage receive = createSession.createConsumer(createQueue, "JMSCorrelationID = '" + uuid + "'").receive(2000L);
            assertNotNull(receive);
            assertTrue(receive instanceof TextMessage);
            assertEquals("Unexpected JMSCorrelationID value", uuid, receive.getJMSCorrelationID());
            assertEquals("Unexpected message content", "JMSCorrelationID", receive.getText());
            createConnection.close();
        } catch (Throwable th) {
            createConnection.close();
            throw th;
        }
    }

    @Test(timeout = 30000)
    public void testSelectorsWithJMSPriority() throws Exception {
        Connection createConnection = createConnection();
        try {
            Session createSession = createConnection.createSession(false, 1);
            Queue createQueue = createSession.createQueue(getQueueName());
            MessageProducer createProducer = createSession.createProducer(createQueue);
            TextMessage createTextMessage = createSession.createTextMessage();
            createTextMessage.setText("hello");
            createProducer.send(createTextMessage, 2, 5, 0L);
            TextMessage createTextMessage2 = createSession.createTextMessage();
            createTextMessage2.setText("hello + 9");
            createProducer.send(createTextMessage2, 2, 9, 0L);
            Enumeration enumeration = createSession.createBrowser(createQueue).getEnumeration();
            int i = 0;
            while (enumeration.hasMoreElements()) {
                assertTrue(((Message) enumeration.nextElement()) instanceof TextMessage);
                i++;
            }
            assertEquals(2L, i);
            TextMessage receive = createSession.createConsumer(createQueue, "JMSPriority > 8").receive(2000L);
            assertNotNull(receive);
            assertTrue(receive instanceof TextMessage);
            assertEquals("hello + 9", receive.getText());
            createConnection.close();
        } catch (Throwable th) {
            createConnection.close();
            throw th;
        }
    }

    @Test(timeout = 30000)
    public void testSelectorsWithJMSXGroupIDOnTopic() throws Exception {
        doTestSelectorsWithJMSXGroupID(true);
    }

    @Test(timeout = 30000)
    public void testSelectorsWithJMSXGroupIDOnQueue() throws Exception {
        doTestSelectorsWithJMSXGroupID(false);
    }

    private void doTestSelectorsWithJMSXGroupID(boolean z) throws Exception {
        Connection createConnection = createConnection();
        try {
            Session createSession = createConnection.createSession(false, 1);
            Topic createTopic = z ? createSession.createTopic(getTopicName()) : createSession.createQueue(getQueueName());
            MessageProducer createProducer = createSession.createProducer(createTopic);
            MessageConsumer createConsumer = createSession.createConsumer(createTopic, "JMSXGroupID = '1'");
            TextMessage createTextMessage = createSession.createTextMessage();
            createTextMessage.setText("group 1 - 1");
            createTextMessage.setStringProperty("JMSXGroupID", "1");
            createTextMessage.setIntProperty("JMSXGroupSeq", 1);
            createProducer.send(createTextMessage);
            TextMessage createTextMessage2 = createSession.createTextMessage();
            createTextMessage2.setText("group 2");
            createTextMessage2.setStringProperty("JMSXGroupID", "2");
            createProducer.send(createTextMessage2);
            TextMessage createTextMessage3 = createSession.createTextMessage();
            createTextMessage3.setText("group 1 - 2");
            createTextMessage3.setStringProperty("JMSXGroupID", "1");
            createTextMessage3.setIntProperty("JMSXGroupSeq", -1);
            createProducer.send(createTextMessage3);
            createConnection.start();
            TextMessage receive = createConsumer.receive(2000L);
            assertNotNull(receive);
            assertTrue(receive instanceof TextMessage);
            assertEquals("group 1 - 1", receive.getText());
            TextMessage receive2 = createConsumer.receive(2000L);
            assertNotNull(receive2);
            assertTrue(receive2 instanceof TextMessage);
            assertEquals("group 1 - 2", receive2.getText());
            createConnection.close();
        } catch (Throwable th) {
            createConnection.close();
            throw th;
        }
    }

    @Test(timeout = 30000)
    public void testSelectorsWithJMSDeliveryOnQueue() throws Exception {
        Connection createConnection = createConnection();
        try {
            Session createSession = createConnection.createSession(false, 1);
            Queue createQueue = createSession.createQueue(getQueueName());
            MessageProducer createProducer = createSession.createProducer(createQueue);
            MessageConsumer createConsumer = createSession.createConsumer(createQueue, "JMSDeliveryMode = 'PERSISTENT'");
            TextMessage createTextMessage = createSession.createTextMessage();
            createTextMessage.setText("non-persistent");
            createProducer.send(createTextMessage, 1, 4, 0L);
            TextMessage createTextMessage2 = createSession.createTextMessage();
            createTextMessage2.setText("persistent");
            createProducer.send(createTextMessage2, 2, 4, 0L);
            createConnection.start();
            TextMessage receive = createConsumer.receive(2000L);
            assertNotNull(receive);
            assertTrue(receive instanceof TextMessage);
            assertEquals("Unexpected JMSDeliveryMode value", 2L, receive.getJMSDeliveryMode());
            assertEquals("Unexpected message content", "persistent", receive.getText());
            createConnection.close();
        } catch (Throwable th) {
            createConnection.close();
            throw th;
        }
    }

    @Test(timeout = 30000)
    public void testSelectorsWithJMSTimestampOnQueue() throws Exception {
        Connection createConnection = createConnection();
        try {
            Session createSession = createConnection.createSession(false, 1);
            Queue createQueue = createSession.createQueue(getQueueName());
            MessageProducer createProducer = createSession.createProducer(createQueue);
            TextMessage createTextMessage = createSession.createTextMessage();
            createTextMessage.setText("filtered");
            createProducer.send(createTextMessage, 2, 4, 0L);
            Thread.sleep(2L);
            TextMessage createTextMessage2 = createSession.createTextMessage();
            createTextMessage2.setText("expected");
            createProducer.send(createTextMessage2, 2, 4, 0L);
            MessageConsumer createConsumer = createSession.createConsumer(createQueue, "JMSTimestamp = " + createTextMessage2.getJMSTimestamp());
            createConnection.start();
            TextMessage receive = createConsumer.receive(2000L);
            assertNotNull(receive);
            assertTrue(receive instanceof TextMessage);
            assertEquals("Unexpected JMSTimestamp value", createTextMessage2.getJMSTimestamp(), receive.getJMSTimestamp());
            assertEquals("Unexpected message content", "expected", receive.getText());
            createConnection.close();
        } catch (Throwable th) {
            createConnection.close();
            throw th;
        }
    }

    @Test(timeout = 30000)
    public void testSelectorsWithJMSExpirationOnQueue() throws Exception {
        Connection createConnection = createConnection();
        try {
            Session createSession = createConnection.createSession(false, 1);
            Queue createQueue = createSession.createQueue(getQueueName());
            MessageProducer createProducer = createSession.createProducer(createQueue);
            TextMessage createTextMessage = createSession.createTextMessage();
            createTextMessage.setText("filtered");
            createProducer.send(createTextMessage, 2, 4, 0L);
            TextMessage createTextMessage2 = createSession.createTextMessage();
            createTextMessage2.setText("expected");
            createProducer.send(createTextMessage2, 2, 4, 60000L);
            MessageConsumer createConsumer = createSession.createConsumer(createQueue, "JMSExpiration = " + createTextMessage2.getJMSExpiration());
            createConnection.start();
            TextMessage receive = createConsumer.receive(2000L);
            assertNotNull(receive);
            assertTrue(receive instanceof TextMessage);
            assertEquals("Unexpected JMSExpiration value", createTextMessage2.getJMSExpiration(), receive.getJMSExpiration());
            assertEquals("Unexpected message content", "expected", receive.getText());
            createConnection.close();
        } catch (Throwable th) {
            createConnection.close();
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testJMSSelectorFiltersJMSMessageIDOnTopic() throws Exception {
        Connection createConnection = createConnection();
        try {
            Session createSession = createConnection.createSession(false, 1);
            Queue createQueue = createSession.createQueue(getQueueName());
            MessageProducer createProducer = createSession.createProducer(createQueue);
            TextMessage createTextMessage = createSession.createTextMessage();
            createProducer.send(createTextMessage);
            createProducer.send(createSession.createTextMessage());
            createConnection.start();
            MessageConsumer createConsumer = createSession.createConsumer(createQueue, "JMSMessageID = '" + createTextMessage.getJMSMessageID() + "'");
            TextMessage receive = createConsumer.receive(5000L);
            assertNotNull(receive);
            assertEquals(createTextMessage.getJMSMessageID(), receive.getJMSMessageID());
            assertNull(createConsumer.receive(1000L));
            createConnection.close();
        } catch (Throwable th) {
            createConnection.close();
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testZeroPrefetchWithTwoConsumers() throws Exception {
        JmsConnection createConnection = createConnection();
        createConnection.getPrefetchPolicy().setAll(0);
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        Queue createQueue = createSession.createQueue(getQueueName());
        MessageProducer createProducer = createSession.createProducer(createQueue);
        createProducer.send(createSession.createTextMessage("Msg1"));
        createProducer.send(createSession.createTextMessage("Msg2"));
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        MessageConsumer createConsumer2 = createSession.createConsumer(createQueue);
        TextMessage receive = createConsumer.receive(5000L);
        assertNotNull(receive);
        assertEquals("Should have received a message!", receive.getText(), "Msg1");
        TextMessage receive2 = createConsumer2.receive(5000L);
        assertNotNull(receive2);
        assertEquals("Should have received a message!", receive2.getText(), "Msg2");
        assertNull("Should have not received a message!", createConsumer2.receiveNoWait());
    }

    @Test(timeout = 30000)
    public void testProduceAndConsumeLargeNumbersOfTopicMessagesClientAck() throws Exception {
        doTestProduceAndConsumeLargeNumbersOfMessages(true, 2);
    }

    @Test(timeout = 30000)
    public void testProduceAndConsumeLargeNumbersOfQueueMessagesClientAck() throws Exception {
        doTestProduceAndConsumeLargeNumbersOfMessages(false, 2);
    }

    @Test(timeout = 30000)
    public void testProduceAndConsumeLargeNumbersOfTopicMessagesAutoAck() throws Exception {
        doTestProduceAndConsumeLargeNumbersOfMessages(true, 1);
    }

    @Test(timeout = 30000)
    public void testProduceAndConsumeLargeNumbersOfQueueMessagesAutoAck() throws Exception {
        doTestProduceAndConsumeLargeNumbersOfMessages(false, 1);
    }

    public void doTestProduceAndConsumeLargeNumbersOfMessages(boolean z, int i) throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(1000);
        JmsConnection createConnection = createConnection();
        createConnection.setForceAsyncSend(true);
        createConnection.start();
        Session createSession = createConnection.createSession(false, i);
        Topic createTopic = z ? createSession.createTopic(getTopicName()) : createSession.createQueue(getQueueName());
        createSession.createConsumer(createTopic).setMessageListener(new MessageListener() { // from class: org.apache.activemq.artemis.tests.integration.amqp.JMSMessageConsumerTest.1
            public void onMessage(Message message) {
                try {
                    message.acknowledge();
                    countDownLatch.countDown();
                } catch (JMSException e) {
                    JMSMessageConsumerTest.LOG.info("Caught exception.", e);
                }
            }
        });
        MessageProducer createProducer = createSession.createProducer(createTopic);
        TextMessage createTextMessage = createSession.createTextMessage();
        createTextMessage.setText("messageText");
        for (int i2 = 0; i2 < 1000; i2++) {
            createProducer.send(createTextMessage);
        }
        assertTrue("Did not receive all messages: 1000", countDownLatch.await(15L, TimeUnit.SECONDS));
    }

    @Test(timeout = 60000)
    public void testPrefetchedMessagesAreNotConsumedOnConsumerClose() throws Exception {
        Connection createConnection = createConnection();
        try {
            Session createSession = createConnection.createSession(false, 1);
            Queue createQueue = createSession.createQueue(getQueueName());
            MessageProducer createProducer = createSession.createProducer(createQueue);
            new Random().nextBytes(new byte[2048]);
            for (int i = 0; i < 10; i++) {
                TextMessage createTextMessage = createSession.createTextMessage();
                createTextMessage.setText("msg:" + i);
                createProducer.send(createTextMessage);
            }
            createConnection.close();
            org.apache.activemq.artemis.core.server.Queue proxyToQueue = getProxyToQueue(getQueueName());
            Objects.requireNonNull(proxyToQueue);
            Wait.assertEquals(10L, proxyToQueue::getMessageCount);
            createConnection = createConnection();
            MessageConsumer createConsumer = createConnection.createSession(false, 1).createConsumer(createQueue);
            Thread.sleep(100L);
            createConsumer.close();
            createConnection.close();
            Objects.requireNonNull(proxyToQueue);
            Wait.assertEquals(10L, proxyToQueue::getMessageCount);
            createConnection.close();
        } catch (Throwable th) {
            createConnection.close();
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testMessagesReceivedInParallel() throws Throwable {
        long currentTimeMillis = System.currentTimeMillis();
        final ArrayList arrayList = new ArrayList();
        Thread thread = new Thread(new Runnable() { // from class: org.apache.activemq.artemis.tests.integration.amqp.JMSMessageConsumerTest.2
            @Override // java.lang.Runnable
            public void run() {
                Connection connection = null;
                try {
                    try {
                        connection = JMSMessageConsumerTest.this.createConnection();
                        connection.start();
                        Session createSession = connection.createSession(false, 1);
                        MessageConsumer createConsumer = createSession.createConsumer(createSession.createQueue(JMSMessageConsumerTest.this.getQueueName()));
                        long j = 0;
                        for (int i = 50000; i > 0; i--) {
                            try {
                                long j2 = j + 1;
                                j = j2;
                                if (j2 % 1000 == 0) {
                                    System.out.println("received " + j + " messages");
                                }
                                Assert.assertNotNull("Could not receive message count=" + i + " on consumer", createConsumer.receive(5000L));
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                        try {
                            connection.close();
                        } catch (Throwable th) {
                        }
                    } catch (Throwable th2) {
                        arrayList.add(th2);
                        th2.printStackTrace();
                        try {
                            connection.close();
                        } catch (Throwable th3) {
                        }
                    }
                } catch (Throwable th4) {
                    try {
                        connection.close();
                    } catch (Throwable th5) {
                    }
                    throw th4;
                }
            }
        });
        Connection createConnection = createConnection();
        Session createSession = createConnection.createSession(false, 1);
        Queue createQueue = createSession.createQueue(getQueueName());
        thread.start();
        MessageProducer createProducer = createSession.createProducer(createQueue);
        createProducer.setDeliveryMode(1);
        for (int i = 0; i < 50000; i++) {
            BytesMessage createBytesMessage = createSession.createBytesMessage();
            createBytesMessage.writeUTF("Hello world!!!!" + i);
            createBytesMessage.setIntProperty("count", i);
            createProducer.send(createBytesMessage);
        }
        thread.join();
        if (!arrayList.isEmpty()) {
            throw ((Throwable) arrayList.get(0));
        }
        org.apache.activemq.artemis.core.server.Queue proxyToQueue = getProxyToQueue(getQueueName());
        createConnection.close();
        Objects.requireNonNull(proxyToQueue);
        Wait.assertEquals(0L, proxyToQueue::getMessageCount);
        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
        System.out.println("Microbenchamrk ran in " + currentTimeMillis2 + " milliseconds, sending/receiving 50000");
        System.out.println(((int) ((50000.0d / currentTimeMillis2) * 1000.0d)) + " messages per second");
    }

    @Test(timeout = 60000)
    public void testClientAckMessages() throws Exception {
        Connection createConnection = createConnection();
        try {
            long currentTimeMillis = System.currentTimeMillis();
            Session createSession = createConnection.createSession(false, 1);
            Queue createQueue = createSession.createQueue(getQueueName());
            MessageProducer createProducer = createSession.createProducer(createQueue);
            new Random().nextBytes(new byte[2048]);
            for (int i = 0; i < 10; i++) {
                TextMessage createTextMessage = createSession.createTextMessage();
                createTextMessage.setText("msg:" + i);
                createProducer.send(createTextMessage);
            }
            createConnection.close();
            org.apache.activemq.artemis.core.server.Queue proxyToQueue = getProxyToQueue(getQueueName());
            Objects.requireNonNull(proxyToQueue);
            Wait.assertEquals(10L, proxyToQueue::getMessageCount);
            createConnection = createConnection();
            MessageConsumer createConsumer = createConnection.createSession(false, 2).createConsumer(createQueue);
            for (int i2 = 0; i2 < 10; i2++) {
                TextMessage receive = createConsumer.receive(5000L);
                if (receive == null) {
                    System.out.println("ProtonTest.testManyMessages");
                }
                Assert.assertNotNull("" + i2, receive);
                Assert.assertTrue("" + receive, receive instanceof TextMessage);
                Assert.assertEquals(receive.getText(), "msg:" + i2);
                receive.acknowledge();
            }
            createConsumer.close();
            createConnection.close();
            Thread.sleep(500L);
            Objects.requireNonNull(proxyToQueue);
            Wait.assertEquals(0L, proxyToQueue::getMessageCount);
            System.out.println("taken = " + ((System.currentTimeMillis() - currentTimeMillis) / 1000));
            createConnection.close();
        } catch (Throwable th) {
            createConnection.close();
            throw th;
        }
    }

    @Test(timeout = 240000)
    public void testTimedOutWaitingForWriteLogOnConsumer() throws Throwable {
        Connection createConnection = createConnection();
        try {
            Session createSession = createConnection.createSession(false, 1);
            MessageProducer createProducer = createSession.createProducer(createSession.createQueue("exampleQueue1"));
            for (int i = 0; i < 40; i++) {
                TextMessage createTextMessage = createSession.createTextMessage();
                createTextMessage.setText("Message temporary");
                createProducer.send(createTextMessage);
            }
            createProducer.close();
            createSession.close();
            for (int i2 = 0; i2 < 40; i2++) {
                Session createSession2 = createConnection.createSession(false, 1);
                createSession2.createConsumer(createSession2.createQueue("exampleQueue1")).receive(1000L);
                createProducer.close();
                createSession2.close();
            }
            Session createSession3 = createConnection.createSession(false, 1);
            MessageConsumer createConsumer = createSession3.createConsumer(createSession3.createQueue("exampleQueue1"));
            for (int i3 = 0; i3 < 40; i3++) {
                createConsumer.receive(1000L);
            }
            createProducer.close();
            createSession3.close();
            createConnection.close();
        } catch (Throwable th) {
            createConnection.close();
            throw th;
        }
    }
}
