package org.rhq.server.metrics.aggregation;

import com.datastax.driver.core.ResultSet;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.joda.time.DateTime;
import org.joda.time.DateTimeComparator;
import org.joda.time.Duration;
import org.rhq.core.util.exception.ThrowableUtil;
import org.rhq.server.metrics.AbortedException;
import org.rhq.server.metrics.DateTimeService;
import org.rhq.server.metrics.MetricsConfiguration;
import org.rhq.server.metrics.MetricsDAO;
import org.rhq.server.metrics.SignalingCountDownLatch;
import org.rhq.server.metrics.domain.AggregateNumericMetric;
import org.rhq.server.metrics.domain.MetricsTable;

/* loaded from: input_file:lib/rhq-server-metrics-4.10.0.jar:org/rhq/server/metrics/aggregation/Aggregator.class */
public class Aggregator {
    private static final Comparator<AggregateNumericMetric> AGGREGATE_COMPARATOR = new Comparator<AggregateNumericMetric>() { // from class: org.rhq.server.metrics.aggregation.Aggregator.1
        @Override // java.util.Comparator
        public int compare(AggregateNumericMetric aggregateNumericMetric, AggregateNumericMetric aggregateNumericMetric2) {
            if (aggregateNumericMetric.getScheduleId() < aggregateNumericMetric2.getScheduleId()) {
                return -1;
            }
            return aggregateNumericMetric.getScheduleId() == aggregateNumericMetric2.getScheduleId() ? 0 : 1;
        }
    };
    private MetricsDAO dao;
    private MetricsConfiguration configuration;
    private DateTimeService dtService;
    private DateTime startTime;
    private AggregationState state;
    private final Log log = LogFactory.getLog(Aggregator.class);
    private Set<AggregateNumericMetric> oneHourData = new ConcurrentSkipListSet(AGGREGATE_COMPARATOR);

    public Aggregator(ListeningExecutorService listeningExecutorService, MetricsDAO metricsDAO, MetricsConfiguration metricsConfiguration, DateTimeService dateTimeService, DateTime dateTime, int i, int i2) {
        this.dao = metricsDAO;
        this.configuration = metricsConfiguration;
        this.dtService = dateTimeService;
        this.startTime = dateTime;
        DateTime dateTime2 = get6HourTimeSlice();
        DateTime dateTime3 = get24HourTimeSlice();
        this.state = new AggregationState().setDao(metricsDAO).setStartTime(dateTime).setBatchSize(i).setAggregationTasks(listeningExecutorService).setPermits(new Semaphore(i2 * i, true)).setRawAggregationDone(new SignalingCountDownLatch(new CountDownLatch(1))).setOneHourAggregationDone(new SignalingCountDownLatch(new CountDownLatch(1))).setSixHourAggregationDone(new SignalingCountDownLatch(new CountDownLatch(1))).setOneHourTimeSlice(dateTime).setOneHourTimeSliceEnd(dateTime.plus(metricsConfiguration.getRawTimeSliceDuration())).setSixHourTimeSlice(dateTime2).setSixHourTimeSliceEnd(dateTime2.plus(metricsConfiguration.getOneHourTimeSliceDuration())).setTwentyFourHourTimeSlice(dateTime3).setTwentyFourHourTimeSliceEnd(dateTime3.plus(metricsConfiguration.getSixHourTimeSliceDuration())).setCompute1HourData(new Compute1HourData(dateTime, dateTime2, metricsDAO, this.oneHourData)).setCompute6HourData(new Compute6HourData(dateTime2, dateTime3, metricsDAO)).setCompute24HourData(new Compute24HourData(dateTime3, metricsDAO)).set6HourTimeSliceFinished(hasTimeSliceEnded(dateTime2, metricsConfiguration.getOneHourTimeSliceDuration())).set24HourTimeSliceFinished(hasTimeSliceEnded(dateTime3, metricsConfiguration.getSixHourTimeSliceDuration())).setRemainingRawData(new AtomicInteger(0)).setRemaining1HourData(new AtomicInteger(0)).setRemaining6HourData(new AtomicInteger(0));
    }

    private DateTime get24HourTimeSlice() {
        return this.dtService.getTimeSlice(this.startTime, this.configuration.getSixHourTimeSliceDuration());
    }

    private DateTime get6HourTimeSlice() {
        return this.dtService.getTimeSlice(this.startTime, this.configuration.getOneHourTimeSliceDuration());
    }

    private boolean hasTimeSliceEnded(DateTime dateTime, Duration duration) {
        return DateTimeComparator.getInstance().compare(currentHour(), dateTime.plus(duration)) >= 0;
    }

