package org.apache.activemq.artemis.tests.integration.amqp.connect;

import io.vertx.core.Vertx;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonServerOptions;
import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.connect.AMQPBrokerConnection;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.integration.client.AutoCreateJmsDestinationTest;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.Target;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.impl.ConnectionImpl;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/connect/ValidateAMQPErrorsTest.class */
public class ValidateAMQPErrorsTest extends AmqpClientTestSupport {
    // Class-wide SLF4J logger; the name is taken from the lookup class rather than a hard-coded class literal.
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    // AMQP port handed to createServer(int, boolean) for the second broker several tests create.
    protected static final int AMQP_PORT_2 = 5673;
    // Vert.x instance created by startVerx() and closed in stop(); stays null for tests that never call startVerx().
    protected Vertx vertx;
    // Mock AMQP peer constructed on top of vertx by individual tests; closed in stop().
    protected MockServer mockServer;

    /**
     * Creates the Vert.x instance that the tests hand to {@code MockServer}.
     * The instance is closed again by the {@code @After} method of this class.
     */
    public void startVerx() {
        vertx = Vertx.vertx();
    }

    /**
     * Tears down whatever a test started: closes the mock server, closes Vert.x (waiting up to
     * ten seconds for the asynchronous close to complete) and finally stops log capturing.
     *
     * @throws Exception if closing the mock server fails or the Vert.x close does not complete in time
     */
    @After
    public void stop() throws Exception {
        try {
            if (mockServer != null) {
                mockServer.close();
                mockServer = null;
            }

            if (vertx != null) {
                try {
                    final CountDownLatch closed = new CountDownLatch(1);
                    vertx.close(result -> closed.countDown());
                    Assert.assertTrue(closed.await(10L, TimeUnit.SECONDS));
                } finally {
                    // Drop the reference whether or not the close completed.
                    vertx = null;
                }
            }
        } finally {
            // Always stop capturing, even for tests that never started it or that failed above.
            AssertionLoggerHandler.stopCapture();
        }
    }

    /**
     * Builds the primary broker for this test class on AMQP port 5672.
     * NOTE(review): the {@code false} flag presumably means "do not start yet" (every test here
     * calls {@code start()} itself) - confirm against {@code AmqpClientTestSupport}.
     */
    @Override
    protected ActiveMQServer createServer() throws Exception {
        final int primaryAmqpPort = 5672;
        return createServer(primaryAmqpPort, false);
    }

    /**
     * Configures a mirror broker connection that points back at the broker's own acceptor
     * (port 5672) and checks that the connection never reports itself as started, that the
     * {@code AMQ111001} log code shows up, and that no {@code AMQ111002}/{@code AMQ111003}
     * entries are logged afterwards.
     * NOTE(review): AMQ111001 presumably marks "retries exhausted" and the other two mark
     * further retry activity - confirm against the broker's log bundle.
     */
    @Test
    public void testConnectItself() throws Exception {
        try {
            AssertionLoggerHandler.startCapture();

            final AMQPBrokerConnectConfiguration selfConnection =
                new AMQPBrokerConnectConfiguration(AutoCreateJmsDestinationTest.QUEUE_NAME, "tcp://localhost:5672")
                    .setReconnectAttempts(10)
                    .setRetryInterval(1);
            selfConnection.addElement(new AMQPMirrorBrokerConnectionElement());
            server.getConfiguration().addAMQPConnection(selfConnection);

            server.start();

            Assert.assertEquals(1L, server.getBrokerConnections().size());
            // A method reference null-checks its target on evaluation, so no explicit check is needed.
            server.getBrokerConnections().forEach(link -> Wait.assertFalse(link::isStarted));

            Wait.assertTrue(() -> AssertionLoggerHandler.findText(new String[]{"AMQ111001"}));

            // Start from a clean capture and give the broker a moment to (wrongly) log again.
            AssertionLoggerHandler.clear();
            Thread.sleep(100L);

            Assert.assertFalse(AssertionLoggerHandler.findText(new String[]{"AMQ111002"}));
            Assert.assertFalse(AssertionLoggerHandler.findText(new String[]{"AMQ111003"}));
        } finally {
            AssertionLoggerHandler.stopCapture();
        }
    }

