package org.apache.activemq.artemis.tests.integration.amqp.connect;

import jakarta.jms.Connection;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.apache.activemq.transport.netty.NettyTransportOptions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorFastACKTest.class */
public class AMQPMirrorFastACKTest extends AmqpClientTestSupport {
    private static final String SLOW_SERVER_NAME = "slow";
    private static final int SLOW_SERVER_PORT = 5673;
    private static final int ENCODE_DELAY = 10;
    private ActiveMQServer slowServer;

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorFastACKTest$SlowMessagePersister.class */
    static class SlowMessagePersister<T> implements Persister<T> {
        private final Persister<T> delegate;

        SlowMessagePersister(Persister<T> persister) {
            this.delegate = persister;
        }

        public byte getID() {
            return this.delegate.getID();
        }

        public int getEncodeSize(T t) {
            return this.delegate.getEncodeSize(t);
        }

        public void encode(ActiveMQBuffer activeMQBuffer, T t) {
            try {
                Thread.sleep(10L);
            } catch (Exception e) {
            }
            this.delegate.encode(activeMQBuffer, t);
        }

        public T decode(ActiveMQBuffer activeMQBuffer, T t, CoreMessageObjectPools coreMessageObjectPools) {
            return (T) this.delegate.decode(activeMQBuffer, t, coreMessageObjectPools);
        }
    }

    @Override // org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport
    protected String getConfiguredProtocols() {
        return "AMQP,CORE,OPENWIRE";
    }

