package io.syndesis.server.logging.jsondb.controller;

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.dsl.PodResource;
import io.syndesis.common.util.DurationConverter;
import io.syndesis.common.util.backend.BackendController;
import io.syndesis.common.util.json.JsonUtils;
import io.syndesis.common.util.thread.Threads;
import io.syndesis.server.jsondb.GetOptions;
import io.syndesis.server.jsondb.JsonDB;
import io.syndesis.server.jsondb.impl.SqlJsonDB;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.PreparedBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;

/**
 * Backend controller that tracks the activity of integration pods.
 *
 * <p>Once {@linkplain #open() opened} it
 * <ul>
 *   <li>polls the pods labelled {@code syndesis.io/component=integration} every
 *       {@value #POLL_INTERVAL_SECONDS} seconds and keeps one {@link PodLogMonitor} per running pod,</li>
 *   <li>drains {@link #eventQueue} on a worker thread and writes the collected entries to the
 *       {@code jsondb} table in batches, and</li>
 *   <li>periodically purges exchange entries, keeping the newest {@link #getRetention()} paths per
 *       integration.</li>
 * </ul>
 */
@ConditionalOnProperty(value = {"controllers.dblogging.enabled"}, havingValue = "true", matchIfMissing = true)
@Service
public class ActivityTrackingController implements BackendController, Closeable {
    static final String IDLE_THREAD_NAME = "Logs Controller [idle]";
    private static final String SCHEDULER_IDLE_THREAD_NAME = "Logs Controller Scheduler [idle]";
    private static final String RUNNING_PHASE = "Running";
    private static final String PODS_PATH = "/activity/pods";
    /** Maximum number of entries collected into one batch before it is written. */
    private static final int MAX_BATCH_SIZE = 1000;
    /** Maximum time spent collecting one batch before it is written. */
    private static final long BATCH_WINDOW_MILLIS = 1000L;
    private static final long POLL_INTERVAL_SECONDS = 5L;
    private static final long SHUTDOWN_WAIT_SECONDS = 10L;
    private static final Logger LOG = LoggerFactory.getLogger(ActivityTrackingController.class);

    private final DBI dbi;
    private final KubernetesClient client;
    private final JsonDB jsondb;
    private ScheduledExecutorService scheduler;
    private ExecutorService executor;
    final KubernetesSupport kubernetesSupport;
    private SqlJsonDB.DatabaseKind databaseKind;
    private final Map<String, PodLogMonitor> podHandlers = new ConcurrentHashMap<>();
    protected final LinkedBlockingDeque<BatchOperation> eventQueue = new LinkedBlockingDeque<>(1000);
    protected final AtomicBoolean stopped = new AtomicBoolean();
    private int retention = 50;
    private Duration retentionTime = Duration.ofDays(1);
    private Duration cleanUpInterval = Duration.ofMinutes(15);
    private Duration startupDelay = Duration.ofSeconds(15);

    /**
     * Creates the controller; no threads are started until {@link #open()} is called.
     *
     * @param jsonDB           store used for pod state and for reading the activity index
     * @param dbi              JDBI entry point used for batch writes and retention deletes
     * @param kubernetesClient client used to list and inspect integration pods
     */
    @Autowired
    public ActivityTrackingController(JsonDB jsonDB, DBI dbi, KubernetesClient kubernetesClient) {
        this.jsondb = jsonDB;
        this.dbi = dbi;
        this.client = kubernetesClient;
        this.kubernetesSupport = new KubernetesSupport(kubernetesClient);
    }

    /** Starts the controller; delegates to {@link #open()}. */
    public void start() {
        open();
    }

