package io.streamzi.openshift.dataflow.container.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.streamzi.cloudevents.CloudEvent;
import io.streamzi.openshift.dataflow.container.CloudEventOutput;
import io.streamzi.openshift.dataflow.container.config.EnvironmentResolver;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

/* loaded from: input_file:io/streamzi/openshift/dataflow/container/kafka/KafkaCloudEventOutputImpl.class */
/**
 * {@link CloudEventOutput} implementation that serialises each {@link CloudEvent} to JSON and
 * publishes it to a Kafka topic.
 *
 * <p>The bootstrap server list is read from the {@code STREAMZI_KAFKA_BOOTSTRAP_SERVER} key via
 * {@link EnvironmentResolver}; the topic name is resolved through {@link EnvironmentResolver}
 * using the second constructor argument as the key.
 *
 * <p>Lifecycle: {@link #startOutput()} creates the producer, {@link #send(CloudEvent)} publishes
 * while connected, and {@link #stopOutput()} closes and discards the producer so that a later
 * {@link #startOutput()} can create a fresh one.
 */
public class KafkaCloudEventOutputImpl extends CloudEventOutput {
    private static final Logger logger = Logger.getLogger(KafkaCloudEventOutputImpl.class.getName());

    /** Key passed to {@link EnvironmentResolver} to obtain the Kafka bootstrap servers. */
    private static final String BOOTSTRAP_SERVER_KEY = "STREAMZI_KAFKA_BOOTSTRAP_SERVER";

    private final ObjectMapper mapper;
    private final String bootstrapServers;
    private final String topicName;

    /** True only between a successful {@link #startOutput()} and the next {@link #stopOutput()}. */
    private volatile boolean connected;

    /** Active producer, or {@code null} when the output is not started. */
    private Producer<String, String> producer;

    /**
     * Creates the output and configures the JSON mapper; no Kafka connection is made here.
     *
     * @param obj forwarded unchanged to the {@link CloudEventOutput} constructor
     * @param str forwarded to the {@link CloudEventOutput} constructor and also used as the
     *            {@link EnvironmentResolver} key from which the topic name is read
     */
    public KafkaCloudEventOutputImpl(Object obj, String str) {
        super(obj, str);
        this.connected = false;
        this.producer = null;
        this.bootstrapServers = EnvironmentResolver.get(BOOTSTRAP_SERVER_KEY);
        this.topicName = EnvironmentResolver.get(str);
        this.mapper = new ObjectMapper();
        this.mapper.registerModule(new Jdk8Module());
        this.mapper.registerModule(new JavaTimeModule());
        this.mapper.configure(SerializationFeature.INDENT_OUTPUT, true);
        // Emit java.time values in textual form rather than as numeric timestamps.
        this.mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
    }

    /**
     * Publishes the event to the configured topic, keyed by its event type, with the JSON
     * serialisation of the event as the record value.
     *
     * <p>If the output is not connected the event is dropped and a warning is logged. Failures
     * are logged and never propagated to the caller, whether they occur synchronously
     * (serialisation, record construction) or are reported later through the send callback.
     *
     * @param cloudEvent the event to publish
     */
    public void send(CloudEvent cloudEvent) {
        // Snapshot the field so a concurrent stopOutput() cannot null it between check and use.
        final Producer<String, String> activeProducer = this.producer;
        if (!this.connected || activeProducer == null) {
            logger.log(Level.WARNING, "Producer not connected");
            return;
        }
        try {
            final String payload = this.mapper.writeValueAsString(cloudEvent);
            final ProducerRecord<String, String> record =
                    new ProducerRecord<>(this.topicName, cloudEvent.getEventType(), payload);
            // Producer.send is asynchronous: delivery errors only surface through the callback
            // (or the returned Future), so log them here instead of losing them silently.
            activeProducer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    logger.log(Level.SEVERE, "Error sending event to Kafka: " + exception.getMessage(), exception);
                }
            });
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Error sending event to Kafka: " + e.getMessage(), e);
        }
    }

    /**
     * Creates the Kafka producer if none is currently held and marks the output as connected.
     * A failure to create the producer is logged and leaves the output disconnected.
     */
    @Override // io.streamzi.openshift.dataflow.container.CloudEventOutput
    public void startOutput() {
        if (this.producer != null) {
            return;
        }
        logger.info("Trying to connect to Kafka");
        try {
            this.producer = createProducer();
            // Set the flag only after the producer field has been assigned.
            this.connected = true;
            logger.info("Connected to Kafka");
        } catch (Exception e) {
            // Keep the throwable so the root cause is visible in the log.
            logger.log(Level.WARNING, "Cannot connect to Kafka: " + e.getMessage(), e);
        }
    }

    /**
     * Marks the output as disconnected and closes the producer, if any. The producer reference
     * is cleared so that a subsequent {@link #startOutput()} creates a new producer.
     */
    @Override // io.streamzi.openshift.dataflow.container.CloudEventOutput
    public void stopOutput() {
        this.connected = false;
        final Producer<String, String> closing = this.producer;
        // Clear the field before closing so it is reset even if close() throws.
        this.producer = null;
        if (closing != null) {
            closing.close();
        }
    }

    /**
     * Builds a producer with String key and value serialisers, using the resolved bootstrap
     * servers and the processor UUID as the client id.
     *
     * @return a newly created producer
     * @throws IllegalStateException if no bootstrap server value was resolved
     */
    private Producer<String, String> createProducer() {
        if (this.bootstrapServers == null) {
            // Properties rejects null values with a bare NullPointerException; fail descriptively.
            throw new IllegalStateException(BOOTSTRAP_SERVER_KEY + " is not set");
        }
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.bootstrapServers);
        properties.put("client.id", this.processorUuid);
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());
        return new KafkaProducer<>(properties);
    }
}
