package org.apache.activemq.artemis.tests.integration.amqp;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.AmqpSupport;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.class */
public class AmqpSendReceiveTest extends AmqpClientTestSupport {
    protected static final Logger LOG = LoggerFactory.getLogger(AmqpSendReceiveTest.class);

    /**
     * Attaches a receiver to the address named after this test and checks that a queue
     * proxy for that address can be obtained afterwards.
     */
    @Test(timeout = 60000)
    public void testCreateQueueReceiver() throws Exception {
        AmqpClient client = createAmqpClient();
        AmqpConnection connection = addConnection(client.connect());
        AmqpSession session = connection.createSession();

        AmqpReceiver receiver = session.createReceiver(getTestName());

        Queue queueView = getProxyToQueue(getTestName());
        assertNotNull(queueView);

        receiver.close();
        connection.close();
    }

    /**
     * Creates a receiver with a JMS selector and uses a client-side validator to check that
     * the opened link exposes a remote source carrying a JMS selector filter.
     */
    @Test(timeout = 60000)
    public void testCreateQueueReceiverWithJMSSelector() throws Exception {
        AmqpClient client = createAmqpClient();
        client.setValidator(new AmqpValidator() {
            public void inspectOpenedResource(Receiver receiver) {
                if (receiver.getRemoteSource() == null) {
                    markAsInvalid("Link opened with null source.");
                    // Stop here: there is no source to inspect, and dereferencing it below
                    // would replace the recorded validation failure with a NullPointerException.
                    return;
                }
                if (AmqpSupport.findFilter(receiver.getRemoteSource().getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS) == null) {
                    markAsInvalid("Broker did not return the JMS Filter on Attach");
                }
            }
        });

        AmqpConnection connection = addConnection(client.connect());
        connection.createSession().createReceiver(getTestName(), "JMSPriority > 8");

        // Surfaces anything the validator marked as invalid while the link was opening.
        connection.getStateInspector().assertValid();
        connection.close();
    }

    /**
     * Creates a receiver with the no-local flag set and uses a client-side validator to
     * inspect the remote source of the opened link.
     *
     * <p>The validator marks the link invalid when a NoLocal filter IS present on the remote
     * source. NOTE(review): this presumably reflects that the broker does not echo NoLocal
     * for queue attachments — confirm against broker behavior.
     */
    @Test(timeout = 60000)
    public void testCreateQueueReceiverWithNoLocalSet() throws Exception {
        AmqpClient client = createAmqpClient();
        client.setValidator(new AmqpValidator() {
            public void inspectOpenedResource(Receiver receiver) {
                if (receiver.getRemoteSource() == null) {
                    markAsInvalid("Link opened with null source.");
                    // Stop here: there is no source to inspect, and dereferencing it below
                    // would replace the recorded validation failure with a NullPointerException.
                    return;
                }
                if (AmqpSupport.findFilter(receiver.getRemoteSource().getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS) != null) {
                    // Message now matches the condition: the failure case is a filter that is present.
                    markAsInvalid("Broker should not return the NoLocal Filter on Queue Attach");
                }
            }
        });

        AmqpConnection connection = addConnection(client.connect());
        connection.createSession().createReceiver(getTestName(), (String) null, true);

        // Surfaces anything the validator marked as invalid while the link was opening.
        connection.getStateInspector().assertValid();
        connection.close();
    }

    /**
     * Attempts to create a receiver with a malformed selector and expects the attempt to
     * fail with an exception whose cause is a {@link JMSException}.
     */
    @Test(timeout = 60000)
    public void testInvalidFilter() throws Exception {
        AmqpClient client = createAmqpClient();
        AmqpConnection connection = addConnection(client.connect());

        try {
            AmqpSession session = connection.createSession();
            session.createReceiver(getTestName(), "null = 'f''", true);
            fail("should throw exception");
        } catch (Exception expected) {
            Throwable cause = expected.getCause();
            assertTrue(cause instanceof JMSException);
        }

        connection.close();
    }

