package org.apache.flink.runtime.rpc.akka;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.flink.runtime.rpc.akka.SupervisorActor;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/rpc/akka/SupervisorActorTest.class */
public class SupervisorActorTest extends TestLogger {

    @Rule
    public final ActorSystemResource actorSystemResource = ActorSystemResource.defaultConfiguration();

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/SupervisorActorTest$Fail.class */
    private static final class Fail {
        private final Throwable cause;

        private Fail(Throwable th) {
            this.cause = th;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Throwable getCause() {
            return this.cause;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static Fail exceptionally(Throwable th) {
            return new Fail(th);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/SupervisorActorTest$SimpleActor.class */
    private static final class SimpleActor extends AbstractActor {
        private final CompletableFuture<Void> terminationFuture;

        private SimpleActor(CompletableFuture<Void> completableFuture) {
            this.terminationFuture = completableFuture;
        }

        public AbstractActor.Receive createReceive() {
            return ReceiveBuilder.create().match(Terminate.class, this::terminate).match(TerminateWithFutureCompletion.class, this::terminateActorWithFutureCompletion).match(Fail.class, this::fail).build();
        }

        private void fail(Fail fail) {
            throw new RuntimeException(fail.getCause());
        }

        private void terminate(Terminate terminate) {
            terminateActor();
        }

        private void terminateActor() {
            getContext().stop(getSelf());
        }

        private void terminateActorWithFutureCompletion(TerminateWithFutureCompletion terminateWithFutureCompletion) {
            Throwable terminationError = terminateWithFutureCompletion.getTerminationError();
            if (terminationError == null) {
                this.terminationFuture.complete(null);
            } else {
                this.terminationFuture.completeExceptionally(terminationError);
            }
            terminateActor();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/SupervisorActorTest$Terminate.class */
    private static final class Terminate {
        private static final Terminate INSTANCE = new Terminate();

        private Terminate() {
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/SupervisorActorTest$TerminateWithFutureCompletion.class */
    private static final class TerminateWithFutureCompletion {

        @Nullable
        private final Throwable terminationError;

        private TerminateWithFutureCompletion(@Nullable Throwable th) {
            this.terminationError = th;
        }

        /* JADX INFO: Access modifiers changed from: private */
        @Nullable
        public Throwable getTerminationError() {
            return this.terminationError;
        }

        private static TerminateWithFutureCompletion normal() {
            return new TerminateWithFutureCompletion(null);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static TerminateWithFutureCompletion exceptionally(Throwable th) {
            return new TerminateWithFutureCompletion(th);
        }

        static /* synthetic */ TerminateWithFutureCompletion access$000() {
            return normal();
        }
    }

    @Test
    public void completesTerminationFutureIfActorStops() {
        ActorSystem actorSystem = this.actorSystemResource.getActorSystem();
        SupervisorActor.ActorRegistration startAkkaRpcActor = startAkkaRpcActor(SupervisorActor.startSupervisorActor(actorSystem, actorSystem.getDispatcher()), "foobar");
        CompletableFuture terminationFuture = startAkkaRpcActor.getTerminationFuture();
        Assert.assertThat(Boolean.valueOf(terminationFuture.isDone()), CoreMatchers.is(false));
        startAkkaRpcActor.getActorRef().tell(TerminateWithFutureCompletion.access$000(), ActorRef.noSender());
        terminationFuture.join();
    }

    @Test
    public void completesTerminationFutureExceptionallyIfActorStopsExceptionally() throws Exception {
        ActorSystem actorSystem = this.actorSystemResource.getActorSystem();
        SupervisorActor.ActorRegistration startAkkaRpcActor = startAkkaRpcActor(SupervisorActor.startSupervisorActor(actorSystem, actorSystem.getDispatcher()), "foobar");
        CompletableFuture terminationFuture = startAkkaRpcActor.getTerminationFuture();
        Assert.assertThat(Boolean.valueOf(terminationFuture.isDone()), CoreMatchers.is(false));
        FlinkException flinkException = new FlinkException("Test cause.");
        startAkkaRpcActor.getActorRef().tell(TerminateWithFutureCompletion.exceptionally(flinkException), ActorRef.noSender());
        try {
            terminationFuture.get();
            Assert.fail("Expected the termination future being completed exceptionally");
        } catch (ExecutionException e) {
            ExceptionUtils.findThrowable(e, th -> {
                return th.equals(flinkException);
            }).orElseThrow(() -> {
                return new FlinkException("Unexpected exception", e);
            });
        }
    }

    @Test
    public void completesTerminationFutureExceptionallyIfActorStopsWithoutReason() throws InterruptedException {
        ActorSystem actorSystem = this.actorSystemResource.getActorSystem();
        SupervisorActor.ActorRegistration startAkkaRpcActor = startAkkaRpcActor(SupervisorActor.startSupervisorActor(actorSystem, actorSystem.getDispatcher()), "foobar");
        CompletableFuture terminationFuture = startAkkaRpcActor.getTerminationFuture();
        Assert.assertThat(Boolean.valueOf(terminationFuture.isDone()), CoreMatchers.is(false));
        startAkkaRpcActor.getActorRef().tell(Terminate.INSTANCE, ActorRef.noSender());
        try {
            terminationFuture.get();
            Assert.fail("Expected the termination future being completed exceptionally");
        } catch (ExecutionException e) {
        }
    }

    @Test
    public void completesTerminationFutureExceptionallyIfActorFails() throws Exception {
        ActorSystem actorSystem = this.actorSystemResource.getActorSystem();
        SupervisorActor.ActorRegistration startAkkaRpcActor = startAkkaRpcActor(SupervisorActor.startSupervisorActor(actorSystem, actorSystem.getDispatcher()), "foobar");
        CompletableFuture terminationFuture = startAkkaRpcActor.getTerminationFuture();
        Assert.assertThat(Boolean.valueOf(terminationFuture.isDone()), CoreMatchers.is(false));
        CompletableFuture completableFuture = actorSystem.getWhenTerminated().toCompletableFuture();
        FlinkException flinkException = new FlinkException("Test cause.");
        startAkkaRpcActor.getActorRef().tell(Fail.exceptionally(flinkException), ActorRef.noSender());
        try {
            terminationFuture.get();
            Assert.fail("Expected the termination future being completed exceptionally");
        } catch (ExecutionException e) {
            ExceptionUtils.findThrowable(e, th -> {
                return th.equals(flinkException);
            }).orElseThrow(() -> {
                return new FlinkException("Unexpected exception", e);
            });
        }
        completableFuture.join();
    }

    @Test
    public void completesTerminationFutureOfSiblingsIfActorFails() throws Exception {
        ActorSystem actorSystem = this.actorSystemResource.getActorSystem();
        ActorRef startSupervisorActor = SupervisorActor.startSupervisorActor(actorSystem, actorSystem.getDispatcher());
        SupervisorActor.ActorRegistration startAkkaRpcActor = startAkkaRpcActor(startSupervisorActor, "foobar1");
        CompletableFuture terminationFuture = startAkkaRpcActor(startSupervisorActor, "foobar2").getTerminationFuture();
        Assert.assertThat(Boolean.valueOf(terminationFuture.isDone()), CoreMatchers.is(false));
        startAkkaRpcActor.getActorRef().tell(Fail.exceptionally(new FlinkException("Test cause.")), ActorRef.noSender());
        try {
            terminationFuture.get();
            Assert.fail("Expected the termination future being completed exceptionally");
        } catch (ExecutionException e) {
        }
    }

    private SupervisorActor.ActorRegistration startAkkaRpcActor(ActorRef actorRef, String str) {
        return SupervisorActor.startAkkaRpcActor(actorRef, completableFuture -> {
            return Props.create(SimpleActor.class, new Object[]{completableFuture});
        }, str).orElseThrow(th -> {
            return new AssertionError("Expected the start to succeed.", th);
        });
    }
}
