package org.apache.activemq.artemis.tests.integration.amqp;

import jakarta.jms.Topic;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.transport.amqp.AmqpSupport;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.message.Message;
import org.jgroups.util.UUID;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.class */
public class AmqpSendReceiveTest extends AmqpClientTestSupport {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport
    public boolean isAutoCreateQueues() {
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport
    public boolean isAutoCreateAddresses() {
        return false;
    }

    @Test(timeout = 60000)
    public void testAcceptWithoutSettling() throws Exception {
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpReceiver createReceiver = addConnection.createSession().createReceiver(getQueueName());
        sendMessages(getQueueName(), 10);
        for (int i = 0; i < 10; i++) {
            createReceiver.flow(1);
            AmqpMessage receive = createReceiver.receive();
            receive.accept(false);
            receive.settle();
        }
        createReceiver.close();
        addConnection.close();
        Queue proxyToQueue = getProxyToQueue(getQueueName());
        assertNotNull(proxyToQueue);
        Objects.requireNonNull(proxyToQueue);
        Wait.assertEquals(0L, proxyToQueue::getMessageCount);
    }

    @Test(timeout = 60000)
    public void testQueueReceiverReadMessage() throws Exception {
        sendMessages(getQueueName(), 1);
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpReceiver createReceiver = addConnection.createSession().createReceiver(getQueueName());
        Queue proxyToQueue = getProxyToQueue(getQueueName());
        assertEquals(1L, proxyToQueue.getMessageCount());
        createReceiver.flow(1);
        assertNotNull(createReceiver.receive(5L, TimeUnit.SECONDS));
        createReceiver.close();
        assertEquals(1L, proxyToQueue.getMessageCount());
        addConnection.close();
    }

    @Test(timeout = 60000)
    public void testCoreBridge() throws Exception {
        this.server.getRemotingService().createAcceptor("acceptor", "vm://0").start();
        this.server.getConfiguration().addConnectorConfiguration("connector", "vm://0");
        this.server.deployBridge(new BridgeConfiguration().setName(getTestName()).setQueueName(getQueueName()).setForwardingAddress(getQueueName(1)).setConfirmationWindowSize(10).setStaticConnectors(Arrays.asList("connector")));
        sendMessages(getQueueName(), 1);
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpReceiver createReceiver = addConnection.createSession().createReceiver(getQueueName(1));
        Queue proxyToQueue = getProxyToQueue(getQueueName());
        Objects.requireNonNull(proxyToQueue);
        Wait.assertEquals(1, proxyToQueue::getConsumerCount);
        Objects.requireNonNull(proxyToQueue);
        Wait.assertEquals(0L, proxyToQueue::getMessageCount);
        Queue proxyToQueue2 = getProxyToQueue(getQueueName(1));
        Objects.requireNonNull(proxyToQueue2);
        Wait.assertEquals(1, proxyToQueue2::getConsumerCount);
        Objects.requireNonNull(proxyToQueue2);
        Wait.assertEquals(1L, proxyToQueue2::getMessageCount);
        createReceiver.flow(1);
        AmqpMessage receive = createReceiver.receive(5L, TimeUnit.SECONDS);
        assertNotNull(receive);
        receive.accept();
        createReceiver.close();
        Objects.requireNonNull(proxyToQueue2);
        Wait.assertEquals(0L, proxyToQueue2::getMessageCount);
        addConnection.close();
    }

    @Test(timeout = 60000)
    public void testMessageDurableFalse() throws Exception {
        sendMessages(getQueueName(), 1, false);
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpReceiver createReceiver = addConnection.createSession().createReceiver(getQueueName());
        Queue proxyToQueue = getProxyToQueue(getQueueName());
        assertEquals(1L, proxyToQueue.getMessageCount());
        createReceiver.flow(1);
        AmqpMessage receive = createReceiver.receive(5L, TimeUnit.SECONDS);
        assertNotNull(receive);
        assertFalse(receive.isDurable());
        createReceiver.close();
        assertEquals(1L, proxyToQueue.getMessageCount());
        addConnection.close();
    }

    @Test(timeout = 60000)
    public void testMessageDurableTrue() throws Exception {
        assertNotNull(this.server.locateQueue(getQueueName()));
        sendMessages(getQueueName(), 1, true);
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpReceiver createReceiver = addConnection.createSession().createReceiver(getQueueName());
        Queue proxyToQueue = getProxyToQueue(getQueueName());
        assertEquals(1L, proxyToQueue.getMessageCount());
        createReceiver.flow(1);
        AmqpMessage receive = createReceiver.receive(5L, TimeUnit.SECONDS);
        assertNotNull(receive);
        assertTrue(receive.isDurable());
        createReceiver.close();
        assertEquals(1L, proxyToQueue.getMessageCount());
        addConnection.close();
    }

    @Test(timeout = 60000)
    public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception {
        sendMessages(getQueueName(), 4);
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpSession createSession = addConnection.createSession();
        AmqpReceiver createReceiver = createSession.createReceiver(getQueueName());
        Queue proxyToQueue = getProxyToQueue(getQueueName());
        assertEquals(4, proxyToQueue.getMessageCount());
        createReceiver.flow(2);
        assertNotNull(createReceiver.receive(5L, TimeUnit.SECONDS));
        assertNotNull(createReceiver.receive(5L, TimeUnit.SECONDS));
        AmqpReceiver createReceiver2 = createSession.createReceiver(getQueueName());
        assertEquals(2L, this.server.getTotalConsumerCount());
        createReceiver2.flow(2);
        assertNotNull(createReceiver2.receive(5L, TimeUnit.SECONDS));
        assertNotNull(createReceiver2.receive(5L, TimeUnit.SECONDS));
        assertEquals(0L, proxyToQueue.getMessagesAcknowledged());
        createReceiver.close();
        createReceiver2.close();
        assertEquals(4, proxyToQueue.getMessageCount());
        addConnection.close();
    }

    @Test(timeout = 60000)
    public void testTwoQueueReceiversOnSameConnectionReadMessagesAcceptOnEach() throws Exception {
        sendMessages(getQueueName(), 4);
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpSession createSession = addConnection.createSession();
        AmqpReceiver createReceiver = createSession.createReceiver(getQueueName());
        final Queue proxyToQueue = getProxyToQueue(getQueueName());
        assertEquals(4, proxyToQueue.getMessageCount());
        createReceiver.flow(2);
        AmqpMessage receive = createReceiver.receive(5L, TimeUnit.SECONDS);
        assertNotNull(receive);
        receive.accept();
        AmqpMessage receive2 = createReceiver.receive(5L, TimeUnit.SECONDS);
        assertNotNull(receive2);
        receive2.accept();
        assertTrue("Should have ack'd two", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.artemis.tests.integration.amqp.AmqpSendReceiveTest.1
            public boolean isSatisfied() throws Exception {
                return proxyToQueue.getMessagesAcknowledged() == 2;
            }
        }, TimeUnit.SECONDS.toMillis(5L), TimeUnit.MILLISECONDS.toMillis(50L)));
        AmqpReceiver createReceiver2 = createSession.createReceiver(getQueueName());
        assertEquals(2L, this.server.getTotalConsumerCount());
        createReceiver2.flow(2);
        AmqpMessage receive3 = createReceiver2.receive(5L, TimeUnit.SECONDS);
        assertNotNull(receive3);
        receive3.accept();
        AmqpMessage receive4 = createReceiver2.receive(5L, TimeUnit.SECONDS);
        assertNotNull(receive4);
        receive4.accept();
        assertTrue("Queue should be empty now", org.apache.activemq.artemis.tests.util.Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.artemis.tests.integration.amqp.AmqpSendReceiveTest.2
            public boolean isSatisfied() throws Exception {
                return proxyToQueue.getMessagesAcknowledged() == 4;
            }
        }, TimeUnit.SECONDS.toMillis(15L), TimeUnit.MILLISECONDS.toMillis(10L)));
        createReceiver.close();
        createReceiver2.close();
        Objects.requireNonNull(proxyToQueue);
        org.apache.activemq.artemis.tests.util.Wait.assertEquals(0L, proxyToQueue::getMessageCount);
        addConnection.close();
    }

    @Test(timeout = 60000)
    public void testSecondReceiverOnQueueGetsAllUnconsumedMessages() throws Exception {
        sendMessages(getQueueName(), 20);
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpSession createSession = addConnection.createSession();
        AmqpReceiver createReceiver = createSession.createReceiver(getQueueName());
        final Queue proxyToQueue = getProxyToQueue(getQueueName());
        assertEquals(20, proxyToQueue.getMessageCount());
        createReceiver.flow(20);
        assertTrue("Should have dispatch to prefetch", org.apache.activemq.artemis.tests.util.Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.artemis.tests.integration.amqp.AmqpSendReceiveTest.3
            public boolean isSatisfied() throws Exception {
                return proxyToQueue.getDeliveringCount() >= 2;
            }
        }, TimeUnit.SECONDS.toMillis(5L), TimeUnit.MILLISECONDS.toMillis(50L)));
        createReceiver.close();
        AmqpReceiver createReceiver2 = createSession.createReceiver(getQueueName());
        assertEquals(1L, this.server.getTotalConsumerCount());
        createReceiver2.flow(20 * 2);
        AmqpMessage receive = createReceiver2.receive(5L, TimeUnit.SECONDS);
        assertNotNull(receive);
        receive.accept();
        AmqpMessage receive2 = createReceiver2.receive(5L, TimeUnit.SECONDS);
        assertNotNull(receive2);
        receive2.accept();
        assertTrue("Should have ack'd two", org.apache.activemq.artemis.tests.util.Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.artemis.tests.integration.amqp.AmqpSendReceiveTest.4
            public boolean isSatisfied() throws Exception {
                return proxyToQueue.getMessagesAcknowledged() == 2;
            }
        }, TimeUnit.SECONDS.toMillis(5L), TimeUnit.MILLISECONDS.toMillis(50L)));
        createReceiver2.close();
        Objects.requireNonNull(proxyToQueue);
        org.apache.activemq.artemis.tests.util.Wait.assertEquals(20 - 2, proxyToQueue::getMessageCount);
        addConnection.close();
    }

    @Test(timeout = 60000)
    public void testSimpleSendOneReceiveOne() throws Exception {
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpSession createSession = addConnection.createSession();
        AmqpSender createSender = createSession.createSender(getQueueName());
        AmqpMessage amqpMessage = new AmqpMessage();
        amqpMessage.setMessageId("msg1");
        amqpMessage.setMessageAnnotation("serialNo", 1);
        amqpMessage.setText("Test-Message");
        createSender.send(amqpMessage);
        createSender.close();
        logger.debug("Attempting to read message with receiver");
        AmqpReceiver createReceiver = createSession.createReceiver(getQueueName());
        createReceiver.flow(2);
        AmqpMessage receive = createReceiver.receive(10L, TimeUnit.SECONDS);
        assertNotNull("Should have read message", receive);
        assertEquals("msg1", receive.getMessageId());
        receive.accept();
        createReceiver.close();
        addConnection.close();
    }

    @Test(timeout = 60000)
    public void testSendFilterAnnotation() throws Exception {
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpSession createSession = addConnection.createSession();
        AmqpSender createSender = createSession.createSender(getQueueName());
        AmqpMessage amqpMessage = new AmqpMessage();
        amqpMessage.setMessageId("msg1");
        amqpMessage.setMessageAnnotation("x-opt-serialNo", 1);
        amqpMessage.setText("Test-Message");
        createSender.send(amqpMessage);
        AmqpMessage amqpMessage2 = new AmqpMessage();
        amqpMessage2.setMessageId("msg2");
        amqpMessage2.setMessageAnnotation("x-opt-serialNo", 2);
        amqpMessage2.setText("Test-Message 2");
        createSender.send(amqpMessage2);
        createSender.close();
        logger.debug("Attempting to read message with receiver");
        AmqpReceiver createReceiver = createSession.createReceiver(getQueueName(), "\"m.x-opt-serialNo\"=2");
        createReceiver.flow(2);
        AmqpMessage receive = createReceiver.receive(10L, TimeUnit.SECONDS);
        assertNotNull("Should have read message", receive);
        assertEquals("msg2", receive.getMessageId());
        receive.accept();
        Assert.assertNull(createReceiver.receiveNoWait());
        createReceiver.close();
        addConnection.close();
    }

    @Test(timeout = 60000)
    public void testCloseBusyReceiver() throws Exception {
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpSession createSession = addConnection.createSession();
        AmqpSender createSender = createSession.createSender(getQueueName());
        for (int i = 0; i < 20; i++) {
            AmqpMessage amqpMessage = new AmqpMessage();
            amqpMessage.setMessageId("msg" + i);
            amqpMessage.setMessageAnnotation("serialNo", Integer.valueOf(i));
            amqpMessage.setText("Test-Message");
            logger.debug("Sending message: {}", amqpMessage.getMessageId());
            createSender.send(amqpMessage);
        }
        createSender.close();
        Queue proxyToQueue = getProxyToQueue(getQueueName());
        Objects.requireNonNull(proxyToQueue);
        org.apache.activemq.artemis.tests.util.Wait.assertEquals(20L, proxyToQueue::getMessageCount);
        AmqpReceiver createReceiver = createSession.createReceiver(getQueueName());
        createReceiver.flow(20);
        AmqpMessage receive = createReceiver.receive(5L, TimeUnit.SECONDS);
        assertNotNull("Should have got a message", receive);
        assertEquals("msg0", receive.getMessageId());
        createReceiver.close();
        AmqpReceiver createReceiver2 = createSession.createReceiver(getQueueName());
        createReceiver2.flow(200);
        for (int i2 = 0; i2 < 20; i2++) {
            AmqpMessage receive2 = createReceiver2.receive(5L, TimeUnit.SECONDS);
            assertNotNull("Should have got a message", receive2);
            logger.debug("Read message: {}", receive2.getMessageId());
            assertEquals("msg" + i2, receive2.getMessageId());
        }
        createReceiver2.close();
        addConnection.close();
    }

    @Test(timeout = 60000)
    public void testReceiveWithJMSSelectorFilter() throws Exception {
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpSession createSession = addConnection.createSession();
        AmqpMessage amqpMessage = new AmqpMessage();
        amqpMessage.setGroupId("abcdefg");
        amqpMessage.setApplicationProperty("sn", 100);
        AmqpMessage amqpMessage2 = new AmqpMessage();
        amqpMessage2.setGroupId("hijklm");
        amqpMessage2.setApplicationProperty("sn", 200);
        AmqpSender createSender = createSession.createSender(getQueueName());
        createSender.send(amqpMessage);
        createSender.send(amqpMessage2);
        createSender.close();
        AmqpReceiver createReceiver = createSession.createReceiver(getQueueName(), "sn = 100");
        createReceiver.flow(2);
        AmqpMessage receive = createReceiver.receive(5L, TimeUnit.SECONDS);
        assertNotNull("Should have read a message", receive);
        assertEquals(100, receive.getApplicationProperty("sn"));
        assertEquals("abcdefg", receive.getGroupId());
        receive.accept();
        assertNull(createReceiver.receive(1L, TimeUnit.SECONDS));
        createReceiver.close();
        addConnection.close();
    }

    @Test(timeout = 60000)
    public void testReceiveWithJMSSelectorFilterOnJMSType() throws Exception {
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpSession createSession = addConnection.createSession();
        AmqpMessage amqpMessage = new AmqpMessage();
        amqpMessage.setText("msg:1");
        AmqpMessage amqpMessage2 = new AmqpMessage();
        amqpMessage2.setSubject("target");
        amqpMessage2.setText("msg:2");
        AmqpSender createSender = createSession.createSender(getQueueName());
        createSender.send(amqpMessage);
        createSender.send(amqpMessage2);
        createSender.close();
        AmqpReceiver createReceiver = createSession.createReceiver(getQueueName(), "JMSType = 'target'");
        createReceiver.flow(2);
        AmqpMessage receive = createReceiver.receive(5L, TimeUnit.SECONDS);
        assertNotNull("Should have read a message", receive);
        assertEquals("target", receive.getSubject());
        receive.accept();
        assertNull(createReceiver.receive(1L, TimeUnit.SECONDS));
        createReceiver.close();
        addConnection.close();
    }

    @Test(timeout = 60000)
    public void testAdvancedLinkFlowControl() throws Exception {
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpSession createSession = addConnection.createSession();
        AmqpSender createSender = createSession.createSender(getQueueName());
        for (int i = 0; i < 20; i++) {
            AmqpMessage amqpMessage = new AmqpMessage();
            amqpMessage.setMessageId("msg" + i);
            amqpMessage.setMessageAnnotation("serialNo", Integer.valueOf(i));
            amqpMessage.setText("Test-Message");
            createSender.send(amqpMessage);
        }
        createSender.close();
        logger.debug("Attempting to read first two messages with receiver #1");
        AmqpReceiver createReceiver = createSession.createReceiver(getQueueName());
        createReceiver.flow(2);
        AmqpMessage receive = createReceiver.receive(10L, TimeUnit.SECONDS);
        AmqpMessage receive2 = createReceiver.receive(10L, TimeUnit.SECONDS);
        assertNotNull("Should have read message 1", receive);
        assertNotNull("Should have read message 2", receive2);
        assertEquals("msg0", receive.getMessageId());
        assertEquals("msg1", receive2.getMessageId());
        receive.accept();
        receive2.accept();
        logger.debug("Attempting to read next two messages with receiver #2");
        AmqpReceiver createReceiver2 = createSession.createReceiver(getQueueName());
        createReceiver2.flow(2);
        AmqpMessage receive3 = createReceiver2.receive(10L, TimeUnit.SECONDS);
        AmqpMessage receive4 = createReceiver2.receive(10L, TimeUnit.SECONDS);
        assertNotNull("Should have read message 3", receive3);
        assertNotNull("Should have read message 4", receive4);
        assertEquals("msg2", receive3.getMessageId());
        assertEquals("msg3", receive4.getMessageId());
        receive3.accept();
        receive4.accept();
        logger.debug("Attempting to read remaining messages with receiver #1");
        createReceiver.flow(16);
        for (int i2 = 4; i2 < 20; i2++) {
            AmqpMessage receive5 = createReceiver.receive(10L, TimeUnit.SECONDS);
            assertNotNull("Should have read a message", receive5);
            assertEquals("msg" + i2, receive5.getMessageId());
            receive5.accept();
        }
        createReceiver.close();
        createReceiver2.close();
        addConnection.close();
    }

    @Test(timeout = 60000)
    public void testDispatchOrderWithPrefetchOfOne() throws Exception {
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpSession createSession = addConnection.createSession();
        AmqpSender createSender = createSession.createSender(getQueueName());
        for (int i = 0; i < 20; i++) {
            AmqpMessage amqpMessage = new AmqpMessage();
            amqpMessage.setMessageId("msg" + i);
            amqpMessage.setMessageAnnotation("serialNo", Integer.valueOf(i));
            amqpMessage.setText("Test-Message");
            createSender.send(amqpMessage);
        }
        createSender.close();
        AmqpReceiver createReceiver = createSession.createReceiver(getQueueName());
        createReceiver.flow(1);
        AmqpReceiver createReceiver2 = createSession.createReceiver(getQueueName());
        createReceiver2.flow(1);
        AmqpMessage receive = createReceiver.receive(10L, TimeUnit.SECONDS);
        AmqpMessage receive2 = createReceiver2.receive(10L, TimeUnit.SECONDS);
        assertNotNull("Should have read message 1", receive);
        assertNotNull("Should have read message 2", receive2);
        assertEquals("msg0", receive.getMessageId());
        assertEquals("msg1", receive2.getMessageId());
        receive.accept();
        receive2.accept();
        createReceiver.flow(1);
        AmqpMessage receive3 = createReceiver.receive(10L, TimeUnit.SECONDS);
        createReceiver2.flow(1);
        AmqpMessage receive4 = createReceiver2.receive(10L, TimeUnit.SECONDS);
        assertNotNull("Should have read message 3", receive3);
        assertNotNull("Should have read message 4", receive4);
        assertEquals("msg2", receive3.getMessageId());
        assertEquals("msg3", receive4.getMessageId());
        receive3.accept();
        receive4.accept();
        logger.debug("*** Attempting to read remaining messages with both receivers");
        logger.debug("**** Receiver #1 granting credit[{}] for its block of messages", 8);
        createReceiver.flow(8);
        for (int i2 = 0; i2 < 8; i2++) {
            AmqpMessage receive5 = createReceiver.receive(10L, TimeUnit.SECONDS);
            assertNotNull("Receiver #1 should have read a message", receive5);
            logger.debug("Receiver #1 read message: {}", receive5.getMessageId());
            receive5.accept();
        }
        logger.debug("**** Receiver #2 granting credit[{}] for its block of messages", 8);
        createReceiver2.flow(8);
        for (int i3 = 0; i3 < 8; i3++) {
            AmqpMessage receive6 = createReceiver2.receive(10L, TimeUnit.SECONDS);
            assertNotNull("Receiver #2 should have read message[" + i3 + "]", receive6);
            logger.debug("Receiver #2 read message: {}", receive6.getMessageId());
            receive6.accept();
        }
        createReceiver.close();
        createReceiver2.close();
        addConnection.close();
    }

    @Test(timeout = 60000)
    public void testReceiveMessageAndRefillCreditBeforeAccept() throws Exception {
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpSession createSession = addConnection.createSession();
        String queueName = getQueueName();
        AmqpReceiver createReceiver = createSession.createReceiver(queueName);
        AmqpSender createSender = createSession.createSender(queueName);
        for (int i = 0; i < 2; i++) {
            AmqpMessage amqpMessage = new AmqpMessage();
            amqpMessage.setMessageId("msg" + i);
            createSender.send(amqpMessage);
        }
        createSender.close();
        createReceiver.flow(1);
        AmqpMessage receive = createReceiver.receive(5L, TimeUnit.SECONDS);
        assertNotNull(receive);
        createReceiver.flow(1);
        receive.accept();
        AmqpMessage receive2 = createReceiver.receive(10L, TimeUnit.SECONDS);
        assertNotNull(receive2);
        receive2.accept();
        createReceiver.close();
        addConnection.close();
    }

    @Test(timeout = 60000)
    public void testReceiveMessageAndRefillCreditBeforeAcceptOnQueueAsync() throws Exception {
        final AmqpClient createAmqpClient = createAmqpClient();
        final LinkedList linkedList = new LinkedList();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        final String queueName = getQueueName();
        newCachedThreadPool.submit(new Runnable() { // from class: org.apache.activemq.artemis.tests.integration.amqp.AmqpSendReceiveTest.5
            @Override // java.lang.Runnable
            public void run() {
                try {
                    AmqpSendReceiveTest.logger.debug("Starting consumer connection");
                    AmqpConnection addConnection = AmqpSendReceiveTest.this.addConnection(createAmqpClient.connect());
                    AmqpReceiver createReceiver = addConnection.createSession().createReceiver(queueName);
                    createReceiver.flow(1);
                    countDownLatch.countDown();
                    AmqpMessage receive = createReceiver.receive(5L, TimeUnit.SECONDS);
                    Assert.assertNotNull(receive);
                    createReceiver.flow(1);
                    receive.accept();
                    AmqpMessage receive2 = createReceiver.receive(5L, TimeUnit.SECONDS);
                    Assert.assertNotNull(receive2);
                    receive2.accept();
                    createReceiver.close();
                    addConnection.close();
                } catch (Exception e) {
                    linkedList.add(e);
                }
            }
        });
        newCachedThreadPool.submit(new Runnable() { // from class: org.apache.activemq.artemis.tests.integration.amqp.AmqpSendReceiveTest.6
            @Override // java.lang.Runnable
            public void run() {
                try {
                    countDownLatch.await(20L, TimeUnit.SECONDS);
                    AmqpConnection addConnection = AmqpSendReceiveTest.this.addConnection(createAmqpClient.connect());
                    AmqpSender createSender = addConnection.createSession().createSender(queueName);
                    for (int i = 0; i < 2; i++) {
                        AmqpMessage amqpMessage = new AmqpMessage();
                        amqpMessage.setMessageId("msg" + i);
                        createSender.send(amqpMessage);
                    }
                    createSender.close();
                    addConnection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        newCachedThreadPool.shutdown();
        newCachedThreadPool.awaitTermination(20L, TimeUnit.SECONDS);
        assertTrue("no errors: " + linkedList, linkedList.isEmpty());
    }

    @Test(timeout = 60000)
    public void testMessageDurabliltyFollowsSpec() throws Exception {
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpSession createSession = addConnection.createSession();
        AmqpSender createSender = createSession.createSender(getQueueName());
        AmqpReceiver createReceiver = createSession.createReceiver(getQueueName());
        Queue proxyToQueue = getProxyToQueue(getQueueName());
        AmqpMessage amqpMessage = new AmqpMessage();
        amqpMessage.setText("Test-Message -> non-durable");
        amqpMessage.setDurable(false);
        amqpMessage.setMessageId("ID:Message:1");
        createSender.send(amqpMessage);
        Objects.requireNonNull(proxyToQueue);
        org.apache.activemq.artemis.tests.util.Wait.assertEquals(1L, proxyToQueue::getMessageCount);
        createReceiver.flow(1);
        AmqpMessage receive = createReceiver.receive(50L, TimeUnit.SECONDS);
        assertNotNull("Should have read a message", receive);
        assertFalse("First message sent should not be durable", receive.isDurable());
        receive.accept();
        AmqpMessage amqpMessage2 = new AmqpMessage();
        amqpMessage2.setText("Test-Message -> durable");
        amqpMessage2.setDurable(true);
        amqpMessage2.setMessageId("ID:Message:2");
        createSender.send(amqpMessage2);
        Objects.requireNonNull(proxyToQueue);
        org.apache.activemq.artemis.tests.util.Wait.assertEquals(1L, proxyToQueue::getMessageCount);
        createReceiver.flow(1);
        AmqpMessage receive2 = createReceiver.receive(50L, TimeUnit.SECONDS);
        assertNotNull("Should have read a message", receive2);
        assertTrue("Second message sent should be durable", receive2.isDurable());
        receive2.accept();
        createSender.close();
        addConnection.close();
    }

    @Test(timeout = 60000)
    public void testMessageWithHeaderMarkedDurableIsPersisted() throws Exception {
        doTestBrokerRestartAndDurability(true, true, false);
    }

    @Test(timeout = 60000)
    public void testMessageWithHeaderMarkedNonDurableIsNotPersisted() throws Exception {
        doTestBrokerRestartAndDurability(false, true, true);
    }

    @Test(timeout = 60000)
    public void testMessageWithHeaderDefaultedNonDurableIsNotPersisted() throws Exception {
        doTestBrokerRestartAndDurability(false, true, false);
    }

    @Test(timeout = 60000)
    public void testMessageWithNoHeaderIsNotPersisted() throws Exception {
        doTestBrokerRestartAndDurability(false, false, false);
    }

    private void doTestBrokerRestartAndDurability(boolean z, boolean z2, boolean z3) throws Exception {
        AmqpClient createAmqpClient = createAmqpClient();
        AmqpConnection addConnection = addConnection(createAmqpClient.connect());
        AmqpSender createSender = addConnection.createSession().createSender(getQueueName());
        Queue proxyToQueue = getProxyToQueue(getQueueName());
        Message create = Message.Factory.create();
        create.setMessageId("ID:Message:1");
        create.setBody(new AmqpValue("Test-Message -> " + (z ? "durable" : "non-durable")));
        if (z || z2) {
            Header header = new Header();
            if (z) {
                header.setDurable(true);
            } else if (z3) {
                header.setDurable(false);
            } else {
                header.setPriority(UnsignedByte.valueOf((byte) 5));
                assertNull(header.getDurable());
            }
            create.setHeader(header);
        } else {
            assertNull("Should not have a header", create.getHeader());
        }
        createSender.send(new AmqpMessage(create));
        addConnection.close();
        Objects.requireNonNull(proxyToQueue);
        org.apache.activemq.artemis.tests.util.Wait.assertEquals(1L, proxyToQueue::getMessageCount);
        this.server.stop();
        this.server.start();
        AmqpConnection addConnection2 = addConnection(createAmqpClient.connect());
        AmqpReceiver createReceiver = addConnection2.createSession().createReceiver(getQueueName());
        Queue proxyToQueue2 = getProxyToQueue(getQueueName());
        if (z) {
            org.apache.activemq.artemis.tests.util.Wait.assertTrue("Message should not have returned", () -> {
                return proxyToQueue2.getMessageCount() == 1;
            });
        } else {
            org.apache.activemq.artemis.tests.util.Wait.assertTrue("Message should have been restored", () -> {
                return proxyToQueue2.getMessageCount() == 0;
            });
        }
        createReceiver.flow(1);
        AmqpMessage receive = createReceiver.receive(1L, TimeUnit.SECONDS);
        if (z) {
            assertNotNull("Should have read a message", receive);
        } else {
            assertNull("Should not have read a message", receive);
        }
        addConnection2.close();
    }

    @Test(timeout = 60000)
    public void testReceiveMessageBeyondAckedAmountQueue() throws Exception {
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpSession createSession = addConnection.createSession();
        String queueName = getQueueName();
        AmqpReceiver createReceiver = createSession.createReceiver(queueName);
        AmqpSender createSender = createSession.createSender(queueName);
        Queue proxyToQueue = getProxyToQueue(queueName);
        for (int i = 0; i < 50; i++) {
            AmqpMessage amqpMessage = new AmqpMessage();
            amqpMessage.setMessageId("msg" + i);
            createSender.send(amqpMessage);
        }
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < 50; i2++) {
            createReceiver.flow(1);
            AmqpMessage receive = createReceiver.receive(5L, TimeUnit.SECONDS);
            assertNotNull(receive);
            arrayList.add(receive);
        }
        AmqpMessage amqpMessage2 = new AmqpMessage();
        amqpMessage2.setMessageId("msg-final");
        createSender.send(amqpMessage2);
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((AmqpMessage) it.next()).accept();
        }
        Objects.requireNonNull(proxyToQueue);
        org.apache.activemq.artemis.tests.util.Wait.assertEquals(0, proxyToQueue::getDeliveringCount);
        createSender.close();
        createReceiver.close();
        addConnection.close();
    }

    @Test(timeout = 60000)
    public void testTwoPresettledReceiversReceiveAllMessages() throws Exception {
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpSession createSession = addConnection.createSession();
        String queueName = getQueueName();
        AmqpSender createSender = createSession.createSender(queueName);
        AmqpReceiver createReceiver = createSession.createReceiver(queueName, (String) null, false, true);
        AmqpReceiver createReceiver2 = createSession.createReceiver(queueName, (String) null, false, true);
        for (int i = 0; i < 100; i++) {
            AmqpMessage amqpMessage = new AmqpMessage();
            amqpMessage.setMessageId("msg" + i);
            createSender.send(amqpMessage);
        }
        logger.debug("Attempting to read first two messages with receiver #1");
        createReceiver.flow(2);
        AmqpMessage receive = createReceiver.receive(10L, TimeUnit.SECONDS);
        AmqpMessage receive2 = createReceiver.receive(10L, TimeUnit.SECONDS);
        assertNotNull("Should have read message 1", receive);
        assertNotNull("Should have read message 2", receive2);
        assertEquals("msg0", receive.getMessageId());
        assertEquals("msg1", receive2.getMessageId());
        receive.accept();
        receive2.accept();
        logger.debug("Attempting to read next two messages with receiver #2");
        createReceiver2.flow(2);
        AmqpMessage receive3 = createReceiver2.receive(10L, TimeUnit.SECONDS);
        AmqpMessage receive4 = createReceiver2.receive(10L, TimeUnit.SECONDS);
        assertNotNull("Should have read message 3", receive3);
        assertNotNull("Should have read message 4", receive4);
        assertEquals("msg2", receive3.getMessageId());
        assertEquals("msg3", receive4.getMessageId());
        receive3.accept();
        receive4.accept();
        logger.debug("*** Attempting to read remaining messages with both receivers");
        logger.debug("**** Receiver #1 granting credit[{}] for its block of messages", 48);
        createReceiver.flow(48);
        for (int i2 = 0; i2 < 48; i2++) {
            AmqpMessage receive5 = createReceiver.receive(10L, TimeUnit.SECONDS);
            assertNotNull("Receiver #1 should have read a message", receive5);
            logger.debug("Receiver #1 read message: {}", receive5.getMessageId());
            receive5.accept();
        }
        logger.debug("**** Receiver #2 granting credit[{}] for its block of messages", 48);
        createReceiver2.flow(48);
        for (int i3 = 0; i3 < 48; i3++) {
            AmqpMessage receive6 = createReceiver2.receive(10L, TimeUnit.SECONDS);
            assertNotNull("Receiver #2 should have read a message[" + i3 + "]", receive6);
            logger.debug("Receiver #2 read message: {}", receive6.getMessageId());
            receive6.accept();
        }
        createReceiver.close();
        createReceiver2.close();
        addConnection.close();
    }

    @Test(timeout = 60000)
    public void testDeliveryDelayOfferedWhenRequested() throws Exception {
        AmqpClient createAmqpClient = createAmqpClient();
        createAmqpClient.setValidator(new AmqpValidator() { // from class: org.apache.activemq.artemis.tests.integration.amqp.AmqpSendReceiveTest.7
            public void inspectOpenedResource(Sender sender) {
                if (AmqpSupport.contains(sender.getRemoteOfferedCapabilities(), org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DELAYED_DELIVERY)) {
                    return;
                }
                markAsInvalid("Broker did not indicate it support delayed message delivery");
            }
        });
        AmqpConnection addConnection = addConnection(createAmqpClient.connect());
        AmqpSender createSender = addConnection.createSession().createSender(getQueueName(), new Symbol[]{org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DELAYED_DELIVERY});
        assertNotNull(createSender);
        addConnection.getStateInspector().assertValid();
        createSender.close();
        addConnection.close();
    }

    @Test(timeout = 60000)
    public void testMessageWithToFieldSetToSenderAddress() throws Exception {
        doTestMessageWithToFieldSet(false, getQueueName());
    }

    @Test(timeout = 60000)
    public void testMessageWithToFieldSetToRandomAddress() throws Exception {
        doTestMessageWithToFieldSet(false, UUID.randomUUID().toString());
    }

    @Test(timeout = 60000)
    public void testMessageWithToFieldSetToEmpty() throws Exception {
        doTestMessageWithToFieldSet(false, "");
    }

    @Test(timeout = 60000)
    public void testMessageWithToFieldSetToNull() throws Exception {
        doTestMessageWithToFieldSet(false, null);
    }

    @Test(timeout = 60000)
    public void testMessageWithToFieldSetWithAnonymousSender() throws Exception {
        doTestMessageWithToFieldSet(true, getQueueName());
    }

    private void doTestMessageWithToFieldSet(boolean z, String str) throws Exception {
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpSession createSession = addConnection.createSession();
        String queueName = getQueueName();
        AmqpSender createSender = createSession.createSender(z ? null : queueName);
        AmqpMessage amqpMessage = new AmqpMessage();
        amqpMessage.setAddress(str);
        amqpMessage.setMessageId("msg:1");
        createSender.send(amqpMessage);
        AmqpReceiver createReceiver = createSession.createReceiver(queueName);
        Queue proxyToQueue = getProxyToQueue(queueName);
        Objects.requireNonNull(proxyToQueue);
        org.apache.activemq.artemis.tests.util.Wait.assertEquals(1L, proxyToQueue::getMessageCount);
        createReceiver.flow(1);
        AmqpMessage receive = createReceiver.receive(5L, TimeUnit.SECONDS);
        assertNotNull(receive);
        assertEquals(str, receive.getAddress());
        createReceiver.close();
        Objects.requireNonNull(proxyToQueue);
        org.apache.activemq.artemis.tests.util.Wait.assertEquals(1L, proxyToQueue::getMessageCount);
        addConnection.close();
    }

    @Test(timeout = 60000)
    public void testLinkDetatchErrorIsCorrectWhenQueueDoesNotExists() throws Exception {
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        Exception exc = null;
        try {
            addConnection.createSession().createSender("AnAddressThatDoesNotExist");
            fail("Creating a sender here on an address that doesn't exist should fail");
        } catch (Exception e) {
            exc = e;
        }
        assertNotNull(exc);
        assertTrue(exc.getMessage().contains("amqp:not-found"));
        assertTrue(exc.getMessage().contains("target address AnAddressThatDoesNotExist does not exist"));
        addConnection.close();
    }

    @Test(timeout = 60000)
    public void testSendingAndReceivingToQueueWithDifferentAddressAndQueueName() throws Exception {
        this.server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("TestAddress"), RoutingType.ANYCAST));
        this.server.createQueue(new QueueConfiguration(new SimpleString("TestQueueName")).setAddress(new SimpleString("TestAddress")).setRoutingType(RoutingType.ANYCAST));
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        try {
            AmqpSession createSession = addConnection.createSession();
            AmqpSender createSender = createSession.createSender("TestAddress");
            AmqpReceiver createReceiver = createSession.createReceiver("TestAddress");
            createReceiver.flow(1);
            AmqpMessage amqpMessage = new AmqpMessage();
            amqpMessage.setText("TestPayload");
            createSender.send(amqpMessage);
            assertNotNull(createReceiver.receive(5000L, TimeUnit.MILLISECONDS));
            addConnection.close();
        } catch (Throwable th) {
            addConnection.close();
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testSendReceiveLotsOfDurableMessagesOnQueue() throws Exception {
        doTestSendReceiveLotsOfDurableMessages(Queue.class);
    }

    @Test(timeout = 60000)
    public void testSendReceiveLotsOfDurableMessagesOnTopic() throws Exception {
        doTestSendReceiveLotsOfDurableMessages(Topic.class);
    }

    private void doTestSendReceiveLotsOfDurableMessages(Class<?> cls) throws Exception {
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpSession createSession = addConnection.createSession();
        final CountDownLatch countDownLatch = new CountDownLatch(1000);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        String queueName = Queue.class.equals(cls) ? getQueueName() : getTopicName();
        final AmqpReceiver createReceiver = createSession.createReceiver(queueName);
        createReceiver.flow(1000);
        AmqpSender createSender = createSession.createSender(queueName);
        Queue proxyToQueue = getProxyToQueue(queueName);
        newSingleThreadExecutor.execute(new Runnable() { // from class: org.apache.activemq.artemis.tests.integration.amqp.AmqpSendReceiveTest.8
            @Override // java.lang.Runnable
            public void run() {
                for (int i = 0; i < 1000; i++) {
                    try {
                        createReceiver.receive(5L, TimeUnit.SECONDS).accept();
                        countDownLatch.countDown();
                    } catch (Exception e) {
                        AmqpSendReceiveTest.logger.debug("Caught error: {}", e.getClass().getSimpleName());
                        atomicBoolean.set(true);
                    }
                }
            }
        });
        for (int i = 0; i < 1000; i++) {
            AmqpMessage amqpMessage = new AmqpMessage();
            amqpMessage.setMessageId("msg" + i);
            createSender.send(amqpMessage);
        }
        assertTrue("did not read all messages, waiting on: " + countDownLatch.getCount(), countDownLatch.await(10L, TimeUnit.SECONDS));
        assertFalse("should not be any errors on receive", atomicBoolean.get());
        Objects.requireNonNull(proxyToQueue);
        org.apache.activemq.artemis.tests.util.Wait.assertEquals(0, proxyToQueue::getDeliveringCount);
        createSender.close();
        createReceiver.close();
        addConnection.close();
    }

    @Test(timeout = 60000)
    public void testReceiveRejecting() throws Exception {
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpSession createSession = addConnection.createSession();
        String queueName = getQueueName();
        AmqpSender createSender = createSession.createSender(queueName);
        for (int i = 0; i < 1000; i++) {
            AmqpMessage amqpMessage = new AmqpMessage();
            amqpMessage.setMessageId("msg" + i);
            createSender.send(amqpMessage);
        }
        Queue proxyToQueue = getProxyToQueue(queueName);
        for (int i2 = 0; i2 < 1000; i2++) {
            AmqpReceiver createReceiver = createSession.createReceiver(queueName);
            createReceiver.flow(1000);
            AmqpMessage receive = createReceiver.receive(5L, TimeUnit.SECONDS);
            Assert.assertNotNull(receive);
            Assert.assertEquals("msg" + i2, receive.getMessageId());
            receive.accept();
            createReceiver.close();
        }
        AmqpReceiver createReceiver2 = createSession.createReceiver(queueName);
        createReceiver2.flow(1000);
        Assert.assertNull(createReceiver2.receive(1L, TimeUnit.MILLISECONDS));
        Objects.requireNonNull(proxyToQueue);
        org.apache.activemq.artemis.tests.util.Wait.assertEquals(0, proxyToQueue::getDeliveringCount);
        addConnection.close();
    }
}
