package org.apache.activemq.artemis.tests.integration.replication;

import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFile;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.junit.Wait;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.jboss.logging.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.class */
public class SharedNothingReplicationFlowControlTest extends ActiveMQTestBase {
    ExecutorService sendMessageExecutor;

    @Rule
    public TemporaryFolder brokersFolder = new TemporaryFolder();
    private static final Logger logger = Logger.getLogger(SharedNothingReplicationFlowControlTest.class);
    static AtomicInteger openCount = new AtomicInteger(0);
    static AtomicInteger closeCount = new AtomicInteger(0);

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest$AddRecordLoaderCallback.class */
    abstract class AddRecordLoaderCallback implements LoaderCallback {
        AddRecordLoaderCallback() {
        }

        public void addPreparedTransaction(PreparedTransactionInfo preparedTransactionInfo) {
        }

        public void deleteRecord(long j) {
        }

        public void updateRecord(RecordInfo recordInfo) {
        }

        public void failedTransaction(long j, List<RecordInfo> list, List<RecordInfo> list2) {
        }
    }

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest$PageStoreFactoryTestable.class */
    private static class PageStoreFactoryTestable extends PagingStoreFactoryNIO {
        PageStoreFactoryTestable(StorageManager storageManager, File file, long j, ScheduledExecutorService scheduledExecutorService, ExecutorFactory executorFactory, boolean z, IOCriticalErrorListener iOCriticalErrorListener) {
            super(storageManager, file, j, scheduledExecutorService, executorFactory, z, iOCriticalErrorListener);
        }

        PageStoreFactoryTestable(PagingStoreFactoryNIO pagingStoreFactoryNIO) {
            this(pagingStoreFactoryNIO.getStorageManager(), pagingStoreFactoryNIO.getDirectory(), pagingStoreFactoryNIO.getSyncTimeout(), pagingStoreFactoryNIO.getScheduledExecutor(), pagingStoreFactoryNIO.getExecutorFactory(), pagingStoreFactoryNIO.isSyncNonTransactional(), pagingStoreFactoryNIO.getCritialErrorListener());
        }

        protected SequentialFileFactory newFileFactory(String str) {
            return new TestableNIOFactory(new File(getDirectory(), str), false, getCritialErrorListener(), 1);
        }
    }

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest$TestInterceptor.class */
    public static final class TestInterceptor implements Interceptor {
        private long sleepTime;

        public TestInterceptor(long j) {
            this.sleepTime = j;
        }

        public void setSleepTime(long j) {
            this.sleepTime = j;
        }

        public boolean intercept(Packet packet, RemotingConnection remotingConnection) throws ActiveMQException {
            try {
                long currentTimeMillis = System.currentTimeMillis();
                while (System.currentTimeMillis() < currentTimeMillis + this.sleepTime) {
                    Thread.sleep(100L);
                }
                return true;
            } catch (InterruptedException e) {
                e.printStackTrace();
                return true;
            }
        }
    }

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest$TestableNIOFactory.class */
    public static class TestableNIOFactory extends NIOSequentialFileFactory {
        public TestableNIOFactory(File file, boolean z, IOCriticalErrorListener iOCriticalErrorListener, int i) {
            super(file, z, iOCriticalErrorListener, i);
        }

        public SequentialFile createSequentialFile(String str) {
            return new TestableSequentialFile(this, this.journalDir, str, this.maxIO, this.writeExecutor);
        }
    }

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest$TestableSequentialFile.class */
    public static class TestableSequentialFile extends NIOSequentialFile {
        public TestableSequentialFile(SequentialFileFactory sequentialFileFactory, File file, String str, int i, Executor executor) {
            super(sequentialFileFactory, file, str, i, executor);
        }

        public void open(int i, boolean z) throws IOException {
            super.open(i, z);
            SharedNothingReplicationFlowControlTest.openCount.incrementAndGet();
        }

        public synchronized void close() throws IOException, InterruptedException, ActiveMQException {
            super.close();
            SharedNothingReplicationFlowControlTest.closeCount.incrementAndGet();
        }
    }

