package org.kie.kogito.index.messaging;

import com.fasterxml.jackson.databind.node.ObjectNode;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.kie.kogito.index.event.DomainModelRegisteredEvent;
import org.kie.kogito.index.event.KogitoJobCloudEvent;
import org.kie.kogito.index.event.KogitoProcessCloudEvent;
import org.kie.kogito.index.event.KogitoUserTaskCloudEvent;
import org.kie.kogito.index.json.ProcessInstanceMetaMapper;
import org.kie.kogito.index.json.UserTaskInstanceMetaMapper;
import org.kie.kogito.index.service.IndexingService;
import org.kie.kogito.index.vertx.ObjectNodeMessageCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Consumes Kogito cloud events from the reactive-messaging channels and hands them to the
 * {@link IndexingService}.
 *
 * <p>Process-instance, user-task-instance and job events are indexed directly. Domain events are
 * first mapped to an {@link ObjectNode} and then sent over the Vert.x {@link EventBus} to an
 * address derived from the process id ({@code kogito-domain-events-<processId>}). A consumer for
 * such an address is registered when a {@link DomainModelRegisteredEvent} is observed for that
 * process id.
 */
@ApplicationScoped
public class ReactiveMessagingEventConsumer {
    /** Format of the event bus address used for domain events; the argument is the process id. */
    protected static final String KOGITO_DOMAIN_EVENTS = "kogito-domain-events-%s";
    private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveMessagingEventConsumer.class);
    private static final String KOGITO_PROCESSINSTANCES_EVENTS = "kogito-processinstances-events";
    private static final String KOGITO_PROCESSDOMAIN_EVENTS = "kogito-processdomain-events";
    private static final String KOGITO_USERTASKDOMAIN_EVENTS = "kogito-usertaskdomain-events";
    private static final String KOGITO_USERTASKINSTANCES_EVENTS = "kogito-usertaskinstances-events";
    private static final String KOGITO_JOBS_EVENTS = "kogito-jobs-events";

    @Inject
    IndexingService indexingService;

    @Inject
    EventBus eventBus;

    @Inject
    ObjectNodeMessageCodec codec;

    /**
     * Event bus consumers registered by this bean, keyed by process id.
     * NOTE(review): HashMap is not thread-safe; confirm that registration events and
     * {@link #destroy()} are never run concurrently.
     */
    private final Map<String, MessageConsumer<ObjectNode>> consumers = new HashMap<>();

    /** Registers the injected codec as the default event bus codec for {@link ObjectNode} bodies. */
    @PostConstruct
    public void setup() {
        this.eventBus.registerDefaultCodec(ObjectNode.class, this.codec);
    }

    /**
     * Registers an event bus consumer for the domain events of the newly registered process,
     * unless one is already tracked for that process id.
     *
     * @param domainModelRegisteredEvent the observed registration event carrying the process id
     */
    public void onDomainModelRegisteredEvent(@Observes DomainModelRegisteredEvent domainModelRegisteredEvent) {
        LOGGER.info("New domain model registered for Process Id: {}", domainModelRegisteredEvent.getProcessId());
        this.consumers.computeIfAbsent(domainModelRegisteredEvent.getProcessId(), processId -> {
            // The explicit type witness keeps the handler typed as Handler<Message<ObjectNode>>.
            MessageConsumer<ObjectNode> consumer =
                    this.eventBus.<ObjectNode>consumer(String.format(KOGITO_DOMAIN_EVENTS, processId), this::onDomainEvent);
            LOGGER.info("Consumer registered for address: {}", consumer.address());
            return consumer;
        });
    }

    /**
     * Indexes the process instance carried by the event.
     *
     * @param kogitoProcessCloudEvent the received process cloud event
     * @return a stage that completes when the asynchronous indexing call has finished
     */
    @Incoming(KOGITO_PROCESSINSTANCES_EVENTS)
    public CompletionStage<Void> onProcessInstanceEvent(KogitoProcessCloudEvent kogitoProcessCloudEvent) {
        LOGGER.debug("Process instance consumer received KogitoCloudEvent: \n{}", kogitoProcessCloudEvent);
        return CompletableFuture.runAsync(
                () -> this.indexingService.indexProcessInstance(kogitoProcessCloudEvent.getData()));
    }

    /**
     * Maps the process event with {@link ProcessInstanceMetaMapper} and sends the result to the
     * domain event address of its process.
     *
     * @param kogitoProcessCloudEvent the received process cloud event
     * @return a stage that completes once the event bus request has been answered or has failed
     */
    @Incoming(KOGITO_PROCESSDOMAIN_EVENTS)
    public CompletionStage<Void> onProcessInstanceDomainEvent(KogitoProcessCloudEvent kogitoProcessCloudEvent) {
        LOGGER.debug("Process domain consumer received KogitoCloudEvent: \n{}", kogitoProcessCloudEvent);
        return sendMessage(new ProcessInstanceMetaMapper().apply(kogitoProcessCloudEvent));
    }

    /**
     * Indexes the user task instance carried by the event.
     *
     * @param kogitoUserTaskCloudEvent the received user task cloud event
     * @return a stage that completes when the asynchronous indexing call has finished
     */
    @Incoming(KOGITO_USERTASKINSTANCES_EVENTS)
    public CompletionStage<Void> onUserTaskInstanceEvent(KogitoUserTaskCloudEvent kogitoUserTaskCloudEvent) {
        LOGGER.debug("Task instance received KogitoUserTaskCloudEvent \n{}", kogitoUserTaskCloudEvent);
        return CompletableFuture.runAsync(
                () -> this.indexingService.indexUserTaskInstance(kogitoUserTaskCloudEvent.getData()));
    }

    /**
     * Maps the user task event with {@link UserTaskInstanceMetaMapper} and sends the result to the
     * domain event address of its process.
     *
     * @param kogitoUserTaskCloudEvent the received user task cloud event
     * @return a stage that completes once the event bus request has been answered or has failed
     */
    @Incoming(KOGITO_USERTASKDOMAIN_EVENTS)
    public CompletionStage<Void> onUserTaskInstanceDomainEvent(KogitoUserTaskCloudEvent kogitoUserTaskCloudEvent) {
        LOGGER.debug("Task domain received KogitoUserTaskCloudEvent \n{}", kogitoUserTaskCloudEvent);
        return sendMessage(new UserTaskInstanceMetaMapper().apply(kogitoUserTaskCloudEvent));
    }

    /**
     * Indexes the job carried by the event.
     *
     * @param kogitoJobCloudEvent the received job cloud event
     * @return a stage that completes when the asynchronous indexing call has finished
     */
    @Incoming(KOGITO_JOBS_EVENTS)
    public CompletionStage<Void> onJobEvent(KogitoJobCloudEvent kogitoJobCloudEvent) {
        LOGGER.debug("Job received KogitoJobCloudEvent \n{}", kogitoJobCloudEvent);
        return CompletableFuture.runAsync(() -> this.indexingService.indexJob(kogitoJobCloudEvent.getData()));
    }

    /**
     * Sends the given JSON document as an event bus request to the domain event address built
     * from its {@code processId} field.
     *
     * <p>The returned future always completes normally, whether the request succeeded or failed,
     * so the outcome of the incoming message is the same as before. A failed request is logged
     * instead of being dropped silently.
     *
     * @param objectNode the mapped domain event; must contain a {@code processId} field
     * @return a future completed with {@code null} once the request outcome is known
     */
    private CompletableFuture<Void> sendMessage(ObjectNode objectNode) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        String address = String.format(KOGITO_DOMAIN_EVENTS, objectNode.get("processId").asText());
        this.eventBus.request(address, objectNode, asyncResult -> {
            if (asyncResult.failed()) {
                LOGGER.warn("Domain event request to address {} failed", address, asyncResult.cause());
            }
            // Deliberately complete normally in both cases; see the method documentation.
            completableFuture.complete(null);
        });
        return completableFuture;
    }

    /**
     * Indexes the domain model contained in the message body, then replies with {@code null} on
     * success or fails the message with the exception text on error.
     *
     * @param message the event bus message whose body is the domain document to index
     */
    private void onDomainEvent(Message<ObjectNode> message) {
        try {
            LOGGER.debug("Processing domain message: {}", message);
            this.indexingService.indexModel(message.body());
            message.reply(null);
        } catch (Exception e) {
            LOGGER.error("Error processing domain event: {}", e.getMessage(), e);
            message.fail(0, e.getMessage());
        }
    }

    /** Unregisters every consumer created by this bean and clears the registry. */
    @PreDestroy
    public void destroy() {
        this.consumers.values().forEach(messageConsumer -> messageConsumer.unregister());
        this.consumers.clear();
    }
}
