package org.apache.activemq.artemis.tests.integration.amqp.connect;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.io.File;
import java.util.HashSet;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.integration.management.SimpleManagementTest;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpTransferTagGenerator;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/connect/PagedMirrorTest.class */
/**
 * Integration test for AMQP mirroring between two brokers that are configured to mirror each other.
 *
 * <p>{@code server1} listens on {@code SimpleManagementTest.LOCALHOST} and mirrors to
 * {@code tcp://localhost:61617}; {@code server2} listens on {@code tcp://localhost:61617} and mirrors back
 * to {@code SimpleManagementTest.LOCALHOST}. The test sends messages to {@code server1}, acknowledges exactly
 * one of them there, and then verifies that the acknowledgement is reflected on {@code server2}.
 */
public class PagedMirrorTest extends ActiveMQTestBase {

    /** URI on which {@code server2} accepts connections. */
    private static final String SERVER2_URI = "tcp://localhost:61617";

    /** Name of the queue both brokers create for the mirror connection named "other". */
    private static final String MIRROR_SNF_QUEUE = "$ACTIVEMQ_ARTEMIS_MIRROR_other";

    /** Queue the test produces to and consumes from. */
    private static final String QUEUE_NAME = "someQueue";

    /** Number of messages sent in the single producer transaction. */
    private static final int NUMBER_OF_MESSAGES = 200;

    /** Value of the "i" property of the only message that gets acknowledged on server1. */
    private static final int ACKED_MESSAGE_INDEX = 77;

    /** Size in characters of each message body. */
    private static final int BODY_SIZE = 1024;

    /**
     * Acknowledge mode passed to {@code createSession} for the first consumer.
     * NOTE(review): presumably the client's "individual acknowledge" extension mode - confirm.
     */
    private static final int INDIVIDUAL_ACK_MODE = 101;

    /**
     * Journal record type counted by {@link #acksCount(File)}.
     * NOTE(review): presumably the journal's "acknowledge reference" record id - confirm against the
     * journal record id constants.
     */
    private static final int ACK_RECORD_TYPE = 39;

    ActiveMQServer server1;
    ActiveMQServer server2;

    /**
     * Creates and starts the two brokers, each with a single acceptor and a non-durable mirror
     * connection pointing at the other broker.
     */
    @Override // org.apache.activemq.artemis.tests.util.ActiveMQTestBase
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();

