package org.apache.activemq.artemis.tests.integration.federation;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.util.Objects;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.integration.client.AutoCreateJmsDestinationTest;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/federation/NettyFederatedQueueTest.class */
/**
 * Federated-queue integration test that runs the {@link FederatedTestBase} fixture with
 * {@link #isNetty()} and {@link #isPersistenceEnabled()} both switched on.
 *
 * <p>The single test configures a queue upstream federation on server 0 (named "server1") and
 * on server 1 (named "server0"), then produces on one broker connection and consumes on both.
 */
public class NettyFederatedQueueTest extends FederatedTestBase {

    /** Log code that must not be present in the captured log output when the test finishes. */
    private static final String UNEXPECTED_LOG_CODE = "AMQ222153";

    /** Maximum time, in milliseconds, to wait for each expected message. */
    private static final long RECEIVE_TIMEOUT_MILLIS = 5000L;

    /** Number of messages published in the bulk phase of the test. */
    private static final int BULK_MESSAGE_COUNT = 1000;

    /** Number of rounds in which the local and the remote consumer each take one message. */
    private static final int ALTERNATING_ROUNDS = 100;

    /**
     * Overrides the base-class hook to report that Netty is in use.
     *
     * @return always {@code true}
     */
    @Override
    protected boolean isNetty() {
        return true;
    }

    /**
     * Overrides the base-class hook to report that persistence is enabled.
     *
     * @return always {@code true}
     */
    @Override
    protected boolean isPersistenceEnabled() {
        return true;
    }

    /**
     * Deploys a queue upstream federation on both servers for the same queue, then verifies that
     * a consumer on each connection keeps receiving messages and that {@value #UNEXPECTED_LOG_CODE}
     * was never logged.
     *
     * @throws Exception if broker configuration, JMS setup or messaging fails
     */
    @Test
    public void testFederatedQueueBiDirectionalUpstream() throws Exception {
        AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler();
        runAfter(() -> {
            loggerHandler.close();
        });

        String queueName = getName();

        // Each server gets an upstream federation configuration for the same queue name.
        getServer(0).getConfiguration().getFederationConfigurations()
            .add(FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName));
        getServer(0).getFederationManager().deploy();
        getServer(1).getConfiguration().getFederationConfigurations()
            .add(FederatedTestUtil.createQueueUpstreamFederationConfiguration("server0", queueName));
        getServer(1).getFederationManager().deploy();

        // NOTE(review): ports 61616/61617 are presumably the acceptors of server 0 and server 1
        // set up by FederatedTestBase -- confirm against the base class.
        ConnectionFactory localFactory =
            CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616?consumerWindowSize=0");
        ConnectionFactory remoteFactory =
            CFUtil.createConnectionFactory("CORE", "tcp://localhost:61617?consumerWindowSize=0");

        // Register the close hook before start() so the connection is released even when
        // starting it fails.
        Connection localConnection = localFactory.createConnection();
        runAfter(localConnection::close);
        localConnection.start();
        Session localSession = localConnection.createSession(true, Session.SESSION_TRANSACTED);
        MessageProducer producer = localSession.createProducer(localSession.createQueue(queueName));

        Connection remoteConnection = remoteFactory.createConnection();
        runAfter(remoteConnection::close);
        remoteConnection.start();
        Session remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer remoteConsumer = remoteSession.createConsumer(remoteSession.createQueue(queueName));

        // A single message sent on the first connection must reach the consumer on the second.
        producer.send(localSession.createTextMessage("Test"));
        localSession.commit();
        Assertions.assertNotNull(remoteConsumer.receive(RECEIVE_TIMEOUT_MILLIS),
            "remote consumer did not receive the initial message");

        // Publish the bulk batch in one transaction.
        for (int sent = 0; sent < BULK_MESSAGE_COUNT; sent++) {
            producer.send(localSession.createTextMessage(AutoCreateJmsDestinationTest.QUEUE_NAME));
        }
        localSession.commit();

        // Add a competing consumer on the producing connection and alternate between the two.
        MessageConsumer localConsumer = localSession.createConsumer(localSession.createQueue(queueName));
        for (int round = 0; round < ALTERNATING_ROUNDS; round++) {
            Assertions.assertNotNull(localConsumer.receive(RECEIVE_TIMEOUT_MILLIS),
                "local consumer received nothing in round " + round);
            // The local session is transacted, so the receive only takes effect on commit.
            localSession.commit();
            Assertions.assertNotNull(remoteConsumer.receive(RECEIVE_TIMEOUT_MILLIS),
                "remote consumer received nothing in round " + round);
        }

        Assertions.assertNotNull(remoteConsumer.receive(RECEIVE_TIMEOUT_MILLIS),
            "remote consumer received nothing after the alternating rounds");
        Assertions.assertFalse(loggerHandler.findText(new String[]{UNEXPECTED_LOG_CODE}),
            UNEXPECTED_LOG_CODE + " was logged during the test");
    }
}
