package org.apache.activemq.artemis.tests.integration.jms.server.management;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSContext;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TopicSubscriber;
import javax.json.JsonArray;
import javax.management.Notification;
import javax.management.NotificationFilter;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.management.JMSServerControl;
import org.apache.activemq.artemis.api.jms.management.SubscriptionInfo;
import org.apache.activemq.artemis.api.jms.management.TopicControl;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
import org.apache.activemq.artemis.jms.server.management.JMSNotificationType;
import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil;
import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
import org.apache.activemq.artemis.tests.integration.management.ManagementTestBase;
import org.apache.activemq.artemis.tests.unit.util.InVMNamingContext;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/jms/server/management/TopicControlTest.class */
public class TopicControlTest extends ManagementTestBase {
    // Core server started in setUp() with JMX management enabled.
    private ActiveMQServer server;
    // JMS layer on top of the core server; used to create/destroy topics in the tests.
    private JMSServerManagerImpl serverManager;
    // Random client ID assigned in setUp(); used for durable subscriptions.
    private String clientID;
    // Random durable subscription name assigned in setUp().
    private String subscriptionName;
    // Topic under test, created in setUp() with a random name.
    protected ActiveMQTopic topic;
    // Registry binding the topic is created with in setUp(); checked by testGetAttributes().
    private String topicBinding = "/topic/" + RandomUtil.randomString();

    /**
     * Checks that the management control reports the same name, address, temporary flag
     * and registry binding as the topic created in setUp().
     */
    @Test
    public void testGetAttributes() throws Exception {
        TopicControl control = createManagementControl();

        Assert.assertEquals(topic.getTopicName(), control.getName());
        Assert.assertEquals(topic.getAddress(), control.getAddress());
        Assert.assertEquals(Boolean.valueOf(topic.isTemporary()), Boolean.valueOf(control.isTemporary()));

        // Exactly one binding is expected: the one supplied when the topic was created.
        String[] bindings = control.getRegistryBindings();
        Assert.assertEquals(1L, bindings.length);
        Assert.assertEquals(topicBinding, bindings[0]);
    }

    /**
     * Creates one plain consumer and two durable subscribers, then checks the total,
     * non-durable and durable subscription counts exposed by the control.
     */
    @Test
    public void testGetXXXSubscriptionsCount() throws Exception {
        String connectorFactory = InVMConnectorFactory.class.getName();

        // One non-durable consumer.
        Connection nonDurableConnection = JMSUtil.createConnection(connectorFactory);
        JMSUtil.createConsumer(nonDurableConnection, topic);

        // Two durable subscribers with distinct client IDs and subscription names.
        Connection firstDurableConnection = JMSUtil.createConnection(connectorFactory);
        JMSUtil.createDurableSubscriber(firstDurableConnection, topic, clientID, subscriptionName);
        Connection secondDurableConnection = JMSUtil.createConnection(connectorFactory);
        JMSUtil.createDurableSubscriber(secondDurableConnection, topic, clientID + "2", subscriptionName + "2");

        TopicControl control = createManagementControl();
        Assert.assertEquals(3L, control.getSubscriptionCount());
        Assert.assertEquals(1L, control.getNonDurableSubscriptionCount());
        Assert.assertEquals(2L, control.getDurableSubscriptionCount());

        nonDurableConnection.close();
        firstDurableConnection.close();
        secondDurableConnection.close();
    }

