package io.syndesis.server.logging.jsondb.controller;

import com.fasterxml.jackson.databind.JsonNode;
import io.fabric8.kubernetes.api.model.Pod;
import io.syndesis.common.model.action.Action;
import io.syndesis.common.util.Json;
import io.syndesis.common.util.KeyGenerator;
import io.syndesis.server.endpoint.v1.handler.activity.Activity;
import io.syndesis.server.endpoint.v1.handler.activity.ActivityStep;
import io.syndesis.server.jsondb.JsonDBException;
import io.syndesis.server.jsondb.impl.JsonRecordSupport;
import io.syndesis.server.openshift.OpenShiftService;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.servlet.tags.BindTag;

/* loaded from: input_file:BOOT-INF/lib/server-logging-jsondb-1.3.12.fuse-000001-redhat-2.jar:io/syndesis/server/logging/jsondb/controller/PodLogMonitor.class */
/**
 * Follows the log stream of a single pod and converts the structured records
 * found in it into {@link Activity} documents that are queued on the owning
 * {@link ActivityTrackingController}'s event queue.
 *
 * <p>A log line is only considered when it is at least 32 bytes long, byte 30 is
 * a space and byte 31 is an opening brace. The first 30 bytes are kept verbatim
 * as the log timestamp and everything from byte 31 on is parsed as a JSON
 * object. All other lines are ignored.
 *
 * <p>NOTE(review): {@link #inflightActivities} is a plain {@link HashMap} and
 * {@link #state} is also written from the event queue callback; this class
 * performs no synchronization of its own - confirm the threading model with
 * the controller.
 */
class PodLogMonitor implements Consumer<InputStream> {
    // NOTE(review): logs under the controller's category rather than this class - presumably intentional.
    private static final Logger LOG = LoggerFactory.getLogger(ActivityTrackingController.class);

    /** Delay, in seconds, before {@link #run()} is attempted again. */
    private static final long RETRY_DELAY_SECONDS = 5L;
    /** Number of leading bytes of a log line that hold the timestamp; the byte at this index must be a space. */
    private static final int TIMESTAMP_LENGTH = 30;
    /** Index of the first byte of the JSON payload; the byte at this index must be an opening brace. */
    private static final int JSON_OFFSET = TIMESTAMP_LENGTH + 1;
    /** Lines that grow beyond this many bytes without a line feed are dropped. */
    private static final int MAX_LINE_LENGTH = 10240;

    private final ActivityTrackingController logsController;
    protected final String podName;
    protected final String integrationId;
    protected final String deploymentVersion;
    protected PodLogState state;
    protected final AtomicBoolean markInOpenshift = new AtomicBoolean(true);
    protected final AtomicBoolean keepTrying = new AtomicBoolean(true);
    /** Activities that have been seen in the log but not yet reported as {@code "done"}, keyed by exchange id. */
    protected HashMap<String, InflightData> inflightActivities = new HashMap<>();

    /**
     * Accumulated data of an activity that has not completed yet: the activity
     * itself, its steps in the order they were first seen, and any extra fields
     * that were logged on the activity level.
     */
    public static class InflightData {
        Activity activity;
        Map<String, ActivityStep> steps;
        Map<String, Object> metadata;

        private InflightData() {
            this.activity = new Activity();
            // Insertion ordered so the steps are reported in the order they first appeared.
            this.steps = new LinkedHashMap<>();
            this.metadata = new HashMap<>();
        }

        /**
         * Returns the step registered under {@code stepId}, creating and
         * registering it first when it is not known yet.
         *
         * @param stepId identifier of the step
         * @param id     key from which the start time of a newly created step is derived
         * @return the existing or newly created step, never {@code null}
         * @throws IOException if the time cannot be extracted from {@code id}
         */
        public ActivityStep getStep(String stepId, String id) throws IOException {
            ActivityStep step = this.steps.get(stepId);
            if (step == null) {
                step = new ActivityStep();
                step.setId(stepId);
                step.setAt(Long.valueOf(KeyGenerator.getKeyTimeMillis(id)));
                this.steps.put(stepId, step);
            }
            return step;
        }
    }

