package org.apache.activemq.artemis.tests.integration.cluster.distribution;

import jakarta.jms.Connection;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/cluster/distribution/AMQPMessageLoadBalancingTest.class */
public class AMQPMessageLoadBalancingTest extends ClusterTestBase {
    @Override // org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase, org.apache.activemq.artemis.tests.util.ActiveMQTestBase
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        start();
    }

    private void start() throws Exception {
        setupServers();
        setRedistributionDelay(0L);
    }

    protected boolean isNetty() {
        return true;
    }

    @Test
    public void testLoadBalanceAMQP() throws Exception {
        setupCluster(MessageLoadBalancingType.STRICT);
        startServers(0, 1);
        setupSessionFactory(0, isNetty());
        setupSessionFactory(1, isNetty());
        createQueue(0, "queues.0", "queues.0", null, true, null, null, RoutingType.ANYCAST);
        createQueue(1, "queues.0", "queues.0", null, true, null, null, RoutingType.ANYCAST);
        waitForBindings(0, "queues.0", 1, 0, true);
        waitForBindings(1, "queues.0", 1, 0, true);
        waitForBindings(0, "queues.0", 1, 0, false);
        waitForBindings(1, "queues.0", 1, 0, false);
        Connection createConnection = new JmsConnectionFactory("amqp://localhost:61616").createConnection();
        Session createSession = createConnection.createSession(true, 0);
        MessageProducer createProducer = createSession.createProducer(createSession.createQueue("queues.0"));
        for (int i = 0; i < 100; i++) {
            createProducer.send(createSession.createTextMessage("hello " + i));
        }
        createSession.commit();
        createConnection.close();
        receiveOnBothNodes(100);
        ClientSession createSession2 = this.sfs[0].createSession();
        ClientProducer createProducer2 = createSession2.createProducer("queues.0");
        for (int i2 = 0; i2 < 100; i2++) {
            ClientMessage putIntProperty = createSession2.createMessage((byte) 7, true).putIntProperty("i", i2);
            putIntProperty.getBodyBuffer().writeString("hello!");
            createProducer2.send(putIntProperty);
            createProducer2.send(createSession2.createMessage((byte) 7, true));
        }
        receiveOnBothNodes(200);
    }

    private void receiveOnBothNodes(int i) throws ActiveMQException {
        for (int i2 = 0; i2 <= 1; i2++) {
            ClientSession createSession = this.sfs[i2].createSession();
            ClientConsumer createConsumer = createSession.createConsumer("queues.0");
            createSession.start();
            for (int i3 = 0; i3 < i / 2; i3++) {
                ClientMessage receive = createConsumer.receive(5000L);
                Assertions.assertNotNull(receive);
                receive.acknowledge();
            }
            Assertions.assertNull(createConsumer.receiveImmediate());
            createSession.commit();
            createSession.close();
        }
    }

    protected void setupCluster(MessageLoadBalancingType messageLoadBalancingType) throws Exception {
        setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, 1);
        setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 1, 0);
    }

    protected void setRedistributionDelay(long j) {
        AddressSettings redistributionDelay = new AddressSettings().setRedistributionDelay(j);
        getServer(0).getAddressSettingsRepository().addMatch("queues.*", redistributionDelay);
        getServer(1).getAddressSettingsRepository().addMatch("queues.*", redistributionDelay);
    }

    protected void setupServers() throws Exception {
        setupServer(0, isFileStorage(), isNetty());
        setupServer(1, isFileStorage(), isNetty());
        this.servers[0].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
        this.servers[1].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
    }

    protected void stopServers() throws Exception {
        closeAllConsumers();
        closeAllSessionFactories();
        closeAllServerLocatorsFactories();
        stopServers(0, 1);
        clearServer(0, 1);
    }
}
