package io.smallrye.reactive.messaging.kafka.impl;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
import io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailStop;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler;
import io.smallrye.reactive.messaging.kafka.fault.KafkaIgnoreFailure;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaExceptions;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.kafka.client.consumer.KafkaConsumer;
import io.vertx.mutiny.kafka.client.consumer.KafkaConsumerRecord;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import javax.enterprise.inject.Instance;
import javax.enterprise.inject.literal.NamedLiteral;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.eclipse.microprofile.reactive.messaging.spi.ConnectorFactory;
import org.kie.kogito.timer.Interval;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/impl/KafkaSource.class */
/**
 * Wraps a Vert.x Mutiny {@link KafkaConsumer} and exposes the records it reads as a
 * {@link Multi} of {@link IncomingKafkaRecord}.
 *
 * <p>The constructor builds the Kafka client configuration from the connector configuration,
 * creates the consumer, optionally wires a {@link KafkaConsumerRebalanceListener}, selects a
 * {@link KafkaFailureHandler} and assembles the record stream. The consumer only subscribes to
 * the topic when the stream itself is subscribed to.
 *
 * @param <K> type of the record keys
 * @param <V> type of the record values
 */
public class KafkaSource<K, V> {
    /**
     * Extra time, in milliseconds, added to the computed expiry of the "partitions assigned"
     * listener retry loop (see {@link #registerRebalanceHandlers}).
     */
    private static final long REBALANCE_EXPIRY_MARGIN_MS = 11000L;

    private final Multi<IncomingKafkaRecord<K, V>> stream;
    private final KafkaConsumer<K, V> consumer;
    private final KafkaFailureHandler failureHandler;

    /**
     * Creates the Kafka consumer and prepares the stream of incoming records.
     *
     * @param vertx the Vert.x instance used to create the consumer (and passed to the failure handler factory)
     * @param str the consumer group id; stored under {@code group.id} and also used as the fallback
     *        qualifier name when looking up a rebalance listener
     * @param kafkaConnectorIncomingConfiguration the channel configuration
     * @param instance the CDI lookup handle used to resolve a named {@link KafkaConsumerRebalanceListener}
     */
    public KafkaSource(Vertx vertx, String str, KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, Instance<KafkaConsumerRebalanceListener> instance) {
        Map<String, String> kafkaConfiguration = buildKafkaConfiguration(str, kafkaConnectorIncomingConfiguration);
        KafkaConsumer<K, V> kafkaConsumer = KafkaConsumer.create(vertx, kafkaConfiguration);

        findRebalanceListener(kafkaConnectorIncomingConfiguration, str, instance)
                .ifPresent(listener -> registerRebalanceHandlers(kafkaConsumer, listener, str, kafkaConfiguration));

        this.consumer = kafkaConsumer;

        // The topic defaults to the channel name when it is not configured explicitly.
        String topic = kafkaConnectorIncomingConfiguration.getTopic()
                .orElseGet(kafkaConnectorIncomingConfiguration::getChannel);

        KafkaFailureHandler handler = createFailureHandler(kafkaConnectorIncomingConfiguration, vertx, kafkaConfiguration);
        this.failureHandler = handler;

        Multi<KafkaConsumerRecord<K, V>> records = kafkaConsumer.toMulti()
                .onFailure().invoke(failure -> {
                    KafkaLogging.log.unableToReadRecord(topic, failure);
                });

        if (kafkaConnectorIncomingConfiguration.getRetry()) {
            int maxAttempts = kafkaConnectorIncomingConfiguration.getRetryAttempts();
            int maxWaitSeconds = kafkaConnectorIncomingConfiguration.getRetryMaxWait();
            // -1 is treated as "no upper bound": retry up to Long.MAX_VALUE times.
            long attempts = maxAttempts == -1 ? Long.MAX_VALUE : maxAttempts;
            // The retry operator returns a NEW Multi; it must be assigned back, otherwise the
            // retry policy is silently dropped.
            records = records.onFailure().retry()
                    .withBackOff(Duration.ofSeconds(1L), Duration.ofSeconds(maxWaitSeconds))
                    .atMost(attempts);
        }

        this.stream = records
                .on().subscribed(subscription -> {
                    // Subscribe to the Kafka topic only once the stream is subscribed to.
                    kafkaConsumer.subscribeAndAwait(topic);
                })
                .map(kafkaConsumerRecord -> new IncomingKafkaRecord<>(kafkaConsumer, kafkaConsumerRecord, handler));
    }

