/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.client.cli;

import java.io.File;
import java.io.FileNotFoundException;
import java.lang.reflect.Constructor;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.FlinkPipelineTranslationUtil;
import org.apache.flink.client.cli.CancelOptions;
import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.client.cli.DynamicPropertiesUtil;
import org.apache.flink.client.cli.ExecutionConfigAccessor;
import org.apache.flink.client.cli.GenericCLI;
import org.apache.flink.client.cli.ListOptions;
import org.apache.flink.client.cli.ProgramOptions;
import org.apache.flink.client.cli.ProgramOptionsUtils;
import org.apache.flink.client.cli.SavepointOptions;
import org.apache.flink.client.cli.StopOptions;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.ProgramMissingJobException;
import org.apache.flink.client.program.ProgramParametrizationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CliFrontend {
    /** Logger used by the command handlers of this class. */
    private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class);
    // NOTE(review): not referenced in the visible part of this file; presumably an initial process return code — confirm.
    private static final int INITIAL_RET_CODE = 31;
    // Action names. The dispatch switch in parseAndRun matches on these same literal values.
    private static final String ACTION_RUN = "run";
    private static final String ACTION_RUN_APPLICATION = "run-application";
    private static final String ACTION_INFO = "info";
    private static final String ACTION_LIST = "list";
    private static final String ACTION_CANCEL = "cancel";
    private static final String ACTION_STOP = "stop";
    private static final String ACTION_SAVEPOINT = "savepoint";
    // NOTE(review): presumably relative fallback locations of the configuration directory; their use is not visible here — confirm.
    private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
    private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
    /** Base configuration; getConfiguration() hands out copies and getEffectiveConfiguration(...) layers command line settings on top of a copy. */
    private final Configuration configuration;
    /** Custom command lines supplied at construction time. */
    private final List<CustomCommandLine> customCommandLines;
    /** General and run options contributed by all custom command lines; merged into each command's options by getCommandLine. */
    private final Options customCommandLineOptions;
    /** Loader used to look up the cluster client factory for an effective configuration. */
    private final ClusterClientServiceLoader clusterClientServiceLoader;

    /**
     * Creates a front end that uses a {@link DefaultClusterClientServiceLoader}.
     *
     * @param configuration base configuration
     * @param customCommandLines custom command lines whose options are registered
     */
    public CliFrontend(Configuration configuration, List<CustomCommandLine> customCommandLines) {
        this(
                configuration,
                new DefaultClusterClientServiceLoader(),
                customCommandLines);
    }

    public CliFrontend(Configuration configuration, ClusterClientServiceLoader clusterClientServiceLoader, List<CustomCommandLine> customCommandLines) {
        this.configuration = Preconditions.checkNotNull(configuration);
        this.customCommandLines = Preconditions.checkNotNull(customCommandLines);
        this.clusterClientServiceLoader = Preconditions.checkNotNull(clusterClientServiceLoader);
        FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));
        this.customCommandLineOptions = new Options();
        for (CustomCommandLine customCommandLine : customCommandLines) {
            customCommandLine.addGeneralOptions(this.customCommandLineOptions);
            customCommandLine.addRunOptions(this.customCommandLineOptions);
        }
    }

    /**
     * Returns a fresh {@link Configuration} populated with all entries of this front end's
     * configuration, so callers never receive the internal instance.
     *
     * @return a copy of the configuration
     */
    public Configuration getConfiguration() {
        final Configuration snapshot = new Configuration();
        snapshot.addAll(this.configuration);
        return snapshot;
    }

    /**
     * Returns the options collected from the custom command lines at construction time.
     *
     * @return the internal {@link Options} instance (not a copy)
     */
    public Options getCustomCommandLineOptions() {
        return customCommandLineOptions;
    }

    /**
     * Executes the "run-application" action.
     *
     * <p>Parses the arguments with the run command options, prints help if requested, and
     * otherwise builds the effective configuration and hands it, together with the program
     * arguments and entry point class name, to an {@link ApplicationClusterDeployer}. For Python
     * entry points no jar is registered; otherwise the resolved URI of the jar file is.
     *
     * @param args command line arguments of the action
     * @throws Exception if parsing, validation or deployment fails
     */
    protected void runApplication(String[] args) throws Exception {
        LOG.info("Running 'run-application' command.");

        final CommandLine commandLine = this.getCommandLine(CliFrontendParser.getRunCommandOptions(), args, true);
        if (commandLine.hasOption(CliFrontendParser.HELP_OPTION.getOpt())) {
            CliFrontendParser.printHelpForRunApplication(this.customCommandLines);
            return;
        }

        final CustomCommandLine activeCommandLine = this.validateAndGetActiveCommandLine(Preconditions.checkNotNull(commandLine));
        final ApplicationClusterDeployer deployer = new ApplicationClusterDeployer(this.clusterClientServiceLoader);

        final ProgramOptions programOptions;
        final List<String> jobJars;
        if (ProgramOptionsUtils.isPythonEntryPoint(commandLine)) {
            programOptions = ProgramOptionsUtils.createPythonProgramOptions(commandLine);
            jobJars = Collections.emptyList();
        } else {
            programOptions = new ProgramOptions(commandLine);
            programOptions.validate();
            final URI jarUri = PackagedProgramUtils.resolveURI(programOptions.getJarFilePath());
            jobJars = Collections.singletonList(jarUri.toString());
        }
        final Configuration effectiveConfiguration = this.getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);

        deployer.run(
                effectiveConfiguration,
                new ApplicationConfiguration(programOptions.getProgramArgs(), programOptions.getEntryPointClassName()));
    }

    /**
     * Executes the "run" action.
     *
     * <p>Parses the arguments, prints help if requested, and otherwise builds the effective
     * configuration from the active custom command line, the program options and the job jars,
     * then builds the packaged program and executes it. The program is closed afterwards.
     *
     * @param args command line arguments of the action
     * @throws Exception if parsing, program construction or execution fails
     */
    protected void run(String[] args) throws Exception {
        LOG.info("Running 'run' command.");

        final CommandLine commandLine = this.getCommandLine(CliFrontendParser.getRunCommandOptions(), args, true);
        final boolean helpRequested = commandLine.hasOption(CliFrontendParser.HELP_OPTION.getOpt());
        if (helpRequested) {
            CliFrontendParser.printHelpForRun(this.customCommandLines);
            return;
        }

        final CustomCommandLine activeCommandLine = this.validateAndGetActiveCommandLine(Preconditions.checkNotNull(commandLine));
        final ProgramOptions programOptions = ProgramOptions.create(commandLine);
        final Configuration effectiveConfiguration =
                this.getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, this.getJobJarAndDependencies(programOptions));
        LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

        try (PackagedProgram packagedProgram = this.getPackagedProgram(programOptions, effectiveConfiguration)) {
            this.executeProgram(effectiveConfiguration, packagedProgram);
        }
    }

    /**
     * Resolves the job jar and its dependencies for the given program options.
     *
     * @param programOptions options providing the jar file path (may be absent) and entry point class
     * @return the URLs returned by {@link PackagedProgram#getJobJarAndDependencies}
     * @throws CliArgsException if the jar file cannot be found or the lookup fails
     */
    private List<URL> getJobJarAndDependencies(ProgramOptions programOptions) throws CliArgsException {
        final String entryPoint = programOptions.getEntryPointClassName();
        final String jarPath = programOptions.getJarFilePath();
        try {
            File jar = null;
            if (jarPath != null) {
                jar = this.getJarFile(jarPath);
            }
            return PackagedProgram.getJobJarAndDependencies(jar, entryPoint);
        } catch (FileNotFoundException | ProgramInvocationException e) {
            throw new CliArgsException("Could not get job jar and dependencies from JAR file: " + e.getMessage(), e);
        }
    }

    /**
     * Builds the packaged program, translating a missing jar file into a {@link CliArgsException}.
     *
     * @param programOptions options describing the program
     * @param effectiveConfiguration configuration handed to the program builder
     * @return the built program
     * @throws ProgramInvocationException if the program cannot be built
     * @throws CliArgsException if the jar file does not exist or is not a file
     */
    private PackagedProgram getPackagedProgram(ProgramOptions programOptions, Configuration effectiveConfiguration) throws ProgramInvocationException, CliArgsException {
        try {
            LOG.info("Building program from JAR file");
            return this.buildProgram(programOptions, effectiveConfiguration);
        } catch (FileNotFoundException e) {
            throw new CliArgsException("Could not build the program from JAR file: " + e.getMessage(), e);
        }
    }

    /**
     * Copies the base configuration and adds the settings the active custom command line derives
     * from the parsed command line.
     *
     * @param activeCustomCommandLine the active custom command line; must not be null
     * @param commandLine the parsed command line
     * @return a new configuration; the base configuration itself is not modified here
     * @throws FlinkException if the command line cannot be converted into a configuration
     */
    private <T> Configuration getEffectiveConfiguration(CustomCommandLine activeCustomCommandLine, CommandLine commandLine) throws FlinkException {
        final Configuration merged = new Configuration(this.configuration);
        merged.addAll(Preconditions.checkNotNull(activeCustomCommandLine).toConfiguration(commandLine));
        return merged;
    }

    /**
     * Builds the effective configuration from the base configuration, the active custom command
     * line, and the execution parameters derived from the program options and job jars.
     *
     * @param activeCustomCommandLine the active custom command line
     * @param commandLine the parsed command line
     * @param programOptions program options; must not be null
     * @param jobJars job jars; must not be null
     * @return the resulting configuration
     * @throws FlinkException if the command line cannot be converted into a configuration
     */
    private <T> Configuration getEffectiveConfiguration(CustomCommandLine activeCustomCommandLine, CommandLine commandLine, ProgramOptions programOptions, List<T> jobJars) throws FlinkException {
        final Configuration result = this.getEffectiveConfiguration(activeCustomCommandLine, commandLine);

        ExecutionConfigAccessor
                .fromProgramOptions(Preconditions.checkNotNull(programOptions), Preconditions.checkNotNull(jobJars))
                .applyToConfiguration(result);

        LOG.debug("Effective configuration after Flink conf, custom commandline, and program options: {}", result);
        return result;
    }

    /**
     * Executes the "info" action: builds the program and prints its JSON execution plan and
     * description to standard output.
     *
     * <p>The packaged program is opened in the try-with-resources header so it is closed on every
     * exit path. A resource variable is implicitly final and cannot be assigned inside the
     * {@code try} body, so the program is built in the header rather than assigned later.
     *
     * @param args command line arguments of the action
     * @throws Exception if parsing, program construction or plan generation fails
     */
    protected void info(String[] args) throws Exception {
        LOG.info("Running 'info' command.");
        final Options commandOptions = CliFrontendParser.getInfoCommandOptions();
        final CommandLine commandLine = this.getCommandLine(commandOptions, args, true);
        final ProgramOptions programOptions = ProgramOptions.create(commandLine);
        if (commandLine.hasOption(CliFrontendParser.HELP_OPTION.getOpt())) {
            CliFrontendParser.printHelpForInfo();
            return;
        }

        LOG.info("Building program from JAR file");
        LOG.info("Creating program plan dump");
        final CustomCommandLine activeCommandLine = this.validateAndGetActiveCommandLine(Preconditions.checkNotNull(commandLine));
        final Configuration effectiveConfiguration = this.getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, this.getJobJarAndDependencies(programOptions));

        try (PackagedProgram program = this.buildProgram(programOptions, effectiveConfiguration)) {
            int parallelism = programOptions.getParallelism();
            // NOTE(review): -1 is presumably the "parallelism not set" sentinel — confirm against ProgramOptions.
            if (parallelism == -1) {
                parallelism = this.getDefaultParallelism(effectiveConfiguration);
            }

            final Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(program, effectiveConfiguration, parallelism, true);
            final String jsonPlan = FlinkPipelineTranslationUtil.translateToJSONExecutionPlan(program.getUserCodeClassLoader(), pipeline);
            if (jsonPlan != null) {
                System.out.println("----------------------- Execution Plan -----------------------");
                System.out.println(jsonPlan);
                System.out.println("--------------------------------------------------------------");
            } else {
                System.out.println("JSON plan could not be generated.");
            }

            // A blank line separates the plan from the description in both cases.
            final String description = program.getDescription();
            System.out.println();
            System.out.println(description != null ? description : "No description provided.");
        }
    }

    /**
     * Executes the "list" action.
     *
     * <p>If none of the running/scheduled/all flags is given, running and scheduled jobs are
     * listed; otherwise exactly the requested categories are shown.
     *
     * @param args command line arguments of the action
     * @throws Exception if parsing fails or the job list cannot be retrieved
     */
    protected void list(String[] args) throws Exception {
        LOG.info("Running 'list' command.");

        final CommandLine commandLine = this.getCommandLine(CliFrontendParser.getListCommandOptions(), args, false);
        final ListOptions listOptions = new ListOptions(commandLine);
        if (listOptions.isPrintHelp()) {
            CliFrontendParser.printHelpForList(this.customCommandLines);
            return;
        }

        final boolean runningRequested = listOptions.showRunning();
        final boolean scheduledRequested = listOptions.showScheduled();
        final boolean allRequested = listOptions.showAll();
        // Without any explicit selection, fall back to running + scheduled.
        final boolean nothingRequested = !(runningRequested || scheduledRequested || allRequested);

        final boolean showRunning = nothingRequested || runningRequested;
        final boolean showScheduled = nothingRequested || scheduledRequested;
        final boolean showAll = allRequested;

        final CustomCommandLine activeCommandLine = this.validateAndGetActiveCommandLine(commandLine);
        this.runClusterAction(
                activeCommandLine,
                commandLine,
                (clusterClient, effectiveConfiguration) -> this.listJobs(clusterClient, showRunning, showScheduled, showAll));
    }

    /**
     * Retrieves all jobs from the cluster and prints the requested categories to standard output.
     *
     * <p>Jobs in state CREATED or INITIALIZING count as scheduled, jobs in a globally terminal
     * state as terminated, and all others as running.
     *
     * @param clusterClient client used to query the job list
     * @param showRunning whether to print running jobs
     * @param showScheduled whether to print scheduled jobs
     * @param showAll whether to print every category, including terminated jobs
     * @throws FlinkException if the job list cannot be retrieved
     */
    private <ClusterID> void listJobs(ClusterClient<ClusterID> clusterClient, boolean showRunning, boolean showScheduled, boolean showAll) throws FlinkException {
        Collection<JobStatusMessage> allJobs;
        try {
            final CompletableFuture<Collection<JobStatusMessage>> pendingJobs = clusterClient.listJobs();
            CliFrontend.logAndSysout("Waiting for response...");
            allJobs = pendingJobs.get();
        } catch (Exception e) {
            throw new FlinkException("Failed to retrieve job list.", ExceptionUtils.stripExecutionException(e));
        }
        LOG.info("Successfully retrieved list of jobs");

        final List<JobStatusMessage> running = new ArrayList<>();
        final List<JobStatusMessage> scheduled = new ArrayList<>();
        final List<JobStatusMessage> terminated = new ArrayList<>();
        for (JobStatusMessage job : allJobs) {
            final JobStatus state = job.getJobState();
            if (state == JobStatus.CREATED || state == JobStatus.INITIALIZING) {
                scheduled.add(job);
            } else if (state.isGloballyTerminalState()) {
                terminated.add(job);
            } else {
                running.add(job);
            }
        }

        if (showRunning || showAll) {
            if (running.isEmpty()) {
                System.out.println("No running jobs.");
            } else {
                System.out.println("------------------ Running/Restarting Jobs -------------------");
                CliFrontend.printJobStatusMessages(running);
                System.out.println("--------------------------------------------------------------");
            }
        }

        if (showScheduled || showAll) {
            if (scheduled.isEmpty()) {
                System.out.println("No scheduled jobs.");
            } else {
                System.out.println("----------------------- Scheduled Jobs -----------------------");
                CliFrontend.printJobStatusMessages(scheduled);
                System.out.println("--------------------------------------------------------------");
            }
        }

        // Terminated jobs are only printed on request, and silently skipped when there are none.
        if (showAll && !terminated.isEmpty()) {
            System.out.println("---------------------- Terminated Jobs -----------------------");
            CliFrontend.printJobStatusMessages(terminated);
            System.out.println("--------------------------------------------------------------");
        }
    }

    /**
     * Prints one line per job, in the order produced by {@code sortJobStatusMessages}, formatted
     * as {@code <start time> : <job id> : <job name> (<state>)}.
     *
     * @param jobs the jobs to print
     */
    private static void printJobStatusMessages(List<JobStatusMessage> jobs) {
        final SimpleDateFormat startTimeFormat = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss");
        CliFrontend.sortJobStatusMessages(jobs).forEachOrdered(job -> {
            final String startTime = startTimeFormat.format(new Date(job.getStartTime()));
            System.out.println(startTime + " : " + job.getJobId() + " : " + job.getJobName() + " (" + job.getJobState() + ")");
        });
    }

    /**
     * Orders job status messages for display.
     *
     * <p>Jobs are grouped by state, the groups are ordered case-insensitively by state name, and
     * the flattened result is then sorted by start time. The start time is therefore the primary
     * sort key; the state order only affects jobs with equal start times, to the extent the
     * final sort is stable.
     *
     * @param jobs the jobs to order
     * @return a stream of the jobs in display order
     */
    @VisibleForTesting
    static Stream<JobStatusMessage> sortJobStatusMessages(List<JobStatusMessage> jobs) {
        // Typed comparator: with a raw Comparator the lambda parameters are Object and getKey() does not resolve.
        final Comparator<Map.Entry<JobStatus, List<JobStatusMessage>>> statusComparator =
                (left, right) -> String.CASE_INSENSITIVE_ORDER.compare(left.getKey().toString(), right.getKey().toString());
        final Map<JobStatus, List<JobStatusMessage>> jobsByState =
                jobs.stream().collect(Collectors.groupingBy(JobStatusMessage::getJobState));
        return jobsByState.entrySet().stream()
                .sorted(statusComparator)
                .map(Map.Entry::getValue)
                .flatMap(Collection::stream)
                .sorted(Comparator.comparing(JobStatusMessage::getStartTime));
    }

    /**
     * Executes the "stop" action, stopping a job with a savepoint.
     *
     * <p>If a positional argument is present it is the job id, and the savepoint target directory
     * is taken from the options when the savepoint flag is set. Without positional arguments the
     * value returned as target directory is parsed as the job id and no target directory is passed.
     *
     * @param args command line arguments of the action
     * @throws Exception if parsing fails or the job cannot be stopped
     */
    protected void stop(String[] args) throws Exception {
        LOG.info("Running 'stop-with-savepoint' command.");

        final CommandLine commandLine = this.getCommandLine(CliFrontendParser.getStopCommandOptions(), args, false);
        final StopOptions stopOptions = new StopOptions(commandLine);
        if (stopOptions.isPrintHelp()) {
            CliFrontendParser.printHelpForStop(this.customCommandLines);
            return;
        }

        final String[] cleanedArgs = stopOptions.getArgs();
        final boolean hasPositionalArgs = cleanedArgs.length > 0;

        final String targetDirectory;
        if (stopOptions.hasSavepointFlag() && hasPositionalArgs) {
            targetDirectory = stopOptions.getTargetDirectory();
        } else {
            targetDirectory = null;
        }

        final JobID jobId;
        if (hasPositionalArgs) {
            jobId = this.parseJobId(cleanedArgs[0]);
        } else {
            jobId = this.parseJobId(stopOptions.getTargetDirectory());
        }

        final boolean advanceToEndOfEventTime = stopOptions.shouldAdvanceToEndOfEventTime();
        final SavepointFormatType formatType = stopOptions.getFormatType();
        final String verb = advanceToEndOfEventTime ? "Draining job " : "Suspending job ";
        CliFrontend.logAndSysout(verb + "\"" + jobId + "\" with a " + formatType + " savepoint.");

        final CustomCommandLine activeCommandLine = this.validateAndGetActiveCommandLine(commandLine);
        this.runClusterAction(activeCommandLine, commandLine, (clusterClient, effectiveConfiguration) -> {
            final String savepointPath;
            try {
                savepointPath = clusterClient
                        .stopWithSavepoint(jobId, advanceToEndOfEventTime, targetDirectory, formatType)
                        .get(this.getClientTimeout(effectiveConfiguration).toMillis(), TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                throw new FlinkException("Could not stop with a savepoint job \"" + jobId + "\".", e);
            }
            CliFrontend.logAndSysout("Savepoint completed. Path: " + savepointPath);
        });
    }

    /**
     * Executes the "cancel" action, optionally (deprecated) with a savepoint.
     *
     * <p>Without the savepoint option a job id argument is required. With it, a positional
     * argument is the job id and the option value is the target directory; without a positional
     * argument the option value is parsed as the job id and no target directory is passed.
     *
     * @param args command line arguments of the action
     * @throws Exception if parsing fails or the job cannot be cancelled
     */
    protected void cancel(String[] args) throws Exception {
        LOG.info("Running 'cancel' command.");

        final CommandLine commandLine = this.getCommandLine(CliFrontendParser.getCancelCommandOptions(), args, false);
        final CancelOptions cancelOptions = new CancelOptions(commandLine);
        if (cancelOptions.isPrintHelp()) {
            CliFrontendParser.printHelpForCancel(this.customCommandLines);
            return;
        }

        final CustomCommandLine activeCommandLine = this.validateAndGetActiveCommandLine(commandLine);
        final String[] cleanedArgs = cancelOptions.getArgs();

        if (!cancelOptions.isWithSavepoint()) {
            // Plain cancellation: the job id must be given as an argument.
            if (cleanedArgs.length <= 0) {
                throw new CliArgsException("Missing JobID. Specify a JobID to cancel a job.");
            }
            final JobID plainJobId = this.parseJobId(cleanedArgs[0]);
            CliFrontend.logAndSysout("Cancelling job " + plainJobId + '.');
            this.runClusterAction(activeCommandLine, commandLine, (clusterClient, effectiveConfiguration) -> {
                try {
                    clusterClient.cancel(plainJobId).get(this.getClientTimeout(effectiveConfiguration).toMillis(), TimeUnit.MILLISECONDS);
                } catch (Exception e) {
                    throw new FlinkException("Could not cancel job " + plainJobId + '.', e);
                }
            });
            CliFrontend.logAndSysout("Cancelled job " + plainJobId + '.');
            return;
        }

        CliFrontend.logAndSysout("DEPRECATION WARNING: Cancelling a job with savepoint is deprecated. Use \"stop\" instead.");

        final boolean hasJobIdArgument = cleanedArgs.length > 0;
        final JobID jobId = hasJobIdArgument
                ? this.parseJobId(cleanedArgs[0])
                : this.parseJobId(cancelOptions.getSavepointTargetDirectory());
        final String targetDirectory = hasJobIdArgument ? cancelOptions.getSavepointTargetDirectory() : null;

        final SavepointFormatType formatType = cancelOptions.getFormatType();
        if (targetDirectory != null) {
            CliFrontend.logAndSysout("Cancelling job " + jobId + " with " + formatType + " savepoint to " + targetDirectory + '.');
        } else {
            CliFrontend.logAndSysout("Cancelling job " + jobId + " with " + formatType + " savepoint to default savepoint directory.");
        }

        this.runClusterAction(activeCommandLine, commandLine, (clusterClient, effectiveConfiguration) -> {
            final String savepointPath;
            try {
                savepointPath = clusterClient
                        .cancelWithSavepoint(jobId, targetDirectory, formatType)
                        .get(this.getClientTimeout(effectiveConfiguration).toMillis(), TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                throw new FlinkException("Could not cancel job " + jobId + '.', e);
            }
            CliFrontend.logAndSysout("Cancelled job " + jobId + ". Savepoint stored in " + savepointPath + '.');
        });
    }

    /**
     * Parses the arguments against the given command options merged with the custom command line
     * options.
     *
     * @param commandOptions options of the command being executed
     * @param args arguments to parse
     * @param stopAtNonOptions passed through to the parser
     * @return the parsed command line
     * @throws CliArgsException if the arguments cannot be parsed
     */
    public CommandLine getCommandLine(Options commandOptions, String[] args, boolean stopAtNonOptions) throws CliArgsException {
        return CliFrontendParser.parse(
                CliFrontendParser.mergeOptions(commandOptions, this.customCommandLineOptions),
                args,
                stopAtNonOptions);
    }

    /**
     * Executes the "savepoint" action: either disposes a savepoint or triggers one for a job.
     *
     * <p>When triggering, the first argument is the job id and the optional second argument the
     * savepoint directory; further arguments are ignored with a notice.
     *
     * @param args command line arguments of the action
     * @throws Exception if parsing fails or the cluster action fails
     */
    protected void savepoint(String[] args) throws Exception {
        LOG.info("Running 'savepoint' command.");

        final CommandLine commandLine = this.getCommandLine(CliFrontendParser.getSavepointCommandOptions(), args, false);
        final SavepointOptions savepointOptions = new SavepointOptions(commandLine);
        if (savepointOptions.isPrintHelp()) {
            CliFrontendParser.printHelpForSavepoint(this.customCommandLines);
            return;
        }

        final CustomCommandLine activeCommandLine = this.validateAndGetActiveCommandLine(commandLine);

        if (savepointOptions.isDispose()) {
            this.runClusterAction(
                    activeCommandLine,
                    commandLine,
                    (clusterClient, effectiveConfiguration) ->
                            this.disposeSavepoint(clusterClient, savepointOptions.getSavepointPath(), this.getClientTimeout(effectiveConfiguration)));
            return;
        }

        final String[] cleanedArgs = savepointOptions.getArgs();
        if (cleanedArgs.length < 1) {
            throw new CliArgsException("Missing JobID. Specify a Job ID to trigger a savepoint.");
        }
        final JobID jobId = this.parseJobId(cleanedArgs[0]);
        final String savepointDirectory = cleanedArgs.length > 1 ? cleanedArgs[1] : null;
        if (cleanedArgs.length > 2) {
            CliFrontend.logAndSysout("Provided more arguments than required. Ignoring not needed arguments.");
        }

        this.runClusterAction(
                activeCommandLine,
                commandLine,
                (clusterClient, effectiveConfiguration) ->
                        this.triggerSavepoint(clusterClient, jobId, savepointDirectory, savepointOptions.getFormatType(), this.getClientTimeout(effectiveConfiguration)));
    }

    /**
     * Triggers a savepoint for the given job and waits up to the client timeout for its path.
     *
     * @param clusterClient client used to trigger the savepoint
     * @param jobId job to snapshot
     * @param savepointDirectory target directory, or null if none was given
     * @param formatType savepoint format
     * @param clientTimeout maximum time to wait for the result
     * @throws FlinkException if the savepoint fails or does not complete in time
     */
    private void triggerSavepoint(ClusterClient<?> clusterClient, JobID jobId, String savepointDirectory, SavepointFormatType formatType, Duration clientTimeout) throws FlinkException {
        CliFrontend.logAndSysout("Triggering savepoint for job " + jobId + '.');
        final CompletableFuture<String> pendingPath = clusterClient.triggerSavepoint(jobId, savepointDirectory, formatType);
        CliFrontend.logAndSysout("Waiting for response...");
        try {
            final String completedPath = pendingPath.get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
            CliFrontend.logAndSysout("Savepoint completed. Path: " + completedPath);
            CliFrontend.logAndSysout("You can resume your program from this savepoint with the run command.");
        } catch (Exception e) {
            throw new FlinkException(
                    "Triggering a savepoint for the job " + jobId + " failed.",
                    ExceptionUtils.stripExecutionException(e));
        }
    }

    /**
     * Disposes the savepoint at the given path and waits up to the client timeout for completion.
     *
     * @param clusterClient client used to dispose the savepoint
     * @param savepointPath path of the savepoint; must not be null
     * @param clientTimeout maximum time to wait for the acknowledgement
     * @throws FlinkException if disposal fails or does not complete in time
     */
    private void disposeSavepoint(ClusterClient<?> clusterClient, String savepointPath, Duration clientTimeout) throws FlinkException {
        Preconditions.checkNotNull(savepointPath, "Missing required argument: savepoint path. Usage: bin/flink savepoint -d <savepoint-path>");

        CliFrontend.logAndSysout("Disposing savepoint '" + savepointPath + "'.");
        final CompletableFuture<Acknowledge> pendingAck = clusterClient.disposeSavepoint(savepointPath);
        CliFrontend.logAndSysout("Waiting for response...");

        final long timeoutMillis = clientTimeout.toMillis();
        try {
            pendingAck.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new FlinkException("Disposing the savepoint '" + savepointPath + "' failed.", e);
        }

        CliFrontend.logAndSysout("Savepoint '" + savepointPath + "' disposed.");
    }

    /**
     * Executes the packaged program through {@link ClientUtils#executeProgram} with a
     * {@link DefaultExecutorServiceLoader}; both boolean flags are passed as {@code false}.
     *
     * @param configuration configuration to execute with
     * @param program program to execute
     * @throws ProgramInvocationException if the execution fails
     */
    protected void executeProgram(Configuration configuration, PackagedProgram program) throws ProgramInvocationException {
        final DefaultExecutorServiceLoader executorServiceLoader = new DefaultExecutorServiceLoader();
        ClientUtils.executeProgram(executorServiceLoader, configuration, program, false, false);
    }

    /**
     * Builds a packaged program using this front end's base configuration.
     *
     * @param runOptions options describing the program
     * @return the built program
     * @throws FileNotFoundException if the jar file does not exist or is not a file
     * @throws ProgramInvocationException if the program cannot be built
     * @throws CliArgsException if the options are invalid
     */
    PackagedProgram buildProgram(ProgramOptions runOptions) throws FileNotFoundException, ProgramInvocationException, CliArgsException {
        final Configuration baseConfiguration = this.configuration;
        return this.buildProgram(runOptions, baseConfiguration);
    }

    /**
     * Validates the options and builds a packaged program from them.
     *
     * @param runOptions options describing the program; a missing jar file path yields a null jar file
     * @param configuration configuration handed to the program builder
     * @return the built program
     * @throws FileNotFoundException if the jar file does not exist or is not a file
     * @throws ProgramInvocationException if the program cannot be built
     * @throws CliArgsException if the options are invalid
     */
    PackagedProgram buildProgram(ProgramOptions runOptions, Configuration configuration) throws FileNotFoundException, ProgramInvocationException, CliArgsException {
        runOptions.validate();

        final String[] arguments = runOptions.getProgramArgs();
        final String jarPath = runOptions.getJarFilePath();
        final List<URL> userClassPaths = runOptions.getClasspaths();
        final String entryPoint = runOptions.getEntryPointClassName();

        File jar = null;
        if (jarPath != null) {
            jar = this.getJarFile(jarPath);
        }

        return PackagedProgram.newBuilder()
                .setJarFile(jar)
                .setUserClassPaths(userClassPaths)
                .setEntryPointClassName(entryPoint)
                .setConfiguration(configuration)
                .setSavepointRestoreSettings(runOptions.getSavepointRestoreSettings())
                .setArguments(arguments)
                .build();
    }

    /**
     * Resolves the given path to an existing regular file.
     *
     * @param jarFilePath path of the jar file
     * @return the file
     * @throws FileNotFoundException if the path does not exist or is not a regular file
     */
    private File getJarFile(String jarFilePath) throws FileNotFoundException {
        final File candidate = new File(jarFilePath);
        final boolean present = candidate.exists();
        if (present && candidate.isFile()) {
            return candidate;
        }
        final String reason = present ? "JAR file is not a file: " : "JAR file does not exist: ";
        throw new FileNotFoundException(reason + candidate);
    }

    /**
     * Logs an argument error and prints its message plus a help hint to standard output.
     *
     * @param e the argument exception
     * @return the return code 1
     */
    private static int handleArgException(CliArgsException e) {
        LOG.error("Invalid command line arguments.", e);

        final String problem = e.getMessage();
        System.out.println(problem);
        System.out.println();
        System.out.println("Use the help option (-h or --help) to get help on the command.");

        return 1;
    }

    /**
     * Logs a parametrization error and prints its message to standard error.
     *
     * @param e the parametrization exception
     * @return the return code 1
     */
    private static int handleParametrizationException(ProgramParametrizationException e) {
        LOG.error("Program has not been parametrized properly.", e);
        final String problem = e.getMessage();
        System.err.println(problem);
        return 1;
    }

    /**
     * Prints a hint to standard error that the program did not contain a Flink job.
     *
     * @return the return code 1
     */
    private static int handleMissingJobException() {
        final String hint = "The program didn't contain a Flink job. Perhaps you forgot to call execute() on the execution environment.";
        System.err.println();
        System.err.println(hint);
        return 1;
    }

    /**
     * Logs the error and prints a report to standard error.
     *
     * <p>If the cause is an {@link InvalidProgramException}, only its message and its stack trace
     * up to and including the first frame whose method is named "main" are printed; otherwise the
     * full stack trace of {@code t} is printed.
     *
     * @param t the error to report
     * @return the return code 1
     */
    private static int handleError(Throwable t) {
        LOG.error("Error while running the command.", t);

        System.err.println();
        System.err.println("------------------------------------------------------------");
        System.err.println(" The program finished with the following exception:");
        System.err.println();

        final Throwable cause = t.getCause();
        if (!(cause instanceof InvalidProgramException)) {
            t.printStackTrace();
            return 1;
        }

        System.err.println(cause.getMessage());
        for (StackTraceElement frame : cause.getStackTrace()) {
            System.err.println("\t" + frame);
            // Frames beyond the program's main method are not printed.
            if (frame.getMethodName().equals("main")) {
                break;
            }
        }
        return 1;
    }

    /**
     * Writes the message to the log at INFO level and then to standard output.
     *
     * @param message the message to emit
     */
    private static void logAndSysout(String message) {
        LOG.info(message);
        System.out.println(message);
    }

    /**
     * Parses a hexadecimal job id string.
     *
     * @param jobIdString the string to parse; null is reported as a missing job id
     * @return the parsed job id
     * @throws CliArgsException if the string is null or not a valid job id; for an invalid id the
     *     underlying {@link IllegalArgumentException} is attached as the cause
     */
    private JobID parseJobId(String jobIdString) throws CliArgsException {
        if (jobIdString == null) {
            throw new CliArgsException("Missing JobId");
        }
        try {
            return JobID.fromHexString(jobIdString);
        } catch (IllegalArgumentException e) {
            // Keep the original exception as the cause so the parsing stack trace is not lost.
            throw new CliArgsException(e.getMessage(), e);
        }
    }

    /**
     * Retrieves the cluster client for the cluster identified by the effective configuration and
     * runs the given action against it.
     *
     * <p>The cluster descriptor and the cluster client are both closed when the action finishes,
     * whether it completes normally or throws.
     *
     * @param activeCommandLine the active custom command line
     * @param commandLine the parsed command line
     * @param clusterAction the action to run
     * @param <ClusterID> type of the cluster id
     * @throws FlinkException if no cluster id is configured, or if retrieving the cluster or
     *     running the action fails
     */
    private <ClusterID> void runClusterAction(CustomCommandLine activeCommandLine, CommandLine commandLine, ClusterAction<ClusterID> clusterAction) throws FlinkException {
        final Configuration effectiveConfiguration = this.getEffectiveConfiguration(activeCommandLine, commandLine);
        LOG.debug("Effective configuration after Flink conf, and custom commandline: {}", effectiveConfiguration);

        // Parameterised types tie the factory, descriptor, client and action to one ClusterID
        // instead of relying on raw types and an unchecked conversion.
        final ClusterClientFactory<ClusterID> clusterClientFactory = this.clusterClientServiceLoader.getClusterClientFactory(effectiveConfiguration);
        final ClusterID clusterId = clusterClientFactory.getClusterId(effectiveConfiguration);
        if (clusterId == null) {
            throw new FlinkException("No cluster id was specified. Please specify a cluster to which you would like to connect.");
        }

        try (ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(effectiveConfiguration);
             ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterId).getClusterClient()) {
            clusterAction.runAction(clusterClient, effectiveConfiguration);
        }
    }

    /**
     * Interprets the first argument as the action to perform and forwards the remaining
     * arguments to the matching handler.
     *
     * @param args the command line arguments; the first element names the action
     * @return {@code 0} if the selected action (or help/version output) completed, {@code 1}
     *     if no action or an unknown action was given, otherwise the code produced by the
     *     handler for the exception that was thrown
     */
    public int parseAndRun(String[] args) {
        // Without any argument there is nothing to dispatch on.
        if (args.length < 1) {
            CliFrontendParser.printHelp(customCommandLines);
            System.out.println("Please specify an action.");
            return 1;
        }

        final String action = args[0];
        // Everything after the action name belongs to the action itself.
        final String[] params = Arrays.copyOfRange(args, 1, args.length);

        try {
            switch (action) {
                case "run":
                    run(params);
                    return 0;
                case "run-application":
                    runApplication(params);
                    return 0;
                case "list":
                    list(params);
                    return 0;
                case "info":
                    info(params);
                    return 0;
                case "cancel":
                    cancel(params);
                    return 0;
                case "stop":
                    stop(params);
                    return 0;
                case "savepoint":
                    savepoint(params);
                    return 0;
                case "-h":
                case "--help":
                    CliFrontendParser.printHelp(customCommandLines);
                    return 0;
                case "-v":
                case "--version": {
                    final String version = EnvironmentInformation.getVersion();
                    final String commitID = EnvironmentInformation.getRevisionInformation().commitId;
                    final String commitSuffix;
                    if (commitID.equals("<unknown>")) {
                        commitSuffix = "";
                    } else {
                        commitSuffix = ", Commit ID: " + commitID;
                    }
                    System.out.print("Version: " + version);
                    System.out.println(commitSuffix);
                    return 0;
                }
                default:
                    System.out.printf("\"%s\" is not a valid action.\n", action);
                    System.out.println();
                    System.out.println("Valid actions are \"run\", \"run-application\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\".");
                    System.out.println();
                    System.out.println("Specify the version option (-v or --version) to print Flink version.");
                    System.out.println();
                    System.out.println("Specify the help option (-h or --help) to get help on the command.");
                    return 1;
            }
        } catch (CliArgsException ce) {
            return handleArgException(ce);
        } catch (ProgramParametrizationException ppe) {
            return handleParametrizationException(ppe);
        } catch (ProgramMissingJobException pmje) {
            return handleMissingJobException();
        } catch (Exception e) {
            return handleError(e);
        }
    }

    /**
     * Process entry point: delegates to {@link #mainInternal(String[])} and always terminates
     * the JVM with the resulting exit code.
     *
     * @param args the command line arguments
     */
    public static void main(final String[] args) {
        // Stays at 31 if mainInternal exits by throwing instead of returning.
        int exitCode = 31;
        try {
            exitCode = mainInternal(args);
        } finally {
            System.exit(exitCode);
        }
    }

    /**
     * Loads the configuration and the custom command lines, installs the security context, and
     * runs the requested action inside that context.
     *
     * @param args the command line arguments
     * @return the code returned by {@link #parseAndRun(String[])}, or {@code 31} if creating the
     *     frontend, installing the security context, or running the action throws
     */
    @VisibleForTesting
    static int mainInternal(final String[] args) {
        EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

        final String confDir = getConfigurationDirectoryFromEnv();
        final Configuration globalConfig = GlobalConfiguration.loadConfiguration(confDir);
        final List<CustomCommandLine> commandLines = loadCustomCommandLines(globalConfig, confDir);

        try {
            final CliFrontend frontend = new CliFrontend(globalConfig, commandLines);

            // Skip the leading action name (if present) when parsing the options.
            final int firstOptionIndex = Math.min(args.length, 1);
            final String[] optionArgs = Arrays.copyOfRange(args, firstOptionIndex, args.length);
            final CommandLine parsedOptions = frontend.getCommandLine(new Options(), optionArgs, true);

            // Work on a copy so that the dynamic properties do not leak into the frontend's
            // own configuration.
            final Configuration securityConfig = new Configuration(frontend.configuration);
            DynamicPropertiesUtil.encodeDynamicProperties(parsedOptions, securityConfig);
            SecurityUtils.install(new SecurityConfiguration(securityConfig));

            return SecurityUtils.getInstalledContext().runSecured(() -> frontend.parseAndRun(args));
        } catch (Throwable t) {
            final Throwable stripped = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
            LOG.error("Fatal error while running command line interface.", stripped);
            stripped.printStackTrace();
            return 31;
        }
    }

    /**
     * Determines the configuration directory to use.
     *
     * <p>The {@code FLINK_CONF_DIR} environment variable takes precedence. If it is not set,
     * the two fallback directories are probed in order.
     *
     * @return the path of an existing configuration directory
     * @throws RuntimeException if {@code FLINK_CONF_DIR} is set to a non-existent path, or if
     *     it is unset and neither fallback directory exists
     */
    public static String getConfigurationDirectoryFromEnv() {
        final String envLocation = System.getenv("FLINK_CONF_DIR");

        if (envLocation != null) {
            // An explicitly configured directory must exist; there is no fallback in this case.
            if (!new File(envLocation).exists()) {
                throw new RuntimeException("The configuration directory '" + envLocation + "', specified in the 'FLINK_CONF_DIR' environment variable, does not exist.");
            }
            return envLocation;
        }

        if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) {
            return CONFIG_DIRECTORY_FALLBACK_1;
        }
        if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) {
            return CONFIG_DIRECTORY_FALLBACK_2;
        }
        throw new RuntimeException("The configuration directory was not specified. Please specify the directory containing the configuration file through the 'FLINK_CONF_DIR' environment variable.");
    }

    /**
     * Writes the host and port of the given address into both the JobManager and the REST
     * address/port options of the configuration.
     *
     * @param config the configuration to update
     * @param address the address whose host string and port are written
     */
    static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) {
        final String host = address.getHostString();
        final int port = address.getPort();

        config.setString(JobManagerOptions.ADDRESS, host);
        config.setInteger(JobManagerOptions.PORT, port);
        config.setString(RestOptions.ADDRESS, host);
        config.setInteger(RestOptions.PORT, port);
    }

    /**
     * Builds the list of custom command lines, in this order: a {@link GenericCLI}, then the
     * reflectively loaded YARN session CLI (or its fallback class if the YARN CLI cannot be
     * loaded), and finally a {@link DefaultCLI}.
     *
     * <p>If neither YARN class can be loaded, a warning is logged and the list is returned
     * without a YARN entry.
     *
     * @param configuration the configuration handed to the command lines
     * @param configurationDirectory the configuration directory handed to the command lines
     * @return the custom command lines
     */
    public static List<CustomCommandLine> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
        final List<CustomCommandLine> customCommandLines = new ArrayList<>();
        customCommandLines.add(new GenericCLI(configuration, configurationDirectory));

        // The YARN CLI is loaded reflectively; loading it can fail, in which case the
        // fallback class below is tried.
        final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
        try {
            customCommandLines.add(
                    loadCustomCommandLine(flinkYarnSessionCLI, configuration, configurationDirectory, "y", "yarn"));
        } catch (Exception | NoClassDefFoundError e) {
            final String errorYarnSessionCLI = "org.apache.flink.yarn.cli.FallbackYarnSessionCli";
            try {
                LOG.info("Loading FallbackYarnSessionCli");
                customCommandLines.add(loadCustomCommandLine(errorYarnSessionCLI, configuration));
            } catch (Exception exception) {
                // Do not silently drop the fallback failure: attach it to the primary one
                // so that both show up in the logged stack trace.
                e.addSuppressed(exception);
                LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
            }
        }

        customCommandLines.add(new DefaultCLI());
        return customCommandLines;
    }

    /**
     * Returns the first custom command line, in registration order, that reports itself as
     * active for the given command line.
     *
     * @param commandLine the parsed command line arguments
     * @return the first active custom command line
     * @throws IllegalStateException if none of the custom command lines is active
     */
    public CustomCommandLine validateAndGetActiveCommandLine(CommandLine commandLine) {
        LOG.debug("Custom commandlines: {}", customCommandLines);
        for (CustomCommandLine cli : customCommandLines) {
            // Evaluate once so that the logged value is exactly the one acted upon.
            // NOTE(review): assumes isActive is a side-effect-free predicate - confirm.
            final boolean active = cli.isActive(commandLine);
            LOG.debug("Checking custom commandline {}, isActive: {}", cli, active);
            if (active) {
                return cli;
            }
        }
        throw new IllegalStateException("No valid command-line found.");
    }

    /**
     * Reflectively instantiates a {@link CustomCommandLine} implementation.
     *
     * <p>The public constructor is looked up using the exact runtime class of every argument,
     * so the target class must declare a constructor with precisely those parameter types.
     *
     * @param className fully qualified name of the class to load
     * @param params the constructor arguments; none of them may be {@code null}
     * @return the newly created command line
     * @throws Exception if the class cannot be loaded, is not a {@link CustomCommandLine},
     *     has no matching public constructor, or cannot be instantiated
     */
    private static CustomCommandLine loadCustomCommandLine(String className, Object... params) throws Exception {
        // asSubclass yields Class<? extends CustomCommandLine>, so wildcard types are
        // required here and for the constructor below.
        final Class<? extends CustomCommandLine> customCliClass =
                Class.forName(className).asSubclass(CustomCommandLine.class);

        final Class<?>[] types = new Class<?>[params.length];
        for (int i = 0; i < params.length; ++i) {
            Preconditions.checkNotNull(params[i], "Parameters for custom command-lines may not be null.");
            types[i] = params[i].getClass();
        }

        final Constructor<? extends CustomCommandLine> constructor = customCliClass.getConstructor(types);
        return constructor.newInstance(params);
    }

    /**
     * Reads the {@link ClientOptions#CLIENT_TIMEOUT} option from the given configuration.
     *
     * @param effectiveConfiguration the configuration to read from
     * @return the value of the client timeout option
     */
    private Duration getClientTimeout(Configuration effectiveConfiguration) {
        final Duration clientTimeout = effectiveConfiguration.get(ClientOptions.CLIENT_TIMEOUT);
        return clientTimeout;
    }

    /**
     * Reads the {@code CoreOptions.DEFAULT_PARALLELISM} option from the given configuration.
     *
     * @param effectiveConfiguration the configuration to read from
     * @return the value of the default parallelism option
     */
    private int getDefaultParallelism(Configuration effectiveConfiguration) {
        final int defaultParallelism = effectiveConfiguration.get(CoreOptions.DEFAULT_PARALLELISM);
        return defaultParallelism;
    }

    @FunctionalInterface
    private static interface ClusterAction<ClusterID> {
        public void runAction(ClusterClient<ClusterID> var1, Configuration var2) throws FlinkException;
    }
}

