package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.ThrowingRunnable;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/SlotPoolPendingRequestFailureTest.class */
/**
 * Tests covering how pending slot requests of the slot pool are failed: by an explicit
 * allocation failure, by a failing resource manager request, and by a request timeout.
 */
public class SlotPoolPendingRequestFailureTest extends TestLogger {
    private TestingResourceManagerGateway resourceManagerGateway;

    /** Creates a fresh resource manager gateway for every test. */
    @Before
    public void setup() {
        this.resourceManagerGateway = new TestingResourceManagerGateway();
    }

    /**
     * Requests one slot, fails the allocation that was sent to the resource manager for it, and
     * expects the request future to complete exceptionally with exactly that failure cause.
     */
    @Test
    public void testFailingAllocationFailsPendingSlotRequests() throws Exception {
        final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
        this.resourceManagerGateway.setRequestSlotConsumer(
                slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId()));

        try (TestingSlotPoolImpl slotPool = SlotPoolUtils.createAndSetUpSlotPool(this.resourceManagerGateway)) {
            final CompletableFuture<PhysicalSlot> slotFuture =
                    SlotPoolUtils.requestNewAllocatedSlot(slotPool, new SlotRequestId());
            final AllocationID allocationId = allocationIdFuture.get();

            // No slot has been offered, so the request must still be pending.
            MatcherAssert.assertThat(slotFuture.isDone(), Matchers.is(false));

            final FlinkException expectedFailure = new FlinkException("Fail pending slot request failure.");
            // The test expects failAllocation to return an empty Optional here.
            MatcherAssert.assertThat(
                    slotPool.failAllocation(allocationId, expectedFailure).isPresent(), Matchers.is(false));

            try {
                slotFuture.get();
                Assert.fail("Expected a slot allocation failure.");
            } catch (ExecutionException e) {
                MatcherAssert.assertThat(
                        ExceptionUtils.stripExecutionException(e), Matchers.equalTo(expectedFailure));
            }
        }
    }

    /**
     * Requests two slots and offers a slot carrying the second request's allocation id. The
     * first request future is expected to be completed by that offer while the second stays
     * pending. Failing the first request's allocation id afterwards is expected to fail the
     * second request future with the given cause.
     */
    @Test
    public void testFailingAllocationFailsRemappedPendingSlotRequests() throws Exception {
        final List<AllocationID> allocationIds = new ArrayList<>();
        this.resourceManagerGateway.setRequestSlotConsumer(
                slotRequest -> allocationIds.add(slotRequest.getAllocationId()));

        try (TestingSlotPoolImpl slotPool = SlotPoolUtils.createAndSetUpSlotPool(this.resourceManagerGateway)) {
            final CompletableFuture<PhysicalSlot> firstSlotFuture =
                    SlotPoolUtils.requestNewAllocatedSlot(slotPool, new SlotRequestId());
            final CompletableFuture<PhysicalSlot> secondSlotFuture =
                    SlotPoolUtils.requestNewAllocatedSlot(slotPool, new SlotRequestId());

            // Relies on the request-slot consumer having been invoked once per request by now.
            final AllocationID firstAllocationId = allocationIds.get(0);
            final AllocationID secondAllocationId = allocationIds.get(1);

            final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
            final SlotOffer slotOffer = new SlotOffer(secondAllocationId, 0, ResourceProfile.ANY);
            slotPool.registerTaskManager(taskManagerLocation.getResourceID());
            slotPool.offerSlot(taskManagerLocation, new SimpleAckingTaskManagerGateway(), slotOffer);

            MatcherAssert.assertThat(firstSlotFuture.isDone(), Matchers.is(true));
            MatcherAssert.assertThat(secondSlotFuture.isDone(), Matchers.is(false));

            final FlinkException expectedFailure = new FlinkException("Fail pending slot request failure.");
            MatcherAssert.assertThat(
                    slotPool.failAllocation(firstAllocationId, expectedFailure).isPresent(), Matchers.is(false));

            try {
                // getNow never blocks: a still-pending future returns null and reaches fail().
                secondSlotFuture.getNow(null);
                Assert.fail("Expected a slot allocation failure.");
            } catch (CompletionException e) {
                // Only the exceptional completion is caught, so the AssertionError raised by
                // fail() above propagates with its own message.
                MatcherAssert.assertThat(
                        ExceptionUtils.stripCompletionException(e), Matchers.equalTo(expectedFailure));
            }
        }
    }

    /**
     * Lets the resource manager's slot request future fail and expects that the pending slot
     * request fails too, and that the allocation id passed to the resource manager's cancel
     * consumer equals the one of the original slot request.
     */
    @Test
    public void testFailingResourceManagerRequestFailsPendingSlotRequestAndCancelsRMRequest() throws Exception {
        try (TestingSlotPoolImpl slotPool = SlotPoolUtils.createAndSetUpSlotPool(this.resourceManagerGateway)) {
            final CompletableFuture<Acknowledge> requestSlotFuture = new CompletableFuture<>();
            final CompletableFuture<AllocationID> cancelFuture = new CompletableFuture<>();
            final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();

            this.resourceManagerGateway.setRequestSlotFuture(requestSlotFuture);
            this.resourceManagerGateway.setRequestSlotConsumer(
                    slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId()));
            this.resourceManagerGateway.setCancelSlotConsumer(cancelFuture::complete);

            final CompletableFuture<PhysicalSlot> slotFuture =
                    SlotPoolUtils.requestNewAllocatedSlot(slotPool, new SlotRequestId());

            requestSlotFuture.completeExceptionally(new FlinkException("Testing exception."));

            try {
                slotFuture.get();
                Assert.fail("The slot future should not have been completed properly.");
            } catch (Exception ignored) {
                // Expected: any exceptional completion of the slot future is acceptable here.
            }

            // Blocks until both the request and the cancel call have reached the gateway.
            Assert.assertEquals(allocationIdFuture.get(), cancelFuture.get());
        }
    }

    /**
     * Requests a slot with a 5 ms timeout from a slot pool running on its own single-threaded
     * main thread executor and expects the request future to fail with a
     * {@link TimeoutException}.
     */
    @Test
    public void testPendingSlotRequestTimeout() throws Exception {
        final ScheduledExecutorService singleThreadExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            final ComponentMainThreadExecutor mainThreadExecutor =
                    ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(singleThreadExecutor);
            final TestingSlotPoolImpl slotPool = new SlotPoolBuilder(mainThreadExecutor)
                    .setResourceManagerGateway(this.resourceManagerGateway)
                    .build();
            try {
                final Time timeout = Time.milliseconds(5L);

                // The request is issued from the pool's main thread executor.
                final CompletableFuture<PhysicalSlot> slotFuture = CompletableFuture
                        .supplyAsync(
                                () -> SlotPoolUtils.requestNewAllocatedSlot(slotPool, new SlotRequestId(), timeout),
                                mainThreadExecutor)
                        .thenCompose(Function.identity());

                try {
                    slotFuture.get();
                    Assert.fail("Expected that the future completes with a TimeoutException.");
                } catch (ExecutionException e) {
                    MatcherAssert.assertThat(
                            ExceptionUtils.stripExecutionException(e), Matchers.instanceOf(TimeoutException.class));
                }
            } finally {
                // Close the pool on its main thread executor as well.
                CompletableFuture.runAsync(ThrowingRunnable.unchecked(slotPool::close), mainThreadExecutor).get();
            }
        } finally {
            // Always stop the executor thread, even if building or closing the pool failed.
            singleThreadExecutor.shutdownNow();
        }
    }
}
