package org.apache.activemq.artemis.tests.integration.cluster.failover;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.tests.util.CountDownSessionFailureListener;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.jboss.logging.Logger;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/cluster/failover/FailBackAutoTest.class */
/**
 * Integration tests that crash the live server, let the backup take over and
 * then start the live server again, checking that client sessions observe the
 * expected failures and that messages remain deliverable.
 *
 * <p>Both servers are configured in {@link #createConfigs()} with in-VM
 * transports, a shared {@link InVMNodeManager} and shared-store master/slave
 * HA policies.
 */
public class FailBackAutoTest extends FailoverTestBase {
    private static final Logger log = Logger.getLogger(FailBackAutoTest.class);

    /** Batch size used by {@link #testFailBack()}. */
    private static final int NUM_MESSAGES = 100;

    /** Number of messages sent and consumed by {@link #sendAndConsume}. */
    private static final int ROUND_TRIP_MESSAGES = 1000;

    private ServerLocatorInternal locator;
    private ClientSessionFactoryInternal sf;

    /**
     * Runs the base-class set-up and then caches the server locator used by
     * every test in this class.
     */
    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        locator = getServerLocator();
    }

    /**
     * Crashes the live server, sends one message and verifies it on server 1,
     * then restarts the live server, waits for the session to fail again,
     * sends a second message and verifies it on server 0.
     */
    @Test
    public void testAutoFailback() throws Exception {
        backupServer.getServer().getHAPolicy().setRestartBackup(false);
        createSessionFactory();

        CountDownLatch crashLatch = new CountDownLatch(1);
        ClientSession session = sendAndConsume(sf, true);
        CountDownSessionFailureListener crashListener = new CountDownSessionFailureListener(crashLatch, session);
        session.addFailureListener(crashListener);

        liveServer.crash(new ClientSession[0]);
        assertTrue(crashLatch.await(5L, TimeUnit.SECONDS));
        log.debug("backup (nowLive) topology = "
            + backupServer.getServer().getClusterManager().getDefaultConnection((TransportConfiguration) null).getTopology().describe());
        log.debug("Server Crash!!!");

        ClientProducer producer = session.createProducer(ADDRESS);
        sendSingleMessageWithBody(session, producer, 0);
        verifyMessageOnServer(1, 1);

        // Swap in a fresh listener so the next failure (caused by the live
        // server coming back) is counted on its own latch.
        session.removeFailureListener(crashListener);
        CountDownLatch failbackLatch = new CountDownLatch(1);
        session.addFailureListener(new CountDownSessionFailureListener(failbackLatch, session));

        log.debug("******* starting live server back");
        liveServer.start();
        Thread.sleep(1000L);
        log.debug("After failback: " + locator.getTopology().describe());
        assertTrue(failbackLatch.await(5L, TimeUnit.SECONDS));

        sendSingleMessageWithBody(session, producer, 1);
        session.close();
        verifyMessageOnServer(0, 1);
        wrapUpSessionFactory();
    }

    /**
     * Opens a dedicated in-VM connection and consumes the given number of
     * messages from {@code ADDRESS}, committing after each one.
     *
     * @param serverId         id handed to {@code createInVMLocator}; the tests
     *                         pass 1 after the crash and 0 after the restart
     * @param expectedMessages how many messages must be received, each within
     *                         one second
     */
    private void verifyMessageOnServer(int serverId, int expectedMessages) throws Exception {
        ServerLocator inVMLocator = createInVMLocator(serverId);
        ClientSessionFactory factory = addSessionFactory(createSessionFactory(inVMLocator));
        ClientSession session = factory.createSession(false, false);
        session.start();
        ClientConsumer consumer = session.createConsumer(ADDRESS);

        for (int remaining = expectedMessages; remaining > 0; remaining--) {
            ClientMessage received = consumer.receive(1000L);
            assertNotNull(received);
            received.acknowledge();
            session.commit();
        }

        session.close();
        factory.close();
        inVMLocator.close();
    }

    /**
     * Crashes the live server, restarts it and expects a session failure,
     * then waits for a backup, crashes the live server a second time and
     * expects another session failure.
     */
    @Test
    public void testAutoFailbackThenFailover() throws Exception {
        createSessionFactory();
        ClientSession session = sendAndConsume(sf, true);

        CountDownSessionFailureListener firstCrashListener = new CountDownSessionFailureListener(session);
        session.addFailureListener(firstCrashListener);
        log.debug("Crashing live server...");
        liveServer.crash(session);

        ClientProducer producer = session.createProducer(ADDRESS);
        sendSingleMessageWithBody(session, producer, 0);

        session.removeFailureListener(firstCrashListener);
        CountDownSessionFailureListener failbackListener = new CountDownSessionFailureListener(session);
        session.addFailureListener(failbackListener);
        log.debug("restarting live node now");
        liveServer.start();
        assertTrue("expected a session failure 1", failbackListener.getLatch().await(5L, TimeUnit.SECONDS));

        sendSingleMessageWithBody(session, producer, 1);

        session.removeFailureListener(failbackListener);
        CountDownSessionFailureListener secondCrashListener = new CountDownSessionFailureListener(session);
        session.addFailureListener(secondCrashListener);
        waitForBackup(sf, 10);
        log.debug("Crashing live server again...");
        liveServer.crash(new ClientSession[0]);
        assertTrue("expected a session failure 2", secondCrashListener.getLatch().await(5L, TimeUnit.SECONDS));

        session.close();
        wrapUpSessionFactory();
    }

    /**
     * Sends a batch, crashes the live server, consumes the batch, sends a
     * double-sized batch, then restarts the live server and consumes again
     * once the session has failed and the live server has activated.
     */
    @Test
    public void testFailBack() throws Exception {
        backupServer.getServer().getHAPolicy().setRestartBackup(false);
        createSessionFactory();
        ClientSession session = sendAndConsume(sf, true);

        ClientProducer firstProducer = session.createProducer(ADDRESS);
        sendMessages(session, firstProducer, NUM_MESSAGES);
        session.commit();

        crash(session);

        session.start();
        ClientConsumer consumer = session.createConsumer(ADDRESS);
        receiveMessages(consumer, 0, NUM_MESSAGES, true);

        ClientProducer secondProducer = session.createProducer(ADDRESS);
        sendMessages(session, secondProducer, 2 * NUM_MESSAGES);
        session.commit();

        assertFalse("must NOT be a backup", liveServer.getServer().getHAPolicy().isBackup());
        adaptLiveConfigForReplicatedFailBack(liveServer);

        CountDownSessionFailureListener failbackListener = new CountDownSessionFailureListener(session);
        session.addFailureListener(failbackListener);

        liveServer.start();

        assertTrue(failbackListener.getLatch().await(5L, TimeUnit.SECONDS));
        assertTrue("live initialized after restart", liveServer.getServer().waitForActivation(15L, TimeUnit.SECONDS));

        session.start();
        receiveMessages(consumer, 0, NUM_MESSAGES, true);
    }

    /**
     * Configures the shared locator for blocking sends and 15 reconnect
     * attempts, then creates the session factory once a topology of two
     * members is reported.
     */
    private void createSessionFactory() throws Exception {
        locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setReconnectAttempts(15);
        sf = createSessionFactoryAndWaitForTopology(locator, 2);
    }

    /**
     * Closes the session factory and asserts that it no longer holds any
     * sessions or connections.
     */
    private void wrapUpSessionFactory() {
        sf.close();
        assertEquals(0L, sf.numSessions());
        assertEquals(0L, sf.numConnections());
    }

    /**
     * Builds the backup and live server configurations and creates both
     * testable servers (backup first, then live).
     *
     * <p>The backup uses a shared-store slave policy with restart enabled, the
     * live server a shared-store master policy; each knows both connectors and
     * clusters with the other one. A decompiler note on this method indicated
     * that its visibility was changed from protected.
     */
    @Override
    public void createConfigs() throws Exception {
        nodeManager = new InVMNodeManager(false);

        TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
        TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
        String liveName = liveConnector.getName();
        String backupName = backupConnector.getName();

        backupConfig = super.createDefaultInVMConfig()
            .clearAcceptorConfigurations()
            .addAcceptorConfiguration(getAcceptorTransportConfiguration(false))
            .setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration().setRestartBackup(true))
            .addConnectorConfiguration(liveName, liveConnector)
            .addConnectorConfiguration(backupName, backupConnector)
            .addClusterConfiguration(basicClusterConnectionConfig(backupName, new String[]{liveName}));
        backupServer = createTestableServer(backupConfig);

        liveConfig = super.createDefaultInVMConfig()
            .clearAcceptorConfigurations()
            .addAcceptorConfiguration(getAcceptorTransportConfiguration(true))
            .setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration())
            .addClusterConfiguration(basicClusterConnectionConfig(liveName, new String[]{backupName}))
            .addConnectorConfiguration(liveName, liveConnector)
            .addConnectorConfiguration(backupName, backupConnector);
        liveServer = createTestableServer(liveConfig);
    }

    /** Returns the in-VM acceptor configuration for the live or backup side. */
    @Override
    protected TransportConfiguration getAcceptorTransportConfiguration(boolean z) {
        return TransportConfigurationUtils.getInVMAcceptor(z);
    }

    /** Returns the in-VM connector configuration for the live or backup side. */
    @Override
    protected TransportConfiguration getConnectorTransportConfiguration(boolean z) {
        return TransportConfigurationUtils.getInVMConnector(z);
    }

    /**
     * Creates one message via {@code createMessage(true)}, fills it with
     * {@code setBody(bodyIndex, ...)} and sends it through the producer.
     */
    private void sendSingleMessageWithBody(ClientSession session, ClientProducer producer, int bodyIndex) throws Exception {
        ClientMessage message = session.createMessage(true);
        setBody(bodyIndex, message);
        producer.send(message);
    }

    /**
     * Opens a session, sends {@value #ROUND_TRIP_MESSAGES} messages to
     * {@code ADDRESS} and consumes them all again, checking body and
     * {@code count} property of each one.
     *
     * @param sessionFactory factory used to create the session
     * @param createQueue    whether to create the queue on {@code ADDRESS} first
     * @return the still-open session; its consumer has already been closed
     */
    private ClientSession sendAndConsume(ClientSessionFactory sessionFactory, boolean createQueue) throws Exception {
        ClientSession session = sessionFactory.createSession(false, true, true);
        if (createQueue) {
            session.createQueue(new QueueConfiguration(ADDRESS));
        }

        SimpleString countKey = new SimpleString("count");

        ClientProducer producer = session.createProducer(ADDRESS);
        for (int sent = 0; sent < ROUND_TRIP_MESSAGES; sent++) {
            // Message type 3 is presumably the text-message type — TODO confirm.
            ClientMessage outgoing = session.createMessage((byte) 3, false, 0L, System.currentTimeMillis(), (byte) 1);
            outgoing.putIntProperty(countKey, sent);
            outgoing.getBodyBuffer().writeString("aardvarks");
            producer.send(outgoing);
        }

        ClientConsumer consumer = session.createConsumer(ADDRESS);
        session.start();
        for (int expected = 0; expected < ROUND_TRIP_MESSAGES; expected++) {
            ClientMessage incoming = consumer.receive();
            assertEquals("aardvarks", incoming.getBodyBuffer().readString());
            assertEquals(Integer.valueOf(expected), incoming.getObjectProperty(countKey));
            incoming.acknowledge();
        }

        // Nothing may be left on the queue once every sent message was read.
        ClientMessage leftover = consumer.receiveImmediate();
        consumer.close();
        assertNull(leftover);
        return session;
    }
}
