package org.hawkular.metrics.tasks.impl;

import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.UDTValue;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.hawkular.metrics.tasks.DateTimeService;
import org.hawkular.metrics.tasks.api.RepeatingTrigger;
import org.hawkular.metrics.tasks.api.SingleExecutionTrigger;
import org.hawkular.metrics.tasks.api.Task2;
import org.hawkular.metrics.tasks.api.TaskScheduler;
import org.hawkular.metrics.tasks.api.Trigger;
import org.hawkular.metrics.tasks.log.TaskQueueLogger;
import org.hawkular.metrics.tasks.log.TaskQueueLogging;
import org.hawkular.rx.cassandra.driver.RxSession;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;

/* loaded from: input_file:WEB-INF/lib/hawkular-metrics-task-queue-0.16.0.Final.jar:org/hawkular/metrics/tasks/impl/TaskSchedulerImpl.class */
public class TaskSchedulerImpl implements TaskScheduler {
    /** Logger shared by the scheduler and its nested subscriber wrapper. */
    private static final TaskQueueLogger log = TaskQueueLogging.getTaskQueueLogger(TaskSchedulerImpl.class);

    /** Lease time-to-live bound into the acquire-lease query; presumably seconds - confirm against the CQL. */
    public static final int DEFAULT_LEASE_TTL = 180;

    private final RxSession session;
    private final Queries queries;

    // Written by start()/shutdown() on the caller's thread and read by the tick stream on the
    // ticker thread, so the write must be visible across threads.
    private volatile boolean running;

    // Assigned when the lease stream is subscribed in start() and read by shutdown(), which may
    // run on a different thread.
    private volatile Subscription leasesSubscription;

    /** Number of shards, read from the "hawkular.scheduler.shards" system property (default 10). */
    private final int numShards = Integer.parseInt(System.getProperty("hawkular.scheduler.shards", "10"));
    private final HashFunction hashFunction = Hashing.murmur3_128();
    private final DateTimeService dateTimeService = new DateTimeService();

