/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.index.messaging;

import com.fasterxml.jackson.databind.node.ObjectNode;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.kie.kogito.index.event.DomainModelRegisteredEvent;
import org.kie.kogito.index.event.KogitoProcessCloudEvent;
import org.kie.kogito.index.event.KogitoUserTaskCloudEvent;
import org.kie.kogito.index.json.ProcessInstanceMetaMapper;
import org.kie.kogito.index.json.UserTaskInstanceMetaMapper;
import org.kie.kogito.index.model.ProcessInstance;
import org.kie.kogito.index.model.UserTaskInstance;
import org.kie.kogito.index.service.IndexingService;
import org.kie.kogito.index.vertx.ObjectNodeMessageCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Consumes Kogito cloud events from the reactive messaging channels and hands them to the
 * {@link IndexingService}.
 *
 * <p>Process-instance and user-task-instance events are indexed directly. The corresponding
 * "domain" events are first mapped to an {@link ObjectNode} and then sent over the Vert.x
 * {@link EventBus} to a per-process address ({@code kogito-domain-events-<processId>}); a
 * consumer for that address is registered whenever a {@link DomainModelRegisteredEvent} is
 * observed for the process id.
 */
@ApplicationScoped
public class ReactiveMessagingEventConsumer {
    /** Format of the event bus address used for domain events; the argument is the process id. */
    protected static final String KOGITO_DOMAIN_EVENTS = "kogito-domain-events-%s";
    private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveMessagingEventConsumer.class);
    private static final String KOGITO_PROCESSINSTANCES_EVENTS = "kogito-processinstances-events";
    private static final String KOGITO_PROCESSDOMAIN_EVENTS = "kogito-processdomain-events";
    private static final String KOGITO_USERTASKDOMAIN_EVENTS = "kogito-usertaskdomain-events";
    private static final String KOGITO_USERTASKINSTANCES_EVENTS = "kogito-usertaskinstances-events";
    @Inject
    IndexingService indexingService;
    @Inject
    EventBus eventBus;
    @Inject
    ObjectNodeMessageCodec codec;
    /** Event bus consumers registered by this bean, keyed by process id. */
    private final Map<String, MessageConsumer<ObjectNode>> consumers = new HashMap<>();

    /**
     * Registers the injected codec as the default event bus codec for {@link ObjectNode}
     * message bodies.
     */
    @PostConstruct
    public void setup() {
        this.eventBus.registerDefaultCodec(ObjectNode.class, this.codec);
    }

    /**
     * Registers an event bus consumer for the domain events of the newly registered process,
     * unless one is already tracked for that process id.
     *
     * @param event the observed registration event carrying the process id
     */
    public void onDomainModelRegisteredEvent(@Observes DomainModelRegisteredEvent event) {
        LOGGER.info("New domain model registered for Process Id: {}", event.getProcessId());
        this.consumers.computeIfAbsent(event.getProcessId(), processId -> {
            String address = String.format(KOGITO_DOMAIN_EVENTS, event.getProcessId());
            MessageConsumer<ObjectNode> consumer = this.eventBus.consumer(address, this::onDomainEvent);
            LOGGER.info("Consumer registered for address: {}", consumer.address());
            return consumer;
        });
    }

    /**
     * Indexes the process instance carried by the event.
     *
     * @param event the incoming process cloud event
     * @return a stage that completes once the asynchronous indexing task has run
     */
    @Incoming(KOGITO_PROCESSINSTANCES_EVENTS)
    public CompletionStage<Void> onProcessInstanceEvent(KogitoProcessCloudEvent event) {
        LOGGER.debug("Process instance consumer received KogitoCloudEvent: \n{}", event);
        return CompletableFuture.runAsync(() -> this.indexingService.indexProcessInstance((ProcessInstance)event.getData()));
    }

    /**
     * Maps the process event to its JSON form and sends it to the domain-event address of
     * its process.
     *
     * @param event the incoming process cloud event
     * @return a stage that completes when the event bus request has been answered
     */
    @Incoming(KOGITO_PROCESSDOMAIN_EVENTS)
    public CompletionStage<Void> onProcessInstanceDomainEvent(KogitoProcessCloudEvent event) {
        LOGGER.debug("Process domain consumer received KogitoCloudEvent: \n{}", event);
        ObjectNode json = new ProcessInstanceMetaMapper().apply(event);
        return this.sendMessage(json);
    }

    /**
     * Indexes the user task instance carried by the event.
     *
     * @param event the incoming user task cloud event
     * @return a stage that completes once the asynchronous indexing task has run
     */
    @Incoming(KOGITO_USERTASKINSTANCES_EVENTS)
    public CompletionStage<Void> onUserTaskInstanceEvent(KogitoUserTaskCloudEvent event) {
        LOGGER.debug("Task instance received KogitoUserTaskCloudEvent \n{}", event);
        return CompletableFuture.runAsync(() -> this.indexingService.indexUserTaskInstance((UserTaskInstance)event.getData()));
    }

    /**
     * Maps the user task event to its JSON form and sends it to the domain-event address of
     * its process.
     *
     * @param event the incoming user task cloud event
     * @return a stage that completes when the event bus request has been answered
     */
    @Incoming(KOGITO_USERTASKDOMAIN_EVENTS)
    public CompletionStage<Void> onUserTaskInstanceDomainEvent(KogitoUserTaskCloudEvent event) {
        LOGGER.debug("Task domain received KogitoUserTaskCloudEvent \n{}", event);
        ObjectNode json = new UserTaskInstanceMetaMapper().apply(event);
        return this.sendMessage(json);
    }

    /**
     * Sends the JSON document to the domain-event address derived from its {@code processId}
     * field and waits for the reply.
     *
     * <p>The returned future always completes normally once the request has been answered;
     * a failed request is logged rather than propagated, so the completion behaviour seen
     * by callers is the same for successful and failed requests.
     *
     * @param json the domain event document; its {@code processId} field selects the address
     * @return a future completed with {@code null} when the request has been answered
     */
    private CompletableFuture<Void> sendMessage(ObjectNode json) {
        CompletableFuture<Void> cf = new CompletableFuture<>();
        // NOTE(review): json.get(...) yields null when the field is absent, which would raise
        // a NullPointerException here - confirm the mappers always set "processId".
        String processId = json.get("processId").asText();
        String address = String.format(KOGITO_DOMAIN_EVENTS, processId);
        this.eventBus.request(address, json, async -> {
            if (async.failed()) {
                // Previously the outcome was ignored entirely; surface the failure in the log.
                LOGGER.error("Failed to deliver domain event to address: {}", address, async.cause());
            }
            cf.complete(null);
        });
        return cf;
    }

    /**
     * Handles a domain event received from the event bus: indexes the message body and
     * replies, or fails the message with the exception text if indexing throws.
     *
     * @param message the event bus message whose body is the domain event document
     */
    private void onDomainEvent(Message<ObjectNode> message) {
        try {
            LOGGER.debug("Processing domain message: {}", message);
            this.indexingService.indexModel(message.body());
            // An empty reply signals the sender that the event has been handled.
            message.reply(null);
        }
        catch (Exception ex) {
            LOGGER.error("Error processing domain event: {}", ex.getMessage(), ex);
            message.fail(0, ex.getMessage());
        }
    }

    /**
     * Unregisters every event bus consumer created by this bean and forgets them.
     */
    @PreDestroy
    public void destroy() {
        this.consumers.values().forEach(MessageConsumer::unregister);
        this.consumers.clear();
    }
}

