package org.apache.activemq.artemis.tests.smoke.crossprotocol;

import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.IllegalStateException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/smoke/crossprotocol/MultiThreadConvertTest.class */
/**
 * Smoke test that publishes messages to a topic through an AMQP (Qpid JMS) connection while
 * several concurrent subscribers, each on its own thread and its own Artemis JMS client
 * connection, consume them. Every received message must carry the delivery mode the producer
 * was configured with.
 */
public class MultiThreadConvertTest extends SmokeTestBase {
    private static final String SERVER_NAME_0 = "standard";
    private static final Logger LOG = LoggerFactory.getLogger(MultiThreadConvertTest.class);

    /** URL the AMQP (Qpid JMS) producer connects to. */
    private static final String AMQP_URL = "amqp://127.0.0.1:5672";
    /** URL the Artemis JMS client consumers connect to. */
    private static final String ARTEMIS_URL = "tcp://127.0.0.1:61616";
    /** Number of concurrent topic subscribers, one thread and one connection each. */
    private static final int NUM_CONSUMERS = 4;
    /** Number of messages published; every subscriber is expected to receive all of them. */
    private static final int NUM_MESSAGES = 400;
    /** How long a subscriber waits for a single message before giving up. */
    private static final long RECEIVE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(5L);

    @Before
    public void before() throws Exception {
        cleanupData(SERVER_NAME_0);
        disableCheckThread();
        startServer(SERVER_NAME_0, 0, 30000);
    }

    /**
     * Builds a Netty acceptor configuration restricted to the AMQP protocol.
     *
     * @param server the server the acceptor is meant for (not used by this implementation)
     * @param port the port the acceptor should listen on
     * @return the acceptor configuration named {@code netty-amqp-acceptor}
     */
    protected TransportConfiguration addAcceptorConfiguration(ActiveMQServer server, int port) {
        HashMap<String, Object> params = new HashMap<>();
        params.put("port", String.valueOf(port));
        params.put("protocols", "AMQP");
        return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "netty-amqp-acceptor", new HashMap<String, Object>());
    }

    /**
     * @return the name of the topic used by both the producer and the subscribers
     */
    public String getTopicName() {
        return "test-topic-1";
    }

    @After
    public void tearDown() throws Exception {
        super.tearDown();
    }

    @Test(timeout = 60000)
    public void testSendLotsOfDurableMessagesOnTopicWithManySubscribersPersistent() throws Exception {
        doTestSendLotsOfDurableMessagesOnTopicWithManySubscribers(DeliveryMode.PERSISTENT);
    }

    @Test(timeout = 60000)
    public void testSendLotsOfDurableMessagesOnTopicWithManySubscribersNonPersistent() throws Exception {
        doTestSendLotsOfDurableMessagesOnTopicWithManySubscribers(DeliveryMode.NON_PERSISTENT);
    }

    /**
     * Starts {@link #NUM_CONSUMERS} subscriber threads, waits until each has created its
     * consumer, publishes {@link #NUM_MESSAGES} text messages over AMQP with the given delivery
     * mode, and then asserts that every subscriber received every message without error.
     *
     * @param deliveryMode the JMS delivery mode set on the producer and expected on each
     *                     received message
     * @throws Exception if the producer side fails or an assertion does not hold
     */
    private void doTestSendLotsOfDurableMessagesOnTopicWithManySubscribers(int deliveryMode) throws Exception {
        JmsConnectionFactory amqpFactory = new JmsConnectionFactory(AMQP_URL);
        ActiveMQConnectionFactory artemisFactory = new ActiveMQConnectionFactory(ARTEMIS_URL);
        ExecutorService executor = Executors.newFixedThreadPool(NUM_CONSUMERS);
        Connection amqpConnection = null;
        try {
            // Created inside the try so the factory and executor are released even if this fails.
            amqpConnection = amqpFactory.createConnection();

            CountDownLatch consumersReady = new CountDownLatch(NUM_CONSUMERS);
            CountDownLatch messagesReceived = new CountDownLatch(NUM_CONSUMERS * NUM_MESSAGES);
            AtomicBoolean consumerFailed = new AtomicBoolean(false);

            for (int c = 0; c < NUM_CONSUMERS; c++) {
                executor.execute(() -> consumeMessages(artemisFactory, deliveryMode, consumersReady, messagesReceived, consumerFailed));
            }
            assertTrue("Receivers didn't signal ready", consumersReady.await(10L, TimeUnit.SECONDS));

            Session session = amqpConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(session.createTopic(getTopicName()));
            producer.setDeliveryMode(deliveryMode);
            for (int m = 0; m < NUM_MESSAGES; m++) {
                TextMessage message = session.createTextMessage("test");
                message.setJMSCorrelationID(UUID.randomUUID().toString());
                producer.send(message);
            }

            assertTrue("did not read all messages, waiting on: " + messagesReceived.getCount(), messagesReceived.await(30L, TimeUnit.SECONDS));
            assertFalse("should not be any errors on receive", consumerFailed.get());
        } finally {
            closeQuietly(amqpConnection);
            // Interrupt any subscriber still blocked in receive() after a failed run.
            executor.shutdownNow();
            artemisFactory.close();
        }
    }

    /**
     * Body of a single subscriber thread: subscribes to the topic, signals readiness, then
     * receives {@link #NUM_MESSAGES} messages and checks the delivery mode of each. Any failure
     * is logged and recorded in {@code failed}; nothing is propagated to the executor.
     *
     * @param factory the factory used to open this subscriber's connection
     * @param expectedDeliveryMode the delivery mode every received message must report
     * @param ready counted down once the consumer has been created
     * @param received counted down once per message actually received
     * @param failed set to {@code true} if anything goes wrong in this subscriber
     */
    private void consumeMessages(ActiveMQConnectionFactory factory,
                                 int expectedDeliveryMode,
                                 CountDownLatch ready,
                                 CountDownLatch received,
                                 AtomicBoolean failed) {
        Connection connection = null;
        try {
            connection = factory.createConnection();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(session.createTopic(getTopicName()));
            ready.countDown();
            connection.start();
            for (int m = 0; m < NUM_MESSAGES; m++) {
                Message message = consumer.receive(RECEIVE_TIMEOUT_MILLIS);
                // receive(timeout) yields null on timeout; report that explicitly rather than
                // counting a phantom delivery and failing with a NullPointerException.
                if (message == null) {
                    throw new IllegalStateException("Timed out waiting for message " + (m + 1) + " of " + NUM_MESSAGES);
                }
                received.countDown();
                if (message.getJMSDeliveryMode() != expectedDeliveryMode) {
                    throw new IllegalStateException("Message durability state is not corret.");
                }
            }
        } catch (Throwable t) {
            LOG.error("Error during message consumption: ", t);
            failed.set(true);
        } finally {
            closeQuietly(connection);
        }
    }

    /**
     * Closes the connection if it was opened, logging (not propagating) any failure so that
     * cleanup problems never mask the real test outcome.
     *
     * @param connection the connection to close; may be {@code null}
     */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (Exception e) {
            LOG.debug("Ignoring failure while closing connection", e);
        }
    }
}