    @Override // org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport, org.apache.activemq.artemis.tests.util.ActiveMQTestBase
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        this.slowServer = createSlowServer();
    }

    @Override // org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport, org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport, org.apache.activemq.artemis.tests.util.ActiveMQTestBase
    @AfterEach
    public void tearDown() throws Exception {
        try {
            if (this.slowServer != null) {
                this.slowServer.stop();
            }
        } finally {
            super.tearDown();
        }
    }

    @Test
    public void testMirrorTargetFastACKCore() throws Exception {
        doTestMirrorTargetFastACK("CORE");
    }

    @Test
    public void testMirrorTargetFastACKAMQP() throws Exception {
        doTestMirrorTargetFastACK("AMQP");
    }

    private void doTestMirrorTargetFastACK(String str) throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(10);
        AMQPMirrorBrokerConnectionElement configureMirrorTowardsSlow = configureMirrorTowardsSlow(this.server);
        this.slowServer.start();
        this.server.start();
        waitForServerToStart(this.slowServer);
        waitForServerToStart(this.server);
        this.server.addAddressInfo(new AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
        this.server.createQueue(QueueConfiguration.of(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
        Connection createConnection = CFUtil.createConnectionFactory(str, "tcp://localhost:5672").createConnection();
        try {
            Session createSession = createConnection.createSession(false, 2);
            MessageConsumer createConsumer = createSession.createConsumer(createSession.createQueue(getQueueName()));
            Session createSession2 = createConnection.createSession(false, 2);
            MessageProducer createProducer = createSession2.createProducer(createSession2.createQueue(getQueueName()));
            createConnection.start();
            createConsumer.setMessageListener(message -> {
                try {
                    message.acknowledge();
                    countDownLatch.countDown();
                } catch (Exception e) {
                }
            });
            createProducer.setDeliveryMode(2);
            for (int i = 0; i < 10; i++) {
                createProducer.send(createSession2.createTextMessage("i=" + i));
            }
            Assertions.assertTrue(countDownLatch.await(5000L, TimeUnit.MILLISECONDS));
            if (createConnection != null) {
                createConnection.close();
            }
            Queue locateQueue = this.server.locateQueue(configureMirrorTowardsSlow.getMirrorSNF());
            Queue locateQueue2 = this.slowServer.locateQueue(getQueueName());
            Wait.waitFor(() -> {
                return locateQueue.getMessageCount() == 0 && locateQueue.getMessagesAdded() > 10;
            });
            Wait.assertTrue("Expected mirrored target queue " + getQueueName() + " to be empty", () -> {
                return locateQueue2.getMessageCount() == 0 && locateQueue2.getMessagesAdded() == 10;
            });
        } catch (Throwable th) {
            if (createConnection != null) {
                try {
                    createConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport
    protected ActiveMQServer createServer() throws Exception {
        return createServer(NettyTransportOptions.DEFAULT_TCP_PORT, false);
    }

    private AMQPMirrorBrokerConnectionElement configureMirrorTowardsSlow(ActiveMQServer activeMQServer) {
        AMQPBrokerConnectConfiguration retryInterval = new AMQPBrokerConnectConfiguration("mirror", "tcp://localhost:5673").setReconnectAttempts(-1).setRetryInterval(100);
        AMQPMirrorBrokerConnectionElement durable = new AMQPMirrorBrokerConnectionElement().setDurable(true);
        retryInterval.addElement(durable);
        activeMQServer.getConfiguration().addAMQPConnection(retryInterval);
        return durable;
    }

    private ActiveMQServer createSlowServer() throws Exception {
        ActiveMQServerImpl activeMQServerImpl = new ActiveMQServerImpl(createBasicConfig(SLOW_SERVER_PORT), this.mBeanServer, new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), new SecurityConfiguration())) { // from class: org.apache.activemq.artemis.tests.integration.amqp.connect.AMQPMirrorFastACKTest.1
            protected StorageManager createStorageManager() {
                return AMQPMirrorFastACKTest.this.createStorageManager(getConfiguration(), getCriticalAnalyzer(), this.executorFactory, this.scheduledPool, this.ioExecutorFactory, this.ioCriticalErrorListener);
            }
        };
        activeMQServerImpl.getConfiguration().setName(SLOW_SERVER_NAME);
        activeMQServerImpl.getConfiguration().getAcceptorConfigurations().clear();
        activeMQServerImpl.getConfiguration().getAcceptorConfigurations().add(addAcceptorConfiguration(this.slowServer, SLOW_SERVER_PORT));
        activeMQServerImpl.getConfiguration().setJMXManagementEnabled(true);
        activeMQServerImpl.getConfiguration().setMessageExpiryScanPeriod(100L);
        configureAddressPolicy(activeMQServerImpl);
        configureBrokerSecurity(activeMQServerImpl);
        return activeMQServerImpl;
    }

    private StorageManager createStorageManager(Configuration configuration, CriticalAnalyzer criticalAnalyzer, ExecutorFactory executorFactory, ScheduledExecutorService scheduledExecutorService, ExecutorFactory executorFactory2, IOCriticalErrorListener iOCriticalErrorListener) {
        return new JournalStorageManager(configuration, criticalAnalyzer, executorFactory, scheduledExecutorService, executorFactory2, iOCriticalErrorListener) { // from class: org.apache.activemq.artemis.tests.integration.amqp.connect.AMQPMirrorFastACKTest.2
            protected Journal createMessageJournal(Configuration configuration2, IOCriticalErrorListener iOCriticalErrorListener2, int i) {
                return new JournalImpl(this.ioExecutorFactory, i, configuration2.getJournalMinFiles(), configuration2.getJournalPoolFiles(), configuration2.getJournalCompactMinFiles(), configuration2.getJournalCompactPercentage(), configuration2.getJournalFileOpenTimeout(), this.journalFF, "activemq-data", "amq", this.journalFF.getMaxIO(), 0, iOCriticalErrorListener2, configuration2.getJournalMaxAtticFiles()) { // from class: org.apache.activemq.artemis.tests.integration.amqp.connect.AMQPMirrorFastACKTest.2.1
                    public void appendAddRecordTransactional(long j, long j2, byte b, Persister persister, Object obj) throws Exception {
                        super.appendAddRecordTransactional(j, j2, b, obj instanceof AMQPStandardMessage ? new SlowMessagePersister(persister) : persister, obj);
                    }
                };
            }
        };
    }
}
