package org.apache.activemq.artemis.tests.integration.replication;

import jakarta.jms.Connection;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.io.File;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.DistributedLockManagerConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicationBackupPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicationPrimaryPolicyConfiguration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.integration.management.SimpleManagementTest;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/replication/RepeatStartBackupTest.class */
/**
 * Integration test that repeatedly stops and restarts a replication backup broker while a
 * client keeps sending to and receiving from the primary broker.
 *
 * <p>The test passes when the client never observes an error and neither of the log codes
 * {@code AMQ229254} / {@code AMQ229006} shows up in the captured log output during any
 * restart cycle.
 */
public class RepeatStartBackupTest extends ActiveMQTestBase {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** Name used for both the address and the anycast queue exercised by the client. */
    private static final String QUEUE_NAME = "t1";

    // NOTE(review): the primary URI is borrowed from SimpleManagementTest; presumably it is
    // tcp://localhost:61616 - confirm, and consider declaring it locally.
    private static final String PRIMARY_URI = SimpleManagementTest.LOCALHOST;
    private static final String BACKUP_URI = "tcp://localhost:61617";

    /** Number of stop/start cycles applied to the backup broker. */
    private static final int RESTART_CYCLES = 5;

    private DistributedLockManagerConfiguration managerConfiguration;
    ActiveMQServer backupServer;
    ActiveMQServer server;

    /**
     * Creates (if necessary) a sub-folder of the per-test temporary folder.
     *
     * @param str name of the sub-folder, relative to the test's temporary folder
     * @return the existing or newly created directory
     * @throws IllegalStateException if the directory does not exist and could not be created
     */
    File newTemporaryFolder(String str) {
        File folder = new File(this.temporaryFolder, str);
        // mkdirs() returns false both on failure and when the folder already exists,
        // so only treat it as an error when no directory is present afterwards.
        if (!folder.mkdirs() && !folder.isDirectory()) {
            throw new IllegalStateException("Could not create test folder " + folder);
        }
        return folder;
    }

    /**
     * Starts the primary broker, creates the test address/queue on it, then starts the backup
     * broker and waits until it reports being in sync with the primary.
     *
     * <p>NOTE(review): this override does not invoke {@code super.setUp()}; confirm that is intended.
     *
     * @throws Exception if either broker fails to start or the backup does not sync in time
     */
    @Override
    @BeforeEach
    public void setUp() throws Exception {
        this.managerConfiguration = new DistributedLockManagerConfiguration(
                FileBasedLockManager.class.getName(),
                Collections.singletonMap("locks-folder", newTemporaryFolder("manager").toString()));
        final int replicaSyncTimeoutMillis = (int) TimeUnit.SECONDS.toMillis(30L);

        // Primary broker.
        this.server = addServer(ActiveMQServers.newActiveMQServer(createLiveConfiguration()));
        this.server.setIdentity("PRIMARY");
        this.server.getConfiguration().setJournalFileSize(100 * 1024);
        this.server.start();
        this.server.addAddressInfo(new AddressInfo(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST));
        this.server.createQueue(
                QueueConfiguration.of(QUEUE_NAME).setAddress(QUEUE_NAME).setRoutingType(RoutingType.ANYCAST));

        // Backup broker. setAllowFailBack is declared on the concrete backup policy, not on the
        // HAPolicyConfiguration interface, hence the cast.
        Configuration backupConfiguration = createBackupConfiguration();
        ((ReplicationBackupPolicyConfiguration) backupConfiguration.getHAPolicyConfiguration()).setAllowFailBack(true);
        this.backupServer = addServer(ActiveMQServers.newActiveMQServer(backupConfiguration));
        this.backupServer.setIdentity("BACKUP");
        this.backupServer.start();

        // Fail here with a clear cause instead of silently ignoring a backup that never started.
        Wait.assertTrue(this.backupServer::isStarted);
        Wait.assertTrue(this.backupServer::isReplicaSync, replicaSyncTimeoutMillis);
    }

