package org.apache.activemq.artemis.tests.integration.client;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/CommitRollbackTest.class */
public class CommitRollbackTest extends ActiveMQTestBase {
    public final SimpleString addressA = SimpleString.of("addressA");
    public final SimpleString addressB = SimpleString.of("addressB");
    public final SimpleString queueA = SimpleString.of("queueA");
    public final SimpleString queueB = SimpleString.of("queueB");
    public final SimpleString queueC = SimpleString.of("queueC");

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/CommitRollbackTest$ackHandler.class */
    private static class ackHandler implements MessageHandler {
        private final ClientSession session;
        private final CountDownLatch latch;

        private ackHandler(ClientSession clientSession, CountDownLatch countDownLatch) {
            this.session = clientSession;
            this.latch = countDownLatch;
        }

        public void onMessage(ClientMessage clientMessage) {
            try {
                clientMessage.acknowledge();
            } catch (ActiveMQException e) {
                try {
                    this.session.close();
                } catch (ActiveMQException e2) {
                    e2.printStackTrace();
                }
            }
            this.latch.countDown();
        }
    }

    @Test
    public void testReceiveWithCommit() throws Exception {
        ActiveMQServer createServer = createServer(false);
        createServer.start();
        ClientSessionFactory createSessionFactory = createSessionFactory(createInVMNonHALocator());
        ClientSession createSession = createSessionFactory.createSession(false, true, true);
        ClientSession createSession2 = createSessionFactory.createSession(false, false, false);
        createSession.createQueue(QueueConfiguration.of(this.queueA).setAddress(this.addressA).setDurable(false));
        ClientProducer createProducer = createSession.createProducer(this.addressA);
        ClientConsumer createConsumer = createSession2.createConsumer(this.queueA);
        for (int i = 0; i < 100; i++) {
            createProducer.send(createSession.createMessage(false));
        }
        createSession2.start();
        for (int i2 = 0; i2 < 100; i2++) {
            ClientMessage receive = createConsumer.receive(5000L);
            Assertions.assertNotNull(receive);
            receive.acknowledge();
        }
        Queue bindable = createServer.getPostOffice().getBinding(this.queueA).getBindable();
        Assertions.assertEquals(100, bindable.getDeliveringCount());
        createSession2.commit();
        Assertions.assertEquals(0, bindable.getDeliveringCount());
        createSession2.close();
        createSession.close();
    }

    @Test
    public void testReceiveWithRollback() throws Exception {
        ActiveMQServer createServer = createServer(false);
        createServer.start();
        ClientSessionFactory createSessionFactory = createSessionFactory(createInVMNonHALocator());
        ClientSession createSession = createSessionFactory.createSession(false, true, true);
        ClientSession createSession2 = createSessionFactory.createSession(false, false, false);
        createSession.createQueue(QueueConfiguration.of(this.queueA).setAddress(this.addressA).setDurable(false));
        ClientProducer createProducer = createSession.createProducer(this.addressA);
        ClientConsumer createConsumer = createSession2.createConsumer(this.queueA);
        for (int i = 0; i < 100; i++) {
            createProducer.send(createSession.createMessage(false));
        }
        createSession2.start();
        for (int i2 = 0; i2 < 100; i2++) {
            ClientMessage receive = createConsumer.receive(5000L);
            Assertions.assertNotNull(receive);
            receive.acknowledge();
        }
        Queue bindable = createServer.getPostOffice().getBinding(this.queueA).getBindable();
        Assertions.assertEquals(100, bindable.getDeliveringCount());
        createSession2.rollback();
        for (int i3 = 0; i3 < 100; i3++) {
            ClientMessage receive2 = createConsumer.receive(5000L);
            Assertions.assertNotNull(receive2);
            receive2.acknowledge();
        }
        Assertions.assertEquals(100, bindable.getDeliveringCount());
        createSession2.close();
        createSession.close();
    }

