package org.apache.flink.runtime.scheduler.adaptive;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.adaptive.Executing;
import org.apache.flink.runtime.scheduler.adaptive.StateWithExecutionGraph;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.class */
public class StopWithSavepoint extends StateWithExecutionGraph {
    private final Context context;
    private final ClassLoader userCodeClassLoader;
    private final CompletableFuture<String> operationFuture;
    private final CheckpointScheduling checkpointScheduling;
    private boolean hasFullyFinished;

    @Nullable
    private String savepoint;

    @Nullable
    private Throwable operationFailureCause;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint$Context.class */
    public interface Context extends StateWithExecutionGraph.Context {
        Executing.FailureResult howToHandleFailure(Throwable th);

        void goToCanceling(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler);

        void goToRestarting(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Duration duration);

        void goToFailing(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Throwable th);

        void goToExecuting(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler);

        ScheduledFuture<?> runIfState(State state, Runnable runnable, Duration duration);
    }

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint$Factory.class */
    static class Factory implements StateFactory<StopWithSavepoint> {
        private final Context context;
        private final ExecutionGraph executionGraph;
        private final ExecutionGraphHandler executionGraphHandler;
        private final OperatorCoordinatorHandler operatorCoordinatorHandler;
        private final CheckpointScheduling checkpointScheduling;
        private final Logger logger;
        private final ClassLoader userCodeClassLoader;
        private final CompletableFuture<String> savepointFuture;

        /* JADX INFO: Access modifiers changed from: package-private */
        public Factory(Context context, ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, CheckpointScheduling checkpointScheduling, Logger logger, ClassLoader classLoader, CompletableFuture<String> completableFuture) {
            this.context = context;
            this.executionGraph = executionGraph;
            this.executionGraphHandler = executionGraphHandler;
            this.operatorCoordinatorHandler = operatorCoordinatorHandler;
            this.checkpointScheduling = checkpointScheduling;
            this.logger = logger;
            this.userCodeClassLoader = classLoader;
            this.savepointFuture = completableFuture;
        }

        @Override // org.apache.flink.runtime.scheduler.adaptive.StateFactory
        public Class<StopWithSavepoint> getStateClass() {
            return StopWithSavepoint.class;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.flink.runtime.scheduler.adaptive.StateFactory
        public StopWithSavepoint getState() {
            return new StopWithSavepoint(this.context, this.executionGraph, this.executionGraphHandler, this.operatorCoordinatorHandler, this.checkpointScheduling, this.logger, this.userCodeClassLoader, this.savepointFuture);
        }
    }

    StopWithSavepoint(Context context, ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, CheckpointScheduling checkpointScheduling, Logger logger, ClassLoader classLoader, CompletableFuture<String> completableFuture) {
        super(context, executionGraph, executionGraphHandler, operatorCoordinatorHandler, logger);
        this.hasFullyFinished = false;
        this.savepoint = null;
        this.context = context;
        this.userCodeClassLoader = classLoader;
        this.checkpointScheduling = checkpointScheduling;
        this.operationFuture = new CompletableFuture<>();
        FutureUtils.assertNoException(completableFuture.handle((str, th) -> {
            context.runIfState(this, () -> {
                handleSavepointCompletion(str, th);
            }, Duration.ZERO);
            return null;
        }));
    }

    private void handleSavepointCompletion(@Nullable String str, @Nullable Throwable th) {
        if (this.hasFullyFinished) {
            Preconditions.checkState(th == null, "A savepoint should never fail after a job has been terminated via stop-with-savepoint.");
            completeOperationAndGoToFinished(str);
        } else {
            if (th == null) {
                this.savepoint = str;
                return;
            }
            this.operationFailureCause = th;
            this.checkpointScheduling.startCheckpointScheduler();
            this.context.goToExecuting(getExecutionGraph(), getExecutionGraphHandler(), getOperatorCoordinatorHandler());
        }
    }

    @Override // org.apache.flink.runtime.scheduler.adaptive.StateWithExecutionGraph, org.apache.flink.runtime.scheduler.adaptive.State
    public void onLeave(Class<? extends State> cls) {
        this.operationFuture.completeExceptionally(new FlinkException("Stop with savepoint operation could not be completed.", this.operationFailureCause));
        super.onLeave(cls);
    }

    @Override // org.apache.flink.runtime.scheduler.adaptive.State
    public void cancel() {
        this.context.goToCanceling(getExecutionGraph(), getExecutionGraphHandler(), getOperatorCoordinatorHandler());
    }

    @Override // org.apache.flink.runtime.scheduler.adaptive.State
    public JobStatus getJobStatus() {
        return JobStatus.RUNNING;
    }

    @Override // org.apache.flink.runtime.scheduler.adaptive.State
    public void handleGlobalFailure(Throwable th) {
        handleAnyFailure(th);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.flink.runtime.scheduler.adaptive.StateWithExecutionGraph
    public boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionStateTransition) {
        boolean updateState = getExecutionGraph().updateState(taskExecutionStateTransition);
        if (updateState && taskExecutionStateTransition.getExecutionState() == ExecutionState.FAILED) {
            FlinkException error = taskExecutionStateTransition.getError(this.userCodeClassLoader);
            handleAnyFailure(error == null ? new FlinkException("Unknown failure cause. Probably related to FLINK-21376.") : error);
        }
        return updateState;
    }

    @Override // org.apache.flink.runtime.scheduler.adaptive.StateWithExecutionGraph
    void onGloballyTerminalState(JobStatus jobStatus) {
        if (jobStatus != JobStatus.FINISHED) {
            handleAnyFailure(new FlinkException("Job did not reach the FINISHED state while performing stop-with-savepoint."));
        } else if (this.savepoint == null) {
            this.hasFullyFinished = true;
        } else {
            completeOperationAndGoToFinished(this.savepoint);
        }
    }

    private void completeOperationAndGoToFinished(String str) {
        this.operationFuture.complete(str);
        this.context.goToFinished(ArchivedExecutionGraph.createFrom(getExecutionGraph()));
    }

    private void handleAnyFailure(Throwable th) {
        this.operationFailureCause = th;
        Executing.FailureResult howToHandleFailure = this.context.howToHandleFailure(th);
        if (howToHandleFailure.canRestart()) {
            getLogger().info("Restarting job.", howToHandleFailure.getFailureCause());
            this.context.goToRestarting(getExecutionGraph(), getExecutionGraphHandler(), getOperatorCoordinatorHandler(), howToHandleFailure.getBackoffTime());
        } else {
            getLogger().info("Failing job.", howToHandleFailure.getFailureCause());
            this.context.goToFailing(getExecutionGraph(), getExecutionGraphHandler(), getOperatorCoordinatorHandler(), howToHandleFailure.getFailureCause());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<String> getOperationFuture() {
        return this.operationFuture;
    }
}
