package org.kie.server.services.jbpm.kafka;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.kie.api.runtime.process.ProcessInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/kie-server-services-kafka-7.73.0-SNAPSHOT.jar:org/kie/server/services/jbpm/kafka/KafkaServerProducer.class */
class KafkaServerProducer {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) KafkaServerProducer.class);
    private static KafkaServerProducer instance;
    private Producer<String, byte[]> producer;
    private Supplier<Producer<String, byte[]>> producerSupplier;
    private KafkaEventProcessorFactory factory;
    private KafkaSender kafkaSender;
    private Lock producerLock = new ReentrantLock();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/kie-server-services-kafka-7.73.0-SNAPSHOT.jar:org/kie/server/services/jbpm/kafka/KafkaServerProducer$KafkaSender.class */
    public interface KafkaSender {
        void send(String str, Object obj, ProcessInstance processInstance);
    }

    public static void init(KafkaEventProcessorFactory kafkaEventProcessorFactory, Supplier<Producer<String, byte[]>> supplier) {
        instance = new KafkaServerProducer(kafkaEventProcessorFactory, supplier);
    }

    public static void cleanup(Duration duration) {
        if (instance != null) {
            instance.close(duration);
            instance = null;
        }
    }

    public static void publish(ProcessInstance processInstance, String str, Object obj) {
        if (instance != null) {
            instance.sendEvent(processInstance, str, obj);
        }
    }

    private KafkaServerProducer(KafkaEventProcessorFactory kafkaEventProcessorFactory, Supplier<Producer<String, byte[]>> supplier) {
        this.factory = kafkaEventProcessorFactory;
        this.producerSupplier = supplier;
        this.kafkaSender = Boolean.getBoolean("org.kie.server.jbpm-kafka.ext.sync") ? this::sendSync : this::sendAsync;
    }

    private void close(Duration duration) {
        this.producerLock.lock();
        try {
            if (this.producer != null) {
                this.producer.close(duration);
                this.producer = null;
            }
        } finally {
            this.producerLock.unlock();
        }
    }

    private void sendEvent(ProcessInstance processInstance, String str, Object obj) {
        this.producerLock.lock();
        try {
            if (this.producer == null) {
                this.producer = this.producerSupplier.get();
            }
            String str2 = KafkaServerUtils.topicFromSignal(str);
            logger.debug("Publishing event {}  to topic {}", obj, str2);
            this.kafkaSender.send(str2, obj, processInstance);
        } finally {
            this.producerLock.unlock();
        }
    }

    private void sendAsync(String str, Object obj, ProcessInstance processInstance) {
        try {
            this.producer.send(new ProducerRecord<>(str, marshall(str, obj, processInstance)), (recordMetadata, exc) -> {
                if (exc != null) {
                    logError(obj, exc);
                }
            });
        } catch (Exception e) {
            logError(obj, e);
        }
    }

    private void sendSync(String str, Object obj, ProcessInstance processInstance) {
        try {
            this.producer.send(new ProducerRecord<>(str, marshall(str, obj, processInstance))).get();
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e2);
        } catch (ExecutionException e3) {
            if (!(e3.getCause() instanceof RuntimeException)) {
                throw new KafkaException(e3.getCause());
            }
            throw ((RuntimeException) e3.getCause());
        }
    }

    private byte[] marshall(String str, Object obj, ProcessInstance processInstance) throws IOException {
        return this.factory.getEventWriter(str).writeEvent(processInstance, obj);
    }

    private void logError(Object obj, Exception exc) {
        logger.error("Error publishing event {}", obj, exc);
    }
}
