package io.smallrye.reactive.messaging.kafka.health;

import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.kafka.KafkaAdmin;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.impl.KafkaAdminHelper;
import io.smallrye.reactive.messaging.kafka.impl.KafkaSource;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.Metric;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/health/KafkaSourceReadinessHealth.class */
public class KafkaSourceReadinessHealth extends BaseHealth {
    private final KafkaAdmin admin;
    private final KafkaConnectorIncomingConfiguration config;
    private final Pattern pattern;
    private final String channel;
    private final Set<String> topics;
    private final Metric metric;
    private final KafkaSource<?, ?> source;

    public KafkaSourceReadinessHealth(KafkaSource<?, ?> kafkaSource, KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, Map<String, ?> map, Consumer<?, ?> consumer, Set<String> set, Pattern pattern) {
        super(kafkaConnectorIncomingConfiguration.getChannel());
        this.config = kafkaConnectorIncomingConfiguration;
        this.channel = kafkaConnectorIncomingConfiguration.getChannel();
        this.topics = set;
        this.pattern = pattern;
        this.source = kafkaSource;
        if (kafkaConnectorIncomingConfiguration.getHealthReadinessTopicVerification().booleanValue()) {
            this.admin = KafkaAdminHelper.createAdminClient(new HashMap(map), kafkaConnectorIncomingConfiguration.getChannel(), true);
            this.metric = null;
        } else {
            this.admin = null;
            this.metric = getMetric(consumer.metrics());
        }
    }

    @Override // io.smallrye.reactive.messaging.kafka.health.BaseHealth
    protected void metricsBasedHealthCheck(HealthReport.HealthReportBuilder healthReportBuilder) {
        if (this.metric == null) {
            healthReportBuilder.add(this.channel, true).build();
            return;
        }
        boolean z = ((Double) this.metric.metricValue()).doubleValue() >= 1.0d;
        boolean hasSubscribers = this.source.hasSubscribers();
        if (z) {
            healthReportBuilder.add(this.channel, true);
        } else if (hasSubscribers) {
            healthReportBuilder.add(this.channel, false);
        } else {
            healthReportBuilder.add(this.channel, true, "no subscription yet, so no connection to the Kafka broker yet");
        }
    }

    @Override // io.smallrye.reactive.messaging.kafka.health.BaseHealth
    protected void adminBasedHealthCheck(HealthReport.HealthReportBuilder healthReportBuilder) {
        try {
            Set<String> atMost = this.admin.listTopics().await().atMost(Duration.ofMillis(this.config.getHealthReadinessTimeout().longValue()));
            if (this.pattern == null && atMost.containsAll(this.topics)) {
                healthReportBuilder.add(this.channel, true);
            } else if (this.pattern != null) {
                boolean anyMatch = atMost.stream().anyMatch(str -> {
                    return this.pattern.matcher(str).matches();
                });
                if (anyMatch) {
                    healthReportBuilder.add(this.channel, anyMatch);
                } else {
                    healthReportBuilder.add(this.channel, false, "Unable to find a topic matching the given pattern: " + this.pattern);
                }
            } else {
                healthReportBuilder.add(this.channel, false, "Unable to find topic(s): " + ((String) this.topics.stream().filter(str2 -> {
                    return !atMost.contains(str2);
                }).collect(Collectors.joining())));
            }
        } catch (Exception e) {
            healthReportBuilder.add(this.channel, false, "No response from broker for channel " + this.channel + " : " + e);
        }
    }

    @Override // io.smallrye.reactive.messaging.kafka.health.BaseHealth
    public KafkaAdmin getAdmin() {
        return this.admin;
    }
}
