package io.smallrye.reactive.messaging.kafka.impl;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniCreate;
import io.smallrye.reactive.messaging.TracingMetadata;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecordBatch;
import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents;
import io.smallrye.reactive.messaging.kafka.KafkaConnector;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.commit.ContextHolder;
import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler;
import io.smallrye.reactive.messaging.kafka.commit.KafkaIgnoreCommit;
import io.smallrye.reactive.messaging.kafka.commit.KafkaLatestCommit;
import io.smallrye.reactive.messaging.kafka.commit.KafkaThrottledLatestProcessedCommit;
import io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailStop;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler;
import io.smallrye.reactive.messaging.kafka.fault.KafkaIgnoreFailure;
import io.smallrye.reactive.messaging.kafka.health.KafkaSourceHealth;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaExceptions;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.impl.VertxInternal;
import io.vertx.mutiny.core.Vertx;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.enterprise.inject.Instance;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RebalanceInProgressException;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/impl/KafkaSource.class */
public class KafkaSource<K, V> {
    private final Multi<IncomingKafkaRecord<K, V>> stream;
    private final Multi<IncomingKafkaRecordBatch<K, V>> batchStream;
    private final KafkaFailureHandler failureHandler;
    private final KafkaCommitHandler commitHandler;
    private final KafkaConnectorIncomingConfiguration configuration;
    private final List<Throwable> failures = new ArrayList();
    private final Set<String> topics;
    private final boolean isTracingEnabled;
    private final boolean isHealthEnabled;
    private final boolean isCloudEventEnabled;
    private final String channel;
    private volatile boolean subscribed;
    private final KafkaSourceHealth health;
    private final String group;
    private final int index;
    private final Instance<DeserializationFailureHandler<?>> deserializationFailureHandlers;
    private final Instance<KafkaConsumerRebalanceListener> consumerRebalanceListeners;
    private final ReactiveKafkaConsumer<K, V> client;
    private final EventLoopContext context;

