package org.kie.kogito.serverless.workflow.executor;

import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventData;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import org.kie.kogito.event.CloudEventUnmarshaller;
import org.kie.kogito.event.CloudEventUnmarshallerFactory;
import org.kie.kogito.event.Converter;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventReceiver;
import org.kie.kogito.event.Subscription;
import org.kie.kogito.event.impl.CloudEventConverter;
import org.kie.kogito.event.impl.JacksonCloudEventDataConverter;
import org.kie.kogito.jackson.utils.ObjectMapperFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/kie/kogito/serverless/workflow/executor/KafkaEventReceiver.class */
/**
 * {@link EventReceiver} that keeps an in-memory list of subscriptions and hands every
 * {@link CloudEvent} passed to {@link #onEvent(CloudEvent)} to each of them.
 *
 * <p>The subscription list is a plain {@link ArrayList} with no synchronization, so callers
 * are responsible for not invoking {@link #subscribe(Function, Class)} while
 * {@link #onEvent(CloudEvent)} is iterating.
 */
public class KafkaEventReceiver implements EventReceiver {
    private static final Logger logger = LoggerFactory.getLogger(KafkaEventReceiver.class);

    /** Registered subscriptions, notified in registration order. */
    private final Collection<Subscription<?, CloudEvent>> subscriptions = new ArrayList<>();

    /**
     * Delivers the given event to every registered subscription.
     *
     * <p>If converting the event for one subscription fails with an {@link IOException}, the
     * problem is logged and delivery continues with the next subscription. Any other exception
     * thrown by a converter or consumer propagates to the caller.
     *
     * @param cloudEvent the event to dispatch
     */
    public void onEvent(CloudEvent cloudEvent) {
        for (Subscription<?, CloudEvent> subscription : this.subscriptions) {
            try {
                dispatch(subscription, cloudEvent);
            } catch (IOException e) {
                logger.info("Problem deserializing event {}", cloudEvent, e);
            }
        }
    }

    /**
     * Converts the event with the subscription's converter and applies the subscription's
     * consumer to the result.
     *
     * <p>Exists as a generic method so the subscription's wildcard payload type is captured
     * once and shared by the converter output and the consumer input.
     */
    private static <T> void dispatch(Subscription<T, CloudEvent> subscription, CloudEvent cloudEvent) throws IOException {
        subscription.getConsumer().apply(subscription.getConverter().convert(cloudEvent));
    }

    /**
     * Registers a consumer for events whose data is to be read as {@code cls}.
     *
     * <p>The incoming object is already a {@link CloudEvent}, so the unmarshaller used here
     * passes the event through unchanged, takes the binary payload from
     * {@link CloudEvent#getData()}, and delegates data conversion to a
     * {@link JacksonCloudEventDataConverter} built on {@link ObjectMapperFactory#listenerAware()}.
     *
     * @param function consumer invoked with the converted {@link DataEvent}
     * @param cls      class the converter is configured with for the event data
     * @param <T>      type of the event data
     */
    public <T> void subscribe(Function<DataEvent<T>, CompletionStage<?>> function, Class<T> cls) {
        CloudEventUnmarshallerFactory<CloudEvent> unmarshallerFactory = new CloudEventUnmarshallerFactory<CloudEvent>() {
            public <S> CloudEventUnmarshaller<CloudEvent, S> unmarshaller(final Class<S> targetClass) {
                return new CloudEventUnmarshaller<CloudEvent, S>() {
                    public Converter<CloudEvent, CloudEvent> cloudEvent() {
                        // Input is already a CloudEvent: identity conversion.
                        return event -> event;
                    }

                    public Converter<CloudEvent, CloudEventData> binaryCloudEvent() {
                        return CloudEvent::getData;
                    }

                    public Converter<CloudEventData, S> data() {
                        return new JacksonCloudEventDataConverter<>(ObjectMapperFactory.listenerAware(), targetClass);
                    }
                };
            }
        };
        this.subscriptions.add(new Subscription<>(function, new CloudEventConverter<>(cls, unmarshallerFactory)));
    }
}
