package org.apache.activemq.network;

import java.util.Arrays;
import java.util.concurrent.ConcurrentMap;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicRequestor;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.apache.activemq.util.Wait;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.core.io.ClassPathResource;

/* loaded from: input_file:org/apache/activemq/network/SimpleNetworkTest.class */
/**
 * Tests message flow between a "local" and a "remote" broker.
 *
 * <p>Both brokers are built from the classpath XML files returned by
 * {@link #getLocalBrokerURI()} and {@link #getRemoteBrokerURI()}. Every test gets a
 * fresh pair of brokers plus one JMS connection and session per broker (see
 * {@link #doSetUp(boolean)}). Messages are produced on the local broker and consumed
 * on the remote one; the tests inspect the local broker's first network connector to
 * see what has been bridged.
 *
 * <p>The {@code include.*} / {@code exclude.*} destination names presumably match
 * filters in the broker XML files, which are not visible from this class.
 */
public class SimpleNetworkTest {
    /** Number of messages each test publishes on the local broker. */
    protected static final int MESSAGE_COUNT = 10;
    private static final Logger LOG = LoggerFactory.getLogger(SimpleNetworkTest.class);
    // Not assigned or read anywhere in this class; kept because it is protected.
    protected AbstractApplicationContext context;
    protected Connection localConnection;
    protected Connection remoteConnection;
    protected BrokerService localBroker;
    protected BrokerService remoteBroker;
    protected Session localSession;
    protected Session remoteSession;
    /** Topic {@code include.test.bar}; the tests expect it to be forwarded. */
    protected ActiveMQTopic included;
    /** Topic {@code exclude.test.bar}; {@link #testFiltering()} expects it not to be forwarded. */
    protected ActiveMQTopic excluded;
    /** Subscription name used for every durable subscriber created by the tests. */
    protected String consumerName = "durableSubs";