    /**
     * Sends one message, reads it without accepting it, closes the receiver and expects the
     * queue to still report a message count of one.
     */
    @Test(timeout = 60000)
    public void testQueueReceiverReadMessage() throws Exception {
        sendMessages(getTestName(), 1);

        AmqpClient client = createAmqpClient();
        AmqpConnection connection = addConnection(client.connect());
        AmqpSession session = connection.createSession();
        AmqpReceiver receiver = session.createReceiver(getTestName());

        Queue queueView = getProxyToQueue(getTestName());
        assertEquals(1L, queueView.getMessageCount());

        receiver.flow(1);
        AmqpMessage delivered = receiver.receive(5L, TimeUnit.SECONDS);
        assertNotNull(delivered);
        receiver.close();

        // No disposition was sent for the delivery, so the count is expected to be unchanged.
        assertEquals(1L, queueView.getMessageCount());

        connection.close();
    }

    /**
     * Reads two messages on each of two receivers sharing one session without sending any
     * disposition, then checks that nothing was acknowledged and that all four messages are
     * still counted on the queue once both receivers are closed.
     */
    @Test(timeout = 60000)
    public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception {
        final int messageCount = 4;
        sendMessages(getTestName(), messageCount);

        AmqpClient client = createAmqpClient();
        AmqpConnection connection = addConnection(client.connect());
        AmqpSession session = connection.createSession();

        AmqpReceiver firstReceiver = session.createReceiver(getTestName());

        Queue queueView = getProxyToQueue(getTestName());
        assertEquals(messageCount, queueView.getMessageCount());

        firstReceiver.flow(2);
        for (int read = 0; read < 2; read++) {
            assertNotNull(firstReceiver.receive(5L, TimeUnit.SECONDS));
        }

        AmqpReceiver secondReceiver = session.createReceiver(getTestName());
        assertEquals(2L, this.server.getTotalConsumerCount());

        secondReceiver.flow(2);
        for (int read = 0; read < 2; read++) {
            assertNotNull(secondReceiver.receive(5L, TimeUnit.SECONDS));
        }

        assertEquals(0L, queueView.getMessagesAcknowledged());

        firstReceiver.close();
        secondReceiver.close();

        assertEquals(messageCount, queueView.getMessageCount());

        connection.close();
    }

    /**
     * Reads and accepts two messages on each of two receivers sharing one session, waiting
     * after each pair for the queue's acknowledged counter to catch up, and finally expects
     * the queue to report no remaining messages.
     */
    @Test(timeout = 60000)
    public void testTwoQueueReceiversOnSameConnectionReadMessagesAcceptOnEach() throws Exception {
        final int messageCount = 4;
        sendMessages(getTestName(), messageCount);

        AmqpClient client = createAmqpClient();
        AmqpConnection connection = addConnection(client.connect());
        AmqpSession session = connection.createSession();

        AmqpReceiver firstReceiver = session.createReceiver(getTestName());

        final Queue queueView = getProxyToQueue(getTestName());
        assertEquals(messageCount, queueView.getMessageCount());

        firstReceiver.flow(2);
        for (int read = 0; read < 2; read++) {
            AmqpMessage delivered = firstReceiver.receive(5L, TimeUnit.SECONDS);
            assertNotNull(delivered);
            delivered.accept();
        }

        Wait.Condition firstPairAcked = new Wait.Condition() {
            @Override
            public boolean isSatisfied() throws Exception {
                return queueView.getMessagesAcknowledged() == 2;
            }
        };
        assertTrue("Should have ack'd two",
                Wait.waitFor(firstPairAcked, TimeUnit.SECONDS.toMillis(5L), TimeUnit.MILLISECONDS.toMillis(50L)));

        AmqpReceiver secondReceiver = session.createReceiver(getTestName());
        assertEquals(2L, this.server.getTotalConsumerCount());

        secondReceiver.flow(2);
        for (int read = 0; read < 2; read++) {
            AmqpMessage delivered = secondReceiver.receive(5L, TimeUnit.SECONDS);
            assertNotNull(delivered);
            delivered.accept();
        }

        Wait.Condition allAcked = new Wait.Condition() {
            @Override
            public boolean isSatisfied() throws Exception {
                return queueView.getMessagesAcknowledged() == 4;
            }
        };
        assertTrue("Queue should be empty now",
                Wait.waitFor(allAcked, TimeUnit.SECONDS.toMillis(15L), TimeUnit.MILLISECONDS.toMillis(10L)));

        firstReceiver.close();
        secondReceiver.close();

        assertEquals(0L, queueView.getMessageCount());

        connection.close();
    }

