package org.apache.activemq.artemis.tests.integration.client;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.Topic;
import jakarta.jms.TopicSubscriber;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.integration.management.SimpleManagementTest;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/RemoveSubscriptionRaceTest.class */
/**
 * Stresses concurrent creation and removal of topic subscriptions on the
 * {@code SubscriptionStressTest} address while the test thread sends to that
 * address and consumes from the pre-configured multicast queue {@code Sub_1}.
 *
 * <p>After all workers finish, the test verifies that the journal no longer
 * reports records of type 45 and that the address size of the queue's paging
 * store drops back to zero.
 */
public class RemoveSubscriptionRaceTest extends ActiveMQTestBase {
    private static final String SUB_NAME = "SubscriptionStressTest";
    ActiveMQServer server;

    /** Intentionally empty: each test creates and starts its own server in {@link #internalTest}. */
    @BeforeEach
    public void setServer() throws Exception {
    }

    @Test
    public void testCreateSubscriptionCoreNoFiles() throws Exception {
        internalTest("core", false, 5, 1000, false);
    }

    @Test
    public void testCreateSubscriptionAMQPNoFiles() throws Exception {
        internalTest("amqp", false, 5, 1000, false);
    }

    @Test
    public void testCreateSubscriptionCoreRealFiles() throws Exception {
        internalTest("core", true, 2, 200, false);
    }

    @Test
    public void testCreateSubscriptionAMQPRealFiles() throws Exception {
        internalTest("amqp", true, 2, 200, false);
    }

    @Test
    public void testCreateSubscriptionCoreRealFilesDurable() throws Exception {
        internalTest("core", true, 2, 200, true);
    }

    @Test
    public void testCreateSubscriptionAMQPRealFilesDurable() throws Exception {
        internalTest("amqp", true, 2, 200, true);
    }

    /**
     * Runs the subscription race scenario.
     *
     * @param protocol         protocol name handed to {@code CFUtil.createConnectionFactory}
     * @param realFiles        first argument passed to {@code createServer}
     * @param threads          number of worker threads creating/removing subscriptions
     * @param numberOfMessages messages sent by the test thread; also the maximum number of
     *                         subscribe/unsubscribe rounds each worker performs
     * @param durableSub       when {@code true} workers use durable subscribers (with a client ID)
     *                         and unsubscribe them; otherwise plain topic consumers are used
     * @throws Exception if the server, any JMS call, or any assertion fails
     */
    public void internalTest(String protocol, boolean realFiles, int threads, int numberOfMessages, boolean durableSub) throws Exception {
        this.server = createServer(realFiles, true);
        this.server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(SUB_NAME).addRoutingType(RoutingType.MULTICAST));
        this.server.getConfiguration().addQueueConfiguration(QueueConfiguration.of("Sub_1").setAddress(SUB_NAME).setRoutingType(RoutingType.MULTICAST));
        this.server.start();

        CountDownLatch workersDone = new CountDownLatch(threads);
        AtomicBoolean running = new AtomicBoolean(true);
        AtomicInteger errors = new AtomicInteger(0);

        // Math.max keeps the pool constructible even if threads is 0.
        ExecutorService executorService = Executors.newFixedThreadPool(Math.max(1, threads));
        runAfter(() -> {
            executorService.shutdownNow();
        });

        ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, SimpleManagementTest.LOCALHOST);

        // All workers plus the test thread start together.
        CyclicBarrier startBarrier = new CyclicBarrier(threads + 1);

        for (int i = 0; i < threads; i++) {
            String subscriptionName = "t" + i;
            executorService.execute(() -> {
                try {
                    startBarrier.await(10L, TimeUnit.SECONDS);
                    for (int round = 0; round < numberOfMessages && running.get(); round++) {
                        subscribeAndRemove(factory, durableSub, subscriptionName);
                    }
                } catch (Throwable e) {
                    e.printStackTrace();
                    errors.incrementAndGet();
                } finally {
                    // Always release the latch exactly once so the test thread never waits in vain.
                    workersDone.countDown();
                }
            });
        }

        Queue queue = this.server.locateQueue("Sub_1");
        Assertions.assertNotNull(queue);

        try (Connection connection = factory.createConnection()) {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(session.createTopic(SUB_NAME));
            MessageConsumer consumer = session.createConsumer(session.createQueue("SubscriptionStressTest::Sub_1"));

            startBarrier.await(10L, TimeUnit.SECONDS);

            for (int sent = 0; sent < numberOfMessages; sent++) {
                try {
                    producer.send(session.createTextMessage("a"));
                    Assertions.assertNotNull(consumer.receive(5000L));
                } catch (Throwable failure) {
                    // Stop the workers, but keep the original failure as the reported cause.
                    running.set(false);
                    if (!workersDone.await(10L, TimeUnit.SECONDS)) {
                        failure.addSuppressed(new AssertionError("worker threads did not finish within 10 seconds"));
                    }
                    throw failure;
                }
            }
        }

        running.set(false);
        Assertions.assertTrue(workersDone.await(10L, TimeUnit.SECONDS));

        Wait.assertEquals(0, this::countAddMessage, 5000L, 100L);
        PagingStore pagingStore = queue.getPagingStore();
        // A bound method reference null-checks its receiver when evaluated.
        Wait.assertEquals(0L, pagingStore::getAddressSize, 2000L, 100L);
        Assertions.assertEquals(0, errors.get());
    }

    /**
     * Performs one round of: connect, subscribe to the topic, poll once without waiting,
     * close the consumer, optionally unsubscribe, disconnect. The connection is closed
     * even when a step fails.
     */
    private static void subscribeAndRemove(ConnectionFactory factory, boolean durableSub, String subscriptionName) throws Exception {
        try (Connection connection = factory.createConnection()) {
            if (durableSub) {
                connection.setClientID(subscriptionName);
            }
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(SUB_NAME);

            MessageConsumer consumer;
            if (durableSub) {
                consumer = session.createDurableSubscriber(topic, subscriptionName);
            } else {
                consumer = session.createConsumer(topic);
            }

            Message message = consumer.receiveNoWait();
            if (message != null) {
                message.acknowledge();
            }
            consumer.close();

            if (durableSub) {
                session.unsubscribe(subscriptionName);
            }
        }
    }

    /**
     * Compacts the message journal and returns the number of journal records with id 45,
     * or 0 when the server is not backed by a {@link JournalStorageManager} or no such
     * record is counted.
     */
    int countAddMessage() throws Exception {
        // Held as Object so the instanceof check is meaningful and the cast is explicit.
        Object storageManager = this.server.getStorageManager();
        if (!(storageManager instanceof JournalStorageManager)) {
            return 0;
        }
        ((JournalStorageManager) storageManager).getMessageJournal().scheduleCompactAndBlock(5000);

        // NOTE(review): 45 presumably is the "add message" journal record id — confirm against JournalRecordIds.
        AtomicInteger count = countJournal(this.server.getConfiguration()).get(45);
        return count == null ? 0 : count.get();
    }
}
