package org.apache.activemq.artemis.tests.integration.client;

import jakarta.jms.Connection;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.cli.commands.tools.journal.CompactJournal;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer;
import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
import org.apache.activemq.artemis.tests.integration.persistence.XmlImportExportTest;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.ReplicatedBackupUtils;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Checks that a message which is received and rolled back over and over does not leave a
 * large number of records behind in the journal of either server of a live/backup pair.
 *
 * <p>Each run sends one message, receives it and rolls the session back
 * {@link #REDELIVERY_ROUNDS} times, compacts the journals (through the running servers, or
 * through {@link CompactJournal} once they are stopped when {@code useCLI} is set) and then
 * asserts on the per-record-type counts reported by {@code countJournal}.
 */
@RunWith(Parameterized.class)
public class InfiniteRedeliveryTest extends ActiveMQTestBase {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** Number of receive-then-rollback rounds performed on the single test message. */
    private static final int REDELIVERY_ROUNDS = 100;

    /** Exclusive upper bound on the records of any one type in the live journal. */
    private static final int MAX_LIVE_RECORDS_PER_TYPE = 20;

    /** Exclusive upper bound on the records of any one type in the backup journal. */
    private static final int MAX_BACKUP_RECORDS_PER_TYPE = 10;

    String protocol;
    boolean useCLI;
    TestableServer liveServer;
    TestableServer backupServer;
    Configuration backupConfig;
    Configuration liveConfig;

    /**
     * Supplies the {@code (protocol, useCLI)} combinations the runner instantiates the test with.
     *
     * @return one {@code Object[]} per parameterized run
     */
    @Parameterized.Parameters(name = "protocol={0}, useCLI={1}")
    public static Collection<Object[]> getParameters() {
        return Arrays.asList(new Object[][] {
            {"CORE", true},
            {"AMQP", false},
            {"OPENWIRE", false}
        });
    }

    /**
     * @param protocol name handed to {@link CFUtil#createConnectionFactory} to pick the client
     * @param useCLI   when {@code true} the journals are compacted with {@link CompactJournal}
     *                 after the servers are stopped; otherwise compaction is requested on the
     *                 running servers
     */
    public InfiniteRedeliveryTest(String protocol, boolean useCLI) {
        this.protocol = protocol;
        this.useCLI = useCLI;
    }

    /**
     * Wraps a new in-VM failover server in a {@link SameProcessActiveMQServer}.
     *
     * <p>The last argument passed to {@code createInVMFailoverServer} is 2 when the
     * configuration carries a replica or shared-store-slave HA policy and 1 otherwise.
     *
     * @param configuration server configuration to start from
     * @param nodeManager   node manager handed to the new server
     * @return the wrapped server
     * @throws Exception if the server cannot be created
     */
    protected TestableServer createTestableServer(Configuration configuration, NodeManager nodeManager) throws Exception {
        boolean isBackup = configuration.getHAPolicyConfiguration() instanceof ReplicaPolicyConfiguration
            || configuration.getHAPolicyConfiguration() instanceof SharedStoreSlavePolicyConfiguration;
        return new SameProcessActiveMQServer(createInVMFailoverServer(true, configuration, nodeManager, isBackup ? 2 : 1));
    }

    /**
     * Builds {@link #liveConfig} and {@link #backupConfig}, wires them together as a replication
     * pair and creates (without starting) {@link #liveServer} and {@link #backupServer}.
     *
     * @throws Exception if a configuration or server cannot be created
     */
    protected void createReplicatedConfigs() throws Exception {
        TransportConfiguration liveConnector = TransportConfigurationUtils.getNettyConnector(true, 0);
        TransportConfiguration backupConnector = TransportConfigurationUtils.getNettyConnector(false, 0);
        TransportConfiguration backupAcceptor = TransportConfigurationUtils.getNettyAcceptor(false, 0);

        // Both configurations must exist before configureReplicationPair reads them.
        this.backupConfig = createDefaultConfig(0, true);
        this.liveConfig = createDefaultConfig(0, true);
        configureReplicationPair(backupConnector, backupAcceptor, liveConnector);

        this.backupConfig
            .setBindingsDirectory(getBindingsDir(0, true))
            .setJournalDirectory(getJournalDir(0, true))
            .setPagingDirectory(getPageDir(0, true))
            .setLargeMessagesDirectory(getLargeMessagesDir(0, true))
            .setSecurityEnabled(false);
        this.backupServer = createTestableServer(
            this.backupConfig, new InVMNodeManager(true, this.backupConfig.getJournalLocation()));

        this.liveConfig
            .clearAcceptorConfigurations()
            .addAcceptorConfiguration(TransportConfigurationUtils.getNettyAcceptor(true, 0));
        this.liveServer = createTestableServer(
            this.liveConfig, new InVMNodeManager(false, this.liveConfig.getJournalLocation()));
    }