    /**
     * Mirrors the primary broker (port 5672) to a second broker (port 5673), force-closes the
     * active mirror link from the second broker's side and then verifies that ten messages sent
     * to the primary broker can still be consumed, in order, from the second broker.
     *
     * @throws Exception if either broker or a JMS client fails
     */
    @Test
    public void testCloseLinkOnMirror() throws Exception {
        // Counts failures raised while closing the link so they fail the test instead of only being printed.
        final AtomicInteger closeErrors = new AtomicInteger(0);
        try {
            AssertionLoggerHandler.startCapture();

            final ActiveMQServer mirrorTarget = createServer(AMQP_PORT_2, false);

            final AMQPBrokerConnectConfiguration mirrorConnection =
                new AMQPBrokerConnectConfiguration(AutoCreateJmsDestinationTest.QUEUE_NAME, "tcp://localhost:5673")
                    .setReconnectAttempts(-1)
                    .setRetryInterval(10);
            mirrorConnection.addElement(new AMQPMirrorBrokerConnectionElement());
            server.getConfiguration().addAMQPConnection(mirrorConnection);
            server.start();
            Assert.assertEquals(1L, server.getBrokerConnections().size());

            // The target is not up yet, so the source has to be stuck in its connect/retry cycle.
            Wait.assertTrue(() -> AssertionLoggerHandler.findText(new String[]{"AMQ111002"}));
            server.getBrokerConnections().forEach(
                link -> Wait.assertTrue(() -> ((AMQPBrokerConnection) link).isConnecting()));

            mirrorTarget.start();
            server.getBrokerConnections().forEach(
                link -> Wait.assertFalse(() -> ((AMQPBrokerConnection) link).isConnecting()));

            createAddressAndQueues(server);
            Wait.assertTrue(() -> mirrorTarget.locateQueue(getQueueName()) != null);

            final RemotingService targetRemoting = mirrorTarget.getRemotingService();
            Wait.assertEquals(1, targetRemoting::getConnectionCount);

            targetRemoting.getConnections().forEach(remotingConnection -> {
                try {
                    final ActiveMQProtonRemotingConnection protonRemoting = (ActiveMQProtonRemotingConnection) remotingConnection;
                    final ConnectionImpl protonConnection =
                        (ConnectionImpl) protonRemoting.getAmqpConnection().getHandler().getConnection();
                    Wait.waitFor(() -> protonConnection.linkHead(EnumSet.of(EndpointState.ACTIVE), EnumSet.of(EndpointState.ACTIVE)) != null);
                    protonRemoting.getAmqpConnection().runNow(() -> {
                        protonConnection.linkHead(EnumSet.of(EndpointState.ACTIVE), EnumSet.of(EndpointState.ACTIVE)).close();
                        protonRemoting.flush();
                    });
                } catch (Exception e) {
                    closeErrors.incrementAndGet();
                    logger.warn("Failed to close the active link on the mirror target", e);
                }
            });
            Assert.assertEquals(0L, closeErrors.get());

            final ConnectionFactory sourceFactory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672");
            final ConnectionFactory targetFactory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5673");

            // try-with-resources closes each connection even when a send or an assertion fails.
            try (Connection producerConnection = sourceFactory.createConnection()) {
                final Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                final MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
                for (int i = 0; i < 10; i++) {
                    producer.send(session.createTextMessage("message " + i));
                }
            }

            try (Connection consumerConnection = targetFactory.createConnection()) {
                final Session session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                final MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
                consumerConnection.start();
                for (int i = 0; i < 10; i++) {
                    final TextMessage received = (TextMessage) consumer.receive(5000L);
                    // receive() returns null on timeout; report that as a test failure rather than an NPE.
                    Assert.assertNotNull("message " + i + " did not arrive within 5 seconds", received);
                    Assert.assertEquals("message " + i, received.getText());
                }
            }
        } finally {
            AssertionLoggerHandler.stopCapture();
        }
    }

    /**
     * Runs the shared close-link scenario with the broker connection configured as a SENDER.
     */
    @Test
    public void testCloseLinkOnSender() throws Exception {
        final boolean senderSide = true;
        testCloseLink(senderSide);
    }

