/*
 * Decompiled with CFR 0.152.
 */
package org.rhq.server.metrics.aggregation;

import com.datastax.driver.core.ResultSet;
import com.google.common.base.Function;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.ReadableDuration;
import org.rhq.server.metrics.AbortedException;
import org.rhq.server.metrics.ArithmeticMeanCalculator;
import org.rhq.server.metrics.DateTimeService;
import org.rhq.server.metrics.MetricsConfiguration;
import org.rhq.server.metrics.MetricsDAO;
import org.rhq.server.metrics.StorageResultSetFuture;
import org.rhq.server.metrics.aggregation.Batch;
import org.rhq.server.metrics.aggregation.IndexIterator;
import org.rhq.server.metrics.aggregation.TaskTracker;
import org.rhq.server.metrics.domain.AggregateNumericMetric;
import org.rhq.server.metrics.domain.AggregateNumericMetricMapper;
import org.rhq.server.metrics.domain.Bucket;
import org.rhq.server.metrics.domain.IndexBucket;
import org.rhq.server.metrics.domain.IndexEntry;
import org.rhq.server.metrics.domain.NumericMetric;
import org.rhq.server.metrics.domain.RawNumericMetricMapper;
import org.rhq.server.metrics.domain.ResultSetMapper;

class DataAggregator<T extends NumericMetric> {
    // Logger named after the runtime class of this aggregator.
    private final Log log = LogFactory.getLog(this.getClass());
    // Issues the async metric queries and index-entry deletes; also handed to the IndexIterator in execute().
    private MetricsDAO dao;
    // Index bucket whose entries are aggregated; selects the result set mapper and the source/target data buckets.
    private IndexBucket bucket;
    // Async function applied to each batch's computed aggregates before the batch's index entries are deleted.
    private AsyncFunction<List<AggregateNumericMetric>, List<ResultSet>> persistMetrics;
    // One permit is acquired per submitted batch and released by the batch's completion callback.
    private Semaphore permits;
    // Executor that runs the aggregation tasks as well as the future transformations and callbacks.
    private ListeningExecutorService aggregationTasks;
    // Only assigned by its setter; not read anywhere else in this class.
    private DateTimeService dateTimeService;
    // Counts outstanding tasks; execute() waits on it and error paths abort it.
    private TaskTracker taskTracker = new TaskTracker();
    // Passed through to the IndexIterator created in execute().
    private MetricsConfiguration configuration;
    // Running total of aggregate metrics computed by successfully finished batches; never reset in this class.
    private AtomicInteger schedulesCount = new AtomicInteger();
    // Optional listener notified with the aggregates of each finished batch; may be null.
    private BatchFinishedListener batchFinishedListener;
    // Maps a query ResultSet to a list of metrics; chosen in setBucket().
    private ResultSetMapper resultSetMapper;
    // Added to a batch's start time to obtain the end time used for its queries.
    private Duration timeSliceDuration;
    // Number of index entries at which a batch is submitted; also the initial capacity of the aggregates list.
    private int batchSize;

    /**
     * Creates an unconfigured aggregator; collaborators are supplied through the setters.
     */
    DataAggregator() {
    }

    /**
     * Sets the DAO used to query metric data and to delete index entries.
     *
     * @param dao the DAO through which this aggregator issues its storage requests
     */
    void setDao(MetricsDAO dao) {
        this.dao = dao;
    }

    /**
     * Sets the index bucket to aggregate and picks the matching result set mapper:
     * a {@link RawNumericMetricMapper} for {@link IndexBucket#RAW}, otherwise an
     * {@link AggregateNumericMetricMapper}.
     *
     * @param bucket the index bucket whose entries this aggregator processes
     */
    public void setBucket(IndexBucket bucket) {
        this.bucket = bucket;
        if (bucket == IndexBucket.RAW) {
            this.resultSetMapper = new RawNumericMetricMapper();
        } else {
            this.resultSetMapper = new AggregateNumericMetricMapper();
        }
    }