    public KafkaSource(Vertx vertx, String str, KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, Instance<KafkaConsumerRebalanceListener> instance, KafkaCDIEvents kafkaCDIEvents, Instance<DeserializationFailureHandler<?>> instance2, int i) {
        Pattern pattern;
        this.group = str;
        this.index = i;
        this.deserializationFailureHandlers = instance2;
        this.consumerRebalanceListeners = instance;
        this.topics = getTopics(kafkaConnectorIncomingConfiguration);
        if (kafkaConnectorIncomingConfiguration.getPattern().booleanValue()) {
            pattern = Pattern.compile(kafkaConnectorIncomingConfiguration.getTopic().orElseThrow(() -> {
                return new IllegalArgumentException("Invalid Kafka incoming configuration for channel `" + kafkaConnectorIncomingConfiguration.getChannel() + "`, `pattern` must be used with the `topic` attribute");
            }));
            KafkaLogging.log.configuredPattern(kafkaConnectorIncomingConfiguration.getChannel(), pattern.toString());
        } else {
            KafkaLogging.log.configuredTopics(kafkaConnectorIncomingConfiguration.getChannel(), this.topics);
            pattern = null;
        }
        this.configuration = kafkaConnectorIncomingConfiguration;
        this.context = ((VertxInternal) vertx.mo3390getDelegate()).createEventLoopContext();
        this.client = new ReactiveKafkaConsumer<>(kafkaConnectorIncomingConfiguration, this);
        this.commitHandler = createCommitHandler(vertx, this.client, str, kafkaConnectorIncomingConfiguration, kafkaConnectorIncomingConfiguration.getCommitStrategy().orElse(Boolean.parseBoolean(this.client.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) ? KafkaCommitHandler.Strategy.IGNORE.name() : KafkaCommitHandler.Strategy.THROTTLED.name()));
        this.failureHandler = createFailureHandler(kafkaConnectorIncomingConfiguration, this.client.configuration(), kafkaCDIEvents);
        if (this.configuration.getHealthEnabled().booleanValue()) {
            this.health = new KafkaSourceHealth(this, this.configuration, this.client);
        } else {
            this.health = null;
        }
        this.isTracingEnabled = this.configuration.getTracingEnabled().booleanValue();
        this.isHealthEnabled = this.configuration.getHealthEnabled().booleanValue();
        this.isCloudEventEnabled = this.configuration.getCloudEvents().booleanValue();
        this.channel = this.configuration.getChannel();
        kafkaCDIEvents.consumer().fire(this.client.unwrap());
        if (this.commitHandler instanceof ContextHolder) {
            ((ContextHolder) this.commitHandler).capture(this.context);
        }
        this.client.setRebalanceListener();
        if (kafkaConnectorIncomingConfiguration.getBatch().booleanValue()) {
            Multi transformToUniAndConcatenate = (pattern != null ? this.client.subscribeBatch(pattern) : this.client.subscribeBatch(this.topics)).onSubscription().invoke(() -> {
                this.subscribed = true;
                String str2 = this.client.get("group.id");
                KafkaLogging.log.connectedToKafka(this.client.get("client.id"), kafkaConnectorIncomingConfiguration.getBootstrapServers(), str2, this.topics);
            }).onFailure().invoke(th -> {
                KafkaLogging.log.unableToReadRecord(this.topics, th);
                reportFailure(th, false);
            }).onItem().transformToUniAndConcatenate(consumerRecords -> {
                return receiveBatchRecord(new IncomingKafkaRecordBatch<>(consumerRecords, this.commitHandler, this.failureHandler, this.isCloudEventEnabled, this.isTracingEnabled));
            });
            this.batchStream = (kafkaConnectorIncomingConfiguration.getTracingEnabled().booleanValue() ? transformToUniAndConcatenate.onItem().invoke(this::incomingTrace) : transformToUniAndConcatenate).onFailure().invoke(th2 -> {
                reportFailure(th2, false);
            });
            this.stream = null;
        } else {
            Multi transformToUniAndConcatenate2 = (pattern != null ? this.client.subscribe(pattern) : this.client.subscribe(this.topics)).onSubscription().invoke(() -> {
                this.subscribed = true;
                String str2 = this.client.get("group.id");
                KafkaLogging.log.connectedToKafka(this.client.get("client.id"), kafkaConnectorIncomingConfiguration.getBootstrapServers(), str2, this.topics);
            }).onFailure().invoke(th3 -> {
                KafkaLogging.log.unableToReadRecord(this.topics, th3);
                reportFailure(th3, false);
            }).onItem().transformToUniAndConcatenate(consumerRecord -> {
                return this.commitHandler.received(new IncomingKafkaRecord<>(consumerRecord, this.commitHandler, this.failureHandler, this.isCloudEventEnabled, this.isTracingEnabled));
            });
            this.stream = (kafkaConnectorIncomingConfiguration.getTracingEnabled().booleanValue() ? transformToUniAndConcatenate2.onItem().invoke(incomingKafkaRecord -> {
                incomingTrace(incomingKafkaRecord, false);
            }) : transformToUniAndConcatenate2).onFailure().invoke(th4 -> {
                reportFailure(th4, false);
            });
            this.batchStream = null;
        }
    }

    public Set<String> getSubscribedTopics() {
        return this.topics;
    }

    private Set<String> getTopics(KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration) {
        String orElse = kafkaConnectorIncomingConfiguration.getTopics().orElse(null);
        String orElse2 = kafkaConnectorIncomingConfiguration.getTopic().orElse(null);
        String channel = kafkaConnectorIncomingConfiguration.getChannel();
        boolean booleanValue = kafkaConnectorIncomingConfiguration.getPattern().booleanValue();
        if (orElse != null && orElse2 != null) {
            throw new IllegalArgumentException("The Kafka incoming configuration for channel `" + channel + "` cannot use `topics` and `topic` at the same time");
        }
        if (orElse == null || !booleanValue) {
            return orElse != null ? (Set) Arrays.stream(orElse.split(",")).map((v0) -> {
                return v0.trim();
            }).collect(Collectors.toSet()) : orElse2 != null ? Collections.singleton(orElse2) : Collections.singleton(channel);
        }
        throw new IllegalArgumentException("The Kafka incoming configuration for channel `" + channel + "` cannot use `topics` and `pattern` at the same time");
    }

    public synchronized void reportFailure(Throwable th, boolean z) {
        if (th instanceof RebalanceInProgressException) {
            KafkaLogging.log.failureReportedDuringRebalance(this.topics, th);
            return;
        }
        KafkaLogging.log.failureReported(this.topics, th);
        if (this.failures.size() == 10) {
            this.failures.remove(0);
        }
        this.failures.add(th);
        if (!z || this.client == null) {
            return;
        }
        this.client.close();
    }

