package org.apache.activemq.artemis.tests.integration.cluster.distribution;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.MessageCounterInfo;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredMessageCounterTest.class */
public class ClusteredMessageCounterTest extends ClusterTestBase {
    /** Running count of messages consumed by the receiver task; reset to zero in {@link #setUp()}. */
    private final AtomicInteger total = new AtomicInteger();

    /** Once {@code true}, both timer tasks stop working and stop rescheduling themselves. */
    private final AtomicBoolean stopFlag = new AtomicBoolean();

    /** Timer on which the message-counter collector task schedules itself. */
    private final Timer timer1 = new Timer();

    /** Timer on which the periodical receiver task schedules itself. */
    private final Timer timer2 = new Timer();

    /**
     * Number of messages a test sends. The receiver task stops rescheduling once
     * {@code numMsg - 200} messages were consumed. Not final: one test lowers it.
     */
    private int numMsg = 1000;

    /**
     * Message-counter samples, appended on the timer thread and iterated on the test thread.
     * A copy-on-write list is used so that a sample added while the test thread is iterating
     * cannot cause a {@code ConcurrentModificationException}.
     */
    private final List<MessageCounterInfo> results = new CopyOnWriteArrayList<MessageCounterInfo>();

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredMessageCounterTest$MessageCounterCollector.class */
    /**
     * Timer task that takes one message-counter sample per run and then reschedules a fresh
     * copy of itself on {@code timer1}, until the enclosing test's stop flag is set.
     *
     * <p>Each run reads {@link QueueControl#listMessageCounter()}, parses it with
     * {@link MessageCounterInfo#fromJSON(String)}, stores the sample in the enclosing test's
     * {@code results} list and counts the latch down by one.
     */
    private class MessageCounterCollector extends TimerTask {
        /** Delay, in milliseconds, before the next sample is taken. */
        private static final long RESCHEDULE_DELAY_MS = 200L;

        private final QueueControl queueControl;
        private final CountDownLatch resultLatch;

        /**
         * @param queueControl   management handle of the queue whose counter is sampled
         * @param countDownLatch latch counted down once per collected sample
         */
        MessageCounterCollector(QueueControl queueControl, CountDownLatch countDownLatch) {
            this.queueControl = queueControl;
            this.resultLatch = countDownLatch;
        }

        /** @return {@code true} once the enclosing test asked all timer tasks to stop */
        private boolean stopRequested() {
            return ClusteredMessageCounterTest.this.stopFlag.get();
        }

