package org.rhq.cassandra.schema;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.RateLimiter;
import java.util.Iterator;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.joda.time.DateTime;
import org.joda.time.Days;
import org.joda.time.Duration;
import org.joda.time.Hours;

/* loaded from: input_file:org/rhq/cassandra/schema/ReplaceRHQ411Index.class */
/**
 * Schema upgrade step that copies the entries of the {@code rhq.metrics_index} table into the
 * partitioned {@code rhq.metrics_idx} table and then drops {@code rhq.metrics_index}.
 *
 * <p>Entries are read page by page (page size from the {@code rhq.metrics.index.page-size} system
 * property, default 2500) and written asynchronously. Writes are throttled by a {@link RateLimiter}
 * whose initial rate comes from the {@code rhq.storage.request.write-limit} system property
 * (default 20000).
 *
 * <p>An instance is intended for a single {@link #execute(DateRanges)} call: the internal thread
 * pool is shut down when that call returns.
 */
public class ReplaceRHQ411Index {
    /** Number of partitions in the new index; an entry's partition is {@code schedule_id % NUM_PARTITIONS}. */
    private static final int NUM_PARTITIONS = 10;

    /** How many times the schema tables are polled for {@code metrics_idx} before giving up. */
    private static final int SCHEMA_CHECK_MAX_ATTEMPTS = 10;

    /** Pause between two schema polls, in milliseconds. */
    private static final long SCHEMA_CHECK_INTERVAL_MILLIS = 1000L;

    private static final Log log = LogFactory.getLog(ReplaceRHQ411Index.class);

    /** Maximum number of schedule ids fetched per query against the old index. */
    private static final int PAGE_SIZE = Integer.parseInt(System.getProperty("rhq.metrics.index.page-size", "2500"));

    private final Session session;
    private PreparedStatement find411IndexEntries;
    private PreparedStatement find411IndexEntriesAfterScheduleId;
    private PreparedStatement updateNewIndex;

