package io.smallrye.reactive.messaging.kafka.impl;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.openapi.runtime.io.schema.SchemaConstant;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler;
import io.smallrye.reactive.messaging.kafka.commit.KafkaIgnoreCommit;
import io.smallrye.reactive.messaging.kafka.commit.KafkaLatestCommit;
import io.smallrye.reactive.messaging.kafka.commit.KafkaThrottledLatestProcessedCommit;
import io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailStop;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler;
import io.smallrye.reactive.messaging.kafka.fault.KafkaIgnoreFailure;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaExceptions;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.kafka.admin.KafkaAdminClient;
import io.vertx.mutiny.kafka.client.consumer.KafkaConsumer;
import io.vertx.mutiny.kafka.client.consumer.KafkaConsumerRecord;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.enterprise.inject.Instance;
import javax.enterprise.inject.literal.NamedLiteral;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.eclipse.microprofile.reactive.messaging.spi.ConnectorFactory;
import org.kie.kogito.timer.Interval;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/impl/KafkaSource.class */
public class KafkaSource<K, V> {
    private final Multi<IncomingKafkaRecord<K, V>> stream;
    private final KafkaConsumer<K, V> consumer;
    private final KafkaFailureHandler failureHandler;
    private final KafkaCommitHandler commitHandler;
    private final KafkaConnectorIncomingConfiguration configuration;
    private final KafkaAdminClient admin;
    private final List<Throwable> failures = new ArrayList();
    private final Set<String> topics;
    private final Pattern pattern;

