/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.job.checkpoints;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.dispatcher.UnknownOperationKeyException;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.TriggerId;
import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointInfo;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatusHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatusMessageParameters;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerMessageParameters;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerRequestBody;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedThrowable;

public class CheckpointHandlers {
    private static RestHandlerException createInternalServerError(Throwable throwable, AsynchronousJobOperationKey key, String errorMessageInfix) {
        return new RestHandlerException(String.format("Internal server error while %s checkpoint operation with triggerId=%s for job %s.", errorMessageInfix, key.getTriggerId(), key.getJobId()), HttpResponseStatus.INTERNAL_SERVER_ERROR, throwable);
    }

    public static class CheckpointStatusHandler
    extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, AsynchronousOperationResult<CheckpointInfo>, CheckpointStatusMessageParameters> {
        public CheckpointStatusHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders) {
            super(leaderRetriever, timeout, responseHeaders, CheckpointStatusHeaders.getInstance());
        }

        @Override
        public CompletableFuture<AsynchronousOperationResult<CheckpointInfo>> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
            AsynchronousJobOperationKey key = CheckpointStatusHandler.getOperationKey(request);
            return gateway.getTriggeredCheckpointStatus(key).handle((operationResult, throwable) -> {
                if (throwable == null) {
                    switch (operationResult.getStatus()) {
                        case SUCCESS: {
                            return AsynchronousOperationResult.completed(CheckpointStatusHandler.operationResultResponse((Long)operationResult.getResult()));
                        }
                        case FAILURE: {
                            return AsynchronousOperationResult.completed(CheckpointStatusHandler.exceptionalOperationResultResponse(operationResult.getThrowable()));
                        }
                        case IN_PROGRESS: {
                            return AsynchronousOperationResult.inProgress();
                        }
                    }
                    throw new IllegalStateException("No handler for operation status " + (Object)((Object)operationResult.getStatus()) + ", encountered for key " + key);
                }
                throw new CompletionException(CheckpointStatusHandler.maybeCreateNotFoundError(throwable, key).orElseGet(() -> CheckpointHandlers.createInternalServerError(throwable, key, "retrieving status of")));
            });
        }

        private static Optional<RestHandlerException> maybeCreateNotFoundError(Throwable throwable, AsynchronousJobOperationKey key) {
            if (ExceptionUtils.findThrowable(throwable, UnknownOperationKeyException.class).isPresent()) {
                return Optional.of(new RestHandlerException(String.format("There is no checkpoint operation with triggerId=%s for job %s.", key.getTriggerId(), key.getJobId()), HttpResponseStatus.NOT_FOUND));
            }
            return Optional.empty();
        }

        private static AsynchronousJobOperationKey getOperationKey(HandlerRequest<EmptyRequestBody> request) {
            TriggerId triggerId = (TriggerId)request.getPathParameter(TriggerIdPathParameter.class);
            JobID jobId = (JobID)request.getPathParameter(JobIDPathParameter.class);
            return AsynchronousJobOperationKey.of(triggerId, jobId);
        }

        private static CheckpointInfo exceptionalOperationResultResponse(Throwable throwable) {
            return new CheckpointInfo(null, new SerializedThrowable(throwable));
        }

        private static CheckpointInfo operationResultResponse(Long checkpointId) {
            return new CheckpointInfo(checkpointId, null);
        }
    }

    public static class CheckpointTriggerHandler
    extends AbstractRestHandler<RestfulGateway, CheckpointTriggerRequestBody, TriggerResponse, CheckpointTriggerMessageParameters> {
        public CheckpointTriggerHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders) {
            super(leaderRetriever, timeout, responseHeaders, CheckpointTriggerHeaders.getInstance());
        }

        private static AsynchronousJobOperationKey createOperationKey(HandlerRequest<CheckpointTriggerRequestBody> request) {
            JobID jobId = (JobID)request.getPathParameter(JobIDPathParameter.class);
            return AsynchronousJobOperationKey.of(request.getRequestBody().getTriggerId().orElseGet(TriggerId::new), jobId);
        }

        @Override
        protected CompletableFuture<TriggerResponse> handleRequest(@Nonnull HandlerRequest<CheckpointTriggerRequestBody> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
            AsynchronousJobOperationKey operationKey = CheckpointTriggerHandler.createOperationKey(request);
            return gateway.triggerCheckpoint(operationKey, request.getRequestBody().getCheckpointType(), RpcUtils.INF_TIMEOUT).handle((acknowledge, throwable) -> {
                if (throwable == null) {
                    return new TriggerResponse(operationKey.getTriggerId());
                }
                throw new CompletionException(CheckpointHandlers.createInternalServerError(throwable, operationKey, "triggering"));
            });
        }
    }
}

