package org.wildfly.camel.test.paho;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.broker.BrokerService;
import org.apache.camel.PollingConsumer;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.jboss.arquillian.container.test.api.Deployment;
import org.jboss.arquillian.junit.Arquillian;
import org.jboss.as.arquillian.api.ServerSetup;
import org.jboss.as.arquillian.api.ServerSetupTask;
import org.jboss.as.arquillian.container.ManagementClient;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.asset.StringAsset;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.wildfly.camel.test.atom.feed.FeedConstants;
import org.wildfly.camel.test.common.utils.AvailablePortFinder;
import org.wildfly.camel.test.common.utils.EnvironmentUtils;
import org.wildfly.extension.camel.CamelAware;

@CamelAware
@ServerSetup({BrokerSetup.class})
@RunWith(Arquillian.class)
/* loaded from: input_file:org/wildfly/camel/test/paho/PahoIntegrationTest.class */
public class PahoIntegrationTest {

    /* loaded from: input_file:org/wildfly/camel/test/paho/PahoIntegrationTest$BrokerSetup.class */
    static class BrokerSetup implements ServerSetupTask {
        static final int PORT = AvailablePortFinder.getNextAvailable();
        static final String MQTT_CONNECTION = "mqtt://127.0.0.1:" + PORT;
        static final String TCP_CONNECTION = "tcp://127.0.0.1:" + PORT;
        static final String TEST_TOPIC = "ComponentTestTopic";
        private BrokerService brokerService;

        BrokerSetup() {
        }

        public void setup(ManagementClient managementClient, String str) throws Exception {
            this.brokerService = new BrokerService();
            this.brokerService.setPersistent(false);
            this.brokerService.setAdvisorySupport(false);
            this.brokerService.addConnector(MQTT_CONNECTION);
            this.brokerService.start();
        }

        public void tearDown(ManagementClient managementClient, String str) throws Exception {
            this.brokerService.stop();
        }
    }

    @Deployment
    public static JavaArchive deployment() {
        JavaArchive create = ShrinkWrap.create(JavaArchive.class, "mqtt-tests");
        create.addClasses(new Class[]{EnvironmentUtils.class});
        create.addAsResource(new StringAsset(BrokerSetup.TCP_CONNECTION), "tcp-connection");
        return create;
    }

    @Test
    public void testPahoConsumer() throws Exception {
        Assume.assumeFalse("[#1648] PahoIntegrationTest fails on AIX", EnvironmentUtils.isAIX());
        DefaultCamelContext defaultCamelContext = new DefaultCamelContext();
        defaultCamelContext.addRoutes(new RouteBuilder() { // from class: org.wildfly.camel.test.paho.PahoIntegrationTest.1
            public void configure() throws Exception {
                from("paho:ComponentTestTopic?brokerUrl=" + PahoIntegrationTest.this.getConnection()).transform(body().prepend("Hello ")).to("seda:end");
            }
        });
        defaultCamelContext.start();
        try {
            PollingConsumer createPollingConsumer = defaultCamelContext.getEndpoint("seda:end").createPollingConsumer();
            createPollingConsumer.start();
            MqttClient mqttClient = null;
            try {
                mqttClient = new MqttClient(getConnection(), "MqttClient", new MemoryPersistence());
                MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
                mqttConnectOptions.setCleanSession(true);
                mqttClient.connect(mqttConnectOptions);
                MqttMessage mqttMessage = new MqttMessage("Kermit".getBytes());
                mqttMessage.setQos(2);
                mqttClient.publish("ComponentTestTopic", mqttMessage);
                mqttClient.disconnect();
                Assert.assertEquals(FeedConstants.ENTRY_TITLE, (String) createPollingConsumer.receive(3000L).getIn().getBody(String.class));
                defaultCamelContext.stop();
            } catch (Throwable th) {
                mqttClient.disconnect();
                throw th;
            }
        } catch (Throwable th2) {
            defaultCamelContext.stop();
            throw th2;
        }
    }

    @Test
    public void testMQTTProducer() throws Exception {
        Assume.assumeFalse("[#1648] PahoIntegrationTest fails on AIX", EnvironmentUtils.isAIX());
        DefaultCamelContext defaultCamelContext = new DefaultCamelContext();
        defaultCamelContext.addRoutes(new RouteBuilder() { // from class: org.wildfly.camel.test.paho.PahoIntegrationTest.2
            public void configure() throws Exception {
                from("direct:start").transform(body().prepend("Hello ")).to("paho:ComponentTestTopic?brokerUrl=" + PahoIntegrationTest.this.getConnection());
            }
        });
        defaultCamelContext.start();
        try {
            MqttClient mqttClient = new MqttClient(getConnection(), "MqttClient", new MemoryPersistence());
            MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
            mqttConnectOptions.setCleanSession(true);
            mqttClient.connect(mqttConnectOptions);
            mqttClient.subscribe("ComponentTestTopic", 2);
            final ArrayList arrayList = new ArrayList();
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            mqttClient.setCallback(new MqttCallback() { // from class: org.wildfly.camel.test.paho.PahoIntegrationTest.3
                public void connectionLost(Throwable th) {
                }

                public void messageArrived(String str, MqttMessage mqttMessage) throws Exception {
                    arrayList.add(new String(mqttMessage.getPayload()));
                    countDownLatch.countDown();
                }

                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                }
            });
            defaultCamelContext.createProducerTemplate().asyncSendBody("direct:start", "Kermit");
            Assert.assertTrue(countDownLatch.await(10L, TimeUnit.SECONDS));
            Assert.assertEquals("One message", 1L, arrayList.size());
            Assert.assertEquals(FeedConstants.ENTRY_TITLE, arrayList.get(0));
            defaultCamelContext.stop();
        } catch (Throwable th) {
            defaultCamelContext.stop();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getConnection() throws IOException {
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(getClass().getResourceAsStream("/tcp-connection")));
        Throwable th = null;
        try {
            String readLine = bufferedReader.readLine();
            if (bufferedReader != null) {
                if (0 != 0) {
                    try {
                        bufferedReader.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    bufferedReader.close();
                }
            }
            return readLine;
        } catch (Throwable th3) {
            if (bufferedReader != null) {
                if (0 != 0) {
                    try {
                        bufferedReader.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    bufferedReader.close();
                }
            }
            throw th3;
        }
    }
}