    /**
     * Grants credit on a first receiver until at least two messages are being delivered,
     * closes it without consuming, then checks that a second receiver can read and accept
     * two messages and that the queue count drops by exactly those two.
     */
    @Test(timeout = 60000)
    public void testSecondReceiverOnQueueGetsAllUnconsumedMessages() throws Exception {
        final int messageCount = 20;
        sendMessages(getTestName(), messageCount);

        AmqpClient client = createAmqpClient();
        AmqpConnection connection = addConnection(client.connect());
        AmqpSession session = connection.createSession();

        AmqpReceiver firstReceiver = session.createReceiver(getTestName());

        final Queue queueView = getProxyToQueue(getTestName());
        assertEquals(messageCount, queueView.getMessageCount());

        firstReceiver.flow(20);

        Wait.Condition dispatchStarted = new Wait.Condition() {
            @Override
            public boolean isSatisfied() throws Exception {
                return queueView.getDeliveringCount() >= 2;
            }
        };
        assertTrue("Should have dispatch to prefetch",
                Wait.waitFor(dispatchStarted, TimeUnit.SECONDS.toMillis(5L), TimeUnit.MILLISECONDS.toMillis(50L)));

        firstReceiver.close();

        AmqpReceiver secondReceiver = session.createReceiver(getTestName());
        assertEquals(1L, this.server.getTotalConsumerCount());

        secondReceiver.flow(messageCount * 2);
        for (int read = 0; read < 2; read++) {
            AmqpMessage delivered = secondReceiver.receive(5L, TimeUnit.SECONDS);
            assertNotNull(delivered);
            delivered.accept();
        }

        Wait.Condition twoAcked = new Wait.Condition() {
            @Override
            public boolean isSatisfied() throws Exception {
                return queueView.getMessagesAcknowledged() == 2;
            }
        };
        assertTrue("Should have ack'd two",
                Wait.waitFor(twoAcked, TimeUnit.SECONDS.toMillis(5L), TimeUnit.MILLISECONDS.toMillis(50L)));

        secondReceiver.close();

        assertEquals(messageCount - 2, queueView.getMessageCount());

        connection.close();
    }

    /**
     * Sends a single text message with a message id and a message annotation, then reads it
     * back on the same session and checks the message id before accepting it.
     */
    @Test(timeout = 60000)
    public void testSimpleSendOneReceiveOne() throws Exception {
        AmqpClient client = createAmqpClient();
        AmqpConnection connection = addConnection(client.connect());
        AmqpSession session = connection.createSession();

        AmqpSender sender = session.createSender(getTestName());

        AmqpMessage outgoing = new AmqpMessage();
        outgoing.setMessageId("msg1");
        outgoing.setMessageAnnotation("serialNo", 1);
        outgoing.setText("Test-Message");

        sender.send(outgoing);
        sender.close();

        LOG.info("Attempting to read message with receiver");
        AmqpReceiver receiver = session.createReceiver(getTestName());
        receiver.flow(2);

        AmqpMessage incoming = receiver.receive(10L, TimeUnit.SECONDS);
        assertNotNull("Should have read message", incoming);
        assertEquals("msg1", incoming.getMessageId());
        incoming.accept();

        receiver.close();
        connection.close();
    }

    /**
     * Sends a batch of messages, lets a first receiver take credit for all of them but read
     * only one, closes that receiver without any disposition, and then checks that a second
     * receiver reads every message again in the original order, starting at {@code msg0}.
     */
    @Test(timeout = 60000)
    public void testCloseBusyReceiver() throws Exception {
        final int messageCount = 20;

        AmqpClient client = createAmqpClient();
        AmqpConnection connection = addConnection(client.connect());
        AmqpSession session = connection.createSession();

        AmqpSender sender = session.createSender(getTestName());
        for (int i = 0; i < messageCount; i++) {
            AmqpMessage message = new AmqpMessage();
            message.setMessageId("msg" + i);
            message.setMessageAnnotation("serialNo", Integer.valueOf(i));
            message.setText("Test-Message");
            // Use the class logger rather than stdout, consistent with the other tests here.
            LOG.info("Sending message: {}", message.getMessageId());
            sender.send(message);
        }
        sender.close();

        Queue queueView = getProxyToQueue(getTestName());
        assertEquals(20L, queueView.getMessageCount());

        AmqpReceiver firstReceiver = session.createReceiver(getTestName());
        firstReceiver.flow(messageCount);
        AmqpMessage first = firstReceiver.receive(5L, TimeUnit.SECONDS);
        assertNotNull("Should have got a message", first);
        assertEquals("msg0", first.getMessageId());
        firstReceiver.close();

        AmqpReceiver secondReceiver = session.createReceiver(getTestName());
        secondReceiver.flow(200);
        for (int i = 0; i < messageCount; i++) {
            AmqpMessage received = secondReceiver.receive(5L, TimeUnit.SECONDS);
            assertNotNull("Should have got a message", received);
            LOG.info("Read message: {}", received.getMessageId());
            assertEquals("msg" + i, received.getMessageId());
        }

        secondReceiver.close();
        connection.close();
    }