    /**
     * Detects the database kind, then starts the event-queue worker and schedules pod polling
     * and log cleanup.
     *
     * <p>Calling this method while the controller is already running is a no-op. It is reachable
     * both through {@code @PostConstruct} and through {@link #start()}, and starting twice would
     * replace the executors without shutting the previous ones down.
     */
    @PostConstruct
    public void open() {
        if (this.scheduler != null && !this.stopped.get()) {
            return;
        }

        // Must happen before any worker is started: writeBatch and deleteKeepingRetention pick
        // their SQL dialect from databaseKind, and a failure here must not leave threads behind.
        detectDatabaseKind();

        this.scheduler = Executors.newScheduledThreadPool(1, Threads.newThreadFactory("Logs Controller Scheduler"));
        this.executor = Executors.newCachedThreadPool(Threads.newThreadFactory("Logs Controller"));
        this.stopped.set(false);
        this.executor.execute(this::processEventQueue);
        this.scheduler.scheduleWithFixedDelay(this::pollPods, this.startupDelay.getSeconds(), POLL_INTERVAL_SECONDS, TimeUnit.SECONDS);
        this.scheduler.scheduleWithFixedDelay(this::runScheduledCleanup, this.startupDelay.toMillis(), this.cleanUpInterval.toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Determines {@link #databaseKind} from the JDBC product name; a PostgreSQL connection whose
     * {@code SELECT VERSION()} output starts with {@code CockroachDB} is classified as CockroachDB.
     */
    private void detectDatabaseKind() {
        this.dbi.inTransaction((handle, transactionStatus) -> {
            this.databaseKind = SqlJsonDB.DatabaseKind.valueOf(handle.getConnection().getMetaData().getDatabaseProductName());
            if (this.databaseKind == SqlJsonDB.DatabaseKind.PostgreSQL) {
                String version = (String) handle.createQuery("SELECT VERSION()").mapTo(String.class).first();
                if (version.startsWith("CockroachDB")) {
                    this.databaseKind = SqlJsonDB.DatabaseKind.CockroachDB;
                }
            }
            return null;
        });
    }

    /**
     * Scheduled entry point for {@link #cleanupLogs()}.
     *
     * <p>A task scheduled with {@code scheduleWithFixedDelay} is never run again once one execution
     * throws, so runtime failures (for example from the database) are logged here instead of
     * being propagated to the scheduler.
     */
    private void runScheduledCleanup() {
        try {
            cleanupLogs();
        } catch (RuntimeException e) {
            LOG.error("Unexpected Error occurred.", e);
        }
    }

    /**
     * Purges old exchange entries for every integration listed under {@code /activity/integrations},
     * keeping the newest {@link #getRetention()} paths of each. I/O errors are logged, not thrown.
     */
    public void cleanupLogs() {
        Thread.currentThread().setName("Logs Controller Scheduler [running]: cleanupLogs");
        try {
            LOG.info("Purging old activity logs");
            // JSON object keys are strings, so the raw HashMap can safely be viewed as String-keyed.
            @SuppressWarnings("unchecked")
            Map<String, Object> integrations = dbGet(HashMap.class, "/activity/integrations");
            if (integrations != null) {
                for (String integrationId : integrations.keySet()) {
                    int deleted = deleteKeepingRetention("/activity/exchanges/" + integrationId + "/%");
                    LOG.info("deleted {} transactions for integration: {}", deleted, integrationId);
                }
            }
        } catch (IOException e) {
            LOG.error("Unexpected Error occurred.", e);
        } finally {
            Thread.currentThread().setName(SCHEDULER_IDLE_THREAD_NAME);
        }
    }

    /**
     * Deletes all rows whose path matches the given {@code LIKE} pattern except the first
     * {@link #getRetention()} rows in descending path order.
     *
     * @param str SQL {@code LIKE} pattern applied to the {@code path} column
     * @return the update count reported by the delete statement
     */
    int deleteKeepingRetention(String str) {
        Integer deleted = this.dbi.inTransaction((handle, transactionStatus) -> {
            // The row-locking clause is only emitted for PostgreSQL.
            String lockClause = this.databaseKind == SqlJsonDB.DatabaseKind.PostgreSQL ? "FOR KEY SHARE SKIP LOCKED " : "";
            String sql = "DELETE FROM jsondb WHERE path IN (SELECT path FROM jsondb WHERE path LIKE ? ORDER BY path DESC " + lockClause + "OFFSET ? ROWS)";
            return handle.update(sql, str, this.retention);
        });
        return deleted;
    }

    /** Returns the insert/upsert statement matching the detected {@link #databaseKind}. */
    private String batchStatement() {
        if (this.databaseKind == SqlJsonDB.DatabaseKind.PostgreSQL) {
            return "INSERT into jsondb (path, value, ovalue) values (:path, :value, :ovalue) ON CONFLICT (path) DO UPDATE SET value = :value, ovalue = :ovalue";
        }
        if (this.databaseKind == SqlJsonDB.DatabaseKind.H2) {
            return "MERGE INTO jsondb (path, value, ovalue) VALUES (:path, :value, :ovalue)";
        }
        return "INSERT into jsondb (path, value, ovalue) values (:path, :value, :ovalue)";
    }

    /**
     * Writes one batch of collected entries to the {@code jsondb} table in a single transaction.
     *
     * @param map entries keyed by their path relative to {@code /activity}
     */
    private void writeBatch(Map<String, Object> map) {
        this.dbi.inTransaction((handle, transactionStatus) -> {
            PreparedBatch batch = handle.prepareBatch(batchStatement());
            for (Map.Entry<String, Object> entry : map.entrySet()) {
                String path = "/activity" + entry.getKey() + "/";
                String value = null;
                String ovalue = null;
                // NOTE(review): the '`' prefix and the char-2 value look like jsondb's value-type
                // markers (string / boolean true) - confirm against SqlJsonDB.
                if (path.startsWith("/activity/exchanges")) {
                    value = "`" + (String) entry.getValue();
                } else if (path.startsWith("/activity/integrations")) {
                    ovalue = "true";
                    value = String.valueOf((char) 2);
                } else if (path.startsWith("/activity/pods")) {
                    path = path + "time/";
                    value = "`" + ((PodLogState) entry.getValue()).time;
                }
                batch.bind("path", path).bind("value", value).bind("ovalue", ovalue).add();
            }
            return batch.execute();
        });
    }

    /** Stops the controller; delegates to {@link #close()}. */
    public void stop() {
        close();
    }

    /**
     * Cancels outstanding Kubernetes requests and shuts both executors down. Only the first call
     * after an {@link #open()} has any effect.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt flag is restored and the
     * method returns without clearing the executor references.
     */
    @Override
    @PreDestroy
    public void close() {
        if (this.stopped.getAndSet(true)) {
            return;
        }
        this.kubernetesSupport.cancelAllRequests();

        final ScheduledExecutorService currentScheduler = this.scheduler;
        final ExecutorService currentExecutor = this.executor;
        if (currentScheduler == null || currentExecutor == null) {
            // close() before open(): nothing was started, so there is nothing to shut down.
            return;
        }

        currentScheduler.shutdownNow();
        currentExecutor.shutdown();
        try {
            boolean schedulerTerminated;
            boolean executorTerminated;
            // NOTE(review): the wait ends as soon as EITHER pool has terminated, which bounds the
            // shutdown time but does not guarantee the worker pool is drained - confirm intent.
            do {
                schedulerTerminated = currentScheduler.awaitTermination(SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS);
                executorTerminated = currentExecutor.awaitTermination(SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS);
            } while (!schedulerTerminated && !executorTerminated);
        } catch (InterruptedException e) {
            LOG.warn("Unable to cleanly stop: {}", e.getMessage());
            LOG.debug("Interrupted while stopping", e);
            Thread.currentThread().interrupt();
            return;
        }
        this.scheduler = null;
        this.executor = null;
    }

    /**
     * Synchronizes {@link #podHandlers} and the persisted pod state with the pods currently
     * reported by {@link #listPods()}: starts monitors for running pods, retires monitors of pods
     * that are no longer listed as running, and deletes stored state of pods without a monitor.
     */
    private void pollPods() {
        Thread.currentThread().setName("Logs Controller Scheduler [running]: pollPods");
        try {
            // Mark phase: assume every known pod is gone until the listing proves otherwise.
            for (PodLogMonitor monitor : this.podHandlers.values()) {
                monitor.markInOpenshift.set(false);
            }

            for (Pod pod : listPods().getItems()) {
                if (!RUNNING_PHASE.equals(pod.getStatus().getPhase())) {
                    continue;
                }
                PodLogMonitor monitor = this.podHandlers.computeIfAbsent(pod.getMetadata().getName(), name -> createLogMonitor(pod));
                try {
                    monitor.markInOpenshift.set(true);
                    monitor.start();
                } catch (IOException e) {
                    LOG.error("Unexpected Error", e);
                }
            }

            // Sweep phase: drop the monitors that were not re-marked above.
            Iterator<Map.Entry<String, PodLogMonitor>> entries = this.podHandlers.entrySet().iterator();
            while (entries.hasNext()) {
                PodLogMonitor monitor = entries.next().getValue();
                if (!monitor.markInOpenshift.get()) {
                    LOG.info("Pod not tracked by openshift anymore: {}", monitor.podName);
                    monitor.keepTrying.set(false);
                    entries.remove();
                }
            }

            // JSON object keys are strings, so the raw HashMap can safely be viewed as String-keyed.
            @SuppressWarnings("unchecked")
            Map<String, Object> storedPods = dbGet(HashMap.class, PODS_PATH);
            if (storedPods != null) {
                storedPods.keySet().removeAll(this.podHandlers.keySet());
                for (String podName : storedPods.keySet()) {
                    this.jsondb.delete(PODS_PATH + "/" + podName);
                    LOG.info("Pod state removed from db: {}", podName);
                }
            }
        } catch (IOException | RuntimeException e) {
            // Swallowed on purpose: an exception escaping a scheduled task cancels its future runs.
            LOG.error("Unexpected Error occurred.", e);
        } finally {
            Thread.currentThread().setName(SCHEDULER_IDLE_THREAD_NAME);
        }
    }

    /**
     * Creates the log monitor for a pod.
     *
     * @param pod pod to monitor
     * @return a new, not yet started monitor
     */
    protected PodLogMonitor createLogMonitor(Pod pod) {
        PodLogMonitor monitor = new PodLogMonitor(this, pod);
        LOG.info("Created log monitor for pod: {}", monitor.podName);
        return monitor;
    }

    /**
     * Lists the pods labelled {@code syndesis.io/component=integration}.
     *
     * @return the pod list returned by the Kubernetes client
     */
    protected PodList listPods() {
        return (PodList) ((FilterWatchListDeletable) this.client.pods().withLabel("syndesis.io/component", "integration")).list();
    }

    /**
     * Checks whether the named pod exists and is in the {@code Running} phase.
     *
     * @param str pod name
     * @return {@code true} only if the pod was found and its phase is {@code Running}
     */
    public boolean isPodRunning(String str) {
        Pod pod = (Pod) ((PodResource) this.client.pods().withName(str)).get();
        return pod != null && RUNNING_PHASE.equals(pod.getStatus().getPhase());
    }

    /**
     * Delegates to {@link KubernetesSupport#watchLog}, passing this controller's worker executor.
     *
     * @param str      first argument forwarded to {@code KubernetesSupport.watchLog}
     * @param consumer receives the log stream
     * @param str2     third argument forwarded to {@code KubernetesSupport.watchLog}
     * @throws IOException if the delegate fails
     */
    public void watchLog(String str, Consumer<InputStream> consumer, String str2) throws IOException {
        this.kubernetesSupport.watchLog(str, consumer, str2, this.executor);
    }

    /**
     * Removes the persisted state stored under {@code /activity/pods/<name>}.
     *
     * @param str pod name
     */
    public void deletePodLogState(String str) {
        this.jsondb.delete(PODS_PATH + "/" + str);
    }

    /**
     * Persists the state of a pod under {@code /activity/pods/<name>} as JSON.
     *
     * @param str         pod name
     * @param podLogState state to store
     * @throws IOException if the state cannot be serialized
     */
    public void setPodLogState(String str, PodLogState podLogState) throws IOException {
        this.jsondb.set(PODS_PATH + "/" + str, JsonUtils.writer().writeValueAsBytes(podLogState));
    }

    /**
     * Loads the persisted state of a pod.
     *
     * @param str pod name
     * @return the stored state, or {@code null} if nothing is stored for the pod
     * @throws IOException if the stored value cannot be deserialized
     */
    public PodLogState getPodLogState(String str) throws IOException {
        return dbGet(PodLogState.class, PODS_PATH + "/" + str);
    }

    /** Reads and deserializes the value at the given path using default options. */
    private <T> T dbGet(Class<T> cls, String str) throws IOException {
        return dbGet(cls, str, null);
    }

    /**
     * Reads the value at the given path and deserializes it into {@code cls}.
     *
     * @return the deserialized value, or {@code null} if the store has no value at the path
     */
    private <T> T dbGet(Class<T> cls, String str, GetOptions getOptions) throws IOException {
        byte[] json = this.jsondb.getAsByteArray(str, getOptions);
        if (json == null) {
            return null;
        }
        return (T) JsonUtils.reader().forType(cls).readValue(json);
    }

    /**
     * Worker loop: drains {@link #eventQueue} until the controller is stopped, grouping operations
     * into batches of at most {@value #MAX_BATCH_SIZE} entries or {@value #BATCH_WINDOW_MILLIS} ms,
     * whichever is reached first, and writing each batch in one transaction.
     */
    private void processEventQueue() {
        Thread.currentThread().setName("Logs Controller [running]: processEventQueue");
        try {
            LOG.info("Batch ingestion work thread started.");
            while (!this.stopped.get()) {
                BatchOperation event = this.eventQueue.pollFirst(1L, TimeUnit.SECONDS);
                if (event == null) {
                    continue;
                }

                TreeMap<String, Object> batch = new TreeMap<>();
                long batchStart = System.currentTimeMillis();
                int eventCount = 0;
                while (!this.stopped.get() && event != null) {
                    eventCount++;
                    try {
                        event.apply(batch);
                    } catch (IOException e) {
                        LOG.error("Unexpected Error", e);
                    }
                    // Always advance to the next event, even when apply failed; otherwise a
                    // failing operation would be retried in a tight loop and never leave the batch.
                    long remaining = BATCH_WINDOW_MILLIS - (System.currentTimeMillis() - batchStart);
                    if (batch.size() >= MAX_BATCH_SIZE || remaining <= 0) {
                        event = null;
                    } else {
                        event = this.eventQueue.poll(remaining, TimeUnit.MILLISECONDS);
                    }
                }

                try {
                    writeBatch(batch);
                } catch (RuntimeException e) {
                    LOG.warn("Unable to write batch of events: {}", e.getMessage());
                    LOG.debug("Unable to write batch of events: ", e);
                }
                LOG.debug("Batch ingested {} log events", eventCount);
            }
        } catch (InterruptedException e) {
            LOG.error("Interrupted", e);
            // Preserve the interrupt for the executor that owns this thread.
            Thread.currentThread().interrupt();
        } finally {
            Thread.currentThread().setName(IDLE_THREAD_NAME);
        }
        LOG.info("Batch ingestion work thread done.");
    }

    /**
     * Sets how many exchange paths are kept per integration on cleanup.
     *
     * @param i number of paths to keep
     */
    @Value("${controllers.dblogging.retention:50}")
    public void setRetention(int i) {
        this.retention = i;
    }

    /**
     * Sets the retention time from its textual form (for example {@code "1 day"}).
     *
     * @param str duration text understood by {@link DurationConverter}
     */
    @Value("${controllers.dblogging.retentionTime:1 day}")
    public void setRetentionTime(String str) {
        this.retentionTime = new DurationConverter().convert(str);
    }

    /**
     * Sets the delay between two cleanup runs; takes effect on the next {@link #open()}.
     *
     * @param str duration text understood by {@link DurationConverter}
     */
    @Value("${controllers.dblogging.cleanUpPeriod:15 minutes}")
    public void setCleanUpInterval(String str) {
        this.cleanUpInterval = new DurationConverter().convert(str);
    }

    /**
     * Sets the delay before the first poll and cleanup; takes effect on the next {@link #open()}.
     *
     * @param str duration text understood by {@link DurationConverter}
     */
    @Value("${controllers.dblogging.startupDelay:15 seconds}")
    public void setStartupDelay(String str) {
        this.startupDelay = new DurationConverter().convert(str);
    }

    /** @return the number of exchange paths kept per integration on cleanup */
    public int getRetention() {
        return this.retention;
    }

    /** @return the configured retention time */
    public Duration getRetentionTime() {
        return this.retentionTime;
    }

    /**
     * Runs a task on the worker executor, naming the worker thread after the pod while the task
     * runs. The task is dropped with a warning if the controller is stopping.
     *
     * @param str      pod name used in the thread name and log message
     * @param runnable task to run
     */
    public void execute(String str, Runnable runnable) {
        if (this.stopped.get()) {
            LOG.warn("Not executing task: {}, for pod {}, the activity tracking is stopping", runnable, str);
            return;
        }
        this.executor.execute(() -> {
            Thread.currentThread().setName("Logs Controller [running], pod: " + str);
            try {
                runnable.run();
            } finally {
                Thread.currentThread().setName(IDLE_THREAD_NAME);
            }
        });
    }
}
