package org.apache.activemq.artemis.tests.integration.replication;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.class */
public class SharedNothingReplicationTest extends ActiveMQTestBase {
    // Logger shared by the test and its anonymous callbacks; bound to this class via MethodHandles.
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    // JUnit-managed temporary directory under which the "live" and "backup" broker folders are created.
    @Rule
    public TemporaryFolder brokersFolder = new TemporaryFolder();
    // NOTE(review): not read or assigned anywhere in the code visible here — looks unused; confirm before removing.
    private SlowMessagePersister slowMessagePersister;
    // Single-thread executor that runs the message-sending tasks; created in setUp() and shut down in tearDown().
    ExecutorService sendMessageExecutor;

    /**
     * Base {@link LoaderCallback} whose callbacks declared here all do nothing.
     *
     * <p>It stays abstract so that the anonymous subclasses used by the test only have to supply
     * the record-handling method they care about ({@code addRecord}).
     */
    abstract class AddRecordLoaderCallback implements LoaderCallback {

        /** No-op: prepared transactions are not inspected. */
        public void addPreparedTransaction(final PreparedTransactionInfo transactionInfo) {
            // intentionally empty
        }

        /** No-op: record deletions are not inspected. */
        public void deleteRecord(final long recordId) {
            // intentionally empty
        }

        /** No-op: record updates are not inspected. */
        public void updateRecord(final RecordInfo updatedRecord) {
            // intentionally empty
        }

        /** No-op: failed transactions are not inspected. */
        public void failedTransaction(final long transactionId, final List<RecordInfo> records, final List<RecordInfo> recordsToDelete) {
            // intentionally empty
        }
    }

    /**
     * Message persister that delegates to the regular {@link CoreMessagePersister} but can
     * artificially slow down encoding.
     *
     * <p>Before encoding, the message's {@code "delay"} long property is read; when it is present
     * and positive the calling thread sleeps for that many milliseconds. The {@link #used} flag
     * records that {@link #encode} was invoked at least once, which the test asserts on to make
     * sure this persister was really in the code path.
     */
    static class SlowMessagePersister extends CoreMessagePersister implements Persister<Message> {
        private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

        /** Lazily created shared instance, see {@link #_getInstance()}. */
        static SlowMessagePersister theInstance;

        /**
         * Set to {@code true} on the first call to {@link #encode}. Declared volatile so that the
         * write is visible to the test thread that reads it, in case encoding happens on a
         * different thread.
         */
        volatile boolean used = false;

        /** The real persister that performs all the actual work. */
        private final CoreMessagePersister persister = CoreMessagePersister.getInstance();

        private SlowMessagePersister() {
        }

        /**
         * Returns the shared instance, creating it on first use.
         *
         * @return the single {@code SlowMessagePersister}
         */
        static SlowMessagePersister _getInstance() {
            if (theInstance == null) {
                theInstance = new SlowMessagePersister();
            }
            return theInstance;
        }

        /** @return the ID reported by the delegate persister */
        public byte getID() {
            return this.persister.getID();
        }

        /**
         * @param message the message to measure
         * @return the encode size computed by the delegate persister
         */
        public int getEncodeSize(Message message) {
            return this.persister.getEncodeSize(message);
        }

        /**
         * Optionally sleeps for the message's {@code "delay"} property (milliseconds) and then
         * encodes the message through the delegate persister.
         *
         * @param activeMQBuffer the buffer to encode into
         * @param message        the message to encode
         */
        public void encode(ActiveMQBuffer activeMQBuffer, Message message) {
            this.used = true;
            try {
                Long delay = message.getLongProperty("delay");
                if (delay == null || delay <= 0) {
                    logger.debug("encode message {}, caller={}", message.getMessageID(), Thread.currentThread().getName());
                } else {
                    logger.debug("sleep {} ms before encode message {}, caller={}", delay, message.getMessageID(), Thread.currentThread().getName());
                    Thread.sleep(delay);
                }
            } catch (InterruptedException e) {
                // Do not lose the interruption: restore the flag so the owner of this thread can
                // still observe it, then carry on and encode the message as before.
                Thread.currentThread().interrupt();
            }
            this.persister.encode(activeMQBuffer, message);
        }