    /**
     * Sends two messages that differ in group id and in the {@code sn} application property,
     * then attaches a receiver with the selector {@code sn = 100} and expects only the
     * matching message to be delivered.
     */
    @Test(timeout = 60000)
    public void testReceiveWithJMSSelectorFilter() throws Exception {
        AmqpClient client = createAmqpClient();
        AmqpConnection connection = addConnection(client.connect());
        AmqpSession session = connection.createSession();

        AmqpMessage matching = new AmqpMessage();
        matching.setGroupId("abcdefg");
        matching.setApplicationProperty("sn", 100);

        AmqpMessage nonMatching = new AmqpMessage();
        nonMatching.setGroupId("hijklm");
        nonMatching.setApplicationProperty("sn", 200);

        AmqpSender sender = session.createSender(getTestName());
        sender.send(matching);
        sender.send(nonMatching);
        sender.close();

        AmqpReceiver receiver = session.createReceiver(getTestName(), "sn = 100");
        receiver.flow(2);

        AmqpMessage selected = receiver.receive(5L, TimeUnit.SECONDS);
        assertNotNull("Should have read a message", selected);
        assertEquals(100, selected.getApplicationProperty("sn"));
        assertEquals("abcdefg", selected.getGroupId());
        selected.accept();

        // Credit remains, but nothing else should pass the selector.
        AmqpMessage unexpected = receiver.receive(1L, TimeUnit.SECONDS);
        assertNull(unexpected);

        receiver.close();
        connection.close();
    }

    /**
     * Sends a batch of messages and reads them in three stages: two on a first receiver,
     * the next two on a second receiver, and the remainder on the first receiver again,
     * checking the message ids arrive in send order throughout.
     */
    @Test(timeout = 60000)
    public void testAdvancedLinkFlowControl() throws Exception {
        final int messageCount = 20;

        AmqpClient client = createAmqpClient();
        AmqpConnection connection = addConnection(client.connect());
        AmqpSession session = connection.createSession();

        AmqpSender sender = session.createSender(getTestName());
        for (int i = 0; i < messageCount; i++) {
            AmqpMessage outgoing = new AmqpMessage();
            outgoing.setMessageId("msg" + i);
            outgoing.setMessageAnnotation("serialNo", Integer.valueOf(i));
            outgoing.setText("Test-Message");
            sender.send(outgoing);
        }
        sender.close();

        LOG.info("Attempting to read first two messages with receiver #1");
        AmqpReceiver firstReceiver = session.createReceiver(getTestName());
        firstReceiver.flow(2);
        AmqpMessage message1 = firstReceiver.receive(10L, TimeUnit.SECONDS);
        AmqpMessage message2 = firstReceiver.receive(10L, TimeUnit.SECONDS);
        assertNotNull("Should have read message 1", message1);
        assertNotNull("Should have read message 2", message2);
        assertEquals("msg0", message1.getMessageId());
        assertEquals("msg1", message2.getMessageId());
        message1.accept();
        message2.accept();

        LOG.info("Attempting to read next two messages with receiver #2");
        AmqpReceiver secondReceiver = session.createReceiver(getTestName());
        secondReceiver.flow(2);
        AmqpMessage message3 = secondReceiver.receive(10L, TimeUnit.SECONDS);
        AmqpMessage message4 = secondReceiver.receive(10L, TimeUnit.SECONDS);
        assertNotNull("Should have read message 3", message3);
        assertNotNull("Should have read message 4", message4);
        assertEquals("msg2", message3.getMessageId());
        assertEquals("msg3", message4.getMessageId());
        message3.accept();
        message4.accept();

        LOG.info("Attempting to read remaining messages with receiver #1");
        // Four messages were consumed above; grant credit for everything that is left.
        firstReceiver.flow(messageCount - 4);
        for (int expectedIndex = 4; expectedIndex < messageCount; expectedIndex++) {
            AmqpMessage remaining = firstReceiver.receive(10L, TimeUnit.SECONDS);
            assertNotNull("Should have read a message", remaining);
            assertEquals("msg" + expectedIndex, remaining.getMessageId());
            remaining.accept();
        }

        firstReceiver.close();
        secondReceiver.close();
        connection.close();
    }

