package io.smallrye.reactive.messaging.kafka;

import io.reactivex.Flowable;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.kafka.client.consumer.KafkaConsumer;
import io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecord;
import java.util.HashMap;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.spi.ConnectorFactory;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/KafkaSource.class */
/**
 * Wraps a Vert.x {@link KafkaConsumer} and exposes the records it receives as a
 * {@link PublisherBuilder} of {@link KafkaMessage}.
 *
 * <p>The consumer is created in the constructor from the supplied {@link Config}; the
 * subscription to the Kafka topic only happens when the resulting stream is subscribed to.
 *
 * @param <K> type of the record keys
 * @param <V> type of the record values
 */
public class KafkaSource<K, V> {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSource.class);

    /**
     * Name of the attribute holding the topic to consume. Kept as a local literal so that the
     * class does not rely on a constant from Kafka's {@code internals} package for its value.
     */
    private static final String TOPIC_ATTRIBUTE = "topic";

    private final PublisherBuilder<KafkaMessage> source;
    private final KafkaConsumer<K, V> consumer;

    /**
     * Creates the Kafka consumer and assembles the stream of received messages.
     *
     * <p>Attributes read from {@code config}:
     * <ul>
     *   <li>{@code group.id} - consumer group; a random UUID is generated (and logged) if absent;</li>
     *   <li>{@code bootstrap.servers} - falls back to {@code str} if absent;</li>
     *   <li>{@code key.deserializer} - falls back to {@link StringDeserializer} if absent;</li>
     *   <li>{@code topic}, or the channel name attribute as a fallback - the topic to consume;</li>
     *   <li>{@code retry} (default {@code true}) and {@code retry-attempts} (default {@code 5});</li>
     *   <li>{@code broadcast} (default {@code false}) - share one upstream between subscribers.</li>
     * </ul>
     * All other entries exposed by {@code JsonHelper.asJsonObject(config)} are forwarded to the
     * Kafka consumer as strings.
     *
     * @param vertx  the Vert.x instance used to create the consumer
     * @param config the channel configuration
     * @param str    bootstrap servers used when the configuration does not define any
     * @throws IllegalArgumentException if neither the topic nor the channel name is configured
     */
    public KafkaSource(Vertx vertx, Config config, String str) {
        HashMap<String, String> kafkaConfiguration = new HashMap<>();

        String group = config.getOptionalValue(ConsumerConfig.GROUP_ID_CONFIG, String.class).orElseGet(() -> {
            String generated = UUID.randomUUID().toString();
            LOGGER.warn("No `group.id` set in the configuration, generate a random id: {}", generated);
            return generated;
        });

        // Forward the user configuration to the Kafka consumer. Without this copy the defaults
        // below would always win and properties such as the value deserializer would be lost.
        // NOTE(review): assumes the helper yields String-keyed Map.Entry items (as a Vert.x
        // JsonObject does) - confirm against JsonHelper.
        JsonHelper.asJsonObject(config).forEach(entry -> {
            Object value = entry.getValue();
            if (value != null) {
                kafkaConfiguration.put(entry.getKey(), value.toString());
            }
        });
        kafkaConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, group);

        if (!kafkaConfiguration.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
            LOGGER.info("Setting {} to {}", ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, str);
            kafkaConfiguration.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, str);
        }
        if (!kafkaConfiguration.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
            LOGGER.info("Key deserializer omitted, using String as default");
            kafkaConfiguration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        }

        this.consumer = KafkaConsumer.create(vertx, kafkaConfiguration);

        String topic = getTopicOrFail(config);
        Objects.requireNonNull(topic, "The topic must be set, or the name must be set");

        Flowable<KafkaConsumerRecord<K, V>> records = this.consumer.toFlowable();

        if (config.getOptionalValue("retry", Boolean.class).orElse(true)) {
            int maxAttempts = config.getOptionalValue("retry-attempts", Integer.class).orElse(5);
            // Pair each failure with an attempt number (1..maxAttempts) and wait that many
            // seconds before signalling the retry.
            // NOTE(review): once the attempts are exhausted the zip completes, which presumably
            // completes the stream instead of propagating the last failure - confirm intended.
            records = records.retryWhen(failures -> failures
                    .zipWith(Flowable.range(1, maxAttempts), (failure, attempt) -> attempt)
                    .flatMap(attempt -> Flowable.timer(attempt, TimeUnit.SECONDS)));
        }

        if (config.getOptionalValue("broadcast", Boolean.class).orElse(false)) {
            records = records.publish().autoConnect();
        }

        // The Kafka subscription is deferred until the stream itself is subscribed to.
        Flowable<KafkaConsumerRecord<K, V>> subscribing = records.doOnSubscribe(subscription -> {
            this.consumer.subscribe(topic);
        });

        this.source = ReactiveStreams.fromPublisher(subscribing).map(record -> {
            return new ReceivedKafkaMessage(this.consumer, record);
        });
    }

    /**
     * Returns the stream of messages built in the constructor.
     *
     * @return the publisher builder emitting the received Kafka messages
     */
    public PublisherBuilder<KafkaMessage> getSource() {
        return this.source;
    }

    /** Closes the underlying Kafka consumer. */
    public void close() {
        this.consumer.close();
    }

    /**
     * Resolves the topic to consume: the {@code topic} attribute if present, otherwise the
     * channel name attribute.
     *
     * @param config the channel configuration
     * @return the topic name
     * @throws IllegalArgumentException if neither attribute is set
     */
    private String getTopicOrFail(Config config) {
        return config.getOptionalValue(TOPIC_ATTRIBUTE, String.class).orElseGet(() -> {
            return config.getOptionalValue(ConnectorFactory.CHANNEL_NAME_ATTRIBUTE, String.class).orElseThrow(() -> {
                return new IllegalArgumentException("Topic attribute must be set");
            });
        });
    }
}