    public KafkaSource(Vertx vertx, String str, KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, Instance<KafkaConsumerRebalanceListener> instance) {
        this.topics = getTopics(kafkaConnectorIncomingConfiguration);
        if (kafkaConnectorIncomingConfiguration.getPattern().booleanValue()) {
            this.pattern = Pattern.compile(kafkaConnectorIncomingConfiguration.getTopic().orElseThrow(() -> {
                return new IllegalArgumentException("Invalid Kafka incoming configuration for channel `" + kafkaConnectorIncomingConfiguration.getChannel() + "`, `pattern` must be used with the `topic` attribute");
            }));
            KafkaLogging.log.configuredPattern(kafkaConnectorIncomingConfiguration.getChannel(), this.pattern.toString());
        } else {
            KafkaLogging.log.configuredTopics(kafkaConnectorIncomingConfiguration.getChannel(), this.topics);
            this.pattern = null;
        }
        HashMap hashMap = new HashMap();
        this.configuration = kafkaConnectorIncomingConfiguration;
        JsonHelper.asJsonObject(kafkaConnectorIncomingConfiguration.config()).forEach(entry -> {
        });
        hashMap.put("group.id", str);
        String bootstrapServers = kafkaConnectorIncomingConfiguration.getBootstrapServers();
        if (!hashMap.containsKey("bootstrap.servers")) {
            KafkaLogging.log.configServers("bootstrap.servers", bootstrapServers);
            hashMap.put("bootstrap.servers", bootstrapServers);
        }
        if (!hashMap.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
            KafkaLogging.log.keyDeserializerOmitted();
            hashMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaConnectorIncomingConfiguration.getKeyDeserializer());
        }
        String orElse = kafkaConnectorIncomingConfiguration.getCommitStrategy().orElse(Boolean.parseBoolean(hashMap.getOrDefault(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")) ? KafkaCommitHandler.Strategy.IGNORE.name() : KafkaCommitHandler.Strategy.LATEST.name());
        hashMap.remove(ConnectorFactory.CHANNEL_NAME_ATTRIBUTE);
        hashMap.remove(ConsumerProtocol.TOPIC_KEY_NAME);
        hashMap.remove(ConsumerProtocol.TOPICS_KEY_NAME);
        hashMap.remove(SchemaConstant.PROP_PATTERN);
        hashMap.remove(ConnectorFactory.CONNECTOR_ATTRIBUTE);
        hashMap.remove("retry");
        hashMap.remove("retry-attempts");
        hashMap.remove("broadcast");
        hashMap.remove(ConsumerProtocol.PARTITIONS_KEY_NAME);
        hashMap.remove("commit-strategy");
        hashMap.remove("consumer-rebalance-listener.name");
        hashMap.remove("health.enabled");
        KafkaConsumer<K, V> create = KafkaConsumer.create(vertx, hashMap);
        this.commitHandler = createCommitHandler(create, hashMap, orElse);
        this.admin = KafkaAdminHelper.createAdminClient(this.configuration, vertx, new HashMap(hashMap));
        Optional<U> map = kafkaConnectorIncomingConfiguration.getConsumerRebalanceListenerName().map(str2 -> {
            KafkaLogging.log.loadingConsumerRebalanceListenerFromConfiguredName(str2);
            return NamedLiteral.of(str2);
        });
        instance.getClass();
        Optional optional = (Optional) map.map(annotation -> {
            return instance.select(annotation);
        }).map((v0) -> {
            return v0.get();
        }).map((v0) -> {
            return Optional.of(v0);
        }).orElseGet(() -> {
            Instance select = instance.select(NamedLiteral.of(str));
            if (select.isUnsatisfied()) {
                return Optional.empty();
            }
            KafkaLogging.log.loadingConsumerRebalanceListenerFromGroupId(str);
            return Optional.of(select.get());
        });
        if (optional.isPresent()) {
            KafkaConsumerRebalanceListener kafkaConsumerRebalanceListener = (KafkaConsumerRebalanceListener) optional.get();
            long parseLong = Long.parseLong(hashMap.getOrDefault("max.poll.interval.ms", "300000")) + (hashMap.get("group.instance.id") == null ? 0L : Long.parseLong(hashMap.getOrDefault("session.timeout.ms", "10000"))) + 11000;
            create.partitionsAssignedHandler(set -> {
                create.pause();
                KafkaLogging.log.executingConsumerAssignedRebalanceListener(str);
                kafkaConsumerRebalanceListener.onPartitionsAssigned(create, set).onFailure().invoke(th -> {
                    KafkaLogging.log.unableToExecuteConsumerAssignedRebalanceListener(str, th);
                }).onFailure().retry().withBackOff(Duration.ofSeconds(1L), Duration.ofSeconds(10L)).expireIn(parseLong).subscribe().with(r9 -> {
                    KafkaLogging.log.executedConsumerAssignedRebalanceListener(str);
                    this.commitHandler.partitionsAssigned(vertx.getOrCreateContext(), set);
                    create.resume();
                }, th2 -> {
                    KafkaLogging.log.reEnablingConsumerforGroup(str);
                    this.commitHandler.partitionsAssigned(vertx.getOrCreateContext(), set);
                    create.resume();
                });
            });
            create.partitionsRevokedHandler(set2 -> {
                KafkaLogging.log.executingConsumerRevokedRebalanceListener(str);
                kafkaConsumerRebalanceListener.onPartitionsRevoked(create, set2).subscribe().with(r4 -> {
                    KafkaLogging.log.executedConsumerRevokedRebalanceListener(str);
                }, th -> {
                    KafkaLogging.log.unableToExecuteConsumerRevokedRebalanceListener(str, th);
                });
            });
        } else {
            create.partitionsAssignedHandler(set3 -> {
                this.commitHandler.partitionsAssigned(vertx.getOrCreateContext(), set3);
            });
        }
        this.consumer = create;
        this.failureHandler = createFailureHandler(kafkaConnectorIncomingConfiguration, vertx, hashMap);
        Multi<KafkaConsumerRecord<K, V>> invoke = this.consumer.toMulti().onFailure().invoke(th -> {
            KafkaLogging.log.unableToReadRecord(this.topics, th);
            reportFailure(th);
        });
        if (kafkaConnectorIncomingConfiguration.getRetry().booleanValue()) {
            int intValue = kafkaConnectorIncomingConfiguration.getRetryAttempts().intValue();
            int intValue2 = kafkaConnectorIncomingConfiguration.getRetryMaxWait().intValue();
            if (intValue == -1) {
                invoke.onFailure().retry().withBackOff(Duration.ofSeconds(1L), Duration.ofSeconds(intValue2)).atMost(Interval.MAX);
            } else {
                invoke = invoke.onFailure().retry().withBackOff(Duration.ofSeconds(1L), Duration.ofSeconds(intValue2)).atMost(intValue);
            }
        }
        this.stream = invoke.onSubscribe().invokeUni(subscription -> {
            this.consumer.exceptionHandler(this::reportFailure);
            if (this.pattern == null) {
                return this.consumer.subscribe(this.topics);
            }
            BiConsumer biConsumer = (uniEmitter, asyncResult) -> {
                if (asyncResult.failed()) {
                    uniEmitter.fail(asyncResult.cause());
                } else {
                    uniEmitter.complete(null);
                }
            };
            return Uni.createFrom().emitter(uniEmitter2 -> {
                this.consumer.getDelegate().subscribe(this.pattern, asyncResult2 -> {
                    biConsumer.accept(uniEmitter2, asyncResult2);
                });
            });
        }).map(kafkaConsumerRecord -> {
            return this.commitHandler.received(new IncomingKafkaRecord<>(kafkaConsumerRecord, this.commitHandler, this.failureHandler));
        }).onFailure().invoke(this::reportFailure);
    }

    private Set<String> getTopics(KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration) {
        String orElse = kafkaConnectorIncomingConfiguration.getTopics().orElse(null);
        String orElse2 = kafkaConnectorIncomingConfiguration.getTopic().orElse(null);
        String channel = kafkaConnectorIncomingConfiguration.getChannel();
        boolean booleanValue = kafkaConnectorIncomingConfiguration.getPattern().booleanValue();
        if (orElse != null && orElse2 != null) {
            throw new IllegalArgumentException("The Kafka incoming configuration for channel `" + channel + "` cannot use `topics` and `topic` at the same time");
        }
        if (orElse == null || !booleanValue) {
            return orElse != null ? (Set) Arrays.stream(orElse.split(",")).map((v0) -> {
                return v0.trim();
            }).collect(Collectors.toSet()) : orElse2 != null ? Collections.singleton(orElse2) : Collections.singleton(channel);
        }
        throw new IllegalArgumentException("The Kafka incoming configuration for channel `" + channel + "` cannot use `topics` and `pattern` at the same time");
    }

    public synchronized void reportFailure(Throwable th) {
        KafkaLogging.log.failureReported(this.topics, th);
        if (this.failures.size() == 10) {
            this.failures.remove(0);
        }
        this.failures.add(th);
    }

    private KafkaFailureHandler createFailureHandler(KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, Vertx vertx, Map<String, String> map) {
        String failureStrategy = kafkaConnectorIncomingConfiguration.getFailureStrategy();
        switch (KafkaFailureHandler.Strategy.from(failureStrategy)) {
            case FAIL:
                return new KafkaFailStop(kafkaConnectorIncomingConfiguration.getChannel(), this);
            case IGNORE:
                return new KafkaIgnoreFailure(kafkaConnectorIncomingConfiguration.getChannel());
            case DEAD_LETTER_QUEUE:
                return KafkaDeadLetterQueue.create(vertx, map, kafkaConnectorIncomingConfiguration, this);
            default:
                throw KafkaExceptions.ex.illegalArgumentInvalidFailureStrategy(failureStrategy);
        }
    }

    private KafkaCommitHandler createCommitHandler(KafkaConsumer<K, V> kafkaConsumer, Map<String, String> map, String str) {
        switch (KafkaCommitHandler.Strategy.from(str)) {
            case LATEST:
                return new KafkaLatestCommit(kafkaConsumer);
            case IGNORE:
                return new KafkaIgnoreCommit();
            case THROTTLED:
                return KafkaThrottledLatestProcessedCommit.create(kafkaConsumer, map, this);
            default:
                throw KafkaExceptions.ex.illegalArgumentInvalidCommitStrategy(str);
        }
    }

    public Multi<IncomingKafkaRecord<K, V>> getStream() {
        return this.stream;
    }

    public void closeQuietly() {
        try {
            this.consumer.closeAndAwait();
        } catch (Throwable th) {
            KafkaLogging.log.exceptionOnClose(th);
        }
    }

    public void isAlive(HealthReport.HealthReportBuilder healthReportBuilder) {
        ArrayList arrayList;
        if (this.configuration.getHealthEnabled().booleanValue()) {
            synchronized (this) {
                arrayList = new ArrayList(this.failures);
            }
            if (arrayList.isEmpty()) {
                healthReportBuilder.add(this.configuration.getChannel(), true);
            } else {
                healthReportBuilder.add(this.configuration.getChannel(), false, (String) arrayList.stream().map((v0) -> {
                    return v0.getMessage();
                }).collect(Collectors.joining()));
            }
        }
    }

    public void isReady(HealthReport.HealthReportBuilder healthReportBuilder) {
        if (this.configuration.getHealthEnabled().booleanValue()) {
            try {
                Set<String> atMost = this.admin.listTopics().await().atMost(Duration.ofSeconds(2L));
                if (this.pattern == null && atMost.containsAll(this.topics)) {
                    healthReportBuilder.add(this.configuration.getChannel(), true);
                } else if (this.pattern != null) {
                    boolean anyMatch = atMost.stream().anyMatch(str -> {
                        return this.pattern.matcher(str).matches();
                    });
                    if (anyMatch) {
                        healthReportBuilder.add(this.configuration.getChannel(), anyMatch);
                    } else {
                        healthReportBuilder.add(this.configuration.getChannel(), false, "Unable to find a topic matching the given pattern: " + this.pattern);
                    }
                } else {
                    healthReportBuilder.add(this.configuration.getChannel(), false, "Unable to find topic(s): " + ((String) this.topics.stream().filter(str2 -> {
                        return !atMost.contains(str2);
                    }).collect(Collectors.joining())));
                }
            } catch (Exception e) {
                healthReportBuilder.add(this.configuration.getChannel(), false, "No response from broker for channel " + this.configuration.getChannel() + " : " + e);
            }
        }
    }
}
