package org.kie.server.services.jbpm.kafka;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.jbpm.services.api.DeploymentEvent;
import org.kie.api.event.process.DefaultProcessEventListener;
import org.kie.api.event.process.MessageEvent;
import org.kie.api.event.process.SignalEvent;
import org.kie.api.runtime.process.ProcessInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/kie/server/services/jbpm/kafka/KafkaServerProducer.class */
/**
 * Process event listener that publishes message and signal events to Kafka.
 *
 * <p>The Kafka {@link Producer} is obtained lazily from the supplied factory function the first
 * time an event has to be published, and is released again by {@link #close(Duration)}. A new
 * producer is requested from the supplier if an event is published after a close.
 *
 * <p>Creation and disposal of the producer are guarded by a private lock so that concurrent
 * listener callbacks never observe a half-initialised producer, and so that a failed creation
 * attempt is retried on the next event instead of leaving the listener permanently broken.
 *
 * <p>Publishing is best-effort: any failure is logged and never propagated to the caller.
 */
class KafkaServerProducer extends DefaultProcessEventListener {
    private static final Logger logger = LoggerFactory.getLogger(KafkaServerProducer.class);

    private final Supplier<Producer<String, byte[]>> producerSupplier;
    private final KafkaEventProcessorFactory factory;

    /** Guards creation and disposal of {@link #producer}. */
    private final Object producerLock = new Object();

    /** Lazily created on first publish; {@code null} while no producer is open. */
    private volatile Producer<String, byte[]> producer;

    /**
     * Creates the listener without opening a Kafka producer yet.
     *
     * @param kafkaEventProcessorFactory factory used to look up the event writer for a topic
     * @param supplier source of the Kafka producer; invoked on the first published event
     */
    public KafkaServerProducer(KafkaEventProcessorFactory kafkaEventProcessorFactory, Supplier<Producer<String, byte[]>> supplier) {
        this.factory = kafkaEventProcessorFactory;
        this.producerSupplier = supplier;
    }

    /**
     * Closes the current producer, if one has been created. Does nothing otherwise.
     *
     * @param duration timeout handed to {@link Producer#close(Duration)}
     */
    public void close(Duration duration) {
        Producer<String, byte[]> toClose;
        synchronized (this.producerLock) {
            toClose = this.producer;
            this.producer = null;
        }
        // Close outside the lock: it may block for up to the given timeout.
        if (toClose != null) {
            toClose.close(duration);
        }
    }

    /**
     * Publishes the message carried by the event when message processing is enabled.
     *
     * @param messageEvent event describing the message thrown by a process instance
     */
    public void onMessage(MessageEvent messageEvent) {
        if (KafkaServerUtils.processMessages()) {
            sendEvent(messageEvent.getProcessInstance(), messageEvent.getMessageName(), messageEvent.getMessage());
        }
    }

    /**
     * Publishes the signal carried by the event when {@code KafkaServerUtils} accepts it.
     *
     * @param signalEvent event describing the signal thrown by a process instance
     */
    public void onSignal(SignalEvent signalEvent) {
        if (KafkaServerUtils.processSignals(signalEvent)) {
            sendEvent(signalEvent.getProcessInstance(), signalEvent.getSignalName(), signalEvent.getSignal());
        }
    }

    /**
     * Serialises the event and sends it to the topic derived from the signal/message name.
     * Every failure (producer creation, serialisation, synchronous or asynchronous send error)
     * is logged and swallowed.
     */
    private void sendEvent(ProcessInstance processInstance, String str, Object obj) {
        try {
            // Acquire the producer inside the try so a creation failure is logged like any
            // other publishing error and retried on the next event.
            Producer<String, byte[]> activeProducer = getOrCreateProducer();
            String topic = KafkaServerUtils.topicFromSignal(str);
            byte[] payload = this.factory.getEventWriter(topic).writeEvent(processInstance, obj);
            activeProducer.send(new ProducerRecord<>(topic, payload), (recordMetadata, exc) -> {
                if (exc != null) {
                    logError(obj, exc);
                }
            });
        } catch (Exception e) {
            logError(obj, e);
        }
    }

    /**
     * Returns the open producer, creating it through the supplier when none exists.
     * The field is only assigned after the supplier returned successfully, so a failed attempt
     * leaves the listener in the "not created" state.
     *
     * @throws IllegalStateException if the supplier returns {@code null}
     */
    private Producer<String, byte[]> getOrCreateProducer() {
        Producer<String, byte[]> current = this.producer;
        if (current == null) {
            synchronized (this.producerLock) {
                current = this.producer;
                if (current == null) {
                    current = this.producerSupplier.get();
                    if (current == null) {
                        throw new IllegalStateException("Kafka producer supplier returned null");
                    }
                    this.producer = current;
                }
            }
        }
        return current;
    }

    private void logError(Object obj, Exception exc) {
        logger.error("Error publishing event {}", obj, exc);
    }

    /**
     * Registers this instance as a process listener on the deployed unit's runtime environment.
     *
     * @param deploymentEvent event describing the unit that has just been deployed
     */
    public void activate(DeploymentEvent deploymentEvent) {
        deploymentEvent.getDeployedUnit().getRuntimeManager().getEnvironment().getRegisterableItemsFactory().addProcessListener(this);
    }

    /**
     * Intentionally empty: nothing is undone here when a unit is undeployed.
     *
     * @param deploymentEvent event describing the unit being undeployed (unused)
     */
    public void deactivate(DeploymentEvent deploymentEvent) {
    }
}
