package org.apache.activemq.artemis.tests.integration.amqp;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Stress test that repeatedly opens consumers on a single anycast queue from several threads
 * while a producer publishes messages to the same queue.
 *
 * <p>The test is parameterized: it runs once with a Qpid JMS (AMQP) connection factory and once
 * with an {@link ActiveMQConnectionFactory}; see {@link #createCF()}.
 */
@RunWith(Parameterized.class)
public class ExtremeCancelsTest extends JMSClientTestSupport {
    /** Number of concurrent consumer threads started by the test. */
    private static final int CONSUMER_THREADS = 10;

    /** Number of messages the producer sends before the consumers are told to stop. */
    private static final int NUMBER_OF_MESSAGES = 500;

    private final SimpleString anycastAddress = new SimpleString("theQueue");
    private final boolean isAMQP;

    /**
     * @return the comma-separated protocol list handed to the base test support class
     */
    @Override
    protected String getConfiguredProtocols() {
        return "AMQP,OPENWIRE,CORE";
    }

    /**
     * @param z {@code true} to use the Qpid JMS connection factory, {@code false} to use
     *          {@link ActiveMQConnectionFactory}
     */
    public ExtremeCancelsTest(boolean z) {
        this.isAMQP = z;
    }

    /**
     * @return the parameter sets for the runner: one run with {@code isAMQP=true} and one with
     *         {@code isAMQP=false}
     */
    @Parameterized.Parameters(name = "{index}: isAMQP={0}")
    public static Collection<Object[]> parameters() {
        return Arrays.asList(new Object[]{true}, new Object[]{false});
    }

    /**
     * Starts {@link #CONSUMER_THREADS} threads that keep creating consumers on the queue, sends
     * {@link #NUMBER_OF_MESSAGES} text messages, then stops the threads and asserts that none of
     * them hit an exception.
     *
     * @throws Exception if queue creation, producing, or joining the consumer threads fails
     */
    @Test(timeout = 120000)
    public void testLotsOfCloseOpenConsumer() throws Exception {
        this.server.createQueue(new QueueConfiguration(this.anycastAddress).setRoutingType(RoutingType.ANYCAST));
        final AtomicInteger errors = new AtomicInteger(0);
        final AtomicBoolean running = new AtomicBoolean(true);
        final Runnable consumerTask = () -> consumeUntilStopped(running, errors);

        final Thread[] consumers = new Thread[CONSUMER_THREADS];
        try {
            for (int i = 0; i < consumers.length; i++) {
                consumers[i] = new Thread(consumerTask);
                consumers[i].start();
            }

            // try-with-resources: the producer connection is closed even when a send fails.
            try (Connection producerConnection = createCF().createConnection()) {
                Session session = producerConnection.createSession();
                MessageProducer producer = session.createProducer(session.createQueue(this.anycastAddress.toString()));
                for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
                    producer.send(session.createTextMessage("Hello guys " + i));
                }
            }
        } finally {
            // Always release the workers; otherwise a failure above would leave the
            // non-daemon consumer threads looping forever.
            running.set(false);
            for (Thread consumer : consumers) {
                if (consumer != null) {
                    consumer.join();
                }
            }
        }
        Assert.assertEquals(0L, errors.get());
    }

    /**
     * Body of one consumer thread: opens its own connection and session, then keeps creating
     * consumers on the queue until {@code running} becomes {@code false}. Any exception is
     * printed and counted in {@code errors}.
     *
     * @param running flag polled before each iteration; the loop ends once it is {@code false}
     * @param errors  counter incremented once if this thread fails with an exception
     */
    private void consumeUntilStopped(AtomicBoolean running, AtomicInteger errors) {
        // try-with-resources: the connection is closed on the failure path as well.
        try (Connection connection = createCF().createConnection()) {
            Session session = connection.createSession();
            connection.start();
            Queue queue = session.createQueue(this.anycastAddress.toString());
            while (running.get()) {
                MessageConsumer consumer = session.createConsumer(queue);
                // NOTE(review): the consumer is only closed when a message arrived within
                // 100 ms; consumers that time out stay open until the connection closes.
                // This is kept as-is because it may be part of the stress scenario - confirm.
                if (consumer.receive(100L) != null) {
                    consumer.close();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            errors.incrementAndGet();
        }
    }

    /**
     * @return a Qpid JMS connection factory for {@code getBrokerQpidJMSConnectionURI()} when
     *         {@code isAMQP} is set, otherwise an {@link ActiveMQConnectionFactory} for
     *         {@code tcp://localhost:5672}
     */
    private ConnectionFactory createCF() {
        if (this.isAMQP) {
            return new JmsConnectionFactory(getBrokerQpidJMSConnectionURI());
        }
        return new ActiveMQConnectionFactory("tcp://localhost:5672");
    }
}
