package org.kie.kogito.jobs.service.messaging;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniCreate;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.eclipse.microprofile.faulttolerance.Retry;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.kie.kogito.jobs.api.event.CancelJobRequestEvent;
import org.kie.kogito.jobs.api.event.CreateProcessInstanceJobRequestEvent;
import org.kie.kogito.jobs.api.event.JobCloudEvent;
import org.kie.kogito.jobs.service.exception.JobServiceException;
import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.model.ScheduledJob;
import org.kie.kogito.jobs.service.model.job.ScheduledJobAdapter;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.scheduler.impl.TimerDelegateJobScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
/* loaded from: input_file:org/kie/kogito/jobs/service/messaging/ReactiveMessagingEventConsumer.class */
public class ReactiveMessagingEventConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) ReactiveMessagingEventConsumer.class);
    private static final String KOGITO_JOB_SERVICE_JOB_REQUEST_EVENTS = "kogito-job-service-job-request-events";

    @Inject
    TimerDelegateJobScheduler scheduler;

    @Inject
    ReactiveJobRepository jobRepository;

    @Acknowledgment(Acknowledgment.Strategy.MANUAL)
    @Retry(delay = 500, maxRetries = 4)
    @Incoming(KOGITO_JOB_SERVICE_JOB_REQUEST_EVENTS)
    public Uni<Void> onKogitoServiceRequest(Message<JobCloudEvent<?>> message) {
        JobCloudEvent<?> payload = message.getPayload();
        String type = payload.getType();
        boolean z = -1;
        switch (type.hashCode()) {
            case -2005160372:
                if (type.equals(CancelJobRequestEvent.CANCEL_JOB_REQUEST)) {
                    z = true;
                    break;
                }
                break;
            case 51016090:
                if (type.equals(CreateProcessInstanceJobRequestEvent.CREATE_PROCESS_INSTANCE_JOB_REQUEST)) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                return handleEvent(message, (CreateProcessInstanceJobRequestEvent) payload);
            case true:
                return handleEvent(message, (CancelJobRequestEvent) payload);
            default:
                LOGGER.error("Unexpected job request type: {}, for the cloud event: {}", payload.getType(), payload);
                return Uni.createFrom().completionStage(message.nack(new JobServiceException("Unexpected job request type: " + payload.getType())));
        }
    }

    private Uni<Void> handleEvent(Message<JobCloudEvent<?>> message, CreateProcessInstanceJobRequestEvent createProcessInstanceJobRequestEvent) {
        return Uni.createFrom().completionStage(this.jobRepository.get(createProcessInstanceJobRequestEvent.getData().getId())).flatMap(jobDetails -> {
            if (jobDetails == null || jobDetails.getStatus() == JobStatus.SCHEDULED) {
                return Uni.createFrom().publisher(this.scheduler.schedule(ScheduledJobAdapter.to(ScheduledJob.builder().job(createProcessInstanceJobRequestEvent.getData()).build())));
            }
            LOGGER.info("A Job in status: {} already exists for the job id: {}, no processing will be done fot the event: {}.", jobDetails.getStatus(), jobDetails.getId(), createProcessInstanceJobRequestEvent);
            return Uni.createFrom().item((UniCreate) jobDetails);
        }).onItem().transformToUni(jobDetails2 -> {
            return jobDetails2 == null ? Uni.createFrom().failure(new JobServiceException("An internal scheduler error was produced during Job scheduling")) : Uni.createFrom().completionStage(message.ack());
        }).onFailure().recoverWithUni(th -> {
            LOGGER.error(String.format("An error was produced during Job scheduling for the event: %s", createProcessInstanceJobRequestEvent), th);
            return Uni.createFrom().completionStage(message.nack(new JobServiceException("An error was produced during Job scheduling: " + th.getMessage(), th)));
        });
    }

    private Uni<Void> handleEvent(Message<JobCloudEvent<?>> message, CancelJobRequestEvent cancelJobRequestEvent) {
        return Uni.createFrom().completionStage(this.scheduler.cancel(cancelJobRequestEvent.getData().getId())).onItemOrFailure().transformToUni((jobDetails, th) -> {
            if (th != null) {
                LOGGER.error(String.format("An error was produced during Job cancelling for the event: %s", cancelJobRequestEvent), th);
                return Uni.createFrom().completionStage(message.nack(new JobServiceException("An error was produced during Job cancelling: " + th.getMessage(), th)));
            }
            if (jobDetails == null) {
                LOGGER.info("No Job exists for the job id: {} or it was already cancelled", cancelJobRequestEvent.getData().getId());
            }
            return Uni.createFrom().completionStage(message.ack());
        });
    }
}
