package org.kie.kogito.addon.cloudevents.spring;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import javax.annotation.PostConstruct;
import org.kie.kogito.addon.cloudevents.Subscription;
import org.kie.kogito.event.EventReceiver;
import org.kie.kogito.event.SubscriptionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

/**
 * {@link EventReceiver} that feeds messages received by a Spring Kafka listener to
 * the consumers registered through {@link #subscribe(Function, SubscriptionInfo)}.
 *
 * <p>Subscriptions are kept in a {@link CopyOnWriteArrayList}, so a subscription may be
 * registered while a batch is being iterated. The list is created eagerly, so the
 * receiver is usable as soon as it is constructed, without any lifecycle callback.
 */
@Component
public class SpringKafkaCloudEventReceiver implements EventReceiver {
    private static final Logger log = LoggerFactory.getLogger(SpringKafkaCloudEventReceiver.class);

    /** Registered subscriptions; every incoming message is offered to each of them. */
    private final Collection<Subscription<Object>> consumers = new CopyOnWriteArrayList<>();

    /**
     * Registers a consumer together with the information needed to convert an
     * incoming message into the type the consumer expects.
     *
     * @param function consumer invoked with each successfully converted event
     * @param subscriptionInfo converter, output class and type for this subscription
     * @param <T> type of the converted event handed to {@code function}
     */
    @Override
    public <T> void subscribe(Function<T, CompletionStage<?>> function, SubscriptionInfo<String, T> subscriptionInfo) {
        log.info("Registering consumer with info {}", subscriptionInfo);
        Subscription<T> typed = new Subscription<>(function, subscriptionInfo);
        // Subscriptions of different event types share one collection. Widening is safe here
        // because receive() only passes a consumer the value produced by that same
        // subscription's converter for that subscription's output class.
        @SuppressWarnings("unchecked")
        Subscription<Object> widened = (Subscription<Object>) typed;
        this.consumers.add(widened);
    }

    /**
     * Offers every message of the batch to every registered subscription, then blocks
     * until all completion stages returned by the consumers have finished.
     *
     * <p>A message that a subscription's converter rejects with an {@link IOException}
     * is skipped for that subscription only. A consumer whose stage completes
     * exceptionally is logged and does not prevent waiting for the remaining stages.
     *
     * @param collection batch of raw message payloads
     * @throws InterruptedException if the calling thread is interrupted while waiting
     *         for a stage to complete
     */
    @KafkaListener(topics = {"${kogito.addon.cloudevents.kafka.kogito_incoming_stream:kogito_incoming_stream}"})
    public void receive(@Payload Collection<String> collection) throws InterruptedException {
        log.debug("Received {} events", collection.size());
        Collection<CompletionStage<?>> pending = dispatch(collection);
        log.debug("Waiting for all operations in batch to complete");
        awaitAll(pending);
        log.debug("All operations in batch completed");
    }

    /** Converts each payload for each subscription and collects the stages returned by the consumers. */
    private Collection<CompletionStage<?>> dispatch(Collection<String> payloads) {
        Collection<CompletionStage<?>> pending = new ArrayList<>();
        for (String payload : payloads) {
            for (Subscription<Object> subscription : this.consumers) {
                try {
                    Object event = subscription.getInfo().getConverter().apply(payload, subscription.getInfo().getOutputClass());
                    pending.add(subscription.getConsumer().apply(event));
                } catch (IOException e) {
                    // Every subscription sees every message, so a failed conversion is
                    // logged and skipped for this subscription rather than treated as fatal.
                    log.info("Cannot convert to {} from {}, ignoring type {}, exception message is {}", subscription.getInfo().getOutputClass(), payload, subscription.getInfo().getType(), e.getMessage());
                }
            }
        }
        return pending;
    }

    /** Blocks on each stage in turn, logging (not rethrowing) the cause of any failed stage. */
    private void awaitAll(Collection<CompletionStage<?>> pending) throws InterruptedException {
        for (CompletionStage<?> stage : pending) {
            try {
                stage.toCompletableFuture().get();
            } catch (ExecutionException e) {
                log.error("Error executing consumer", e.getCause());
            }
        }
    }
}