        /**
         * Decodes a message by delegating to the wrapped persister; no delay is applied.
         *
         * @param activeMQBuffer         the buffer to decode from
         * @param message                passed through to the delegate unchanged
         * @param coreMessageObjectPools passed through to the delegate unchanged
         * @return whatever the delegate persister returns
         */
        public Message decode(ActiveMQBuffer activeMQBuffer, Message message, CoreMessageObjectPools coreMessageObjectPools) {
            return this.persister.decode(activeMQBuffer, message, coreMessageObjectPools);
        }
    }

    /**
     * Prepares a test run: runs the base-class set-up, creates the single-thread executor used
     * for sending messages, and registers the shared {@link SlowMessagePersister} with
     * {@link CoreMessagePersister}.
     *
     * @throws Exception if the base-class set-up fails
     */
    @Before
    public void setUp() throws Exception {
        super.setUp();

        sendMessageExecutor = Executors.newSingleThreadExecutor();

        final SlowMessagePersister slowPersister = SlowMessagePersister._getInstance();
        CoreMessagePersister.registerPersister(slowPersister);
    }

    /**
     * Undoes {@link #setUp()}: resets the registered persister, stops the sending executor and
     * runs the base-class tear-down.
     *
     * <p>The steps are chained with {@code finally} so that a failure in an earlier step (or a
     * set-up that failed before the executor was created) can never prevent the base-class
     * tear-down from running.
     *
     * @throws Exception if any of the clean-up steps fails
     */
    @After
    public void tearDown() throws Exception {
        try {
            CoreMessagePersister.resetPersister();
        } finally {
            try {
                // The executor is null when setUp() failed before creating it.
                if (this.sendMessageExecutor != null) {
                    this.sendMessageExecutor.shutdownNow();
                }
            } finally {
                super.tearDown();
            }
        }
    }

    /**
     * Checks that a backup joining a live broker whose journal writes are slow still receives
     * every journal record.
     *
     * <p>Sequence:
     * <ol>
     *   <li>start the live broker and create the {@code "slow"} anycast queue;</li>
     *   <li>queue 5 durable sends whose {@code "delay"} property is 500 (ms), which makes
     *       {@link SlowMessagePersister} sleep before encoding each of them;</li>
     *   <li>start the backup and wait (up to 30 s) until the topology reports a backup member;</li>
     *   <li>queue the remaining 45 sends with a delay of 0 and wait (up to 30 s) for all 50;</li>
     *   <li>stop both brokers, load each broker's message journal and assert that both contain
     *       exactly 50 add-records and that the slow persister was actually used.</li>
     * </ol>
     *
     * @throws Exception on any broker, client or journal failure
     */
    @Test
    public void testReplicateFromSlowLive() throws Exception {
        final int totalMessages = 50;
        final int messagesBeforeReplication = 5;

        Configuration liveConfiguration = createLiveConfiguration();
        ActiveMQServer liveServer = addServer(ActiveMQServers.newActiveMQServer(liveConfiguration));
        liveServer.start();
        Wait.waitFor(liveServer::isStarted);

        final CountDownLatch backupAnnounced = new CountDownLatch(1);
        final CountDownLatch allSent = new CountDownLatch(totalMessages);
        Configuration backupConfiguration;
        ActiveMQServer backupServer;

        // try-with-resources guarantees the locator and factory are closed even when an
        // assertion below fails; on success they are closed before the brokers are stopped.
        try (ServerLocator locator = ServerLocatorImpl.newLocator("tcp://localhost:61616")) {
            locator.setCallTimeout(60000L);
            locator.setConnectionTTL(60000L);
            locator.addClusterTopologyListener(new ClusterTopologyListener() {
                public void nodeUP(TopologyMember member, boolean last) {
                    logger.debug("nodeUP fired last={}, live={}, backup={}", last, member.getLive(), member.getBackup());
                    // Only a topology update that carries a backup means the pair is formed.
                    if (member.getBackup() != null) {
                        backupAnnounced.countDown();
                    }
                }

                public void nodeDown(long eventUID, String nodeID) {
                }
            });

            try (ClientSessionFactory sessionFactory = locator.createSessionFactory()) {
                try (ClientSession session = sessionFactory.createSession()) {
                    session.createQueue(new QueueConfiguration("slow").setRoutingType(RoutingType.ANYCAST));
                }

                // Slow sends are queued before the backup exists so that they are still being
                // persisted while the backup starts and synchronizes.
                for (int i = 0; i < messagesBeforeReplication; i++) {
                    this.sendMessageExecutor.execute(() -> sendDelayedMessage(sessionFactory, 500L, "try to send a message before replicated", allSent));
                }

                backupConfiguration = createBackupConfiguration();
                backupServer = addServer(ActiveMQServers.newActiveMQServer(backupConfiguration));
                backupServer.start();
                Wait.waitFor(backupServer::isStarted);
                Assert.assertTrue("can not replicate in 30 seconds", backupAnnounced.await(30L, TimeUnit.SECONDS));

                for (int i = messagesBeforeReplication; i < totalMessages; i++) {
                    this.sendMessageExecutor.execute(() -> sendDelayedMessage(sessionFactory, 0L, "try to send a message after replicated", allSent));
                }
                Assert.assertTrue("all message sent", allSent.await(30L, TimeUnit.SECONDS));
            }
        }

        backupServer.stop(true);
        liveServer.stop(true);

        int liveRecords = countJournalAddRecords(liveConfiguration, "got live message {}");
        int backupRecords = countJournalAddRecords(backupConfiguration, "replicated message {}");

        logger.debug("expected {} messages, live={}, backup={}", totalMessages, liveRecords, backupRecords);
        Assert.assertEquals("Live lost journal record", totalMessages, liveRecords);
        Assert.assertEquals("Backup did not replicated all journal", totalMessages, backupRecords);
        Assert.assertTrue("The test is not valid, slow persister stopped being used", SlowMessagePersister._getInstance().used);
    }

