package org.kie.kogito.jobs.service.scheduler.impl;

import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.kie.kogito.jobs.service.executor.JobExecutorResolver;
import org.kie.kogito.jobs.service.job.DelegateJob;
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobDetailsContext;
import org.kie.kogito.jobs.service.model.JobExecutionResponse;
import org.kie.kogito.jobs.service.model.ManageableJobHandle;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.scheduler.BaseTimerJobScheduler;
import org.kie.kogito.jobs.service.stream.AvailableStreams;
import org.kie.kogito.jobs.service.stream.JobStreams;
import org.kie.kogito.jobs.service.utils.ErrorHandling;
import org.kie.kogito.timer.Job;
import org.kie.kogito.timer.JobContext;
import org.kie.kogito.timer.Trigger;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
/* loaded from: input_file:org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobScheduler.class */
public class TimerDelegateJobScheduler extends BaseTimerJobScheduler {
    private static final Logger LOGGER = LoggerFactory.getLogger(TimerDelegateJobScheduler.class);
    private JobExecutorResolver jobExecutorResolver;
    private VertxTimerServiceScheduler delegate;
    private JobStreams jobStreams;

    protected TimerDelegateJobScheduler() {
    }

    @Inject
    public TimerDelegateJobScheduler(ReactiveJobRepository reactiveJobRepository, @ConfigProperty(name = "kogito.jobs-service.backoffRetryMillis", defaultValue = "1000") long j, @ConfigProperty(name = "kogito.jobs-service.maxIntervalLimitToRetryMillis", defaultValue = "60000") long j2, @ConfigProperty(name = "kogito.jobs-service.schedulerChunkInMinutes", defaultValue = "10") long j3, @ConfigProperty(name = "kogito.jobs-service.forceExecuteExpiredJobs", defaultValue = "false") boolean z, JobExecutorResolver jobExecutorResolver, VertxTimerServiceScheduler vertxTimerServiceScheduler, JobStreams jobStreams) {
        super(reactiveJobRepository, j, j2, j3, Boolean.valueOf(z));
        this.jobExecutorResolver = jobExecutorResolver;
        this.delegate = vertxTimerServiceScheduler;
        this.jobStreams = jobStreams;
    }

    @Override // org.kie.kogito.jobs.service.scheduler.BaseTimerJobScheduler
    public PublisherBuilder<ManageableJobHandle> doSchedule(JobDetails jobDetails, Optional<Trigger> optional) {
        LOGGER.debug("Job Scheduling {}", jobDetails);
        return ReactiveStreams.of(jobDetails).map(jobDetails2 -> {
            return this.delegate.m15scheduleJob((Job) new DelegateJob(this.jobExecutorResolver, this.jobStreams), (JobContext) new JobDetailsContext(jobDetails2), (Trigger) optional.orElse(jobDetails2.getTrigger()));
        });
    }

    @Override // org.kie.kogito.jobs.service.scheduler.BaseTimerJobScheduler
    public Publisher<ManageableJobHandle> doCancel(JobDetails jobDetails) {
        return ReactiveStreams.of(jobDetails).map((v0) -> {
            return v0.getScheduledId();
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).map(str -> {
            ManageableJobHandle manageableJobHandle = new ManageableJobHandle(str);
            manageableJobHandle.setCancel(this.delegate.removeJob(manageableJobHandle));
            return manageableJobHandle;
        }).buildRs();
    }

    @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
    @Incoming(AvailableStreams.JOB_ERROR_EVENTS)
    public CompletionStage<Boolean> jobErrorProcessor(JobExecutionResponse jobExecutionResponse) {
        LOGGER.warn("Error received {}", jobExecutionResponse);
        return ErrorHandling.skipErrorPublisherBuilder(this::handleJobExecutionError, jobExecutionResponse).findFirst().run().thenApply((v0) -> {
            return v0.isPresent();
        }).exceptionally(th -> {
            LOGGER.error("Error handling error {}", jobExecutionResponse, th);
            return false;
        });
    }

    @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
    @Incoming(AvailableStreams.JOB_SUCCESS_EVENTS)
    public CompletionStage<Boolean> jobSuccessProcessor(JobExecutionResponse jobExecutionResponse) {
        LOGGER.debug("Success received to be processed {}", jobExecutionResponse);
        return ErrorHandling.skipErrorPublisherBuilder(this::handleJobExecutionSuccess, jobExecutionResponse).findFirst().run().thenApply((v0) -> {
            return v0.isPresent();
        }).exceptionally(th -> {
            LOGGER.error("Error handling error {}", jobExecutionResponse, th);
            return false;
        });
    }
}