    /**
     * Runs the shared close-link scenario with the broker connection configured as a RECEIVER.
     */
    @Test
    public void testCloseLinkOnReceiver() throws Exception {
        final boolean senderSide = false;
        testCloseLink(senderSide);
    }

    /**
     * Shared scenario for the sender and receiver close-link tests.
     * <p>
     * One broker owns a broker connection (SENDER or RECEIVER element matching the test queue)
     * towards the other broker. The owning broker is started first so that it has to retry, then
     * the peer is started, the active link is force-closed on the accepting side, and finally ten
     * messages produced on port 5672 must be consumable in order on port 5673.
     *
     * @param z {@code true}: the primary broker (5672) owns a SENDER connection to port 5673;
     *          {@code false}: the second broker (5673) owns a RECEIVER connection to port 5672
     * @throws Exception if a broker or a JMS client fails
     */
    public void testCloseLink(boolean z) throws Exception {
        final AtomicInteger closeErrors = new AtomicInteger(0);
        AssertionLoggerHandler.startCapture(true);

        final ActiveMQServer secondServer = createServer(AMQP_PORT_2, false);

        // The broker that owns the outgoing broker connection, and the one that accepts it.
        final ActiveMQServer connectingServer = z ? server : secondServer;
        final ActiveMQServer acceptingServer = z ? secondServer : server;
        final String peerUri = z ? "tcp://localhost:5673" : "tcp://localhost:5672";
        final AMQPBrokerConnectionAddressType linkType =
            z ? AMQPBrokerConnectionAddressType.SENDER : AMQPBrokerConnectionAddressType.RECEIVER;

        final AMQPBrokerConnectConfiguration brokerConnection =
            new AMQPBrokerConnectConfiguration(AutoCreateJmsDestinationTest.QUEUE_NAME, peerUri)
                .setReconnectAttempts(-1)
                .setRetryInterval(10);
        brokerConnection.addElement(new AMQPBrokerConnectionElement().setMatchAddress(getQueueName()).setType(linkType));
        connectingServer.getConfiguration().addAMQPConnection(brokerConnection);

        connectingServer.start();
        Assert.assertEquals(1L, connectingServer.getBrokerConnections().size());

        Wait.assertTrue(() -> AssertionLoggerHandler.findText(new String[]{"AMQ111002"}));

        // Check the broker that actually owns the connection. Always checking the primary broker
        // made these two assertions vacuous in the receiver case, where it has no broker connection.
        connectingServer.getBrokerConnections().forEach(
            link -> Wait.assertTrue(() -> ((AMQPBrokerConnection) link).isConnecting()));

        acceptingServer.start();

        connectingServer.getBrokerConnections().forEach(
            link -> Wait.assertFalse(() -> ((AMQPBrokerConnection) link).isConnecting()));

        createAddressAndQueues(server);
        createAddressAndQueues(secondServer);
        Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
        Wait.assertTrue(() -> secondServer.locateQueue(getQueueName()) != null);

        final RemotingService acceptingRemoting = acceptingServer.getRemotingService();
        Wait.assertEquals(1, acceptingRemoting::getConnectionCount);

        acceptingRemoting.getConnections().forEach(remotingConnection -> {
            try {
                final ActiveMQProtonRemotingConnection protonRemoting = (ActiveMQProtonRemotingConnection) remotingConnection;
                final ConnectionImpl protonConnection =
                    (ConnectionImpl) protonRemoting.getAmqpConnection().getHandler().getConnection();
                Wait.waitFor(() -> protonConnection.linkHead(EnumSet.of(EndpointState.ACTIVE), EnumSet.of(EndpointState.ACTIVE)) != null);
                protonRemoting.getAmqpConnection().runNow(() -> {
                    protonConnection.linkHead(EnumSet.of(EndpointState.ACTIVE), EnumSet.of(EndpointState.ACTIVE)).close();
                    protonRemoting.flush();
                });
            } catch (Exception e) {
                closeErrors.incrementAndGet();
                logger.warn("Failed to close the active link on the accepting broker", e);
            }
        });

        Wait.assertEquals(1, () -> AssertionLoggerHandler.countText(new String[]{"AMQ119021"}));

        final ConnectionFactory primaryFactory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672");
        final ConnectionFactory secondFactory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5673");

        // try-with-resources closes each connection even when a send or an assertion fails.
        try (Connection producerConnection = primaryFactory.createConnection()) {
            final Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
            for (int i = 0; i < 10; i++) {
                producer.send(session.createTextMessage("message " + i));
            }
        }

        try (Connection consumerConnection = secondFactory.createConnection()) {
            final Session session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
            consumerConnection.start();
            for (int i = 0; i < 10; i++) {
                final TextMessage received = (TextMessage) consumer.receive(5000L);
                // receive() returns null on timeout; report that as a test failure rather than an NPE.
                Assert.assertNotNull("message " + i + " did not arrive within 5 seconds", received);
                Assert.assertEquals("message " + i, received.getText());
            }
        }

        Assert.assertEquals(0L, closeErrors.get());
    }

