package org.apache.activemq.artemis.tests.integration.divert;

import jakarta.jms.JMSException;
import jakarta.jms.MapMessage;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
import org.apache.activemq.artemis.api.core.client.FailoverEventType;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.integration.client.AutoCreateJmsDestinationTest;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.ReplicatedBackupUtils;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/divert/ReplicationWithDivertTest.class */
public class ReplicationWithDivertTest extends ActiveMQTestBase {
    public static final String JMS_SOURCE_QUEUE = "Queue";
    public static final String SOURCE_QUEUE = "Queue";
    public static final String JMS_TARGET_QUEUE = "DestQueue";
    public static final String TARGET_QUEUE = "DestQueue";
    private static ActiveMQServer backupServer;
    private static ActiveMQServer liveServer;
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?minLargeMessageSize=10000&HA=true&retryInterval=100&reconnectAttempts=-1&producerWindowSize=10000");
    ActiveMQConnection connection;
    Session session;
    Queue queue;
    Queue targetQueue;
    MessageProducer producer;
    Configuration backupConfig;
    Configuration liveConfig;
    public static int messageChunkCount = 0;
    static final ReusableLatch flagChunkEntered = new ReusableLatch(1);
    static final ReusableLatch flagChunkWait = new ReusableLatch(1);
    static final ReusableLatch flagSyncEntered = new ReusableLatch(1);
    static final ReusableLatch flagSyncWait = new ReusableLatch(1);

    @Before
    public void setUp() throws Exception {
        super.setUp();
        flagChunkEntered.setCount(1);
        flagChunkWait.setCount(1);
        flagSyncEntered.setCount(1);
        flagSyncWait.setCount(1);
        messageChunkCount = 0;
        TransportConfiguration nettyConnector = TransportConfigurationUtils.getNettyConnector(true, 0);
        TransportConfiguration nettyAcceptor = TransportConfigurationUtils.getNettyAcceptor(true, 0);
        TransportConfiguration nettyConnector2 = TransportConfigurationUtils.getNettyConnector(false, 0);
        TransportConfiguration nettyAcceptor2 = TransportConfigurationUtils.getNettyAcceptor(false, 0);
        this.backupConfig = createDefaultInVMConfig().setBindingsDirectory(getBindingsDir(0, true)).setJournalDirectory(getJournalDir(0, true)).setPagingDirectory(getPageDir(0, true)).setLargeMessagesDirectory(getLargeMessagesDir(0, true));
        this.backupConfig.addQueueConfiguration(new QueueConfiguration("Queue").setRoutingType(RoutingType.ANYCAST));
        this.backupConfig.addQueueConfiguration(new QueueConfiguration("DestQueue").setRoutingType(RoutingType.ANYCAST));
        DivertConfiguration routingName = new DivertConfiguration().setName("Test").setAddress("Queue").setForwardingAddress("DestQueue").setRoutingName("Test");
        this.liveConfig = createDefaultInVMConfig();
        this.liveConfig.addQueueConfiguration(new QueueConfiguration("Queue").setRoutingType(RoutingType.ANYCAST));
        this.liveConfig.addQueueConfiguration(new QueueConfiguration("DestQueue").setRoutingType(RoutingType.ANYCAST));
        this.liveConfig.addDivertConfiguration(routingName);
        this.backupConfig.addDivertConfiguration(routingName);
        ReplicatedBackupUtils.configureReplicationPair(this.backupConfig, nettyConnector2, nettyAcceptor2, this.liveConfig, nettyConnector, nettyAcceptor);
        liveServer = createServer(this.liveConfig);
        liveServer.start();
        startBackup();
        waitForServerToStart(liveServer);
        Assert.assertEquals(10000L, this.factory.getMinLargeMessageSize());
        Assert.assertEquals(10000L, this.factory.getProducerWindowSize());
        Assert.assertEquals(100L, this.factory.getRetryInterval());
        Assert.assertEquals(-1L, this.factory.getReconnectAttempts());
        Assert.assertTrue(this.factory.isHA());
        this.connection = this.factory.createConnection();
        this.session = this.connection.createSession(true, 0);
        this.queue = this.session.createQueue("Queue");
        this.targetQueue = this.session.createQueue("DestQueue");
        this.producer = this.session.createProducer(this.queue);
    }

    private void startBackup() throws Exception {
        backupServer = createServer(this.backupConfig);
        backupServer.start();
        waitForServerToStart(backupServer);
    }

    @After
    public void stopServers() throws Exception {
        if (this.connection != null) {
            try {
                this.connection.close();
            } catch (Exception e) {
            }
        }
        if (backupServer != null) {
            backupServer.stop();
            backupServer = null;
        }
        if (liveServer != null) {
            liveServer.stop();
            liveServer = null;
        }
        liveServer = null;
        backupServer = null;
    }

    @Test
    public void testSendLargeMessage() throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        this.connection.setFailoverListener(new FailoverEventListener() { // from class: org.apache.activemq.artemis.tests.integration.divert.ReplicationWithDivertTest.1
            public void failoverEvent(FailoverEventType failoverEventType) {
                countDownLatch.countDown();
            }
        });
        final MapMessage createLargeMessage = createLargeMessage();
        Thread thread = new Thread() { // from class: org.apache.activemq.artemis.tests.integration.divert.ReplicationWithDivertTest.2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                for (int i = 0; i < 5; i++) {
                    try {
                        ReplicationWithDivertTest.this.producer.send(createLargeMessage);
                        ReplicationWithDivertTest.this.session.commit();
                    } catch (JMSException e) {
                        e.printStackTrace();
                        return;
                    }
                }
            }
        };
        thread.start();
        thread.join(10000L);
        MessageConsumer createConsumer = this.session.createConsumer(this.queue);
        this.connection.start();
        for (int i = 0; i < 5; i++) {
            Assert.assertNotNull(createConsumer.receive(5000L));
            for (int i2 = 0; i2 < 10; i2++) {
                Assert.assertEquals(204800L, r0.getBytes(AutoCreateJmsDestinationTest.QUEUE_NAME + i2).length);
            }
            this.session.commit();
        }
        createConsumer.close();
        Assert.assertFalse(thread.isAlive());
        liveServer.fail(true);
        Assert.assertTrue(countDownLatch.await(10L, TimeUnit.SECONDS));
        MessageConsumer createConsumer2 = this.session.createConsumer(this.targetQueue);
        this.connection.start();
        for (int i3 = 0; i3 < 5; i3++) {
            Assert.assertNotNull(createConsumer2.receive(5000L));
            for (int i4 = 0; i4 < 10; i4++) {
                Assert.assertEquals(204800L, r0.getBytes(AutoCreateJmsDestinationTest.QUEUE_NAME + i4).length);
            }
            this.session.commit();
        }
        createConsumer2.close();
    }

    private MapMessage createLargeMessage() throws JMSException {
        MapMessage createMapMessage = this.session.createMapMessage();
        for (int i = 0; i < 10; i++) {
            createMapMessage.setBytes(AutoCreateJmsDestinationTest.QUEUE_NAME + i, new byte[204800]);
        }
        return createMapMessage;
    }
}
