package org.hawkular.metrics.clients.ptrans.backend;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpHeaders;
import java.net.URI;
import java.util.List;
import org.hawkular.metrics.client.common.Batcher;
import org.hawkular.metrics.client.common.MetricBuffer;
import org.hawkular.metrics.client.common.SingleMetric;
import org.hawkular.metrics.clients.ptrans.Configuration;
import org.jboss.logging.Logger;

/**
 * Verticle that collects {@link SingleMetric} instances published on the local event bus address
 * {@link Constants#METRIC_ADDRESS}, keeps them in a {@link MetricBuffer} and posts them as JSON batches to the
 * configured REST endpoint (directly, or through the configured HTTP proxy).
 *
 * <p>A batch is sent as soon as {@code batchSize} metrics are buffered; a short timer flushes whatever is left
 * when no new metric arrives. At most {@code maxConnections} requests are in flight at any time. Batches which
 * could not be delivered are put back into the buffer.
 *
 * <p>NOTE(review): all mutable state is accessed without synchronization; this presumably relies on every
 * handler running on the verticle's own context - confirm before deploying as a worker/multi-threaded verticle.
 */
public class MetricsSender extends AbstractVerticle {
    private static final Logger log = Logger.getLogger(MetricsSender.class);

    /** Delay, in milliseconds, after which a partially filled batch is flushed. */
    private static final long FLUSH_DELAY_MILLIS = 10L;

    private static final int DEFAULT_HTTP_PORT = 80;
    private static final int DEFAULT_HTTPS_PORT = 443;

    private final String host;
    private final int port;
    private final String postUri;
    private final CharSequence hostHeader;
    private final CharSequence tenant;
    private final MetricBuffer buffer;
    private final int batchSize;
    private final int maxConnections;
    private HttpClient httpClient;
    private int connectionsUsed;
    private boolean flushScheduled;
    private long flushScheduleId;

    /**
     * Codec allowing {@link SingleMetric} objects to travel over the <em>local</em> event bus by reference.
     *
     * <p>The wire methods are intentionally no-ops: {@link #encodeToWire} writes nothing and
     * {@link #decodeFromWire} returns {@code null}, so this codec must not be used for clustered delivery.
     */
    private static class SingleMetricCodec implements MessageCodec<SingleMetric, SingleMetric> {

        /** Does nothing; wire encoding is not supported by this local-only codec. */
        @Override
        public void encodeToWire(Buffer buffer, SingleMetric singleMetric) {
        }

        /** Always returns {@code null}; wire decoding is not supported by this local-only codec. */
        @Override
        public SingleMetric decodeFromWire(int pos, Buffer buffer) {
            return null;
        }

        /** Hands the very same instance to the local consumer (no copy is made). */
        @Override
        public SingleMetric transform(SingleMetric singleMetric) {
            return singleMetric;
        }

        @Override
        public String name() {
            return SingleMetricCodec.class.getCanonicalName();
        }

        /** {@code -1} marks the codec as user-defined. */
        @Override
        public byte systemCodecID() {
            return (byte) -1;
        }
    }

    /**
     * Reads connection and batching settings from the given configuration.
     *
     * <p>Without a proxy, the client connects to the REST URL's host and posts to its path. With a proxy, the
     * client connects to the proxy and posts to the absolute REST URL. In both cases the {@code Host} header
     * carries the REST URL's host.
     *
     * @param configuration ptrans configuration providing the REST URL, optional HTTP proxy, tenant, buffer
     *                      capacity, batch size and maximum number of connections
     */
    public MetricsSender(Configuration configuration) {
        URI restUrl = configuration.getRestUrl();
        URI httpProxy = configuration.getHttpProxy();
        if (httpProxy == null) {
            host = restUrl.getHost();
            port = resolvePort(restUrl);
            postUri = restUrl.getPath();
        } else {
            host = httpProxy.getHost();
            port = resolvePort(httpProxy);
            postUri = restUrl.toString();
        }
        hostHeader = HttpHeaders.createOptimized(restUrl.getHost());
        tenant = HttpHeaders.createOptimized(configuration.getTenant());
        buffer = new MetricBuffer(configuration.getBufferCapacity());
        batchSize = configuration.getBatchSize();
        maxConnections = configuration.getRestMaxConnections();
    }

    /**
     * Returns the port of the given URI, falling back to the scheme's default port when the URI does not
     * specify one ({@link URI#getPort()} returns {@code -1} in that case).
     */
    private static int resolvePort(URI uri) {
        int explicitPort = uri.getPort();
        if (explicitPort != -1) {
            return explicitPort;
        }
        return "https".equalsIgnoreCase(uri.getScheme()) ? DEFAULT_HTTPS_PORT : DEFAULT_HTTP_PORT;
    }

