package org.apache.activemq.artemis.tests.integration.jms.cluster;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
import org.apache.activemq.artemis.api.core.client.FailoverEventType;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQSession;
import org.apache.activemq.artemis.jms.server.JMSServerManager;
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil;
import org.apache.activemq.artemis.tests.unit.util.InVMNamingContext;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.InVMNodeManagerServer;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverListenerTest.class */
public class JMSFailoverListenerTest extends ActiveMQTestBase {
    private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
    protected Configuration backupConf;
    protected Configuration liveConf;
    protected JMSServerManager liveJMSServer;
    protected ActiveMQServer liveServer;
    protected JMSServerManager backupJMSServer;
    protected ActiveMQServer backupServer;
    private TransportConfiguration backuptc;
    private TransportConfiguration livetc;
    private TransportConfiguration liveAcceptortc;
    private TransportConfiguration backupAcceptortc;
    protected InVMNamingContext ctx1 = new InVMNamingContext();
    protected InVMNamingContext ctx2 = new InVMNamingContext();
    protected Map<String, Object> backupParams = new HashMap();

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverListenerTest$MyFailoverListener.class */
    private static class MyFailoverListener implements FailoverEventListener {
        private List<FailoverEventType> eventTypeList;

        private MyFailoverListener() {
            this.eventTypeList = new ArrayList();
        }

        public FailoverEventType get(int i) {
            waitForElements(i + 1);
            return this.eventTypeList.get(i);
        }

        public int size() {
            return this.eventTypeList.size();
        }

        private void waitForElements(int i) {
            long currentTimeMillis = System.currentTimeMillis() + 5000;
            while (currentTimeMillis > System.currentTimeMillis() && this.eventTypeList.size() < i) {
                try {
                    Thread.sleep(1L);
                } catch (Throwable th) {
                    Assert.fail(th.getMessage());
                }
            }
            Assert.assertTrue(this.eventTypeList.size() >= i);
        }

        public void failoverEvent(FailoverEventType failoverEventType) {
            this.eventTypeList.add(failoverEventType);
        }
    }

