package org.apache.activemq.artemis.tests.integration.replication;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.impl.PagedMessageImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.replication.ReplicatedJournal;
import org.apache.activemq.artemis.core.replication.ReplicationManager;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.ReplicatedBackupUtils;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.class */
public final class ReplicationTest extends ActiveMQTestBase {
    // Thread factory created in setUp(); both executors below are built from it.
    private ThreadFactory tFactory;
    // Cached thread pool created in setUp() and shut down in tearDown(); backs `factory`.
    private ExecutorService executor;
    // Ordered executor factory wrapping `executor`; handed to the ReplicationManager,
    // OperationContextImpl and JournalStorageManager instances the tests create.
    private ExecutorFactory factory;
    // Scheduled pool created in setUp() and shut down in tearDown(); not otherwise used in this class.
    private ScheduledExecutorService scheduledExecutor;
    // Servers created by setupServer(); liveServer stays null when the test asks for no live server.
    private ActiveMQServer backupServer;
    private ActiveMQServer liveServer;
    // Non-HA locator created by setupServer() and closed in tearDown().
    private ServerLocator locator;
    // Replication manager under test; stopped in tearDown() when a test assigned it.
    private ReplicationManager manager;
    // Used as both the address and the queue name in testSendPacketsWithFailure().
    private static final SimpleString ADDRESS = new SimpleString("foobar123");

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/replication/ReplicationTest$ExtraConfigurer.class */
    /**
     * Hook that lets a test adjust the two server configurations after the replication pair
     * has been configured and before any server is created from them.
     */
    public interface ExtraConfigurer {
        /**
         * Invoked once by the four-argument {@code setupServer}.
         *
         * @param configuration  the configuration the live server is created from
         * @param configuration2 the configuration the backup server is created from
         */
        void config(Configuration configuration, Configuration configuration2);
    }

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/replication/ReplicationTest$FakeData.class */
    /**
     * Minimal {@link EncodingSupport} payload used as record content by the tests.
     * It always encodes to five zero bytes and ignores anything it is asked to decode.
     */
    class FakeData implements EncodingSupport {
        /** Number of bytes written by {@link #encode(ActiveMQBuffer)} and reported by {@link #getEncodeSize()}. */
        private static final int ENCODED_SIZE = 5;

        FakeData() {
        }

        /** Intentionally reads nothing from the buffer. */
        public void decode(ActiveMQBuffer buffer) {
        }

        /** Writes {@link #ENCODED_SIZE} zero bytes into the buffer. */
        public void encode(ActiveMQBuffer buffer) {
            final byte[] payload = new byte[ENCODED_SIZE];
            buffer.writeBytes(payload);
        }

        /** @return the fixed encoded size, matching what {@link #encode(ActiveMQBuffer)} writes */
        public int getEncodeSize() {
            return ENCODED_SIZE;
        }
    }

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/replication/ReplicationTest$FakeJournal.class */
    /**
     * No-op {@link Journal} stub. Every mutating method has an empty body, the numeric getters
     * return 0, {@link #isStarted()} returns false, the {@code load*} methods return a fresh
     * {@link JournalLoadInformation} (except {@link #loadSyncOnly}, which returns null), and the
     * remaining object-returning methods return null.
     * <p>
     * The tests wrap an instance in a {@link ReplicatedJournal} so that only the replication side
     * of each append is exercised.
     */
    static final class FakeJournal implements Journal {
        FakeJournal() {
        }

        // ---- append operations: all intentionally empty ----

        public void appendAddRecord(long j, byte b, byte[] bArr, boolean z) throws Exception {
        }

        public void appendAddRecord(long j, byte b, EncodingSupport encodingSupport, boolean z) throws Exception {
        }

        public void appendAddRecordTransactional(long j, long j2, byte b, byte[] bArr) throws Exception {
        }

        public void appendAddRecordTransactional(long j, long j2, byte b, EncodingSupport encodingSupport) throws Exception {
        }

        public void appendCommitRecord(long j, boolean z) throws Exception {
        }

        public void appendDeleteRecord(long j, boolean z) throws Exception {
        }

        public void appendDeleteRecordTransactional(long j, long j2, byte[] bArr) throws Exception {
        }

        public void appendDeleteRecordTransactional(long j, long j2, EncodingSupport encodingSupport) throws Exception {
        }

