package io.apicurio.registry.events.kafka;

import io.apicurio.datamodels.core.Constants;
import io.apicurio.registry.events.EventSink;
import io.apicurio.registry.utils.RegistryProperties;
import io.apicurio.registry.utils.kafka.AsyncProducer;
import io.apicurio.registry.utils.kafka.ProducerActions;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.Message;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import javax.annotation.PostConstruct;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;

/**
 * {@link EventSink} that forwards registry events to a Kafka topic.
 *
 * <p>Each incoming event-bus message is published as one Kafka record whose value is the
 * message body and whose headers carry the event metadata under {@code ce_}-prefixed names
 * (these appear to follow the CloudEvents Kafka binding in binary mode — confirm against the
 * consumers of the topic). The sink reports itself as configured only when
 * {@code registry.events.kafka.topic} is set.
 */
@ApplicationScoped
public class KafkaEventSink implements EventSink {

    @Inject
    Logger log;

    /** Producer configuration collected from the {@code registry.events.kafka.config} properties. */
    @Inject
    @RegistryProperties(value = {"registry.events.kafka.config"}, empties = {"ssl.endpoint.identification.algorithm="})
    Properties producerProperties;

    /** Created on first use by {@link #getProducer()}; {@code null} until then. */
    private ProducerActions<String, byte[]> producer;

    /** Target partition, or {@code null} when no partition is configured. */
    private Integer partition;

    @ConfigProperty(name = "registry.events.kafka.topic")
    Optional<String> eventsTopic;

    @ConfigProperty(name = "registry.events.kafka.topic-partition")
    Optional<Integer> eventsTopicPartition;

    /** Unwraps the optional partition setting once, after injection has completed. */
    @PostConstruct
    void init() {
        this.partition = this.eventsTopicPartition.orElse(null);
    }

    /**
     * @return the display name of this sink
     */
    @Override
    public String name() {
        return "Kafka Sink";
    }

    /**
     * @return {@code true} when a target topic has been configured
     */
    @Override
    public boolean isConfigured() {
        return this.eventsTopic.isPresent();
    }

    /**
     * Publishes the given event-bus message to the configured Kafka topic.
     *
     * <p>The record key is the message's {@code artifactId} header when present; otherwise the
     * freshly generated event id is used so that the record still has a key.
     *
     * @param message the event to publish; its body becomes the record value
     * @throws NullPointerException if the message carries no event type header
     * @throws IllegalStateException if no target topic is configured
     */
    @Override
    public void handle(Message<Buffer> message) {
        // Fail early with a descriptive message instead of an anonymous NPE while building headers.
        String eventType = Objects.requireNonNull(
                message.headers().get(Constants.PROP_TYPE),
                "event message is missing its type header");
        String artifactId = message.headers().get("artifactId");
        String topic = this.eventsTopic.orElseThrow(
                () -> new IllegalStateException("registry.events.kafka.topic is not configured"));

        this.log.info("Firing event {}", eventType);

        String eventId = UUID.randomUUID().toString();

        RecordHeaders headers = new RecordHeaders();
        addHeader(headers, "ce_id", eventId);
        addHeader(headers, "ce_specversion", "1.0");
        addHeader(headers, "ce_source", "apicurio-registry");
        addHeader(headers, "ce_type", eventType);
        addHeader(headers, "ce_time", Instant.now().toString());
        addHeader(headers, "content-type", "application/json");

        String key = artifactId != null ? artifactId : eventId;
        byte[] payload = message.body().getBytes();

        getProducer().apply(new ProducerRecord<>(topic, this.partition, key, payload, headers));
    }

    /**
     * Returns the shared producer, creating it on the first call.
     *
     * <p>The method is {@code synchronized} so that concurrent first calls create only one
     * producer instance.
     *
     * @return the producer used to send event records
     */
    public synchronized ProducerActions<String, byte[]> getProducer() {
        if (this.producer == null) {
            this.producer = new AsyncProducer<>(
                    this.producerProperties,
                    Serdes.String().serializer(),
                    Serdes.ByteArray().serializer());
        }
        return this.producer;
    }

    /** Adds a header whose value is encoded as UTF-8, independent of the platform default charset. */
    private static void addHeader(RecordHeaders headers, String name, String value) {
        headers.add(name, value.getBytes(StandardCharsets.UTF_8));
    }
}
