package org.kie.kogito.test.quarkus.kafka;

import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/kogito-quarkus-test-utils-1.38.0-SNAPSHOT.jar:org/kie/kogito/test/quarkus/kafka/KafkaTypedTestClient.class */
/**
 * Test helper that publishes and consumes Kafka records with {@code String} keys and values of
 * type {@code T}.
 *
 * <p>A new producer is created (and closed) for every {@link #produce(Object, String)} call. At most
 * one consumer loop is held at a time: it runs on a dedicated single-thread executor until
 * {@link #shutdown()} is called or a new {@code consume} call replaces it.
 *
 * <p>The mutable state ({@code executorService}, {@code consumer}) is accessed without any
 * synchronization in this class.
 *
 * @param <T> type of the record values
 * @param <S> serializer class used for the values when producing
 * @param <D> deserializer class used for the values when consuming
 */
public class KafkaTypedTestClient<T, S extends Serializer<T>, D extends Deserializer<T>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTypedTestClient.class);

    /** Timeout, in seconds, used for every blocking wait performed by this client. */
    private static final int TIMEOUT = 10;

    private final String hosts;
    private final Class<S> serializer;
    private final Class<D> deserializer;

    private ExecutorService executorService;
    private KafkaConsumerLoop<T> consumer;

    /**
     * Creates a client bound to the given brokers.
     *
     * @param hosts value used for the {@code bootstrap.servers} property of producers and consumers
     * @param serializer class whose name is configured as the producer value serializer
     * @param deserializer class whose name is configured as the consumer value deserializer
     */
    public KafkaTypedTestClient(String hosts, Class<S> serializer, Class<D> deserializer) {
        this.hosts = hosts;
        this.serializer = serializer;
        this.deserializer = deserializer;
    }

    /**
     * Builds a consumer with auto-commit disabled, offset reset set to {@code earliest}, string
     * keys, the configured value deserializer and a fixed group id derived from this class name.
     *
     * @param bootstrapServers value for the {@code bootstrap.servers} property
     * @return a new, not yet subscribed, Kafka consumer
     */
    private KafkaConsumer<String, T> createDefaultConsumer(String bootstrapServers) {
        Properties config = new Properties();
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put("bootstrap.servers", bootstrapServers);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, this.deserializer.getName());
        config.put("group.id", KafkaTypedTestClient.class.getName() + "Consumer");
        return new KafkaConsumer<>(config);
    }

    /**
     * Builds a producer with {@code acks=all}, string keys, the configured value serializer and a
     * fixed client id derived from this class name.
     *
     * @param bootstrapServers value for the {@code bootstrap.servers} property
     * @return a new Kafka producer; the caller is responsible for closing it
     */
    private KafkaProducer<String, T> createDefaultProducer(String bootstrapServers) {
        Properties config = new Properties();
        config.put("bootstrap.servers", bootstrapServers);
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        config.put("client.id", KafkaTypedTestClient.class.getName() + "Producer");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, this.serializer.getName());
        return new KafkaProducer<>(config);
    }

    /**
     * Starts a consumer loop for the given topics on a new single-thread executor and blocks until
     * the loop signals readiness or {@link #TIMEOUT} seconds elapse.
     *
     * <p>If a consumer loop is already held, {@link #shutdown()} is called first. If the calling
     * thread is interrupted while waiting, its interrupt flag is restored and the method returns
     * without having observed the readiness signal.
     *
     * @param topics topics handed to the consumer loop
     * @param callback handler handed to the consumer loop (presumably invoked per consumed value;
     *        confirm against {@code KafkaConsumerLoop})
     * @throws IllegalStateException if readiness is not signalled within the timeout
     */
    public void consume(Set<String> topics, Consumer<T> callback) {
        if (this.consumer != null) {
            shutdown();
        }
        this.executorService = Executors.newSingleThreadExecutor();
        CountDownLatch subscribed = new CountDownLatch(1);
        // The last argument is the hook the loop calls to release the latch; judging by the
        // timeout message below it fires once the loop has subscribed to the topics.
        this.consumer = new KafkaConsumerLoop<>(createDefaultConsumer(this.hosts), topics, callback, ignored -> {
            subscribed.countDown();
            return null;
        });
        this.executorService.execute(this.consumer);
        try {
            if (!subscribed.await(TIMEOUT, TimeUnit.SECONDS)) {
                throw new IllegalStateException(String.format("Timeout while waiting for KafkaTestClient to subscribe to topics: %s", topics));
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Convenience overload of {@link #consume(Set, Consumer)} for a single topic.
     *
     * @param topic the only topic to consume from
     * @param callback handler handed to the consumer loop
     */
    public void consume(String topic, Consumer<T> callback) {
        consume(Collections.singleton(topic), callback);
    }

    /**
     * Sends one record (without key) to the given topic and waits for the send to complete.
     *
     * <p>A dedicated producer is created for the call and always closed afterwards, whether the
     * send succeeds or fails.
     *
     * @param data value of the record
     * @param topic destination topic
     * @throws IllegalStateException if the send does not complete within {@link #TIMEOUT} seconds
     * @throws RuntimeException if the send fails; see {@link #waitForCompletion(Future)}
     */
    public void produce(T data, String topic) {
        try (KafkaProducer<String, T> producer = createDefaultProducer(this.hosts)) {
            LOGGER.info("Publishing event with data {} for topic {}", data, topic);
            waitForCompletion(producer.send(new ProducerRecord<>(topic, data)));
        }
    }

    /**
     * Stops the current consumer loop, if any, and shuts down its executor.
     *
     * <p>The executor is given {@link #TIMEOUT} seconds to terminate; after that, or if the calling
     * thread is interrupted while waiting, {@code shutdownNow()} is used. Calling this method when
     * nothing is running is a no-op.
     */
    public void shutdown() {
        if (this.consumer != null) {
            this.consumer.shutdown();
            this.consumer = null;
        }
        if (this.executorService == null) {
            return;
        }
        this.executorService.shutdown();
        try {
            if (!this.executorService.awaitTermination(TIMEOUT, TimeUnit.SECONDS)) {
                this.executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            this.executorService.shutdownNow();
        }
        this.executorService = null;
    }

    /**
     * Blocks until the given send result is available or {@link #TIMEOUT} seconds elapse.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt flag is restored and
     * the method returns normally.
     *
     * @param future pending result of a producer send
     * @throws RuntimeException the original cause, when the send failed with a runtime exception
     * @throws KafkaException wrapping the cause, when the send failed with any other throwable
     * @throws IllegalStateException wrapping the {@link TimeoutException} when the wait times out
     */
    public void waitForCompletion(Future<RecordMetadata> future) {
        try {
            future.get(TIMEOUT, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new KafkaException(cause);
        } catch (TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }
}