    @Test
    public void testAutomaticFailover() throws Exception {
        ActiveMQConnectionFactory createConnectionFactoryWithHA = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, new TransportConfiguration[]{this.livetc});
        createConnectionFactoryWithHA.setReconnectAttempts(-1);
        createConnectionFactoryWithHA.setBlockOnDurableSend(true);
        createConnectionFactoryWithHA.setBlockOnNonDurableSend(true);
        createConnectionFactoryWithHA.setConsumerWindowSize(1000);
        ActiveMQConnection createConnectionAndWaitForTopology = JMSUtil.createConnectionAndWaitForTopology(createConnectionFactoryWithHA, 2, 5);
        MyFailoverListener myFailoverListener = new MyFailoverListener();
        createConnectionAndWaitForTopology.setFailoverListener(myFailoverListener);
        ActiveMQSession createSession = createConnectionAndWaitForTopology.createSession(false, 1);
        ClientSession coreSession = createSession.getCoreSession();
        SimpleString simpleString = new SimpleString("myqueue");
        coreSession.createQueue(simpleString, RoutingType.ANYCAST, simpleString, (SimpleString) null, true);
        Queue createQueue = createSession.createQueue("myqueue");
        MessageProducer createProducer = createSession.createProducer(createQueue);
        createProducer.setDeliveryMode(2);
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        byte[] randomBytes = RandomUtil.randomBytes(1000);
        for (int i = 0; i < 10; i++) {
            BytesMessage createBytesMessage = createSession.createBytesMessage();
            createBytesMessage.writeBytes(randomBytes);
            createProducer.send(createBytesMessage);
        }
        createConnectionAndWaitForTopology.start();
        log.info("sent messages and started connection");
        Thread.sleep(2000L);
        JMSUtil.crash(this.liveServer, createSession.getCoreSession());
        Assert.assertEquals(FailoverEventType.FAILURE_DETECTED, myFailoverListener.get(0));
        for (int i2 = 0; i2 < 10; i2++) {
            log.info("got message " + i2);
            BytesMessage receive = createConsumer.receive(1000L);
            Assert.assertNotNull(receive);
            Assert.assertEquals(randomBytes.length, receive.getBodyLength());
        }
        Assert.assertNull(createConsumer.receiveNoWait());
        Assert.assertEquals(FailoverEventType.FAILOVER_COMPLETED, myFailoverListener.get(1));
        createConnectionAndWaitForTopology.close();
        Assert.assertEquals("Expected 2 FailoverEvents to be triggered", 2L, myFailoverListener.size());
    }

    @Test
    public void testManualFailover() throws Exception {
        ActiveMQConnectionFactory createConnectionFactoryWithoutHA = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration[]{new TransportConfiguration(INVM_CONNECTOR_FACTORY)});
        createConnectionFactoryWithoutHA.setBlockOnNonDurableSend(true);
        createConnectionFactoryWithoutHA.setBlockOnDurableSend(true);
        ActiveMQConnectionFactory createConnectionFactoryWithoutHA2 = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration[]{new TransportConfiguration(INVM_CONNECTOR_FACTORY, this.backupParams)});
        createConnectionFactoryWithoutHA2.setBlockOnNonDurableSend(true);
        createConnectionFactoryWithoutHA2.setBlockOnDurableSend(true);
        createConnectionFactoryWithoutHA2.setInitialConnectAttempts(-1);
        createConnectionFactoryWithoutHA2.setReconnectAttempts(-1);
        ActiveMQConnection createConnection = createConnectionFactoryWithoutHA.createConnection();
        MyFailoverListener myFailoverListener = new MyFailoverListener();
        createConnection.setFailoverListener(myFailoverListener);
        ActiveMQSession createSession = createConnection.createSession(false, 1);
        ClientSession coreSession = createSession.getCoreSession();
        SimpleString simpleString = new SimpleString("myqueue");
        coreSession.createQueue(simpleString, RoutingType.ANYCAST, simpleString, (SimpleString) null, true);
        Queue createQueue = createSession.createQueue("myqueue");
        MessageProducer createProducer = createSession.createProducer(createQueue);
        for (int i = 0; i < 1000; i++) {
            createProducer.send(createSession.createTextMessage("message" + i));
        }
        JMSUtil.crash(this.liveServer, coreSession);
        Assert.assertEquals(FailoverEventType.FAILURE_DETECTED, myFailoverListener.get(0));
        createConnection.close();
        Connection createConnection2 = createConnectionFactoryWithoutHA2.createConnection();
        MessageConsumer createConsumer = createConnection2.createSession(false, 1).createConsumer(createQueue);
        createConnection2.start();
        for (int i2 = 0; i2 < 1000; i2++) {
            TextMessage receive = createConsumer.receive(1000L);
            Assert.assertNotNull(receive);
            Assert.assertEquals("message" + i2, receive.getText());
        }
        TextMessage receiveNoWait = createConsumer.receiveNoWait();
        Assert.assertEquals(FailoverEventType.FAILOVER_FAILED, myFailoverListener.get(1));
        Assert.assertEquals("Expected 2 FailoverEvents to be triggered", 2L, myFailoverListener.size());
        Assert.assertNull(receiveNoWait);
        createConnection2.close();
    }

    @Before
    public void setUp() throws Exception {
        super.setUp();
        startServers();
    }

    protected void startServers() throws Exception {
        InVMNodeManager inVMNodeManager = new InVMNodeManager(false);
        this.backuptc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, this.backupParams);
        this.livetc = new TransportConfiguration(INVM_CONNECTOR_FACTORY);
        this.liveAcceptortc = new TransportConfiguration(INVM_ACCEPTOR_FACTORY);
        this.backupAcceptortc = new TransportConfiguration(INVM_ACCEPTOR_FACTORY, this.backupParams);
        this.backupParams.put("serverId", 1);
        this.backupConf = createBasicConfig().addAcceptorConfiguration(this.backupAcceptortc).addConnectorConfiguration(this.livetc.getName(), this.livetc).addConnectorConfiguration(this.backuptc.getName(), this.backuptc).setJournalType(getDefaultJournalType()).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, this.backupParams)).setBindingsDirectory(getBindingsDir()).setJournalMinFiles(2).setJournalDirectory(getJournalDir()).setPagingDirectory(getPageDir()).setLargeMessagesDirectory(getLargeMessagesDir()).setPersistenceEnabled(true).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()).addClusterConfiguration(basicClusterConnectionConfig(this.backuptc.getName(), new String[]{this.livetc.getName()}));
        this.backupServer = addServer(new InVMNodeManagerServer(this.backupConf, inVMNodeManager));
        this.backupJMSServer = new JMSServerManagerImpl(this.backupServer);
        this.backupJMSServer.setRegistry(new JndiBindingRegistry(this.ctx2));
        this.backupJMSServer.getActiveMQServer().setIdentity("JMSBackup");
        log.info("Starting backup");
        this.backupJMSServer.start();
        this.liveConf = createBasicConfig().setJournalDirectory(getJournalDir()).setBindingsDirectory(getBindingsDir()).addAcceptorConfiguration(this.liveAcceptortc).setJournalType(getDefaultJournalType()).setBindingsDirectory(getBindingsDir()).setJournalMinFiles(2).setJournalDirectory(getJournalDir()).setPagingDirectory(getPageDir()).setLargeMessagesDirectory(getLargeMessagesDir()).addConnectorConfiguration(this.livetc.getName(), this.livetc).setPersistenceEnabled(true).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration()).addClusterConfiguration(basicClusterConnectionConfig(this.livetc.getName(), new String[0]));
        this.liveServer = addServer(new InVMNodeManagerServer(this.liveConf, inVMNodeManager));
        this.liveJMSServer = new JMSServerManagerImpl(this.liveServer);
        this.liveJMSServer.setRegistry(new JndiBindingRegistry(this.ctx1));
        this.liveJMSServer.getActiveMQServer().setIdentity("JMSLive");
        log.info("Starting life");
        this.liveJMSServer.start();
        JMSUtil.waitForServer(this.backupServer);
    }
}