        public void appendDeleteRecordTransactional(long j, long j2) throws Exception {
        }

        public void appendPrepareRecord(long j, EncodingSupport encodingSupport, boolean z) throws Exception {
        }

        public void appendPrepareRecord(long j, byte[] bArr, boolean z) throws Exception {
        }

        public void appendRollbackRecord(long j, boolean z) throws Exception {
        }

        public void appendUpdateRecord(long j, byte b, byte[] bArr, boolean z) throws Exception {
        }

        public void appendUpdateRecord(long j, byte b, EncodingSupport encodingSupport, boolean z) throws Exception {
        }

        public void appendUpdateRecordTransactional(long j, long j2, byte b, byte[] bArr) throws Exception {
        }

        public void appendUpdateRecordTransactional(long j, long j2, byte b, EncodingSupport encodingSupport) throws Exception {
        }

        public int getAlignment() throws Exception {
            return 0;
        }

        // ---- loading: reports an empty journal ----

        public JournalLoadInformation load(LoaderCallback loaderCallback) throws Exception {
            return new JournalLoadInformation();
        }

        public JournalLoadInformation load(List<RecordInfo> list, List<PreparedTransactionInfo> list2, TransactionFailureCallback transactionFailureCallback) throws Exception {
            return new JournalLoadInformation();
        }

        public void perfBlast(int i) {
        }

        // ---- lifecycle: never reports itself as started ----

        public boolean isStarted() {
            return false;
        }

        public void start() throws Exception {
        }

        public void stop() throws Exception {
        }

        public JournalLoadInformation loadInternalOnly() throws Exception {
            return new JournalLoadInformation();
        }

        public int getNumberOfRecords() {
            return 0;
        }

        // ---- append operations taking an IOCompletion: the completion is never invoked here ----

        public void appendAddRecord(long j, byte b, EncodingSupport encodingSupport, boolean z, IOCompletion iOCompletion) throws Exception {
        }

        public void appendCommitRecord(long j, boolean z, IOCompletion iOCompletion) throws Exception {
        }

        public void appendDeleteRecord(long j, boolean z, IOCompletion iOCompletion) throws Exception {
        }

        public void appendPrepareRecord(long j, EncodingSupport encodingSupport, boolean z, IOCompletion iOCompletion) throws Exception {
        }

        public void appendRollbackRecord(long j, boolean z, IOCompletion iOCompletion) throws Exception {
        }

        public void appendUpdateRecord(long j, byte b, EncodingSupport encodingSupport, boolean z, IOCompletion iOCompletion) throws Exception {
        }

        public void sync(IOCompletion iOCompletion) {
        }

        public void runDirectJournalBlast() throws Exception {
        }

        public int getUserVersion() {
            return 0;
        }

        public void appendCommitRecord(long j, boolean z, IOCompletion iOCompletion, boolean z2) throws Exception {
        }

        public void lineUpContext(IOCompletion iOCompletion) {
        }

        // ---- backup synchronization and file access: nothing to synchronize, so null / no-op ----

        public JournalLoadInformation loadSyncOnly(Journal.JournalState journalState) throws Exception {
            return null;
        }

        public Map<Long, JournalFile> createFilesForBackupSync(long[] jArr) throws Exception {
            return null;
        }

        public void synchronizationLock() {
        }

        public void synchronizationUnlock() {
        }

        public void forceMoveNextFile() throws Exception {
        }

        public JournalFile[] getDataFiles() {
            return null;
        }

        public SequentialFileFactory getFileFactory() {
            return null;
        }

        public int getFileSize() {
            return 0;
        }

        public void scheduleCompactAndBlock(int i) throws Exception {
        }

        public void replicationSyncPreserveOldFiles() {
        }

        public void replicationSyncFinished() {
        }
    }

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/replication/ReplicationTest$TestInterceptor.class */
    /**
     * Interceptor whose verdict is controlled by the shared static flag {@link #value}.
     * It is registered by class name through {@code setupServer(..., TestInterceptor.class.getName())},
     * which installs it as an incoming interceptor on the backup configuration.
     */
    public static final class TestInterceptor implements Interceptor {
        // Shared across all instances and tests in the JVM; starts as true and is flipped to
        // false by testSendPacketsWithFailure(). Nothing in this file resets it to true.
        static AtomicBoolean value = new AtomicBoolean(true);

