/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.jobs.service.stream;

import io.quarkus.runtime.Startup;
import io.quarkus.runtime.StartupEvent;
import io.vertx.kafka.admin.NewTopic;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.kafka.admin.KafkaAdminClient;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import javax.inject.Named;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Startup bean that makes sure the Kafka topic used for job events exists.
 *
 * <p>On {@link StartupEvent}, and only when {@code kogito.jobs-service.events-support} is set to
 * {@code true}, the bean builds a {@link KafkaAdminClient} from the {@code default-kafka-broker}
 * configuration map, lists the broker topics and creates the topic named by
 * {@code kogito.jobs-events-topic} if it is not listed.
 */
@Startup
@ApplicationScoped
public class KafkaConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConfiguration.class);

    /** Number of partitions requested when the events topic has to be created. */
    private static final int TOPIC_PARTITIONS = 1;

    /** Replication factor requested when the events topic has to be created. */
    private static final short TOPIC_REPLICATION_FACTOR = 1;

    private Instance<Map<String, Object>> defaultKafkaConfiguration;
    private Vertx vertx;
    private Optional<Boolean> enabled;
    private String topic;

    /** Created on first use by {@link #getOrCreateClient(Map)}; {@code null} until then. */
    private KafkaAdminClient adminClient;

    /**
     * Creates the bean with its injected collaborators.
     *
     * @param defaultKafkaConfiguration handle on the {@code default-kafka-broker} configuration map
     * @param vertx Vert.x instance used to create the Kafka admin client
     * @param enabled value of {@code kogito.jobs-service.events-support}; empty when not configured
     * @param topic value of {@code kogito.jobs-events-topic}, the topic to check and create
     */
    @Inject
    public KafkaConfiguration(@Named(value="default-kafka-broker") Instance<Map<String, Object>> defaultKafkaConfiguration, Vertx vertx, @ConfigProperty(name="kogito.jobs-service.events-support") Optional<Boolean> enabled, @ConfigProperty(name="kogito.jobs-events-topic") String topic) {
        this.defaultKafkaConfiguration = defaultKafkaConfiguration;
        this.vertx = vertx;
        this.enabled = enabled;
        this.topic = topic;
    }

    /**
     * Checks, on application startup, that the events topic exists and creates it when missing.
     *
     * <p>Nothing beyond the initial log line happens unless events support is explicitly enabled.
     * The outcome of the topic listing and of the topic creation is handled in callbacks;
     * failures are logged rather than propagated.
     *
     * @param event the startup event that triggers the check
     */
    void topicConfiguration(@Observes StartupEvent event) {
        LOGGER.info("Kafka topic configuration check.");
        if (!this.enabled.filter(Boolean.TRUE::equals).isPresent()) {
            // Events support is disabled or unset: do not resolve the broker configuration at all.
            return;
        }
        KafkaAdminClient client = this.getOrCreateClient(this.brokerConfiguration());
        client.listTopics().subscribe().with(
                existingTopics -> {
                    // Only request creation when the broker does not already list the topic.
                    if (!existingTopics.contains(this.topic)) {
                        this.createTopic(client);
                    }
                },
                failure -> LOGGER.error("Unable to list Kafka topics while checking topic {}", this.topic, failure));
    }

    /**
     * Converts the injected broker configuration into the string map used to create the admin client.
     * Entries with a {@code null} value are skipped; other values are converted with
     * {@link String#valueOf(Object)} so non-String settings do not cause a {@link ClassCastException}.
     */
    private Map<String, String> brokerConfiguration() {
        Map<String, Object> rawConfiguration = this.defaultKafkaConfiguration.get();
        return rawConfiguration.entrySet().stream()
                .filter(entry -> entry.getValue() != null)
                .collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue())));
    }

    /** Requests creation of the events topic and logs the outcome. */
    private void createTopic(KafkaAdminClient client) {
        NewTopic newTopic = new NewTopic(this.topic, TOPIC_PARTITIONS, TOPIC_REPLICATION_FACTOR);
        client.createTopics(Arrays.asList(newTopic)).subscribe().with(
                ignored -> LOGGER.info("Created topic {}", (Object)this.topic),
                failure -> LOGGER.error("Unable to create Kafka topic {}", this.topic, failure));
    }

    /**
     * Returns the cached admin client, creating it from {@code config} on the first call.
     *
     * @param config Kafka client configuration, used only when no client has been created yet
     * @return the admin client held by this bean
     */
    private KafkaAdminClient getOrCreateClient(Map<String, String> config) {
        if (this.adminClient == null) {
            this.adminClient = KafkaAdminClient.create(this.vertx, config);
        }
        return this.adminClient;
    }

    /**
     * Returns the admin client held by this bean.
     *
     * @return the admin client, or {@code null} if none has been created yet
     */
    protected KafkaAdminClient getAdminClient() {
        return this.adminClient;
    }
}

