package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;

import jakarta.jms.Connection;
import jakarta.jms.Session;
import jakarta.jms.Topic;
import jakarta.jms.TopicSubscriber;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSNotificationTest.class */
public class JMSNotificationTest extends MultiprotocolJMSClientTestSupport {
    private ClientConsumer notificationConsumer;
    private String clientID;

    @Override // org.apache.activemq.artemis.tests.integration.jms.multiprotocol.MultiprotocolJMSClientTestSupport, org.apache.activemq.artemis.tests.util.ActiveMQTestBase
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        this.clientID = RandomUtil.randomString();
        ClientSession addClientSession = addClientSession(addSessionFactory(addServerLocator(createInVMNonHALocator()).createSessionFactory()).createSession(false, true, true));
        addClientSession.start();
        SimpleString randomSimpleString = RandomUtil.randomSimpleString();
        addClientSession.createQueue(QueueConfiguration.of(randomSimpleString).setAddress(ActiveMQDefaultConfiguration.getDefaultManagementNotificationAddress()).setDurable(false));
        this.notificationConsumer = addClientConsumer(addClientSession.createConsumer(randomSimpleString));
    }

    private void flush() throws ActiveMQException {
        do {
        } while (this.notificationConsumer.receiveImmediate() != null);
    }

    @Override // org.apache.activemq.artemis.tests.integration.jms.multiprotocol.MultiprotocolJMSClientTestSupport
    protected void addConfiguration(ActiveMQServer activeMQServer) throws Exception {
        activeMQServer.getConfiguration().addAcceptorConfiguration("invm", "vm://0");
    }

    @Timeout(30)
    @Test
    public void testConsumerNotificationAMQP() throws Exception {
        testConsumerNotifications(createConnection(getBrokerQpidJMSConnectionURI(), null, null, this.clientID, true));
    }

    @Timeout(30)
    @Test
    public void testConsumerNotificationCore() throws Exception {
        testConsumerNotifications(createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, this.clientID, true));
    }

    @Timeout(30)
    @Test
    public void testConsumerNotificationOpenWire() throws Exception {
        testConsumerNotifications(createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), null, null, this.clientID, true));
    }

    private void testConsumerNotifications(Connection connection) throws Exception {
        try {
            flush();
            Session createSession = connection.createSession(false, 1);
            Topic createTopic = createSession.createTopic(getTopicName());
            flush();
            TopicSubscriber createDurableSubscriber = createSession.createDurableSubscriber(createTopic, "mySub");
            ClientMessage receiveNotification = receiveNotification(CoreNotificationType.CONSUMER_CREATED, this.notificationConsumer);
            validateClientIdOnNotification(receiveNotification, CoreNotificationType.CONSUMER_CREATED);
            String validatePropertyOnNotification = validatePropertyOnNotification(receiveNotification, CoreNotificationType.CONSUMER_CREATED, ManagementHelper.HDR_CONSUMER_NAME, null, false);
            createDurableSubscriber.close();
            ClientMessage receiveNotification2 = receiveNotification(CoreNotificationType.CONSUMER_CLOSED, this.notificationConsumer);
            validateClientIdOnNotification(receiveNotification2, CoreNotificationType.CONSUMER_CLOSED);
            validatePropertyOnNotification(receiveNotification2, CoreNotificationType.CONSUMER_CLOSED, ManagementHelper.HDR_CONSUMER_NAME, validatePropertyOnNotification, true);
            createSession.unsubscribe("mySub");
            connection.close();
        } catch (Throwable th) {
            connection.close();
            throw th;
        }
    }

    ClientMessage receiveNotification(CoreNotificationType coreNotificationType, ClientConsumer clientConsumer) throws Exception {
        ClientMessage receive;
        do {
            receive = clientConsumer.receive(1000L);
            if (receive == null) {
                return null;
            }
        } while (!String.valueOf(receive.getStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE)).equals(coreNotificationType.toString()));
        return receive;
    }

    @Timeout(30)
    @Test
    public void testSessionNotificationAMQP() throws Exception {
        testSessionNotification(createConnection(getBrokerQpidJMSConnectionURI(), null, null, this.clientID, true));
    }

    @Timeout(30)
    @Test
    public void testSessionNotificationCore() throws Exception {
        testSessionNotification(createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, this.clientID, true));
    }

    @Timeout(30)
    @Test
    public void testSessionNotificationOpenWire() throws Exception {
        testSessionNotification(createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), null, null, this.clientID, true));
    }

    private void testSessionNotification(Connection connection) throws Exception {
        try {
            flush();
            Session createSession = connection.createSession(false, 1);
            validateClientIdOnNotification(this.notificationConsumer.receive(1000L), CoreNotificationType.SESSION_CREATED);
            createSession.close();
            validateClientIdOnNotification(this.notificationConsumer.receive(1000L), CoreNotificationType.SESSION_CLOSED);
        } finally {
            connection.close();
        }
    }

    private void validateClientIdOnNotification(Message message, CoreNotificationType coreNotificationType) {
        validatePropertyOnNotification(message, coreNotificationType, ManagementHelper.HDR_CLIENT_ID, this.clientID, true);
    }

    private String validatePropertyOnNotification(Message message, CoreNotificationType coreNotificationType, SimpleString simpleString, String str, boolean z) {
        Assertions.assertNotNull(message);
        Assertions.assertEquals(coreNotificationType.toString(), message.getStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE));
        Assertions.assertTrue(message.getPropertyNames().contains(simpleString));
        if (z) {
            Assertions.assertEquals(str, message.getStringProperty(simpleString));
        }
        return message.getStringProperty(simpleString);
    }
}