    /**
     * Points a broker connection (sender element plus mirror element) at a mock peer whose
     * receiver-open handler does nothing, using a 20 ms connect timeout and five reconnect
     * attempts, and expects the {@code AMQ111001} log code once and {@code AMQ119020} six times.
     * NOTE(review): six is presumably the initial attempt plus the five configured retries - confirm.
     *
     * @throws Exception if the mock server or the broker fails to start
     */
    @Test
    public void testTimeoutOnSenderOpen() throws Exception {
        startVerx();

        // Each nested lambda needs its own parameter name: re-declaring an enclosing lambda's
        // parameter is a compile-time error.
        mockServer = new MockServer(vertx, new ProtonServerOptions(), null, serverConnection -> {
            serverConnection.openHandler(openResult -> {
                serverConnection.closeHandler(closeResult -> serverConnection.close());
                serverConnection.open();
            });
            serverConnection.sessionOpenHandler(session -> session.open());
            serverConnection.senderOpenHandler(sender -> sender.open());
            serverConnection.receiverOpenHandler(receiver -> {
                // Intentionally empty: the incoming link is never opened by the mock peer.
            });
        });

        try {
            AssertionLoggerHandler.startCapture(true);

            final AMQPBrokerConnectConfiguration brokerConnection =
                new AMQPBrokerConnectConfiguration(AutoCreateJmsDestinationTest.QUEUE_NAME,
                                                   "tcp://localhost:" + mockServer.actualPort() + "?connect-timeout-millis=20")
                    .setReconnectAttempts(5)
                    .setRetryInterval(10);
            brokerConnection.addElement(new AMQPBrokerConnectionElement().setMatchAddress(getQueueName()).setType(AMQPBrokerConnectionAddressType.SENDER));
            brokerConnection.addElement(new AMQPMirrorBrokerConnectionElement());
            server.getConfiguration().addAMQPConnection(brokerConnection);
            server.start();

            Wait.assertTrue(() -> AssertionLoggerHandler.findText(new String[]{"AMQ111001"}));
            Wait.assertEquals(6, () -> AssertionLoggerHandler.countText(new String[]{"AMQ119020"}));
        } finally {
            mockServer.close();
        }
    }

