package org.apache.activemq.bugs;

import java.lang.Thread;
import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import junit.framework.Test;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ4607Test.class */
public class AMQ4607Test extends JmsMultipleBrokersTestSupport implements Thread.UncaughtExceptionHandler {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ4607Test.class);
    public static final int BROKER_COUNT = 3;
    public static final int CONSUMER_COUNT = 1;
    public static final int MESSAGE_COUNT = 0;
    public static final boolean CONDUIT = true;
    public static final int TIMEOUT = 20000;
    protected Map<String, MessageConsumer> consumerMap;
    public boolean duplex = true;
    Map<Thread, Throwable> unhandeledExceptions = new HashMap();

    private void assertNoUnhandeledExceptions() {
        for (Map.Entry<Thread, Throwable> entry : this.unhandeledExceptions.entrySet()) {
            LOG.error("Thread:" + entry.getKey() + " Had unexpected: " + entry.getValue());
        }
        assertTrue("There are no unhandelled exceptions, see: log for detail on: " + this.unhandeledExceptions, this.unhandeledExceptions.isEmpty());
    }

    public NetworkConnector bridge(String str, String str2) throws Exception {
        NetworkConnector bridgeBrokers = bridgeBrokers(str, str2, true, -1, true);
        bridgeBrokers.setSuppressDuplicateQueueSubscriptions(true);
        bridgeBrokers.setDecreaseNetworkConsumerPriority(true);
        bridgeBrokers.setConsumerTTL(1);
        bridgeBrokers.setDuplex(this.duplex);
        return bridgeBrokers;
    }

    public static Test suite() {
        return suite(AMQ4607Test.class);
    }

    public void initCombos() {
        addCombinationValues("duplex", new Boolean[]{Boolean.TRUE, Boolean.FALSE});
    }

    public void testMigratingConsumer() throws Exception {
        bridge("Broker0", "Broker1");
        if (!this.duplex) {
            bridge("Broker1", "Broker0");
        }
        bridge("Broker1", "Broker2");
        if (!this.duplex) {
            bridge("Broker2", "Broker1");
        }
        bridge("Broker0", "Broker2");
        if (!this.duplex) {
            bridge("Broker2", "Broker0");
        }
        startAllBrokers();
        waitForBridgeFormation();
        ActiveMQDestination createDestination = createDestination("TEST.FOO", false);
        sendMessages("Broker0", createDestination, 1);
        for (int i = 0; i < 3; i++) {
            MessageConsumer createConsumer = createConsumer("Broker" + i, (Destination) createDestination, "DoNotConsume = 'true'");
            for (int i2 = 0; i2 < 3; i2++) {
                assertExactConsumersConnect("Broker" + i2, createDestination, 1, 20000L);
            }
            assertNoUnhandeledExceptions();
            assertExactMessageCount("Broker" + i, createDestination, 1, 20000L);
            createConsumer.close();
            LOG.info("Check for no consumers..");
            for (int i3 = 0; i3 < 3; i3++) {
                assertExactConsumersConnect("Broker" + i3, createDestination, 0, 20000L);
            }
        }
        MessageConsumer createConsumer2 = createConsumer("Broker2", createDestination);
        assertTrue("Consumed ok", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ4607Test.1
            public boolean isSatisified() throws Exception {
                return ((JmsMultipleBrokersTestSupport.BrokerItem) AMQ4607Test.this.brokers.get("Broker2")).allMessages.getMessageIds().size() == 1;
            }
        }));
        createConsumer2.close();
    }

    public void testMigratingConsumerFullCircle() throws Exception {
        bridge("Broker0", "Broker1");
        if (!this.duplex) {
            bridge("Broker1", "Broker0");
        }
        bridge("Broker1", "Broker2");
        if (!this.duplex) {
            bridge("Broker2", "Broker1");
        }
        bridge("Broker0", "Broker2");
        if (!this.duplex) {
            bridge("Broker2", "Broker0");
        }
        ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory();
        conditionalNetworkBridgeFilterFactory.setReplayDelay(0);
        conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true);
        this.brokers.get("Broker2").broker.getDestinationPolicy().getDefaultEntry().setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory);
        startAllBrokers();
        waitForBridgeFormation();
        ActiveMQDestination createDestination = createDestination("TEST.FOO", false);
        sendMessages("Broker0", createDestination, 1);
        for (int i = 0; i < 3; i++) {
            MessageConsumer createConsumer = createConsumer("Broker" + i, (Destination) createDestination, "DoNotConsume = 'true'");
            for (int i2 = 0; i2 < 3; i2++) {
                assertExactConsumersConnect("Broker" + i2, createDestination, 1, 20000L);
            }
            assertNoUnhandeledExceptions();
            assertExactMessageCount("Broker" + i, createDestination, 1, 20000L);
            createConsumer.close();
            LOG.info("Check for no consumers..");
            for (int i3 = 0; i3 < 3; i3++) {
                assertExactConsumersConnect("Broker" + i3, createDestination, 0, 20000L);
            }
        }
        LOG.info("Consume from origin...");
        MessageConsumer createConsumer2 = createConsumer("Broker0", createDestination);
        assertTrue("Consumed ok", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ4607Test.2
            public boolean isSatisified() throws Exception {
                return ((JmsMultipleBrokersTestSupport.BrokerItem) AMQ4607Test.this.brokers.get("Broker0")).allMessages.getMessageIds().size() == 1;
            }
        }));
        createConsumer2.close();
    }

    public void testMigratingConsumerSelectorAwareTrue() throws Exception {
        bridge("Broker0", "Broker1");
        if (!this.duplex) {
            bridge("Broker1", "Broker0");
        }
        ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory();
        conditionalNetworkBridgeFilterFactory.setReplayDelay(0);
        conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true);
        conditionalNetworkBridgeFilterFactory.setSelectorAware(true);
        this.brokers.get("Broker1").broker.getDestinationPolicy().getDefaultEntry().setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory);
        startAllBrokers();
        waitForBridgeFormation();
        ActiveMQDestination createDestination = createDestination("TEST.FOO", false);
        sendMessages("Broker0", createDestination, 1);
        assertExactMessageCount("Broker0", createDestination, 1, 20000L);
        createConsumer("Broker1", (Destination) createDestination, "DoNotConsume = 'true'");
        assertExactConsumersConnect("Broker0", createDestination, 1, 20000L);
        assertExactConsumersConnect("Broker1", createDestination, 1, 20000L);
        assertExactMessageCount("Broker1", createDestination, 1, 20000L);
        assertExactMessageCount("Broker0", createDestination, 0, 20000L);
        MessageConsumer createConsumer = createConsumer("Broker0", createDestination);
        assertExactConsumersConnect("Broker0", createDestination, 2, 20000L);
        assertExactConsumersConnect("Broker1", createDestination, 2, 20000L);
        assertTrue("Consumed ok", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ4607Test.3
            public boolean isSatisified() throws Exception {
                return ((JmsMultipleBrokersTestSupport.BrokerItem) AMQ4607Test.this.brokers.get("Broker0")).allMessages.getMessageIds().size() == 1;
            }
        }));
        createConsumer.close();
    }

    public void testMigratingConsumerSelectorAwareFalse() throws Exception {
        bridge("Broker0", "Broker1");
        if (!this.duplex) {
            bridge("Broker1", "Broker0");
        }
        ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory();
        conditionalNetworkBridgeFilterFactory.setReplayDelay(0);
        conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true);
        conditionalNetworkBridgeFilterFactory.setSelectorAware(false);
        this.brokers.get("Broker1").broker.getDestinationPolicy().getDefaultEntry().setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory);
        startAllBrokers();
        waitForBridgeFormation();
        ActiveMQDestination createDestination = createDestination("TEST.FOO", false);
        sendMessages("Broker0", createDestination, 1);
        assertExactMessageCount("Broker0", createDestination, 1, 20000L);
        createConsumer("Broker1", (Destination) createDestination, "DoNotConsume = 'true'");
        assertExactConsumersConnect("Broker0", createDestination, 1, 20000L);
        assertExactConsumersConnect("Broker1", createDestination, 1, 20000L);
        assertExactMessageCount("Broker1", createDestination, 1, 20000L);
        assertExactMessageCount("Broker0", createDestination, 0, 20000L);
        MessageConsumer createConsumer = createConsumer("Broker0", createDestination);
        assertExactConsumersConnect("Broker0", createDestination, 2, 20000L);
        assertExactConsumersConnect("Broker1", createDestination, 2, 20000L);
        assertExactMessageCount("Broker1", createDestination, 1, 20000L);
        assertExactMessageCount("Broker0", createDestination, 0, 20000L);
        assertTrue("Consumed ok", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ4607Test.4
            public boolean isSatisified() throws Exception {
                return ((JmsMultipleBrokersTestSupport.BrokerItem) AMQ4607Test.this.brokers.get("Broker0")).allMessages.getMessageIds().size() == 0;
            }
        }));
        createConsumer.close();
    }

    protected void assertExactMessageCount(final String str, Destination destination, final int i, long j) throws Exception {
        final QueueViewMBean queueViewMBean = (QueueViewMBean) this.brokers.get(str).broker.getManagementContext().newProxyInstance(this.brokers.get(str).broker.getAdminView().getQueues()[0], QueueViewMBean.class, false);
        assertTrue("Excepected queue depth: " + i + " on: " + str, Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ4607Test.5
            public boolean isSatisified() throws Exception {
                long queueSize = queueViewMBean.getQueueSize();
                AMQ4607Test.LOG.info("On " + str + " current queue size for " + queueViewMBean + ", " + queueSize);
                if (i != queueSize) {
                    AMQ4607Test.LOG.info("Sub IDs: " + Arrays.asList(queueViewMBean.getSubscriptions()));
                }
                return queueSize == ((long) i);
            }
        }, j));
    }

    protected void assertExactConsumersConnect(final String str, Destination destination, final int i, long j) throws Exception {
        final ManagementContext managementContext = this.brokers.get(str).broker.getManagementContext();
        assertTrue("Excepected consumers count: " + i + " on: " + str, Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ4607Test.6
            public boolean isSatisified() throws Exception {
                try {
                    QueueViewMBean queueViewMBean = (QueueViewMBean) managementContext.newProxyInstance(((JmsMultipleBrokersTestSupport.BrokerItem) AMQ4607Test.this.brokers.get(str)).broker.getAdminView().getQueues()[0], QueueViewMBean.class, false);
                    long consumerCount = queueViewMBean.getConsumerCount();
                    AMQ4607Test.LOG.info("On " + str + " current consumer count for " + queueViewMBean + ", " + consumerCount);
                    if (i != consumerCount) {
                        AMQ4607Test.LOG.info("Sub IDs: " + Arrays.asList(queueViewMBean.getSubscriptions()));
                    }
                    return consumerCount == ((long) i);
                } catch (Exception e) {
                    AMQ4607Test.LOG.warn("Unexpected: " + e, e);
                    return false;
                }
            }
        }, j));
    }

    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport
    public void setUp() throws Exception {
        super.setUp();
        this.unhandeledExceptions.clear();
        Thread.setDefaultUncaughtExceptionHandler(this);
        for (int i = 0; i < 3; i++) {
            createBroker(new URI("broker:(tcp://localhost:6161" + i + ")/Broker" + i + "?persistent=false&useJmx=true"));
        }
        this.consumerMap = new LinkedHashMap();
    }

    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport
    protected void configureBroker(BrokerService brokerService) {
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setExpireMessagesPeriod(0L);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policyEntry);
        brokerService.setDestinationPolicy(policyMap);
    }

    @Override // java.lang.Thread.UncaughtExceptionHandler
    public void uncaughtException(Thread thread, Throwable th) {
        synchronized (this.unhandeledExceptions) {
            this.unhandeledExceptions.put(thread, th);
        }
    }
}