    /**
     * Creates the HTTP client, registers the {@link SingleMetric} codec and starts consuming metrics.
     *
     * @param future completed once the event bus consumer is registered, failed if registration fails
     */
    @Override
    public void start(Future<Void> future) throws Exception {
        HttpClientOptions options = new HttpClientOptions()
                .setDefaultHost(host)
                .setDefaultPort(port)
                .setKeepAlive(true)
                .setTryUseCompression(true)
                .setMaxPoolSize(maxConnections);
        httpClient = vertx.createHttpClient(options);
        connectionsUsed = 0;
        flushScheduled = false;
        vertx.eventBus().registerDefaultCodec(SingleMetric.class, new SingleMetricCodec());
        vertx.eventBus().localConsumer(Constants.METRIC_ADDRESS, this::handleMetric).completionHandler(registration -> {
            if (registration.succeeded()) {
                future.complete();
            } else {
                // Undo the codec registration so that a later deployment attempt is not rejected.
                vertx.eventBus().unregisterDefaultCodec(SingleMetric.class);
                future.fail(registration.cause());
            }
        });
    }

    /** Buffers the metric carried by the event bus message and triggers sending. */
    private void handleMetric(Message<SingleMetric> message) {
        buffer.insert(message.body());
        metricInserted();
    }

    /** Sends every full batch currently available and (re)arms the flush timer for the remainder. */
    private void metricInserted() {
        sendBatches(false);
        scheduleFlush();
    }

    /**
     * Sends batches while metrics are buffered and a connection slot is free.
     *
     * @param force when {@code true}, a batch smaller than {@code batchSize} is sent as well; when
     *              {@code false}, only full batches are sent
     */
    private void sendBatches(boolean force) {
        int buffered = buffer.size();
        while (buffered >= 1 && (force || buffered >= batchSize) && connectionsUsed < maxConnections) {
            send(buffer.remove(Math.min(buffered, batchSize)));
            buffered = buffer.size();
        }
    }

    /** Arms the flush timer, replacing a pending one so the delay restarts from now. */
    private void scheduleFlush() {
        if (flushScheduled) {
            vertx.cancelTimer(flushScheduleId);
        } else {
            flushScheduled = true;
        }
        flushScheduleId = vertx.setTimer(FLUSH_DELAY_MILLIS, timerId -> {
            flushScheduled = false;
            sendBatches(true);
        });
    }

    /**
     * Posts one batch of metrics, occupying a connection slot until the request is settled.
     *
     * <p>On a {@code 200} response another flush is scheduled. On any other status, or on a request failure,
     * the batch is put back into the buffer and sending is triggered again.
     *
     * @param metrics the batch to post
     */
    private void send(List<SingleMetric> metrics) {
        connectionsUsed++;
        Buffer payload = Buffer.buffer(Batcher.metricListToJson(metrics));
        // Defensive guard: release the connection slot and re-queue the batch at most once per request, in
        // case both callbacks (or one of them more than once) get invoked. A one-element array is used because
        // lambdas may only capture effectively final locals.
        boolean[] settled = {false};
        HttpClientRequest request = httpClient.post(postUri, response -> {
            if (settled[0]) {
                return;
            }
            settled[0] = true;
            connectionsUsed--;
            if (response.statusCode() == 200) {
                scheduleFlush();
                return;
            }
            if (log.isTraceEnabled()) {
                response.bodyHandler(body -> {
                    log.trace("Could not send metrics: " + response.statusCode() + " : " + body.toString());
                });
            }
            requeue(metrics);
        });
        request.putHeader(HttpHeaders.HOST, hostHeader);
        request.putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(payload.length()));
        request.putHeader(HttpHeaders.CONTENT_TYPE, Constants.APPLICATION_JSON);
        request.putHeader(Constants.TENANT_HEADER_NAME, tenant);
        request.exceptionHandler(failure -> {
            if (settled[0]) {
                return;
            }
            settled[0] = true;
            connectionsUsed--;
            log.trace("Could not send metrics", failure);
            requeue(metrics);
        });
        request.write(payload);
        request.end();
    }

    /** Puts an undelivered batch back into the buffer and triggers sending again. */
    private void requeue(List<SingleMetric> metrics) {
        buffer.reInsert(metrics);
        metricInserted();
    }

    /**
     * Cancels the pending flush timer, if any, and unregisters the {@link SingleMetric} codec so that the
     * verticle can be deployed again on the same Vert.x instance.
     */
    @Override
    public void stop() throws Exception {
        if (flushScheduled) {
            vertx.cancelTimer(flushScheduleId);
            flushScheduled = false;
        }
        vertx.eventBus().unregisterDefaultCodec(SingleMetric.class);
    }
}