    /**
     * Sets the asynchronous function that is applied to the aggregates computed for a batch.
     * The batch's index entries are only deleted after the future it returns has completed.
     *
     * @param persistMetrics function that receives the computed aggregates of one batch
     */
    void setPersistMetrics(AsyncFunction<List<AggregateNumericMetric>, List<ResultSet>> persistMetrics) {
        this.persistMetrics = persistMetrics;
    }

    /**
     * Sets the semaphore that throttles task submission. One permit is acquired before each
     * batch is submitted and released when the batch's completion callback runs.
     *
     * @param permits the semaphore limiting the number of in-flight batches
     */
    void setPermits(Semaphore permits) {
        this.permits = permits;
    }

    /**
     * Sets the executor on which aggregation tasks, future transformations and completion
     * callbacks are run.
     *
     * @param aggregationTasks the executor service used for all aggregation work
     */
    void setAggregationTasks(ListeningExecutorService aggregationTasks) {
        this.aggregationTasks = aggregationTasks;
    }

    /**
     * Sets the length of a time slice. It is added to a batch's start time to compute the
     * end time passed to the data queries.
     *
     * @param timeSliceDuration the duration covered by one batch's time slice
     */
    public void setTimeSliceDuration(Duration timeSliceDuration) {
        this.timeSliceDuration = timeSliceDuration;
    }

    /**
     * Sets the date/time service. The value is stored but not read anywhere else in this class.
     *
     * @param dateTimeService the date/time service to store
     */
    void setDateTimeService(DateTimeService dateTimeService) {
        this.dateTimeService = dateTimeService;
    }

    /**
     * Sets the metrics configuration, which is handed to the {@link IndexIterator} created
     * in {@code execute}.
     *
     * @param configuration the metrics configuration
     */
    public void setConfiguration(MetricsConfiguration configuration) {
        this.configuration = configuration;
    }

    /**
     * Sets an optional listener that is notified with the aggregates of each successfully
     * finished batch.
     *
     * @param batchFinishedListener the listener, or {@code null} for no notification
     */
    void setBatchFinishedListener(BatchFinishedListener batchFinishedListener) {
        this.batchFinishedListener = batchFinishedListener;
    }

    /**
     * Sets the number of index entries at which a batch is submitted for aggregation.
     * The value is also used as the initial capacity of the per-batch aggregates list.
     *
     * @param batchSize the number of index entries per batch
     */
    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

    /**
     * Registers the completion callback for one batch. The callback fires once both the
     * aggregates have been computed and the batch's index entries have been deleted. On
     * success it notifies the optional {@link BatchFinishedListener} and adds the number of
     * computed aggregates to the running total; permit release and task bookkeeping are
     * handled by {@link AggregationTaskFinishedCallback}.
     *
     * @param metricsFuture future holding the aggregates computed for the batch
     * @param deletedIndexEntriesFuture future that completes when the batch's index entries are deleted
     */
    @SuppressWarnings("unchecked")
    protected void aggregationTaskFinished(ListenableFuture<List<AggregateNumericMetric>> metricsFuture, ListenableFuture<List<ResultSet>> deletedIndexEntriesFuture) {
        ListenableFuture<List<List<?>>> argsFuture = Futures.<List<?>>allAsList(metricsFuture, deletedIndexEntriesFuture);
        Futures.addCallback(argsFuture, new AggregationTaskFinishedCallback<List<List<?>>>() {

            @Override
            protected void onFinish(List<List<?>> args) {
                // allAsList keeps the order of its inputs, so element 0 is the result of metricsFuture.
                List<AggregateNumericMetric> metrics = (List<AggregateNumericMetric>) args.get(0);
                if (DataAggregator.this.batchFinishedListener != null) {
                    DataAggregator.this.batchFinishedListener.onFinish(metrics);
                }
                DataAggregator.this.schedulesCount.addAndGet(metrics.size());
            }
        }, this.aggregationTasks);
    }

