package org.kie.kogito.jobs.service.scheduler;

import io.quarkus.runtime.StartupEvent;
import io.vertx.mutiny.core.Vertx;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Priority;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.kie.kogito.jobs.service.management.MessagingChangeEvent;
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.scheduler.impl.TimerDelegateJobScheduler;
import org.kie.kogito.jobs.service.utils.DateUtil;
import org.kie.kogito.jobs.service.utils.ErrorHandling;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
/* loaded from: input_file:org/kie/kogito/jobs/service/scheduler/JobSchedulerManager.class */
public class JobSchedulerManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(JobSchedulerManager.class);

    @ConfigProperty(name = "kogito.jobs-service.schedulerChunkInMinutes")
    long schedulerChunkInMinutes;

    @ConfigProperty(name = "kogito.jobs-service.loadJobIntervalInMinutes")
    long loadJobIntervalInMinutes;

    @ConfigProperty(name = "kogito.jobs-service.loadJobFromCurrentTimeIntervalInMinutes")
    long loadJobFromCurrentTimeIntervalInMinutes;

    @Inject
    TimerDelegateJobScheduler scheduler;

    @Inject
    ReactiveJobRepository repository;

    @Inject
    Vertx vertx;
    private AtomicBoolean enabled = new AtomicBoolean(false);

    void onStartup(@Observes @Priority(4000) StartupEvent startupEvent) {
        if (this.loadJobIntervalInMinutes > this.schedulerChunkInMinutes) {
            LOGGER.warn("The loadJobIntervalInMinutes ({}) cannot be greater than schedulerChunkInMinutes ({}), setting value {} for both", new Object[]{Long.valueOf(this.loadJobIntervalInMinutes), Long.valueOf(this.schedulerChunkInMinutes), Long.valueOf(this.schedulerChunkInMinutes)});
            this.loadJobIntervalInMinutes = this.schedulerChunkInMinutes;
        }
        this.vertx.runOnContext(this::loadJobDetails);
        this.vertx.setPeriodic(TimeUnit.MINUTES.toMillis(this.loadJobIntervalInMinutes), l -> {
            loadJobDetails();
        });
    }

    protected void onMessagingStatusChange(@Observes MessagingChangeEvent messagingChangeEvent) {
        this.enabled.set(messagingChangeEvent.isEnabled());
    }

    void loadJobDetails() {
        if (this.enabled.get()) {
            loadJobsInCurrentChunk().filter(jobDetails -> {
                return !this.scheduler.scheduled(jobDetails.getId()).isPresent();
            }).flatMapRsPublisher(jobDetails2 -> {
                TimerDelegateJobScheduler timerDelegateJobScheduler = this.scheduler;
                Objects.requireNonNull(timerDelegateJobScheduler);
                return ErrorHandling.skipErrorPublisher(timerDelegateJobScheduler::schedule, jobDetails2);
            }).forEach(jobDetails3 -> {
                LOGGER.debug("Loaded and scheduled job {}", jobDetails3);
            }).run().whenComplete((r3, th) -> {
                Optional.ofNullable(th).map(th -> {
                    LOGGER.error("Error Loading scheduled jobs!", th);
                    return null;
                }).orElseGet(() -> {
                    LOGGER.info("Loading scheduled jobs completed !");
                    return null;
                });
            });
        } else {
            LOGGER.info("Skip loading scheduled jobs");
        }
    }

    private PublisherBuilder<JobDetails> loadJobsInCurrentChunk() {
        return this.repository.findByStatusBetweenDatesOrderByPriority(DateUtil.now().minusMinutes(this.loadJobFromCurrentTimeIntervalInMinutes), DateUtil.now().plusMinutes(this.schedulerChunkInMinutes), JobStatus.SCHEDULED, JobStatus.RETRY);
    }
}
