package org.fusesource.fabric.bridge.internal;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
import org.fusesource.fabric.bridge.MessageConverter;
import org.fusesource.fabric.bridge.model.BridgeDestinationsConfig;
import org.fusesource.fabric.bridge.model.BridgedDestination;
import org.fusesource.fabric.bridge.model.BrokerConfig;
import org.fusesource.fabric.bridge.model.DispatchPolicy;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/fusesource/fabric/bridge/internal/SourceConnectorTest.class */
public class SourceConnectorTest extends AbstractConnectorTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(SourceConnectorTest.class);
    private static final String TEST_HEADER = SourceConnectorTest.class.getName() + ".testHeader";
    private static final String TEST_VALUE = SourceConnectorTest.class.getName() + ".testValue";
    private SourceConnector connector;

    @Before
    public void setUp() throws Exception {
        this.connector = new SourceConnector();
        BrokerConfig brokerConfig = new BrokerConfig();
        brokerConfig.setBrokerUrl("vm://local?broker.persistent=false&broker.brokerName=local&jms.prefetchPolicy.queuePrefetch=10");
        this.connector.setLocalBrokerConfig(brokerConfig);
        BrokerConfig brokerConfig2 = new BrokerConfig();
        brokerConfig2.setBrokerUrl("vm://remote?broker.persistent=false&broker.brokerName=remote&jms.prefetchPolicy.queuePrefetch=10");
        this.connector.setRemoteBrokerConfig(brokerConfig2);
        BridgeDestinationsConfig bridgeDestinationsConfig = new BridgeDestinationsConfig();
        bridgeDestinationsConfig.setDestinations(createNewDestinations(null, null));
        this.connector.setOutboundDestinations(bridgeDestinationsConfig);
    }

    @After
    public void tearDown() throws Exception {
        this.connector.destroy();
    }

    @Test
    public void testAfterPropertiesSet() throws Exception {
        this.connector.afterPropertiesSet();
    }

    @Test(expected = IllegalArgumentException.class)
    public void testMissingLocalBrokerConfig() throws Exception {
        this.connector.setLocalBrokerConfig((BrokerConfig) null);
        testAfterPropertiesSet();
    }

    @Test(expected = IllegalArgumentException.class)
    public void testMissingLocalDestinations() throws Exception {
        this.connector.setOutboundDestinations((BridgeDestinationsConfig) null);
        testAfterPropertiesSet();
    }

    @Test(expected = IllegalArgumentException.class)
    public void testMissingRemoteBrokerConfig() throws Exception {
        this.connector.setRemoteBrokerConfig((BrokerConfig) null);
        testAfterPropertiesSet();
    }

    @Test(expected = IllegalArgumentException.class)
    public void testIgnoredRemoteBrokerConfig() throws Exception {
        this.connector.getOutboundDestinations().setDefaultStagingLocation(false);
        testAfterPropertiesSet();
    }

    @Test(expected = IllegalArgumentException.class)
    public void testInvalidLocalBrokerConfig() throws Exception {
        this.connector.getLocalBrokerConfig().setBrokerUrl((String) null);
        testAfterPropertiesSet();
    }

    @Test(expected = IllegalArgumentException.class)
    public void testInvalidRemoteBrokerConfig() throws Exception {
        this.connector.getRemoteBrokerConfig().setBrokerUrl((String) null);
        testAfterPropertiesSet();
    }

    @Test(expected = IllegalArgumentException.class)
    public void testInvalidNoStagingQueue() throws Exception {
        this.connector.getRemoteBrokerConfig().setBrokerUrl((String) null);
        this.connector.getOutboundDestinations().setUseStagingQueue(false);
        testAfterPropertiesSet();
    }

    @Test
    public void testStart() throws Exception {
        this.connector.afterPropertiesSet();
        this.connector.start();
        this.connector.start();
    }

    @Test
    public void testDispatch() throws Exception {
        this.connector.afterPropertiesSet();
        this.connector.start();
        long nanoTime = System.nanoTime();
        for (String str : TEST_SOURCES) {
            sendMessages("vm://local?broker.persistent=false&broker.brokerName=local&jms.prefetchPolicy.queuePrefetch=10", str, 100, null);
        }
        receiveMessages("vm://remote?broker.persistent=false&broker.brokerName=remote&jms.prefetchPolicy.queuePrefetch=10", "org.fusesource.fabric.bridge.stagingQueue", 100 * TEST_SOURCES.length, new BaseMatcher<TextMessage>() { // from class: org.fusesource.fabric.bridge.internal.SourceConnectorTest.1
            public boolean matches(Object obj) {
                boolean z = false;
                try {
                    z = ((TextMessage) obj).getStringProperty("org.fusesource.fabric.bridge.destinationName").matches("source[1-3]");
                } catch (JMSException e) {
                    Assert.fail(e.getMessage());
                }
                return z;
            }

            public void describeTo(Description description) {
                description.appendText("TextMessage containing org.fusesource.fabric.bridge.destinationName property");
            }
        });
        LOG.info("Test took " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime) + " milliseconds");
    }

    @Test
    public void testDispatchWithMessageConverter() throws Exception {
        this.connector.getOutboundDestinations().getDispatchPolicy().setMessageConverter(new MessageConverter() { // from class: org.fusesource.fabric.bridge.internal.SourceConnectorTest.2
            public Message convert(Message message) throws JMSException {
                message.setStringProperty(SourceConnectorTest.TEST_HEADER, SourceConnectorTest.TEST_VALUE);
                return message;
            }
        });
        this.connector.afterPropertiesSet();
        this.connector.start();
        long nanoTime = System.nanoTime();
        for (String str : TEST_SOURCES) {
            sendMessages("vm://local?broker.persistent=false&broker.brokerName=local&jms.prefetchPolicy.queuePrefetch=10", str, 100, null);
        }
        receiveMessages("vm://remote?broker.persistent=false&broker.brokerName=remote&jms.prefetchPolicy.queuePrefetch=10", "org.fusesource.fabric.bridge.stagingQueue", 100 * TEST_SOURCES.length, new BaseMatcher<TextMessage>() { // from class: org.fusesource.fabric.bridge.internal.SourceConnectorTest.3
            public boolean matches(Object obj) {
                TextMessage textMessage;
                boolean z;
                boolean z2 = false;
                try {
                    textMessage = (TextMessage) obj;
                } catch (JMSException e) {
                    Assert.fail(e.getMessage());
                }
                if (textMessage.getStringProperty("org.fusesource.fabric.bridge.destinationName").matches("source[1-3]")) {
                    if (SourceConnectorTest.TEST_VALUE.equals(textMessage.getStringProperty(SourceConnectorTest.TEST_HEADER))) {
                        z = true;
                        z2 = z;
                        return z2;
                    }
                }
                z = false;
                z2 = z;
                return z2;
            }

            public void describeTo(Description description) {
                description.appendText("TextMessage containing " + SourceConnectorTest.TEST_HEADER + " property");
            }
        });
        LOG.info("Test took " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime) + " milliseconds");
    }

    @Test
    public void testDispatchWithDefaultListener() throws Exception {
        DispatchPolicy dispatchPolicy = new DispatchPolicy();
        dispatchPolicy.setBatchSize(0L);
        dispatchPolicy.setBatchTimeout(0L);
        Iterator it = this.connector.getDestinationsConfig().getDestinations().iterator();
        while (it.hasNext()) {
            ((BridgedDestination) it.next()).setDispatchPolicy(dispatchPolicy);
        }
        testDispatch();
    }

    @Test
    public void testDispatchWithConcurrentConsumers() throws Exception {
        this.connector.getOutboundDestinations().getDispatchPolicy().setConcurrentConsumers(10);
        testDispatch();
    }

    @Test
    public void testDispatchWithLocalBroker() throws Exception {
        this.connector.getOutboundDestinations().setDefaultStagingLocation(false);
        this.connector.setRemoteBrokerConfig((BrokerConfig) null);
        this.connector.afterPropertiesSet();
        this.connector.start();
        long nanoTime = System.nanoTime();
        for (String str : TEST_SOURCES) {
            sendMessages("vm://local?broker.persistent=false&broker.brokerName=local&jms.prefetchPolicy.queuePrefetch=10", str, 100, null);
        }
        receiveMessages("vm://local?broker.persistent=false&broker.brokerName=local&jms.prefetchPolicy.queuePrefetch=10", "org.fusesource.fabric.bridge.stagingQueue", 100 * TEST_SOURCES.length, new BaseMatcher<TextMessage>() { // from class: org.fusesource.fabric.bridge.internal.SourceConnectorTest.4
            public boolean matches(Object obj) {
                boolean z = false;
                try {
                    z = ((TextMessage) obj).getStringProperty("org.fusesource.fabric.bridge.destinationName").matches("source[1-3]");
                } catch (JMSException e) {
                    Assert.fail(e.getMessage());
                }
                return z;
            }

            public void describeTo(Description description) {
                description.appendText("TextMessage containing org.fusesource.fabric.bridge.destinationName property");
            }
        });
        LOG.info("Test took " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime) + " milliseconds");
    }

    @Test
    public void testDispatchWithoutRemoteStagingQueue() throws Exception {
        this.connector.getDestinationsConfig().setUseStagingQueue(false);
        this.connector.afterPropertiesSet();
        this.connector.start();
        long nanoTime = System.nanoTime();
        for (String str : TEST_SOURCES) {
            sendMessages("vm://local?broker.persistent=false&broker.brokerName=local&jms.prefetchPolicy.queuePrefetch=10", str, 100, null);
        }
        for (final String str2 : TEST_SOURCES) {
            receiveMessages("vm://remote?broker.persistent=false&broker.brokerName=remote&jms.prefetchPolicy.queuePrefetch=10", str2, 100, new BaseMatcher<TextMessage>() { // from class: org.fusesource.fabric.bridge.internal.SourceConnectorTest.5
                public boolean matches(Object obj) {
                    boolean z = false;
                    try {
                        z = ((TextMessage) obj).getStringProperty("org.fusesource.fabric.bridge.destinationName").matches(str2);
                    } catch (JMSException e) {
                        Assert.fail(e.getMessage());
                    }
                    return z;
                }

                public void describeTo(Description description) {
                    description.appendText("TextMessage containing org.fusesource.fabric.bridge.destinationName property");
                }
            });
        }
        LOG.info("Test took " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime) + " milliseconds");
    }

    @Test
    public void testStop() {
        this.connector.stop();
        this.connector.start();
        this.connector.stop();
    }

    @Test
    public void testDestroy() throws Exception {
        this.connector.afterPropertiesSet();
        this.connector.destroy();
        this.connector.start();
        this.connector.stop();
        this.connector.destroy();
        this.connector.start();
        this.connector.destroy();
    }

    @Test
    public void testGetDestinationsConfig() {
        assertNotNull("Null destinations config", this.connector.getDestinationsConfig());
    }

    @Test
    public void testSetDestinationsConfig() throws Exception {
        this.connector.afterPropertiesSet();
        this.connector.setDestinationsConfig(this.connector.getOutboundDestinations());
        this.connector.start();
        this.connector.setDestinationsConfig(this.connector.getOutboundDestinations());
        this.connector.stop();
        this.connector.setDestinationsConfig(this.connector.getOutboundDestinations());
        this.connector.destroy();
        this.connector.setDestinationsConfig(this.connector.getOutboundDestinations());
    }

    @Test
    public void testAddDestinations() throws Exception {
        this.connector.afterPropertiesSet();
        this.connector.addDestinations(createNewDestinations(".1", null));
        this.connector.start();
        this.connector.addDestinations(createNewDestinations(".2", null));
        this.connector.stop();
        this.connector.addDestinations(createNewDestinations(".3", null));
        this.connector.destroy();
        this.connector.addDestinations(createNewDestinations(".4", null));
    }

    @Test
    public void testRemoveDestinations() throws Exception {
        List<BridgedDestination> createNewDestinations = createNewDestinations(null, null);
        this.connector.afterPropertiesSet();
        this.connector.removeDestinations(createNewDestinations);
    }

    @Test
    public void testRemoveDestinationsAfterStart() throws Exception {
        List<BridgedDestination> createNewDestinations = createNewDestinations(null, null);
        this.connector.afterPropertiesSet();
        this.connector.start();
        this.connector.removeDestinations(createNewDestinations);
    }

    @Test
    public void testRemoveDestinationsAfterStop() throws Exception {
        List<BridgedDestination> createNewDestinations = createNewDestinations(null, null);
        this.connector.afterPropertiesSet();
        this.connector.start();
        this.connector.stop();
        this.connector.removeDestinations(createNewDestinations);
    }

    @Test
    public void testRemoveDestinationsAfterDestroy() throws Exception {
        List<BridgedDestination> createNewDestinations = createNewDestinations(null, null);
        this.connector.afterPropertiesSet();
        this.connector.start();
        this.connector.destroy();
        this.connector.removeDestinations(createNewDestinations);
    }
}
