/*
 * Decompiled with CFR 0.152.
 */
package org.hawkular.metrics.scheduler.impl;

import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.UDTValue;
import com.datastax.driver.core.UserType;
import java.util.Date;
import java.util.HashMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import org.hawkular.metrics.scheduler.api.JobDetails;
import org.hawkular.metrics.scheduler.api.JobStatus;
import org.hawkular.metrics.scheduler.api.RepeatingTrigger;
import org.hawkular.metrics.scheduler.api.SingleExecutionTrigger;
import org.hawkular.metrics.scheduler.api.Trigger;
import org.hawkular.metrics.scheduler.impl.ScheduledExecution;
import org.hawkular.rx.cassandra.driver.RxSession;
import org.jboss.logging.Logger;
import rx.Observable;
import rx.Scheduler;

public class JobsService {
    private static Logger logger = Logger.getLogger(JobsService.class);
    private RxSession session;
    private PreparedStatement findTimeSlices;
    private PreparedStatement findScheduledForTime;
    private PreparedStatement findAllScheduled;
    private PreparedStatement insertScheduled;
    private PreparedStatement update;
    private PreparedStatement updateStatus;

    public JobsService(RxSession session) {
        this.session = session;
        this.findTimeSlices = session.getSession().prepare("SELECT DISTINCT time_slice FROM scheduled_jobs_idx");
        this.findScheduledForTime = session.getSession().prepare("SELECT job_id, job_type, job_name, job_params, trigger, status FROM scheduled_jobs_idx WHERE time_slice = ?");
        this.findAllScheduled = session.getSession().prepare("SELECT job_id, job_type, job_name, job_params, trigger, status, time_slice FROM scheduled_jobs_idx");
        this.update = session.getSession().prepare("INSERT INTO jobs (id, type, name, params, trigger) VALUES (?, ?, ?, ?, ?)");
        this.insertScheduled = session.getSession().prepare("INSERT INTO scheduled_jobs_idx (time_slice, job_id, job_type, job_name, job_params, trigger, status) VALUES (?, ?, ?, ?, ?, ?, ?)");
        this.updateStatus = session.getSession().prepare("UPDATE scheduled_jobs_idx SET status = ? WHERE time_slice = ? AND job_id = ?");
    }

    public Observable<Date> findActiveTimeSlices(Date currentTime, Scheduler scheduler) {
        return this.session.executeAndFetch((Statement)this.findTimeSlices.bind(), scheduler).map(row -> row.getTimestamp(0)).filter(timestamp -> timestamp.compareTo(currentTime) < 0).toSortedList().doOnNext(timeSlices -> logger.debugf("Active time slices %s", timeSlices)).flatMap(Observable::from).concatWith(Observable.just((Object)currentTime));
    }

    public Observable<JobDetails> findJobs(Date timeSlice, Scheduler scheduler) {
        return this.findActiveTimeSlices(timeSlice, scheduler).flatMap(time -> this.findScheduledJobsForTime((Date)time, scheduler)).reduce(new HashMap(), (map, details) -> {
            JobDetails other = (JobDetails)map.get(details.getJobId());
            if (other == null) {
                map.put(details.getJobId(), details);
            } else if (details.getTrigger().getTriggerTime() < other.getTrigger().getTriggerTime()) {
                map.put(details.getJobId(), details);
            }
            return map;
        }).flatMap(map -> Observable.from(map.values()));
    }

    public Observable<JobDetails> findScheduledJobs(Date timeSlice, Scheduler scheduler) {
        return this.session.executeAndFetch((Statement)this.findAllScheduled.bind(), scheduler).filter(row -> row.getTimestamp(6).compareTo(timeSlice) <= 0).map(row -> new JobDetails(row.getUUID(0), row.getString(1), row.getString(2), row.getMap(3, String.class, String.class), JobsService.getTrigger(row.getUDTValue(4)), JobStatus.fromCode(row.getByte(5)))).collect(HashMap::new, (map, details) -> {
            TreeSet<JobDetails> set = (TreeSet<JobDetails>)map.get(details.getJobId());
            if (set == null) {
                set = new TreeSet<JobDetails>((d1, d2) -> Long.compare(d1.getTrigger().getTriggerTime(), d2.getTrigger().getTriggerTime()));
            }
            set.add((JobDetails)details);
            map.put(details.getJobId(), set);
        }).flatMap(map -> Observable.from(map.entrySet())).map(entry -> (JobDetails)((SortedSet)entry.getValue()).first());
    }

