package org.jbpm.process.workitem.kafka;

import java.util.HashMap;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.drools.scorecards.parser.xls.XLSKeywords;
import org.jbpm.process.workitem.core.AbstractLogOrThrowWorkItemHandler;
import org.jbpm.process.workitem.core.util.RequiredParameterValidator;
import org.jbpm.process.workitem.core.util.Wid;
import org.jbpm.process.workitem.core.util.WidMavenDepends;
import org.jbpm.process.workitem.core.util.WidParameter;
import org.jbpm.process.workitem.core.util.WidResult;
import org.jbpm.process.workitem.core.util.service.WidAction;
import org.jbpm.process.workitem.core.util.service.WidAuth;
import org.jbpm.process.workitem.core.util.service.WidService;
import org.keycloak.common.constants.ServiceAccountConstants;
import org.kie.api.runtime.process.WorkItem;
import org.kie.api.runtime.process.WorkItemManager;
import org.kie.internal.runtime.Cacheable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Wid(widfile = "KafkaWorkItem.wid", name = "KafkaPublishMessages", displayName = "KafkaPublishMessages", defaultHandler = "mvel: new org.jbpm.process.workitem.kafka.KafkaWorkItemHandler(\"bootstrapServers\", \"clientId\", \"keySerializerClass\", \"valueSerializerClass\")", documentation = "kafka-workitem/index.html", category = "kafka-workitem", icon = "KafkaPublishMessages.png", parameters = {@WidParameter(name = "Topic", required = true), @WidParameter(name = "Key", required = true), @WidParameter(name = XLSKeywords.SCORECARD_CHARACTERISTIC_BIN_LABEL, required = true)}, results = {@WidResult(name = "Result")}, mavenDepends = {@WidMavenDepends(group = "org.jbpm.contrib", artifact = "kafka-workitem", version = "7.37.0-SNAPSHOT")}, serviceInfo = @WidService(category = "Kafka", description = "publish kafka messages from a process", keywords = "kafka,publish,message,topic", action = @WidAction(title = "Publish message to a kafka topic"), authinfo = @WidAuth(required = true, params = {"bootstrapServers", ServiceAccountConstants.CLIENT_ID, "keySerializerClass", "valueSerializerClass"}, paramsdescription = {"Bootstrap Servers", ServiceAccountConstants.CLIENT_ID_PROTOCOL_MAPPER, "Key Serializer class", "Value Serializer class"}, referencesite = "https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html")))
/* loaded from: input_file:service-tasks/kafka-workitem/kafka-workitem-7.37.0-SNAPSHOT.jar:org/jbpm/process/workitem/kafka/KafkaWorkItemHandler.class */
public class KafkaWorkItemHandler extends AbstractLogOrThrowWorkItemHandler implements Cacheable {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) KafkaWorkItemHandler.class);
    private Producer<Long, String> producer;
    private static final String RESULTS_VALUE = "Result";

    public KafkaWorkItemHandler(Producer producer) {
        this.producer = producer;
    }

    public KafkaWorkItemHandler(String str, String str2, String str3, String str4) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", str);
        properties.put("client.id", str2);
        properties.put("key.serializer", str3);
        properties.put("value.serializer", str4);
        this.producer = new KafkaProducer(properties);
    }

    @Override // org.kie.api.runtime.process.WorkItemHandler
    public void executeWorkItem(WorkItem workItem, WorkItemManager workItemManager) {
        try {
            RequiredParameterValidator.validate(getClass(), workItem);
            HashMap hashMap = new HashMap();
            try {
                this.producer.send(new ProducerRecord((String) workItem.getParameter("Topic"), (String) workItem.getParameter("Key"), (String) workItem.getParameter(XLSKeywords.SCORECARD_CHARACTERISTIC_BIN_LABEL)));
                hashMap.put("Result", "success");
                workItemManager.completeWorkItem(workItem.getId(), hashMap);
            } catch (Exception e) {
                LOG.error("Kafka error", (Throwable) e);
                this.producer.flush();
                hashMap.put("Result", "failure");
            }
        } catch (Exception e2) {
            LOG.error("Handler error", (Throwable) e2);
            handleException(e2);
        }
    }

    @Override // org.kie.api.runtime.process.WorkItemHandler
    public void abortWorkItem(WorkItem workItem, WorkItemManager workItemManager) {
    }

    @Override // org.kie.internal.runtime.Cacheable
    public void close() {
        if (this.producer != null) {
            this.producer.flush();
            this.producer.close();
        }
    }
}