    /**
     * With one non-durable and two durable subscriptions, sends two messages and waits for
     * the total (6), non-durable (2) and durable (4) message counts to be reported.
     */
    @Test
    public void testGetXXXMessagesCount() throws Exception {
        String connectorFactory = InVMConnectorFactory.class.getName();

        Connection nonDurableConnection = JMSUtil.createConnection(connectorFactory);
        JMSUtil.createConsumer(nonDurableConnection, topic);
        Connection firstDurableConnection = JMSUtil.createConnection(connectorFactory);
        JMSUtil.createDurableSubscriber(firstDurableConnection, topic, clientID, subscriptionName);
        Connection secondDurableConnection = JMSUtil.createConnection(connectorFactory);
        JMSUtil.createDurableSubscriber(secondDurableConnection, topic, clientID + "_2", subscriptionName + "2");

        TopicControl control = createManagementControl();

        // Nothing has been sent yet, so every counter starts at zero.
        Assert.assertEquals(0L, control.getMessageCount());
        Assert.assertEquals(0L, control.getNonDurableMessageCount());
        Assert.assertEquals(0L, control.getDurableMessageCount());

        JMSUtil.sendMessages(topic, 2);

        // Two messages fan out to three subscriptions: 2 non-durable + 4 durable = 6.
        waitForMessageCount(6L, control, 3000L);
        waitForNonDurableMessageCount(2L, control, 3000L);
        waitForDurableMessageCount(4L, control, 3000L);

        nonDurableConnection.close();
        firstDurableConnection.close();
        secondDurableConnection.close();
    }

    /**
     * Creates one plain consumer and two durable subscribers, then checks the sizes of the
     * arrays returned by the list operations and of the JSON array listing all subscriptions.
     */
    @Test
    public void testListXXXSubscriptionsCount() throws Exception {
        String connectorFactory = InVMConnectorFactory.class.getName();

        Connection nonDurableConnection = JMSUtil.createConnection(connectorFactory);
        JMSUtil.createConsumer(nonDurableConnection, topic);
        Connection firstDurableConnection = JMSUtil.createConnection(connectorFactory);
        JMSUtil.createDurableSubscriber(firstDurableConnection, topic, clientID, subscriptionName);
        Connection secondDurableConnection = JMSUtil.createConnection(connectorFactory);
        JMSUtil.createDurableSubscriber(secondDurableConnection, topic, clientID + "2", subscriptionName + "2");

        TopicControl control = createManagementControl();
        Assert.assertEquals(3L, control.listAllSubscriptions().length);
        Assert.assertEquals(1L, control.listNonDurableSubscriptions().length);
        Assert.assertEquals(2L, control.listDurableSubscriptions().length);

        // Keep the JSON in a local so the same string is both printed and parsed.
        String json = control.listAllSubscriptionsAsJSON();
        System.out.println("Json: " + json);
        Assert.assertEquals(3L, JsonUtil.readJsonArray(json).size());

        nonDurableConnection.close();
        firstDurableConnection.close();
        secondDurableConnection.close();
    }

    /**
     * Checks the JSON listings: durable entries must carry one of the expected client IDs and
     * names, the non-durable entry has neither, and the full listing contains all three.
     */
    @Test
    public void testListXXXSubscriptionsAsJSON() throws Exception {
        String connectorFactory = InVMConnectorFactory.class.getName();

        Connection nonDurableConnection = JMSUtil.createConnection(connectorFactory);
        JMSUtil.createConsumer(nonDurableConnection, topic);
        Connection firstDurableConnection = JMSUtil.createConnection(connectorFactory);
        JMSUtil.createDurableSubscriber(firstDurableConnection, topic, clientID, subscriptionName);
        Connection secondDurableConnection = JMSUtil.createConnection(connectorFactory);
        JMSUtil.createDurableSubscriber(secondDurableConnection, topic, clientID + "2", subscriptionName + "2");

        TopicControl control = createManagementControl();

        // Durable subscriptions: order is not assumed, only membership.
        SubscriptionInfo[] durableInfos = SubscriptionInfo.from(control.listDurableSubscriptionsAsJSON());
        Assert.assertEquals(2L, durableInfos.length);
        List<String> expectedClientIds = Arrays.asList(clientID, clientID + "2");
        List<String> expectedNames = Arrays.asList(subscriptionName, subscriptionName + "2");
        Assert.assertTrue(expectedClientIds.contains(durableInfos[0].getClientID()));
        Assert.assertTrue(expectedNames.contains(durableInfos[0].getName()));
        Assert.assertTrue(expectedClientIds.contains(durableInfos[1].getClientID()));
        Assert.assertTrue(expectedNames.contains(durableInfos[1].getName()));

        // The non-durable subscription reports neither a client ID nor a name.
        SubscriptionInfo[] nonDurableInfos = SubscriptionInfo.from(control.listNonDurableSubscriptionsAsJSON());
        Assert.assertEquals(1L, nonDurableInfos.length);
        Assert.assertNull(nonDurableInfos[0].getClientID());
        Assert.assertNull(nonDurableInfos[0].getName());

        SubscriptionInfo[] allInfos = SubscriptionInfo.from(control.listAllSubscriptionsAsJSON());
        Assert.assertEquals(3L, allInfos.length);

        nonDurableConnection.close();
        firstDurableConnection.close();
        secondDurableConnection.close();
    }

