package org.kie.kogito.jobs.service.messaging;

import io.cloudevents.CloudEvent;
import io.smallrye.mutiny.Uni;
import java.util.Objects;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.kie.kogito.jobs.service.exception.JobServiceException;
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.scheduler.impl.TimerDelegateJobScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/kie/kogito/jobs/service/messaging/ReactiveMessagingEventConsumer.class */
public abstract class ReactiveMessagingEventConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveMessagingEventConsumer.class);
    private final TimerDelegateJobScheduler scheduler;
    private final ReactiveJobRepository jobRepository;
    private final String createJobEventType;
    private final String cancelJobEventType;

    /* JADX INFO: Access modifiers changed from: protected */
    public ReactiveMessagingEventConsumer() {
        this(null, null, null, null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ReactiveMessagingEventConsumer(TimerDelegateJobScheduler timerDelegateJobScheduler, ReactiveJobRepository reactiveJobRepository, String str, String str2) {
        this.scheduler = timerDelegateJobScheduler;
        this.jobRepository = reactiveJobRepository;
        this.createJobEventType = str;
        this.cancelJobEventType = str2;
    }

    public Uni<Void> onKogitoServiceRequest(Message<CloudEvent> message) {
        CloudEvent cloudEvent = (CloudEvent) message.getPayload();
        String type = cloudEvent.getType();
        if (Objects.equals(this.createJobEventType, type)) {
            return handleCreateEvent(message, getJobDetails(cloudEvent));
        }
        if (Objects.equals(this.cancelJobEventType, type)) {
            return handleCancelEvent(message, getJobId(cloudEvent));
        }
        LOGGER.error("Unexpected job request type: {}, for the cloud event: {}", type, cloudEvent);
        return Uni.createFrom().completionStage(message.nack(new JobServiceException("Unexpected job request type: " + type)));
    }

    public abstract JobDetails getJobDetails(CloudEvent cloudEvent);

    public abstract String getJobId(CloudEvent cloudEvent);

    protected Uni<Void> handleCreateEvent(Message<?> message, JobDetails jobDetails) {
        return Uni.createFrom().completionStage(this.jobRepository.get(jobDetails.getId())).flatMap(jobDetails2 -> {
            if (jobDetails2 == null || jobDetails2.getStatus() == JobStatus.SCHEDULED) {
                return Uni.createFrom().publisher(this.scheduler.schedule(jobDetails));
            }
            LOGGER.info("A Job in status: {} already exists for the job id: {}, no processing will be done fot the event: {}.", new Object[]{jobDetails2.getStatus(), jobDetails2.getId(), message.getPayload()});
            return Uni.createFrom().item(jobDetails2);
        }).onItem().transformToUni(jobDetails3 -> {
            return jobDetails3 == null ? Uni.createFrom().failure(new JobServiceException("An internal scheduler error was produced during Job scheduling")) : Uni.createFrom().completionStage(message.ack());
        }).onFailure().recoverWithUni(th -> {
            LOGGER.error(String.format("An error was produced during Job scheduling for the event: %s", message.getPayload()), th);
            return Uni.createFrom().completionStage(message.nack(new JobServiceException("An error was produced during Job scheduling: " + th.getMessage(), th)));
        });
    }

    protected Uni<Void> handleCancelEvent(Message<?> message, String str) {
        return Uni.createFrom().completionStage(this.scheduler.cancel(str)).onItemOrFailure().transformToUni((jobDetails, th) -> {
            if (th != null) {
                LOGGER.error(String.format("An error was produced during Job cancelling for the event: %s", message.getPayload()), th);
                return Uni.createFrom().completionStage(message.nack(new JobServiceException("An error was produced during Job cancelling: " + th.getMessage(), th)));
            }
            if (jobDetails == null) {
                LOGGER.info("No Job exists for the job id: {} or it was already cancelled", str);
            }
            return Uni.createFrom().completionStage(message.ack());
        });
    }

    public String getCreateJobEventType() {
        return this.createJobEventType;
    }

    public String getCancelJobEventType() {
        return this.cancelJobEventType;
    }
}
