/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.addon.cloudevents.spring;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import javax.annotation.PostConstruct;
import org.kie.kogito.addon.cloudevents.Subscription;
import org.kie.kogito.event.EventReceiver;
import org.kie.kogito.event.SubscriptionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

/**
 * Spring Kafka backed {@link EventReceiver}.
 *
 * <p>Keeps the registered subscriptions and, for every batch delivered by the Kafka listener,
 * hands each message to each subscription and then blocks until all the stages returned by the
 * subscribers have completed.
 *
 * <p>The subscription collection is a {@link CopyOnWriteArrayList} created together with the
 * instance, so {@link #subscribe} and {@link #receive} never observe a {@code null} collection,
 * regardless of whether the container runs lifecycle callbacks.
 */
@Component
public class SpringKafkaCloudEventReceiver implements EventReceiver {
    private static final Logger log = LoggerFactory.getLogger(SpringKafkaCloudEventReceiver.class);

    // Eagerly initialized and final: previously this was assigned in a @PostConstruct callback,
    // which left the field null (and unsafely published) whenever that callback did not run.
    private final Collection<Subscription<Object>> consumers = new CopyOnWriteArrayList<>();

    /**
     * Registers a consumer that will be offered every message received by {@link #receive}.
     *
     * @param consumer function invoked with the converted event; the stage it returns is awaited
     *                 by {@link #receive}
     * @param info     subscription details (converter, output class, parametrized classes, type)
     *                 used to convert incoming messages for this consumer
     * @param <T>      type the consumer accepts
     */
    public <T> void subscribe(Function<T, CompletionStage<?>> consumer, SubscriptionInfo<Object, T> info) {
        log.info("Registering consumer with info {}", info);
        // Subscription's generic signature is declared elsewhere; the raw construction and cast
        // are kept exactly as before, with the warning confined to this one declaration.
        @SuppressWarnings({"unchecked", "rawtypes"})
        Subscription<Object> subscription = (Subscription<Object>) new Subscription(consumer, info);
        this.consumers.add(subscription);
    }

    /**
     * Kafka listener entry point. The topic is resolved from the property
     * {@code kogito.addon.cloudevents.kafka.kogito_incoming_stream}, defaulting to
     * {@code kogito_incoming_stream}.
     *
     * <p>Every message is offered to every registered subscription; the method returns only after
     * all stages produced for the batch have completed.
     *
     * @param messages payloads of the received batch
     * @throws InterruptedException if the calling thread is interrupted while waiting for a stage
     */
    @KafkaListener(topics={"${kogito.addon.cloudevents.kafka.kogito_incoming_stream:kogito_incoming_stream}"})
    public void receive(@Payload Collection<String> messages) throws InterruptedException {
        log.debug("Received {} events", messages.size());
        Collection<CompletionStage<?>> futures = new ArrayList<>();
        for (String message : messages) {
            dispatch(message, futures);
        }
        log.debug("Waiting for all operations in batch to complete");
        awaitAll(futures);
        log.debug("All operations in batch completed");
    }

    /** Offers one message to every subscription, collecting the stage each consumer returns. */
    private void dispatch(String message, Collection<CompletionStage<?>> futures) {
        for (Subscription<Object> consumer : this.consumers) {
            try {
                futures.add((CompletionStage<?>) consumer.getConsumer().apply(
                        consumer.getInfo().getConverter().unmarshall(
                                (Object) message,
                                consumer.getInfo().getOutputClass(),
                                consumer.getInfo().getParametrizedClasses())));
            } catch (IOException e) {
                // A failed conversion only skips this subscription for this message; the
                // remaining subscriptions are still offered the message.
                log.info("Cannot convert to {} from {}, ignoring type {}, exception message is {}",
                        consumer.getInfo().getOutputClass(), message, consumer.getInfo().getType(), e.getMessage());
            }
        }
    }

    /** Blocks until every stage has completed, logging (not rethrowing) consumer failures. */
    private void awaitAll(Collection<CompletionStage<?>> futures) throws InterruptedException {
        for (CompletionStage<?> future : futures) {
            try {
                future.toCompletableFuture().get();
            } catch (ExecutionException ex) {
                // One failing consumer must not prevent waiting for the rest of the batch.
                log.error("Error executing consumer", ex.getCause());
            }
        }
    }
}

