/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.jobs.service.scheduler;

import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.kie.kogito.jobs.service.model.JobExecutionResponse;
import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.model.job.JobDetails;
import org.kie.kogito.jobs.service.model.job.ManageableJobHandle;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.scheduler.ReactiveJobScheduler;
import org.kie.kogito.jobs.service.utils.DateUtil;
import org.kie.kogito.timer.JobHandle;
import org.kie.kogito.timer.Trigger;
import org.kie.kogito.timer.impl.PointInTimeTrigger;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Skeleton of a timer based {@link ReactiveJobScheduler}.
 *
 * <p>This class drives the persistence side of scheduling through a {@link ReactiveJobRepository}
 * and keeps an in-memory record ({@code schedulerControl}) of the jobs it has handed to the
 * underlying timer. Concrete subclasses supply the actual timer integration through
 * {@link #doSchedule(JobDetails, Optional)} and {@link #doCancel(JobDetails)}.
 */
public abstract class BaseTimerJobScheduler implements ReactiveJobScheduler {

    private static final Logger LOGGER = LoggerFactory.getLogger(BaseTimerJobScheduler.class);

    /** Delay, in milliseconds, applied before a retry is fired; also used to shrink the retry window per retry. */
    long backoffRetryMillis;

    /** Base tolerance, in milliseconds, used by {@link #isExpired(ZonedDateTime, int)}. */
    long maxIntervalLimitToRetryMillis;

    /** When present and {@code true}, already expired jobs get a positive one second delay instead of a negative one. */
    Optional<Boolean> forceExecuteExpiredJobs;

    /** Size, in minutes, of the window starting now in which a job is handed to the timer right away. */
    long schedulerChunkInMinutes;

    private final ReactiveJobRepository jobRepository;

    /** Job id mapped to the instant at which the job was registered with the underlying timer. */
    private final Map<String, ZonedDateTime> schedulerControl;

    /**
     * Creates a scheduler without repository and with all numeric settings at zero.
     */
    protected BaseTimerJobScheduler() {
        this(null, 0L, 0L, 0L, null);
    }

    /**
     * @param jobRepository repository used to read, save and delete jobs
     * @param backoffRetryMillis delay in milliseconds before a retry is fired
     * @param maxIntervalLimitToRetryMillis base tolerance in milliseconds used when checking expiration
     * @param schedulerChunkInMinutes size in minutes of the window in which jobs are scheduled immediately
     * @param forceExecuteExpiredJobs whether expired jobs must still be executed; may be {@code null}
     */
    public BaseTimerJobScheduler(ReactiveJobRepository jobRepository, long backoffRetryMillis, long maxIntervalLimitToRetryMillis, long schedulerChunkInMinutes, Boolean forceExecuteExpiredJobs) {
        this.jobRepository = jobRepository;
        this.backoffRetryMillis = backoffRetryMillis;
        this.maxIntervalLimitToRetryMillis = maxIntervalLimitToRetryMillis;
        this.schedulerControl = new ConcurrentHashMap<>();
        this.schedulerChunkInMinutes = schedulerChunkInMinutes;
        this.forceExecuteExpiredJobs = Optional.ofNullable(forceExecuteExpiredJobs);
    }

    /**
     * Schedules the given job.
     *
     * <p>When the repository already knows the job id, {@link #handleExistingJob(JobDetails)} runs
     * first; if that step emits nothing, nothing further happens. Otherwise the job is either
     * handed to the timer (its fire time falls inside the current scheduler chunk) or only saved
     * with status {@code SCHEDULED}.
     *
     * @param job the job to schedule
     * @return a publisher emitting the job as saved by the repository
     */
    @Override
    public Publisher<JobDetails> schedule(JobDetails job) {
        LOGGER.debug("Scheduling {}", job);
        return ReactiveStreams.fromCompletionStage(jobRepository.exists(job.getId()))
                .flatMap(exists -> Boolean.TRUE.equals(exists) ? handleExistingJob(job) : ReactiveStreams.of(job))
                .flatMap(j -> isOnCurrentSchedulerChunk(job)
                        ? doJobScheduling(job)
                        : ReactiveStreams.fromCompletionStage(jobRepository.save(jobWithStatus(job, JobStatus.SCHEDULED))))
                .buildRs();
    }

    /**
     * Merges the given trigger into the stored job and schedules the merged job again.
     *
     * @param id identifier of the job to reschedule
     * @param trigger the new trigger
     * @return a builder emitting the rescheduled job; empty when the merge yields {@code null}
     */
    @Override
    public PublisherBuilder<JobDetails> reschedule(String id, Trigger trigger) {
        return ReactiveStreams.fromCompletionStageNullable(jobRepository.merge(id, JobDetails.builder().trigger(trigger).build()))
                // NOTE(review): the Publisher returned by doCancel is discarded here and never
                // subscribed, so the cancellation only takes effect if the subclass performs it
                // eagerly inside doCancel - confirm against the implementations.
                .peek(this::doCancel)
                .flatMapRsPublisher(this::schedule);
    }

    /** Returns a copy of {@code job} carrying the given status. */
    private JobDetails jobWithStatus(JobDetails job, JobStatus status) {
        return JobDetails.builder().of(job).status(status).build();
    }

    /** Returns a copy of {@code job} carrying the given status and the id of the timer handle as scheduled id. */
    private JobDetails jobWithStatusAndHandle(JobDetails job, JobStatus status, ManageableJobHandle handle) {
        return JobDetails.builder().of(job).status(status).scheduledId(String.valueOf(handle.getId())).build();
    }

    /**
     * Registers the job with the underlying timer and saves it with status {@code SCHEDULED}.
     *
     * <p>The stream fails with a {@link RuntimeException} when the computed delay is negative.
     */
    private PublisherBuilder<JobDetails> doJobScheduling(JobDetails job) {
        return ReactiveStreams.of(job)
                .map(current -> job.getTrigger().hasNextFireTime())
                .map(DateUtil::fromDate)
                .map(this::calculateDelay)
                .peek(delay -> {
                    if (delay.isNegative()) {
                        throw new RuntimeException("The expirationTime should be greater than current time");
                    }
                })
                .flatMap(delay -> scheduleRegistering(job, Optional.empty()))
                .map(handle -> jobWithStatusAndHandle(job, JobStatus.SCHEDULED, handle))
                .flatMapCompletionStage(jobRepository::save);
    }

    /** Tells whether the job fire time lies before {@code now + schedulerChunkInMinutes}. */
    private boolean isOnCurrentSchedulerChunk(JobDetails job) {
        ZonedDateTime chunkEnd = DateUtil.now().plusMinutes(schedulerChunkInMinutes);
        return DateUtil.fromDate(job.getTrigger().hasNextFireTime()).isBefore(chunkEnd);
    }

    /**
     * Deals with a job id that is already stored, based on the stored status.
     *
     * <ul>
     * <li>{@code SCHEDULED}: the stored job is cancelled and then emitted, so the caller can schedule again.</li>
     * <li>{@code RETRY}: a retry is triggered and nothing is emitted.</li>
     * <li>any other status: nothing is emitted.</li>
     * </ul>
     *
     * Any failure is logged and turned into an empty stream.
     */
    private PublisherBuilder<JobDetails> handleExistingJob(JobDetails job) {
        return ReactiveStreams.fromCompletionStage(jobRepository.get(job.getId()))
                .flatMap(j -> {
                    switch (j.getStatus()) {
                        case SCHEDULED:
                            return handleExpirationTime(j)
                                    .map(scheduled -> jobWithStatus(scheduled, JobStatus.CANCELED))
                                    .map(CompletableFuture::completedFuture)
                                    .flatMapCompletionStage(this::cancel)
                                    .map(deleted -> j);
                        case RETRY:
                            return handleRetry(CompletableFuture.completedFuture(j))
                                    .flatMap(retryJob -> ReactiveStreams.empty());
                        default:
                            // empty stream stops any further processing for this job
                            return ReactiveStreams.empty();
                    }
                })
                .onErrorResumeWith(t -> {
                    // keep the best-effort behaviour (empty stream) but leave a trace of the failure
                    LOGGER.warn("Error handling existing job {}", job, t);
                    return ReactiveStreams.empty();
                });
    }

    /**
     * Computes the time left until {@code expirationTime}.
     *
     * @return the non-negative duration until the expiration time; for a time in the past, one
     *         second when {@code forceExecuteExpiredJobs} holds {@code true}, minus one second otherwise
     */
    private Duration calculateDelay(ZonedDateTime expirationTime) {
        Duration delay = Duration.between(DateUtil.now(), expirationTime);
        if (!delay.isNegative()) {
            return delay;
        }
        boolean forceExecution = Boolean.TRUE.equals(forceExecuteExpiredJobs.orElse(Boolean.FALSE));
        return Duration.ofSeconds(forceExecution ? 1L : -1L);
    }

    /**
     * Post-processes a successfully executed job.
     *
     * <p>The execution counter is incremented, {@code nextFireTime()} is invoked on the trigger and
     * the job is saved. If the trigger still reports a fire time the job is scheduled again;
     * otherwise it is marked {@code EXECUTED} and cancelled. Only jobs in {@code EXECUTED} status
     * pass the final filter, so a job that was scheduled again is not emitted.
     *
     * @param futureJob the job that has been executed
     * @return a builder emitting the result of cancelling the finished job, if any
     */
    public PublisherBuilder<JobDetails> handleJobExecutionSuccess(JobDetails futureJob) {
        return ReactiveStreams.of(futureJob)
                .map(job -> JobDetails.builder().of(job).incrementExecutionCounter().build())
                // presumably advances the trigger to its next occurrence - verify against Trigger
                .peek(job -> job.getTrigger().nextFireTime())
                .flatMapCompletionStage(jobRepository::save)
                .flatMap(job -> Optional.ofNullable(job.getTrigger())
                        .filter(trigger -> Objects.nonNull(trigger.hasNextFireTime()))
                        .map(trigger -> doJobScheduling(job))
                        .orElseGet(() -> ReactiveStreams.of(jobWithStatus(job, JobStatus.EXECUTED))))
                .filter(j -> JobStatus.EXECUTED.equals(j.getStatus()))
                .flatMap(j -> ReactiveStreams.fromCompletionStage(cancel(CompletableFuture.completedFuture(j))));
    }

    /**
     * Loads the job referenced by the response and delegates to
     * {@link #handleJobExecutionSuccess(JobDetails)}.
     *
     * @param response the successful execution response
     * @return see {@link #handleJobExecutionSuccess(JobDetails)}
     */
    @Override
    public PublisherBuilder<JobDetails> handleJobExecutionSuccess(JobExecutionResponse response) {
        return ReactiveStreams.of(response)
                .map(JobExecutionResponse::getJobId)
                .flatMapCompletionStage(jobRepository::get)
                .flatMap(this::handleJobExecutionSuccess);
    }

    /**
     * Tells whether a job can no longer be retried.
     *
     * <p>The tolerance is {@code maxIntervalLimitToRetryMillis - retries * backoffRetryMillis}; the
     * job is expired when the delay from {@link #calculateDelay(ZonedDateTime)} plus that tolerance
     * is negative.
     */
    private boolean isExpired(ZonedDateTime expirationTime, int retries) {
        Duration consumedByRetries = Duration.ofMillis((long) retries * backoffRetryMillis);
        Duration limit = Duration.ofMillis(maxIntervalLimitToRetryMillis).minus(consumedByRetries);
        return calculateDelay(expirationTime).plus(limit).isNegative();
    }

    /**
     * Emits the job unchanged when it is not expired, or the result of
     * {@link #handleExpiredJob(JobDetails)} when it is.
     */
    private PublisherBuilder<JobDetails> handleExpirationTime(JobDetails scheduledJob) {
        return ReactiveStreams.of(scheduledJob)
                .map(JobDetails::getTrigger)
                .map(Trigger::hasNextFireTime)
                .map(DateUtil::fromDate)
                .flatMapCompletionStage(time -> isExpired(time, scheduledJob.getRetries())
                        ? handleExpiredJob(scheduledJob)
                        : CompletableFuture.completedFuture(scheduledJob));
    }

    /**
     * Triggers a retry for the job referenced by the error response.
     *
     * @param errorResponse the failed execution response
     * @return a builder emitting the job saved in {@code RETRY} status, if a retry was scheduled
     */
    @Override
    public PublisherBuilder<JobDetails> handleJobExecutionError(JobExecutionResponse errorResponse) {
        return handleRetry(jobRepository.get(errorResponse.getJobId()));
    }

    /**
     * Schedules a retry unless the expiration check turned the job into {@code ERROR} status.
     *
     * <p>The retry is registered with a trigger firing {@code backoffRetryMillis} from now, and the
     * job is saved with status {@code RETRY} and an incremented retry counter.
     */
    private PublisherBuilder<JobDetails> handleRetry(CompletionStage<JobDetails> futureJob) {
        return ReactiveStreams.fromCompletionStage(futureJob)
                .flatMap(scheduledJob -> handleExpirationTime(scheduledJob)
                        .map(JobDetails::getStatus)
                        .filter(status -> !JobStatus.ERROR.equals(status))
                        .flatMap(status -> scheduleRegistering(scheduledJob, Optional.of(getRetryTrigger())))
                        .map(handle -> JobDetails.builder()
                                .of(jobWithStatusAndHandle(scheduledJob, JobStatus.RETRY, handle))
                                .incrementRetries()
                                .build())
                        .flatMapCompletionStage(jobRepository::save))
                .peek(job -> LOGGER.debug("Retry executed {}", job));
    }

    /** Builds a point-in-time trigger firing {@code backoffRetryMillis} after the current time. */
    private PointInTimeTrigger getRetryTrigger() {
        long fireAtMillis = DateUtil.now().plus(backoffRetryMillis, ChronoUnit.MILLIS).toInstant().toEpochMilli();
        return new PointInTimeTrigger(fireAtMillis, null, null);
    }

    /**
     * Deletes an expired job from the repository and drops it from the scheduler control map.
     *
     * @return a stage completing with the job in {@code ERROR} status
     */
    private CompletionStage<JobDetails> handleExpiredJob(JobDetails scheduledJob) {
        JobDetails errorJob = jobWithStatus(scheduledJob, JobStatus.ERROR);
        return jobRepository.delete(errorJob).thenApply(deleted -> {
            unregisterScheduledJob(errorJob);
            LOGGER.warn("Retry limit exceeded for job {}", errorJob);
            return errorJob;
        });
    }

    /** Delegates to {@link #doSchedule(JobDetails, Optional)} and records the job in the control map for each emitted handle. */
    private PublisherBuilder<ManageableJobHandle> scheduleRegistering(JobDetails job, Optional<Trigger> trigger) {
        return doSchedule(job, trigger).peek(registerScheduledJob(job));
    }

    /** Returns a consumer that stores the current time under the job id in the control map. */
    private Consumer<JobHandle> registerScheduledJob(JobDetails job) {
        return handle -> schedulerControl.put(job.getId(), DateUtil.now());
    }

    /**
     * Registers the job with the underlying timer.
     *
     * @param var1 the job to schedule
     * @param var2 trigger to use for this scheduling; this class passes an empty value for regular
     *        scheduling and a retry trigger for retries
     * @return a builder emitting the handle of the scheduled timer
     */
    public abstract PublisherBuilder<ManageableJobHandle> doSchedule(JobDetails var1, Optional<Trigger> var2);

    /** Removes the job from the control map and returns the registration time, or {@code null} if absent. */
    private ZonedDateTime unregisterScheduledJob(JobDetails job) {
        return schedulerControl.remove(job.getId());
    }

    /**
     * Cancels the job produced by the given stage.
     *
     * <p>When the job carries a scheduled id, {@link #doCancel(JobDetails)} is subscribed first. The
     * job is then deleted from the repository and removed from the control map.
     *
     * @param futureJob stage producing the job to cancel; may complete with {@code null}
     * @return a stage completing with the first deleted job, or {@code null} when nothing was emitted
     */
    public CompletionStage<JobDetails> cancel(CompletionStage<JobDetails> futureJob) {
        return ReactiveStreams.fromCompletionStageNullable(futureJob)
                .peek(job -> LOGGER.debug("Cancel Job Scheduling {}", job))
                .flatMap(scheduledJob -> Optional.ofNullable(scheduledJob.getScheduledId())
                        .map(id -> ReactiveStreams.fromPublisher(doCancel(scheduledJob)).map(handle -> scheduledJob))
                        .orElseGet(() -> ReactiveStreams.of(scheduledJob)))
                .flatMapCompletionStage(jobRepository::delete)
                .peek(this::unregisterScheduledJob)
                .findFirst()
                .run()
                .thenApply(job -> job.orElse(null));
    }

    /**
     * Loads the job with the given id, marks it {@code CANCELED} and cancels it.
     *
     * @param jobId identifier of the job to cancel
     * @return see {@link #cancel(CompletionStage)}
     */
    @Override
    public CompletionStage<JobDetails> cancel(String jobId) {
        CompletionStage<JobDetails> canceledJob = jobRepository.get(jobId)
                .thenApply(scheduledJob -> Optional.ofNullable(scheduledJob)
                        .map(j -> jobWithStatus(j, JobStatus.CANCELED))
                        .orElse(null));
        return cancel(canceledJob);
    }

    /**
     * Cancels the job in the underlying timer.
     *
     * @param var1 the job to cancel
     * @return a publisher of the cancelled handle
     */
    public abstract Publisher<ManageableJobHandle> doCancel(JobDetails var1);

    /**
     * @param jobId identifier of the job
     * @return the time at which the job was registered with the timer, or empty when it is not registered
     */
    @Override
    public Optional<ZonedDateTime> scheduled(String jobId) {
        return Optional.ofNullable(schedulerControl.get(jobId));
    }

    /**
     * @param forceExecuteExpiredJobs whether jobs whose fire time is already in the past must still be executed
     */
    public void setForceExecuteExpiredJobs(boolean forceExecuteExpiredJobs) {
        this.forceExecuteExpiredJobs = Optional.of(forceExecuteExpiredJobs);
    }
}

