/*
 * Decompiled with CFR 0.152.
 */
package org.hawkular.metrics.scheduler.impl;

import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.UDTValue;
import com.datastax.driver.core.UserType;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Function;
import org.hawkular.metrics.scheduler.api.JobDetails;
import org.hawkular.metrics.scheduler.api.JobStatus;
import org.hawkular.metrics.scheduler.api.RepeatingTrigger;
import org.hawkular.metrics.scheduler.api.SingleExecutionTrigger;
import org.hawkular.metrics.scheduler.api.Trigger;
import org.hawkular.metrics.scheduler.impl.JobDetailsImpl;
import org.hawkular.metrics.scheduler.impl.JobParametersImpl;
import org.hawkular.metrics.scheduler.impl.ScheduledExecution;
import org.hawkular.rx.cassandra.driver.RxSession;
import org.jboss.logging.Logger;
import rx.Completable;
import rx.Observable;
import rx.Scheduler;
import rx.functions.Func1;

/**
 * Data-access service for the job scheduler, backed by the {@code scheduled_jobs_idx} table.
 *
 * <p>All queries are prepared once in the constructor and executed through the supplied
 * {@link RxSession}; every read/write method returns a cold Rx type that performs its work on
 * subscription.
 *
 * <p>Rows of {@code scheduled_jobs_idx} are always selected with the column order
 * {@code time_slice(0), job_id(1), job_type(2), job_name(3), job_params(4), trigger(5), status(6)};
 * the positional accessors used throughout this class rely on that order.
 */
public class JobsService {
    private static final Logger logger = Logger.getLogger(JobsService.class);

    /** Value stored in the {@code type} field of the {@code trigger_def} UDT for a {@link SingleExecutionTrigger}. */
    private static final int TRIGGER_TYPE_SINGLE = 0;

    /** Value stored in the {@code type} field of the {@code trigger_def} UDT for a {@link RepeatingTrigger}. */
    private static final int TRIGGER_TYPE_REPEATING = 1;

    /**
     * Status code written by {@link #updateStatusToFinished(Date, UUID)}.
     * NOTE(review): presumably matches the "finished" code understood by {@code JobStatus.fromCode} - confirm.
     */
    private static final byte STATUS_FINISHED = 1;

    /** Save callback that persists nothing and completes immediately. */
    private static final Function<Map<String, String>, Completable> SAVE_PARAMS_NO_OP =
            params -> Completable.complete();

    private final RxSession session;
    private final PreparedStatement findTimeSlices;
    private final PreparedStatement findScheduledForTime;
    private final PreparedStatement findAllScheduled;
    private final PreparedStatement insertScheduled;
    private final PreparedStatement update;
    private final PreparedStatement updateStatus;
    private final PreparedStatement deleteScheduled;
    private final PreparedStatement updateJobParameters;
    private final PreparedStatement findByIdAndSlice;

    /**
     * Row filter that rejects (and logs) rows whose time slice, job id, job type, job name or
     * trigger column is null. The job parameters and status columns are allowed to be null.
     */
    private final Func1<Row, Boolean> filterNullJobs = row -> {
        boolean invalid = row.isNull(0) || row.isNull(1) || row.isNull(2) || row.isNull(3) || row.isNull(5);
        if (invalid) {
            // Column 1 is the job id and column 0 the time slice; pass them in placeholder order.
            logger.warnf("Scheduled job with invalid values present in job_scheduled_idx [JobId: %s, TimeSlice: %s]",
                    row.getUUID(1), row.getTimestamp(0));
        }
        return !invalid;
    };