    protected DateTime currentHour() {
        return this.dtService.getTimeSlice(this.dtService.now(), this.configuration.getRawTimeSliceDuration());
    }

    public Set<AggregateNumericMetric> run() {
        this.log.info("Starting aggregation for time slice " + this.startTime);
        try {
            Stopwatch start = new Stopwatch().start();
            ArrayList arrayList = new ArrayList(3);
            arrayList.add(MetricsTable.ONE_HOUR);
            Futures.addCallback(this.dao.findMetricsIndexEntriesAsync(MetricsTable.ONE_HOUR, this.startTime.getMillis()), new RawDataScheduler(this.state), this.state.getAggregationTasks());
            this.state.getRawAggregationDone().await();
            start.stop();
            this.log.info("Finished aggregating raw data in " + start.elapsed(TimeUnit.MILLISECONDS) + " ms");
            if (this.state.is6HourTimeSliceFinished()) {
                this.log.info("Starting aggregation of 1 hour data");
                start.reset().start();
                arrayList.add(MetricsTable.SIX_HOUR);
                Futures.addCallback(this.dao.findMetricsIndexEntriesAsync(MetricsTable.SIX_HOUR, this.state.getSixHourTimeSlice().getMillis()), new OneHourDataScheduler(this.state), this.state.getAggregationTasks());
                this.state.getOneHourAggregationDone().await();
                start.stop();
                this.log.info("Finished aggregating one hour data in " + start.elapsed(TimeUnit.MILLISECONDS) + " ms");
            }
            if (this.state.is24HourTimeSliceFinished()) {
                this.log.info("Starting aggregation of 6 hour data");
                start.reset().start();
                arrayList.add(MetricsTable.TWENTY_FOUR_HOUR);
                Futures.addCallback(this.dao.findMetricsIndexEntriesAsync(MetricsTable.TWENTY_FOUR_HOUR, this.state.getTwentyFourHourTimeSlice().getMillis()), new SixHourDataScheduler(this.state), this.state.getAggregationTasks());
                this.state.getSixHourAggregationDone().await();
                start.stop();
                this.log.info("Finished aggregating six hour data in " + start.elapsed(TimeUnit.MILLISECONDS) + " ms");
            }
            CountDownLatch countDownLatch = new CountDownLatch(arrayList.size());
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                deleteIndexEntries((MetricsTable) it.next(), countDownLatch);
            }
            countDownLatch.await();
            return this.oneHourData;
        } catch (InterruptedException e) {
            this.log.info("There was an interrupt while waiting for aggregation to finish. Aggregation will be aborted.");
            return Collections.emptySet();
        } catch (AbortedException e2) {
            this.log.warn("Aggregation has been aborted: " + e2.getMessage());
            return Collections.emptySet();
        }
    }

    private void deleteIndexEntries(final MetricsTable metricsTable, final CountDownLatch countDownLatch) {
        DateTime twentyFourHourTimeSlice;
        switch (metricsTable) {
            case ONE_HOUR:
                twentyFourHourTimeSlice = this.startTime;
                break;
            case SIX_HOUR:
                twentyFourHourTimeSlice = this.state.getSixHourTimeSlice();
                break;
            default:
                twentyFourHourTimeSlice = this.state.getTwentyFourHourTimeSlice();
                break;
        }
        this.log.debug("Deleting " + metricsTable + " index entries for time slice " + twentyFourHourTimeSlice);
        final DateTime dateTime = twentyFourHourTimeSlice;
        Futures.addCallback(this.dao.deleteMetricsIndexEntriesAsync(metricsTable, twentyFourHourTimeSlice.getMillis()), new FutureCallback<ResultSet>() { // from class: org.rhq.server.metrics.aggregation.Aggregator.2
            @Override // com.google.common.util.concurrent.FutureCallback
            public void onSuccess(ResultSet resultSet) {
                countDownLatch.countDown();
            }

            @Override // com.google.common.util.concurrent.FutureCallback
            public void onFailure(Throwable th) {
                if (Aggregator.this.log.isDebugEnabled()) {
                    Aggregator.this.log.debug("Failed to delete index entries for table " + metricsTable + " at time [" + dateTime + "]. An unexpected error occurred.", th);
                } else {
                    Aggregator.this.log.warn("Failed to delete index entries for table " + metricsTable + " at time [" + dateTime + "]. An unexpected error occurred: " + ThrowableUtil.getRootMessage(th));
                }
                countDownLatch.countDown();
            }
        });
    }
}
