package org.apache.activemq.usecases;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.MalformedObjectNameException;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
import org.apache.activemq.broker.jmx.VirtualDestinationSelectorCacheViewMBean;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.plugin.SubQueueSelectorCacheBrokerPlugin;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.util.MessageIdList;
import org.apache.activemq.util.ProducerThread;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.class */
public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends JmsMultipleBrokersTestSupport {
    // Logger used by the tests to report producer progress and leftover message counts.
    private static final Logger LOG = LoggerFactory.getLogger(TwoBrokerVirtualTopicSelectorAwareForwardingTest.class);
    // Path prefix of a broker's persisted selector-cache file; the broker name is appended to it.
    private static final String PERSIST_SELECTOR_CACHE_FILE_BASEPATH = "./target/selectorCache-";

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest$ProducerThreadTester.class */
    /**
     * Producer thread that stamps each message with a randomly chosen {@code SYMBOL}
     * string property and keeps a per-value count of the messages it created.
     *
     * <p>Values are registered through {@link #addMessageProperty(String)}. While no value
     * is registered, messages are created without the property.
     */
    public class ProducerThreadTester extends ProducerThread {
        /** Registered property values, kept in registration order. */
        private final Set<String> selectors = new LinkedHashSet<>();
        /** Number of messages created so far for each registered value. */
        private final Map<String, AtomicInteger> selectorCounts = new HashMap<>();
        private final Random rand = new Random(System.currentTimeMillis());

        public ProducerThreadTester(Session session, Destination destination) {
            super(session, destination);
        }

        /**
         * Creates the text message {@code "Message-" + i} and, when at least one value is
         * registered, sets {@code SYMBOL} to a random registered value and bumps its counter.
         */
        protected Message createMessage(int i) throws Exception {
            TextMessage message = TwoBrokerVirtualTopicSelectorAwareForwardingTest.this.createTextMessage(this.session, "Message-" + i);
            if (!this.selectors.isEmpty()) {
                String symbol = getRandomKey();
                message.setStringProperty("SYMBOL", symbol);
                this.selectorCounts.get(symbol).incrementAndGet();
            }
            return message;
        }

        /** Resets the superclass counters and zeroes every per-value counter. */
        public void resetCounters() {
            super.resetCounters();
            // Reset the existing counters in place instead of writing new entries into the
            // map, so the map itself is never modified after a value has been registered.
            for (AtomicInteger counter : this.selectorCounts.values()) {
                counter.set(0);
            }
        }

        /** Picks one of the registered values uniformly at random; requires at least one value. */
        private String getRandomKey() {
            ArrayList<String> candidates = new ArrayList<>(this.selectors);
            return candidates.get(this.rand.nextInt(candidates.size()));
        }

        /**
         * Registers {@code str} as a possible {@code SYMBOL} value with a zeroed counter.
         * Registering an already known value is a no-op and keeps its current count.
         */
        public void addMessageProperty(String str) {
            // Set.add reports whether the value was new, which replaces the separate contains check.
            if (this.selectors.add(str)) {
                this.selectorCounts.put(str, new AtomicInteger(0));
            }
        }

        /**
         * Returns how many messages were created with {@code SYMBOL} equal to {@code str},
         * or {@code 0} when {@code str} was never registered.
         */
        public int getCountForProperty(String str) {
            AtomicInteger counter = this.selectorCounts.get(str);
            return counter == null ? 0 : counter.get();
        }
    }

    /**
     * Drives the selector-cache MBean of BrokerA: a selector registered by a BrokerB consumer
     * must show up in the cache, be removable one at a time, and be removable in bulk.
     */
    public void testJMX() throws Exception {
        final String subQueueName = "Consumer.B.VirtualTopic.tempTopic";
        final String subQueueUri = "queue://Consumer.B.VirtualTopic.tempTopic";

        clearSelectorCacheFiles();
        bridgeAndConfigureBrokers("BrokerA", "BrokerB");
        startAllBrokers();
        waitForBridgeFormation();

        createConsumer("BrokerB", (Destination) createDestination(subQueueName, false), "foo = 'bar'");

        final BrokerService brokerA = this.brokers.get("BrokerA").broker;
        VirtualDestinationSelectorCacheViewMBean cacheView = getVirtualDestinationSelectorCacheMBean(brokerA);

        // The first consumer's selector is expected to be the only cached entry.
        Set<?> cachedSelectors = cacheView.selectorsForDestination(subQueueUri);
        assertEquals(1, cachedSelectors.size());
        assertTrue(cachedSelectors.contains("foo = 'bar'"));

        // Remove that single selector through the MBean.
        boolean removed = cacheView.deleteSelectorForDestination(subQueueUri, "foo = 'bar'");
        assertTrue(removed);
        assertEquals(0, cacheView.selectorsForDestination(subQueueUri).size());

        // A second consumer with a different selector repopulates the cache.
        createConsumer("BrokerB", (Destination) createDestination(subQueueName, false), "ceposta = 'redhat'");
        final org.apache.activemq.broker.region.Destination subQueueOnA = brokerA.getDestination(new ActiveMQQueue(subQueueName));
        Wait.waitFor(new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return subQueueOnA.getConsumers().size() == 2;
            }
        }, 500L);
        assertEquals(1, cacheView.selectorsForDestination(subQueueUri).size());

        // Bulk removal empties the cache for the destination.
        cacheView.deleteAllSelectorsForDestination(subQueueUri);
        assertEquals(0, cacheView.selectorsForDestination(subQueueUri).size());
    }

    /**
     * Runs a continuous producer against BrokerA while the selector consumers on the
     * subscription queue are swapped from AAPL to VIX, then checks that the number of
     * messages left on the queue does not exceed the level sampled before the producer stopped.
     */
    public void testMessageLeaks() throws Exception {
        final String subQueueName = "Consumer.B.VirtualTopic.tempTopic";

        clearSelectorCacheFiles();
        startAllBrokers();

        final BrokerService brokerA = this.brokers.get("BrokerA").broker;
        ActiveMQDestination subQueue = createDestination(subQueueName, false);

        MessageConsumer firstAaplConsumer = createConsumer("BrokerA", (Destination) subQueue, "SYMBOL = 'AAPL'");
        MessageConsumer secondAaplConsumer = createConsumer("BrokerA", (Destination) subQueue, "SYMBOL = 'AAPL'");

        ProducerThreadTester producer = createProducerTester("BrokerA", new ActiveMQTopic("VirtualTopic.tempTopic"));
        producer.setRunIndefinitely(true);
        producer.setSleep(5);
        producer.addMessageProperty("AAPL");
        producer.addMessageProperty("VIX");
        producer.start();

        logProducerProgress(producer);
        Thread.sleep(2000L);
        getConsumerMessages("BrokerA", firstAaplConsumer).waitForMessagesToArrive(50, 1000L);

        // Swap the first AAPL consumer for a VIX consumer.
        firstAaplConsumer.close();
        createConsumer("BrokerA", (Destination) subQueue, "SYMBOL = 'VIX'");
        awaitTwoSubQueueConsumers(brokerA);
        logProducerProgress(producer);
        Thread.sleep(2000L);

        // Swap the second AAPL consumer as well; only VIX consumers remain.
        secondAaplConsumer.close();
        createConsumer("BrokerA", (Destination) subQueue, "SYMBOL = 'VIX'");
        awaitTwoSubQueueConsumers(brokerA);
        logProducerProgress(producer);
        Thread.sleep(2000L);
        logProducerProgress(producer);

        final long sampledCount = brokerA.getDestination(new ActiveMQQueue(subQueueName)).getDestinationStatistics().getMessages().getCount();
        LOG.info(">>>>> Orphaned messages? " + sampledCount);

        // Give the queue up to five seconds to grow beyond the sampled level.
        Wait.waitFor(new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                long current = brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getMessages().getCount();
                return current > sampledCount;
            }
        }, 5000L);

        producer.setRunning(false);
        producer.join();
        Thread.sleep(1000L);

        long finalCount = brokerA.getDestination(new ActiveMQQueue(subQueueName)).getDestinationStatistics().getMessages().getCount();
        assertTrue(finalCount <= sampledCount);
    }

    /** Logs the producer's total sent count and its per-symbol counts for AAPL and VIX. */
    private void logProducerProgress(ProducerThreadTester producer) {
        LOG.info(">>>> currently sent: total=" + producer.getSentCount() + ", AAPL=" + producer.getCountForProperty("AAPL") + ", VIX=" + producer.getCountForProperty("VIX"));
    }

    /** Waits, with the default {@code Wait} timeout, until the subscription queue on {@code broker} has two consumers. */
    private void awaitTwoSubQueueConsumers(final BrokerService broker) throws Exception {
        Wait.waitFor(new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return broker.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getConsumers().size() == 2;
            }
        });
    }

    /**
     * Opens and starts a connection to the named broker and returns an unstarted
     * {@link ProducerThreadTester} that sends to {@code destination} on a
     * non-transacted, auto-acknowledge session.
     *
     * @param str         name of the broker to connect to
     * @param destination destination the producer will send to
     */
    private ProducerThreadTester createProducerTester(String str, Destination destination) throws Exception {
        Connection connection = this.brokers.get(str).createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        ProducerThreadTester tester = new ProducerThreadTester(session, destination);
        tester.setPersistent(this.persistentDelivery);
        return tester;
    }

    /**
     * Sends one message without properties followed by one matching {@code foo = 'bar'}
     * and verifies that the selector consumer receives exactly the matching message while
     * the other one stays on the subscription queue.
     */
    public void testSelectorConsumptionWithNoMatchAtHeadOfQueue() throws Exception {
        final String subQueueName = "Consumer.B.VirtualTopic.tempTopic";

        clearSelectorCacheFiles();
        startAllBrokers();

        BrokerService brokerA = this.brokers.get("BrokerA").broker;
        ActiveMQDestination subQueue = createDestination(subQueueName, false);
        ActiveMQTopic virtualTopic = new ActiveMQTopic("VirtualTopic.tempTopic");

        // A plain consumer is attached while the first, property-less message is sent.
        MessageConsumer plainConsumer = establishConsumer("BrokerA", subQueue);
        sendMessages("BrokerA", virtualTopic, 1);
        plainConsumer.close();

        // The selector consumer arrives next, followed by a message that matches it.
        MessageConsumer selectorConsumer = createConsumer("BrokerA", (Destination) subQueue, "foo = 'bar'");
        sendMessages("BrokerA", virtualTopic, 1, asMap("foo", "bar"));

        MessageIdList received = getConsumerMessages("BrokerA", selectorConsumer);
        received.waitForMessagesToArrive(1, 1000L);
        assertEquals(1, received.getMessageCount());
        // Wait a further second for more messages; the count must stay at one.
        received.waitForMessagesToArrive(10, 1000L);
        assertEquals(1, received.getMessageCount());

        waitForMessagesToBeConsumed(brokerA, subQueueName, false, 2, 1, 5000);

        org.apache.activemq.broker.region.Destination queueOnA = brokerA.getDestination(new ActiveMQQueue(subQueueName));
        assertEquals(1, queueOnA.getConsumers().size());
        assertEquals(2L, queueOnA.getDestinationStatistics().getEnqueues().getCount());
        assertEquals(1L, queueOnA.getDestinationStatistics().getDequeues().getCount());
        assertEquals(1L, queueOnA.getDestinationStatistics().getMessages().getCount());
    }

    /**
     * Opens and starts a connection to the named broker and creates a selector-less consumer
     * on {@code activeMQDestination} using a non-transacted, auto-acknowledge session.
     *
     * @param str                 name of the broker to connect to
     * @param activeMQDestination destination to consume from
     */
    private MessageConsumer establishConsumer(String str, ActiveMQDestination activeMQDestination) throws Exception {
        Connection connection = this.brokers.get(str).createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        return session.createConsumer(activeMQDestination);
    }

    /**
     * Mixes a selector consumer ({@code foo = 'bar'}) and a plain consumer on BrokerB and
     * follows the subscription queue statistics on both brokers while consumers come and go
     * and messages are published on BrokerA.
     */
    public void testSelectorsAndNonSelectors() throws Exception {
        final String subQueueName = "Consumer.B.VirtualTopic.tempTopic";

        clearSelectorCacheFiles();
        bridgeAndConfigureBrokers("BrokerA", "BrokerB");
        startAllBrokers();
        waitForBridgeFormation();

        final BrokerService brokerA = this.brokers.get("BrokerA").broker;
        BrokerService brokerB = this.brokers.get("BrokerB").broker;
        ActiveMQDestination subQueue = createDestination(subQueueName, false);

        MessageConsumer selectorConsumer = createConsumer("BrokerB", (Destination) subQueue, "foo = 'bar'");
        MessageConsumer plainConsumer = createConsumer("BrokerB", subQueue);
        awaitConsumersOnSubQueue(brokerA, 2);
        assertEquals(2, org.apache.activemq.TestSupport.getDestination(brokerB, subQueue).getConsumers().size());

        // Phase 1: ten matching and ten plain messages with both consumers attached.
        ActiveMQTopic virtualTopic = new ActiveMQTopic("VirtualTopic.tempTopic");
        sendMessages("BrokerA", virtualTopic, 10, asMap("foo", "bar"));
        sendMessages("BrokerA", virtualTopic, 10);

        MessageIdList selectorMessages = getConsumerMessages("BrokerB", selectorConsumer);
        MessageIdList plainMessages = getConsumerMessages("BrokerB", plainConsumer);
        selectorMessages.waitForMessagesToArrive(5, 1000L);
        assertEquals(5, selectorMessages.getMessageCount());
        plainMessages.waitForMessagesToArrive(15, 1000L);
        assertEquals(15, plainMessages.getMessageCount());

        waitForMessagesToBeConsumed(brokerA, subQueueName, false, 20, 20, 5000);
        assertSubQueueStats(brokerA, 20L, 20L, 0L);
        waitForMessagesToBeConsumed(brokerB, subQueueName, false, 20, 20, 5000);
        assertSubQueueStats(brokerB, 20L, 20L, 0L);

        // Phase 2: only the selector consumer remains; non-matching messages are published.
        plainConsumer.close();
        awaitConsumersOnSubQueue(brokerA, 1);
        selectorMessages.flushMessages();
        sendMessages("BrokerA", virtualTopic, 10, asMap("ceposta", "redhat"));

        MessageIdList selectorMessagesAfterFlush = getConsumerMessages("BrokerB", selectorConsumer);
        selectorMessagesAfterFlush.waitForMessagesToArrive(1, 1000L);
        assertEquals(0, selectorMessagesAfterFlush.getMessageCount());

        waitForMessagesToBeConsumed(brokerA, subQueueName, false, 20, 20, 5000);
        assertSubQueueStats(brokerA, 20L, 20L, 0L);
        waitForMessagesToBeConsumed(brokerB, subQueueName, false, 20, 20, 5000);
        assertSubQueueStats(brokerB, 20L, 20L, 0L);

        // Phase 3: no consumers at all; matching messages are published on BrokerA.
        selectorConsumer.close();
        awaitConsumersOnSubQueue(brokerA, 0);
        selectorMessagesAfterFlush.flushMessages();
        sendMessages("BrokerA", virtualTopic, 10, asMap("foo", "bar"));

        waitForMessagesToBeConsumed(brokerA, subQueueName, false, 20, 20, 5000);
        assertSubQueueStats(brokerA, 30L, 20L, 10L);
        waitForMessagesToBeConsumed(brokerB, subQueueName, false, 20, 20, 5000);
        assertSubQueueStats(brokerB, 20L, 20L, 0L);

        // Phase 4: a fresh selector consumer on BrokerB is expected to receive the ten pending messages.
        MessageConsumer lateConsumer = createConsumer("BrokerB", (Destination) subQueue, "foo = 'bar'");
        MessageIdList lateMessages = getConsumerMessages("BrokerB", lateConsumer);
        lateMessages.waitForMessagesToArrive(10);
        assertEquals(10, lateMessages.getMessageCount());
        awaitConsumersOnSubQueue(brokerA, 1);

        waitForMessagesToBeConsumed(brokerA, subQueueName, false, 30, 30, 5000);
        assertSubQueueStats(brokerA, 30L, 30L, 0L);
        waitForMessagesToBeConsumed(brokerB, subQueueName, false, 30, 30, 5000);
        assertSubQueueStats(brokerB, 30L, 30L, 0L);
    }

    /**
     * Waits up to 500 ms until the subscription queue on {@code broker} has {@code expected}
     * consumers. The destination is looked up once, before polling starts.
     */
    private void awaitConsumersOnSubQueue(BrokerService broker, final int expected) throws Exception {
        final org.apache.activemq.broker.region.Destination subQueue = broker.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
        Wait.waitFor(new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return subQueue.getConsumers().size() == expected;
            }
        }, 500L);
    }

    /** Asserts the enqueue, dequeue and pending-message counters of the subscription queue on {@code broker}. */
    private void assertSubQueueStats(BrokerService broker, long enqueues, long dequeues, long pending) throws Exception {
        org.apache.activemq.broker.region.Destination subQueue = broker.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
        assertEquals(enqueues, subQueue.getDestinationStatistics().getEnqueues().getCount());
        assertEquals(dequeues, subQueue.getDestinationStatistics().getDequeues().getCount());
        assertEquals(pending, subQueue.getDestinationStatistics().getMessages().getCount());
    }

    /**
     * Builds a JMX proxy for the virtual-destination selector cache MBean of {@code brokerService}.
     *
     * @param brokerService broker whose management context and object name are used
     * @return proxy implementing {@link VirtualDestinationSelectorCacheViewMBean}
     * @throws MalformedObjectNameException if the MBean object name cannot be built
     */
    public VirtualDestinationSelectorCacheViewMBean getVirtualDestinationSelectorCacheMBean(BrokerService brokerService) throws MalformedObjectNameException {
        javax.management.ObjectName cacheName = BrokerMBeanSupport.createVirtualDestinationSelectorCacheName(
                brokerService.getBrokerObjectName(), "plugin", "virtualDestinationCache");
        Object proxy = brokerService.getManagementContext().newProxyInstance(
                cacheName, VirtualDestinationSelectorCacheViewMBean.class, true);
        return (VirtualDestinationSelectorCacheViewMBean) proxy;
    }

    /**
     * Checks forwarding from BrokerA to a selector consumer on BrokerB, then restarts
     * BrokerA with its selector-cache file deleted and checks the enqueue count on BrokerB
     * for a consumer that uses a different selector.
     */
    public void testSelectorAwareForwarding() throws Exception {
        final String subQueueName = "Consumer.B.VirtualTopic.tempTopic";

        clearSelectorCacheFiles();
        bridgeAndConfigureBrokers("BrokerA", "BrokerB");
        startAllBrokers();
        waitForBridgeFormation();

        BrokerService brokerB = this.brokers.get("BrokerB").broker;
        final BrokerService brokerA = this.brokers.get("BrokerA").broker;

        MessageConsumer fooConsumer = createConsumer("BrokerB", (Destination) createDestination(subQueueName, false), "foo = 'bar'");

        // Wait for the BrokerB consumer to be visible on BrokerA's side of the bridge.
        final org.apache.activemq.broker.region.Destination subQueueOnA = brokerA.getDestination(new ActiveMQQueue(subQueueName));
        Wait.waitFor(new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return subQueueOnA.getConsumers().size() == 1;
            }
        }, 500L);

        org.apache.activemq.broker.region.Destination subQueueOnB = org.apache.activemq.TestSupport.getDestination(brokerB, new ActiveMQQueue(subQueueName));
        assertEquals(1, subQueueOnB.getConsumers().size());

        // The virtual topic itself must not exist on either broker yet.
        ActiveMQTopic virtualTopic = new ActiveMQTopic("VirtualTopic.tempTopic");
        assertNull(org.apache.activemq.TestSupport.getDestination(brokerA, virtualTopic));
        assertNull(org.apache.activemq.TestSupport.getDestination(brokerB, virtualTopic));

        // One matching and one non-matching message; one enqueue is expected on BrokerB.
        sendMessages("BrokerA", virtualTopic, 1, asMap("foo", "bar"));
        sendMessages("BrokerA", virtualTopic, 1, asMap("ceposta", "redhat"));

        MessageIdList fooMessages = getConsumerMessages("BrokerB", fooConsumer);
        fooMessages.waitForMessagesToArrive(1);
        fooMessages.waitForMessagesToArrive(1, 1000L);
        assertEquals(1, fooMessages.getMessageCount());
        assertEquals(1L, brokerB.getDestination(new ActiveMQQueue(subQueueName)).getDestinationStatistics().getEnqueues().getCount());

        // Stop BrokerA and delete its selector-cache file before it comes back.
        fooConsumer.close();
        brokerA.stop();
        brokerA.waitUntilStopped();
        deleteSelectorCacheFile("BrokerA");
        assertEquals(0, subQueueOnB.getConsumers().size());

        MessageConsumer redhatConsumer = createConsumer("BrokerB", (Destination) createDestination(subQueueName, false), "ceposta = 'redhat'");
        assertEquals(1, subQueueOnB.getConsumers().size());

        brokerA.start(true);
        brokerA.waitUntilStarted();
        System.out.println(brokerA.getNetworkConnectors());

        // Look the queue up again on the restarted broker and wait for the new consumer.
        final org.apache.activemq.broker.region.Destination subQueueOnRestartedA = brokerA.getDestination(new ActiveMQQueue(subQueueName));
        Wait.waitFor(new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return subQueueOnRestartedA.getConsumers().size() == 1;
            }
        }, 500L);

        sendMessages("BrokerA", virtualTopic, 1, asMap("foo", "bar"));
        sendMessages("BrokerB", virtualTopic, 1, asMap("foo", "bar"));
        sendMessages("BrokerA", virtualTopic, 1, asMap("ceposta", "redhat"));
        sendMessages("BrokerB", virtualTopic, 1, asMap("ceposta", "redhat"));

        MessageIdList redhatMessages = getConsumerMessages("BrokerB", redhatConsumer);
        redhatMessages.waitForMessagesToArrive(2);
        redhatMessages.waitForMessagesToArrive(1, 1000L);
        assertEquals(2, redhatMessages.getMessageCount());
        assertEquals(0L, brokerB.getDestination(new ActiveMQQueue(subQueueName)).getDestinationStatistics().getMessages().getCount());
        assertEquals(3L, brokerB.getDestination(new ActiveMQQueue(subQueueName)).getDestinationStatistics().getEnqueues().getCount());
    }

    /**
     * Wraps a single key/value pair in a new mutable map.
     *
     * @param str key of the single entry
     * @param obj value of the single entry
     * @return a new {@link HashMap} containing only that entry
     */
    private HashMap<String, Object> asMap(String str, Object obj) {
        final HashMap<String, Object> singleEntry = new HashMap<>(1);
        singleEntry.put(str, obj);
        return singleEntry;
    }

    /**
     * Bridges the two named brokers and configures the resulting network connector as a
     * duplex bridge with decreased network-consumer priority.
     *
     * @param str  name of the broker the bridge starts from
     * @param str2 name of the broker the bridge connects to
     */
    private void bridgeAndConfigureBrokers(String str, String str2) throws Exception {
        final NetworkConnector connector = bridgeBrokers(str, str2, false, 1, false);
        connector.setDecreaseNetworkConsumerPriority(true);
        connector.setDuplex(true);
    }

    /**
     * Enables auto-fail, runs the superclass set-up and creates BrokerA (tcp port 61616)
     * and BrokerB (tcp port 61617). Both broker URIs carry the same options, which
     * include deleting all messages on startup.
     */
    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport
    public void setUp() throws Exception {
        super.setAutoFail(true);
        super.setUp();
        // A plain literal is enough; wrapping it in new String(...) only created a needless copy.
        // The URI asks for useJmx=false, but createAndConfigureBroker calls setUseJmx(true) afterwards.
        final String options = "?useJmx=false&deleteAllMessagesOnStartup=true";
        createAndConfigureBroker(new URI("broker:(tcp://localhost:61616)/BrokerA" + options));
        createAndConfigureBroker(new URI("broker:(tcp://localhost:61617)/BrokerB" + options));
    }

    /** Deletes the persisted selector-cache files of BrokerA and BrokerB, in that order. */
    private void clearSelectorCacheFiles() {
        deleteSelectorCacheFile("BrokerA");
        deleteSelectorCacheFile("BrokerB");
    }

    /**
     * Deletes the persisted selector-cache file of the named broker if it exists.
     * Deletion stays best-effort, but a failure is logged instead of being ignored so that
     * a stale cache file carried into the next test is visible in the log.
     *
     * @param str broker name; appended to the selector-cache base path to form the file path
     */
    private void deleteSelectorCacheFile(String str) {
        File cacheFile = new File(PERSIST_SELECTOR_CACHE_FILE_BASEPATH + str);
        if (cacheFile.exists() && !cacheFile.delete()) {
            LOG.warn("Could not delete selector cache file {}", cacheFile.getAbsolutePath());
        }
    }

    /**
     * Creates a broker for {@code uri} and configures it for these tests: JMX enabled, a
     * selector-aware virtual topic interceptor, a KahaDB persistence adapter, and a
     * {@link SubQueueSelectorCacheBrokerPlugin} limited to a single selector per destination
     * that persists to a file named after the broker.
     *
     * @param uri broker URI passed to {@code createBroker}
     * @return the configured broker, as returned by {@code createBroker}
     */
    private BrokerService createAndConfigureBroker(URI uri) throws Exception {
        BrokerService broker = createBroker(uri);
        broker.setUseJmx(true);

        // Locals use the concrete types: the setters called below are not declared on the
        // VirtualDestination, DestinationInterceptor or BrokerPlugin interfaces.
        VirtualTopic virtualTopic = new VirtualTopic();
        virtualTopic.setSelectorAware(true);
        VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
        interceptor.setVirtualDestinations(new VirtualDestination[]{virtualTopic});
        broker.setDestinationInterceptors(new DestinationInterceptor[]{interceptor});

        configurePersistenceAdapter(broker);

        SubQueueSelectorCacheBrokerPlugin selectorCachePlugin = new SubQueueSelectorCacheBrokerPlugin();
        selectorCachePlugin.setSingleSelectorPerDestination(true);
        selectorCachePlugin.setPersistFile(new File(PERSIST_SELECTOR_CACHE_FILE_BASEPATH + broker.getBrokerName()));
        broker.setPlugins(new BrokerPlugin[]{selectorCachePlugin});
        return broker;
    }

    /**
     * Installs a KahaDB persistence adapter on {@code brokerService}, stored under
     * {@code target/test-amq-data/kahadb/} followed by the broker name.
     *
     * @param brokerService broker that receives the new persistence adapter
     */
    protected void configurePersistenceAdapter(BrokerService brokerService) throws IOException {
        final KahaDBStore store = new KahaDBStore();
        store.setDirectory(new File("target/test-amq-data/kahadb/" + brokerService.getBrokerName()));
        brokerService.setPersistenceAdapter(store);
    }

    /**
     * Waits until the enqueue counter, and then the dequeue counter, of a destination reach
     * the given values. Each wait is bounded by {@code i3}; the outcome of the waits is not
     * checked here, so callers assert the counters themselves afterwards.
     *
     * @param brokerService broker that owns the destination
     * @param str           physical name of the destination
     * @param z             {@code true} to address a topic, {@code false} to address a queue
     * @param i             enqueue count to wait for
     * @param i2            dequeue count to wait for
     * @param i3            timeout passed to each {@code Wait.waitFor} call
     */
    private void waitForMessagesToBeConsumed(final BrokerService brokerService, String str, boolean z, final int i, final int i2, int i3) throws Exception {
        // The common supertype is required here: a queue is not an ActiveMQTopic.
        final ActiveMQDestination target;
        if (z) {
            target = new ActiveMQTopic(str);
        } else {
            target = new ActiveMQQueue(str);
        }
        Wait.waitFor(new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return brokerService.getDestination(target).getDestinationStatistics().getEnqueues().getCount() == ((long) i);
            }
        }, i3);
        Wait.waitFor(new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return brokerService.getDestination(target).getDestinationStatistics().getDequeues().getCount() == ((long) i2);
            }
        }, i3);
    }
}
