/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.impl;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.i18n.ProviderLogging;
import io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.fault.DeserializerWrapper;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaExceptions;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.ConfigurationCleaner;
import io.smallrye.reactive.messaging.kafka.impl.JsonHelper;
import io.smallrye.reactive.messaging.kafka.impl.KafkaPollingThread;
import io.smallrye.reactive.messaging.kafka.impl.KafkaRecordStream;
import io.smallrye.reactive.messaging.kafka.impl.KafkaSource;
import io.smallrye.reactive.messaging.kafka.impl.RebalanceListeners;
import io.vertx.core.Context;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.regex.Pattern;
import javax.enterprise.inject.Instance;
import javax.enterprise.inject.literal.NamedLiteral;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

/**
 * Reactive facade over an Apache Kafka {@link Consumer}.
 *
 * <p>Every interaction with the underlying client that goes through
 * {@code runOnPollingThread} is executed on a dedicated single-threaded scheduled executor
 * (whose threads are created by {@link KafkaPollingThread}). Results are exposed as Mutiny
 * {@link Uni}/{@link Multi} instances.
 *
 * @param <K> type of the record keys
 * @param <V> type of the record values
 */
public class ReactiveKafkaConsumer<K, V>
implements KafkaConsumer<K, V> {
    /** Set once {@link #close()} has been invoked; guards against closing twice. */
    private final AtomicBoolean closed = new AtomicBoolean(false);
    /** {@code true} while a poll prepared by {@link #poll()} has not completed yet. */
    private final AtomicBoolean polling = new AtomicBoolean(false);
    private final KafkaSource<K, V> source;
    /** Underlying client; not final because {@link #injectClient(MockConsumer)} can replace it. */
    private Consumer<K, V> consumer;
    private final KafkaConnectorIncomingConfiguration configuration;
    /** Timeout handed to {@code Consumer.poll} when the consumer is not paused. */
    private final Duration pollTimeout;
    private ConsumerRebalanceListener rebalanceListener;
    /** Pause flag maintained by {@link #pause()} and {@link #resume()}. */
    private final AtomicBoolean paused = new AtomicBoolean();
    /** Single-threaded executor on which the Kafka client is driven. */
    private final ScheduledExecutorService kafkaWorker;
    private final KafkaRecordStream<K, V> stream;
    /** Effective configuration handed to the Kafka client. */
    private final Map<String, Object> kafkaConfiguration;

    /**
     * Builds the Kafka client configuration, wraps the configured deserializers, creates the
     * worker executor, the Kafka consumer and the record stream.
     *
     * @param config the incoming channel configuration
     * @param source the source owning this consumer
     * @throws RuntimeException as produced by {@code KafkaExceptions.ex.missingValueDeserializer}
     *         when no {@code value.deserializer} is configured
     */
    public ReactiveKafkaConsumer(KafkaConnectorIncomingConfiguration config, KafkaSource<K, V> source) {
        this.configuration = config;
        this.source = source;
        this.kafkaConfiguration = this.getKafkaConsumerConfiguration(this.configuration, source.getConsumerGroup(), source.getConsumerIndex());
        Instance<DeserializationFailureHandler<?>> failureHandlers = source.getDeserializationFailureHandlers();
        String keyDeserializerCN = (String)this.kafkaConfiguration.get("key.deserializer");
        String valueDeserializerCN = (String)this.kafkaConfiguration.get("value.deserializer");
        if (valueDeserializerCN == null) {
            throw KafkaExceptions.ex.missingValueDeserializer(config.getChannel(), config.getChannel());
        }
        DeserializerWrapper<K> keyDeserializer = new DeserializerWrapper<>(keyDeserializerCN, true, this.getDeserializationHandler(true, failureHandlers), source);
        DeserializerWrapper<V> valueDeserializer = new DeserializerWrapper<>(valueDeserializerCN, false, this.getDeserializationHandler(false, failureHandlers), source);
        keyDeserializer.configure(this.kafkaConfiguration, true);
        valueDeserializer.configure(this.kafkaConfiguration, false);
        this.pollTimeout = Duration.ofMillis(config.getPollTimeout().intValue());
        this.kafkaWorker = Executors.newSingleThreadScheduledExecutor(KafkaPollingThread::new);
        this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(this.kafkaConfiguration, keyDeserializer, valueDeserializer);
        this.stream = new KafkaRecordStream<>(this, config, source.getContext().getDelegate());
    }

    /**
     * Creates the rebalance listener through {@link RebalanceListeners} and stores it.
     * If the creation fails, this consumer is closed and the exception is rethrown.
     */
    public void setRebalanceListener() {
        try {
            this.rebalanceListener = RebalanceListeners.createRebalanceListener(this, this.configuration, this.source.getConsumerGroup(), this.source.getConsumerRebalanceListeners(), this.source.getCommitHandler());
        }
        catch (Exception e) {
            this.close();
            throw e;
        }
    }

    /**
     * @return the listener stored by {@link #setRebalanceListener()}, or {@code null} if that
     *         method has not been called successfully
     */
    public ConsumerRebalanceListener getRebalanceListener() {
        return this.rebalanceListener;
    }

    /**
     * Applies {@code action} to the underlying consumer on the Kafka worker executor.
     *
     * @param action the function to apply to the Kafka consumer
     * @param <T> type of the produced item
     * @return a {@link Uni} emitting the value returned by {@code action}
     */
    @Override
    public <T> Uni<T> runOnPollingThread(Function<Consumer<K, V>, T> action) {
        return Uni.createFrom().item(() -> action.apply(this.consumer)).runSubscriptionOn(this.kafkaWorker);
    }

    /**
     * Runs {@code action} against the underlying consumer on the Kafka worker executor.
     *
     * @param action the action to run with the Kafka consumer
     * @return a {@link Uni} emitting {@code null} once {@code action} has returned
     */
    @Override
    public Uni<Void> runOnPollingThread(java.util.function.Consumer<Consumer<K, V>> action) {
        return Uni.createFrom().item(() -> {
            action.accept(this.consumer);
            return null;
        }).runSubscriptionOn(this.kafkaWorker);
    }

    /**
     * Schedules {@code action} on the Kafka worker executor after {@code delay}.
     *
     * @param action the task to run
     * @param delay how long to wait before running the task
     * @return a {@link Uni} completed with {@code null} after the task ran, or failed with the
     *         {@link Exception} the task threw
     */
    Uni<Void> executeWithDelay(Runnable action, Duration delay) {
        return Uni.createFrom().emitter(e -> this.kafkaWorker.schedule(() -> {
            try {
                action.run();
            }
            catch (Exception ex) {
                e.fail(ex);
                return;
            }
            e.complete(null);
        }, delay.toMillis(), TimeUnit.MILLISECONDS));
    }

    /**
     * Polls the Kafka consumer unless a previous poll is still pending.
     *
     * <p>While paused, the poll uses a zero timeout; otherwise the configured poll timeout is
     * used. A {@link WakeupException} is turned into an empty batch.
     *
     * @return a {@link Uni} emitting the polled records, or an empty batch when a poll is
     *         already pending
     */
    Uni<ConsumerRecords<K, V>> poll() {
        if (!this.polling.compareAndSet(false, true)) {
            // A poll is already pending: do not queue another one.
            return Uni.createFrom().item(ConsumerRecords.<K, V>empty());
        }
        // Block-bodied lambda with a return value: only matches the Function overload.
        Uni<ConsumerRecords<K, V>> records = this.runOnPollingThread(c -> {
            return this.paused.get() ? c.poll(Duration.ZERO) : c.poll(this.pollTimeout);
        });
        return records
                .eventually(() -> this.polling.set(false))
                .onFailure(WakeupException.class).recoverWithItem(ConsumerRecords.<K, V>empty());
    }

    /**
     * Pauses all currently assigned partitions.
     *
     * @return a {@link Uni} emitting the partitions that were just paused or, when the pause
     *         flag was already set, the partitions reported as paused by the client
     */
    @Override
    public Uni<Set<TopicPartition>> pause() {
        if (this.paused.compareAndSet(false, true)) {
            return this.runOnPollingThread(c -> {
                Set<TopicPartition> assigned = c.assignment();
                c.pause(assigned);
                return assigned;
            });
        }
        return this.runOnPollingThread(c -> {
            return c.paused();
        });
    }

    /**
     * @return a {@link Uni} emitting the partitions reported as paused by the client
     */
    @Override
    public Uni<Set<TopicPartition>> paused() {
        return this.runOnPollingThread(c -> {
            return c.paused();
        });
    }

    /**
     * Retrieves the committed offsets of the given partitions.
     *
     * @param tps the partitions to look up
     * @return a {@link Uni} emitting the result of {@code Consumer.committed} for these partitions
     */
    @Override
    public Uni<Map<TopicPartition, OffsetAndMetadata>> committed(TopicPartition ... tps) {
        return this.runOnPollingThread(c -> {
            // LinkedHashSet keeps the order in which the caller listed the partitions.
            Set<TopicPartition> partitions = new LinkedHashSet<>(Arrays.asList(tps));
            return c.committed(partitions);
        });
    }

    /**
     * Subscribes to the given topics, using the stored rebalance listener.
     *
     * @param topics the topic names
     * @return the record stream, available once the subscription call has completed
     */
    public Multi<ConsumerRecord<K, V>> subscribe(Set<String> topics) {
        return this.runOnPollingThread((Consumer<K, V> c) -> c.subscribe(topics, this.rebalanceListener)).onItem().transformToMulti(v -> this.stream);
    }

    /**
     * Subscribes to the topics matching the given pattern, using the stored rebalance listener.
     *
     * @param topics the topic pattern
     * @return the record stream, available once the subscription call has completed
     */
    public Multi<ConsumerRecord<K, V>> subscribe(Pattern topics) {
        return this.runOnPollingThread((Consumer<K, V> c) -> c.subscribe(topics, this.rebalanceListener)).onItem().transformToMulti(v -> this.stream);
    }

    /**
     * Resumes all currently assigned partitions if the pause flag is set; the flag is cleared
     * after the client has been resumed.
     *
     * @return a {@link Uni} completed once the partitions are resumed, or immediately when the
     *         consumer is not paused
     */
    @Override
    public Uni<Void> resume() {
        if (!this.paused.get()) {
            return Uni.createFrom().voidItem();
        }
        return this.runOnPollingThread((Consumer<K, V> c) -> {
            c.resume(c.assignment());
        }).invoke(() -> this.paused.set(false));
    }

    /**
     * Builds the configuration map handed to the Kafka client: the channel configuration with
     * every value converted to a string, the group id, plus defaults for the entries the user
     * did not set. The map is finally passed to {@link ConfigurationCleaner}.
     *
     * @param configuration the channel configuration
     * @param consumerGroup the value of {@code group.id}
     * @param index consumer index appended to the default {@code client.id}; {@code -1} means
     *        no suffix
     * @return the Kafka client configuration
     */
    private Map<String, Object> getKafkaConsumerConfiguration(KafkaConnectorIncomingConfiguration configuration, String consumerGroup, int index) {
        Map<String, Object> map = new HashMap<>();
        JsonHelper.asJsonObject(configuration.config()).forEach(e -> map.put((String)e.getKey(), e.getValue().toString()));
        map.put("group.id", consumerGroup);
        if (!map.containsKey("reconnect.backoff.max.ms")) {
            map.put("reconnect.backoff.max.ms", "10000");
        }
        String servers = configuration.getBootstrapServers();
        if (!map.containsKey("bootstrap.servers")) {
            KafkaLogging.log.configServers("bootstrap.servers", servers);
            map.put("bootstrap.servers", servers);
        }
        if (!map.containsKey("key.deserializer")) {
            KafkaLogging.log.keyDeserializerOmitted();
            map.put("key.deserializer", configuration.getKeyDeserializer());
        }
        if (!map.containsKey("enable.auto.commit")) {
            KafkaLogging.log.disableAutoCommit(configuration.getChannel());
            map.put("enable.auto.commit", "false");
        }
        if (!map.containsKey("client.id")) {
            String clientId = "kafka-consumer-" + configuration.getChannel();
            if (index != -1) {
                clientId = clientId + "-" + index;
            }
            map.put("client.id", clientId);
        }
        ConfigurationCleaner.cleanupConsumerConfiguration(map);
        return map;
    }

    /**
     * Resolves the key or value deserialization failure handler for this consumer's channel.
     *
     * @param isKey {@code true} for the key handler, {@code false} for the value handler
     * @param deserializationFailureHandlers the available handler beans
     * @param <T> type handled by the failure handler
     * @return the handler, or {@code null} when none is configured
     */
    private <T> DeserializationFailureHandler<T> getDeserializationHandler(boolean isKey, Instance<DeserializationFailureHandler<?>> deserializationFailureHandlers) {
        return ReactiveKafkaConsumer.createDeserializationFailureHandler(isKey, deserializationFailureHandlers, this.configuration);
    }

    /**
     * Looks up the failure handler whose name is configured on the channel. The lookup first
     * uses the {@link Identifier} qualifier and falls back to {@code @Named}, logging a
     * deprecation message when the fallback finds a match.
     *
     * @param isKey {@code true} to read the key handler name, {@code false} for the value one
     * @param deserializationFailureHandlers the available handler beans
     * @param configuration the channel configuration
     * @param <T> type handled by the failure handler
     * @return the matching handler, or {@code null} when no handler name is configured
     * @throws RuntimeException as produced by
     *         {@code KafkaExceptions.ex.unableToFindDeserializationFailureHandler} when no
     *         bean, or more than one bean, matches the configured name
     */
    @SuppressWarnings("unchecked") // the bean is looked up by name only; its type argument cannot be checked
    private static <T> DeserializationFailureHandler<T> createDeserializationFailureHandler(boolean isKey, Instance<DeserializationFailureHandler<?>> deserializationFailureHandlers, KafkaConnectorIncomingConfiguration configuration) {
        String name = isKey
                ? (String)configuration.getKeyDeserializationFailureHandler().orElse(null)
                : (String)configuration.getValueDeserializationFailureHandler().orElse(null);
        if (name == null) {
            return null;
        }
        Instance<DeserializationFailureHandler<?>> matching = deserializationFailureHandlers.select(Identifier.Literal.of(name));
        if (matching.isUnsatisfied()) {
            // Fall back to the @Named qualifier.
            matching = deserializationFailureHandlers.select(NamedLiteral.of(name));
            if (!matching.isUnsatisfied()) {
                ProviderLogging.log.deprecatedNamed();
            }
        }
        if (matching.isUnsatisfied()) {
            throw KafkaExceptions.ex.unableToFindDeserializationFailureHandler(name, configuration.getChannel());
        }
        long candidates = matching.stream().count();
        if (candidates > 1L) {
            throw KafkaExceptions.ex.unableToFindDeserializationFailureHandler(name, configuration.getChannel(), (int)candidates);
        }
        return candidates == 1L ? (DeserializationFailureHandler<T>)matching.get() : null;
    }

    /**
     * @param attribute a Kafka client configuration key
     * @return the configured value, or {@code null} when the key is absent
     */
    public String get(String attribute) {
        return (String)this.kafkaConfiguration.get(attribute);
    }

    /**
     * @return the underlying Kafka consumer, without any thread hand-off
     */
    @Override
    public Consumer<K, V> unwrap() {
        return this.consumer;
    }

    /**
     * Synchronously commits the given offsets on the Kafka worker executor.
     *
     * @param map the offsets to commit, per partition
     * @return a {@link Uni} completed once {@code commitSync} has returned
     */
    @Override
    public Uni<Void> commit(Map<TopicPartition, OffsetAndMetadata> map) {
        return this.runOnPollingThread((Consumer<K, V> c) -> c.commitSync(map));
    }

    /**
     * @return the configuration map handed to the Kafka client (the internal instance, not a copy)
     */
    public Map<String, ?> configuration() {
        return this.kafkaConfiguration;
    }

    /**
     * Closes the Kafka client on the worker executor and then shuts the executor down, whether
     * or not closing the client succeeded. Subsequent calls are no-ops.
     *
     * <p>The close timeout is read from {@code default.api.timeout.ms} (1000 ms when absent).
     * When called from a Vert.x event loop thread the close is only triggered; otherwise the
     * caller waits for up to twice the timeout.
     */
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        int timeout = this.configuration.config().getOptionalValue("default.api.timeout.ms", Integer.class).orElse(1000);
        // eventually(...) rather than onItem(): the worker must not outlive a failed client close.
        Uni<Void> uni = this.runOnPollingThread((Consumer<K, V> c) -> c.close(Duration.ofMillis(timeout))).eventually(() -> this.kafkaWorker.shutdown());
        // Interrupt a potentially blocked poll so the close task can run.
        this.consumer.wakeup();
        if (Context.isOnEventLoopThread()) {
            uni.subscribeAsCompletionStage();
        } else {
            uni.await().atMost(Duration.ofMillis((long)timeout * 2L));
        }
    }

    /**
     * Replaces the underlying client with the given mock consumer and closes the previous client.
     *
     * @param consumer the mock consumer to use from now on
     */
    @SuppressWarnings("unchecked") // the caller is responsible for providing a mock with matching key/value types
    public void injectClient(MockConsumer<?, ?> consumer) {
        Consumer<K, V> previous = this.consumer;
        this.consumer = (Consumer<K, V>)consumer;
        previous.close();
    }

    /**
     * @return a {@link Uni} emitting the current position of every assigned partition
     */
    @Override
    public Uni<Map<TopicPartition, Long>> getPositions() {
        return this.runOnPollingThread(c -> {
            Map<TopicPartition, Long> positions = new HashMap<>();
            for (TopicPartition tp : c.assignment()) {
                positions.put(tp, c.position(tp));
            }
            return positions;
        });
    }

    /**
     * @return a {@link Uni} emitting the partitions currently assigned to the consumer
     */
    @Override
    public Uni<Set<TopicPartition>> getAssignments() {
        return this.runOnPollingThread(c -> {
            return c.assignment();
        });
    }

    /**
     * Overrides the fetch offset of a partition.
     *
     * @param partition the partition
     * @param offset the offset to seek to
     * @return a {@link Uni} completed once the seek call has returned
     */
    @Override
    public Uni<Void> seek(TopicPartition partition, long offset) {
        return this.runOnPollingThread((Consumer<K, V> c) -> c.seek(partition, offset));
    }

    /**
     * Overrides the fetch offset of a partition.
     *
     * @param partition the partition
     * @param offsetAndMetadata the offset (and metadata) to seek to
     * @return a {@link Uni} completed once the seek call has returned
     */
    @Override
    public Uni<Void> seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata) {
        return this.runOnPollingThread((Consumer<K, V> c) -> c.seek(partition, offsetAndMetadata));
    }

    /**
     * Seeks to the first offset of each of the given partitions.
     *
     * @param partitions the partitions
     * @return a {@link Uni} completed once the seek call has returned
     */
    @Override
    public Uni<Void> seekToBeginning(Collection<TopicPartition> partitions) {
        return this.runOnPollingThread((Consumer<K, V> c) -> c.seekToBeginning(partitions));
    }

    /**
     * Seeks to the last offset of each of the given partitions.
     *
     * @param partitions the partitions
     * @return a {@link Uni} completed once the seek call has returned
     */
    @Override
    public Uni<Void> seekToEnd(Collection<TopicPartition> partitions) {
        return this.runOnPollingThread((Consumer<K, V> c) -> c.seekToEnd(partitions));
    }

    /**
     * @return {@code true} once {@link #close()} has been called
     */
    boolean isClosed() {
        return this.closed.get();
    }

    /**
     * @return the current value of the pause flag
     */
    boolean isPaused() {
        return this.paused.get();
    }
}