    /**
     * Builds the configuration map handed to the Kafka consumer.
     *
     * <p>Every entry of the connector configuration is copied as a string, the group id is forced
     * to {@code group}, missing {@code bootstrap.servers} / key deserializer entries are filled in
     * from the typed configuration, and connector-only attributes are removed.
     *
     * @param group the consumer group id
     * @param config the channel configuration
     * @return a mutable map of Kafka client properties
     */
    private static Map<String, String> buildKafkaConfiguration(String group, KafkaConnectorIncomingConfiguration config) {
        Map<String, String> kafkaConfiguration = new HashMap<>();
        JsonHelper.asJsonObject(config.config()).forEach(entry -> {
            // Skip null values rather than failing on toString().
            if (entry.getValue() != null) {
                kafkaConfiguration.put(entry.getKey(), entry.getValue().toString());
            }
        });
        // The group id passed by the caller always wins over a configured one.
        kafkaConfiguration.put("group.id", group);

        String bootstrapServers = config.getBootstrapServers();
        if (!kafkaConfiguration.containsKey("bootstrap.servers")) {
            KafkaLogging.log.configServers("bootstrap.servers", bootstrapServers);
            kafkaConfiguration.put("bootstrap.servers", bootstrapServers);
        }
        if (!kafkaConfiguration.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
            KafkaLogging.log.keyDeserializerOmitted();
            kafkaConfiguration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.getKeyDeserializer());
        }

