package org.apache.activemq.artemis.tests.integration.amqp.connect;

import jakarta.jms.Connection;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Session;
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.integration.client.AutoCreateJmsDestinationTest;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.transport.netty.NettyTransportOptions;
import org.apache.qpid.proton.amqp.transport.LinkError;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Timeout(20)
/* Integration tests for AMQP broker connections configured with RECEIVER elements, run against a scripted ProtonTestServer peer. */
public class AMQPBrokerConnectionReceiverTest extends AmqpClientTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /**
     * Supplies the protocol list used for the broker under test.
     *
     * @return the comma separated protocol names {@code "AMQP,CORE"}
     */
    @Override
    protected String getConfiguredProtocols() {
        final String protocols = "AMQP,CORE";
        return protocols;
    }

    /**
     * Builds the broker instance used by the tests in this class by delegating
     * to the two argument {@code createServer} overload.
     *
     * @return the server produced by the delegate call
     * @throws Exception if the delegate call fails
     */
    @Override
    protected ActiveMQServer createServer() throws Exception {
        final int port = NettyTransportOptions.DEFAULT_TCP_PORT;
        // NOTE(review): the flag presumably means "do not start yet"; every test
        // here calls this.server.start() itself - confirm against the base class.
        final boolean secondArgument = false;
        return createServer(port, secondArgument);
    }

    /**
     * Scripts a peer that expects an anonymous SASL connect, Open, Begin, a
     * receiver Attach and a Flow, then starts the broker with a RECEIVER broker
     * connection element pointing at that peer and creates the matching anycast
     * queue. The test passes when the scripted expectations complete and the
     * connection can subsequently be closed from the peer side.
     *
     * @throws Exception if the peer script is not satisfied or the broker fails
     */
    @Test
    public void testBrokerConnectionCreatesReceiverOnRemote() throws Exception {
        // try-with-resources closes the peer exactly once, on success and on failure.
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofReceiver().respondInKind();
            peer.expectFlow();
            peer.start();

            final URI remoteURI = peer.getServerURI();
            LOG.info("Test started, peer listening on: {}", remoteURI);

            final AMQPBrokerConnectionElement receiver = new AMQPBrokerConnectionElement();
            receiver.setType(AMQPBrokerConnectionAddressType.RECEIVER);
            receiver.setName(getTestName());
            receiver.setMatchAddress(AutoCreateJmsDestinationTest.QUEUE_NAME);

            final AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(
                getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0); // no reconnects: a single connection attempt only
            amqpConnection.addElement(receiver);
            amqpConnection.setAutostart(true);

            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();
            this.server.createQueue(new QueueConfiguration(AutoCreateJmsDestinationTest.QUEUE_NAME)
                .setRoutingType(RoutingType.ANYCAST)
                .setAddress(AutoCreateJmsDestinationTest.QUEUE_NAME)
                .setAutoCreated(false));

            peer.waitForScriptToComplete();

            // Tear the connection down from the peer side and wait for the broker's Close.
            peer.expectClose();
            peer.remoteClose().now();
            peer.waitForScriptToComplete();
        }
    }

    /**
     * Verifies that a message sent by the remote peer without a "to" property
     * can be consumed locally from the address the RECEIVER element matches.
     * <p>
     * The peer first accepts the connection (SASL anonymous, Open, Begin). After
     * that part of the script completes, it is extended to expect a receiver
     * Attach whose source and target addresses equal the configured address,
     * followed by a Flow, upon which a transfer with body {@code "test-body"} is
     * sent and an accepted disposition is expected. A JMS consumer on the local
     * broker must then receive a message within five seconds.
     *
     * @throws Exception if the peer script is not satisfied, the broker fails,
     *                   or no message arrives
     */
    @Test
    public void testIncomingMessageWithNoToFieldArrivesOnConfiguredAddress() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.start();

            final URI remoteURI = peer.getServerURI();
            LOG.info("Test started, peer listening on: {}", remoteURI);

            final AMQPBrokerConnectionElement receiver = new AMQPBrokerConnectionElement();
            receiver.setType(AMQPBrokerConnectionAddressType.RECEIVER);
            receiver.setName(getTestName());
            receiver.setMatchAddress(AutoCreateJmsDestinationTest.QUEUE_NAME);

            final AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(
                getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0); // no reconnects: a single connection attempt only
            amqpConnection.addElement(receiver);
            amqpConnection.setAutostart(true);

            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);

            // Second phase of the script: the link attach and the inbound delivery.
            peer.expectAttach().ofReceiver()
                .withSource().withAddress(AutoCreateJmsDestinationTest.QUEUE_NAME).and()
                .withTarget().withAddress(AutoCreateJmsDestinationTest.QUEUE_NAME).and()
                .respondInKind();
            peer.expectFlow();
            peer.remoteTransfer().withDeliveryId(1)
                .withBody().withString("test-body").also()
                .queue();
            peer.expectDisposition().withState().accepted();

            // The JMS connection is a resource too: close it even when the assertion fails.
            try (Connection connection =
                     CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection()) {
                final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
                final MessageConsumer consumer =
                    session.createConsumer(session.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME));

                connection.start();

                Assertions.assertNotNull(consumer.receive(5000));
                consumer.close();
            }

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);

            // Tear the connection down from the peer side and wait for the broker's Close.
            peer.expectClose();
            peer.remoteClose().now();
            peer.waitForScriptToComplete();
        }
    }

    /**
     * Verifies that a message sent by the remote peer whose "to" property is
     * {@code "should-not-be-used"} can still be consumed locally from the
     * address the RECEIVER element matches.
     * <p>
     * The peer first accepts the connection (SASL anonymous, Open, Begin). After
     * that part of the script completes, it is extended to expect a receiver
     * Attach whose source and target addresses equal the configured address,
     * followed by a Flow, upon which a transfer with body {@code "test-body"} is
     * sent and an accepted disposition is expected. A JMS consumer on the
     * configured address must then receive a message within five seconds.
     *
     * @throws Exception if the peer script is not satisfied, the broker fails,
     *                   or no message arrives
     */
    @Test
    public void testIncomingMessageWithToFieldArrivesOnConfiguredAddress() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.start();

            final URI remoteURI = peer.getServerURI();
            LOG.info("Test started, peer listening on: {}", remoteURI);

            final AMQPBrokerConnectionElement receiver = new AMQPBrokerConnectionElement();
            receiver.setType(AMQPBrokerConnectionAddressType.RECEIVER);
            receiver.setName(getTestName());
            receiver.setMatchAddress(AutoCreateJmsDestinationTest.QUEUE_NAME);

            final AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(
                getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0); // no reconnects: a single connection attempt only
            amqpConnection.addElement(receiver);
            amqpConnection.setAutostart(true);

            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);

            // Second phase of the script: the link attach and the inbound delivery.
            peer.expectAttach().ofReceiver()
                .withSource().withAddress(AutoCreateJmsDestinationTest.QUEUE_NAME).and()
                .withTarget().withAddress(AutoCreateJmsDestinationTest.QUEUE_NAME).and()
                .respondInKind();
            peer.expectFlow();
            peer.remoteTransfer().withDeliveryId(1)
                .withProperties().withTo("should-not-be-used").also()
                .withBody().withString("test-body").also()
                .queue();
            peer.expectDisposition().withState().accepted();

            // The JMS connection is a resource too: close it even when the assertion fails.
            try (Connection connection =
                     CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection()) {
                final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
                final MessageConsumer consumer =
                    session.createConsumer(session.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME));

                connection.start();

                Assertions.assertNotNull(consumer.receive(5000));
                consumer.close();
            }

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);

            // Tear the connection down from the peer side and wait for the broker's Close.
            peer.expectClose();
            peer.remoteClose().now();
            peer.waitForScriptToComplete();
        }
    }

    /**
     * Scripts a peer that rejects the first receiver Attach with a
     * {@code LinkError.DETACH_FORCED} condition and description
     * {@code "Attach refused"}, optionally sees a Detach and Close, and expects
     * the connection to drop. The script then expects a second connection
     * (SASL anonymous, Open, Begin) whose receiver Attach is answered in kind
     * and followed by a Flow. The broker connection is configured with ten
     * reconnect attempts and a retry interval of 100.
     *
     * @throws Exception if the peer script is not satisfied or the broker fails
     */
    @Test
    public void testBrokerConnectionRetriesReceiverOnRemoteIfAttachRejected() throws Exception {
        // try-with-resources closes the peer exactly once, on success and on failure.
        try (ProtonTestServer peer = new ProtonTestServer()) {
            // First connection: the attach is refused and the connection goes away.
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofReceiver().reject(true, LinkError.DETACH_FORCED.toString(), "Attach refused");
            peer.expectDetach().optional();
            peer.expectClose().optional();
            peer.expectConnectionToDrop();

            // Second connection: the attach is accepted this time.
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofReceiver().respondInKind();
            peer.expectFlow();
            peer.start();

            final URI remoteURI = peer.getServerURI();
            LOG.info("Test started, peer listening on: {}", remoteURI);

            final AMQPBrokerConnectionElement receiver = new AMQPBrokerConnectionElement();
            receiver.setType(AMQPBrokerConnectionAddressType.RECEIVER);
            receiver.setName(getTestName());
            receiver.setMatchAddress(AutoCreateJmsDestinationTest.QUEUE_NAME);

            final AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(
                getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(10);
            amqpConnection.setRetryInterval(100);
            amqpConnection.addElement(receiver);
            amqpConnection.setAutostart(true);

            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();
            this.server.createQueue(new QueueConfiguration(AutoCreateJmsDestinationTest.QUEUE_NAME)
                .setRoutingType(RoutingType.ANYCAST)
                .setAddress(AutoCreateJmsDestinationTest.QUEUE_NAME)
                .setAutoCreated(false));

            peer.waitForScriptToComplete();

            // Tear the connection down from the peer side and wait for the broker's Close.
            peer.expectClose();
            peer.remoteClose().now();
            peer.waitForScriptToComplete();
        }
    }

    /**
     * Scripts a peer that drives queue creation and removal on the local broker
     * from inside the script itself:
     * <ol>
     *   <li>after Begin, the anycast queue is created on the broker;</li>
     *   <li>when the receiver Attach arrives, the address is removed before the
     *       peer responds to that Attach;</li>
     *   <li>a Detach is expected and answered, then the queue is created again;</li>
     *   <li>a new receiver Attach is expected, answered in kind, and followed by
     *       a Flow.</li>
     * </ol>
     * The broker connection is configured with ten reconnect attempts and a
     * retry interval of 50. Failures of the scripted create/remove actions are
     * only logged; a resulting problem would surface as an unmet expectation.
     *
     * @throws Exception if the peer script is not satisfied or the broker fails
     */
    @Test
    public void testBrokerConnectionRetriesReceiverOnRemoteIfTargetQueueRemovedAndLaterAddedBack() throws Exception {
        // try-with-resources closes the peer exactly once, on success and on failure.
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.execute(() -> {
                try {
                    this.server.createQueue(new QueueConfiguration(AutoCreateJmsDestinationTest.QUEUE_NAME)
                        .setRoutingType(RoutingType.ANYCAST)
                        .setAddress(AutoCreateJmsDestinationTest.QUEUE_NAME)
                        .setAutoCreated(false));
                } catch (Exception e) {
                    LOG.warn("Error on creating server address and queue: ", e);
                }
            }).queue();
            // No inline response here: the next scripted action answers the attach
            // only after the address has been removed.
            peer.expectAttach().ofReceiver();
            peer.execute(() -> {
                try {
                    this.server.removeAddressInfo(
                        SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), (SecurityAuth) null, true);
                } catch (Exception e) {
                    LOG.warn("Error on removing server address and queue: ", e);
                }
                peer.respondToLastAttach().now();
            }).queue();
            peer.expectDetach().respond();
            peer.execute(() -> {
                try {
                    this.server.createQueue(new QueueConfiguration(AutoCreateJmsDestinationTest.QUEUE_NAME)
                        .setRoutingType(RoutingType.ANYCAST)
                        .setAddress(AutoCreateJmsDestinationTest.QUEUE_NAME)
                        .setAutoCreated(false));
                } catch (Exception e) {
                    LOG.warn("Error on creating server address and queue: ", e);
                }
            }).queue();
            peer.expectAttach().ofReceiver().respondInKind();
            peer.expectFlow();
            peer.start();

            final URI remoteURI = peer.getServerURI();
            LOG.info("Test started, peer listening on: {}", remoteURI);

            final AMQPBrokerConnectionElement receiver = new AMQPBrokerConnectionElement();
            receiver.setType(AMQPBrokerConnectionAddressType.RECEIVER);
            receiver.setName(getTestName());
            receiver.setMatchAddress(AutoCreateJmsDestinationTest.QUEUE_NAME);

            final AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(
                getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(10);
            amqpConnection.setRetryInterval(50);
            amqpConnection.addElement(receiver);
            amqpConnection.setAutostart(true);

            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();

            peer.waitForScriptToComplete();

            // Tear the connection down from the peer side and wait for the broker's Close.
            peer.expectClose();
            peer.remoteClose().now();
            peer.waitForScriptToComplete();
        }
    }
}
