package org.kie.kogito.index.service.messaging;

import io.quarkus.arc.properties.UnlessBuildProperty;
import io.smallrye.mutiny.Uni;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Event;
import javax.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.process.UserTaskInstanceDataEvent;
import org.kie.kogito.index.event.KogitoJobCloudEvent;
import org.kie.kogito.index.event.ProcessInstanceEventMapper;
import org.kie.kogito.index.event.UserTaskInstanceEventMapper;
import org.kie.kogito.index.model.Job;
import org.kie.kogito.index.service.IndexingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
@UnlessBuildProperty(name = "kogito.data-index.blocking", stringValue = "true", enableIfMissing = true)
/* loaded from: input_file:org/kie/kogito/index/service/messaging/ReactiveMessagingEventConsumer.class */
public class ReactiveMessagingEventConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveMessagingEventConsumer.class);
    public static final String KOGITO_PROCESSINSTANCES_EVENTS = "kogito-processinstances-events";
    public static final String KOGITO_USERTASKINSTANCES_EVENTS = "kogito-usertaskinstances-events";
    public static final String KOGITO_JOBS_EVENTS = "kogito-jobs-events";

    @Inject
    IndexingService indexingService;

    @Inject
    Event<DataEvent> eventPublisher;

    @Incoming(KOGITO_PROCESSINSTANCES_EVENTS)
    public Uni<Void> onProcessInstanceEvent(ProcessInstanceDataEvent processInstanceDataEvent) {
        LOGGER.debug("Process instance consumer received ProcessInstanceDataEvent: \n{}", processInstanceDataEvent);
        return Uni.createFrom().item(processInstanceDataEvent).invoke(processInstanceDataEvent2 -> {
            this.indexingService.indexProcessInstance(new ProcessInstanceEventMapper().apply(processInstanceDataEvent2));
        }).invoke(processInstanceDataEvent3 -> {
            this.eventPublisher.fire(processInstanceDataEvent3);
        }).onFailure().invoke(th -> {
            LOGGER.error("Error processing process instance ProcessInstanceDataEvent: {}", th.getMessage(), th);
        }).onItem().ignore().andContinueWithNull();
    }

    @Incoming(KOGITO_USERTASKINSTANCES_EVENTS)
    public Uni<Void> onUserTaskInstanceEvent(UserTaskInstanceDataEvent userTaskInstanceDataEvent) {
        LOGGER.debug("Task instance received UserTaskInstanceDataEvent \n{}", userTaskInstanceDataEvent);
        return Uni.createFrom().item(userTaskInstanceDataEvent).invoke(userTaskInstanceDataEvent2 -> {
            this.indexingService.indexUserTaskInstance(new UserTaskInstanceEventMapper().apply(userTaskInstanceDataEvent2));
        }).invoke(userTaskInstanceDataEvent3 -> {
            this.eventPublisher.fire(userTaskInstanceDataEvent3);
        }).onFailure().invoke(th -> {
            LOGGER.error("Error processing task instance UserTaskInstanceDataEvent: {}", th.getMessage(), th);
        }).onItem().ignore().andContinueWithNull();
    }

    @Incoming(KOGITO_JOBS_EVENTS)
    public Uni<Void> onJobEvent(KogitoJobCloudEvent kogitoJobCloudEvent) {
        LOGGER.debug("Job received KogitoJobCloudEvent \n{}", kogitoJobCloudEvent);
        return Uni.createFrom().item(kogitoJobCloudEvent).onItem().invoke(kogitoJobCloudEvent2 -> {
            this.indexingService.indexJob((Job) kogitoJobCloudEvent2.getData());
        }).onFailure().invoke(th -> {
            LOGGER.error("Error processing job KogitoJobCloudEvent: {}", th.getMessage(), th);
        }).onItem().ignore().andContinueWithNull();
    }
}
