package org.rhq.cassandra.schema;

import com.datastax.driver.core.Host;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.RateLimiter;
import java.io.File;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/rhq/cassandra/schema/MigrateAggregateMetrics.class */
public class MigrateAggregateMetrics implements Step {
    // Logger shared by this step and its nested helper classes.
    private static final Log log = LogFactory.getLog(MigrateAggregateMetrics.class);
    // Cassandra session supplied through setSession(); used for all CQL reads, writes and table drops.
    private Session session;
    // Relational DB connection factory read from the properties in bind(); when it is null, execute() skips the
    // data migration.
    private DBConnectionFactory dbConnectionFactory;
    // Per-bucket SELECT statements, prepared in initPreparedStatements().
    private PreparedStatement find1HourData;
    private PreparedStatement find6HourData;
    private PreparedStatement find24HourData;
    // Rate limiter created in execute(); one permit is acquired before every INSERT issued by MetricsWriter.
    private RateLimiter writePermits;
    // Single permit: migrate() acquires it before querying a schedule and the callback built by
    // migrationFinished() releases it.
    private Semaphore readPermits = new Semaphore(1);
    // Number of schedule migrations whose task failed; checked at the end of execute().
    private AtomicInteger failedMigrations = new AtomicInteger();
    // Fixed pool of 4 threads that runs the MetricsWriter tasks and the MigrationProgressLogger.
    private ListeningExecutorService threadPool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4, new SchemaUpdateThreadFactory()));
    // Directory in which the per-bucket "<bucket>_migration.log" files are created; set in bind().
    private String dataDir;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/rhq/cassandra/schema/MigrateAggregateMetrics$Bucket.class */
    /**
     * The aggregate buckets handled by this migration. The string form of each constant is the value
     * written to the {@code bucket} column of {@code rhq.aggregate_metrics} and is also used as the prefix
     * of the per-bucket migration log file name.
     */
    public enum Bucket {
        ONE_HOUR("one_hour"),
        SIX_HOUR("six_hour"),
        TWENTY_FOUR_HOUR("twenty_four_hour");

        /** Text returned by {@link #toString()}. */
        private final String tableName;

        Bucket(String tableName) {
            this.tableName = tableName;
        }

        /**
         * @return the lower-case bucket name (for example {@code "one_hour"}) rather than the constant name
         */
        @Override // java.lang.Enum
        public String toString() {
            return tableName;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/rhq/cassandra/schema/MigrateAggregateMetrics$MetricsWriter.class */
    public class MetricsWriter implements Callable<Integer>, FutureCallback<ResultSet> {
        private Integer scheduleId;
        private Bucket bucket;
        private ResultSet resultSet;
        private boolean writeFailed;
        private AtomicInteger metricsMigrated = new AtomicInteger();

        public MetricsWriter(Integer num, Bucket bucket, ResultSet resultSet) {
            this.scheduleId = num;
            this.bucket = bucket;
            this.resultSet = resultSet;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Integer call() throws Exception {
            List<Row> all = this.resultSet.all();
            if (all.isEmpty()) {
                MigrateAggregateMetrics.log.debug("No " + this.bucket + " data to migrate for schedule id " + this.scheduleId);
                return 0;
            }
            Date date = ((Row) all.get(0)).getDate(1);
            Double d = null;
            Double d2 = null;
            Double d3 = null;
            Long valueOf = Long.valueOf(((Row) all.get(0)).getLong(5));
            Integer valueOf2 = Integer.valueOf(((Row) all.get(0)).getInt(4));
            for (Row row : all) {
                if (this.writeFailed) {
                    throw new Exception("Migration of " + this.bucket + " data for schedule id " + this.scheduleId + " failed");
                }
                Date date2 = row.getDate(1);
                if (date2.equals(date)) {
                    switch (row.getInt(2)) {
                        case 0:
                            d = Double.valueOf(row.getDouble(3));
                            break;
                        case 1:
                            d2 = Double.valueOf(row.getDouble(3));
                            break;
                        default:
                            d3 = Double.valueOf(row.getDouble(3));
                            break;
                    }
                } else {
                    if (isDataMissing(d3, d, d2)) {
                        MigrateAggregateMetrics.log.debug("We only have a partial " + this.bucket + " metric for {scheduleId: " + this.scheduleId + ", time: " + date.getTime() + "}. It will not be migrated.");
                    } else {
                        Futures.addCallback(writeMetrics(date, d3, d, d2, valueOf2, valueOf), this);
                    }
                    date = date2;
                    d = Double.valueOf(row.getDouble(3));
                    d2 = null;
                    d3 = null;
                    valueOf2 = Integer.valueOf(row.getInt(4));
                    valueOf = Long.valueOf(row.getLong(5));
                }
            }
            if (this.writeFailed) {
                throw new Exception("Migration of " + this.bucket + " data for schedule id " + this.scheduleId + " failed");
            }
            return Integer.valueOf(this.metricsMigrated.get());
        }

        private boolean isDataMissing(Double d, Double d2, Double d3) {
            return d == null || Double.isNaN(d.doubleValue()) || d2 == null || Double.isNaN(d2.doubleValue()) || d3 == null || Double.isNaN(d3.doubleValue());
        }

        public void onSuccess(ResultSet resultSet) {
            this.metricsMigrated.incrementAndGet();
        }

        public void onFailure(Throwable th) {
            this.writeFailed = true;
            MigrateAggregateMetrics.log.warn("Migration of " + this.bucket + " data for schedule id " + this.scheduleId + " failed", th);
        }

        private ResultSetFuture writeMetrics(Date date, Double d, Double d2, Double d3, Integer num, Long l) {
            MigrateAggregateMetrics.this.writePermits.acquire();
            return MigrateAggregateMetrics.this.session.executeAsync("INSERT INTO rhq.aggregate_metrics(schedule_id, bucket, time, avg, max, min) VALUES (" + this.scheduleId + ", '" + this.bucket + "', " + date.getTime() + ", " + d + ", " + d2 + ", " + d3 + ") USING TTL " + num + " AND TIMESTAMP " + l);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/rhq/cassandra/schema/MigrateAggregateMetrics$MigrationProgressLogger.class */
    /**
     * Background task that logs, every 30 seconds, how many schedules are still waiting to be migrated for a
     * bucket. It stops when {@link #finished()} has been called, when the latch reaches zero, or when its
     * thread is interrupted.
     */
    public class MigrationProgressLogger implements Runnable {
        private Bucket bucket;
        private CountDownLatch latch;

        /** Written by the thread calling finished() and read by the thread running run(), hence volatile. */
        private volatile boolean finished;

        /**
         * @param bucket         the bucket named in the progress message
         * @param countDownLatch latch whose count is the number of schedules still to be processed
         */
        public MigrationProgressLogger(Bucket bucket, CountDownLatch countDownLatch) {
            this.bucket = bucket;
            this.latch = countDownLatch;
        }

        /** Asks the logger to stop; takes effect the next time the loop condition is evaluated. */
        public void finished() {
            this.finished = true;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!this.finished && this.latch.getCount() > 0) {
                try {
                    MigrateAggregateMetrics.log.info("There are " + this.latch.getCount() + " remaining schedules for the " + this.bucket + " data migration");
                    Thread.sleep(30000L);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to whoever owns this thread, then stop logging.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Stores the Cassandra session this step uses for its queries, inserts and table drops.
     *
     * @param session the session to use
     */
    @Override // org.rhq.cassandra.schema.Step
    public void setSession(Session session) {
        this.session = session;
    }

    /**
     * Reads this step's configuration: the relational database connection factory and the directory for
     * the migration log files. The directory falls back to the {@code jboss.server.data.dir} system
     * property when the properties do not define it.
     *
     * @param properties the step properties
     */
    @Override // org.rhq.cassandra.schema.Step
    public void bind(Properties properties) {
        Object connectionFactory = properties.get(SchemaManager.RELATIONAL_DB_CONNECTION_FACTORY_PROP);
        this.dbConnectionFactory = (DBConnectionFactory) connectionFactory;

        String fallbackDataDir = System.getProperty("jboss.server.data.dir");
        this.dataDir = properties.getProperty(SchemaManager.DATA_DIR, fallbackDataDir);
    }

    /**
     * @return the simple name of the runtime class of this step
     */
    @Override
    public String toString() {
        Class<?> stepClass = getClass();
        return stepClass.getSimpleName();
    }

    /**
     * Runs the step. When a relational connection factory is configured, the one hour, six hour and twenty
     * four hour data is migrated for every schedule id returned by {@code loadScheduleIds()}. The legacy
     * tables are dropped afterwards, unless a migration failed, in which case an exception is thrown first.
     *
     * @throws RuntimeException if at least one schedule migration failed
     */
    @Override // org.rhq.cassandra.schema.Step
    public void execute() {
        if (this.dbConnectionFactory == null) {
            // Nothing to migrate without the relational database; just remove the old tables.
            log.info("The relational database connection factory is not set. No data migration necessary");
            dropTables();
            return;
        }

        this.writePermits = RateLimiter.create(calculatePermits(), 30L, TimeUnit.SECONDS);
        Stopwatch timer = new Stopwatch().start();
        initPreparedStatements();

        Set<Integer> scheduleIds = loadScheduleIds();
        log.info("Migrating aggregate metrics for " + scheduleIds.size() + " schedule ids");
        migrate(scheduleIds, this.find1HourData, Bucket.ONE_HOUR);
        migrate(scheduleIds, this.find6HourData, Bucket.SIX_HOUR);
        migrate(scheduleIds, this.find24HourData, Bucket.TWENTY_FOUR_HOUR);

        timer.stop();
        log.info("Finished aggregate metrics migration in " + timer.elapsed(TimeUnit.SECONDS) + " seconds");

        // Keep the legacy tables when something failed so the migration can be retried.
        if (this.failedMigrations.get() > 0) {
            throw new RuntimeException("There were " + this.failedMigrations.get() + " failed migrations. The upgrade will have to be run again to complete the migration.");
        }
        dropTables();
    }

    /**
     * Computes the write rate handed to the rate limiter: the value of the
     * {@code rhq.storage.request.limit} system property (20000 when unset) multiplied by the number of
     * nodes currently reported as up.
     *
     * @return the number of write permits
     */
    private int calculatePermits() {
        String configuredLimit = System.getProperty("rhq.storage.request.limit", "20000");
        int limitPerNode = Integer.parseInt(configuredLimit);
        return limitPerNode * getNumberOfUpNodes();
    }

    /**
     * Counts the hosts in the cluster metadata that report themselves as up.
     *
     * @return the number of hosts for which {@code isUp()} is true
     */
    private int getNumberOfUpNodes() {
        int upNodes = 0;
        for (Host host : this.session.getCluster().getMetadata().getAllHosts()) {
            if (host.isUp()) {
                upNodes++;
            }
        }
        return upNodes;
    }

    /**
     * Migrates one bucket for the given schedule ids. Schedule ids already listed in the bucket's migration
     * log are skipped. For every other id the legacy rows are queried and handed to a {@link MetricsWriter}
     * running on the thread pool. The method returns once every schedule has been either skipped or
     * reported back through the callback created by {@code migrationFinished}.
     *
     * @param set               the schedule ids to migrate
     * @param preparedStatement the SELECT statement for the bucket's legacy table
     * @param bucket            the bucket being migrated
     * @throws RuntimeException if the thread is interrupted or the migration log cannot be opened or read
     */
    private void migrate(Set<Integer> set, PreparedStatement preparedStatement, Bucket bucket) {
        log.info("Migrating " + bucket + " data for " + set.size() + " schedules");
        CountDownLatch remaining = new CountDownLatch(set.size());
        MigrationProgressLogger progressLogger = new MigrationProgressLogger(bucket, remaining);
        File logFile = new File(this.dataDir, bucket + "_migration.log");
        MigrationLog migrationLog = null;
        try {
            migrationLog = new MigrationLog(logFile);
            Set<Integer> alreadyMigrated = migrationLog.read();
            this.threadPool.submit(progressLogger);
            for (Integer scheduleId : set) {
                if (alreadyMigrated.contains(scheduleId)) {
                    log.debug(bucket + " data for schedule id " + scheduleId + " has already been migrated. It will be skipped.");
                    remaining.countDown();
                } else {
                    // The permit is handed back by the callback once the writer task has finished.
                    this.readPermits.acquire();
                    ResultSet rows = this.session.execute(preparedStatement.bind(scheduleId));
                    Futures.addCallback(this.threadPool.submit(new MetricsWriter(scheduleId, bucket, rows)),
                        migrationFinished(scheduleId, bucket, remaining, migrationLog));
                }
            }
            remaining.await();
            log.info("Finished migrating " + bucket + " data");
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack before aborting the migration.
            Thread.currentThread().interrupt();
            this.threadPool.shutdownNow();
            throw new RuntimeException("Migration of " + bucket + " data did not complete due to an interrupt. The upgrade will have to be run again to finish the migration", e);
        } catch (IOException e) {
            throw new RuntimeException("Migration of " + bucket + " data did not complete due to an I/O error. The upgrade will have to be run again to finish the migration", e);
        } finally {
            progressLogger.finished();
            // The log is still null when opening it failed; closing it then would raise a
            // NullPointerException that hides the original error.
            if (migrationLog != null) {
                try {
                    migrationLog.close();
                } catch (IOException e) {
                    log.warn("There was an error closing " + logFile.getAbsolutePath(), e);
                }
            }
        }
    }

    /**
     * Prepares the per-bucket SELECT statements. Each one returns, for a single schedule id, the columns
     * schedule_id, time, type and value plus the TTL and write time of the value.
     */
    private void initPreparedStatements() {
        String oneHourQuery = "SELECT schedule_id, time, type, value, ttl(value), writetime(value) FROM rhq.one_hour_metrics WHERE schedule_id = ?";
        this.find1HourData = this.session.prepare(oneHourQuery);

        String sixHourQuery = "SELECT schedule_id, time, type, value, ttl(value), writetime(value) FROM rhq.six_hour_metrics WHERE schedule_id = ?";
        this.find6HourData = this.session.prepare(sixHourQuery);

        String twentyFourHourQuery = "SELECT schedule_id, time, type, value, ttl(value), writetime(value) FROM rhq.twenty_four_hour_metrics WHERE schedule_id = ?";
        this.find24HourData = this.session.prepare(twentyFourHourQuery);
    }

    /**
     * Loads from the relational database the ids of all measurement schedules whose definition has
     * {@code data_type = 0}. The result set, statement and connection are always closed; a failure while
     * closing is only logged.
     *
     * @return the schedule ids found
     * @throws RuntimeException if the query fails with a {@link SQLException}
     */
    private Set<Integer> loadScheduleIds() {
        Connection connection = null;
        Statement statement = null;
        java.sql.ResultSet resultSet = null;
        try {
            connection = this.dbConnectionFactory.newConnection();
            statement = connection.createStatement();
            resultSet = statement.executeQuery("SELECT s.id FROM rhq_measurement_sched s INNER JOIN rhq_measurement_def d on s.definition = d.id WHERE d.data_type = 0");
            Set<Integer> scheduleIds = new HashSet<Integer>();
            while (resultSet.next()) {
                scheduleIds.add(Integer.valueOf(resultSet.getInt(1)));
            }
            return scheduleIds;
        } catch (SQLException e) {
            throw new RuntimeException("Cannot migrate aggregate metrics. There was an error loading schedule ids", e);
        } finally {
            // Release in reverse order of acquisition; each close is attempted independently.
            if (resultSet != null) {
                try {
                    resultSet.close();
                } catch (SQLException e) {
                    log.info("There was an error closing the SQL result set", e);
                }
            }
            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException e) {
                    log.info("There was an error closing the SQL statement", e);
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    log.info("There was an error closing the SQL connection", e);
                }
            }
        }
    }

    /**
     * Drops the legacy {@code one_hour_metrics}, {@code six_hour_metrics} and
     * {@code twenty_four_hour_metrics} tables of the {@code rhq} keyspace. Only tables listed in
     * {@code system.schema_columnfamilies} are dropped, so tables that no longer exist are left alone.
     */
    private void dropTables() {
        ResultSet tables = this.session.execute("SELECT columnfamily_name FROM system.schema_columnfamilies WHERE keyspace_name = 'rhq'");
        for (Row tableRow : tables) {
            String tableName = tableRow.getString(0);
            boolean legacyTable = tableName.equals("one_hour_metrics")
                || tableName.equals("six_hour_metrics")
                || tableName.equals("twenty_four_hour_metrics");
            if (!legacyTable) {
                continue;
            }
            log.info("Dropping table " + tableName);
            this.session.execute("DROP table rhq." + tableName);
        }
    }

    /**
     * Creates the callback invoked when the writer task of one schedule id has finished.
     *
     * <p>On success the schedule id is appended to the migration log. On failure the failure is logged and
     * counted. In both cases the read permit is released and the latch is counted down, and this happens
     * last: the thread waiting on the latch closes the migration log and reads the failure counter as soon
     * as the latch reaches zero, so all bookkeeping has to be complete by then.</p>
     *
     * @param num            the schedule id the callback belongs to
     * @param bucket         the bucket being migrated, used in log messages
     * @param countDownLatch the latch tracking the schedules that are still outstanding
     * @param migrationLog   the log that records successfully migrated schedule ids
     * @return the callback to attach to the writer task's future
     */
    private FutureCallback<Integer> migrationFinished(final Integer num, final Bucket bucket, final CountDownLatch countDownLatch, final MigrationLog migrationLog) {
        return new FutureCallback<Integer>() { // from class: org.rhq.cassandra.schema.MigrateAggregateMetrics.1
            @Override
            public void onSuccess(Integer num2) {
                try {
                    migrationLog.write(num.intValue());
                } catch (IOException e) {
                    MigrateAggregateMetrics.log.warn("Failed to update migration log for bucket " + bucket + " and schedule id " + num, e);
                } finally {
                    MigrateAggregateMetrics.this.readPermits.release();
                    countDownLatch.countDown();
                }
            }

            @Override
            public void onFailure(Throwable th) {
                try {
                    MigrateAggregateMetrics.log.warn("Migration of " + bucket + " data for schedule id " + num + " failed", th);
                    MigrateAggregateMetrics.this.failedMigrations.incrementAndGet();
                } finally {
                    MigrateAggregateMetrics.this.readPermits.release();
                    countDownLatch.countDown();
                }
            }
        };
    }
}
