package org.apache.activemq.artemis.tests.integration.cluster.failover;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
import org.apache.activemq.artemis.api.core.client.FailoverEventType;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration tests checking that a {@link FailoverEventListener} registered on a client
 * session factory is notified when the live server crashes: once with a backup available
 * (failover is expected to complete) and once with the backup stopped (failover is expected
 * to fail).
 */
public class FailoverListenerTest extends FailoverTestBase {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** Number of messages produced and consumed by {@link #sendAndConsume}. */
    private static final int MESSAGE_COUNT = 1000;

    /** Upper bound, in milliseconds, on how long the test waits for the failback events. */
    private static final long FAILBACK_TIMEOUT_MILLIS = 5000L;

    /** Pause, in milliseconds, between two checks of the recorded event count. */
    private static final long POLL_INTERVAL_MILLIS = 10L;

    private ServerLocatorInternal locator;
    private ClientSessionFactoryInternal sf;

    /**
     * Listener that records every failover event it receives, in arrival order, and counts
     * down the latches supplied at construction so a test can block until an event arrives.
     */
    public class SessionFactoryFailoverListener implements FailoverEventListener {
        // The tests block on the latches while events arrive, so the callback presumably runs
        // on a different thread than the test; every access to the list is therefore guarded
        // by the list's own monitor.
        private final ArrayList<FailoverEventType> failoverTypeEvent = new ArrayList<>();
        private final CountDownLatch failureLatch;
        private final CountDownLatch failureDoneLatch;

        /**
         * @param failureLatch     counted down on each {@code FAILURE_DETECTED} event
         * @param failureDoneLatch counted down on each {@code FAILOVER_COMPLETED} or
         *                         {@code FAILOVER_FAILED} event
         */
        public SessionFactoryFailoverListener(CountDownLatch failureLatch, CountDownLatch failureDoneLatch) {
            this.failureLatch = failureLatch;
            this.failureDoneLatch = failureDoneLatch;
        }

        /**
         * Returns a snapshot of the events recorded so far, in arrival order. The returned list
         * is a copy: later events do not appear in it and modifying it does not affect the
         * listener.
         */
        public ArrayList<FailoverEventType> getFailoverEventType() {
            synchronized (this.failoverTypeEvent) {
                return new ArrayList<>(this.failoverTypeEvent);
            }
        }

        /** Records the event, logs it, then releases the latch matching the event type. */
        @Override
        public void failoverEvent(FailoverEventType failoverEventType) {
            synchronized (this.failoverTypeEvent) {
                this.failoverTypeEvent.add(failoverEventType);
            }
            // Pass the enum itself so that formatting only happens when debug is enabled.
            logger.debug("Failover event just happen : {}", failoverEventType);
            if (failoverEventType == FailoverEventType.FAILURE_DETECTED) {
                this.failureLatch.countDown();
            } else if (failoverEventType == FailoverEventType.FAILOVER_COMPLETED
                    || failoverEventType == FailoverEventType.FAILOVER_FAILED) {
                this.failureDoneLatch.countDown();
            }
        }
    }

    /** Runs the base-class set-up and then caches the server locator used by the tests. */
    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.locator = getServerLocator();
    }

