/*
 * Decompiled with CFR 0.152.
 */
package org.hawkular.metrics.clients.ptrans.backend;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpHeaders;
import java.net.URI;
import java.util.List;
import org.hawkular.metrics.client.common.Batcher;
import org.hawkular.metrics.client.common.MetricBuffer;
import org.hawkular.metrics.client.common.SingleMetric;
import org.hawkular.metrics.clients.ptrans.Configuration;
import org.hawkular.metrics.clients.ptrans.backend.Constants;
import org.jboss.logging.Logger;

public class MetricsSender
extends AbstractVerticle {
    private static final Logger log = Logger.getLogger(MetricsSender.class);
    private final String host;
    private final int port;
    private final String postUri;
    private final CharSequence hostHeader;
    private final CharSequence tenant;
    private final MetricBuffer buffer;
    private final int batchSize;
    private final int maxConnections;
    private HttpClient httpClient;
    private int connectionsUsed;
    private boolean flushScheduled;
    private long flushScheduleId;

    public MetricsSender(Configuration configuration) {
        URI restUrl = configuration.getRestUrl();
        URI httpProxy = configuration.getHttpProxy();
        if (httpProxy == null) {
            this.host = restUrl.getHost();
            this.port = restUrl.getPort();
            this.postUri = restUrl.getPath();
        } else {
            this.host = httpProxy.getHost();
            this.port = httpProxy.getPort();
            this.postUri = restUrl.toString();
        }
        this.hostHeader = HttpHeaders.createOptimized((String)restUrl.getHost());
        this.tenant = HttpHeaders.createOptimized((String)configuration.getTenant());
        this.buffer = new MetricBuffer(configuration.getBufferCapacity());
        this.batchSize = configuration.getBatchSize();
        this.maxConnections = configuration.getRestMaxConnections();
    }

    public void start(Future<Void> startFuture) throws Exception {
        HttpClientOptions httpClientOptions = new HttpClientOptions().setDefaultHost(this.host).setDefaultPort(this.port).setKeepAlive(true).setTryUseCompression(true).setMaxPoolSize(this.maxConnections);
        this.httpClient = this.vertx.createHttpClient(httpClientOptions);
        this.connectionsUsed = 0;
        this.flushScheduled = false;
        this.vertx.eventBus().registerDefaultCodec(SingleMetric.class, (MessageCodec)new SingleMetricCodec());
        this.vertx.eventBus().localConsumer("singlemetric", this::handleMetric).completionHandler(v -> startFuture.complete());
    }

    private void handleMetric(Message<SingleMetric> metricMessage) {
        this.buffer.insert((SingleMetric)metricMessage.body());
        this.metricInserted();
    }

    private void metricInserted() {
        this.sendBatches(false);
        this.scheduleFlush();
    }

    private void sendBatches(boolean force) {
        int bufferSize = this.buffer.size();
        while ((force || bufferSize >= this.batchSize) && bufferSize >= 1 && this.connectionsUsed < this.maxConnections) {
            List metrics = this.buffer.remove(Math.min(bufferSize, this.batchSize));
            this.send(metrics);
            bufferSize = this.buffer.size();
        }
    }

    private void scheduleFlush() {
        if (this.flushScheduled) {
            this.vertx.cancelTimer(this.flushScheduleId);
        } else {
            this.flushScheduled = true;
        }
        this.flushScheduleId = this.vertx.setTimer(10L, h -> {
            this.flushScheduled = false;
            this.sendBatches(true);
        });
    }

    private void send(List<SingleMetric> metrics) {
        ++this.connectionsUsed;
        String json = Batcher.metricListToJson(metrics);
        Buffer data = Buffer.buffer((String)json);
        HttpClientRequest req = this.httpClient.post(this.postUri, response -> {
            --this.connectionsUsed;
            if (response.statusCode() != 200) {
                if (log.isTraceEnabled()) {
                    response.bodyHandler(msg -> log.trace((Object)("Could not send metrics: " + response.statusCode() + " : " + msg.toString())));
                }
                this.buffer.reInsert(metrics);
                this.metricInserted();
            } else {
                this.scheduleFlush();
            }
        });
        req.putHeader(HttpHeaders.HOST, this.hostHeader);
        req.putHeader(HttpHeaders.CONTENT_LENGTH, (CharSequence)String.valueOf(data.length()));
        req.putHeader(HttpHeaders.CONTENT_TYPE, Constants.APPLICATION_JSON);
        req.putHeader(Constants.TENANT_HEADER_NAME, this.tenant);
        req.exceptionHandler(err -> {
            --this.connectionsUsed;
            log.trace((Object)"Could not send metrics", err);
            this.buffer.reInsert(metrics);
            this.metricInserted();
        });
        req.write(data);
        req.end();
    }

    public void stop() throws Exception {
        if (this.flushScheduled) {
            this.vertx.cancelTimer(this.flushScheduleId);
        }
    }

    private static class SingleMetricCodec
    implements MessageCodec<SingleMetric, SingleMetric> {
        private SingleMetricCodec() {
        }

        public void encodeToWire(Buffer buffer, SingleMetric singleMetric) {
        }

        public SingleMetric decodeFromWire(int pos, Buffer buffer) {
            return null;
        }

        public SingleMetric transform(SingleMetric singleMetric) {
            return singleMetric;
        }

        public String name() {
            return SingleMetricCodec.class.getCanonicalName();
        }

        public byte systemCodecID() {
            return -1;
        }
    }
}

