package org.apache.activemq.artemis.tests.integration.cluster.topology;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException;
import org.apache.activemq.artemis.api.core.ActiveMQUnBlockedException;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/cluster/topology/TopologyClusterTestBase.class */
public abstract class TopologyClusterTestBase extends ClusterTestBase {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/cluster/topology/TopologyClusterTestBase$LatchListener.class */
    /**
     * Topology listener that mirrors node UP/DOWN events into a caller-supplied list and
     * signals progress through two latches.
     *
     * <p>Both callbacks are {@code synchronized} on the listener instance. A node ID counts
     * toward the UP latch only the first time it is announced; because the set of already
     * seen IDs is never cleared, a node that goes down and is announced again is neither
     * re-added to the list nor counted a second time.
     */
    private static final class LatchListener implements ClusterTopologyListener {
        /** Counted down once per distinct node ID announced as UP. */
        private final CountDownLatch upLatch;
        /** Caller-owned list of node IDs currently considered up. */
        private final List<String> nodes;
        /** Counted down once per DOWN event for a node that was present in {@link #nodes}. */
        private final CountDownLatch downLatch;
        /** Every node ID that has ever been announced as UP to this listener. */
        private final List<String> seenUp = new ArrayList<>();

        /**
         * @param upLatch   latch to count down for each newly seen node
         * @param nodes     list that receives the IDs of nodes that are up
         * @param downLatch latch to count down for each tracked node that goes down
         */
        private LatchListener(CountDownLatch upLatch, List<String> nodes, CountDownLatch downLatch) {
            this.upLatch = upLatch;
            this.nodes = nodes;
            this.downLatch = downLatch;
        }

        /**
         * Records a node the first time it is announced and counts down the UP latch.
         * Repeated announcements of an already seen node ID are ignored.
         *
         * @param topologyMember the announced member; only its node ID is used
         * @param z              unused by this listener
         */
        public synchronized void nodeUP(TopologyMember topologyMember, boolean z) {
            final String announcedId = topologyMember.getNodeId();
            if (!this.seenUp.contains(announcedId)) {
                // The list is updated before the latch so a waiter released by the latch sees the ID.
                this.nodes.add(announcedId);
                this.seenUp.add(announcedId);
                this.upLatch.countDown();
            }
        }

        /**
         * Drops a tracked node and counts down the DOWN latch; events for untracked IDs are ignored.
         *
         * @param j   unused by this listener
         * @param str ID of the node reported as down
         */
        public synchronized void nodeDown(long j, String str) {
            // remove(Object) reports whether the ID was present, which is exactly the guard we need.
            if (this.nodes.remove(str)) {
                this.downLatch.countDown();
            }
        }
    }

    /**
     * Supplies the locator through which the tests in this class observe the cluster topology.
     * The tests register a {@link ClusterTopologyListener} on the returned locator and create
     * session factories from it.
     *
     * <p>NOTE(review): the name suggests an HA-enabled locator; the actual configuration is
     * decided by the subclass and is not visible here.
     *
     * @return the locator to use for the test
     */
    protected abstract ServerLocator createHAServerLocator();

    /**
     * Hook called from {@link #setUp()} right after {@code super.setUp()} and before
     * {@link #setupCluster()}.
     *
     * <p>NOTE(review): the tests in this class address {@code servers[0]} through
     * {@code servers[4]}, so implementations are presumably expected to configure five
     * servers; confirm against the subclasses.
     *
     * @throws Exception if the servers cannot be set up
     */
    protected abstract void setupServers() throws Exception;

    /**
     * Hook called from {@link #setUp()} as its last step, after {@link #setupServers()}.
     *
     * <p>NOTE(review): presumably wires the cluster connections between the configured
     * servers; the details are left to the subclass.
     *
     * @throws Exception if the cluster cannot be set up
     */
    protected abstract void setupCluster() throws Exception;

    /**
     * Flag that {@code testWrongPasswordTriggersClusterConnectionStop} forwards to
     * {@code createNonHALocator(boolean)} when it builds its locator.
     *
     * <p>NOTE(review): presumably {@code true} selects the Netty transport; confirm against
     * the helper's definition.
     *
     * @return the transport flag for this test variant
     * @throws Exception declared for implementations that need it
     */
    protected abstract boolean isNetty() throws Exception;

