/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.cloudevents.impl;

import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.infinispan.cloudevents.configuration.CloudEventsGlobalConfiguration;
import org.infinispan.cloudevents.impl.KafkaEventSender;
import org.infinispan.configuration.global.GlobalConfiguration;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;

/**
 * {@link KafkaEventSender} implementation backed by a single {@link KafkaProducer}
 * with {@code byte[]} keys and values.
 *
 * <p>The producer is created in {@link #start()} from the
 * {@link CloudEventsGlobalConfiguration} module of the injected
 * {@link GlobalConfiguration}.
 *
 * <p>NOTE(review): nothing in this class closes the producer once startup has
 * succeeded. Consider adding a stop/shutdown lifecycle method that calls
 * {@code producer.close()} so that the producer's resources are released when
 * the owning component is stopped.
 */
@Scope(Scopes.GLOBAL)
public class KafkaEventSenderImpl
implements KafkaEventSender {
    @Inject
    GlobalConfiguration globalConfiguration;
    private KafkaProducer<byte[], byte[]> producer;

    /**
     * Builds the Kafka producer from the cloud-events global configuration and
     * probes the configured topics.
     *
     * <p>If probing the topics fails, the freshly created producer is closed
     * before the failure is rethrown, so a failed start does not leave an open
     * producer behind. The {@code producer} field is only assigned after the
     * probe succeeded.
     */
    @Start
    void start() {
        CloudEventsGlobalConfiguration cloudEventsGlobalConfiguration =
                this.globalConfiguration.module(CloudEventsGlobalConfiguration.class);

        Properties kafkaProperties = new Properties();
        kafkaProperties.put("bootstrap.servers", cloudEventsGlobalConfiguration.bootstrapServers());
        kafkaProperties.put("acks", String.valueOf(cloudEventsGlobalConfiguration.acks()));
        kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        KafkaProducer<byte[], byte[]> newProducer = new KafkaProducer<>(kafkaProperties);
        try {
            this.connect(newProducer, cloudEventsGlobalConfiguration);
        } catch (RuntimeException | Error failure) {
            // Do not leak the producer when startup fails; keep the original
            // failure as the primary exception.
            try {
                newProducer.close();
            } catch (RuntimeException closeFailure) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
        this.producer = newProducer;
    }

    /**
     * Requests the partition list of every enabled topic from the given producer.
     *
     * <p>The results are discarded; presumably the calls exist to force an early
     * metadata lookup so that an unreachable cluster or topic is detected during
     * startup rather than on the first send -- TODO confirm.
     *
     * @param kafkaProducer the producer to probe with
     * @param cloudEventsGlobalConfiguration supplies the enabled flags and topic names
     */
    private void connect(KafkaProducer<byte[], byte[]> kafkaProducer,
                         CloudEventsGlobalConfiguration cloudEventsGlobalConfiguration) {
        if (cloudEventsGlobalConfiguration.cacheEntryEventsEnabled()) {
            kafkaProducer.partitionsFor(cloudEventsGlobalConfiguration.cacheEntriesTopic());
        }
        if (cloudEventsGlobalConfiguration.auditEventsEnabled()) {
            kafkaProducer.partitionsFor(cloudEventsGlobalConfiguration.auditTopic());
        }
    }

    /**
     * Hands the record to the producer and exposes the outcome as a
     * {@link CompletionStage}.
     *
     * <p>Exceptions thrown directly by {@code producer.send(...)} are not caught
     * here and propagate to the caller.
     *
     * @param record the record to send
     * @return a stage completed with {@code null} when the producer callback
     *         reports no exception, or completed exceptionally with the exception
     *         passed to the callback
     */
    @Override
    public CompletionStage<Void> send(ProducerRecord<byte[], byte[]> record) {
        CompletableFuture<Void> cf = new CompletableFuture<>();
        this.producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                cf.completeExceptionally(exception);
            } else {
                cf.complete(null);
            }
        });
        return cf;
    }
}