        // Remove attributes that configure the connector rather than the Kafka client.
        // NOTE(review): the two ConsumerProtocol constants are Kafka-internal names; they
        // presumably resolve to the connector's "topic" and "partitions" attributes — confirm
        // and consider replacing them with connector-level constants.
        kafkaConfiguration.remove(ConnectorFactory.CHANNEL_NAME_ATTRIBUTE);
        kafkaConfiguration.remove(ConsumerProtocol.TOPIC_KEY_NAME);
        kafkaConfiguration.remove(ConnectorFactory.CONNECTOR_ATTRIBUTE);
        kafkaConfiguration.remove("retry");
        kafkaConfiguration.remove("retry-attempts");
        kafkaConfiguration.remove("broadcast");
        kafkaConfiguration.remove(ConsumerProtocol.PARTITIONS_KEY_NAME);
        kafkaConfiguration.remove("consumer-rebalance-listener.name");
        return kafkaConfiguration;
    }

    /**
     * Resolves the rebalance listener to use, if any.
     *
     * <p>When a listener name is configured, the bean with that {@code @Named} qualifier is
     * looked up. Otherwise a bean named after the consumer group is used when one is available.
     *
     * @param config the channel configuration
     * @param group the consumer group id, used as the fallback bean name
     * @param listeners the CDI lookup handle
     * @return the resolved listener, or an empty {@link Optional} when none applies
     */
    private static Optional<KafkaConsumerRebalanceListener> findRebalanceListener(
            KafkaConnectorIncomingConfiguration config, String group, Instance<KafkaConsumerRebalanceListener> listeners) {
        return config.getConsumerRebalanceListenerName()
                .map(name -> {
                    KafkaLogging.log.loadingConsumerRebalanceListenerFromConfiguredName(name);
                    return NamedLiteral.of(name);
                })
                .map(qualifier -> listeners.select(qualifier))
                .map(Instance::get)
                .map(Optional::of)
                .orElseGet(() -> {
                    Instance<KafkaConsumerRebalanceListener> byGroup = listeners.select(NamedLiteral.of(group));
                    if (byGroup.isUnsatisfied()) {
                        return Optional.empty();
                    }
                    KafkaLogging.log.loadingConsumerRebalanceListenerFromGroupId(group);
                    return Optional.of(byGroup.get());
                });
    }

    /**
     * Registers the partition assigned / revoked handlers that delegate to {@code listener}.
     *
     * <p>On assignment the consumer is paused, the listener is invoked and retried with an
     * exponential back-off on failure, and the consumer is resumed once the listener completes
     * or the retry window expires. The retry window is {@code max.poll.interval.ms}, plus
     * {@code session.timeout.ms} when {@code group.instance.id} is set, plus
     * {@link #REBALANCE_EXPIRY_MARGIN_MS}.
     *
     * @param kafkaConsumer the consumer to attach the handlers to
     * @param listener the listener to delegate to
     * @param group the consumer group id, used for logging
     * @param kafkaConfiguration the Kafka client properties, read to compute the retry window
     * @throws NumberFormatException if a timeout property is not a valid {@code long}
     */
    private static void registerRebalanceHandlers(KafkaConsumer<?, ?> kafkaConsumer, KafkaConsumerRebalanceListener listener,
            String group, Map<String, String> kafkaConfiguration) {
        long maxPollIntervalMs = Long.parseLong(kafkaConfiguration.getOrDefault("max.poll.interval.ms", "300000"));
        long sessionTimeoutMs = kafkaConfiguration.get("group.instance.id") == null
                ? 0L
                : Long.parseLong(kafkaConfiguration.getOrDefault("session.timeout.ms", "10000"));
        long consumerReEnableWaitTime = maxPollIntervalMs + sessionTimeoutMs + REBALANCE_EXPIRY_MARGIN_MS;

        kafkaConsumer.partitionsAssignedHandler(partitions -> {
            // Stop fetching records while the listener runs.
            kafkaConsumer.pause();
            KafkaLogging.log.executingConsumerAssignedRebalanceListener(group);
            listener.onPartitionsAssigned(kafkaConsumer, partitions)
                    .onFailure().invoke(failure -> {
                        KafkaLogging.log.unableToExecuteConsumerAssignedRebalanceListener(group, failure);
                    })
                    .onFailure().retry().withBackOff(Duration.ofSeconds(1L), Duration.ofSeconds(10L))
                    .expireIn(consumerReEnableWaitTime)
                    .subscribe().with(ignored -> {
                        KafkaLogging.log.executedConsumerAssignedRebalanceListener(group);
                        kafkaConsumer.resume();
                    }, failure -> {
                        // The listener never succeeded within the retry window: resume anyway.
                        KafkaLogging.log.reEnablingConsumerforGroup(group);
                        kafkaConsumer.resume();
                    });
        });

        kafkaConsumer.partitionsRevokedHandler(partitions -> {
            KafkaLogging.log.executingConsumerRevokedRebalanceListener(group);
            listener.onPartitionsRevoked(kafkaConsumer, partitions)
                    .subscribe().with(ignored -> {
                        KafkaLogging.log.executedConsumerRevokedRebalanceListener(group);
                    }, failure -> {
                        KafkaLogging.log.unableToExecuteConsumerRevokedRebalanceListener(group, failure);
                    });
        });
    }

    /**
     * Creates the failure handler matching the configured failure strategy.
     *
     * @param kafkaConnectorIncomingConfiguration the channel configuration
     * @param vertx the Vert.x instance, passed to the dead-letter-queue handler factory
     * @param map the Kafka client properties, passed to the dead-letter-queue handler factory
     * @return the failure handler for the configured strategy
     * @throws RuntimeException the exception built by {@code KafkaExceptions} when the strategy is not handled
     */
    private KafkaFailureHandler createFailureHandler(KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, Vertx vertx, Map<String, String> map) {
        String failureStrategy = kafkaConnectorIncomingConfiguration.getFailureStrategy();
        switch (KafkaFailureHandler.Strategy.from(failureStrategy)) {
            case FAIL:
                return new KafkaFailStop(kafkaConnectorIncomingConfiguration.getChannel());
            case IGNORE:
                return new KafkaIgnoreFailure(kafkaConnectorIncomingConfiguration.getChannel());
            case DEAD_LETTER_QUEUE:
                return KafkaDeadLetterQueue.create(vertx, map, kafkaConnectorIncomingConfiguration);
            default:
                throw KafkaExceptions.ex.illegalArgumentInvalidStrategy(failureStrategy);
        }
    }

    /**
     * Returns the stream of incoming records.
     *
     * @return the stream assembled by the constructor
     */
    public Multi<IncomingKafkaRecord<K, V>> getStream() {
        return this.stream;
    }

    /**
     * Closes the underlying consumer, logging (instead of propagating) any failure.
     */
    public void closeQuietly() {
        try {
            this.consumer.closeAndAwait();
        } catch (Throwable th) {
            // Best-effort close: report the problem but never fail the caller.
            KafkaLogging.log.exceptionOnClose(th);
        }
    }
}
