package org.kie.kogito.services.event.impl;

import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import org.kie.kogito.Application;
import org.kie.kogito.Model;
import org.kie.kogito.event.EventReceiver;
import org.kie.kogito.event.EventUnmarshaller;
import org.kie.kogito.event.SubscriptionInfo;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.ProcessService;
import org.kie.kogito.services.event.EventConsumer;
import org.kie.kogito.services.event.EventConsumerFactory;
import org.kie.kogito.services.event.ProcessDataEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/kogito-services-1.19.0.Final.jar:org/kie/kogito/services/event/impl/AbstractMessageConsumer.class */
/**
 * Base class for consumers that receive events for a given trigger and hand them
 * to an {@link EventConsumer} together with the owning {@link Application} and
 * {@link Process}.
 *
 * <p>An instance is configured either through the full constructor or by using the
 * no-arg constructor followed by a call to {@link #init}. Configuration registers a
 * subscription on the supplied {@link EventReceiver}.
 *
 * @param <M> the process model type
 * @param <D> the type of the event payload
 */
public abstract class AbstractMessageConsumer<M extends Model, D> {
    protected static final Logger logger = LoggerFactory.getLogger(AbstractMessageConsumer.class);
    private Process<M> process;
    private Application application;
    private String trigger;
    private EventConsumer<M> eventConsumer;

    /**
     * Creates an unconfigured consumer. All fields stay unset until {@link #init} is called.
     */
    public AbstractMessageConsumer() {
    }

    /**
     * Creates a consumer and immediately configures it by delegating to {@link #init}
     * with the same arguments.
     *
     * <p>Note that {@link #init} invokes the overridable {@link #getModelConverter()},
     * so subclasses must not rely on their own fields being initialised inside that hook
     * when this constructor is used.
     */
    public AbstractMessageConsumer(Application application, Process<M> process, String str, EventConsumerFactory eventConsumerFactory, EventReceiver eventReceiver, Class<D> cls, boolean z, ProcessService processService, ExecutorService executorService, EventUnmarshaller<Object> eventUnmarshaller) {
        init(application, process, str, eventConsumerFactory, eventReceiver, cls, z, processService, executorService, eventUnmarshaller);
    }

    /**
     * Stores the collaborators, builds the {@link EventConsumer} and subscribes this
     * consumer to the given receiver.
     *
     * @param application the application passed along with every consumed event
     * @param process the process passed along with every consumed event
     * @param str the trigger name; used as the subscription type and passed along with every event
     * @param eventConsumerFactory factory used to create the {@link EventConsumer}
     * @param eventReceiver receiver on which the subscription is registered
     * @param cls the payload class; the subscription output class when {@code z} is {@code false},
     *        or the parametrized class of {@link ProcessDataEvent} when {@code z} is {@code true}
     * @param z when {@code true}, events are subscribed as {@link ProcessDataEvent} and routed to
     *        {@link #consumeCloud}; otherwise they are subscribed as {@code cls} and routed to
     *        {@link #consumeNotCloud}. The flag is also forwarded to the factory.
     * @param processService forwarded to the factory
     * @param executorService forwarded to the factory
     * @param eventUnmarshaller converter set on the subscription
     */
    public void init(Application application, Process<M> process, String str, EventConsumerFactory eventConsumerFactory, EventReceiver eventReceiver, Class<D> cls, boolean z, ProcessService processService, ExecutorService executorService, EventUnmarshaller<Object> eventUnmarshaller) {
        this.process = process;
        this.application = application;
        this.trigger = str;
        this.eventConsumer = eventConsumerFactory.get(processService, executorService, getModelConverter(), z, this::getData);
        if (z) {
            eventReceiver.subscribe(this::consumeCloud, SubscriptionInfo.builder().converter(eventUnmarshaller).outputClass(ProcessDataEvent.class).parametrizedClasses(cls).type(str).createSubscriptionInfo());
        } else {
            eventReceiver.subscribe(this::consumeNotCloud, SubscriptionInfo.builder().converter(eventUnmarshaller).outputClass(cls).type(str).createSubscriptionInfo());
        }
        logger.info("Consumer for {} started", str);
    }

    /**
     * Subscription callback used when events arrive wrapped in a {@link ProcessDataEvent}.
     *
     * @param processDataEvent the received event
     * @return the stage produced by the underlying {@link EventConsumer}
     */
    protected CompletionStage<?> consumeCloud(ProcessDataEvent<D> processDataEvent) {
        return consume(processDataEvent);
    }

    /**
     * Subscription callback used when events arrive as the raw payload type.
     *
     * @param d the received payload
     * @return the stage produced by the underlying {@link EventConsumer}
     */
    protected CompletionStage<?> consumeNotCloud(D d) {
        return consume(d);
    }

    /**
     * Hands the received object to the {@link EventConsumer} and traces the
     * received / dispatched / completed milestones.
     *
     * @param obj the received event or payload
     * @return a stage that completes with the same outcome as the consumer's stage
     */
    private CompletionStage<?> consume(Object obj) {
        logger.trace("Received {} for trigger {}", obj, this.trigger);
        CompletionStage<?> result = this.eventConsumer.consume(this.application, this.process, obj, this.trigger);
        if (logger.isTraceEnabled()) {
            // whenComplete (unlike thenAccept) keeps the upstream result and failure intact,
            // so the value seen by the caller does not depend on the configured log level.
            result = result.whenComplete((value, failure) -> {
                // Only successful completion is traced.
                if (failure == null) {
                    logger.trace("Completed {} for trigger {}", obj, this.trigger);
                }
            });
        }
        logger.trace("Dispatched {} for trigger {}", obj, this.trigger);
        return result;
    }

    /**
     * Hook supplying an optional payload-to-model converter that is passed to the
     * {@link EventConsumerFactory}. The default implementation provides none.
     *
     * @return an empty {@link Optional} unless overridden
     */
    protected Optional<Function<D, M>> getModelConverter() {
        return Optional.empty();
    }

    /**
     * Hook extracting the payload from a {@link ProcessDataEvent}; passed to the
     * {@link EventConsumerFactory} as a method reference.
     *
     * @param processDataEvent the event to unwrap
     * @return the result of {@link ProcessDataEvent#getData()}
     */
    protected D getData(ProcessDataEvent<D> processDataEvent) {
        return processDataEvent.getData();
    }
}