    /**
     * Crashes the live server and expects {@code FAILURE_DETECTED} followed by
     * {@code FAILOVER_COMPLETED}; then restarts the live server and expects the same pair of
     * events again, for a total of exactly four events.
     */
    @Test
    public void testFailoverListenerCall() throws Exception {
        createSessionFactory(2);

        CountDownLatch failureLatch = new CountDownLatch(1);
        CountDownLatch failureDoneLatch = new CountDownLatch(1);
        SessionFactoryFailoverListener listener = new SessionFactoryFailoverListener(failureLatch, failureDoneLatch);
        this.sf.addFailoverListener(listener);
        ClientSession session = sendAndConsume(this.sf, true);

        this.liveServer.crash();
        assertTrue(failureLatch.await(5L, TimeUnit.SECONDS));
        assertEquals(FailoverEventType.FAILURE_DETECTED, listener.getFailoverEventType().get(0));
        logger.debug("backup (nowLive) topology = {}",
                this.backupServer.getServer().getClusterManager().getDefaultConnection((TransportConfiguration) null).getTopology().describe());
        logger.debug("Server Crash!!!");
        assertTrue(failureDoneLatch.await(5L, TimeUnit.SECONDS));
        assertEquals(FailoverEventType.FAILOVER_COMPLETED, listener.getFailoverEventType().get(1));

        // The session must still be usable after the failover.
        ClientProducer producer = session.createProducer(ADDRESS);
        ClientMessage afterFailover = session.createMessage(true);
        setBody(0, afterFailover);
        producer.send(afterFailover);
        verifyMessageOnServer(1, 1);

        logger.debug("******* starting live server back");
        this.liveServer.start();

        // Both latches are already spent, so poll the recorded events instead of sleeping for
        // a fixed time; this fails with a clear message if the failback never shows up.
        awaitEventCount(listener, 4, FAILBACK_TIMEOUT_MILLIS);
        ArrayList<FailoverEventType> events = listener.getFailoverEventType();
        assertTrue("Expected at least 4 FailoverEvents after failback but got " + events, events.size() >= 4);
        assertEquals(FailoverEventType.FAILURE_DETECTED, events.get(2));
        assertEquals(FailoverEventType.FAILOVER_COMPLETED, events.get(3));
        logger.debug("After failback: {}", this.locator.getTopology().describe());

        ClientMessage afterFailback = session.createMessage(true);
        setBody(1, afterFailback);
        producer.send(afterFailback);
        session.close();
        verifyMessageOnServer(0, 1);

        wrapUpSessionFactory();
        assertEquals("Expected 4 FailoverEvents to be triggered", 4L, listener.getFailoverEventType().size());
    }

    /**
     * Polls the listener until it has recorded at least {@code expectedCount} events or the
     * timeout elapses. Returns normally in both cases; the caller asserts on the outcome.
     *
     * @param listener      listener whose recorded events are inspected
     * @param expectedCount minimum number of events to wait for
     * @param timeoutMillis maximum time to wait, in milliseconds
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private static void awaitEventCount(SessionFactoryFailoverListener listener, int expectedCount, long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (listener.getFailoverEventType().size() < expectedCount && System.nanoTime() - deadline < 0L) {
            Thread.sleep(POLL_INTERVAL_MILLIS);
        }
    }

    /**
     * Connects to the given server through an in-VM locator and asserts that
     * {@code numberOfMessages} messages can be received from {@code ADDRESS}, acknowledging
     * and committing each one.
     *
     * @param server           index passed to {@code createInVMLocator}
     * @param numberOfMessages number of messages that must be available
     */
    private void verifyMessageOnServer(int server, int numberOfMessages) throws Exception {
        // try-with-resources closes session, factory and locator (in that order) even when an
        // assertion fails, so a failing check does not leak client resources.
        try (ServerLocator serverLocator = createInVMLocator(server);
             ClientSessionFactory factory = addSessionFactory(createSessionFactory(serverLocator));
             ClientSession session = factory.createSession(false, false)) {
            session.start();
            ClientConsumer consumer = session.createConsumer(ADDRESS);
            for (int received = 0; received < numberOfMessages; received++) {
                ClientMessage message = consumer.receive(1000L);
                assertNotNull("Expected message " + received + " on server " + server, message);
                message.acknowledge();
                session.commit();
            }
        }
    }

    /**
     * Stops the backup before crashing the live server and expects exactly two events:
     * {@code FAILURE_DETECTED} followed by {@code FAILOVER_FAILED}.
     */
    @Test
    public void testFailoverFailed() throws Exception {
        this.locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setReconnectAttempts(1);
        this.sf = createSessionFactoryAndWaitForTopology(this.locator, 2);
        this.backupServer.stop();

        CountDownLatch failureLatch = new CountDownLatch(1);
        CountDownLatch failureDoneLatch = new CountDownLatch(1);
        SessionFactoryFailoverListener listener = new SessionFactoryFailoverListener(failureLatch, failureDoneLatch);
        this.sf.addFailoverListener(listener);
        ClientSession session = sendAndConsume(this.sf, true);

        this.liveServer.crash(session);
        assertTrue(failureLatch.await(5L, TimeUnit.SECONDS));
        assertEquals(FailoverEventType.FAILURE_DETECTED, listener.getFailoverEventType().get(0));
        assertTrue(failureDoneLatch.await(5L, TimeUnit.SECONDS));
        assertEquals(FailoverEventType.FAILOVER_FAILED, listener.getFailoverEventType().get(1));
        assertEquals("Expected 2 FailoverEvents to be triggered", 2L, listener.getFailoverEventType().size());

        session.close();
        wrapUpSessionFactory();
    }

