/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.addon.cloudevents.spring;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import javax.annotation.PostConstruct;
import org.kie.kogito.addon.cloudevents.Subscription;
import org.kie.kogito.config.ConfigBean;
import org.kie.kogito.event.CloudEventUnmarshallerFactory;
import org.kie.kogito.event.Converter;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventReceiver;
import org.kie.kogito.event.EventUnmarshaller;
import org.kie.kogito.event.impl.CloudEventConverter;
import org.kie.kogito.event.impl.DataEventConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

/**
 * Spring-managed {@link EventReceiver} that listens to a Kafka topic and hands every
 * received message to each registered subscription.
 *
 * <p>Subscriptions are held in a {@link CopyOnWriteArrayList}, so new subscriptions may be
 * registered while a batch is being iterated in {@link #receive(Collection)}.
 */
@Component
public class SpringKafkaCloudEventReceiver
implements EventReceiver {
    private static final Logger log = LoggerFactory.getLogger(SpringKafkaCloudEventReceiver.class);

    // Initialised eagerly (instead of in a @PostConstruct callback) so the receiver never
    // exposes a null collection, e.g. when it is instantiated outside the Spring lifecycle.
    private final Collection<Subscription<Object, String>> consumers = new CopyOnWriteArrayList<>();

    @Autowired
    EventUnmarshaller<Object> eventDataUnmarshaller;
    @Autowired
    CloudEventUnmarshallerFactory<Object> cloudEventUnmarshaller;
    @Autowired
    ConfigBean configBean;

    /**
     * Registers a consumer for incoming events.
     *
     * <p>The converter paired with the consumer is chosen when this method is called: a
     * {@link CloudEventConverter} if {@code configBean.useCloudEvents()} is true, otherwise a
     * {@link DataEventConverter}.
     *
     * @param consumer function invoked with each converted event; the stage it returns is
     *                 awaited by {@link #receive(Collection)}
     * @param clazz    class handed to the converter for the event payload
     * @param <T>      payload type of the events the consumer accepts
     */
    // The subscription list stores consumers of different payload types side by side, so the
    // type parameter T has to be erased here; hence the raw construction and unchecked cast.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <T> void subscribe(Function<DataEvent<T>, CompletionStage<?>> consumer, Class<T> clazz) {
        Converter converter = (Converter) (this.configBean.useCloudEvents()
                ? new CloudEventConverter(clazz, this.cloudEventUnmarshaller)
                : new DataEventConverter(clazz, this.eventDataUnmarshaller));
        this.consumers.add((Subscription<Object, String>) new Subscription(consumer, converter));
    }

    /**
     * Kafka listener entry point: converts every message for every subscription, invokes the
     * subscribed consumers, and then blocks until all returned stages have completed.
     *
     * <p>A message that a subscription's converter rejects with an {@link IOException} is
     * logged and skipped for that subscription only. A stage that completes exceptionally is
     * logged and does not prevent the remaining stages from being awaited.
     *
     * @param messages batch of raw message payloads
     * @throws InterruptedException if the calling thread is interrupted while waiting for a
     *                              stage to complete
     */
    @KafkaListener(topics={"${kogito.addon.cloudevents.kafka.kogito_incoming_stream:kogito_incoming_stream}"})
    public void receive(@Payload Collection<String> messages) throws InterruptedException {
        log.debug("Received {} events", messages.size());
        Collection<CompletionStage<?>> pending = new ArrayList<>();
        for (String message : messages) {
            for (Subscription<Object, String> subscription : this.consumers) {
                try {
                    CompletionStage<?> stage = (CompletionStage<?>) subscription.getConsumer()
                            .apply(subscription.getConverter().convert(message));
                    if (stage == null) {
                        // Nothing to wait for; without this guard the wait loop below would
                        // fail with a NullPointerException and abandon the rest of the batch.
                        log.warn("Consumer returned no completion stage; nothing to wait for");
                        continue;
                    }
                    pending.add(stage);
                }
                catch (IOException e) {
                    log.info("Cannot convert event to the proper type {}", e.getMessage());
                    // Keep the info line terse, but do not lose the cause entirely.
                    log.debug("Event conversion failure details", e);
                }
            }
        }
        log.debug("Waiting for all operations in batch to complete");
        for (CompletionStage<?> stage : pending) {
            try {
                stage.toCompletableFuture().get();
            }
            catch (ExecutionException ex) {
                log.error("Error executing consumer", ex.getCause());
            }
        }
        log.debug("All operations in batch completed");
    }
}