    public void incomingTrace(IncomingKafkaRecord<K, V> incomingKafkaRecord, boolean z) {
        if (!this.isTracingEnabled || KafkaConnector.TRACER == null) {
            return;
        }
        TracingMetadata orElse = TracingMetadata.fromMessage(incomingKafkaRecord).orElse(TracingMetadata.empty());
        SpanBuilder spanKind = KafkaConnector.TRACER.spanBuilder(incomingKafkaRecord.getTopic() + " receive").setSpanKind(SpanKind.CONSUMER);
        Context previousContext = orElse.getPreviousContext();
        if (previousContext != null) {
            spanKind.setParent(previousContext);
        } else {
            spanKind.setNoParent();
        }
        Span startSpan = spanKind.startSpan();
        startSpan.setAttribute(SemanticAttributes.MESSAGING_KAFKA_PARTITION, incomingKafkaRecord.getPartition());
        startSpan.setAttribute("offset", incomingKafkaRecord.getOffset());
        startSpan.setAttribute((AttributeKey<AttributeKey<String>>) SemanticAttributes.MESSAGING_SYSTEM, (AttributeKey<String>) "kafka");
        startSpan.setAttribute((AttributeKey<AttributeKey<String>>) SemanticAttributes.MESSAGING_DESTINATION, (AttributeKey<String>) incomingKafkaRecord.getTopic());
        startSpan.setAttribute((AttributeKey<AttributeKey<String>>) SemanticAttributes.MESSAGING_DESTINATION_KIND, (AttributeKey<String>) SemanticAttributes.MessagingDestinationKindValues.TOPIC);
        String str = this.client.get("group.id");
        String str2 = this.client.get("client.id");
        startSpan.setAttribute("messaging.consumer_id", constructConsumerId(str, str2));
        startSpan.setAttribute((AttributeKey<AttributeKey<String>>) SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, (AttributeKey<String>) str);
        if (!str2.isEmpty()) {
            startSpan.setAttribute((AttributeKey<AttributeKey<String>>) SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, (AttributeKey<String>) str2);
        }
        if (!z) {
            startSpan.makeCurrent();
        }
        incomingKafkaRecord.injectTracingMetadata(orElse.withSpan(startSpan));
        startSpan.end();
    }

    private String constructConsumerId(String str, String str2) {
        String str3 = str;
        if (!str2.isEmpty()) {
            str3 = str3 + " - " + str2;
        }
        return str3;
    }

    public void incomingTrace(IncomingKafkaRecordBatch<K, V> incomingKafkaRecordBatch) {
        if (!this.isTracingEnabled || KafkaConnector.TRACER == null) {
            return;
        }
        Iterator<KafkaRecord<K, V>> it = incomingKafkaRecordBatch.getRecords().iterator();
        while (it.hasNext()) {
            incomingTrace((IncomingKafkaRecord) it.next().unwrap(IncomingKafkaRecord.class), true);
        }
    }

    private Uni<IncomingKafkaRecordBatch<K, V>> receiveBatchRecord(IncomingKafkaRecordBatch<K, V> incomingKafkaRecordBatch) {
        ArrayList arrayList = new ArrayList();
        Iterator<KafkaRecord<K, V>> it = incomingKafkaRecordBatch.getLatestOffsetRecords().values().iterator();
        while (it.hasNext()) {
            arrayList.add(this.commitHandler.received((IncomingKafkaRecord) it.next().unwrap(IncomingKafkaRecord.class)));
        }
        return arrayList.size() == 0 ? Uni.createFrom().item((UniCreate) incomingKafkaRecordBatch) : arrayList.size() == 1 ? ((Uni) arrayList.get(0)).onItem().transform(incomingKafkaRecord -> {
            return incomingKafkaRecordBatch;
        }) : Uni.combine().all().unis(arrayList).combinedWith(list -> {
            return incomingKafkaRecordBatch;
        });
    }

    private KafkaFailureHandler createFailureHandler(KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, Map<String, ?> map, KafkaCDIEvents kafkaCDIEvents) {
        String failureStrategy = kafkaConnectorIncomingConfiguration.getFailureStrategy();
        switch (KafkaFailureHandler.Strategy.from(failureStrategy)) {
            case FAIL:
                return new KafkaFailStop(kafkaConnectorIncomingConfiguration.getChannel(), this);
            case IGNORE:
                return new KafkaIgnoreFailure(kafkaConnectorIncomingConfiguration.getChannel());
            case DEAD_LETTER_QUEUE:
                return KafkaDeadLetterQueue.create(map, kafkaConnectorIncomingConfiguration, this, kafkaCDIEvents);
            default:
                throw KafkaExceptions.ex.illegalArgumentInvalidFailureStrategy(failureStrategy);
        }
    }

