package org.apache.activemq.artemis.tests.integration.amqp.connect;

import jakarta.jms.Connection;
import jakarta.jms.DeliveryMode;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.integration.client.AutoCreateJmsDestinationTest;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.class */
public class AMQPReplicaTest extends AmqpClientTestSupport {
    /** Port passed to {@code createServer} for {@link #server_2}; clients reach that broker at {@code tcp://localhost:5673}. */
    protected static final int AMQP_PORT_2 = 5673;
    /** Port passed to {@code createServer} for the third broker that {@code dualReplica} creates. */
    protected static final int AMQP_PORT_3 = 5674;
    /**
     * Not referenced by name anywhere in this file.
     * NOTE(review): presumably the pause in milliseconds behind the {@code Thread.sleep(1000L)} calls — confirm.
     */
    public static final int TIME_BEFORE_RESTART = 1000;
    /** Second broker; each test creates it with {@code createServer(AMQP_PORT_2, false)} and adds the AMQP broker connection under test to it. */
    ActiveMQServer server_2;

    /** Starts log capture through {@link AssertionLoggerHandler} before every test. */
    @Before
    public void startLogging() {
        AssertionLoggerHandler.startCapture();
    }

    /**
     * After every test, asserts that the captured log does not contain the code {@code AMQ222214},
     * then stops log capture.
     */
    @After
    public void stopLogging() {
        try {
            Assert.assertFalse(AssertionLoggerHandler.findText(new String[]{"AMQ222214"}));
        } finally {
            // Capture is stopped even when the assertion above fails.
            AssertionLoggerHandler.stopCapture();
        }
    }

    /**
     * Creates the primary broker ({@code server}) by delegating to {@code createServer(5672, false)}.
     * NOTE(review): the second argument is presumably a "start immediately" flag, since the tests
     * start the broker themselves — confirm against the parent class.
     */
    @Override // org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport
    protected ActiveMQServer createServer() throws Exception {
        return createServer(5672, false);
    }

    /**
     * Creates a durable queue on {@code server_2} while {@code server} is stopped, then restarts
     * both brokers and waits for the queue to appear on {@code server} through the mirror
     * broker connection configured on {@code server_2}.
     */
    @Test
    public void testReplicaCatchupOnQueueCreates() throws Exception {
        server.setIdentity("Server1");
        // The first broker stays offline while the queue is created on server_2.
        server.stop();

        server_2 = createServer(AMQP_PORT_2, false);
        AMQPBrokerConnectConfiguration mirrorConnection = new AMQPBrokerConnectConfiguration(AutoCreateJmsDestinationTest.QUEUE_NAME, "tcp://localhost:5672");
        mirrorConnection.addElement(new AMQPMirrorBrokerConnectionElement());
        server_2.getConfiguration().addAMQPConnection(mirrorConnection);
        server_2.start();

        server_2.addAddressInfo(new AddressInfo("sometest").setAutoCreated(false));
        server_2.createQueue(new QueueConfiguration("sometest").setDurable(true));
        Wait.assertTrue(() -> server_2.locateQueue("sometest") != null);
        server_2.stop();

        // The first broker must come back without the queue: nothing has been mirrored yet.
        server.start();
        Assert.assertNull(server.locateQueue("sometest"));
        Wait.assertTrue(server::isActive);

        // Once server_2 is running again the queue is expected to show up on the first broker.
        server_2.start();
        Wait.assertTrue(() -> server.locateQueue("sometest") != null);

        server_2.stop();
        server.stop();
    }

    /**
     * With queue removal disabled on the mirror element, destroying a queue on {@code server_2}
     * must leave the mirrored queue on {@code server} in place.
     */
    @Test
    public void testDeleteQueueWithRemoveFalse() throws Exception {
        server.setIdentity("Server1");
        server.start();

        server_2 = createServer(AMQP_PORT_2, false);
        AMQPBrokerConnectConfiguration mirrorConnection = new AMQPBrokerConnectConfiguration(AutoCreateJmsDestinationTest.QUEUE_NAME, "tcp://localhost:5672");
        mirrorConnection.addElement(new AMQPMirrorBrokerConnectionElement().setQueueRemoval(false));
        server_2.getConfiguration().addAMQPConnection(mirrorConnection);
        server_2.start();

        final SimpleString queueName = RandomUtil.randomSimpleString();
        server_2.addAddressInfo(new AddressInfo(queueName).setAutoCreated(false));
        server_2.createQueue(new QueueConfiguration(queueName).setDurable(true));

        // The queue must exist on both brokers before it is destroyed on server_2.
        Wait.assertTrue(() -> server_2.locateQueue(queueName) != null);
        Wait.assertTrue(() -> server.locateQueue(queueName) != null);

        server_2.destroyQueue(queueName);
        Wait.assertTrue(() -> server_2.locateQueue(queueName) == null);

        // Short pause so that a removal, had one been sent, would have had time to be applied.
        Thread.sleep(100L);
        Assert.assertNotNull("Queue was removed when it was configured to not remove it", server.locateQueue(queueName));

        server_2.stop();
        server.stop();
    }

