package org.kie.kogito.test.quarkus.kafka;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import io.vertx.kafka.client.producer.RecordMetadata;
import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/kie/kogito/test/quarkus/kafka/KafkaTestClient.class */
public class KafkaTestClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTestClient.class);
    private static final int TIMEOUT = 10;
    private KafkaProducer<String, String> producer;
    private KafkaConsumer<String, String> consumer;
    private Vertx vertx = Vertx.vertx();
    private String hosts;

    public KafkaTestClient(String str) {
        this.hosts = str;
    }

    public KafkaTestClient(KafkaProducer<String, String> kafkaProducer, KafkaConsumer<String, String> kafkaConsumer) {
        this.producer = kafkaProducer;
        this.consumer = kafkaConsumer;
    }

    private KafkaConsumer<String, String> createDefaultConsumer(String str) {
        HashMap hashMap = new HashMap();
        hashMap.put("enable.auto.commit", "true");
        hashMap.put("auto.offset.reset", "earliest");
        hashMap.put("bootstrap.servers", str);
        hashMap.put("key.deserializer", StringDeserializer.class.getName());
        hashMap.put("value.deserializer", StringDeserializer.class.getName());
        hashMap.put("group.id", KafkaTestClient.class.getName() + "Consumer");
        return KafkaConsumer.create(this.vertx, hashMap);
    }

    private KafkaProducer<String, String> createDefaultProducer(String str) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", str);
        properties.put("acks", "1");
        properties.put("client.id", KafkaTestClient.class.getName() + "Producer");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return KafkaProducer.create(this.vertx, properties);
    }

    public void consume(Set<String> set, Consumer<String> consumer) {
        if (this.consumer == null) {
            this.consumer = createDefaultConsumer(this.hosts);
        } else {
            waitForCompletion(this.consumer.unsubscribe());
        }
        this.consumer.handler(kafkaConsumerRecord -> {
            consumer.accept((String) kafkaConsumerRecord.value());
        });
        waitForCompletion(this.consumer.subscribe(set).onSuccess(r5 -> {
            LOGGER.debug("Kafka consumer subscribed to topic(s): {}", set);
        }));
    }

    public void consume(String str, Consumer<String> consumer) {
        consume(Collections.singleton(str), consumer);
    }

    public void produce(String str, String str2) {
        if (this.producer == null) {
            this.producer = createDefaultProducer(this.hosts);
        }
        LOGGER.info("Publishing event with data {} for topic {}", str, str2);
        this.producer.send(KafkaProducerRecord.create(str2, str), this::produceCallback);
        this.producer.flush();
    }

    public void produceCallback(AsyncResult<RecordMetadata> asyncResult) {
        if (asyncResult.failed()) {
            LOGGER.error("Event publishing failed", asyncResult.cause());
        } else {
            LOGGER.info("Event published {}", asyncResult.result());
        }
    }

    public void shutdown() {
        if (this.producer != null) {
            waitForCompletion(this.producer.close());
        }
        if (this.consumer != null) {
            waitForCompletion(this.consumer.close());
        }
    }

    public void waitForCompletion(Future future) {
        try {
            future.toCompletionStage().toCompletableFuture().get(10L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e2) {
            if (!(e2.getCause() instanceof RuntimeException)) {
                throw new KafkaException(e2.getCause());
            }
            throw ((RuntimeException) e2.getCause());
        } catch (TimeoutException e3) {
            throw new IllegalStateException(e3);
        }
    }
}