    /**
     * Mirrors to a mock peer that ignores the first two receiver opens, stalls on the third
     * (blocking on a barrier and a latch, then returning without opening the link), and accepts
     * from the fourth onwards. After the broker has reconnected, 100 messages carrying an
     * increasing {@code sender} property are produced and must reach the mock peer in order.
     *
     * @throws Exception if the mock server, the broker or the JMS client fails
     */
    @Test
    public void testReconnectAfterSenderOpenTimeout() throws Exception {
        startVerx();
        AssertionLoggerHandler.startCapture(true);

        final ProtonServerOptions serverOptions = new ProtonServerOptions();
        final AtomicInteger receiverOpens = new AtomicInteger(0);
        final CyclicBarrier startFlag = new CyclicBarrier(2);
        final CountDownLatch blockBeforeOpen = new CountDownLatch(1);
        final AtomicInteger disconnects = new AtomicInteger(0);
        final AtomicInteger messagesReceived = new AtomicInteger(0);
        final AtomicInteger errors = new AtomicInteger(0);
        final ConcurrentHashSet<ProtonConnection> liveConnections = new ConcurrentHashSet<>();

        // Each nested lambda needs its own parameter name: re-declaring an enclosing lambda's
        // parameter is a compile-time error.
        mockServer = new MockServer(vertx, serverOptions, null, serverConnection -> {
            serverConnection.disconnectHandler(disconnected -> {
                disconnects.incrementAndGet();
                liveConnections.remove(disconnected);
            });

            serverConnection.openHandler(openResult -> {
                serverConnection.closeHandler(closeResult -> {
                    serverConnection.close();
                    liveConnections.remove(serverConnection);
                });
                serverConnection.open();
                liveConnections.add(serverConnection);
            });

            serverConnection.sessionOpenHandler(session -> session.open());
            serverConnection.senderOpenHandler(sender -> sender.open());

            serverConnection.receiverOpenHandler(receiver -> {
                final int openCount = receiverOpens.incrementAndGet();
                if (openCount <= 2) {
                    // First two attempts: leave the link un-opened.
                    return;
                }

                if (openCount == 3) {
                    try {
                        // Rendezvous with the test thread, then hold until it releases the latch,
                        // and leave without opening the link.
                        startFlag.await(10L, TimeUnit.SECONDS);
                        blockBeforeOpen.await(10L, TimeUnit.SECONDS);
                        return;
                    } catch (Throwable ignored) {
                        // Best effort: if the rendezvous fails, fall through and accept the link.
                    }
                }

                final HashMap<Symbol, Object> brokerIdProperties = new HashMap<>();
                brokerIdProperties.put(AMQPMirrorControllerSource.BROKER_ID, "fake-id");
                receiver.setProperties(brokerIdProperties);
                receiver.setOfferedCapabilities(new Symbol[]{AMQPMirrorControllerSource.MIRROR_CAPABILITY});
                receiver.setTarget(receiver.getRemoteTarget());
                receiver.open();

                receiver.handler((delivery, message) -> {
                    if (message.getApplicationProperties() == null) {
                        return;
                    }
                    final Object sender = message.getApplicationProperties().getValue().get("sender");
                    if (sender == null) {
                        return;
                    }
                    if (messagesReceived.get() != ((Integer) sender).intValue()) {
                        logger.warn("Message out of order. Expected {} but received {}", messagesReceived.get(), sender);
                        errors.incrementAndGet();
                    }
                    messagesReceived.incrementAndGet();
                });
            });
        });

        final AMQPBrokerConnectConfiguration mirrorConnection =
            new AMQPBrokerConnectConfiguration(AutoCreateJmsDestinationTest.QUEUE_NAME,
                                               "tcp://localhost:" + mockServer.actualPort() + "?connect-timeout-millis=1000")
                .setReconnectAttempts(10)
                .setRetryInterval(10);
        mirrorConnection.addElement(new AMQPMirrorBrokerConnectionElement());
        server.getConfiguration().addAMQPConnection(mirrorConnection);
        server.start();

        // Meet the stalled third open, then release it.
        startFlag.await(10L, TimeUnit.SECONDS);
        blockBeforeOpen.countDown();

        Wait.assertEquals(2, disconnects::intValue);
        Wait.assertEquals(1, liveConnections::size);
        Wait.assertEquals(3, () -> AssertionLoggerHandler.countText(new String[]{"AMQ119020"}));

        final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672");
        try (Connection connection = factory.createConnection()) {
            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
            for (int i = 0; i < 100; i++) {
                final TextMessage message = session.createTextMessage("hello");
                message.setIntProperty("sender", i);
                producer.send(message);
            }
        }

        Wait.assertEquals(100, messagesReceived::intValue, 5000L);
        // Exact comparison. The previous three-argument call was the floating-point overload with
        // a delta of 5000, so any number of ordering errors up to 5000 passed silently.
        Assert.assertEquals(0L, errors.get());
    }

