package org.jbpm.process.workitem.kafka;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.dashbuilder.dataset.json.KafkaDefJSONMarshaller;
import org.drools.core.process.instance.impl.WorkItemImpl;
import org.drools.scorecards.parser.xls.XLSKeywords;
import org.jbpm.executor.impl.wih.AsyncWorkItemHandlerCmdCallback;
import org.jbpm.process.workitem.core.AbstractLogOrThrowWorkItemHandler;
import org.jbpm.process.workitem.core.util.RequiredParameterValidator;
import org.jbpm.process.workitem.core.util.Wid;
import org.jbpm.process.workitem.core.util.WidMavenDepends;
import org.jbpm.process.workitem.core.util.WidParameter;
import org.jbpm.process.workitem.core.util.WidResult;
import org.jbpm.process.workitem.core.util.service.WidAction;
import org.jbpm.process.workitem.core.util.service.WidAuth;
import org.jbpm.process.workitem.core.util.service.WidService;
import org.jbpm.workbench.es.model.RequestDataSetConstants;
import org.keycloak.common.constants.ServiceAccountConstants;
import org.kie.api.executor.Command;
import org.kie.api.executor.CommandContext;
import org.kie.api.executor.ExecutionResults;
import org.kie.api.executor.ExecutorService;
import org.kie.api.runtime.process.WorkItem;
import org.kie.api.runtime.process.WorkItemManager;
import org.kie.internal.runtime.Cacheable;
import org.kie.internal.runtime.manager.InternalRuntimeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Wid(widfile = "KafkaWorkItem.wid", name = "KafkaPublishMessages", displayName = "KafkaPublishMessages", defaultHandler = "mvel: new org.jbpm.process.workitem.kafka.KafkaWorkItemHandler(\"bootstrapServers\", \"clientId\", \"keySerializerClass\", \"valueSerializerClass\")", documentation = "kafka-workitem/index.html", category = "kafka-workitem", icon = "KafkaPublishMessages.png", parameters = {@WidParameter(name = "Topic", required = true), @WidParameter(name = "Key", required = true), @WidParameter(name = XLSKeywords.SCORECARD_CHARACTERISTIC_BIN_LABEL, required = true)}, results = {@WidResult(name = "Result")}, mavenDepends = {@WidMavenDepends(group = "org.jbpm.contrib", artifact = "kafka-workitem", version = "7.52.1-SNAPSHOT")}, serviceInfo = @WidService(category = "Kafka", description = "publish kafka messages from a process", keywords = "kafka,publish,message,topic", action = @WidAction(title = "Publish message to a kafka topic"), authinfo = @WidAuth(required = true, params = {"bootstrapServers", "clientId", "keySerializerClass", "valueSerializerClass"}, paramsdescription = {"Bootstrap Servers", ServiceAccountConstants.CLIENT_ID_PROTOCOL_MAPPER, "Key Serializer class", "Value Serializer class"}, referencesite = "https://red.ht/kafka-wih-params")))
/* loaded from: input_file:service-tasks/kafka-workitem/kafka-workitem-7.52.1-SNAPSHOT.jar:org/jbpm/process/workitem/kafka/KafkaWorkItemHandler.class */
public class KafkaWorkItemHandler extends AbstractLogOrThrowWorkItemHandler implements Cacheable {
    private static String DEFAULT_HOST = "localhost:9092";
    private static String DEFAULT_KAFKA_CLIENT_ID = "jBPM-Kafka-PublishMessage";
    private static String DEFAULT_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
    private static String PROPERTY_PREFIX = "org.jbpm.process.workitem.kafka.";
    private static String GLOBAL_RECONNECT_BACKOFF_MAX_MS = PROPERTY_PREFIX + "reconnect.backoff.max.ms";
    private static String GLOBAL_RECONNECT_BACKOFF_MS = PROPERTY_PREFIX + "reconnect.backoff.ms";
    private static String GLOBAL_REQUEST_TIMEOUT_MS = PROPERTY_PREFIX + "request.timeout.ms";
    private static String GLOBAL_RETRIES = PROPERTY_PREFIX + RequestDataSetConstants.COLUMN_RETRIES;
    private static String GLOBAL_RETRY_BACKOFF_MS = PROPERTY_PREFIX + "retry.backoff.ms";
    private static String GLOBAL_ENABLE_IDEMPOTENCE = PROPERTY_PREFIX + "enable.idempotence";
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) KafkaWorkItemHandler.class);
    private static Map<Properties, Producer> producers = new ConcurrentHashMap();
    private ExecutorService executorService;
    private Properties properties;
    private static final String RESULTS_VALUE = "Result";

    /* loaded from: input_file:service-tasks/kafka-workitem/kafka-workitem-7.52.1-SNAPSHOT.jar:org/jbpm/process/workitem/kafka/KafkaWorkItemHandler$KafkaWorkItemHandlerProducerCommand.class */
    public static class KafkaWorkItemHandlerProducerCommand implements Command {
        @Override // org.kie.api.executor.Command
        public ExecutionResults execute(CommandContext commandContext) throws Exception {
            String str = (String) commandContext.getData().get(KafkaDefJSONMarshaller.TOPIC);
            Object obj = commandContext.getData().get("key");
            Object obj2 = commandContext.getData().get("value");
            Properties properties = (Properties) commandContext.getData().get("producerProperties");
            KafkaWorkItemHandler.LOG.debug("Kafka WorkItem Handler {} about to send to topic {} key {} and value {}", properties, str, obj, obj2);
            ((Producer) KafkaWorkItemHandler.producers.get(properties)).send(new ProducerRecord(str, obj, obj2)).get();
            KafkaWorkItemHandler.LOG.debug("Kafka WorkItem Handler {} sent to topic {} key {} and value {}", properties, str, obj, obj2);
            ExecutionResults executionResults = new ExecutionResults();
            executionResults.setData("Result", "success");
            return executionResults;
        }
    }

    public KafkaWorkItemHandler(Properties properties, Producer producer) {
        this.properties = properties;
        producers.put(properties, producer);
    }

    public KafkaWorkItemHandler() {
        this(DEFAULT_HOST, DEFAULT_KAFKA_CLIENT_ID, DEFAULT_SERIALIZER, DEFAULT_SERIALIZER);
    }

    public KafkaWorkItemHandler(ClassLoader classLoader) {
        this(DEFAULT_HOST, DEFAULT_KAFKA_CLIENT_ID, DEFAULT_SERIALIZER, DEFAULT_SERIALIZER, classLoader);
    }

    public KafkaWorkItemHandler(ClassLoader classLoader, InternalRuntimeManager internalRuntimeManager) {
        this(DEFAULT_HOST, DEFAULT_KAFKA_CLIENT_ID, DEFAULT_SERIALIZER, DEFAULT_SERIALIZER, classLoader, internalRuntimeManager);
    }

    public KafkaWorkItemHandler(String str, String str2, String str3, String str4) {
        this(str, str2, str3, str4, KafkaProducer.class.getClassLoader());
    }

    public KafkaWorkItemHandler(String str, String str2, String str3, String str4, ClassLoader classLoader) {
        this(str, str2, str3, str4, classLoader, null);
    }

    public KafkaWorkItemHandler(String str, String str2, String str3, String str4, ClassLoader classLoader, InternalRuntimeManager internalRuntimeManager) {
        this.properties = new Properties();
        this.properties.put("bootstrap.servers", !isEmpty(str) ? str : DEFAULT_HOST);
        this.properties.put("client.id", !isEmpty(str2) ? str2 : DEFAULT_KAFKA_CLIENT_ID);
        this.properties.put("key.serializer", !isEmpty(str3) ? str3 : DEFAULT_SERIALIZER);
        this.properties.put("value.serializer", !isEmpty(str4) ? str4 : DEFAULT_SERIALIZER);
        String property = System.getProperty(GLOBAL_RECONNECT_BACKOFF_MAX_MS);
        if (property != null) {
            this.properties.put("reconnect.backoff.max.ms", property);
        }
        String property2 = System.getProperty(GLOBAL_RECONNECT_BACKOFF_MS);
        if (property2 != null) {
            this.properties.put("reconnect.backoff.ms", property2);
        }
        String property3 = System.getProperty(GLOBAL_REQUEST_TIMEOUT_MS);
        if (property3 != null) {
            this.properties.put("request.timeout.ms", property3);
        }
        String property4 = System.getProperty(GLOBAL_RETRIES);
        if (property4 != null) {
            this.properties.put(RequestDataSetConstants.COLUMN_RETRIES, property4);
        }
        String property5 = System.getProperty(GLOBAL_RETRY_BACKOFF_MS);
        if (property5 != null) {
            this.properties.put("retry.backoff.ms", property5);
        }
        String property6 = System.getProperty(GLOBAL_ENABLE_IDEMPOTENCE);
        if (property6 != null) {
            this.properties.put("enable.idempotence", property6);
        }
        if (internalRuntimeManager != null) {
            this.executorService = (ExecutorService) internalRuntimeManager.getEnvironment().getEnvironment().get("ExecutorService");
            LOG.info("Kafka WorkItem Handler Producer created with async {}", this.properties);
        } else {
            LOG.info("Kafka WorkItem Handler Producer created with sync for {}", this.properties);
        }
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(classLoader);
            producers.computeIfAbsent(this.properties, properties -> {
                return new KafkaProducer(properties);
            });
            Thread.currentThread().setContextClassLoader(contextClassLoader);
        } catch (Throwable th) {
            Thread.currentThread().setContextClassLoader(contextClassLoader);
            throw th;
        }
    }

    public boolean isEmpty(String str) {
        return str == null || str.isEmpty();
    }

    @Override // org.kie.api.runtime.process.WorkItemHandler
    public void executeWorkItem(WorkItem workItem, WorkItemManager workItemManager) {
        try {
            RequiredParameterValidator.validate(getClass(), workItem);
            String str = (String) workItem.getParameter("Topic");
            Object parameter = workItem.getParameter("Key");
            Object parameter2 = workItem.getParameter(XLSKeywords.SCORECARD_CHARACTERISTIC_BIN_LABEL);
            if (this.executorService == null || !this.executorService.isActive()) {
                producers.get(this.properties).send(new ProducerRecord(str, parameter, parameter2)).get();
                HashMap hashMap = new HashMap();
                hashMap.put("Result", "success");
                workItemManager.completeWorkItem(workItem.getId(), hashMap);
            } else {
                CommandContext commandContext = new CommandContext();
                commandContext.setData("workItem", workItem);
                commandContext.setData("processInstanceId", Long.valueOf(getProcessInstanceId(workItem)));
                commandContext.setData("deploymentId", ((WorkItemImpl) workItem).getDeploymentId());
                commandContext.setData("callbacks", AsyncWorkItemHandlerCmdCallback.class.getName());
                commandContext.setData(KafkaDefJSONMarshaller.TOPIC, str);
                commandContext.setData("key", parameter);
                commandContext.setData("value", parameter2);
                commandContext.setData("producerProperties", this.properties);
                LOG.debug("Request Kafka producer successfully with id {}", this.executorService.scheduleRequest(KafkaWorkItemHandlerProducerCommand.class.getName(), commandContext));
            }
        } catch (Exception e) {
            LOG.error("Handler error", (Throwable) e);
            handleException(e);
        }
    }

    @Override // org.kie.api.runtime.process.WorkItemHandler
    public void abortWorkItem(WorkItem workItem, WorkItemManager workItemManager) {
    }

    @Override // org.kie.internal.runtime.Cacheable
    public void close() {
        if (producers == null || !producers.containsKey(this.properties)) {
            return;
        }
        Producer remove = producers.remove(this.properties);
        remove.flush();
        remove.close();
    }

    protected long getProcessInstanceId(WorkItem workItem) {
        return ((WorkItemImpl) workItem).getProcessInstanceId();
    }
}
