package org.kie.kogito.services.event.impl;

import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import org.apache.commons.lang3.StringUtils;
import org.kie.kogito.Model;
import org.kie.kogito.correlation.CorrelationResolver;
import org.kie.kogito.event.EventDispatcher;
import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.ProcessService;
import org.kie.kogito.services.event.correlation.SimpleAttributeCorrelationResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/kogito-services-1.22.0.Final.jar:org/kie/kogito/services/event/impl/ProcessEventDispatcher.class */
/**
 * {@link EventDispatcher} that routes an incoming event to a single {@link Process}.
 *
 * <p>For each event the dispatcher either signals an existing process instance (when the event
 * resolves to a non-empty process reference id and that instance is found), starts a new
 * process instance (when a model converter was supplied), or ignores the event.
 *
 * <p>All state is assigned once at construction time and never modified afterwards.
 *
 * @param <M> model type of the target process
 */
public class ProcessEventDispatcher<M extends Model> implements EventDispatcher<M> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ProcessEventDispatcher.class);

    // Resolvers used to extract individual correlation values from the incoming event object.
    private final CorrelationResolver kogitoReferenceCorrelationResolver = new SimpleAttributeCorrelationResolver(CloudEventExtensionConstants.PROCESS_REFERENCE_ID);
    private final CorrelationResolver eventTypeResolver = new SimpleAttributeCorrelationResolver("type");
    private final CorrelationResolver eventSourceResolver = new SimpleAttributeCorrelationResolver("source");
    private final CorrelationResolver businessKeyResolver = new SimpleAttributeCorrelationResolver(CloudEventExtensionConstants.BUSINESS_KEY);
    private final CorrelationResolver nodeIdResolver = new SimpleAttributeCorrelationResolver(CloudEventExtensionConstants.PROCESS_START_FROM_NODE);
    private final CorrelationResolver referenceIdResolver = new SimpleAttributeCorrelationResolver(CloudEventExtensionConstants.PROCESS_INSTANCE_ID);

    private final UnaryOperator<Object> dataResolver;
    private final ProcessService processService;
    private final Function<Object, M> modelConverter;
    private final Process<M> process;
    private final ExecutorService executor;

    /**
     * Creates a dispatcher bound to one process.
     *
     * @param process the process that receives signals or for which new instances are started
     * @param function converts the resolved event data into the process model; when {@code null}
     *        the dispatcher never starts new instances
     * @param processService service used to signal and create process instances
     * @param executorService executor on which signalling and instance creation are run
     * @param unaryOperator extracts the data payload from the raw event object
     */
    public ProcessEventDispatcher(Process<M> process, Function<Object, M> function, ProcessService processService, ExecutorService executorService, UnaryOperator<Object> unaryOperator) {
        this.process = process;
        this.modelConverter = function;
        this.processService = processService;
        this.executor = executorService;
        this.dataResolver = unaryOperator;
    }

    /**
     * Dispatches an event for the given trigger.
     *
     * <ul>
     * <li>If the event should be skipped (see {@code shouldSkipMessage}), an already completed
     * future holding {@code null} is returned.</li>
     * <li>If the event resolves to a non-empty process reference id, the matching instance is
     * signalled asynchronously on the configured executor (falling back to starting a new
     * instance when it is not found and a model converter is available).</li>
     * <li>Otherwise, if a model converter is available, a new instance is started asynchronously
     * on the configured executor.</li>
     * <li>Otherwise an already completed future holding {@code null} is returned.</li>
     * </ul>
     *
     * @param str the trigger (message name) the event was received for
     * @param obj the received event object
     * @return a future completing with the signalled or newly created process instance, or with
     *         {@code null} when the event was ignored
     */
    @Override
    public CompletableFuture<ProcessInstance<M>> dispatch(String str, Object obj) {
        if (shouldSkipMessage(str, obj)) {
            LOGGER.info("Ignoring message for trigger {} in process {}. Skipping consumed message {}", str, this.process.id(), obj);
            return CompletableFuture.completedFuture(null);
        }

        final String kogitoReferenceId = this.kogitoReferenceCorrelationResolver.resolve(obj).asString();
        if (StringUtils.isNotEmpty(kogitoReferenceId)) {
            return CompletableFuture.supplyAsync(() -> handleMessageWithReference(str, obj, kogitoReferenceId), this.executor);
        }

        // Without a reference id the only remaining option is to start a new instance,
        // which requires a converter from event data to the process model.
        if (this.modelConverter != null) {
            return CompletableFuture.supplyAsync(() -> startNewInstance(str, obj), this.executor);
        }

        LOGGER.info("No matches found for trigger {} in process {}. Skipping consumed message {}", str, this.process.id(), obj);
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Signals the process instance with the given id, or starts a new instance when no such
     * instance exists and a model converter is available.
     *
     * @return the signalled or newly created instance, or {@code null} when the instance was not
     *         found and no model converter is configured
     */
    private ProcessInstance<M> handleMessageWithReference(String trigger, Object event, String instanceId) {
        LOGGER.debug("Received message with reference id '{}' going to use it to send signal '{}'", instanceId, trigger);
        return this.process.instances()
                .findById(instanceId)
                .map(instance -> {
                    signalProcessInstance(trigger, instance.id(), event);
                    return instance;
                })
                .orElseGet(() -> {
                    LOGGER.info("Process instance with id '{}' not found for triggering signal '{}'", instanceId, trigger);
                    return this.modelConverter != null ? startNewInstance(trigger, event) : null;
                });
    }

    /**
     * Sends the signal {@code "Message-" + trigger} with the resolved event data to the process
     * instance identified by {@code instanceId}.
     */
    private Optional<M> signalProcessInstance(String trigger, String instanceId, Object event) {
        final Object data = this.dataResolver.apply(event);
        return this.processService.signalProcessInstance(this.process, instanceId, data, "Message-" + trigger);
    }

    /**
     * Creates a new process instance from the event, passing along the business key, start node
     * id and reference id resolved from the event together with the converted event data.
     */
    private ProcessInstance<M> startNewInstance(String trigger, Object event) {
        LOGGER.info("Starting new process instance with signal '{}'", trigger);
        final String businessKey = this.businessKeyResolver.resolve(event).asString();
        final M model = this.modelConverter.apply(this.dataResolver.apply(event));
        final String startFromNodeId = this.nodeIdResolver.resolve(event).asString();
        final String referenceId = this.referenceIdResolver.resolve(event).asString();
        return this.processService.createProcessInstance(this.process, businessKey, model, startFromNodeId, trigger, referenceId);
    }

    /**
     * Decides whether the event is unrelated to the trigger and must be ignored.
     *
     * @return {@code true} only when both hold: the resolved event type is non-null and differs
     *         from the trigger, and the resolved event source is non-null and differs from both
     *         the event's simple class name and the trigger
     */
    private boolean shouldSkipMessage(String trigger, Object event) {
        final String eventType = this.eventTypeResolver.resolve(event).asString();
        final String eventSource = this.eventSourceResolver.resolve(event).asString();

        final boolean typeMismatch = eventType != null && !Objects.equals(trigger, eventType);
        final boolean sourceMismatch = eventSource != null
                && !Objects.equals(event.getClass().getSimpleName(), eventSource)
                && !Objects.equals(trigger, eventSource);
        return typeMismatch && sourceMismatch;
    }
}