    /**
     * Attaches two receivers that each grant a single credit at a time and checks that the
     * first four messages alternate between them in send order; afterwards each receiver
     * grants a block of credit and reads and accepts that many messages.
     */
    @Test(timeout = 60000)
    public void testDispatchOrderWithPrefetchOfOne() throws Exception {
        final int messageCount = 20;

        AmqpClient client = createAmqpClient();
        AmqpConnection connection = addConnection(client.connect());
        AmqpSession session = connection.createSession();

        AmqpSender sender = session.createSender(getTestName());
        for (int i = 0; i < messageCount; i++) {
            AmqpMessage outgoing = new AmqpMessage();
            outgoing.setMessageId("msg" + i);
            outgoing.setMessageAnnotation("serialNo", Integer.valueOf(i));
            outgoing.setText("Test-Message");
            sender.send(outgoing);
        }
        sender.close();

        AmqpReceiver firstReceiver = session.createReceiver(getTestName());
        firstReceiver.flow(1);

        AmqpReceiver secondReceiver = session.createReceiver(getTestName());
        secondReceiver.flow(1);

        AmqpMessage message1 = firstReceiver.receive(10L, TimeUnit.SECONDS);
        AmqpMessage message2 = secondReceiver.receive(10L, TimeUnit.SECONDS);
        assertNotNull("Should have read message 1", message1);
        assertNotNull("Should have read message 2", message2);
        assertEquals("msg0", message1.getMessageId());
        assertEquals("msg1", message2.getMessageId());
        message1.accept();
        message2.accept();

        firstReceiver.flow(1);
        AmqpMessage message3 = firstReceiver.receive(10L, TimeUnit.SECONDS);
        secondReceiver.flow(1);
        AmqpMessage message4 = secondReceiver.receive(10L, TimeUnit.SECONDS);
        assertNotNull("Should have read message 3", message3);
        assertNotNull("Should have read message 4", message4);
        assertEquals("msg2", message3.getMessageId());
        assertEquals("msg3", message4.getMessageId());
        message3.accept();
        message4.accept();

        LOG.info("*** Attempting to read remaining messages with both receivers");
        // Split what is left after the first four messages evenly between the receivers.
        final int splitCredit = (messageCount - 4) / 2;

        LOG.info("**** Receiver #1 granting creadit[{}] for its block of messages", splitCredit);
        firstReceiver.flow(splitCredit);
        for (int read = 0; read < splitCredit; read++) {
            AmqpMessage delivered = firstReceiver.receive(10L, TimeUnit.SECONDS);
            assertNotNull("Receiver #1 should have read a message", delivered);
            LOG.info("Receiver #1 read message: {}", delivered.getMessageId());
            delivered.accept();
        }

        LOG.info("**** Receiver #2 granting creadit[{}] for its block of messages", splitCredit);
        secondReceiver.flow(splitCredit);
        for (int read = 0; read < splitCredit; read++) {
            AmqpMessage delivered = secondReceiver.receive(10L, TimeUnit.SECONDS);
            assertNotNull("Receiver #2 should have read message[" + read + "]", delivered);
            LOG.info("Receiver #2 read message: {}", delivered.getMessageId());
            delivered.accept();
        }

        firstReceiver.close();
        secondReceiver.close();
        connection.close();
    }