    /**
     * Mirrors to a mock peer that opens the incoming link but sets neither properties nor
     * offered capabilities on it, and expects the {@code AMQ111001} log code followed by exactly
     * six {@code AMQ119018} entries (five reconnect attempts are configured).
     */
    @Test
    public void testNoServerOfferedMirrorCapability() throws Exception {
        startVerx();

        mockServer = new MockServer(vertx, peer -> {
            peer.openHandler(openResult -> peer.open());
            peer.sessionOpenHandler(session -> session.open());
            peer.senderOpenHandler(sender -> sender.open());
            peer.receiverOpenHandler(receiver -> {
                // Echo the requested target back and open the link without any mirror capability.
                receiver.setTarget(receiver.getRemoteTarget());
                receiver.open();
            });
        });

        AssertionLoggerHandler.startCapture(true);

        final String peerUri = "tcp://localhost:" + mockServer.actualPort() + "?connect-timeout-millis=100";
        final AMQPBrokerConnectConfiguration mirrorConnection =
            new AMQPBrokerConnectConfiguration(AutoCreateJmsDestinationTest.QUEUE_NAME, peerUri)
                .setReconnectAttempts(5)
                .setRetryInterval(10);
        mirrorConnection.addElement(new AMQPMirrorBrokerConnectionElement());
        server.getConfiguration().addAMQPConnection(mirrorConnection);
        server.start();

        Wait.assertTrue(() -> AssertionLoggerHandler.findText(new String[]{"AMQ111001"}));
        Assert.assertEquals(6L, AssertionLoggerHandler.countText(new String[]{"AMQ119018"}));
    }

