package org.apache.activemq.artemis.tests.integration.amqp.connect;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.lang.invoke.MethodHandles;
import java.util.Objects;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.ExecuteUtil;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.class */
public class QpidDispatchPeerTest extends AmqpClientTestSupport {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    ExecuteUtil.ProcessHolder qpidProcess;

    @BeforeClass
    public static void validateqdrotuer() {
        try {
            Assume.assumeTrue("qdrouterd does not exist", ExecuteUtil.runCommand(true, new String[]{"qdrouterd", "--version"}) == 0);
        } catch (Exception e) {
            logger.debug(e.getMessage(), e);
            Assume.assumeNoException("qdrouterd does not exist", e);
        }
    }

    @Override // org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport
    protected ActiveMQServer createServer() throws Exception {
        ActiveMQServer createServer = createServer(5672, false);
        createServer.getConfiguration().setNetworkCheckPeriod(100L);
        return createServer;
    }

    @Before
    public void startQpidRouter() throws Exception {
        this.qpidProcess = ExecuteUtil.run(true, new String[]{"qdrouterd", "-c", getClass().getClassLoader().getResource("QpidRouterPeerTest-qpidr.conf").getFile()});
    }

    @After
    public void stopQpidRouter() throws Exception {
        this.qpidProcess.kill();
    }

    public void pauseThenKill(int i) throws Exception {
        long pid = this.qpidProcess.pid();
        Assert.assertEquals(0L, ExecuteUtil.runCommand(true, new String[]{"kill", "-STOP", Long.toString(pid)}));
        logger.info("\n{}\nPaused\n{}", "*".repeat(127), "*".repeat(127));
        Thread.sleep(i);
        Assert.assertEquals(0L, ExecuteUtil.runCommand(true, new String[]{"kill", "-9", Long.toString(pid)}));
    }

    @Test(timeout = 60000)
    public void testWithMatchingDifferentNamesOnQueueKill() throws Exception {
        internalMultipleQueues(true, true, true, false, false);
    }

    @Test(timeout = 60000)
    public void testWithMatchingDifferentNamesOnQueueKillMaxAttempts() throws Exception {
        internalMultipleQueues(true, true, true, false, true);
    }

    @Test(timeout = 60000)
    public void testWithMatchingDifferentNamesOnQueuePauseMaxAttempts() throws Exception {
        internalMultipleQueues(true, true, false, true, false);
    }

    @Test(timeout = 60000)
    public void testWithMatchingDifferentNamesOnQueuePause() throws Exception {
        internalMultipleQueues(true, true, false, true, false);
    }

    @Test(timeout = 60000)
    public void testWithMatchingDifferentNamesOnQueue() throws Exception {
        internalMultipleQueues(true, true, false, false, false);
    }

    @Test(timeout = 60000)
    public void testWithMatching() throws Exception {
        internalMultipleQueues(true, false, false, false, false);
    }

    @Test(timeout = 60000)
    public void testwithQueueName() throws Exception {
        internalMultipleQueues(false, false, false, false, false);
    }

    @Test(timeout = 60000)
    public void testwithQueueNameDistinctName() throws Exception {
        internalMultipleQueues(false, true, false, false, false);
    }

