package org.wildfly.camel.test.kafka;

import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.ServiceStatus;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.jboss.arquillian.container.test.api.Deployment;
import org.jboss.arquillian.junit.Arquillian;
import org.jboss.gravia.resource.ManifestBuilder;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.asset.Asset;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.wildfly.camel.test.common.kafka.EmbeddedKafkaCluster;
import org.wildfly.camel.test.common.utils.TestUtils;
import org.wildfly.camel.test.common.zookeeper.EmbeddedZookeeper;
import org.wildfly.camel.test.kafka.subA.SimpleKafkaPartitioner;
import org.wildfly.camel.test.kafka.subA.SimpleKafkaSerializer;
import org.wildfly.extension.camel.CamelAware;

@CamelAware
@RunWith(Arquillian.class)
/* loaded from: input_file:org/wildfly/camel/test/kafka/KafkaProducerIntegrationTest.class */
public class KafkaProducerIntegrationTest {
    private static final String TOPIC_STRINGS = "test";
    private static final String TOPIC_STRINGS_IN_HEADER = "testHeader";
    private static final int KAFKA_PORT = 9092;
    static EmbeddedZookeeper embeddedZookeeper;
    static EmbeddedKafkaCluster embeddedKafkaCluster;

    @Deployment
    public static JavaArchive deployment() {
        JavaArchive create = ShrinkWrap.create(JavaArchive.class, "kafka-producer-tests.jar");
        create.addPackage(EmbeddedKafkaCluster.class.getPackage());
        create.addPackage(EmbeddedZookeeper.class.getPackage());
        create.addClasses(new Class[]{SimpleKafkaSerializer.class, SimpleKafkaPartitioner.class, TestUtils.class});
        create.setManifest(new Asset() { // from class: org.wildfly.camel.test.kafka.KafkaProducerIntegrationTest.1
            public InputStream openStream() {
                ManifestBuilder manifestBuilder = new ManifestBuilder();
                manifestBuilder.addManifestHeader("Dependencies", "org.apache.kafka");
                return manifestBuilder.openStream();
            }
        });
        return create;
    }

    @BeforeClass
    public static void before() throws Exception {
        embeddedZookeeper = new EmbeddedZookeeper();
        embeddedKafkaCluster = new EmbeddedKafkaCluster(embeddedZookeeper.getConnection(), new Properties(), Collections.singletonList(Integer.valueOf(KAFKA_PORT)));
        embeddedZookeeper.startup(1, TimeUnit.SECONDS);
        System.out.println("### Embedded Zookeeper connection: " + embeddedZookeeper.getConnection());
        embeddedKafkaCluster.startup();
        System.out.println("### Embedded Kafka cluster broker list: " + embeddedKafkaCluster.getBrokerList());
    }