    /** Runs {@code doSendCreateQueueTestImpl} with queue creation enabled on the mirror element. */
    @Test
    public void testSendCreateQueue() throws Exception {
        doSendCreateQueueTestImpl(true);
    }

    /** Runs {@code doSendCreateQueueTestImpl} with queue creation disabled on the mirror element. */
    @Test
    public void testDoNotSendCreateQueue() throws Exception {
        doSendCreateQueueTestImpl(false);
    }

    /**
     * Creates an address and queue on {@code server_2} and checks whether they appear on
     * {@code server}, depending on the mirror element's queue-creation setting.
     *
     * @param z {@code true} enables queue creation (and disables queue removal) on the mirror
     *          element and expects the queue and address on {@code server}; {@code false}
     *          disables queue creation and expects neither after a 250 ms pause
     */
    private void doSendCreateQueueTestImpl(boolean z) throws Exception {
        final boolean sendCreate = z;

        server.start();
        final SimpleString address = SimpleString.toSimpleString("address");

        server_2 = createServer(AMQP_PORT_2, false);
        AMQPBrokerConnectConfiguration mirrorConnection = new AMQPBrokerConnectConfiguration(AutoCreateJmsDestinationTest.QUEUE_NAME, "tcp://localhost:5672");
        AMQPMirrorBrokerConnectionElement mirror = new AMQPMirrorBrokerConnectionElement();
        mirror.setQueueCreation(sendCreate);
        if (sendCreate) {
            mirror.setQueueRemoval(false);
        }
        mirrorConnection.addElement(mirror);
        server_2.getConfiguration().addAMQPConnection(mirrorConnection);
        server_2.start();
        Wait.assertTrue(server::isActive);

        server_2.addAddressInfo(new AddressInfo(address).addRoutingType(RoutingType.ANYCAST));
        server_2.createQueue(new QueueConfiguration(address).setDurable(true).setAddress(address));

        if (!sendCreate) {
            // Nothing should arrive; give a creation event time to show up before asserting absence.
            Thread.sleep(250L);
            Assert.assertTrue(server.locateQueue(address) == null);
            Assert.assertTrue(server.getAddressInfo(address) == null);
        } else {
            Wait.assertTrue(() -> server.locateQueue(address) != null);
            Wait.assertTrue(() -> server.getAddressInfo(address) != null);
        }

        server_2.stop();
        server.stop();
    }

    /**
     * Seeds {@code server} with a queue named {@code ToBeGone}, creates queue {@code sometest} on
     * {@code server_2} while {@code server} is offline, then restarts both and waits for
     * {@code sometest} to appear on {@code server}.
     * NOTE(review): the removal of {@code ToBeGone} is never asserted in this method.
     */
    @Test
    public void testReplicaCatchupOnQueueCreatesAndDeletes() throws Exception {
        final String mirroredName = "sometest";

        // Seed the first broker, then take it offline.
        server.start();
        server.setIdentity("Server1");
        server.addAddressInfo(new AddressInfo(mirroredName).setAutoCreated(false).addRoutingType(RoutingType.MULTICAST));
        server.createQueue(new QueueConfiguration("ToBeGone").setDurable(true).setRoutingType(RoutingType.MULTICAST));
        server.stop();

        // First incarnation of server_2: create the queue while the other side is down.
        server_2 = createServer(AMQP_PORT_2, false);
        server_2.setIdentity("server_2");
        AMQPBrokerConnectConfiguration firstConnection = new AMQPBrokerConnectConfiguration(AutoCreateJmsDestinationTest.QUEUE_NAME, "tcp://localhost:5672");
        firstConnection.addElement(new AMQPMirrorBrokerConnectionElement());
        server_2.getConfiguration().addAMQPConnection(firstConnection);
        server_2.start();
        server_2.addAddressInfo(new AddressInfo(mirroredName).setAutoCreated(false).addRoutingType(RoutingType.MULTICAST));
        server_2.createQueue(new QueueConfiguration(mirroredName).setDurable(true).setRoutingType(RoutingType.MULTICAST));
        Wait.assertTrue(() -> server_2.locateQueue(mirroredName) != null);
        server_2.stop();

        // Second incarnation of server_2, configured the same way but not started yet.
        server_2 = createServer(AMQP_PORT_2, false);
        server_2.setIdentity("server_2");
        AMQPBrokerConnectConfiguration secondConnection = new AMQPBrokerConnectConfiguration(AutoCreateJmsDestinationTest.QUEUE_NAME, "tcp://localhost:5672");
        secondConnection.addElement(new AMQPMirrorBrokerConnectionElement());
        server_2.getConfiguration().addAMQPConnection(secondConnection);

        server.start();
        Assert.assertTrue(server.locateQueue(mirroredName) == null);
        Assert.assertNotNull(server.locateQueue("ToBeGone"));
        Wait.assertTrue(server::isActive);

        server_2.start();
        Wait.assertTrue(() -> server.locateQueue(mirroredName) != null);

        server_2.stop();
        server.stop();
    }

