package org.kie.kogito.jobs.messaging.quarkus;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.kie.kogito.jobs.JobsService;
import org.kie.kogito.jobs.JobsServiceException;
import org.kie.kogito.jobs.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.ProcessJobDescription;
import org.kie.kogito.jobs.api.Job;
import org.kie.kogito.jobs.api.JobCallbackResourceDef;
import org.kie.kogito.jobs.api.event.CancelJobRequestEvent;
import org.kie.kogito.jobs.api.event.CreateProcessInstanceJobRequestEvent;
import org.kie.kogito.jobs.api.event.JobCloudEvent;
import org.kie.kogito.jobs.api.event.serialization.JobCloudEventSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/kie/kogito/jobs/messaging/quarkus/AbstractReactiveMessagingJobsService.class */
public abstract class AbstractReactiveMessagingJobsService implements JobsService {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractReactiveMessagingJobsService.class);
    public static final String KOGITO_JOB_SERVICE_JOB_REQUEST_EVENTS_CHANNEL = "kogito-job-service-job-request-events";
    private final JobCloudEventSerializer serializer;
    private final URI serviceUrl;
    private final Emitter<String> eventsEmitter;

    protected AbstractReactiveMessagingJobsService() {
        this(null, null, null);
    }

    protected AbstractReactiveMessagingJobsService(URI uri, ObjectMapper objectMapper, Emitter<String> emitter) {
        this.serviceUrl = uri;
        this.eventsEmitter = emitter;
        this.serializer = new JobCloudEventSerializer(objectMapper);
    }

    public String scheduleProcessJob(ProcessJobDescription processJobDescription) {
        throw new UnsupportedOperationException("Scheduling for process jobs is not yet implemented");
    }

    public String scheduleProcessInstanceJob(ProcessInstanceJobDescription processInstanceJobDescription) {
        Job buildCallbackPatternJob = JobCallbackResourceDef.buildCallbackPatternJob(processInstanceJobDescription, JobCallbackResourceDef.buildCallbackURI(processInstanceJobDescription, this.serviceUrl.toString()));
        LOGGER.debug("scheduleProcessInstanceJob job: {}", buildCallbackPatternJob);
        emitEvent(CreateProcessInstanceJobRequestEvent.builder().source(this.serviceUrl).job(buildCallbackPatternJob).processInstanceId(processInstanceJobDescription.processInstanceId()).processId(processInstanceJobDescription.processId()).rootProcessInstanceId(processInstanceJobDescription.rootProcessInstanceId()).rootProcessId(processInstanceJobDescription.rootProcessId()).kogitoAddons(getAddonName()).build());
        return buildCallbackPatternJob.getId();
    }

    public boolean cancelJob(String str) {
        LOGGER.debug("cancelJob, id: {}", str);
        emitEvent(CancelJobRequestEvent.builder().source(this.serviceUrl).jobId(str).kogitoAddons(getAddonName()).build());
        return true;
    }

    protected Message<String> decorate(Message<String> message) {
        return message;
    }

    protected abstract String getAddonName();

    void emitEvent(JobCloudEvent<?> jobCloudEvent) {
        LOGGER.debug("About to emit JobCloudEvent {} to channel {}", jobCloudEvent, KOGITO_JOB_SERVICE_JOB_REQUEST_EVENTS_CHANNEL);
        try {
            String serialize = this.serializer.serialize(jobCloudEvent);
            LOGGER.trace("JobCloudEvent json value: {}", serialize);
            Context currentContext = Vertx.currentContext();
            Uni emitter = Uni.createFrom().emitter(uniEmitter -> {
                this.eventsEmitter.send(decorate(ContextAwareMessage.of(serialize).withAck(() -> {
                    uniEmitter.complete((Object) null);
                    return CompletableFuture.completedFuture(null);
                }).withNack(th -> {
                    uniEmitter.fail(th);
                    return CompletableFuture.completedFuture(null);
                })));
            });
            if (currentContext != null) {
                emitter = emitter.emitOn(runnable -> {
                    currentContext.runOnContext(r3 -> {
                        runnable.run();
                    });
                });
            }
            emitter.await().indefinitely();
            LOGGER.trace("Successfully emitted JobCloudEvent {} to channel {}", jobCloudEvent, KOGITO_JOB_SERVICE_JOB_REQUEST_EVENTS_CHANNEL);
        } catch (Exception e) {
            throw new JobsServiceException("Error while emitting JobCloudEvent event to channel: kogito-job-service-job-request-events, event: " + jobCloudEvent, e);
        }
    }
}
