/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.jobs.service.messaging;

import io.smallrye.mutiny.Uni;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.eclipse.microprofile.faulttolerance.Retry;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.kie.kogito.jobs.api.Job;
import org.kie.kogito.jobs.api.event.CancelJobRequestEvent;
import org.kie.kogito.jobs.api.event.CreateProcessInstanceJobRequestEvent;
import org.kie.kogito.jobs.api.event.JobCloudEvent;
import org.kie.kogito.jobs.service.exception.JobServiceException;
import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.model.ScheduledJob;
import org.kie.kogito.jobs.service.model.job.ScheduledJobAdapter;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.scheduler.impl.TimerDelegateJobScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reactive messaging consumer for job request cloud events.
 *
 * <p>Messages received on the {@code kogito-job-service-job-request-events} channel are dispatched by
 * their cloud event type to either a job creation or a job cancellation handler. Acknowledgment is
 * manual: every handled path finishes by calling {@link Message#ack()} on success or
 * {@link Message#nack(Throwable)} with a {@link JobServiceException} on error.
 */
@ApplicationScoped
public class ReactiveMessagingEventConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveMessagingEventConsumer.class);

    /** Name of the incoming channel this consumer is bound to. */
    private static final String KOGITO_JOB_SERVICE_JOB_REQUEST_EVENTS = "kogito-job-service-job-request-events";

    /** Scheduler used to schedule new jobs and cancel existing ones. */
    @Inject
    TimerDelegateJobScheduler scheduler;

    /** Repository queried to find out whether a job already exists for a creation request. */
    @Inject
    ReactiveJobRepository jobRepository;

    /**
     * Entry point for incoming job request events.
     *
     * <p>Dispatches on the cloud event type: {@code CreateProcessInstanceJobRequest} and
     * {@code CancelJobRequest} are forwarded to their handlers. Any other type, including a missing
     * payload or a missing type, is logged as an error and the message is nacked.
     *
     * @param message the incoming message wrapping the job cloud event
     * @return a {@link Uni} that completes once the message has been acked or nacked
     */
    @Incoming(KOGITO_JOB_SERVICE_JOB_REQUEST_EVENTS)
    @Acknowledgment(Acknowledgment.Strategy.MANUAL)
    @Retry(delay = 500L, maxRetries = 4)
    public Uni<Void> onKogitoServiceRequest(Message<JobCloudEvent<?>> message) {
        final JobCloudEvent<?> jobCloudEvent = message.getPayload();
        // Switching on a null String throws a NullPointerException, which would leave the message
        // neither acked nor nacked; treat a missing type as an unexpected request instead.
        final String eventType = jobCloudEvent == null ? null : jobCloudEvent.getType();
        if (eventType != null) {
            switch (eventType) {
                case "CreateProcessInstanceJobRequest":
                    return handleEvent(message, (CreateProcessInstanceJobRequestEvent) jobCloudEvent);
                case "CancelJobRequest":
                    return handleEvent(message, (CancelJobRequestEvent) jobCloudEvent);
                default:
                    break;
            }
        }
        LOGGER.error("Unexpected job request type: {}, for the cloud event: {}", eventType, jobCloudEvent);
        return Uni.createFrom().completionStage(
                message.nack(new JobServiceException("Unexpected job request type: " + eventType)));
    }

    /**
     * Handles a job creation request.
     *
     * <p>The job is looked up by id first. It is (re)scheduled when no job exists yet or when the
     * existing one is still in {@link JobStatus#SCHEDULED}; otherwise the existing job is left
     * untouched and only an informational message is logged. The message is acked when a job results
     * from this step, and nacked when the scheduler yields no job or any failure is raised.
     *
     * @param message the message to ack or nack
     * @param event the creation request carrying the job to schedule
     * @return a {@link Uni} that completes once the message has been acked or nacked
     */
    private Uni<Void> handleEvent(Message<JobCloudEvent<?>> message, CreateProcessInstanceJobRequestEvent event) {
        return Uni.createFrom().completionStage(this.jobRepository.get(((Job) event.getData()).getId()))
                .flatMap(existingJob -> {
                    if (existingJob == null || existingJob.getStatus() == JobStatus.SCHEDULED) {
                        return Uni.createFrom().publisher(this.scheduler.schedule(
                                ScheduledJobAdapter.to(ScheduledJob.builder().job((Job) event.getData()).build())));
                    }
                    LOGGER.info("A Job in status: {} already exists for the job id: {}, no processing will be done for the event: {}.",
                            existingJob.getStatus(), existingJob.getId(), event);
                    return Uni.createFrom().item(existingJob);
                })
                .onItem().transformToUni(createdJob -> {
                    if (createdJob == null) {
                        // A null item here means the scheduling step produced no job.
                        return Uni.createFrom().failure(
                                new JobServiceException("An internal scheduler error was produced during Job scheduling"));
                    }
                    return Uni.createFrom().completionStage(message.ack());
                })
                .onFailure().recoverWithUni(throwable -> {
                    String msg = String.format("An error was produced during Job scheduling for the event: %s", event);
                    LOGGER.error(msg, throwable);
                    return Uni.createFrom().completionStage(message.nack(
                            new JobServiceException("An error was produced during Job scheduling: " + throwable.getMessage(), throwable)));
                });
    }

    /**
     * Handles a job cancellation request.
     *
     * <p>Asks the scheduler to cancel the job with the requested id. A failure is logged and the
     * message is nacked. Otherwise the message is acked, including when the scheduler returns no
     * job, which is only logged at info level.
     *
     * @param message the message to ack or nack
     * @param event the cancellation request carrying the id of the job to cancel
     * @return a {@link Uni} that completes once the message has been acked or nacked
     */
    private Uni<Void> handleEvent(Message<JobCloudEvent<?>> message, CancelJobRequestEvent event) {
        return Uni.createFrom().completionStage(this.scheduler.cancel(((CancelJobRequestEvent.JobId) event.getData()).getId()))
                .onItemOrFailure().transformToUni((cancelledJob, throwable) -> {
                    if (throwable != null) {
                        String msg = String.format("An error was produced during Job cancelling for the event: %s", event);
                        LOGGER.error(msg, throwable);
                        return Uni.createFrom().completionStage(message.nack(
                                new JobServiceException("An error was produced during Job cancelling: " + throwable.getMessage(), throwable)));
                    }
                    if (cancelledJob == null) {
                        LOGGER.info("No Job exists for the job id: {} or it was already cancelled",
                                ((CancelJobRequestEvent.JobId) event.getData()).getId());
                    }
                    return Uni.createFrom().completionStage(message.ack());
                });
    }
}

