/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.health;

import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.kafka.KafkaAdmin;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.health.BaseHealth;
import io.smallrye.reactive.messaging.kafka.impl.KafkaAdminHelper;
import io.smallrye.reactive.messaging.kafka.impl.KafkaSource;
import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.Metric;

/**
 * Health checks for an incoming Kafka channel backed by a {@link KafkaSource}.
 *
 * <p>Two families of checks are implemented:
 * <ul>
 *   <li>metrics-based checks, which read a metric exposed by the underlying Kafka consumer;</li>
 *   <li>client-based checks, which query the broker through a {@link KafkaAdmin} (startup) or
 *       inspect the consumer's partition assignments (readiness).</li>
 * </ul>
 *
 * <p>The admin client is only created when topic verification is enabled in the configuration;
 * otherwise {@link #getAdmin()} returns {@code null}.
 */
public class KafkaSourceHealth extends BaseHealth {
    /** Admin client used for topic verification; {@code null} when topic verification is disabled. */
    private final KafkaAdmin admin;
    private final KafkaSource<?, ?> source;
    private final ReactiveKafkaConsumer<?, ?> client;
    /** Topics the channel consumes from; used when no {@link #pattern} is configured. */
    private final Set<String> topics;
    /** Topic pattern; when non-null it is used instead of {@link #topics} for the startup check. */
    private final Pattern pattern;
    /** Upper bound applied to every blocking call made by the client-based checks. */
    private final Duration adminClientTimeout;
    /** Lazily resolved consumer metric; access goes through the synchronized {@link #getMetric()}. */
    private Metric metric;

    /**
     * Creates the health checker for one incoming channel.
     *
     * @param source  the source whose subscription state is consulted by the checks
     * @param config  the incoming channel configuration (channel name, verification flags, timeouts)
     * @param client  the reactive consumer; its configuration seeds the admin client
     * @param topics  the topics to verify when {@code pattern} is {@code null}
     * @param pattern the topic pattern to verify, or {@code null} to verify {@code topics}
     */
    public KafkaSourceHealth(KafkaSource<?, ?> source, KafkaConnectorIncomingConfiguration config,
            ReactiveKafkaConsumer<?, ?> client, Set<String> topics, Pattern pattern) {
        super(config.getChannel(),
                config.getHealthReadinessTopicVerification().orElse(config.getHealthTopicVerificationEnabled()),
                config.getHealthTopicVerificationStartupDisabled(),
                config.getHealthTopicVerificationReadinessDisabled());
        this.adminClientTimeout = Duration.ofMillis(
                config.getHealthReadinessTimeout().orElse(config.getHealthTopicVerificationTimeout()));
        this.source = source;
        this.client = client;
        this.topics = topics;
        this.pattern = pattern;
        boolean topicVerificationEnabled = config.getHealthReadinessTopicVerification()
                .orElse(config.getHealthTopicVerificationEnabled());
        if (topicVerificationEnabled) {
            // Work on a copy so the admin helper never touches the consumer's own configuration map.
            Map<String, Object> adminConfiguration = new HashMap<String, Object>(client.configuration());
            this.admin = KafkaAdminHelper.createAdminClient(adminConfiguration, config.getChannel(), true);
        } else {
            this.admin = null;
        }
    }

    /**
     * Returns the consumer metric used by the metrics-based checks, resolving and caching it on
     * first use.
     *
     * @return the metric, or {@code null} if the underlying consumer is not available yet or the
     *         lookup did not find one
     */
    protected synchronized Metric getMetric() {
        if (this.metric == null) {
            Consumer<?, ?> consumer = this.client.unwrap();
            if (consumer != null) {
                this.metric = this.getMetric(consumer.metrics());
            }
        }
        return this.metric;
    }

    /**
     * Reports the channel as healthy when the metric value is at least 1, or when the source has
     * no subscriber yet (no connection is expected in that case), or when no metric is available.
     * Reports it as unhealthy only when there are subscribers but the metric value is below 1.
     *
     * @param builder the report builder receiving the outcome for this channel
     */
    @Override
    protected void metricsBasedStartupCheck(HealthReport.HealthReportBuilder builder) {
        Metric metric = this.getMetric();
        if (metric == null) {
            // No metric to inspect: nothing contradicts a healthy state.
            builder.add(this.channel, true);
            return;
        }
        boolean connected = (Double) metric.metricValue() >= 1.0;
        boolean hasSubscribers = this.source.hasSubscribers();
        if (connected) {
            builder.add(this.channel, true);
        } else if (!hasSubscribers) {
            builder.add(this.channel, true, "no subscription yet, so no connection to the Kafka broker yet");
        } else {
            builder.add(this.channel, false);
        }
    }