    @Test
    public void testReceiveWithRollbackMultipleConsumersDifferentQueues() throws Exception {
        ActiveMQServer createServer = createServer(false);
        createServer.start();
        ClientSessionFactory createSessionFactory = createSessionFactory(createInVMNonHALocator());
        ClientSession createSession = createSessionFactory.createSession(false, true, true);
        ClientSession createSession2 = createSessionFactory.createSession(false, false, false);
        createSession.createQueue(QueueConfiguration.of(this.queueA).setAddress(this.addressA).setDurable(false));
        createSession.createQueue(QueueConfiguration.of(this.queueB).setAddress(this.addressB).setDurable(false));
        ClientProducer createProducer = createSession.createProducer(this.addressA);
        ClientProducer createProducer2 = createSession.createProducer(this.addressB);
        ClientConsumer createConsumer = createSession2.createConsumer(this.queueA);
        ClientConsumer createConsumer2 = createSession2.createConsumer(this.queueB);
        for (int i = 0; i < 100; i++) {
            createProducer.send(createSession.createMessage(false));
            createProducer2.send(createSession.createMessage(false));
        }
        createSession2.start();
        for (int i2 = 0; i2 < 100; i2++) {
            ClientMessage receive = createConsumer.receive(5000L);
            Assertions.assertNotNull(receive);
            receive.acknowledge();
            ClientMessage receive2 = createConsumer2.receive(5000L);
            Assertions.assertNotNull(receive2);
            receive2.acknowledge();
        }
        Queue queue = (Queue) createServer.getPostOffice().getBinding(this.queueA).getBindable();
        Queue bindable = createServer.getPostOffice().getBinding(this.queueB).getBindable();
        Assertions.assertEquals(100, queue.getDeliveringCount());
        createConsumer.close();
        createConsumer2.close();
        createSession2.rollback();
        Assertions.assertEquals(0, bindable.getDeliveringCount());
        Assertions.assertEquals(100, getMessageCount(queue));
        Assertions.assertEquals(0, bindable.getDeliveringCount());
        Assertions.assertEquals(100, getMessageCount(queue));
        createSession.close();
        createSession2.close();
    }

    @Test
    public void testAsyncConsumerCommit() throws Exception {
        ActiveMQServer createServer = createServer(false);
        createServer.start();
        ClientSessionFactory createSessionFactory = createSessionFactory(createInVMNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0));
        ClientSession createSession = createSessionFactory.createSession(false, true, true);
        ClientSession createSession2 = createSessionFactory.createSession(false, true, false);
        createSession.createQueue(QueueConfiguration.of(this.queueA).setAddress(this.addressA).setDurable(false));
        ClientProducer createProducer = createSession.createProducer(this.addressA);
        ClientConsumer createConsumer = createSession2.createConsumer(this.queueA);
        for (int i = 0; i < 100; i++) {
            createProducer.send(createSession.createMessage(false));
        }
        CountDownLatch countDownLatch = new CountDownLatch(100);
        createSession2.start();
        createConsumer.setMessageHandler(clientMessage -> {
            try {
                clientMessage.acknowledge();
            } catch (ActiveMQException e) {
                try {
                    createSession2.close();
                } catch (ActiveMQException e2) {
                    e2.printStackTrace();
                }
            }
            countDownLatch.countDown();
        });
        Assertions.assertTrue(countDownLatch.await(5L, TimeUnit.SECONDS));
        Queue queue = (Queue) createServer.getPostOffice().getBinding(this.queueA).getBindable();
        Assertions.assertEquals(100, queue.getDeliveringCount());
        Assertions.assertEquals(100, getMessageCount(queue));
        createSession2.commit();
        Assertions.assertEquals(0, queue.getDeliveringCount());
        Assertions.assertEquals(0, getMessageCount(queue));
        createSession.close();
        createSession2.close();
    }

    @Test
    public void testAsyncConsumerRollback() throws Exception {
        ActiveMQServer createServer = createServer(false);
        createServer.start();
        ClientSessionFactory createSessionFactory = createSessionFactory(createInVMNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0));
        ClientSession createSession = createSessionFactory.createSession(false, true, true);
        ClientSession createSession2 = createSessionFactory.createSession(false, true, false);
        createSession.createQueue(QueueConfiguration.of(this.queueA).setAddress(this.addressA).setDurable(false));
        ClientProducer createProducer = createSession.createProducer(this.addressA);
        ClientConsumer createConsumer = createSession2.createConsumer(this.queueA);
        for (int i = 0; i < 100; i++) {
            createProducer.send(createSession.createMessage(false));
        }
        CountDownLatch countDownLatch = new CountDownLatch(100);
        createSession2.start();
        createConsumer.setMessageHandler(new ackHandler(createSession2, countDownLatch));
        Assertions.assertTrue(countDownLatch.await(5L, TimeUnit.SECONDS));
        Queue queue = (Queue) createServer.getPostOffice().getBinding(this.queueA).getBindable();
        Assertions.assertEquals(100, queue.getDeliveringCount());
        Assertions.assertEquals(100, getMessageCount(queue));
        createSession2.stop();
        createSession2.rollback();
        Assertions.assertEquals(0, queue.getDeliveringCount());
        Assertions.assertEquals(100, getMessageCount(queue));
        CountDownLatch countDownLatch2 = new CountDownLatch(100);
        createConsumer.setMessageHandler(new ackHandler(createSession2, countDownLatch2));
        createSession2.start();
        Assertions.assertTrue(countDownLatch2.await(5L, TimeUnit.SECONDS));
        createSession.close();
        createSession2.close();
        createSessionFactory.close();
    }
}