    /**
     * Iterates over the index entries between {@code start} and {@code end}, groups them into
     * batches that share a time slice, submits one aggregation task per batch and then waits
     * for all submitted tasks to finish.
     * <p>
     * A batch is submitted when it reaches the configured batch size, when the next index
     * entry belongs to a different time slice, or when the index is exhausted. A batch size
     * of zero or less disables splitting by size.
     * <p>
     * Interrupts and unexpected errors raised inside this method are logged and the task
     * tracker is aborted; they are not rethrown from here. If the thread was interrupted, its
     * interrupt status is restored before returning so callers can observe it.
     * NOTE(review): the generic handler also catches any {@code AbortedException} raised
     * inside the try block - confirm whether callers expect it to propagate.
     *
     * @param start start of the index range to aggregate
     * @param end end of the index range to aggregate
     * @return the running total of aggregate metrics computed by finished batches
     * @throws InterruptedException declared for callers; interrupts raised in the body are handled here
     * @throws AbortedException declared for callers; see the review note above
     */
    public int execute(DateTime start, DateTime end) throws InterruptedException, AbortedException {
        this.log.info("Starting " + this.bucket + " data aggregation");
        Stopwatch stopwatch = Stopwatch.createStarted();
        boolean interrupted = false;
        try {
            IndexIterator iterator = new IndexIterator(start, end, this.bucket, this.dao, this.configuration);
            Batch batch = new Batch();
            while (iterator.hasNext()) {
                IndexEntry indexEntry = iterator.next();
                if (batch.getStartTime() == null) {
                    this.startTimeSlice(batch, indexEntry);
                } else if (batch.getStartTime().getMillis() != indexEntry.getTimestamp()) {
                    // The entry belongs to another time slice: flush what we have and start over.
                    this.submitAggregationTask(batch);
                    batch = new Batch();
                    this.startTimeSlice(batch, indexEntry);
                }
                batch.add(indexEntry);
                // Checked after every add, including the first entry of a batch started by a
                // time-slice change; otherwise a batch size of 1 would never be matched again.
                if (this.batchSize > 0 && batch.size() >= this.batchSize) {
                    this.submitAggregationTask(batch);
                    batch = new Batch();
                }
            }
            if (batch.size() > 0) {
                this.submitAggregationTask(batch);
            }
            // Drop the reference before blocking - presumably so the iterator can be collected while waiting.
            iterator = null;
            this.taskTracker.finishedSchedulingTasks();
            this.taskTracker.waitForTasksToFinish();
        }
        catch (InterruptedException e) {
            interrupted = true;
            this.log.warn("There was an interrupt while scheduling aggregation tasks.", e);
            this.taskTracker.abort("There was an interrupt while scheduling aggregation tasks.");
        }
        catch (Exception e) {
            this.log.warn("There was an unexpected error scheduling aggregation tasks", e);
            this.taskTracker.abort("There was an unexpected error scheduling aggregation tasks: " + e.getMessage());
        }
        finally {
            stopwatch.stop();
            this.log.info("Finished " + this.schedulesCount + " " + this.bucket + " data aggregations in " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
            if (interrupted) {
                // Restore the interrupt status last so the logging above is not affected by it.
                Thread.currentThread().interrupt();
            }
        }
        return this.schedulesCount.get();
    }

    /**
     * Initializes the start and end time of a fresh batch from its first index entry.
     */
    private void startTimeSlice(Batch batch, IndexEntry firstEntry) {
        batch.setStartTime(firstEntry.getTimestamp());
        batch.setEndTime(new DateTime(firstEntry.getTimestamp()).plus(this.timeSliceDuration));
    }

    /**
     * Submits one batch for aggregation. Blocks until a permit is available, then hands an
     * {@link AggregationTask} to the executor and registers the task with the task tracker.
     * The task fetches the batch's source data and aggregates it into the next larger bucket:
     * raw data into one-hour, one-hour into six-hour, and otherwise six-hour into
     * twenty-four-hour data.
     * <p>
     * If the executor refuses the task, the permit acquired for it is released again before
     * the exception propagates, so a failed submission does not permanently shrink the pool
     * of permits.
     *
     * @param batch the index entries to aggregate
     * @throws InterruptedException if the thread is interrupted while waiting for a permit
     */
    protected void submitAggregationTask(Batch batch) throws InterruptedException {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Scheduling " + this.bucket + " aggregation task for " + batch);
        }
        this.permits.acquire();
        boolean submitted = false;
        try {
            this.aggregationTasks.submit(new AggregationTask(batch) {

                @Override
                void run(Batch batch) {
                    switch (DataAggregator.this.bucket) {
                        case RAW: {
                            DataAggregator.this.fetchRawData(batch);
                            DataAggregator.this.processBatch(batch, Bucket.ONE_HOUR);
                            break;
                        }
                        case ONE_HOUR: {
                            DataAggregator.this.fetchData(batch, Bucket.ONE_HOUR);
                            DataAggregator.this.processBatch(batch, Bucket.SIX_HOUR);
                            break;
                        }
                        default: {
                            DataAggregator.this.fetchData(batch, Bucket.SIX_HOUR);
                            DataAggregator.this.processBatch(batch, Bucket.TWENTY_FOUR_HOUR);
                        }
                    }
                }
            });
            submitted = true;
        }
        finally {
            if (!submitted) {
                // No task will ever run for this permit, so nothing else would release it.
                this.permits.release();
            }
        }
        this.taskTracker.addTask();
    }

