package org.kie.kogito.kafka;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/kogito-test-utils-1.9.0-SNAPSHOT.jar:org/kie/kogito/kafka/KafkaClient.class */
/**
 * Small convenience wrapper around one {@link KafkaProducer} and one {@link KafkaConsumer}
 * that exchanges plain {@code String} payloads.
 *
 * <p>Consumption runs on a background task started by {@link #consume(Collection, Consumer)};
 * the task keeps polling until {@link #shutdown()} is called. All access to the consumer from
 * the polling task and from {@link #shutdown()} is serialized through an internal lock.
 */
public class KafkaClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaClient.class);

    /** Upper bound a single poll blocks for before the shutdown flag is re-checked. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(500L);

    private final KafkaProducer<String, String> producer;
    private final KafkaConsumer<String, String> consumer;

    /** Guards poll/commit against a concurrent {@code close()} of the consumer. */
    private final Object shutdownLock = new Object();

    /** Written by {@link #shutdown()}, read by the polling task; volatile for cross-thread visibility. */
    private volatile boolean shutdown;

    /**
     * Creates a client with default producer and consumer settings.
     *
     * @param str the Kafka bootstrap servers, as accepted by {@code bootstrap.servers}
     */
    public KafkaClient(String str) {
        this(createDefaultProducer(str), createDefaultConsumer(str));
    }

    /**
     * Creates a client on top of externally configured Kafka clients.
     *
     * @param kafkaProducer producer used by {@link #produce(String, String)}
     * @param kafkaConsumer consumer used by the {@code consume} methods
     */
    public KafkaClient(KafkaProducer<String, String> kafkaProducer, KafkaConsumer<String, String> kafkaConsumer) {
        this.producer = kafkaProducer;
        this.consumer = kafkaConsumer;
    }

    /**
     * Subscribes to the given topics and starts an asynchronous polling loop that hands the
     * value of every received record to the callback, committing offsets after each poll.
     *
     * @param collection topics to subscribe to
     * @param consumer callback invoked with each record value, in the order the poll returns them
     * @throws NullPointerException if {@code consumer} is {@code null}
     */
    public void consume(Collection<String> collection, Consumer<String> consumer) {
        // Fail at the call site instead of silently inside the background task.
        Objects.requireNonNull(consumer, "consumer");
        this.consumer.subscribe(collection);
        CompletableFuture.runAsync(() -> pollUntilShutdown(consumer));
    }

    /**
     * Subscribes to a single topic; see {@link #consume(Collection, Consumer)}.
     *
     * @param str topic to subscribe to
     * @param consumer callback invoked with each record value
     */
    public void consume(String str, Consumer<String> consumer) {
        consume(Collections.singletonList(str), consumer);
    }

    /**
     * Sends a message asynchronously; a failed send is logged, not thrown.
     *
     * @param str the message value
     * @param str2 the destination topic
     */
    public void produce(String str, String str2) {
        this.producer.send(new ProducerRecord<>(str2, str), (recordMetadata, exc) -> {
            if (exc != null) {
                LOGGER.error("Error publishing message {}", recordMetadata, exc);
            }
        });
    }

    /**
     * Stops the polling loop and closes both Kafka clients. The producer is closed even if
     * closing the consumer fails.
     */
    public void shutdown() {
        this.shutdown = true;
        try {
            // Wait for an in-flight poll/commit to finish before closing the consumer.
            synchronized (this.shutdownLock) {
                this.consumer.close();
            }
        } finally {
            this.producer.close();
        }
    }

    /** Polls and dispatches record values to the callback until shutdown is requested. */
    private void pollUntilShutdown(Consumer<String> callback) {
        try {
            while (!this.shutdown) {
                synchronized (this.shutdownLock) {
                    // Re-check under the lock: shutdown() may have closed the consumer
                    // while this thread was waiting to enter the synchronized block.
                    if (this.shutdown) {
                        return;
                    }
                    this.consumer.poll(POLL_TIMEOUT).forEach(message -> callback.accept(message.value()));
                    this.consumer.commitSync();
                }
            }
        } catch (RuntimeException e) {
            // The future returned by runAsync is discarded, so log here or the failure is lost.
            LOGGER.error("Error consuming messages", e);
        }
    }

    private static KafkaConsumer<String, String> createDefaultConsumer(String str) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, str);
        // Keys are declared as String on the consumer type, so deserialize them as such.
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaClient.class.getName() + "Consumer");
        return new KafkaConsumer<>(properties);
    }

    private static KafkaProducer<String, String> createDefaultProducer(String str) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, str);
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, KafkaClient.class.getName() + "Producer");
        // Keys are declared as String on the producer type, so serialize them as such.
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(properties);
    }
}
