package io.smallrye.reactive.messaging.kafka.health;

import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.kafka.KafkaAdmin;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.impl.KafkaAdminHelper;
import io.smallrye.reactive.messaging.kafka.impl.KafkaSource;
import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.Metric;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/health/KafkaSourceHealth.class */
public class KafkaSourceHealth extends BaseHealth {
    private final KafkaAdmin admin;
    private final KafkaSource<?, ?> source;
    private final ReactiveKafkaConsumer<?, ?> client;
    private final Set<String> topics;
    private final Pattern pattern;
    private final Duration adminClientTimeout;
    private Metric metric;

    public KafkaSourceHealth(KafkaSource<?, ?> kafkaSource, KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, ReactiveKafkaConsumer<?, ?> reactiveKafkaConsumer, Set<String> set, Pattern pattern) {
        super(kafkaConnectorIncomingConfiguration.getChannel(), kafkaConnectorIncomingConfiguration.getHealthReadinessTopicVerification().orElse(kafkaConnectorIncomingConfiguration.getHealthTopicVerificationEnabled()).booleanValue(), kafkaConnectorIncomingConfiguration.getHealthTopicVerificationStartupDisabled().booleanValue(), kafkaConnectorIncomingConfiguration.getHealthTopicVerificationReadinessDisabled().booleanValue());
        this.adminClientTimeout = Duration.ofMillis(kafkaConnectorIncomingConfiguration.getHealthReadinessTimeout().orElse(kafkaConnectorIncomingConfiguration.getHealthTopicVerificationTimeout()).longValue());
        this.source = kafkaSource;
        this.client = reactiveKafkaConsumer;
        this.topics = set;
        this.pattern = pattern;
        if (kafkaConnectorIncomingConfiguration.getHealthReadinessTopicVerification().orElse(kafkaConnectorIncomingConfiguration.getHealthTopicVerificationEnabled()).booleanValue()) {
            this.admin = KafkaAdminHelper.createAdminClient(new HashMap(reactiveKafkaConsumer.configuration()), kafkaConnectorIncomingConfiguration.getChannel(), true);
        } else {
            this.admin = null;
        }
    }

    protected synchronized Metric getMetric() {
        Consumer<?, ?> unwrap;
        if (this.metric == null && (unwrap = this.client.unwrap()) != null) {
            this.metric = getMetric(unwrap.metrics());
        }
        return this.metric;
    }

    @Override // io.smallrye.reactive.messaging.kafka.health.BaseHealth
    protected void metricsBasedStartupCheck(HealthReport.HealthReportBuilder healthReportBuilder) {
        Metric metric = getMetric();
        if (metric == null) {
            healthReportBuilder.add(this.channel, true).build();
            return;
        }
        boolean z = ((Double) metric.metricValue()).doubleValue() >= 1.0d;
        boolean hasSubscribers = this.source.hasSubscribers();
        if (z) {
            healthReportBuilder.add(this.channel, true);
        } else if (hasSubscribers) {
            healthReportBuilder.add(this.channel, false);
        } else {
            healthReportBuilder.add(this.channel, true, "no subscription yet, so no connection to the Kafka broker yet");
        }
    }

    @Override // io.smallrye.reactive.messaging.kafka.health.BaseHealth
    protected void metricsBasedReadinessCheck(HealthReport.HealthReportBuilder healthReportBuilder) {
        metricsBasedStartupCheck(healthReportBuilder);
    }

    @Override // io.smallrye.reactive.messaging.kafka.health.BaseHealth
    protected void clientBasedStartupCheck(HealthReport.HealthReportBuilder healthReportBuilder) {
        if (this.pattern == null) {
            checkTopicExists(healthReportBuilder);
        } else {
            checkTopicExistsForPattern(healthReportBuilder);
        }
    }

    private void checkTopicExists(HealthReport.HealthReportBuilder healthReportBuilder) {
        try {
            Map map = (Map) this.admin.describeTopics(this.topics, new DescribeTopicsOptions().includeAuthorizedOperations(false).timeoutMs(Integer.valueOf((int) this.adminClientTimeout.toMillis()))).await().atMost(this.adminClientTimeout);
            if (!map.keySet().containsAll(this.topics)) {
                healthReportBuilder.add(this.channel, false, "Unable to find topic(s) " + this.topics + " in " + map.keySet());
            } else if (this.topics.stream().allMatch(str -> {
                return ((TopicDescription) map.get(str)).partitions().stream().allMatch(topicPartitionInfo -> {
                    return topicPartitionInfo.leader() != null;
                });
            })) {
                healthReportBuilder.add(this.channel, true);
            } else {
                healthReportBuilder.add(this.channel, false, "Unable to find leaders for all partitions of topics " + this.topics);
            }
        } catch (Exception e) {
            healthReportBuilder.add(this.channel, false, "No response from broker for topics " + this.topics + " : " + e);
        }
    }

    private void checkTopicExistsForPattern(HealthReport.HealthReportBuilder healthReportBuilder) {
        try {
            Set set = (Set) this.admin.listTopics(new ListTopicsOptions().timeoutMs(Integer.valueOf((int) this.adminClientTimeout.toMillis()))).await().atMost(this.adminClientTimeout);
            if (set.stream().anyMatch(str -> {
                return this.pattern.matcher(str).matches();
            })) {
                healthReportBuilder.add(this.channel, true);
            } else {
                healthReportBuilder.add(this.channel, false, "Unable to find topic(s) matching pattern " + this.pattern + " in " + set);
            }
        } catch (Exception e) {
            healthReportBuilder.add(this.channel, false, "No response from broker for topic(s) " + this.pattern + " : " + e);
        }
    }

    @Override // io.smallrye.reactive.messaging.kafka.health.BaseHealth
    protected void clientBasedReadinessCheck(HealthReport.HealthReportBuilder healthReportBuilder) {
        if (!this.source.hasSubscribers()) {
            healthReportBuilder.add(this.channel, true, "no subscription yet, so no partition assignments");
            return;
        }
        try {
            if (((Set) this.client.getAssignments().await().atMost(this.adminClientTimeout)).isEmpty()) {
                healthReportBuilder.add(this.channel, false, "No partition assignments for channel " + this.channel);
            } else {
                healthReportBuilder.add(this.channel, true);
            }
        } catch (Exception e) {
            healthReportBuilder.add(this.channel, false, "No response from broker for channel " + this.channel + " : " + e);
        }
    }

    @Override // io.smallrye.reactive.messaging.kafka.health.BaseHealth
    public KafkaAdmin getAdmin() {
        return this.admin;
    }
}
