package org.kie.kogito.jobs.service.scheduler;

import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.kie.kogito.jobs.service.model.JobExecutionResponse;
import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.model.job.JobDetails;
import org.kie.kogito.jobs.service.model.job.ManageableJobHandle;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.utils.DateUtil;
import org.kie.kogito.timer.JobHandle;
import org.kie.kogito.timer.Trigger;
import org.kie.kogito.timer.impl.PointInTimeTrigger;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:test-resources/jobs-service.jar:org/kie/kogito/jobs/service/scheduler/BaseTimerJobScheduler.class */
public abstract class BaseTimerJobScheduler implements ReactiveJobScheduler {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) BaseTimerJobScheduler.class);
    long backoffRetryMillis;
    long maxIntervalLimitToRetryMillis;
    Optional<Boolean> forceExecuteExpiredJobs;
    long schedulerChunkInMinutes;
    private ReactiveJobRepository jobRepository;
    private final Map<String, ZonedDateTime> schedulerControl;

    /* JADX INFO: Access modifiers changed from: protected */
    public BaseTimerJobScheduler() {
        this(null, 0L, 0L, 0L, null);
    }

    public BaseTimerJobScheduler(ReactiveJobRepository reactiveJobRepository, long j, long j2, long j3, Boolean bool) {
        this.jobRepository = reactiveJobRepository;
        this.backoffRetryMillis = j;
        this.maxIntervalLimitToRetryMillis = j2;
        this.schedulerControl = new ConcurrentHashMap();
        this.schedulerChunkInMinutes = j3;
        this.forceExecuteExpiredJobs = Optional.ofNullable(bool);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.kie.kogito.jobs.service.scheduler.ReactiveJobScheduler, org.kie.kogito.jobs.service.scheduler.JobScheduler
    public Publisher<JobDetails> schedule(JobDetails jobDetails) {
        LOGGER.debug("Scheduling {}", jobDetails);
        return ReactiveStreams.fromCompletionStage(this.jobRepository.exists(jobDetails.getId())).flatMap(bool -> {
            return Boolean.TRUE.equals(bool) ? handleExistingJob(jobDetails) : ReactiveStreams.of(jobDetails);
        }).flatMap(jobDetails2 -> {
            return isOnCurrentSchedulerChunk(jobDetails) ? doJobScheduling(jobDetails) : ReactiveStreams.fromCompletionStage(this.jobRepository.save(jobWithStatus(jobDetails, JobStatus.SCHEDULED)));
        }).buildRs();
    }

    private JobDetails jobWithStatus(JobDetails jobDetails, JobStatus jobStatus) {
        return JobDetails.builder().of(jobDetails).status(jobStatus).build();
    }

    private JobDetails jobWithStatusAndHandle(JobDetails jobDetails, JobStatus jobStatus, ManageableJobHandle manageableJobHandle) {
        return JobDetails.builder().of(jobDetails).status(jobStatus).scheduledId(String.valueOf(manageableJobHandle.getId())).build();
    }

    private PublisherBuilder<JobDetails> doJobScheduling(JobDetails jobDetails) {
        return ReactiveStreams.of(jobDetails).map(jobDetails2 -> {
            return jobDetails.getTrigger().hasNextFireTime();
        }).map(DateUtil::fromDate).map(this::calculateDelay).peek(duration -> {
            Optional of = Optional.of(Boolean.valueOf(duration.isNegative()));
            Boolean bool = Boolean.FALSE;
            Objects.requireNonNull(bool);
            of.filter((v1) -> {
                return r1.equals(v1);
            }).orElseThrow(() -> {
                return new RuntimeException("The expirationTime should be greater than current time");
            });
        }).map(duration2 -> {
            return scheduleRegistering(jobDetails, Optional.empty());
        }).flatMap(publisherBuilder -> {
            return publisherBuilder;
        }).map(manageableJobHandle -> {
            return jobWithStatusAndHandle(jobDetails, JobStatus.SCHEDULED, manageableJobHandle);
        }).map(jobDetails3 -> {
            return this.jobRepository.save(jobDetails3);
        }).flatMapCompletionStage(completionStage -> {
            return completionStage;
        });
    }

    private boolean isOnCurrentSchedulerChunk(JobDetails jobDetails) {
        return DateUtil.fromDate(jobDetails.getTrigger().hasNextFireTime()).isBefore(DateUtil.now().plusMinutes(this.schedulerChunkInMinutes));
    }

    private PublisherBuilder<JobDetails> handleExistingJob(JobDetails jobDetails) {
        return ReactiveStreams.fromCompletionStage(this.jobRepository.get(jobDetails.getId())).flatMap(jobDetails2 -> {
            switch (jobDetails2.getStatus()) {
                case SCHEDULED:
                    return handleExpirationTime(jobDetails2).map(jobDetails2 -> {
                        return jobWithStatus(jobDetails2, JobStatus.CANCELED);
                    }).map((v0) -> {
                        return CompletableFuture.completedFuture(v0);
                    }).flatMapCompletionStage((v1) -> {
                        return cancel(v1);
                    }).map(jobDetails3 -> {
                        return jobDetails2;
                    });
                case RETRY:
                    return handleRetry(CompletableFuture.completedFuture(jobDetails2)).flatMap(jobDetails4 -> {
                        return ReactiveStreams.empty();
                    });
                default:
                    return ReactiveStreams.empty();
            }
        }).onErrorResumeWith(th -> {
            return ReactiveStreams.empty();
        });
    }

    private Duration calculateDelay(ZonedDateTime zonedDateTime) {
        Optional filter = Optional.of(Duration.between(DateUtil.now(), zonedDateTime)).filter(duration -> {
            return !duration.isNegative();
        });
        Optional<Boolean> optional = this.forceExecuteExpiredJobs;
        Boolean bool = Boolean.TRUE;
        Objects.requireNonNull(bool);
        return (Duration) filter.orElse((Duration) optional.filter((v1) -> {
            return r2.equals(v1);
        }).map(bool2 -> {
            return Duration.ofSeconds(1L);
        }).orElse(Duration.ofSeconds(-1L)));
    }

    public PublisherBuilder<JobDetails> handleJobExecutionSuccess(JobDetails jobDetails) {
        PublisherBuilder peek = ReactiveStreams.of(jobDetails).map(jobDetails2 -> {
            return JobDetails.builder().of(jobDetails2).incrementExecutionCounter().build();
        }).peek(jobDetails3 -> {
            jobDetails3.getTrigger().nextFireTime();
        });
        ReactiveJobRepository reactiveJobRepository = this.jobRepository;
        Objects.requireNonNull(reactiveJobRepository);
        return peek.flatMapCompletionStage(reactiveJobRepository::save).flatMap(jobDetails4 -> {
            return (PublisherBuilder) Optional.ofNullable(jobDetails4.getTrigger()).filter(trigger -> {
                return Objects.nonNull(trigger.hasNextFireTime());
            }).map(trigger2 -> {
                return doJobScheduling(jobDetails4);
            }).orElseGet(() -> {
                return ReactiveStreams.of(jobWithStatus(jobDetails4, JobStatus.EXECUTED));
            });
        }).filter(jobDetails5 -> {
            return JobStatus.EXECUTED.equals(jobDetails5.getStatus());
        }).flatMap(jobDetails6 -> {
            return ReactiveStreams.fromCompletionStage(cancel(CompletableFuture.completedFuture(jobDetails6)));
        });
    }

    @Override // org.kie.kogito.jobs.service.scheduler.ReactiveJobScheduler
    public PublisherBuilder<JobDetails> handleJobExecutionSuccess(JobExecutionResponse jobExecutionResponse) {
        PublisherBuilder map = ReactiveStreams.of(jobExecutionResponse).map((v0) -> {
            return v0.getJobId();
        });
        ReactiveJobRepository reactiveJobRepository = this.jobRepository;
        Objects.requireNonNull(reactiveJobRepository);
        return map.flatMapCompletionStage(reactiveJobRepository::get).flatMap(this::handleJobExecutionSuccess);
    }

    private boolean isExpired(ZonedDateTime zonedDateTime, int i) {
        return calculateDelay(zonedDateTime).plus(Duration.ofMillis(this.maxIntervalLimitToRetryMillis).minus(Duration.ofMillis(i * this.backoffRetryMillis))).isNegative();
    }

    private PublisherBuilder<JobDetails> handleExpirationTime(JobDetails jobDetails) {
        return ReactiveStreams.of(jobDetails).map((v0) -> {
            return v0.getTrigger();
        }).map((v0) -> {
            return v0.hasNextFireTime();
        }).map(DateUtil::fromDate).flatMapCompletionStage(zonedDateTime -> {
            return isExpired(zonedDateTime, jobDetails.getRetries().intValue()) ? handleExpiredJob(jobDetails) : CompletableFuture.completedFuture(jobDetails);
        });
    }

    @Override // org.kie.kogito.jobs.service.scheduler.ReactiveJobScheduler
    public PublisherBuilder<JobDetails> handleJobExecutionError(JobExecutionResponse jobExecutionResponse) {
        return handleRetry(this.jobRepository.get(jobExecutionResponse.getJobId()));
    }

    private PublisherBuilder<JobDetails> handleRetry(CompletionStage<JobDetails> completionStage) {
        return ReactiveStreams.fromCompletionStage(completionStage).flatMap(jobDetails -> {
            PublisherBuilder map = handleExpirationTime(jobDetails).map((v0) -> {
                return v0.getStatus();
            }).filter(jobStatus -> {
                return !JobStatus.ERROR.equals(jobStatus);
            }).map(jobStatus2 -> {
                return scheduleRegistering(jobDetails, Optional.of(getRetryTrigger()));
            }).flatMap(publisherBuilder -> {
                return publisherBuilder;
            }).map(manageableJobHandle -> {
                return JobDetails.builder().of(jobWithStatusAndHandle(jobDetails, JobStatus.RETRY, manageableJobHandle)).incrementRetries().build();
            });
            ReactiveJobRepository reactiveJobRepository = this.jobRepository;
            Objects.requireNonNull(reactiveJobRepository);
            return map.map(reactiveJobRepository::save).flatMapCompletionStage(completionStage2 -> {
                return completionStage2;
            });
        }).peek(jobDetails2 -> {
            LOGGER.debug("Retry executed {}", jobDetails2);
        });
    }

    private PointInTimeTrigger getRetryTrigger() {
        return new PointInTimeTrigger(DateUtil.now().plus(this.backoffRetryMillis, (TemporalUnit) ChronoUnit.MILLIS).toInstant().toEpochMilli(), null, null);
    }

    private CompletionStage<JobDetails> handleExpiredJob(JobDetails jobDetails) {
        return (CompletionStage) Optional.of(jobWithStatus(jobDetails, JobStatus.ERROR)).map(jobDetails2 -> {
            return this.jobRepository.delete(jobDetails2).thenApply(jobDetails2 -> {
                unregisterScheduledJob(jobDetails2);
                LOGGER.warn("Retry limit exceeded for job{}", jobDetails2);
                return jobDetails2;
            });
        }).orElse(null);
    }

    private PublisherBuilder<ManageableJobHandle> scheduleRegistering(JobDetails jobDetails, Optional<Trigger> optional) {
        return doSchedule(jobDetails, optional).peek((Consumer<? super ManageableJobHandle>) registerScheduledJob(jobDetails));
    }

    private Consumer<JobHandle> registerScheduledJob(JobDetails jobDetails) {
        return jobHandle -> {
            this.schedulerControl.put(jobDetails.getId(), DateUtil.now());
        };
    }

    public abstract PublisherBuilder<ManageableJobHandle> doSchedule(JobDetails jobDetails, Optional<Trigger> optional);

    private ZonedDateTime unregisterScheduledJob(JobDetails jobDetails) {
        return this.schedulerControl.remove(jobDetails.getId());
    }

    public CompletionStage<JobDetails> cancel(CompletionStage<JobDetails> completionStage) {
        PublisherBuilder flatMap = ReactiveStreams.fromCompletionStageNullable(completionStage).peek(jobDetails -> {
            LOGGER.debug("Cancel Job Scheduling {}", jobDetails);
        }).flatMap(jobDetails2 -> {
            return (PublisherBuilder) Optional.ofNullable(jobDetails2.getScheduledId()).map(str -> {
                return ReactiveStreams.fromPublisher(doCancel(jobDetails2)).map(manageableJobHandle -> {
                    return jobDetails2;
                });
            }).orElse(ReactiveStreams.of(jobDetails2));
        });
        ReactiveJobRepository reactiveJobRepository = this.jobRepository;
        Objects.requireNonNull(reactiveJobRepository);
        return flatMap.flatMapCompletionStage(reactiveJobRepository::delete).peek(this::unregisterScheduledJob).findFirst().run().thenApply(optional -> {
            return (JobDetails) optional.orElse(null);
        });
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.kie.kogito.jobs.service.scheduler.ReactiveJobScheduler, org.kie.kogito.jobs.service.scheduler.JobScheduler
    public CompletionStage<JobDetails> cancel(String str) {
        return cancel(this.jobRepository.get(str).thenApply(jobDetails -> {
            return (JobDetails) Optional.ofNullable(jobDetails).map(jobDetails -> {
                return jobWithStatus(jobDetails, JobStatus.CANCELED);
            }).orElse(null);
        }));
    }

    public abstract Publisher<ManageableJobHandle> doCancel(JobDetails jobDetails);

    @Override // org.kie.kogito.jobs.service.scheduler.JobScheduler
    public Optional<ZonedDateTime> scheduled(String str) {
        return Optional.ofNullable(this.schedulerControl.get(str));
    }

    public void setForceExecuteExpiredJobs(boolean z) {
        this.forceExecuteExpiredJobs = Optional.of(Boolean.valueOf(z));
    }
}
