package org.apache.activemq.artemis.tests.integration.consumer;

import jakarta.jms.Connection;
import jakarta.jms.Session;
import java.lang.invoke.MethodHandles;
import java.util.Objects;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.management.impl.view.ConsumerField;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.json.JsonArray;
import org.apache.activemq.artemis.tests.integration.management.SimpleManagementTest;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/consumer/DetectOrphanedConsumerTest.class */
public class DetectOrphanedConsumerTest extends ActiveMQTestBase {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    @Test
    public void testOrphanedConsumerCORE() throws Exception {
        testOrphanedConsumer("CORE");
    }

    @Test
    public void testOrphanedConsumerAMQP() throws Exception {
        testOrphanedConsumer("AMQP");
    }

    @Test
    public void testOrphanedConsumerOpenWire() throws Exception {
        testOrphanedConsumer("OPENWIRE");
    }

    private void testOrphanedConsumer(String str) throws Exception {
        ActiveMQServer createServer = createServer(false, createDefaultConfig(true));
        createServer.start();
        Queue createQueue = createServer.createQueue(QueueConfiguration.of(getName()).setDurable(true).setName(getName()).setRoutingType(RoutingType.ANYCAST));
        Connection createConnection = CFUtil.createConnectionFactory(str, SimpleManagementTest.LOCALHOST).createConnection();
        Objects.requireNonNull(createConnection);
        runAfter(createConnection::close);
        Session createSession = createConnection.createSession(false, 1);
        createSession.createConsumer(createSession.createQueue(getName()));
        Objects.requireNonNull(createQueue);
        Wait.assertEquals(1, createQueue::getConsumerCount, 5000L);
        QueueControl queueControl = (QueueControl) createServer.getManagementService().getResource("queue." + createQueue.getName().toString());
        Assertions.assertNotNull(queueControl);
        String listConsumersAsJSON = queueControl.listConsumersAsJSON();
        logger.debug("json: {}", listConsumersAsJSON);
        JsonArray readJsonArray = JsonUtil.readJsonArray(listConsumersAsJSON);
        Assertions.assertEquals(1, readJsonArray.size());
        Assertions.assertEquals("OK", readJsonArray.getJsonObject(0).getString(ConsumerField.STATUS.getName()));
        createQueue.getConsumers().forEach(consumer -> {
            ServerConsumerImpl serverConsumerImpl = (ServerConsumerImpl) consumer;
            logger.debug("Removing connection for {} on connectionID {}", serverConsumerImpl, serverConsumerImpl.getConnectionID());
            logger.debug("removed {}", createServer.getRemotingService().removeConnection(serverConsumerImpl.getConnectionID()));
        });
        String listConsumersAsJSON2 = queueControl.listConsumersAsJSON();
        logger.debug("json: {}", listConsumersAsJSON2);
        JsonArray readJsonArray2 = JsonUtil.readJsonArray(listConsumersAsJSON2);
        Assertions.assertEquals(1, readJsonArray2.size());
        Assertions.assertEquals("Orphaned", readJsonArray2.getJsonObject(0).getString(ConsumerField.STATUS.getName()));
        ActiveMQServerControl activeMQServerControl = (ActiveMQServerControl) createServer.getManagementService().getResource("broker");
        String string = readJsonArray2.getJsonObject(0).getString(ConsumerField.SESSION.getAlternativeName());
        int i = readJsonArray2.getJsonObject(0).getInt(ConsumerField.SEQUENTIAL_ID.getAlternativeName());
        logger.debug("SessionID{} ConsumerID::{}", string, Integer.valueOf(i));
        Assertions.assertTrue(activeMQServerControl.closeConsumerWithID(string, String.valueOf(i)));
    }
}
