package org.apache.activemq.artemis.tests.integration.amqp.connect;

import jakarta.jms.Connection;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.util.Collections;
import java.util.Objects;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.ha.PrimaryOnlyPolicyConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.integration.client.AutoCreateJmsDestinationTest;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPClusterReplicaTest.class */
/**
 * Integration tests that combine a two-node broker cluster with an AMQP mirror
 * broker connection.
 *
 * <p>The two cluster nodes are created by this class on {@link #NODE_1_PORT} and
 * {@link #NODE_2_PORT}. The third broker is the inherited {@code server} field,
 * which the tests address at {@code tcp://localhost:5672}.
 */
public class AMQPClusterReplicaTest extends AmqpClientTestSupport {
    protected static final int NODE_1_PORT = 5673;
    protected static final int NODE_2_PORT = 5674;

    /** Connector URI of cluster node 1, derived from {@link #NODE_1_PORT}. */
    private static final String NODE_1_URI = "tcp://localhost:" + NODE_1_PORT;

    /** Connector URI of cluster node 2, derived from {@link #NODE_2_PORT}. */
    private static final String NODE_2_URI = "tcp://localhost:" + NODE_2_PORT;

    /**
     * Configures node 1 with a mirror connection to {@code tcp://localhost:5672},
     * produces 10 messages on node 1, consumes them through node 2 and then
     * expects the queue on the inherited {@code server} to become empty.
     *
     * @throws Exception if any broker or client operation fails
     */
    @Test
    public void testReplicaWithCluster() throws Exception {
        ActiveMQServer node1 = createNode1(MessageLoadBalancingType.ON_DEMAND);
        ActiveMQServer node2 = createNode2(MessageLoadBalancingType.ON_DEMAND);
        this.server.start();

        // The mirror connection must be registered before node 1 is started.
        AMQPBrokerConnectConfiguration mirrorConnection = new AMQPBrokerConnectConfiguration("mirror", "tcp://localhost:5672")
            .setReconnectAttempts(-1)
            .setRetryInterval(100)
            .addConnectionElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
        node1.getConfiguration().addAMQPConnection(mirrorConnection);

        node1.start();
        node2.start();
        configureAddressAndQueue(node1);
        configureAddressAndQueue(node2);
        waitForTopology(node1, 2);
        waitForTopology(node2, 2);

        // Produce on node 1 ...
        ClientSession producerSession = addClientSession(addSessionFactory(getNode1ServerLocator().createSessionFactory()).createSession());
        sendMessages(producerSession, addClientProducer(producerSession.createProducer(AutoCreateJmsDestinationTest.QUEUE_NAME)), 10);

        // ... and consume on node 2.
        ClientSession consumerSession = addClientSession(addSessionFactory(getNode2ServerLocator().createSessionFactory()).createSession());
        consumerSession.start();
        receiveMessages(addClientConsumer(consumerSession.createConsumer(AutoCreateJmsDestinationTest.QUEUE_NAME)), 0, 10, true);

        // Wait for the queue named after the "mirror" connection on node 1 to drain
        // (presumably the mirror's store-and-forward queue - confirm).
        // NOTE(review): the boolean result is not asserted; the assertion below is
        // the actual check. Consider Wait.assertTrue if a timeout here should fail.
        Wait.waitFor(() -> node1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_mirror").getMessageCount() == 0);
        Wait.assertEquals(0L, () -> this.server.locateQueue(AutoCreateJmsDestinationTest.QUEUE_NAME).getMessageCount());
    }

    /**
     * Configures the inherited {@code server} with a mirror connection to node 1,
     * sends 10 messages to {@code tcp://localhost:5672} over AMQP and expects all
     * 10 to show up on node 1's queue while the cluster uses
     * {@link MessageLoadBalancingType#STRICT}.
     *
     * @throws Exception if any broker or client operation fails
     */
    @Test
    public void testReplicaWithClusterTargetStrict() throws Exception {
        ActiveMQServer node1 = createNode1(MessageLoadBalancingType.STRICT);
        ActiveMQServer node2 = createNode2(MessageLoadBalancingType.STRICT);

        // Stop the server so the mirror connection can be added before it is (re)started.
        this.server.stop();
        AMQPBrokerConnectConfiguration mirrorConnection = new AMQPBrokerConnectConfiguration("mirror1", NODE_1_URI)
            .setReconnectAttempts(-1)
            .setRetryInterval(100)
            .addConnectionElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
        this.server.getConfiguration().addAMQPConnection(mirrorConnection);
        this.server.start();

        node1.start();
        node2.start();
        configureAddressAndQueue(node1);
        configureAddressAndQueue(this.server);
        configureAddressAndQueue(node2);
        waitForTopology(node1, 2);
        waitForTopology(node2, 2);

        // try-with-resources closes the connection even when a send or the
        // assertion below throws; previously it was closed on success only.
        try (Connection connection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(session.createQueue(AutoCreateJmsDestinationTest.QUEUE_NAME));
            for (int i = 0; i < 10; i++) {
                producer.send(session.createTextMessage("hello"));
            }

            Queue node1Queue = Objects.requireNonNull(
                node1.locateQueue(AutoCreateJmsDestinationTest.QUEUE_NAME),
                "queue " + AutoCreateJmsDestinationTest.QUEUE_NAME + " not found on node 1");
            Wait.assertEquals(10L, node1Queue::getMessageCount, 5000L, 10L);
        }
    }

    /**
     * Creates a server locator for node 1 and registers it through
     * {@code addServerLocator}.
     */
    private ServerLocator getNode1ServerLocator() throws Exception {
        return addServerLocator(ActiveMQClient.createServerLocator(NODE_1_URI));
    }

    /**
     * Creates a server locator for node 2 and registers it through
     * {@code addServerLocator}.
     */
    private ServerLocator getNode2ServerLocator() throws Exception {
        return addServerLocator(ActiveMQClient.createServerLocator(NODE_2_URI));
    }

    /**
     * Creates (without starting) cluster node 1, statically connected to node 2.
     *
     * @param messageLoadBalancingType load balancing type for the cluster connection
     * @return the configured, not yet started server
     */
    private ActiveMQServer createNode1(MessageLoadBalancingType messageLoadBalancingType) throws Exception {
        return createClusterNode(NODE_1_PORT, "node_1", "node1", "node2", messageLoadBalancingType);
    }

    /**
     * Creates (without starting) cluster node 2, statically connected to node 1.
     *
     * @param messageLoadBalancingType load balancing type for the cluster connection
     * @return the configured, not yet started server
     */
    private ActiveMQServer createNode2(MessageLoadBalancingType messageLoadBalancingType) throws Exception {
        return createClusterNode(NODE_2_PORT, "node_2", "node2", "node1", messageLoadBalancingType);
    }

    /**
     * Shared construction logic for both cluster nodes.
     *
     * @param port                     port passed to {@code createServer}
     * @param identity                 used as both the server identity and the configuration name
     * @param localConnector           name of the connector this node uses for the cluster connection
     * @param peerConnector            name of the connector of the other node (static connector)
     * @param messageLoadBalancingType load balancing type for the cluster connection
     * @return the configured, not yet started server
     */
    private ActiveMQServer createClusterNode(int port, String identity, String localConnector, String peerConnector, MessageLoadBalancingType messageLoadBalancingType) throws Exception {
        ActiveMQServer node = createServer(port, false);
        ClusterConnectionConfiguration clusterConfiguration = new ClusterConnectionConfiguration()
            .setName("cluster")
            .setConnectorName(localConnector)
            .setMessageLoadBalancingType(messageLoadBalancingType)
            .setStaticConnectors(Collections.singletonList(peerConnector));
        node.setIdentity(identity);
        // Both connectors are registered on every node so each side can resolve
        // its own connector and its peer's by name.
        node.getConfiguration()
            .setName(identity)
            .setHAPolicyConfiguration(new PrimaryOnlyPolicyConfiguration())
            .addConnectorConfiguration("node1", NODE_1_URI)
            .addConnectorConfiguration("node2", NODE_2_URI)
            .addClusterConfiguration(clusterConfiguration);
        return node;
    }

    /**
     * Adds the test address (not auto-created), sets a redistribution delay of 0
     * for it and creates a durable ANYCAST queue with the same name on the given
     * server.
     *
     * @param activeMQServer the server to configure
     */
    private void configureAddressAndQueue(ActiveMQServer activeMQServer) throws Exception {
        final String name = AutoCreateJmsDestinationTest.QUEUE_NAME;
        activeMQServer.addAddressInfo(new AddressInfo(name).setAutoCreated(false));
        activeMQServer.getAddressSettingsRepository().addMatch(name, new AddressSettings().setRedistributionDelay(0L));
        activeMQServer.createQueue(QueueConfiguration.of(name).setAddress(name).setRoutingType(RoutingType.ANYCAST).setDurable(true));
    }

    /**
     * Returns the protocol list used for the test brokers: both AMQP and CORE,
     * since the tests use AMQP (JMS client, mirror) as well as core clients.
     */
    @Override
    protected String getConfiguredProtocols() {
        return "AMQP,CORE";
    }
}