    /**
     * Applies the replication-pair settings to {@link #backupConfig} and {@link #liveConfig} and
     * then tunes the backup's replica policy (saved-journal size -1, fail-back allowed, no
     * backup restart).
     *
     * @param backupConnector connector passed for the backup side
     * @param backupAcceptor  acceptor passed for the backup side
     * @param liveConnector   connector passed for the live side
     */
    protected void configureReplicationPair(TransportConfiguration backupConnector, TransportConfiguration backupAcceptor, TransportConfiguration liveConnector) {
        ReplicatedBackupUtils.configureReplicationPair(this.backupConfig, backupConnector, backupAcceptor, this.liveConfig, liveConnector, null);
        // The setters below are specific to the replica policy, so the generic HA policy
        // configuration has to be narrowed before they can be called.
        ReplicaPolicyConfiguration replicaPolicy = (ReplicaPolicyConfiguration) this.backupConfig.getHAPolicyConfiguration();
        replicaPolicy.setMaxSavedReplicatedJournalsSize(-1).setAllowFailBack(true);
        replicaPolicy.setRestartBackup(false);
    }

    @Before
    @Override
    public void setUp() throws Exception {
        super.setUp();
    }

    /**
     * Creates the live/backup pair, installs a catch-all ("#") address setting with an
     * effectively unlimited number of delivery attempts, starts both servers and waits for the
     * live server to report the replica as synchronized.
     *
     * @param z when {@code true} the redelivery delay is set to 1, otherwise to 0
     *          (presumably 1 makes redelivery go through the scheduler; confirm against
     *          {@code AddressSettings})
     * @throws Exception if the servers cannot be started or never report being in sync
     */
    protected void startServer(boolean z) throws Exception {
        createReplicatedConfigs();

        Configuration configuration = this.liveServer.getServer().getConfiguration();
        long redeliveryDelay = z ? 1L : 0L;
        configuration.getAddressSettings().clear();
        configuration.getAddressSettings().put(
            "#", new AddressSettings().setMaxDeliveryAttempts(Integer.MAX_VALUE).setRedeliveryDelay(redeliveryDelay));

        this.liveServer.start();
        this.backupServer.start();

        // Fail right here if the pair never syncs instead of silently carrying on and
        // tripping over an unrelated assertion on the backup journal later.
        ActiveMQServer live = this.liveServer.getServer();
        Wait.assertTrue(live::isReplicaSync);
    }

    @Test
    public void testInifinteRedeliveryWithScheduling() throws Exception {
        testInifinteRedeliveryWithScheduling(true);
    }

    @Test
    public void testInifinteRedeliveryWithoutScheduling() throws Exception {
        testInifinteRedeliveryWithScheduling(false);
    }

    /**
     * Runs the redelivery scenario and checks the journal record counts on both servers.
     *
     * @param z forwarded to {@link #startServer(boolean)} to choose the redelivery delay
     * @throws Exception on any broker, messaging or journal failure
     */
    public void testInifinteRedeliveryWithScheduling(boolean z) throws Exception {
        startServer(z);

        final String queueName = AutoCreateJmsDestinationTest.QUEUE_NAME;
        this.liveServer.getServer().addAddressInfo(
            new AddressInfo(queueName).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
        this.liveServer.getServer().createQueue(
            new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST).setAddress(queueName).setDurable(true));

        // equalsIgnoreCase avoids the default-locale dependency of toUpperCase().
        String brokerUri = "OPENWIRE".equalsIgnoreCase(this.protocol)
            ? "tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=100&jms.redeliveryPolicy.redeliveryDelay=0"
            : "tcp://localhost:61616";

        // try-with-resources closes the connection even when an assertion below fails.
        try (Connection connection = CFUtil.createConnectionFactory(this.protocol, brokerUri).createConnection()) {
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Queue queue = session.createQueue(queueName);
            Assert.assertNotNull(queue);

            session.createProducer(queue).send(session.createTextMessage("hello"));
            session.commit();

            MessageConsumer consumer = session.createConsumer(queue);
            connection.start();
            for (int round = 0; round < REDELIVERY_ROUNDS; round++) {
                Assert.assertNotNull(consumer.receive(10000L));
                session.rollback();
            }
        }

        if (!this.useCLI) {
            // Compact through the running servers.
            this.liveServer.getServer().getStorageManager().getMessageJournal().scheduleCompactAndBlock(XmlImportExportTest.CONSUMER_TIMEOUT);
            this.backupServer.getServer().getStorageManager().getMessageJournal().scheduleCompactAndBlock(XmlImportExportTest.CONSUMER_TIMEOUT);
        }

        this.liveServer.stop();
        this.backupServer.stop();

        if (this.useCLI) {
            // Compact with the CLI tool, which is invoked only after both servers are stopped.
            CompactJournal.compactJournals(this.backupServer.getServer().getConfiguration());
            CompactJournal.compactJournals(this.liveServer.getServer().getConfiguration());
        }

        Map<Integer, AtomicInteger> liveCounts = countJournal(this.liveServer.getServer().getConfiguration());
        // Log every count first so the full picture is available when an assertion fails.
        liveCounts.forEach((recordType, count) -> logger.debug("{}={}", recordType, count));
        liveCounts.forEach((recordType, count) ->
            Assert.assertTrue("Record type " + recordType + " has a lot of records:" + count,
                count.intValue() < MAX_LIVE_RECORDS_PER_TYPE));

        Map<Integer, AtomicInteger> backupCounts = countJournal(this.backupServer.getServer().getConfiguration());
        Assert.assertTrue(backupCounts.size() > 0);
        backupCounts.forEach((recordType, count) -> logger.debug("On Backup:{}={}", recordType, count));
        backupCounts.forEach((recordType, count) ->
            Assert.assertTrue("Backup Record type " + recordType + " has a lot of records:" + count,
                count.intValue() < MAX_BACKUP_RECORDS_PER_TYPE));
    }
}