    private KafkaCommitHandler createCommitHandler(Vertx vertx, ReactiveKafkaConsumer<K, V> reactiveKafkaConsumer, String str, KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, String str2) {
        switch (KafkaCommitHandler.Strategy.from(str2)) {
            case LATEST:
                KafkaLogging.log.commitStrategyForChannel("latest", kafkaConnectorIncomingConfiguration.getChannel());
                return new KafkaLatestCommit(vertx, this.configuration, reactiveKafkaConsumer);
            case IGNORE:
                KafkaLogging.log.commitStrategyForChannel("ignore", kafkaConnectorIncomingConfiguration.getChannel());
                return new KafkaIgnoreCommit();
            case THROTTLED:
                KafkaLogging.log.commitStrategyForChannel("throttled", kafkaConnectorIncomingConfiguration.getChannel());
                return KafkaThrottledLatestProcessedCommit.create(vertx, reactiveKafkaConsumer, str, kafkaConnectorIncomingConfiguration, this);
            default:
                throw KafkaExceptions.ex.illegalArgumentInvalidCommitStrategy(str2);
        }
    }

    public Multi<IncomingKafkaRecord<K, V>> getStream() {
        return this.stream;
    }

    public Multi<IncomingKafkaRecordBatch<K, V>> getBatchStream() {
        return this.batchStream;
    }

    public void closeQuietly() {
        try {
            if (this.configuration.getGracefulShutdown().booleanValue()) {
                Duration ofMillis = Duration.ofMillis(this.configuration.getPollTimeout().intValue() * 2);
                if (((Boolean) this.client.runOnPollingThread(consumer -> {
                    Set<TopicPartition> assignment = consumer.assignment();
                    if (assignment.isEmpty()) {
                        return false;
                    }
                    KafkaLogging.log.pauseAllPartitionOnTermination();
                    consumer.pause(assignment);
                    return true;
                }).await().atMost(ofMillis)).booleanValue()) {
                    grace(ofMillis);
                }
            }
            this.commitHandler.terminate(this.configuration.getGracefulShutdown().booleanValue());
            this.failureHandler.terminate();
        } catch (Throwable th) {
            KafkaLogging.log.exceptionOnClose(th);
        }
        try {
            this.client.close();
        } catch (Throwable th2) {
            KafkaLogging.log.exceptionOnClose(th2);
        }
        if (this.health != null) {
            this.health.close();
        }
    }

    private void grace(Duration duration) {
        try {
            Thread.sleep(duration.toMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void isAlive(HealthReport.HealthReportBuilder healthReportBuilder) {
        ArrayList arrayList;
        if (this.isHealthEnabled) {
            synchronized (this) {
                arrayList = new ArrayList(this.failures);
            }
            if (arrayList.isEmpty()) {
                healthReportBuilder.add(this.channel, true);
            } else {
                healthReportBuilder.add(this.channel, false, (String) arrayList.stream().map((v0) -> {
                    return v0.getMessage();
                }).collect(Collectors.joining()));
            }
        }
    }

    public void isReady(HealthReport.HealthReportBuilder healthReportBuilder) {
        if (this.health == null || !this.configuration.getHealthReadinessEnabled().booleanValue()) {
            return;
        }
        this.health.isReady(healthReportBuilder);
    }

    public void isStarted(HealthReport.HealthReportBuilder healthReportBuilder) {
        if (this.health != null) {
            this.health.isStarted(healthReportBuilder);
        }
    }

    public ReactiveKafkaConsumer<K, V> getConsumer() {
        return this.client;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public String getConsumerGroup() {
        return this.group;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int getConsumerIndex() {
        return this.index;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Instance<DeserializationFailureHandler<?>> getDeserializationFailureHandlers() {
        return this.deserializationFailureHandlers;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Instance<KafkaConsumerRebalanceListener> getConsumerRebalanceListeners() {
        return this.consumerRebalanceListeners;
    }

    public KafkaCommitHandler getCommitHandler() {
        return this.commitHandler;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public io.vertx.mutiny.core.Context getContext() {
        return new io.vertx.mutiny.core.Context((io.vertx.core.Context) this.context);
    }

    public String getChannel() {
        return this.configuration.getChannel();
    }

    public boolean hasSubscribers() {
        return this.subscribed;
    }
}