    /**
     * Sends one durable message to the {@code "slow"} address and counts down {@code sent} once
     * the send and the subsequent close of producer and session have succeeded. Failures are
     * logged and leave the latch untouched.
     *
     * @param sessionFactory factory used to open a fresh session for this message
     * @param delayMillis    value stored in the message's {@code "delay"} property
     * @param logMessage     debug line emitted just before sending
     * @param sent           latch tracking successfully sent messages
     */
    private void sendDelayedMessage(ClientSessionFactory sessionFactory, long delayMillis, String logMessage, CountDownLatch sent) {
        try (ClientSession session = sessionFactory.createSession(true, true);
             ClientProducer producer = session.createProducer("slow")) {
            ClientMessage message = session.createMessage(true);
            message.putLongProperty("delay", delayMillis);
            logger.debug(logMessage);
            producer.send(message);
            logger.debug("send message done");
        } catch (ActiveMQException e) {
            logger.error("send message", e);
            return;
        }
        sent.countDown();
    }

    /**
     * Opens the message journal described by {@code configuration}, loads it and returns how many
     * add-records the load reported. The journal is always stopped again before returning.
     *
     * <p>Every add-record is counted; no filtering by record type is applied.
     *
     * @param configuration broker configuration providing the journal location and sizing
     * @param logFormat     debug format (one {@code {}} placeholder) logged with each record id
     * @return the number of add-records seen while loading
     * @throws Exception if the journal cannot be started, loaded or stopped
     */
    private int countJournalAddRecords(Configuration configuration, String logFormat) throws Exception {
        MappedSequentialFileFactory fileFactory = new MappedSequentialFileFactory(configuration.getJournalLocation(), configuration.getJournalFileSize(), false, configuration.getJournalBufferSize_NIO(), configuration.getJournalBufferTimeout_NIO(), (IOCriticalErrorListener) null);
        JournalImpl journal = new JournalImpl(configuration.getJournalFileSize(), configuration.getJournalMinFiles(), configuration.getJournalPoolFiles(), configuration.getJournalCompactMinFiles(), configuration.getJournalCompactPercentage(), fileFactory, "activemq-data", "amq", fileFactory.getMaxIO());
        final AtomicInteger addRecords = new AtomicInteger();

        journal.start();
        try {
            journal.load(new AddRecordLoaderCallback() {
                public void addRecord(RecordInfo recordInfo) {
                    logger.debug(logFormat, recordInfo.id);
                    addRecords.incrementAndGet();
                }
            });
        } finally {
            // Release the journal's files and threads even when loading fails.
            journal.stop();
        }
        return addRecords.get();
    }

    /**
     * Builds the HA policy installed on the live broker: a {@link ReplicatedPolicyConfiguration}
     * with vote-on-replication-failure and check-for-live-server both switched off.
     *
     * @return the HA policy configuration for the live broker
     */
    protected HAPolicyConfiguration createReplicationLiveConfiguration() {
        return new ReplicatedPolicyConfiguration()
            .setVoteOnReplicationFailure(false)
            .setCheckForLiveServer(false);
    }