    /**
     * Starts one asynchronous raw-metrics query per index entry in the batch, bounded by the
     * batch's start and end time, and stores the combined future on the batch.
     *
     * @param batch the batch whose raw data should be fetched
     */
    protected void fetchRawData(Batch batch) {
        List<StorageResultSetFuture> pendingQueries = new ArrayList<StorageResultSetFuture>();
        for (IndexEntry entry : batch) {
            StorageResultSetFuture query = this.dao.findRawMetricsAsync(
                entry.getScheduleId(),
                batch.getStartTime().getMillis(),
                batch.getEndTime().getMillis());
            pendingQueries.add(query);
        }
        ListenableFuture<List<ResultSet>> allQueries = Futures.allAsList(pendingQueries);
        batch.setQueriesFuture(allQueries);
    }

    /**
     * Starts one asynchronous aggregate-metrics query per index entry in the batch, reading
     * from the given bucket within the batch's start and end time, and stores the combined
     * future on the batch.
     *
     * @param batch the batch whose data should be fetched
     * @param bucket the bucket to read the aggregate metrics from
     */
    protected void fetchData(Batch batch, Bucket bucket) {
        List<StorageResultSetFuture> pendingQueries = new ArrayList<StorageResultSetFuture>();
        for (IndexEntry entry : batch) {
            StorageResultSetFuture query = this.dao.findAggregateMetricsAsync(
                entry.getScheduleId(),
                bucket,
                batch.getStartTime().getMillis(),
                batch.getEndTime().getMillis());
            pendingQueries.add(query);
        }
        ListenableFuture<List<ResultSet>> allQueries = Futures.allAsList(pendingQueries);
        batch.setQueriesFuture(allQueries);
    }