    /**
     * Same checks as the JSON listing test, but the subscriptions are created through
     * JMSContext: two shared durable consumers and one plain consumer.
     */
    @Test
    public void testListXXXSubscriptionsAsJSONJMS2() throws Exception {
        ConnectionFactory factory = JMSUtil.createFactory(InVMConnectorFactory.class.getName(), -1L, -1L);

        JMSContext firstDurableContext = factory.createContext();
        firstDurableContext.createSharedDurableConsumer(topic, subscriptionName, (String) null);
        JMSContext secondDurableContext = factory.createContext();
        secondDurableContext.createSharedDurableConsumer(topic, subscriptionName + "2", (String) null);
        JMSContext nonDurableContext = factory.createContext();
        nonDurableContext.createConsumer(topic);

        TopicControl control = createManagementControl();

        // No client ID was set on the contexts, so the durable entries report none.
        SubscriptionInfo[] durableInfos = SubscriptionInfo.from(control.listDurableSubscriptionsAsJSON());
        Assert.assertEquals(2L, durableInfos.length);
        Assert.assertNull(durableInfos[0].getClientID());
        Assert.assertTrue(durableInfos[0].getName().equals(subscriptionName));
        Assert.assertNull(durableInfos[1].getClientID());
        Assert.assertTrue(durableInfos[1].getName().equals(subscriptionName + "2"));

        SubscriptionInfo[] nonDurableInfos = SubscriptionInfo.from(control.listNonDurableSubscriptionsAsJSON());
        Assert.assertEquals(1L, nonDurableInfos.length);
        Assert.assertNull(nonDurableInfos[0].getClientID());
        Assert.assertNull(nonDurableInfos[0].getName());

        SubscriptionInfo[] allInfos = SubscriptionInfo.from(control.listAllSubscriptionsAsJSON());
        Assert.assertEquals(3L, allInfos.length);

        firstDurableContext.close();
        secondDurableContext.close();
        nonDurableContext.close();
    }

    /**
     * Creates a wildcard topic and a concrete topic beneath it, then checks that the concrete
     * topic lists a single durable subscription whose client ID and name are both "ActiveMQ".
     */
    @Test
    public void testListSubscriptionsAsJSONWithHierarchicalTopics() throws Exception {
        serverManager.createTopic(false, "my.jms.#", new String[]{"jms/all"});
        serverManager.createTopic(false, "my.jms.A", new String[]{"jms/A"});

        ActiveMQTopic concreteTopic = ActiveMQJMSClient.createTopic("my.jms.A");
        TopicControl concreteControl = ManagementControlHelper.createTopicControl(concreteTopic, mbeanServer);
        String durableJson = concreteControl.listDurableSubscriptionsAsJSON();
        SubscriptionInfo[] durableInfos = SubscriptionInfo.from(durableJson);

        Assert.assertEquals(1L, durableInfos.length);
        Assert.assertEquals("ActiveMQ", durableInfos[0].getClientID());
        Assert.assertEquals("ActiveMQ", durableInfos[0].getName());
    }

