/*
 * Decompiled with CFR 0.152.
 */
package org.hawkular.metrics.core.service;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.DriverException;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import org.hawkular.metrics.core.service.AvailabilityDataPointCollector;
import org.hawkular.metrics.core.service.DataAccess;
import org.hawkular.metrics.core.service.DataRetentionsMapper;
import org.hawkular.metrics.core.service.Functions;
import org.hawkular.metrics.core.service.MetricsService;
import org.hawkular.metrics.core.service.MetricsThreadFactory;
import org.hawkular.metrics.core.service.Order;
import org.hawkular.metrics.core.service.PatternUtil;
import org.hawkular.metrics.core.service.SumNumericBucketPointCollector;
import org.hawkular.metrics.core.service.VoidSubscriber;
import org.hawkular.metrics.core.service.compress.CompressedPointContainer;
import org.hawkular.metrics.core.service.log.CoreLogger;
import org.hawkular.metrics.core.service.log.CoreLogging;
import org.hawkular.metrics.core.service.tags.ExpressionTagQueryParser;
import org.hawkular.metrics.core.service.tags.SimpleTagQueryParser;
import org.hawkular.metrics.core.service.tags.TagsConverter;
import org.hawkular.metrics.core.service.transformers.DataPointCompressTransformer;
import org.hawkular.metrics.core.service.transformers.DataPointDecompressTransformer;
import org.hawkular.metrics.core.service.transformers.MetricFromDataRowTransformer;
import org.hawkular.metrics.core.service.transformers.MetricIdentifierFromFullDataRowTransformer;
import org.hawkular.metrics.core.service.transformers.MetricsIndexRowTransformer;
import org.hawkular.metrics.core.service.transformers.NumericBucketPointTransformer;
import org.hawkular.metrics.core.service.transformers.SortedMerge;
import org.hawkular.metrics.core.service.transformers.TaggedBucketPointTransformer;
import org.hawkular.metrics.core.service.transformers.TempTableCompressTransformer;
import org.hawkular.metrics.datetime.DateTimeService;
import org.hawkular.metrics.model.AvailabilityBucketPoint;
import org.hawkular.metrics.model.AvailabilityType;
import org.hawkular.metrics.model.BucketPoint;
import org.hawkular.metrics.model.Buckets;
import org.hawkular.metrics.model.DataPoint;
import org.hawkular.metrics.model.Metric;
import org.hawkular.metrics.model.MetricId;
import org.hawkular.metrics.model.MetricType;
import org.hawkular.metrics.model.NamedDataPoint;
import org.hawkular.metrics.model.NumericBucketPoint;
import org.hawkular.metrics.model.Percentile;
import org.hawkular.metrics.model.Retention;
import org.hawkular.metrics.model.TaggedBucketPoint;
import org.hawkular.metrics.model.Tenant;
import org.hawkular.metrics.model.Utils;
import org.hawkular.metrics.model.exception.MetricAlreadyExistsException;
import org.hawkular.metrics.model.exception.RuntimeApiError;
import org.hawkular.metrics.model.exception.TenantAlreadyExistsException;
import org.hawkular.metrics.model.param.BucketConfig;
import org.hawkular.metrics.model.param.Tags;
import org.hawkular.metrics.model.param.TimeRange;
import org.hawkular.metrics.sysconfig.Configuration;
import org.hawkular.metrics.sysconfig.ConfigurationService;
import org.joda.time.Duration;
import rx.Completable;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;
import rx.functions.Func6;
import rx.observable.ListenableFutureObservable;
import rx.schedulers.Schedulers;