    /** Runs the rate monitor and the callbacks of the asynchronous index writes. */
    private final ListeningExecutorService threadPool =
        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4, new SchemaUpdateThreadFactory()));

    /** Holder of the limiter that every index write acquires a permit from; shared with the rate monitor. */
    private final AtomicReference<RateLimiter> writePermitsRef = new AtomicReference<>();

    /** Created in {@link #execute(DateRanges)}; stays {@code null} if start-up fails before that point. */
    private RateMonitor rateMonitor;

    /**
     * @param session the Cassandra session used for all queries issued by this upgrade step
     */
    public ReplaceRHQ411Index(Session session) {
        this.session = session;
    }

    /**
     * Migrates the raw, one hour and six hour index entries for the given date ranges and finally
     * drops the old index table.
     *
     * <p>The elapsed time is logged and the background resources are released whether the
     * migration succeeds or fails.
     *
     * @param dateRanges start and end times of the three index ranges to migrate
     * @throws RuntimeException if the {@code metrics_idx} table never becomes visible, if the thread
     *         is interrupted (the interrupt status is restored first), or if an index update is
     *         interrupted or aborted
     */
    public void execute(DateRanges dateRanges) {
        log.info("Updating indexes");
        Stopwatch stopwatch = Stopwatch.createStarted();
        try {
            waitForSchemaPropagation();
            initPreparedStatements();

            int writeLimit = Integer.parseInt(System.getProperty("rhq.storage.request.write-limit", "20000"));
            writePermitsRef.set(RateLimiter.create(writeLimit));
            // First RateMonitor argument: a separate limiter holder that this class never reads again.
            AtomicReference<RateLimiter> monitorOnlyPermitsRef = new AtomicReference<>(RateLimiter.create(100.0d));
            rateMonitor = new RateMonitor(monitorOnlyPermitsRef, writePermitsRef);
            threadPool.submit(rateMonitor);

            updateRawIndex(dateRanges.rawStartTime, dateRanges.rawEndTime);
            update1HourIndex(dateRanges.oneHourStartTime, dateRanges.oneHourEndTime);
            update6HourIndex(dateRanges.sixHourStartTime, dateRanges.sixHourEndTime);
            drop411Index();
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new RuntimeException("The index update failed due to an interrupt", e);
        } finally {
            stopwatch.stop();
            shutdown();
            log.info("Finished updating indexes in " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
        }
    }

    /**
     * Stops the rate monitor (if it was ever created) and shuts down the thread pool.
     */
    private void shutdown() {
        try {
            // rateMonitor is still null when execute() failed before creating it; dereferencing it
            // here would replace the original failure with a NullPointerException.
            if (rateMonitor != null) {
                rateMonitor.shutdown();
            }
        } finally {
            threadPool.shutdown();
        }
    }

    /**
     * Prepares the two paging queries against the old index and the insert into the new index.
     */
    private void initPreparedStatements() {
        find411IndexEntries = session.prepare(
            "SELECT schedule_id FROM rhq.metrics_index WHERE bucket = ? AND time = ? LIMIT " + PAGE_SIZE);
        find411IndexEntriesAfterScheduleId = session.prepare(
            "SELECT schedule_id FROM rhq.metrics_index WHERE bucket = ? AND time = ? AND schedule_id > ? LIMIT " + PAGE_SIZE);
        updateNewIndex = session.prepare(
            "INSERT INTO rhq.metrics_idx (bucket, partition, time, schedule_id) VALUES (?, ?, ?, ?)");
    }

    /**
     * Polls the system schema table until the {@code rhq.metrics_idx} table is listed, sleeping
     * between attempts.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping between attempts
     * @throws RuntimeException if the table is still missing after the last attempt
     */
    private void waitForSchemaPropagation() throws InterruptedException {
        for (int attempt = 0; attempt < SCHEMA_CHECK_MAX_ATTEMPTS; attempt++) {
            ResultSet tables = session.execute("SELECT columnfamily_name FROM system.schema_columnfamilies WHERE keyspace_name = 'rhq' AND columnfamily_name = 'metrics_idx'");
            if (!tables.isExhausted()) {
                return;
            }
            Thread.sleep(SCHEMA_CHECK_INTERVAL_MILLIS);
        }
        throw new RuntimeException("The metrics_idx table does not exist. The upgrade needs to be rerun to ensure that the index table is created and is updated.");
    }

    /** Copies the {@code one_hour_metrics} bucket of the old index to the {@code raw} bucket, in one hour steps. */
    private void updateRawIndex(DateTime start, DateTime end) {
        log.info("Updating raw index");
        updateIndex("one_hour_metrics", "raw", start, end, Hours.ONE.toStandardDuration());
    }

    /** Copies the {@code six_hour_metrics} bucket of the old index to the {@code one_hour} bucket, in six hour steps. */
    private void update1HourIndex(DateTime start, DateTime end) {
        log.info("Updating one_hour index");
        updateIndex("six_hour_metrics", "one_hour", start, end, Hours.SIX.toStandardDuration());
    }

    /** Copies the {@code twenty_four_hour_metrics} bucket of the old index to the {@code six_hour} bucket, in one day steps. */
    private void update6HourIndex(DateTime start, DateTime end) {
        log.info("Updating six_hour index");
        updateIndex("twenty_four_hour_metrics", "six_hour", start, end, Days.ONE.toStandardDuration());
    }

    /**
     * Walks the old index from {@code start} to {@code end} (inclusive) in steps of
     * {@code timeSlice}, schedules a write to the new index for every schedule id found, and then
     * waits for the scheduled writes to finish.
     *
     * <p>The slice at {@code start} is always processed, even if {@code start} is after {@code end}.
     *
     * @param oldBucket bucket name to read from in {@code rhq.metrics_index}
     * @param newBucket bucket name to write to in {@code rhq.metrics_idx}
     * @param start first time slice to migrate
     * @param end last time slice to migrate
     * @param timeSlice distance between two consecutive time slices
     * @throws RuntimeException if waiting for the writes is interrupted (the interrupt status is
     *         restored first) or the update is aborted
     */
    private void updateIndex(String oldBucket, String newBucket, DateTime start, DateTime end, Duration timeSlice) {
        try {
            DateTime time = start;
            BoundStatement query = find411IndexEntries.bind(oldBucket, time.toDate());
            int lastScheduleId = 0;
            TaskTracker taskTracker = new TaskTracker();

            do {
                // Querying at the top of the loop avoids issuing a read for a slice past 'end'
                // whose result would never be looked at.
                ResultSet page = session.execute(query);
                int rowsInPage = 0;
                for (Row row : page) {
                    lastScheduleId = row.getInt(0);
                    rowsInPage++;
                    addScheduleIdToIndex(newBucket, timeSlice, time, lastScheduleId, taskTracker);
                }

                if (rowsInPage < PAGE_SIZE) {
                    // Short page: this time slice is exhausted, move on to the next one.
                    time = DateUtils.plusDSTAware(time, timeSlice);
                    query = find411IndexEntries.bind(oldBucket, time.toDate());
                } else {
                    // Full page: there may be more rows in this slice after the last id seen.
                    query = find411IndexEntriesAfterScheduleId.bind(oldBucket, time.toDate(), lastScheduleId);
                }
            } while (!time.isAfter(end));

            taskTracker.finishedSchedulingTasks();
            taskTracker.waitForTasksToFinish();
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new RuntimeException("The index update did not complete due to an interrupt", e);
        } catch (AbortedException e) {
            throw new RuntimeException("The index update was aborted", e);
        }
    }

    /**
     * Registers a new task with the tracker and asynchronously writes one entry to the new index.
     * Blocks until a write permit is available.
     *
     * @param str bucket name to write to in {@code rhq.metrics_idx}
     * @param duration size of the time slice the entry belongs to
     * @param dateTime time of the entry; it is converted with {@code DateUtils.getUTCTimeSlice} before being written
     * @param i schedule id of the entry
     * @param taskTracker tracker that is told when the write has finished
     */
    public void addScheduleIdToIndex(String str, Duration duration, DateTime dateTime, int i, TaskTracker taskTracker) {
        taskTracker.addTask();
        submitIndexWrite(str, duration, dateTime, i, taskTracker);
    }

    /**
     * Acquires a write permit and issues the asynchronous insert for one index entry without
     * registering a task; used both for the first attempt and for retries.
     */
    private void submitIndexWrite(String bucket, Duration timeSlice, DateTime time, int scheduleId,
            TaskTracker taskTracker) {
        int partition = scheduleId % NUM_PARTITIONS;
        writePermitsRef.get().acquire();
        BoundStatement insert = updateNewIndex.bind(bucket, partition,
            DateUtils.getUTCTimeSlice(time, timeSlice).toDate(), scheduleId);
        Futures.addCallback(session.executeAsync(insert),
            indexUpdated(bucket, timeSlice, time, scheduleId, taskTracker), threadPool);
    }

    /**
     * Builds the callback for one index write: success marks the task as finished, failure is
     * reported to the rate monitor and the write is submitted again.
     */
    private FutureCallback<ResultSet> indexUpdated(final String bucket, final Duration timeSlice,
            final DateTime time, final int scheduleId, final TaskTracker taskTracker) {
        return new FutureCallback<ResultSet>() {
            @Override
            public void onSuccess(ResultSet resultSet) {
                taskTracker.finishedTask();
            }

            @Override
            public void onFailure(Throwable th) {
                rateMonitor.requestFailed();
                // Retry the write only. The task was already registered on the first attempt and
                // finishedTask() is called once per schedule id, so registering it again here would
                // leave one addTask() without a matching finishedTask().
                // NOTE(review): assumes TaskTracker expects one finishedTask() per addTask() - confirm.
                submitIndexWrite(bucket, timeSlice, time, scheduleId, taskTracker);
            }
        };
    }

    /**
     * Drops the old {@code rhq.metrics_index} table.
     */
    private void drop411Index() {
        log.info("Dropping table metrics_index");
        session.execute("DROP table rhq.metrics_index");
    }
}