    /**
     * Prepares the fixture before each test: runs the inherited set-up first, then lets the
     * subclass configure its servers and finally its cluster, in that order.
     *
     * @throws Exception if the inherited set-up or either subclass hook fails
     */
    @Override // org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase, org.apache.activemq.artemis.tests.util.ActiveMQTestBase
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        // Servers must exist before the cluster hook runs.
        setupServers();
        setupCluster();
    }

    /**
     * Asserts that {@code list} contains exactly the node IDs selected by {@code iArr}, in the
     * same order.
     *
     * @param iArr   indexes into {@code strArr}, in the expected order of arrival
     * @param strArr node IDs, addressed by the values of {@code iArr}
     * @param list   the node IDs actually received
     */
    protected void checkOrder(int[] iArr, String[] strArr, List<String> list) {
        final int expectedCount = iArr.length;
        Assertions.assertEquals(expectedCount, list.size());

        int position = 0;
        for (int nodeIndex : iArr) {
            final String expectedId = strArr[nodeIndex];
            Assertions.assertEquals(expectedId, list.get(position), "did not receive expected nodeID at " + position);
            position++;
        }
    }

    /**
     * Waits until {@code list} holds exactly the node IDs selected by {@code iArr} (same size,
     * every selected ID present, order irrelevant) and fails the test if that does not happen
     * within roughly five seconds.
     *
     * <p>The list is polled with a short pause between attempts, mirroring
     * {@link #waitForClusterConnections(int, int)}, instead of spinning on the CPU while other
     * threads are trying to deliver the topology updates being waited for.
     *
     * @param iArr   indexes into {@code strArr} of the node IDs that must be present
     * @param strArr node IDs, addressed by the values of {@code iArr}
     * @param list   the list to observe; it may be modified concurrently while this method polls
     */
    protected void checkContains(int[] iArr, String[] strArr, List<String> list) {
        final long start = System.currentTimeMillis();
        do {
            if (holdsExpectedNodes(iArr, strArr, list)) {
                return;
            }
            try {
                // Back off between polls rather than busy-waiting for up to five seconds.
                Thread.sleep(10L);
            } catch (InterruptedException e) {
                // Keep the signature free of checked exceptions: restore the flag and fail the test.
                Thread.currentThread().interrupt();
                Assertions.fail("interrupted while waiting for expected node ID: " + list);
            }
        } while (System.currentTimeMillis() - start < 5000);
        Assertions.fail("did not contain all expected node ID: " + list);
    }

    /** Returns whether {@code actual} has as many entries as {@code expected} and contains every selected node ID. */
    private static boolean holdsExpectedNodes(int[] expected, String[] nodeIds, List<String> actual) {
        if (expected.length != actual.size()) {
            return false;
        }
        for (int nodeIndex : expected) {
            if (!actual.contains(nodeIds[nodeIndex])) {
                return false;
            }
        }
        return true;
    }

    /**
     * Collects the node IDs of the first {@code iArr.length} entries of {@code servers}.
     *
     * <p>NOTE(review): only the <em>number</em> of arguments is used; the values in
     * {@code iArr} are ignored and {@code servers[0..n-1]} is read instead. Every call in this
     * class passes {@code 0, 1, 2, 3, 4}, for which that makes no difference, but a call with
     * any other arguments would not return the IDs of the servers it names. Confirm the intent
     * before relying on this method with a different argument list.
     *
     * @param iArr server indexes; see the note above
     * @return an array of {@code iArr.length} node IDs, where element {@code k} belongs to
     *         {@code servers[k]}
     */
    protected String[] getNodeIDs(int... iArr) {
        final int count = iArr.length;
        final String[] ids = new String[count];
        int slot = 0;
        while (slot < count) {
            final ActiveMQServer server = this.servers[slot];
            ids[slot] = server.getNodeID().toString();
            slot++;
        }
        return ids;
    }

    /**
     * Probes whether {@code clientSession} is still usable by creating and deleting a queue
     * with a random name, and replaces the session if the probe reports it as unblocked or
     * closed.
     *
     * @param clientSession the session to probe
     * @param serverLocator locator used to build a new session factory when the probe fails
     * @return {@code clientSession} itself if the probe succeeded, otherwise a new session
     *         created from a fresh session factory
     * @throws Exception any other failure of the probe, or a failure to create the replacement
     */
    protected ClientSession checkSessionOrReconnect(ClientSession clientSession, ServerLocator serverLocator) throws Exception {
        try {
            final String probeQueue = RandomUtil.randomString();
            clientSession.createQueue(QueueConfiguration.of(probeQueue));
            clientSession.deleteQueue(probeQueue);
        } catch (ActiveMQUnBlockedException | ActiveMQObjectClosedException sessionUnusable) {
            // Both failure modes are handled the same way: open a new session.
            final ClientSessionFactory replacementFactory = createSessionFactory(serverLocator);
            return replacementFactory.createSession();
        }
        return clientSession;
    }

    /**
     * Waits until the server at index {@code i} reports exactly {@code i2} active nodes, summed
     * over all of its cluster connections, polling every 10 ms.
     *
     * <p>If the count never matches before the timeout, the cluster description of the server
     * is logged at error level and the test fails.
     *
     * <p>NOTE(review): the timeout is {@code AmqpConnection.DEFAULT_CLOSE_TIMEOUT}, a constant
     * of the AMQP test client whose value is not visible here; it looks unrelated to cluster
     * connections, so confirm it is the intended timeout.
     *
     * @param i  index into {@code servers} of the server to inspect
     * @param i2 expected number of active nodes
     * @throws IllegalArgumentException if there is no server at index {@code i}
     * @throws Exception                if the wait is interrupted or the server cannot be queried
     */
    protected void waitForClusterConnections(int i, int i2) throws Exception {
        final ActiveMQServer server = this.servers[i];
        if (server == null) {
            throw new IllegalArgumentException("No server at " + i);
        }

        final ClusterManager clusterManager = server.getClusterManager();
        final long startedAt = System.currentTimeMillis();
        int activeNodes;
        do {
            // Recount from scratch on every poll.
            activeNodes = 0;
            for (ClusterConnection connection : clusterManager.getClusterConnections()) {
                for (Object nodeId : connection.getNodes().keySet()) {
                    if (connection.isNodeActive((String) nodeId)) {
                        activeNodes++;
                    }
                }
            }
            if (activeNodes == i2) {
                return;
            }
            Thread.sleep(10L);
        } while (System.currentTimeMillis() - startedAt < AmqpConnection.DEFAULT_CLOSE_TIMEOUT);

        logger.error(clusterDescription(this.servers[i]));
        Assertions.assertEquals(i2, activeNodes, "Timed out waiting for cluster connections for server " + i);
    }

    /**
     * Starts with a single server, attaches a topology listener, then starts four more servers
     * and stops them again, checking that the listener sees all five nodes come up and four
     * of them go down.
     */
    @Test
    public void testReceiveNotificationsWhenOtherNodesAreStartedAndStopped() throws Throwable {
        startServers(0);

        final ServerLocator locator = createHAServerLocator();
        locator.getTopology().setOwner("testReceive");

        final List<String> liveNodes = Collections.synchronizedList(new ArrayList<>());
        final CountDownLatch allUp = new CountDownLatch(5);
        final CountDownLatch othersDown = new CountDownLatch(4);
        locator.addClusterTopologyListener(new LatchListener(allUp, liveNodes, othersDown));

        final ClientSessionFactory sessionFactory = createSessionFactory(locator);

        // The remaining servers join only after the listener is connected.
        startServers(1, 4, 3, 2);
        final String[] nodeIDs = getNodeIDs(0, 1, 2, 3, 4);

        Assertions.assertTrue(allUp.await(10L, TimeUnit.SECONDS), "Was not notified that all servers are UP");
        checkContains(new int[]{0, 1, 4, 3, 2}, nodeIDs, liveNodes);

        // Every server must see its four peers before any of them is stopped.
        for (int node = 0; node <= 4; node++) {
            waitForClusterConnections(node, 4);
        }

        stopServers(2, 3, 1, 4);
        Assertions.assertTrue(othersDown.await(10L, TimeUnit.SECONDS), "Was not notified that all servers are DOWN");
        checkContains(new int[]{0}, nodeIDs, liveNodes);

        sessionFactory.close();
        locator.close();
        stopServers(0);
    }

    /**
     * Starts a five-node cluster, attaches a topology listener and stops the servers one at a
     * time, checking after each stop that the listener's view of the live nodes has shrunk
     * accordingly.
     */
    @Test
    public void testReceiveNotifications() throws Throwable {
        startServers(0, 1, 2, 3, 4);
        final String[] nodeIDs = getNodeIDs(0, 1, 2, 3, 4);

        final ServerLocator locator = createHAServerLocator();

        // Let the cluster form completely before the listener is attached.
        for (int node = 0; node <= 4; node++) {
            waitForClusterConnections(node, 4);
        }

        final List<String> liveNodes = Collections.synchronizedList(new ArrayList<>());
        final CountDownLatch allUp = new CountDownLatch(5);
        final CountDownLatch othersDown = new CountDownLatch(4);
        locator.addClusterTopologyListener(new LatchListener(allUp, liveNodes, othersDown));

        final ClientSessionFactory sessionFactory = createSessionFactory(locator);

        Assertions.assertTrue(allUp.await(10L, TimeUnit.SECONDS), "Was not notified that all servers are UP");
        checkContains(new int[]{0, 1, 2, 3, 4}, nodeIDs, liveNodes);

        ClientSession session = sessionFactory.createSession();

        stopServers(0);
        session = checkSessionOrReconnect(session, locator);
        checkContains(new int[]{1, 2, 3, 4}, nodeIDs, liveNodes);

        stopServers(2);
        session = checkSessionOrReconnect(session, locator);
        checkContains(new int[]{1, 3, 4}, nodeIDs, liveNodes);

        stopServers(4);
        session = checkSessionOrReconnect(session, locator);
        checkContains(new int[]{1, 3}, nodeIDs, liveNodes);

        stopServers(3);
        // The session returned by this last probe is not used any further.
        checkSessionOrReconnect(session, locator);
        checkContains(new int[]{1}, nodeIDs, liveNodes);

        stopServers(1);
        Assertions.assertTrue(othersDown.await(10L, TimeUnit.SECONDS), "Was not notified that all servers are DOWN");
        checkContains(new int[0], nodeIDs, liveNodes);

        sessionFactory.close();
    }

    /**
     * Starts a five-node cluster and stops the servers one at a time, checking after each stop
     * that the server is no longer started, that a usable session can still be obtained and
     * that the listener's view has shrunk. Once the last server is gone, obtaining a session
     * must fail with {@code NOT_CONNECTED}.
     */
    @Test
    public void testStopNodes() throws Throwable {
        startServers(0, 1, 2, 3, 4);
        final String[] nodeIDs = getNodeIDs(0, 1, 2, 3, 4);

        final ServerLocator locator = createHAServerLocator();

        // Let the cluster form completely before the listener is attached.
        for (int node = 0; node <= 4; node++) {
            waitForClusterConnections(node, 4);
        }

        final List<String> liveNodes = Collections.synchronizedList(new ArrayList<>());
        final CountDownLatch allUp = new CountDownLatch(5);
        // DOWN events are not awaited in this test, hence the already-open latch.
        locator.addClusterTopologyListener(new LatchListener(allUp, liveNodes, new CountDownLatch(0)));

        final ClientSessionFactory sessionFactory = createSessionFactory(locator);

        Assertions.assertTrue(allUp.await(10L, TimeUnit.SECONDS), "Was not notified that all servers are UP");
        checkContains(new int[]{0, 1, 2, 3, 4}, nodeIDs, liveNodes);

        ClientSession session = sessionFactory.createSession();

        stopServers(0);
        Assertions.assertFalse(this.servers[0].isStarted());
        session = checkSessionOrReconnect(session, locator);
        checkContains(new int[]{1, 2, 3, 4}, nodeIDs, liveNodes);

        stopServers(2);
        Assertions.assertFalse(this.servers[2].isStarted());
        session = checkSessionOrReconnect(session, locator);
        checkContains(new int[]{1, 3, 4}, nodeIDs, liveNodes);

        stopServers(4);
        Assertions.assertFalse(this.servers[4].isStarted());
        session = checkSessionOrReconnect(session, locator);
        checkContains(new int[]{1, 3}, nodeIDs, liveNodes);

        stopServers(3);
        Assertions.assertFalse(this.servers[3].isStarted());
        session = checkSessionOrReconnect(session, locator);
        checkContains(new int[]{1}, nodeIDs, liveNodes);

        stopServers(1);
        Assertions.assertFalse(this.servers[1].isStarted());

        // With no server left there is nothing to reconnect to.
        try {
            checkSessionOrReconnect(session, locator);
            Assertions.fail();
        } catch (ActiveMQException expected) {
            Assertions.assertEquals(ActiveMQExceptionType.NOT_CONNECTED, expected.getType());
        }
    }

    /**
     * Gives server 4 a cluster password that differs from the shared one, starts servers 0 and
     * 4 with security enabled, and expects at least one of the two cluster managers to stop
     * within five seconds. Afterwards a client using the regular cluster credentials must
     * still be able to send and receive 100 messages.
     */
    @Test
    public void testWrongPasswordTriggersClusterConnectionStop() throws Exception {
        final Configuration config4 = this.servers[4].getConfiguration();

        for (ActiveMQServer server : this.servers) {
            if (server == null) {
                continue;
            }
            server.getConfiguration().setSecurityEnabled(true);
        }

        Assertions.assertEquals(ActiveMQTestBase.CLUSTER_PASSWORD, config4.getClusterPassword());
        // Corrupt the password on one node only.
        config4.setClusterPassword(config4.getClusterPassword() + "-1-2-3-");

        startServers(0, 4);

        final boolean aManagerStopped = Wait.waitFor(
            () -> !this.servers[4].getClusterManager().isStarted() || !this.servers[0].getClusterManager().isStarted(),
            5000L);
        Assertions.assertTrue(aManagerStopped, "one or the other cluster managers should stop");

        final ClientSessionFactory sessionFactory = createSessionFactory(createNonHALocator(isNetty()));
        final ClientSession session = sessionFactory.createSession(config4.getClusterUser(), ActiveMQTestBase.CLUSTER_PASSWORD, false, true, true, false, 1);

        final String queueName = "foo1235";
        session.createQueue(QueueConfiguration.of(queueName));
        sendMessages(session, session.createProducer(queueName), 100);

        final ClientConsumer consumer = session.createConsumer(queueName);
        session.start();
        receiveMessages(consumer, 0, 100, true);
    }

    /**
     * Opens five session factories from one locator against a five-node cluster and checks
     * that the locator's listener is told about all five nodes coming up and, after four
     * servers are stopped, about those four going down.
     */
    @Test
    public void testMultipleClientSessionFactories() throws Throwable {
        startServers(0, 1, 2, 3, 4);
        final String[] nodeIDs = getNodeIDs(0, 1, 2, 3, 4);

        final ServerLocator locator = createHAServerLocator();

        // Let the cluster form completely before the listener is attached.
        for (int node = 0; node <= 4; node++) {
            waitForClusterConnections(node, 4);
        }

        final List<String> liveNodes = Collections.synchronizedList(new ArrayList<>());
        final CountDownLatch allUp = new CountDownLatch(5);
        final CountDownLatch othersDown = new CountDownLatch(4);
        locator.addClusterTopologyListener(new LatchListener(allUp, liveNodes, othersDown));

        final ClientSessionFactory[] factories = new ClientSessionFactory[5];
        for (int k = 0; k < factories.length; k++) {
            factories[k] = locator.createSessionFactory();
        }

        Assertions.assertTrue(allUp.await(10L, TimeUnit.SECONDS), "Was not notified that all servers are UP");
        checkContains(new int[]{0, 1, 2, 3, 4}, nodeIDs, liveNodes);

        stopServers(4, 2, 3, 1);

        final boolean notifiedDown = othersDown.await(10L, TimeUnit.SECONDS);
        if (!notifiedDown) {
            // Leave a trace in the log before the assertion below fails the test.
            logger.warn("TopologyClusterTestBase.testMultipleClientSessionFactories will fail");
        }
        Assertions.assertTrue(notifiedDown, "Was not notified that all servers are Down");
        checkContains(new int[]{0}, nodeIDs, liveNodes);

        for (ClientSessionFactory factory : factories) {
            factory.close();
        }
        locator.close();
        stopServers(0);
    }
}