    /**
     * Creates a durable multicast queue {@code sometest} on {@code server_2} while {@code server}
     * is offline, recreates {@code server_2}, restarts both brokers and waits for the queue to
     * appear on {@code server}.
     */
    @Test
    public void testReplicaWithDurable() throws Exception {
        final String mirroredName = "sometest";

        // Only the address exists on the first broker before it goes offline.
        server.start();
        server.setIdentity("Server1");
        server.addAddressInfo(new AddressInfo(mirroredName).setAutoCreated(false).addRoutingType(RoutingType.MULTICAST));
        server.stop();

        // First incarnation of server_2: create the durable queue.
        server_2 = createServer(AMQP_PORT_2, false);
        server_2.setIdentity("server_2");
        AMQPBrokerConnectConfiguration firstConnection = new AMQPBrokerConnectConfiguration(AutoCreateJmsDestinationTest.QUEUE_NAME, "tcp://localhost:5672");
        firstConnection.addElement(new AMQPMirrorBrokerConnectionElement());
        server_2.getConfiguration().addAMQPConnection(firstConnection);
        server_2.start();
        server_2.addAddressInfo(new AddressInfo(mirroredName).setAutoCreated(false).addRoutingType(RoutingType.MULTICAST));
        server_2.createQueue(new QueueConfiguration(mirroredName).setDurable(true).setRoutingType(RoutingType.MULTICAST));
        Wait.assertTrue(() -> server_2.locateQueue(mirroredName) != null);
        server_2.stop();

        // Second incarnation of server_2, configured the same way but started only after the first broker.
        server_2 = createServer(AMQP_PORT_2, false);
        server_2.setIdentity("server_2");
        AMQPBrokerConnectConfiguration secondConnection = new AMQPBrokerConnectConfiguration(AutoCreateJmsDestinationTest.QUEUE_NAME, "tcp://localhost:5672");
        secondConnection.addElement(new AMQPMirrorBrokerConnectionElement());
        server_2.getConfiguration().addAMQPConnection(secondConnection);

        server.start();
        Assert.assertTrue(server.locateQueue(mirroredName) == null);
        Wait.assertTrue(server::isActive);

        server_2.start();
        Wait.assertTrue(() -> server.locateQueue(mirroredName) != null);

        server_2.stop();
        server.stop();
    }

    /** {@code replicaTest} with large message bodies and mirrored acknowledgements; no paging, no restarts. */
    @Test
    public void testReplicaLargeMessages() throws Exception {
        replicaTest(true, true, false, false, false, false, false);
    }

    /** {@code replicaTest} with large message bodies, mirrored acknowledgements, and paging started on both brokers' queues. */
    @Test
    public void testReplicaLargeMessagesPagingEverywhere() throws Exception {
        replicaTest(true, true, true, true, false, false, false);
    }

    /** {@code replicaTest} with small message bodies and mirrored acknowledgements; no paging, no restarts. */
    @Test
    public void testReplica() throws Exception {
        replicaTest(false, true, false, false, false, false, false);
    }

    /** {@code replicaTest} with mirrored acknowledgements, stopping and restarting the broker connection on {@code server_2} after the messages are mirrored. */
    @Test
    public void testReplicaRestartBrokerConnection() throws Exception {
        replicaTest(false, true, false, false, false, false, true);
    }

    /** {@code replicaTest} with mirrored acknowledgements, restarting {@code server} after consumption and then sending and consuming a second batch. */
    @Test
    public void testReplicaRestart() throws Exception {
        replicaTest(false, true, false, false, false, true, false);
    }

    /** {@code replicaTest} with mirrored acknowledgements where {@code server} is started only after the messages were sent to {@code server_2}. */
    @Test
    public void testReplicaDeferredStart() throws Exception {
        replicaTest(false, true, false, false, true, false, false);
    }

    /** {@code replicaTest} with message acknowledgements disabled on the mirror element, so every message is consumed from both brokers. */
    @Test
    public void testReplicaCopyOnly() throws Exception {
        replicaTest(false, false, false, false, false, false, false);
    }

    /** {@code replicaTest} with mirrored acknowledgements and paging started on the queue of {@code server}. */
    @Test
    public void testReplicaPagedTarget() throws Exception {
        replicaTest(false, true, true, false, false, false, false);
    }

    /** {@code replicaTest} with mirrored acknowledgements and paging started on the queues of both brokers. */
    @Test
    public void testReplicaPagingEverywhere() throws Exception {
        replicaTest(false, true, true, true, false, false, false);
    }

    /**
     * Builds the text body for message number {@code i}.
     *
     * @param z when {@code false} the body is the short string {@code "Text <i>"}; when
     *          {@code true} the chunk {@code "Text <i> "} is repeated until the body is at
     *          least 112640 characters long
     * @param i message index embedded in the body
     * @return the message body
     */
    private String getText(boolean z, int i) {
        if (!z) {
            return "Text " + i;
        }
        final int minimumLength = 110 * 1024; // 112640 characters
        final String chunk = "Text " + i + " ";
        // Presized so the builder never has to grow; no synchronization is needed for a local buffer.
        StringBuilder body = new StringBuilder(minimumLength + chunk.length());
        while (body.length() < minimumLength) {
            body.append(chunk);
        }
        return body.toString();
    }

