package org.apache.activemq.usecases;

import java.net.URI;
import java.util.Arrays;
import javax.jms.MessageConsumer;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.virtual.CompositeTopic;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.MessageIdList;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Checks how ActiveMQ advisory messages travel between two embedded brokers, named "A" and "B",
 * that are joined by a network connector created through {@code bridgeBrokers("A", "B")}.
 *
 * <p>Each test builds its own pair of brokers via {@link #createBroker(String)}, starts them,
 * waits until broker "A" reports exactly one peer, and then asserts on the messages observed by
 * consumers attached to either side.
 */
public class AdvisoryViaNetworkTest extends JmsMultipleBrokersTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(AdvisoryViaNetworkTest.class);

    /** Advisory topic subscribed to by the producer-advisory forwarding tests. */
    private static final String PRODUCER_ADVISORY_TOPIC = "ActiveMQ.Advisory.Producer.Topic.FOO";

    /** Upper bound, in milliseconds, on the wait for the expected number of peer brokers. */
    private static final long PEER_INFO_WAIT_MILLIS = 120000L;

    /**
     * Creates a non-persistent broker with JMX disabled and a single transport connector bound to
     * {@code AUTO_ASSIGN_TRANSPORT}, and registers it in the {@code brokers} map under its name.
     * The broker is not started here.
     *
     * <p>Decompiler note: the overridden method was reported as {@code protected}; the wider
     * {@code public} modifier is kept so existing callers are unaffected.
     *
     * @param str the broker name, also used as the key in the {@code brokers} map
     * @return the configured, not yet started broker
     * @throws Exception if the connector URI is invalid or the broker rejects the configuration
     */
    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport
    public BrokerService createBroker(String str) throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setPersistent(false);
        brokerService.setUseJmx(false);
        brokerService.setBrokerName(str);
        brokerService.addConnector(new URI(AUTO_ASSIGN_TRANSPORT));
        this.brokers.put(str, new BrokerItem(brokerService));
        return brokerService;
    }

    /**
     * Producer advisories for topic FOO must reach advisory consumers on both brokers when the
     * advisory topic is statically included on a one-way bridge from A to B.
     *
     * @throws Exception on any broker, connection or assertion failure
     */
    public void testAdvisoryForwarding() throws Exception {
        assertProducerAdvisoriesForwarded(false);
    }

    /**
     * Same scenario as {@link #testAdvisoryForwarding()} but with the network connector switched
     * to duplex mode.
     *
     * @throws Exception on any broker, connection or assertion failure
     */
    public void testAdvisoryForwardingDuplexNC() throws Exception {
        assertProducerAdvisoriesForwarded(true);
    }

    /**
     * Shared body of the two producer-advisory forwarding tests: bridges A to B with the producer
     * advisory topic statically included, sends one message to topic FOO on A, and expects each
     * advisory consumer (one per broker) to receive two messages.
     *
     * @param duplex whether to mark the bridge as duplex; when {@code false} the connector's
     *               duplex setting is left untouched
     * @throws Exception on any broker, connection or assertion failure
     */
    private void assertProducerAdvisoriesForwarded(boolean duplex) throws Exception {
        ActiveMQTopic advisoryTopic = new ActiveMQTopic(PRODUCER_ADVISORY_TOPIC);
        createBroker("A");
        createBroker("B");
        NetworkConnector bridge = bridgeBrokers("A", "B");
        bridge.addStaticallyIncludedDestination(advisoryTopic);
        if (duplex) {
            bridge.setDuplex(true);
        }
        startAllBrokers();
        verifyPeerBrokerInfo(this.brokers.get("A"), 1);

        MessageConsumer clientA = createConsumer("A", advisoryTopic);
        MessageConsumer clientB = createConsumer("B", advisoryTopic);

        sendMessages("A", new ActiveMQTopic("FOO"), 1);

        MessageIdList messagesA = getConsumerMessages("A", clientA);
        MessageIdList messagesB = getConsumerMessages("B", clientB);
        LOG.info("consumerA = {}", messagesA);
        LOG.info("consumerB = {}", messagesB);

        // NOTE(review): two advisories per consumer presumably correspond to the producer being
        // opened and closed by sendMessages - confirm against the support class.
        messagesA.assertMessagesReceived(2);
        messagesB.assertMessagesReceived(2);
    }

    /**
     * Subscribes to the consumer advisory topic for FOO on both brokers, then adds a consumer for
     * topic FOO on broker A. The advisory consumer on A is expected to see one message while the
     * one on B is expected to see none, even though the advisory topic is statically included on
     * the bridge.
     *
     * @throws Exception on any broker, connection or assertion failure
     */
    public void testBridgeRelevantAdvisoryNotAvailable() throws Exception {
        ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.FOO");
        createBroker("A");
        createBroker("B");
        bridgeBrokers("A", "B").addStaticallyIncludedDestination(advisoryTopic);
        startAllBrokers();
        verifyPeerBrokerInfo(this.brokers.get("A"), 1);

        MessageConsumer clientA = createConsumer("A", advisoryTopic);
        MessageConsumer clientB = createConsumer("B", advisoryTopic);

        // This consumer on FOO is the event expected to raise the consumer advisory on A.
        createConsumer("A", new ActiveMQTopic("FOO"));

        MessageIdList messagesA = getConsumerMessages("A", clientA);
        MessageIdList messagesB = getConsumerMessages("B", clientB);
        LOG.info("consumerA = {}", messagesA);
        LOG.info("consumerB = {}", messagesB);

        messagesA.assertMessagesReceived(1);
        messagesB.assertMessagesReceived(0);
    }

    /**
     * Routes connection advisories on broker A into queue {@code advQ} through a composite topic,
     * consumes that queue from broker B across a duplex bridge with prefetch 1, and then waits
     * until broker A's destination statistics show more than two dequeues and nothing in flight.
     *
     * @throws Exception on any broker, connection or assertion failure
     */
    public void testAdvisoryViaVirtualDest() throws Exception {
        ActiveMQQueue advisoryQueue = new ActiveMQQueue("advQ");
        createBroker("A");

        // Locals use the concrete types: the setters below are not declared on the
        // VirtualDestination / DestinationInterceptor interfaces.
        CompositeTopic compositeTopic = new CompositeTopic();
        compositeTopic.setName("ActiveMQ.Advisory.Connection");
        compositeTopic.setForwardOnly(false);
        compositeTopic.setForwardTo(Arrays.asList(advisoryQueue));
        VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
        interceptor.setVirtualDestinations(new VirtualDestination[]{compositeTopic});
        this.brokers.get("A").broker.setDestinationInterceptors(new DestinationInterceptor[]{interceptor});

        createBroker("B");
        NetworkConnector bridge = bridgeBrokers("A", "B");
        bridge.setDuplex(true);
        bridge.setPrefetchSize(1);

        startAllBrokers();
        verifyPeerBrokerInfo(this.brokers.get("A"), 1);
        verifyPeerBrokerInfo(this.brokers.get("B"), 1);

        MessageConsumer clientB = createConsumer("B", advisoryQueue);
        createConsumer("A", new ActiveMQTopic("FOO"));
        getConsumerMessages("B", clientB).waitForMessagesToArrive(2);

        assertTrue("deq and inflight as expected", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                // getRegionBroker() is declared to return Broker, hence the explicit cast.
                RegionBroker regionBroker = (RegionBroker) brokers.get("A").broker.getRegionBroker();
                // Read each counter once so the logged values are the ones actually tested.
                long dequeues = regionBroker.getDestinationStatistics().getDequeues().getCount();
                long inflight = regionBroker.getDestinationStatistics().getInflight().getCount();
                LOG.info("A Deq:{}", dequeues);
                LOG.info("A Inflight:{}", inflight);
                return dequeues > 2 && inflight == 0;
            }
        }));
    }

    /**
     * Waits up to {@link #PEER_INFO_WAIT_MILLIS} for the given broker to report exactly {@code i}
     * peer brokers, logs the peers it ends up with, and fails the test if the count differs.
     *
     * @param brokerItem holder of the broker to inspect
     * @param i          the expected number of peer brokers
     * @throws Exception if polling the broker fails
     */
    private void verifyPeerBrokerInfo(BrokerItem brokerItem, final int i) throws Exception {
        final BrokerService brokerService = brokerItem.broker;
        // getRegionBroker() is declared to return Broker, hence the explicit cast.
        final RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();

        // The wait result is deliberately ignored: the assertEquals below reports a timeout
        // with the actual peer count, which is more informative than a bare boolean.
        Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                int peerCount = regionBroker.getPeerBrokerInfos().length;
                LOG.info("verify infos {}, len: {}", brokerService.getBrokerName(), peerCount);
                return i == peerCount;
            }
        }, PEER_INFO_WAIT_MILLIS);

        // Take one snapshot so the logged peers and the asserted count cannot disagree.
        BrokerInfo[] peers = regionBroker.getPeerBrokerInfos();
        LOG.info("verify infos {}, len: {}", brokerService.getBrokerName(), peers.length);
        for (BrokerInfo peer : peers) {
            LOG.info(peer.getBrokerName());
        }
        assertEquals(brokerService.getBrokerName(), i, peers.length);
    }

    /**
     * Turns on the support class's auto-fail option before running the inherited set-up.
     *
     * @throws Exception if the inherited set-up fails
     */
    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport
    public void setUp() throws Exception {
        super.setAutoFail(true);
        super.setUp();
    }
}