    @AfterClass
    public static void after() throws Exception {
        try {
            embeddedKafkaCluster.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
        try {
            embeddedZookeeper.shutdown();
        } catch (Exception e2) {
            e2.printStackTrace();
        }
    }

    @Test
    public void producedStringMessageIsReceivedByKafka() throws Exception {
        final String str = "kafka:localhost:9092?topic=test&requestRequiredAcks=-1";
        DefaultCamelContext defaultCamelContext = new DefaultCamelContext();
        defaultCamelContext.addRoutes(new RouteBuilder() { // from class: org.wildfly.camel.test.kafka.KafkaProducerIntegrationTest.2
            public void configure() throws Exception {
                from("direct:start").to(str);
            }
        });
        defaultCamelContext.start();
        try {
            ProducerTemplate createProducerTemplate = defaultCamelContext.createProducerTemplate();
            sendMessagesInRoute(10, createProducerTemplate, "IT test message", "kafka.PARTITION_KEY", "1");
            sendMessagesInRoute(5, createProducerTemplate, "IT test message in other topic", "kafka.PARTITION_KEY", "1", "kafka.TOPIC", TOPIC_STRINGS_IN_HEADER);
            CountDownLatch countDownLatch = new CountDownLatch(15);
            KafkaConsumer<String, String> createKafkaConsumer = createKafkaConsumer();
            Throwable th = null;
            try {
                try {
                    consumeKafkaMessages(createKafkaConsumer, "test", TOPIC_STRINGS_IN_HEADER, countDownLatch);
                    boolean await = countDownLatch.await(2L, TimeUnit.SECONDS);
                    if (createKafkaConsumer != null) {
                        if (0 != 0) {
                            try {
                                createKafkaConsumer.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            createKafkaConsumer.close();
                        }
                    }
                    Assert.assertTrue("Messages published to the kafka topics were received: " + countDownLatch.getCount(), await);
                    defaultCamelContext.stop();
                } finally {
                }
            } finally {
            }
        } catch (Throwable th3) {
            defaultCamelContext.stop();
            throw th3;
        }
    }

    @Test
    public void testCustomKafkaSerializer() throws Exception {
        final String str = "kafka:localhost:9092?topic=test&requestRequiredAcks=-1" + ("&serializerClass=" + SimpleKafkaSerializer.class.getName());
        DefaultCamelContext defaultCamelContext = new DefaultCamelContext();
        defaultCamelContext.addRoutes(new RouteBuilder() { // from class: org.wildfly.camel.test.kafka.KafkaProducerIntegrationTest.3
            public void configure() throws Exception {
                from("direct:start").to(str);
            }
        });
        defaultCamelContext.start();
        try {
            Assert.assertEquals(ServiceStatus.Started, defaultCamelContext.getStatus());
            defaultCamelContext.stop();
        } catch (Throwable th) {
            defaultCamelContext.stop();
            throw th;
        }
    }

    @Test
    public void testCustomKafkaPartitioner() throws Exception {
        final String str = "kafka:localhost:9092?topic=test&requestRequiredAcks=-1" + ("&partitioner=" + SimpleKafkaPartitioner.class.getName());
        DefaultCamelContext defaultCamelContext = new DefaultCamelContext();
        defaultCamelContext.addRoutes(new RouteBuilder() { // from class: org.wildfly.camel.test.kafka.KafkaProducerIntegrationTest.4
            public void configure() throws Exception {
                from("direct:start").to(str);
            }
        });
        defaultCamelContext.start();
        try {
            Assert.assertEquals(ServiceStatus.Started, defaultCamelContext.getStatus());
            defaultCamelContext.stop();
        } catch (Throwable th) {
            defaultCamelContext.stop();
            throw th;
        }
    }

    private KafkaConsumer<String, String> createKafkaConsumer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "DemoConsumer");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("auto.offset.reset", "earliest");
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
            Thread.currentThread().setContextClassLoader(contextClassLoader);
            return kafkaConsumer;
        } catch (Throwable th) {
            Thread.currentThread().setContextClassLoader(contextClassLoader);
            throw th;
        }
    }

    private void consumeKafkaMessages(KafkaConsumer<String, String> kafkaConsumer, String str, String str2, CountDownLatch countDownLatch) {
        kafkaConsumer.subscribe(Arrays.asList(str, str2));
        boolean z = true;
        while (z) {
            Iterator it = kafkaConsumer.poll(100L).iterator();
            while (it.hasNext()) {
                countDownLatch.countDown();
                if (countDownLatch.getCount() == 0) {
                    z = false;
                }
            }
        }
    }

    private void sendMessagesInRoute(int i, ProducerTemplate producerTemplate, Object obj, String... strArr) {
        HashMap hashMap = new HashMap();
        int i2 = 0;
        while (true) {
            int i3 = i2;
            if (i3 >= strArr.length) {
                sendMessagesInRoute(i, producerTemplate, obj, hashMap);
                return;
            } else {
                hashMap.put(strArr[i3], strArr[i3 + 1]);
                i2 = i3 + 2;
            }
        }
    }

    private void sendMessagesInRoute(int i, ProducerTemplate producerTemplate, Object obj, Map<String, Object> map) {
        for (int i2 = 0; i2 < i; i2++) {
            producerTemplate.sendBodyAndHeaders("direct:start", obj, map);
        }
    }
}
