/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph.failover.flip1;

import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.failure.FailureEnricherUtils;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.throwable.ThrowableClassifier;
import org.apache.flink.runtime.throwable.ThrowableType;
import org.apache.flink.runtime.topology.Vertex;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;

/**
 * Turns a reported failure (of a single execution or of the whole job) into a
 * {@link FailureHandlingResult} that tells the caller whether a restart is possible, which
 * vertices are affected and how long to back off.
 *
 * <p>The decision combines three collaborators: the {@link FailoverStrategy} (which vertices a
 * task failure drags along), the {@link RestartBackoffTimeStrategy} (whether and when a restart
 * may happen) and the configured {@link FailureEnricher}s (labels attached to the failure).
 */
public class ExecutionFailureHandler {
    /** Topology whose vertices are all restarted on a global failure. */
    private final SchedulingTopology schedulingTopology;

    /** Determines the set of vertices to restart for a task-level failure. */
    private final FailoverStrategy failoverStrategy;

    /** Notified about every recoverable failure; decides whether a restart is allowed. */
    private final RestartBackoffTimeStrategy restartBackoffTimeStrategy;

    /** Count of restartable results produced by this handler so far. */
    private long numberOfRestarts;

    /** Enricher context used when labeling task-level failures. */
    private final FailureEnricher.Context taskFailureCtx;

    /** Enricher context used when labeling global failures. */
    private final FailureEnricher.Context globalFailureCtx;

    /** Enrichers that produce failure labels; when empty, labeling is skipped. */
    private final Collection<FailureEnricher> failureEnrichers;

    /** Executor handed to the enricher utility when labeling a failure. */
    private final ComponentMainThreadExecutor mainThreadExecutor;

    /**
     * Creates the handler.
     *
     * @param schedulingTopology topology of the job; must not be null
     * @param failoverStrategy strategy selecting the tasks to restart; must not be null
     * @param restartBackoffTimeStrategy strategy controlling restarts; must not be null
     * @param mainThreadExecutor executor passed on for failure labeling; must not be null
     * @param failureEnrichers enrichers used to label failures; must not be null
     * @param taskFailureCtx enricher context for task failures (not null-checked here)
     * @param globalFailureCtx enricher context for global failures (not null-checked here)
     */
    public ExecutionFailureHandler(
            SchedulingTopology schedulingTopology,
            FailoverStrategy failoverStrategy,
            RestartBackoffTimeStrategy restartBackoffTimeStrategy,
            ComponentMainThreadExecutor mainThreadExecutor,
            Collection<FailureEnricher> failureEnrichers,
            FailureEnricher.Context taskFailureCtx,
            FailureEnricher.Context globalFailureCtx) {
        this.schedulingTopology = Preconditions.checkNotNull(schedulingTopology);
        this.failoverStrategy = Preconditions.checkNotNull(failoverStrategy);
        this.restartBackoffTimeStrategy = Preconditions.checkNotNull(restartBackoffTimeStrategy);
        this.mainThreadExecutor = Preconditions.checkNotNull(mainThreadExecutor);
        this.failureEnrichers = Preconditions.checkNotNull(failureEnrichers);
        this.taskFailureCtx = taskFailureCtx;
        this.globalFailureCtx = globalFailureCtx;
    }

    /**
     * Computes the handling result for the failure of a single execution.
     *
     * @param failedExecution the execution that failed
     * @param cause the reported failure cause
     * @param timestamp time of the failure as supplied by the caller
     * @return the result describing how the failure should be handled
     */
    public FailureHandlingResult getFailureHandlingResult(Execution failedExecution, Throwable cause, long timestamp) {
        // Ask the failover strategy which vertices are affected by this task's failure.
        final Set<ExecutionVertexID> affectedVertices =
                this.failoverStrategy.getTasksNeedingRestart(failedExecution.getVertex().getID(), cause);
        return handleFailure(failedExecution, cause, timestamp, affectedVertices, false);
    }