        // NOTE(review): AmqpTransferTagGenerator.DEFAULT_TAG_POOL_SIZE is used here as the numeric third
        // argument of createServer; it looks like a decompiler constant substitution - confirm intent.
        this.server1 = createServer(true, createDefaultConfig(0, true), AmqpTransferTagGenerator.DEFAULT_TAG_POOL_SIZE, 10240L, -1, -1);
        this.server1.getConfiguration().getAcceptorConfigurations().clear();
        this.server1.getConfiguration().addAcceptorConfiguration("server", SimpleManagementTest.LOCALHOST);
        AMQPBrokerConnectConfiguration mirrorToServer2 = new AMQPBrokerConnectConfiguration("other", SERVER2_URI).setReconnectAttempts(-1).setRetryInterval(1000);
        mirrorToServer2.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(false));
        this.server1.getConfiguration().addAMQPConnection(mirrorToServer2);

        this.server2 = createServer(true, createDefaultConfig(1, true), AmqpTransferTagGenerator.DEFAULT_TAG_POOL_SIZE, 10240L, -1, -1);
        this.server2.getConfiguration().getAcceptorConfigurations().clear();
        this.server2.getConfiguration().addAcceptorConfiguration("server", SERVER2_URI);
        AMQPBrokerConnectConfiguration mirrorToServer1 = new AMQPBrokerConnectConfiguration("other", SimpleManagementTest.LOCALHOST).setReconnectAttempts(-1).setRetryInterval(1000);
        mirrorToServer1.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(false));
        this.server2.getConfiguration().addAMQPConnection(mirrorToServer1);

        this.server1.start();
        this.server2.start();
    }

    /**
     * Sends {@link #NUMBER_OF_MESSAGES} messages to server1, acknowledges only the message whose "i"
     * property equals {@link #ACKED_MESSAGE_INDEX}, and verifies that:
     * <ul>
     *   <li>both mirror queues drain to zero,</li>
     *   <li>the target queue holds one message fewer than was sent on both brokers,</li>
     *   <li>server2's journal contains exactly one record of type {@link #ACK_RECORD_TYPE},</li>
     *   <li>a consumer on server2 receives every message except the acknowledged one.</li>
     * </ul>
     */
    @Test
    public void testPaged() throws Throwable {
        Wait.waitFor(() -> this.server1.locateQueue(MIRROR_SNF_QUEUE) != null);
        Wait.waitFor(() -> this.server2.locateQueue(MIRROR_SNF_QUEUE) != null);

        Queue snfOnServer2 = this.server2.locateQueue(MIRROR_SNF_QUEUE);
        Assertions.assertNotNull(snfOnServer2);
        Queue snfOnServer1 = this.server1.locateQueue(MIRROR_SNF_QUEUE);
        Assertions.assertNotNull(snfOnServer1);

        File journalLocation = this.server2.getConfiguration().getJournalLocation();
        Assertions.assertTrue(journalLocation.exists() && journalLocation.isDirectory());

        ConnectionFactory sendFactory = CFUtil.createConnectionFactory("amqp", SimpleManagementTest.LOCALHOST);
        ConnectionFactory consumeFactory = CFUtil.createConnectionFactory("amqp", SimpleManagementTest.LOCALHOST);
        ConnectionFactory mirrorConsumeFactory = CFUtil.createConnectionFactory("amqp", SERVER2_URI);

        StringBuilder bodyBuilder = new StringBuilder(BODY_SIZE);
        for (int i = 0; i < BODY_SIZE; i++) {
            bodyBuilder.append('*');
        }
        String body = bodyBuilder.toString();

        // Phase 1: produce all messages to server1 in a single transaction.
        try (Connection sendConnection = sendFactory.createConnection()) {
            Session sendSession = sendConnection.createSession(true, Session.SESSION_TRANSACTED);
            MessageProducer producer = sendSession.createProducer(sendSession.createQueue(QUEUE_NAME));
            for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
                TextMessage message = sendSession.createTextMessage(body);
                message.setIntProperty("i", i);
                producer.send(message);
            }
            sendSession.commit();
        }

        Wait.assertEquals(0L, snfOnServer2::getMessageCount);
        Wait.assertEquals(0L, snfOnServer1::getMessageCount);

        // Phase 2: receive everything on server1 but acknowledge only one message.
        try (Connection consumeConnection = consumeFactory.createConnection()) {
            Session consumeSession = consumeConnection.createSession(false, INDIVIDUAL_ACK_MODE);
            MessageConsumer consumer = consumeSession.createConsumer(consumeSession.createQueue(QUEUE_NAME));
            consumeConnection.start();
            for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
                TextMessage message = (TextMessage) consumer.receive(6000L);
                // Fail with a clear assertion instead of a NullPointerException if receive times out.
                Assertions.assertNotNull(message);
                if (message.getIntProperty("i") == ACKED_MESSAGE_INDEX) {
                    message.acknowledge();
                }
            }
            Assertions.assertNull(consumer.receiveNoWait());
        }

        Wait.assertEquals(0L, snfOnServer2::getMessageCount);
        Wait.assertEquals(0L, snfOnServer1::getMessageCount);

        // Check the queue depth on BOTH brokers; the previous code looked the queue up on server1 twice,
        // so the mirror target's depth was never verified.
        Queue queueOnServer1 = this.server1.locateQueue(QUEUE_NAME);
        Assertions.assertNotNull(queueOnServer1);
        Queue queueOnServer2 = this.server2.locateQueue(QUEUE_NAME);
        Assertions.assertNotNull(queueOnServer2);
        Wait.assertEquals(NUMBER_OF_MESSAGES - 1, queueOnServer2::getMessageCount, 5000L);
        Wait.assertEquals(NUMBER_OF_MESSAGES - 1, queueOnServer1::getMessageCount, 5000L);

        Wait.assertEquals(1, () -> acksCount(journalLocation), 5000L, 1000L);

        // Phase 3: server2 must deliver every message except the one acknowledged on server1.
        HashSet<Integer> receivedIndexes = new HashSet<>();
        try (Connection mirrorConnection = mirrorConsumeFactory.createConnection()) {
            Session mirrorSession = mirrorConnection.createSession(true, Session.SESSION_TRANSACTED);
            MessageConsumer mirrorConsumer = mirrorSession.createConsumer(mirrorSession.createQueue(QUEUE_NAME));
            mirrorConnection.start();
            for (int i = 0; i < NUMBER_OF_MESSAGES - 1; i++) {
                TextMessage message = (TextMessage) mirrorConsumer.receive(6000L);
                Assertions.assertNotNull(message);
                int index = message.getIntProperty("i");
                Assertions.assertNotEquals(ACKED_MESSAGE_INDEX, index);
                receivedIndexes.add(index);
            }
            Assertions.assertNull(mirrorConsumer.receiveNoWait());
            Assertions.assertEquals(NUMBER_OF_MESSAGES - 1, receivedIndexes.size());
            for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
                if (i != ACKED_MESSAGE_INDEX) {
                    Assertions.assertTrue(receivedIndexes.contains(i));
                }
            }
        }
    }

    /**
     * Returns the number of journal records of type {@link #ACK_RECORD_TYPE} found under the given
     * journal directory, or 0 when the inherited {@code countJournal} helper reports none of that type.
     *
     * @param file the journal directory to scan
     * @return the count of matching records, 0 if absent
     * @throws Exception if {@code countJournal} fails
     */
    private int acksCount(File file) throws Exception {
        // NOTE(review): the numeric arguments (10485760, 2, 2) are passed through unchanged; presumably
        // journal file size / min files / pool files - confirm against countJournal's signature.
        AtomicInteger ackRecords = countJournal(file, 10485760, 2, 2).get(ACK_RECORD_TYPE);
        return ackRecords == null ? 0 : ackRecords.get();
    }
}
