package org.apache.activemq.artemis.tests.integration.jms.cluster;

import jakarta.jms.Connection;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.JMSClusteredTestBase;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/jms/cluster/SelectorRedistributionClusterTest.class */
/**
 * Exercises message redistribution between two clustered brokers when consumers use JMS selectors
 * on the same queue.
 *
 * <p>Every test connects through {@code cf1} and {@code cf2}, attaches selector-based consumers to
 * the queue {@value #QUEUE_NAME}, sends messages carrying an int property {@code N}, and asserts
 * which consumer receives each message as sessions are opened and closed. The two connection
 * factories presumably target {@code server1} and {@code server2} respectively; that wiring lives
 * in {@link JMSClusteredTestBase} and should be confirmed there.
 */
public class SelectorRedistributionClusterTest extends JMSClusteredTestBase {

    /** Name of the queue shared by every test in this class. */
    private static final String QUEUE_NAME = "myQueue";

    /** Upper bound, in milliseconds, for blocking receives and binding waits. */
    private static final long TIMEOUT_MS = 4000L;

    /** Numeric value of {@code jakarta.jms.DeliveryMode.PERSISTENT}. */
    private static final int PERSISTENT_DELIVERY = 2;

    /**
     * Runs the base-class setup and then labels both brokers so they can be told apart.
     *
     * @throws Exception if the base-class setup fails
     */
    @Override
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        this.jmsServer1.getActiveMQServer().setIdentity("Server 1");
        this.jmsServer2.getActiveMQServer().setIdentity("Server 2");
    }

    /**
     * Turns persistence on for the servers created by the base class.
     *
     * @return always {@code true}
     */
    @Override
    protected boolean enablePersistence() {
        return true;
    }

    /**
     * A message consumed (but not acknowledged) through {@code cf2} must be receivable again by a
     * fresh consumer on {@code cf2}, and then by a matching consumer on {@code cf1} once the
     * {@code cf2} session is closed.
     *
     * @throws Exception on any JMS or cluster failure
     */
    @Test
    public void testSelectorRoutingReDistributionOnNoConsumer() throws Exception {
        enableImmediateRedistribution();
        try (Connection conn1 = this.cf1.createConnection();
             Connection conn2 = this.cf2.createConnection()) {
            conn1.start();
            conn2.start();

            Session session1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Session session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Queue queue = session1.createQueue(QUEUE_NAME);
            MessageProducer producer = createPersistentProducer(session1, queue);
            TextMessage m1 = newMessage(session1, "m1", 10);

            MessageConsumer matchingOnConn2 = session2.createConsumer(queue, "N = 10");
            // Non-matching consumer on conn1; only its presence matters.
            session1.createConsumer(queue, "N = 0");
            waitForBindings(this.server1, QUEUE_NAME, false, 1, 1, TIMEOUT_MS);

            producer.send(m1);
            // Deliberately not acknowledged, so the message stays available after the close.
            assertReceives(matchingOnConn2, "m1");
            session2.close();

            // A new matching consumer on conn2 must see the unacknowledged message again.
            Session session2Reopened = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Assertions.assertNotNull(
                session2Reopened.createConsumer(queue, "N = 10").receive(TIMEOUT_MS));

            // With a matching consumer now on conn1, closing the conn2 session must hand m1 over.
            MessageConsumer matchingOnConn1 = session1.createConsumer(queue, "N = 10");
            session2Reopened.close();
            assertReceives(matchingOnConn1, "m1").acknowledge();
        }
    }

    /**
     * After a matching consumer on {@code cf1} is closed without acknowledging {@code m1}, the
     * matching consumer on {@code cf2} must not get {@code m1} immediately; it gets a newly sent
     * {@code m2} first and only receives {@code m1} after the remaining {@code cf1} session closes.
     *
     * @throws Exception on any JMS or cluster failure
     */
    @Test
    public void testSelectorRoutingNoReDistributionNewMessageSkipsTillLocalClose() throws Exception {
        enableImmediateRedistribution();
        try (Connection conn1 = this.cf1.createConnection();
             Connection conn2 = this.cf2.createConnection()) {
            conn1.start();
            conn2.start();

            Session session1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Session session1Matching = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Session session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Queue queue = session1.createQueue(QUEUE_NAME);
            MessageProducer producer = createPersistentProducer(session1, queue);
            TextMessage m1 = newMessage(session1, "m1", 10);

            MessageConsumer matchingOnConn2 = session2.createConsumer(queue, "N = 10");
            session1.createConsumer(queue, "N = 0");
            MessageConsumer matchingOnConn1 = session1Matching.createConsumer(queue, "N = 10");
            waitForBindings(this.server1, QUEUE_NAME, false, 1, 1, TIMEOUT_MS);

            producer.send(m1);
            // Received but left unacknowledged before its session is closed.
            assertReceives(matchingOnConn1, "m1");
            session1Matching.close();

            // The "N = 0" consumer on conn1 is still open, so m1 must not have moved yet.
            Assertions.assertNull(matchingOnConn2.receiveNoWait());

            TextMessage m2 = newMessage(session1, "m2", 10);
            producer.send(m2);
            assertReceives(matchingOnConn2, "m2").acknowledge();

            // Closing the last conn1 session releases m1 to the conn2 consumer.
            session1.close();
            assertReceives(matchingOnConn2, "m1").acknowledge();
        }
    }

    /**
     * Once {@code m1} has moved to the matching consumer on {@code cf2}, the non-matching
     * consumers on {@code cf1} must still receive messages that match their own selectors, and
     * the {@code cf2} consumer must keep receiving new matching messages.
     *
     * @throws Exception on any JMS or cluster failure
     */
    @Test
    public void testSelectorRoutingReDistributionDoesNotBlockLocalConsumer() throws Exception {
        enableImmediateRedistribution();
        try (Connection conn1 = this.cf1.createConnection();
             Connection conn2 = this.cf2.createConnection()) {
            conn1.start();
            conn2.start();

            Session session1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Session session1Matching = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Session session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Queue queue = session1.createQueue(QUEUE_NAME);
            MessageProducer producer = createPersistentProducer(session1, queue);
            TextMessage m1 = newMessage(session1, "m1", 10);

            MessageConsumer consumerN0 = session1.createConsumer(queue, "N = 0");
            MessageConsumer consumerN1 = session1.createConsumer(queue, "N = 1");
            MessageConsumer matchingOnConn1 = session1Matching.createConsumer(queue, "N = 10");

            producer.send(m1);
            // Left unacknowledged so it is still pending when the session closes.
            assertReceives(matchingOnConn1, "m1");
            session1Matching.close();

            MessageConsumer matchingOnConn2 = session2.createConsumer(queue, "N = 10");
            assertReceives(matchingOnConn2, "m1").acknowledge();

            TextMessage m2 = newMessage(session1, "m2", 0);
            producer.send(m2);
            TextMessage m3 = newMessage(session1, "m3", 1);
            producer.send(m3);

            assertReceives(consumerN0, "m2").acknowledge();
            assertReceives(consumerN1, "m3").acknowledge();

            TextMessage m4 = newMessage(session1, "m4", 10);
            producer.send(m4);
            assertReceives(matchingOnConn2, "m4").acknowledge();
        }
    }

    /**
     * When the only matching consumer on {@code cf1} closes without acknowledging, a matching
     * consumer created afterwards on {@code cf2} must receive the message.
     *
     * @throws Exception on any JMS or cluster failure
     */
    @Test
    public void testSelectorRoutingReDistributionOnConsumerMove() throws Exception {
        enableImmediateRedistribution();
        try (Connection conn1 = this.cf1.createConnection();
             Connection conn2 = this.cf2.createConnection()) {
            conn1.start();
            conn2.start();

            Session session1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Session session1Matching = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Session session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Queue queue = session1.createQueue(QUEUE_NAME);
            MessageProducer producer = createPersistentProducer(session1, queue);
            TextMessage m1 = newMessage(session1, "m1", 10);

            // Two non-matching consumers stay open on conn1 for the whole test.
            session1.createConsumer(queue, "N = 0");
            session1.createConsumer(queue, "N = 1");
            MessageConsumer matchingOnConn1 = session1Matching.createConsumer(queue, "N = 10");

            producer.send(m1);
            // Left unacknowledged so it is still pending when the session closes.
            assertReceives(matchingOnConn1, "m1");
            session1Matching.close();

            MessageConsumer matchingOnConn2 = session2.createConsumer(queue, "N = 10");
            assertReceives(matchingOnConn2, "m1").acknowledge();
        }
    }

    /**
     * After the matching consumer and one non-matching consumer on {@code cf1} are closed, the
     * already-open matching consumer on {@code cf2} must still see nothing; the message arrives
     * only after an additional consumer is created on the {@code cf2} session.
     *
     * @throws Exception on any JMS or cluster failure
     */
    @Test
    public void testSelectorRoutingReDistributionOnLocalNoMatchConsumerCloseNeedsNewRemoteDemand() throws Exception {
        enableImmediateRedistribution();
        try (Connection conn1 = this.cf1.createConnection();
             Connection conn2 = this.cf2.createConnection()) {
            conn1.start();
            conn2.start();

            Session session1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Session session1Matching = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Session session1NoMatch = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Session session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Queue queue = session1.createQueue(QUEUE_NAME);
            MessageProducer producer = createPersistentProducer(session1, queue);
            TextMessage m1 = newMessage(session1, "m1", 10);

            MessageConsumer matchingOnConn2 = session2.createConsumer(queue, "N = 10");
            session1.createConsumer(queue, "N = 0");
            session1NoMatch.createConsumer(queue, "N = 1");
            MessageConsumer matchingOnConn1 = session1Matching.createConsumer(queue, "N = 10");
            // One non-local binding with one consumer, one local binding with three consumers.
            waitForBindings(this.server1, QUEUE_NAME, false, 1, 1, TIMEOUT_MS);
            waitForBindings(this.server1, QUEUE_NAME, true, 1, 3, TIMEOUT_MS);

            producer.send(m1);
            // Left unacknowledged so it is still pending when the session closes.
            assertReceives(matchingOnConn1, "m1");
            session1Matching.close();
            Assertions.assertNull(matchingOnConn2.receiveNoWait());

            // Closing a non-matching conn1 consumer alone must not move the message either.
            session1NoMatch.close();
            Assertions.assertNull(matchingOnConn2.receiveNoWait());

            // A new consumer on the conn2 session is what finally brings m1 across.
            session2.createConsumer(queue, "N = 0");
            assertReceives(matchingOnConn2, "m1").acknowledge();
        }
    }

    /**
     * Applies the address settings shared by every test to the {@code "#"} match on both servers.
     */
    private void enableImmediateRedistribution() {
        applyImmediateRedistribution(this.server1.getAddressSettingsRepository().getMatch("#"));
        applyImmediateRedistribution(this.server2.getAddressSettingsRepository().getMatch("#"));
    }

    /**
     * Enables auto-creation of queues and addresses and sets the redistribution delay to zero.
     *
     * @param settings the address settings instance to modify in place
     */
    private static void applyImmediateRedistribution(AddressSettings settings) {
        settings.setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0L);
    }

    /**
     * Creates a producer for {@code queue} and switches it to persistent delivery.
     *
     * @param session session that owns the producer
     * @param queue destination the producer sends to
     * @return the configured producer
     * @throws Exception if the producer cannot be created or configured
     */
    private static MessageProducer createPersistentProducer(Session session, Queue queue) throws Exception {
        MessageProducer producer = session.createProducer(queue);
        producer.setDeliveryMode(PERSISTENT_DELIVERY);
        return producer;
    }

    /**
     * Builds a text message whose int property {@code N} is what the consumer selectors test.
     *
     * @param session session used to create the message
     * @param text message body
     * @param n value stored in the {@code N} property
     * @return the new, unsent message
     * @throws Exception if the message cannot be created or the property cannot be set
     */
    private static TextMessage newMessage(Session session, String text, int n) throws Exception {
        TextMessage message = session.createTextMessage(text);
        message.setIntProperty("N", n);
        return message;
    }

    /**
     * Waits up to {@link #TIMEOUT_MS} for a message and asserts that it arrived with the expected
     * body. The message is not acknowledged here; callers decide whether to acknowledge it.
     *
     * @param consumer consumer to receive from
     * @param expectedText body the received message must carry
     * @return the received message
     * @throws Exception if receiving or reading the body fails
     */
    private static TextMessage assertReceives(MessageConsumer consumer, String expectedText) throws Exception {
        // receive() is declared to return Message, so the cast is required.
        TextMessage received = (TextMessage) consumer.receive(TIMEOUT_MS);
        Assertions.assertNotNull(received);
        Assertions.assertEquals(expectedText, received.getText());
        return received;
    }
}