    /**
     * Computes the handling result for a global failure, i.e. one that is not tied to a specific
     * execution. Every vertex of the scheduling topology is marked for restart.
     *
     * @param cause the reported failure cause
     * @param timestamp time of the failure as supplied by the caller
     * @return the result describing how the failure should be handled
     */
    public FailureHandlingResult getGlobalFailureHandlingResult(Throwable cause, long timestamp) {
        final Set<ExecutionVertexID> allVertices =
                IterableUtils.toStream(this.schedulingTopology.getVertices())
                        .map(Vertex::getId)
                        .collect(Collectors.toSet());
        return handleFailure(null, cause, timestamp, allVertices, true);
    }

    /**
     * Starts labeling of the given failure through the configured enrichers.
     *
     * @param cause the failure to label
     * @param isGlobal selects the global context when {@code true}, the task context otherwise
     * @return future holding the labels; the shared empty-labels future when no enricher is
     *     configured
     */
    private CompletableFuture<Map<String, String>> labelFailure(Throwable cause, boolean isGlobal) {
        if (this.failureEnrichers.isEmpty()) {
            return FailureEnricherUtils.EMPTY_FAILURE_LABELS;
        }
        final FailureEnricher.Context enricherContext;
        if (isGlobal) {
            enricherContext = this.globalFailureCtx;
        } else {
            enricherContext = this.taskFailureCtx;
        }
        final Executor labelingExecutor = this.mainThreadExecutor;
        return FailureEnricherUtils.labelFailure(cause, enricherContext, labelingExecutor, this.failureEnrichers);
    }

    /**
     * Shared decision logic for task and global failures.
     *
     * <p>Labeling is started first. A failure classified as non-recoverable yields an
     * unrecoverable result without consulting the backoff strategy. Otherwise the backoff
     * strategy is notified and asked whether a restart is allowed.
     *
     * @param failedExecution the failed execution, or {@code null} for a global failure
     * @param cause the reported failure cause
     * @param timestamp time of the failure as supplied by the caller
     * @param verticesToRestart vertices to restart if the failure turns out to be restartable
     * @param globalFailure whether the failure is a global one
     * @return a restartable or an unrecoverable handling result
     */
    private FailureHandlingResult handleFailure(@Nullable Execution failedExecution, Throwable cause, long timestamp, Set<ExecutionVertexID> verticesToRestart, boolean globalFailure) {
        final CompletableFuture<Map<String, String>> labels = labelFailure(cause, globalFailure);

        if (isUnrecoverableError(cause)) {
            final JobException notRecoverable = new JobException("The failure is not recoverable", cause);
            return FailureHandlingResult.unrecoverable(failedExecution, notRecoverable, timestamp, labels, globalFailure);
        }

        // The strategy must learn about the failure before it is asked whether a restart is allowed.
        this.restartBackoffTimeStrategy.notifyFailure(cause);

        if (!this.restartBackoffTimeStrategy.canRestart()) {
            final JobException suppressed =
                    new JobException("Recovery is suppressed by " + this.restartBackoffTimeStrategy, cause);
            return FailureHandlingResult.unrecoverable(failedExecution, suppressed, timestamp, labels, globalFailure);
        }

        this.numberOfRestarts++;
        return FailureHandlingResult.restartable(
                failedExecution,
                cause,
                timestamp,
                labels,
                verticesToRestart,
                this.restartBackoffTimeStrategy.getBackoffTime(),
                globalFailure);
    }

    /**
     * Tells whether the given throwable is classified as a non-recoverable error.
     *
     * @param cause the throwable to inspect
     * @return {@code true} if the classifier finds a throwable of type
     *     {@link ThrowableType#NonRecoverableError} for {@code cause}
     */
    public static boolean isUnrecoverableError(Throwable cause) {
        return ThrowableClassifier
                .findThrowableOfThrowableType(cause, ThrowableType.NonRecoverableError)
                .isPresent();
    }

    /**
     * Returns how many restartable results this handler has produced.
     *
     * @return the restart counter
     */
    public long getNumberOfRestarts() {
        return this.numberOfRestarts;
    }
}