    /**
     * Creates the service and prepares every CQL statement it uses.
     *
     * @param session the Rx session used both to prepare and to execute statements
     */
    public JobsService(RxSession session) {
        this.session = session;
        this.findTimeSlices = session.getSession().prepare("SELECT DISTINCT time_slice FROM scheduled_jobs_idx");
        this.findScheduledForTime = session.getSession().prepare("SELECT time_slice, job_id, job_type, job_name, job_params, trigger, status FROM scheduled_jobs_idx WHERE time_slice = ?");
        this.findAllScheduled = session.getSession().prepare("SELECT time_slice, job_id, job_type, job_name, job_params, trigger, status FROM scheduled_jobs_idx");
        this.update = session.getSession().prepare("INSERT INTO jobs (id, type, name, params, trigger) VALUES (?, ?, ?, ?, ?)");
        this.insertScheduled = session.getSession().prepare("INSERT INTO scheduled_jobs_idx (time_slice, job_id, job_type, job_name, job_params, trigger, status) VALUES (?, ?, ?, ?, ?, ?, ?)");
        this.updateStatus = session.getSession().prepare("UPDATE scheduled_jobs_idx SET status = ? WHERE time_slice = ? AND job_id = ?");
        this.deleteScheduled = session.getSession().prepare("DELETE FROM scheduled_jobs_idx WHERE time_slice = ? AND job_id = ?");
        this.updateJobParameters = session.getSession().prepare("UPDATE scheduled_jobs_idx SET job_params = ? WHERE time_slice = ? AND job_id = ?");
        this.findByIdAndSlice = session.getSession().prepare("SELECT job_id FROM scheduled_jobs_idx WHERE time_slice = ? AND job_id = ?");
    }

    /**
     * Emits every distinct stored time slice that is strictly earlier than {@code currentTime},
     * in ascending order, followed by {@code currentTime} itself.
     *
     * @param currentTime the upper bound; always emitted last
     * @param scheduler   the scheduler passed to the session for the query
     * @return the earlier time slices in ascending order, then {@code currentTime}
     */
    public Observable<Date> findActiveTimeSlices(Date currentTime, Scheduler scheduler) {
        return session.executeAndFetch(findTimeSlices.bind(), scheduler)
                .map(row -> row.getTimestamp(0))
                .filter(timestamp -> timestamp.compareTo(currentTime) < 0)
                .toSortedList()
                .doOnNext(timeSlices -> logger.debugf("Active time slices %s", timeSlices))
                .flatMap(Observable::from)
                .concatWith(Observable.just(currentTime));
    }

    /**
     * Emits job details for every valid row of {@code scheduled_jobs_idx}.
     *
     * @param scheduler the scheduler passed to the session for the query
     * @return one {@link JobDetailsImpl} per row that passes the null-column filter
     */
    public Observable<JobDetailsImpl> findAllScheduledJobs(Scheduler scheduler) {
        return session.executeAndFetch(findAllScheduled.bind(), scheduler)
                .filter(filterNullJobs)
                .map(row -> toJobDetails(row, row.getTimestamp(0)));
    }

    /**
     * Loads the jobs of every active time slice (see {@link #findActiveTimeSlices(Date, Scheduler)})
     * and emits, for each job id, the entry with the smallest trigger time. When two entries of the
     * same job have equal trigger times the one encountered first is kept.
     *
     * @param timeSlice the current time slice
     * @param scheduler the scheduler passed to the session for the queries
     * @return at most one {@link JobDetailsImpl} per job id, emitted after all slices were read
     */
    public Observable<JobDetailsImpl> findJobs(Date timeSlice, Scheduler scheduler) {
        return findActiveTimeSlices(timeSlice, scheduler)
                .flatMap(time -> findScheduledJobsForTime(time, scheduler))
                // collect() builds a fresh map per subscription; a reduce() seed would be shared
                // by every subscriber of the returned observable and leak state between them.
                .<Map<UUID, JobDetailsImpl>>collect(HashMap::new, (earliestByJob, details) -> {
                    JobDetailsImpl current = earliestByJob.get(details.getJobId());
                    if (current == null
                            || details.getTrigger().getTriggerTime() < current.getTrigger().getTriggerTime()) {
                        earliestByJob.put(details.getJobId(), details);
                    }
                })
                .flatMap(earliestByJob -> Observable.from(earliestByJob.values()));
    }

    /**
     * Issues a delete for {@code jobId} in every distinct time slice currently stored.
     *
     * @param jobId     the job to remove
     * @param scheduler the scheduler passed to the session for the time slice query
     * @return a completable that completes once all deletes have completed
     */
    public Completable deleteJob(UUID jobId, Scheduler scheduler) {
        return session.executeAndFetch(findTimeSlices.bind(), scheduler)
                .map(row -> row.getTimestamp(0))
                .flatMap(timeSlice -> session.execute(deleteScheduled.bind(timeSlice, jobId)))
                .toCompletable();
    }