    /**
     * Creates a monitor for the given pod.
     *
     * @param activityTrackingController controller that owns this monitor
     * @param pod                        pod whose log is to be followed
     * @throws IOException if the pod name, the integration id label or the
     *                     deployment version label cannot be determined
     */
    public PodLogMonitor(ActivityTrackingController activityTrackingController, Pod pod) throws IOException {
        this.logsController = activityTrackingController;

        // A pod without metadata is reported the same way as a pod without a name
        // instead of failing with a NullPointerException.
        this.podName = pod.getMetadata() == null ? null : pod.getMetadata().getName();
        if (this.podName == null) {
            throw new IOException("Could not determine the pod name");
        }

        // The label map may be absent altogether; treat that as "no labels".
        Map<String, String> labels = pod.getMetadata().getLabels();
        this.integrationId = labels == null ? null : labels.get(OpenShiftService.INTEGRATION_ID_LABEL);
        if (this.integrationId == null) {
            throw new IOException("Could not determine the integration id that is being run on the pod: " + this.podName);
        }
        this.deploymentVersion = labels.get(OpenShiftService.DEPLOYMENT_VERSION_LABEL);
        if (this.deploymentVersion == null) {
            throw new IOException("Could not determine the deployment version that is being run on the pod: " + this.podName);
        }
    }

    /**
     * Loads the previously stored log state of the pod, creating and storing a
     * fresh one when none exists, and submits {@link #run()} to the controller's
     * executor.
     *
     * @throws IOException declared for the controller's state accessors
     */
    public void start() throws IOException {
        this.state = this.logsController.getPodLogState(this.podName);
        if (this.state == null) {
            this.state = new PodLogState();
            this.logsController.setPodLogState(this.podName, this.state);
        }
        LOG.info("Recovered state: {}", this.state);
        this.logsController.executor.execute(this::run);
    }

    /**
     * Asks the controller to watch the pod log from the last tracked time on,
     * with this monitor as the consumer of the stream. Does nothing when the
     * controller has been stopped, when {@link #keepTrying} has been cleared or
     * when the pod is not running. A failure to start watching is retried after
     * {@link #RETRY_DELAY_SECONDS} seconds.
     */
    public void run() {
        if (this.logsController.stopped.get() || !this.keepTrying.get() || !this.logsController.isPodRunning(this.podName)) {
            return;
        }
        LOG.info("Getting controller for pod: {}", this.podName);
        try {
            this.logsController.watchLog(this.podName, this, this.state.time);
        } catch (IOException e) {
            LOG.info("Failure occurred while processing controller for pod: {}", this.podName, e);
            scheduleRetry();
        }
    }

    /**
     * Consumes the log stream of the pod. The stream is always closed before
     * this method returns. A {@code null} stream or a failure while reading
     * schedules another {@link #run()}.
     *
     * @param inputStream log stream of the pod, or {@code null} when none could be obtained
     */
    @Override
    public void accept(InputStream inputStream) {
        if (inputStream == null) {
            scheduleRetry();
            return;
        }
        try (InputStream in = inputStream) {
            processLogStream(in);
        } catch (InterruptedException e) {
            LOG.info("Failure occurred while processing controller for pod: {}", this.podName, e);
            scheduleRetry();
            // Keep the interruption visible to whoever owns this thread.
            Thread.currentThread().interrupt();
        } catch (IOException e) {
            LOG.info("Failure occurred while processing controller for pod: {}", this.podName, e);
            scheduleRetry();
        }
    }

