/*
 * Decompiled with CFR 0.152.
 */
package org.hawkular.metrics.scheduler.impl;

import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Statement;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.hawkular.metrics.datetime.DateTimeService;
import org.hawkular.metrics.scheduler.api.JobDetails;
import org.hawkular.metrics.scheduler.api.JobStatus;
import org.hawkular.metrics.scheduler.api.RetryPolicy;
import org.hawkular.metrics.scheduler.api.SingleExecutionTrigger;
import org.hawkular.metrics.scheduler.api.Trigger;
import org.hawkular.metrics.scheduler.impl.JobsService;
import org.hawkular.metrics.scheduler.impl.LockManager;
import org.hawkular.rx.cassandra.driver.RxSession;
import org.jboss.logging.Logger;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Minutes;
import rx.Completable;
import rx.Observable;
import rx.Scheduler;
import rx.Single;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;

public class SchedulerImpl
implements org.hawkular.metrics.scheduler.api.Scheduler {
    private static Logger logger = Logger.getLogger(SchedulerImpl.class);
    private Map<String, Func1<JobDetails, Completable>> jobFactories;
    private Map<String, Func2<JobDetails, Throwable, RetryPolicy>> retryFunctions;
    private ScheduledExecutorService tickExecutor;
    private Scheduler tickScheduler;
    private ExecutorService queryExecutor;
    private Scheduler queryScheduler;
    private RxSession session;
    private PreparedStatement insertJob;
    private PreparedStatement insertScheduledJob;
    private PreparedStatement deleteScheduleJob;
    private PreparedStatement findScheduledJobs;
    private PreparedStatement deleteScheduledJobs;
    private PreparedStatement deleteScheduledJob;
    private PreparedStatement findFinishedJobs;
    private PreparedStatement deleteFinishedJobs;
    private PreparedStatement updateJobToFinished;
    private PreparedStatement findJob;
    private PreparedStatement findAllJobs;
    private PreparedStatement addActiveTimeSlice;
    private PreparedStatement findActiveTimeSlices;
    private PreparedStatement deleteActiveTimeSlice;
    private LockManager lockManager;
    private JobsService jobsService;
    private boolean running;
    private AtomicInteger ticks = new AtomicInteger();
    private static Func2<JobDetails, Throwable, RetryPolicy> NO_RETRY = (details, throwable) -> RetryPolicy.NONE;
    static final String QUEUE_LOCK_PREFIX = "org.hawkular.metrics.scheduler.queue.";
    static final String SCHEDULING_LOCK = "scheduling";
    static final String TIME_SLICE_EXECUTION_LOCK = "executing";
    static final String JOB_EXECUTION_LOCK = "locked";
    static final int SCHEDULING_LOCK_TIMEOUT_IN_SEC = 5;
    static final int JOB_EXECUTION_LOCK_TIMEOUT_IN_SEC = 3600;
    private Optional<PublishSubject<Date>> finishedTimeSlices;
    private Optional<PublishSubject<JobDetails>> jobFinished;
    private final Object lock = new Object();

    public SchedulerImpl(RxSession session) {
        this.session = session;
        this.jobFactories = new HashMap<String, Func1<JobDetails, Completable>>();
        this.retryFunctions = new HashMap<String, Func2<JobDetails, Throwable, RetryPolicy>>();
        this.tickExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat("ticker-pool-%d").build(), new ThreadPoolExecutor.DiscardPolicy());
        this.tickScheduler = Schedulers.from((Executor)this.tickExecutor);
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("query-thread-pool-%d").build();
        this.queryExecutor = new ThreadPoolExecutor(this.getQueryThreadPoolSize(), this.getQueryThreadPoolSize(), 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory, new ThreadPoolExecutor.DiscardPolicy());
        this.queryScheduler = Schedulers.from((Executor)this.queryExecutor);
        this.lockManager = new LockManager(session);
        this.jobsService = new JobsService(session);
        this.insertJob = this.initQuery("INSERT INTO jobs (id, type, name, params, trigger) VALUES (?, ?, ?, ?, ?)");
        this.insertScheduledJob = this.initQuery("INSERT INTO scheduled_jobs_idx (time_slice, job_id) VALUES (?, ?)");
        this.findScheduledJobs = this.initQuery("SELECT job_id FROM scheduled_jobs_idx WHERE time_slice = ?");
        this.deleteScheduledJobs = this.initQuery("DELETE FROM scheduled_jobs_idx WHERE time_slice = ?");
        this.deleteScheduledJob = this.initQuery("DELETE FROM scheduled_jobs_idx WHERE time_slice = ? AND job_id = ?");
        this.findFinishedJobs = this.initQuery("SELECT job_id FROM finished_jobs_idx WHERE time_slice = ?");
        this.deleteFinishedJobs = this.initQuery("DELETE FROM finished_jobs_idx WHERE time_slice = ?");
        this.updateJobToFinished = this.initQuery("INSERT INTO finished_jobs_idx (time_slice, job_id) VALUES (?, ?)");
        this.findJob = this.initQuery("SELECT type, name, params, trigger FROM jobs WHERE id = ?");
        this.findAllJobs = this.initQuery("SELECT id, type, name, params, trigger FROM jobs");
        this.addActiveTimeSlice = this.initQuery("INSERT INTO active_time_slices (time_slice) VALUES (?)");
        this.findActiveTimeSlices = this.initQuery("SELECT DISTINCT time_slice FROM active_time_slices");
        this.deleteActiveTimeSlice = this.initQuery("DELETE FROM active_time_slices WHERE time_slice = ?");
        this.finishedTimeSlices = Optional.empty();
        this.jobFinished = Optional.empty();
    }

    private PreparedStatement initQuery(String cql) {
        return this.session.getSession().prepare(cql).setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
    }

    private int getQueryThreadPoolSize() {
        return Math.max(Runtime.getRuntime().availableProcessors() / 2, 1);
    }

    public void setTickScheduler(Scheduler scheduler) {
        this.tickScheduler = scheduler;
    }

    public void setTimeSlicesSubject(PublishSubject<Date> timeSlicesSubject) {
        this.finishedTimeSlices = Optional.of(timeSlicesSubject);
    }

    public void setJobFinishedSubject(PublishSubject<JobDetails> subject) {
        this.jobFinished = Optional.of(subject);
    }

    @Override
    public void register(String jobType, Func1<JobDetails, Completable> jobProducer) {
        this.jobFactories.put(jobType, jobProducer);
    }

    @Override
    public void register(String jobType, Func1<JobDetails, Completable> jobProducer, Func2<JobDetails, Throwable, RetryPolicy> retryFunction) {
        this.jobFactories.put(jobType, jobProducer);
        this.retryFunctions.put(jobType, retryFunction);
    }

    @Override
    public Single<JobDetails> scheduleJob(String type, String name, Map<String, String> parameter, Trigger trigger) {
        if (((DateTime)DateTimeService.now.get()).getMillis() >= trigger.getTriggerTime()) {
            return Single.error((Throwable)new RuntimeException("Trigger time has already passed"));
        }
        String lockName = QUEUE_LOCK_PREFIX + trigger.getTriggerTime();
        return this.lockManager.acquireSharedLock(lockName, SCHEDULING_LOCK, 5).map(acquired -> {
            if (acquired.booleanValue()) {
                UUID jobId = UUID.randomUUID();
                return new JobDetails(jobId, type, name, parameter, trigger);
            }
            throw new RuntimeException("Failed to acquire scheduling lock [" + lockName + "]");
        }).flatMap(details -> this.jobsService.insert(new Date(trigger.getTriggerTime()), (JobDetails)details).map(resultSet -> details)).toSingle();
    }

    @Override
    public void start() {
        this.running = true;
        ConcurrentSkipListSet activeTimeSlices = new ConcurrentSkipListSet();
        ConcurrentSkipListSet activeJobs = new ConcurrentSkipListSet();
        this.doOnTick(() -> {
            logger.debug((Object)("Activating scheduler for [" + DateTimeService.currentMinute().toDate() + "]"));
            Observable.just((Object)DateTimeService.currentMinute().toDate()).flatMap(time -> this.jobsService.findActiveTimeSlices((Date)time, this.queryScheduler)).filter(d -> {
                Object object = this.lock;
                synchronized (object) {
                    if (!activeTimeSlices.contains(d)) {
                        activeTimeSlices.add(d);
                        return true;
                    }
                    return false;
                }
            }).doOnNext(d -> logger.debug((Object)("Running job scheduler for [" + d + "]"))).flatMap(this::acquireTimeSliceLock).flatMap(timeSliceLock -> this.findScheduledJobs(timeSliceLock.getTimeSlice()).doOnError(t -> logger.warn((Object)("Failed to find scheduled jobs for time slice " + ((TimeSliceLock)timeSliceLock).timeSlice))).doOnNext(jobs -> logger.debug((Object)("[" + ((TimeSliceLock)timeSliceLock).timeSlice + "] scheduled jobs: " + jobs))).flatMap(scheduledJobs -> this.computeRemainingJobs((Set<JobDetails>)scheduledJobs, timeSliceLock.getTimeSlice(), activeJobs)).doOnNext(jobs -> logger.debug((Object)("[" + ((TimeSliceLock)timeSliceLock).timeSlice + "] remaining jobs: " + jobs))).flatMap(Observable::from).filter(jobDetails -> !activeJobs.contains(jobDetails.getJobId())).flatMap(this::acquireJobLock).filter(jobLock -> jobLock.acquired).map(jobLock -> jobLock.jobDetails).doOnNext(details -> logger.debug((Object)("Acquired job lock for " + details + " in time slice " + ((TimeSliceLock)timeSliceLock).timeSlice))).flatMap(details -> this.executeJob((JobDetails)details, ((TimeSliceLock)timeSliceLock).timeSlice, activeJobs).toObservable().map(o -> timeSliceLock.getTimeSlice())).defaultIfEmpty((Object)timeSliceLock.getTimeSlice())).flatMap(time -> {
                Observable scheduled = this.jobsService.findScheduledJobsForTime((Date)time, this.queryScheduler).map(JobDetails::getJobId).collect(HashSet::new, HashSet::add);
                Observable<? extends Set<UUID>> finished = this.findFinishedJobs((Date)time);
                return Observable.sequenceEqual((Observable)scheduled, finished).flatMap(allFinished -> {
                    if (allFinished.booleanValue()) {
                        logger.debug((Object)("All jobs for time slice [" + time + "] have finished"));
                        return this.deleteFinishedJobs((Date)time).mergeWith(this.deleteScheduledJobs((Date)time)).toObservable().reduce(null, (o1, o2) -> o2).map(o -> time);
                    }
                    return Observable.just((Object)time);
                });
            }).subscribe(d -> {
                logger.debug((Object)("Finished post job execution clean up for [" + d + "]"));
                activeTimeSlices.remove(d);
                this.finishedTimeSlices.ifPresent(subject -> subject.onNext(d));
            }, t -> {
                logger.warn((Object)"Job execution failed", t);
                Object object = this.lock;
                synchronized (object) {
                    activeTimeSlices.clear();
                }
            }, () -> logger.debug((Object)"Done!"));
        });
    }

    private Observable<TimeSliceLock> acquireTimeSliceLock(Date timeSlice) {
        String lockName = QUEUE_LOCK_PREFIX + timeSlice.getTime();
        int delay = 5;
        Observable observable = Observable.create(subscriber -> this.lockManager.acquireSharedLock(lockName, TIME_SLICE_EXECUTION_LOCK, 3600).map(acquired -> {
            if (!acquired.booleanValue()) {
                logger.debug((Object)("Failed to acquire time slice lock for [" + timeSlice + "]. Will attempt to acquire it again in " + delay + " seconds."));
                throw new RuntimeException();
            }
            return new TimeSliceLock(timeSlice, lockName, (boolean)acquired);
        }).subscribe(arg_0 -> ((Subscriber)subscriber).onNext(arg_0), arg_0 -> ((Subscriber)subscriber).onError(arg_0), () -> ((Subscriber)subscriber).onCompleted()));
        return observable.retryWhen(errors -> errors.flatMap(e -> Observable.timer((long)delay, (TimeUnit)TimeUnit.SECONDS, (Scheduler)this.queryScheduler)));
    }

    private Observable<JobLock> acquireJobLock(JobDetails jobDetails) {
        String jobLock = "org.hawkular.metrics.scheduler.job." + jobDetails.getJobId();
        return this.lockManager.acquireExclusiveLock(jobLock, JOB_EXECUTION_LOCK, 3600).map(acquired -> new JobLock(jobDetails, (boolean)acquired));
    }

    private Completable executeJob(final JobDetails details, Date timeSlice, Set<UUID> activeJobs) {
        logger.debug((Object)("Starting execution for " + details + " in time slice [" + timeSlice + "]"));
        Stopwatch stopwatch = Stopwatch.createStarted();
        Func1<JobDetails, Completable> factory = this.jobFactories.get(details.getJobType());
        Completable job = details.getStatus() == JobStatus.FINISHED ? Completable.complete() : (Completable)factory.call((Object)details);
        return job.onErrorResumeNext(t -> {
            logger.info((Object)("Execution of " + details + " in time slice [" + timeSlice + "] failed"), t);
            final RetryPolicy retryPolicy = (RetryPolicy)this.retryFunctions.getOrDefault(details.getJobType(), NO_RETRY).call((Object)details, t);
            if (retryPolicy == RetryPolicy.NONE) {
                return Completable.complete();
            }
            if (details.getTrigger().nextTrigger() != null) {
                logger.warn((Object)("Retry policies cannot be used with jobs that repeat. " + details + " will execute again according to its next trigger."));
                return Completable.complete();
            }
            if (retryPolicy == RetryPolicy.NOW) {
                return (Completable)factory.call((Object)details);
            }
            Trigger newTrigger = new Trigger(){

                @Override
                public long getTriggerTime() {
                    return details.getTrigger().getTriggerTime();
                }

                @Override
                public Trigger nextTrigger() {
                    return new SingleExecutionTrigger.Builder().withDelay(retryPolicy.getDelay(), TimeUnit.MILLISECONDS).build();
                }
            };
            JobDetails newDetails = new JobDetails(details.getJobId(), details.getJobType(), details.getJobName(), details.getParameters(), newTrigger);
            return this.reschedule(new JobExecutionState(newDetails, activeJobs)).toCompletable();
        }).doOnCompleted(() -> {
            stopwatch.stop();
            if (logger.isDebugEnabled()) {
                logger.debug((Object)("Finished executing " + details + " in time slice [" + timeSlice + "] " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms"));
            }
        }).toSingle(() -> new JobExecutionState(details, activeJobs)).flatMap(state -> this.jobsService.updateStatusToFinished(timeSlice, state.currentDetails.getJobId()).toSingle().map(resultSet -> state)).flatMap(this::reschedule).flatMap(state -> {
            if (state.isBehindSchedule()) {
                return this.setJobFinished((JobExecutionState)state).flatMap(this::scheduleImmediateExecutionIfNecessary);
            }
            return this.releaseJobExecutionLock((JobExecutionState)state).flatMap(this::deactivate).flatMap(this::setJobFinished);
        }).doOnError(t -> {
            logger.debug((Object)("There was an error during post-job execution. Making sure " + details + " is removed from active jobs cache"));
            activeJobs.remove(details.getJobId());
            this.publishJobFinished(details);
        }).doOnSuccess(states -> this.publishJobFinished(states.currentDetails)).toCompletable().subscribeOn(Schedulers.io());
    }

    private void publishJobFinished(JobDetails details) {
        this.jobFinished.ifPresent(subject -> subject.onNext((Object)details));
    }

    private Single<JobExecutionState> setJobFinished(JobExecutionState state) {
        return this.session.execute((Statement)this.updateJobToFinished.bind(new Object[]{state.timeSlice, state.currentDetails.getJobId()}), this.queryScheduler).toSingle().map(resultSet -> state).doOnError(t -> logger.warn((Object)("There was an error while updating the finished jobs index for " + state.currentDetails), t));
    }

    private Single<JobExecutionState> reschedule(JobExecutionState executionState) {
        Trigger nextTrigger = executionState.currentDetails.getTrigger().nextTrigger();
        if (nextTrigger == null) {
            logger.debug((Object)("No more scheduled executions for " + executionState.currentDetails));
            return Single.just((Object)executionState);
        }
        JobDetails details = executionState.currentDetails;
        JobDetails newDetails = new JobDetails(details.getJobId(), details.getJobType(), details.getJobName(), details.getParameters(), nextTrigger);
        if (nextTrigger.getTriggerTime() <= ((DateTime)DateTimeService.now.get()).getMillis()) {
            logger.info((Object)(details + " missed its next execution at " + nextTrigger.getTriggerTime() + ". It will be rescheduled for immediate execution"));
            AtomicLong nextTimeSlice = new AtomicLong(DateTimeService.currentMinute().getMillis());
            Observable scheduled = Observable.defer(() -> this.lockManager.acquireSharedLock(QUEUE_LOCK_PREFIX + nextTimeSlice.addAndGet(60000L), SCHEDULING_LOCK, 5)).map(acquired -> {
                if (!acquired.booleanValue()) {
                    throw new RuntimeException();
                }
                return acquired;
            }).retry();
            return scheduled.map(acquired -> new JobExecutionState(executionState.currentDetails, executionState.timeSlice, newDetails, new Date(nextTimeSlice.get()), executionState.activeJobs)).flatMap(state -> this.jobsService.insert(state.nextTimeSlice, state.nextDetails).map(updated -> state)).toSingle();
        }
        logger.debug((Object)("Scheduling " + newDetails + " for next execution at " + new Date(nextTrigger.getTriggerTime())));
        JobExecutionState newState = new JobExecutionState(details, executionState.timeSlice, newDetails, new Date(nextTrigger.getTriggerTime()), executionState.activeJobs);
        return this.jobsService.insert(newState.nextTimeSlice, newState.nextDetails).map(updated -> newState).toSingle();
    }

    private Single<JobExecutionState> scheduleImmediateExecutionIfNecessary(JobExecutionState state) {
        Scheduler.Worker worker = Schedulers.io().createWorker();
        worker.schedule(() -> {
            logger.debug((Object)("Starting immediate execution of " + state.nextDetails));
            String jobLock = "org.hawkular.metrics.scheduler.job." + state.nextDetails.getJobId();
            this.lockManager.renewLock(jobLock, JOB_EXECUTION_LOCK, 3600).map(renewed -> {
                if (!renewed.booleanValue()) {
                    throw new RuntimeException("Failed to renew job lock for " + state.nextDetails);
                }
                return renewed;
            }).toCompletable().concatWith(this.executeJob(state.nextDetails, state.nextTimeSlice, state.activeJobs)).subscribe(() -> logger.debug((Object)("Finished executing " + state.nextDetails)), t -> logger.warn((Object)("There was an error executing " + state.nextDetails)));
        });
        return Single.just((Object)state);
    }

    private Single<JobExecutionState> deactivate(JobExecutionState state) {
        logger.debug((Object)("Removing " + state.currentDetails + " from active jobs " + state.activeJobs));
        state.activeJobs.remove(state.currentDetails.getJobId());
        return Single.just((Object)state);
    }

    private Single<JobExecutionState> releaseJobExecutionLock(JobExecutionState state) {
        String jobLock = "org.hawkular.metrics.scheduler.job." + state.currentDetails.getJobId();
        return this.lockManager.releaseLock(jobLock, JOB_EXECUTION_LOCK).map(released -> {
            if (!released.booleanValue()) {
                logger.warn((Object)("Failed to release job lock for " + state.currentDetails));
                throw new RuntimeException("Failed to release job lock for " + state.currentDetails);
            }
            return state;
        }).toSingle().doOnError(t -> logger.warn((Object)("There was an error trying to release job lock [" + jobLock + "] for " + state.currentDetails), t));
    }

    private Completable deleteScheduledJobs(Date timeSlice) {
        return this.session.execute((Statement)this.deleteScheduledJobs.bind(new Object[]{timeSlice}), this.queryScheduler).doOnCompleted(() -> logger.debug((Object)("Deleted scheduled jobs time slice [" + timeSlice + "]"))).toCompletable();
    }

    private Completable deleteFinishedJobs(Date timeSlice) {
        return this.session.execute((Statement)this.deleteFinishedJobs.bind(new Object[]{timeSlice}), this.queryScheduler).doOnCompleted(() -> logger.debug((Object)("Deleted finished jobs time slice [" + timeSlice + "]"))).toCompletable();
    }

    private Completable deleteActiveTimeSlice(Date timeSlice) {
        return this.session.execute((Statement)this.deleteActiveTimeSlice.bind(new Object[]{timeSlice}), this.queryScheduler).toCompletable();
    }

    @Override
    public void shutdown() {
        try {
            this.running = false;
            this.tickExecutor.shutdown();
            this.tickExecutor.awaitTermination(5L, TimeUnit.SECONDS);
            this.queryExecutor.shutdown();
            this.queryExecutor.awaitTermination(30L, TimeUnit.SECONDS);
            logger.info((Object)"Shutdown complete");
        }
        catch (InterruptedException e) {
            logger.warn((Object)"Interrupted during shutdown", (Throwable)e);
        }
    }

    private Observable<? extends Set<JobDetails>> findScheduledJobs(Date timeSlice) {
        logger.debug((Object)("Fetching scheduled jobs for [" + timeSlice + "]"));
        return this.jobsService.findScheduledJobsForTime(timeSlice, this.queryScheduler).collect(HashSet::new, HashSet::add);
    }

    private Observable<Set<JobDetails>> computeRemainingJobs(Set<JobDetails> scheduledJobs, Date timeSlice, Set<UUID> activeJobs) {
        Observable<? extends Set<UUID>> finished = this.findFinishedJobs(timeSlice);
        return finished.map(finishedJobs -> {
            HashSet active = new HashSet(activeJobs);
            active.removeAll((Collection<?>)finishedJobs);
            Set jobs = scheduledJobs.stream().filter(details -> !finishedJobs.contains(details.getJobId())).collect(Collectors.toSet());
            return jobs.stream().filter(details -> !active.contains(details.getJobId())).collect(Collectors.toSet());
        });
    }

    private Observable<? extends Set<UUID>> findFinishedJobs(Date timeSlice) {
        return this.session.execute((Statement)this.findFinishedJobs.bind(new Object[]{timeSlice}), this.queryScheduler).flatMap(Observable::from).map(row -> row.getUUID(0)).collect(HashSet::new, HashSet::add);
    }

    private void doOnTick(Action0 action) {
        Action0 wrapper = () -> {
            Date timeSlice = DateTimeService.getTimeSlice((DateTime)new DateTime(this.tickScheduler.now()), (Duration)Minutes.minutes((int)1).toStandardDuration()).toDate();
            logger.debug((Object)("[TICK][" + timeSlice + "] executing action"));
            action.call();
            logger.debug((Object)("Finished tick for [" + timeSlice + "]"));
        };
        AtomicReference previousTimeSliceRef = new AtomicReference();
        Observable.interval((long)0L, (long)1L, (TimeUnit)TimeUnit.MINUTES, (Scheduler)this.tickScheduler).doOnNext(tick -> logger.debug((Object)("CURRENT MINUTE = " + DateTimeService.currentMinute().toDate()))).filter(tick -> {
            DateTime time = DateTimeService.currentMinute();
            if (previousTimeSliceRef.get() == null) {
                previousTimeSliceRef.set(time);
                return true;
            }
            if (((DateTime)previousTimeSliceRef.get()).equals((Object)time)) {
                return false;
            }
            previousTimeSliceRef.set(time);
            return true;
        }).takeUntil(d -> !this.running).subscribe(tick -> wrapper.call(), t -> logger.warn(t));
    }

    private Observable<Void> updateActiveTimeSlices(Date timeSlice) {
        return this.session.execute((Statement)this.addActiveTimeSlice.bind(new Object[]{timeSlice})).map(resultSet -> null);
    }

    private Observable<Date> findTimeSlices() {
        return this.session.execute((Statement)this.findActiveTimeSlices.bind(), this.queryScheduler).flatMap(Observable::from).map(row -> row.getTimestamp(0)).toSortedList().flatMap(Observable::from);
    }

    private static class JobExecutionState {
        final JobDetails currentDetails;
        final JobDetails nextDetails;
        final Set<UUID> activeJobs;
        final Date timeSlice;
        final Date nextTimeSlice;

        public JobExecutionState(JobDetails details, Set<UUID> activeJobs) {
            this.currentDetails = details;
            this.activeJobs = activeJobs;
            this.timeSlice = new Date(details.getTrigger().getTriggerTime());
            this.nextDetails = null;
            this.nextTimeSlice = null;
        }

        public JobExecutionState(JobDetails details, Date timeSlice, JobDetails nextDetails, Date nextTimeSlice, Set<UUID> activeJobs) {
            this.currentDetails = details;
            this.timeSlice = timeSlice;
            this.nextDetails = nextDetails;
            this.nextTimeSlice = nextTimeSlice;
            this.activeJobs = activeJobs;
        }

        public boolean isRepeating() {
            return this.nextDetails != null && this.nextTimeSlice != null;
        }

        public boolean isBehindSchedule() {
            return this.isRepeating() && this.nextTimeSlice.getTime() > this.nextDetails.getTrigger().getTriggerTime();
        }
    }

    private static class JobLock {
        final JobDetails jobDetails;
        final boolean acquired;
        final String name;

        public JobLock(JobDetails jobDetails, boolean acquired) {
            this.jobDetails = jobDetails;
            this.acquired = acquired;
            this.name = "org.hawkular.metrics.scheduler.job." + jobDetails.getJobId();
        }
    }

    private static class TimeSliceLock {
        private Date timeSlice;
        private String name;
        private boolean acquired;

        public TimeSliceLock(Date timeSlice, String name, boolean acquired) {
            this.timeSlice = timeSlice;
            this.name = name;
            this.acquired = acquired;
        }

        public Date getTimeSlice() {
            return this.timeSlice;
        }

        public String getName() {
            return this.name;
        }

        public boolean isAcquired() {
            return this.acquired;
        }
    }

    private static class QueryExecution {
        private Statement query;
        private ResultSet resultSet;
        private Throwable error;

        public QueryExecution(Statement query, ResultSet resultSet) {
            this.resultSet = resultSet;
        }

        public QueryExecution(Statement query, Throwable t) {
            this.error = t;
        }
    }
}

