package org.apache.activemq.artemis.tests.integration.client;

import java.util.HashSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/MessageConsumerRollbackTest.class */
public class MessageConsumerRollbackTest extends ActiveMQTestBase {
    ActiveMQServer server;
    ServerLocator locator;
    ClientSessionFactory sf;
    private static final String inQueue = "inqueue";
    private static final String outQueue = "outQueue";

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/MessageConsumerRollbackTest$LocalConsumer.class */
    private class LocalConsumer implements MessageHandler {
        boolean rollbackFirstMessage = true;
        ServerLocator consumerLocator;
        ClientSessionFactory factoryLocator;
        ClientSession session;
        ClientConsumer consumer;
        ClientProducer producer;
        AtomicInteger counter;
        CountDownLatch commitLatch;

        public LocalConsumer(AtomicInteger atomicInteger, CountDownLatch countDownLatch) {
            this.counter = atomicInteger;
            this.commitLatch = countDownLatch;
        }

        public void stop() throws Exception {
            this.session.close();
            this.factoryLocator.close();
            this.consumerLocator.close();
        }

        public void start() throws Exception {
            this.consumerLocator = MessageConsumerRollbackTest.this.createNettyNonHALocator();
            this.factoryLocator = MessageConsumerRollbackTest.this.createSessionFactory(this.consumerLocator);
            this.session = this.factoryLocator.createTransactedSession();
            this.consumer = this.session.createConsumer(MessageConsumerRollbackTest.inQueue);
            this.producer = this.session.createProducer(MessageConsumerRollbackTest.outQueue);
            this.consumer.setMessageHandler(this);
            this.session.start();
        }

        public void onMessage(ClientMessage clientMessage) {
            try {
                clientMessage.acknowledge();
                ClientMessage createMessage = this.session.createMessage(true);
                createMessage.putIntProperty("out_msg", clientMessage.getIntProperty("msg").intValue());
                this.producer.send(createMessage);
                if (this.rollbackFirstMessage) {
                    this.session.rollback();
                    this.rollbackFirstMessage = false;
                    return;
                }
                if (this.counter.incrementAndGet() % 200 == 0) {
                    System.out.println("rollback " + clientMessage);
                    this.session.rollback();
                } else {
                    this.commitLatch.countDown();
                    this.session.commit();
                }
            } catch (Exception e) {
                e.printStackTrace();
                try {
                    this.session.rollback();
                } catch (Exception e2) {
                    e2.printStackTrace();
                }
            }
        }
    }

    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.server = createServer(true, true);
        this.server.getConfiguration().getAddressesSettings().put("#", new AddressSettings().setRedeliveryDelay(100L));
        this.server.start();
        this.locator = createNettyNonHALocator();
        this.sf = createSessionFactory(this.locator);
        ClientSession createTransactedSession = this.sf.createTransactedSession();
        createTransactedSession.createQueue(inQueue, inQueue, true);
        createTransactedSession.createQueue(outQueue, outQueue, true);
        createTransactedSession.close();
    }

    @Test
    public void testRollbackMultipleConsumers() throws Exception {
        ClientSession createTransactedSession = this.sf.createTransactedSession();
        sendMessages(3000, createTransactedSession);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        CountDownLatch countDownLatch = new CountDownLatch(3000);
        LocalConsumer[] localConsumerArr = new LocalConsumer[10];
        for (int i = 0; i < 10; i++) {
            localConsumerArr[i] = new LocalConsumer(atomicInteger, countDownLatch);
            localConsumerArr[i].start();
        }
        countDownLatch.await(2L, TimeUnit.MINUTES);
        for (LocalConsumer localConsumer : localConsumerArr) {
            localConsumer.stop();
        }
        ClientConsumer createConsumer = createTransactedSession.createConsumer(outQueue);
        createTransactedSession.start();
        HashSet hashSet = new HashSet();
        for (int i2 = 0; i2 < 3000; i2++) {
            ClientMessage receive = createConsumer.receive(1000L);
            assertNotNull(receive);
            int intValue = receive.getIntProperty("out_msg").intValue();
            receive.acknowledge();
            assertFalse("msg " + intValue + " received in duplicate", hashSet.contains(Integer.valueOf(intValue)));
            hashSet.add(Integer.valueOf(intValue));
        }
        assertNull(createConsumer.receiveImmediate());
        for (int i3 = 0; i3 < 3000; i3++) {
            assertTrue(hashSet.contains(Integer.valueOf(i3)));
        }
        assertEquals(3000, hashSet.size());
        createTransactedSession.close();
    }

    private void sendMessages(int i, ClientSession clientSession) throws Exception {
        ClientProducer createProducer = clientSession.createProducer(inQueue);
        for (int i2 = 0; i2 < i; i2++) {
            ActiveMQTextMessage activeMQTextMessage = new ActiveMQTextMessage(clientSession);
            activeMQTextMessage.setIntProperty("msg", i2);
            activeMQTextMessage.setText("Message Number (" + i2 + ")");
            activeMQTextMessage.doBeforeSend();
            createProducer.send(activeMQTextMessage.getCoreMessage());
        }
        clientSession.commit();
    }
}