    /**
     * Configures the locator for blocking sends with 15 reconnect attempts and stores a
     * session factory created once the topology has the requested number of members.
     *
     * @param topologyMembers value passed to {@code createSessionFactoryAndWaitForTopology}
     */
    private void createSessionFactory(int topologyMembers) throws Exception {
        this.locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setReconnectAttempts(15);
        this.sf = createSessionFactoryAndWaitForTopology(this.locator, topologyMembers);
    }

    /** Closes the session factory and asserts that it holds no sessions or connections. */
    private void wrapUpSessionFactory() {
        this.sf.close();
        assertEquals("Expecting 0 sessions", 0L, this.sf.numSessions());
        assertEquals("Expecting 0 connections", 0L, this.sf.numConnections());
    }

    /**
     * Builds the backup and live server configurations as a shared-store slave/master pair.
     * Each server gets its own acceptor, both connectors, and a cluster connection that
     * names its own connector first and the peer's connector as the static target.
     */
    @Override
    public void createConfigs() throws Exception {
        this.nodeManager = new InVMNodeManager(false);
        TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
        TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);

        this.backupConfig = super.createDefaultInVMConfig()
                .clearAcceptorConfigurations()
                .addAcceptorConfiguration(getAcceptorTransportConfiguration(false))
                .setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration())
                .addConnectorConfiguration(liveConnector.getName(), liveConnector)
                .addConnectorConfiguration(backupConnector.getName(), backupConnector)
                .addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), new String[]{liveConnector.getName()}));
        this.backupServer = createTestableServer(this.backupConfig);

        this.liveConfig = super.createDefaultInVMConfig()
                .clearAcceptorConfigurations()
                .addAcceptorConfiguration(getAcceptorTransportConfiguration(true))
                .setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration())
                .addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName(), new String[]{backupConnector.getName()}))
                .addConnectorConfiguration(liveConnector.getName(), liveConnector)
                .addConnectorConfiguration(backupConnector.getName(), backupConnector);
        this.liveServer = createTestableServer(this.liveConfig);
    }

    /**
     * Returns an in-VM acceptor configuration.
     *
     * @param live {@code true} is used for the live server and {@code false} for the backup
     *             (see {@link #createConfigs()})
     */
    @Override
    protected TransportConfiguration getAcceptorTransportConfiguration(boolean live) {
        return TransportConfigurationUtils.getInVMAcceptor(live);
    }

    /**
     * Returns an in-VM connector configuration.
     *
     * @param live {@code true} is used for the live server and {@code false} for the backup
     *             (see {@link #createConfigs()})
     */
    @Override
    protected TransportConfiguration getConnectorTransportConfiguration(boolean live) {
        return TransportConfigurationUtils.getInVMConnector(live);
    }

    /**
     * Creates a session, sends {@link #MESSAGE_COUNT} messages to {@code ADDRESS}, consumes
     * them all back while checking body and {@code count} property, and verifies that nothing
     * else is left to receive.
     *
     * @param sessionFactory factory used to create the session
     * @param createQueue    whether to create the queue on {@code ADDRESS} first
     * @return the still-open session; the caller is responsible for closing it
     */
    private ClientSession sendAndConsume(ClientSessionFactory sessionFactory, boolean createQueue) throws Exception {
        ClientSession session = sessionFactory.createSession(false, true, true);
        if (createQueue) {
            session.createQueue(new QueueConfiguration(ADDRESS));
        }

        ClientProducer producer = session.createProducer(ADDRESS);
        for (int sent = 0; sent < MESSAGE_COUNT; sent++) {
            // NOTE(review): message type 3 is presumably the text-message type; confirm
            // against the Artemis message type constants.
            ClientMessage message = session.createMessage((byte) 3, false, 0L, System.currentTimeMillis(), (byte) 1);
            message.putIntProperty(new SimpleString("count"), sent);
            message.getBodyBuffer().writeString("aardvarks");
            producer.send(message);
        }

        ClientConsumer consumer = session.createConsumer(ADDRESS);
        session.start();
        for (int consumed = 0; consumed < MESSAGE_COUNT; consumed++) {
            ClientMessage message = consumer.receive();
            assertEquals("aardvarks", message.getBodyBuffer().readString());
            assertEquals(Integer.valueOf(consumed), message.getObjectProperty(new SimpleString("count")));
            message.acknowledge();
        }

        // Close the consumer before asserting so it is released even if a stray message exists.
        ClientMessage unexpected = consumer.receiveImmediate();
        consumer.close();
        assertNull(unexpected);
        return session;
    }
}