    /**
     * Scans all scheduled rows, keeps those whose time slice is not after {@code timeSlice} and
     * emits, for each job id, the entry with the smallest trigger time.
     *
     * @param timeSlice the inclusive upper bound for the row's time slice
     * @param scheduler the scheduler passed to the session for the query
     * @return at most one {@link JobDetails} per job id
     */
    public Observable<JobDetails> findScheduledJobs(Date timeSlice, Scheduler scheduler) {
        return session.executeAndFetch(findAllScheduled.bind(), scheduler)
                .filter(filterNullJobs)
                .filter(row -> row.getTimestamp(0).compareTo(timeSlice) <= 0)
                .map(row -> toJobDetails(row, timeSlice))
                .<Map<UUID, SortedSet<JobDetails>>>collect(HashMap::new, (jobsById, details) ->
                        // Sets are ordered by trigger time, so first() is the earliest execution.
                        jobsById.computeIfAbsent(details.getJobId(), id -> new TreeSet<JobDetails>(
                                (d1, d2) -> Long.compare(d1.getTrigger().getTriggerTime(),
                                        d2.getTrigger().getTriggerTime())))
                                .add(details))
                .flatMap(jobsById -> Observable.from(jobsById.values()))
                .map(jobs -> jobs.first());
    }

    /**
     * Emits job details for every valid row stored under exactly {@code timeSlice}.
     *
     * @param timeSlice the time slice (partition) to read
     * @param scheduler the scheduler passed to the session for the query
     * @return one {@link JobDetailsImpl} per row that passes the null-column filter
     */
    public Observable<JobDetailsImpl> findScheduledJobsForTime(Date timeSlice, Scheduler scheduler) {
        return session.executeAndFetch(findScheduledForTime.bind(timeSlice), scheduler)
                .filter(filterNullJobs)
                .map(row -> toJobDetails(row, timeSlice))
                .doOnSubscribe(() -> logger.debugf("Fetching scheduled jobs tor time slice [%s]", timeSlice))
                .doOnNext(details -> logger.debugf("Found job details %s", details));
    }

    /**
     * Scans all scheduled rows and emits one {@link ScheduledExecution} for every valid row that
     * belongs to {@code jobId}.
     *
     * @param jobId     the job whose scheduled executions are wanted
     * @param scheduler the scheduler passed to the session for the query
     * @return the row's time slice paired with the job details, per matching row
     */
    public Observable<ScheduledExecution> findScheduledExecutions(UUID jobId, Scheduler scheduler) {
        return session.executeAndFetch(findAllScheduled.bind(), scheduler)
                .filter(filterNullJobs)
                .filter(row -> row.getUUID(1).equals(jobId))
                .map(row -> new ScheduledExecution(row.getTimestamp(0), toJobDetails(row, row.getTimestamp(0))));
    }

    /**
     * Writes {@code job} into {@code scheduled_jobs_idx} under {@code timeSlice}. A job with a null
     * id, name, type or trigger is logged as a warning but the insert is still attempted.
     *
     * @param timeSlice the time slice to schedule the job in
     * @param job       the job to store
     * @return the result of executing the insert
     */
    public Observable<ResultSet> insert(Date timeSlice, JobDetails job) {
        if (job.getJobId() == null || job.getJobName() == null || job.getJobType() == null || job.getTrigger() == null) {
            logger.warnf("Tried to insert job on scheduled jobs with invalid values, [JobId: %s, TimeSlice: %s]",
                    job.getJobId(), timeSlice);
        }
        // NOTE(review): the statement declares seven markers but only six values are bound, so the
        // trailing status marker is left unbound - confirm this is intended for the driver in use.
        return session.execute(insertScheduled.bind(timeSlice, job.getJobId(), job.getJobType(), job.getJobName(),
                job.getParameters().getMap(), getTriggerValue(session, job.getTrigger())));
    }