    /**
     * Builds the configuration of the live broker.
     *
     * <p>The broker instance lives in a new {@code "live"} folder under {@link #brokersFolder},
     * accepts on {@code tcp://localhost:61616}, and has a cluster connection named
     * {@code "cluster"} that uses the {@code "live"} connector and lists {@code "backup"} as its
     * only static connector. Security and JMX management are disabled and a mapped journal with
     * 524288-byte files is used.
     *
     * @return the live broker configuration
     * @throws Exception if the broker folder cannot be created
     */
    public Configuration createLiveConfiguration() throws Exception {
        ConfigurationImpl configuration = new ConfigurationImpl();
        configuration.setName("localhost::live");
        configuration.setBrokerInstance(this.brokersFolder.newFolder("live"));
        configuration.addAcceptorConfiguration("live", "tcp://localhost:61616");
        configuration.addConnectorConfiguration("backup", "tcp://localhost:61617");
        configuration.addConnectorConfiguration("live", "tcp://localhost:61616");
        configuration.setClusterUser("mycluster");
        configuration.setClusterPassword("mypassword");
        configuration.setHAPolicyConfiguration(createReplicationLiveConfiguration());

        // Typed list, filled before it is handed over, instead of a raw ArrayList mutated
        // through the getter afterwards.
        List<String> staticConnectors = new ArrayList<>();
        staticConnectors.add("backup");

        ClusterConnectionConfiguration clusterConnection = new ClusterConnectionConfiguration();
        clusterConnection.setStaticConnectors(staticConnectors);
        clusterConnection.setName("cluster");
        clusterConnection.setConnectorName("live");
        configuration.addClusterConfiguration(clusterConnection);

        configuration.setSecurityEnabled(false)
            .setJMXManagementEnabled(false)
            .setJournalType(JournalType.MAPPED)
            .setJournalFileSize(524288)
            .setConnectionTTLOverride(60000L);
        return configuration;
    }

    /**
     * Builds the HA policy installed on the backup broker: a {@link ReplicaPolicyConfiguration}
     * whose cluster name is {@code "cluster"}.
     *
     * @return the HA policy configuration for the backup broker
     */
    protected HAPolicyConfiguration createReplicationBackupConfiguration() {
        final String clusterName = "cluster";
        return new ReplicaPolicyConfiguration().setClusterName(clusterName);
    }

    /**
     * Builds the configuration of the backup broker.
     *
     * <p>The broker instance lives in a new {@code "backup"} folder under {@link #brokersFolder},
     * accepts on {@code tcp://localhost:61617}, and has a cluster connection named
     * {@code "cluster"} that uses the {@code "backup"} connector and lists {@code "live"} as its
     * only static connector. Security and JMX management are disabled and a mapped journal with
     * 524288-byte files is used.
     *
     * @return the backup broker configuration
     * @throws Exception if the broker folder cannot be created
     */
    public Configuration createBackupConfiguration() throws Exception {
        ConfigurationImpl configuration = new ConfigurationImpl();
        configuration.setName("localhost::backup");
        configuration.setBrokerInstance(this.brokersFolder.newFolder("backup"));
        configuration.setHAPolicyConfiguration(createReplicationBackupConfiguration());
        configuration.addAcceptorConfiguration("backup", "tcp://localhost:61617");
        configuration.addConnectorConfiguration("live", "tcp://localhost:61616");
        configuration.addConnectorConfiguration("backup", "tcp://localhost:61617");
        configuration.setClusterUser("mycluster");
        configuration.setClusterPassword("mypassword");

        // Typed list, filled before it is handed over, instead of a raw ArrayList mutated
        // through the getter afterwards.
        List<String> staticConnectors = new ArrayList<>();
        staticConnectors.add("live");

        ClusterConnectionConfiguration clusterConnection = new ClusterConnectionConfiguration();
        clusterConnection.setStaticConnectors(staticConnectors);
        clusterConnection.setName("cluster");
        clusterConnection.setConnectorName("backup");
        configuration.addClusterConfiguration(clusterConnection);

        configuration.setSecurityEnabled(false)
            .setJMXManagementEnabled(false)
            .setJournalType(JournalType.MAPPED)
            .setJournalFileSize(524288)
            .setConnectionTTLOverride(60000L);
        return configuration;
    }
}
