package org.kie.kogito.jobs.service.scheduler;

import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.temporal.TemporalAmount;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import javax.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.TransformingOperators;
import org.kie.kogito.jobs.api.Job;
import org.kie.kogito.jobs.service.executor.JobExecutor;
import org.kie.kogito.jobs.service.model.JobExecutionResponse;
import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.model.ScheduledJob;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.utils.DateUtil;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/kie/kogito/jobs/service/scheduler/BaseTimerJobScheduler.class */
public abstract class BaseTimerJobScheduler implements ReactiveJobScheduler<ScheduledJob> {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) BaseTimerJobScheduler.class);

    @ConfigProperty(name = "kogito.jobs-service.backoffRetryMillis")
    long backoffRetryMillis;

    @ConfigProperty(name = "kogito.jobs-service.maxIntervalLimitToRetryMillis")
    long maxIntervalLimitToRetryMillis;

    @ConfigProperty(name = "kogito.jobs-service.schedulerChunkInMinutes")
    long schedulerChunkInMinutes;

    @Inject
    JobExecutor jobExecutor;

    @Inject
    ReactiveJobRepository jobRepository;
    private final Map<String, ZonedDateTime> schedulerControl;

    /* JADX INFO: Access modifiers changed from: protected */
    public BaseTimerJobScheduler() {
        this(null, null, 0L, 0L);
    }

    public BaseTimerJobScheduler(JobExecutor jobExecutor, ReactiveJobRepository reactiveJobRepository, long j, long j2) {
        this.jobExecutor = jobExecutor;
        this.jobRepository = reactiveJobRepository;
        this.backoffRetryMillis = j;
        this.maxIntervalLimitToRetryMillis = j2;
        this.schedulerControl = new ConcurrentHashMap();
    }

    @Override // org.kie.kogito.jobs.service.scheduler.ReactiveJobScheduler, org.kie.kogito.jobs.service.scheduler.JobScheduler
    public Publisher<ScheduledJob> schedule(Job job) {
        LOGGER.debug("Scheduling {}", job);
        return ReactiveStreams.fromCompletionStage(this.jobRepository.exists(job.getId())).flatMap(bool -> {
            return Boolean.TRUE.equals(bool) ? handleExistingJob(job) : ReactiveStreams.of(job);
        }).flatMap(job2 -> {
            return isOnCurrentSchedulerChunk(job) ? doJobScheduling(job) : ReactiveStreams.fromCompletionStage(this.jobRepository.save(ScheduledJob.builder().job(job).status(JobStatus.SCHEDULED).build()));
        }).buildRs();
    }

    private PublisherBuilder<ScheduledJob> doJobScheduling(Job job) {
        return ReactiveStreams.of(job).map(job2 -> {
            return job.getExpirationTime();
        }).map(this::calculateDelay).peek(duration -> {
            Optional of = Optional.of(Boolean.valueOf(duration.isNegative()));
            Boolean bool = Boolean.FALSE;
            Objects.requireNonNull(bool);
            of.filter((v1) -> {
                return r1.equals(v1);
            }).orElseThrow(() -> {
                return new RuntimeException("Delay should be positive");
            });
        }).map(duration2 -> {
            return schedule(duration2, job);
        }).flatMap(publisherBuilder -> {
            return publisherBuilder;
        }).map(str -> {
            return ScheduledJob.builder().job(job).scheduledId(str).status(JobStatus.SCHEDULED).build();
        }).map(scheduledJob -> {
            return this.jobRepository.save(scheduledJob);
        }).flatMapCompletionStage(completionStage -> {
            return completionStage;
        });
    }

    private boolean isOnCurrentSchedulerChunk(Job job) {
        return job.getExpirationTime().isBefore(DateUtil.now().plusMinutes(this.schedulerChunkInMinutes));
    }

    private PublisherBuilder<ScheduledJob> handleExistingJob(Job job) {
        return ReactiveStreams.fromCompletionStage(this.jobRepository.get(job.getId())).flatMap(scheduledJob -> {
            switch (scheduledJob.getStatus()) {
                case SCHEDULED:
                    return wasPeriodicScheduled(scheduledJob) ? handleJobExecutionSuccess(scheduledJob).flatMap(scheduledJob -> {
                        return ReactiveStreams.empty();
                    }) : handleExpirationTime(scheduledJob).map(scheduledJob2 -> {
                        return ScheduledJob.builder().of(scheduledJob2).status(JobStatus.CANCELED).build();
                    }).map((v0) -> {
                        return CompletableFuture.completedFuture(v0);
                    }).flatMapCompletionStage((v1) -> {
                        return cancel(v1);
                    }).map(scheduledJob3 -> {
                        return scheduledJob;
                    });
                case RETRY:
                    return handleRetry(CompletableFuture.completedFuture(scheduledJob));
                default:
                    return ReactiveStreams.empty();
            }
        }).onErrorResumeWith(th -> {
            return ReactiveStreams.empty();
        });
    }

    private Duration calculateDelay(ZonedDateTime zonedDateTime) {
        return Duration.between(DateUtil.now(), zonedDateTime);
    }

    private boolean validLimit(ScheduledJob scheduledJob) {
        return Optional.of(scheduledJob).map((v0) -> {
            return v0.getRepeatLimit();
        }).filter(num -> {
            return scheduledJob.getExecutionCounter().intValue() < num.intValue();
        }).isPresent();
    }

    private boolean wasPeriodicScheduled(ScheduledJob scheduledJob) {
        return Optional.ofNullable(scheduledJob).filter(scheduledJob2 -> {
            return scheduledJob2.getExecutionCounter().intValue() > 1;
        }).isPresent();
    }

    public PublisherBuilder<ScheduledJob> handleJobExecutionSuccess(ScheduledJob scheduledJob) {
        return ReactiveStreams.of(scheduledJob).map(scheduledJob2 -> {
            return ScheduledJob.builder().of(scheduledJob2).incrementExecutionCounter().build();
        }).flatMap(scheduledJob3 -> {
            return (PublisherBuilder) scheduledJob3.hasInterval().filter(l -> {
                return !wasPeriodicScheduled(scheduledJob3);
            }).map((v0) -> {
                return Duration.ofMillis(v0);
            }).map(duration -> {
                TransformingOperators map = periodicSchedule(duration, scheduledJob3).map(str -> {
                    return ScheduledJob.builder().of(scheduledJob3).scheduledId(str).expirationTime(DateUtil.now().plus((TemporalAmount) duration)).status(JobStatus.SCHEDULED).build();
                });
                ReactiveJobRepository reactiveJobRepository = this.jobRepository;
                Objects.requireNonNull(reactiveJobRepository);
                return map.flatMapCompletionStage(reactiveJobRepository::save);
            }).orElseGet(() -> {
                return ReactiveStreams.fromCompletionStage((CompletionStage) scheduledJob3.hasInterval().map(l2 -> {
                    Optional map = Optional.of(scheduledJob3).filter(this::wasPeriodicScheduled).filter(this::validLimit).map(scheduledJob3 -> {
                        return ScheduledJob.builder().of(scheduledJob3).expirationTime(DateUtil.now().plus((TemporalAmount) Duration.ofMillis(l2.longValue()))).build();
                    });
                    ReactiveJobRepository reactiveJobRepository = this.jobRepository;
                    Objects.requireNonNull(reactiveJobRepository);
                    return (CompletionStage) map.map(reactiveJobRepository::save).orElse(null);
                }).orElseGet(() -> {
                    return CompletableFuture.completedFuture(ScheduledJob.builder().of(scheduledJob3).status(JobStatus.EXECUTED).build());
                }));
            });
        }).filter(scheduledJob4 -> {
            return JobStatus.EXECUTED.equals(scheduledJob4.getStatus());
        }).flatMap(scheduledJob5 -> {
            return ReactiveStreams.fromCompletionStage(cancel(CompletableFuture.completedFuture(scheduledJob5)));
        });
    }

    @Override // org.kie.kogito.jobs.service.scheduler.ReactiveJobScheduler
    public PublisherBuilder<ScheduledJob> handleJobExecutionSuccess(JobExecutionResponse jobExecutionResponse) {
        PublisherBuilder map = ReactiveStreams.of(jobExecutionResponse).map((v0) -> {
            return v0.getJobId();
        });
        ReactiveJobRepository reactiveJobRepository = this.jobRepository;
        Objects.requireNonNull(reactiveJobRepository);
        return map.flatMapCompletionStage(reactiveJobRepository::get).flatMap(this::handleJobExecutionSuccess);
    }

    private boolean isExpired(ZonedDateTime zonedDateTime) {
        return calculateDelay(zonedDateTime).plus(Duration.ofMillis(this.maxIntervalLimitToRetryMillis)).isNegative();
    }

    private PublisherBuilder<ScheduledJob> handleExpirationTime(ScheduledJob scheduledJob) {
        return ReactiveStreams.of(scheduledJob).map((v0) -> {
            return v0.getExpirationTime();
        }).flatMapCompletionStage(zonedDateTime -> {
            return isExpired(zonedDateTime) ? handleExpiredJob(scheduledJob) : CompletableFuture.completedFuture(scheduledJob);
        });
    }

    @Override // org.kie.kogito.jobs.service.scheduler.ReactiveJobScheduler
    public PublisherBuilder<ScheduledJob> handleJobExecutionError(JobExecutionResponse jobExecutionResponse) {
        return handleRetry(this.jobRepository.get(jobExecutionResponse.getJobId()));
    }

    private PublisherBuilder<ScheduledJob> handleRetry(CompletionStage<ScheduledJob> completionStage) {
        return ReactiveStreams.fromCompletionStage(completionStage).flatMap(scheduledJob -> {
            PublisherBuilder map = handleExpirationTime(scheduledJob).map((v0) -> {
                return v0.getStatus();
            }).filter(jobStatus -> {
                return !JobStatus.ERROR.equals(jobStatus);
            }).map(jobStatus2 -> {
                return schedule(Duration.ofMillis(this.backoffRetryMillis), scheduledJob);
            }).flatMap(publisherBuilder -> {
                return publisherBuilder;
            }).map(str -> {
                return ScheduledJob.builder().of(scheduledJob).scheduledId(str).status(JobStatus.RETRY).incrementRetries().build();
            });
            ReactiveJobRepository reactiveJobRepository = this.jobRepository;
            Objects.requireNonNull(reactiveJobRepository);
            return map.map(reactiveJobRepository::save).flatMapCompletionStage(completionStage2 -> {
                return completionStage2;
            });
        }).peek(scheduledJob2 -> {
            LOGGER.debug("Retry executed {}", scheduledJob2);
        });
    }

    private CompletionStage<ScheduledJob> handleExpiredJob(ScheduledJob scheduledJob) {
        return (CompletionStage) Optional.of(ScheduledJob.builder().of(scheduledJob).status(JobStatus.ERROR).build()).map(scheduledJob2 -> {
            return this.jobRepository.delete(scheduledJob2).thenApply(scheduledJob2 -> {
                LOGGER.warn("Retry limit exceeded for job{}", scheduledJob2);
                return scheduledJob2;
            });
        }).orElse(null);
    }

    private PublisherBuilder<String> schedule(Duration duration, Job job) {
        return doSchedule(duration, job).peek((Consumer<? super String>) registerScheduledJob(job));
    }

    private PublisherBuilder<String> periodicSchedule(Duration duration, Job job) {
        return doPeriodicSchedule(duration, job).peek((Consumer<? super String>) registerScheduledJob(job));
    }

    private Consumer<String> registerScheduledJob(Job job) {
        return str -> {
            this.schedulerControl.put(job.getId(), DateUtil.now());
        };
    }

    public abstract PublisherBuilder<String> doSchedule(Duration duration, Job job);

    public abstract PublisherBuilder<String> doPeriodicSchedule(Duration duration, Job job);

    /* JADX INFO: Access modifiers changed from: protected */
    public CompletionStage<ScheduledJob> execute(Job job) {
        LOGGER.debug("Executing job ! {}", job);
        return this.jobExecutor.execute(this.jobRepository.get(job.getId())).whenComplete((scheduledJob, th) -> {
            unregisterScheduledJob(job);
        });
    }

    private ZonedDateTime unregisterScheduledJob(Job job) {
        return this.schedulerControl.remove(job.getId());
    }

    public CompletionStage<ScheduledJob> cancel(CompletionStage<ScheduledJob> completionStage) {
        PublisherBuilder flatMap = ReactiveStreams.fromCompletionStageNullable(completionStage).peek(scheduledJob -> {
            LOGGER.debug("Cancel Job Scheduling {}", scheduledJob);
        }).flatMap(scheduledJob2 -> {
            return ReactiveStreams.concat(ReactiveStreams.of(scheduledJob2), ReactiveStreams.fromPublisher(doCancel(scheduledJob2)).map(bool -> {
                return scheduledJob2;
            }));
        });
        ReactiveJobRepository reactiveJobRepository = this.jobRepository;
        Objects.requireNonNull(reactiveJobRepository);
        return flatMap.flatMapCompletionStage(reactiveJobRepository::delete).findFirst().run().thenApply(optional -> {
            return (ScheduledJob) optional.orElse(null);
        });
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.kie.kogito.jobs.service.scheduler.ReactiveJobScheduler, org.kie.kogito.jobs.service.scheduler.JobScheduler
    public CompletionStage<ScheduledJob> cancel(String str) {
        return cancel(this.jobRepository.get(str).thenApply(scheduledJob -> {
            return ScheduledJob.builder().of(scheduledJob).status(JobStatus.CANCELED).build();
        }));
    }

    public abstract Publisher<Boolean> doCancel(ScheduledJob scheduledJob);

    @Override // org.kie.kogito.jobs.service.scheduler.JobScheduler
    public Optional<ZonedDateTime> scheduled(String str) {
        return Optional.ofNullable(this.schedulerControl.get(str));
    }
}