    // One thread drives the ticks, one thread processes leases, and a pool sized to the number
    // of available processors runs the tasks.
    private final ScheduledExecutorService tickExecutor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("ticker-pool-%d").build());
    // Not final: replaceable through setTickScheduler().
    private Scheduler tickScheduler = Schedulers.from(this.tickExecutor);
    private final ExecutorService leaseExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("lease-pool-%d").build());
    private final ExecutorService tasksExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactoryBuilder().setNameFormat("tasks-pool-%d").build());
    private final Scheduler tasksScheduler = Schedulers.from(this.tasksExecutor);
    private final Scheduler leaseScheduler = Schedulers.from(this.leaseExecutor);

    /** Every task that becomes due is pushed through this subject to client subscribers. */
    private final PublishSubject<Task2> taskSubject = PublishSubject.create();
    /** Receives the timestamp of each time slice once all of its leases have been processed. */
    private final PublishSubject<Long> tickSubject = PublishSubject.create();

    /* loaded from: input_file:WEB-INF/lib/hawkular-metrics-task-queue-0.16.0.Final.jar:org/hawkular/metrics/tasks/impl/TaskSchedulerImpl$SubscriberWrapper.class */
    private class SubscriberWrapper extends Subscriber<Task2> {
        private Subscriber<Task2> delegate;

        public SubscriberWrapper(Subscriber<Task2> subscriber) {
            this.delegate = subscriber;
        }

        @Override // rx.Observer
        public void onCompleted() {
            this.delegate.onCompleted();
        }

        @Override // rx.Observer
        public void onError(Throwable th) {
            this.delegate.onError(th);
        }

        @Override // rx.Observer
        public void onNext(Task2 task2) {
            try {
                this.delegate.onNext(task2);
            } catch (Exception e) {
                TaskSchedulerImpl.log.warnTaskExecutionFailed(task2, e);
            }
        }
    }

    /**
     * Creates a scheduler that runs its statements through the given session.
     *
     * @param rxSession the reactive Cassandra session used to execute every statement
     * @param queries   the statements the scheduler binds and executes
     */
    public TaskSchedulerImpl(RxSession rxSession, Queries queries) {
        this.queries = queries;
        session = rxSession;
    }

    /**
     * Replaces the scheduler that drives the once-a-minute ticks and supplies the current time
     * used to compute time slices.
     *
     * @param scheduler the scheduler to use from now on
     */
    public void setTickScheduler(Scheduler scheduler) {
        tickScheduler = scheduler;
    }
    /**
     * Registers a callback that is invoked for every task emitted for execution.
     *
     * @param action1 the callback to invoke with each task
     * @return the subscription that can be used to stop receiving tasks
     */
    @Override // org.hawkular.metrics.tasks.api.TaskScheduler
    public Subscription subscribe(Action1<Task2> action1) {
        Subscription subscription = taskSubject.subscribe(action1);
        return subscription;
    }

    /**
     * Registers a subscriber for every task emitted for execution. The subscriber is wrapped
     * in a {@code SubscriberWrapper} before being attached to the task subject.
     *
     * @param subscriber the subscriber to notify
     * @return the subscription that can be used to stop receiving tasks
     */
    @Override // org.hawkular.metrics.tasks.api.TaskScheduler
    public Subscription subscribe(Subscriber<Task2> subscriber) {
        Subscriber<Task2> guarded = new SubscriberWrapper(subscriber);
        return taskSubject.subscribe(guarded);
    }

    /**
     * @return the stream onto which the timestamp of a time slice is published after its
     *         leases have been processed and deleted
     */
    @Override // org.hawkular.metrics.tasks.api.TaskScheduler
    public Observable<Long> getFinishedTimeSlices() {
        return tickSubject;
    }

    /**
     * @return the value of the running flag, which {@code start()} sets and {@code shutdown()} clears
     */
    @Override // org.hawkular.metrics.tasks.api.TaskScheduler
    public boolean isRunning() {
        return running;
    }

    /**
     * @return the stream of tasks emitted for execution
     */
    @Override // org.hawkular.metrics.tasks.api.TaskScheduler
    public Observable<Task2> getTasks() {
        return taskSubject;
    }

    /**
     * Starts the scheduler: every tick the available leases of the current time slice are
     * acquired one at a time, the tasks queued under each lease are emitted for execution and
     * rescheduled, and the lease is then marked finished.
     *
     * @return a stream that emits each lease after its tasks have been processed and its queue
     *         entries removed
     */
    @Override // org.hawkular.metrics.tasks.api.TaskScheduler
    public Observable<Lease> start() {
        // Must be set BEFORE the tick stream is subscribed: the first tick fires immediately on
        // the ticker thread and createTicks() ends the stream as soon as it sees running == false.
        this.running = true;

        Observable<Lease> leases = createTicks().flatMap(this::getAvailableLeases);
        Observable<Lease> processedLeases = Observable.create(subscriber -> {
            this.leasesSubscription = leases.subscribe(
                    lease -> processLease(lease, subscriber),
                    th -> log.warnLeasesObservationProblem(th),
                    () -> {
                        log.debug("Finished observing leases");
                        subscriber.onCompleted();
                    });
        });
        PublishSubject<Lease> leasesSubject = PublishSubject.create();
        processedLeases.subscribe(leasesSubject);
        return leasesSubject;
    }

    /**
     * Runs all tasks queued under the lease and blocks the calling (lease) thread until the
     * lease has been fully processed or processing has failed.
     */
    private void processLease(Lease lease, Subscriber<? super Lease> subscriber) {
        log.debugf("Loading tasks for %s", lease);
        CountDownLatch done = new CountDownLatch(1);
        getQueue(lease)
                .observeOn(this.tasksScheduler)
                .groupBy(Task2Impl::getGroupKey)
                .flatMap(group -> group.flatMap(this::execute).map(this::rescheduleTask))
                .subscribe(
                        rescheduled -> log.debugf("Finished executing %s", rescheduled),
                        th -> {
                            log.warnTasksObservationProblem(th);
                            // Release the lease thread; otherwise it would wait forever and no
                            // further lease could ever be processed.
                            done.countDown();
                        },
                        () -> markLeaseFinished(lease, subscriber, done));
        log.debugf("Started processing tasks for %s", lease);
        try {
            done.await();
            log.debug("Done waiting!");
        } catch (InterruptedException e) {
            log.warnInterruptionOnTaskCompleteWaiting(e);
            // Keep the interruption visible to whatever runs next on this thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Deletes the processed queue entries, marks the lease finished and then reports the lease
     * downstream. The latch is released on success and on failure.
     */
    private void markLeaseFinished(Lease lease, Subscriber<? super Lease> subscriber, CountDownLatch done) {
        Date timeSlice = new Date(lease.getTimeSlice());
        Observable.merge(
                this.session.execute(this.queries.deleteTasks.bind(timeSlice, Integer.valueOf(lease.getShard())), this.tasksScheduler),
                this.session.execute(this.queries.finishLease.bind(timeSlice, Integer.valueOf(lease.getShard())), this.tasksScheduler))
                .subscribe(
                        resultSet -> {
                            // Individual result sets carry nothing of interest.
                        },
                        th -> {
                            log.warnTaskPostProcessProblem(th);
                            // Release the lease thread before propagating the failure.
                            done.countDown();
                            subscriber.onError(th);
                        },
                        () -> {
                            log.debugf("Finished executing tasks for %s", lease);
                            done.countDown();
                            subscriber.onNext(lease);
                        });
    }

    /**
     * Builds the tick stream: one emission immediately and one every minute afterwards, each
     * mapped to the current one-minute time slice and delivered on the lease scheduler.
     * The stream stops once the scheduler is no longer running.
     *
     * @return the stream of time slices to process
     */
    private Observable<Date> createTicks() {
        return Observable.interval(0L, 1L, TimeUnit.MINUTES, this.tickScheduler)
                .map(tick -> currentTimeSlice())
                // The stop predicate is checked against the running flag on every tick.
                .takeUntil(timeSlice -> !this.running)
                .doOnNext(timeSlice -> log.debugf("Tick %s", timeSlice))
                .observeOn(this.leaseScheduler);
    }

    /**
     * Emits the leases of a time slice one by one as they are acquired. After each attempt the
     * available leases are queried again; when none remain, the lease rows for the time slice
     * are deleted, the stream completes and the time slice is published as finished.
     *
     * @param date the time slice whose leases are processed
     * @return a stream of acquired leases; any exception raised while processing is reported
     *         through {@code onError}
     */
    private Observable<Lease> getAvailableLeases(Date date) {
        return Observable.create(subscriber -> {
            try {
                if (log.isDebugEnabled()) {
                    log.debug("Loading leases for " + date);
                    log.debug("Timestamp is " + date.getTime());
                }
                for (List<Lease> candidates = findAvailableLeases(date);
                        !candidates.isEmpty();
                        candidates = findAvailableLeases(date)) {
                    // Only the first candidate is tried per query; the list is then refreshed.
                    Lease candidate = candidates.get(0);
                    if (acquire(candidate)) {
                        log.debugf("Acquired %s", candidate);
                        subscriber.onNext(candidate);
                        log.debugf("Finished with %s", candidate);
                    }
                    log.debug("Looking for available leases");
                }
                log.debugf("No more leases to process for %s", date);
                this.session.execute(this.queries.deleteLeases.bind(date)).toBlocking().first();
                subscriber.onCompleted();
                this.tickSubject.onNext(Long.valueOf(date.getTime()));
            } catch (Exception e) {
                subscriber.onError(e);
            }
        });
    }

    /**
     * Loads the leases of a time slice and keeps those that are neither finished nor owned.
     * Blocks until the query has completed.
     *
     * @param date the time slice to look up
     * @return the unfinished, unowned leases; an empty list if the query emits nothing
     */
    private List<Lease> findAvailableLeases(Date date) {
        Observable<Lease> unclaimed = this.session.execute(this.queries.findLeases.bind(date))
                .flatMap(Observable::from)
                .map(row -> new Lease(date.getTime(), row.getInt(0), row.getString(1), row.getBool(2)))
                .filter(lease -> !(lease.isFinished() || lease.getOwner() != null));
        return unclaimed.toList().toBlocking().firstOrDefault(Collections.emptyList());
    }

    /**
     * Tries to take ownership of a lease. Blocks until the statement has been executed.
     *
     * @param lease the lease to acquire
     * @return {@code true} if the statement reports that it was applied; {@code false} if it
     *         was not applied or produced no result
     */
    private boolean acquire(Lease lease) {
        Date timeSlice = new Date(lease.getTimeSlice());
        // NOTE(review): the owner is hard-coded to "localhost", so nodes cannot be told apart
        // by owner - confirm whether this is intentional.
        return this.session
                .execute(this.queries.acquireLease.bind(DEFAULT_LEASE_TTL, "localhost", timeSlice, Integer.valueOf(lease.getShard())))
                .map(ResultSet::wasApplied)
                .toBlocking()
                .firstOrDefault(false);
    }

    /**
     * Loads the tasks queued for the time slice and shard of the given lease.
     *
     * @param lease the lease identifying the queue partition
     * @return a stream with one task per queue row
     */
    Observable<Task2Impl> getQueue(Lease lease) {
        log.debugf("Loading task queue for %s", lease);
        Date timeSlice = new Date(lease.getTimeSlice());
        Observable<ResultSet> queued = this.session.execute(
                this.queries.getTasksFromQueue.bind(timeSlice, Integer.valueOf(lease.getShard())),
                Schedulers.immediate());
        return queued
                .flatMap(Observable::from)
                .map(row -> new Task2Impl(
                        row.getUUID(2),
                        row.getString(0),
                        row.getInt(1),
                        row.getString(3),
                        row.getMap(4, String.class, String.class),
                        getTrigger(row.getUDTValue(5))));
    }

    /**
     * Wraps the hand-off of a task to the task subject in an observable that emits the same
     * task once the subject's {@code onNext} call has returned.
     *
     * @param task2Impl the task to emit
     * @return an observable that emits {@code task2Impl} and completes
     */
    private Observable<Task2Impl> execute(Task2Impl task2Impl) {
        Observable<Task2Impl> emission = Observable.create(subscriber -> {
            log.debugf("Emitting %s for execution", task2Impl);
            this.taskSubject.onNext(task2Impl);
            subscriber.onNext(task2Impl);
            subscriber.onCompleted();
            log.debugf("Finished executing %s", task2Impl);
        });
        return emission.subscribeOn(Schedulers.immediate());
    }

    /**
     * Stops the scheduler: clears the running flag, cancels the lease subscription and shuts the
     * task, lease and tick executors down in that order, waiting up to five seconds for each.
     *
     * @throws RuntimeException if the calling thread is interrupted while waiting; in that case
     *         all executors are stopped forcibly and the interrupt flag is restored first
     */
    @Override // org.hawkular.metrics.tasks.api.TaskScheduler
    public void shutdown() {
        try {
            log.debug("shutting down");
            this.running = false;
            Subscription subscription = this.leasesSubscription;
            if (subscription != null) {
                subscription.unsubscribe();
            }
            this.tasksExecutor.shutdown();
            this.tasksExecutor.awaitTermination(5L, TimeUnit.SECONDS);
            this.leaseExecutor.shutdown();
            this.leaseExecutor.awaitTermination(5L, TimeUnit.SECONDS);
            this.tickExecutor.shutdown();
            this.tickExecutor.awaitTermination(5L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Do not leave worker threads behind: an interrupt during an earlier wait would
            // otherwise skip the shutdown of the remaining executors.
            this.tasksExecutor.shutdownNow();
            this.leaseExecutor.shutdownNow();
            this.tickExecutor.shutdownNow();
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted during shutdown", e);
        }
    }

    /**
     * @return the one-minute time slice computed by the date/time service for the tick
     *         scheduler's current time
     */
    private Date currentTimeSlice() {
        DateTime now = new DateTime(this.tickScheduler.now());
        Duration oneMinute = Duration.standardMinutes(1L);
        return this.dateTimeService.getTimeSlice(now, oneMinute).toDate();
    }

    /**
     * Looks a task up by its id.
     *
     * @param uuid the id of the task
     * @return an observable that emits one task per matching row and then completes; a query
     *         failure is reported as a {@link RuntimeException} that wraps the cause
     */
    public Observable<Task2> findTask(UUID uuid) {
        return Observable.create(subscriber ->
                this.session.execute(this.queries.findTask.bind(uuid))
                        .flatMap(Observable::from)
                        .subscribe(
                                row -> subscriber.onNext(new Task2Impl(
                                        uuid,
                                        row.getString(0),
                                        row.getInt(1),
                                        row.getString(2),
                                        row.getMap(3, String.class, String.class),
                                        getTrigger(row.getUDTValue(4)))),
                                th -> subscriber.onError(
                                        new RuntimeException("Failed to find task with id " + uuid, th)),
                                subscriber::onCompleted));
    }

    /**
     * Schedules a new task: stores the task, adds it to the queue of its trigger time and
     * shard, and creates the lease for that time slice and shard.
     *
     * @param str     the task name
     * @param str2    the group key; it determines the shard
     * @param i       the order of the task
     * @param map     the task parameters
     * @param trigger the trigger that supplies the execution time
     * @return an observable that emits the scheduled task after all three statements have
     *         completed, or a {@link RuntimeException} wrapping the cause if one of them fails
     */
    @Override // org.hawkular.metrics.tasks.api.TaskScheduler
    public Observable<Task2> scheduleTask(String str, String str2, int i, Map<String, String> map, Trigger trigger) {
        UUID taskId = UUID.randomUUID();
        int shard = computeShard(str2);
        UDTValue triggerUdt = getTriggerValue(this.session, trigger);
        Date timeSlice = new Date(trigger.getTriggerTime());
        Task2Impl scheduled = new Task2Impl(taskId, str2, i, str, map, trigger);
        log.debugf("Scheduling %s", scheduled);

        Observable<ResultSet> taskCreated = this.session.execute(
                this.queries.createTask2.bind(taskId, str2, Integer.valueOf(i), str, map, triggerUdt));
        Observable<ResultSet> queueUpdated = this.session.execute(
                this.queries.insertIntoQueue.bind(timeSlice, Integer.valueOf(shard), taskId, str2, Integer.valueOf(i), str, map, triggerUdt));
        Observable<ResultSet> leaseCreated = this.session.execute(
                this.queries.createLease.bind(timeSlice, Integer.valueOf(shard)));
        Observable<ResultSet> writes = Observable.merge(taskCreated, queueUpdated, leaseCreated);

        return Observable.create(subscriber -> writes.subscribe(
                resultSet -> {
                    // The individual write results are not needed.
                },
                th -> subscriber.onError(new RuntimeException("Failed to schedule task [" + scheduled + "]", th)),
                () -> {
                    try {
                        subscriber.onNext(scheduled);
                        subscriber.onCompleted();
                    } catch (Throwable failure) {
                        subscriber.onError(failure);
                    }
                }));
    }

    /**
     * Queues the next execution of a task, if its trigger has one.
     *
     * @param task2Impl the task that has just been emitted for execution
     * @return an observable of {@code task2Impl} itself when the trigger has no next execution;
     *         otherwise an observable that emits the task carrying the next trigger once the
     *         queue entry and lease have been written, or a {@link RuntimeException} wrapping
     *         the cause if a write fails
     */
    private Observable<Task2Impl> rescheduleTask(Task2Impl task2Impl) {
        // Ask the trigger once so that the null check and the value used below cannot disagree.
        Trigger nextTrigger = task2Impl.getTrigger().nextTrigger();
        if (nextTrigger == null) {
            log.debugf("There are no more executions for %s", task2Impl);
            return Observable.just(task2Impl);
        }
        int shard = computeShard(task2Impl.getGroupKey());
        Task2Impl nextTask = new Task2Impl(task2Impl.getId(), task2Impl.getGroupKey(), task2Impl.getOrder(), task2Impl.getName(), task2Impl.getParameters(), nextTrigger);
        UDTValue triggerValue = getTriggerValue(this.session, nextTask.getTrigger());
        Date timeSlice = new Date(nextTask.getTrigger().getTriggerTime());
        if (log.isDebugEnabled()) {
            log.debug("Next execution time for Task2Impl{id=" + nextTask.getId() + ", name=" + nextTask.getName() + "} is " + timeSlice);
        }
        Observable<ResultSet> queueUpdated = this.session.execute(this.queries.insertIntoQueue.bind(timeSlice, Integer.valueOf(shard), nextTask.getId(), task2Impl.getGroupKey(), Integer.valueOf(task2Impl.getOrder()), nextTask.getName(), nextTask.getParameters(), triggerValue), this.tasksScheduler);
        Observable<ResultSet> leaseCreated = this.session.execute(this.queries.createLease.bind(timeSlice, Integer.valueOf(shard)), this.tasksScheduler);
        return Observable.create(subscriber -> {
            Observable.merge(queueUpdated, leaseCreated).subscribe(resultSet -> {
                // The individual write results are not needed.
            }, th -> {
                subscriber.onError(new RuntimeException("Failed to reschedule " + nextTask, th));
            }, () -> {
                try {
                    log.debugf("Received result set for reschedule task, %s", nextTask);
                    subscriber.onNext(nextTask);
                    subscriber.onCompleted();
                } catch (Exception e) {
                    subscriber.onError(e);
                }
            });
        });
    }

    /**
     * Maps a group key to a shard by consistent hashing over the configured number of shards.
     *
     * <p>The key is encoded as UTF-8 rather than with the platform default charset, so that
     * every JVM computes the same shard for the same key regardless of its locale settings.
     *
     * @param str the group key
     * @return the shard index as returned by {@code Hashing.consistentHash}
     */
    int computeShard(String str) {
        byte[] keyBytes = str.getBytes(StandardCharsets.UTF_8);
        return Hashing.consistentHash(this.hashFunction.hashBytes(keyBytes), this.numShards);
    }

    /**
     * @param rxSession the session whose logged keyspace is looked up
     * @return the cluster metadata entry for the keyspace the session is logged into
     */
    private static KeyspaceMetadata getKeyspace(RxSession rxSession) {
        return rxSession
                .getCluster()
                .getMetadata()
                .getKeyspace(rxSession.getLoggedKeyspace());
    }

    /**
     * Converts a stored trigger value back into a trigger object.
     *
     * @param uDTValue the stored value; its "type" field selects the trigger kind
     *                 (0 = single execution, 1 = repeating)
     * @return the reconstructed trigger
     * @throws IllegalArgumentException if the type is neither 0 nor 1
     */
    static Trigger getTrigger(UDTValue uDTValue) {
        int type = uDTValue.getInt("type");
        if (type == 0) {
            return new SingleExecutionTrigger(uDTValue.getLong("trigger_time"));
        }
        if (type == 1) {
            return new RepeatingTrigger(
                    uDTValue.getLong("interval"),
                    uDTValue.getLong("delay"),
                    uDTValue.getLong("trigger_time"),
                    uDTValue.getInt("repeat_count"),
                    uDTValue.getInt("execution_count"));
        }
        throw new IllegalArgumentException("Trigger type [" + type + "] is not supported");
    }

    /**
     * Converts a trigger into the value stored with a task, dispatching on the trigger's class.
     *
     * @param rxSession the session used to resolve the stored type definition
     * @param trigger   the trigger to convert
     * @return the value representing the trigger
     * @throws IllegalArgumentException if the trigger is neither a repeating nor a
     *         single-execution trigger
     */
    static UDTValue getTriggerValue(RxSession rxSession, Trigger trigger) {
        UDTValue value;
        if (trigger instanceof RepeatingTrigger) {
            value = getRepeatingTriggerValue(rxSession, (RepeatingTrigger) trigger);
        } else if (trigger instanceof SingleExecutionTrigger) {
            value = getSingleExecutionTriggerValue(rxSession, (SingleExecutionTrigger) trigger);
        } else {
            throw new IllegalArgumentException(trigger.getClass() + " is not a supported trigger type");
        }
        return value;
    }

    /**
     * Builds the stored form of a single-execution trigger: a "trigger_def" value with type 0
     * and the trigger time.
     *
     * @param rxSession              the session used to resolve the "trigger_def" type
     * @param singleExecutionTrigger the trigger to convert
     * @return the populated value
     */
    static UDTValue getSingleExecutionTriggerValue(RxSession rxSession, SingleExecutionTrigger singleExecutionTrigger) {
        return getKeyspace(rxSession)
                .getUserType("trigger_def")
                .newValue()
                .setInt("type", 0)
                .setLong("trigger_time", singleExecutionTrigger.getTriggerTime());
    }

    /**
     * Builds the stored form of a repeating trigger: a "trigger_def" value with type 1, the
     * interval and the trigger time. The delay is written only when it is positive; the repeat
     * count and execution count are written only when a repeat count is set.
     *
     * @param rxSession        the session used to resolve the "trigger_def" type
     * @param repeatingTrigger the trigger to convert
     * @return the populated value
     */
    static UDTValue getRepeatingTriggerValue(RxSession rxSession, RepeatingTrigger repeatingTrigger) {
        UDTValue stored = getKeyspace(rxSession).getUserType("trigger_def").newValue();
        stored.setInt("type", 1);
        stored.setLong("interval", repeatingTrigger.getInterval());
        stored.setLong("trigger_time", repeatingTrigger.getTriggerTime());

        boolean hasDelay = repeatingTrigger.getDelay() > 0;
        if (hasDelay) {
            stored.setLong("delay", repeatingTrigger.getDelay());
        }

        boolean hasRepeatCount = repeatingTrigger.getRepeatCount() != null;
        if (hasRepeatCount) {
            stored.setInt("repeat_count", repeatingTrigger.getRepeatCount().intValue());
            stored.setInt("execution_count", repeatingTrigger.getExecutionCount());
        }
        return stored;
    }
}
