package org.hawkular.metrics.tasks.impl;

import com.datastax.driver.core.ResultSet;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.hawkular.metrics.schema.SchemaManager;
import org.hawkular.metrics.tasks.DateTimeService;
import org.hawkular.metrics.tasks.api.Task;
import org.hawkular.metrics.tasks.api.TaskService;
import org.hawkular.metrics.tasks.api.TaskType;
import org.hawkular.rx.cassandra.driver.RxSession;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func2;
import rx.subjects.PublishSubject;

/* loaded from: input_file:WEB-INF/lib/hawkular-metrics-task-queue-0.3.5-SNAPSHOT.jar:org/hawkular/metrics/tasks/impl/TaskServiceImpl.class */
public class TaskServiceImpl implements TaskService {
    private static final Logger logger = LoggerFactory.getLogger(TaskServiceImpl.class);
    private RxSession rxSession;
    private Queries queries;
    private List<TaskType> taskTypes;
    private LeaseService leaseService;
    private String owner;
    private DateTimeService dateTimeService;
    private ScheduledExecutorService ticker = Executors.newScheduledThreadPool(1);
    private ExecutorService scheduler = Executors.newSingleThreadExecutor();
    private ListeningExecutorService workers = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4));
    private Duration timeSliceDuration = Duration.standardMinutes(1);
    private TimeUnit timeUnit = TimeUnit.MINUTES;
    private Map<TaskType, PublishSubject<Task>> subjects = new HashMap();
    private Func2<TaskContainer, Task, TaskContainer> executeTask = (taskContainer, task) -> {
        try {
            this.subjects.get(task.getTaskType()).onNext(task);
        } catch (Exception e) {
            taskContainer.getFailedTimeSlices().add(task.getTimeSlice());
        }
        return taskContainer;
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.hawkular.metrics.tasks.impl.TaskServiceImpl$1, reason: invalid class name */
    /* loaded from: input_file:WEB-INF/lib/hawkular-metrics-task-queue-0.3.5-SNAPSHOT.jar:org/hawkular/metrics/tasks/impl/TaskServiceImpl$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$java$util$concurrent$TimeUnit = new int[TimeUnit.values().length];

        static {
            try {
                $SwitchMap$java$util$concurrent$TimeUnit[TimeUnit.SECONDS.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$java$util$concurrent$TimeUnit[TimeUnit.MINUTES.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$java$util$concurrent$TimeUnit[TimeUnit.HOURS.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public TaskServiceImpl(RxSession rxSession, Queries queries, LeaseService leaseService, List<TaskType> list) {
        try {
            this.rxSession = rxSession;
            this.queries = queries;
            this.leaseService = leaseService;
            this.taskTypes = list;
            this.dateTimeService = new DateTimeService();
            this.owner = InetAddress.getLocalHost().getHostName();
            list.forEach(taskType -> {
                this.subjects.put(taskType, PublishSubject.create());
            });
            new SchemaManager(rxSession.getSession()).createSchema(System.getProperty("keyspace", "hawkular_metrics"));
        } catch (UnknownHostException e) {
            throw new RuntimeException("Failed to initialize owner name", e);
        } catch (IOException e2) {
            throw new RuntimeException("Failed to initialize schema", e2);
        }
    }

    public void setTimeUnit(TimeUnit timeUnit) {
        switch (AnonymousClass1.$SwitchMap$java$util$concurrent$TimeUnit[timeUnit.ordinal()]) {
            case 1:
                this.timeUnit = TimeUnit.SECONDS;
                this.timeSliceDuration = Duration.standardSeconds(1L);
                return;
            case 2:
                this.timeUnit = TimeUnit.MINUTES;
                this.timeSliceDuration = Duration.standardMinutes(1L);
                return;
            case 3:
                this.timeUnit = TimeUnit.HOURS;
                this.timeSliceDuration = Duration.standardMinutes(60L);
                return;
            default:
                throw new IllegalArgumentException(timeUnit + " is not a supported time unit");
        }
    }

    @Override // org.hawkular.metrics.tasks.api.TaskService
    public void start() {
        this.ticker.scheduleAtFixedRate(() -> {
            DateTime timeSlice = this.dateTimeService.getTimeSlice(DateTime.now(), this.timeSliceDuration);
            this.scheduler.submit(() -> {
                executeTasks(timeSlice);
            });
        }, 0L, 1L, this.timeUnit);
    }

    @Override // org.hawkular.metrics.tasks.api.TaskService
    public void shutdown() {
        logger.info("Shutting down");
        this.leaseService.shutdown();
        this.ticker.shutdownNow();
        this.scheduler.shutdownNow();
        this.workers.shutdown();
        try {
            logger.debug("Waiting for active jobs to finish");
            this.workers.awaitTermination(1L, this.timeUnit);
        } catch (InterruptedException e) {
            logger.info("The shutdown process has been interrupted. Attempting to forcibly terminate active jobs.");
            this.workers.shutdownNow();
        }
    }

    public Subscription subscribe(TaskType taskType, Action1<? super Task> action1, Action1<Throwable> action12, Action0 action0) {
        PublishSubject<Task> publishSubject = this.subjects.get(taskType);
        if (publishSubject == null) {
            throw new IllegalArgumentException(taskType + " is not a recognized task type");
        }
        return publishSubject.subscribe(action1, action12, action0);
    }

    public Subscription subscribe(TaskType taskType, Action1<? super Task> action1) {
        PublishSubject<Task> publishSubject = this.subjects.get(taskType);
        if (publishSubject == null) {
            throw new IllegalArgumentException(taskType + " is not a recognized task type");
        }
        return publishSubject.subscribe(action1);
    }

    public Observable<TaskContainer> findTasks(String str, DateTime dateTime, int i) {
        TaskType findTaskType = findTaskType(str);
        return this.rxSession.execute(this.queries.findTasks.bind(str, dateTime.toDate(), Integer.valueOf(i))).flatMap((v0) -> {
            return Observable.from(v0);
        }).map(row -> {
            return new TaskContainer(findTaskType, row.getString(0), dateTime, i, row.getString(1), row.getSet(2, String.class), row.getInt(3), row.getInt(4), (Set) row.getSet(5, Date.class).stream().map((v1) -> {
                return new DateTime(v1);
            }).collect(Collectors.toSet()));
        });
    }

    public Observable<TaskContainer> findTasks(Lease lease, TaskType taskType) {
        int segmentOffset = lease.getSegmentOffset();
        return Observable.range(segmentOffset, segmentOffset + taskType.getSegments()).flatMap(num -> {
            return findTasks(lease.getTaskType(), lease.getTimeSlice(), num.intValue());
        });
    }

    private TaskType findTaskType(String str) {
        return this.taskTypes.stream().filter(taskType -> {
            return taskType.getName().equals(str);
        }).findFirst().orElseThrow(() -> {
            return new IllegalArgumentException(str + " is not a recognized task type");
        });
    }

    @Override // org.hawkular.metrics.tasks.api.TaskService
    public Observable<Task> scheduleTask(DateTime dateTime, Task task) {
        findTaskType(task.getTaskType().getName());
        return scheduleTaskAt(this.dateTimeService.getTimeSlice(dateTime, getDuration(task.getInterval())).plus(getDuration(task.getInterval())), task).map(dateTime2 -> {
            return new TaskImpl(task.getTaskType(), task.getTenantId(), dateTime2, task.getTarget(), task.getSources(), task.getInterval(), task.getWindow());
        });
    }

    private Observable<TaskContainer> rescheduleTask(TaskContainer taskContainer) {
        TaskType taskType = taskContainer.getTaskType();
        DateTime plus = taskContainer.getTimeSlice().plus(getDuration(taskContainer.getInterval()));
        int abs = Math.abs(taskContainer.getTarget().hashCode() % taskType.getSegments());
        int segments = taskType.getSegments() / taskType.getSegmentOffsets();
        int i = (abs / segments) * segments;
        Observable<ResultSet> execute = taskContainer.getFailedTimeSlices().isEmpty() ? this.rxSession.execute(this.queries.createTask.bind(taskType.getName(), taskContainer.getTenantId(), plus.toDate(), Integer.valueOf(abs), taskContainer.getTarget(), taskContainer.getSources(), Integer.valueOf(taskContainer.getInterval()), Integer.valueOf(taskContainer.getWindow()))) : this.rxSession.execute(this.queries.createTaskWithFailures.bind(taskType.getName(), taskContainer.getTenantId(), plus.toDate(), Integer.valueOf(abs), taskContainer.getTarget(), taskContainer.getSources(), Integer.valueOf(taskContainer.getInterval()), Integer.valueOf(taskContainer.getWindow()), toDates(taskContainer.getFailedTimeSlices())));
        Observable<ResultSet> execute2 = this.rxSession.execute(this.queries.createLease.bind(plus.toDate(), taskType.getName(), Integer.valueOf(i)));
        Observable<ResultSet> observable = execute;
        return Observable.create(subscriber -> {
            Observable concatWith = observable.concatWith(execute2);
            Action1 action1 = resultSet -> {
            };
            subscriber.getClass();
            concatWith.subscribe(action1, subscriber::onError, () -> {
                subscriber.onNext(taskContainer);
                subscriber.onCompleted();
            });
        });
    }

    private Set<Date> toDates(Set<DateTime> set) {
        return (Set) set.stream().map((v0) -> {
            return v0.toDate();
        }).collect(Collectors.toSet());
    }

    private Observable<DateTime> scheduleTaskAt(DateTime dateTime, Task task) {
        TaskType taskType = task.getTaskType();
        int abs = Math.abs(task.getTarget().hashCode() % taskType.getSegments());
        int segments = taskType.getSegments() / taskType.getSegmentOffsets();
        int i = (abs / segments) * segments;
        Observable<ResultSet> execute = this.rxSession.execute(this.queries.createTask.bind(taskType.getName(), task.getTenantId(), dateTime.toDate(), Integer.valueOf(abs), task.getTarget(), task.getSources(), Integer.valueOf(task.getInterval()), Integer.valueOf(task.getWindow())));
        Observable<ResultSet> execute2 = this.rxSession.execute(this.queries.createLease.bind(dateTime.toDate(), taskType.getName(), Integer.valueOf(i)));
        return Observable.create(subscriber -> {
            Observable concatWith = execute.concatWith(execute2);
            Action1 action1 = resultSet -> {
            };
            subscriber.getClass();
            concatWith.subscribe(action1, subscriber::onError, () -> {
                subscriber.onNext(dateTime);
                subscriber.onCompleted();
            });
        });
    }

    void executeTasks(DateTime dateTime) {
        try {
            this.taskTypes.forEach(taskType -> {
                executeTasks(dateTime, taskType);
            });
            this.leaseService.deleteLeases(dateTime).toBlocking().lastOrDefault(null);
        } catch (Exception e) {
            logger.warn("Failed to delete lease partition for time slice " + dateTime, (Throwable) e);
        }
    }

    private void executeTasks(DateTime dateTime, TaskType taskType) {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Observable<R> flatMap = this.leaseService.findUnfinishedLeases(dateTime).filter(lease -> {
            return Boolean.valueOf(lease.getTaskType().equals(taskType.getName()));
        }).flatMap(lease2 -> {
            return findTasks(lease2, taskType).map(TaskContainer::copyWithoutFailures).flatMap(taskContainer -> {
                return Observable.from(taskContainer).reduce(taskContainer, this.executeTask);
            }).flatMap(this::rescheduleTask).flatMap(this::deleteTaskSegment).flatMap(resultSet -> {
                Observable<Boolean> finish = this.leaseService.finish(lease2);
                lease2.getClass();
                return finish.map((v1) -> {
                    return r1.setFinished(v1);
                });
            });
        });
        Action1 action1 = lease3 -> {
            if (lease3.isFinished()) {
                return;
            }
            logger.warn("Failed to mark {} finished", lease3);
        };
        Action1<Throwable> action12 = th -> {
            logger.warn("Task execution failed", th);
        };
        countDownLatch.getClass();
        flatMap.subscribe(action1, action12, countDownLatch::countDown);
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            logger.warn("There was an interrupt waiting for task execution of type " + taskType.getName() + "to complete for time slice " + dateTime, (Throwable) e);
        }
    }

    private Observable<ResultSet> deleteTaskSegment(TaskContainer taskContainer) {
        return this.rxSession.execute(this.queries.deleteTasks.bind(taskContainer.getTaskType().getName(), taskContainer.getTimeSlice().toDate(), Integer.valueOf(taskContainer.getSegment())));
    }

    private Duration getDuration(int i) {
        switch (AnonymousClass1.$SwitchMap$java$util$concurrent$TimeUnit[this.timeUnit.ordinal()]) {
            case 1:
                return Duration.standardSeconds(i);
            case 2:
                return Duration.standardMinutes(i);
            case 3:
                return Duration.standardHours(i);
            default:
                throw new IllegalArgumentException(this.timeUnit + " is not a supported time unit");
        }
    }
}
