package org.kie.kogito.event.impl;

import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.kie.kogito.Model;
import org.kie.kogito.correlation.CompositeCorrelation;
import org.kie.kogito.correlation.CorrelationService;
import org.kie.kogito.correlation.SimpleCorrelation;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventDispatcher;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/kogito-events-core-1.32.0-SNAPSHOT.jar:org/kie/kogito/event/impl/ProcessEventDispatcher.class */
/**
 * {@link EventDispatcher} that routes an incoming {@link DataEvent} to a single {@link Process}.
 *
 * <p>For every event that is not skipped, the dispatcher resolves a target process instance id
 * (through the configured correlation keys, falling back to the event's Kogito reference id).
 * When an id is available the matching instance is signalled; when it is not, and a model
 * converter is configured, a new process instance is created instead.
 *
 * @param <M> model type of the target process
 * @param <D> payload type carried by the dispatched events
 */
public class ProcessEventDispatcher<M extends Model, D> implements EventDispatcher<M, D> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ProcessEventDispatcher.class);
    private final Set<String> correlationKeys;
    private final ProcessService processService;
    private final Optional<Function<D, M>> modelConverter;
    private final Process<M> process;
    private final ExecutorService executor;
    private final Function<DataEvent<D>, D> dataResolver;

    /**
     * Creates a dispatcher bound to one process.
     *
     * @param process the process that receives the events
     * @param optional converter from event payload to process model; when empty, this dispatcher
     *        never creates new process instances
     * @param processService service used to signal and create process instances
     * @param executorService executor on which signalling and instance creation run
     * @param set names of the event attributes/extensions used for correlation; may be
     *        {@code null} or empty, in which case no correlation lookup is performed
     * @param function extracts the payload from an event
     */
    public ProcessEventDispatcher(Process<M> process, Optional<Function<D, M>> optional, ProcessService processService, ExecutorService executorService, Set<String> set, Function<DataEvent<D>, D> function) {
        this.process = process;
        this.modelConverter = optional;
        this.processService = processService;
        this.executor = executorService;
        this.correlationKeys = set;
        this.dataResolver = function;
    }

    /**
     * Dispatches an event for the given trigger.
     *
     * @param str the trigger (message name) the event was consumed for
     * @param dataEvent the consumed event
     * @return a future yielding the signalled or newly created process instance; the future is
     *         already completed with {@code null} when the event is skipped or nothing matches
     */
    @Override // org.kie.kogito.event.EventDispatcher
    public CompletableFuture<ProcessInstance<M>> dispatch(String str, DataEvent<D> dataEvent) {
        if (shouldSkipMessage(str, dataEvent)) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Ignoring message for trigger {} in process {}. Skipping consumed message {}", str, this.process.id(), dataEvent);
            }
            return CompletableFuture.completedFuture(null);
        }

        final String referenceId = resolveCorrelationId(dataEvent);
        if (StringUtils.isNotEmpty(referenceId)) {
            return CompletableFuture.supplyAsync(() -> handleMessageWithReference(str, dataEvent, referenceId), this.executor);
        }

        // Without a reference id the event can only start a new instance, which needs a converter.
        if (this.modelConverter.isPresent()) {
            return CompletableFuture.supplyAsync(() -> startNewInstance(str, dataEvent), this.executor);
        }

        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("No matches found for trigger {} in process {}. Skipping consumed message {}", str, this.process.id(), dataEvent);
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Builds the composite correlation of the event from the configured correlation keys.
     *
     * @return the correlation, or empty when no correlation keys are configured
     */
    private Optional<CompositeCorrelation> compositeCorrelation(DataEvent<?> event) {
        if (this.correlationKeys == null || this.correlationKeys.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(new CompositeCorrelation(
                this.correlationKeys.stream()
                        .map(key -> new SimpleCorrelation<>(key, resolve(event, key)))
                        .collect(Collectors.toSet())));
    }

    /**
     * Resolves the id of the process instance the event refers to.
     *
     * @return the correlated id found through the process correlation service, otherwise the
     *         event's Kogito reference id (which may be {@code null})
     */
    private String resolveCorrelationId(DataEvent<?> event) {
        Optional<CompositeCorrelation> correlation = compositeCorrelation(event);
        CorrelationService correlations = this.process.correlations();
        // Bound method references reject a null receiver, so no explicit null checks are needed.
        return correlation
                .flatMap(correlations::find)
                .map(instance -> instance.getCorrelatedId())
                .orElseGet(event::getKogitoReferenceId);
    }

    /**
     * Looks up a correlation key on the event, first among its attributes, then its extensions.
     *
     * @return the value found, or {@code null} (after logging a warning) when the key is absent
     */
    private Object resolve(DataEvent<?> event, String key) {
        if (event.getAttributeNames().contains(key)) {
            return event.getAttribute(key);
        }
        if (event.getExtensionNames().contains(key)) {
            return event.getExtension(key);
        }
        LOGGER.warn("Correlation key {} not found for event {}", key, event);
        return null;
    }

    /**
     * Signals the process instance with the given id, or starts a new instance when no such
     * instance exists.
     *
     * @return the signalled instance, or the result of {@link #startNewInstance}
     */
    private ProcessInstance<M> handleMessageWithReference(String trigger, DataEvent<D> event, String instanceId) {
        LOGGER.debug("Received message with reference id '{}' going to use it to send signal '{}'", instanceId, trigger);
        Optional<ProcessInstance<M>> signalled = this.process.instances().findById(instanceId).map(instance -> {
            signalProcessInstance(trigger, instance.id(), event);
            return instance;
        });
        return signalled.orElseGet(() -> {
            LOGGER.info("Process instance with id '{}' not found for triggering signal '{}'", instanceId, trigger);
            return startNewInstance(trigger, event);
        });
    }

    /**
     * Sends the event payload to a process instance as the signal {@code "Message-" + trigger}.
     */
    private Optional<M> signalProcessInstance(String trigger, String instanceId, DataEvent<D> event) {
        return this.processService.signalProcessInstance(this.process, instanceId, this.dataResolver.apply(event), "Message-" + trigger);
    }

    /**
     * Creates a new process instance from the event payload.
     *
     * @return the created instance, or {@code null} when no model converter is configured
     */
    private ProcessInstance<M> startNewInstance(String trigger, DataEvent<D> event) {
        return this.modelConverter.map(converter -> {
            LOGGER.info("Starting new process instance with signal '{}'", trigger);
            return this.processService.createProcessInstance(
                    this.process,
                    event.getKogitoBusinessKey(),
                    converter.apply(this.dataResolver.apply(event)),
                    event.getKogitoStartFromNode(),
                    trigger,
                    event.getKogitoProcessInstanceId(),
                    compositeCorrelation(event).orElse(null));
        }).orElse(null);
    }

    /**
     * Tells whether the event declares a type that differs from the trigger.
     * An event without a type is never considered mismatched.
     */
    private boolean isEventTypeNotMatched(String trigger, DataEvent<?> event) {
        String type = event.getType();
        return type != null && !Objects.equals(trigger, type);
    }

    /**
     * Tells whether the event declares a source that is neither the trigger nor the simple
     * class name of the event. An event without a source is never considered mismatched.
     */
    private boolean isSourceNotMatched(String trigger, DataEvent<?> event) {
        String source = event.getSource() == null ? null : event.getSource().toString();
        return source != null
                && !Objects.equals(event.getClass().getSimpleName(), source)
                && !Objects.equals(trigger, source);
    }

    /**
     * An event is skipped only when both its type and its source fail to match the trigger.
     */
    private boolean shouldSkipMessage(String trigger, DataEvent<?> event) {
        return isEventTypeNotMatched(trigger, event) && isSourceNotMatched(trigger, event);
    }
}