    /**
     * Same outcome as {@link #metricsBasedStartupCheck(HealthReport.HealthReportBuilder)}.
     *
     * @param builder the report builder receiving the outcome for this channel
     */
    @Override
    protected void metricsBasedReadinessCheck(HealthReport.HealthReportBuilder builder) {
        this.metricsBasedStartupCheck(builder);
    }

    /**
     * Verifies through the admin client that the configured topics (or at least one topic matching
     * the configured pattern) exist.
     *
     * @param builder the report builder receiving the outcome for this channel
     */
    @Override
    protected void clientBasedStartupCheck(HealthReport.HealthReportBuilder builder) {
        if (this.pattern == null) {
            this.checkTopicExists(builder);
        } else {
            this.checkTopicExistsForPattern(builder);
        }
    }

    /**
     * Converts {@link #adminClientTimeout} to the {@code int} milliseconds expected by the admin
     * option objects, clamping to {@link Integer#MAX_VALUE} instead of silently wrapping around
     * when the configured timeout does not fit in an {@code int}.
     */
    private int adminTimeoutMillis() {
        return (int) Math.min(this.adminClientTimeout.toMillis(), (long) Integer.MAX_VALUE);
    }

    /**
     * Describes the configured topics and reports healthy only if every topic was found and every
     * partition of every topic has a non-null leader. Any exception (including a timeout of the
     * blocking wait) is reported as an unhealthy outcome carrying the failure.
     */
    private void checkTopicExists(HealthReport.HealthReportBuilder builder) {
        try {
            DescribeTopicsOptions options = new DescribeTopicsOptions()
                    .includeAuthorizedOperations(false)
                    .timeoutMs(this.adminTimeoutMillis());
            Map<String, TopicDescription> found = this.admin.describeTopics(this.topics, options)
                    .await().atMost(this.adminClientTimeout);
            if (!found.keySet().containsAll(this.topics)) {
                builder.add(this.channel, false,
                        "Unable to find topic(s) " + this.topics + " in " + found.keySet());
                return;
            }
            boolean allPartitionsHaveLeader = this.topics.stream()
                    .allMatch(topic -> found.get(topic).partitions().stream()
                            .allMatch(info -> info.leader() != null));
            if (allPartitionsHaveLeader) {
                builder.add(this.channel, true);
            } else {
                builder.add(this.channel, false,
                        "Unable to find leaders for all partitions of topics " + this.topics);
            }
        } catch (Exception failed) {
            builder.add(this.channel, false,
                    "No response from broker for topics " + this.topics + " : " + failed);
        }
    }

    /**
     * Lists the topics known to the broker and reports healthy if at least one of them fully
     * matches the configured pattern. Any exception (including a timeout of the blocking wait) is
     * reported as an unhealthy outcome carrying the failure.
     */
    private void checkTopicExistsForPattern(HealthReport.HealthReportBuilder builder) {
        try {
            ListTopicsOptions options = new ListTopicsOptions().timeoutMs(this.adminTimeoutMillis());
            Set<String> found = this.admin.listTopics(options).await().atMost(this.adminClientTimeout);
            boolean anyMatch = found.stream().anyMatch(topic -> this.pattern.matcher(topic).matches());
            if (anyMatch) {
                builder.add(this.channel, true);
            } else {
                builder.add(this.channel, false,
                        "Unable to find topic(s) matching pattern " + this.pattern + " in " + found);
            }
        } catch (Exception failed) {
            builder.add(this.channel, false,
                    "No response from broker for topic(s) " + this.pattern + " : " + failed);
        }
    }

    /**
     * Reports healthy when the source has no subscriber yet, or when the consumer currently has at
     * least one partition assigned. An empty assignment, or any exception while fetching the
     * assignments (including a timeout of the blocking wait), is reported as unhealthy.
     *
     * @param builder the report builder receiving the outcome for this channel
     */
    @Override
    protected void clientBasedReadinessCheck(HealthReport.HealthReportBuilder builder) {
        if (!this.source.hasSubscribers()) {
            builder.add(this.channel, true, "no subscription yet, so no partition assignments");
            return;
        }
        try {
            Set<?> partitions = this.client.getAssignments().await().atMost(this.adminClientTimeout);
            if (partitions.isEmpty()) {
                builder.add(this.channel, false, "No partition assignments for channel " + this.channel);
            } else {
                builder.add(this.channel, true);
            }
        } catch (Exception failed) {
            builder.add(this.channel, false,
                    "No response from broker for channel " + this.channel + " : " + failed);
        }
    }

    /**
     * @return the admin client used for topic verification, or {@code null} when topic
     *         verification is disabled for this channel
     */
    @Override
    public KafkaAdmin getAdmin() {
        return this.admin;
    }
}