    /**
     * Builds the asynchronous pipeline for one batch: map the query results to metric lists,
     * compute the aggregates for the batch's time slice, apply {@code persistMetrics} to them,
     * delete the batch's index entries, and finally register the completion callback. Every
     * stage runs on the aggregation executor.
     *
     * @param batch the batch whose queries have already been started
     * @param bucket the bucket the computed aggregates belong to
     */
    protected void processBatch(Batch batch, Bucket bucket) {
        long timeSlice = batch.getStartTime().getMillis();

        ListenableFuture<Iterable<List<T>>> mappedFuture =
            Futures.transform(batch.getQueriesFuture(), this.toIterable(), this.aggregationTasks);
        ListenableFuture<List<AggregateNumericMetric>> aggregatesFuture =
            Futures.transform(mappedFuture, this.computeAggregates(timeSlice, bucket), this.aggregationTasks);
        ListenableFuture<List<ResultSet>> persistedFuture =
            Futures.transform(aggregatesFuture, this.persistMetrics, this.aggregationTasks);
        ListenableFuture<List<ResultSet>> indexCleanupFuture =
            Futures.transform(persistedFuture, this.deleteIndexEntries(batch), this.aggregationTasks);

        this.aggregationTaskFinished(aggregatesFuture, indexCleanupFuture);
    }

    /**
     * Returns a function that exposes a list of result sets as a lazily mapped iterable: each
     * result set is converted to a list of metrics by the configured result set mapper only
     * when it is reached during iteration.
     * <p>
     * The returned iterable is single-pass: all iterators it hands out share one underlying
     * cursor over the result sets.
     *
     * @return function converting query results into lists of metrics
     */
    protected Function<List<ResultSet>, Iterable<List<T>>> toIterable() {
        return new Function<List<ResultSet>, Iterable<List<T>>>() {

            @Override
            public Iterable<List<T>> apply(List<ResultSet> resultSets) {
                // Created once per apply() call and shared by every iterator handed out below.
                final Iterator<ResultSet> cursor = resultSets.iterator();

                return new Iterable<List<T>>() {

                    @Override
                    public Iterator<List<T>> iterator() {
                        return new Iterator<List<T>>() {

                            @Override
                            public boolean hasNext() {
                                return cursor.hasNext();
                            }

                            @Override
                            @SuppressWarnings("unchecked")
                            public List<T> next() {
                                ResultSet current = cursor.next();
                                return DataAggregator.this.resultSetMapper.mapAll(current);
                            }

                            @Override
                            public void remove() {
                                throw new UnsupportedOperationException();
                            }
                        };
                    }
                };
            }
        };
    }

    /**
     * Returns a function that computes one aggregate metric per non-empty collection of
     * metrics. Empty collections are skipped with a warning.
     *
     * @param timeSlice the time slice, in milliseconds, assigned to the computed aggregates
     * @param bucket the bucket assigned to the computed aggregates
     * @return function turning per-schedule metric collections into aggregates
     */
    protected Function<Iterable<? extends Collection<T>>, List<AggregateNumericMetric>> computeAggregates(final long timeSlice, final Bucket bucket) {
        return new Function<Iterable<? extends Collection<T>>, List<AggregateNumericMetric>>() {

            @Override
            public List<AggregateNumericMetric> apply(Iterable<? extends Collection<T>> values) {
                List<AggregateNumericMetric> results = new ArrayList<AggregateNumericMetric>(DataAggregator.this.batchSize);
                for (Collection<T> scheduleMetrics : values) {
                    if (!scheduleMetrics.isEmpty()) {
                        results.add(DataAggregator.this.computeAggregate(scheduleMetrics, timeSlice, bucket));
                    } else {
                        DataAggregator.this.log.warn("Cannot compute a new " + AggregateNumericMetric.class.getSimpleName() + " from an empty list. The bucket is " + bucket + " and the time slice is " + timeSlice);
                    }
                }
                return results;
            }
        };
    }

