/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.health;

import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.kafka.KafkaAdmin;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.kafka.health.BaseHealth;
import io.smallrye.reactive.messaging.kafka.impl.KafkaAdminHelper;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class KafkaSinkReadinessHealth
extends BaseHealth {
    private final KafkaConnectorOutgoingConfiguration config;
    private final KafkaAdmin admin;
    private final Metric metric;
    private final String topic;

    public KafkaSinkReadinessHealth(KafkaConnectorOutgoingConfiguration config, Map<String, ?> kafkaConfiguration, Producer<?, ?> producer) {
        super(config.getChannel());
        this.topic = config.getTopic().orElse(config.getChannel());
        this.config = config;
        if (config.getHealthReadinessTopicVerification().booleanValue()) {
            HashMap<String, Object> adminConfiguration = new HashMap<String, Object>(kafkaConfiguration);
            this.admin = KafkaAdminHelper.createAdminClient(adminConfiguration, config.getChannel(), true);
            this.metric = null;
        } else {
            this.admin = null;
            Map<MetricName, Metric> metrics = producer.metrics();
            this.metric = this.getMetric(metrics);
        }
    }

    @Override
    public KafkaAdmin getAdmin() {
        return this.admin;
    }

    @Override
    protected void metricsBasedHealthCheck(HealthReport.HealthReportBuilder builder) {
        if (this.metric != null) {
            builder.add(this.channel, (Double)this.metric.metricValue() >= 1.0);
        } else {
            builder.add(this.channel, true).build();
        }
    }

    @Override
    protected void adminBasedHealthCheck(HealthReport.HealthReportBuilder builder) {
        try {
            Set<String> topics = this.admin.listTopics().await().atMost(Duration.ofMillis(this.config.getHealthReadinessTimeout()));
            if (topics.contains(this.topic)) {
                builder.add(this.config.getChannel(), true);
            } else {
                builder.add(this.config.getChannel(), false, "Unable to find topic " + this.topic);
            }
        }
        catch (Exception failed) {
            builder.add(this.config.getChannel(), false, "No response from broker for topic " + this.topic + " : " + failed);
        }
    }
}

