package org.apache.activemq.artemis.tests.integration.jms.cluster;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQSession;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/jms/cluster/JMSReconnectTest.class */
public class JMSReconnectTest extends ActiveMQTestBase {
    private ActiveMQServer server;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/jms/cluster/JMSReconnectTest$MyExceptionListener.class */
    public static class MyExceptionListener implements ExceptionListener {
        volatile JMSException e;

        private MyExceptionListener() {
        }

        public void onException(JMSException jMSException) {
            this.e = jMSException;
        }
    }

    @Test
    public void testReattachSameNode() throws Exception {
        testReconnectOrReattachSameNode(true);
    }

    @Test
    public void testReconnectSameNode() throws Exception {
        testReconnectOrReattachSameNode(false);
    }

    private void testReconnectOrReattachSameNode(boolean z) throws Exception {
        ActiveMQConnectionFactory createConnectionFactoryWithoutHA = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration[]{new TransportConfiguration(INVM_CONNECTOR_FACTORY)});
        createConnectionFactoryWithoutHA.setBlockOnDurableSend(true);
        createConnectionFactoryWithoutHA.setBlockOnNonDurableSend(true);
        createConnectionFactoryWithoutHA.setReconnectAttempts(-1);
        if (z) {
            createConnectionFactoryWithoutHA.setConfirmationWindowSize(1048576);
        }
        createConnectionFactoryWithoutHA.setConsumerWindowSize(1000);
        Connection createConnection = createConnectionFactoryWithoutHA.createConnection();
        MyExceptionListener myExceptionListener = new MyExceptionListener();
        createConnection.setExceptionListener(myExceptionListener);
        ActiveMQSession createSession = createConnection.createSession(false, 1);
        ClientSessionInternal coreSession = createSession.getCoreSession();
        RemotingConnection connection = coreSession.getConnection();
        SimpleString simpleString = new SimpleString("jms.queue.myqueue");
        coreSession.createQueue(simpleString, simpleString, (SimpleString) null, true);
        Queue createQueue = createSession.createQueue("myqueue");
        MessageProducer createProducer = createSession.createProducer(createQueue);
        createProducer.setDeliveryMode(2);
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        byte[] randomBytes = RandomUtil.randomBytes(1000);
        for (int i = 0; i < 10; i++) {
            BytesMessage createBytesMessage = createSession.createBytesMessage();
            createBytesMessage.writeBytes(randomBytes);
            createProducer.send(createBytesMessage);
        }
        createConnection.start();
        Thread.sleep(2000L);
        Throwable activeMQNotConnectedException = new ActiveMQNotConnectedException();
        connection.fail(activeMQNotConnectedException);
        for (int i2 = 0; i2 < 10; i2++) {
            BytesMessage receive = createConsumer.receive(1000L);
            Assert.assertNotNull(receive);
            Assert.assertEquals(randomBytes.length, receive.getBodyLength());
        }
        Assert.assertNull(createConsumer.receiveNoWait());
        createConnection.close();
        Assert.assertNotNull(myExceptionListener.e);
        Assert.assertTrue(activeMQNotConnectedException == myExceptionListener.e.getCause());
    }

    @Test
    public void testReconnectSameNodeServerRestartedWithNonDurableSub() throws Exception {
        testReconnectSameNodeServerRestartedWithNonDurableSubOrTempQueue(true);
    }

    @Test
    public void testReconnectSameNodeServerRestartedWithTempQueue() throws Exception {
        testReconnectSameNodeServerRestartedWithNonDurableSubOrTempQueue(false);
    }

    private void testReconnectSameNodeServerRestartedWithNonDurableSubOrTempQueue(boolean z) throws Exception {
        Topic createTemporaryQueue;
        ActiveMQConnectionFactory createConnectionFactoryWithoutHA = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration[]{new TransportConfiguration(INVM_CONNECTOR_FACTORY)});
        createConnectionFactoryWithoutHA.setReconnectAttempts(-1);
        Connection createConnection = createConnectionFactoryWithoutHA.createConnection();
        MyExceptionListener myExceptionListener = new MyExceptionListener();
        createConnection.setExceptionListener(myExceptionListener);
        ActiveMQSession createSession = createConnection.createSession(false, 1);
        ClientSession coreSession = createSession.getCoreSession();
        if (z) {
            coreSession.createQueue("jms.topic.mytopic", "blahblah", (String) null, false);
            createTemporaryQueue = ActiveMQJMSClient.createTopic("mytopic");
        } else {
            createTemporaryQueue = createSession.createTemporaryQueue();
        }
        MessageProducer createProducer = createSession.createProducer(createTemporaryQueue);
        MessageConsumer createConsumer = createSession.createConsumer(createTemporaryQueue);
        this.server.stop();
        this.server.start();
        Thread.sleep(3000L);
        byte[] randomBytes = RandomUtil.randomBytes(1000);
        for (int i = 0; i < 100; i++) {
            BytesMessage createBytesMessage = createSession.createBytesMessage();
            createBytesMessage.writeBytes(randomBytes);
            createProducer.send(createBytesMessage);
        }
        createConnection.start();
        for (int i2 = 0; i2 < 100; i2++) {
            BytesMessage receive = createConsumer.receive(1000L);
            Assert.assertNotNull(receive);
            Assert.assertEquals(randomBytes.length, receive.getBodyLength());
        }
        Assert.assertNull(createConsumer.receiveNoWait());
        createConnection.close();
        Assert.assertNotNull(myExceptionListener.e);
    }

    @Test
    public void testNoReconnectCloseAfterFailToReconnectWithTopicConsumer() throws Exception {
        ActiveMQConnectionFactory createConnectionFactoryWithoutHA = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration[]{new TransportConfiguration(INVM_CONNECTOR_FACTORY)});
        createConnectionFactoryWithoutHA.setReconnectAttempts(0);
        Connection createConnection = createConnectionFactoryWithoutHA.createConnection();
        ActiveMQSession createSession = createConnection.createSession(false, 1);
        createSession.getCoreSession().createQueue("jms.topic.mytopic", "blahblah", (String) null, false);
        createSession.createConsumer(ActiveMQJMSClient.createTopic("mytopic"));
        Thread.sleep(2000L);
        this.server.stop();
        this.server.start();
        createSession.close();
        createConnection.close();
    }

    @Test
    public void testNoReconnectCloseAfterFailToReconnectWithTempQueue() throws Exception {
        ActiveMQConnectionFactory createConnectionFactoryWithoutHA = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration[]{new TransportConfiguration(INVM_CONNECTOR_FACTORY)});
        createConnectionFactoryWithoutHA.setReconnectAttempts(0);
        Connection createConnection = createConnectionFactoryWithoutHA.createConnection();
        Session createSession = createConnection.createSession(false, 1);
        createSession.createTemporaryQueue();
        Thread.sleep(2000L);
        this.server.stop();
        this.server.start();
        createSession.close();
        createConnection.close();
    }

    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig(), true));
        this.server.start();
    }
}
