package org.kie.kogito.addon.cloudevents.spring;

import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.kie.kogito.event.KogitoEventStreams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOffset;
import reactor.kafka.receiver.ReceiverOptions;

/**
 * Spring component that subscribes to the configured Kafka topic and exposes the
 * received record values as a {@link Flux} of strings, registered as a bean under
 * the {@link KogitoEventStreams#PUBLISHER} qualifier.
 *
 * <p>Both record keys and record values are deserialized with
 * {@link StringDeserializer}, so the receiver is typed {@code <String, String>}.
 */
@Component
/* loaded from: input_file:BOOT-INF/lib/kogito-cloudevents-spring-boot-addon-1.7.0.Final.jar:org/kie/kogito/addon/cloudevents/spring/SpringKafkaCloudEventPublisher.class */
public class SpringKafkaCloudEventPublisher {
    private static final Logger log = LoggerFactory.getLogger(SpringKafkaCloudEventPublisher.class);

    /**
     * Formatter for the record timestamp in the "Received message" log line.
     * {@link DateTimeFormatter} is immutable, so one shared instance is safe even
     * when several receiver pipelines log concurrently (a shared
     * {@code SimpleDateFormat} would not be).
     */
    private static final DateTimeFormatter TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("HH:mm:ss:SSS z dd MMM yyyy").withZone(ZoneId.systemDefault());

    private final ReceiverOptions<String, String> receiverOptions;
    private final String topic;

    /**
     * Builds the Kafka receiver options from the Spring configuration.
     *
     * @param bootstrapServers value of {@code spring.kafka.bootstrap-servers}
     * @param groupId          value of {@code spring.kafka.consumer.group-id}
     * @param topic            topic to subscribe to; taken from
     *                         {@code kogito.addon.cloudevents.kafka.kogito_incoming_stream},
     *                         defaulting to {@code kogito_incoming_stream}
     */
    public SpringKafkaCloudEventPublisher(
            @Value("${spring.kafka.bootstrap-servers}") String bootstrapServers,
            @Value("${spring.kafka.consumer.group-id}") String groupId,
            @Value("${kogito.addon.cloudevents.kafka.kogito_incoming_stream:kogito_incoming_stream}") String topic) {
        this.topic = topic;
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        this.receiverOptions = ReceiverOptions.create(consumerProps);
    }

    /**
     * Creates the stream of incoming record values.
     *
     * <p>Each record is logged at INFO level, its offset is acknowledged, and its
     * value is emitted. The acknowledgement happens before the value is handed to
     * downstream subscribers. The stream is wrapped with
     * {@code publish().autoConnect()} so that subscribers share one upstream
     * Kafka receiver.
     *
     * @return a shared {@link Flux} emitting the value of every received record
     */
    @Scope("singleton")
    @Bean
    @Qualifier(KogitoEventStreams.PUBLISHER)
    public Flux<String> makeConsumer() {
        ReceiverOptions<String, String> subscribedOptions = this.receiverOptions
                .subscription(Collections.singleton(this.topic))
                .addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
                .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));

        return KafkaReceiver.create(subscribedOptions)
                .receive()
                .map(message -> {
                    ReceiverOffset offset = message.receiverOffset();
                    // Skip the timestamp formatting entirely when INFO logging is off.
                    if (log.isInfoEnabled()) {
                        log.info("Received message: topic-partition={} offset={} timestamp={} key={} value={}\n",
                                offset.topicPartition(),
                                offset.offset(),
                                TIMESTAMP_FORMAT.format(Instant.ofEpochMilli(message.timestamp())),
                                message.key(),
                                message.value());
                    }
                    offset.acknowledge();
                    return message.value();
                })
                .publish()
                .autoConnect();
    }
}
