package org.infinispan.cloudevents.impl;

import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.infinispan.cloudevents.configuration.CloudEventsGlobalConfiguration;
import org.infinispan.configuration.global.GlobalConfiguration;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;

/**
 * {@link KafkaEventSender} implementation backed by a {@link KafkaProducer} with byte-array
 * keys and values.
 *
 * <p>The producer is created in {@link #start()} from the {@link CloudEventsGlobalConfiguration}
 * module of the injected {@link GlobalConfiguration}.
 *
 * <p>NOTE(review): nothing in this class closes the producer; if no other component does so,
 * consider adding a stop hook that calls {@code producer.close()}.
 */
@Scope(Scopes.GLOBAL)
public class KafkaEventSenderImpl implements KafkaEventSender {

    @Inject
    GlobalConfiguration globalConfiguration;
    private KafkaProducer<byte[], byte[]> producer;

    /**
     * Builds the producer properties (bootstrap servers, acks, and byte-array serializers for
     * both key and value), creates the producer, and then queries the enabled topics via
     * {@link #connect(CloudEventsGlobalConfiguration)}.
     */
    @Start
    public void start() {
        CloudEventsGlobalConfiguration config = (CloudEventsGlobalConfiguration) this.globalConfiguration.module(CloudEventsGlobalConfiguration.class);

        Properties kafkaProperties = new Properties();
        kafkaProperties.put("bootstrap.servers", config.bootstrapServers());
        kafkaProperties.put("acks", String.valueOf(config.acks()));
        kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        this.producer = new KafkaProducer<>(kafkaProperties);
        connect(config);
    }

    /**
     * Requests partition metadata for each topic whose event type is enabled.
     *
     * <p>The returned partition lists are discarded; presumably the calls exist only to make the
     * client contact the brokers during startup so that connection problems surface early.
     *
     * @param cloudEventsGlobalConfiguration configuration supplying the enabled flags and topic names
     */
    private void connect(CloudEventsGlobalConfiguration cloudEventsGlobalConfiguration) {
        if (cloudEventsGlobalConfiguration.cacheEntryEventsEnabled()) {
            this.producer.partitionsFor(cloudEventsGlobalConfiguration.cacheEntriesTopic());
        }
        if (cloudEventsGlobalConfiguration.auditEventsEnabled()) {
            this.producer.partitionsFor(cloudEventsGlobalConfiguration.auditTopic());
        }
    }

    /**
     * Hands the record to the producer and exposes the producer callback as a stage.
     *
     * @param producerRecord the record to send
     * @return a stage completed with {@code null} when the callback reports no exception, or
     *         completed exceptionally with the exception the callback reports
     */
    @Override
    public CompletionStage<Void> send(ProducerRecord<byte[], byte[]> producerRecord) {
        // Parameterized rather than raw, so returning it as CompletionStage<Void> is type-safe.
        CompletableFuture<Void> future = new CompletableFuture<>();
        this.producer.send(producerRecord, (metadata, exception) -> {
            if (exception != null) {
                future.completeExceptionally(exception);
            } else {
                future.complete(null);
            }
        });
        return future;
    }
}
