package org.apache.activemq.artemis.tests.integration.client;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters;
import org.apache.activemq.artemis.tests.integration.management.SimpleManagementTest;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith({ParameterizedTestExtension.class})
/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.class */
public class JMSOrderTest extends JMSTestBase {
    /** Protocol name supplied through the constructor; passed to {@code CFUtil.createConnectionFactory}. */
    String protocol;
    /** When {@code true}, {@link #extraServerConfig} installs "#" address settings with default-exclusive queues. */
    boolean exclusive;
    /** Connection factory for {@link #protocol}; assigned in {@link #setupCF()} before each test. */
    ConnectionFactory protocolCF;

    /**
     * Creates the test instance for one parameter combination produced by
     * {@link #getParameters()}.
     *
     * @param str protocol name, stored in {@link #protocol}
     * @param z   whether the exclusive-queue address settings are applied, stored in {@link #exclusive}
     */
    public JMSOrderTest(String str, boolean z) {
        exclusive = z;
        protocol = str;
    }

    /**
     * Builds the connection factory for the protocol under test and stores it in
     * {@link #protocolCF} before each test runs.
     */
    @BeforeEach
    public void setupCF() {
        ConnectionFactory factoryForProtocol =
            CFUtil.createConnectionFactory(this.protocol, SimpleManagementTest.LOCALHOST);
        this.protocolCF = factoryForProtocol;
    }

    /**
     * Supplies the constructor arguments for every test run: each of the three protocols
     * (AMQP, OPENWIRE, CORE) combined with {@code exclusive} set to {@code true} and {@code false}.
     *
     * @return a fixed-size list of {@code {protocol, exclusive}} pairs, six in total
     */
    @Parameters(name = "protocol={0}&exclusive={1}")
    public static Collection<Object[]> getParameters() {
        // Parameterized return type instead of the raw Collection; raw-typed and
        // reflective callers are unaffected.
        return Arrays.asList(new Object[][] {
            {"AMQP", true},
            {"AMQP", false},
            {"OPENWIRE", true},
            {"OPENWIRE", false},
            {"CORE", true},
            {"CORE", false}
        });
    }

    /**
     * Server configuration hook overriding {@code JMSTestBase.extraServerConfig}.
     *
     * <p>When {@link #exclusive} is set, registers address settings under the "#" match that
     * enable auto-creation of queues and addresses, set the dead letter address to
     * "ActiveMQ.DLQ" and make queues exclusive by default. Otherwise the server
     * configuration is left untouched.
     *
     * <p>NOTE(review): the decompiler reported that the access modifier was changed from
     * {@code protected}; it is kept {@code public} here as found.
     *
     * @param activeMQServer the server whose configuration is adjusted
     */
    @Override
    public void extraServerConfig(ActiveMQServer activeMQServer) {
        if (!this.exclusive) {
            return;
        }
        AddressSettings exclusiveDefaults = new AddressSettings()
            .setAutoCreateQueues(true)
            .setAutoCreateAddresses(true)
            .setDeadLetterAddress(SimpleString.of("ActiveMQ.DLQ"))
            .setDefaultExclusiveQueue(true);
        activeMQServer.getConfiguration().getAddressSettings().put("#", exclusiveDefaults);
    }

    /**
     * Opens a dedicated connection and sends {@code i} numbered text messages to the queue
     * named {@code this.name}, delegating the actual sending to {@link #sendMessages}.
     *
     * @param i number of messages to send
     * @throws Exception if any JMS operation fails
     */
    protected void sendToAmqQueue(int i) throws Exception {
        // try-with-resources so the connection is closed even when sending fails.
        try (Connection producerConnection = this.protocolCF.createConnection()) {
            Session lookupSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue target = lookupSession.createQueue(this.name);
            sendMessages(producerConnection, target, i);
        }
    }

