package org.apache.activemq.conversions;

import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/conversions/AmqpAndMqttTest.class */
public class AmqpAndMqttTest {
    protected BrokerService broker;
    private TransportConnector amqpConnector;
    private TransportConnector mqttConnector;

    @Before
    public void setUp() throws Exception {
        this.broker = createBroker();
        this.broker.start();
        this.broker.waitUntilStarted();
    }

    @After
    public void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
            this.broker = null;
        }
    }

    protected BrokerService createBroker() throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setPersistent(false);
        brokerService.setUseJmx(false);
        brokerService.setAdvisorySupport(false);
        brokerService.setSchedulerSupport(false);
        this.amqpConnector = brokerService.addConnector("amqp://0.0.0.0:0");
        this.mqttConnector = brokerService.addConnector("mqtt://0.0.0.0:0");
        return brokerService;
    }

    @Test(timeout = 60000)
    public void testFromMqttToAmqp() throws Exception {
        Connection createAmqpConnection = createAmqpConnection();
        Session createSession = createAmqpConnection.createSession(false, 1);
        MessageConsumer createConsumer = createSession.createConsumer(createSession.createTopic("topic://FOO"));
        BlockingConnection blockingConnection = createMQTTConnection().blockingConnection();
        blockingConnection.connect();
        byte[] bytes = bytes("Hello World");
        blockingConnection.publish("FOO", bytes, QoS.AT_LEAST_ONCE, false);
        blockingConnection.disconnect();
        BytesMessage receive = createConsumer.receive(5000L);
        Assert.assertNotNull(receive);
        Assert.assertTrue(receive instanceof BytesMessage);
        BytesMessage bytesMessage = receive;
        byte[] bArr = new byte[(int) bytesMessage.getBodyLength()];
        bytesMessage.readBytes(bArr);
        Assert.assertTrue(Arrays.equals(bArr, bytes));
        createAmqpConnection.close();
    }

    private byte[] bytes(String str) {
        try {
            return str.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

    protected MQTT createMQTTConnection() throws Exception {
        MQTT mqtt = new MQTT();
        mqtt.setConnectAttemptsMax(1L);
        mqtt.setReconnectAttemptsMax(0L);
        mqtt.setHost("localhost", this.mqttConnector.getConnectUri().getPort());
        return mqtt;
    }

    public Connection createAmqpConnection() throws Exception {
        JmsConnectionFactory jmsConnectionFactory = new JmsConnectionFactory("amqp://localhost:" + this.amqpConnector.getConnectUri().getPort());
        jmsConnectionFactory.setUsername("admin");
        jmsConnectionFactory.setPassword("password");
        Connection createConnection = jmsConnectionFactory.createConnection();
        createConnection.setExceptionListener(new ExceptionListener() { // from class: org.apache.activemq.conversions.AmqpAndMqttTest.1
            public void onException(JMSException jMSException) {
                jMSException.printStackTrace();
            }
        });
        createConnection.start();
        return createConnection;
    }
}
