package org.apache.activemq.artemis.tests.integration.amqp;

import java.lang.invoke.MethodHandles;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/AmqpPresettledReceiverTest.class */
public class AmqpPresettledReceiverTest extends AmqpClientTestSupport {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    @Timeout(60)
    @Test
    public void testPresettledReceiverAndNonPresettledReceiverOnSameQueue() throws Exception {
        sendMessages(getQueueName(), 2);
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpSession createSession = addConnection.createSession();
        AmqpReceiver createReceiver = createSession.createReceiver(getQueueName(), null, false, true);
        AmqpReceiver createReceiver2 = createSession.createReceiver(getQueueName());
        Queue proxyToQueue = getProxyToQueue(getQueueName());
        Assertions.assertEquals(2L, proxyToQueue.getMessageCount());
        createReceiver.flow(1);
        createReceiver2.flow(1);
        AmqpMessage receive = createReceiver.receive(5L, TimeUnit.SECONDS);
        AmqpMessage receive2 = createReceiver2.receive(5L, TimeUnit.SECONDS);
        Assertions.assertNotNull(receive);
        Assertions.assertNotNull(receive2);
        Assertions.assertTrue(receive.getWrappedDelivery().remotelySettled());
        receive2.accept();
        createReceiver.close();
        createReceiver2.close();
        logger.debug("Message Count after all consumed: {}", Long.valueOf(proxyToQueue.getMessageCount()));
        AmqpReceiver createReceiver3 = createSession.createReceiver(getQueueName());
        createReceiver3.flow(1);
        AmqpMessage receive3 = createReceiver3.receive(5L, TimeUnit.SECONDS);
        if (receive3 != null) {
            logger.debug("Message read: {}", receive3.getMessageId());
        }
        Assertions.assertNull(receive3);
        Assertions.assertEquals(0L, proxyToQueue.getMessageCount());
        addConnection.close();
    }

    @Timeout(60)
    @Test
    public void testPresettledReceiverReadsAllMessages() throws Exception {
        sendMessages(getQueueName(), 100);
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpSession createSession = addConnection.createSession();
        AmqpReceiver createReceiver = createSession.createReceiver(getQueueName(), null, false, true);
        Queue proxyToQueue = getProxyToQueue(getQueueName());
        Assertions.assertEquals(100L, proxyToQueue.getMessageCount());
        createReceiver.flow(100);
        for (int i = 0; i < 100; i++) {
            Assertions.assertNotNull(createReceiver.receive(5L, TimeUnit.SECONDS));
        }
        createReceiver.close();
        logger.debug("Message Count after all consumed: {}", Long.valueOf(proxyToQueue.getMessageCount()));
        AmqpReceiver createReceiver2 = createSession.createReceiver(getQueueName());
        createReceiver2.flow(1);
        AmqpMessage receive = createReceiver2.receive(5L, TimeUnit.SECONDS);
        if (receive != null) {
            logger.debug("Message read: {}", receive.getMessageId());
        }
        Assertions.assertNull(receive);
        Assertions.assertEquals(0L, proxyToQueue.getMessageCount());
        addConnection.close();
    }

    @Timeout(60)
    @Test
    public void testPresettledReceiverReadsAllMessagesInWhenReadInBatches() throws Exception {
        sendMessages(getQueueName(), 100);
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpSession createSession = addConnection.createSession();
        AmqpReceiver createReceiver = createSession.createReceiver(getQueueName(), null, false, true);
        Queue proxyToQueue = getProxyToQueue(getQueueName());
        Assertions.assertEquals(100L, proxyToQueue.getMessageCount());
        createReceiver.flow(20);
        for (int i = 0; i < 10; i++) {
            Assertions.assertNotNull(createReceiver.receive(5L, TimeUnit.SECONDS));
        }
        createReceiver.flow(10);
        for (int i2 = 0; i2 < 20; i2++) {
            Assertions.assertNotNull(createReceiver.receive(5L, TimeUnit.SECONDS));
        }
        createReceiver.flow(70);
        for (int i3 = 0; i3 < 70; i3++) {
            Assertions.assertNotNull(createReceiver.receive(5L, TimeUnit.SECONDS));
        }
        createReceiver.close();
        logger.debug("Message Count after all consumed: {}", Long.valueOf(proxyToQueue.getMessageCount()));
        AmqpReceiver createReceiver2 = createSession.createReceiver(getQueueName());
        createReceiver2.flow(1);
        AmqpMessage receive = createReceiver2.receive(5L, TimeUnit.SECONDS);
        if (receive != null) {
            logger.debug("Message read: {}", receive.getMessageId());
        }
        Assertions.assertNull(receive);
        Assertions.assertEquals(0L, proxyToQueue.getMessageCount());
        addConnection.close();
    }

    @Timeout(60)
    @Test
    public void testPresettledReceiverWithinBoundsOfActiveTXWithCommit() throws Exception {
        doTestPresettledReceiverWithinBoundsOfActiveTX(true);
    }

    @Timeout(60)
    @Test
    public void testPresettledReceiverWithinBoundsOfActiveTXWithRollback() throws Exception {
        doTestPresettledReceiverWithinBoundsOfActiveTX(false);
    }

    private void doTestPresettledReceiverWithinBoundsOfActiveTX(boolean z) throws Exception {
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpSession createSession = addConnection.createSession();
        AmqpSender createSender = createSession.createSender(getQueueName());
        Queue proxyToQueue = getProxyToQueue(getQueueName());
        AmqpMessage amqpMessage = new AmqpMessage();
        amqpMessage.setText("Test-Message");
        createSender.send(amqpMessage);
        Objects.requireNonNull(proxyToQueue);
        Wait.assertEquals(1L, proxyToQueue::getMessageCount);
        AmqpReceiver createReceiver = createSession.createReceiver(getQueueName(), null, false, true);
        createSession.begin();
        createReceiver.flow(1);
        AmqpMessage receive = createReceiver.receive(5L, TimeUnit.SECONDS);
        Assertions.assertNotNull(receive);
        Assertions.assertTrue(receive.getWrappedDelivery().remotelySettled());
        if (z) {
            createSession.commit();
        } else {
            createSession.rollback();
        }
        Assertions.assertEquals(0L, proxyToQueue.getMessageCount());
        createSender.close();
        addConnection.close();
    }

    @Timeout(60)
    @Test
    public void testPresettledReceiverWithinBoundsOfActiveTXWithSendAndRollback() throws Exception {
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpSession createSession = addConnection.createSession();
        AmqpSender createSender = createSession.createSender(getQueueName());
        Queue proxyToQueue = getProxyToQueue(getQueueName());
        AmqpMessage amqpMessage = new AmqpMessage();
        amqpMessage.setText("Test-Message");
        createSender.send(amqpMessage);
        Objects.requireNonNull(proxyToQueue);
        Wait.assertEquals(1L, proxyToQueue::getMessageCount);
        AmqpReceiver createReceiver = createSession.createReceiver(getQueueName(), null, false, true);
        createSession.begin();
        createReceiver.flow(1);
        AmqpMessage receive = createReceiver.receive(5L, TimeUnit.SECONDS);
        Assertions.assertNotNull(receive);
        Assertions.assertTrue(receive.getWrappedDelivery().remotelySettled());
        AmqpMessage amqpMessage2 = new AmqpMessage();
        amqpMessage2.setText("Test-Message - Rolled Back");
        createSender.send(amqpMessage2);
        createSession.rollback();
        Assertions.assertEquals(0L, proxyToQueue.getMessageCount());
        createSender.close();
        addConnection.close();
    }
}