    /** Schedules another {@link #run()} after {@link #RETRY_DELAY_SECONDS} seconds. */
    private void scheduleRetry() {
        this.logsController.schedule(this::run, RETRY_DELAY_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Reads the stream byte by byte, handing every line-feed terminated line to
     * {@link #processLine(byte[])}, until the stream ends or the controller is
     * stopped. When the stream ends while the pod is still running, another
     * {@link #run()} is scheduled.
     *
     * @param inputStream log stream to read
     * @throws IOException          if reading fails or line processing is interrupted
     * @throws InterruptedException declared for callers; not thrown by the current implementation
     */
    private void processLogStream(InputStream inputStream) throws IOException, InterruptedException {
        ByteArrayOutputStream line = new ByteArrayOutputStream();
        // Set once a line has outgrown MAX_LINE_LENGTH: the rest of that line is skipped
        // so its tail is never mistaken for the beginning of a new line.
        boolean discarding = false;
        int b;
        while (!this.logsController.stopped.get() && (b = inputStream.read()) >= 0) {
            if (b == '\n') {
                if (!discarding) {
                    line.write(b);
                    processLine(line.toByteArray());
                }
                line.reset();
                discarding = false;
                continue;
            }
            if (discarding) {
                continue;
            }
            line.write(b);
            if (line.size() > MAX_LINE_LENGTH) {
                line.reset();
                discarding = true;
            }
        }

        if (this.logsController.stopped.get()) {
            return;
        }
        if (!this.logsController.isPodRunning(this.podName)) {
            LOG.info("End of Log stream for terminated pod: {}", this.podName);
        } else {
            LOG.info("End of Log stream for running pod: {}", this.podName);
            scheduleRetry();
        }
    }

    /**
     * Returns the in-flight data of the given exchange, creating and registering
     * it when the exchange has not been seen before.
     *
     * @param exchange exchange id; also the key the activity start time is derived from
     * @param time     log timestamp recorded on a newly created activity
     * @return the existing or newly created in-flight data, never {@code null}
     * @throws IOException if the time cannot be extracted from {@code exchange}
     */
    InflightData getInflightData(String exchange, String time) throws IOException {
        InflightData inflight = this.inflightActivities.get(exchange);
        if (inflight == null) {
            inflight = new InflightData();
            inflight.activity.setPod(this.podName);
            inflight.activity.setVer(this.deploymentVersion);
            inflight.activity.setId(exchange);
            inflight.activity.setAt(Long.valueOf(KeyGenerator.getKeyTimeMillis(exchange)));
            inflight.activity.setLogts(time);
            this.inflightActivities.put(exchange, inflight);
        }
        return inflight;
    }

    /**
     * Interprets one log line. Lines that do not carry a JSON object at the
     * expected offset, that cannot be parsed, that contain fields of an
     * unexpected type or that have no {@code exchange} field are ignored.
     *
     * @param line raw bytes of the line, including the trailing line feed
     * @throws IOException if queueing a completed activity is interrupted
     *                     (raised as {@link InterruptedIOException})
     */
    private void processLine(byte[] line) throws IOException {
        if (line.length <= JSON_OFFSET || line[TIMESTAMP_LENGTH] != ' ' || line[JSON_OFFSET] != '{') {
            return;
        }
        String time = new String(line, 0, TIMESTAMP_LENGTH, StandardCharsets.US_ASCII);
        try {
            @SuppressWarnings("unchecked")
            Map<String, Object> record = (Map<String, Object>) Json.reader().forType(HashMap.class)
                .readValue(line, JSON_OFFSET, line.length - JSON_OFFSET);

            String id = validate((String) record.remove("id"));
            String exchange = validate((String) record.remove("exchange"));
            if (exchange == null) {
                // Not an activity record: without an exchange id there is nothing to attach it to,
                // and a null key must not reach the key time extraction.
                return;
            }

            InflightData inflight = getInflightData(exchange, time);
            String stepId = (String) record.remove(Action.TYPE_STEP);
            if (stepId == null) {
                updateActivity(inflight, exchange, time, record);
            } else {
                updateStep(inflight.getStep(stepId, id), record);
            }
        } catch (JsonDBException | IOException | ClassCastException ignored) {
            // Best effort: the line looked like a record but is not in the expected format.
            LOG.debug("Ignoring malformed activity log record from pod: {}", this.podName, ignored);
        } catch (InterruptedException e) {
            // Restore the flag that was cleared when the InterruptedException was raised.
            Thread.currentThread().interrupt();
            InterruptedIOException rethrow = new InterruptedIOException(e.getMessage());
            rethrow.initCause(e);
            throw rethrow;
        }
    }

    /**
     * Applies an activity level record (one without a step) to the in-flight
     * activity. Once the status {@code "done"} is seen the activity is
     * serialized, removed from the in-flight map and queued for storing.
     *
     * @param inflight in-flight data of the exchange
     * @param exchange validated exchange id
     * @param time     log timestamp of the record
     * @param record   remaining fields of the record; known fields are removed from it
     * @throws IOException          if the activity cannot be serialized
     * @throws InterruptedException if interrupted while queueing the activity
     */
    private void updateActivity(InflightData inflight, String exchange, String time, Map<String, Object> record)
        throws IOException, InterruptedException {
        Boolean failed = (Boolean) record.remove("failed");
        if (failed != null) {
            inflight.activity.setFailed(failed);
        }
        String status = (String) record.remove("status");
        // Whatever is left over is kept as activity metadata.
        inflight.metadata.putAll(record);
        if (status == null) {
            return;
        }
        inflight.activity.setStatus(status);
        if (!"done".equals(status)) {
            return;
        }

        inflight.activity.setSteps(new ArrayList<>(inflight.steps.values()));
        if (!inflight.metadata.isEmpty()) {
            inflight.activity.setMetadata(toJsonNode(inflight.metadata));
        }
        String activityJson = Json.writer().writeValueAsString(inflight.activity);
        String path = String.format("/exchanges/%s/%s", this.integrationId, exchange);
        this.inflightActivities.remove(exchange);
        this.logsController.eventQueue.put(batch -> {
            batch.put(path, activityJson);
            trackState(time, batch);
        });
    }

    /**
     * Applies a step level record to the given step: message, failure and
     * duration are stored in their dedicated properties, any remaining fields
     * are appended as one event.
     *
     * @param step   step the record belongs to
     * @param record remaining fields of the record; known fields are removed from it
     * @throws IOException if the remaining fields cannot be converted to JSON
     */
    private void updateStep(ActivityStep step, Map<String, Object> record) throws IOException {
        String message = (String) record.remove("message");
        if (message != null) {
            if (step.getMessages() == null) {
                step.setMessages(new ArrayList<>());
            }
            step.getMessages().add(message);
        }
        String failure = (String) record.remove("failure");
        if (failure != null) {
            step.setFailure(failure);
        }
        Number duration = (Number) record.remove("duration");
        if (duration != null) {
            step.setDuration(Long.valueOf(duration.longValue()));
        }
        if (!record.isEmpty()) {
            if (step.getEvents() == null) {
                step.setEvents(new ArrayList<>());
            }
            step.getEvents().add(toJsonNode(record));
        }
    }

    /**
     * Converts a map to a JSON tree by serializing it and parsing the result.
     *
     * @param map values to convert
     * @return JSON tree representation of {@code map}
     * @throws IOException if serializing or parsing fails
     */
    private JsonNode toJsonNode(Map<String, Object> map) throws IOException {
        return Json.reader().readTree(Json.writer().writeValueAsString(map));
    }

    /**
     * Records {@code time} as the last processed log time of the pod and adds
     * the pod state and an integration marker to the given batch.
     *
     * @param time  log timestamp of the last processed record
     * @param batch batch of path to value entries being assembled
     */
    private void trackState(String time, Map<String, Object> batch) {
        this.state.time = time;
        batch.put("/pods/" + this.podName, this.state);
        batch.put("/integrations/" + this.integrationId, Boolean.TRUE);
    }

    /**
     * Validates a key through {@link JsonRecordSupport#validateKey(String)},
     * passing {@code null} through untouched.
     *
     * @param key key to validate, may be {@code null}
     * @return result of the validation, or {@code null} when {@code key} is {@code null}
     */
    private String validate(String key) {
        if (key == null) {
            return null;
        }
        return JsonRecordSupport.validateKey(key);
    }
}