    /**
     * Folds a non-empty collection of metrics into a single aggregate: the arithmetic mean of
     * the averages, the smallest minimum and the largest maximum. The schedule id is taken
     * from the first metric.
     *
     * @param metrics the metrics to combine; must contain at least one element
     * @param timeSlice the time slice, in milliseconds, of the resulting aggregate
     * @param bucket the bucket of the resulting aggregate
     * @return the aggregate computed from {@code metrics}
     */
    private AggregateNumericMetric computeAggregate(Collection<T> metrics, long timeSlice, Bucket bucket) {
        Iterator<T> remaining = metrics.iterator();

        // Seed the running values from the first metric.
        NumericMetric first = remaining.next();
        int scheduleId = first.getScheduleId();
        Double lowest = first.getMin();
        Double highest = first.getMax();
        ArithmeticMeanCalculator average = new ArithmeticMeanCalculator();
        average.add(first.getAvg());

        while (remaining.hasNext()) {
            NumericMetric current = remaining.next();
            average.add(current.getAvg());
            if (current.getMin() < lowest) {
                lowest = current.getMin();
            }
            if (current.getMax() > highest) {
                highest = current.getMax();
            }
        }

        return new AggregateNumericMetric(scheduleId, bucket, average.getArithmeticMean(), lowest, highest, timeSlice);
    }

    /**
     * Returns an asynchronous function that deletes every index entry of the batch. Its input
     * (the results of the preceding stage) is ignored; it only serves to sequence the deletes
     * after that stage.
     *
     * @param batch the batch whose index entries should be deleted
     * @return function producing a future that completes when all deletes have completed
     */
    protected AsyncFunction<List<ResultSet>, List<ResultSet>> deleteIndexEntries(final Batch batch) {
        return new AsyncFunction<List<ResultSet>, List<ResultSet>>() {

            @Override
            public ListenableFuture<List<ResultSet>> apply(List<ResultSet> insertResultSets) throws Exception {
                List<StorageResultSetFuture> pendingDeletes = new ArrayList<StorageResultSetFuture>();
                for (IndexEntry entry : batch) {
                    StorageResultSetFuture delete = DataAggregator.this.dao.deleteIndexEntry(entry);
                    pendingDeletes.add(delete);
                }
                return Futures.allAsList(pendingDeletes);
            }
        };
    }

    protected class AggregationTaskFinishedCallback<R>
    implements FutureCallback<R> {
        protected AggregationTaskFinishedCallback() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onSuccess(R args) {
            try {
                this.onFinish(args);
            }
            finally {
                DataAggregator.this.permits.release();
                DataAggregator.this.taskTracker.finishedTask();
                if (DataAggregator.this.log.isDebugEnabled()) {
                    DataAggregator.this.log.debug((Object)("There are " + DataAggregator.this.taskTracker.getRemainingTasks() + " remaining " + (Object)((Object)DataAggregator.this.bucket) + " aggregation tasks and " + DataAggregator.this.permits.availablePermits() + " available permits"));
                }
            }
        }

        protected void onFinish(R args) {
        }

        public void onFailure(Throwable t) {
            DataAggregator.this.log.warn((Object)"There was an error aggregating data", t);
            DataAggregator.this.permits.release();
            DataAggregator.this.taskTracker.finishedTask();
            if (DataAggregator.this.log.isDebugEnabled()) {
                DataAggregator.this.log.debug((Object)("There are " + DataAggregator.this.taskTracker.getRemainingTasks() + " remaining tasks and " + DataAggregator.this.permits.availablePermits() + " available permits"));
            }
        }
    }

    protected abstract class AggregationTask
    implements Runnable {
        private Batch batch;

        public AggregationTask(Batch batch) {
            this.batch = batch;
        }

        @Override
        public void run() {
            try {
                this.run(this.batch);
            }
            catch (Exception e) {
                DataAggregator.this.log.error((Object)"Aggregation will be aborted due to an unexpected error", (Throwable)e);
                DataAggregator.this.taskTracker.abort("Aborting aggregation due to an unexpected error: " + e.getMessage());
            }
        }

        abstract void run(Batch var1);
    }

    static interface BatchFinishedListener {
        public void onFinish(List<AggregateNumericMetric> var1);
    }
}

