package org.kie.kogito.services.event.impl;

import java.util.Objects;
import java.util.Optional;
import org.kie.kogito.Application;
import org.kie.kogito.Model;
import org.kie.kogito.event.EventReceiver;
import org.kie.kogito.event.InputTriggerAware;
import org.kie.kogito.event.SubscriptionInfo;
import org.kie.kogito.process.Process;
import org.kie.kogito.services.event.AbstractProcessDataEvent;
import org.kie.kogito.services.event.EventConsumer;
import org.kie.kogito.services.event.EventConsumerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/kogito-services-1.9.1-SNAPSHOT.jar:org/kie/kogito/services/event/impl/AbstractMessageConsumer.class */
/**
 * Base class for message consumers bound to a single input trigger.
 *
 * <p>On {@link #init initialization} the consumer obtains an {@link EventConsumer} from the
 * supplied {@link EventConsumerFactory} and subscribes itself to the supplied
 * {@link EventReceiver}. Every message delivered afterwards is forwarded to that
 * {@link EventConsumer} together with the configured application, process and trigger.
 *
 * <p>Subclasses provide the conversion from the payload type to the process model through
 * {@link #eventToModel(Object)}.
 *
 * @param <M> process model type
 * @param <D> payload type handled by {@link #consume(Object)}
 * @param <T> event type (carrying a {@code D} payload) handled by {@link #consumeCloud}
 */
public abstract class AbstractMessageConsumer<M extends Model, D, T extends AbstractProcessDataEvent<D>> implements InputTriggerAware {
    private static final Logger logger = LoggerFactory.getLogger(AbstractMessageConsumer.class);

    private Process<M> process;
    private Application application;
    private String trigger;
    private EventConsumer<M> eventConsumer;

    /**
     * Creates an uninitialized consumer; {@link #init} must be called before any message
     * is consumed, otherwise the internal event consumer is still {@code null}.
     */
    public AbstractMessageConsumer() {
    }

    /**
     * Creates a consumer and immediately initializes it by delegating to {@link #init}
     * with the same arguments.
     *
     * @throws NullPointerException if {@code str}, {@code eventConsumerFactory} or
     *         {@code eventReceiver} is {@code null}
     */
    public AbstractMessageConsumer(Application application, Process<M> process, String str, EventConsumerFactory eventConsumerFactory, EventReceiver eventReceiver, Class<D> cls, Class<T> cls2, boolean z) {
        init(application, process, str, eventConsumerFactory, eventReceiver, cls, cls2, z);
    }

    /**
     * Stores the collaborators, builds the {@link EventConsumer} and subscribes this
     * instance to the given receiver.
     *
     * @param application application handed to the event consumer on every message
     * @param process process handed to the event consumer on every message
     * @param str trigger name; returned by {@link #getInputTrigger()} and used as the
     *        subscription type
     * @param eventConsumerFactory factory producing the {@link EventConsumer}
     * @param eventReceiver receiver this consumer subscribes to
     * @param cls payload class used for the subscription when {@code z} is {@code false}
     * @param cls2 event class used for the subscription when {@code z} is {@code true}
     * @param z when {@code true}, subscribes {@link #consumeCloud} for {@code cls2};
     *        otherwise subscribes {@link #consume} for {@code cls}. The flag is also passed
     *        to the factory (presumably "use cloud events" - confirm against callers).
     * @throws NullPointerException if {@code str}, {@code eventConsumerFactory} or
     *         {@code eventReceiver} is {@code null}
     */
    public void init(Application application, Process<M> process, String str, EventConsumerFactory eventConsumerFactory, EventReceiver eventReceiver, Class<D> cls, Class<T> cls2, boolean z) {
        // Fail fast, before any field is written: each of these would raise a
        // NullPointerException further down anyway, but only after this instance
        // had already been partially initialized.
        Objects.requireNonNull(str, "trigger must not be null");
        Objects.requireNonNull(eventConsumerFactory, "eventConsumerFactory must not be null");
        Objects.requireNonNull(eventReceiver, "eventReceiver must not be null");

        this.process = process;
        this.application = application;
        this.trigger = str;
        this.eventConsumer = eventConsumerFactory.get(this::eventToModel, z);
        // Subscribe last so that a delivered message always sees fully populated fields.
        if (z) {
            eventReceiver.subscribe(this::consumeCloud, new SubscriptionInfo(cls2, Optional.of(str)));
        } else {
            eventReceiver.subscribe(this::consume, new SubscriptionInfo(cls, Optional.of(str)));
        }
        logger.info("Consumer for {} started.", cls);
    }

    /**
     * Forwards an event of type {@code T} to the event consumer.
     *
     * @param t the received event
     */
    public void consumeCloud(T t) {
        logger.debug("Received: {} on thread {}", t, Thread.currentThread().getName());
        this.eventConsumer.consume(this.application, this.process, t, this.trigger);
    }

    /**
     * Forwards a payload of type {@code D} to the event consumer.
     *
     * @param d the received payload
     */
    public void consume(D d) {
        logger.debug("Received: {} on thread {}", d, Thread.currentThread().getName());
        this.eventConsumer.consume(this.application, this.process, d, this.trigger);
    }

    /**
     * Returns the trigger name supplied to {@link #init}, or {@code null} if this
     * consumer has not been initialized yet.
     */
    @Override // org.kie.kogito.event.InputTriggerAware
    public String getInputTrigger() {
        return this.trigger;
    }

    /**
     * Converts a received payload into the process model; passed to the
     * {@link EventConsumerFactory} as the conversion function.
     *
     * @param d the payload to convert
     * @return the model built from the payload
     */
    protected abstract M eventToModel(D d);
}