    /**
     * Sends three messages carrying a "key" property (two with one value, one with another)
     * to a durable subscription and checks the per-filter counts reported by the control.
     */
    @Test
    public void testCountMessagesForSubscription() throws Exception {
        long key = RandomUtil.randomLong();
        long otherKey = key + 1;

        Connection connection = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
        JMSUtil.createDurableSubscriber(connection, topic, clientID, subscriptionName);

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        JMSUtil.sendMessageWithProperty(session, (Destination) topic, "key", key);
        JMSUtil.sendMessageWithProperty(session, (Destination) topic, "key", otherKey);
        JMSUtil.sendMessageWithProperty(session, (Destination) topic, "key", key);

        // Flush each bound queue's executor before reading the counters.
        for (Binding binding : server.getPostOffice().getBindingsForAddress(topic.getSimpleAddress()).getBindings()) {
            binding.getQueue().flushExecutor();
        }

        // Hold the control in a local so the same instance serves all three assertions.
        TopicControl control = createManagementControl();
        Assert.assertEquals(3L, control.getMessageCount());
        Assert.assertEquals(2L, control.countMessagesForSubscription(clientID, subscriptionName, "key =" + key));
        Assert.assertEquals(1L, control.countMessagesForSubscription(clientID, subscriptionName, "key =" + otherKey));

        connection.close();
    }

    /**
     * Counting messages for a subscription name that was never created must throw.
     */
    @Test
    public void testCountMessagesForUnknownSubscription() throws Exception {
        String unknownSubscription = RandomUtil.randomString();
        TopicControl control = createManagementControl();
        try {
            control.countMessagesForSubscription(clientID, unknownSubscription, (String) null);
            Assert.fail();
        } catch (Exception expected) {
            // Expected: the subscription does not exist.
        }
    }

    /**
     * Counting messages for a client ID that was never used must throw.
     */
    @Test
    public void testCountMessagesForUnknownClientID() throws Exception {
        String unknownClientId = RandomUtil.randomString();
        TopicControl control = createManagementControl();
        try {
            control.countMessagesForSubscription(unknownClientId, subscriptionName, (String) null);
            Assert.fail();
        } catch (Exception expected) {
            // Expected: no subscription exists for this client ID.
        }
    }

    /**
     * Creates a durable subscription, closes its connection, drops the subscription through
     * the control and checks the durable subscription count goes from 1 to 0.
     */
    @Test
    public void testDropDurableSubscriptionWithExistingSubscription() throws Exception {
        Connection connection = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
        JMSUtil.createDurableSubscriber(connection, topic, clientID, subscriptionName);

        TopicControl control = createManagementControl();
        Assert.assertEquals(1L, control.getDurableSubscriptionCount());

        // The connection is closed before the subscription is dropped.
        connection.close();
        control.dropDurableSubscription(clientID, subscriptionName);

        Assert.assertEquals(0L, control.getDurableSubscriptionCount());
    }

    /**
     * Dropping a subscription name that does not exist must throw and must leave the
     * existing durable subscription in place.
     */
    @Test
    public void testDropDurableSubscriptionWithUnknownSubscription() throws Exception {
        Connection connection = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
        JMSUtil.createDurableSubscriber(connection, topic, clientID, subscriptionName);

        TopicControl control = createManagementControl();
        Assert.assertEquals(1L, control.getDurableSubscriptionCount());

        try {
            control.dropDurableSubscription(clientID, "this subscription does not exist");
            Assert.fail("should throw an exception");
        } catch (Exception expected) {
            // Expected: the named subscription is unknown.
        }

        // The real subscription must be untouched by the failed drop.
        Assert.assertEquals(1L, control.getDurableSubscriptionCount());

        connection.close();
    }

