package org.apache.activemq.usecases;

import java.net.URI;
import java.util.Arrays;
import java.util.Enumeration;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.QueueBrowser;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.region.QueueSubscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.MessageIdList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;

/* loaded from: input_file:org/apache/activemq/usecases/BrowseOverNetworkTest.class */
public class BrowseOverNetworkTest extends JmsMultipleBrokersTestSupport {
    /**
     * Logger for this test. It is named after the test class itself so that log output is
     * attributed to {@code BrowseOverNetworkTest} rather than to an unrelated broker class.
     */
    private static final Logger LOG = LoggerFactory.getLogger(BrowseOverNetworkTest.class);

    /** Message count for this suite; the tests in this class send 10 messages per run. */
    protected static final int MESSAGE_COUNT = 10;

    /* loaded from: input_file:org/apache/activemq/usecases/BrowseOverNetworkTest$Browser.class */
    /**
     * Worker thread that browses a destination on one broker several times in a row and records
     * how many messages it saw. When {@link #consume} is set, each pass that finds messages also
     * receives them through a synchronous consumer.
     *
     * <p>The browser and consumer opened by a pass are always closed before the next pass starts.
     */
    public class Browser extends Thread {
        /** Number of browse passes performed by {@link #run()}. */
        private static final int BROWSE_PASSES = 5;

        /** Pause between two browse passes, in milliseconds. */
        private static final long PAUSE_MILLIS = 1000L;

        /** How long a single receive waits for a message, in milliseconds. */
        private static final long RECEIVE_TIMEOUT_MILLIS = 1000L;

        /** Name of the broker to browse on. */
        String broker;

        /** Destination to browse. */
        Destination dest;

        /** Browse result: the last count when not consuming, the running sum when consuming. */
        int totalCount;

        /** Browser opened by the pass in progress; {@code null} between passes. */
        QueueBrowser browser = null;

        /** Consumer opened by the pass in progress; {@code null} between passes. */
        MessageConsumer consumer = null;

        /** When {@code true}, messages found by a browse pass are also consumed. */
        boolean consume = false;

        /**
         * @param str name of the broker to browse on
         * @param destination destination to browse
         */
        public Browser(String str, Destination destination) {
            this.broker = str;
            this.dest = destination;
        }

        /**
         * Runs the browse passes. Exceptions raised by a pass are logged and the next pass is still
         * attempted; the resources of every pass are released whether it succeeded or not.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            for (int pass = 0; pass < BROWSE_PASSES; pass++) {
                try {
                    // Keep the resources in the fields so the finally block can actually close them.
                    this.browser = BrowseOverNetworkTest.this.createBrowser(this.broker, this.dest);
                    int browsed = BrowseOverNetworkTest.this.browseMessages(this.browser, this.broker);
                    if (!this.consume) {
                        this.totalCount = browsed;
                    } else if (browsed != 0) {
                        this.consumer = BrowseOverNetworkTest.this.createSyncConsumer(this.broker, this.dest);
                        this.totalCount += browsed;
                        consumeBrowsed(browsed);
                    }
                    BrowseOverNetworkTest.LOG.info("browser '" + this.broker + "' browsed " + this.totalCount);
                    Thread.sleep(PAUSE_MILLIS);
                } catch (Exception e) {
                    BrowseOverNetworkTest.LOG.info("Exception browsing " + e, e);
                } finally {
                    closeResources();
                }
            }
        }

        /**
         * Receives up to {@code expected} messages from the current consumer and logs each one.
         * Stops early as soon as a receive returns no message.
         */
        private void consumeBrowsed(int expected) throws Exception {
            for (int received = 0; received < expected; received++) {
                ActiveMQTextMessage message = (ActiveMQTextMessage) this.consumer.receive(RECEIVE_TIMEOUT_MILLIS);
                if (message == null) {
                    // Nothing arrived within the timeout; check before touching the message.
                    break;
                }
                BrowseOverNetworkTest.LOG.info(this.broker + " consumer: " + message.getText() + " " + message.getDestination() + " " + message.getMessageId() + " " + Arrays.toString(message.getBrokerPath()));
            }
        }