    /**
     * Restarts the backup broker {@link #RESTART_CYCLES} times while a background client
     * produces and consumes messages on the primary, checking the captured log after each cycle.
     *
     * @throws Exception if a broker operation fails or an assertion does not hold
     */
    @Test
    public void testLoopStart() throws Exception {
        try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
            ExecutorService executor = Executors.newFixedThreadPool(1);
            runAfter(executor::shutdownNow);

            AtomicInteger errors = new AtomicInteger(0);
            AtomicBoolean running = new AtomicBoolean(true);
            // Make sure the client loop ends even when the test aborts early.
            runAfter(() -> running.set(false));
            CountDownLatch clientDone = new CountDownLatch(1);

            executor.execute(() -> {
                try {
                    sendAndReceiveWhile(running);
                } catch (Throwable e) {
                    // Throwable on purpose: a failed assertion inside the client loop raises an
                    // AssertionError, which must be counted as an error as well.
                    logger.warn(e.getMessage(), e);
                    errors.incrementAndGet();
                } finally {
                    clientDone.countDown();
                }
            });

            for (int i = 0; i < RESTART_CYCLES; i++) {
                logger.info("\n*******************************************************************************************************************************\ntest {}\n*******************************************************************************************************************************", i);
                this.backupServer.stop();
                Wait.assertFalse(this.backupServer::isStarted);
                this.backupServer.start();
                Wait.assertTrue(this.backupServer::isStarted);
                if (i % 2 == 1) {
                    // Only odd cycles wait for the replica to sync; on even cycles the backup is
                    // stopped again without that wait.
                    Wait.assertTrue(this.backupServer::isReplicaSync);
                }
                Assertions.assertFalse(loggerHandler.findText("AMQ229254"));
                Assertions.assertFalse(loggerHandler.findText("AMQ229006"));
                loggerHandler.clear();
            }

            running.set(false);
            Assertions.assertTrue(clientDone.await(10L, TimeUnit.SECONDS));
            Assertions.assertEquals(0, errors.get());
        }
    }

    /**
     * Connects to the primary broker and, for as long as {@code running} is {@code true},
     * sends one text message to the test queue and asserts that it can be received back.
     */
    private static void sendAndReceiveWhile(AtomicBoolean running) throws Exception {
        try (Connection connection = CFUtil.createConnectionFactory("core", PRIMARY_URI).createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
            MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
            connection.start();
            while (running.get()) {
                producer.send(session.createTextMessage("hello"));
                Assertions.assertNotNull(consumer.receive(1000L));
            }
        }
    }

    /**
     * Builds the HA policy for the primary broker, bound to the shared lock manager configuration.
     *
     * @return the primary replication policy configuration
     */
    protected HAPolicyConfiguration createReplicationLiveConfiguration() {
        ReplicationPrimaryPolicyConfiguration policy = ReplicationPrimaryPolicyConfiguration.withDefault();
        policy.setDistributedManagerConfiguration(this.managerConfiguration);
        return policy;
    }

    /**
     * Builds the HA policy for the backup broker, bound to the shared lock manager configuration
     * and to the cluster connection named {@code "cluster"}.
     *
     * @return the backup replication policy configuration
     */
    protected HAPolicyConfiguration createReplicationBackupConfiguration() {
        ReplicationBackupPolicyConfiguration policy = ReplicationBackupPolicyConfiguration.withDefault();
        policy.setDistributedManagerConfiguration(this.managerConfiguration);
        policy.setClusterName("cluster");
        return policy;
    }

    /**
     * Builds the configuration of the primary broker: acceptor on the primary URI and a static
     * cluster connection pointing at the backup connector.
     *
     * @return the primary broker configuration
     * @throws Exception if an acceptor or connector URI cannot be registered
     */
    protected Configuration createLiveConfiguration() throws Exception {
        ConfigurationImpl configuration = new ConfigurationImpl();
        configuration.setName("localhost::live");
        configuration.setBrokerInstance(newTemporaryFolder("live"));
        configuration.addAcceptorConfiguration("live", PRIMARY_URI);
        configuration.setHAPolicyConfiguration(createReplicationLiveConfiguration());
        configuration.addClusterConfiguration(newClusterConnection("live", "backup"));
        applyCommonSettings(configuration);
        return configuration;
    }

    /**
     * Builds the configuration of the backup broker: acceptor on the backup URI and a static
     * cluster connection pointing at the primary connector.
     *
     * @return the backup broker configuration
     * @throws Exception if an acceptor or connector URI cannot be registered
     */
    protected Configuration createBackupConfiguration() throws Exception {
        ConfigurationImpl configuration = new ConfigurationImpl();
        configuration.setName("localhost::backup");
        configuration.setBrokerInstance(newTemporaryFolder("backup"));
        configuration.setHAPolicyConfiguration(createReplicationBackupConfiguration());
        configuration.addAcceptorConfiguration("backup", BACKUP_URI);
        configuration.addClusterConfiguration(newClusterConnection("backup", "live"));
        applyCommonSettings(configuration);
        return configuration;
    }

    /** Applies the connectors, cluster credentials and journal/security settings both brokers share. */
    private static void applyCommonSettings(ConfigurationImpl configuration) throws Exception {
        configuration.addConnectorConfiguration("live", PRIMARY_URI);
        configuration.addConnectorConfiguration("backup", BACKUP_URI);
        configuration.setClusterUser("mycluster");
        configuration.setClusterPassword("mypassword");
        // NOTE(review): the TTL override reuses a constant from the AMQP test client; confirm the
        // intended value and consider a local constant instead.
        configuration.setSecurityEnabled(false)
                .setJMXManagementEnabled(false)
                .setJournalType(JournalType.NIO)
                .setJournalFileSize(512 * 1024)
                .setConnectionTTLOverride(AmqpConnection.DEFAULT_DRAIN_TIMEOUT);
    }

    /** Creates the cluster connection "cluster" using {@code connectorName} and a single static peer connector. */
    private static ClusterConnectionConfiguration newClusterConnection(String connectorName, String peerConnector) {
        ClusterConnectionConfiguration cluster = new ClusterConnectionConfiguration();
        cluster.setStaticConnectors(new ArrayList<>()).getStaticConnectors().add(peerConnector);
        cluster.setName("cluster");
        cluster.setConnectorName(connectorName);
        return cluster;
    }
}
