package org.apache.activemq.artemis.tests.integration.amqp.connect;

import jakarta.jms.Connection;
import jakarta.jms.DeliveryMode;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.integration.client.AutoCreateJmsDestinationTest;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeTest.class */
public class AMQPBridgeTest extends AmqpClientTestSupport {
    protected static final int AMQP_PORT_2 = 5673;
    ActiveMQServer server_2;

    @Override // org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport
    protected ActiveMQServer createServer() throws Exception {
        return createServer(5672, false);
    }

    @Test
    public void testsSimpleConnect() throws Exception {
        this.server.start();
        this.server_2 = createServer(AMQP_PORT_2, false);
        this.server_2.getConfiguration().addAMQPConnection(new AMQPBrokerConnectConfiguration(AutoCreateJmsDestinationTest.QUEUE_NAME, "tcp://localhost:5672"));
        this.server_2.start();
    }

    @Test
    public void testSimpleTransferPush() throws Exception {
        internalTransferPush("TEST", false, false);
    }

    @Test
    public void testSimpleTransferPushRestartBC() throws Exception {
        internalTransferPush("TEST", false, true);
    }

    @Test
    public void testSimpleTransferPushDeferredCreation() throws Exception {
        internalTransferPush("TEST", true, false);
    }

    @Test
    public void testSimpleTransferPushDeferredCreationRestartBC() throws Exception {
        internalTransferPush("TEST", true, true);
    }

    public void internalTransferPush(String str, boolean z, boolean z2) throws Exception {
        this.server.setIdentity("targetServer");
        this.server.start();
        this.server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(str), RoutingType.ANYCAST));
        this.server.createQueue(new QueueConfiguration(str).setRoutingType(RoutingType.ANYCAST));
        this.server_2 = createServer(AMQP_PORT_2, false);
        AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(AutoCreateJmsDestinationTest.QUEUE_NAME, "tcp://localhost:5672");
        aMQPBrokerConnectConfiguration.addElement(new AMQPBrokerConnectionElement().setMatchAddress(str).setType(AMQPBrokerConnectionAddressType.SENDER));
        this.server_2.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
        if (!z) {
            this.server_2.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(str).addRoutingType(RoutingType.ANYCAST));
            this.server_2.getConfiguration().addQueueConfiguration(new QueueConfiguration(str).setRoutingType(RoutingType.ANYCAST));
        }
        this.server_2.setIdentity("serverWithBridge");
        this.server_2.start();
        ActiveMQServer activeMQServer = this.server_2;
        activeMQServer.getClass();
        Wait.assertTrue(activeMQServer::isStarted);
        if (z) {
            this.server_2.addAddressInfo(new AddressInfo(str).addRoutingType(RoutingType.ANYCAST));
            this.server_2.createQueue(new QueueConfiguration(str).setRoutingType(RoutingType.ANYCAST));
        }
        if (z2) {
            this.server_2.stopBrokerConnection(AutoCreateJmsDestinationTest.QUEUE_NAME);
            Thread.sleep(1000L);
            this.server_2.startBrokerConnection(AutoCreateJmsDestinationTest.QUEUE_NAME);
        }
        Session createSession = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5673").createConnection().createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createSession.createQueue(str));
        createProducer.setDeliveryMode(2);
        String str2 = null;
        for (int i = 0; i < 30; i++) {
            if (i == 0) {
                StringBuffer stringBuffer = new StringBuffer();
                for (int i2 = 0; i2 < 10024; i2++) {
                    stringBuffer.append("*******************************************************************************************************************************");
                }
                str2 = stringBuffer.toString();
                createProducer.send(createSession.createTextMessage(stringBuffer.toString()));
            } else {
                createProducer.send(createSession.createMessage());
            }
        }
        Queue locateQueue = this.server_2.locateQueue(str);
        Assert.assertNotNull(locateQueue);
        locateQueue.getClass();
        Wait.assertEquals(0L, locateQueue::getMessageCount);
        Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
        Session createSession2 = createConnection.createSession(false, 1);
        createConnection.start();
        MessageConsumer createConsumer = createSession2.createConsumer(createSession2.createQueue(str));
        for (int i3 = 0; i3 < 30; i3++) {
            TextMessage receive = createConsumer.receive(5000L);
            if (receive instanceof TextMessage) {
                if (receive instanceof TextMessage) {
                    Assert.assertEquals(str2, receive.getText());
                } else {
                    System.out.println("i = " + i3);
                }
            }
        }
        Assert.assertNull(createConsumer.receiveNoWait());
    }

    @Test
    public void testSimpleTransferPull() throws Exception {
        internaltestSimpleTransferPull(false);
    }

    @Test
    public void testSimpleTransferPullSecurity() throws Exception {
        internaltestSimpleTransferPull(true);
    }

    public void internaltestSimpleTransferPull(boolean z) throws Exception {
        this.server.setIdentity("targetServer");
        if (z) {
            enableSecurity(this.server, "#");
        }
        this.server.start();
        this.server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("TEST"), RoutingType.ANYCAST));
        this.server.createQueue(new QueueConfiguration("TEST").setRoutingType(RoutingType.ANYCAST));
        this.server_2 = createServer(AMQP_PORT_2, false);
        AMQPBrokerConnectConfiguration retryInterval = new AMQPBrokerConnectConfiguration(AutoCreateJmsDestinationTest.QUEUE_NAME, "tcp://localhost:5672").setRetryInterval(10);
        if (z) {
            retryInterval.setUser(this.fullUser).setPassword("wrongPassword");
        }
        retryInterval.addElement(new AMQPBrokerConnectionElement().setMatchAddress("TEST").setType(AMQPBrokerConnectionAddressType.RECEIVER));
        this.server_2.getConfiguration().addAMQPConnection(retryInterval);
        this.server_2.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName("TEST").addRoutingType(RoutingType.ANYCAST));
        this.server_2.getConfiguration().addQueueConfiguration(new QueueConfiguration("TEST").setRoutingType(RoutingType.ANYCAST));
        this.server_2.setIdentity("serverWithBridge");
        this.server_2.start();
        ActiveMQServer activeMQServer = this.server_2;
        activeMQServer.getClass();
        Wait.assertTrue(activeMQServer::isStarted);
        Session createSession = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection(this.fullUser, this.fullPass).createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createSession.createQueue("TEST"));
        createProducer.setDeliveryMode(2);
        String str = null;
        for (int i = 0; i < 30; i++) {
            if (i == 0) {
                StringBuffer stringBuffer = new StringBuffer();
                for (int i2 = 0; i2 < 10024; i2++) {
                    stringBuffer.append("*******************************************************************************************************************************");
                }
                str = stringBuffer.toString();
                createProducer.send(createSession.createTextMessage(stringBuffer.toString()));
            } else {
                createProducer.send(createSession.createMessage());
            }
        }
        Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5673").createConnection();
        Session createSession2 = createConnection.createSession(false, 1);
        createConnection.start();
        MessageConsumer createConsumer = createSession2.createConsumer(createSession2.createQueue("TEST"));
        if (z) {
            Thread.sleep(500L);
            Assert.assertNull(createConsumer.receiveNoWait());
            retryInterval.setUser(this.fullUser).setPassword(this.fullPass);
        }
        for (int i3 = 0; i3 < 30; i3++) {
            TextMessage receive = createConsumer.receive(5000L);
            if (receive instanceof TextMessage) {
                if (receive instanceof TextMessage) {
                    Assert.assertEquals(str, receive.getText());
                } else {
                    System.out.println("i = " + i3);
                }
            }
        }
        Assert.assertNull(createConsumer.receiveNoWait());
    }
}