        /**
         * Closes the browser and the consumer of the current pass. Each is closed independently so
         * a failure on one does not leave the other open; failures are logged, not propagated.
         */
        private void closeResources() {
            try {
                if (this.browser != null) {
                    this.browser.close();
                }
            } catch (Exception e) {
                BrowseOverNetworkTest.LOG.info("Exception closing browser " + e, e);
            } finally {
                this.browser = null;
            }
            try {
                if (this.consumer != null) {
                    this.consumer.close();
                }
            } catch (Exception e) {
                BrowseOverNetworkTest.LOG.info("Exception closing consumer " + e, e);
            } finally {
                this.consumer = null;
            }
        }

        /**
         * @return the count recorded by the browse passes so far
         */
        public int getTotalCount() {
            return this.totalCount;
        }
    }

    /**
     * Bridges BrokerA and BrokerB, sends messages to BrokerA, browses the destination on BrokerB,
     * then attaches a consumer to each broker. Passes when the BrokerA consumer received every
     * message and the BrokerB browse saw none.
     *
     * @throws Exception if broker setup, messaging or one of the waits fails
     */
    public void testBrowse() throws Exception {
        final int sent = 10;
        final long settleMillis = 1000L;

        // Broker topology: two non-persistent brokers joined by a bridge.
        createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false"));
        createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false"));
        bridgeBrokers("BrokerA", "BrokerB");
        startAllBrokers();

        ActiveMQDestination dest = createDestination("TEST.FOO", false);
        sendMessages("BrokerA", dest, sent);
        Thread.sleep(settleMillis);

        // Browse on the remote side before any consumer exists.
        int browsedOnB = browseMessages("BrokerB", (Destination) dest);
        Thread.sleep(settleMillis);

        MessageIdList receivedOnA = getConsumerMessages("BrokerA", createConsumer("BrokerA", dest));
        receivedOnA.waitForMessagesToArrive(sent);
        Thread.sleep(settleMillis);

        MessageIdList receivedOnB = getConsumerMessages("BrokerB", createConsumer("BrokerB", dest));
        receivedOnB.waitForMessagesToArrive(sent);

        LOG.info("A+B: " + receivedOnA.getMessageCount() + "+" + receivedOnB.getMessageCount());
        assertEquals("Consumer on Broker A, should've consumed all messages", sent, receivedOnA.getMessageCount());
        assertEquals("Broker B shouldn't get any messages", 0, browsedOnB);
    }

    /**
     * Starts two brokers from classpath XML configurations and checks that browsing the composite
     * destination {@code QUEUE.A,QUEUE.B} returns no messages on either broker.
     *
     * @throws Exception if broker setup or browsing fails
     */
    public void testConsumerInfo() throws Exception {
        Resource broker1Config = new ClassPathResource("org/apache/activemq/usecases/browse-broker1.xml");
        createBroker(broker1Config);
        Resource broker2Config = new ClassPathResource("org/apache/activemq/usecases/browse-broker2.xml");
        createBroker(broker2Config);

        startAllBrokers();
        this.brokers.get("broker1").broker.waitUntilStarted();

        ActiveMQDestination dest = createDestination("QUEUE.A,QUEUE.B", false);

        int browsedOnBroker1 = browseMessages("broker1", (Destination) dest);
        assertEquals("Browsed a message on an empty queue", 0, browsedOnBroker1);

        Thread.sleep(1000L);

        int browsedOnBroker2 = browseMessages("broker2", (Destination) dest);
        assertEquals("Browsed a message on an empty queue", 0, browsedOnBroker2);
    }

    /**
     * Bridges two brokers and restricts the resulting network connector: one destination is added
     * as statically included, an optional one as excluded, and the prefetch size is set to 1.
     *
     * @param str name of the first broker passed to {@code bridgeBrokers}
     * @param str2 name of the second broker passed to {@code bridgeBrokers}
     * @param activeMQDestination destination to add as statically included
     * @param activeMQDestination2 destination to exclude, or {@code null} to exclude nothing
     * @return the configured network connector
     * @throws Exception if the bridge cannot be created
     */
    protected NetworkConnector bridgeBrokersWithIncludedDestination(String str, String str2, ActiveMQDestination activeMQDestination, ActiveMQDestination activeMQDestination2) throws Exception {
        // NOTE(review): the trailing arguments are presumably dynamicOnly=false, networkTTL=4,
        // conduit=true -- confirm against JmsMultipleBrokersTestSupport.bridgeBrokers.
        final NetworkConnector connector = bridgeBrokers(str, str2, false, 4, true);

        connector.addStaticallyIncludedDestination(activeMQDestination);
        final boolean hasExclusion = activeMQDestination2 != null;
        if (hasExclusion) {
            connector.addExcludedDestination(activeMQDestination2);
        }
        connector.setPrefetchSize(1);

        return connector;
    }