    @Before
    public void setupExecutor() {
        this.sendMessageExecutor = Executors.newCachedThreadPool();
    }

    @After
    public void teardownExecutor() {
        this.sendMessageExecutor.shutdownNow();
    }

    @Test
    public void testReplicationIfFlowControlled() throws Exception {
        Configuration createLiveConfiguration = createLiveConfiguration();
        ActiveMQServer addServer = addServer(ActiveMQServers.newActiveMQServer(createLiveConfiguration));
        addServer.start();
        Wait.waitFor(() -> {
            return addServer.isStarted();
        });
        ServerLocator newLocator = ServerLocatorImpl.newLocator("tcp://localhost:61616");
        newLocator.setCallTimeout(60000L);
        newLocator.setConnectionTTL(60000L);
        ClientSessionFactory createSessionFactory = newLocator.createSessionFactory();
        ClientSession createSession = createSessionFactory.createSession();
        createSession.createQueue("flowcontrol", RoutingType.ANYCAST, "flowcontrol", true);
        createSession.close();
        CountDownLatch countDownLatch = new CountDownLatch(100);
        Configuration createBackupConfiguration = createBackupConfiguration();
        ActiveMQServer addServer2 = addServer(ActiveMQServers.newActiveMQServer(createBackupConfiguration));
        addServer2.start();
        Wait.waitFor(() -> {
            return addServer2.isStarted();
        });
        Objects.requireNonNull(addServer2);
        Wait.waitFor(addServer2::isReplicaSync, 30000L);
        TestInterceptor testInterceptor = new TestInterceptor(30000L);
        addServer2.getClusterManager().getClusterController().addIncomingInterceptorForReplication(testInterceptor);
        byte[] bArr = new byte[32768];
        for (int i = 0; i < 100; i++) {
            this.sendMessageExecutor.execute(() -> {
                try {
                    ClientSession createSession2 = createSessionFactory.createSession(true, true);
                    ClientProducer createProducer = createSession2.createProducer("flowcontrol");
                    ClientMessage createMessage = createSession2.createMessage(true);
                    createMessage.writeBodyBufferBytes(bArr);
                    logger.infof("try to send a message after replicated", new Object[0]);
                    createProducer.send(createMessage);
                    logger.info("send message done");
                    createProducer.close();
                    createSession2.close();
                    countDownLatch.countDown();
                } catch (ActiveMQException e) {
                    logger.error("send message", e);
                }
            });
        }
        Assert.assertTrue("all message sent", countDownLatch.await(30L, TimeUnit.SECONDS));
        testInterceptor.setSleepTime(0L);
        createSessionFactory.close();
        newLocator.close();
        Objects.requireNonNull(addServer);
        Assert.assertTrue("Waiting for replica sync timeout", Wait.waitFor(addServer::isReplicaSync, 30000L));
        addServer2.stop(true);
        addServer.stop(true);
        this.brokersFolder.getRoot().toPath().resolve("live").resolve("data").resolve("journal").toFile();
        MappedSequentialFileFactory mappedSequentialFileFactory = new MappedSequentialFileFactory(createLiveConfiguration.getJournalLocation(), createLiveConfiguration.getJournalFileSize(), false, createLiveConfiguration.getJournalBufferSize_NIO(), createLiveConfiguration.getJournalBufferTimeout_NIO(), (IOCriticalErrorListener) null);
        JournalImpl journalImpl = new JournalImpl(createLiveConfiguration.getJournalFileSize(), createLiveConfiguration.getJournalMinFiles(), createLiveConfiguration.getJournalPoolFiles(), createLiveConfiguration.getJournalCompactMinFiles(), createLiveConfiguration.getJournalCompactPercentage(), mappedSequentialFileFactory, "activemq-data", "amq", mappedSequentialFileFactory.getMaxIO());
        journalImpl.start();
        final AtomicInteger atomicInteger = new AtomicInteger();
        journalImpl.load(new AddRecordLoaderCallback() { // from class: org.apache.activemq.artemis.tests.integration.replication.SharedNothingReplicationFlowControlTest.1
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super();
            }

            public void addRecord(RecordInfo recordInfo) {
                if (recordInfo.userRecordType != 45) {
                }
                SharedNothingReplicationFlowControlTest.logger.infof("got live message %d %d", Long.valueOf(recordInfo.id), Byte.valueOf(recordInfo.userRecordType));
                atomicInteger.incrementAndGet();
            }
        });
        this.brokersFolder.getRoot().toPath().resolve("backup").resolve("data").resolve("journal").toFile();
        MappedSequentialFileFactory mappedSequentialFileFactory2 = new MappedSequentialFileFactory(createBackupConfiguration.getJournalLocation(), createBackupConfiguration.getJournalFileSize(), false, createBackupConfiguration.getJournalBufferSize_NIO(), createBackupConfiguration.getJournalBufferTimeout_NIO(), (IOCriticalErrorListener) null);
        JournalImpl journalImpl2 = new JournalImpl(createBackupConfiguration.getJournalFileSize(), createBackupConfiguration.getJournalMinFiles(), createBackupConfiguration.getJournalPoolFiles(), createBackupConfiguration.getJournalCompactMinFiles(), createBackupConfiguration.getJournalCompactPercentage(), mappedSequentialFileFactory2, "activemq-data", "amq", mappedSequentialFileFactory2.getMaxIO());
        journalImpl2.start();
        final AtomicInteger atomicInteger2 = new AtomicInteger();
        journalImpl2.load(new AddRecordLoaderCallback() { // from class: org.apache.activemq.artemis.tests.integration.replication.SharedNothingReplicationFlowControlTest.2
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super();
            }

            public void addRecord(RecordInfo recordInfo) {
                if (recordInfo.userRecordType != 45) {
                }
                SharedNothingReplicationFlowControlTest.logger.infof("replicated message %d", Long.valueOf(recordInfo.id));
                atomicInteger2.incrementAndGet();
            }
        });
        logger.infof("expected %d messages, live=%d, backup=%d", 100, Integer.valueOf(atomicInteger.get()), Integer.valueOf(atomicInteger2.get()));
        Assert.assertEquals("Live lost journal record", 100L, atomicInteger.get());
        Assert.assertEquals("Backup did not replicated all journal", 100L, atomicInteger2.get());
    }

    @Test
    public void testSendPages() throws Exception {
        ActiveMQServer addServer = addServer(ActiveMQServers.newActiveMQServer(createLiveConfiguration()));
        addServer.start();
        Wait.waitFor(() -> {
            return addServer.isStarted();
        });
        ServerLocator newLocator = ServerLocatorImpl.newLocator("tcp://localhost:61616");
        newLocator.setCallTimeout(60000L);
        newLocator.setConnectionTTL(60000L);
        ClientSession createSession = newLocator.createSessionFactory().createSession();
        createSession.createQueue("flowcontrol", RoutingType.ANYCAST, "flowcontrol", true);
        PagingStore pageStore = addServer.getPagingManager().getPageStore(SimpleString.toSimpleString("flowcontrol"));
        pageStore.startPaging();
        ClientProducer createProducer = createSession.createProducer("flowcontrol");
        for (int i = 0; i < 100; i++) {
            createProducer.send(createSession.createMessage(true));
            if (i % 10 == 0) {
                createSession.commit();
                pageStore.forceAnotherPage();
            }
        }
        createSession.close();
        openCount.set(0);
        closeCount.set(0);
        ActiveMQServerImpl activeMQServerImpl = new ActiveMQServerImpl(createBackupConfiguration().setNetworkCheckURLList((String) null), ManagementFactory.getPlatformMBeanServer(), new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), new SecurityConfiguration())) { // from class: org.apache.activemq.artemis.tests.integration.replication.SharedNothingReplicationFlowControlTest.3
            public PagingManager createPagingManager() throws Exception {
                PagingManagerImpl createPagingManager = super.createPagingManager();
                createPagingManager.replacePageStoreFactory(new PageStoreFactoryTestable(createPagingManager.getPagingStoreFactory()));
                return createPagingManager;
            }
        };
        addServer(activeMQServerImpl).start();
        Wait.waitFor(() -> {
            return activeMQServerImpl.isStarted();
        });
        Objects.requireNonNull(activeMQServerImpl);
        Wait.waitFor(activeMQServerImpl::isReplicaSync, 30000L);
        activeMQServerImpl.getPagingManager().getPagingStoreFactory();
        Assert.assertEquals(openCount.get(), closeCount.get());
    }

    private Configuration createLiveConfiguration() throws Exception {
        ConfigurationImpl configurationImpl = new ConfigurationImpl();
        configurationImpl.setName("localhost::live");
        configurationImpl.setBrokerInstance(this.brokersFolder.newFolder("live"));
        configurationImpl.addAcceptorConfiguration("live", "tcp://localhost:61616?writeBufferHighWaterMark=2048&writeBufferLowWaterMark=2048");
        configurationImpl.addConnectorConfiguration("backup", "tcp://localhost:61617");
        configurationImpl.addConnectorConfiguration("live", "tcp://localhost:61616");
        configurationImpl.setClusterUser("mycluster");
        configurationImpl.setClusterPassword("mypassword");
        ReplicatedPolicyConfiguration replicatedPolicyConfiguration = new ReplicatedPolicyConfiguration();
        replicatedPolicyConfiguration.setVoteOnReplicationFailure(false);
        replicatedPolicyConfiguration.setCheckForLiveServer(false);
        configurationImpl.setHAPolicyConfiguration(replicatedPolicyConfiguration);
        ClusterConnectionConfiguration clusterConnectionConfiguration = new ClusterConnectionConfiguration();
        clusterConnectionConfiguration.setStaticConnectors(new ArrayList()).getStaticConnectors().add("backup");
        clusterConnectionConfiguration.setName("cluster");
        clusterConnectionConfiguration.setConnectorName("live");
        clusterConnectionConfiguration.setCallTimeout(4000L);
        configurationImpl.addClusterConfiguration(clusterConnectionConfiguration);
        configurationImpl.setSecurityEnabled(false).setJMXManagementEnabled(false).setJournalType(JournalType.MAPPED).setJournalFileSize(524288).setConnectionTTLOverride(60000L);
        return configurationImpl;
    }

    private Configuration createBackupConfiguration() throws Exception {
        ConfigurationImpl configurationImpl = new ConfigurationImpl();
        configurationImpl.setName("localhost::backup");
        configurationImpl.setBrokerInstance(this.brokersFolder.newFolder("backup"));
        ReplicaPolicyConfiguration replicaPolicyConfiguration = new ReplicaPolicyConfiguration();
        replicaPolicyConfiguration.setClusterName("cluster");
        configurationImpl.setHAPolicyConfiguration(replicaPolicyConfiguration);
        configurationImpl.addAcceptorConfiguration("backup", "tcp://localhost:61617");
        configurationImpl.addConnectorConfiguration("live", "tcp://localhost:61616");
        configurationImpl.addConnectorConfiguration("backup", "tcp://localhost:61617");
        configurationImpl.setClusterUser("mycluster");
        configurationImpl.setClusterPassword("mypassword");
        ClusterConnectionConfiguration clusterConnectionConfiguration = new ClusterConnectionConfiguration();
        clusterConnectionConfiguration.setStaticConnectors(new ArrayList()).getStaticConnectors().add("live");
        clusterConnectionConfiguration.setName("cluster");
        clusterConnectionConfiguration.setConnectorName("backup");
        configurationImpl.addClusterConfiguration(clusterConnectionConfiguration);
        configurationImpl.setNetworkCheckPeriod(1000000L).setNetworkCheckURLList("http://localhost:28787").setSecurityEnabled(false).setJMXManagementEnabled(false).setJournalType(JournalType.MAPPED).setJournalFileSize(524288).setConnectionTTLOverride(60000L);
        return configurationImpl;
    }
}
