/*
 * Decompiled with CFR 0.152.
 */
package org.hawkular.agent.monitor.storage;

import java.util.HashSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.hawkular.agent.monitor.diagnostics.Diagnostics;
import org.hawkular.agent.monitor.log.AgentLoggers;
import org.hawkular.agent.monitor.log.MsgLogger;
import org.hawkular.agent.monitor.scheduler.SchedulerConfiguration;
import org.hawkular.agent.monitor.storage.MetricDataPoint;
import org.hawkular.agent.monitor.storage.StorageAdapter;
import org.hawkular.agent.monitor.util.Consumer;

public class MetricBufferedStorageDispatcher
implements Consumer<MetricDataPoint> {
    private static final MsgLogger log = AgentLoggers.getLogger(MetricBufferedStorageDispatcher.class);
    private final int maxBatchSize;
    private final int bufferSize;
    private final StorageAdapter storageAdapter;
    private final Diagnostics diagnostics;
    private final BlockingQueue<MetricDataPoint> queue;
    private final Worker worker;

    public MetricBufferedStorageDispatcher(SchedulerConfiguration config, StorageAdapter storageAdapter, Diagnostics diagnostics) {
        this.maxBatchSize = config.getMetricDispatcherMaxBatchSize();
        this.bufferSize = config.getMetricDispatcherBufferSize();
        this.storageAdapter = storageAdapter;
        this.diagnostics = diagnostics;
        this.queue = new ArrayBlockingQueue<MetricDataPoint>(this.bufferSize);
        this.worker = new Worker(this.queue);
    }

    public void start() {
        this.worker.start();
    }

    public void shutdown() {
        this.worker.setKeepRunning(false);
        this.worker.interrupt();
        try {
            this.worker.join(60000L);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void accept(MetricDataPoint sample) {
        if (this.queue.remainingCapacity() <= 0) {
            throw new RuntimeException("Metric dispatcher buffer capacity has been exceeded [" + this.bufferSize + "]");
        }
        log.debugf("Metric collected: [%s]->[%f]", sample.getKey(), sample.getValue());
        this.diagnostics.getMetricsStorageBufferSize().inc();
        this.queue.add(sample);
    }

    @Override
    public void report(Throwable e) {
        log.errorMetricCollectionFailed(e);
    }

    public class Worker
    extends Thread {
        private final BlockingQueue<MetricDataPoint> queue;
        private boolean keepRunning;

        public Worker(BlockingQueue<MetricDataPoint> queue) {
            super("Hawkular-WildFly-Agent-Storage-Dispatcher-Metric");
            this.keepRunning = true;
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (this.keepRunning) {
                    MetricDataPoint sample = this.queue.take();
                    HashSet<MetricDataPoint> samples = new HashSet<MetricDataPoint>();
                    this.queue.drainTo(samples, MetricBufferedStorageDispatcher.this.maxBatchSize);
                    samples.add(sample);
                    MetricBufferedStorageDispatcher.this.diagnostics.getMetricsStorageBufferSize().dec((long)samples.size());
                    MetricBufferedStorageDispatcher.this.storageAdapter.storeMetrics(samples, 0L);
                }
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }

        public void setKeepRunning(boolean keepRunning) {
            this.keepRunning = keepRunning;
        }
    }
}

