/*
 * Decompiled with CFR 0.152.
 */
package org.jbpm.event.emitters.kafka;

import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.HashMap;
import java.util.UUID;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.jbpm.event.emitters.kafka.CloudEventSpec1;
import org.jbpm.persistence.api.integration.EventCollection;
import org.jbpm.persistence.api.integration.EventEmitter;
import org.jbpm.persistence.api.integration.InstanceView;
import org.jbpm.persistence.api.integration.base.BaseEventCollection;
import org.jbpm.persistence.api.integration.model.CaseInstanceView;
import org.jbpm.persistence.api.integration.model.ProcessInstanceView;
import org.jbpm.persistence.api.integration.model.TaskInstanceView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link EventEmitter} that serializes instance views as {@link CloudEventSpec1} JSON
 * payloads and publishes them to Kafka inside a Kafka transaction.
 *
 * <p>Lifecycle, per calling thread:
 * <ol>
 *   <li>{@link #deliver(Collection)} creates a transactional producer, begins a transaction
 *       and sends one record per view;</li>
 *   <li>{@link #apply(Collection)} commits that transaction, or {@link #drop(Collection)}
 *       aborts it;</li>
 *   <li>in both cases the producer is closed and unbound from the thread.</li>
 * </ol>
 *
 * <p>The in-flight producer is kept in a {@link ThreadLocal}, so {@code apply}/{@code drop}
 * only see a producer when they run on the same thread that called {@code deliver}.
 *
 * <p>System properties read by this class (all prefixed with
 * {@code org.kie.jbpm.event.emitters.kafka.}): {@code date_format}, {@code bootstrap.servers}
 * (legacy spelling {@code boopstrap.servers} still honoured), {@code client.id} and
 * {@code topic.<processes|tasks|cases>}.
 */
public class KafkaEventEmitter
implements EventEmitter {
    private static final Logger logger = LoggerFactory.getLogger(KafkaEventEmitter.class);
    private static final String SOURCE_FORMATTER = "/process/%s/%s";
    private static final String PROPERTY_PREFIX = "org.kie.jbpm.event.emitters.kafka.";
    private final ObjectMapper mapper;
    private final ThreadLocal<Producer<String, byte[]>> localProducer = new ThreadLocal<Producer<String, byte[]>>();

    /**
     * Creates the emitter and configures the JSON mapper. The date pattern is taken from
     * {@code org.kie.jbpm.event.emitters.kafka.date_format}, then
     * {@code org.kie.server.json.date_format}, then the built-in default.
     */
    public KafkaEventEmitter() {
        String fallbackPattern = System.getProperty("org.kie.server.json.date_format", "yyyy-MM-dd'T'HH:mm:ss.SSSZ");
        String datePattern = System.getProperty(PROPERTY_PREFIX + "date_format", fallbackPattern);
        this.mapper = new ObjectMapper()
                .setDateFormat(new SimpleDateFormat(datePattern))
                .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
                .configure(MapperFeature.PROPAGATE_TRANSIENT_MARKER, true);
    }

    /**
     * Opens a Kafka transaction on a fresh producer and sends one record per view.
     * The transaction stays open until {@link #apply(Collection)} or
     * {@link #drop(Collection)} is called on the same thread.
     *
     * @param data views to publish; {@code null} or empty is a no-op
     * @throws UnsupportedOperationException if a view is not a process, task or case view
     * @throws IllegalArgumentException if a view cannot be serialized
     */
    public void deliver(Collection<InstanceView<?>> data) {
        if (data == null || data.isEmpty()) {
            return;
        }
        // A producer still bound here belongs to an earlier deliver() that was never
        // applied or dropped on this thread; close it instead of overwriting (and leaking) it.
        this.releaseProducer();
        Producer<String, byte[]> producer = this.getProducer();
        this.localProducer.set(producer);
        try {
            producer.initTransactions();
            producer.beginTransaction();
        }
        catch (RuntimeException ex) {
            // No transaction was opened, so there is nothing for apply()/drop() to finish.
            this.releaseProducer();
            throw ex;
        }
        for (InstanceView<?> view : data) {
            producer.send(this.toRecord(view));
        }
    }

    /**
     * Commits the transaction opened by {@link #deliver(Collection)} on this thread and
     * closes the producer. Does nothing when no producer is bound (for example when
     * {@code deliver} was called with empty data).
     *
     * @param data the views that were delivered; only used for logging
     * @throws KafkaException if the commit fails; a failure of the follow-up abort is
     *         attached as a suppressed exception
     */
    public void apply(Collection<InstanceView<?>> data) {
        Producer<String, byte[]> producer = this.localProducer.get();
        if (producer == null) {
            logger.debug("No Kafka transaction bound to the current thread, nothing to commit");
            return;
        }
        try {
            producer.commitTransaction();
        }
        catch (KafkaException ex) {
            logger.error("Error publishing events {}", data, ex);
            try {
                producer.abortTransaction();
            }
            catch (RuntimeException abortFailure) {
                // Keep the commit failure as the primary error reported to the caller.
                ex.addSuppressed(abortFailure);
            }
            throw ex;
        }
        finally {
            this.releaseProducer();
        }
    }

    /**
     * Aborts the transaction opened by {@link #deliver(Collection)} on this thread and
     * closes the producer. Does nothing when no producer is bound.
     *
     * @param data the views that were delivered; not used
     */
    public void drop(Collection<InstanceView<?>> data) {
        Producer<String, byte[]> producer = this.localProducer.get();
        if (producer == null) {
            logger.debug("No Kafka transaction bound to the current thread, nothing to abort");
            return;
        }
        try {
            producer.abortTransaction();
        }
        finally {
            this.releaseProducer();
        }
    }

    /**
     * Closes and unbinds the producer held by the calling thread, if any. Producers bound
     * to other threads are not reachable from here.
     */
    public void close() {
        this.releaseProducer();
    }

    /**
     * @return a new, empty {@link BaseEventCollection}
     */
    public EventCollection newCollection() {
        return new BaseEventCollection();
    }

    /**
     * Builds a new transactional producer with a random {@code transactional.id}.
     * Bootstrap servers are read from {@code org.kie.jbpm.event.emitters.kafka.bootstrap.servers};
     * the historical misspelling {@code ...kafka.boopstrap.servers} is kept as a fallback so
     * existing installations keep working. Defaults to {@code localhost:9092}.
     *
     * @return a new producer; the caller is responsible for closing it
     */
    protected Producer<String, byte[]> getProducer() {
        String legacyServers = System.getProperty(PROPERTY_PREFIX + "boopstrap.servers", "localhost:9092");
        String servers = System.getProperty(PROPERTY_PREFIX + "bootstrap.servers", legacyServers);
        HashMap<String, Object> configs = new HashMap<String, Object>();
        configs.put("bootstrap.servers", servers);
        configs.put("key.serializer", StringSerializer.class);
        configs.put("value.serializer", ByteArraySerializer.class.getName());
        configs.put("transactional.id", UUID.randomUUID().toString());
        String clientId = System.getProperty(PROPERTY_PREFIX + "client.id");
        if (clientId != null) {
            configs.put("client.id", clientId);
        }
        return new KafkaProducer<String, byte[]>(configs);
    }

    /**
     * Maps a view to its topic and cloud-event type and serializes it into a record
     * (no key, JSON bytes as value).
     */
    private ProducerRecord<String, byte[]> toRecord(InstanceView<?> view) {
        String processId;
        long processInstanceId;
        String type;
        String topic;
        if (view instanceof ProcessInstanceView) {
            ProcessInstanceView processView = (ProcessInstanceView)view;
            topic = "processes";
            type = "process";
            processInstanceId = processView.getId();
            processId = processView.getProcessId();
        } else if (view instanceof TaskInstanceView) {
            TaskInstanceView taskView = (TaskInstanceView)view;
            topic = "tasks";
            type = "task";
            processInstanceId = taskView.getProcessInstanceId();
            processId = taskView.getProcessId();
        } else if (view instanceof CaseInstanceView) {
            CaseInstanceView caseView = (CaseInstanceView)view;
            topic = "cases";
            type = "case";
            processInstanceId = caseView.getId();
            processId = caseView.getCaseDefinitionId();
        } else {
            throw new UnsupportedOperationException("Unsupported view type " + view.getClass());
        }
        String source = String.format(SOURCE_FORMATTER, processId, processInstanceId);
        try {
            byte[] payload = this.mapper.writeValueAsBytes(new CloudEventSpec1(type, source, view));
            return new ProducerRecord<String, byte[]>(KafkaEventEmitter.getTopic(topic), payload);
        }
        catch (IOException ex) {
            throw new IllegalArgumentException("Error creating cloud event for view " + view, ex);
        }
    }

    /**
     * Unbinds the calling thread's producer and closes it. Close failures are logged
     * rather than thrown so they cannot mask an exception already in flight.
     */
    private void releaseProducer() {
        Producer<String, byte[]> producer = this.localProducer.get();
        this.localProducer.remove();
        if (producer == null) {
            return;
        }
        try {
            producer.close();
        }
        catch (RuntimeException ex) {
            logger.warn("Error closing Kafka producer", ex);
        }
    }

    /**
     * Resolves the topic for an event type from
     * {@code org.kie.jbpm.event.emitters.kafka.topic.<eventType>}, defaulting to
     * {@code jbpm-<eventType>-events}.
     */
    private static String getTopic(String eventType) {
        return System.getProperty(PROPERTY_PREFIX + "topic." + eventType, "jbpm-" + eventType + "-events");
    }
}

