package org.apache.activemq.artemis.tests.integration.paging;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.integration.replication.ReplicationOrderTest;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

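/**
 * Integration test that produces to and consumes from a paged address while the live broker
 * (a spawned JVM) is killed and an in-process backup takes over, checking that the message
 * count of queue "cons2" never goes negative and that the queue can be drained after a restart.
 *
 * <p>Decompiled from: org/apache/activemq/artemis/tests/integration/paging/PagingWithFailoverAndCountersTest.class
 */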
public class PagingWithFailoverAndCountersTest extends ActiveMQTestBase {
    // Live broker spawned as a separate JVM by startLive(); reset to null by killLive().
    Process liveProcess;
    // Backup broker process handle; only null-checked and destroyed in the visible code, never assigned.
    Process backupProcess;
    // Backup broker running inside this JVM, created by startBackupInProcess(); reset to null by killBackup().
    PagingWithFailoverServer inProcessBackup;
    // First port argument when spawning/creating the live server and the port its client locators connect to.
    private static final int PORT1 = 5050;
    // First port argument when starting the in-process backup and the port the monitor's locator connects to.
    private static final int PORT2 = 5051;

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/paging/PagingWithFailoverAndCountersTest$ConsumerThread.class */
    class ConsumerThread extends TestThread {
        ClientSessionFactory factory;
        String queueName;
        final AtomicInteger errors;
        final int txSize;

        ConsumerThread(ClientSessionFactory clientSessionFactory, String str, long j, int i) {
            super();
            this.errors = new AtomicInteger(0);
            this.factory = clientSessionFactory;
            this.queueName = str;
            this.txSize = i;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                ClientSession createSession = this.txSize == 0 ? this.factory.createSession(true, true) : this.factory.createSession(false, false);
                ClientConsumer createConsumer = createSession.createConsumer(this.queueName);
                createSession.start();
                long j = 0;
                int i = 0;
                long j2 = 0;
                while (isRunning(0L)) {
                    try {
                        ClientMessage receive = createConsumer.receive(100L);
                        if (receive != null) {
                            j2 = receive.getLongProperty("count").longValue();
                            if (j2 < j) {
                                failed("Message received in duplicate out of order, LastCommit = " + j + ", currentMsg = " + j2);
                            }
                            receive.acknowledge();
                            if (this.txSize > 0 && i > 0 && i % this.txSize == 0) {
                                createSession.commit();
                                if (j2 > j) {
                                    j = j2;
                                }
                            }
                            i++;
                        }
                        if (i % 100 == 0) {
                            PagingWithFailoverAndCountersTest.this.instanceLog.debug("received " + i + " on " + this.queueName);
                        }
                    } catch (Throwable th) {
                        PagingWithFailoverAndCountersTest.this.instanceLog.warn("=====> expected Error at " + j2 + " with lastCommit=" + j);
                    }
                }
                createSession.close();
            } catch (Exception e) {
                e.printStackTrace();
                this.errors.incrementAndGet();
            }
        }
    }

    /**
     * Background thread that waits for the in-process backup server to start, proves it is usable
     * by sending and consuming one message on a temporary queue, and then polls the message count
     * of queue "cons2", flagging the test as failed if the count ever becomes negative.
     *
     * <p>Problems are recorded through {@link #failed(String)} rather than {@code Assert.fail}:
     * an {@code AssertionError} thrown on this thread would only end the thread, whereas the
     * failed flag is asserted by {@link #stopTest()} on the test thread.
     */
    class MonitorThread extends TestThread {
        MonitorThread() {
            super("Monitor-thread");
        }

        @Override
        public void run() {
            try {
                PagingWithFailoverAndCountersTest.this.waitForServerToStart(PagingWithFailoverAndCountersTest.this.inProcessBackup.getServer());
                if (!verifyBackupIsOperational()) {
                    return;
                }
                Queue monitoredQueue = PagingWithFailoverAndCountersTest.this.inProcessBackup.getServer().locateQueue(SimpleString.toSimpleString("cons2"));
                while (isRunning(1L)) {
                    long messageCount = PagingWithFailoverAndCountersTest.this.getMessageCount(monitoredQueue);
                    if (messageCount < 0) {
                        failed("count < 0 .... being " + messageCount);
                        return;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Sends and consumes a single message on a throw-away queue through the backup's port.
         *
         * @return {@code true} when the round trip succeeded; {@code false} after recording a failure
         */
        private boolean verifyBackupIsOperational() {
            ServerLocator locator = null;
            try {
                locator = SpawnedServerSupport.createLocator(PagingWithFailoverAndCountersTest.PORT2).setInitialConnectAttempts(100).setRetryInterval(100L);
                ClientSession session = locator.createSessionFactory().createSession();
                session.createQueue(new QueueConfiguration("new-queue").setRoutingType(RoutingType.ANYCAST));
                session.start();
                session.createProducer("new-queue").send(session.createMessage(true));
                ClientConsumer consumer = session.createConsumer("new-queue");
                ClientMessage probe = consumer.receive(500L);
                if (probe == null) {
                    failed("no message received from new-queue within 500 ms");
                    return false;
                }
                probe.acknowledge();
                consumer.close();
                session.deleteQueue("new-queue");
                return true;
            } catch (Throwable th) {
                th.printStackTrace();
                failed(String.valueOf(th.getMessage()));
                return false;
            } finally {
                // Release the locator on the failure paths as well.
                if (locator != null) {
                    locator.close();
                }
            }
        }
    }

    /**
     * Base class for the helper threads of this test: provides a cooperative stop flag guarded by
     * {@link #waitNotify} and a failure flag that {@link #stopTest()} asserts on after the thread
     * has terminated.
     */
    class TestThread extends Thread {
        /** Read and written only while holding {@link #waitNotify}. */
        boolean running;
        final Object waitNotify;
        /** Set by the worker thread, read by other threads; volatile so {@link #isFailed()} sees it without a join. */
        private volatile boolean failed;

        TestThread() {
            this.running = true;
            this.waitNotify = new Object();
            this.failed = false;
        }

        TestThread(String str) {
            super(str);
            this.running = true;
            this.waitNotify = new Object();
            this.failed = false;
        }

        /**
         * Records a failure and prints its description to standard error.
         *
         * @param str description of the failure
         */
        public void failed(String str) {
            System.err.println(str);
            this.failed = true;
        }

        /** @return {@code true} once {@link #failed(String)} has been called */
        public boolean isFailed() {
            return this.failed;
        }

        /**
         * Asks the thread to stop, waits until it has terminated (interrupting it after every
         * 5 second wait that did not finish it) and then asserts that no failure was recorded.
         */
        public void stopTest() {
            synchronized (this.waitNotify) {
                this.running = false;
                this.waitNotify.notifyAll();
            }
            boolean interrupted = false;
            while (isAlive()) {
                try {
                    join(5000L);
                } catch (InterruptedException e) {
                    // Keep waiting for the worker; the caller's interrupt status is restored below.
                    interrupted = true;
                }
                if (isAlive()) {
                    interrupt();
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
            Assert.assertFalse(this.failed);
        }

        /**
         * Reports whether the thread should keep running, optionally pausing first.
         *
         * @param j maximum time in milliseconds to pause before answering; 0 or less answers immediately.
         *          The pause ends early when {@link #stopTest()} is called or the thread is interrupted.
         * @return the current value of the running flag
         */
        public boolean isRunning(long j) {
            synchronized (this.waitNotify) {
                if (j > 0) {
                    try {
                        long deadline = System.currentTimeMillis() + j;
                        long remaining = j;
                        // Wait only for the time still left so an early wakeup cannot stretch the pause.
                        while (this.running && remaining > 0) {
                            this.waitNotify.wait(remaining);
                            remaining = deadline - System.currentTimeMillis();
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        Thread.currentThread().interrupt();
                    }
                }
                return this.running;
            }
        }
    }

    /**
     * Spawns the live broker in a separate JVM and remembers its process handle.
     * Asserts that no live process is currently tracked.
     */
    private void startLive() throws Exception {
        assertNull(liveProcess);
        Process spawned = PagingWithFailoverServer.spawnVM(getTestDir(), PORT1, PORT2);
        liveProcess = spawned;
    }

    /**
     * Starts the backup broker inside the test JVM.
     * Asserts that neither a backup process nor an in-process backup is currently tracked.
     */
    private void startBackupInProcess() throws Exception {
        assertNull(backupProcess);
        assertNull(inProcessBackup);
        PagingWithFailoverServer backup = new PagingWithFailoverServer();
        // Publish the instance before starting it, so it is tracked even if perform() throws.
        inProcessBackup = backup;
        backup.perform(getTestDir(), PORT2, PORT1, true);
    }

    /** Stops the live and backup brokers (if still tracked) before running the base-class tear down. */
    @After
    @Override
    public void tearDown() throws Exception {
        this.killLive();
        this.killBackup();
        super.tearDown();
    }

    /**
     * Best-effort shutdown of the backup: destroys the backup process if one is tracked and calls
     * {@code fail(false)} on the in-process backup server if present. Both references are cleared,
     * so repeated calls do nothing.
     */
    private void killBackup() {
        Process process = backupProcess;
        if (process != null) {
            try {
                process.destroy();
            } catch (Throwable ignored) {
            }
        }
        backupProcess = null;

        PagingWithFailoverServer backup = inProcessBackup;
        if (backup == null) {
            return;
        }
        try {
            backup.getServer().fail(false);
        } catch (Throwable failure) {
            failure.printStackTrace();
        }
        inProcessBackup = null;
    }

    /**
     * Best-effort shutdown of the spawned live broker: destroys the process if one is tracked and
     * clears the reference, so repeated calls do nothing.
     */
    private void killLive() {
        Process process = liveProcess;
        if (process != null) {
            try {
                process.destroy();
            } catch (Throwable ignored) {
            }
        }
        liveProcess = null;
    }

    /**
     * Produces messages to "myAddress" for about five seconds while a {@link ConsumerThread}
     * consumes queue "cons2". After about two seconds the live broker is killed and a
     * {@link MonitorThread} is started against the in-process backup. Once producing ends, both
     * threads are stopped (which asserts that neither recorded a failure), the brokers are shut
     * down and {@link #verifyServer()} checks the queue on a restarted server.
     */
    @Test
    public void testValidateDeliveryAndCounters() throws Exception {
        startLive();
        // NOTE(review): ReplicationOrderTest.NUM is reused here as the attempt count; its value is not visible in this file.
        ServerLocator locator = SpawnedServerSupport.createLocator(PORT1).setInitialConnectAttempts(ReplicationOrderTest.NUM).setReconnectAttempts(ReplicationOrderTest.NUM).setRetryInterval(100L);
        try {
            ClientSessionFactory factory = locator.createSessionFactory();
            ClientSession session = factory.createSession();
            session.createQueue(new QueueConfiguration("DeadConsumer").setAddress("myAddress"));
            session.createQueue(new QueueConfiguration("cons2").setAddress("myAddress"));
            startBackupInProcess();
            waitForRemoteBackup(factory, 10);

            ConsumerThread consumerThread = new ConsumerThread(factory, "cons2", 0L, 10);
            MonitorThread monitorThread = new MonitorThread();
            consumerThread.start();
            try {
                ClientProducer producer = session.createProducer("myAddress");
                long sequence = 0;
                long startedAt = System.currentTimeMillis();
                long stopProducingAt = startedAt + 5000;
                long killLiveAt = startedAt + 2000;
                while (System.currentTimeMillis() < stopProducingAt) {
                    sequence++;
                    // liveProcess becomes null in killLive(), so the kill and the monitor start happen once.
                    if (System.currentTimeMillis() > killLiveAt && this.liveProcess != null) {
                        killLive();
                        monitorThread.start();
                    }
                    try {
                        ClientMessage message = session.createMessage(true);
                        message.putLongProperty("count", sequence);
                        producer.send(message);
                    } catch (Exception e) {
                        // Send failures are tolerated (the live broker is killed mid-run); report and keep producing.
                        e.printStackTrace();
                    }
                }
            } finally {
                // Always stop both helper threads and both brokers, even when an earlier step failed.
                try {
                    try {
                        consumerThread.stopTest();
                    } finally {
                        monitorThread.stopTest();
                    }
                } finally {
                    killBackup();
                    killLive();
                }
            }
            factory.close();
            verifyServer();
        } finally {
            locator.close();
        }
    }

    /**
     * Starts a server on the test directory, asserts that the message count of queue "cons2" is
     * not negative and then consumes exactly that many messages from the queue through a client
     * session. The session, factory, locator and server are released in that order whether or
     * not verification succeeds.
     *
     * @throws Exception if the server or client cannot be started, or if draining the queue fails
     */
    public void verifyServer() throws Exception {
        ActiveMQServer server = PagingWithFailoverServer.createServer(getTestDir(), PORT1, PORT2, false);
        server.start();
        try {
            waitForServerToStart(server);
            int messageCount = getMessageCount(server.locateQueue(SimpleString.toSimpleString("cons2")));
            assertTrue(messageCount >= 0);
            ServerLocator locator = SpawnedServerSupport.createLocator(PORT1).setInitialConnectAttempts(100).setReconnectAttempts(ReplicationOrderTest.NUM).setRetryInterval(100L);
            try {
                ClientSessionFactory factory = locator.createSessionFactory();
                try {
                    ClientSession session = factory.createSession();
                    try {
                        session.start();
                        drainConsumer(session.createConsumer("cons2"), "cons2", messageCount);
                    } finally {
                        session.close();
                    }
                } finally {
                    factory.close();
                }
            } finally {
                locator.close();
            }
        } finally {
            // Stop the server even when a step before draining fails.
            server.stop();
        }
    }

    /**
     * Receives and acknowledges exactly {@code i} messages from the consumer, asserts that no
     * further message is immediately available, and closes the consumer.
     *
     * @param clientConsumer consumer to drain; closed before this method returns or throws
     * @param str name of the queue being drained, used in assertion messages
     * @param i number of messages expected
     * @throws Exception if receiving, acknowledging or closing fails
     */
    private void drainConsumer(ClientConsumer clientConsumer, String str, int i) throws Exception {
        try {
            for (int received = 0; received < i; received++) {
                ClientMessage message = clientConsumer.receive(5000L);
                assertNotNull("expected " + i + " messages on " + str + " but received only " + received, message);
                message.acknowledge();
            }
            assertNull("unexpected extra message on " + str, clientConsumer.receiveImmediate());
        } finally {
            // Close the consumer when an assertion fails too.
            clientConsumer.close();
        }
    }
}
