package io.fabric8.insight.elasticsearch.impl;

import io.fabric8.insight.storage.StorageService;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.lucene.index.LogDocMergePolicy;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.node.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/fabric8/insight/elasticsearch/impl/ElasticStorageImpl.class */
/**
 * {@link StorageService} implementation that buffers documents in an in-memory queue and
 * ships them to Elasticsearch in bulk requests from a dedicated background thread.
 *
 * <p>Lifecycle: {@link #init()} starts the worker thread, {@link #destroy()} stops it.
 * Requests still queued when the worker stops are not sent.
 */
public class ElasticStorageImpl implements StorageService, Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticStorageImpl.class);

    /** Upper bound on the number of requests grouped into one bulk call. */
    private static final int DEFAULT_MAX_BULK_SIZE = 1000;

    /**
     * Formats the day suffix of the index name in UTC.
     * {@link SimpleDateFormat} is not thread-safe, so every use must synchronize on this instance.
     */
    private static final SimpleDateFormat indexFormat = createIndexFormat();

    private final Node node;
    private Thread thread;
    private volatile boolean running;
    private final int max = DEFAULT_MAX_BULK_SIZE;
    private final BlockingQueue<ActionRequest> queue = new LinkedBlockingQueue<ActionRequest>();

    /**
     * @param node the Elasticsearch node whose client is used to execute bulk requests
     */
    public ElasticStorageImpl(Node node) {
        this.node = node;
    }

    /** Builds the index-name date formatter, pinned to UTC so names do not depend on the JVM time zone. */
    private static SimpleDateFormat createIndexFormat() {
        SimpleDateFormat format = new SimpleDateFormat("yyyy.MM.dd");
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        return format;
    }

    /** Starts the background thread that drains the queue. */
    public void init() {
        this.running = true;
        this.thread = new Thread(this, "ElasticStorage");
        this.thread.start();
    }

    /** Signals the background thread to stop and interrupts it so a blocked {@code take()} returns. */
    public void destroy() {
        this.running = false;
        if (this.thread != null) {
            this.thread.interrupt();
        }
    }

    /**
     * Queues a document for indexing into the daily index {@code insight-yyyy.MM.dd}, where the
     * day is the UTC day of {@code timestamp}. The document is sent asynchronously by the worker.
     *
     * @param type      the Elasticsearch document type
     * @param timestamp the document time in milliseconds since the epoch
     * @param jsonData  the document source
     */
    public void store(String type, long timestamp, String jsonData) {
        String day;
        synchronized (indexFormat) {
            day = indexFormat.format(new Date(timestamp));
        }
        this.queue.add(new IndexRequest().index("insight-" + day).type(type).source(jsonData).create(true));
    }

    /**
     * Worker loop: blocks until a request is available, groups it with up to {@code max - 1}
     * further requests that are already queued, and sends the group as a single bulk request.
     */
    @Override // java.lang.Runnable
    public void run() {
        while (this.running) {
            try {
                BulkRequest bulkRequest = new BulkRequest();
                // Block for the first request; a batch always contains at least one action.
                bulkRequest.add(this.queue.take());
                int count = 1;
                // Check the size limit BEFORE polling so that no request is removed from the
                // queue without being added to the batch.
                ActionRequest next;
                while (count < this.max && (next = this.queue.poll()) != null) {
                    bulkRequest.add(next);
                    count++;
                }
                sendBulk(bulkRequest);
            } catch (InterruptedException e) {
                // destroy() interrupts the thread to unblock take(). Restore the flag and stop;
                // looping again would only make take() fail immediately.
                Thread.currentThread().interrupt();
                if (this.running) {
                    LOGGER.warn("Interrupted while waiting for requests, stopping", e);
                }
                return;
            } catch (Exception e) {
                if (this.running) {
                    LOGGER.warn("Error while sending requests", e);
                }
            }
        }
    }

    /** Executes the bulk request synchronously and logs every item that failed. */
    private void sendBulk(BulkRequest bulkRequest) {
        for (BulkItemResponse bulkItemResponse : this.node.client().bulk(bulkRequest).actionGet().items()) {
            if (bulkItemResponse.failed()) {
                LOGGER.warn("Error executing request: {}", bulkItemResponse.getFailureMessage());
            }
        }
    }
}
