/*
 * Decompiled with CFR 0.152.
 */
package org.wildfly.camel.test.paho;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.broker.BrokerService;
import org.apache.camel.Expression;
import org.apache.camel.PollingConsumer;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.model.RouteDefinition;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.jboss.arquillian.container.test.api.Deployment;
import org.jboss.arquillian.junit.Arquillian;
import org.jboss.as.arquillian.api.ServerSetup;
import org.jboss.as.arquillian.api.ServerSetupTask;
import org.jboss.as.arquillian.container.ManagementClient;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.asset.Asset;
import org.jboss.shrinkwrap.api.asset.StringAsset;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.wildfly.camel.test.common.utils.AvailablePortFinder;
import org.wildfly.extension.camel.CamelAware;

@CamelAware
@RunWith(value=Arquillian.class)
@ServerSetup(value={BrokerSetup.class})
public class PahoIntegrationTest {
    @Deployment
    public static JavaArchive deployment() {
        JavaArchive archive = (JavaArchive)ShrinkWrap.create(JavaArchive.class, (String)"mqtt-tests");
        archive.addAsResource((Asset)new StringAsset(BrokerSetup.TCP_CONNECTION), "tcp-connection");
        return archive;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPahoConsumer() throws Exception {
        DefaultCamelContext camelctx = new DefaultCamelContext();
        camelctx.addRoutes((RoutesBuilder)new RouteBuilder(){

            public void configure() throws Exception {
                ((RouteDefinition)this.from("paho:ComponentTestTopic?brokerUrl=" + PahoIntegrationTest.this.getConnection()).transform((Expression)this.body().prepend((Object)"Hello "))).to("seda:end");
            }
        });
        camelctx.start();
        try {
            PollingConsumer consumer = camelctx.getEndpoint("seda:end").createPollingConsumer();
            consumer.start();
            MqttClient client = null;
            try {
                client = new MqttClient(this.getConnection(), "MqttClient", (MqttClientPersistence)new MemoryPersistence());
                MqttConnectOptions opts = new MqttConnectOptions();
                opts.setCleanSession(true);
                client.connect(opts);
                MqttMessage message = new MqttMessage("Kermit".getBytes());
                message.setQos(2);
                client.publish("ComponentTestTopic", message);
            }
            finally {
                client.disconnect();
            }
            String result = (String)consumer.receive(3000L).getIn().getBody(String.class);
            Assert.assertEquals((Object)"Hello Kermit", (Object)result);
        }
        finally {
            camelctx.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMQTTProducer() throws Exception {
        DefaultCamelContext camelctx = new DefaultCamelContext();
        camelctx.addRoutes((RoutesBuilder)new RouteBuilder(){

            public void configure() throws Exception {
                ((RouteDefinition)this.from("direct:start").transform((Expression)this.body().prepend((Object)"Hello "))).to("paho:ComponentTestTopic?brokerUrl=" + PahoIntegrationTest.this.getConnection());
            }
        });
        camelctx.start();
        try {
            MqttClient client = new MqttClient(this.getConnection(), "MqttClient", (MqttClientPersistence)new MemoryPersistence());
            MqttConnectOptions opts = new MqttConnectOptions();
            opts.setCleanSession(true);
            client.connect(opts);
            client.subscribe("ComponentTestTopic", 2);
            final ArrayList result = new ArrayList();
            final CountDownLatch latch = new CountDownLatch(1);
            client.setCallback(new MqttCallback(){

                public void connectionLost(Throwable cause) {
                }

                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    result.add(new String(message.getPayload()));
                    latch.countDown();
                }

                public void deliveryComplete(IMqttDeliveryToken token) {
                }
            });
            ProducerTemplate producer = camelctx.createProducerTemplate();
            producer.asyncSendBody("direct:start", (Object)"Kermit");
            Assert.assertTrue((boolean)latch.await(10L, TimeUnit.SECONDS));
            Assert.assertEquals((String)"One message", (long)1L, (long)result.size());
            Assert.assertEquals((Object)"Hello Kermit", result.get(0));
        }
        finally {
            camelctx.stop();
        }
    }

    private String getConnection() throws IOException {
        try (BufferedReader br = new BufferedReader(new InputStreamReader(this.getClass().getResourceAsStream("/tcp-connection")));){
            String string = br.readLine();
            return string;
        }
    }

    static class BrokerSetup
    implements ServerSetupTask {
        static final int PORT = AvailablePortFinder.getNextAvailable();
        static final String MQTT_CONNECTION = "mqtt://127.0.0.1:" + PORT;
        static final String TCP_CONNECTION = "tcp://127.0.0.1:" + PORT;
        static final String TEST_TOPIC = "ComponentTestTopic";
        private BrokerService brokerService;

        BrokerSetup() {
        }

        public void setup(ManagementClient managementClient, String containerId) throws Exception {
            this.brokerService = new BrokerService();
            this.brokerService.setPersistent(false);
            this.brokerService.setAdvisorySupport(false);
            this.brokerService.addConnector(MQTT_CONNECTION);
            this.brokerService.start();
        }

        public void tearDown(ManagementClient managementClient, String containerId) throws Exception {
            this.brokerService.stop();
        }
    }
}

