package org.jboss.pnc.buildagent.server;

import java.time.Duration;
import java.util.Properties;
import java.util.Random;
import java.util.function.Consumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:org/jboss/pnc/buildagent/server/KafkaQueueAdapter.class */
/**
 * {@link QueueAdapter} implementation that publishes messages to a Kafka topic.
 *
 * <p>Every message is sent as a record on the configured topic. The record key is the value
 * stored under {@code processContext} in the SLF4J {@link MDC} of the calling thread, or
 * {@link #DEFAULT_KEY} when that value is missing or empty.
 */
public class KafkaQueueAdapter implements QueueAdapter {
    /** Name of the MDC entry whose value is used as the record key. */
    private static final String PROCESS_CONTEXT_KEY = "processContext";

    /**
     * Fallback record key used when the MDC carries no process context: a random integer in
     * the range [0, 8192), rendered as a string and chosen once when this class is initialized.
     */
    private static final String DEFAULT_KEY = String.valueOf(new Random().nextInt(8192));

    private static final Logger log = LoggerFactory.getLogger(KafkaQueueAdapter.class);

    /** Producer used for all sends; both record keys and values are strings. */
    private final KafkaProducer<String, String> kafkaProducer;

    /** Topic every record is published to. */
    private final String queueTopic;

    /**
     * Creates an adapter backed by a new {@link KafkaProducer}.
     *
     * @param properties producer configuration, passed to the {@link KafkaProducer} constructor as is
     * @param str name of the topic the messages are sent to
     */
    public KafkaQueueAdapter(Properties properties, String str) {
        this.queueTopic = str;
        this.kafkaProducer = new KafkaProducer<>(properties);
    }

    /** Delegates to {@link KafkaProducer#flush()}. */
    @Override
    public void flush() {
        this.kafkaProducer.flush();
    }

    /**
     * Sends the message to the configured topic without waiting for the result.
     *
     * @param str message payload, used as the record value
     * @param consumer handler invoked with the exception when the send fails; when it is
     *     {@code null} the failure is logged instead of being lost
     */
    @Override
    public void send(String str, Consumer<Exception> consumer) {
        String key = MDC.get(PROCESS_CONTEXT_KEY);
        if (key == null || key.isEmpty()) {
            key = DEFAULT_KEY;
        }
        ProducerRecord<String, String> record = new ProducerRecord<>(this.queueTopic, key, str);
        this.kafkaProducer.send(record, (recordMetadata, exc) -> {
            if (exc == null) {
                // Guarded so the numeric arguments are not boxed when trace logging is off.
                if (log.isTraceEnabled()) {
                    log.trace("Message sent to Kafka. Partition:{}, timestamp {}.", recordMetadata.partition(), recordMetadata.timestamp());
                }
                return;
            }
            if (consumer != null) {
                consumer.accept(exc);
            } else {
                // Without a handler the failure would otherwise surface only as a
                // NullPointerException raised from this callback.
                log.error("Failed to send message to Kafka topic {}.", this.queueTopic, exc);
            }
        });
    }

    /**
     * Delegates to {@link KafkaProducer#close(Duration)}.
     *
     * @param duration timeout handed to the producer's close call
     */
    @Override
    public void close(Duration duration) {
        this.kafkaProducer.close(duration);
    }

    /** Delegates to {@link KafkaProducer#close()}. */
    @Override
    public void close() {
        this.kafkaProducer.close();
    }
}
