/*
 * Decompiled with CFR 0.152.
 */
package org.kie.remote.impl.producer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.kie.remote.impl.producer.Producer;
import org.kie.remote.message.Message;
import org.kie.remote.message.ResultMessage;
import org.kie.remote.util.SerializationUtil;

public class EventProducer<T>
implements Producer {
    protected org.apache.kafka.clients.producer.Producer<String, T> producer;

    @Override
    public void start(Properties properties) {
        this.producer = new KafkaProducer(properties);
    }

    @Override
    public void stop() {
        if (this.producer != null) {
            this.producer.flush();
            this.producer.close();
        }
    }

    @Override
    public <T> void produceSync(String topicName, String key, ResultMessage<T> object) {
        this.internalProduceSync(topicName, key, object);
    }

    @Override
    public void produceSync(String topicName, String key, Message object) {
        this.internalProduceSync(topicName, key, object);
    }

    protected void internalProduceSync(String topicName, String key, Object object) {
        try {
            this.producer.send(this.getFreshProducerRecord(topicName, key, object)).get();
        }
        catch (InterruptedException | ExecutionException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    private ProducerRecord<String, T> getFreshProducerRecord(String topicName, String key, Object object) {
        return new ProducerRecord(topicName, (Object)key, (Object)SerializationUtil.serialize(object));
    }
}

