/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.conversions;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class AmqpAndMqttTest {
    protected BrokerService broker;
    private TransportConnector amqpConnector;
    private TransportConnector mqttConnector;

    @Before
    public void setUp() throws Exception {
        this.broker = this.createBroker();
        this.broker.start();
        this.broker.waitUntilStarted();
    }

    @After
    public void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
            this.broker = null;
        }
    }

    protected BrokerService createBroker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.setPersistent(false);
        broker.setUseJmx(false);
        broker.setAdvisorySupport(false);
        broker.setSchedulerSupport(false);
        this.amqpConnector = broker.addConnector("amqp://0.0.0.0:0");
        this.mqttConnector = broker.addConnector("mqtt://0.0.0.0:0");
        return broker;
    }

    @Test(timeout=60000L)
    public void testFromMqttToAmqp() throws Exception {
        Connection amqp = this.createAmqpConnection();
        Session session = amqp.createSession(false, 1);
        MessageConsumer consumer = session.createConsumer((Destination)session.createTopic("topic://FOO"));
        BlockingConnection mqtt = this.createMQTTConnection().blockingConnection();
        mqtt.connect();
        byte[] payload = this.bytes("Hello World");
        mqtt.publish("FOO", payload, QoS.AT_LEAST_ONCE, false);
        mqtt.disconnect();
        Message msg = consumer.receive(5000L);
        Assert.assertNotNull((Object)msg);
        Assert.assertTrue((boolean)(msg instanceof BytesMessage));
        BytesMessage bmsg = (BytesMessage)msg;
        byte[] actual = new byte[(int)bmsg.getBodyLength()];
        bmsg.readBytes(actual);
        Assert.assertTrue((boolean)Arrays.equals(actual, payload));
        amqp.close();
    }

    private byte[] bytes(String value) {
        try {
            return value.getBytes("UTF-8");
        }
        catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

    protected MQTT createMQTTConnection() throws Exception {
        MQTT mqtt = new MQTT();
        mqtt.setConnectAttemptsMax(1L);
        mqtt.setReconnectAttemptsMax(0L);
        mqtt.setHost("localhost", this.mqttConnector.getConnectUri().getPort());
        return mqtt;
    }

    public Connection createAmqpConnection() throws Exception {
        String amqpURI = "amqp://localhost:" + this.amqpConnector.getConnectUri().getPort();
        JmsConnectionFactory factory = new JmsConnectionFactory(amqpURI);
        factory.setUsername("admin");
        factory.setPassword("password");
        Connection connection = factory.createConnection();
        connection.setExceptionListener(new ExceptionListener(){

            public void onException(JMSException exception) {
                exception.printStackTrace();
            }
        });
        connection.start();
        return connection;
    }
}

