package org.kie.remote.impl.producer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.kie.remote.message.Message;
import org.kie.remote.message.ResultMessage;
import org.kie.remote.util.SerializationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/kie-remote-7.28.0-SNAPSHOT.jar:org/kie/remote/impl/producer/EventProducer.class */
/**
 * {@link Producer} implementation that publishes serialized messages through
 * a Kafka producer.
 *
 * <p>The underlying Kafka producer is created by {@link #start(Properties)}
 * and released by {@link #stop()}. The {@code produceSync} methods dereference
 * the producer without a null check, so {@code start} has to be called first.
 *
 * @param <T> value type of the records handed to the underlying Kafka producer
 */
public class EventProducer<T> implements Producer {
    private static final Logger logger = LoggerFactory.getLogger(EventProducer.class);

    /** Underlying Kafka producer; {@code null} until {@link #start(Properties)} is called. */
    protected org.apache.kafka.clients.producer.Producer<String, T> producer;

    /**
     * Creates the underlying {@link KafkaProducer} from the given configuration.
     *
     * @param properties configuration passed unchanged to the {@link KafkaProducer} constructor
     */
    @Override
    public void start(Properties properties) {
        this.producer = new KafkaProducer<>(properties);
    }

    /**
     * Flushes and then closes the underlying producer. Does nothing when the
     * producer was never created.
     */
    @Override
    public void stop() {
        if (this.producer != null) {
            this.producer.flush();
            this.producer.close();
        }
    }

    /**
     * Serializes the given result message, sends it and waits for the send to complete.
     *
     * @param str           first {@link ProducerRecord} constructor argument (the topic in Kafka's
     *                      three-argument constructor)
     * @param str2          second {@link ProducerRecord} constructor argument (the record key)
     * @param resultMessage payload to serialize and send
     * @param <R>           type parameter of the result message
     */
    @Override
    public <R> void produceSync(String str, String str2, ResultMessage<R> resultMessage) {
        internalProduceSync(str, str2, resultMessage);
    }

    /**
     * Serializes the given message, sends it and waits for the send to complete.
     *
     * @param str     first {@link ProducerRecord} constructor argument (the topic in Kafka's
     *                three-argument constructor)
     * @param str2    second {@link ProducerRecord} constructor argument (the record key)
     * @param message payload to serialize and send
     */
    @Override
    public void produceSync(String str, String str2, Message message) {
        internalProduceSync(str, str2, message);
    }

    /**
     * Builds a record for {@code obj}, sends it and blocks on the returned future.
     *
     * <p>Failures are logged and not propagated to the caller. When the wait is
     * interrupted, the thread's interrupt status is restored so that code further
     * up the call stack can still observe the interruption.
     *
     * @param str  first {@link ProducerRecord} constructor argument
     * @param str2 second {@link ProducerRecord} constructor argument
     * @param obj  payload to serialize and send
     */
    protected void internalProduceSync(String str, String str2, Object obj) {
        try {
            this.producer.send(getFreshProducerRecord(str, str2, obj)).get();
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the interrupt flag; set it again
            // so the interruption is not silently lost.
            Thread.currentThread().interrupt();
            logger.error("Error in produceSync!", e);
        } catch (ExecutionException e) {
            logger.error("Error in produceSync!", e);
        }
    }

    /**
     * Creates a new record whose value is the result of
     * {@link SerializationUtil#serialize} applied to {@code obj}.
     */
    private ProducerRecord<String, T> getFreshProducerRecord(String str, String str2, Object obj) {
        return new ProducerRecord<>(str, str2, SerializationUtil.serialize(obj));
    }
}
