package org.apache.activemq.artemis.tests.integration.federation;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import java.util.Collections;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.config.FederationConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationUpstreamConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/federation/FederatedQueuePullConsumerTest.class */
public class FederatedQueuePullConsumerTest extends FederatedTestBase {
    @Override // org.apache.activemq.artemis.tests.integration.federation.FederatedTestBase
    @Before
    public void setUp() throws Exception {
        super.setUp();
        getServer(0).getConfiguration().addConnectorConfiguration("server-pull-1", "tcp://localhost:61617?consumerWindowSize=0;ackBatchSize=10");
    }

    @Override // org.apache.activemq.artemis.tests.integration.federation.FederatedTestBase
    protected boolean isNetty() {
        return true;
    }

    @Override // org.apache.activemq.artemis.tests.integration.federation.FederatedTestBase
    protected void configureQueues(ActiveMQServer activeMQServer) throws Exception {
        activeMQServer.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false).setDefaultConsumerWindowSize(6000));
        createSimpleQueue(activeMQServer, getName());
    }

    protected ConnectionFactory getCF(int i) throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://" + i);
        activeMQConnectionFactory.setConsumerWindowSize(0);
        return activeMQConnectionFactory;
    }

    @Test
    public void testFederatedQueuePullFromUpstream() throws Exception {
        String name = getName();
        getServer(0).getConfiguration().getFederationConfigurations().add(FederatedTestUtil.createQueueUpstreamFederationConfiguration("server-pull-1", name));
        getServer(0).getFederationManager().deploy();
        testFederatedQueuePullFromUpstream(name);
    }

    @Test
    public void testMultipleFederatedQueueRemoteConsumersUpstream() throws Exception {
        ((AddressSettings) getServer(0).getAddressSettingsRepository().getMatch("#")).setAutoCreateAddresses(true).setAutoCreateQueues(true);
        ((AddressSettings) getServer(1).getAddressSettingsRepository().getMatch("#")).setAutoCreateAddresses(true).setAutoCreateQueues(true);
        getServer(1).createQueue(new QueueConfiguration("Test.Q.1").setRoutingType(RoutingType.ANYCAST));
        getServer(1).createQueue(new QueueConfiguration("Test.Q.2").setRoutingType(RoutingType.ANYCAST));
        getServer(0).getConfiguration().getFederationConfigurations().add(new FederationConfiguration().setName("default").addFederationPolicy(new FederationQueuePolicyConfiguration().setName("myQueuePolicy").addInclude(new FederationQueuePolicyConfiguration.Matcher().setQueueMatch("#").setAddressMatch("Test.#"))).addUpstreamConfiguration(new FederationUpstreamConfiguration().setName("server1-upstream").addPolicyRef("myQueuePolicy").setStaticConnectors(Collections.singletonList("server-pull-1"))));
        getServer(0).getFederationManager().deploy();
        ConnectionFactory cf = getCF(0);
        ConnectionFactory cf2 = getCF(0);
        ConnectionFactory cf3 = getCF(1);
        Connection createConnection = cf.createConnection();
        try {
            Connection createConnection2 = cf2.createConnection();
            try {
                Connection createConnection3 = cf3.createConnection();
                try {
                    createConnection.start();
                    Session createSession = createConnection.createSession();
                    createSession.createConsumer(createSession.createQueue("Test.Q.1"));
                    createConnection2.start();
                    Session createSession2 = createConnection2.createSession();
                    Queue createQueue = createSession2.createQueue("Test.Q.2");
                    MessageConsumer createConsumer = createSession2.createConsumer(createQueue);
                    Session createSession3 = createConnection3.createSession();
                    MessageProducer createProducer = createSession3.createProducer(createQueue);
                    createProducer.send(createSession3.createTextMessage("hello"));
                    assertNotNull(createConsumer.receive(1000L));
                    createConnection.close();
                    createProducer.send(createSession3.createTextMessage("hello"));
                    assertNotNull(createConsumer.receive(1000L));
                    if (createConnection3 != null) {
                        createConnection3.close();
                    }
                    if (createConnection2 != null) {
                        createConnection2.close();
                    }
                    if (createConnection != null) {
                        createConnection.close();
                    }
                } catch (Throwable th) {
                    if (createConnection3 != null) {
                        try {
                            createConnection3.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } catch (Throwable th3) {
                if (createConnection2 != null) {
                    try {
                        createConnection2.close();
                    } catch (Throwable th4) {
                        th3.addSuppressed(th4);
                    }
                }
                throw th3;
            }
        } catch (Throwable th5) {
            if (createConnection != null) {
                try {
                    createConnection.close();
                } catch (Throwable th6) {
                    th5.addSuppressed(th6);
                }
            }
            throw th5;
        }
    }

    private void testFederatedQueuePullFromUpstream(String str) throws Exception {
        ConnectionFactory cf = getCF(1);
        ConnectionFactory cf2 = getCF(0);
        Connection createConnection = cf.createConnection();
        try {
            Connection createConnection2 = cf2.createConnection();
            try {
                createConnection.start();
                Session createSession = createConnection.createSession();
                Queue createQueue = createSession.createQueue(str);
                MessageProducer createProducer = createSession.createProducer(createQueue);
                createProducer.send(createSession.createTextMessage("1"));
                createConnection2.start();
                Session createSession2 = createConnection2.createSession();
                Queue createQueue2 = createSession2.createQueue(str);
                createSession2.createProducer(createQueue).send(createSession2.createTextMessage("0"));
                MessageConsumer createConsumer = createSession2.createConsumer(createQueue2);
                waitForBindings(getServer(1), str, true, 1, 1, 2000L);
                Wait.assertEquals(1, () -> {
                    return getMessageCount(getServer(0), str);
                }, 2000L, 100L);
                Wait.assertEquals(1, () -> {
                    return getMessageCount(getServer(1), str);
                }, 2000L, 100L);
                assertNotNull(createConsumer.receive(1000L));
                assertNotNull(createConsumer.receive(4000L));
                Wait.assertEquals(0L, () -> {
                    return getMessageCount(getServer(0), str);
                }, 2000L, 100L);
                Wait.assertEquals(0L, () -> {
                    return getMessageCount(getServer(1), str);
                }, 2000L, 100L);
                assertNull(createConsumer.receiveNoWait());
                for (int i = 0; i < 150; i++) {
                    createProducer.send(createSession.createTextMessage("1-" + i));
                }
                Wait.assertTrue(() -> {
                    return getMessageCount(getServer(1), str) > 100;
                }, 2000L, 200L);
                Wait.assertTrue(() -> {
                    return getMessageCount(getServer(1), str) < 150;
                }, 2000L, 200L);
                Wait.assertTrue(() -> {
                    return getMessageCount(getServer(0), str) > 10;
                }, 2000L, 100L);
                Wait.assertTrue(() -> {
                    return getMessageCount(getServer(0), str) < 100;
                }, 2000L, 100L);
                for (int i2 = 0; i2 < 150; i2++) {
                    assertNotNull(createConsumer.receive(4000L));
                }
                assertNull(createConsumer.receiveNoWait());
                createConsumer.close();
                if (createConnection2 != null) {
                    createConnection2.close();
                }
                if (createConnection != null) {
                    createConnection.close();
                }
            } catch (Throwable th) {
                if (createConnection2 != null) {
                    try {
                        createConnection2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            if (createConnection != null) {
                try {
                    createConnection.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }
}