        /** Returns the current value of {@link #value}; the packet and connection are not inspected. */
        public boolean intercept(Packet packet, RemotingConnection remotingConnection) throws ActiveMQException {
            return value.get();
        }
    }

    /**
     * Convenience overload: sets the servers up over the in-VM transport (first flag fixed to
     * {@code false}) with no extra configurer.
     *
     * @param z      forwarded as the second flag of the four-argument overload, i.e. whether a
     *               live server is created and started
     * @param strArr interceptor class names forwarded unchanged
     */
    private void setupServer(boolean z, String... strArr) throws Exception {
        setupServer(false, z, null, strArr);
    }

    /**
     * Builds the live/backup configuration pair, optionally starts a live server, then creates
     * and starts the backup server and the client locator.
     *
     * @param z               {@code true} to use Netty connectors/acceptors and a Netty locator,
     *                        {@code false} to use in-VM ones (in which case no live acceptor is
     *                        passed to {@code configureReplicationPair})
     * @param z2              {@code true} to create and start the live server and wait for the
     *                        remote backup; {@code false} to start only the backup
     * @param extraConfigurer optional hook called with (live config, backup config); may be null
     * @param strArr          incoming interceptor class names for the backup configuration
     */
    private void setupServer(boolean z, boolean z2, ExtraConfigurer extraConfigurer, String... strArr) throws Exception {
        final TransportConfiguration liveConnector;
        final TransportConfiguration liveAcceptor;
        final TransportConfiguration backupConnector;
        final TransportConfiguration backupAcceptor;
        if (z) {
            liveConnector = TransportConfigurationUtils.getNettyConnector(true, 0);
            liveAcceptor = TransportConfigurationUtils.getNettyAcceptor(true, 0);
            backupConnector = TransportConfigurationUtils.getNettyConnector(false, 0);
            backupAcceptor = TransportConfigurationUtils.getNettyAcceptor(false, 0);
        } else {
            liveConnector = TransportConfigurationUtils.getInVMConnector(true);
            liveAcceptor = null;
            backupConnector = TransportConfigurationUtils.getInVMConnector(false);
            backupAcceptor = TransportConfigurationUtils.getInVMAcceptor(false);
        }

        final Configuration liveConfig = createDefaultInVMConfig();
        final Configuration backupConfig = createDefaultInVMConfig()
            .setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration())
            .setBindingsDirectory(getBindingsDir(0, true))
            .setJournalDirectory(getJournalDir(0, true))
            .setPagingDirectory(getPageDir(0, true))
            .setLargeMessagesDirectory(getLargeMessagesDir(0, true))
            .setIncomingInterceptorClassNames(strArr.length > 0 ? Arrays.asList(strArr) : new ArrayList<String>());

        ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, liveAcceptor);
        if (extraConfigurer != null) {
            extraConfigurer.config(liveConfig, backupConfig);
        }

        if (z2) {
            this.liveServer = createServer(liveConfig);
            this.liveServer.start();
            waitForComponent(this.liveServer);
        }

        // The locator is created after the backup server object exists but before it is started.
        this.backupServer = createServer(backupConfig);
        this.locator = z ? createNettyNonHALocator() : createInVMNonHALocator();
        this.backupServer.start();

        if (z2) {
            ActiveMQTestBase.waitForRemoteBackup((ClientSessionFactory) null, 5, true, this.backupServer);
        }
        waitForReplication(0);
    }

    /**
     * Polls, sleeping 50 ms between checks, until the live server exposes a replication manager
     * or the attempt counter reaches 10. Returns silently in either case, and returns at once
     * when no live server was started.
     *
     * @param i initial value of the attempt counter (callers in this file pass 0)
     * @throws InterruptedException if the sleeping thread is interrupted
     */
    private void waitForReplication(int i) throws InterruptedException {
        if (this.liveServer != null) {
            for (int attempt = i; this.liveServer.getReplicationManager() == null && attempt < 10; attempt++) {
                Thread.sleep(50L);
            }
        }
    }

    /**
     * Delegates to the two-argument {@code waitForComponent} overload (not defined in this file)
     * with a fixed value of 3.
     * NOTE(review): the unit of that value is not visible here — presumably seconds; confirm
     * against the base class.
     */
    private static void waitForComponent(ActiveMQComponent activeMQComponent) throws Exception {
        waitForComponent(activeMQComponent, 3L);
    }

    /**
     * Starts a live/backup pair over the in-VM transport and waits for the live server's
     * replication manager component.
     */
    @Test
    public void testBasicConnection() throws Exception {
        setupServer(true);
        final ReplicationManager liveReplication = this.liveServer.getReplicationManager();
        waitForComponent(liveReplication);
    }

    /**
     * Starts only the backup server (no live server), then tries to start a hand-built
     * {@link ReplicationManager} over a client connection. The start is expected to fail with
     * {@link ActiveMQNotConnectedException}; any other {@link ActiveMQException} fails the test.
     */
    @Test
    public void testConnectIntoNonBackup() throws Exception {
        setupServer(false, new String[0]);
        try {
            ClientSessionFactory sessionFactory = createSessionFactory(this.locator);
            this.manager = new ReplicationManager(sessionFactory.getConnection(), sessionFactory.getServerLocator().getCallTimeout(), this.factory);
            addActiveMQComponent(this.manager);
            this.manager.start();
            Assert.fail("Exception was expected");
        } catch (ActiveMQNotConnectedException expected) {
            // Expected outcome. This catch must precede the general ActiveMQException catch:
            // listed after it, the more specific handler could never be reached.
        } catch (ActiveMQException e) {
            fail("Invalid Exception type:" + e.getType());
        }
    }

    /**
     * Pushes journal, paging and large-message operations through the live server's replication
     * manager and checks the outcome: no replication tokens remain active after the journal
     * operations, four pages exist on the backup after four page writes, and none remain after
     * the page deletions.
     */
    @Test
    public void testSendPackets() throws Exception {
        setupServer(true);
        final JournalStorageManager storage = getStorage();
        this.manager = this.liveServer.getReplicationManager();
        waitForComponent(this.manager);

        // Journal operations go through a replicating wrapper around the no-op journal.
        final ReplicatedJournal journal = new ReplicatedJournal((byte) 1, new FakeJournal(), this.manager);
        journal.appendPrepareRecord(1L, new FakeData(), false);
        journal.appendAddRecord(1L, (byte) 1, new FakeData(), false);
        journal.appendUpdateRecord(1L, (byte) 2, new FakeData(), false);
        journal.appendDeleteRecord(1L, false);
        journal.appendAddRecordTransactional(2L, 2L, (byte) 1, new FakeData());
        journal.appendUpdateRecordTransactional(2L, 2L, (byte) 2, new FakeData());
        journal.appendCommitRecord(2L, false);
        journal.appendDeleteRecordTransactional(3L, 4L, new FakeData());
        journal.appendPrepareRecord(3L, new FakeData(), false);
        journal.appendRollbackRecord(3L, false);
        blockOnReplication(storage, this.manager);
        Assert.assertTrue("Expecting no active tokens:" + this.manager.getActiveTokens(), this.manager.getActiveTokens().isEmpty());

        // Page writes: the same paged message is written to pages 1..4.
        final ServerMessageImpl message = new ServerMessageImpl(1L, 1024);
        final SimpleString pagedAddress = new SimpleString("dummy");
        message.setAddress(pagedAddress);
        journal.appendAddRecordTransactional(23L, 24L, (byte) 1, new FakeData());
        final PagedMessageImpl pagedMessage = new PagedMessageImpl(message, new long[0]);
        for (int pageId = 1; pageId <= 4; pageId++) {
            this.manager.pageWrite(pagedMessage, pageId);
        }
        blockOnReplication(storage, this.manager);

        // Inspect the backup side through a paging manager built on the backup's own settings.
        final PagingManager backupPaging = createPageManager(this.backupServer.getStorageManager(), this.backupServer.getConfiguration(), this.backupServer.getExecutorFactory(), this.backupServer.getAddressSettingsRepository());
        final PagingStore backupStore = backupPaging.getPageStore(pagedAddress);
        backupStore.start();
        Assert.assertEquals(4L, backupStore.getNumberOfPages());
        backupStore.stop();

        // Page deletions: ids 5 and 6 were never written but are deleted as well.
        for (int pageId = 1; pageId <= 6; pageId++) {
            this.manager.pageDeleted(pagedAddress, pageId);
        }
        blockOnReplication(storage, this.manager);

        // Large-message begin / write / delete for message id 500.
        final ServerMessageImpl largeMessage = new ServerMessageImpl();
        largeMessage.setMessageID(500L);
        largeMessage.setAddress(new SimpleString("tttt"));
        largeMessage.encodeHeadersAndProperties(ActiveMQBuffers.dynamicBuffer(100));
        this.manager.largeMessageBegin(500L);
        this.manager.largeMessageWrite(500L, new byte[1024]);
        this.manager.largeMessageDelete(500L);
        blockOnReplication(storage, this.manager);

        backupStore.start();
        Assert.assertEquals(0L, backupStore.getNumberOfPages());
    }

    /**
     * Sends and consumes 100 durable messages while {@link TestInterceptor} is installed as an
     * incoming interceptor on the backup. After message 37 has been sent the interceptor starts
     * returning {@code false}; every message must still be received, in order, by the consumer.
     * <p>
     * Cleanup runs in a {@code finally} block so that the interceptor flag is set and both
     * sessions are closed on every exit path, and the second session is still closed when
     * closing the first one throws.
     */
    @Test
    public void testSendPacketsWithFailure() throws Exception {
        final int messageCount = 100;
        final int interceptAfter = 37;

        setupServer(true, TestInterceptor.class.getName());
        this.manager = this.liveServer.getReplicationManager();
        waitForComponent(this.manager);

        ClientSessionFactory sessionFactory = createSessionFactory(this.locator);
        final ClientSession producerSession = sessionFactory.createSession();
        final ClientSession consumerSession = sessionFactory.createSession();
        producerSession.createQueue(ADDRESS, ADDRESS, (SimpleString) null, true);
        ClientProducer producer = producerSession.createProducer(ADDRESS);
        producerSession.start();
        consumerSession.start();
        try {
            ClientConsumer consumer = consumerSession.createConsumer(ADDRESS);
            for (int i = 0; i < messageCount; i++) {
                ClientMessage outgoing = producerSession.createMessage(true);
                setBody(i, outgoing);
                outgoing.putIntProperty("counter", i);
                producer.send(outgoing);
                if (i == interceptAfter) {
                    // From here on the interceptor on the backup returns false for every packet.
                    TestInterceptor.value.set(false);
                }
                ClientMessage incoming = consumer.receive(1000L);
                Assert.assertNotNull("Message should exist!", incoming);
                assertMessageBody(i, incoming);
                Assert.assertEquals(i, incoming.getIntProperty("counter").intValue());
                incoming.acknowledge();
            }
        } finally {
            TestInterceptor.value.set(false);
            try {
                if (!producerSession.isClosed()) {
                    producerSession.close();
                }
            } finally {
                if (!consumerSession.isClosed()) {
                    consumerSession.close();
                }
            }
        }
    }

    /**
     * Puts an operation context into an error state first and then registers completion
     * callbacks on it, checking that each callback's {@code onError} receives the recorded
     * error code and message.
     */
    @Test
    public void testExceptionSettingActionBefore() throws Exception {
        final OperationContext ctx = OperationContextImpl.getContext(this.factory);
        ctx.storeLineUp();
        final String errorText = "I'm an exception";
        ctx.onError(ActiveMQExceptionType.UNBLOCKED.getCode(), errorText);

        final AtomicInteger lastErrorCode = new AtomicInteger(0);
        final List<String> reportedMessages = new ArrayList<String>();

        // First callback registered after the error.
        final CountDownLatch firstErrorLatch = new CountDownLatch(1);
        ctx.executeOnCompletion(new IOCallback() {
            public void onError(int code, String message) {
                lastErrorCode.set(code);
                reportedMessages.add(message);
                firstErrorLatch.countDown();
            }

            public void done() {
            }
        });
        Assert.assertTrue(firstErrorLatch.await(5L, TimeUnit.SECONDS));
        // 5 is presumably the numeric value of ActiveMQExceptionType.UNBLOCKED.getCode() — TODO confirm.
        Assert.assertEquals(5L, lastErrorCode.get());
        Assert.assertEquals(1L, reportedMessages.size());
        Assert.assertEquals(errorText, reportedMessages.get(0));

        // Second callback: the same error must be reported again.
        final CountDownLatch secondErrorLatch = new CountDownLatch(1);
        ctx.executeOnCompletion(new IOCallback() {
            public void onError(int code, String message) {
                lastErrorCode.set(code);
                reportedMessages.add(message);
                secondErrorLatch.countDown();
            }

            public void done() {
            }
        });
        Assert.assertTrue(secondErrorLatch.await(5L, TimeUnit.SECONDS));
        Assert.assertEquals(2L, reportedMessages.size());
        Assert.assertEquals(errorText, reportedMessages.get(0));
        Assert.assertEquals(errorText, reportedMessages.get(1));

        // Third callback only reacts to done().
        final CountDownLatch doneLatch = new CountDownLatch(1);
        ctx.executeOnCompletion(new IOCallback() {
            public void onError(int code, String message) {
            }

            public void done() {
                doneLatch.countDown();
            }
        });
        // NOTE(review): this awaits secondErrorLatch, which is already released, so doneLatch is
        // never checked. Confirm whether done() can fire on a context that is already in error
        // before switching this assertion to doneLatch.
        Assert.assertTrue(secondErrorLatch.await(5L, TimeUnit.SECONDS));
    }

    /**
     * Sets a custom connection TTL and client-failure-check period on the backup's first cluster
     * connection configuration and verifies that the backup's replication locator reports both
     * values. Runs over Netty with a live server started.
     */
    @Test
    public void testClusterConnectionConfigs() throws Exception {
        final long expectedTtl = 123456789L;
        final long expectedCheckPeriod = 987654321L;

        final ExtraConfigurer backupTweaks = new ExtraConfigurer() {
            @Override
            public void config(Configuration configuration, Configuration configuration2) {
                // configuration2 is the backup configuration.
                List<?> clusterConfigs = configuration2.getClusterConfigurations();
                Assert.assertTrue(clusterConfigs.size() > 0);
                ClusterConnectionConfiguration first = (ClusterConnectionConfiguration) clusterConfigs.get(0);
                first.setConnectionTTL(expectedTtl);
                first.setClientFailureCheckPeriod(expectedCheckPeriod);
            }
        };
        setupServer(true, true, backupTweaks, new String[0]);

        assertTrue(this.backupServer instanceof ActiveMQServerImpl);
        final ServerLocator replicationLocator = this.backupServer.getClusterManager().getClusterController().getReplicationLocator();
        assertNotNull(replicationLocator);
        assertEquals(expectedTtl, replicationLocator.getConnectionTTL());
        assertEquals(expectedCheckPeriod, replicationLocator.getClientFailureCheckPeriod());
    }

    /**
     * Builds a new {@link JournalStorageManager} from a fresh default in-VM configuration and this
     * test's executor factory, with no critical-error listener. The manager is not started here.
     */
    private JournalStorageManager getStorage() throws Exception {
        return new JournalStorageManager(createDefaultInVMConfig(), this.factory, (IOCriticalErrorListener) null);
    }

    /**
     * Blocks until the storage manager reports that its pending operations have completed,
     * failing the test if that does not happen within 30 seconds. An error reported to the
     * callback is ignored, so in that case the wait simply times out.
     *
     * @param storageManager     storage manager whose pending operations are awaited
     * @param replicationManager not used by this method
     */
    private void blockOnReplication(StorageManager storageManager, ReplicationManager replicationManager) throws Exception {
        final CountDownLatch completed = new CountDownLatch(1);
        final IOCallback releaseOnDone = new IOCallback() {
            public void onError(int code, String message) {
            }

            public void done() {
                completed.countDown();
            }
        };
        storageManager.afterCompleteOperations(releaseOnDone);
        final boolean finishedInTime = completed.await(30L, TimeUnit.SECONDS);
        Assert.assertTrue(finishedInTime);
    }

    /**
     * Replicates a single prepare record, then checks that a completion callback registered on an
     * independent storage manager fires within one second and that the replication manager has no
     * active tokens left.
     */
    @Test
    public void testNoActions() throws Exception {
        setupServer(true);
        final JournalStorageManager storage = getStorage();
        this.manager = this.liveServer.getReplicationManager();
        waitForComponent(this.manager);

        final ReplicatedJournal journal = new ReplicatedJournal((byte) 1, new FakeJournal(), this.manager);
        journal.appendPrepareRecord(1L, new FakeData(), false);

        final CountDownLatch completed = new CountDownLatch(1);
        storage.afterCompleteOperations(new IOCallback() {
            public void onError(int code, String message) {
            }

            public void done() {
                completed.countDown();
            }
        });
        Assert.assertTrue(completed.await(1L, TimeUnit.SECONDS));
        Assert.assertEquals("should be empty " + this.manager.getActiveTokens(), 0L, this.manager.getActiveTokens().size());
    }

    /**
     * Registers 200 completion callbacks on one operation context, replicating a prepare record
     * before every even-numbered one, and verifies that the callbacks complete in registration
     * order and that no replication tokens remain active afterwards.
     */
    @Test
    public void testOrderOnNonPersistency() throws Exception {
        final int callbackCount = 200;

        setupServer(true);
        final List<Integer> completionOrder = new ArrayList<Integer>();
        final JournalStorageManager storage = getStorage();
        this.manager = this.liveServer.getReplicationManager();
        final ReplicatedJournal journal = new ReplicatedJournal((byte) 1, new FakeJournal(), this.manager);
        final CountDownLatch allDone = new CountDownLatch(callbackCount);
        final OperationContext ctx = storage.getContext();

        for (int i = 0; i < callbackCount; i++) {
            final int sequence = i;
            final boolean even = i % 2 == 0;
            if (even) {
                // Only every other callback has a replicated operation in front of it.
                journal.appendPrepareRecord(i, new FakeData(), false);
            }
            ctx.executeOnCompletion(new IOCallback() {
                public void onError(int code, String message) {
                }

                public void done() {
                    completionOrder.add(Integer.valueOf(sequence));
                    allDone.countDown();
                }
            });
        }

        Assert.assertTrue(allDone.await(10L, TimeUnit.SECONDS));
        for (int expected = 0; expected < callbackCount; expected++) {
            Assert.assertEquals(expected, completionOrder.get(expected).intValue());
        }
        Assert.assertEquals(0L, this.manager.getActiveTokens().size());
    }

    /**
     * Runs the base-class set-up, then creates the thread factory, the cached and scheduled
     * executors built from it, and the ordered executor factory used by the tests.
     */
    @Before
    public void setUp() throws Exception {
        super.setUp();

        final ThreadFactory threadFactory = new ActiveMQThreadFactory("ActiveMQ-ReplicationTest", false, getClass().getClassLoader());
        this.tFactory = threadFactory;

        final ExecutorService cachedPool = Executors.newCachedThreadPool(threadFactory);
        this.executor = cachedPool;
        this.scheduledExecutor = new ScheduledThreadPoolExecutor(10, threadFactory);
        this.factory = new OrderedExecutorFactory(cachedPool);
    }

    /**
     * Stops the replication manager, locator and both servers, shuts down the executors created
     * in {@link #setUp()}, and finally runs the base-class tear-down.
     * <p>
     * The steps are chained with {@code finally} so that a failure while stopping a component
     * neither leaks the executor threads nor skips {@code super.tearDown()}. The executors are
     * null-checked because this method also runs when {@link #setUp()} failed before creating them.
     */
    @After
    public void tearDown() throws Exception {
        try {
            stopComponent(this.manager);
            this.manager = null;
            closeServerLocator(this.locator);
            stopComponent(this.backupServer);
            this.backupServer = null;
            stopComponent(this.liveServer);
            this.liveServer = null;
        } finally {
            try {
                if (this.executor != null) {
                    this.executor.shutdownNow();
                }
                if (this.scheduledExecutor != null) {
                    this.scheduledExecutor.shutdownNow();
                }
                this.tFactory = null;
                this.scheduledExecutor = null;
            } finally {
                super.tearDown();
            }
        }
    }

    /**
     * Creates and starts a paging manager backed by an NIO paging store factory rooted at the
     * configuration's paging location.
     *
     * @param storageManager         storage manager handed to the store factory
     * @param configuration          supplies the paging location
     * @param executorFactory        executor factory handed to the store factory
     * @param hierarchicalRepository address settings used by the paging manager
     * @return the started paging manager
     */
    protected PagingManager createPageManager(StorageManager storageManager, Configuration configuration, ExecutorFactory executorFactory, HierarchicalRepository<AddressSettings> hierarchicalRepository) throws Exception {
        final PagingStoreFactoryNIO storeFactory = new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), 1000L, (ScheduledExecutorService) null, executorFactory, false, (IOCriticalErrorListener) null);
        final PagingManagerImpl pagingManager = new PagingManagerImpl(storeFactory, hierarchicalRepository);
        pagingManager.start();
        return pagingManager;
    }
}