    /**
     * Creates two durable subscriptions on two connections with distinct client IDs, verifies
     * both receive a message, closes the connections, drops all subscriptions through the
     * control and finally checks that a new connection can still send to the topic.
     */
    @Test
    public void testDropAllSubscriptions() throws Exception {
        String connectorFactory = InVMConnectorFactory.class.getName();

        Connection firstConnection = JMSUtil.createConnection(connectorFactory);
        firstConnection.setClientID(clientID);
        Session firstSession = firstConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber firstSubscriber = firstSession.createDurableSubscriber(topic, subscriptionName);

        Connection secondConnection = JMSUtil.createConnection(connectorFactory);
        secondConnection.setClientID(clientID + "2");
        // The second subscriber must live on the second connection so that its client ID
        // (clientID + "2") is the one associated with the subscription.
        Session secondSession = secondConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber secondSubscriber = secondSession.createDurableSubscriber(topic, subscriptionName + "2");

        firstConnection.start();
        secondConnection.start();

        Session producerSession = firstConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producerSession.createProducer(topic).send(producerSession.createTextMessage("tst1"));

        Assert.assertNotNull(firstSubscriber.receive(5000L));
        Assert.assertNotNull(secondSubscriber.receive(5000L));

        firstConnection.close();
        secondConnection.close();

        // Both durable subscriptions are still counted after their connections are closed.
        TopicControl control = createManagementControl();
        Assert.assertEquals(2L, control.getSubscriptionCount());
        control.dropAllSubscriptions();
        Assert.assertEquals(0L, control.getSubscriptionCount());

        // Sending must still work once every subscription is gone.
        Connection thirdConnection = JMSUtil.createConnection(connectorFactory);
        try {
            thirdConnection.setClientID(clientID);
            Session thirdSession = thirdConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            thirdSession.createProducer(topic).send(thirdSession.createTextMessage("tst2"));
        } finally {
            // Close the connection so it does not leak past the end of the test.
            thirdConnection.close();
        }
    }

    /**
     * Sends three messages to two durable subscriptions, waits for the six resulting
     * messages, removes them all with a null filter and checks the count drops to zero.
     */
    @Test
    public void testRemoveAllMessages() throws Exception {
        String connectorFactory = InVMConnectorFactory.class.getName();

        Connection firstConnection = JMSUtil.createConnection(connectorFactory);
        JMSUtil.createDurableSubscriber(firstConnection, topic, clientID, subscriptionName);
        Connection secondConnection = JMSUtil.createConnection(connectorFactory);
        JMSUtil.createDurableSubscriber(secondConnection, topic, clientID + "2", subscriptionName + "2");

        JMSUtil.sendMessages(topic, 3);

        TopicControl control = createManagementControl();
        // 3 messages x 2 subscriptions.
        waitForMessageCount(6L, control, 3000L);

        int removed = control.removeMessages((String) null);
        Assert.assertEquals(6L, removed);
        Assert.assertEquals(0L, control.getMessageCount());

        firstConnection.close();
        secondConnection.close();
    }

    /**
     * Sends three messages to a durable subscription and checks that listing the messages
     * of the subscription's queue returns three entries.
     */
    @Test
    public void testListMessagesForSubscription() throws Exception {
        Connection connection = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
        JMSUtil.createDurableSubscriber(connection, topic, clientID, subscriptionName);

        JMSUtil.sendMessages(topic, 3);

        // The list operation is addressed by the durable subscription's queue name.
        String queueName = ActiveMQDestination.createQueueNameForDurableSubscription(true, clientID, subscriptionName);
        TopicControl control = createManagementControl();
        Assert.assertEquals(3L, control.listMessagesForSubscription(queueName).length);

        connection.close();
    }