    /**
     * Creates six brokers from classpath XML configurations, starts a {@link Browser} thread on
     * broker-3A and one on broker-3B, sends 10 messages to a composite destination via broker-1A,
     * and expects the two browsers to report 20 messages in total.
     *
     * @throws Exception if broker setup, sending or joining the browser threads fails
     */
    public void testAMQ3020() throws Exception {
        final String[] brokerConfigs = {
            "org/apache/activemq/usecases/browse-broker1A.xml",
            "org/apache/activemq/usecases/browse-broker1B.xml",
            "org/apache/activemq/usecases/browse-broker2A.xml",
            "org/apache/activemq/usecases/browse-broker2B.xml",
            "org/apache/activemq/usecases/browse-broker3A.xml",
            "org/apache/activemq/usecases/browse-broker3B.xml",
        };
        for (String configPath : brokerConfigs) {
            createBroker((Resource) new ClassPathResource(configPath));
        }

        // Only the "A" broker of each pair is awaited before the test proceeds.
        final String[] awaitedBrokers = {"broker-1A", "broker-2A", "broker-3A"};
        for (String brokerName : awaitedBrokers) {
            this.brokers.get(brokerName).broker.waitUntilStarted();
        }

        ActiveMQDestination dest = createDestination("PROD.FUSESOURCE.3.A,PROD.FUSESOURCE.3.B", false);

        // Start the browsers one second apart, then give them another second before sending.
        Browser browserOn3A = new Browser("broker-3A", dest);
        browserOn3A.start();
        Thread.sleep(1000L);

        Browser browserOn3B = new Browser("broker-3B", dest);
        browserOn3B.start();
        Thread.sleep(1000L);

        sendMessages("broker-1A", dest, 10);

        browserOn3A.join();
        browserOn3B.join();

        final int seenOn3A = browserOn3A.getTotalCount();
        final int seenOn3B = browserOn3B.getTotalCount();
        LOG.info("broker-3A browsed " + seenOn3A);
        LOG.info("broker-3B browsed " + seenOn3B);
        assertEquals(20, seenOn3A + seenOn3B);
    }

    /**
     * Walks every message the given browser exposes, logging each one, and counts them.
     *
     * @param queueBrowser browser to enumerate; it is not closed by this method
     * @param str label prefixed to each log line
     * @return the number of messages enumerated
     * @throws Exception if enumerating or reading a message fails
     */
    protected int browseMessages(QueueBrowser queueBrowser, String str) throws Exception {
        int seen = 0;
        for (Enumeration<?> pending = queueBrowser.getEnumeration(); pending.hasMoreElements(); seen++) {
            final ActiveMQTextMessage current = (ActiveMQTextMessage) pending.nextElement();
            LOG.info(str + " browsed: " + current.getText()
                    + " " + current.getDestination()
                    + " " + current.getMessageId()
                    + " " + Arrays.toString(current.getBrokerPath()));
        }
        return seen;
    }

    /**
     * Opens a browser for the destination on the named broker, counts the messages it exposes and
     * closes the browser again, even when browsing fails.
     *
     * @param str name of the broker to browse on
     * @param destination destination to browse
     * @return the number of messages browsed
     * @throws Exception if the browser cannot be created, browsed or closed
     */
    protected int browseMessages(String str, Destination destination) throws Exception {
        QueueBrowser createBrowser = createBrowser(str, destination);
        try {
            // NOTE(review): the log label is the fixed text "browser", not the broker name --
            // kept as-is; confirm whether passing str was intended.
            return browseMessages(createBrowser, "browser");
        } finally {
            // Release the browser on the failure path too, so a failed browse does not leak it.
            createBrowser.close();
        }
    }

    /**
     * Turns on the auto-fail flag and then runs the inherited set-up.
     *
     * @throws Exception if the inherited set-up fails
     */
    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport, org.apache.activemq.AutoFailTestSupport
    public void setUp() throws Exception {
        // NOTE(review): presumably makes AutoFailTestSupport abort a test that hangs -- confirm.
        // The flag is set before the inherited set-up runs.
        super.setAutoFail(true);
        super.setUp();
    }
}
