package org.rhq.cassandra.schema;

import com.datastax.driver.core.Host;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.Batch;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.RateLimiter;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricsRegistry;
import java.io.File;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.joda.time.DateTime;
import org.joda.time.Days;
import org.joda.time.Seconds;
import org.rhq.core.util.exception.ThrowableUtil;

/* loaded from: input_file:org/rhq/cassandra/schema/MigrateAggregateMetrics.class */
public class MigrateAggregateMetrics implements Step {
    private static final Log log = LogFactory.getLog(MigrateAggregateMetrics.class);
    public static final int DEFAULT_WARM_UP = 20;
    private static final double RATE_INCREASE_PER_NODE = 0.3d;
    private Session session;
    private String dataDir;
    private DBConnectionFactory dbConnectionFactory;
    private MetricsRegistry metricsRegistry;
    private Meter migrationsMeter;
    private RateMonitor rateMonitor;
    private KeyScanner keyScanner;
    private MigrationProgressLogger progressLogger;
    private AtomicReference<RateLimiter> readPermitsRef = new AtomicReference<>();
    private AtomicReference<RateLimiter> writePermitsRef = new AtomicReference<>();
    private AtomicInteger remaining1HourMetrics = new AtomicInteger();
    private AtomicInteger remaining6HourMetrics = new AtomicInteger();
    private AtomicInteger remaining24HourMetrics = new AtomicInteger();
    private AtomicInteger migrated1HourMetrics = new AtomicInteger();
    private AtomicInteger migrated6HourMetrics = new AtomicInteger();
    private AtomicInteger migrated24HourMetrics = new AtomicInteger();
    private ListeningExecutorService threadPool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(6, new SchemaUpdateThreadFactory()));
    private AtomicInteger readErrors = new AtomicInteger();
    private AtomicInteger writeErrors = new AtomicInteger();

    /* loaded from: input_file:org/rhq/cassandra/schema/MigrateAggregateMetrics$Bucket.class */
    public enum Bucket {
        ONE_HOUR("one_hour"),
        SIX_HOUR("six_hour"),
        TWENTY_FOUR_HOUR("twenty_four_hour");

        private String tableName;

        Bucket(String str) {
            this.tableName = str;
        }

        @Override // java.lang.Enum
        public String toString() {
            return this.tableName;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/rhq/cassandra/schema/MigrateAggregateMetrics$MigrationProgressLogger.class */
    public class MigrationProgressLogger implements Runnable {
        private boolean finished;
        private boolean reportMigrationRates;

        private MigrationProgressLogger() {
        }

        public void finished() {
            this.finished = true;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!this.finished) {
                try {
                    MigrateAggregateMetrics.log.info("Remaining metrics to migrate\n" + Bucket.ONE_HOUR + ": " + MigrateAggregateMetrics.this.remaining1HourMetrics + "\n" + Bucket.SIX_HOUR + ": " + MigrateAggregateMetrics.this.remaining6HourMetrics + "\n" + Bucket.TWENTY_FOUR_HOUR + ": " + MigrateAggregateMetrics.this.remaining24HourMetrics + "\n");
                    MigrateAggregateMetrics.log.info("ErrorCounts{read:" + MigrateAggregateMetrics.this.readErrors + ", write: " + MigrateAggregateMetrics.this.writeErrors + "}");
                    if (this.reportMigrationRates) {
                        MigrateAggregateMetrics.log.info("Metrics migration rates:\n1 min rate: " + MigrateAggregateMetrics.this.migrationsMeter.oneMinuteRate() + "\n5 min rate: " + MigrateAggregateMetrics.this.migrationsMeter.fiveMinuteRate() + " \n15 min rate: " + MigrateAggregateMetrics.this.migrationsMeter.fifteenMinuteRate() + "\n");
                        this.reportMigrationRates = false;
                    } else {
                        this.reportMigrationRates = true;
                    }
                    Thread.sleep(30000L);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/rhq/cassandra/schema/MigrateAggregateMetrics$ReadResults.class */
    public static class ReadResults {
        Set<Integer> failedReads;
        Map<Integer, List<Row>> resultSets;

        private ReadResults() {
            this.failedReads = Collections.newSetFromMap(new ConcurrentHashMap());
            this.resultSets = new ConcurrentHashMap();
        }
    }

    @Override // org.rhq.cassandra.schema.Step
    public void setSession(Session session) {
        this.session = session;
    }

    @Override // org.rhq.cassandra.schema.Step
    public void bind(Properties properties) {
        this.dbConnectionFactory = (DBConnectionFactory) properties.get(SchemaManager.RELATIONAL_DB_CONNECTION_FACTORY_PROP);
        this.dataDir = properties.getProperty(SchemaManager.DATA_DIR, System.getProperty("jboss.server.data.dir"));
    }

    @Override // org.rhq.cassandra.schema.Step
    public void execute() {
        log.info("Starting data migration");
        this.metricsRegistry = new MetricsRegistry();
        this.migrationsMeter = this.metricsRegistry.newMeter(MigrateAggregateMetrics.class, "migrations", "migrations", TimeUnit.MINUTES);
        Stopwatch createStarted = Stopwatch.createStarted();
        try {
            try {
                try {
                    try {
                        if (this.dbConnectionFactory == null) {
                            log.info("The relational database connection factory is not set. No data migration necessary");
                            createStarted.stop();
                            log.info("Finished migrating " + this.migrated1HourMetrics + " " + Bucket.ONE_HOUR + ", " + this.migrated6HourMetrics + " " + Bucket.SIX_HOUR + ", and " + this.migrated24HourMetrics + " " + Bucket.TWENTY_FOUR_HOUR + " metrics in " + createStarted.elapsed(TimeUnit.SECONDS) + " sec");
                            shutdown();
                            return;
                        }
                        this.progressLogger = new MigrationProgressLogger();
                        this.rateMonitor = new RateMonitor(this.readPermitsRef, this.writePermitsRef);
                        this.keyScanner = new KeyScanner(this.session);
                        Set<Integer> scanFor1HourKeys = this.keyScanner.scanFor1HourKeys();
                        Set<Integer> scanFor6HourKeys = this.keyScanner.scanFor6HourKeys();
                        Set<Integer> scanFor24HourKeys = this.keyScanner.scanFor24HourKeys();
                        this.keyScanner.shutdown();
                        log.info("There are " + scanFor1HourKeys.size() + " schedule ids with " + Bucket.ONE_HOUR + " data");
                        log.info("There are " + scanFor6HourKeys.size() + " schedule ids with " + Bucket.SIX_HOUR + " data");
                        log.info("There are " + scanFor24HourKeys.size() + " schedule ids with " + Bucket.TWENTY_FOUR_HOUR + " data");
                        this.writePermitsRef.set(RateLimiter.create(getWriteLimit(getNumberOfUpNodes()), 20L, TimeUnit.SECONDS));
                        this.readPermitsRef.set(RateLimiter.create(getReadLimit(getNumberOfUpNodes()), 20L, TimeUnit.SECONDS));
                        log.info("The request limits are " + this.writePermitsRef.get().getRate() + " writes/sec and " + this.readPermitsRef.get().getRate() + " reads/sec");
                        this.remaining1HourMetrics.set(scanFor1HourKeys.size());
                        this.remaining6HourMetrics.set(scanFor6HourKeys.size());
                        this.remaining24HourMetrics.set(scanFor24HourKeys.size());
                        this.threadPool.submit(this.progressLogger);
                        this.threadPool.submit(this.rateMonitor);
                        migrate1HourData(scanFor1HourKeys);
                        migrate6HourData(scanFor6HourKeys);
                        migrate24HourData(scanFor24HourKeys);
                        if (this.remaining1HourMetrics.get() > 0 || this.remaining6HourMetrics.get() > 0 || this.remaining24HourMetrics.get() > 0) {
                            throw new RuntimeException("There are unfinished metrics migrations - {one_hour: " + this.remaining1HourMetrics + ", six_hour: " + this.remaining6HourMetrics + ", twenty_four_hour: " + this.remaining24HourMetrics + "}. The upgrade will have to be run again.");
                        }
                        dropTables();
                        createStarted.stop();
                        log.info("Finished migrating " + this.migrated1HourMetrics + " " + Bucket.ONE_HOUR + ", " + this.migrated6HourMetrics + " " + Bucket.SIX_HOUR + ", and " + this.migrated24HourMetrics + " " + Bucket.TWENTY_FOUR_HOUR + " metrics in " + createStarted.elapsed(TimeUnit.SECONDS) + " sec");
                        shutdown();
                    } catch (IOException e) {
                        throw new RuntimeException("There was an unexpected I/O error. There are unfinished metrics migrations - " + getRemainingMetrics() + ". The upgrade will have to be run again.");
                    }
                } catch (Throwable th) {
                    throw new RuntimeException("There was an unexpected error. The upgrade will have to be rerun.", th);
                }
            } catch (InterruptedException e2) {
                throw new RuntimeException("The migration was interrupted. There are still " + getRemainingMetrics() + " unfinished metrics migrations. The upgrade will have to be run again.");
            } catch (AbortedException e3) {
                throw new RuntimeException("The key scan was aborted. The upgrade will have to be rerun.", e3);
            }
        } catch (Throwable th2) {
            createStarted.stop();
            log.info("Finished migrating " + this.migrated1HourMetrics + " " + Bucket.ONE_HOUR + ", " + this.migrated6HourMetrics + " " + Bucket.SIX_HOUR + ", and " + this.migrated24HourMetrics + " " + Bucket.TWENTY_FOUR_HOUR + " metrics in " + createStarted.elapsed(TimeUnit.SECONDS) + " sec");
            shutdown();
            throw th2;
        }
    }

    private String getRemainingMetrics() {
        return "{" + Bucket.ONE_HOUR + ": " + this.remaining1HourMetrics + ", " + Bucket.SIX_HOUR + ": " + this.remaining6HourMetrics + ", " + Bucket.TWENTY_FOUR_HOUR + ": " + this.remaining24HourMetrics + "}";
    }

    private double getWriteLimit(int i) {
        int parseInt = Integer.parseInt(System.getProperty("rhq.storage.request.write-limit", "10000"));
        return parseInt + (parseInt * RATE_INCREASE_PER_NODE * (i - 1));
    }

    private double getReadLimit(int i) {
        int parseInt = Integer.parseInt(System.getProperty("rhq.storage.request.read-limit", "25"));
        return parseInt + (parseInt * RATE_INCREASE_PER_NODE * (i - 1));
    }

    private int getNumberOfUpNodes() {
        int i = 0;
        Iterator it = this.session.getCluster().getMetadata().getAllHosts().iterator();
        while (it.hasNext()) {
            if (((Host) it.next()).isUp()) {
                i++;
            }
        }
        return i;
    }

    private void shutdown() {
        if (this.dbConnectionFactory != null) {
            try {
                log.info("Shutting down migration thread pools...");
                this.rateMonitor.shutdown();
                this.progressLogger.finished();
                if (this.keyScanner != null) {
                    this.keyScanner.shutdown();
                }
                this.threadPool.shutdown();
                this.threadPool.awaitTermination(5L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
            }
        }
    }

    private void migrate1HourData(Set<Integer> set) throws IOException, InterruptedException {
        DateTime dateTime = DateUtils.get1HourTimeSlice(DateTime.now());
        migrate(set, this.session.prepare("SELECT time, type, value FROM rhq.one_hour_metrics WHERE schedule_id = ? AND time >= " + dateTime.minus(Days.days(14)).getMillis() + " AND time <= " + dateTime.getMillis()), Bucket.ONE_HOUR, this.remaining1HourMetrics, Days.days(14));
    }

    private void migrate6HourData(Set<Integer> set) throws IOException, InterruptedException {
        DateTime dateTime = DateUtils.get1HourTimeSlice(DateTime.now());
        migrate(set, this.session.prepare("SELECT time, type, value FROM rhq.six_hour_metrics WHERE schedule_id = ? AND time >= " + dateTime.minus(Days.days(31)).getMillis() + " AND time <= " + dateTime.getMillis()), Bucket.SIX_HOUR, this.remaining6HourMetrics, Days.days(31));
    }

    private void migrate24HourData(Set<Integer> set) throws IOException, InterruptedException {
        DateTime dateTime = DateUtils.get1HourTimeSlice(DateTime.now());
        migrate(set, this.session.prepare("SELECT time, type, value FROM rhq.twenty_four_hour_metrics WHERE schedule_id = ? AND time >= " + dateTime.minus(Days.days(365)).getMillis() + " AND time <= " + dateTime.getMillis()), Bucket.TWENTY_FOUR_HOUR, this.remaining24HourMetrics, Days.days(365));
    }

    private void migrate(Set<Integer> set, PreparedStatement preparedStatement, Bucket bucket, AtomicInteger atomicInteger, Days days) throws IOException, InterruptedException {
        Stopwatch createStarted = Stopwatch.createStarted();
        Set<Integer> read = new MigrationLog(new File(this.dataDir, bucket + "_migration.log")).read();
        ArrayDeque arrayDeque = new ArrayDeque(set);
        while (!arrayDeque.isEmpty()) {
            ReadResults readData = readData(getNextBatch(arrayDeque, read, atomicInteger), preparedStatement, bucket);
            Iterator<Integer> it = readData.failedReads.iterator();
            while (it.hasNext()) {
                arrayDeque.offer(it.next());
            }
            Map<Integer, List<Row>> writeData = writeData(readData.resultSets, bucket, days, atomicInteger);
            while (true) {
                Map<Integer, List<Row>> map = writeData;
                if (!map.isEmpty()) {
                    writeData = writeData(map, bucket, days, atomicInteger);
                }
            }
        }
        createStarted.stop();
        log.info("Finished migrating " + bucket + " data in " + createStarted.elapsed(TimeUnit.SECONDS) + " sec");
    }

    private List<Integer> getNextBatch(Queue<Integer> queue, Set<Integer> set, AtomicInteger atomicInteger) {
        ArrayList arrayList = new ArrayList(500);
        while (!queue.isEmpty() && arrayList.size() < 500) {
            Integer poll = queue.poll();
            if (set.contains(poll)) {
                atomicInteger.decrementAndGet();
            } else {
                arrayList.add(poll);
            }
        }
        return arrayList;
    }

    private ReadResults readData(List<Integer> list, PreparedStatement preparedStatement, final Bucket bucket) throws InterruptedException {
        final ReadResults readResults = new ReadResults();
        final CountDownLatch countDownLatch = new CountDownLatch(list.size());
        for (final Integer num : list) {
            this.readPermitsRef.get().acquire();
            Futures.addCallback(this.session.executeAsync(preparedStatement.bind(new Object[]{num})), new FutureCallback<ResultSet>() { // from class: org.rhq.cassandra.schema.MigrateAggregateMetrics.1
                public void onSuccess(ResultSet resultSet) {
                    try {
                        readResults.resultSets.put(num, resultSet.all());
                        MigrateAggregateMetrics.this.rateMonitor.requestSucceeded();
                        countDownLatch.countDown();
                    } catch (Throwable th) {
                        countDownLatch.countDown();
                        throw th;
                    }
                }

                public void onFailure(Throwable th) {
                    try {
                        if (MigrateAggregateMetrics.log.isDebugEnabled()) {
                            MigrateAggregateMetrics.log.debug("Failed to read " + bucket + " data for schedule id " + num, th);
                        } else {
                            MigrateAggregateMetrics.log.info("Failed to read " + bucket + " data for schedule id " + num + ": " + ThrowableUtil.getRootMessage(th));
                        }
                        readResults.failedReads.add(num);
                        MigrateAggregateMetrics.this.rateMonitor.requestFailed();
                        MigrateAggregateMetrics.this.readErrors.incrementAndGet();
                        countDownLatch.countDown();
                    } catch (Throwable th2) {
                        countDownLatch.countDown();
                        throw th2;
                    }
                }
            });
        }
        countDownLatch.await();
        return readResults;
    }

    private Map<Integer, List<Row>> writeData(Map<Integer, List<Row>> map, final Bucket bucket, final Days days, final AtomicInteger atomicInteger) throws InterruptedException {
        final CountDownLatch countDownLatch = new CountDownLatch(map.size());
        final ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        for (final Map.Entry<Integer, List<Row>> entry : map.entrySet()) {
            if (entry.getValue().isEmpty()) {
                if (log.isDebugEnabled()) {
                    log.debug("There is no " + bucket + " data for schedule id " + entry.getKey());
                }
                countDownLatch.countDown();
            } else {
                this.threadPool.submit(new Runnable() { // from class: org.rhq.cassandra.schema.MigrateAggregateMetrics.2
                    @Override // java.lang.Runnable
                    public void run() {
                        Futures.addCallback(MigrateAggregateMetrics.this.writeData((Integer) entry.getKey(), bucket, (List<Row>) entry.getValue(), days.toStandardSeconds()), new FutureCallback<List<ResultSet>>() { // from class: org.rhq.cassandra.schema.MigrateAggregateMetrics.2.1
                            public void onSuccess(List<ResultSet> list) {
                                try {
                                    if (MigrateAggregateMetrics.log.isDebugEnabled()) {
                                        MigrateAggregateMetrics.log.debug("Finished " + bucket + " data migration for schedule id " + entry.getKey());
                                    }
                                    MigrateAggregateMetrics.this.rateMonitor.requestSucceeded();
                                    atomicInteger.decrementAndGet();
                                    MigrateAggregateMetrics.this.migrationsMeter.mark();
                                    countDownLatch.countDown();
                                } catch (Throwable th) {
                                    countDownLatch.countDown();
                                    throw th;
                                }
                            }

                            public void onFailure(Throwable th) {
                                try {
                                    if (MigrateAggregateMetrics.log.isDebugEnabled()) {
                                        MigrateAggregateMetrics.log.debug("Failed to write " + bucket + " data for schedule id " + entry.getKey(), th);
                                    } else {
                                        MigrateAggregateMetrics.log.info("Failed to write " + bucket + " data for schedule id " + entry.getKey() + ": " + ThrowableUtil.getRootMessage(th));
                                    }
                                    MigrateAggregateMetrics.this.rateMonitor.requestFailed();
                                    MigrateAggregateMetrics.this.writeErrors.incrementAndGet();
                                    concurrentHashMap.put(entry.getKey(), entry.getValue());
                                    countDownLatch.countDown();
                                } catch (Throwable th2) {
                                    countDownLatch.countDown();
                                    throw th2;
                                }
                            }
                        });
                    }
                });
            }
        }
        countDownLatch.await();
        return concurrentHashMap;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ListenableFuture<List<ResultSet>> writeData(Integer num, Bucket bucket, List<Row> list, Seconds seconds) {
        try {
            ArrayList arrayList = new ArrayList();
            Date date = list.get(0).getDate(0);
            Double d = null;
            Double d2 = null;
            Double d3 = null;
            ArrayList arrayList2 = new ArrayList(45);
            for (Row row : list) {
                Date date2 = row.getDate(0);
                if (date2.equals(date)) {
                    switch (row.getInt(1)) {
                        case 0:
                            d = Double.valueOf(row.getDouble(2));
                            break;
                        case 1:
                            d2 = Double.valueOf(row.getDouble(2));
                            break;
                        default:
                            d3 = Double.valueOf(row.getDouble(2));
                            break;
                    }
                } else {
                    Seconds secondsBetween = Seconds.secondsBetween(new DateTime(date), DateTime.now());
                    if (secondsBetween.isLessThan(seconds)) {
                        if (!isDataMissing(d3, d, d2)) {
                            arrayList2.add(createInsertStatement(num, bucket, date, d3, d, d2, seconds.getSeconds() - secondsBetween.getSeconds()));
                            if (arrayList2.size() == 45) {
                                arrayList.add(writeBatch(arrayList2));
                                arrayList2.clear();
                            }
                        } else if (log.isDebugEnabled()) {
                            log.debug("We only have a partial " + bucket + " metric for {scheduleId: " + num + ", time: " + date.getTime() + "}. It will not be migrated.");
                        }
                    }
                    date = date2;
                    d = Double.valueOf(row.getDouble(2));
                    d2 = null;
                    d3 = null;
                }
            }
            if (!arrayList2.isEmpty()) {
                arrayList.add(writeBatch(arrayList2));
            }
            return Futures.allAsList(arrayList);
        } catch (Exception e) {
            return Futures.immediateFailedFuture(new Exception("There was an error writing " + bucket + " data for schedule id " + num, e));
        }
    }

    private boolean isDataMissing(Double d, Double d2, Double d3) {
        return d == null || Double.isNaN(d.doubleValue()) || d2 == null || Double.isNaN(d2.doubleValue()) || d3 == null || Double.isNaN(d3.doubleValue());
    }

    private ResultSetFuture writeBatch(List<Statement> list) {
        Batch batch = QueryBuilder.batch((Statement[]) list.toArray(new Statement[list.size()]));
        this.writePermitsRef.get().acquire();
        return this.session.executeAsync(batch);
    }

    private SimpleStatement createInsertStatement(Integer num, Bucket bucket, Date date, Double d, Double d2, Double d3, int i) {
        return new SimpleStatement("INSERT INTO rhq.aggregate_metrics(schedule_id, bucket, time, avg, max, min) VALUES (" + num + ", '" + bucket + "', " + date.getTime() + ", " + d + ", " + d2 + ", " + d3 + ") USING TTL " + i);
    }

    private void dropTables() {
        Iterator it = this.session.execute("SELECT columnfamily_name FROM system.schema_columnfamilies WHERE keyspace_name = 'rhq'").iterator();
        while (it.hasNext()) {
            String string = ((Row) it.next()).getString(0);
            if (string.equals("one_hour_metrics") || string.equals("six_hour_metrics") || string.equals("twenty_four_hour_metrics")) {
                log.info("Dropping table " + string);
                this.session.execute("DROP table rhq." + string);
            }
        }
    }
}