    /**
     * Sends three messages to a durable subscription and checks that the JSON listing
     * contains three entries whose JMSMessageID values match the sent IDs, in order.
     */
    @Test
    public void testListMessagesForSubscriptionAsJSON() throws Exception {
        final int messageCount = 3;

        Connection connection = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
        JMSUtil.createDurableSubscriber(connection, topic, clientID, subscriptionName);

        String[] sentIds = JMSUtil.sendMessages(topic, messageCount);

        String queueName = ActiveMQDestination.createQueueNameForDurableSubscription(true, clientID, subscriptionName);
        String json = createManagementControl().listMessagesForSubscriptionAsJSON(queueName);
        Assert.assertNotNull(json);

        JsonArray listed = JsonUtil.readJsonArray(json);
        Assert.assertEquals(3L, listed.size());
        for (int index = 0; index < messageCount; index++) {
            String listedId = listed.getJsonObject(index).getString("JMSMessageID");
            Assert.assertEquals(sentIds[index], listedId);
        }

        connection.close();
    }

    /**
     * Listing messages for a queue name built from an unknown client ID must throw.
     */
    @Test
    public void testListMessagesForSubscriptionWithUnknownClientID() throws Exception {
        String unknownClientId = RandomUtil.randomString();
        TopicControl control = createManagementControl();
        try {
            String queueName = ActiveMQDestination.createQueueNameForDurableSubscription(true, unknownClientId, subscriptionName);
            control.listMessagesForSubscription(queueName);
            Assert.fail();
        } catch (Exception expected) {
            // Expected: no subscription exists for this client ID.
        }
    }

    /**
     * Listing messages for a queue name built from an unknown subscription name must throw.
     */
    @Test
    public void testListMessagesForSubscriptionWithUnknownSubscription() throws Exception {
        String unknownSubscription = RandomUtil.randomString();
        TopicControl control = createManagementControl();
        try {
            String queueName = ActiveMQDestination.createQueueNameForDurableSubscription(true, clientID, unknownSubscription);
            control.listMessagesForSubscription(queueName);
            Assert.fail();
        } catch (Exception expected) {
            // Expected: the subscription does not exist.
        }
    }

    /**
     * With three subscriptions on the topic, sends two messages and waits for the
     * messages-added counter to reach six.
     */
    @Test
    public void testGetMessagesAdded() throws Exception {
        String connectorFactory = InVMConnectorFactory.class.getName();

        Connection nonDurableConnection = JMSUtil.createConnection(connectorFactory);
        JMSUtil.createConsumer(nonDurableConnection, topic);
        Connection firstDurableConnection = JMSUtil.createConnection(connectorFactory);
        JMSUtil.createDurableSubscriber(firstDurableConnection, topic, clientID, subscriptionName);
        Connection secondDurableConnection = JMSUtil.createConnection(connectorFactory);
        JMSUtil.createDurableSubscriber(secondDurableConnection, topic, clientID + "2", subscriptionName + "2");

        TopicControl control = createManagementControl();
        Assert.assertEquals(0L, control.getMessagesAdded());

        JMSUtil.sendMessages(topic, 2);

        // 2 messages x 3 subscriptions.
        waitForAddedMessageCount(6L, control, 3000L);

        nonDurableConnection.close();
        firstDurableConnection.close();
        secondDurableConnection.close();
    }