    private void internalMultipleQueues(boolean z, boolean z2, boolean z3, boolean z4, boolean z5) throws Exception {
        String str = "brokerConnection." + UUIDGenerator.getInstance().generateStringUUID();
        AMQPBrokerConnectConfiguration reconnectAttempts = new AMQPBrokerConnectConfiguration(str, "tcp://localhost:24622?amqpIdleTimeout=1000").setRetryInterval(10).setReconnectAttempts(z5 ? 10 : -1);
        if (z) {
            reconnectAttempts.addElement(new AMQPBrokerConnectionElement().setMatchAddress("queue.#").setType(AMQPBrokerConnectionAddressType.PEER));
        } else {
            for (int i = 0; i < 10; i++) {
                reconnectAttempts.addElement(new AMQPBrokerConnectionElement().setQueueName(createQueueName(i, z2)).setType(AMQPBrokerConnectionAddressType.PEER));
            }
        }
        this.server.getConfiguration().addAMQPConnection(reconnectAttempts);
        this.server.start();
        for (int i2 = 0; i2 < 10; i2++) {
            this.server.addAddressInfo(new AddressInfo("queue.test" + i2).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false).setTemporary(false));
            this.server.createQueue(new QueueConfiguration(createQueueName(i2, z2)).setAddress("queue.test" + i2).setRoutingType(RoutingType.ANYCAST));
        }
        for (int i3 = 0; i3 < 10; i3++) {
            Connection createConnectionDumbRetry = createConnectionDumbRetry(CFUtil.createConnectionFactory("AMQP", "tcp://localhost:24622"));
            Session createSession = createConnectionDumbRetry.createSession(false, 1);
            MessageProducer createProducer = createSession.createProducer(createSession.createQueue("queue.test" + i3));
            createProducer.setDeliveryMode(1);
            Queue locateQueue = this.server.locateQueue(createQueueName(i3, z2));
            for (int i4 = 0; i4 < 100; i4++) {
                createProducer.send(createSession.createTextMessage("hello " + i4));
            }
            Objects.requireNonNull(locateQueue);
            Wait.assertEquals(100L, locateQueue::getMessageCount);
            createConnectionDumbRetry.close();
        }
        if (z3) {
            this.qpidProcess.kill();
            if (z5) {
                Thread.sleep(1000L);
            }
            startQpidRouter();
        } else if (z4) {
            pauseThenKill(3000);
            startQpidRouter();
        }
        if (z5) {
            createConnectionDumbRetry(CFUtil.createConnectionFactory("AMQP", "tcp://localhost:24622")).close();
            this.server.stopBrokerConnection(str);
            this.server.startBrokerConnection(str);
        }
        for (int i5 = 0; i5 < 10; i5++) {
            Connection createConnectionDumbRetry2 = createConnectionDumbRetry(CFUtil.createConnectionFactory("AMQP", "tcp://localhost:24622"));
            Session createSession2 = createConnectionDumbRetry2.createSession(false, 1);
            MessageConsumer createConsumer = createSession2.createConsumer(createSession2.createQueue("queue.test" + i5));
            createConnectionDumbRetry2.start();
            for (int i6 = 0; i6 < 100; i6++) {
                try {
                    TextMessage receive = createConsumer.receive(5000L);
                    if (receive == null) {
                        System.out.println("*******************************************************************************************************************************");
                        System.out.println("qdstat after message timed out:");
                        ExecuteUtil.runCommand(true, new String[]{"qdstat", "-b", "127.0.0.1:24622", "-l"});
                        System.out.println("*******************************************************************************************************************************");
                    }
                    Assert.assertNotNull(receive);
                    Assert.assertEquals("hello " + i6, receive.getText());
                } catch (Throwable th) {
                    try {
                        createConnectionDumbRetry2.close();
                    } catch (Throwable th2) {
                    }
                    throw th;
                }
            }
            Assert.assertNull(createConsumer.receiveNoWait());
            try {
                createConnectionDumbRetry2.close();
            } catch (Throwable th3) {
            }
            Queue locateQueue2 = this.server.locateQueue(createQueueName(i5, z2));
            Objects.requireNonNull(locateQueue2);
            Wait.assertEquals(0L, locateQueue2::getMessageCount);
        }
    }

    private String createQueueName(int i, boolean z) {
        return z ? "distinct.test" + i : "queue.test" + i;
    }

    private Connection createConnectionDumbRetry(ConnectionFactory connectionFactory) throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            try {
                return connectionFactory.createConnection();
            } catch (Exception e) {
                Thread.sleep(10L);
            }
        }
        return null;
    }
}