    /**
     * Mirrors to a mock peer that refuses mirror links (opens them with a null target and closes
     * them with an {@code ILLEGAL_STATE} condition 20 ms later) on every attempt except the
     * second, which it accepts. The test then checks that the first connection was dropped,
     * that the second one stays up, that nothing was delivered on a refused link, and that 100
     * messages with an increasing {@code sender} property arrive in order on the accepted link.
     *
     * @throws Exception if the mock server, the broker or the JMS client fails
     */
    @Test
    public void testReconnectAfterMirrorLinkRefusal() throws Exception {
        startVerx();

        final AtomicInteger errors = new AtomicInteger(0);
        final AtomicInteger messagesReceived = new AtomicInteger(0);
        final List<ProtonConnection> connections = Collections.synchronizedList(new ArrayList<>());
        final List<ProtonConnection> disconnected = Collections.synchronizedList(new ArrayList<>());
        final AtomicInteger refusedLinkMessageCount = new AtomicInteger();
        final AtomicInteger linkOpens = new AtomicInteger(0);

        // Each nested lambda needs its own parameter name: re-declaring an enclosing lambda's
        // parameter is a compile-time error.
        mockServer = new MockServer(vertx, serverConnection -> {
            serverConnection.disconnectHandler(dropped -> disconnected.add(dropped));

            serverConnection.openHandler(openResult -> {
                connections.add(serverConnection);
                serverConnection.open();
            });

            serverConnection.closeHandler(closeResult -> {
                serverConnection.close();
                connections.remove(serverConnection);
            });

            serverConnection.sessionOpenHandler(session -> session.open());

            serverConnection.receiverOpenHandler(receiver -> {
                final Target remoteTarget = receiver.getRemoteTarget();
                final String address = remoteTarget == null ? null : remoteTarget.getAddress();
                if (address == null || !address.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR")) {
                    // Only mirror links are expected in this test.
                    errors.incrementAndGet();
                    logger.warn("Receiving address as {}", address);
                    return;
                }

                if (linkOpens.incrementAndGet() != 2) {
                    logger.debug("Link Opens::{}", linkOpens);
                    logger.debug("ServerReceiver = {}", receiver.getTarget());

                    // Refuse: open with a null target, then close with an error shortly after.
                    receiver.setTarget((Target) null);
                    receiver.handler((delivery, message) -> {
                        refusedLinkMessageCount.incrementAndGet();
                        logger.debug("Should not have got message on refused link: {}", message);
                    });
                    receiver.open();
                    vertx.setTimer(20L, timerId -> {
                        receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, "Testing refusal of mirror link for $reasons"));
                        receiver.close();
                    });
                    return;
                }

                // Second attempt: accept the link and advertise the mirror capability.
                receiver.setTarget(receiver.getRemoteTarget());
                final HashMap<Symbol, Object> brokerIdProperties = new HashMap<>();
                brokerIdProperties.put(AMQPMirrorControllerSource.BROKER_ID, "fake-id");
                receiver.setProperties(brokerIdProperties);
                receiver.setOfferedCapabilities(new Symbol[]{AMQPMirrorControllerSource.MIRROR_CAPABILITY});
                receiver.handler((delivery, message) -> {
                    logger.debug("prefetch = {}, Got message: {}", receiver.getPrefetch(), message);
                    if (message.getApplicationProperties() != null) {
                        final Object sender = message.getApplicationProperties().getValue().get("sender");
                        if (sender != null) {
                            if (messagesReceived.get() != ((Integer) sender).intValue()) {
                                logger.warn("Message out of order. Expected {} but received {}", messagesReceived.get(), sender);
                                errors.incrementAndGet();
                            }
                            messagesReceived.incrementAndGet();
                        }
                    }
                    delivery.disposition(Accepted.getInstance(), true);
                    if (receiver.getPrefetch() == 0) {
                        // Without prefetch, credit has to be granted by hand for the next message.
                        receiver.flow(1);
                    }
                });
                receiver.open();
            });
        });

        final AMQPBrokerConnectConfiguration mirrorConnection =
            new AMQPBrokerConnectConfiguration(AutoCreateJmsDestinationTest.QUEUE_NAME, "tcp://localhost:" + mockServer.actualPort())
                .setReconnectAttempts(3)
                .setRetryInterval(10);
        mirrorConnection.addElement(new AMQPMirrorBrokerConnectionElement());
        server.getConfiguration().addAMQPConnection(mirrorConnection);
        server.start();

        Wait.assertEquals(1, disconnected::size, 6000L);
        Wait.assertEquals(2, connections::size, 6000L);
        assertSame(connections.get(0), disconnected.get(0));
        assertFalse(connections.get(1).isDisconnected());

        assertEquals("Should not have got any message on refused link", 0L, refusedLinkMessageCount.get());
        assertEquals(0L, errors.get());

        final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672");
        try (Connection connection = factory.createConnection()) {
            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
            for (int i = 0; i < 100; i++) {
                final TextMessage message = session.createTextMessage("hello");
                message.setIntProperty("sender", i);
                producer.send(message);
            }
        }

        Wait.assertEquals(100, messagesReceived::intValue);
        assertEquals(0L, errors.get());
    }

    /**
     * Attaches a plain AMQP sender to the broker's mirror address for this test and expects the
     * broker to refuse the link: the client call must fail with a message containing
     * {@code AMQ119024}, the detach must carry an {@code ILLEGAL_STATE} condition whose
     * description names both that code and the mirror capability, and the broker must log
     * {@code AMQ119024}.
     *
     * @throws Exception if the broker or the AMQP client fails unexpectedly
     */
    @Test
    public void testNoClientDesiredMirrorCapability() throws Exception {
        AssertionLoggerHandler.startCapture();
        server.start();

        final AmqpClient client = new AmqpClient(new URI("tcp://localhost:5672"), (String) null, (String) null);
        client.setValidator(new AmqpValidator() {
            @Override
            public void inspectOpenedResource(Sender sender) {
                final ErrorCondition condition = sender.getRemoteCondition();
                if (condition == null || condition.getCondition() == null) {
                    markAsInvalid("Sender should have been detached with an error");
                    return;
                }

                if (!condition.getCondition().equals(AmqpError.ILLEGAL_STATE)) {
                    markAsInvalid("Should have been closed with an illegal state error, but error was: " + condition);
                }

                // A missing description is reported as a validation failure instead of an NPE.
                final String description = condition.getDescription();
                if (description == null || !description.contains("AMQ119024")) {
                    markAsInvalid("should have indicated the error code about missing a desired capability");
                }
                if (description == null || !description.contains(AMQPMirrorControllerSource.MIRROR_CAPABILITY)) {
                    markAsInvalid("should have indicated the error code about missing a desired capability");
                }
            }
        });

        final String mirrorAddress = ProtonProtocolManager.getMirrorAddress(getTestName());
        final AmqpConnection connection = client.connect();
        try {
            try {
                connection.createSession().createSender(mirrorAddress);
                fail("Link should have been refused.");
            } catch (Exception e) {
                Assert.assertNotNull("The refusal should carry a message", e.getMessage());
                Assert.assertTrue(e.getMessage().contains("AMQ119024"));
                logger.debug("Caught expected exception");
            }
            connection.getStateInspector().assertValid();
        } finally {
            // Single close on every path, successful or not.
            connection.close();
        }

        Wait.assertTrue(() -> AssertionLoggerHandler.findText(new String[]{"AMQ119024"}));
    }
}