    /**
     * Tracks the delivering count across three client-acknowledge subscriptions: zero before
     * the connections start, six after each subscriber has received both messages, and two
     * fewer after each subscriber acknowledges its last received message.
     */
    @Test
    public void testGetMessagesDelivering() throws Exception {
        final int messageCount = 2;
        String connectorFactory = InVMConnectorFactory.class.getName();

        Connection nonDurableConnection = JMSUtil.createConnection(connectorFactory);
        MessageConsumer nonDurableConsumer = JMSUtil.createConsumer(nonDurableConnection, topic, Session.CLIENT_ACKNOWLEDGE);
        Connection firstDurableConnection = JMSUtil.createConnection(connectorFactory);
        TopicSubscriber firstDurable = JMSUtil.createDurableSubscriber(firstDurableConnection, topic, clientID, subscriptionName, Session.CLIENT_ACKNOWLEDGE);
        Connection secondDurableConnection = JMSUtil.createConnection(connectorFactory);
        TopicSubscriber secondDurable = JMSUtil.createDurableSubscriber(secondDurableConnection, topic, clientID + "2", subscriptionName + "2", Session.CLIENT_ACKNOWLEDGE);

        TopicControl control = createManagementControl();
        Assert.assertEquals(0L, control.getDeliveringCount());

        JMSUtil.sendMessages(topic, messageCount);

        // Connections are not started yet, so nothing is being delivered.
        Assert.assertEquals(0L, control.getDeliveringCount());

        nonDurableConnection.start();
        firstDurableConnection.start();
        secondDurableConnection.start();

        // Remember only the last message seen by each subscriber.
        Message lastNonDurable = null;
        Message lastFirstDurable = null;
        Message lastSecondDurable = null;
        for (int received = 0; received < messageCount; received++) {
            lastNonDurable = nonDurableConsumer.receive(5000L);
            Assert.assertNotNull(lastNonDurable);
            lastFirstDurable = firstDurable.receive(5000L);
            Assert.assertNotNull(lastFirstDurable);
            lastSecondDurable = secondDurable.receive(5000L);
            Assert.assertNotNull(lastSecondDurable);
        }

        Assert.assertEquals(6L, control.getDeliveringCount());

        // Each acknowledge on a subscriber's last message drops the count by two.
        lastNonDurable.acknowledge();
        Assert.assertEquals(4L, control.getDeliveringCount());
        lastFirstDurable.acknowledge();
        Assert.assertEquals(2L, control.getDeliveringCount());
        lastSecondDurable.acknowledge();
        Assert.assertEquals(0L, control.getDeliveringCount());

        nonDurableConnection.close();
        firstDurableConnection.close();
        secondDurableConnection.close();
    }

    /**
     * Verifies that creating and destroying a topic emits TOPIC_CREATED / TOPIC_DESTROYED
     * JMX notifications carrying the topic name, both when done through the server manager
     * and when done through the JMS server management control.
     */
    @Test
    public void testCreateTopicNotification() throws Exception {
        JMSUtil.JMXListener listener = new JMSUtil.JMXListener();
        mbeanServer.addNotificationListener(ObjectNameBuilder.DEFAULT.getJMSServerObjectName(), listener, (NotificationFilter) null, (Object) null);

        // Create and destroy through the server manager.
        serverManager.createTopic(true, "newTopic", new String[]{"newTopic"});
        assertNextTopicNotification(listener, JMSNotificationType.TOPIC_CREATED, "newTopic");
        serverManager.destroyTopic("newTopic");
        assertNextTopicNotification(listener, JMSNotificationType.TOPIC_DESTROYED, "newTopic");

        // Create and destroy through the management control.
        JMSServerControl serverControl = ManagementControlHelper.createJMSServerControl(mbeanServer);
        serverControl.createTopic("newTopic");
        assertNextTopicNotification(listener, JMSNotificationType.TOPIC_CREATED, "newTopic");
        serverControl.destroyTopic("newTopic");
        assertNextTopicNotification(listener, JMSNotificationType.TOPIC_DESTROYED, "newTopic");
    }

    /** Fetches the listener's next notification and checks its type and message. */
    private void assertNextTopicNotification(JMSUtil.JMXListener listener, JMSNotificationType expectedType, String expectedMessage) {
        Notification notification = listener.getNotification();
        Assert.assertEquals(expectedType.toString(), notification.getType());
        Assert.assertEquals(expectedMessage, notification.getMessage());
    }

    /**
     * Starts an in-VM server with JMX management enabled, starts the JMS server manager on
     * top of it with an in-VM naming registry, and creates the randomly named topic under test.
     */
    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();

        // Core server, registered for cleanup through addServer().
        server = addServer(
            ActiveMQServers.newActiveMQServer(
                createDefaultInVMConfig().setJMXManagementEnabled(true),
                mbeanServer,
                false));
        server.start();

