package io.streamzi.eventflow.runtime.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.streamzi.cloudevents.CloudEvent;
import io.streamzi.cloudevents.impl.CloudEventImpl;
import io.streamzi.eventflow.annotations.ObjectType;
import io.streamzi.eventflow.runtime.CloudEventInput;
import io.streamzi.eventflow.utils.EnvironmentResolver;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;

/* loaded from: input_file:io/streamzi/eventflow/runtime/kafka/KafkaCloudEventInputImpl.class */
/**
 * {@link CloudEventInput} implementation that reads JSON messages from a Kafka topic and hands
 * each one to the configured consumer method via reflection.
 *
 * <p>{@link #startInput()} creates the Kafka consumer and starts a daemon thread that executes
 * {@link #run()}. {@link #stopInput()} sets a stop flag and wakes the consumer up so the poll
 * loop terminates and the consumer is closed by the polling thread.
 *
 * <p>The fields {@code inputName}, {@code processorUuid}, {@code objectType},
 * {@code consumerMethod} and {@code consumerObject} are inherited from {@link CloudEventInput}.
 */
public class KafkaCloudEventInputImpl extends CloudEventInput implements Runnable {
    private static final Logger logger = Logger.getLogger(KafkaCloudEventInputImpl.class.getName());

    /** Timeout, in milliseconds, passed to every {@code poll} call. */
    private static final long POLL_TIMEOUT_MS = 100L;

    private final ObjectMapper mapper;
    private final String topicName;
    private final String bootstrapServers;
    private Consumer<String, String> consumer;
    /** Set by {@link #stopInput()}; read by the polling thread, hence volatile. */
    private volatile boolean stopInput;
    private Thread pollThread;

    /**
     * Resolves the Kafka connection settings for this input and prepares the JSON mapper.
     *
     * <p>The bootstrap servers are read from {@code <inputName>_BOOTSTRAP_SERVERS} when that key
     * exists, otherwise from {@code STREAMZI_KAFKA_BOOTSTRAP_SERVER}. The topic name is looked up
     * under the key {@code <inputName>}. Lookups go through {@link EnvironmentResolver}
     * (presumably environment variables; confirm against that class).
     *
     * @param obj    passed straight to the {@link CloudEventInput} constructor
     * @param method passed straight to the {@link CloudEventInput} constructor
     */
    public KafkaCloudEventInputImpl(Object obj, Method method) {
        super(obj, method);
        this.stopInput = false;
        if (EnvironmentResolver.exists(this.inputName + "_BOOTSTRAP_SERVERS")) {
            logger.info("Bootstrap server Env exists for input: " + this.inputName);
            this.bootstrapServers = EnvironmentResolver.get(this.inputName + "_BOOTSTRAP_SERVERS");
        } else {
            logger.warning("No Bootstrap server Env exists for input: " + this.inputName);
            this.bootstrapServers = EnvironmentResolver.get("STREAMZI_KAFKA_BOOTSTRAP_SERVER");
        }
        logger.info("Kafka broker defined at: " + this.bootstrapServers);
        this.topicName = EnvironmentResolver.get(this.inputName);
        logger.info("Input will connect to topic: " + this.topicName);
        this.mapper = new ObjectMapper();
        this.mapper.registerModule(new Jdk8Module());
        this.mapper.registerModule(new JavaTimeModule());
        this.mapper.configure(SerializationFeature.INDENT_OUTPUT, true);
        this.mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
    }

    /**
     * Creates the Kafka consumer and starts the daemon polling thread.
     *
     * <p>Does nothing if a consumer already exists. If the consumer cannot be created because
     * configuration is missing, no thread is started; a later call will try again.
     */
    @Override // io.streamzi.eventflow.runtime.CloudEventInput
    public void startInput() {
        logger.info("Starting input: " + this.inputName);
        if (this.consumer != null) {
            return;
        }
        Consumer<String, String> created = createConsumer();
        if (created == null) {
            // Previously the poll thread was started anyway and died with a NullPointerException.
            logger.log(Level.SEVERE, "Input not started, no Kafka consumer available for: " + this.inputName);
            return;
        }
        this.consumer = created;
        this.pollThread = new Thread(this);
        this.pollThread.setDaemon(true);
        this.pollThread.start();
    }

    /**
     * Poll loop: fetches records until the stop flag is set and dispatches each record to the
     * consumer method. The Kafka consumer is closed exactly once when the loop ends, whether it
     * ends normally, through a wakeup, or through any other throwable.
     *
     * @throws WakeupException if the consumer is woken up while the stop flag is not set
     */
    @Override // java.lang.Runnable
    public void run() {
        final Consumer<String, String> active = this.consumer;
        if (active == null) {
            logger.log(Level.SEVERE, "Poll loop invoked without a Kafka consumer for: " + this.inputName);
            return;
        }
        try {
            while (!this.stopInput) {
                for (ConsumerRecord<String, String> record : active.poll(POLL_TIMEOUT_MS)) {
                    dispatch(record);
                }
            }
        } catch (WakeupException e) {
            // A wakeup is the expected way out once stopInput() was called; otherwise propagate.
            if (!this.stopInput) {
                logger.info("Wakeup exception");
                throw e;
            }
        } finally {
            active.close();
        }
    }

    /**
     * Deserializes one record value according to {@code objectType} and invokes the consumer
     * method with it. Any failure is logged and swallowed so that a single bad record or a
     * failing handler does not terminate the poll loop.
     */
    private void dispatch(ConsumerRecord<String, String> record) {
        try {
            final Object payload;
            switch (this.objectType) {
                case CLOUDEVENT:
                    payload = this.mapper.readValue(record.value(), CloudEventImpl.class);
                    break;
                case OBJECT:
                default:
                    payload = this.mapper.readValue(record.value(), Object.class);
                    break;
            }
            this.consumerMethod.invoke(this.consumerObject, payload);
        } catch (Exception e) {
            // Pass the throwable along: for InvocationTargetException getMessage() is usually
            // null and the real failure is only visible in the cause.
            logger.log(Level.WARNING, "Error running consumer method: " + e.getMessage(), e);
        }
    }

    /**
     * Builds a String/String Kafka consumer subscribed to the configured topic, using
     * {@code processorUuid} as the consumer group id.
     *
     * @return the subscribed consumer, or {@code null} when the topic name or the bootstrap
     *         servers are not configured
     */
    private Consumer<String, String> createConsumer() {
        if (this.topicName == null || this.bootstrapServers == null) {
            logger.log(Level.SEVERE, "Missing configuration data");
            return null;
        }
        logger.info("Attaching to topic: " + this.topicName);
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.bootstrapServers);
        properties.put("group.id", this.processorUuid);
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        properties.put("auto.commit.interval.ms", 1000);
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        try {
            kafkaConsumer.subscribe(Collections.singletonList(this.topicName));
        } catch (RuntimeException e) {
            // Do not leak the consumer's network resources when subscribing fails.
            kafkaConsumer.close();
            throw e;
        }
        return kafkaConsumer;
    }

    /**
     * Requests termination of the poll loop: sets the stop flag and wakes the consumer up so a
     * blocking {@code poll} returns promptly. The consumer itself is closed by {@link #run()}.
     */
    @Override // io.streamzi.eventflow.runtime.CloudEventInput
    public void stopInput() {
        this.stopInput = true;
        if (this.consumer != null) {
            this.consumer.wakeup();
        }
    }
}