    /**
     * Sends 20 large AMQP messages carrying a delivery annotation named {@code gone} to
     * {@code server_2}, waits until both brokers hold all of them, then consumes them from
     * {@code server} (port 5672) and verifies the body is intact and the delivery annotation
     * is absent.
     */
    @Test
    public void testLargeMessagesWithDeliveryAnnotations() throws Exception {
        final int messageCount = 20;

        server.setIdentity("targetServer");
        server.start();

        server_2 = createServer(AMQP_PORT_2, false);
        server_2.setIdentity("server_2");
        AMQPBrokerConnectConfiguration mirrorConnection = new AMQPBrokerConnectConfiguration(AutoCreateJmsDestinationTest.QUEUE_NAME, "tcp://localhost:5672").setReconnectAttempts(-1).setRetryInterval(100);
        mirrorConnection.addElement(new AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true));
        server_2.getConfiguration().addAMQPConnection(mirrorConnection);
        server_2.start();
        Wait.assertTrue(server_2::isStarted);

        server_2.addAddressInfo(new AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
        server_2.createQueue(new QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
        Assert.assertFalse(AssertionLoggerHandler.findText(new String[]{"AMQ222214"}));

        // One lookup per broker: the messages are consumed from `server` below, so its queue
        // must be the one waited on, not server_2's queue a second time.
        Queue queueOnServer1 = locateQueue(server, getQueueName());
        Queue queueOnServer2 = locateQueue(server_2, getQueueName());

        AmqpConnection senderConnection = addConnection(new AmqpClient(new URI("tcp://localhost:" + AMQP_PORT_2), (String) null, (String) null).connect());
        AmqpSender sender = senderConnection.createSession().createSender(getQueueName());
        for (int i = 0; i < messageCount; i++) {
            AmqpMessage message = new AmqpMessage();
            message.setDeliveryAnnotation("gone", AutoCreateJmsDestinationTest.QUEUE_NAME);
            message.setText(getText(true, i));
            sender.send(message);
        }
        sender.close();
        senderConnection.close();

        Wait.assertEquals(messageCount, queueOnServer1::getMessageCount);
        Wait.assertEquals(messageCount, queueOnServer2::getMessageCount);

        AmqpConnection receiverConnection = addConnection(new AmqpClient(new URI("tcp://localhost:5672"), (String) null, (String) null).connect());
        AmqpReceiver receiver = receiverConnection.createSession().createReceiver(getQueueName());
        // One credit more than needed so that an unexpected extra message would be delivered.
        receiver.flow(messageCount + 1);
        for (int i = 0; i < messageCount; i++) {
            AmqpMessage received = receiver.receive(5L, TimeUnit.SECONDS);
            assertNotNull(received);
            Assert.assertEquals(getText(true, i), received.getText());
            Assert.assertNull(received.getDeliveryAnnotation("gone"));
        }
        Assert.assertNull(receiver.receiveNoWait());
        receiverConnection.close();
    }

    /**
     * Sends 20 small AMQP messages carrying a delivery annotation named {@code gone} to
     * {@code server_2}, then consumes them from {@code server} (port 5672) and verifies the body
     * is intact and the delivery annotation is absent.
     */
    @Test
    public void testNoAddressWithAnnotations() throws Exception {
        server.setIdentity("targetServer");
        server.start();

        server_2 = createServer(AMQP_PORT_2, false);
        server_2.setIdentity("server_2");
        AMQPBrokerConnectConfiguration mirrorConnection = new AMQPBrokerConnectConfiguration(AutoCreateJmsDestinationTest.QUEUE_NAME, "tcp://localhost:5672").setReconnectAttempts(-1).setRetryInterval(100);
        mirrorConnection.addElement(new AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true));
        server_2.getConfiguration().addAMQPConnection(mirrorConnection);
        server_2.start();
        Wait.assertTrue(server_2::isStarted);

        server_2.addAddressInfo(new AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
        server_2.createQueue(new QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
        Assert.assertFalse(AssertionLoggerHandler.findText(new String[]{"AMQ222214"}));

        // Produce on server_2 (port 5673).
        AmqpClient sendingClient = new AmqpClient(new URI("tcp://localhost:5673"), (String) null, (String) null);
        AmqpConnection senderConnection = addConnection(sendingClient.connect());
        AmqpSender sender = senderConnection.createSession().createSender(getQueueName());
        int sent = 0;
        while (sent < 20) {
            AmqpMessage outgoing = new AmqpMessage();
            outgoing.setDeliveryAnnotation("gone", AutoCreateJmsDestinationTest.QUEUE_NAME);
            outgoing.setText(getText(false, sent));
            sender.send(outgoing);
            sent++;
        }
        sender.close();
        senderConnection.close();

        // Consume on the first broker (port 5672).
        AmqpClient receivingClient = new AmqpClient(new URI("tcp://localhost:5672"), (String) null, (String) null);
        AmqpConnection receiverConnection = addConnection(receivingClient.connect());
        AmqpReceiver receiver = receiverConnection.createSession().createReceiver(getQueueName());
        receiver.flow(20 + 1);
        for (int index = 0; index < 20; index++) {
            AmqpMessage incoming = receiver.receive(5L, TimeUnit.SECONDS);
            assertNotNull(incoming);
            Assert.assertEquals(getText(false, index), incoming.getText());
            Assert.assertNull(incoming.getDeliveryAnnotation("gone"));
        }
        Assert.assertNull(receiver.receiveNoWait());
        receiverConnection.close();
    }

    /** Runs {@code testRouteSurvivor} with {@code server} running while the messages are sent. */
    @Test
    public void testRouteSurviving() throws Exception {
        testRouteSurvivor(false);
    }

    /** Runs {@code testRouteSurvivor} with {@code server} not started until after {@code server_2} has been stopped. */
    @Test
    public void testRouteSurvivingStop() throws Exception {
        testRouteSurvivor(true);
    }

    /**
     * Sends 200 persistent messages to {@code server_2}, which mirrors to {@code server} over a
     * durable mirror element, stops and restarts the brokers, and expects both to hold all
     * 200 messages afterwards.
     *
     * @param z {@code false} starts {@code server} up front and verifies the messages arrive
     *          before the restart; {@code true} leaves {@code server} unstarted until after
     *          {@code server_2} has been stopped
     */
    private void testRouteSurvivor(boolean z) throws Exception {
        final boolean targetInitiallyStopped = z;
        final int messageCount = 200;

        if (!targetInitiallyStopped) {
            server.start();
        }

        server_2 = createServer(AMQP_PORT_2, false);
        server_2.setIdentity("server_2");
        server_2.getConfiguration().setName("thisone");
        AMQPBrokerConnectConfiguration mirrorConnection = new AMQPBrokerConnectConfiguration("OtherSide", "tcp://localhost:5672").setReconnectAttempts(-1).setRetryInterval(100);
        mirrorConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
        server_2.getConfiguration().addAMQPConnection(mirrorConnection);
        server_2.start();

        server_2.addAddressInfo(new AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
        server_2.createQueue(new QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));

        // try-with-resources closes the connection even when a send fails.
        try (Connection connection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2).createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            for (int i = 0; i < messageCount; i++) {
                producer.send(session.createTextMessage("i=" + i));
            }
        }

        if (!targetInitiallyStopped) {
            Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
            Queue queueOnServer1 = server.locateQueue(getQueueName());
            Wait.assertEquals(messageCount, queueOnServer1::getMessageCount);
        }
        Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null);
        Queue queueOnServer2 = server_2.locateQueue(getQueueName());
        Wait.assertEquals(messageCount, queueOnServer2::getMessageCount);

        // Restart both sides; `server` is only stopped if it was started above.
        if (!targetInitiallyStopped) {
            server.stop();
        }
        server_2.stop();
        server.start();
        server_2.start();

        Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
        Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null);
        Queue restartedQueue1 = server.locateQueue(getQueueName());
        Queue restartedQueue2 = server_2.locateQueue(getQueueName());
        Wait.assertEquals(messageCount, restartedQueue1::getMessageCount);
        Wait.assertEquals(messageCount, restartedQueue2::getMessageCount);
    }

    /**
     * Core mirror scenario: 200 non-persistent messages are sent to {@code server_2}, which
     * mirrors them to {@code server} (port 5672). The messages are then consumed and the queues
     * on both brokers are checked.
     *
     * @param z  use large message bodies (see {@code getText}) and validate the large-message directories
     * @param z2 value passed to {@code setMessageAcknowledgements} on the mirror element; when
     *           {@code true} the first half is consumed from {@code server_2} and {@code server}
     *           is expected to drop to half, otherwise all messages are consumed from both brokers
     * @param z3 start paging on the queue of {@code server}
     * @param z4 start paging on the queue of {@code server_2}
     * @param z5 keep {@code server} stopped while sending and start it one second afterwards
     * @param z6 after consumption, restart {@code server} and run a second send/consume round
     * @param z7 stop and restart the broker connection on {@code server_2} once the messages are mirrored
     */
    private void replicaTest(boolean z, boolean z2, boolean z3, boolean z4, boolean z5, boolean z6, boolean z7) throws Exception {
        final boolean largeMessage = z;
        final boolean acks = z2;
        final boolean pagingTarget = z3;
        final boolean pagingSource = z4;
        final boolean deferredStart = z5;
        final boolean restartTarget = z6;
        final boolean restartBrokerConnection = z7;
        final int messageCount = 200;
        final String brokerConnectionName = "brokerConnectionName:" + UUIDGenerator.getInstance().generateStringUUID();

        server.setIdentity("targetServer");
        if (deferredStart) {
            server.stop();
        } else {
            server.start();
        }

        server_2 = createServer(AMQP_PORT_2, false);
        server_2.setIdentity("server_2");
        server_2.getConfiguration().setName("thisone");
        AMQPBrokerConnectConfiguration mirrorConnection = new AMQPBrokerConnectConfiguration(brokerConnectionName, "tcp://localhost:5672").setReconnectAttempts(-1).setRetryInterval(100);
        AMQPMirrorBrokerConnectionElement mirror = new AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(acks);
        mirrorConnection.addElement(mirror);
        server_2.getConfiguration().addAMQPConnection(mirrorConnection);
        server_2.start();
        Wait.assertTrue(server_2::isStarted);

        server_2.addAddressInfo(new AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
        server_2.createQueue(new QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));

        // The producer connection is used until the end of the scenario and closed on every exit path.
        try (Connection connection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2).createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            if (!deferredStart) {
                // The lookup also waits for the mirrored queue to exist on the first broker.
                Queue earlyQueueOnServer1 = locateQueue(server, getQueueName());
                if (pagingTarget) {
                    earlyQueueOnServer1.getPagingStore().startPaging();
                }
            }
            if (pagingSource) {
                server_2.locateQueue(getQueueName()).getPagingStore().startPaging();
            }

            Assert.assertFalse(AssertionLoggerHandler.findText(new String[]{"AMQ222214"}));
            sendNumberedMessages(session, producer, largeMessage, messageCount);
            Assert.assertFalse(AssertionLoggerHandler.findText(new String[]{"AMQ222214"}));

            final Queue queueOnServer1;
            if (deferredStart) {
                // Let the messages sit on server_2 for a moment before the first broker comes up.
                Thread.sleep(1000L);
                server.start();
                Wait.assertTrue(server::isActive);
                queueOnServer1 = locateQueue(server, getQueueName());
                if (pagingTarget) {
                    queueOnServer1.getPagingStore().startPaging();
                }
            } else {
                queueOnServer1 = locateQueue(server, getQueueName());
            }

            // The store-and-forward queue of the mirror must drain completely.
            Queue snfQueue = server_2.locateQueue(mirror.getMirrorSNF());
            Assert.assertNotNull(snfQueue);
            Wait.assertEquals(0L, snfQueue::getMessageCount);
            Wait.assertEquals(messageCount, queueOnServer1::getMessageCount, 2000L);

            Queue queueOnServer2 = locateQueue(server_2, getQueueName());
            Wait.assertEquals(messageCount, queueOnServer1::getMessageCount);
            Wait.assertEquals(messageCount, queueOnServer2::getMessageCount);

            if (restartBrokerConnection) {
                server_2.stopBrokerConnection(brokerConnectionName);
                Thread.sleep(1000L);
                server_2.startBrokerConnection(brokerConnectionName);
            }

            if (pagingTarget) {
                assertTrue(queueOnServer1.getPagingStore().isPaging());
            }

            if (acks) {
                // Consuming the first half on server_2 must remove it from the first broker as well.
                consumeMessages(largeMessage, 0, messageCount / 2 - 1, AMQP_PORT_2, false);
                Wait.assertEquals(messageCount / 2, queueOnServer1::getMessageCount);
                consumeMessages(largeMessage, messageCount / 2, messageCount - 1, 5672, true);
                consumeMessages(largeMessage, messageCount / 2, messageCount - 1, AMQP_PORT_2, false);
                if (largeMessage) {
                    validateNoFilesOnLargeDir(server.getConfiguration().getLargeMessagesDirectory(), 0);
                }
            } else {
                // Without mirrored acknowledgements each broker keeps its own full copy.
                consumeMessages(largeMessage, 0, messageCount - 1, AMQP_PORT_2, true);
                consumeMessages(largeMessage, 0, messageCount - 1, 5672, true);
                if (largeMessage) {
                    validateNoFilesOnLargeDir(server.getConfiguration().getLargeMessagesDirectory(), 0);
                    validateNoFilesOnLargeDir(server_2.getConfiguration().getLargeMessagesDirectory(), 0);
                }
            }

            if (restartTarget) {
                server.stop();
                Thread.sleep(1000L);
                server.start();
                Wait.assertTrue(server::isActive);

                // An empty range consumes nothing and only asserts that both queues are empty.
                consumeMessages(largeMessage, 0, -1, AMQP_PORT_2, true);
                consumeMessages(largeMessage, 0, -1, 5672, true);

                sendNumberedMessages(session, producer, largeMessage, messageCount);
                consumeMessages(largeMessage, 0, messageCount - 1, 5672, true);
                consumeMessages(largeMessage, 0, messageCount - 1, AMQP_PORT_2, true);
            }
        }
    }

    /** Sends {@code count} text messages built by {@code getText}, each tagged with the int property {@code "i"}. */
    private void sendNumberedMessages(Session session, MessageProducer producer, boolean largeMessage, int count) throws JMSException {
        for (int i = 0; i < count; i++) {
            TextMessage message = session.createTextMessage(getText(largeMessage, i));
            message.setIntProperty("i", i);
            producer.send(message);
        }
    }

    /** {@code dualReplica} with small messages and no paging, stopping and restarting both broker connections midway through sending. */
    @Test
    public void testDualStandardRestartBrokerConnection() throws Exception {
        dualReplica(false, false, false, true);
    }

    /** {@code dualReplica} with small messages, no paging and no broker-connection restart. */
    @Test
    public void testDualStandard() throws Exception {
        dualReplica(false, false, false, false);
    }

    /** {@code dualReplica} with small messages and paging started on the queues of the two brokers that {@code server_2} mirrors to. */
    @Test
    public void testDualRegularPagedTargets() throws Exception {
        dualReplica(false, false, true, false);
    }

    /** {@code dualReplica} with small messages and paging started on the queues of all three brokers. */
    @Test
    public void testDualRegularPagedEverything() throws Exception {
        dualReplica(false, true, true, false);
    }

    /** {@code dualReplica} with large message bodies, no paging and no broker-connection restart. */
    @Test
    public void testDualRegularLarge() throws Exception {
        dualReplica(true, false, false, false);
    }

    /**
     * Waits (via {@code Wait.waitFor}) for a queue to exist on the given broker and returns it.
     * The outcome of the wait is not checked, so the result is whatever the final lookup
     * yields and may be {@code null} if the queue never appeared.
     *
     * @param activeMQServer broker to query
     * @param str            queue name
     * @return the queue as returned by {@code activeMQServer.locateQueue(str)}
     */
    public Queue locateQueue(ActiveMQServer activeMQServer, String str) throws Exception {
        Wait.waitFor(() -> activeMQServer.locateQueue(str) != null);
        Queue located = activeMQServer.locateQueue(str);
        return located;
    }

    /**
     * Mirrors {@code server_2} to two brokers at once: {@code server} (port 5672) and a third
     * broker on {@code AMQP_PORT_3}. 200 persistent messages are sent to {@code server_2}; the
     * first half is consumed from {@code server_2}, then the remainder from all three brokers.
     *
     * @param z  use large message bodies (see {@code getText})
     * @param z2 start paging on the queue of {@code server_2}
     * @param z3 start paging on the queues of {@code server} and the third broker
     * @param z4 stop and restart both broker connections after message index 100 has been sent
     */
    private void dualReplica(boolean z, boolean z2, boolean z3, boolean z4) throws Exception {
        final boolean largeMessage = z;
        final boolean pagingSource = z2;
        final boolean pagingTargets = z3;
        final boolean restartBrokerConnections = z4;
        final int messageCount = 200;
        final int half = messageCount / 2;

        server.setIdentity("server_1");
        server.start();

        ActiveMQServer server_3 = createServer(AMQP_PORT_3, false);
        server_3.setIdentity("server_3");
        server_3.start();
        Wait.assertTrue(server_3::isStarted);
        // Open and immediately close a client connection to the third broker.
        CFUtil.createConnectionFactory("amqp", "tcp://localhost:" + AMQP_PORT_3).createConnection().close();

        server_2 = createServer(AMQP_PORT_2, false);
        final String firstConnectionName = "brokerConnection1:" + UUIDGenerator.getInstance().generateStringUUID();
        final String secondConnectionName = "brokerConnection2:" + UUIDGenerator.getInstance().generateStringUUID();

        AMQPBrokerConnectConfiguration firstConnection = new AMQPBrokerConnectConfiguration(firstConnectionName, "tcp://localhost:5672");
        AMQPMirrorBrokerConnectionElement firstMirror = new AMQPMirrorBrokerConnectionElement().setType(AMQPBrokerConnectionAddressType.MIRROR);
        firstConnection.addElement(firstMirror);
        server_2.getConfiguration().addAMQPConnection(firstConnection);

        AMQPBrokerConnectConfiguration secondConnection = new AMQPBrokerConnectConfiguration(secondConnectionName, "tcp://localhost:" + AMQP_PORT_3);
        AMQPMirrorBrokerConnectionElement secondMirror = new AMQPMirrorBrokerConnectionElement().setType(AMQPBrokerConnectionAddressType.MIRROR);
        secondConnection.addElement(secondMirror);
        server_2.getConfiguration().addAMQPConnection(secondConnection);

        server_2.start();
        Wait.assertTrue(server_2::isStarted);

        // The producer connection is closed on every exit path.
        try (Connection connection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2).createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);

            Queue queueOnServer2 = locateQueue(server_2, getQueueName());
            Queue queueOnServer1 = locateQueue(server, getQueueName());
            Queue queueOnServer3 = locateQueue(server_3, getQueueName());

            if (pagingSource) {
                queueOnServer2.getPagingStore().startPaging();
            }
            if (pagingTargets) {
                queueOnServer1.getPagingStore().startPaging();
                queueOnServer3.getPagingStore().startPaging();
            }

            for (int i = 0; i < messageCount; i++) {
                TextMessage message = session.createTextMessage(getText(largeMessage, i));
                message.setIntProperty("i", i);
                producer.send(message);

                if (i == half && restartBrokerConnections) {
                    // Indexes 0..half have been sent, i.e. half + 1 messages on every broker.
                    Wait.assertEquals(half + 1, queueOnServer2::getMessageCount);
                    Wait.assertEquals(half + 1, queueOnServer3::getMessageCount);
                    Wait.assertEquals(half + 1, queueOnServer1::getMessageCount);
                    server_2.stopBrokerConnection(firstConnectionName);
                    server_2.stopBrokerConnection(secondConnectionName);
                    Thread.sleep(1000L);
                    server_2.startBrokerConnection(firstConnectionName);
                    server_2.startBrokerConnection(secondConnectionName);
                }
            }

            Wait.assertEquals(messageCount, queueOnServer2::getMessageCount);
            Wait.assertEquals(messageCount, queueOnServer3::getMessageCount);
            Wait.assertEquals(messageCount, queueOnServer1::getMessageCount);

            // Both store-and-forward queues must end up with an address size of zero.
            Queue firstSnf = server_2.locateQueue(firstMirror.getMirrorSNF());
            Queue secondSnf = server_2.locateQueue(secondMirror.getMirrorSNF());
            PagingStore secondSnfStore = secondSnf.getPagingStore();
            Wait.assertEquals(0L, secondSnfStore::getAddressSize, 1000L, 100L);
            PagingStore firstSnfStore = firstSnf.getPagingStore();
            Wait.assertEquals(0L, firstSnfStore::getAddressSize, 1000L, 100L);

            if (pagingTargets) {
                Assert.assertTrue(queueOnServer1.getPagingStore().isPaging());
                Assert.assertTrue(queueOnServer3.getPagingStore().isPaging());
            }
            if (pagingSource) {
                Assert.assertTrue(queueOnServer2.getPagingStore().isPaging());
            }

            // Consume the first half on server_2; every broker is expected to drop to half.
            consumeMessages(largeMessage, 0, half - 1, AMQP_PORT_2, false);
            Wait.assertEquals(half, queueOnServer1::getMessageCount);
            Wait.assertEquals(half, queueOnServer2::getMessageCount);
            Wait.assertEquals(half, queueOnServer3::getMessageCount);
            Wait.assertEquals(half, queueOnServer1::getMessageCount);

            consumeMessages(largeMessage, half, messageCount - 1, 5672, true);
            consumeMessages(largeMessage, half, messageCount - 1, AMQP_PORT_3, true);
            consumeMessages(largeMessage, half, messageCount - 1, AMQP_PORT_2, true);

            validateNoFilesOnLargeDir(server.getConfiguration().getLargeMessagesDirectory(), 0);
            validateNoFilesOnLargeDir(server_3.getConfiguration().getLargeMessagesDirectory(), 0);
            validateNoFilesOnLargeDir(server_2.getConfiguration().getLargeMessagesDirectory(), 0);
        }
    }

    /**
     * Debug helper: prints a banner, the given title, and every message currently browsable
     * on the queue to standard output.
     *
     * @param str   title printed under the opening banner
     * @param queue queue whose messages are browsed
     */
    private void printMessages(String str, Queue queue) {
        final String banner = "*******************************************************************************************************************************";
        System.out.println(banner);
        System.out.println(str);
        System.out.println();
        // try-with-resources closes the browser iterator even if printing a message throws.
        try (LinkedListIterator<MessageReference> references = queue.browserIterator()) {
            while (references.hasNext()) {
                System.out.println("message " + references.next().getMessage());
            }
        }
        System.out.println(banner);
    }

    /**
     * Consumes messages {@code i..i2} (inclusive) from the test queue on the broker listening on
     * the given port and verifies order and content.
     *
     * @param z  whether the expected bodies are the large variant produced by {@code getText}
     * @param i  first expected value of the int property {@code "i"}
     * @param i2 last expected value (inclusive); a value below {@code i} consumes nothing
     * @param i3 port of the broker to connect to on localhost
     * @param z2 when {@code true}, additionally asserts that no further message is available
     * @throws JMSException if the JMS client fails
     */
    private void consumeMessages(boolean z, int i, int i2, int i3, boolean z2) throws JMSException {
        // try-with-resources closes the connection even when an assertion fails midway.
        try (Connection connection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + i3).createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            connection.start();
            MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
            for (int expected = i; expected <= i2; expected++) {
                // receive() returns the generic Message type; only text messages get a body check.
                Message message = consumer.receive(3000L);
                Assert.assertNotNull(message);
                Assert.assertEquals(expected, message.getIntProperty("i"));
                if (message instanceof TextMessage) {
                    Assert.assertEquals(getText(z, expected), ((TextMessage) message).getText());
                }
            }
            if (z2) {
                Assert.assertNull(consumer.receiveNoWait());
            }
        }
    }
}
