/*
 * Decompiled with CFR 0.152.
 */
package org.jbpm.event.emitters.kafka;

import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.HashMap;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.jbpm.event.emitters.kafka.CloudEventSpec1;
import org.jbpm.persistence.api.integration.EventCollection;
import org.jbpm.persistence.api.integration.EventEmitter;
import org.jbpm.persistence.api.integration.InstanceView;
import org.jbpm.persistence.api.integration.base.BaseEventCollection;
import org.jbpm.persistence.api.integration.model.CaseInstanceView;
import org.jbpm.persistence.api.integration.model.ProcessInstanceView;
import org.jbpm.persistence.api.integration.model.TaskInstanceView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link EventEmitter} that publishes instance views to Kafka.
 *
 * <p>Each view handed to {@link #apply(Collection)} is wrapped in a {@link CloudEventSpec1},
 * serialized to JSON bytes with a Jackson {@link ObjectMapper} and sent through a Kafka
 * {@link Producer}. The target topic depends on the view type (process, task or case) and can be
 * overridden through system properties, see {@link #getTopic(String)}.
 *
 * <p>Configuration is read from system properties with the prefix
 * {@code org.kie.jbpm.event.emitters.kafka.}:
 * <ul>
 *   <li>{@code bootstrap.servers} (the misspelled legacy key {@code boopstrap.servers} is still
 *       honored as a fallback), default {@code localhost:9092}</li>
 *   <li>{@code acks} and {@code client.id}, only passed to the producer when set</li>
 *   <li>{@code date_format}, falling back to {@code org.kie.server.json.date_format} and then to
 *       {@code yyyy-MM-dd'T'HH:mm:ss.SSSZ}</li>
 *   <li>{@code topic.<eventType>}, default {@code jbpm-<eventType>-events}</li>
 * </ul>
 */
public class KafkaEventEmitter implements EventEmitter {
    private static final Logger logger = LoggerFactory.getLogger(KafkaEventEmitter.class);
    private static final String SOURCE_FORMATTER = "/process/%s/%s";
    private static final String PROPERTY_PREFIX = "org.kie.jbpm.event.emitters.kafka.";
    private static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
    private static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092";

    private final ObjectMapper mapper;
    private final Producer<String, byte[]> producer;

    /**
     * Creates an emitter backed by a new {@link KafkaProducer} configured from system properties.
     */
    public KafkaEventEmitter() {
        this(getProducer());
    }

    /**
     * Creates an emitter that sends through the given producer.
     *
     * @param producer producer used to send the serialized events; it is closed by {@link #close()}
     */
    KafkaEventEmitter(Producer<String, byte[]> producer) {
        this.producer = producer;
        String dateFormat = System.getProperty(
                PROPERTY_PREFIX + "date_format",
                System.getProperty("org.kie.server.json.date_format", DEFAULT_DATE_FORMAT));
        this.mapper = new ObjectMapper()
                .setDateFormat(new SimpleDateFormat(dateFormat))
                .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
                .configure(MapperFeature.PROPAGATE_TRANSIENT_MARKER, true);
    }

    /**
     * No-op: this emitter does nothing on delivery.
     *
     * @param data ignored
     */
    public void deliver(Collection<InstanceView<?>> data) {
    }

    /**
     * Publishes every view in {@code data} to Kafka as a serialized {@link CloudEventSpec1}.
     *
     * <p>Serialization or send failures for a single view are logged and do not stop the
     * remaining views from being processed. Failures reported asynchronously by the producer
     * callback are logged as well.
     *
     * @param data views to publish; {@code null} or empty collections are ignored
     * @throws UnsupportedOperationException if a view is not a {@link ProcessInstanceView},
     *         {@link TaskInstanceView} or {@link CaseInstanceView}; views after the offending
     *         one are not processed
     */
    public void apply(Collection<InstanceView<?>> data) {
        if (data == null || data.isEmpty()) {
            return;
        }
        for (InstanceView<?> view : data) {
            String topic;
            String type;
            String processId;
            long processInstanceId;
            if (view instanceof ProcessInstanceView) {
                ProcessInstanceView processInstanceView = (ProcessInstanceView) view;
                topic = "processes";
                type = "process";
                processInstanceId = processInstanceView.getId();
                processId = processInstanceView.getProcessId();
            } else if (view instanceof TaskInstanceView) {
                TaskInstanceView taskInstanceView = (TaskInstanceView) view;
                topic = "tasks";
                type = "task";
                processInstanceId = taskInstanceView.getProcessInstanceId();
                processId = taskInstanceView.getProcessId();
            } else if (view instanceof CaseInstanceView) {
                CaseInstanceView caseInstanceView = (CaseInstanceView) view;
                topic = "cases";
                type = "case";
                processInstanceId = caseInstanceView.getId();
                processId = caseInstanceView.getCaseDefinitionId();
            } else {
                throw new UnsupportedOperationException("Unsupported view type " + view.getClass());
            }
            try {
                String source = String.format(SOURCE_FORMATTER, processId, processInstanceId);
                byte[] payload = this.mapper.writeValueAsBytes(new CloudEventSpec1(type, source, view));
                // Record is sent without a key; only topic and value are provided.
                this.producer.send(new ProducerRecord<>(getTopic(topic), payload), (metadata, exception) -> {
                    if (exception != null) {
                        logError(view, exception);
                    }
                });
            } catch (Exception e) {
                // Best effort: a failure on one view must not prevent publishing the others.
                logError(view, e);
            }
        }
    }

    /**
     * Logs a publishing failure for the given view together with the exception.
     *
     * @param view view that could not be published
     * @param e failure cause
     */
    private void logError(InstanceView<?> view, Exception e) {
        logger.error("Error publishing view {}", view, e);
    }

    /**
     * No-op: this emitter does nothing when views are dropped.
     *
     * @param data ignored
     */
    public void drop(Collection<InstanceView<?>> data) {
    }

    /**
     * Closes the underlying Kafka producer.
     */
    public void close() {
        this.producer.close();
    }

    /**
     * Creates the collection used to accumulate events for this emitter.
     *
     * @return a new {@link BaseEventCollection}
     */
    public EventCollection newCollection() {
        return new BaseEventCollection();
    }

    /**
     * Builds a {@link KafkaProducer} with String keys and byte-array values from system properties.
     *
     * <p>The bootstrap servers are read from the correctly spelled
     * {@code org.kie.jbpm.event.emitters.kafka.bootstrap.servers} property first. The historical,
     * misspelled {@code ...kafka.boopstrap.servers} key is kept as a fallback so existing
     * deployments keep working.
     *
     * @return the configured producer
     */
    private static Producer<String, byte[]> getProducer() {
        HashMap<String, Object> configs = new HashMap<>();
        String legacyBootstrapServers =
                System.getProperty(PROPERTY_PREFIX + "boopstrap.servers", DEFAULT_BOOTSTRAP_SERVERS);
        configs.put("bootstrap.servers",
                System.getProperty(PROPERTY_PREFIX + "bootstrap.servers", legacyBootstrapServers));
        String acks = System.getProperty(PROPERTY_PREFIX + "acks");
        if (acks != null) {
            configs.put("acks", acks);
        }
        String clientId = System.getProperty(PROPERTY_PREFIX + "client.id");
        if (clientId != null) {
            configs.put("client.id", clientId);
        }
        return new KafkaProducer<>(configs, new StringSerializer(), new ByteArraySerializer());
    }

    /**
     * Resolves the Kafka topic name for an event type.
     *
     * @param eventType logical event type, e.g. {@code processes}, {@code tasks} or {@code cases}
     * @return the value of system property {@code org.kie.jbpm.event.emitters.kafka.topic.<eventType>},
     *         or {@code jbpm-<eventType>-events} when the property is not set
     */
    private static String getTopic(String eventType) {
        return System.getProperty(PROPERTY_PREFIX + "topic." + eventType, "jbpm-" + eventType + "-events");
    }
}