    /**
     * Sets the status of the given job/time slice row to finished, but only when that row exists.
     * When no row is found nothing is written, a warning is logged and the observable completes
     * without emitting.
     *
     * @param timeSlice the time slice of the scheduled execution
     * @param jobId     the job id of the scheduled execution
     * @return the result of the status update, or empty when the row does not exist
     */
    public Observable<ResultSet> updateStatusToFinished(Date timeSlice, UUID jobId) {
        return session.executeAndFetch(findByIdAndSlice.bind(timeSlice, jobId))
                .flatMap(jobRow -> session.execute(updateStatus.bind(STATUS_FINISHED, timeSlice, jobId))
                        .doOnError(t -> logger.warnf(
                                "There was an error updating the status to finished for %s in time slice [%s]",
                                jobId, timeSlice.getTime())))
                .switchIfEmpty(Observable.<ResultSet>empty().doOnCompleted(() -> logger.warnf(
                        "Attempt to update the status of a non-exist job [%s] in time slice [%s]",
                        jobId, timeSlice.getTime())));
    }

    /**
     * Builds job details with status {@link JobStatus#NONE}.
     *
     * @see #createJobDetails(UUID, String, String, Map, Trigger, JobStatus, Date)
     */
    public JobDetailsImpl createJobDetails(UUID jobId, String jobType, String jobName, Map<String, String> parameters, Trigger trigger, Date timeSlice) {
        return createJobDetails(jobId, jobType, jobName, parameters, trigger, JobStatus.NONE, timeSlice);
    }

    /**
     * Builds job details whose parameters use the no-op save callback. Null values for the id,
     * type, name, trigger or time slice are logged as a warning but do not prevent creation.
     *
     * @param jobId      the job id
     * @param jobType    the job type
     * @param jobName    the job name
     * @param parameters the job parameters
     * @param trigger    the job trigger
     * @param status     the job status
     * @param timeSlice  the time slice the job belongs to; only used for validation logging
     * @return the new job details
     */
    public JobDetailsImpl createJobDetails(UUID jobId, String jobType, String jobName, Map<String, String> parameters, Trigger trigger, JobStatus status, Date timeSlice) {
        if (jobId == null || jobType == null || jobName == null || trigger == null || timeSlice == null) {
            logger.warnf("Tried to insert job on scheduled jobs with invalid values [JobId: %s, TimeSlice: %s]",
                    jobId, timeSlice);
        }
        return new JobDetailsImpl(jobId, jobType, jobName, new JobParametersImpl(parameters, SAVE_PARAMS_NO_OP), trigger, status);
    }

    /**
     * Installs a save callback on {@code jobDetails} that writes the job's parameters to the row
     * identified by {@code timeSlice} and the job's id. Missing id, type, name or trigger is
     * logged as a warning only.
     *
     * @param jobDetails the job about to be executed
     * @param timeSlice  the time slice the job is executed for
     */
    public void prepareJobDetailsForExecution(JobDetailsImpl jobDetails, Date timeSlice) {
        if (jobDetails.getJobId() == null || jobDetails.getJobType() == null || jobDetails.getJobName() == null || jobDetails.getTrigger() == null) {
            logger.warn("Tried to prepare job for execution with invalid values");
        }
        // NOTE(review): the callback's argument is ignored; the map is re-read from jobDetails at
        // save time. Presumably both refer to the same parameters - confirm.
        Function<Map<String, String>, Completable> saveParameters = params ->
                session.execute(updateJobParameters.bind(jobDetails.getParameters().getMap(), timeSlice,
                        jobDetails.getJobId())).toCompletable();
        jobDetails.setSaveParameters(saveParameters);
    }

    /**
     * Restores the no-op save callback on {@code jobDetails}.
     *
     * @param jobDetails the job details to reset
     */
    public void resetJobDetails(JobDetailsImpl jobDetails) {
        jobDetails.setSaveParameters(SAVE_PARAMS_NO_OP);
    }

