package org.apache.activemq.artemis.tests.integration.jms.cluster;

import jakarta.jms.Connection;
import jakarta.jms.DeliveryMode;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import jakarta.jms.TopicSubscriber;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.JMSClusteredTestBase;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Parameterized JMS cluster tests that publish to {@link #TOPIC} through the first server while
 * the forwarding connection of that server's cluster bridge is failed.
 *
 * <p>The {@code crash} parameter selects whether {@code jmsServer2} is additionally stopped by
 * {@link #crash()} and started again by {@link #restart()}. With {@code crash=false} only the
 * forwarding connection is failed and the second server keeps running.
 *
 * <p>The connection factories, JNDI context and servers used here ({@code cf1}, {@code cf2},
 * {@code context1}, {@code jmsServer1}, {@code jmsServer2}, {@code server1}) are inherited from
 * {@link JMSClusteredTestBase}.
 */
@RunWith(Parameterized.class)
public class BindingsClusterTest extends JMSClusteredTestBase {
    /** Name of the topic every test in this class publishes to. */
    public static final String TOPIC = "jms.t1";

    /** Maximum time, in milliseconds, a blocking receive waits for the next message. */
    private static final long RECEIVE_TIMEOUT_MILLIS = 5000L;

    /** Maximum time, in milliseconds, to wait for the bridge to expose a forwarding connection. */
    private static final long FORWARDING_CONNECTION_TIMEOUT_MILLIS = 50000L;

    /** Pause, in milliseconds, between two polls for the forwarding connection. */
    private static final long FORWARDING_CONNECTION_POLL_MILLIS = 10L;

    /** Whether {@link #crash()} / {@link #restart()} also stop and start {@code jmsServer2}. */
    private final boolean crash;

    /**
     * @param crash {@code true} to stop and restart the second server in addition to failing the
     *     bridge's forwarding connection
     */
    public BindingsClusterTest(boolean crash) {
        this.crash = crash;
    }

    /**
     * Supplies the two parameter sets of this test: {@code crash=true} and {@code crash=false}.
     *
     * @return one single-element argument array per run
     */
    @Parameterized.Parameters(name = "crash={0}")
    public static Collection<Object[]> getParameters() {
        return Arrays.asList(new Object[][] {{true}, {false}});
    }

    /** Runs the base-class setup and then gives both servers a readable identity. */
    @Override // org.apache.activemq.artemis.tests.util.JMSClusteredTestBase
    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.jmsServer1.getActiveMQServer().setIdentity("Server 1");
        this.jmsServer2.getActiveMQServer().setIdentity("Server 2");
    }

    /** These tests always run with persistence enabled. */
    @Override // org.apache.activemq.artemis.tests.util.JMSClusteredTestBase
    protected boolean enablePersistence() {
        return true;
    }

    /**
     * Registers a durable subscription "sub2" through {@code cf2}, disconnects that client, and
     * then sends "m1" (before the failure), "m2" (after {@link #crash()}) and "m3" (after
     * {@link #restart()}) through {@code cf1}. A client reconnecting through a fresh {@code cf2}
     * must receive all three messages in order.
     */
    @Test
    public void testSendToSingleDisconnectedBinding() throws Exception {
        Connection conn1 = this.cf1.createConnection();
        conn1.setClientID("someClient1");
        Connection conn2 = this.cf2.createConnection();
        conn2.setClientID("someClient2");
        conn1.start();
        conn2.start();
        try {
            Topic topic1 = createTopic(TOPIC, true);
            Topic topic2 = (Topic) this.context1.lookup("topic/" + TOPIC);
            Session session1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Session session2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // Create the durable subscription and immediately disconnect its client.
            session2.createDurableSubscriber(topic2, "sub2").close();
            session2.close();
            conn2.close();
            // NOTE(review): presumably gives the cluster time to propagate the binding - confirm.
            Thread.sleep(500L);

            MessageProducer producer = session1.createProducer(topic1);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            producer.send(session1.createTextMessage("m1"));
            printBindingsOnBothServers();

            crash();
            printBindings(this.jmsServer1.getActiveMQServer(), TOPIC);
            producer.send(session1.createTextMessage("m2"));

            restart();
            // NOTE(review): presumably lets the bridge reconnect before sending again - confirm.
            Thread.sleep(2000L);
            printBindingsOnBothServers();
            producer.send(session1.createTextMessage("m3"));

            recreateSecondConnectionFactory();
            conn2 = this.cf2.createConnection();
            conn2.setClientID("someClient2");
            TopicSubscriber subscriber2 =
                conn2.createSession(false, Session.AUTO_ACKNOWLEDGE).createDurableSubscriber(topic2, "sub2");
            conn2.start();

            assertNextText(subscriber2, "m1");
            assertNextText(subscriber2, "m2");
            assertNextText(subscriber2, "m3");
            subscriber2.close();
        } finally {
            closeBoth(conn1, conn2);
        }
        this.jmsServer1.destroyTopic(TOPIC);
        this.jmsServer2.destroyTopic(TOPIC);
    }

    /**
     * Same scenario as {@link #testSendToSingleDisconnectedBinding()}, but a durable subscriber
     * "sub2" with the same client id is also attached through {@code cf1} and stays connected.
     * All six messages must be delivered, in order, to that subscriber, while the client that
     * reconnects through {@code cf2} must have nothing immediately available.
     */
    @Test
    public void testSendToSingleDisconnectedBindingWhenLocalAvailable() throws Exception {
        Connection conn1 = this.cf1.createConnection();
        conn1.setClientID("someClient2");
        Connection conn2 = this.cf2.createConnection();
        conn2.setClientID("someClient2");
        conn1.start();
        conn2.start();
        try {
            Topic topic1 = createTopic(TOPIC, true);
            Topic topic2 = (Topic) this.context1.lookup("topic/" + TOPIC);
            Session session1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Session session2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
            TopicSubscriber subscriber1 = session1.createDurableSubscriber(topic1, "sub2");

            // Create the second durable subscription and immediately disconnect its client.
            session2.createDurableSubscriber(topic2, "sub2").close();
            session2.close();
            conn2.close();
            // NOTE(review): presumably gives the cluster time to propagate the binding - confirm.
            Thread.sleep(500L);

            MessageProducer producer = session1.createProducer(topic1);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            producer.send(session1.createTextMessage("m1"));
            printBindingsOnBothServers();

            crash();
            printBindings(this.jmsServer1.getActiveMQServer(), TOPIC);
            producer.send(session1.createTextMessage("m2"));
            producer.send(session1.createTextMessage("m3"));
            producer.send(session1.createTextMessage("m4"));

            restart();
            // NOTE(review): presumably lets the bridge reconnect before sending again - confirm.
            Thread.sleep(2000L);
            printBindingsOnBothServers();
            producer.send(session1.createTextMessage("m5"));
            producer.send(session1.createTextMessage("m6"));

            recreateSecondConnectionFactory();
            conn2 = this.cf2.createConnection();
            conn2.setClientID("someClient2");
            TopicSubscriber subscriber2 =
                conn2.createSession(false, Session.AUTO_ACKNOWLEDGE).createDurableSubscriber(topic2, "sub2");
            conn2.start();

            assertNull(subscriber2.receiveNoWait());
            assertNextText(subscriber1, "m1");
            assertNextText(subscriber1, "m2");
            assertNextText(subscriber1, "m3");
            assertNextText(subscriber1, "m4");
            assertNextText(subscriber1, "m5");
            // "m6" is sent right after "m5" under identical conditions, so it must arrive here too.
            assertNextText(subscriber1, "m6");
            subscriber2.close();
        } finally {
            closeBoth(conn1, conn2);
        }
        this.jmsServer1.destroyTopic(TOPIC);
        this.jmsServer2.destroyTopic(TOPIC);
    }

    /**
     * Attaches a shared consumer "sub2" through each connection factory, sends "m1" and "m2" and
     * expects the consumer created via {@code cf2} to receive "m2". After {@link #crash()} that
     * consumer is closed; "m3".."m5" are sent before {@link #restart()} and "m6", "m7" after it.
     * The consumer created via {@code cf1} must then receive "m1" and "m3".."m7" in order.
     */
    @Test
    public void testRemoteBindingRemovedOnReconnectLocalAvailable() throws Exception {
        Connection conn1 = this.cf1.createConnection();
        conn1.setClientID("someClient2");
        Connection conn2 = this.cf2.createConnection();
        conn2.setClientID("someClient2");
        conn1.start();
        conn2.start();
        try {
            Topic topic1 = createTopic(TOPIC, true);
            Topic topic2 = (Topic) this.context1.lookup("topic/" + TOPIC);
            Session session1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Session session2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer1 = session1.createSharedConsumer(topic1, "sub2");
            MessageConsumer consumer2 = session2.createSharedConsumer(topic2, "sub2");
            // NOTE(review): presumably gives the cluster time to propagate the bindings - confirm.
            Thread.sleep(500L);

            MessageProducer producer = session1.createProducer(topic1);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            producer.send(session1.createTextMessage("m1"));
            producer.send(session1.createTextMessage("m2"));
            printBindingsOnBothServers();

            assertNextText(consumer2, "m2");

            crash();
            // NOTE(review): when the second server was stopped this consumer may already be
            // closed by the failure; the explicit close covers the crash=false run - confirm.
            consumer2.close();
            printBindings(this.jmsServer1.getActiveMQServer(), TOPIC);
            producer.send(session1.createTextMessage("m3"));
            producer.send(session1.createTextMessage("m4"));
            producer.send(session1.createTextMessage("m5"));

            restart();
            // NOTE(review): presumably lets the bridge reconnect before sending again - confirm.
            Thread.sleep(2000L);
            printBindingsOnBothServers();
            producer.send(session1.createTextMessage("m6"));
            producer.send(session1.createTextMessage("m7"));

            assertNextText(consumer1, "m1");
            assertNextText(consumer1, "m3");
            assertNextText(consumer1, "m4");
            assertNextText(consumer1, "m5");
            assertNextText(consumer1, "m6");
            assertNextText(consumer1, "m7");
            consumer2.close();
        } finally {
            closeBoth(conn1, conn2);
        }
        this.jmsServer1.destroyTopic(TOPIC);
        this.jmsServer2.destroyTopic(TOPIC);
    }

    /**
     * Receives the next message from {@code consumer} and asserts that it is a text message
     * carrying {@code expectedText}.
     *
     * @param consumer consumer to read from; waits up to {@link #RECEIVE_TIMEOUT_MILLIS}
     * @param expectedText body the received message must have
     * @throws Exception if receiving or reading the message fails
     */
    private void assertNextText(MessageConsumer consumer, String expectedText) throws Exception {
        TextMessage received = (TextMessage) consumer.receive(RECEIVE_TIMEOUT_MILLIS);
        assertNotNull(received);
        assertEquals(expectedText, received.getText());
    }

    /** Prints the bindings of {@link #TOPIC} on the first and then the second server. */
    private void printBindingsOnBothServers() throws Exception {
        printBindings(this.jmsServer1.getActiveMQServer(), TOPIC);
        printBindings(this.jmsServer2.getActiveMQServer(), TOPIC);
    }

    /** Replaces {@code cf2} with a new non-HA in-VM connection factory built for server id 2. */
    private void recreateSecondConnectionFactory() throws Exception {
        this.cf2 = ActiveMQJMSClient.createConnectionFactoryWithoutHA(
            JMSFactoryType.CF,
            new TransportConfiguration[] {
                new TransportConfiguration(InVMConnectorFactory.class.getName(), generateInVMParams(2))
            });
    }

    /**
     * Closes both connections, attempting the second close even when the first one throws.
     *
     * @param first connection closed first
     * @param second connection closed afterwards, regardless of the outcome of the first close
     * @throws Exception whatever either {@code close()} call throws
     */
    private static void closeBoth(Connection first, Connection second) throws Exception {
        try {
            first.close();
        } finally {
            second.close();
        }
    }

    /**
     * Fails the forwarding connection of the bridge belonging to the first record of the first
     * cluster connection on {@code server1}, waits up to five seconds for the failure listener to
     * fire, and - when this run is parameterized with {@code crash=true} - stops
     * {@code jmsServer2}.
     */
    private void crash() throws Exception {
        final CountDownLatch failureLatch = new CountDownLatch(1);
        ClusterConnectionImpl clusterConnection =
            (ClusterConnectionImpl) this.server1.getClusterManager().getClusterConnections().iterator().next();
        MessageFlowRecord record = (MessageFlowRecord) clusterConnection.getRecords().values().iterator().next();
        RemotingConnection forwardingConnection = getForwardingConnection(record.getBridge());
        forwardingConnection.addFailureListener(new FailureListener() {
            @Override
            public void connectionFailed(ActiveMQException exception, boolean failedOver) {
                failureLatch.countDown();
            }

            @Override
            public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
                connectionFailed(exception, failedOver);
            }
        });
        forwardingConnection.fail(new ActiveMQNotConnectedException());
        assertTrue(failureLatch.await(5000L, TimeUnit.MILLISECONDS));
        if (this.crash) {
            this.jmsServer2.stop();
        }
    }

    /** Starts {@code jmsServer2} again when this run is parameterized with {@code crash=true}. */
    private void restart() throws Exception {
        if (this.crash) {
            this.jmsServer2.start();
        }
    }

    /**
     * Polls the bridge until it exposes a forwarding connection.
     *
     * @param bridge bridge to inspect; it is cast to {@link BridgeImpl}
     * @return the first non-null forwarding connection observed
     * @throws IllegalStateException if none shows up within
     *     {@link #FORWARDING_CONNECTION_TIMEOUT_MILLIS}
     */
    private RemotingConnection getForwardingConnection(Bridge bridge) throws Exception {
        final long start = System.currentTimeMillis();
        do {
            RemotingConnection forwardingConnection = ((BridgeImpl) bridge).getForwardingConnection();
            if (forwardingConnection != null) {
                return forwardingConnection;
            }
            Thread.sleep(FORWARDING_CONNECTION_POLL_MILLIS);
        } while (System.currentTimeMillis() - start < FORWARDING_CONNECTION_TIMEOUT_MILLIS);
        throw new IllegalStateException("Failed to get forwarding connection");
    }
}
