package org.kie.kogito.test.springboot.kafka;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.kie.kogito.testcontainers.springboot.KafkaSpringBootTestResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationListener;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.event.ConsumerStartedEvent;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFutureCallback;

@ConditionalOnProperty(name = {KafkaSpringBootTestResource.KOGITO_KAFKA_PROPERTY})
@Component
/* loaded from: input_file:org/kie/kogito/test/springboot/kafka/KafkaTestClient.class */
public class KafkaTestClient implements ApplicationListener<ConsumerStartedEvent> {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTestClient.class);

    @Value("${spring.kafka.bootstrap-servers}")
    private String kafkaBootstrapServers;
    private KafkaTemplate<String, String> producer;
    private KafkaMessageListenerContainer<String, String> container;
    private CountDownLatch latch = new CountDownLatch(1);

    @PostConstruct
    public void setup() {
        this.producer = new KafkaTemplate<>(producerFactory());
    }

    public KafkaTestClient() {
    }

    public KafkaTestClient(KafkaTemplate<String, String> kafkaTemplate) {
        this.producer = kafkaTemplate;
    }

    private ConsumerFactory<String, String> consumerFactory() {
        HashMap hashMap = new HashMap();
        hashMap.put("enable.auto.commit", "true");
        hashMap.put("auto.commit.interval.ms", "100");
        hashMap.put("auto.offset.reset", "earliest");
        hashMap.put("bootstrap.servers", this.kafkaBootstrapServers);
        hashMap.put("key.deserializer", StringDeserializer.class.getName());
        hashMap.put("value.deserializer", StringDeserializer.class.getName());
        hashMap.put("group.id", KafkaTestClient.class.getName() + "Consumer");
        return new DefaultKafkaConsumerFactory(hashMap);
    }

    private ProducerFactory<String, String> producerFactory() {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", this.kafkaBootstrapServers);
        hashMap.put("acks", "1");
        hashMap.put("client.id", KafkaTestClient.class.getName() + "Producer");
        hashMap.put("key.serializer", StringSerializer.class.getName());
        hashMap.put("value.serializer", StringSerializer.class.getName());
        return new DefaultKafkaProducerFactory(hashMap);
    }

    public void consume(Collection<String> collection, final Consumer<String> consumer) {
        if (this.container != null) {
            this.latch = new CountDownLatch(1);
            this.container.stop();
            this.container = null;
            consume(collection, consumer);
            return;
        }
        this.container = new KafkaMessageListenerContainer<>(consumerFactory(), new ContainerProperties((String[]) collection.toArray(new String[0])));
        this.container.setupMessageListener(new MessageListener<String, String>() { // from class: org.kie.kogito.test.springboot.kafka.KafkaTestClient.1
            public void onMessage(ConsumerRecord<String, String> consumerRecord) {
                consumer.accept((String) consumerRecord.value());
            }
        });
        this.container.setBeanName("kafka-test-client");
        this.container.start();
        try {
            this.latch.await(10L, TimeUnit.SECONDS);
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public void consume(String str, Consumer<String> consumer) {
        consume(Collections.singletonList(str), consumer);
    }

    public void produce(String str, String str2) {
        LOGGER.info("Publishing event with data {} for topic {}", str, str2);
        this.producer.send(str2, str).addCallback(produceCallback());
        this.producer.flush();
    }

    public ListenableFutureCallback<SendResult<String, String>> produceCallback() {
        return new ListenableFutureCallback<SendResult<String, String>>() { // from class: org.kie.kogito.test.springboot.kafka.KafkaTestClient.2
            public void onFailure(Throwable th) {
                KafkaTestClient.LOGGER.error("Event publishing failed", th);
            }

            public void onSuccess(SendResult<String, String> sendResult) {
                KafkaTestClient.LOGGER.info("Event published {}", sendResult.getRecordMetadata());
            }
        };
    }

    public void onApplicationEvent(ConsumerStartedEvent consumerStartedEvent) {
        this.latch.countDown();
    }

    @PreDestroy
    public void shutdown() {
        if (this.producer != null) {
            this.producer.destroy();
        }
        if (this.container != null) {
            this.container.stop();
        }
    }
}
