/*
 * Decompiled with CFR 0.152.
 */
package org.rhq.plugins.hadoop;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.rhq.core.domain.content.transfer.ResourcePackageDetails;
import org.rhq.core.domain.measurement.DataType;
import org.rhq.core.domain.measurement.MeasurementDefinition;
import org.rhq.core.domain.measurement.MeasurementReport;
import org.rhq.core.domain.measurement.MeasurementScheduleRequest;
import org.rhq.core.domain.measurement.calltime.CallTimeData;
import org.rhq.core.domain.resource.CreateResourceStatus;
import org.rhq.core.pluginapi.content.ContentContext;
import org.rhq.core.pluginapi.content.ContentServices;
import org.rhq.core.pluginapi.event.EventContext;
import org.rhq.core.pluginapi.event.EventPoller;
import org.rhq.core.pluginapi.event.log.LogEntryProcessor;
import org.rhq.core.pluginapi.event.log.LogFileEventPoller;
import org.rhq.core.pluginapi.inventory.CreateChildResourceFacet;
import org.rhq.core.pluginapi.inventory.CreateResourceReport;
import org.rhq.core.pluginapi.inventory.InvalidPluginConfigurationException;
import org.rhq.core.pluginapi.inventory.ResourceContext;
import org.rhq.plugins.hadoop.HadoopServerComponent;
import org.rhq.plugins.hadoop.calltime.HadoopEventAndCalltimeDelegate;
import org.rhq.plugins.hadoop.calltime.JobSummary;