    /**
     * Sends {@code i} text messages to {@code destination} through a new auto-acknowledge
     * session on the given connection.
     *
     * <p>Message number {@code n} (1-based) carries the text {@code "TextMessage: n"} and the
     * int property {@code "nr"} set to {@code n}.
     *
     * @param connection  connection used to create the sending session; not closed here
     * @param destination where the messages are sent
     * @param i           number of messages to send; nothing is sent when less than 1
     * @throws Exception if any JMS operation fails
     */
    public void sendMessages(Connection connection, Destination destination, int i) throws Exception {
        // try-with-resources so the session is closed even when a send fails.
        try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
            MessageProducer producer = session.createProducer(destination);
            for (int nr = 1; nr <= i; nr++) {
                TextMessage message = session.createTextMessage();
                message.setText("TextMessage: " + nr);
                message.setIntProperty("nr", nr);
                producer.send(message);
            }
        }
    }

    /**
     * Checks message order after a transacted session is rolled back.
     *
     * <p>Sends five numbered messages, receives the first two in a transacted session,
     * rolls the session back, and then expects the same consumer to receive all five
     * messages with {@code "nr"} running from 1 to 5.
     *
     * @throws Exception if any JMS operation fails
     */
    @Timeout(60)
    @TestTemplate
    public void testReceiveSomeThenRollback() throws Exception {
        // try-with-resources: a failure while closing is recorded as suppressed instead of
        // replacing an assertion failure raised in the body.
        try (Connection connection = this.protocolCF.createConnection()) {
            connection.start();
            sendToAmqQueue(5);
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageConsumer consumer = session.createConsumer(session.createQueue(this.name));
            for (int expected = 1; expected <= 2; expected++) {
                Message message = consumer.receive(3000L);
                Assertions.assertNotNull(message);
                Assertions.assertEquals(expected, message.getIntProperty("nr"), "Unexpected message number");
            }
            session.rollback();

            ArrayList<Integer> received = new ArrayList<>();
            for (int attempt = 1; attempt <= 5; attempt++) {
                Message message = consumer.receive(3000L);
                Assertions.assertNotNull(message, "Failed to receive message: " + attempt);
                received.add(message.getIntProperty("nr"));
            }
            session.commit();

            Assertions.assertEquals(5, received.size(), "Unexpected size of list");
            for (int index = 0; index < received.size(); index++) {
                Assertions.assertEquals(index + 1, received.get(index).intValue(), "Unexpected order of messages: " + received);
            }
        }
    }

    /**
     * Checks message order after a transacted session is closed without committing.
     *
     * <p>Sends five numbered messages, receives the first two in a transacted session and
     * closes that session without commit. A second transacted session on the same
     * connection is then expected to receive all five messages with {@code "nr"} running
     * from 1 to 5.
     *
     * @throws Exception if any JMS operation fails
     */
    @Timeout(60)
    @TestTemplate
    public void testReceiveSomeThenClose() throws Exception {
        // try-with-resources: a failure while closing is recorded as suppressed instead of
        // replacing an assertion failure raised in the body.
        try (Connection connection = this.protocolCF.createConnection()) {
            connection.start();
            sendToAmqQueue(5);
            Session firstSession = connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageConsumer firstConsumer = firstSession.createConsumer(firstSession.createQueue(this.name));
            for (int expected = 1; expected <= 2; expected++) {
                Message message = firstConsumer.receive(3000L);
                Assertions.assertNotNull(message);
                Assertions.assertEquals(expected, message.getIntProperty("nr"), "Unexpected message number");
            }
            // Closed without commit on purpose: the two received messages must come back.
            firstSession.close();

            Session secondSession = connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageConsumer secondConsumer = secondSession.createConsumer(secondSession.createQueue(this.name));
            ArrayList<Integer> received = new ArrayList<>();
            for (int attempt = 1; attempt <= 5; attempt++) {
                Message message = secondConsumer.receive(3000L);
                Assertions.assertNotNull(message, "Failed to receive message: " + attempt);
                received.add(message.getIntProperty("nr"));
            }
            secondSession.commit();

            Assertions.assertEquals(5, received.size(), "Unexpected size of list");
            for (int index = 0; index < received.size(); index++) {
                Assertions.assertEquals(index + 1, received.get(index).intValue(), "Unexpected order of messages: " + received);
            }
        }
    }

    /**
     * Sends numbered messages in pairs from two transacted sessions so that the send order
     * and the commit order differ.
     *
     * <p>For each odd {@code nr} starting at 1, one session sends message {@code nr + 1}
     * first, the other session then sends message {@code nr}, and the sessions are committed
     * in the opposite order (message {@code nr} first, then {@code nr + 1}).
     *
     * <p>Because messages are sent in pairs, an odd {@code i} results in {@code i + 1}
     * messages being sent.
     *
     * @param i highest pair start to send; callers in this class pass an even count
     * @throws Exception if any JMS operation fails
     */
    protected void sendToAmqQueueOutOfOrder(int i) throws Exception {
        // try-with-resources so the connection and both sessions are released on failure.
        try (Connection connection = this.protocolCF.createConnection()) {
            Queue target = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createQueue(this.name);
            for (int nr = 1; nr <= i; nr += 2) {
                try (Session higherSession = connection.createSession(true, Session.SESSION_TRANSACTED);
                     Session lowerSession = connection.createSession(true, Session.SESSION_TRANSACTED)) {
                    MessageProducer higherProducer = higherSession.createProducer(target);
                    MessageProducer lowerProducer = lowerSession.createProducer(target);

                    TextMessage higher = higherSession.createTextMessage();
                    // Parenthesized: the previous concatenation produced e.g. "TextMessage: 11"
                    // for the message whose "nr" property is 2.
                    higher.setText("TextMessage: " + (nr + 1));
                    higher.setIntProperty("nr", nr + 1);
                    higherProducer.send(higher);

                    TextMessage lower = lowerSession.createTextMessage();
                    lower.setText("TextMessage: " + nr);
                    lower.setIntProperty("nr", nr);
                    lowerProducer.send(lower);

                    // Commit in the opposite order of sending.
                    lowerSession.commit();
                    higherSession.commit();
                }
            }
        }
    }

    /**
     * Checks delivery order when producers commit in a different order than they sent.
     *
     * <p>Four messages are produced through {@link #sendToAmqQueueOutOfOrder}. A first
     * transacted session receives two messages, expecting {@code "nr"} 1 and 2, and is closed
     * without commit. A second transacted session is then expected to receive all four
     * messages with {@code "nr"} running from 1 to 4.
     *
     * @throws Exception if any JMS operation fails
     */
    @Timeout(30)
    @TestTemplate
    public void testReceiveOutOfOrderProducers() throws Exception {
        // try-with-resources: a failure while closing is recorded as suppressed instead of
        // replacing an assertion failure raised in the body.
        try (Connection connection = this.protocolCF.createConnection()) {
            connection.start();
            sendToAmqQueueOutOfOrder(4);

            Session firstSession = connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageConsumer firstConsumer = firstSession.createConsumer(firstSession.createQueue(this.name));
            ArrayList<Integer> firstBatch = new ArrayList<>();
            for (int expected = 1; expected <= 2; expected++) {
                Message message = firstConsumer.receive(3000L);
                Assertions.assertNotNull(message);
                Assertions.assertEquals(expected, message.getIntProperty("nr"), "Unexpected message number");
                firstBatch.add(message.getIntProperty("nr"));
            }
            for (int index = 0; index < firstBatch.size(); index++) {
                Assertions.assertEquals(index + 1, firstBatch.get(index).intValue(), "Unexpected order of messages: " + firstBatch);
            }
            // Closed without commit on purpose: the two received messages must come back.
            firstSession.close();

            Session secondSession = connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageConsumer secondConsumer = secondSession.createConsumer(secondSession.createQueue(this.name));
            ArrayList<Integer> secondBatch = new ArrayList<>();
            for (int attempt = 1; attempt <= 4; attempt++) {
                Message message = secondConsumer.receive(3000L);
                Assertions.assertNotNull(message, "Failed to receive message: " + attempt);
                secondBatch.add(message.getIntProperty("nr"));
            }
            secondSession.commit();

            Assertions.assertEquals(4, secondBatch.size(), "Unexpected size of list");
            for (int index = 0; index < secondBatch.size(); index++) {
                Assertions.assertEquals(index + 1, secondBatch.get(index).intValue(), "Unexpected order of messages: " + secondBatch);
            }
        }
    }

    /**
     * Runs the multiple-consumer scenario in the variant where each consumer rolls back
     * its session after receiving.
     *
     * @throws Exception if the scenario fails
     */
    @TestTemplate
    public void testMultipleConsumersRollback() throws Exception {
        final boolean rollbackAfterReceive = true;
        internalMultipleConsumers(rollbackAfterReceive);
    }

    /**
     * Runs the multiple-consumer scenario in the variant where consumers never roll back
     * explicitly and only close their connection.
     *
     * @throws Exception if the scenario fails
     */
    @TestTemplate
    public void testMultipleConsumersClose() throws Exception {
        final boolean rollbackAfterReceive = false;
        internalMultipleConsumers(rollbackAfterReceive);
    }

    /**
     * Shared scenario for the multiple-consumer tests.
     *
     * <p>Steps:
     * <ol>
     *   <li>Creates a non-durable ANYCAST queue named {@code getName()} on the server and sends
     *       100 text messages to it, each with the int property {@code "i"} set to 0..99.</li>
     *   <li>Starts three consumer threads, each with its own connection and transacted
     *       session, none of which ever commits.</li>
     *   <li>After the threads finish, expects no consumer error, expects the server queue to
     *       hold 100 messages again, and then expects a single consumer to receive them with
     *       {@code "i"} running from 0 to 99 and nothing afterwards.</li>
     * </ol>
     *
     * @param z when {@code true}, each consumer rolls back after every received message and
     *          stops after three rollbacks; when {@code false}, each consumer keeps receiving
     *          until a receive times out after 500 ms and then closes its connection
     * @throws Exception if any server or JMS operation fails
     */
    private void internalMultipleConsumers(boolean z) throws Exception {
        org.apache.activemq.artemis.core.server.Queue serverQueue = this.server.createQueue(QueueConfiguration.of(getName()).setRoutingType(RoutingType.ANYCAST).setDurable(false));
        ConnectionFactory factory = CFUtil.createConnectionFactory(this.protocol, SimpleManagementTest.LOCALHOST);

        // Assigned exactly once below so it stays effectively final for the consumer lambda.
        Queue jmsQueue;
        // try-with-resources so the producer connection is closed even when a send fails.
        try (Connection producerConnection = factory.createConnection()) {
            Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            jmsQueue = producerSession.createQueue(getName());
            MessageProducer producer = producerSession.createProducer(jmsQueue);
            for (int i = 0; i < 100; i++) {
                TextMessage message = producerSession.createTextMessage("test " + i);
                message.setIntProperty("i", i);
                producer.send(message);
            }
        }

        // The bound method reference already null-checks serverQueue when it is created.
        Wait.assertEquals(100, serverQueue::getMessageCount);

        AtomicBoolean running = new AtomicBoolean(true);
        AtomicInteger failures = new AtomicInteger(0);
        Runnable consumerTask = () -> {
            // The connection is closed on every exit path, including exceptions; a failure
            // while closing is still counted by the catch below.
            try (Connection consumerConnection = factory.createConnection()) {
                Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
                MessageConsumer consumer = consumerSession.createConsumer(jmsQueue);
                consumerConnection.start();
                int rollbacks = 0;
                while (running.get()) {
                    if (consumer.receive(500L) == null) {
                        return;
                    }
                    if (z) {
                        consumerSession.rollback();
                        rollbacks++;
                        if (rollbacks >= 3) {
                            break;
                        }
                    }
                }
            } catch (Throwable failure) {
                failure.printStackTrace();
                failures.incrementAndGet();
                // Stop the remaining consumers once one of them has failed.
                running.set(false);
            }
        };

        Thread[] consumers = new Thread[3];
        for (int t = 0; t < consumers.length; t++) {
            consumers[t] = new Thread(consumerTask, "consumer " + t);
            consumers[t].start();
        }
        for (Thread consumerThread : consumers) {
            consumerThread.join();
        }

        Assertions.assertEquals(0, failures.get());
        Wait.assertEquals(100, serverQueue::getMessageCount);

        try (Connection verifyConnection = factory.createConnection()) {
            MessageConsumer verifyConsumer = verifyConnection.createSession(true, Session.SESSION_TRANSACTED).createConsumer(jmsQueue);
            verifyConnection.start();
            for (int expected = 0; expected < 100; expected++) {
                // Only the int property is read, so the plain Message type is sufficient.
                Message received = verifyConsumer.receive(1000L);
                Assertions.assertNotNull(received);
                Assertions.assertEquals(expected, received.getIntProperty("i"));
            }
            Assertions.assertNull(verifyConsumer.receiveNoWait());
        }
    }
}
