package org.apache.activemq.artemis.tests.integration.cluster.distribution;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.management.SimpleManagementTest;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/cluster/distribution/TopicClusteredOffTest.class */
/**
 * Two-node cluster test for a multicast address ("queues.dist") whose cluster connections use
 * {@link MessageLoadBalancingType#OFF} or {@link MessageLoadBalancingType#OFF_WITH_REDISTRIBUTION}.
 *
 * <p>Messages are produced on node 0 and left unconsumed (the consuming sessions roll back).
 * A queue with the same name as node 0's queue is then created on node 1, and the test checks
 * whether a consumer on that node 1 queue receives the messages: it must when redistribution
 * is enabled, and must not when load balancing is plain {@code OFF}.
 */
public class TopicClusteredOffTest extends ClusterTestBase {

    /** Address shared by both nodes; matched by the "queues" cluster-connection prefix. */
    private static final String ADDRESS = "queues.dist";

    /** Queue created on node 0 up front and on node 1 later in the test. */
    private static final String QUEUE_0 = "queue0";

    /** Queue created on node 1 up front. */
    private static final String QUEUE_1 = "queue1";

    /** Number of messages produced and expected by each verifying consumer. */
    private static final int MESSAGE_COUNT = 10;

    /** Per-message wait, in milliseconds, when a message is expected to arrive. */
    private static final long RECEIVE_TIMEOUT_MS = 5000L;

    /** Wait, in milliseconds, when asserting that no message arrives. */
    private static final long NO_MESSAGE_TIMEOUT_MS = 100L;

    /**
     * Runs the base-class set-up, then configures both servers and sets a zero redistribution delay.
     *
     * @throws Exception if the base set-up or the server configuration fails
     */
    @Override
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        start();
    }

    /** Configures servers 0 and 1 and applies a redistribution delay of zero to them. */
    private void start() throws Exception {
        setupServers();
        setRedistributionDelay(0L);
    }

    /**
     * @return {@code true}; the value is passed as the netty flag to the server, session-factory
     *         and cluster-connection set-up helpers
     */
    protected boolean isNetty() {
        return true;
    }

    /** With {@code OFF}, the queue created later on node 1 must receive nothing. */
    @Test
    public void testTopicRedistributionOff() throws Exception {
        internalTest(false);
    }

    /** With {@code OFF_WITH_REDISTRIBUTION}, the queue created later on node 1 must receive all messages. */
    @Test
    public void testTopicRedistributionOn() throws Exception {
        internalTest(true);
    }

    /**
     * Shared scenario for both tests.
     *
     * @param redistribution {@code true} to use {@code OFF_WITH_REDISTRIBUTION} and expect the
     *                       messages on node 1's late-created queue; {@code false} to use
     *                       {@code OFF} and expect none
     * @throws Exception on any broker, JMS or assertion failure
     */
    private void internalTest(boolean redistribution) throws Exception {
        setupCluster(redistribution ? MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION : MessageLoadBalancingType.OFF);

        startServers(0, 1);

        setupSessionFactory(0, isNetty());
        setupSessionFactory(1, isNetty());

        createQueue(0, ADDRESS, QUEUE_0, null, false);
        createQueue(1, ADDRESS, QUEUE_1, null, false);

        // Wait for the cluster topology to settle before producing anything.
        waitForBindings(0, ADDRESS, 1, 0, true);
        waitForBindings(0, ADDRESS, 1, 0, false);
        waitForBindings(1, ADDRESS, 1, 0, true);
        waitForBindings(1, ADDRESS, 1, 0, false);

        ConnectionFactory node0Factory = CFUtil.createConnectionFactory("CORE", SimpleManagementTest.LOCALHOST);
        ConnectionFactory node1Factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61617");

        // Produce on node 0 and confirm node 0's queue holds every message.
        try (Connection connection = node0Factory.createConnection()) {
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageProducer producer = session.createProducer(session.createTopic(ADDRESS));
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                producer.send(session.createTextMessage("message" + i));
            }
            session.commit();

            connection.start();
            MessageConsumer consumer = session.createConsumer(session.createTopic(ADDRESS + "::" + QUEUE_0));
            assertReceivesAllMessages(consumer);
            // Roll back so the messages remain on the queue for the later checks.
            session.rollback();
        }

        // Node 1's own queue must hold every message as well.
        try (Connection connection = node1Factory.createConnection()) {
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            connection.start();
            MessageConsumer consumer = session.createConsumer(session.createTopic(ADDRESS + "::" + QUEUE_1));
            assertReceivesAllMessages(consumer);
            session.rollback();
        }

        // Create a queue on node 1 with the same name as the one already holding messages on node 0.
        createQueue(1, ADDRESS, QUEUE_0, null, false);

        waitForBindings(0, ADDRESS, 1, 0, true);
        waitForBindings(0, ADDRESS, 2, 0, false);
        waitForBindings(1, ADDRESS, 2, 0, true);
        waitForBindings(1, ADDRESS, 1, 0, false);

        try (Connection connection = node1Factory.createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            connection.start();
            MessageConsumer consumer = session.createConsumer(session.createTopic(ADDRESS + "::" + QUEUE_0));
            if (redistribution) {
                assertReceivesAllMessages(consumer);
            } else {
                Assertions.assertNull(consumer.receive(NO_MESSAGE_TIMEOUT_MS), "Messages are being redistributed");
            }
        }
    }

    /**
     * Receives {@link #MESSAGE_COUNT} messages from the consumer and asserts that each one is
     * present and carries the text "message" followed by its zero-based position.
     *
     * @param consumer consumer on an already-started connection
     * @throws Exception if receiving or reading a message fails
     */
    private void assertReceivesAllMessages(MessageConsumer consumer) throws Exception {
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            TextMessage message = (TextMessage) consumer.receive(RECEIVE_TIMEOUT_MS);
            Assertions.assertNotNull(message);
            Assertions.assertEquals("message" + i, message.getText());
        }
    }

    /**
     * Sets up one cluster connection in each direction between nodes 0 and 1 for the "queues"
     * address prefix.
     *
     * @param messageLoadBalancingType load-balancing policy applied to both cluster connections
     * @throws Exception if a cluster connection cannot be configured
     */
    protected void setupCluster(MessageLoadBalancingType messageLoadBalancingType) throws Exception {
        setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, 1);
        setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 1, 0);
    }

    /**
     * Registers the same redistribution delay for addresses matching "queues.*" on both servers.
     *
     * @param delay redistribution delay handed to {@link AddressSettings#setRedistributionDelay}
     */
    protected void setRedistributionDelay(long delay) {
        AddressSettings settings = new AddressSettings().setRedistributionDelay(delay);
        getServer(0).getAddressSettingsRepository().addMatch("queues.*", settings);
        getServer(1).getAddressSettingsRepository().addMatch("queues.*", settings);
    }

    /**
     * Configures servers 0 and 1 with the base class's storage mode and this class's netty flag.
     *
     * @throws Exception if a server cannot be configured
     */
    protected void setupServers() throws Exception {
        setupServer(0, isFileStorage(), isNetty());
        setupServer(1, isFileStorage(), isNetty());
    }

    /**
     * Closes all consumers, session factories and server locators, then stops and clears
     * servers 0 and 1.
     *
     * @throws Exception if any close or stop step fails
     */
    protected void stopServers() throws Exception {
        closeAllConsumers();
        closeAllSessionFactories();
        closeAllServerLocatorsFactories();
        stopServers(0, 1);
        clearServer(0, 1);
    }
}
