package org.kie.kogito.event.impl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Optional;
import java.util.function.Function;
import org.kie.kogito.Application;
import org.kie.kogito.Model;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.impl.Sig;
import org.kie.kogito.services.event.AbstractProcessDataEvent;
import org.kie.kogito.services.uow.UnitOfWorkExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/kie/kogito/event/impl/CloudEventConsumer.class */
/**
 * Event consumer that deserializes an incoming JSON payload into a cloud event of type {@code T}
 * and routes it to a process: either as a signal to an existing process instance (when the event
 * carries a Kogito reference id) or by creating and starting a new instance.
 *
 * @param <D> type of the data carried by the cloud event
 * @param <M> process model type produced from the event data
 * @param <T> concrete cloud event class the payload is deserialized into
 */
public class CloudEventConsumer<D, M extends Model, T extends AbstractProcessDataEvent<D>> extends JacksonEventConsumer<M> {
    private static final Logger logger = LoggerFactory.getLogger(CloudEventConsumer.class);

    /** Converts the event data into the process model used to create new instances. */
    private final Function<D, M> function;

    /** Target class handed to the object mapper when deserializing the payload. */
    private final Class<T> cloudEventClass;

    /**
     * @param function converts the event data into a process model
     * @param cls cloud event class the payload is deserialized into
     * @param objectMapper mapper passed to the superclass and used for deserialization
     */
    public CloudEventConsumer(Function<D, M> function, Class<T> cls, ObjectMapper objectMapper) {
        super(objectMapper);
        this.function = function;
        this.cloudEventClass = cls;
    }

    /**
     * Deserializes {@code str} into a cloud event and, inside a unit of work obtained from
     * {@code application}, either signals an existing process instance or starts a new one.
     * <p>
     * JSON processing failures are logged and swallowed; nothing is rethrown for them.
     * NOTE(review): this presumably implements the consumer contract of the parent type, but the
     * parent is not visible here, so no {@code @Override} is declared - confirm.
     *
     * @param application supplies the unit of work manager
     * @param process process whose instances are signalled or created
     * @param str JSON payload to deserialize into the cloud event class
     * @param str2 trigger name; used as the start trigger and, prefixed with {@code "Message-"},
     *        as the signal name
     */
    public void consume(Application application, Process<M> process, String str, String str2) {
        try {
            T event = this.mapper.readValue(str, this.cloudEventClass);
            if (event == null) {
                // A JSON literal "null" deserializes to a null event; there is nothing to route,
                // so treat it like any other unusable payload instead of failing with an NPE.
                logger.warn("Ignoring message for process {}: payload deserialized to a null event", process.id());
                return;
            }
            // The model is built before the unit of work starts, as in the original flow.
            M model = this.function.apply(event.getData());
            UnitOfWorkExecutor.executeInUnitOfWork(application.unitOfWorkManager(), () -> {
                String referenceId = event.getKogitoReferenceId();
                if (referenceId != null) {
                    signalExistingInstance(process, event, referenceId, str2);
                } else {
                    startNewInstance(process, model, event, str2);
                }
                // The executor expects a supplier; there is no result to hand back.
                return null;
            });
        } catch (JsonProcessingException e) {
            logger.error("Error when consuming message for process {}", process.id(), e);
        }
    }

    /**
     * Sends the event data as a {@code "Message-<trigger>"} signal to the instance identified by
     * {@code referenceId}; logs a warning when no such instance is found.
     */
    private void signalExistingInstance(Process<M> process, T event, String referenceId, String trigger) {
        logger.debug("Received message with reference id '{}' going to use it to send signal '{}'", referenceId, trigger);
        Optional<? extends ProcessInstance<M>> target = process.instances().findById(referenceId);
        if (!target.isPresent()) {
            logger.warn("Process instance with id '{}' not found for triggering signal '{}'", referenceId, trigger);
            return;
        }
        target.get().send(Sig.of("Message-" + trigger, event.getData(), event.getKogitoProcessinstanceId()));
    }

    /**
     * Creates a new instance from {@code model} and starts it, from the node named by the event
     * when one is given, otherwise with the plain trigger.
     */
    private void startNewInstance(Process<M> process, M model, T event, String trigger) {
        logger.debug("Received message without reference id, staring new process instance with trigger '{}'", trigger);
        ProcessInstance<M> instance = process.createInstance(model);
        String startFromNode = event.getKogitoStartFromNode();
        if (startFromNode != null) {
            instance.startFrom(startFromNode, event.getKogitoProcessinstanceId());
        } else {
            instance.start(trigger, event.getKogitoProcessinstanceId());
        }
    }
}