    /**
     * Sends text messages from a compression-enabled local connection and checks that
     * each one arrives on the remote broker still flagged as compressed.
     */
    @Test(timeout = 60000)
    public void testMessageCompression() throws Exception {
        // setUseCompression is an ActiveMQ extension; javax.jms.Connection does not declare it.
        ((ActiveMQConnection) this.localConnection).setUseCompression(true);

        MessageConsumer remoteConsumer = this.remoteSession.createConsumer(this.included);
        MessageProducer producer = this.localSession.createProducer(this.included);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        waitForConsumerRegistration(this.localBroker, 1, this.included);

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            producer.send(this.localSession.createTextMessage("test-" + i));
            // isCompressed() lives on the ActiveMQ message class, hence the downcast.
            ActiveMQMessage received = (ActiveMQMessage) remoteConsumer.receive(3000L);
            Assert.assertNotNull("not null? message: " + i, received);
            Assert.assertTrue("message " + i + " is compressed", received.isCompressed());
        }
        // Nothing beyond the messages sent above may show up.
        Assert.assertNull(remoteConsumer.receive(1000L));
    }

    /**
     * Runs a request/reply exchange: a listener on the remote broker answers every
     * request with {@code "REPLY: " + text}, and a {@link TopicRequestor} on the local
     * broker must receive a non-null reply for each request it sends.
     */
    @Test(timeout = 60000)
    public void testRequestReply() throws Exception {
        // Anonymous producer: the destination is supplied per send (the request's reply-to).
        final MessageProducer replyProducer = this.remoteSession.createProducer((Destination) null);
        MessageConsumer requestConsumer = this.remoteSession.createConsumer(this.included);
        requestConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage request = (TextMessage) message;
                    String replyText = "REPLY: " + request.getText();
                    Destination replyTo = message.getJMSReplyTo();
                    // Received bodies are read-only until cleared, so reuse needs clearBody().
                    request.clearBody();
                    request.setText(replyText);
                    replyProducer.send(replyTo, request);
                } catch (JMSException e) {
                    LOG.error("Failed to send reply for request " + message, e);
                }
            }
        });

        // TopicRequestor only accepts a TopicSession; the session field is typed as plain Session.
        TopicRequestor requestor = new TopicRequestor((TopicSession) this.localSession, this.included);
        // Fixed pause before the first request (presumably to let the network settle).
        Thread.sleep(5000L);
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            TextMessage reply = (TextMessage) requestor.request(this.localSession.createTextMessage("test msg: " + i));
            Assert.assertNotNull(reply);
            LOG.info(reply.getText());
        }
    }

    /**
     * Publishes one message to the included and one to the excluded topic on the local
     * broker and checks that only the included one reaches a remote consumer.
     */
    @Test(timeout = 60000)
    public void testFiltering() throws Exception {
        MessageConsumer includedConsumer = this.remoteSession.createConsumer(this.included);
        MessageConsumer excludedConsumer = this.remoteSession.createConsumer(this.excluded);
        MessageProducer includedProducer = this.localSession.createProducer(this.included);
        MessageProducer excludedProducer = this.localSession.createProducer(this.excluded);
        Thread.sleep(2000L);

        TextMessage message = this.localSession.createTextMessage("test");
        includedProducer.send(message);
        excludedProducer.send(message);

        Assert.assertNull(excludedConsumer.receive(1000L));
        Assert.assertNotNull(includedConsumer.receive(1000L));
    }

    /**
     * Registers two remote consumers on the same topic and checks that every message
     * published locally is delivered exactly once to each of them.
     */
    @Test(timeout = 60000)
    public void testConduitBridge() throws Exception {
        MessageConsumer firstConsumer = this.remoteSession.createConsumer(this.included);
        MessageConsumer secondConsumer = this.remoteSession.createConsumer(this.included);
        MessageProducer producer = this.localSession.createProducer(this.included);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        waitForConsumerRegistration(this.localBroker, 2, this.included);

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            producer.send(this.localSession.createTextMessage("test-" + i));
            Assert.assertNotNull(firstConsumer.receive(1000L));
            Assert.assertNotNull(secondConsumer.receive(1000L));
        }
        // No duplicates may follow for either consumer.
        Assert.assertNull(firstConsumer.receive(1000L));
        Assert.assertNull(secondConsumer.receive(1000L));
    }

    /**
     * Waits (using {@link Wait#waitFor}'s default timing) until the first bridge of the
     * broker's first network connector holds a demand subscription for the given
     * destination whose {@code size()} is at least {@code min}; fails the test otherwise.
     *
     * @param brokerService broker whose first network connector is inspected
     * @param min           minimum {@code size()} required of the matching demand subscription
     * @param destination   destination the demand subscription must be for
     */
    private void waitForConsumerRegistration(final BrokerService brokerService, final int min,
            final ActiveMQDestination destination) throws Exception {
        boolean registered = Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                NetworkConnector connector = (NetworkConnector) brokerService.getNetworkConnectors().get(0);
                Object[] bridges = connector.bridges.values().toArray();
                if (bridges.length == 0) {
                    // Bridge not established yet.
                    return false;
                }
                LOG.info(brokerService + " bridges " + Arrays.toString(bridges));

                DemandForwardingBridgeSupport bridge = (DemandForwardingBridgeSupport) bridges[0];
                // Wildcards keep this independent of the map's declared key/value types.
                ConcurrentMap<?, ?> localSubs = bridge.getLocalSubscriptionMap();
                LOG.info(brokerService + " bridge " + bridge + ", localSubs: " + localSubs);

                for (Object value : localSubs.values()) {
                    DemandSubscription subscription = (DemandSubscription) value;
                    if (subscription.getLocalInfo().getDestination().equals(destination)) {
                        LOG.info(brokerService + " DemandSubscription " + subscription + ", size: " + subscription.size());
                        // Only the first subscription matching the destination is considered.
                        return subscription.size() >= min;
                    }
                }
                return false;
            }
        });
        Assert.assertTrue("Internal bridge consumers registered in time", registered);
    }

    /**
     * With a durable subscriber on the remote broker, checks that all published
     * messages are counted as forwarded by the local broker and that the local broker's
     * memory usage returns to zero.
     */
    @Test(timeout = 60000)
    public void testDurableTopicSubForwardMemoryUsage() throws Exception {
        TopicSubscriber remoteSubscriber = this.remoteSession.createDurableSubscriber(this.included, this.consumerName);
        Thread.sleep(1000L);
        sendMessagesFromLocal(this.included);
        Thread.sleep(1000L);
        assertForwardedAndMemoryReleased(this.included);
        remoteSubscriber.close();
    }

    /**
     * Same check as {@link #testDurableTopicSubForwardMemoryUsage()} but with a
     * non-durable topic consumer on the remote broker.
     */
    @Test(timeout = 60000)
    public void testTopicSubForwardMemoryUsage() throws Exception {
        MessageConsumer remoteConsumer = this.remoteSession.createConsumer(this.included);
        Thread.sleep(1000L);
        sendMessagesFromLocal(this.included);
        Thread.sleep(1000L);
        assertForwardedAndMemoryReleased(this.included);
        remoteConsumer.close();
    }

    /**
     * Same check as {@link #testTopicSubForwardMemoryUsage()} but on the queue
     * {@code include.test.foo}.
     */
    @Test(timeout = 60000)
    public void testQueueSubForwardMemoryUsage() throws Exception {
        ActiveMQQueue queue = new ActiveMQQueue("include.test.foo");
        MessageConsumer remoteConsumer = this.remoteSession.createConsumer(queue);
        Thread.sleep(1000L);
        sendMessagesFromLocal(queue);
        Thread.sleep(1000L);
        assertForwardedAndMemoryReleased(queue);
        remoteConsumer.close();
    }

    /**
     * Checks that messages forwarded for a durable remote subscription are still
     * delivered after the brokers are restarted.
     *
     * <p>Steps: create the durable subscription on the remote broker, restart both
     * brokers without deleting their stores, publish on the local broker, verify the
     * forward count, restart again, then re-create the durable subscriber and expect
     * {@link #MESSAGE_COUNT} messages.
     */
    @Test(timeout = 60000)
    public void testDurableStoreAndForward() throws Exception {
        this.remoteSession.createDurableSubscriber(this.included, this.consumerName);
        Thread.sleep(1000L);

        // Restart keeping persisted state (deleteAllMessagesOnStartup = false).
        doTearDown();
        doSetUp(false);

        sendMessagesFromLocal(this.included);
        Thread.sleep(1000L);
        Assert.assertEquals("messages forwarded by local broker", MESSAGE_COUNT,
                this.localBroker.getDestination(this.included).getDestinationStatistics().getForwards().getCount());

        doTearDown();
        doSetUp(false);

        TopicSubscriber remoteSubscriber = this.remoteSession.createDurableSubscriber(this.included, this.consumerName);
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            Assert.assertNotNull("message count: " + i, remoteSubscriber.receive(2500L));
        }
    }

    /**
     * Disabled scenario: a durable subscriber consumes half of the messages on the
     * local broker and, after a restart, a durable subscriber with the same name is
     * expected to receive the other half on the remote broker.
     */
    @Test
    @Ignore("This seems like a simple use case, but it is problematic to consume an existing topic store, it requires a connection per durable to match that connectionId")
    public void testDurableStoreAndForwardReconnect() throws Exception {
        this.localSession.createDurableSubscriber(this.included, this.consumerName);
        Thread.sleep(5000L);
        doTearDown();
        doSetUp(false);

        sendMessagesFromLocal(this.included);
        Thread.sleep(5000L);

        TopicSubscriber localSubscriber = this.localSession.createDurableSubscriber(this.included, this.consumerName);
        LOG.info("Consume from local consumer: " + localSubscriber);
        for (int i = 0; i < MESSAGE_COUNT / 2; i++) {
            Assert.assertNotNull("message count: " + i, localSubscriber.receive(2500L));
        }

        Thread.sleep(5000L);
        doTearDown();
        doSetUp(false);
        Thread.sleep(5000L);

        LOG.info("Consume from remote");
        TopicSubscriber remoteSubscriber = this.remoteSession.createDurableSubscriber(this.included, this.consumerName);
        LOG.info("Remote consumer: " + remoteSubscriber);
        Thread.sleep(5000L);
        for (int i = 0; i < MESSAGE_COUNT / 2; i++) {
            // NOTE(review): timeout borrowed from an unrelated test's constant; consider a local value.
            Assert.assertNotNull("message count: " + i,
                    remoteSubscriber.receive(DurableSubProcessWithRestartTest.BROKER_RESTART));
        }
    }

    /** Starts both brokers with empty stores and opens the connections and sessions. */
    @Before
    public void setUp() throws Exception {
        doSetUp(true);
    }

    /** Closes the connections and stops both brokers. */
    @After
    public void tearDown() throws Exception {
        doTearDown();
    }

    /**
     * Closes both connections and stops both brokers, in that order.
     *
     * <p>Each step runs even if an earlier one throws, and resources that were never
     * created (for example because {@link #doSetUp(boolean)} failed part-way) are
     * skipped, so a failure here cannot leave a broker running for the next test. If
     * several steps throw, the last exception is the one propagated.
     */
    protected void doTearDown() throws Exception {
        try {
            if (this.localConnection != null) {
                this.localConnection.close();
            }
        } finally {
            try {
                if (this.remoteConnection != null) {
                    this.remoteConnection.close();
                }
            } finally {
                try {
                    if (this.localBroker != null) {
                        this.localBroker.stop();
                    }
                } finally {
                    if (this.remoteBroker != null) {
                        this.remoteBroker.stop();
                    }
                }
            }
        }
    }

    /**
     * Creates and starts the remote broker, then the local broker, then opens one
     * started connection and one non-transacted, auto-acknowledge session on each.
     *
     * @param deleteAllMessages value passed to {@code setDeleteAllMessagesOnStartup} on
     *                          both brokers; {@code false} keeps state from a previous run
     */
    protected void doSetUp(boolean deleteAllMessages) throws Exception {
        this.remoteBroker = createRemoteBroker();
        this.remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
        this.remoteBroker.start();
        this.remoteBroker.waitUntilStarted();

        this.localBroker = createLocalBroker();
        this.localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
        this.localBroker.start();
        this.localBroker.waitUntilStarted();

        ActiveMQConnectionFactory localFactory = new ActiveMQConnectionFactory(this.localBroker.getVmConnectorURI());
        localFactory.setAlwaysSyncSend(true);
        localFactory.setDispatchAsync(false);
        this.localConnection = localFactory.createConnection();
        this.localConnection.setClientID("clientId");
        this.localConnection.start();

        ActiveMQConnectionFactory remoteFactory = new ActiveMQConnectionFactory(this.remoteBroker.getVmConnectorURI());
        this.remoteConnection = remoteFactory.createConnection();
        this.remoteConnection.setClientID("clientId");
        this.remoteConnection.start();

        this.included = new ActiveMQTopic("include.test.bar");
        this.excluded = new ActiveMQTopic("exclude.test.bar");
        this.localSession = this.localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        this.remoteSession = this.remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    /** @return classpath location of the remote broker's XML configuration */
    protected String getRemoteBrokerURI() {
        return "org/apache/activemq/network/remoteBroker.xml";
    }

    /** @return classpath location of the local broker's XML configuration */
    protected String getLocalBrokerURI() {
        return "org/apache/activemq/network/localBroker.xml";
    }

    /**
     * Builds a broker from an XML configuration on the classpath. The broker is
     * returned as produced by the factory bean; callers start it themselves.
     *
     * @param uri classpath location of the broker XML file
     * @return the broker created by the factory bean
     */
    protected BrokerService createBroker(String uri) throws Exception {
        BrokerFactoryBean factory = new BrokerFactoryBean(new ClassPathResource(uri));
        factory.afterPropertiesSet();
        return factory.getBroker();
    }

    /** Creates the local broker from {@link #getLocalBrokerURI()}. */
    protected BrokerService createLocalBroker() throws Exception {
        return createBroker(getLocalBrokerURI());
    }

    /** Creates the remote broker from {@link #getRemoteBrokerURI()}. */
    protected BrokerService createRemoteBroker() throws Exception {
        return createBroker(getRemoteBrokerURI());
    }

    /** Publishes {@link #MESSAGE_COUNT} text messages "test-0".."test-N" to the destination via the local session. */
    private void sendMessagesFromLocal(Destination destination) throws JMSException {
        MessageProducer producer = this.localSession.createProducer(destination);
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            producer.send(this.localSession.createTextMessage("test-" + i));
        }
    }

    /**
     * Asserts that the local broker reports exactly {@link #MESSAGE_COUNT} forwards for
     * the destination, then polls every 500 ms until its memory usage reads zero.
     */
    private void assertForwardedAndMemoryReleased(ActiveMQDestination destination) throws Exception {
        Assert.assertEquals("messages forwarded by local broker", MESSAGE_COUNT,
                this.localBroker.getDestination(destination).getDestinationStatistics().getForwards().getCount());
        // NOTE(review): timeout borrowed from an unrelated test's constant; consider a local value.
        boolean memoryReleased = Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return localBroker.getSystemUsage().getMemoryUsage().getUsage() == 0;
            }
        }, DurableSubProcessWithRestartTest.BROKER_RESTART, 500L);
        Assert.assertTrue("local broker memory usage returned to zero", memoryReleased);
    }
}
