package org.acme.travel;

import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/acme/travel/KafkaTester.class */
public class KafkaTester {
    private KafkaProducer<String, String> producer;
    private KafkaConsumer<String, String> consumer;
    private CountDownLatch shutdownLatch = new CountDownLatch(1);
    private AtomicBoolean shutdown = new AtomicBoolean(false);
    private static Logger LOGGER = LoggerFactory.getLogger(KafkaTester.class);

    public KafkaTester(String str) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", str);
        properties.put("client.id", getClass().getName() + "Producer");
        properties.put("key.serializer", IntegerSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());
        this.producer = new KafkaProducer<>(properties);
        Properties properties2 = new Properties();
        properties2.put("enable.auto.commit", "true");
        properties2.put("auto.offset.reset", "earliest");
        properties2.put("bootstrap.servers", str);
        properties2.put("key.deserializer", IntegerDeserializer.class.getName());
        properties2.put("value.deserializer", StringDeserializer.class.getName());
        properties2.put("group.id", getClass().getName() + "Consumer");
        this.consumer = new KafkaConsumer<>(properties2);
    }

    public void consume(String str, Consumer<String> consumer) {
        this.consumer.subscribe(Collections.singletonList(str));
        CompletableFuture.runAsync(() -> {
            while (!this.shutdown.get()) {
                Stream map = StreamSupport.stream(this.consumer.poll(Duration.ofMillis(500L)).spliterator(), true).map((v0) -> {
                    return v0.value();
                });
                Objects.requireNonNull(consumer);
                map.forEach((v1) -> {
                    r1.accept(v1);
                });
                this.consumer.commitSync();
            }
            this.shutdownLatch.countDown();
        });
    }

    public void produce(String str, String str2) {
        this.producer.send(new ProducerRecord(str2, str), (recordMetadata, exc) -> {
            Optional.ofNullable(exc).ifPresent(exc -> {
                LOGGER.error("Error publishing message {}", recordMetadata, exc);
            });
        });
    }

    public void shutdown() {
        CompletableFuture.runAsync(() -> {
            this.producer.close();
        });
        try {
            this.shutdown.set(true);
            this.shutdownLatch.await(1L, TimeUnit.MINUTES);
            this.consumer.close();
        } catch (InterruptedException e) {
            LOGGER.error("Error shutting down kafka consumer/producer", e);
        }
    }
}
