package org.apache.activemq.artemis.tests.integration.amqp;

import java.lang.invoke.MethodHandles;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.class */
/**
 * Integration tests for AMQP receiver credit handling: draining outstanding messages with
 * {@code drain}, and fetching with {@code pull} both when messages are and are not available.
 *
 * <p>Every scenario is run twice, once against the queue name and once against the topic name
 * supplied by the test support base class.
 */
public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** Number of messages sent by the scenarios that need a populated destination. */
    private static final int MSG_COUNT = 20;

    @Test(timeout = 60000)
    public void testReceiverCanDrainMessagesQueue() throws Exception {
        doTestReceiverCanDrainMessages(false);
    }

    @Test(timeout = 60000)
    public void testReceiverCanDrainMessagesTopic() throws Exception {
        doTestReceiverCanDrainMessages(true);
    }

    /**
     * Sends {@link #MSG_COUNT} messages, waits until the server-side queue reports them all as
     * present with none in delivery, then calls {@code drain(MSG_COUNT)} on the receiver and
     * expects to receive and accept every message.
     *
     * @param topic {@code true} to use the topic name, {@code false} to use the queue name
     * @throws Exception if any client or broker interaction fails
     */
    private void doTestReceiverCanDrainMessages(boolean topic) throws Exception {
        String destinationName = topic ? getTopicName() : getQueueName();
        AmqpConnection connection = addConnection(createAmqpClient().connect());
        AmqpReceiver receiver = connection.createSession().createReceiver(destinationName);

        sendMessages(destinationName, MSG_COUNT);

        Queue queueView = getProxyToQueue(destinationName);
        // Fail with a readable message instead of an anonymous NPE from the method references below.
        Objects.requireNonNull(queueView, "No server queue found for " + destinationName);
        Wait.assertEquals(MSG_COUNT, queueView::getMessageCount);
        Wait.assertEquals(0, queueView::getDeliveringCount);

        receiver.drain(MSG_COUNT);
        for (int i = 0; i < MSG_COUNT; i++) {
            AmqpMessage message = receiver.receive(5L, TimeUnit.SECONDS);
            assertNotNull("Failed to read message: " + (i + 1), message);
            logger.info("Read message: {}", message.getMessageId());
            message.accept();
        }

        receiver.close();
        connection.close();
    }

    @Test(timeout = 60000)
    public void testPullWithNoMessageGetDrainedQueue() throws Exception {
        doTestPullWithNoMessageGetDrained(false);
    }

    @Test(timeout = 60000)
    public void testPullWithNoMessageGetDrainedTopic() throws Exception {
        doTestPullWithNoMessageGetDrained(true);
    }

    /**
     * Grants 10 credits on an empty destination, then verifies that a {@code pull} which times
     * out returns {@code null} and leaves the receiver with zero remote credit.
     *
     * @param topic {@code true} to use the topic name, {@code false} to use the queue name
     * @throws Exception if any client or broker interaction fails
     */
    private void doTestPullWithNoMessageGetDrained(boolean topic) throws Exception {
        String destinationName = topic ? getTopicName() : getQueueName();
        AmqpConnection connection = addConnection(createAmqpClient().connect());
        AmqpReceiver receiver = connection.createSession().createReceiver(destinationName);

        receiver.flow(10);

        Queue queueView = getProxyToQueue(destinationName);
        assertEquals(0L, queueView.getMessageCount());
        assertEquals(0L, queueView.getMessagesAcknowledged());
        assertEquals(10L, receiver.getReceiver().getRemoteCredit());

        assertNull(receiver.pull(1L, TimeUnit.SECONDS));
        assertEquals(0L, receiver.getReceiver().getRemoteCredit());

        connection.close();
    }

    @Test(timeout = 60000)
    public void testPullOneFromRemoteQueue() throws Exception {
        doTestPullOneFromRemote(false);
    }

    @Test(timeout = 60000)
    public void testPullOneFromRemoteTopic() throws Exception {
        doTestPullOneFromRemote(true);
    }

    /**
     * Sends {@link #MSG_COUNT} messages, then verifies that a single {@code pull} yields one
     * message and that the receiver's remote credit is zero both before and after the pull.
     *
     * @param topic {@code true} to use the topic name, {@code false} to use the queue name
     * @throws Exception if any client or broker interaction fails
     */
    private void doTestPullOneFromRemote(boolean topic) throws Exception {
        AmqpConnection connection = addConnection(createAmqpClient().connect());
        AmqpSession session = connection.createSession();
        String destinationName = topic ? getTopicName() : getQueueName();
        AmqpReceiver receiver = session.createReceiver(destinationName);

        sendMessages(destinationName, MSG_COUNT);

        // Keep one queue handle so both assertions inspect the same server-side queue.
        Queue queueView = getProxyToQueue(destinationName);
        assertEquals(MSG_COUNT, queueView.getMessageCount());
        assertEquals(0L, queueView.getDeliveringCount());
        assertEquals(0L, receiver.getReceiver().getRemoteCredit());

        AmqpMessage message = receiver.pull(5L, TimeUnit.SECONDS);
        assertNotNull(message);
        message.accept();
        assertEquals(0L, receiver.getReceiver().getRemoteCredit());

        receiver.close();
        connection.close();
    }

    @Test(timeout = 60000)
    public void testMultipleZeroResultPullsQueue() throws Exception {
        doTestMultipleZeroResultPulls(false);
    }

    @Test(timeout = 60000)
    public void testMultipleZeroResultPullsTopic() throws Exception {
        doTestMultipleZeroResultPulls(true);
    }

    /**
     * Grants 10 credits on an empty destination, then verifies that repeated {@code pull} calls
     * each return {@code null} and that remote credit is zero after the first and the last pull.
     *
     * @param topic {@code true} to use the topic name, {@code false} to use the queue name
     * @throws Exception if any client or broker interaction fails
     */
    private void doTestMultipleZeroResultPulls(boolean topic) throws Exception {
        AmqpConnection connection = addConnection(createAmqpClient().connect());
        AmqpSession session = connection.createSession();
        String destinationName = topic ? getTopicName() : getQueueName();
        AmqpReceiver receiver = session.createReceiver(destinationName);

        receiver.flow(10);

        // Keep one queue handle so both assertions inspect the same server-side queue.
        Queue queueView = getProxyToQueue(destinationName);
        assertEquals(0L, queueView.getMessageCount());
        assertEquals(0L, queueView.getDeliveringCount());
        assertEquals(10L, receiver.getReceiver().getRemoteCredit());

        assertNull(receiver.pull(1L, TimeUnit.SECONDS));
        assertEquals(0L, receiver.getReceiver().getRemoteCredit());

        assertNull(receiver.pull(1L, TimeUnit.SECONDS));
        assertNull(receiver.pull(1L, TimeUnit.SECONDS));
        assertEquals(0L, receiver.getReceiver().getRemoteCredit());

        connection.close();
    }
}