        // JMS layer: the registry is installed after start() and before activated().
        serverManager = new JMSServerManagerImpl(server);
        serverManager.start();
        serverManager.setRegistry(new JndiBindingRegistry(new InVMNamingContext()));
        serverManager.activated();

        clientID = RandomUtil.randomString();
        subscriptionName = RandomUtil.randomString();

        String topicName = RandomUtil.randomString();
        serverManager.createTopic(false, topicName, new String[]{topicBinding});
        topic = ActiveMQJMSClient.createTopic(topicName);
    }

    /**
     * Builds the management control for the topic under test using this test's MBean server.
     *
     * @return the control for {@code topic}
     */
    protected TopicControl createManagementControl() throws Exception {
        TopicControl control = ManagementControlHelper.createTopicControl(topic, mbeanServer);
        return control;
    }

    /**
     * Polls the control's message count every 100 ms until it equals the expected value or
     * the timeout elapses; on timeout a final equality assertion reports the mismatch.
     *
     * @param expected      the message count to wait for
     * @param control       the topic control to poll
     * @param timeoutMillis the maximum time to wait, in milliseconds
     */
    private void waitForMessageCount(long expected, TopicControl control, long timeoutMillis) throws Exception {
        // The sleep runs as the loop update, i.e. only after a poll that did not match.
        for (long deadline = System.currentTimeMillis() + timeoutMillis; System.currentTimeMillis() < deadline; Thread.sleep(100L)) {
            if (control.getMessageCount() == expected) {
                return;
            }
        }
        assertEquals(expected, control.getMessageCount());
    }

    /**
     * Polls the control's non-durable message count every 100 ms until it equals the expected
     * value or the timeout elapses; on timeout a final equality assertion reports the mismatch.
     *
     * @param expected      the non-durable message count to wait for
     * @param control       the topic control to poll
     * @param timeoutMillis the maximum time to wait, in milliseconds
     */
    private void waitForNonDurableMessageCount(long expected, TopicControl control, long timeoutMillis) throws Exception {
        // The sleep runs as the loop update, i.e. only after a poll that did not match.
        for (long deadline = System.currentTimeMillis() + timeoutMillis; System.currentTimeMillis() < deadline; Thread.sleep(100L)) {
            if (control.getNonDurableMessageCount() == expected) {
                return;
            }
        }
        assertEquals(expected, control.getNonDurableMessageCount());
    }

    /**
     * Polls the control's durable message count every 100 ms until it equals the expected
     * value or the timeout elapses; on timeout a final equality assertion reports the mismatch.
     *
     * @param expected      the durable message count to wait for
     * @param control       the topic control to poll
     * @param timeoutMillis the maximum time to wait, in milliseconds
     */
    private void waitForDurableMessageCount(long expected, TopicControl control, long timeoutMillis) throws Exception {
        // The sleep runs as the loop update, i.e. only after a poll that did not match.
        for (long deadline = System.currentTimeMillis() + timeoutMillis; System.currentTimeMillis() < deadline; Thread.sleep(100L)) {
            if (control.getDurableMessageCount() == expected) {
                return;
            }
        }
        assertEquals(expected, control.getDurableMessageCount());
    }

    /**
     * Polls the control's messages-added counter every 100 ms until it equals the expected
     * value or the timeout elapses; on timeout a final equality assertion reports the mismatch.
     *
     * @param expected      the messages-added value to wait for
     * @param control       the topic control to poll
     * @param timeoutMillis the maximum time to wait, in milliseconds
     */
    private void waitForAddedMessageCount(long expected, TopicControl control, long timeoutMillis) throws Exception {
        // The sleep runs as the loop update, i.e. only after a poll that did not match.
        for (long deadline = System.currentTimeMillis() + timeoutMillis; System.currentTimeMillis() < deadline; Thread.sleep(100L)) {
            if (control.getMessagesAdded() == expected) {
                return;
            }
        }
        assertEquals(expected, control.getMessagesAdded());
    }
}