public class MetricsServiceImpl
implements MetricsService {
    // Logger shared by every method of this service.
    private static final CoreLogger log = CoreLogging.getCoreLogger(MetricsServiceImpl.class);
    // Number of milliseconds in one day (24 * 60 * 60 * 1000).
    private static final long DAY_TO_MILLIS = 86400000L;
    // Tenant id obtained by passing the literal "sysconfig" through Functions.makeSafe.
    public static final String SYSTEM_TENANT_ID = Functions.makeSafe("sysconfig");
    // Retention cache; entries are added by updateRetentionsIndex(Metric) and dropped by unloadDataRetentions().
    private final Map<DataRetentionKey, Integer> dataRetentions = new ConcurrentHashMap<DataRetentionKey, Integer>();
    // Executor created in startUp (fixed pool of 4 threads); used when adapting driver futures.
    private ListeningExecutorService metricsTasks;
    // Injected through setDataAccess; all storage reads and writes go through it.
    private DataAccess dataAccess;
    // Injected through setConfigurationService; read once by initConfiguration.
    private ConfigurationService configurationService;
    // Registry supplied to startUp; source of the meters/timers returned by the private getters below.
    private MetricRegistry metricRegistry;
    // Per-metric-type insert functions, built in startUp and looked up by addDataPoints.
    private Map<MetricType<?>, Func1<Observable<? extends Metric<?>>, Observable<Integer>>> pointsInserter;
    // Per-metric-type raw data finders, built in startUp (only STRING is registered there).
    private Map<MetricType<?>, Func6<? extends MetricId<?>, Long, Long, Integer, Order, Integer, Observable<Row>>> dataPointFinders;
    // Per-metric-type row -> DataPoint mappers for regular data rows.
    private Map<MetricType<?>, Func1<Row, ? extends DataPoint<?>>> dataPointMappers;
    // Per-metric-type row -> DataPoint mappers for temp-table rows (GAUGE, COUNTER, AVAILABILITY).
    private Map<MetricType<?>, Func1<Row, ? extends DataPoint<?>>> tempDataPointMappers;
    // Value of the "disable.parser.optimization" setting; handed to SimpleTagQueryParser.
    private boolean disableACostOptimization;
    // Parser for the simple tag query syntax; created at the end of startUp.
    private SimpleTagQueryParser tagQueryParser;
    // Parser for the expression tag query syntax; created at the end of startUp.
    private ExpressionTagQueryParser expresssionTagQueryParser;
    // Default time-to-live in seconds; initialised to seven days.
    private int defaultTTL = Duration.standardDays(7L).toStandardSeconds().getSeconds();
    // defaultTTL expressed in whole days, computed once when the instance is constructed.
    // NOTE(review): this is not recomputed when setDefaultTTL(int) changes defaultTTL - confirm that is intended.
    private int DEFAULT_RETENTION = (int)Duration.standardSeconds(this.defaultTTL).getStandardDays();
    // Value of the "string-size" setting, or -1 when the setting is absent.
    private int maxStringSize;
    // Value of the "ingestion.retry.max-delay" setting (default "30000").
    private long insertRetryMaxDelay;
    // Value of the "ingestion.retry.max-retries" setting (default "5").
    private int insertMaxRetries;
    // Value of the "page-size" setting (default "5000"); used by the five-argument findDataPoints.
    private int defaultPageSize;

    /**
     * Convenience overload of the five-argument {@code startUp} that passes
     * {@code true} for the {@code createSchema} flag.
     *
     * @param session        Cassandra session handed on unchanged
     * @param keyspace       keyspace handed on unchanged
     * @param resetDb        flag handed on unchanged
     * @param metricRegistry registry handed on unchanged
     */
    public void startUp(Session session, String keyspace, boolean resetDb, MetricRegistry metricRegistry) {
        final boolean createSchema = true;
        startUp(session, keyspace, resetDb, createSchema, metricRegistry);
    }

    /**
     * Initialises the service: switches the session to the keyspace, creates the task executor,
     * loads data retentions, builds the per-metric-type lookup tables, reads configuration,
     * aligns the table TTL, makes sure temp tables exist and creates the tag query parsers.
     * <p>
     * The {@code resetDb} and {@code createSchema} arguments are not read anywhere in this method.
     *
     * @param session        Cassandra session used for the {@code USE} statement and TTL setup
     * @param keyspace       keyspace to switch to
     * @param resetDb        unused here
     * @param createSchema   unused here
     * @param metricRegistry registry stored for the meter/timer getters
     */
    public void startUp(Session session, String keyspace, boolean resetDb, boolean createSchema, MetricRegistry metricRegistry) {
        session.execute("USE " + keyspace);
        log.infoKeyspaceUsed(keyspace);
        // The executor has to exist before loadDataRetentions(), which transforms futures on it.
        this.metricsTasks = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4, new MetricsThreadFactory()));
        this.loadDataRetentions();
        this.metricRegistry = metricRegistry;
        // Insert functions keyed by metric type; STRING goes through insertStringDatas with a TTL lookup and size limit.
        this.pointsInserter = ImmutableMap.builder().put(MetricType.GAUGE, metric -> {
            Observable gauge = metric;
            return this.dataAccess.insertData(gauge);
        }).put(MetricType.COUNTER, metric -> {
            Observable counter = metric;
            return this.dataAccess.insertData(counter);
        }).put(MetricType.AVAILABILITY, metric -> {
            Observable avail = metric;
            return this.dataAccess.insertData(avail);
        }).put(MetricType.STRING, metric -> {
            Observable string = metric;
            return this.dataAccess.insertStringDatas((Observable<Metric<String>>)string, this::getTTL, this.maxStringSize);
        }).build();
        // Only STRING has a dedicated finder; the other types are read via compressed + temp data in findDataPoints.
        this.dataPointFinders = ImmutableMap.builder().put(MetricType.STRING, (metricId, start, end, limit, order, pageSize) -> {
            MetricId stringId = metricId;
            return this.dataAccess.findStringData(stringId, (long)start, (long)end, (int)limit, (Order)((Object)order), (int)pageSize);
        }).build();
        this.dataPointMappers = ImmutableMap.builder().put(MetricType.GAUGE, Functions::getGaugeDataPoint).put(MetricType.AVAILABILITY, Functions::getAvailabilityDataPoint).put(MetricType.COUNTER, Functions::getCounterDataPoint).put(MetricType.STRING, Functions::getStringDataPoint).build();
        this.tempDataPointMappers = ImmutableMap.builder().put(MetricType.GAUGE, Functions::getTempGaugeDataPoint).put(MetricType.COUNTER, Functions::getTempCounterDataPoint).put(MetricType.AVAILABILITY, Functions::getTempAvailabilityDataPoint).build();
        // initConfiguration must run before the parsers are built: it sets disableACostOptimization.
        this.initConfiguration(session);
        this.setDefaultTTL(session, keyspace);
        this.verifyAndCreateTempTables();
        // Local variable (driver fetch size); it shadows, and does not modify, the defaultPageSize field.
        int defaultPageSize = session.getCluster().getConfiguration().getQueryOptions().getFetchSize();
        int pageThreshold = Integer.getInteger("hawkular.metrics.page-threshold", 10);
        this.tagQueryParser = new SimpleTagQueryParser(this.dataAccess, this, this.disableACostOptimization, defaultPageSize, pageThreshold);
        this.expresssionTagQueryParser = new ExpressionTagQueryParser(this.dataAccess, this);
    }

    /**
     * Requests the gauge and availability data retentions of every tenant and blocks the calling
     * thread until the latch shared with the callbacks reaches zero.
     * <p>
     * Two asynchronous lookups are issued per tenant, hence the latch is sized at twice the number
     * of tenants. The callbacks receive the latch; presumably each one counts it down once when
     * its lookup finishes (the callback class is defined elsewhere).
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is interrupted;
     *                          the thread's interrupt flag is restored before throwing
     */
    void loadDataRetentions() {
        List<String> tenantIds = this.loadTenantIds();
        CountDownLatch latch = new CountDownLatch(tenantIds.size() * 2);
        for (String tenantId : tenantIds) {
            DataRetentionsMapper gaugeMapper = new DataRetentionsMapper(tenantId, MetricType.GAUGE);
            DataRetentionsMapper availMapper = new DataRetentionsMapper(tenantId, MetricType.AVAILABILITY);
            ResultSetFuture gaugeFuture = this.dataAccess.findDataRetentions(tenantId, MetricType.GAUGE);
            ResultSetFuture availabilityFuture = this.dataAccess.findDataRetentions(tenantId, MetricType.AVAILABILITY);
            // Row-to-retention mapping runs on the service's own executor.
            ListenableFuture<Set<Retention>> gaugeRetentions = Futures.transform(gaugeFuture, gaugeMapper, (Executor)this.metricsTasks);
            ListenableFuture<Set<Retention>> availabilityRetentions = Futures.transform(availabilityFuture, availMapper, (Executor)this.metricsTasks);
            Futures.addCallback(gaugeRetentions, new DataRetentionsLoadedCallback(tenantId, MetricType.GAUGE, latch));
            Futures.addCallback(availabilityRetentions, new DataRetentionsLoadedCallback(tenantId, MetricType.AVAILABILITY, latch));
        }
        try {
            latch.await();
        }
        catch (InterruptedException e) {
            // Catching InterruptedException clears the interrupt flag; restore it so that code
            // further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Empties the in-memory data retention cache.
     */
    void unloadDataRetentions() {
        dataRetentions.clear();
    }

    /**
     * Looks up the {@code DataPointsInserted} meter in the metric registry.
     *
     * @return the meter registered under that name
     */
    private Meter getDataPointsInserted() {
        final String meterName = "DataPointsInserted";
        return metricRegistry.meter(meterName);
    }

    /**
     * Looks up the {@code RawDataReadLatency} timer in the metric registry.
     *
     * @return the timer registered under that name
     */
    private Timer getRawDataReadLatency() {
        final String timerName = "RawDataReadLatency";
        return metricRegistry.timer(timerName);
    }

    /**
     * Looks up the {@code MetricTagsQueryLatency} timer in the metric registry.
     *
     * @return the timer registered under that name
     */
    private Timer getMetricsTagsQueryLatency() {
        final String timerName = "MetricTagsQueryLatency";
        return metricRegistry.timer(timerName);
    }

    /**
     * Reads the {@code org.hawkular.metrics} configuration and copies its settings into the
     * corresponding fields, logging the string-size and retry settings along the way.
     * <p>
     * Settings and their fallbacks: {@code string-size} (-1 when absent),
     * {@code ingestion.retry.max-delay} ("30000"), {@code ingestion.retry.max-retries} ("5"),
     * {@code page-size} ("5000") and {@code disable.parser.optimization} ("false").
     *
     * @param session not read by this method
     */
    private void initConfiguration(Session session) {
        Configuration settings = (Configuration)this.configurationService.load("org.hawkular.metrics").toBlocking().lastOrDefault(null);

        String stringSizeSetting = settings.get("string-size");
        if (stringSizeSetting == null) {
            maxStringSize = -1;
        } else {
            maxStringSize = Integer.parseInt(stringSizeSetting);
        }
        log.infoMaxSizeStringMetrics(maxStringSize);

        insertRetryMaxDelay = Long.parseLong(settings.get("ingestion.retry.max-delay", "30000"));
        insertMaxRetries = Integer.parseInt(settings.get("ingestion.retry.max-retries", "5"));
        log.infoInsertRetryConfig(insertMaxRetries, insertRetryMaxDelay);

        defaultPageSize = Integer.parseInt(settings.get("page-size", "5000"));
        disableACostOptimization = Boolean.parseBoolean(settings.get("disable.parser.optimization", "false"));
    }

    /**
     * Makes the {@code default_time_to_live} of the {@code data} table match the configured
     * default TTL, altering the table only when the stored value differs.
     *
     * @param session  session used for the schema lookup and the optional {@code alter table}
     * @param keyspace keyspace that owns the {@code data} table
     * @throws IllegalStateException if the table is not listed in {@code system_schema.tables}
     */
    private void setDefaultTTL(Session session, String keyspace) {
        String lookupQuery = "select default_time_to_live from system_schema.tables where keyspace_name = '" + keyspace + "' and table_name = 'data'";
        List<Row> tableRows = session.execute(lookupQuery).all();
        if (tableRows.isEmpty()) {
            throw new IllegalStateException("Failed to find " + keyspace + ".data in system_schema.tables. Default data retention cannot be configured.");
        }
        int storedTtl = tableRows.get(0).getInt(0);
        boolean ttlOutOfDate = storedTtl != this.defaultTTL;
        if (ttlOutOfDate) {
            session.execute("alter table " + keyspace + ".data with default_time_to_live = " + this.defaultTTL);
        }
        log.infoDefaultDataRetention(this.defaultTTL);
    }

    /**
     * @return the data access object currently used by this service
     */
    DataAccess getDataAccess() {
        return dataAccess;
    }

    /**
     * Sets the data access object used for all storage operations.
     *
     * @param dataAccess the data access object to store
     */
    public void setDataAccess(DataAccess dataAccess) {
        this.dataAccess = dataAccess;
    }

    /**
     * Sets the configuration service that initConfiguration reads from.
     *
     * @param configurationService the configuration service to store
     */
    public void setConfigurationService(ConfigurationService configurationService) {
        this.configurationService = configurationService;
    }

    /**
     * Sets the default time-to-live. The argument is given in days and stored in seconds.
     * <p>
     * NOTE(review): the DEFAULT_RETENTION field is derived from the default TTL at construction
     * time and is not updated here - confirm whether it should follow this setter.
     *
     * @param defaultTTL default time-to-live in days
     */
    public void setDefaultTTL(int defaultTTL) {
        this.defaultTTL = Duration.standardDays(defaultTTL).toStandardSeconds().getSeconds();
    }

    /**
     * Inserts the tenant and then writes one retention index entry per retention setting.
     * <p>
     * The returned observable emits no items: it completes once every update has finished and
     * fails with {@link TenantAlreadyExistsException} when the insert was not applied.
     *
     * @param tenant    tenant to create
     * @param overwrite passed through to the tenant insert
     * @return an observable signalling completion or failure
     */
    @Override
    public Observable<Void> createTenant(Tenant tenant, boolean overwrite) {
        return Observable.create(subscriber -> {
            Observable updates = this.dataAccess.insertTenant(tenant, overwrite).flatMap(insertResult -> {
                if (!insertResult.wasApplied()) {
                    throw new TenantAlreadyExistsException(tenant.getId());
                }
                // One retention-index update per configured metric type.
                Observable retentionUpdates = Observable.from(tenant.getRetentionSettings().entrySet())
                        .flatMap(setting -> this.dataAccess.updateRetentionsIndex(
                                tenant.getId(),
                                (MetricType)setting.getKey(),
                                ImmutableMap.of(Functions.makeSafe(((MetricType)setting.getKey()).getText()), setting.getValue())))
                        .map(updateResult -> null);
                return retentionUpdates;
            });
            // Items are discarded; only the terminal events are forwarded.
            updates.subscribe(
                    ignored -> {},
                    failure -> ((Subscriber)subscriber).onError(failure),
                    () -> ((Subscriber)subscriber).onCompleted());
        });
    }

    /**
     * Emits one {@link Tenant} per distinct tenant id. When no stored tenant row exists for an
     * id, a tenant constructed from the id alone is emitted instead.
     *
     * @return an observable of all tenants
     */
    @Override
    public Observable<Tenant> getTenants() {
        return this.dataAccess.findAllTenantIds()
                .map(idRow -> idRow.getString(0))
                .distinct()
                .flatMap(tenantId -> this.dataAccess.findTenant((String)tenantId)
                        .map(Functions::getTenant)
                        .switchIfEmpty(Observable.just((Object)new Tenant((String)tenantId))));
    }

    /**
     * Blocks until all tenant ids have been read and returns the distinct ids as an immutable list.
     *
     * @return the distinct tenant ids
     */
    private List<String> loadTenantIds() {
        Iterable distinctIds = this.dataAccess.findAllTenantIds()
                .map(idRow -> idRow.getString(0))
                .distinct()
                .toBlocking()
                .toIterable();
        return ImmutableList.copyOf(distinctIds);
    }

    /**
     * Creates a metric definition: inserts it into the metrics index, then updates the tags index
     * and, when the metric carries a data retention, the retentions index.
     * <p>
     * The index insert is issued as soon as this method is called; the returned observable only
     * relays its outcome. A failure of that insert is forwarded to the subscriber.
     *
     * @param metric    the metric to create; its type must be a user type
     * @param overwrite when {@code false}, an insert that was not applied is reported as
     *                  {@link MetricAlreadyExistsException}
     * @return an observable signalling completion or failure
     * @throws IllegalArgumentException if the metric type is not a user type
     */
    @Override
    public Observable<Void> createMetric(Metric<?> metric, boolean overwrite) {
        MetricType<?> metricType = metric.getMetricId().getType();
        if (!metricType.isUserType()) {
            throw new IllegalArgumentException(metric + " cannot be created. " + metricType + " metrics are internally generated metrics and cannot be created by clients.");
        }
        ResultSetFuture future = this.dataAccess.insertMetricInMetricsIndex(metric, overwrite);
        Observable indexUpdated = ListenableFutureObservable.from((ListenableFuture)future, (Executor)this.metricsTasks);
        return Observable.create(subscriber -> indexUpdated.subscribe(resultSet -> {
            if (!overwrite && !resultSet.wasApplied()) {
                subscriber.onError((Throwable)new MetricAlreadyExistsException(metric));
            } else {
                ArrayList<Observable<ResultSet>> updates = new ArrayList<Observable<ResultSet>>();
                updates.add(this.dataAccess.insertIntoMetricsTagsIndex(metric, metric.getTags()));
                if (metric.getDataRetention() != null) {
                    updates.add(this.updateRetentionsIndex(metric));
                }
                Observable.merge(updates).subscribe(new VoidSubscriber((Subscriber<? super Void>)subscriber));
            }
        // Without an error handler a failed index insert would never reach the subscriber.
        }, failure -> subscriber.onError((Throwable)failure)));
    }

    /**
     * Writes the metric's data retention to the retentions index and, once that write completes,
     * records the retention in the in-memory cache.
     *
     * @param metric metric whose retention is stored
     * @return an observable of the index update result
     */
    private Observable<ResultSet> updateRetentionsIndex(Metric<?> metric) {
        return ListenableFutureObservable
                .from((ListenableFuture)this.dataAccess.updateRetentionsIndex(metric), (Executor)this.metricsTasks)
                .doOnCompleted(() -> this.dataRetentions.put(new DataRetentionKey(metric), metric.getDataRetention()));
    }

    /**
     * Emits the distinct metric identifiers found in the data rows.
     *
     * @return an observable of distinct metric ids
     */
    @Override
    public Observable<MetricId<?>> findAllMetricIdentifiers() {
        return this.dataAccess.findAllMetricIdentifiersInData()
                .compose((Observable.Transformer)new MetricIdentifierFromFullDataRowTransformer(this.defaultTTL))
                .distinct();
    }

    /**
     * Builds a transformer that turns metric ids into metrics. Each id is looked up in the
     * metrics index first; if that yields nothing the lookup falls back to the data rows.
     *
     * @param <T> metric value type
     * @return a transformer from metric ids to metrics
     */
    public <T> Observable.Transformer<MetricId<T>, Metric<T>> enrichToMetric() {
        return ids -> ids.flatMap(metricId -> this.dataAccess.findMetricInMetricsIndex(metricId)
                .compose(new MetricsIndexRowTransformer(metricId.getTenantId(), metricId.getType(), this.defaultTTL))
                .switchIfEmpty(this.dataAccess.findMetricInData(metricId)
                        .compose(new MetricFromDataRowTransformer(metricId.getTenantId(), metricId.getType(), this.defaultTTL))));
    }

    /**
     * Finds the metric for a single id by running it through {@link #enrichToMetric()}.
     *
     * @param id  id of the metric to look up
     * @param <T> metric value type
     * @return an observable emitting the metric, if one is found
     */
    @Override
    public <T> Observable<Metric<T>> findMetric(MetricId<T> id) {
        return Observable.just(id)
                .compose(this.enrichToMetric());
    }

    /**
     * Finds a tenant's metrics by combining metrics index entries with metric ids that only appear
     * in the data rows. Index entries are emitted first; duplicates by metric id are dropped.
     * <p>
     * With a {@code null} metric type every user type is queried in the index and the data-row
     * ids are left unfiltered; otherwise both sources are restricted to the given type.
     *
     * @param tenantId   tenant whose metrics are listed
     * @param metricType type to restrict the search to, or {@code null} for all user types
     * @param <T>        metric value type
     * @return an observable of the matching metrics
     */
    @Override
    public <T> Observable<Metric<T>> findMetrics(String tenantId, MetricType<T> metricType) {
        // Ids seen only in the data rows get the default retention.
        Observable fromData = this.dataAccess.findAllMetricIdentifiersInData()
                .doOnError(Throwable::printStackTrace)
                .filter(dataRow -> tenantId.equals(dataRow.getString(0)))
                .compose((Observable.Transformer)new MetricIdentifierFromFullDataRowTransformer(this.defaultTTL))
                .distinct()
                .map(foundId -> new Metric(foundId, this.DEFAULT_RETENTION));

        Observable fromIndex;
        if (metricType != null) {
            fromIndex = this.dataAccess.findMetricsInMetricsIndex(tenantId, metricType)
                    .compose(new MetricsIndexRowTransformer<T>(tenantId, metricType, this.defaultTTL));
            fromData = fromData.filter(candidate -> metricType.equals(candidate.getType()));
        } else {
            fromIndex = Observable.from(MetricType.userTypes())
                    .map(userType -> userType)
                    .flatMap(userType -> this.dataAccess.findMetricsInMetricsIndex(tenantId, userType)
                            .compose(new MetricsIndexRowTransformer(tenantId, userType, this.defaultTTL)));
        }
        return fromIndex.concatWith(fromData).distinct(Metric::getMetricId);
    }

    /**
     * Scans the whole metrics index and emits one metric per row, built from the row's first
     * three columns (tenant id, type code, name) and the string map in column 3.
     *
     * @param <T> metric value type
     * @return an observable of every metric in the index
     */
    @Override
    public <T> Observable<Metric<T>> scanAllMetricIndexes() {
        return this.dataAccess.scanMetricsInMetricsIndex()
                .flatMap(rowGroup -> rowGroup)
                .flatMap(indexRow -> {
                    MetricId id = new MetricId(indexRow.getString(0), MetricType.fromCode(indexRow.getByte(1)), indexRow.getString(2));
                    Metric indexed = new Metric(id, indexRow.getMap(3, String.class, String.class));
                    return Observable.just(indexed);
                });
    }

    /**
     * Finds metric ids whose tags match a tag query. The query is parsed as a tag expression
     * first; if that throws, it is parsed with the simple tag syntax; if that throws as well the
     * returned observable fails with a {@link RuntimeApiError}.
     * <p>
     * The tags query latency timer is stopped when the result completes.
     *
     * @param tenantId   tenant to search in
     * @param metricType metric type to search for
     * @param tags       tag query in either supported syntax
     * @param <T>        metric value type
     * @return an observable of matching metric ids
     */
    @Override
    public <T> Observable<MetricId<T>> findMetricIdentifiersWithFilters(String tenantId, MetricType<T> metricType, String tags) {
        Timer.Context timing = this.getMetricsTagsQueryLatency().time();
        Observable matches;
        try {
            matches = this.expresssionTagQueryParser.parse(tenantId, metricType, tags).map(found -> found);
        }
        catch (Exception expressionFailure) {
            // Not a valid expression; fall back to the simple tag syntax.
            try {
                Tags simpleQuery = TagsConverter.fromString(tags);
                matches = this.tagQueryParser.findMetricIdentifiersWithFilters(tenantId, metricType, simpleQuery.getTags()).map(found -> found);
            }
            catch (Exception simpleFailure) {
                matches = Observable.error((Throwable)new RuntimeApiError("Unparseable tag query expression.", simpleFailure));
            }
        }
        return matches.doOnCompleted(timing::stop);
    }

    /**
     * Builds a predicate over metric names from a filter expression.
     * <p>
     * A null or empty expression accepts every id. An expression starting with {@code !} accepts
     * ids whose name does not fully match the pattern; any other expression accepts ids whose
     * name does.
     *
     * @param regexp filter expression, possibly prefixed with {@code !}
     * @param <T>    metric value type
     * @return the predicate to apply to metric ids
     */
    @Override
    public <T> Func1<MetricId<T>, Boolean> idFilter(String regexp) {
        if (Strings.isNullOrEmpty(regexp)) {
            return anyId -> true;
        }
        boolean negated = regexp.startsWith("!");
        Pattern namePattern = PatternUtil.filterPattern(regexp);
        return candidate -> negated != namePattern.matcher(candidate.getName()).matches();
    }

    /**
     * Delegates to the simple tag query parser to find tag values.
     *
     * @param tenantId    tenant to search in
     * @param metricType  metric type to search for
     * @param tagsQueries tag queries handed to the parser
     * @return an observable of tag names mapped to their values
     */
    @Override
    public Observable<Map<String, Set<String>>> getTagValues(String tenantId, MetricType<?> metricType, Map<String, String> tagsQueries) {
        return tagQueryParser.getTagValues(tenantId, metricType, tagsQueries);
    }

    /**
     * Reads the tags of a metric from the first row returned for it.
     *
     * @param id id of the metric
     * @return an observable emitting the tag map, or an empty map when no row exists
     */
    @Override
    public Observable<Map<String, String>> getMetricTags(MetricId<?> id) {
        return this.dataAccess.getMetricTags(id)
                .take(1)
                .map(tagsRow -> tagsRow.getMap(0, String.class, String.class))
                .defaultIfEmpty(new HashMap());
    }

    /**
     * Delegates to the simple tag query parser to find tag names.
     *
     * @param tenantId   tenant to search in
     * @param metricType metric type to search for
     * @param filter     filter handed to the parser
     * @return an observable of tag names
     */
    @Override
    public Observable<String> getTagNames(String tenantId, MetricType<?> metricType, String filter) {
        return tagQueryParser.getTagNames(tenantId, metricType, filter);
    }

    /**
     * Adds tags to a metric: the tags index is written first, then the tags on the metric itself.
     * <p>
     * Validation failures are not thrown; they are returned as a failed observable.
     *
     * @param metric metric to tag
     * @param tags   tags to add; must be non-null and pass {@code Functions.isValidTagMap}
     * @return an observable that emits a single {@code null} once both writes have finished
     */
    @Override
    public Observable<Void> addTags(Metric<?> metric, Map<String, String> tags) {
        try {
            Preconditions.checkArgument(tags != null, "Missing tags");
            Preconditions.checkArgument(Functions.isValidTagMap(tags), "Invalid tags; tag key is required");
        }
        catch (Exception validationFailure) {
            return Observable.error((Throwable)validationFailure);
        }
        return this.dataAccess.insertIntoMetricsTagsIndex(metric, tags)
                .concatWith(this.dataAccess.addTags(metric, tags))
                .toList()
                .map(writeResults -> null);
    }

    /**
     * Deletes the named tags from a metric. The metric's current tags are loaded and narrowed to
     * the requested names, then removed from both the metric and the tags index.
     *
     * @param metric metric to remove tags from
     * @param tags   names of the tags to delete
     * @return an observable that emits a single {@code null} once both deletions have finished
     */
    @Override
    public Observable<Void> deleteTags(Metric<?> metric, Set<String> tags) {
        return this.getMetricTags(metric.getMetricId())
                .map(currentTags -> {
                    // Keep only the entries the caller asked to delete.
                    currentTags.keySet().retainAll(tags);
                    return currentTags;
                })
                .flatMap(doomedTags -> this.dataAccess.deleteTags(metric, doomedTags.keySet())
                        .mergeWith(this.dataAccess.deleteFromMetricsTagsIndex(metric.getMetricId(), (Map<String, String>)doomedTags))
                        .toList()
                        .map(deleteResults -> null));
    }

    /**
     * Inserts data points using the insert function registered for the metric type. Metrics
     * without data points are skipped, and each emitted insert count is marked on the
     * {@code DataPointsInserted} meter.
     *
     * @param metricType type of the metrics; must not be {@code null}
     * @param metrics    metrics carrying the data points to insert
     * @param <T>        metric value type
     * @return an observable emitting {@code null} for every value the insert function emits
     * @throws IllegalArgumentException if {@code metricType} is {@code null}
     */
    @Override
    public <T> Observable<Void> addDataPoints(MetricType<T> metricType, Observable<Metric<T>> metrics) {
        Preconditions.checkArgument(metricType != null, "metricType is null");
        return ((Observable)this.pointsInserter.get(metricType)
                .call((Object)metrics.filter(candidate -> !candidate.getDataPoints().isEmpty())))
                .doOnNext(this.getDataPointsInserted()::mark)
                .map(insertedCount -> null);
    }

    /**
     * Finds data points using the configured default page size.
     *
     * @param metricId metric to read
     * @param start    start of the time range
     * @param end      end of the time range
     * @param limit    maximum number of points
     * @param order    sort order
     * @param <T>      data point value type
     * @return an observable of the data points
     */
    @Override
    public <T> Observable<DataPoint<T>> findDataPoints(MetricId<T> metricId, long start, long end, int limit, Order order) {
        int pageSize = this.defaultPageSize;
        return findDataPoints(metricId, start, end, limit, order, pageSize);
    }

    /**
     * Finds the data points of one metric in a time range.
     * <p>
     * Gauge, availability and counter metrics are read from two sources, compressed blocks and
     * temp tables, which are merged in timestamp order with equal-timestamp neighbours collapsed.
     * Other types use the finder and mapper registered for the type. In both cases the raw data
     * read latency timer is stopped when the result completes.
     *
     * @param metricId metric to read
     * @param start    start of the time range
     * @param end      end of the time range
     * @param limit    maximum number of points; values of 0 or less mean no limit on the merged branch
     * @param order    sort order; {@code null} is treated as {@code Order.ASC}
     * @param pageSize page size handed to the underlying queries
     * @param <T>      data point value type
     * @return an observable of the data points
     * @throws IllegalArgumentException      if the time range is invalid
     * @throws UnsupportedOperationException if no mapper or finder is registered for the type
     */
    @Override
    public <T> Observable<DataPoint<T>> findDataPoints(MetricId<T> metricId, long start, long end, int limit, Order order, int pageSize) {
        Timer.Context context = this.getRawDataReadLatency().time();
        Preconditions.checkArgument(Utils.isValidTimeRange(start, end), "Invalid time range");
        Order safeOrder = null == order ? Order.ASC : order;
        MetricType<T> metricType = metricId.getType();
        Func1<Row, DataPoint<T>> mapper = this.getDataPointMapper(metricType);
        if (metricType == MetricType.GAUGE || metricType == MetricType.AVAILABILITY || metricType == MetricType.COUNTER) {
            // Compressed rows are read from the start of the two-hour slice that contains 'start'.
            long sliceStart = DateTimeService.getTimeSlice(start, Duration.standardHours(2L));
            Func1<Row, ? extends DataPoint<?>> tempMapper = this.tempDataPointMappers.get(metricType);
            Observable compressedPoints = this.dataAccess.findCompressedData(metricId, sliceStart, end, limit, safeOrder).compose(new DataPointDecompressTransformer<T>(metricType, safeOrder, limit, start, end));
            Observable tempStoragePoints = this.dataAccess.findTempData(metricId, start, end, limit, safeOrder, pageSize).map(tempMapper);
            Comparator comparator = this.getDataPointComparator(safeOrder);
            ArrayList sources = new ArrayList(2);
            sources.add(compressedPoints);
            sources.add(tempStoragePoints);
            Observable dataPoints = SortedMerge.create(sources, comparator, false).distinctUntilChanged((tDataPoint, tDataPoint2) -> comparator.compare((DataPoint)tDataPoint, (DataPoint)tDataPoint2) == 0);
            if (limit > 0) {
                dataPoints = dataPoints.take(limit);
            }
            // Stop the latency timer here as well; previously only the finder branch recorded it.
            return dataPoints.doOnCompleted(context::stop);
        }
        Func6<MetricId<T>, Long, Long, Integer, Order, Integer, Observable<Row>> finder = this.getDataPointFinder(metricType);
        Observable results = ((Observable)finder.call(metricId, (Object)start, (Object)end, (Object)limit, (Object)safeOrder, (Object)pageSize)).map(mapper);
        return results.doOnCompleted(context::stop);
    }

    /**
     * Returns a comparator that orders data points by timestamp in the requested direction.
     * <p>
     * Timestamps are compared with {@link Long#compare(long, long)}. Casting the difference of
     * two {@code long} timestamps to {@code int} truncates it, which yields a wrong sign or a
     * false "equal" result when the timestamps are far enough apart.
     *
     * @param safeOrder sort direction; must not be {@code null}
     * @param <T>       data point value type
     * @return a comparator ascending by timestamp for {@code ASC}, descending for {@code DESC}
     * @throws RuntimeException if the order is neither {@code ASC} nor {@code DESC}
     */
    private <T> Comparator<DataPoint<T>> getDataPointComparator(Order safeOrder) {
        switch (safeOrder) {
            case ASC: {
                return (tDataPoint, t1) -> Long.compare(tDataPoint.getTimestamp(), t1.getTimestamp());
            }
            case DESC: {
                return (tDataPoint, t1) -> Long.compare(t1.getTimestamp(), tDataPoint.getTimestamp());
            }
            default: {
                throw new RuntimeException(safeOrder.toString() + " is not correct sorting order");
            }
        }
    }

    /**
     * Builds a transformer that retries the source when it fails with a {@link DriverException}
     * and otherwise logs the failure and completes empty.
     * <p>
     * Each failure is paired with an attempt number starting at 1. A {@code DriverException}
     * triggers a resubscription after that many seconds; any other error ends the retry loop.
     * The resulting error is logged and replaced by an empty observable, so downstream never
     * sees an error from this transformer.
     *
     * @param <T> element type of the observable
     * @return the retrying transformer
     */
    private <T> Observable.Transformer<T, T> applyRetryPolicy() {
        return tObservable -> tObservable.retryWhen(observable -> {
            // Attempt counter; effectively unbounded.
            Observable range = Observable.range((int)1, (int)Integer.MAX_VALUE);
            Observable zipWith = observable.zipWith(range, (t, i) -> {
                log.debug("Attempt #" + i + " to retry the operation after Cassandra client exception");
                if (t instanceof DriverException) {
                    // Linear back-off: wait i seconds before attempt i.
                    return Observable.timer((long)i.intValue(), (TimeUnit)TimeUnit.SECONDS).onBackpressureDrop();
                }
                return Observable.error((Throwable)t);
            });
            return Observable.merge((Observable)zipWith);
        }).doOnError(t -> log.error("Failure while trying to apply compression, skipping block", (Throwable)t)).onErrorResumeNext(Observable.empty());
    }

    /**
     * Ensures temp tables exist for the six-hour window that starts at the current time adjusted
     * by {@code DateTimeService.startOfPreviousEvenHour()} (UTC), and blocks until done.
     */
    public void verifyAndCreateTempTables() {
        Instant now = Instant.ofEpochMilli(DateTimeService.now.get().getMillis());
        ZonedDateTime windowStart = ZonedDateTime.ofInstant(now, ZoneOffset.UTC).with(DateTimeService.startOfPreviousEvenHour());
        ZonedDateTime windowEnd = windowStart.plus(6L, ChronoUnit.HOURS);
        this.verifyAndCreateTempTables(windowStart, windowEnd).await();
    }

    /**
     * Ensures temp tables exist for every two-hour step from {@code startTime} (inclusive) up to
     * {@code endTime} (exclusive).
     *
     * @param startTime first block start
     * @param endTime   exclusive upper bound for block starts
     * @return a completable that finishes when the table creation observable completes
     */
    @Override
    public Completable verifyAndCreateTempTables(ZonedDateTime startTime, ZonedDateTime endTime) {
        HashSet<Long> blockStarts = new HashSet<Long>();
        for (ZonedDateTime cursor = startTime; cursor.isBefore(endTime); cursor = cursor.plus(2L, ChronoUnit.HOURS)) {
            blockStarts.add(cursor.toInstant().toEpochMilli());
        }
        return Completable.fromObservable(this.dataAccess.createTempTablesIfNotExists(blockStarts));
    }

    /**
     * Compresses the contents of every expired temp table and then drops the table.
     * <p>
     * Expired tables are processed one after another. Within a table, rows are split into windows
     * at each change of partition key token; each window is compressed and stored as one
     * compressed container for the metric identified by the window's first row. Up to
     * {@code maxConcurrency} row streams are processed concurrently. When a table's processing
     * completes, the table drop is started on the I/O scheduler with the retry policy applied;
     * it is subscribed independently, so the returned completable does not wait for it.
     *
     * @param jobStartTimeSlice time slice used to determine which temp tables have expired
     * @param pageSize          page size handed to the temp table read
     * @param maxConcurrency    concurrency limit for the read and for processing row streams
     * @return a completable that finishes when all expired tables have been compressed
     */
    @Override
    public Completable compressBlock(long jobStartTimeSlice, int pageSize, int maxConcurrency) {
        return Completable.fromObservable((Observable)Observable.from(this.dataAccess.findExpiredTables(jobStartTimeSlice)).concatMap(startTimeSlice -> this.dataAccess.findAllDataFromBucket((long)startTimeSlice, pageSize, maxConcurrency).switchIfEmpty(Observable.empty()).flatMap(rows -> rows.publish(p -> p.window(p.map(Row::getPartitionKeyToken).distinctUntilChanged())).concatMap(o -> {
            // The window is consumed twice: once for compression, once to read the metric key.
            Observable sharedRows = o.share();
            Observable compressed = sharedRows.compose((Observable.Transformer)new TempTableCompressTransformer((long)startTimeSlice));
            Observable keyTake = sharedRows.take(1);
            return compressed.zipWith(keyTake, (cpc, r) -> {
                // Columns 0, 1 and 2 hold the tenant id, the metric type code and the metric name.
                MetricId metricId = new MetricId(r.getString(0), MetricType.fromCode(r.getByte(1)), r.getString(2));
                return this.dataAccess.insertCompressedData(metricId, (long)startTimeSlice, (CompressedPointContainer)cpc, this.getTTL(metricId));
            });
        }), maxConcurrency).flatMap(rs -> rs).doOnCompleted(() -> this.dataAccess.dropTempTable((long)startTimeSlice).compose(this.applyRetryPolicy()).subscribeOn(Schedulers.io()).subscribe())));
    }

    /**
     * Compresses the data points of the given metrics for one time block.
     * <p>
     * Metrics are processed one after another: the block's data points are read in ascending
     * order, compressed, and written through {@code deleteAndInsertCompressedGauge} with the
     * metric's TTL. The retry policy is applied to the metric stream, to each read and to each
     * write, so a failing step is logged and skipped rather than failing the completable.
     *
     * @param metrics        ids of the metrics to compress
     * @param startTimeSlice start of the block
     * @param endTimeSlice   end of the block
     * @param pageSize       page size used when reading the data points
     * @return a completable that finishes when all metrics have been processed
     */
    @Override
    @Deprecated
    public Completable compressBlock(Observable<? extends MetricId<?>> metrics, long startTimeSlice, long endTimeSlice, int pageSize) {
        return Completable.fromObservable((Observable)metrics.compose(this.applyRetryPolicy()).concatMap(metricId -> this.findDataPoints((MetricId)metricId, startTimeSlice, endTimeSlice, 0, Order.ASC, pageSize).compose(this.applyRetryPolicy()).compose(new DataPointCompressTransformer(metricId.getType(), startTimeSlice)).concatMap(cpc -> this.dataAccess.deleteAndInsertCompressedGauge(metricId, startTimeSlice, (CompressedPointContainer)cpc, startTimeSlice, endTimeSlice, this.getTTL((MetricId<?>)metricId)).compose(this.applyRetryPolicy()))));
    }

    /**
     * Finds the data points of several metrics, one metric after another, labelling every point
     * with the name of the metric it belongs to.
     *
     * @param metricIds metrics to read, in output order
     * @param start     start of the time range
     * @param end       end of the time range
     * @param limit     maximum number of points per metric
     * @param order     sort order
     * @param <T>       data point value type
     * @return an observable of named data points
     */
    @Override
    public <T> Observable<NamedDataPoint<T>> findDataPoints(List<MetricId<T>> metricIds, long start, long end, int limit, Order order) {
        return Observable.from(metricIds)
                .concatMap(metricId -> this.findDataPoints((MetricId)metricId, start, end, limit, order)
                        .map(point -> new NamedDataPoint(metricId.getName(), point)));
    }

    /**
     * Finds the data points of every metric matching a tag query, one metric after another,
     * labelling every point with the name of the metric it belongs to.
     *
     * @param tenantId   tenant to search in
     * @param metricType metric type to search for
     * @param tagFilters tag query selecting the metrics
     * @param start      start of the time range
     * @param end        end of the time range
     * @param limit      maximum number of points per metric
     * @param order      sort order
     * @param <T>        data point value type
     * @return an observable of named data points
     */
    @Override
    public <T> Observable<NamedDataPoint<T>> findDataPoints(String tenantId, MetricType<T> metricType, String tagFilters, long start, long end, int limit, Order order) {
        return this.findMetricIdentifiersWithFilters(tenantId, metricType, tagFilters)
                .concatMap(metricId -> this.findDataPoints((MetricId)metricId, start, end, limit, order)
                        .map(point -> new NamedDataPoint(metricId.getName(), point)));
    }

    /**
     * Returns the data point finder registered for a metric type.
     *
     * @param metricType type to look up
     * @param <T>        data point value type
     * @return the registered finder
     * @throws UnsupportedOperationException if no finder is registered for the type
     */
    private <T> Func6<MetricId<T>, Long, Long, Integer, Order, Integer, Observable<Row>> getDataPointFinder(MetricType<T> metricType) {
        Func6<? extends MetricId<?>, Long, Long, Integer, Order, Integer, Observable<Row>> registered = this.dataPointFinders.get(metricType);
        if (registered != null) {
            return registered;
        }
        throw new UnsupportedOperationException(metricType.getText());
    }

    /**
     * Returns the row-to-data-point mapper registered for a metric type.
     *
     * @param metricType type to look up
     * @param <T>        data point value type
     * @return the registered mapper
     * @throws UnsupportedOperationException if no mapper is registered for the type
     */
    private <T> Func1<Row, DataPoint<T>> getDataPointMapper(MetricType<T> metricType) {
        Func1<Row, ? extends DataPoint<?>> registered = this.dataPointMappers.get(metricType);
        if (registered != null) {
            return registered;
        }
        throw new UnsupportedOperationException(metricType.getText());
    }

    /**
     * Computes per-minute rates from consecutive data points of a counter or gauge metric.
     * <p>
     * Points are paired with their successor; each pair yields one rate point stamped with the
     * later point's timestamp and valued at {@code 60000 * valueDelta / timeDelta}. For counters,
     * pairs in which the value decreased are dropped.
     * <p>
     * A {@code null} order is treated as {@code Order.ASC}, matching {@code findDataPoints}.
     * Previously a {@code null} order was read as ascending by the query but as non-ascending by
     * the pairing step, so pairs were reversed on ascending data.
     *
     * @param id    counter or gauge metric to read
     * @param start start of the time range
     * @param end   end of the time range
     * @param limit maximum number of rate points; values of 0 or less mean no limit
     * @param order sort order; {@code null} is treated as {@code Order.ASC}
     * @return an observable of rate points
     * @throws IllegalArgumentException if the time range is invalid or the metric type is neither
     *                                  counter nor gauge
     */
    @Override
    public Observable<DataPoint<Double>> findRateData(MetricId<? extends Number> id, long start, long end, int limit, Order order) {
        Preconditions.checkArgument(Utils.isValidTimeRange(start, end), "Invalid time range");
        Preconditions.checkArgument(id.getType() == MetricType.COUNTER || id.getType() == MetricType.GAUGE, "Unsupported metric type: %s", id.getType());
        Order safeOrder = null == order ? Order.ASC : order;
        // buffer(2, 1) emits overlapping [previous, next] pairs; the trailing single-element
        // buffer is removed by the size filter. Pairs are normalised to chronological order.
        Observable dataPoints = this.findDataPoints(id, start, end, 0, safeOrder).buffer(2, 1).map(l -> safeOrder == Order.ASC ? l : Lists.reverse(l)).filter(l -> l.size() == 2).filter(l -> id.getType() != MetricType.COUNTER || ((Number)((DataPoint)l.get(1)).getValue()).longValue() >= ((Number)((DataPoint)l.get(0)).getValue()).longValue()).map(l -> {
            DataPoint point1 = (DataPoint)l.get(0);
            DataPoint point2 = (DataPoint)l.get(1);
            long timestamp = point2.getTimestamp();
            double value_diff = ((Number)point2.getValue()).doubleValue() - ((Number)point1.getValue()).doubleValue();
            double time_diff = point2.getTimestamp() - point1.getTimestamp();
            // Timestamps are in milliseconds; scale to a per-minute rate.
            double rate = 60000.0 * value_diff / time_diff;
            return new DataPoint<Double>(timestamp, rate);
        });
        return limit <= 0 ? dataPoints : dataPoints.take(limit);
    }

    /**
     * Computes rate data for several metrics, one metric after another, labelling every rate
     * point with the name of the metric it belongs to.
     *
     * @param ids   metrics to read, in output order
     * @param start start of the time range
     * @param end   end of the time range
     * @param limit maximum number of rate points per metric
     * @param order sort order
     * @param <T>   numeric value type of the metrics
     * @return an observable of named rate points
     */
    @Override
    public <T extends Number> Observable<NamedDataPoint<Double>> findRateData(List<MetricId<T>> ids, long start, long end, int limit, Order order) {
        return Observable.from(ids)
                .concatMap(metricId -> this.findRateData((MetricId<? extends Number>)metricId, start, end, limit, order)
                        .map(ratePoint -> new NamedDataPoint(metricId.getName(), ratePoint)));
    }

    /**
     * Computes bucketed statistics over the rate data of a metric. Rates are read in ascending
     * order without a limit for the time range of the bucket configuration.
     *
     * @param id           metric to read
     * @param bucketConfig supplies the time range and the buckets
     * @param percentiles  percentiles handed to the bucket transformer
     * @return an observable emitting the list of bucket points
     * @throws IllegalArgumentException if the time range is invalid
     */
    @Override
    public Observable<List<NumericBucketPoint>> findRateStats(MetricId<? extends Number> id, BucketConfig bucketConfig, List<Percentile> percentiles) {
        TimeRange range = bucketConfig.getTimeRange();
        long rangeStart = range.getStart();
        long rangeEnd = range.getEnd();
        Preconditions.checkArgument(Utils.isValidTimeRange(rangeStart, rangeEnd), "Invalid time range");
        return this.findRateData(id, range.getStart(), range.getEnd(), 0, Order.ASC)
                .compose((Observable.Transformer)new NumericBucketPointTransformer(bucketConfig.getBuckets(), percentiles));
    }

    /**
     * Applies one or more functions to the data points of a gauge metric.
     *
     * <p>The data points are requested once, in descending order and without a limit, and the
     * resulting observable is wrapped with {@code cache()} before it is handed to every
     * function. The outputs of all functions are merged with {@code flatMap}.
     *
     * @param id the gauge metric to read
     * @param start start of the time range
     * @param end end of the time range
     * @param funcs functions that each turn the shared data point stream into a result stream
     * @return the merged results of all functions
     */
    @Override
    public <T> Observable<T> findGaugeData(MetricId<Double> id, long start, long end, Func1<Observable<DataPoint<Double>>, Observable<T>> ... funcs) {
        // Keep the element type of the function array: widening it to Object[] would leave the
        // lambda parameter typed as Object, on which call(...) is not available.
        Observable<DataPoint<Double>> dataCache = this.findDataPoints(id, start, end, 0, Order.DESC).cache();
        return Observable.from(funcs).flatMap(fn -> fn.call(dataCache));
    }

    /**
     * Computes bucketed statistics over the data points of a gauge metric.
     *
     * <p>Data points are requested in descending order without a limit for the time range of
     * {@code bucketConfig} and then passed through a {@code NumericBucketPointTransformer}.
     *
     * @param metricId the gauge metric to read
     * @param bucketConfig supplies the time range and the buckets
     * @param percentiles percentiles handed to the bucket transformer
     * @return the bucket points produced by the transformer
     * @throws IllegalArgumentException if the time range of {@code bucketConfig} is invalid
     */
    @Override
    public Observable<List<NumericBucketPoint>> findGaugeStats(MetricId<Double> metricId, BucketConfig bucketConfig, List<Percentile> percentiles) {
        TimeRange range = bucketConfig.getTimeRange();
        boolean validRange = Utils.isValidTimeRange(range.getStart(), range.getEnd());
        Preconditions.checkArgument(validRange, "Invalid time range");

        Observable<DataPoint<Double>> gaugePoints = this.findDataPoints(metricId, range.getStart(), range.getEnd(), 0, Order.DESC);
        return gaugePoints.compose((Observable.Transformer)new NumericBucketPointTransformer(bucketConfig.getBuckets(), percentiles));
    }

    /**
     * Computes tag-grouped statistics over the data points of a gauge metric.
     *
     * <p>Data points are requested in descending order without a limit and passed through a
     * {@code TaggedBucketPointTransformer} built from {@code tags} and {@code percentiles}.
     *
     * @param metricId the gauge metric to read
     * @param tags tags handed to the transformer
     * @param start start of the time range
     * @param end end of the time range
     * @param percentiles percentiles handed to the transformer
     * @return the map of tagged bucket points produced by the transformer
     */
    @Override
    public Observable<Map<String, TaggedBucketPoint>> findGaugeStats(MetricId<Double> metricId, Map<String, String> tags, long start, long end, List<Percentile> percentiles) {
        Observable<DataPoint<Double>> gaugePoints = this.findDataPoints(metricId, start, end, 0, Order.DESC);
        return gaugePoints.compose((Observable.Transformer)new TaggedBucketPointTransformer(tags, percentiles));
    }

    /**
     * Computes bucketed statistics across several numeric metrics.
     *
     * <p>When {@code stacked} is false, the points of all metrics (raw points in descending
     * order, or rates in ascending order when {@code isRate} is set) are merged into one stream
     * and bucketed together by a single {@code NumericBucketPointTransformer}.
     *
     * <p>When {@code stacked} is true, every metric is bucketed on its own first; the resulting
     * bucket points are then grouped by bucket start, combined per group with a
     * {@code SumNumericBucketPointCollector}, and turned back into a list via
     * {@code NumericBucketPoint.toList}.
     *
     * @param metrics the metrics to read
     * @param start start of the time range
     * @param end end of the time range
     * @param buckets bucket layout
     * @param percentiles percentiles handed to the bucket transformer
     * @param stacked whether per-metric buckets are summed instead of bucketing merged points
     * @param isRate whether rate data is used instead of raw data points
     * @return the resulting bucket points
     * @throws IllegalArgumentException if the time range is invalid
     */
    @Override
    public <T extends Number> Observable<List<NumericBucketPoint>> findNumericStats(List<MetricId<T>> metrics, long start, long end, Buckets buckets, List<Percentile> percentiles, boolean stacked, boolean isRate) {
        Preconditions.checkArgument(Utils.isValidTimeRange(start, end), "Invalid time range");

        if (stacked) {
            // One stream of bucket points per metric.
            Observable perMetricBuckets;
            if (isRate) {
                perMetricBuckets = Observable.from(metrics).map(metricId -> this.findRateData((MetricId<? extends Number>)metricId, start, end, 0, Order.ASC)
                        .compose((Observable.Transformer)new NumericBucketPointTransformer(buckets, percentiles))
                        .flatMap(Observable::from));
            } else {
                perMetricBuckets = Observable.from(metrics).map(metricId -> this.findDataPoints((MetricId)metricId, start, end, 0, Order.DESC)
                        .compose((Observable.Transformer)new NumericBucketPointTransformer(buckets, percentiles))
                        .flatMap(Observable::from));
            }
            // Sum the bucket points that share the same bucket start across metrics.
            return Observable.merge((Observable)perMetricBuckets)
                    .groupBy(BucketPoint::getStart)
                    .flatMap(group -> group.collect(SumNumericBucketPointCollector::new, SumNumericBucketPointCollector::increment))
                    .map(SumNumericBucketPointCollector::toBucketPoint)
                    .toMap(BucketPoint::getStart)
                    .map(pointMap -> NumericBucketPoint.toList(pointMap, buckets));
        }

        // Not stacked: pool the points of all metrics and bucket them in one pass.
        Observable pooledPoints;
        if (isRate) {
            pooledPoints = Observable.from(metrics).flatMap(metricId -> this.findRateData((MetricId<? extends Number>)metricId, start, end, 0, Order.ASC));
        } else {
            pooledPoints = Observable.from(metrics).flatMap(metricId -> this.findDataPoints((MetricId)metricId, start, end, 0, Order.DESC));
        }
        return pooledPoints.compose((Observable.Transformer)new NumericBucketPointTransformer(buckets, percentiles));
    }

    /**
     * Finds the data points of an availability metric.
     *
     * <p>Without {@code distinct}, the limit is handed straight to the data point query. With
     * {@code distinct}, the query is unbounded, consecutive points with the same value are
     * collapsed via {@code distinctUntilChanged}, and the limit is applied afterwards.
     *
     * @param id the availability metric to read
     * @param start start of the time range
     * @param end end of the time range
     * @param distinct whether runs of consecutive equal values are collapsed
     * @param limit maximum number of points; in the distinct case values {@code <= 0} mean no limit
     * @param order order in which the points are requested
     * @return the matching data points
     * @throws IllegalArgumentException if the time range is invalid
     */
    @Override
    public Observable<DataPoint<AvailabilityType>> findAvailabilityData(MetricId<AvailabilityType> id, long start, long end, boolean distinct, int limit, Order order) {
        Preconditions.checkArgument(Utils.isValidTimeRange(start, end), "Invalid time range");

        if (!distinct) {
            return this.findDataPoints(id, start, end, limit, order);
        }

        // Limit only after de-duplication so that up to `limit` distinct points are returned.
        Observable<DataPoint<AvailabilityType>> changes = this.findDataPoints(id, start, end, 0, order)
                .distinctUntilChanged(DataPoint::getValue);
        return limit <= 0 ? changes : changes.limit(limit);
    }

    /**
     * Computes bucketed statistics over the data points of an availability metric.
     *
     * <p>Points are requested in ascending order without a limit, grouped by the bucket index
     * that {@code buckets} reports for their timestamp, folded per group into an
     * {@code AvailabilityDataPointCollector}, and finally converted into a list via
     * {@code AvailabilityBucketPoint.toList}.
     *
     * @param metricId the availability metric to read
     * @param start start of the time range
     * @param end end of the time range
     * @param buckets bucket layout
     * @return the availability bucket points
     * @throws IllegalArgumentException if the time range is invalid
     */
    @Override
    public Observable<List<AvailabilityBucketPoint>> findAvailabilityStats(MetricId<AvailabilityType> metricId, long start, long end, Buckets buckets) {
        Preconditions.checkArgument(Utils.isValidTimeRange(start, end), "Invalid time range");

        return this.findDataPoints(metricId, start, end, 0, Order.ASC)
                .groupBy(point -> buckets.getIndex(point.getTimestamp()))
                // One collector per bucket index; the group key is that index.
                .flatMap(bucketGroup -> bucketGroup.collect(
                        () -> new AvailabilityDataPointCollector(buckets, (Integer)bucketGroup.getKey()),
                        AvailabilityDataPointCollector::increment))
                .map(AvailabilityDataPointCollector::toBucketPoint)
                .toMap(BucketPoint::getStart)
                .map(pointsByStart -> AvailabilityBucketPoint.toList(pointsByStart, buckets));
    }

    /**
     * Finds the data points of a string metric.
     *
     * <p>Without {@code distinct}, the limit is handed straight to the data point query. With
     * {@code distinct}, the query is unbounded, consecutive points with the same value are
     * collapsed via {@code distinctUntilChanged}, and the limit is applied afterwards. This
     * matches the handling in {@code findAvailabilityData}; limiting the query first could
     * return fewer than {@code limit} distinct points even though more exist.
     *
     * @param id the string metric to read
     * @param start start of the time range
     * @param end end of the time range
     * @param distinct whether runs of consecutive equal values are collapsed
     * @param limit maximum number of points; in the distinct case values {@code <= 0} mean no limit
     * @param order order in which the points are requested
     * @return the matching data points
     * @throws IllegalArgumentException if the time range is invalid
     */
    @Override
    public Observable<DataPoint<String>> findStringData(MetricId<String> id, long start, long end, boolean distinct, int limit, Order order) {
        Preconditions.checkArgument(Utils.isValidTimeRange(start, end), "Invalid time range");
        if (!distinct) {
            return this.findDataPoints(id, start, end, limit, order);
        }
        Observable<DataPoint<String>> changes = this.findDataPoints(id, start, end, 0, order)
                .distinctUntilChanged(DataPoint::getValue);
        if (limit <= 0) {
            return changes;
        }
        return changes.limit(limit);
    }

    /**
     * Computes bucketed statistics over the data points of a counter metric.
     *
     * <p>Data points are requested in ascending order without a limit for the time range of
     * {@code bucketConfig} and then passed through a {@code NumericBucketPointTransformer}.
     * Errors are not intercepted here; they are delivered to the subscriber of the returned
     * observable.
     *
     * @param id the counter metric to read
     * @param bucketConfig supplies the time range and the buckets
     * @param percentiles percentiles handed to the bucket transformer
     * @return the bucket points produced by the transformer
     * @throws IllegalArgumentException if the time range of {@code bucketConfig} is invalid
     */
    @Override
    public Observable<List<NumericBucketPoint>> findCounterStats(MetricId<Long> id, BucketConfig bucketConfig, List<Percentile> percentiles) {
        TimeRange timeRange = bucketConfig.getTimeRange();
        Preconditions.checkArgument(Utils.isValidTimeRange(timeRange.getStart(), timeRange.getEnd()), "Invalid time range");
        return this.findDataPoints(id, timeRange.getStart(), timeRange.getEnd(), 0, Order.ASC)
                .compose((Observable.Transformer)new NumericBucketPointTransformer(bucketConfig.getBuckets(), percentiles));
    }

    /**
     * Computes tag-grouped statistics over the data points of a counter metric.
     *
     * <p>Data points are requested in ascending order without a limit and passed through a
     * {@code TaggedBucketPointTransformer} built from {@code tags} and {@code percentiles}.
     *
     * @param metricId the counter metric to read
     * @param tags tags handed to the transformer
     * @param start start of the time range
     * @param end end of the time range
     * @param percentiles percentiles handed to the transformer
     * @return the map of tagged bucket points produced by the transformer
     */
    @Override
    public Observable<Map<String, TaggedBucketPoint>> findCounterStats(MetricId<Long> metricId, Map<String, String> tags, long start, long end, List<Percentile> percentiles) {
        Observable<DataPoint<Long>> counterPoints = this.findDataPoints(metricId, start, end, 0, Order.ASC);
        return counterPoints.compose((Observable.Transformer)new TaggedBucketPointTransformer(tags, percentiles));
    }

    /**
     * Finds the periods during which consecutive gauge values satisfy a predicate.
     *
     * <p>Data points are read in ascending order without a limit and collected into a list.
     * Every maximal run of consecutive points whose value passes {@code predicate} yields a
     * two-element array: index 0 holds the timestamp of the first point of the run and index 1
     * the timestamp of its last point. A run that is still open when the data ends is included.
     *
     * @param id the gauge metric to read
     * @param predicate test applied to each data point value
     * @param start start of the time range
     * @param end end of the time range
     * @return a single list of {@code [firstTimestamp, lastTimestamp]} arrays
     * @throws IllegalArgumentException if the time range is invalid
     */
    @Override
    public Observable<List<long[]>> getPeriods(MetricId<Double> id, Predicate<Double> predicate, long start, long end) {
        Preconditions.checkArgument(Utils.isValidTimeRange(start, end), "Invalid time range");
        return this.findDataPoints(id, start, end, 0, Order.ASC).toList().map(data -> {
            List<long[]> periods = new ArrayList<long[]>(data.size());
            long[] openPeriod = null;
            DataPoint lastMatch = null;
            for (DataPoint point : data) {
                boolean matches = predicate.test((Double)point.getValue());
                if (matches) {
                    if (openPeriod == null) {
                        // First matching point of a new run.
                        openPeriod = new long[2];
                        openPeriod[0] = point.getTimestamp();
                    }
                    lastMatch = point;
                } else if (openPeriod != null) {
                    // A non-matching point ends the current run at the last matching point.
                    openPeriod[1] = lastMatch.getTimestamp();
                    periods.add(openPeriod);
                    openPeriod = null;
                    lastMatch = null;
                }
            }
            if (openPeriod != null) {
                // The data ended while a run was still open.
                openPeriod[1] = lastMatch.getTimestamp();
                periods.add(openPeriod);
            }
            return periods;
        });
    }

    /**
     * Resolves the TTL to use for a metric.
     *
     * <p>A retention registered for the metric itself takes precedence and is converted with
     * {@code Duration.standardDays(...).toStandardSeconds()}. Otherwise the retention registered
     * for the metric's tenant and type is used, falling back to {@code defaultTTL}.
     *
     * <p>NOTE(review): only the metric-level value goes through the days-to-seconds conversion;
     * the tenant-level value and the default are returned as stored. Confirm the units match.
     *
     * @param metricId the metric whose TTL is needed
     * @return the resolved TTL
     */
    private int getTTL(MetricId<?> metricId) {
        Integer metricRetention = this.dataRetentions.get(new DataRetentionKey(metricId));
        if (metricRetention != null) {
            return Duration.standardDays(metricRetention.intValue()).toStandardSeconds().getSeconds();
        }
        DataRetentionKey tenantKey = new DataRetentionKey(metricId.getTenantId(), metricId.getType());
        return this.dataRetentions.getOrDefault(tenantKey, this.defaultTTL);
    }

    /**
     * Shuts this service down: first shuts down {@code metricsTasks}, then unloads the data
     * retentions.
     */
    public void shutdown() {
        metricsTasks.shutdown();
        unloadDataRetentions();
    }

    /**
     * Runs a callable under the given timer and returns its result.
     *
     * @param timer the timer used to time the call
     * @param callable the work to run
     * @param <T> result type of the callable
     * @return whatever {@code timer.time(callable)} returns
     * @throws RuntimeException wrapping any exception raised while timing the callable
     */
    private <T> T time(Timer timer, Callable<T> callable) {
        T outcome;
        try {
            outcome = timer.time(callable);
        }
        catch (Exception e) {
            throw new RuntimeException("There was an error during a timed event", e);
        }
        return outcome;
    }

    /**
     * Deletes a metric together with its index entries.
     *
     * <p>The first stage reads the metric's tags (an empty map is substituted when none are
     * found) and removes the metric from the tags index. The second stage merges the removal
     * from the metrics index, the removal of the metric's data, and the removal from the
     * retention index. The second stage is concatenated after the first. Every result is
     * mapped to {@code null}.
     *
     * @param id the metric to delete
     * @param <T> value type of the metric
     * @return an observable emitting {@code null} for each underlying result
     */
    @Override
    public <T> Observable<Void> deleteMetric(MetricId<T> id) {
        Observable tagsIndexCleanup = this.dataAccess.getMetricTags(id)
                .map(row -> row.getMap(0, String.class, String.class))
                .defaultIfEmpty(new HashMap())
                .flatMap(tags -> this.dataAccess.deleteFromMetricsTagsIndex(id, (Map<String, String>)tags))
                .map(r -> null);
        Observable remainingCleanup = Observable.merge(
                this.dataAccess.deleteMetricFromMetricsIndex(id),
                this.dataAccess.deleteMetricData(id),
                this.dataAccess.deleteMetricFromRetentionIndex(id))
                .map(r -> null);
        return tagsIndexCleanup.concatWith(remainingCleanup);
    }

    /**
     * Future callback that stores loaded retentions in the enclosing service's
     * {@code dataRetentions} map and counts down a latch once it has run.
     *
     * <p>The latch is counted down in a {@code finally} block on both paths, so a failure while
     * storing the retentions or while logging cannot leave the latch un-decremented.
     */
    private class DataRetentionsLoadedCallback
    implements FutureCallback<Set<Retention>> {
        private final String tenantId;
        private final MetricType<?> type;
        private final CountDownLatch latch;

        /**
         * @param tenantId tenant reported in the failure log message
         * @param type metric type reported in the failure log message
         * @param latch latch counted down once per callback invocation
         */
        public DataRetentionsLoadedCallback(String tenantId, MetricType<?> type, CountDownLatch latch) {
            this.tenantId = tenantId;
            this.type = type;
            this.latch = latch;
        }

        /**
         * Stores every loaded retention under a key built from its id, then counts down the latch.
         *
         * @param dataRetentionsSet the loaded retentions
         */
        @Override
        public void onSuccess(Set<Retention> dataRetentionsSet) {
            try {
                for (Retention r : dataRetentionsSet) {
                    MetricsServiceImpl.this.dataRetentions.put(new DataRetentionKey(r.getId()), r.getValue());
                }
            } finally {
                // Always release the latch, even if storing a retention throws.
                this.latch.countDown();
            }
        }

        /**
         * Logs the loading failure for this tenant and type, then counts down the latch.
         *
         * @param t the failure cause
         */
        @Override
        public void onFailure(Throwable t) {
            try {
                log.warnDataRetentionLoadingFailure(this.tenantId, this.type, t);
            } finally {
                // Always release the latch, even if logging throws.
                this.latch.countDown();
            }
        }
    }

    /**
     * Map key for data retention lookups, wrapping a {@code MetricId}.
     *
     * <p>Equality and hashing delegate entirely to the wrapped metric id.
     */
    private static class DataRetentionKey {
        private final MetricId<?> metricId;

        /**
         * Builds a tenant-and-type level key. The wrapped id uses
         * {@code Functions.makeSafe(type.getText())} as its name.
         *
         * @param tenantId the tenant
         * @param type the metric type
         */
        public DataRetentionKey(String tenantId, MetricType<?> type) {
            String safeTypeName = Functions.makeSafe(type.getText());
            this.metricId = new MetricId(tenantId, type, safeTypeName);
        }

        /**
         * Builds a key for a specific metric id.
         *
         * @param metricId the id to wrap
         */
        public DataRetentionKey(MetricId<?> metricId) {
            this.metricId = metricId;
        }

        /**
         * Builds a key from the id of the given metric.
         *
         * @param metric the metric whose id is wrapped
         */
        public DataRetentionKey(Metric<?> metric) {
            this.metricId = metric.getMetricId();
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            // Exact class match, not instanceof.
            if (o == null || o.getClass() != this.getClass()) {
                return false;
            }
            DataRetentionKey other = (DataRetentionKey)o;
            return this.metricId.equals(other.metricId);
        }

        @Override
        public int hashCode() {
            return this.metricId.hashCode();
        }
    }
}

