package org.hawkular.metrics.scheduler.impl;

import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Statement;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.hawkular.metrics.datetime.DateTimeService;
import org.hawkular.metrics.scheduler.api.JobDetails;
import org.hawkular.metrics.scheduler.api.JobStatus;
import org.hawkular.metrics.scheduler.api.RepeatingTrigger;
import org.hawkular.metrics.scheduler.api.Scheduler;
import org.hawkular.metrics.scheduler.api.Trigger;
import org.hawkular.rx.cassandra.driver.RxSession;
import org.jboss.logging.Logger;
import org.joda.time.DateTime;
import org.joda.time.Minutes;
import rx.Completable;
import rx.Observable;
import rx.Single;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;

/* loaded from: input_file:WEB-INF/lib/hawkular-metrics-job-scheduler-0.28.4.Final.jar:org/hawkular/metrics/scheduler/impl/SchedulerImpl.class */
public class SchedulerImpl implements Scheduler {
    private static Logger logger = Logger.getLogger(SchedulerImpl.class);
    private Map<String, Func1<JobDetails, Completable>> jobFactories;
    private ScheduledExecutorService tickExecutor;
    private rx.Scheduler tickScheduler;
    private ExecutorService queryExecutor;
    private rx.Scheduler queryScheduler;
    private RxSession session;
    private PreparedStatement deleteScheduledJobs;
    private PreparedStatement findFinishedJobs;
    private PreparedStatement deleteFinishedJobs;
    private PreparedStatement updateJobToFinished;
    private PreparedStatement addActiveTimeSlice;
    private PreparedStatement findActiveTimeSlices;
    private PreparedStatement deleteActiveTimeSlice;
    private LockManager lockManager;
    private JobsService jobsService;
    private boolean running;
    static final String QUEUE_LOCK_PREFIX = "org.hawkular.metrics.scheduler.queue.";
    static final String SCHEDULING_LOCK = "scheduling";
    static final String TIME_SLICE_EXECUTION_LOCK = "executing";
    static final int SCHEDULING_LOCK_TIMEOUT_IN_SEC = 5;
    static final int JOB_EXECUTION_LOCK_TIMEOUT_IN_SEC = 1800;
    private Optional<PublishSubject<Date>> finishedTimeSlices;
    private Optional<PublishSubject<JobDetails>> jobFinished;
    private final Object lock;
    private String hostname;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/hawkular-metrics-job-scheduler-0.28.4.Final.jar:org/hawkular/metrics/scheduler/impl/SchedulerImpl$JobExecutionState.class */
    public static class JobExecutionState {
        final JobDetailsImpl currentDetails;
        final JobDetailsImpl nextDetails;
        final Set<UUID> activeJobs;
        final Date timeSlice;
        final Date nextTimeSlice;
        final Throwable error;

        public JobExecutionState(JobDetailsImpl jobDetailsImpl, Set<UUID> set) {
            this.currentDetails = jobDetailsImpl;
            this.activeJobs = set;
            this.timeSlice = new Date(jobDetailsImpl.getTrigger().getTriggerTime());
            this.nextDetails = null;
            this.nextTimeSlice = null;
            this.error = null;
        }

        public JobExecutionState(JobDetailsImpl jobDetailsImpl, Date date, Throwable th, Set<UUID> set) {
            this.currentDetails = jobDetailsImpl;
            this.activeJobs = set;
            this.timeSlice = date;
            this.nextDetails = null;
            this.nextTimeSlice = null;
            this.error = th;
        }

        public JobExecutionState(JobDetailsImpl jobDetailsImpl, Date date, JobDetailsImpl jobDetailsImpl2, Date date2, Set<UUID> set) {
            this.currentDetails = jobDetailsImpl;
            this.timeSlice = date;
            this.nextDetails = jobDetailsImpl2;
            this.nextTimeSlice = date2;
            this.activeJobs = set;
            this.error = null;
        }

        public boolean isRepeating() {
            return (this.nextDetails == null || this.nextTimeSlice == null) ? false : true;
        }

