/*
 * Decompiled with CFR 0.152.
 */
package org.hawkular.metrics.scheduler.impl;

import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.UDTValue;
import com.datastax.driver.core.UserType;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.hawkular.metrics.datetime.DateTimeService;
import org.hawkular.metrics.scheduler.api.JobDetails;
import org.hawkular.metrics.scheduler.api.RepeatingTrigger;
import org.hawkular.metrics.scheduler.api.RetryPolicy;
import org.hawkular.metrics.scheduler.api.SingleExecutionTrigger;
import org.hawkular.metrics.scheduler.api.Trigger;
import org.hawkular.metrics.scheduler.impl.LockManager;
import org.hawkular.rx.cassandra.driver.RxSession;
import org.jboss.logging.Logger;
import org.joda.time.DateTime;
import org.joda.time.Minutes;
import rx.Completable;
import rx.Observable;
import rx.Scheduler;
import rx.Single;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;

public class SchedulerImpl
implements org.hawkular.metrics.scheduler.api.Scheduler {
    /** Class-wide logger; it is never reassigned, so it is declared final. */
    private static final Logger logger = Logger.getLogger(SchedulerImpl.class);
    /** Job type mapped to the factory that produces the Completable performing the job. */
    private Map<String, Func1<JobDetails, Completable>> jobFactories;
    /** Job type mapped to the function that picks a retry policy when the job fails. */
    private Map<String, Func2<JobDetails, Throwable, RetryPolicy>> retryFunctions;
    /** Executor backing the default tick scheduler. */
    private ScheduledExecutorService tickExecutor;
    /** Scheduler driving the once-a-minute tick; replaceable via setTickScheduler/reset. */
    private Scheduler tickScheduler;
    /** Executor backing {@link #queryScheduler}. */
    private ExecutorService queryExecutor;
    /** Scheduler handed to the session when executing most statements. */
    private Scheduler queryScheduler;
    private RxSession session;
    // Statements prepared once in the constructor; see the CQL text there.
    private PreparedStatement insertJob;
    private PreparedStatement insertScheduledJob;
    private PreparedStatement findScheduledJobs;
    private PreparedStatement deleteScheduledJobs;
    private PreparedStatement findFinishedJobs;
    private PreparedStatement deleteFinishedJobs;
    private PreparedStatement updateJobToFinished;
    private PreparedStatement findJob;
    private PreparedStatement addActiveTimeSlice;
    private PreparedStatement findActiveTimeSlices;
    private PreparedStatement deleteActiveTimeSlice;
    private LockManager lockManager;
    /**
     * Set by start() and cleared by shutdown() on the caller's thread, and read by the
     * takeUntil predicate in doOnTick, which runs on the tick scheduler. Declared volatile
     * so that the tick thread is guaranteed to observe the change.
     */
    private volatile boolean running;
    // NOTE(review): not referenced anywhere else in the visible class.
    private AtomicInteger ticks = new AtomicInteger();
    /** Default retry function used when a job type registered none: never retry. */
    private static final Func2<JobDetails, Throwable, RetryPolicy> NO_RETRY = (details, throwable) -> RetryPolicy.NONE;
    /** Prefix of the lock name used per time slice; the slice's epoch millis are appended. */
    static final String QUEUE_LOCK_PREFIX = "org.hawkular.metrics.scheduler.queue.";
    /** Optional subject notified with each time slice once its post-execution clean up ran. */
    private Optional<PublishSubject<Date>> finishedTimeSlices;
    /** Optional subject notified with the details of each job at the end of its execution chain. */
    private Optional<PublishSubject<JobDetails>> jobFinished;
    /** Monitor guarding the check-then-add on the active time slice set in start(). */
    private final Object lock = new Object();

    /**
     * Creates a scheduler that stores its state through the given session.
     *
     * <p>Sets up the tick and query executors (and the Rx schedulers wrapping them), the lock
     * manager, and prepares every CQL statement used by this class. No subjects are
     * registered initially.
     *
     * @param session session used for preparing and executing all statements
     */
    public SchedulerImpl(RxSession session) {
        this.session = session;
        jobFactories = new HashMap<>();
        retryFunctions = new HashMap<>();

        // Single-threaded ticker.
        ThreadFactory tickerThreads = new ThreadFactoryBuilder().setNameFormat("ticker-pool-%d").build();
        tickExecutor = Executors.newScheduledThreadPool(1, tickerThreads);
        tickScheduler = Schedulers.from(tickExecutor);

        // Fixed pool of four query threads with an unbounded queue; rejected tasks are discarded.
        ThreadFactory queryThreads = new ThreadFactoryBuilder().setNameFormat("query-thread-pool-%d").build();
        queryExecutor = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), queryThreads, new ThreadPoolExecutor.DiscardPolicy());
        queryScheduler = Schedulers.from(queryExecutor);

        lockManager = new LockManager(session);

        insertJob = prepare("INSERT INTO jobs (id, type, name, params, trigger) VALUES (?, ?, ?, ?, ?)");
        insertScheduledJob = prepare("INSERT INTO scheduled_jobs_idx (time_slice, job_id) VALUES (?, ?)");
        findScheduledJobs = prepare("SELECT job_id FROM scheduled_jobs_idx WHERE time_slice = ?");
        deleteScheduledJobs = prepare("DELETE FROM scheduled_jobs_idx WHERE time_slice = ?");
        findFinishedJobs = prepare("SELECT job_id FROM finished_jobs_idx WHERE time_slice = ?");
        deleteFinishedJobs = prepare("DELETE FROM finished_jobs_idx WHERE time_slice = ?");
        updateJobToFinished = prepare("INSERT INTO finished_jobs_idx (time_slice, job_id) VALUES (?, ?)");
        findJob = prepare("SELECT type, name, params, trigger FROM jobs WHERE id = ?");
        addActiveTimeSlice = prepare("INSERT INTO active_time_slices (time_slice) VALUES (?)");
        findActiveTimeSlices = prepare("SELECT DISTINCT time_slice FROM active_time_slices");
        deleteActiveTimeSlice = prepare("DELETE FROM active_time_slices WHERE time_slice = ?");

        finishedTimeSlices = Optional.empty();
        jobFinished = Optional.empty();
    }

    /** Prepares the given CQL text on the underlying driver session. */
    private PreparedStatement prepare(String cql) {
        return session.getSession().prepare(cql);
    }

    /**
     * Replaces the scheduler that drives the periodic tick.
     *
     * @param scheduler scheduler used by subsequent calls that set up the tick
     */
    public void setTickScheduler(Scheduler scheduler) {
        tickScheduler = scheduler;
    }

    /**
     * Registers a subject that receives each time slice once its post-execution clean up
     * has run.
     *
     * @param timeSlicesSubject subject to notify; must not be null ({@code Optional.of} rejects null)
     */
    public void setTimeSlicesSubject(PublishSubject<Date> timeSlicesSubject) {
        finishedTimeSlices = Optional.of(timeSlicesSubject);
    }

    /**
     * Registers a subject that receives the details of each job at the end of its
     * execution chain.
     *
     * @param subject subject to notify; must not be null ({@code Optional.of} rejects null)
     */
    public void setJobFinishedSubject(PublishSubject<JobDetails> subject) {
        jobFinished = Optional.of(subject);
    }

    /**
     * Registers (or replaces) the factory used to build jobs of the given type.
     *
     * @param jobType     key under which the factory is stored
     * @param jobProducer function turning job details into the Completable that runs the job
     */
    @Override
    public void register(String jobType, Func1<JobDetails, Completable> jobProducer) {
        jobFactories.put(jobType, jobProducer);
    }

    /**
     * Registers (or replaces) both the job factory and the retry function for a job type.
     *
     * @param jobType       key under which both functions are stored
     * @param jobProducer   function turning job details into the Completable that runs the job
     * @param retryFunction function choosing a retry policy from the job details and the failure
     */
    @Override
    public void register(String jobType, Func1<JobDetails, Completable> jobProducer, Func2<JobDetails, Throwable, RetryPolicy> retryFunction) {
        retryFunctions.put(jobType, retryFunction);
        jobFactories.put(jobType, jobProducer);
    }

    /**
     * Schedules a new job for execution at the trigger's time.
     *
     * <p>A lock named after the trigger time is acquired first; once held, a job with a fresh
     * random id is written to the jobs table and to the scheduled-jobs index.
     *
     * @param type      job type; selects the registered job factory at execution time
     * @param name      job name
     * @param parameter job parameters
     * @param trigger   trigger describing when the job runs
     * @return a Single emitting the details of the stored job; it fails with a
     *         {@link RuntimeException} when the trigger time is not in the future or the
     *         scheduling lock could not be acquired
     */
    @Override
    public Single<JobDetails> scheduleJob(String type, String name, Map<String, String> parameter, Trigger trigger) {
        long now = System.currentTimeMillis();
        if (now >= trigger.getTriggerTime()) {
            return Single.error(new RuntimeException("Trigger time has already passed"));
        }

        String lockName = QUEUE_LOCK_PREFIX + trigger.getTriggerTime();
        Single<JobDetails> newJob = lockManager.acquireLock(lockName, "scheduling", 5)
                .map(acquired -> {
                    if (!acquired) {
                        throw new RuntimeException("Failed to acquire scheduling lock [" + lockName + "]");
                    }
                    return new JobDetails(UUID.randomUUID(), type, name, parameter, trigger);
                })
                .toSingle();

        // Persist the job row and its index entry, then hand the details back to the caller.
        return newJob.flatMap(details -> {
            Completable persisted = Completable.merge(insertIntoJobsTable(details), updateScheduledJobsIndex(details));
            return persisted.andThen(Single.just(details));
        });
    }

    /**
     * Starts the scheduler: marks it running and registers the work performed on every tick.
     *
     * <p>On each tick the current minute is recorded as an active time slice, all active time
     * slices are loaded, and every slice not already being processed by this instance is
     * worked through: its time slice lock is acquired, its remaining scheduled jobs are
     * locked, loaded and executed, and finally the slice's bookkeeping rows are deleted when
     * the scheduled and finished job sets match.
     */
    @Override
    public void start() {
        this.running = true;
        // Time slices this instance is currently processing; entries are removed in the
        // onNext handler at the end of the pipeline.
        ConcurrentSkipListSet activeTimeSlices = new ConcurrentSkipListSet();
        // NOTE(review): nothing in the visible code ever adds to activeJobs; it is only
        // read and pruned (computeRemainingJobs, deactivateJob) - confirm this is intended.
        ConcurrentSkipListSet activeJobs = new ConcurrentSkipListSet();
        this.doOnTick(() -> {
            logger.debug((Object)("Activating scheduler for [" + DateTimeService.currentMinute().toDate() + "]"));
            Date timeSlice = DateTimeService.currentMinute().toDate();
            // Record the current slice, then load every active slice and keep only those not
            // already in activeTimeSlices; the check-then-add is done under this.lock.
            this.updateActiveTimeSlicesX(timeSlice).flatMap(aVoid -> this.findTimeSlices()).filter(d -> {
                Object object = this.lock;
                synchronized (object) {
                    if (!activeTimeSlices.contains(d)) {
                        activeTimeSlices.add(d);
                        return true;
                    }
                    return false;
                }
            // Per slice: acquire the time slice lock, find the scheduled jobs, drop finished
            // and active ones, acquire each job's lock, load the job and execute it.
            // defaultIfEmpty makes sure the time slice is emitted downstream even when the
            // inner pipeline produced no items.
            }).doOnNext(d -> logger.debug((Object)("Running job scheduler for [" + d + "]"))).flatMap(this::acquireTimeSliceLock).flatMap(timeSliceLock -> this.findScheduledJobs(timeSliceLock.getTimeSlice()).map(scheduledJobs -> this.computeRemainingJobs((Set<UUID>)scheduledJobs, timeSliceLock.getTimeSlice(), activeJobs)).flatMap(Observable::from).flatMap(this::acquireJobLock).filter(JobLock::isAcquired).flatMap(jobLock -> this.findJob(jobLock.getJobId())).flatMap(details -> this.doJobExecution((JobDetails)details, activeJobs).toObservable().map(o -> timeSliceLock.getTimeSlice())).defaultIfEmpty((Object)timeSliceLock.getTimeSlice())).flatMap(time -> {
                // Post-execution clean up: compare the scheduled and finished job sets.
                Observable<? extends Set<UUID>> scheduled = this.findScheduledJobs((Date)time);
                Observable<? extends Set<UUID>> finished = this.findFinishedJobs((Date)time);
                return Observable.sequenceEqual(scheduled, finished).flatMap(allFinished -> {
                    if (allFinished.booleanValue()) {
                        logger.debug((Object)("All jobs for time slice [" + time + "] have finished"));
                        // Everything ran: delete the active-slice row and both index partitions.
                        return Completable.merge((Completable[])new Completable[]{this.deleteActiveTimeSlice((Date)time), this.deleteFinishedJobs((Date)time), this.deleteScheduledJobs((Date)time)}).toObservable().reduce(null, (o1, o2) -> o2).map(o -> time);
                    }
                    // Jobs are still outstanding: leave the rows in place.
                    return Observable.just((Object)time);
                });
            }).subscribe(d -> {
                logger.debug((Object)("Finished post job execution clean up for [" + d + "]"));
                activeTimeSlices.remove(d);
                this.finishedTimeSlices.ifPresent(subject -> subject.onNext(d));
            // NOTE(review): the error handler only logs; slices added to activeTimeSlices by
            // this tick are not removed on that path - confirm whether that is intended.
            }, t -> logger.warn((Object)"Job execution failed", t), () -> logger.debug((Object)"Done!"));
        });
    }

    /**
     * Acquires the lock for a time slice, retrying every five seconds until it is obtained.
     *
     * <p>Each attempt is built with {@code Observable.defer}, so a retry issues a fresh
     * {@code acquireLock} call while the attempt stays part of the downstream subscription.
     * The previous hand-rolled {@code Observable.create} bridge subscribed to the lock
     * request without linking that subscription to the subscriber, so unsubscribing
     * downstream did not cancel an in-flight attempt.
     *
     * @param timeSlice the time slice to lock; its epoch millis form the lock name suffix
     * @return an Observable emitting the acquired lock
     */
    private Observable<TimeSliceLock> acquireTimeSliceLock(Date timeSlice) {
        String lockName = QUEUE_LOCK_PREFIX + timeSlice.getTime();
        int delay = 5;
        Observable<TimeSliceLock> attempt = Observable.defer(() -> this.lockManager.acquireLock(lockName, "executing", 3600).map(acquired -> {
            if (!acquired) {
                logger.debug("Failed to acquire time slice lock for [" + timeSlice + "]. Will attempt " + "to acquire it again in " + delay + " seconds.");
                // The failure only serves to trigger retryWhen below; the message is there
                // for the case where it surfaces in a stack trace anyway.
                throw new RuntimeException("Failed to acquire time slice lock [" + lockName + "]");
            }
            return new TimeSliceLock(timeSlice, lockName, acquired);
        }));
        // Any failed attempt is retried after a fixed delay on the query scheduler.
        return attempt.retryWhen(errors -> errors.flatMap(e -> Observable.timer(delay, TimeUnit.SECONDS, this.queryScheduler)));
    }

    /**
     * Attempts once to acquire the lock for a job.
     *
     * @param jobId id of the job to lock
     * @return an Observable emitting a {@link JobLock} that records whether the lock was obtained
     */
    private Observable<JobLock> acquireJobLock(UUID jobId) {
        String lockName = "org.hawkular.metrics.scheduler.job." + jobId;
        return lockManager.acquireLock(lockName, "locked", 3600)
                .map(acquired -> {
                    boolean held = acquired;
                    return new JobLock(jobId, held);
                });
    }

    /**
     * Builds the Completable that runs a job and then performs its post-execution steps.
     *
     * <p>The steps are concatenated in this order: the job itself (with failure handling),
     * marking the job finished for its time slice, rescheduling it according to its trigger,
     * deactivating it (removal from {@code activeJobs} and lock release), and notifying the
     * optional job-finished subject. The whole chain is subscribed on {@code Schedulers.io()}.
     *
     * @param details    the job to execute
     * @param activeJobs set of active job ids from which this job is removed afterwards
     * @return the Completable representing the full execution chain
     */
    private Completable doJobExecution(final JobDetails details, Set<UUID> activeJobs) {
        logger.debug((Object)("Starting execution for " + details));
        // NOTE(review): factory is null when no factory was registered for this job type,
        // which makes the call below throw a NullPointerException - confirm how
        // unregistered job types should be handled.
        Func1<JobDetails, Completable> factory = this.jobFactories.get(details.getJobType());
        Completable job = (Completable)factory.call((Object)details);
        logger.debug((Object)("Preparing to execute " + details));
        Date timeSlice = new Date(details.getTrigger().getTriggerTime());
        return Completable.concat((Completable[])new Completable[]{job.onErrorResumeNext(t -> {
            // Ask the job type's retry function what to do; types without one never retry.
            final RetryPolicy retryPolicy = (RetryPolicy)this.retryFunctions.getOrDefault(details.getJobType(), NO_RETRY).call((Object)details, t);
            if (retryPolicy == RetryPolicy.NONE) {
                // No retry: the failure is dropped here and the chain continues normally.
                return Completable.complete();
            }
            if (details.getTrigger().nextTrigger() != null) {
                logger.warn((Object)("Retry Policies cannot be used with jobs that repeat. " + details + " will execute again according to its next trigger."));
            }
            if (retryPolicy == RetryPolicy.NOW) {
                // Immediate retry: build and run the job once more.
                return (Completable)factory.call((Object)details);
            }
            // Delayed retry: wrap the original trigger so that its "next" trigger is a
            // single execution delayed by the policy's delay, then reschedule with it.
            Trigger newTrigger = new Trigger(){

                @Override
                public long getTriggerTime() {
                    return details.getTrigger().getTriggerTime();
                }

                @Override
                public Trigger nextTrigger() {
                    return new SingleExecutionTrigger.Builder().withDelay(retryPolicy.getDelay(), TimeUnit.MILLISECONDS).build();
                }
            };
            JobDetails newDetails = new JobDetails(details.getJobId(), details.getJobType(), details.getJobName(), details.getParameters(), newTrigger);
            return this.rescheduleJob(newDetails);
        }), this.setJobFinished(timeSlice, details), this.rescheduleJob(details), this.deactivateJob(activeJobs, details), Completable.fromAction(() -> this.jobFinished.ifPresent(subject -> subject.onNext((Object)details)))}).subscribeOn(Schedulers.io());
    }

    /**
     * Advances a trigger until its trigger time is no earlier than the given time slice.
     *
     * <p>{@code nextTrigger()} returns {@code null} when a trigger has no further
     * executions (rescheduleJob relies on this), so the walk stops as soon as the chain is
     * exhausted instead of dereferencing {@code null}.
     *
     * @param timeSlice the time slice the returned trigger must not precede
     * @param trigger   the trigger to start from
     * @return the first trigger in the chain whose time is at or after {@code timeSlice},
     *         or {@code null} when the chain ends before reaching it
     */
    private Trigger getNextTrigger(Date timeSlice, Trigger trigger) {
        Trigger next = trigger;
        while (next != null && timeSlice.getTime() > next.getTriggerTime()) {
            next = next.nextTrigger();
        }
        return next;
    }

    /**
     * Records the job as finished for the given time slice in the finished-jobs index.
     *
     * @param timeSlice time slice the job ran in
     * @param details   the job that finished
     * @return a Completable that completes once the insert has completed
     */
    private Completable setJobFinished(Date timeSlice, JobDetails details) {
        Statement markFinished = updateJobToFinished.bind(timeSlice, details.getJobId());
        Action0 logUpdate = () -> logger.debug("Updating " + details + " status to finished for time slice [" + timeSlice + "]");
        return session.execute(markFinished, queryScheduler)
                .doOnCompleted(logUpdate)
                .toCompletable();
    }

    /**
     * Schedules the job's next execution, if its trigger has one.
     *
     * @param details the job whose trigger is asked for its next trigger
     * @return a Completable that logs and then writes the scheduled-jobs index entry and the
     *         jobs row for the next execution, or one that only logs when the trigger has no
     *         next trigger
     */
    private Completable rescheduleJob(JobDetails details) {
        Trigger upcoming = details.getTrigger().nextTrigger();
        if (upcoming != null) {
            JobDetails next = new JobDetails(details.getJobId(), details.getJobType(), details.getJobName(), details.getParameters(), upcoming);
            Completable announce = Completable.fromAction(() -> logger.debug("Scheduling " + next + " for next execution at " + new Date(upcoming.getTriggerTime())));
            Completable indexUpdate = updateScheduledJobsIndex(next);
            Completable jobUpdate = insertIntoJobsTable(next);
            return Completable.concat(announce, indexUpdate, jobUpdate);
        }
        return Completable.fromAction(() -> logger.debug("No more scheduled executions for " + details));
    }

    /**
     * Removes the job from the active set and then releases its job lock.
     *
     * @param activeJobs set of active job ids
     * @param details    the job to deactivate
     * @return a Completable performing the removal followed by the lock release
     */
    private Completable deactivateJob(Set<UUID> activeJobs, JobDetails details) {
        String lockName = "org.hawkular.metrics.scheduler.job." + details.getJobId();

        Completable dropFromActive = Completable.fromAction(() -> {
            logger.debug("Removing " + details + " from active jobs");
            activeJobs.remove(details.getJobId());
        });
        Completable unlock = lockManager.releaseLock(lockName, "locked")
                .doOnNext(released -> logger.debug("Released job lock [" + lockName + "]? " + released))
                .toCompletable();

        return Completable.concat(dropFromActive, unlock);
    }

    /**
     * Deletes the scheduled-jobs index partition of a time slice.
     *
     * @param timeSlice the time slice whose entries are deleted
     * @return a Completable that completes once the delete has completed
     */
    private Completable deleteScheduledJobs(Date timeSlice) {
        Statement delete = deleteScheduledJobs.bind(timeSlice);
        Action0 logDeleted = () -> logger.debug("Deleted scheduled jobs time slice [" + timeSlice + "]");
        return session.execute(delete, queryScheduler)
                .doOnCompleted(logDeleted)
                .toCompletable();
    }

    /**
     * Deletes the finished-jobs index partition of a time slice.
     *
     * @param timeSlice the time slice whose entries are deleted
     * @return a Completable that completes once the delete has completed
     */
    private Completable deleteFinishedJobs(Date timeSlice) {
        Statement delete = deleteFinishedJobs.bind(timeSlice);
        Action0 logDeleted = () -> logger.debug("Deleted finished jobs time slice [" + timeSlice + "]");
        return session.execute(delete, queryScheduler)
                .doOnCompleted(logDeleted)
                .toCompletable();
    }

    /**
     * Deletes a time slice from the active time slices table.
     *
     * @param timeSlice the time slice to delete
     * @return a Completable that completes once the delete has completed
     */
    private Completable deleteActiveTimeSlice(Date timeSlice) {
        Statement delete = deleteActiveTimeSlice.bind(timeSlice);
        Action0 logDeleted = () -> logger.debug("Deleted active time slice [" + timeSlice + "]");
        return session.execute(delete, queryScheduler)
                .doOnCompleted(logDeleted)
                .toCompletable();
    }

    /**
     * Stops the scheduler and shuts down its executors.
     *
     * <p>The tick executor is given up to 5 seconds and the query executor up to 30 seconds
     * to terminate. If the calling thread is interrupted while waiting, the query executor
     * is still asked to shut down and the thread's interrupt status is restored so that
     * callers can observe the interruption.
     */
    @Override
    public void shutdown() {
        this.running = false;
        try {
            this.tickExecutor.shutdown();
            this.tickExecutor.awaitTermination(5L, TimeUnit.SECONDS);
            this.queryExecutor.shutdown();
            this.queryExecutor.awaitTermination(30L, TimeUnit.SECONDS);
            logger.info("Shutdown complete");
        } catch (InterruptedException e) {
            logger.warn("Interrupted during shutdown", e);
            // The interrupt may have arrived before the query executor was told to stop;
            // shutdown() has no additional effect if it was already called.
            this.queryExecutor.shutdown();
            // Catching InterruptedException cleared the flag; set it again.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Shuts the scheduler down and returns it to a state in which it can be started again.
     *
     * <p>Both registries are cleared: previously only the job factories were dropped, so a
     * retry function registered before the reset would still be applied to a job type
     * re-registered afterwards without one. A new two-thread query executor replaces the one
     * terminated by {@link #shutdown()}.
     *
     * <p>NOTE(review): {@code shutdown()} also terminates the tick executor, which is not
     * recreated here; ticking afterwards relies on the supplied scheduler.
     *
     * @param tickScheduler scheduler that drives the tick after the reset
     */
    void reset(Scheduler tickScheduler) {
        logger.debug("Starting reset");
        this.shutdown();
        this.jobFactories = new HashMap<>();
        this.retryFunctions = new HashMap<>();
        this.tickScheduler = tickScheduler;
        this.queryExecutor = Executors.newFixedThreadPool(2, new ThreadFactoryBuilder().setNameFormat("query-thread-pool-%d").build());
        this.queryScheduler = Schedulers.from(this.queryExecutor);
    }

    /**
     * Loads the ids of the jobs scheduled for a time slice, blocking until the query is done.
     *
     * @param timeSlice the time slice to look up
     * @return the scheduled job ids; an empty set is the fallback when nothing is emitted
     */
    private Set<UUID> findScheduledJobsBlocking(Date timeSlice) {
        logger.debug("Fetching scheduled jobs for [" + timeSlice + "]");
        Statement query = findScheduledJobs.bind(timeSlice);
        return session.execute(query)
                .flatMap(Observable::from)
                .map(row -> row.getUUID(0))
                .doOnNext(uuid -> logger.debug("Scheduled job [" + uuid + "]"))
                .<Set<UUID>>collect(HashSet::new, Set::add)
                .toBlocking()
                .firstOrDefault(new HashSet<>());
    }

    /**
     * Loads the ids of the jobs scheduled for a time slice.
     *
     * @param timeSlice the time slice to look up
     * @return an Observable emitting one set with the scheduled job ids
     */
    private Observable<? extends Set<UUID>> findScheduledJobs(Date timeSlice) {
        logger.debug("Fetching scheduled jobs for [" + timeSlice + "]");
        Statement query = findScheduledJobs.bind(timeSlice);
        return session.execute(query, queryScheduler)
                .flatMap(Observable::from)
                .map(row -> row.getUUID(0))
                .<Set<UUID>>collect(HashSet::new, Set::add);
    }

    /**
     * Determines which scheduled jobs of a time slice still have to be run.
     *
     * <p>Finished jobs are fetched with a blocking query and are also pruned from
     * {@code activeJobs}.
     *
     * @param scheduledJobs ids of the jobs scheduled for the time slice
     * @param timeSlice     the time slice being processed
     * @param activeJobs    ids of the active jobs; finished ids are removed from it
     * @return a view of the scheduled ids that are neither finished nor active
     */
    private Set<UUID> computeRemainingJobs(Set<UUID> scheduledJobs, Date timeSlice, Set<UUID> activeJobs) {
        Set<UUID> alreadyFinished = findFinishedJobsBlocking(timeSlice);
        activeJobs.removeAll(alreadyFinished);
        return Sets.difference(Sets.difference(scheduledJobs, alreadyFinished), activeJobs);
    }

    /**
     * Loads the ids of the jobs finished for a time slice, blocking until the query is done.
     *
     * @param timeSlice the time slice to look up
     * @return the finished job ids; an empty set is the fallback when nothing is emitted
     */
    private Set<UUID> findFinishedJobsBlocking(Date timeSlice) {
        Statement query = findFinishedJobs.bind(timeSlice);
        return session.execute(query, queryScheduler)
                .flatMap(Observable::from)
                .map(row -> row.getUUID(0))
                .<Set<UUID>>collect(HashSet::new, Set::add)
                .toBlocking()
                .firstOrDefault(new HashSet<>());
    }

    /**
     * Loads the ids of the jobs finished for a time slice.
     *
     * @param timeSlice the time slice to look up
     * @return an Observable emitting one set with the finished job ids
     */
    private Observable<? extends Set<UUID>> findFinishedJobs(Date timeSlice) {
        Statement query = findFinishedJobs.bind(timeSlice);
        return session.execute(query, queryScheduler)
                .flatMap(Observable::from)
                .map(row -> row.getUUID(0))
                .doOnNext(id -> logger.debug("Finished job [" + id + "]"))
                .<Set<UUID>>collect(HashSet::new, Set::add);
    }

    /**
     * Writes the job (id, type, name, parameters and trigger) to the jobs table.
     *
     * @param details the job to store
     * @return a Completable that completes once the insert has completed
     */
    private Completable insertIntoJobsTable(JobDetails details) {
        UDTValue triggerValue = getTriggerValue(session, details.getTrigger());
        Statement insert = insertJob.bind(details.getJobId(), details.getJobType(), details.getJobName(), details.getParameters(), triggerValue);
        return session.execute(insert, queryScheduler).toCompletable();
    }

    /**
     * Adds the job to the scheduled-jobs index under the time slice given by its trigger time.
     *
     * @param details the job to index
     * @return a Completable that completes once the insert has completed
     */
    private Completable updateScheduledJobsIndex(JobDetails details) {
        Date triggerTime = new Date(details.getTrigger().getTriggerTime());
        Statement insert = insertScheduledJob.bind(triggerTime, details.getJobId());
        return session.execute(insert, queryScheduler).toCompletable();
    }

    /**
     * Runs the given action once per minute on the tick scheduler until the scheduler stops.
     *
     * <p>An interval fires immediately and then every minute; a tick is dropped when the
     * current minute equals the minute seen by the previously accepted tick, and the stream
     * ends once {@code running} is false.
     *
     * @param action the work to perform on each accepted tick
     */
    private void doOnTick(Action0 action) {
        Action0 tickAction = () -> {
            Date timeSlice = DateTimeService.getTimeSlice(new DateTime(this.tickScheduler.now()), Minutes.minutes(1).toStandardDuration()).toDate();
            logger.debug("[TICK][" + timeSlice + "] executing action");
            action.call();
            logger.debug("Finished tick for [" + timeSlice + "]");
        };

        // Minute of the most recently accepted tick; null until the first tick arrives.
        AtomicReference<DateTime> lastAcceptedMinute = new AtomicReference<>();

        Observable.interval(0L, 1L, TimeUnit.MINUTES, this.tickScheduler)
                .doOnNext(tick -> logger.debug("CURRENT MINUTE = " + DateTimeService.currentMinute().toDate()))
                .filter(tick -> {
                    DateTime current = DateTimeService.currentMinute();
                    DateTime previous = lastAcceptedMinute.get();
                    if (previous != null) {
                        logger.debug("previous=[" + previous.toLocalDateTime() + "], current=[" + current.toLocalDateTime() + "]");
                        if (previous.equals(current)) {
                            // Same minute as the last accepted tick: skip it.
                            return false;
                        }
                    }
                    lastAcceptedMinute.set(current);
                    return true;
                })
                .takeUntil(d -> !this.running)
                .subscribe(tick -> tickAction.call(), t -> logger.warn(t));
    }

    /**
     * Inserts the time slice into the active time slices table.
     *
     * @param timeSlice the time slice to record
     * @return a Completable that completes once the insert has completed
     */
    private Completable updateActiveTimeSlices(Date timeSlice) {
        Statement insert = addActiveTimeSlice.bind(timeSlice);
        return session.execute(insert).toCompletable();
    }

    /**
     * Inserts the time slice into the active time slices table, mapping each emitted result
     * to {@code null} so that callers can chain on an item.
     *
     * @param timeSlice the time slice to record
     * @return an Observable emitting {@code null} for each result of the insert
     */
    private Observable<Void> updateActiveTimeSlicesX(Date timeSlice) {
        Statement insert = addActiveTimeSlice.bind(timeSlice);
        return session.execute(insert).map(ignored -> null);
    }

    /**
     * Loads all active time slices.
     *
     * @return an Observable emitting the active time slices in sorted order
     */
    private Observable<Date> findTimeSlices() {
        Statement query = findActiveTimeSlices.bind();
        return session.execute(query, queryScheduler)
                .flatMap(Observable::from)
                .map(row -> row.getTimestamp(0))
                .toSortedList()
                .flatMap(Observable::from)
                .doOnNext(d -> logger.debug("Time slice [" + d + "]"));
    }

    /**
     * Loads a job from the jobs table.
     *
     * @param jobId id of the job to load
     * @return an Observable emitting the job details built from each returned row; a failure
     *         is logged at warn level and passed on
     */
    private Observable<JobDetails> findJob(UUID jobId) {
        Statement query = findJob.bind(jobId);
        return session.execute(query, queryScheduler)
                .flatMap(Observable::from)
                .map(row -> {
                    // Column order follows the SELECT: type, name, params, trigger.
                    Trigger trigger = getTrigger(row.getUDTValue(3));
                    return new JobDetails(jobId, row.getString(0), row.getString(1), row.getMap(2, String.class, String.class), trigger);
                })
                .doOnError(t -> logger.warn("Failed to fetch job [" + jobId + "]", t));
    }

    /**
     * Converts a stored trigger value into a {@link Trigger}.
     *
     * @param value the stored trigger; its {@code type} field selects the kind of trigger
     *              (0 = single execution, 1 = repeating)
     * @return the corresponding trigger
     * @throws IllegalArgumentException if the type is neither 0 nor 1
     */
    static Trigger getTrigger(UDTValue value) {
        int type = value.getInt("type");
        if (type == 0) {
            return new SingleExecutionTrigger(value.getLong("trigger_time"));
        }
        if (type == 1) {
            long interval = value.getLong("interval");
            long delay = value.getLong("delay");
            long triggerTime = value.getLong("trigger_time");
            int repeatCount = value.getInt("repeat_count");
            int executionCount = value.getInt("execution_count");
            return new RepeatingTrigger(interval, delay, triggerTime, repeatCount, executionCount);
        }
        throw new IllegalArgumentException("Trigger type [" + type + "] is not supported");
    }

    /**
     * Converts a trigger into the value stored in the jobs table.
     *
     * @param session session used to look up the trigger user type
     * @param trigger the trigger to convert
     * @return the stored representation of the trigger
     * @throws IllegalArgumentException if the trigger is neither a {@link RepeatingTrigger}
     *                                  nor a {@link SingleExecutionTrigger}
     */
    static UDTValue getTriggerValue(RxSession session, Trigger trigger) {
        // The repeating check comes first, as in the original ordering.
        if (trigger instanceof RepeatingTrigger) {
            RepeatingTrigger repeating = (RepeatingTrigger) trigger;
            return getRepeatingTriggerValue(session, repeating);
        } else if (trigger instanceof SingleExecutionTrigger) {
            SingleExecutionTrigger single = (SingleExecutionTrigger) trigger;
            return getSingleExecutionTriggerValue(session, single);
        } else {
            throw new IllegalArgumentException(trigger.getClass() + " is not a supported trigger type");
        }
    }

    /**
     * Builds the stored representation of a single-execution trigger: type 0 plus the
     * trigger time.
     *
     * @param session session used to look up the {@code trigger_def} user type
     * @param trigger the trigger to convert
     * @return a new {@code trigger_def} value
     */
    static UDTValue getSingleExecutionTriggerValue(RxSession session, SingleExecutionTrigger trigger) {
        return getKeyspace(session)
                .getUserType("trigger_def")
                .newValue()
                .setInt("type", 0)
                .setLong("trigger_time", trigger.getTriggerTime());
    }

    /**
     * Builds the stored representation of a repeating trigger: type 1, interval and trigger
     * time, plus the delay when it is positive and the repeat/execution counts when a
     * repeat count is set.
     *
     * @param session session used to look up the {@code trigger_def} user type
     * @param trigger the trigger to convert
     * @return a new {@code trigger_def} value
     */
    static UDTValue getRepeatingTriggerValue(RxSession session, RepeatingTrigger trigger) {
        UDTValue udt = getKeyspace(session).getUserType("trigger_def").newValue();
        udt.setInt("type", 1);
        udt.setLong("interval", trigger.getInterval());
        udt.setLong("trigger_time", trigger.getTriggerTime());

        long delay = trigger.getDelay();
        if (delay > 0L) {
            udt.setLong("delay", delay);
        }

        // The counts are only written for triggers that have a repeat count.
        boolean hasRepeatCount = trigger.getRepeatCount() != null;
        if (hasRepeatCount) {
            udt.setInt("repeat_count", (int)trigger.getRepeatCount());
            udt.setInt("execution_count", trigger.getExecutionCount());
        }
        return udt;
    }

    /**
     * Looks up the metadata of the keyspace the session is logged into.
     *
     * @param session the session whose logged keyspace is resolved
     * @return the keyspace metadata as returned by the cluster metadata lookup
     */
    private static KeyspaceMetadata getKeyspace(RxSession session) {
        String keyspaceName = session.getLoggedKeyspace();
        return session.getCluster().getMetadata().getKeyspace(keyspaceName);
    }

    /** Immutable result of an attempt to acquire the lock of a single job. */
    private static class JobLock {
        private final UUID jobId;
        private final boolean acquired;

        /**
         * @param jobId    id of the job the lock belongs to
         * @param acquired whether the lock was obtained
         */
        public JobLock(UUID jobId, boolean acquired) {
            this.acquired = acquired;
            this.jobId = jobId;
        }

        /** @return the id of the job the lock belongs to */
        public UUID getJobId() {
            return jobId;
        }

        /** @return the lock name: the job lock prefix followed by the job id */
        public String getName() {
            return "org.hawkular.metrics.scheduler.job." + jobId;
        }

        /** @return whether the lock was obtained */
        public boolean isAcquired() {
            return acquired;
        }
    }

    /** Immutable result of an attempt to acquire the lock of a time slice. */
    private static class TimeSliceLock {
        private final Date timeSlice;
        private final String name;
        private final boolean acquired;

        /**
         * @param timeSlice the time slice the lock belongs to
         * @param name      name of the lock
         * @param acquired  whether the lock was obtained
         */
        public TimeSliceLock(Date timeSlice, String name, boolean acquired) {
            this.acquired = acquired;
            this.name = name;
            this.timeSlice = timeSlice;
        }

        /** @return the time slice the lock belongs to */
        public Date getTimeSlice() {
            return timeSlice;
        }

        /** @return the name of the lock */
        public String getName() {
            return name;
        }

        /** @return whether the lock was obtained */
        public boolean isAcquired() {
            return acquired;
        }
    }
}

