package org.kie.kogito.event.impl;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import org.kie.kogito.Application;
import org.kie.kogito.Model;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.ProcessService;
import org.kie.kogito.services.event.AbstractProcessDataEvent;
import org.kie.kogito.services.event.EventConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/jbpm-flow-1.14.1.Final.jar:org/kie/kogito/event/impl/CloudEventConsumer.class */
public class CloudEventConsumer<D, M extends Model, T extends AbstractProcessDataEvent<D>> implements EventConsumer<M> {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) CloudEventConsumer.class);
    private Function<D, M> function;
    private ProcessService processService;
    private ExecutorService executor;

    public CloudEventConsumer(ProcessService processService, ExecutorService executorService, Function<D, M> function) {
        this.processService = processService;
        this.executor = executorService;
        this.function = function;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.kie.kogito.services.event.EventConsumer
    public CompletionStage<?> consume(Application application, Process<M> process, Object obj, String str) {
        AbstractProcessDataEvent abstractProcessDataEvent = (AbstractProcessDataEvent) obj;
        Model model = (Model) this.function.apply(abstractProcessDataEvent.getData());
        String simpleName = abstractProcessDataEvent.getClass().getSimpleName();
        if (ignoredMessageType(abstractProcessDataEvent, simpleName) && ignoredMessageType(abstractProcessDataEvent, str)) {
            logger.warn("Consumer for CloudEvent type '{}', trigger '{}': ignoring message with type '{}',  source '{}'", simpleName, str, abstractProcessDataEvent.getType(), abstractProcessDataEvent.getSource());
            return CompletableFuture.completedFuture(null);
        }
        if (abstractProcessDataEvent.getKogitoReferenceId() == null || abstractProcessDataEvent.getKogitoReferenceId().isEmpty()) {
            logger.debug("Received message without reference id, starting new process instance with trigger '{}'", str);
            return startNewInstance(process, model, abstractProcessDataEvent, str);
        }
        logger.debug("Received message with reference id '{}' going to use it to send signal '{}'", abstractProcessDataEvent.getKogitoReferenceId(), str);
        if (process.instances().findById(abstractProcessDataEvent.getKogitoReferenceId()).isPresent()) {
            return CompletableFuture.completedFuture(this.processService.signalProcessInstance(process, abstractProcessDataEvent.getKogitoReferenceId(), abstractProcessDataEvent.getData(), "Message-" + str));
        }
        logger.warn("Process instance with id '{}' not found for triggering signal '{}', starting a new one", abstractProcessDataEvent.getKogitoReferenceId(), str);
        return startNewInstance(process, model, abstractProcessDataEvent, str);
    }

    private CompletionStage<Void> startNewInstance(Process<M> process, M m, T t, String str) {
        return CompletableFuture.runAsync(() -> {
            this.processService.createProcessInstance(process, m, t.getKogitoStartFromNode(), str, t.getKogitoProcessinstanceId());
        }, this.executor);
    }

    private boolean ignoredMessageType(T t, String str) {
        return (str.equals(t.getType()) || str.equals(t.getSource())) ? false : true;
    }
}