        @Override
        public void run() {
            if (stopRequested()) {
                return;
            }
            try {
                MessageCounterInfo sample = MessageCounterInfo.fromJSON(this.queueControl.listMessageCounter());
                ClusteredMessageCounterTest.this.results.add(sample);
                this.resultLatch.countDown();
                if (sample.getCountDelta() < 0) {
                    // A negative delta is exactly what the test asserts against: stop sampling
                    // and release the waiting test thread immediately by draining the latch.
                    ClusteredMessageCounterTest.this.stopFlag.set(true);
                    while (this.resultLatch.getCount() > 0) {
                        this.resultLatch.countDown();
                    }
                }
            } catch (Exception e) {
                // Best effort: a failed sample is reported and the next one is still scheduled.
                e.printStackTrace();
            } finally {
                // Single reschedule point; a failure of schedule() itself is not retried.
                if (!stopRequested()) {
                    ClusteredMessageCounterTest.this.timer1.schedule(
                        new MessageCounterCollector(this.queueControl, this.resultLatch), RESCHEDULE_DELAY_MS);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredMessageCounterTest$PeriodicalReceiver.class */
    /**
     * Timer task that consumes up to {@code batchSize} messages from {@code "queue0"} on one
     * server, commits them, and then reschedules a fresh copy of itself on {@code timer2}.
     *
     * <p>Rescheduling stops once the enclosing test's stop flag is set or once the shared
     * {@code total} reaches {@code numMsg - 200}; presumably the remainder is left unconsumed
     * on purpose so the queue is never fully drained — confirm with the test's author.
     */
    private class PeriodicalReceiver extends TimerTask {
        /** Maximum time, in milliseconds, to wait for a single message. */
        private static final long RECEIVE_TIMEOUT_MS = 2000L;

        private final int batchSize;
        private final int serverID;
        private final long period;

        /**
         * @param batchSize maximum number of messages consumed per run
         * @param serverID  index into the enclosing test's session-factory array
         * @param period    delay, in milliseconds, before the next run is scheduled
         */
        PeriodicalReceiver(int batchSize, int serverID, long period) {
            this.batchSize = batchSize;
            this.serverID = serverID;
            this.period = period;
        }

        /** @return {@code true} once the enclosing test asked all timer tasks to stop */
        private boolean stopRequested() {
            return ClusteredMessageCounterTest.this.stopFlag.get();
        }

        /** Closes whichever of the two resources was opened, reporting (not propagating) failures. */
        private void closeResources(ClientConsumer consumer, ClientSession session) {
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (ActiveMQException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (ActiveMQException e) {
                    e.printStackTrace();
                }
            }
        }

        @Override
        public void run() {
            if (stopRequested()) {
                return;
            }
            int received = 0;
            ClientSession session = null;
            ClientConsumer consumer = null;
            try {
                session = ClusteredMessageCounterTest.this.sfs[this.serverID].createSession(false, true, false);
                consumer = session.createConsumer("queue0", (String) null);
                session.start();
                // End the batch when it is full OR when a stop was requested; previously a stop
                // request removed the batch limit instead of ending the loop.
                while (received < this.batchSize && !stopRequested()) {
                    ClientMessage message = consumer.receive(RECEIVE_TIMEOUT_MS);
                    if (message == null) {
                        break;
                    }
                    message.acknowledge();
                    received++;
                }
                session.commit();
            } catch (ActiveMQException e) {
                e.printStackTrace();
            } finally {
                // Exactly one cleanup / accounting / reschedule pass per run, whatever the outcome,
                // so a failure can neither double-count `total` nor schedule two receivers.
                closeResources(consumer, session);
                if (!stopRequested()
                        && ClusteredMessageCounterTest.this.total.addAndGet(received)
                            < ClusteredMessageCounterTest.this.numMsg - 200) {
                    ClusteredMessageCounterTest.this.timer2.schedule(
                        new PeriodicalReceiver(this.batchSize, this.serverID, this.period), this.period);
                }
            }
        }
    }

    /**
     * Runs the base-class set-up, configures both servers and the cluster connections
     * between them, and resets the per-test shared state.
     *
     * @throws Exception if the base set-up or the server configuration fails
     */
    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();

        setupServers();
        setupClusters();

        // Start from a clean slate: tasks are allowed to run and nothing has been consumed yet.
        stopFlag.set(false);
        total.set(0);
    }

    /**
     * Cancels both task timers and then runs the base-class tear-down.
     *
     * @throws Exception if the base tear-down fails
     */
    @Override
    @After
    public void tearDown() throws Exception {
        for (Timer timer : new Timer[] {timer1, timer2}) {
            timer.cancel();
        }
        super.tearDown();
    }

    /**
     * Configures servers 0 and 1 with the storage and transport settings reported by
     * {@code isFileStorage()} and {@link #isNetty()}.
     *
     * @throws Exception if configuring either server fails
     */
    protected void setupServers() throws Exception {
        for (int node = 0; node < 2; node++) {
            setupServer(node, isFileStorage(), isNetty());
        }
    }

    /**
     * Declares one cluster connection in each direction between servers 0 and 1, both for
     * {@code "queues"} and both using {@link MessageLoadBalancingType#ON_DEMAND}.
     */
    protected void setupClusters() {
        final String address = "queues";
        final MessageLoadBalancingType balancing = MessageLoadBalancingType.ON_DEMAND;

        // server 0 -> server 1
        setupClusterConnection("cluster0", 0, 1, address, balancing, 1, isNetty(), false);
        // server 1 -> server 0
        setupClusterConnection("cluster1", 1, 0, address, balancing, 1, isNetty(), false);
    }

    /**
     * Transport flag that this test hands to {@code setupServer}, {@code setupClusterConnection}
     * and {@code setupSessionFactory}.
     *
     * @return always {@code true}; presumably this selects the Netty transport — confirm
     *         against the base class
     */
    protected boolean isNetty() {
        return true;
    }

    /**
     * Extends the base configuration with paging settings for {@code "queues"} and, for
     * server 1 only, enables message counters (the tests read the counter from server 1).
     *
     * <p>The registered settings use a 10240-byte maximum size, a 5120-byte page size and the
     * {@link AddressFullMessagePolicy#PAGE} policy.
     *
     * @param serverID index of the server the configuration is created for
     * @return the base configuration with the additions described above
     */
    protected ConfigurationImpl createBasicConfig(int serverID) {
        ConfigurationImpl config = super.createBasicConfig(serverID);

        // Typed view of the settings map instead of a raw Map, so the put below is checked.
        Map<String, AddressSettings> addressesSettings = config.getAddressesSettings();

        AddressSettings pagingSettings = new AddressSettings();
        pagingSettings.setMaxSizeBytes(10240L);
        pagingSettings.setPageSizeBytes(5120);
        pagingSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
        addressesSettings.put("queues", pagingSettings);

        if (serverID == 1) {
            config.setMessageCounterEnabled(true);
        }
        return config;
    }

    /**
     * Runs the shared messages-added scenario with the flag set to {@code false}; the flag is
     * forwarded to {@code send(...)} by the shared scenario.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    public void testNonDurableMessageAddedWithPaging() throws Exception {
        final boolean durable = false;
        testMessageAddedWithPaging(durable);
    }

    /**
     * Runs the shared messages-added scenario with the flag set to {@code true}; the flag is
     * forwarded to {@code send(...)} by the shared scenario.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    public void testDurableMessageAddedWithPaging() throws Exception {
        final boolean durable = true;
        testMessageAddedWithPaging(durable);
    }

    /**
     * Sends 100 messages to {@code "queues"} on server 0 while the only consumer sits on
     * server 1, receives them all, and checks that server 1's {@code queue0} reports exactly
     * that many messages added.
     *
     * <p>The counter is polled once per second for at most 30 seconds before the assertion.
     * Both servers are stopped exactly once, whether the scenario passes or fails.
     *
     * @param durable value forwarded to {@code send(...)} for the messages being sent
     * @throws Exception if any step of the scenario fails
     */
    private void testMessageAddedWithPaging(boolean durable) throws Exception {
        startServers(0, 1);
        this.numMsg = 100;
        try {
            setupSessionFactory(0, isNetty());
            setupSessionFactory(1, isNetty());

            createQueue(0, "queues", "queue0", null, false);
            createQueue(1, "queues", "queue0", null, false);
            waitForBindings(1, "queues", 1, 0, true);
            waitForBindings(0, "queues", 1, 0, false);

            addConsumer(1, 1, "queue0", null);
            waitForBindings(0, "queues", 1, 1, false);

            send(0, "queues", this.numMsg, durable, null);
            verifyReceiveAllOnSingleConsumer(true, 0, this.numMsg, 1);

            QueueControl queueControl =
                (QueueControl) this.servers[1].getManagementService().getResource("queue.queue0");

            // Give the counter up to 30 s to catch up, checking once per second.
            for (long remainingMs = 30000;
                 remainingMs > 0 && this.numMsg != queueControl.getMessagesAdded();
                 remainingMs -= 1000) {
                Thread.sleep(1000L);
            }
            assertEquals(this.numMsg, queueControl.getMessagesAdded());
        } finally {
            // Single shutdown point: a failing stopServers is no longer attempted twice.
            stopServers(0, 1);
        }
    }

    /**
     * Checks that the message counter of {@code queue0} on server 1 never reports a negative
     * count delta while messages are sent to server 0 and consumed in batches on server 1.
     *
     * <p>A background thread sends {@code numMsg} messages, a collector task samples the
     * counter and a receiver task consumes batches of 50. The test waits up to 120 seconds for
     * 40 samples (the collector releases the latch early if it sees a negative delta), then
     * stops the tasks and asserts on every collected sample. Timers are cancelled and servers
     * stopped exactly once, whether the test passes or fails.
     *
     * @throws Exception if any step of the scenario fails
     */
    @Test
    public void testMessageCounterWithPaging() throws Exception {
        startServers(0, 1);
        try {
            setupSessionFactory(0, isNetty());
            setupSessionFactory(1, isNetty());

            createQueue(0, "queues", "queue0", null, false);
            createQueue(1, "queues", "queue0", null, false);
            waitForBindings(1, "queues", 1, 0, true);
            waitForBindings(0, "queues", 1, 0, false);

            Thread sender = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        ClusteredMessageCounterTest.this.send(
                            0, "queues", ClusteredMessageCounterTest.this.numMsg, true, null);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });

            QueueControl queueControl =
                (QueueControl) this.servers[1].getManagementService().getResource("queue.queue0");
            ActiveMQServerControl serverControl =
                (ActiveMQServerControl) this.servers[1].getManagementService().getResource("broker");
            serverControl.setMessageCounterSamplePeriod(300L);

            CountDownLatch resultLatch = new CountDownLatch(40);
            this.timer1.schedule(new MessageCounterCollector(queueControl, resultLatch), 0L);
            this.timer2.schedule(new PeriodicalReceiver(50, 1, 100L), 0L);
            sender.start();

            try {
                // The result is deliberately not asserted: on timeout the samples gathered
                // so far are still checked below.
                resultLatch.await(120L, TimeUnit.SECONDS);
            } finally {
                // Always tell both tasks to stop, even if the wait was interrupted.
                this.stopFlag.set(true);
            }
            sender.join();

            for (MessageCounterInfo info : this.results) {
                assertTrue("countDelta should be positive " + info.getCountDelta() + dumpResults(this.results),
                    info.getCountDelta() >= 0);
            }
        } finally {
            // Single cleanup point for both the success and the failure path.
            this.timer1.cancel();
            this.timer2.cancel();
            stopServers(0, 1);
        }
    }

    /**
     * Renders the given samples for an assertion message, one line per sample in the form
     * {@code result[index]: countDelta count}, preceded by a newline.
     *
     * @param list samples to render
     * @return the rendered text; a single newline when {@code list} is empty
     */
    private String dumpResults(List<MessageCounterInfo> list) {
        StringBuilder dump = new StringBuilder("\n");
        for (int index = 0; index < list.size(); index++) {
            MessageCounterInfo info = list.get(index);
            dump.append("result[").append(index).append("]: ")
                .append(info.getCountDelta())
                .append(" ")
                .append(info.getCount())
                .append("\n");
        }
        return dump.toString();
    }
}