    /**
     * Receives a first message, grants a new credit before accepting it, and then expects
     * a second message to be delivered and accepted on the same receiver.
     */
    @Test(timeout = 60000)
    public void testReceiveMessageAndRefillCreditBeforeAccept() throws Exception {
        final int messageCount = 2;

        AmqpClient client = createAmqpClient();
        AmqpConnection connection = addConnection(client.connect());
        AmqpSession session = connection.createSession();

        String address = getTestName();
        AmqpReceiver receiver = session.createReceiver(address);
        AmqpSender sender = session.createSender(address);

        int sent = 0;
        while (sent < messageCount) {
            AmqpMessage outgoing = new AmqpMessage();
            outgoing.setMessageId("msg" + sent);
            sender.send(outgoing);
            sent++;
        }
        sender.close();

        receiver.flow(1);
        AmqpMessage first = receiver.receive(5L, TimeUnit.SECONDS);
        assertNotNull(first);

        // Credit is refilled before the first delivery is accepted.
        receiver.flow(1);
        first.accept();

        AmqpMessage second = receiver.receive(10L, TimeUnit.SECONDS);
        assertNotNull(second);
        second.accept();

        receiver.close();
        connection.close();
    }

    /**
     * Runs a consumer and a producer on separate pool threads, each with its own connection.
     * The consumer grants one credit, signals readiness, receives a message, refills credit
     * before accepting it, and then receives and accepts a second message. The producer
     * waits for the readiness signal and sends two messages.
     *
     * <p>Any {@link Throwable} raised on either worker, including assertion failures, is
     * collected and fails the test, as does a worker that has not finished within the
     * termination timeout.
     */
    @Test(timeout = 60000)
    public void testReceiveMessageAndRefillCreditBeforeAcceptOnQueueAsync() throws Exception {
        final AmqpClient client = createAmqpClient();
        final LinkedList<Throwable> errors = new LinkedList<Throwable>();
        final CountDownLatch receiverReady = new CountDownLatch(1);
        ExecutorService executor = Executors.newCachedThreadPool();
        final String address = getTestName();

        // Consumer
        executor.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    AmqpSendReceiveTest.LOG.info("Starting consumer connection");
                    AmqpConnection connection = AmqpSendReceiveTest.this.addConnection(client.connect());
                    AmqpReceiver receiver = connection.createSession().createReceiver(address);
                    receiver.flow(1);
                    receiverReady.countDown();

                    AmqpMessage received = receiver.receive(5L, TimeUnit.SECONDS);
                    Assert.assertNotNull(received);

                    // Credit is refilled before the first delivery is accepted.
                    receiver.flow(1);
                    received.accept();

                    AmqpMessage second = receiver.receive(5L, TimeUnit.SECONDS);
                    Assert.assertNotNull(second);
                    second.accept();

                    receiver.close();
                    connection.close();
                } catch (Throwable error) {
                    // Throwable, not Exception: assertion failures are Errors and would
                    // otherwise vanish inside the discarded Future.
                    recordAsyncError(errors, error);
                }
            }
        });

        // Producer
        executor.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    if (!receiverReady.await(20L, TimeUnit.SECONDS)) {
                        throw new IllegalStateException("Receiver was not ready within 20 seconds");
                    }
                    AmqpConnection connection = AmqpSendReceiveTest.this.addConnection(client.connect());
                    AmqpSender sender = connection.createSession().createSender(address);
                    for (int i = 0; i < 2; i++) {
                        AmqpMessage message = new AmqpMessage();
                        message.setMessageId("msg" + i);
                        sender.send(message);
                    }
                    sender.close();
                    connection.close();
                } catch (Throwable error) {
                    // Producer failures must fail the test too, not just be printed.
                    recordAsyncError(errors, error);
                }
            }
        });

        executor.shutdown();
        boolean finished = executor.awaitTermination(20L, TimeUnit.SECONDS);
        if (!finished) {
            // Do not leave worker threads running past the end of the test.
            executor.shutdownNow();
        }
        assertTrue("Consumer/producer tasks did not finish within 20 seconds", finished);
        synchronized (errors) {
            assertTrue("no errors: " + errors, errors.isEmpty());
        }
    }

    /**
     * Records a failure raised on a worker thread. The list is guarded by its own monitor
     * because both workers may report concurrently; an interruption also restores the
     * thread's interrupt flag.
     */
    private static void recordAsyncError(LinkedList<Throwable> errors, Throwable error) {
        if (error instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        synchronized (errors) {
            errors.add(error);
        }
    }

    /**
     * Sends one message flagged non-durable and one flagged durable, reading each back and
     * checking that the durable flag seen by the receiver matches what the sender set.
     */
    @Test(timeout = 60000)
    public void testMessageDurabliltyFollowsSpec() throws Exception {
        AmqpClient client = createAmqpClient();
        AmqpConnection connection = addConnection(client.connect());
        AmqpSession session = connection.createSession();

        AmqpSender sender = session.createSender(getTestName());
        AmqpReceiver receiver = session.createReceiver(getTestName());

        Queue queueView = getProxyToQueue(getTestName());

        // First round: explicitly non-durable.
        AmqpMessage nonDurable = new AmqpMessage();
        nonDurable.setText("Test-Message -> non-durable");
        nonDurable.setDurable(false);
        nonDurable.setMessageId("ID:Message:1");
        sender.send(nonDurable);

        assertEquals(1L, queueView.getMessageCount());
        receiver.flow(1);
        AmqpMessage firstReceived = receiver.receive(50L, TimeUnit.SECONDS);
        assertNotNull("Should have read a message", firstReceived);
        assertFalse("First message sent should not be durable", firstReceived.isDurable());
        firstReceived.accept();

        // Second round: explicitly durable.
        AmqpMessage durable = new AmqpMessage();
        durable.setText("Test-Message -> durable");
        durable.setDurable(true);
        durable.setMessageId("ID:Message:2");
        sender.send(durable);

        assertEquals(1L, queueView.getMessageCount());
        receiver.flow(1);
        AmqpMessage secondReceived = receiver.receive(50L, TimeUnit.SECONDS);
        assertNotNull("Should have read a message", secondReceived);
        assertTrue("Second message sent should be durable", secondReceived.isDurable());
        secondReceived.accept();

        sender.close();
        connection.close();
    }

    /**
     * Receives a batch of messages one credit at a time without accepting them, sends one
     * further message while no credit is outstanding, accepts the whole batch, and then
     * waits for the queue's delivering count to reach zero.
     */
    @Test(timeout = 60000)
    public void testReceiveMessageBeyondAckedAmountQueue() throws Exception {
        final int messageCount = 50;

        AmqpClient client = createAmqpClient();
        AmqpConnection connection = addConnection(client.connect());
        AmqpSession session = connection.createSession();

        String address = getTestName();
        AmqpReceiver receiver = session.createReceiver(address);
        AmqpSender sender = session.createSender(address);

        final Queue queueView = getProxyToQueue(address);

        for (int i = 0; i < messageCount; i++) {
            AmqpMessage outgoing = new AmqpMessage();
            outgoing.setMessageId("msg" + i);
            sender.send(outgoing);
        }

        ArrayList<AmqpMessage> pendingAcks = new ArrayList<AmqpMessage>();
        for (int i = 0; i < messageCount; i++) {
            receiver.flow(1);
            AmqpMessage delivered = receiver.receive(5L, TimeUnit.SECONDS);
            assertNotNull(delivered);
            pendingAcks.add(delivered);
        }

        // One extra message, sent while the receiver has no credit left.
        AmqpMessage finalMessage = new AmqpMessage();
        finalMessage.setMessageId("msg-final");
        sender.send(finalMessage);

        for (AmqpMessage pending : pendingAcks) {
            pending.accept();
        }

        // The failure text is built before waiting, so it carries the count seen at this point.
        String failureText = "Should be no inflight messages: " + queueView.getDeliveringCount();
        boolean drained = Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisfied() throws Exception {
                return queueView.getDeliveringCount() == 0;
            }
        });
        assertTrue(failureText, drained);

        sender.close();
        receiver.close();
        connection.close();
    }

    /**
     * Attaches two receivers created with the same four-argument {@code createReceiver}
     * call, sends a batch of messages, and checks that the first four arrive in send order
     * (two per receiver) before each receiver reads and accepts a further block of messages.
     */
    @Test(timeout = 60000)
    public void testTwoPresettledReceiversReceiveAllMessages() throws Exception {
        final int messageCount = 100;

        AmqpClient client = createAmqpClient();
        AmqpConnection connection = addConnection(client.connect());
        AmqpSession session = connection.createSession();

        String address = getTestName();
        AmqpSender sender = session.createSender(address);
        AmqpReceiver firstReceiver = session.createReceiver(address, (String) null, false, true);
        AmqpReceiver secondReceiver = session.createReceiver(address, (String) null, false, true);

        for (int i = 0; i < messageCount; i++) {
            AmqpMessage outgoing = new AmqpMessage();
            outgoing.setMessageId("msg" + i);
            sender.send(outgoing);
        }

        LOG.info("Attempting to read first two messages with receiver #1");
        firstReceiver.flow(2);
        AmqpMessage message1 = firstReceiver.receive(10L, TimeUnit.SECONDS);
        AmqpMessage message2 = firstReceiver.receive(10L, TimeUnit.SECONDS);
        assertNotNull("Should have read message 1", message1);
        assertNotNull("Should have read message 2", message2);
        assertEquals("msg0", message1.getMessageId());
        assertEquals("msg1", message2.getMessageId());
        message1.accept();
        message2.accept();

        LOG.info("Attempting to read next two messages with receiver #2");
        secondReceiver.flow(2);
        AmqpMessage message3 = secondReceiver.receive(10L, TimeUnit.SECONDS);
        AmqpMessage message4 = secondReceiver.receive(10L, TimeUnit.SECONDS);
        assertNotNull("Should have read message 3", message3);
        assertNotNull("Should have read message 4", message4);
        assertEquals("msg2", message3.getMessageId());
        assertEquals("msg3", message4.getMessageId());
        message3.accept();
        message4.accept();

        LOG.info("*** Attempting to read remaining messages with both receivers");
        // Split what is left after the first four messages evenly between the receivers.
        final int splitCredit = (messageCount - 4) / 2;

        LOG.info("**** Receiver #1 granting credit[{}] for its block of messages", splitCredit);
        firstReceiver.flow(splitCredit);
        for (int read = 0; read < splitCredit; read++) {
            AmqpMessage delivered = firstReceiver.receive(10L, TimeUnit.SECONDS);
            assertNotNull("Receiver #1 should have read a message", delivered);
            LOG.info("Receiver #1 read message: {}", delivered.getMessageId());
            delivered.accept();
        }

        LOG.info("**** Receiver #2 granting credit[{}] for its block of messages", splitCredit);
        secondReceiver.flow(splitCredit);
        for (int read = 0; read < splitCredit; read++) {
            AmqpMessage delivered = secondReceiver.receive(10L, TimeUnit.SECONDS);
            assertNotNull("Receiver #2 should have read a message[" + read + "]", delivered);
            LOG.info("Receiver #2 read message: {}", delivered.getMessageId());
            delivered.accept();
        }

        firstReceiver.close();
        secondReceiver.close();
        connection.close();
    }

    /**
     * Creates a sender that names the delayed-delivery capability and uses a client-side
     * validator to check that the opened sender link lists that capability among the
     * remote offered capabilities.
     *
     * <p>Uses the same 60 second timeout as the other tests in this class so that a hung
     * connection fails the test instead of blocking the run.
     */
    @Test(timeout = 60000)
    public void testDeliveryDelayOfferedWhenRequested() throws Exception {
        AmqpClient client = createAmqpClient();
        client.setValidator(new AmqpValidator() {
            public void inspectOpenedResource(Sender sender) {
                boolean offered = AmqpSupport.contains(sender.getRemoteOfferedCapabilities(), org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DELAYED_DELIVERY);
                if (!offered) {
                    markAsInvalid("Broker did not indicate it support delayed message delivery");
                }
            }
        });

        AmqpConnection connection = addConnection(client.connect());
        AmqpSession session = connection.createSession();

        AmqpSender sender = session.createSender("queue://" + getTestName(), new Symbol[]{org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DELAYED_DELIVERY});
        assertNotNull(sender);

        // Surfaces anything the validator marked as invalid while the link was opening.
        connection.getStateInspector().assertValid();

        sender.close();
        connection.close();
    }

    /**
     * Sends {@code i} messages to the address {@code str} over a fresh connection, giving
     * each message the id {@code "MessageID:" + index}. The connection is closed even when
     * sending fails.
     *
     * @param str the address to create the sender on
     * @param i   the number of messages to send
     * @throws Exception if connecting, creating the sender or sending fails
     */
    public void sendMessages(String str, int i) throws Exception {
        AmqpClient client = createAmqpClient();
        AmqpConnection connection = addConnection(client.connect());
        try {
            AmqpSession session = connection.createSession();
            AmqpSender sender = session.createSender(str);

            int sent = 0;
            while (sent < i) {
                AmqpMessage message = new AmqpMessage();
                message.setMessageId("MessageID:" + sent);
                sender.send(message);
                sent++;
            }
        } finally {
            connection.close();
        }
    }
}