        public boolean isBehindSchedule() {
            return isRepeating() && this.nextTimeSlice != null && this.nextTimeSlice.getTime() > this.nextDetails.getTrigger().getTriggerTime();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/hawkular-metrics-job-scheduler-0.28.4.Final.jar:org/hawkular/metrics/scheduler/impl/SchedulerImpl$JobLock.class */
    public static class JobLock {
        final JobDetailsImpl jobDetails;
        final boolean acquired;
        final String name;

        public JobLock(JobDetailsImpl jobDetailsImpl, Lock lock) {
            this.jobDetails = jobDetailsImpl;
            this.acquired = lock.isLocked();
            this.name = "org.hawkular.metrics.scheduler.job." + jobDetailsImpl.getJobId();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/hawkular-metrics-job-scheduler-0.28.4.Final.jar:org/hawkular/metrics/scheduler/impl/SchedulerImpl$TimeSliceLock.class */
    public static class TimeSliceLock {
        private Date timeSlice;
        private String name;
        private boolean acquired;

        public TimeSliceLock(Date date, String str, boolean z) {
            this.timeSlice = date;
            this.name = str;
            this.acquired = z;
        }

        public Date getTimeSlice() {
            return this.timeSlice;
        }

        public String getName() {
            return this.name;
        }

        public boolean isAcquired() {
            return this.acquired;
        }
    }

    public SchedulerImpl(RxSession rxSession, String str) {
        this(rxSession, str, new JobsService(rxSession));
    }

    public SchedulerImpl(RxSession rxSession, String str, JobsService jobsService) {
        this.lock = new Object();
        this.session = rxSession;
        this.jobFactories = new HashMap();
        this.tickExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat("ticker-pool-%d").build(), new ThreadPoolExecutor.DiscardPolicy());
        this.tickScheduler = Schedulers.from(this.tickExecutor);
        this.queryExecutor = new ThreadPoolExecutor(getQueryThreadPoolSize(), getQueryThreadPoolSize(), 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), new ThreadFactoryBuilder().setNameFormat("query-thread-pool-%d").build(), new ThreadPoolExecutor.DiscardPolicy());
        this.queryScheduler = Schedulers.from(this.queryExecutor);
        this.lockManager = new LockManager(rxSession);
        this.jobsService = jobsService;
        this.deleteScheduledJobs = initQuery("DELETE FROM scheduled_jobs_idx WHERE time_slice = ?");
        this.findFinishedJobs = initQuery("SELECT job_id FROM finished_jobs_idx WHERE time_slice = ?");
        this.deleteFinishedJobs = initQuery("DELETE FROM finished_jobs_idx WHERE time_slice = ?");
        this.updateJobToFinished = initQuery("INSERT INTO finished_jobs_idx (time_slice, job_id) VALUES (?, ?)");
        this.addActiveTimeSlice = initQuery("INSERT INTO active_time_slices (time_slice) VALUES (?)");
        this.findActiveTimeSlices = initQuery("SELECT DISTINCT time_slice FROM active_time_slices");
        this.deleteActiveTimeSlice = initQuery("DELETE FROM active_time_slices WHERE time_slice = ?");
        this.finishedTimeSlices = Optional.empty();
        this.jobFinished = Optional.empty();
        this.hostname = str;
    }

    private PreparedStatement initQuery(String str) {
        return this.session.getSession().prepare(str).setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
    }

    private int getQueryThreadPoolSize() {
        return Math.max(Runtime.getRuntime().availableProcessors() / 2, 1);
    }

    public void setTickScheduler(rx.Scheduler scheduler) {
        this.tickScheduler = scheduler;
    }

    public void setTimeSlicesSubject(PublishSubject<Date> publishSubject) {
        this.finishedTimeSlices = Optional.of(publishSubject);
    }

    public void setJobFinishedSubject(PublishSubject<JobDetails> publishSubject) {
        this.jobFinished = Optional.of(publishSubject);
    }

    @Override // org.hawkular.metrics.scheduler.api.Scheduler
    public void register(String str, Func1<JobDetails, Completable> func1) {
        this.jobFactories.put(str, func1);
    }

    @Override // org.hawkular.metrics.scheduler.api.Scheduler
    public Single<? extends JobDetails> scheduleJob(String str, String str2, Map<String, String> map, Trigger trigger) {
        if (DateTimeService.now.get().getMillis() >= trigger.getTriggerTime()) {
            return Single.error(new RuntimeException("Trigger time has already passed"));
        }
        String str3 = QUEUE_LOCK_PREFIX + trigger.getTriggerTime();
        return this.lockManager.acquireLock(str3, SCHEDULING_LOCK, 5, false).map(lock -> {
            if (!lock.isLocked()) {
                throw new RuntimeException("Failed to acquire scheduling lock [" + str3 + "]");
            }
            return this.jobsService.createJobDetails(UUID.randomUUID(), str, str2, map, trigger, new Date(trigger.getTriggerTime()));
        }).flatMap(jobDetailsImpl -> {
            return this.jobsService.insert(new Date(trigger.getTriggerTime()), jobDetailsImpl).map(resultSet -> {
                return jobDetailsImpl;
            });
        }).toSingle().doOnError(th -> {
            logger.warn("Failed to schedule job " + str2, th);
        });
    }

    @Override // org.hawkular.metrics.scheduler.api.Scheduler
    public Completable unscheduleJobById(String str) {
        return this.jobsService.deleteJob(UUID.fromString(str), this.queryScheduler);
    }

    @Override // org.hawkular.metrics.scheduler.api.Scheduler
    public Completable unscheduleJobByTypeAndName(String str, String str2) {
        return Completable.merge((Observable<? extends Completable>) this.jobsService.findAllScheduledJobs(this.queryScheduler).filter(jobDetailsImpl -> {
            return Boolean.valueOf(jobDetailsImpl.getJobType().equals(str) && jobDetailsImpl.getJobName().equals(str2));
        }).map(jobDetailsImpl2 -> {
            return this.jobsService.deleteJob(jobDetailsImpl2.getJobId(), this.queryScheduler);
        }));
    }

    @Override // org.hawkular.metrics.scheduler.api.Scheduler
    public void start() {
        this.running = true;
        ConcurrentSkipListSet concurrentSkipListSet = new ConcurrentSkipListSet();
        ConcurrentSkipListSet concurrentSkipListSet2 = new ConcurrentSkipListSet();
        doOnTick(() -> {
            logger.debugf("Activating scheduler for [%s]", DateTimeService.currentMinute().toDate());
            Observable.just(DateTimeService.currentMinute().toDate()).concatMap(date -> {
                return this.jobsService.findActiveTimeSlices(date, this.queryScheduler);
            }).filter(date2 -> {
                synchronized (this.lock) {
                    if (concurrentSkipListSet.contains(date2)) {
                        return false;
                    }
                    concurrentSkipListSet.add(date2);
                    return true;
                }
            }).concatMap(this::acquireTimeSliceLock).concatMap(timeSliceLock -> {
                return findScheduledJobs(timeSliceLock.getTimeSlice()).doOnError(th -> {
                    logger.warnf("Failed to find schedule jobs for time slice %s", timeSliceLock.timeSlice);
                }).doOnNext(set -> {
                    logger.debugf("[%s] scheduled jobs: %s", timeSliceLock.timeSlice, set);
                }).flatMap(set2 -> {
                    return computeRemainingJobs(set2, timeSliceLock.getTimeSlice(), concurrentSkipListSet2);
                }).doOnNext(set3 -> {
                    logger.debugf("[%s] remaining jobs: %s", timeSliceLock.timeSlice, set3);
                }).flatMap((v0) -> {
                    return Observable.from(v0);
                }).filter(jobDetailsImpl -> {
                    return Boolean.valueOf(!concurrentSkipListSet2.contains(jobDetailsImpl.getJobId()));
                }).flatMap(this::acquireJobLock).filter(jobLock -> {
                    return Boolean.valueOf(jobLock.acquired);
                }).map(jobLock2 -> {
                    return jobLock2.jobDetails;
                }).doOnNext(jobDetailsImpl2 -> {
                    logger.debugf("Acquired job lock for %s in time slice %s", jobDetailsImpl2, timeSliceLock.timeSlice);
                    concurrentSkipListSet2.add(jobDetailsImpl2.getJobId());
                }).observeOn(Schedulers.io()).flatMap(jobDetailsImpl3 -> {
                    return executeJob(jobDetailsImpl3, timeSliceLock.timeSlice, concurrentSkipListSet2).doOnTerminate(() -> {
                        concurrentSkipListSet2.remove(jobDetailsImpl3.getJobId());
                    }).toObservable().map(obj -> {
                        return timeSliceLock.getTimeSlice();
                    });
                }).defaultIfEmpty(timeSliceLock.getTimeSlice());
            }).observeOn(this.queryScheduler).concatMap(date3 -> {
                return Observable.sequenceEqual(this.jobsService.findScheduledJobsForTime(date3, this.queryScheduler).map((v0) -> {
                    return v0.getJobId();
                }).collect(HashSet::new, (v0, v1) -> {
                    v0.add(v1);
                }), findFinishedJobs(date3)).flatMap(bool -> {
                    if (!bool.booleanValue()) {
                        return Observable.just(date3);
                    }
                    logger.debugf("All jobs for time slice [%s] have finished", date3);
                    return deleteFinishedJobs(date3).mergeWith(deleteScheduledJobs(date3)).toObservable().reduce(null, (obj, obj2) -> {
                        return obj2;
                    }).map(obj3 -> {
                        return date3;
                    });
                });
            }).subscribe(date4 -> {
                logger.debugf("Finished post job execution clean up for [%s]", date4);
                concurrentSkipListSet.remove(date4);
                this.finishedTimeSlices.ifPresent(publishSubject -> {
                    publishSubject.onNext(date4);
                });
            }, th -> {
                logger.warn("Job execution failed", th);
                synchronized (this.lock) {
                    concurrentSkipListSet.clear();
                }
            }, () -> {
                logger.debug("Done!");
            });
        });
    }

    private Observable<TimeSliceLock> acquireTimeSliceLock(Date date) {
        String str = QUEUE_LOCK_PREFIX + date.getTime();
        int i = 5;
        return Observable.create(subscriber -> {
            Observable<R> map = this.lockManager.acquireLock(str, TIME_SLICE_EXECUTION_LOCK, JOB_EXECUTION_LOCK_TIMEOUT_IN_SEC, false).map(lock -> {
                if (lock.isLocked()) {
                    return new TimeSliceLock(date, str, lock.isLocked());
                }
                logger.debugf("Failed to acquire time slice lock for [%s]. Will attempt to acquire it again in %d seconds", date, Integer.valueOf(i));
                throw new RuntimeException();
            });
            subscriber.getClass();
            Action1 action1 = (v1) -> {
                r1.onNext(v1);
            };
            subscriber.getClass();
            Action1<Throwable> action12 = subscriber::onError;
            subscriber.getClass();
            map.subscribe(action1, action12, subscriber::onCompleted);
        }).retryWhen(observable -> {
            return observable.flatMap(th -> {
                return Observable.timer(i, TimeUnit.SECONDS, this.queryScheduler);
            });
        });
    }

    private Observable<JobLock> acquireJobLock(JobDetailsImpl jobDetailsImpl) {
        return this.lockManager.acquireLock("org.hawkular.metrics.scheduler.job." + jobDetailsImpl.getJobId(), this.hostname, calculateTimeout(jobDetailsImpl.getTrigger()), true).map(lock -> {
            return new JobLock(jobDetailsImpl, lock);
        }).doOnNext(jobLock -> {
            logger.debugf("Acquired lock for %s? %s", jobDetailsImpl.getJobName(), Boolean.valueOf(jobLock.acquired));
        });
    }

    private int calculateTimeout(Trigger trigger) {
        if (!(trigger instanceof RepeatingTrigger)) {
            return JOB_EXECUTION_LOCK_TIMEOUT_IN_SEC;
        }
        int interval = (int) (((RepeatingTrigger) trigger).getInterval() / 1000);
        return (int) (interval + (interval * 0.25d));
    }

    private Completable executeJob(JobDetailsImpl jobDetailsImpl, Date date, Set<UUID> set) {
        Completable doPostJobExecution;
        logger.debugf("Starting execution for %s in time slice [%s]", jobDetailsImpl, date);
        Stopwatch createStarted = Stopwatch.createStarted();
        Func1<JobDetails, Completable> func1 = this.jobFactories.get(jobDetailsImpl.getJobType());
        if (func1 == null) {
            doPostJobExecution = Completable.error(new UnregisteredJobException(jobDetailsImpl, date));
        } else if (jobDetailsImpl.getStatus() == JobStatus.FINISHED) {
            doPostJobExecution = Completable.merge((Observable<? extends Completable>) this.jobsService.findScheduledExecutions(jobDetailsImpl.getJobId(), this.queryScheduler).filter(scheduledExecution -> {
                return Boolean.valueOf(scheduledExecution.getJobDetails().getStatus() == JobStatus.NONE && scheduledExecution.getJobDetails().getTrigger().getTriggerTime() > jobDetailsImpl.getTrigger().getTriggerTime());
            }).isEmpty().map(bool -> {
                return bool.booleanValue() ? doPostJobExecution(Completable.complete(), jobDetailsImpl, date, set) : doPostJobExecutionWithoutRescheduling(Completable.complete(), jobDetailsImpl, date, set);
            }));
        } else {
            this.jobsService.prepareJobDetailsForExecution(jobDetailsImpl, date);
            doPostJobExecution = doPostJobExecution(func1.call(jobDetailsImpl).doOnTerminate(() -> {
                this.jobsService.resetJobDetails(jobDetailsImpl);
            }).doOnCompleted(() -> {
                createStarted.stop();
                if (logger.isDebugEnabled()) {
                    logger.debugf("Finished executing %s in time slice [%s] in %s ms", jobDetailsImpl, date, Long.valueOf(createStarted.elapsed(TimeUnit.MILLISECONDS)));
                }
            }), jobDetailsImpl, date, set);
        }
        return doPostJobExecution.subscribeOn(Schedulers.io());
    }

    private Completable doPostJobExecution(Completable completable, JobDetailsImpl jobDetailsImpl, Date date, Set<UUID> set) {
        return completable.toSingle(() -> {
            return new JobExecutionState(jobDetailsImpl, date, null, null, set);
        }).onErrorReturn(th -> {
            logger.warnf(th, "Job execution of %s for time slice %d failed", jobDetailsImpl, Long.valueOf(date.getTime()));
            return new JobExecutionState(jobDetailsImpl, date, th, set);
        }).flatMap(jobExecutionState -> {
            return jobExecutionState.error == null ? this.jobsService.updateStatusToFinished(date, jobExecutionState.currentDetails.getJobId()).toSingle().map(resultSet -> {
                return jobExecutionState;
            }) : Single.just(jobExecutionState);
        }).flatMap(jobExecutionState2 -> {
            return jobExecutionState2.error == null ? reschedule(jobExecutionState2) : Single.just(jobExecutionState2);
        }).flatMap(jobExecutionState3 -> {
            return jobExecutionState3.error == null ? releaseJobExecutionLock(jobExecutionState3).flatMap(this::setJobFinished) : Single.just(jobExecutionState3);
        }).doOnError(th2 -> {
            logger.warn("There was an error during post-job execution", th2);
            publishJobFinished(jobDetailsImpl);
        }).doAfterTerminate(() -> {
            publishJobFinished(jobDetailsImpl);
        }).toCompletable();
    }

    private Completable doPostJobExecutionWithoutRescheduling(Completable completable, JobDetailsImpl jobDetailsImpl, Date date, Set<UUID> set) {
        return completable.toSingle(() -> {
            return new JobExecutionState(jobDetailsImpl, set);
        }).flatMap(this::releaseJobExecutionLock).flatMap(this::setJobFinished).doOnError(th -> {
            logger.debug("There was an error during post-job execution, but the job has already been rescheduled.", th);
            publishJobFinished(jobDetailsImpl);
        }).doOnSuccess(jobExecutionState -> {
            publishJobFinished(jobExecutionState.currentDetails);
        }).toCompletable();
    }

    private void publishJobFinished(JobDetails jobDetails) {
        this.jobFinished.ifPresent(publishSubject -> {
            publishSubject.onNext(jobDetails);
        });
    }

    private Single<JobExecutionState> setJobFinished(JobExecutionState jobExecutionState) {
        return this.session.execute((Statement) this.updateJobToFinished.bind(new Object[]{jobExecutionState.timeSlice, jobExecutionState.currentDetails.getJobId()}), this.queryScheduler).toSingle().map(resultSet -> {
            return jobExecutionState;
        }).doOnError(th -> {
            logger.warnf(th, "There was an error while updated the finished jobs index for %s", jobExecutionState.currentDetails);
        });
    }

    private Single<JobExecutionState> reschedule(JobExecutionState jobExecutionState) {
        Trigger nextTrigger = jobExecutionState.currentDetails.getTrigger().nextTrigger();
        if (nextTrigger == null) {
            logger.debugf("No more scheduled executions for %s", jobExecutionState.currentDetails);
            return Single.just(jobExecutionState);
        }
        JobDetailsImpl jobDetailsImpl = jobExecutionState.currentDetails;
        JobDetailsImpl jobDetailsImpl2 = new JobDetailsImpl(jobDetailsImpl, nextTrigger);
        if (nextTrigger.getTriggerTime() <= DateTimeService.now.get().getMillis()) {
            logger.infof("%s missed its next execution at %d. It will be rescheduled for immediate execution.", jobDetailsImpl, Long.valueOf(nextTrigger.getTriggerTime()));
            AtomicLong atomicLong = new AtomicLong(DateTimeService.currentMinute().getMillis());
            return Observable.defer(() -> {
                return this.lockManager.acquireLock(QUEUE_LOCK_PREFIX + atomicLong.addAndGet(60000L), SCHEDULING_LOCK, 5, false);
            }).map(lock -> {
                if (lock.isLocked()) {
                    return lock;
                }
                throw new RuntimeException();
            }).retry().map(lock2 -> {
                return new JobExecutionState(jobExecutionState.currentDetails, jobExecutionState.timeSlice, jobDetailsImpl2, new Date(atomicLong.get()), jobExecutionState.activeJobs);
            }).flatMap(jobExecutionState2 -> {
                return this.jobsService.insert(jobExecutionState2.nextTimeSlice, jobExecutionState2.nextDetails).map(resultSet -> {
                    return jobExecutionState2;
                });
            }).doOnNext(jobExecutionState3 -> {
                logger.debugf("Rescheduled %s to execute in time slice %s with trigger time of %s", jobExecutionState3.nextDetails.getJobName(), jobExecutionState3.nextTimeSlice, new Date(jobExecutionState3.nextDetails.getTrigger().getTriggerTime()));
            }).toSingle();
        }
        logger.debugf("Scheduling %s for next execution at %s", jobDetailsImpl2, new Date(nextTrigger.getTriggerTime()));
        JobExecutionState jobExecutionState4 = new JobExecutionState(jobDetailsImpl, jobExecutionState.timeSlice, jobDetailsImpl2, new Date(nextTrigger.getTriggerTime()), jobExecutionState.activeJobs);
        return this.jobsService.insert(jobExecutionState4.nextTimeSlice, jobExecutionState4.nextDetails).map(resultSet -> {
            return jobExecutionState4;
        }).toSingle();
    }

    private Single<JobExecutionState> releaseJobExecutionLock(JobExecutionState jobExecutionState) {
        String str = "org.hawkular.metrics.scheduler.job." + jobExecutionState.currentDetails.getJobId();
        return this.lockManager.releaseLock(str, this.hostname).map(bool -> {
            if (!bool.booleanValue()) {
                logger.warnf("Failed to release job lock for %s", jobExecutionState.currentDetails);
            }
            return jobExecutionState;
        }).toSingle().doOnError(th -> {
            logger.warnf(th, "There was an error trying to release job lock [%s] for %s", str, jobExecutionState.currentDetails);
        });
    }

    private Completable deleteScheduledJobs(Date date) {
        return this.session.execute((Statement) this.deleteScheduledJobs.bind(new Object[]{date}), this.queryScheduler).doOnCompleted(() -> {
            logger.debugf("Deleted scheduled jobs time slice [%s]", date);
        }).toCompletable();
    }

    private Completable deleteFinishedJobs(Date date) {
        return this.session.execute((Statement) this.deleteFinishedJobs.bind(new Object[]{date}), this.queryScheduler).doOnCompleted(() -> {
            logger.debugf("Deleted finished jobs time slice [%s]", date);
        }).toCompletable();
    }

    private Completable deleteActiveTimeSlice(Date date) {
        return this.session.execute((Statement) this.deleteActiveTimeSlice.bind(new Object[]{date}), this.queryScheduler).toCompletable();
    }

    @Override // org.hawkular.metrics.scheduler.api.Scheduler
    public void shutdown() {
        try {
            this.running = false;
            this.tickExecutor.shutdown();
            this.tickExecutor.awaitTermination(5L, TimeUnit.SECONDS);
            this.queryExecutor.shutdown();
            this.queryExecutor.awaitTermination(30L, TimeUnit.SECONDS);
            this.lockManager.shutdown();
            logger.info("Shutdown complete");
        } catch (InterruptedException e) {
            logger.warn("Interrupted during shutdown", e);
        }
    }

    private Observable<? extends Set<JobDetailsImpl>> findScheduledJobs(Date date) {
        logger.debugf("Fetching scheduled jobs for [%s]", date);
        return this.jobsService.findScheduledJobsForTime(date, this.queryScheduler).collect(HashSet::new, (v0, v1) -> {
            v0.add(v1);
        });
    }

    private Observable<Set<JobDetailsImpl>> computeRemainingJobs(Set<JobDetailsImpl> set, Date date, Set<UUID> set2) {
        return findFinishedJobs(date).map(set3 -> {
            HashSet hashSet = new HashSet(set2);
            hashSet.removeAll(set3);
            return (Set) ((Set) set.stream().filter(jobDetailsImpl -> {
                return !set3.contains(jobDetailsImpl.getJobId());
            }).collect(Collectors.toSet())).stream().filter(jobDetailsImpl2 -> {
                return !hashSet.contains(jobDetailsImpl2.getJobId());
            }).collect(Collectors.toSet());
        });
    }

    private Observable<? extends Set<UUID>> findFinishedJobs(Date date) {
        return this.session.execute((Statement) this.findFinishedJobs.bind(new Object[]{date}), this.queryScheduler).flatMap((v0) -> {
            return Observable.from(v0);
        }).map(row -> {
            return row.getUUID(0);
        }).collect(HashSet::new, (v0, v1) -> {
            v0.add(v1);
        });
    }

    private void doOnTick(Action0 action0) {
        Action0 action02 = () -> {
            DateTimeService.getTimeSlice(new DateTime(this.tickScheduler.now()), Minutes.minutes(1).toStandardDuration()).toDate();
            action0.call();
        };
        AtomicReference atomicReference = new AtomicReference();
        Observable.interval(0L, 1L, TimeUnit.MINUTES, this.tickScheduler).filter(l -> {
            DateTime currentMinute = DateTimeService.currentMinute();
            if (atomicReference.get() == null) {
                atomicReference.set(currentMinute);
                return true;
            }
            if (((DateTime) atomicReference.get()).equals(currentMinute)) {
                return false;
            }
            atomicReference.set(currentMinute);
            return true;
        }).takeUntil(l2 -> {
            return Boolean.valueOf(!this.running);
        }).subscribe(l3 -> {
            action02.call();
        }, th -> {
            logger.warn(th);
        });
    }

    private Observable<Void> updateActiveTimeSlices(Date date) {
        return this.session.execute((Statement) this.addActiveTimeSlice.bind(new Object[]{date})).map(resultSet -> {
            return null;
        });
    }

    private Observable<Date> findTimeSlices() {
        return this.session.execute((Statement) this.findActiveTimeSlices.bind(), this.queryScheduler).flatMap((v0) -> {
            return Observable.from(v0);
        }).map(row -> {
            return row.getTimestamp(0);
        }).toSortedList().flatMap((v0) -> {
            return Observable.from(v0);
        });
    }
}
