package org.apache.activemq.artemis.tests.integration.management;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.json.JsonArray;
import org.apache.activemq.artemis.json.JsonValue;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/management/ManagementWithPagingServerTest.class */
public class ManagementWithPagingServerTest extends ManagementTestBase {
    private ActiveMQServer server;
    private ClientSession session1;
    private ClientSession session2;
    private ServerLocator locator;

    /**
     * Background poller that repeatedly invokes {@code countMessages} and {@code listMessagesAsJSON}
     * on a {@link QueueControl} (with a {@code null} filter) until {@link #exit()} is called or a
     * management call fails.
     */
    private class ManagementThread extends Thread {
        private final QueueControl queueControl;
        private volatile boolean stop = false;
        // Volatile for consistency with the sender/receiver threads; written by this thread, read by the test.
        private volatile Exception error = null;

        private ManagementThread(QueueControl queueControl) {
            this.queueControl = queueControl;
        }

        /**
         * Polls the queue control roughly once per second. The first exception raised by a
         * management call is stored and ends the loop.
         */
        @Override
        public void run() {
            while (!this.stop) {
                try {
                    this.queueControl.countMessages((String) null);
                    this.queueControl.listMessagesAsJSON((String) null);
                } catch (Exception e) {
                    this.error = e;
                    return;
                }
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    // Somebody asked this thread to stop: keep the interrupt status and end polling
                    // instead of silently ignoring the request.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /**
         * @return the exception that terminated the polling loop, or {@code null} if no management call failed
         */
        public Exception getError() {
            return this.error;
        }

        /**
         * Requests the polling loop to finish; the loop checks the flag before each iteration.
         */
        public void exit() {
            this.stop = true;
        }
    }

    /**
     * Consumes a fixed number of messages from a queue using the enclosing test's {@code session2},
     * acknowledging and committing after each message and pausing {@code delay} milliseconds
     * between messages. Any failure is stored and exposed through {@link #getError()}.
     */
    private class ReceiverThread extends Thread {
        private final SimpleString queue;
        private final int num;
        private final long delay;
        private volatile Exception error = null;

        /**
         * @param queue name of the queue to consume from
         * @param num   number of messages to consume
         * @param delay pause in milliseconds after each consumed message
         */
        private ReceiverThread(SimpleString queue, int num, long delay) {
            this.queue = queue;
            this.num = num;
            this.delay = delay;
        }

        @Override
        public void run() {
            // try-with-resources so the consumer is released even when receiving fails part-way.
            try (ClientConsumer consumer = ManagementWithPagingServerTest.this.session2.createConsumer(this.queue)) {
                for (int i = 0; i < this.num; i++) {
                    ClientMessage message = consumer.receive(5000L);
                    if (message == null) {
                        // Report the timeout explicitly rather than as an anonymous NullPointerException.
                        throw new IllegalStateException("Timed out waiting for message " + (i + 1) + " of " + this.num + " on queue " + this.queue);
                    }
                    message.acknowledge();
                    ManagementWithPagingServerTest.this.session2.commit();
                    Thread.sleep(this.delay);
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt status and surface the aborted run to the test.
                Thread.currentThread().interrupt();
                this.error = e;
            } catch (Exception e) {
                this.error = e;
            }
        }

        /**
         * @return the exception that aborted consumption, or {@code null} if all messages were consumed
         */
        public Exception getError() {
            return this.error;
        }
    }

    /**
     * Sends a fixed number of durable messages with a 128-byte sample body to an address using the
     * enclosing test's {@code session1}, pausing {@code delay} milliseconds between sends. Any
     * failure is stored and exposed through {@link #getError()}.
     */
    private class SenderThread extends Thread {
        private final SimpleString address;
        private final int num;
        private final long delay;
        private volatile Exception error = null;

        /**
         * @param address address the messages are sent to
         * @param num     number of messages to send
         * @param delay   pause in milliseconds after each send
         */
        private SenderThread(SimpleString address, int num, long delay) {
            this.address = address;
            this.num = num;
            this.delay = delay;
        }

        @Override
        public void run() {
            byte[] body = new byte[128];
            ByteBuffer buffer = ByteBuffer.wrap(body);
            // Sample bytes are generated for positions 1..body.length (1-based, as in the original loop).
            for (int position = 1; position <= body.length; position++) {
                buffer.put(ActiveMQTestBase.getSamplebyte(position));
            }
            // try-with-resources so the producer is released even when a send fails part-way.
            try (ClientProducer producer = ManagementWithPagingServerTest.this.session1.createProducer(this.address)) {
                for (int i = 0; i < this.num; i++) {
                    ClientMessage message = ManagementWithPagingServerTest.this.session1.createMessage(true);
                    message.setPriority((byte) 1);
                    message.getBodyBuffer().writeBytes(body);
                    producer.send(message);
                    Thread.sleep(this.delay);
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt status and surface the aborted run to the test.
                Thread.currentThread().interrupt();
                this.error = e;
            } catch (Exception e) {
                this.error = e;
            }
        }

        /**
         * @return the exception that aborted sending, or {@code null} if all messages were sent
         */
        public Exception getError() {
            return this.error;
        }
    }

    /**
     * Sends 1000 messages, verifies that both {@code countMessages} and {@code listMessagesAsJSON}
     * report all of them, then drains the queue and verifies that the JSON listing is empty.
     */
    @Test
    public void testListMessagesAsJSON() throws Exception {
        final int num = 1000;

        SimpleString address = RandomUtil.randomSimpleString();
        SimpleString queue = RandomUtil.randomSimpleString();
        this.session1.createQueue(QueueConfiguration.of(queue).setAddress(address));
        QueueControl queueControl = createManagementControl(address, queue);

        SenderThread sender = new SenderThread(address, num, 0L);
        ReceiverThread receiver = new ReceiverThread(queue, num, 0L);

        // Fill the queue completely before any management call is made.
        sender.start();
        sender.join();
        Assertions.assertNull(sender.getError());

        Assertions.assertEquals(num, queueControl.countMessages((String) null));
        JsonArray listed = JsonUtil.readJsonArray(queueControl.listMessagesAsJSON((String) null));
        Assertions.assertEquals(num, listed.size());
        // Every listed entry must carry a message ID; previously the IDs were collected into a list
        // that nothing ever inspected.
        for (int i = 0; i < listed.size(); i++) {
            Assertions.assertNotNull(listed.getJsonObject(i).get("messageID"), "entry " + i + " has no messageID");
        }

        // Drain the queue and make sure the listing reflects it.
        receiver.start();
        receiver.join();
        Assertions.assertNull(receiver.getError());
        Assertions.assertEquals(0, JsonUtil.readJsonArray(queueControl.listMessagesAsJSON((String) null)).size());
    }

    /**
     * Sends 1000 messages whose long property {@code key} alternates between a matching and a
     * non-matching value, then verifies that {@code listMessagesAsJSON} and {@code countMessages}
     * with a filter on that property see exactly half of them. The queue is drained afterwards.
     */
    @Test
    public void testListMessagesAsJSONWithFilter() throws Exception {
        final int num = 1000;

        SimpleString address = RandomUtil.randomSimpleString();
        SimpleString queue = RandomUtil.randomSimpleString();
        this.session1.createQueue(QueueConfiguration.of(queue).setAddress(address));
        QueueControl queueControl = createManagementControl(address, queue);

        SimpleString key = SimpleString.of("key");
        long matchingValue = RandomUtil.randomLong();
        long unmatchingValue = matchingValue + 1;
        String filter = key + " =" + matchingValue;

        // Even-indexed messages match the filter, odd-indexed ones do not.
        ClientProducer producer = this.session1.createProducer(address);
        for (int i = 0; i < num; i++) {
            ClientMessage message = this.session1.createMessage(true);
            message.putLongProperty(key, i % 2 == 0 ? matchingValue : unmatchingValue);
            producer.send(message);
        }

        String json = queueControl.listMessagesAsJSON(filter);
        Assertions.assertNotNull(json);
        JsonArray listed = JsonUtil.readJsonArray(json);
        Assertions.assertEquals(num / 2, listed.size());
        // The property value may be rendered with surrounding quotes; strip them before parsing.
        String renderedValue = listed.getJsonObject(0).get("key").toString().replace("\"", "");
        Assertions.assertEquals(matchingValue, Long.parseLong(renderedValue));
        Assertions.assertEquals(num / 2, queueControl.countMessages(filter));

        // Drain everything, and fail the test if draining itself failed.
        ReceiverThread receiver = new ReceiverThread(queue, num, 1L);
        receiver.start();
        receiver.join();
        Assertions.assertNull(receiver.getError());
    }

    /**
     * Runs management queries in a background thread while 1000 messages are sent and then
     * consumed, and verifies that neither the sender, the receiver, nor the management poller
     * reported an error.
     */
    @Test
    public void testListMessagesAsJSONWhilePagingOnGoing() throws Exception {
        final int num = 1000;

        SimpleString address = RandomUtil.randomSimpleString();
        SimpleString queue = RandomUtil.randomSimpleString();
        this.session1.createQueue(QueueConfiguration.of(queue).setAddress(address));
        QueueControl queueControl = createManagementControl(address, queue);

        SenderThread sender = new SenderThread(address, num, 1L);
        ReceiverThread receiver = new ReceiverThread(queue, num, 2L);
        ManagementThread manager = new ManagementThread(queueControl);

        sender.start();
        manager.start();
        try {
            sender.join();
            Assertions.assertNull(sender.getError());

            receiver.start();
            receiver.join();
            Assertions.assertNull(receiver.getError());
        } finally {
            // Always stop the poller, even when an assertion above fails; otherwise it would keep
            // running after the test has finished.
            manager.exit();
            manager.join();
        }
        Assertions.assertNull(manager.getError());
    }

    /**
     * Starts an in-VM server with JMX management enabled and, for every address ({@code "#"}),
     * a 5120-byte page size, a 10240-byte maximum size, the {@code PAGE} address-full policy and a
     * management browse page size of 1000. Then opens two started client sessions on a locator
     * configured with a consumer window size of 0 and non-blocking non-durable sends.
     */
    @Override
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();

        server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig().setJMXManagementEnabled(true), mbeanServer, true));

        AddressSettings pagingSettings = new AddressSettings()
            .setPageSizeBytes(5120)
            .setMaxSizeBytes(10240L)
            .setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE)
            .setManagementBrowsePageSize(1000);
        server.getAddressSettingsRepository().addMatch("#", pagingSettings);
        server.start();

        locator = createInVMNonHALocator()
            .setBlockOnNonDurableSend(false)
            .setConsumerWindowSize(0);
        ClientSessionFactory sessionFactory = createSessionFactory(locator);

        // session1 is used for queue creation and sending, session2 for consuming.
        session1 = sessionFactory.createSession(false, true, false);
        session1.start();
        session2 = sessionFactory.createSession(false, true, false);
        session2.start();
    }
}