    /**
     * Converts a {@code trigger_def} UDT value into a {@link Trigger}.
     *
     * @param value the UDT value read from the {@code trigger} column
     * @return a single-execution or repeating trigger, depending on the {@code type} field
     * @throws IllegalArgumentException if the {@code type} field is neither 0 nor 1
     */
    static Trigger getTrigger(UDTValue value) {
        int type = value.getInt("type");
        switch (type) {
            case TRIGGER_TYPE_SINGLE:
                return new SingleExecutionTrigger(value.getLong("trigger_time"));
            case TRIGGER_TYPE_REPEATING:
                return new RepeatingTrigger(value.getLong("interval"), value.getLong("delay"),
                        value.getLong("trigger_time"), value.getInt("repeat_count"),
                        value.getInt("execution_count"));
            default:
                throw new IllegalArgumentException("Trigger type [" + type + "] is not supported");
        }
    }

    /**
     * Converts a {@link Trigger} into a {@code trigger_def} UDT value.
     *
     * @param session the session whose logged keyspace defines {@code trigger_def}
     * @param trigger the trigger to convert
     * @return the UDT value describing {@code trigger}
     * @throws IllegalArgumentException if the trigger is neither repeating nor single-execution
     */
    static UDTValue getTriggerValue(RxSession session, Trigger trigger) {
        if (trigger instanceof RepeatingTrigger) {
            return getRepeatingTriggerValue(session, (RepeatingTrigger) trigger);
        }
        if (trigger instanceof SingleExecutionTrigger) {
            return getSingleExecutionTriggerValue(session, (SingleExecutionTrigger) trigger);
        }
        throw new IllegalArgumentException(trigger.getClass() + " is not a supported trigger type");
    }

    /**
     * Builds the UDT value for a single-execution trigger: type 0 plus its trigger time.
     *
     * @param session the session whose logged keyspace defines {@code trigger_def}
     * @param trigger the trigger to convert
     * @return the populated UDT value
     */
    static UDTValue getSingleExecutionTriggerValue(RxSession session, SingleExecutionTrigger trigger) {
        UDTValue triggerUDT = newTriggerValue(session);
        triggerUDT.setInt("type", TRIGGER_TYPE_SINGLE);
        triggerUDT.setLong("trigger_time", trigger.getTriggerTime());
        return triggerUDT;
    }

    /**
     * Builds the UDT value for a repeating trigger: type 1, interval and trigger time, plus the
     * delay when it is positive and the repeat/execution counts when a repeat count is set.
     *
     * @param session the session whose logged keyspace defines {@code trigger_def}
     * @param trigger the trigger to convert
     * @return the populated UDT value
     */
    static UDTValue getRepeatingTriggerValue(RxSession session, RepeatingTrigger trigger) {
        UDTValue triggerUDT = newTriggerValue(session);
        triggerUDT.setInt("type", TRIGGER_TYPE_REPEATING);
        triggerUDT.setLong("interval", trigger.getInterval());
        triggerUDT.setLong("trigger_time", trigger.getTriggerTime());
        if (trigger.getDelay() > 0L) {
            triggerUDT.setLong("delay", trigger.getDelay());
        }
        if (trigger.getRepeatCount() != null) {
            triggerUDT.setInt("repeat_count", (int) trigger.getRepeatCount());
            triggerUDT.setInt("execution_count", trigger.getExecutionCount());
        }
        return triggerUDT;
    }

    /** Maps a {@code scheduled_jobs_idx} row (columns 1-6) to job details for the given time slice. */
    private JobDetailsImpl toJobDetails(Row row, Date timeSlice) {
        return createJobDetails(row.getUUID(1), row.getString(2), row.getString(3),
                row.getMap(4, String.class, String.class), getTrigger(row.getUDTValue(5)),
                JobStatus.fromCode(row.getByte(6)), timeSlice);
    }

    /** Creates an empty value of the {@code trigger_def} user type from the session's logged keyspace. */
    private static UDTValue newTriggerValue(RxSession session) {
        UserType triggerType = getKeyspace(session).getUserType("trigger_def");
        return triggerType.newValue();
    }

    /** Looks up the metadata of the keyspace the session is logged into. */
    private static KeyspaceMetadata getKeyspace(RxSession session) {
        return session.getCluster().getMetadata().getKeyspace(session.getLoggedKeyspace());
    }
}