    public Observable<JobDetails> findScheduledJobsForTime(Date timeSlice, Scheduler scheduler) {
        return this.session.executeAndFetch((Statement)this.findScheduledForTime.bind(new Object[]{timeSlice}), scheduler).map(row -> new JobDetails(row.getUUID(0), row.getString(1), row.getString(2), row.getMap(3, String.class, String.class), JobsService.getTrigger(row.getUDTValue(4)), JobStatus.fromCode(row.getByte(5)))).doOnSubscribe(() -> logger.debugf("Fetching scheduled jobs tor time slice [%s]", (Object)timeSlice)).doOnNext(details -> logger.debugf("Found job details %s", details));
    }

    public Observable<ScheduledExecution> findScheduledExecutions(UUID jobId, Scheduler scheduler) {
        return this.session.executeAndFetch((Statement)this.findAllScheduled.bind(), scheduler).filter(row -> row.getUUID(0).equals(jobId)).map(row -> new ScheduledExecution(row.getTimestamp(6), new JobDetails(jobId, row.getString(1), row.getString(2), row.getMap(3, String.class, String.class), JobsService.getTrigger(row.getUDTValue(4)), JobStatus.fromCode(row.getByte(5)))));
    }

    public Observable<ResultSet> insert(Date timeSlice, JobDetails job) {
        return this.session.execute((Statement)this.insertScheduled.bind(new Object[]{timeSlice, job.getJobId(), job.getJobType(), job.getJobName(), job.getParameters(), JobsService.getTriggerValue(this.session, job.getTrigger())}));
    }

    public Observable<ResultSet> updateStatusToFinished(Date timeSlice, UUID jobId) {
        return this.session.execute((Statement)this.updateStatus.bind(new Object[]{(byte)1, timeSlice, jobId})).doOnError(t -> logger.warnf("There was an error updating the status to finished for %s in time slice [%s]", (Object)jobId, (Object)timeSlice.getTime()));
    }

    static Trigger getTrigger(UDTValue value) {
        int type = value.getInt("type");
        switch (type) {
            case 0: {
                return new SingleExecutionTrigger(value.getLong("trigger_time"));
            }
            case 1: {
                return new RepeatingTrigger(value.getLong("interval"), value.getLong("delay"), value.getLong("trigger_time"), value.getInt("repeat_count"), value.getInt("execution_count"));
            }
        }
        throw new IllegalArgumentException("Trigger type [" + type + "] is not supported");
    }

    static UDTValue getTriggerValue(RxSession session, Trigger trigger) {
        if (trigger instanceof RepeatingTrigger) {
            return JobsService.getRepeatingTriggerValue(session, (RepeatingTrigger)trigger);
        }
        if (trigger instanceof SingleExecutionTrigger) {
            return JobsService.getSingleExecutionTriggerValue(session, (SingleExecutionTrigger)trigger);
        }
        throw new IllegalArgumentException(trigger.getClass() + " is not a supported trigger type");
    }

    static UDTValue getSingleExecutionTriggerValue(RxSession session, SingleExecutionTrigger trigger) {
        UserType triggerType = JobsService.getKeyspace(session).getUserType("trigger_def");
        UDTValue triggerUDT = triggerType.newValue();
        triggerUDT.setInt("type", 0);
        triggerUDT.setLong("trigger_time", trigger.getTriggerTime());
        return triggerUDT;
    }

    static UDTValue getRepeatingTriggerValue(RxSession session, RepeatingTrigger trigger) {
        UserType triggerType = JobsService.getKeyspace(session).getUserType("trigger_def");
        UDTValue triggerUDT = triggerType.newValue();
        triggerUDT.setInt("type", 1);
        triggerUDT.setLong("interval", trigger.getInterval());
        triggerUDT.setLong("trigger_time", trigger.getTriggerTime());
        if (trigger.getDelay() > 0L) {
            triggerUDT.setLong("delay", trigger.getDelay());
        }
        if (trigger.getRepeatCount() != null) {
            triggerUDT.setInt("repeat_count", trigger.getRepeatCount().intValue());
            triggerUDT.setInt("execution_count", trigger.getExecutionCount());
        }
        return triggerUDT;
    }

    private static KeyspaceMetadata getKeyspace(RxSession session) {
        return session.getCluster().getMetadata().getKeyspace(session.getLoggedKeyspace());
    }
}

