package org.jboss.pnc.buildagent.server;

import ch.qos.logback.core.status.Status;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.jboss.pnc.buildagent.server.formatter.LogbackFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:org/jboss/pnc/buildagent/server/IoKafkaLogger.class */
/**
 * {@link ReadOnlyChannel} that formats every chunk of output with a {@link LogbackFormatter}
 * and publishes the result as a record on a Kafka topic.
 *
 * <p>Records are sent asynchronously. The first delivery failure reported by the producer
 * callback is remembered and surfaced as an {@link IOException} from {@link #flush()}.
 */
public class IoKafkaLogger implements ReadOnlyChannel {
    private final String queueTopic;
    private final KafkaProducer<String, String> kafkaProducer;
    private final Consumer<byte[]> outputLogger;
    private final boolean primary;
    private final long flushTimeoutMillis;
    static final Logger processLog = LoggerFactory.getLogger("org.jboss.pnc._userlog_.build-log");
    private static final Logger log = LoggerFactory.getLogger(IoKafkaLogger.class);
    List<Status> loggerErrors = new ArrayList<>();
    private final Charset charset = Charset.defaultCharset();
    /** First delivery failure reported by a send callback; never reset once set. */
    private final AtomicReference<Exception> deliveryException = new AtomicReference<>();

    /**
     * Creates the Kafka producer and wires the output pipeline.
     *
     * @param kafkaProperties configuration handed to the {@link KafkaProducer} constructor
     * @param queueTopic topic every record is published to
     * @param primary value returned by {@link #isPrimary()}
     * @param flushTimeoutMillis maximum time {@link #flush()} waits for the producer, in milliseconds
     * @param mdcContext map installed as the SLF4J MDC context before each chunk is formatted
     */
    public IoKafkaLogger(
            Properties kafkaProperties,
            String queueTopic,
            boolean primary,
            long flushTimeoutMillis,
            Map<String, String> mdcContext) {
        this.queueTopic = queueTopic;
        this.primary = primary;
        this.flushTimeoutMillis = flushTimeoutMillis;
        this.kafkaProducer = new KafkaProducer<>(kafkaProperties);
        LogbackFormatter logbackFormatter = new LogbackFormatter();
        Consumer<Exception> onDeliveryFailure = failure -> {
            log.error("Error writing log.", failure);
            // Keep only the first failure; later ones are logged above but not stored.
            this.deliveryException.compareAndSet(null, failure);
        };
        this.outputLogger = bytes -> {
            MDC.setContextMap(mdcContext);
            send(logbackFormatter.format(new String(bytes, this.charset)), onDeliveryFailure);
        };
    }

    /**
     * Flushes the producer, waiting at most the configured flush timeout.
     *
     * <p>The flush runs on a short-lived worker thread so the wait can be bounded; the worker
     * is always shut down before this method returns.
     *
     * @throws IOException if a delivery failure has been recorded (before or during the flush),
     *     or if the flush fails, times out or the calling thread is interrupted
     */
    @Override // org.jboss.pnc.buildagent.server.ReadOnlyChannel
    public void flush() throws IOException {
        throwIfDeliveryFailed();
        ExecutorService flushExecutor = Executors.newFixedThreadPool(1);
        try {
            Runnable flushTask = this.kafkaProducer::flush;
            Future<?> pendingFlush = flushExecutor.submit(flushTask);
            try {
                pendingFlush.get(this.flushTimeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                pendingFlush.cancel(true);
                // Restore the interrupt flag so callers further up can still observe it.
                Thread.currentThread().interrupt();
                throw new IOException("Unable to flush logs.", e);
            } catch (ExecutionException | TimeoutException e) {
                pendingFlush.cancel(true);
                throw new IOException("Unable to flush logs.", e);
            }
        } finally {
            // Without this every flush() call would leave a live worker thread behind.
            flushExecutor.shutdownNow();
        }
        // Send callbacks may have recorded a failure while the flush was in progress.
        throwIfDeliveryFailed();
    }

    /**
     * Formats the given bytes and sends them to Kafka asynchronously.
     *
     * @param bArr raw output, decoded using the platform default charset
     */
    @Override // org.jboss.pnc.buildagent.server.ReadOnlyChannel
    public void writeOutput(byte[] bArr) {
        this.outputLogger.accept(bArr);
    }

    /** Returns the {@code primary} flag supplied at construction time. */
    @Override // org.jboss.pnc.buildagent.server.ReadOnlyChannel
    public boolean isPrimary() {
        return this.primary;
    }

    /** Throws if any send callback has reported a delivery failure so far. */
    private void throwIfDeliveryFailed() throws IOException {
        Exception failure = this.deliveryException.get();
        if (failure != null) {
            throw new IOException("Some messages were not written.", failure);
        }
    }

    /**
     * Sends one record asynchronously.
     *
     * @param message record value
     * @param exceptionHandler invoked with the exception when the producer reports a failed send
     */
    private void send(String message, Consumer<Exception> exceptionHandler) {
        this.kafkaProducer.send(new ProducerRecord<>(this.queueTopic, message), (recordMetadata, exc) -> {
            if (exc != null) {
                exceptionHandler.accept(exc);
            } else {
                log.trace(
                        "Message sent to Kafka. Partition:{}, timestamp {}.",
                        recordMetadata.partition(),
                        recordMetadata.timestamp());
            }
        });
    }

    /**
     * Sends one record and blocks until the send completes or the timeout elapses.
     *
     * @param message record value
     * @param timeoutMillis maximum time to wait, in milliseconds
     */
    private void send(String message, long timeoutMillis)
            throws TimeoutException, ExecutionException, InterruptedException {
        this.kafkaProducer
                .send(new ProducerRecord<>(this.queueTopic, message))
                .get(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Closes the underlying producer, passing the given timeout to it.
     *
     * @param j timeout value
     * @param timeUnit unit of {@code j}
     */
    public void close(long j, TimeUnit timeUnit) throws IOException {
        this.kafkaProducer.close(j, timeUnit);
    }

    /** Closes the underlying producer using the producer's default close behaviour. */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        log.info("Closing IoKafkaLogger.");
        this.kafkaProducer.close();
    }
}