public class JobTrackerServerComponent
extends HadoopServerComponent
implements CreateChildResourceFacet {
    private static final String SYNTHETIC_METRICS_PREFIX = "_synthetic_";
    private static final String JOB_DURATION_METRIC_NAME = "_synthetic_jobDuration";
    private static final String JOB_PRE_START_DELAY_METRIC_NAME = "_synthetic_jobPreStartDelay";
    private static final String JOB_SUCCESS_RATE_METRIC_NAME = "_synthetic_jobSuccessRate";
    private static final String DEFAULT_JOB_STORAGE_NAME = "__dataDir";
    private static final String JOB_STORAGE_PROP_NAME = "jobStorage";
    private Map<String, Map<String, Set<JobSummary>>> unprocessedCalltimeMeasurements = new HashMap<String, Map<String, Set<JobSummary>>>();
    private HadoopEventAndCalltimeDelegate logProcessor;

    @Override
    public void start(ResourceContext context) throws InvalidPluginConfigurationException, Exception {
        super.start(context);
        Set measDefinitions = context.getResourceType().getMetricDefinitions();
        for (MeasurementDefinition measDefinition : measDefinitions) {
            if (measDefinition.getDataType() != DataType.CALLTIME) continue;
            this.unprocessedCalltimeMeasurements.put(measDefinition.getName(), new HashMap());
        }
    }

    @Override
    public void stop() {
        super.stop();
    }

    public File getJobJarDataDir() {
        String dataDirName = this.getResourceContext().getPluginConfiguration().getSimpleValue(JOB_STORAGE_PROP_NAME, DEFAULT_JOB_STORAGE_NAME);
        File dataDir = null;
        if (DEFAULT_JOB_STORAGE_NAME.equals(dataDirName)) {
            dataDir = new File(this.getResourceContext().getDataDirectory(), "jobJars");
            dataDir.mkdirs();
        } else {
            dataDir = new File(dataDirName);
            if (!dataDir.isAbsolute()) {
                File hadoopHome = this.getHomeDir();
                dataDir = new File(hadoopHome, dataDirName);
            }
        }
        return dataDir;
    }

    public CreateResourceReport createResource(CreateResourceReport report) {
        if (!"jobJar".equals(report.getPackageDetails().getKey().getPackageTypeName())) {
            report.setStatus(CreateResourceStatus.FAILURE);
            report.setErrorMessage("Unknown content type");
            return report;
        }
        File dataDir = this.getJobJarDataDir();
        ResourcePackageDetails packageDetails = report.getPackageDetails();
        File jobJar = new File(dataDir, packageDetails.getFileName());
        FileOutputStream jobJarStream = null;
        try {
            jobJarStream = new FileOutputStream(jobJar);
        }
        catch (FileNotFoundException e) {
            report.setErrorMessage("Could not create the job jar file on the agent: " + e.getMessage());
            return report;
        }
        ContentContext contentContext = this.getResourceContext().getContentContext();
        ContentServices contentServices = contentContext.getContentServices();
        contentServices.downloadPackageBitsForChildResource(contentContext, "Job Jar", packageDetails.getKey(), (OutputStream)jobJarStream);
        try {
            jobJarStream.close();
        }
        catch (IOException e) {
            // empty catch block
        }
        report.setResourceKey(jobJar.getAbsolutePath());
        report.setResourceName(jobJar.getName());
        report.setStatus(CreateResourceStatus.SUCCESS);
        return report;
    }

    public ResourceContext<?> getResourceContext() {
        return super.getResourceContext();
    }

    @Override
    protected void handleMetric(MeasurementReport report, MeasurementScheduleRequest request) throws Exception {
        if (request.getName().startsWith(SYNTHETIC_METRICS_PREFIX)) {
            if (this.logProcessor != null) {
                this.updateUnprocessedMeasurements();
                Map<String, Set<JobSummary>> pendingJobs = this.unprocessedCalltimeMeasurements.get(request.getName());
                report.addData(this.createCalltimeData(request, pendingJobs));
                pendingJobs.clear();
            }
        } else {
            super.handleMetric(report, request);
        }
    }

    @Override
    protected EventPoller createNewEventPoller(EventContext eventContext, File logFile) {
        this.logProcessor = new HadoopEventAndCalltimeDelegate("logEntry", logFile);
        return new LogFileEventPoller(eventContext, "logEntry", logFile, (LogEntryProcessor)this.logProcessor);
    }

    @Override
    protected void discardPoller() {
        this.logProcessor = null;
    }

    private void updateUnprocessedMeasurements() {
        if (this.logProcessor == null) {
            return;
        }
        Set<JobSummary> newJobs = this.logProcessor.drainAccumulatedJobs();
        for (Map.Entry<String, Map<String, Set<JobSummary>>> e : this.unprocessedCalltimeMeasurements.entrySet()) {
            Map<String, Set<JobSummary>> jobsByName = e.getValue();
            for (JobSummary newJob : newJobs) {
                Set<JobSummary> unprocessed = jobsByName.get(newJob.getJobName());
                if (unprocessed == null) {
                    unprocessed = new HashSet<JobSummary>();
                    jobsByName.put(newJob.getJobName(), unprocessed);
                }
                unprocessed.add(newJob);
            }
        }
    }

    private CallTimeData createCalltimeData(MeasurementScheduleRequest request, Map<String, Set<JobSummary>> pendingJobs) {
        CallTimeData ret = new CallTimeData(request);
        String metricName = request.getName();
        for (Map.Entry<String, Set<JobSummary>> e : pendingJobs.entrySet()) {
            String jobName = e.getKey();
            Set<JobSummary> jobs = e.getValue();
            if (JOB_DURATION_METRIC_NAME.equals(metricName)) {
                this.initJobDurationData(ret, jobName, jobs);
                continue;
            }
            if (JOB_PRE_START_DELAY_METRIC_NAME.equals(metricName)) {
                this.initPreStartDelayData(ret, jobName, jobs);
                continue;
            }
            if (!JOB_SUCCESS_RATE_METRIC_NAME.equals(metricName)) continue;
            this.initSuccessRateData(ret, jobName, jobs);
        }
        return ret;
    }

    private void initJobDurationData(CallTimeData data, String jobName, Set<JobSummary> pendingJobs) {
        long beginTime = Long.MAX_VALUE;
        long endTime = 0L;
        long totalTime = 0L;
        long minTime = Long.MAX_VALUE;
        long maxTime = 0L;
        long count = pendingJobs.size();
        for (JobSummary job : pendingJobs) {
            if (job.getStartTime() < beginTime) {
                beginTime = job.getStartTime();
            }
            if (job.getEndTime() > endTime) {
                endTime = job.getEndTime();
            }
            long duration = job.getEndTime() - job.getStartTime();
            totalTime += duration;
            if (duration < minTime) {
                minTime = duration;
            }
            if (duration <= maxTime) continue;
            maxTime = duration;
        }
        data.addAggregatedCallData(jobName, new Date(beginTime), new Date(endTime), (double)minTime, (double)maxTime, (double)totalTime, count);
    }

    private void initPreStartDelayData(CallTimeData data, String jobName, Set<JobSummary> pendingJobs) {
        long beginTime = Long.MAX_VALUE;
        long endTime = 0L;
        long totalTime = 0L;
        long minTime = Long.MAX_VALUE;
        long maxTime = 0L;
        long count = pendingJobs.size();
        for (JobSummary job : pendingJobs) {
            if (job.getSubmitTime() < beginTime) {
                beginTime = job.getSubmitTime();
            }
            if (job.getStartTime() > endTime) {
                endTime = job.getStartTime();
            }
            long duration = job.getStartTime() - job.getSubmitTime();
            totalTime += duration;
            if (duration < minTime) {
                minTime = duration;
            }
            if (duration <= maxTime) continue;
            maxTime = duration;
        }
        data.addAggregatedCallData(jobName, new Date(beginTime), new Date(endTime), (double)minTime, (double)maxTime, (double)totalTime, count);
    }

    private void initSuccessRateData(CallTimeData data, String jobName, Set<JobSummary> pendingJobs) {
        long beginTime = Long.MAX_VALUE;
        long endTime = 0L;
        long totalTime = 0L;
        long minTime = Long.MAX_VALUE;
        long maxTime = 0L;
        long count = pendingJobs.size();
        for (JobSummary job : pendingJobs) {
            if (job.getStartTime() < beginTime) {
                beginTime = job.getStartTime();
            }
            if (job.getEndTime() > endTime) {
                endTime = job.getEndTime();
            }
            long duration = job.isSucceeded() ? 1L : 0L;
            totalTime += duration;
            if (duration < minTime) {
                minTime = duration;
            }
            if (duration <= maxTime) continue;
            maxTime = duration;
        }
        data.addAggregatedCallData(jobName, new Date(beginTime), new Date(endTime), (double)minTime, (double)maxTime, (double)totalTime, count);
    }
}

