/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.coordination;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.groups.OperatorCoordinatorMetricGroup;
import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.InternalOperatorCoordinatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.JobManagerOperatorMetricGroup;
import org.apache.flink.runtime.operators.coordination.AcknowledgeCheckpointEvent;
import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
import org.apache.flink.runtime.operators.coordination.ExecutionSubtaskAccess;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.SubtaskAccess;
import org.apache.flink.runtime.operators.coordination.SubtaskGatewayImpl;
import org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
import org.apache.flink.runtime.scheduler.GlobalFailureHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TemporaryClassLoaderContext;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OperatorCoordinatorHolder
implements OperatorCoordinatorCheckpointContext,
AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(OperatorCoordinatorHolder.class);
    /** The coordinator instance this holder delegates to. */
    private final OperatorCoordinator coordinator;
    /** ID of the operator the coordinator belongs to. */
    private final OperatorID operatorId;
    /** Context handed to the coordinator on creation; wired up later via {@code lazyInitialize}. */
    private final LazyInitializedCoordinatorContext context;
    /** Factory used to look up the {@link SubtaskAccess} objects for a subtask or a specific attempt. */
    private final SubtaskAccess.SubtaskAccessFactory taskAccesses;
    /**
     * Gateways keyed by subtask index. Entries are only added when the context reports that
     * concurrent execution attempts are not supported.
     */
    private final Map<Integer, SubtaskGatewayImpl> subtaskGatewayMap;
    /** Tracker shared with every gateway; checkpoints wait for the futures it still holds. */
    private final IncompleteFuturesTracker unconfirmedEvents;
    /** Parallelism of the operator; also the number of subtasks gateways are set up for. */
    private final int operatorParallelism;
    /** Max parallelism of the operator, reported through {@code maxParallelism()}. */
    private final int operatorMaxParallelism;
    /** Receiver of global failures; assigned in {@code lazyInitialize}, {@code null} before that. */
    private GlobalFailureHandler globalFailureHandler;
    /** Executor all gateway work runs on; assigned in {@code lazyInitialize}, {@code null} before that. */
    private ComponentMainThreadExecutor mainThreadExecutor;

    /**
     * Stores the collaborators of this holder and creates the (initially empty) gateway map and
     * pending-event tracker. Instances are obtained through the static {@code create} factories.
     *
     * <p>All reference arguments are validated with {@code Preconditions.checkNotNull}.
     *
     * @param operatorId ID of the coordinated operator
     * @param coordinator the coordinator to wrap
     * @param context the context that was passed to the coordinator when it was created
     * @param taskAccesses factory for the subtask accesses used to build gateways
     * @param operatorParallelism parallelism of the operator
     * @param operatorMaxParallelism max parallelism of the operator
     */
    private OperatorCoordinatorHolder(
            OperatorID operatorId,
            OperatorCoordinator coordinator,
            LazyInitializedCoordinatorContext context,
            SubtaskAccess.SubtaskAccessFactory taskAccesses,
            int operatorParallelism,
            int operatorMaxParallelism) {

        // validated references
        this.operatorId = Preconditions.checkNotNull(operatorId);
        this.coordinator = Preconditions.checkNotNull(coordinator);
        this.context = Preconditions.checkNotNull(context);
        this.taskAccesses = Preconditions.checkNotNull(taskAccesses);

        // plain values
        this.operatorParallelism = operatorParallelism;
        this.operatorMaxParallelism = operatorMaxParallelism;

        // internal bookkeeping, starts out empty
        this.subtaskGatewayMap = new HashMap<>();
        this.unconfirmedEvents = new IncompleteFuturesTracker();
    }

    /**
     * Completes the initialization once the failure handler and the main thread executor exist.
     *
     * <p>Both references are stored on the holder and passed on to the coordinator context, after
     * which a gateway is set up for every subtask. The gateways are created last because their
     * construction reads the {@code mainThreadExecutor} field assigned here.
     *
     * @param globalFailureHandler handler that receives global failures
     * @param mainThreadExecutor executor the holder and its gateways run on
     */
    public void lazyInitialize(
            GlobalFailureHandler globalFailureHandler,
            ComponentMainThreadExecutor mainThreadExecutor) {

        this.globalFailureHandler = globalFailureHandler;
        this.mainThreadExecutor = mainThreadExecutor;

        context.lazyInitialize(globalFailureHandler, mainThreadExecutor);

        setupAllSubtaskGateways();
    }

    /**
     * Gives access to the wrapped coordinator.
     *
     * @return the coordinator instance held by this holder
     */
    public OperatorCoordinator coordinator() {
        return coordinator;
    }

    /**
     * Identifies the operator whose coordinator is held here.
     *
     * @return the operator ID supplied at construction
     */
    @Override
    public OperatorID operatorId() {
        return operatorId;
    }

    /**
     * Reports the max parallelism of the coordinated operator.
     *
     * @return the max parallelism supplied at construction
     */
    @Override
    public int maxParallelism() {
        return operatorMaxParallelism;
    }

    /**
     * Reports the parallelism of the coordinated operator.
     *
     * @return the parallelism supplied at construction
     */
    @Override
    public int currentParallelism() {
        return operatorParallelism;
    }

    /**
     * Starts the wrapped coordinator.
     *
     * <p>Asserts that the caller is on the main thread and that the coordinator context has been
     * initialized before delegating.
     *
     * @throws Exception whatever the coordinator's {@code start()} throws
     */
    public void start() throws Exception {
        mainThreadExecutor.assertRunningInMainThread();

        final boolean contextReady = context.isInitialized();
        Preconditions.checkState(contextReady, "Coordinator Context is not yet initialized");

        coordinator.start();
    }

    /**
     * Closes the wrapped coordinator and afterwards detaches the context from the failure handler
     * and executor it was initialized with.
     *
     * <p>If closing the coordinator throws, the context is left untouched.
     *
     * @throws Exception whatever the coordinator's {@code close()} throws
     */
    @Override
    public void close() throws Exception {
        // coordinator first: the context is only un-initialized after a clean close
        coordinator.close();
        context.unInitialize();
    }

    /**
     * Routes an event that a subtask sent towards the coordinator.
     *
     * <p>An {@link AcknowledgeCheckpointEvent} is consumed by the holder itself: the gateway of the
     * sending subtask is re-opened for the acknowledged checkpoint and the event is not passed on.
     * Every other event is forwarded to the coordinator unchanged.
     *
     * <p>The gateway map has no entry for a subtask when concurrent execution attempts are
     * supported (gateways are not registered in that mode) or when no gateway has been set up for
     * that index yet. There is nothing to re-open in that case, so the acknowledgement is logged
     * and dropped instead of failing with a {@code NullPointerException}.
     *
     * @param subtask index of the subtask that sent the event
     * @param attemptNumber attempt number of the sending execution
     * @param event the event to handle
     * @throws Exception whatever the coordinator's event handler throws
     */
    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) throws Exception {
        mainThreadExecutor.assertRunningInMainThread();

        if (event instanceof AcknowledgeCheckpointEvent) {
            final AcknowledgeCheckpointEvent ack = (AcknowledgeCheckpointEvent) event;
            final SubtaskGatewayImpl gateway = subtaskGatewayMap.get(subtask);

            if (gateway == null) {
                // no gateway is tracked for this subtask, hence nothing can be closed for it
                LOG.warn(
                        "Ignoring acknowledgement of checkpoint {} from subtask {} (attempt {}) of operator {} because no gateway is registered for that subtask.",
                        ack.getCheckpointID(),
                        subtask,
                        attemptNumber,
                        operatorId);
                return;
            }

            gateway.openGatewayAndUnmarkCheckpoint(ack.getCheckpointID());
            return;
        }

        coordinator.handleEventFromOperator(subtask, attemptNumber, event);
    }

    /**
     * Tells the coordinator that an execution attempt of a subtask has failed.
     *
     * <p>Must be called from the main thread; the call is passed through without further
     * processing.
     *
     * @param subtask index of the affected subtask
     * @param attemptNumber attempt number of the failed execution
     * @param reason cause of the failure, may be {@code null}
     */
    public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason) {
        mainThreadExecutor.assertRunningInMainThread();

        coordinator.executionAttemptFailed(subtask, attemptNumber, reason);
    }

    /**
     * Handles the reset of a single subtask to the given checkpoint.
     *
     * <p>Gateways for the subtask are set up before the coordinator is notified, so they already
     * exist when the coordinator's {@code subtaskReset} runs.
     *
     * @param subtask index of the subtask being reset
     * @param checkpointId ID of the checkpoint the subtask is reset to
     */
    @Override
    public void subtaskReset(int subtask, long checkpointId) {
        mainThreadExecutor.assertRunningInMainThread();

        // gateways first, coordinator notification second
        setupSubtaskGateway(subtask);

        coordinator.subtaskReset(subtask, checkpointId);
    }

    /**
     * Requests a checkpoint of the coordinator.
     *
     * <p>The actual work is handed over to the main thread executor; this method returns as soon
     * as the task is submitted.
     *
     * @param checkpointId ID of the checkpoint to take
     * @param result future that receives the coordinator's snapshot or the failure
     */
    @Override
    public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
        mainThreadExecutor.execute(
                () -> {
                    checkpointCoordinatorInternal(checkpointId, result);
                });
    }

    /**
     * Signals that a checkpoint completed.
     *
     * <p>Executed on the main thread executor: every registered gateway is re-opened for the
     * checkpoint, then the coordinator is notified.
     *
     * @param checkpointId ID of the completed checkpoint
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        mainThreadExecutor.execute(
                () -> {
                    for (SubtaskGatewayImpl gateway : subtaskGatewayMap.values()) {
                        gateway.openGatewayAndUnmarkCheckpoint(checkpointId);
                    }
                    coordinator.notifyCheckpointComplete(checkpointId);
                });
    }

    /**
     * Signals that a checkpoint was aborted.
     *
     * <p>Executed on the main thread executor: every registered gateway is re-opened for the
     * checkpoint, then the coordinator is notified.
     *
     * @param checkpointId ID of the aborted checkpoint
     */
    @Override
    public void notifyCheckpointAborted(long checkpointId) {
        mainThreadExecutor.execute(
                () -> {
                    for (SubtaskGatewayImpl gateway : subtaskGatewayMap.values()) {
                        gateway.openGatewayAndUnmarkCheckpoint(checkpointId);
                    }
                    coordinator.notifyCheckpointAborted(checkpointId);
                });
    }

    /**
     * Resets the coordinator to the given checkpoint.
     *
     * <p>The main thread executor is only assigned in {@code lazyInitialize}, so this method
     * tolerates being called before that: the thread assertion and the gateway re-creation are
     * skipped while the executor is still {@code null}. In every case the known gateways are
     * re-opened, the "failed" flag of the context is cleared, and the coordinator is reset.
     *
     * @param checkpointId ID of the checkpoint to reset to
     * @param checkpointData coordinator state of that checkpoint, may be {@code null}
     * @throws Exception whatever the coordinator's {@code resetToCheckpoint} throws
     */
    @Override
    public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception {
        final boolean executorAvailable = mainThreadExecutor != null;

        if (executorAvailable) {
            mainThreadExecutor.assertRunningInMainThread();
        }

        for (SubtaskGatewayImpl gateway : subtaskGatewayMap.values()) {
            gateway.openGatewayAndUnmarkAllCheckpoint();
        }
        context.resetFailed();

        // building gateways needs the executor, so this has to wait for lazyInitialize otherwise
        if (executorAvailable) {
            setupAllSubtaskGateways();
        }

        coordinator.resetToCheckpoint(checkpointId, checkpointData);
    }

    /**
     * Takes the coordinator checkpoint; must run on the main thread.
     *
     * <p>A fresh future is handed to the coordinator. Its completion is processed on the main
     * thread executor by {@link #forwardCoordinatorSnapshot}. The callback is registered before
     * the coordinator is invoked. All gateways are marked for the checkpoint right before the
     * coordinator call.
     *
     * <p>A non-fatal throwable from marking or from the coordinator call fails {@code result} and
     * is reported to the global failure handler.
     *
     * @param checkpointId ID of the checkpoint being taken
     * @param result future to complete with the snapshot or the failure
     */
    private void checkpointCoordinatorInternal(long checkpointId, CompletableFuture<byte[]> result) {
        mainThreadExecutor.assertRunningInMainThread();

        final CompletableFuture<byte[]> coordinatorCheckpoint = new CompletableFuture<>();

        final CompletableFuture<?> forwarding =
                coordinatorCheckpoint.handleAsync(
                        (snapshot, error) -> {
                            forwardCoordinatorSnapshot(checkpointId, result, snapshot, error);
                            return null;
                        },
                        mainThreadExecutor);
        FutureUtils.assertNoException(forwarding);

        try {
            for (SubtaskGatewayImpl gateway : subtaskGatewayMap.values()) {
                gateway.markForCheckpoint(checkpointId);
            }
            coordinator.checkpointCoordinator(checkpointId, coordinatorCheckpoint);
        } catch (Throwable t) {
            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
            result.completeExceptionally(t);
            globalFailureHandler.handleGlobalFailure(t);
        }
    }

    /**
     * Applies the outcome of the coordinator's own checkpoint future to {@code result}.
     *
     * <p>A failure is passed on as is. On success the gateways are closed for the checkpoint; if
     * that works the result is completed once pending events are done, otherwise it is failed
     * with "Cannot close gateway".
     */
    private void forwardCoordinatorSnapshot(
            long checkpointId,
            CompletableFuture<byte[]> result,
            @Nullable byte[] snapshot,
            @Nullable Throwable error) {

        if (error != null) {
            result.completeExceptionally(error);
            return;
        }

        if (!closeGateways(checkpointId)) {
            result.completeExceptionally(new FlinkException("Cannot close gateway"));
            return;
        }

        completeCheckpointOnceEventsAreDone(checkpointId, result, snapshot);
    }

    /**
     * Tries to close every registered gateway for the given checkpoint.
     *
     * <p>The gateways are expected to agree: either all of them close or none does.
     *
     * @param checkpointId ID of the checkpoint the gateways are closed for
     * @return {@code true} if the gateways were closed, {@code false} if none could be closed
     *     (which is also the result when no gateway is registered)
     * @throws IllegalStateException if only a part of the gateways could be closed
     */
    private boolean closeGateways(long checkpointId) {
        int closedCount = 0;
        for (SubtaskGatewayImpl gateway : subtaskGatewayMap.values()) {
            if (gateway.tryCloseGateway(checkpointId)) {
                closedCount++;
            }
        }

        final boolean anyClosed = closedCount != 0;
        final boolean allClosed = closedCount == subtaskGatewayMap.size();

        if (anyClosed && !allClosed) {
            throw new IllegalStateException("Some subtask gateway can be closed while others cannot. There might be a bug here.");
        }
        return anyClosed;
    }

    /**
     * Completes the checkpoint future after all events tracked as incomplete have finished.
     *
     * <p>The tracker is drained by this call. With nothing pending, the future is completed right
     * away. Otherwise it is completed when all pending futures are done, or failed if waiting for
     * them ends with a failure.
     *
     * @param checkpointId ID of the checkpoint, used for logging
     * @param checkpointFuture future to complete
     * @param checkpointResult snapshot to complete the future with
     */
    private void completeCheckpointOnceEventsAreDone(long checkpointId, CompletableFuture<byte[]> checkpointFuture, byte[] checkpointResult) {
        final Collection<CompletableFuture<?>> outstanding = unconfirmedEvents.getCurrentIncompleteAndReset();

        if (outstanding.isEmpty()) {
            checkpointFuture.complete(checkpointResult);
            return;
        }

        LOG.info(
                "Coordinator checkpoint {} for coordinator {} is awaiting {} pending events",
                checkpointId,
                operatorId,
                outstanding.size());

        FutureUtils.waitForAll(outstanding)
                .whenComplete(
                        (ignored, failure) -> {
                            if (failure != null) {
                                checkpointFuture.completeExceptionally(new FlinkException("Failing OperatorCoordinator checkpoint because some OperatorEvents before this checkpoint barrier were not received by the target tasks."));
                                return;
                            }
                            checkpointFuture.complete(checkpointResult);
                        });
    }

    /**
     * Aborts the checkpoint that is currently being triggered, if any.
     *
     * <p>Executed on the main thread executor: every registered gateway is asked to re-open and
     * drop the mark of its last checkpoint.
     */
    @Override
    public void abortCurrentTriggering() {
        mainThreadExecutor.execute(
                () -> {
                    for (SubtaskGatewayImpl gateway : subtaskGatewayMap.values()) {
                        gateway.openGatewayAndUnmarkLastCheckpointIfAny();
                    }
                });
    }

    /** Sets up gateways for the subtask indices {@code 0} to {@code operatorParallelism - 1}. */
    private void setupAllSubtaskGateways() {
        int subtask = 0;
        while (subtask < operatorParallelism) {
            setupSubtaskGateway(subtask);
            subtask++;
        }
    }

    /**
     * Sets up one gateway per access that the factory returns for the given subtask.
     *
     * @param subtask index of the subtask
     */
    private void setupSubtaskGateway(int subtask) {
        taskAccesses
                .getAccessesForSubtask(subtask)
                .forEach(access -> setupSubtaskGateway(access));
    }

    /**
     * Sets up a gateway for each of the given execution attempts of a subtask.
     *
     * @param subtask index of the subtask
     * @param attemptNumbers attempt numbers to create gateways for
     */
    public void setupSubtaskGatewayForAttempts(int subtask, Set<Integer> attemptNumbers) {
        for (int attempt : attemptNumbers) {
            final SubtaskAccess access = taskAccesses.getAccessForAttempt(subtask, attempt);
            setupSubtaskGateway(access);
        }
    }

    /**
     * Creates the gateway for one subtask access and arranges the "ready" notification.
     *
     * <p>The gateway is put into the gateway map only when concurrent execution attempts are not
     * supported. Once the access reports that it switched to running, the coordinator is told the
     * attempt is ready, provided the access says it is still running at that point.
     *
     * @param sta access to the execution the gateway talks to
     */
    private void setupSubtaskGateway(SubtaskAccess sta) {
        final SubtaskGatewayImpl gateway = new SubtaskGatewayImpl(sta, mainThreadExecutor, unconfirmedEvents);

        final boolean trackGateway = !context.isConcurrentExecutionAttemptsSupported();
        if (trackGateway) {
            subtaskGatewayMap.put(gateway.getSubtask(), gateway);
        }

        final CompletableFuture<?> readyNotification =
                sta.hasSwitchedToRunning()
                        .thenAccept(
                                ignored -> {
                                    mainThreadExecutor.assertRunningInMainThread();
                                    // the execution may have stopped running in the meantime
                                    if (sta.isStillRunning()) {
                                        notifySubtaskReady(gateway);
                                    }
                                });
        FutureUtils.assertNoException(readyNotification);
    }

    /**
     * Informs the coordinator that the execution attempt behind the gateway is ready.
     *
     * <p>A non-fatal throwable raised by the coordinator is wrapped in a {@link FlinkException}
     * and reported to the global failure handler instead of being propagated.
     *
     * @param gateway gateway of the attempt that became ready
     */
    private void notifySubtaskReady(OperatorCoordinator.SubtaskGateway gateway) {
        try {
            coordinator.executionAttemptReady(
                    gateway.getSubtask(),
                    gateway.getExecution().getAttemptNumber(),
                    gateway);
        } catch (Throwable t) {
            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);

            final FlinkException wrapped = new FlinkException("Error from OperatorCoordinator", t);
            globalFailureHandler.handleGlobalFailure(wrapped);
        }
    }

    /**
     * Creates a holder from a serialized coordinator provider.
     *
     * <p>While the given class loader is installed through a {@link TemporaryClassLoaderContext},
     * the provider is deserialized, a subtask access factory is built for the job vertex, and the
     * holder is created by the other {@code create} overload. Name, user class loader, and both
     * parallelism values are read from the job vertex.
     *
     * @param serializedProvider serialized provider of the coordinator
     * @param jobVertex job vertex the coordinated operator belongs to
     * @param classLoader class loader used for deserializing the provider
     * @param coordinatorStore store passed on to the coordinator context
     * @param supportsConcurrentExecutionAttempts whether concurrent execution attempts are supported
     * @param taskInformation task information used for registering the metric group
     * @param metricGroup job-level metric group the operator metric group is added to
     * @return the new holder
     * @throws Exception if deserializing the provider or creating the coordinator fails
     */
    public static OperatorCoordinatorHolder create(SerializedValue<OperatorCoordinator.Provider> serializedProvider, ExecutionJobVertex jobVertex, ClassLoader classLoader, CoordinatorStore coordinatorStore, boolean supportsConcurrentExecutionAttempts, TaskInformation taskInformation, JobManagerJobMetricGroup metricGroup) throws Exception {
        try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
            final OperatorCoordinator.Provider provider = serializedProvider.deserializeValue(classLoader);
            final OperatorID opId = provider.getOperatorId();
            final SubtaskAccess.SubtaskAccessFactory taskAccesses =
                    new ExecutionSubtaskAccess.ExecutionJobVertexSubtaskAccess(jobVertex, opId);

            return create(
                    opId,
                    provider,
                    coordinatorStore,
                    jobVertex.getName(),
                    jobVertex.getGraph().getUserClassLoader(),
                    jobVertex.getParallelism(),
                    jobVertex.getMaxParallelism(),
                    taskAccesses,
                    supportsConcurrentExecutionAttempts,
                    taskInformation,
                    metricGroup);
        }
    }

    /**
     * Creates a holder from an already available provider and explicit settings.
     *
     * <p>An operator metric group is obtained from the job metric group and wrapped for the
     * coordinator. The coordinator context is then built, the provider creates the coordinator
     * with it, and everything is bundled into a new holder.
     *
     * @param opId ID of the coordinated operator
     * @param coordinatorProvider provider that creates the coordinator
     * @param coordinatorStore store passed on to the coordinator context
     * @param operatorName name of the operator
     * @param userCodeClassLoader user code class loader exposed by the context
     * @param operatorParallelism parallelism of the operator
     * @param operatorMaxParallelism max parallelism of the operator
     * @param taskAccesses factory for the subtask accesses
     * @param supportsConcurrentExecutionAttempts whether concurrent execution attempts are supported
     * @param taskInformation source of the job vertex ID and task name for the metric group
     * @param jobManagerJobMetricGroup job-level metric group the operator group is added to
     * @return the new holder
     * @throws Exception if the provider fails to create the coordinator
     */
    @VisibleForTesting
    static OperatorCoordinatorHolder create(OperatorID opId, OperatorCoordinator.Provider coordinatorProvider, CoordinatorStore coordinatorStore, String operatorName, ClassLoader userCodeClassLoader, int operatorParallelism, int operatorMaxParallelism, SubtaskAccess.SubtaskAccessFactory taskAccesses, boolean supportsConcurrentExecutionAttempts, TaskInformation taskInformation, JobManagerJobMetricGroup jobManagerJobMetricGroup) throws Exception {
        final JobManagerOperatorMetricGroup operatorMetrics =
                jobManagerJobMetricGroup.getOrAddOperator(
                        taskInformation.getJobVertexId(),
                        taskInformation.getTaskName(),
                        opId,
                        operatorName);

        final LazyInitializedCoordinatorContext coordinatorContext =
                new LazyInitializedCoordinatorContext(
                        opId,
                        operatorName,
                        userCodeClassLoader,
                        operatorParallelism,
                        coordinatorStore,
                        supportsConcurrentExecutionAttempts,
                        new InternalOperatorCoordinatorMetricGroup(operatorMetrics));

        final OperatorCoordinator createdCoordinator = coordinatorProvider.create(coordinatorContext);

        return new OperatorCoordinatorHolder(
                opId,
                createdCoordinator,
                coordinatorContext,
                taskAccesses,
                operatorParallelism,
                operatorMaxParallelism);
    }

    /**
     * Coordinator context whose failure handler and executor are supplied after construction.
     *
     * <p>The context is created before those two collaborators exist. They are attached with
     * {@link #lazyInitialize} and detached again with {@link #unInitialize}. {@link #failJob}
     * requires the context to be initialized and forwards at most one failure until
     * {@link #resetFailed} is called.
     */
    private static final class LazyInitializedCoordinatorContext
    implements OperatorCoordinator.Context {
        private static final Logger LOG = LoggerFactory.getLogger(LazyInitializedCoordinatorContext.class);
        private final OperatorID operatorId;
        private final String operatorName;
        private final ClassLoader userCodeClassLoader;
        private final int operatorParallelism;
        private final CoordinatorStore coordinatorStore;
        private final boolean supportsConcurrentExecutionAttempts;
        private final OperatorCoordinatorMetricGroup metricGroup;
        /** Set by {@link #lazyInitialize}, cleared by {@link #unInitialize}. */
        private GlobalFailureHandler globalFailureHandler;
        /** Set by {@link #lazyInitialize}, cleared by {@link #unInitialize}. */
        private Executor schedulerExecutor;
        /** Whether a failure has been forwarded since construction or the last {@link #resetFailed}. */
        private volatile boolean failed;

        /**
         * Creates an un-initialized context. All reference arguments are validated with {@code
         * Preconditions.checkNotNull}.
         */
        public LazyInitializedCoordinatorContext(OperatorID operatorId, String operatorName, ClassLoader userCodeClassLoader, int operatorParallelism, CoordinatorStore coordinatorStore, boolean supportsConcurrentExecutionAttempts, OperatorCoordinatorMetricGroup metricGroup) {
            this.operatorId = Preconditions.checkNotNull(operatorId);
            this.operatorName = Preconditions.checkNotNull(operatorName);
            this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
            this.operatorParallelism = operatorParallelism;
            this.coordinatorStore = Preconditions.checkNotNull(coordinatorStore);
            this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts;
            this.metricGroup = Preconditions.checkNotNull(metricGroup);
        }

        /** Attaches the failure handler and the executor that failures are reported on. */
        void lazyInitialize(GlobalFailureHandler globalFailureHandler, Executor schedulerExecutor) {
            this.globalFailureHandler = Preconditions.checkNotNull(globalFailureHandler);
            this.schedulerExecutor = Preconditions.checkNotNull(schedulerExecutor);
        }

        /** Detaches the failure handler and the executor; the context is un-initialized afterwards. */
        void unInitialize() {
            this.globalFailureHandler = null;
            this.schedulerExecutor = null;
        }

        /** Returns whether an executor is currently attached. */
        boolean isInitialized() {
            return schedulerExecutor != null;
        }

        /**
         * Verifies that the context is initialized and returns the executor that was observed.
         *
         * <p>The field is read exactly once. The caller keeps using the returned reference, so it
         * cannot be turned into {@code null} by an {@link #unInitialize} call that happens after
         * the check.
         *
         * @throws IllegalStateException if no executor is attached
         */
        private Executor checkInitialized() {
            final Executor executor = schedulerExecutor;
            Preconditions.checkState(executor != null, "Context was not yet initialized");
            return executor;
        }

        /** Clears the "failed" flag so that the next {@link #failJob} call is forwarded again. */
        void resetFailed() {
            failed = false;
        }

        @Override
        public OperatorID getOperatorId() {
            return operatorId;
        }

        @Override
        public OperatorCoordinatorMetricGroup metricGroup() {
            return metricGroup;
        }

        /**
         * Reports a failure of the job on behalf of the coordinator.
         *
         * <p>The cause is wrapped in a {@link FlinkException} naming the operator. If a failure
         * was already forwarded since the last {@link #resetFailed}, the request is only logged.
         * Otherwise the failure handler is invoked from a task submitted to the executor.
         *
         * <p>The handler is looked up when that task runs. If the context was un-initialized
         * between submission and execution there is no handler any more, and the failure is
         * logged and dropped instead of raising a {@code NullPointerException} in the executor.
         *
         * @param cause the reason for failing the job
         * @throws IllegalStateException if the context is not initialized
         */
        @Override
        public void failJob(Throwable cause) {
            // single read of the executor: check and use refer to the same reference
            final Executor executor = checkInitialized();

            final FlinkException e = new FlinkException("Global failure triggered by OperatorCoordinator for '" + this.operatorName + "' (operator " + this.operatorId + ").", cause);

            if (failed) {
                LOG.debug("Ignoring the request to fail job because the job is already failing. The ignored failure cause is", e);
                return;
            }
            failed = true;

            executor.execute(
                    () -> {
                        final GlobalFailureHandler handler = globalFailureHandler;
                        if (handler == null) {
                            LOG.debug("Dropping the request to fail job because the context is no longer initialized. The dropped failure cause is", e);
                            return;
                        }
                        handler.handleGlobalFailure(e);
                    });
        }

        @Override
        public int currentParallelism() {
            return operatorParallelism;
        }

        @Override
        public ClassLoader getUserCodeClassloader() {
            return userCodeClassLoader;
        }

        @Override
        public CoordinatorStore getCoordinatorStore() {
            return coordinatorStore;
        }

        @Override
        public boolean isConcurrentExecutionAttemptsSupported() {
            return supportsConcurrentExecutionAttempts;
        }
    }
}

