package org.apache.flink.runtime.operators.coordination;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.class */
public class RecreateOnResetOperatorCoordinatorTest {
    private static final OperatorID OPERATOR_ID = new OperatorID(1234, 5678);
    private static final int NUM_SUBTASKS = 1;

    /* loaded from: input_file:org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest$TestingCoordinatorProvider.class */
    private static class TestingCoordinatorProvider extends RecreateOnResetOperatorCoordinator.Provider {
        private static final long serialVersionUID = 4184184580789587013L;
        private final CountDownLatch blockOnCloseLatch;
        private final List<TestingOperatorCoordinator> createdCoordinators;

        public TestingCoordinatorProvider() {
            this(null);
        }

        public TestingCoordinatorProvider(CountDownLatch countDownLatch) {
            super(RecreateOnResetOperatorCoordinatorTest.OPERATOR_ID);
            this.blockOnCloseLatch = countDownLatch;
            this.createdCoordinators = new ArrayList();
        }

        protected OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) {
            TestingOperatorCoordinator testingOperatorCoordinator = new TestingOperatorCoordinator(context, this.blockOnCloseLatch);
            this.createdCoordinators.add(testingOperatorCoordinator);
            return testingOperatorCoordinator;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public List<TestingOperatorCoordinator> getCreatedCoordinators() {
            return this.createdCoordinators;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest$TestingEvent.class */
    private static class TestingEvent implements OperatorEvent {
        private static final long serialVersionUID = -3289352911927668275L;
        private final int id;

        private TestingEvent() {
            this(-1);
        }

        private TestingEvent(int i) {
            this.id = i;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public int getId() {
            return this.id;
        }
    }

    @Test
    public void testQuiesceableContextNotQuiesced() throws TaskNotRunningException {
        MockOperatorCoordinatorContext mockOperatorCoordinatorContext = new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS);
        RecreateOnResetOperatorCoordinator.QuiesceableContext quiesceableContext = new RecreateOnResetOperatorCoordinator.QuiesceableContext(mockOperatorCoordinatorContext);
        TestingEvent testingEvent = new TestingEvent();
        quiesceableContext.sendEvent(testingEvent, 0);
        quiesceableContext.failJob(new Exception());
        Assert.assertEquals(OPERATOR_ID, quiesceableContext.getOperatorId());
        Assert.assertEquals(1L, quiesceableContext.currentParallelism());
        Assert.assertEquals(Collections.singletonList(testingEvent), mockOperatorCoordinatorContext.getEventsToOperatorBySubtaskId(0));
        Assert.assertTrue(mockOperatorCoordinatorContext.isJobFailed());
    }

    @Test
    public void testQuiescedContext() throws TaskNotRunningException {
        MockOperatorCoordinatorContext mockOperatorCoordinatorContext = new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS);
        RecreateOnResetOperatorCoordinator.QuiesceableContext quiesceableContext = new RecreateOnResetOperatorCoordinator.QuiesceableContext(mockOperatorCoordinatorContext);
        quiesceableContext.quiesce();
        quiesceableContext.sendEvent(new TestingEvent(), 0);
        quiesceableContext.failJob(new Exception());
        Assert.assertEquals(OPERATOR_ID, quiesceableContext.getOperatorId());
        Assert.assertEquals(1L, quiesceableContext.currentParallelism());
        Assert.assertTrue(mockOperatorCoordinatorContext.getEventsToOperator().isEmpty());
        Assert.assertFalse(mockOperatorCoordinatorContext.isJobFailed());
    }

    @Test
    public void testResetToCheckpoint() throws Exception {
        RecreateOnResetOperatorCoordinator createCoordinator = createCoordinator(new TestingCoordinatorProvider(null), new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS));
        RecreateOnResetOperatorCoordinator.QuiesceableContext quiesceableContext = createCoordinator.getQuiesceableContext();
        TestingOperatorCoordinator internalCoordinator = getInternalCoordinator(createCoordinator);
        byte[] bArr = new byte[0];
        createCoordinator.resetToCheckpoint(1L, bArr);
        createCoordinator.waitForAllAsyncCallsFinish();
        Assert.assertTrue(quiesceableContext.isQuiesced());
        Assert.assertNull(internalCoordinator.getLastRestoredCheckpointState());
        Assert.assertEquals(bArr, getInternalCoordinator(createCoordinator).getLastRestoredCheckpointState());
    }

    @Test
    public void testResetToCheckpointTimeout() throws Exception {
        TestingCoordinatorProvider testingCoordinatorProvider = new TestingCoordinatorProvider(new CountDownLatch(NUM_SUBTASKS));
        MockOperatorCoordinatorContext mockOperatorCoordinatorContext = new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS);
        ((RecreateOnResetOperatorCoordinator) testingCoordinatorProvider.create(mockOperatorCoordinatorContext, 1L)).resetToCheckpoint(2L, new byte[0]);
        mockOperatorCoordinatorContext.getClass();
        CommonTestUtils.waitUtil(mockOperatorCoordinatorContext::isJobFailed, Duration.ofSeconds(5L), "The job should fail due to resetToCheckpoint() timeout.");
    }

    @Test
    public void testMethodCallsOnLongResetToCheckpoint() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(NUM_SUBTASKS);
        TestingCoordinatorProvider testingCoordinatorProvider = new TestingCoordinatorProvider(countDownLatch);
        RecreateOnResetOperatorCoordinator recreateOnResetOperatorCoordinator = (RecreateOnResetOperatorCoordinator) testingCoordinatorProvider.create(new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS), Long.MAX_VALUE);
        byte[] bArr = new byte[0];
        TestingEvent testingEvent = new TestingEvent();
        recreateOnResetOperatorCoordinator.resetToCheckpoint(2L, bArr);
        recreateOnResetOperatorCoordinator.handleEventFromOperator(NUM_SUBTASKS, testingEvent);
        recreateOnResetOperatorCoordinator.subtaskFailed(NUM_SUBTASKS, new Exception("Subtask Failure Exception."));
        recreateOnResetOperatorCoordinator.notifyCheckpointComplete(1234L);
        Assert.assertEquals(1L, testingCoordinatorProvider.getCreatedCoordinators().size());
        countDownLatch.countDown();
        CompletableFuture completableFuture = new CompletableFuture();
        recreateOnResetOperatorCoordinator.checkpointCoordinator(5678L, completableFuture);
        recreateOnResetOperatorCoordinator.waitForAllAsyncCallsFinish();
        TestingOperatorCoordinator internalCoordinator = getInternalCoordinator(recreateOnResetOperatorCoordinator);
        Assert.assertEquals(completableFuture, internalCoordinator.getLastTriggeredCheckpoint());
        Assert.assertEquals(testingCoordinatorProvider.getCreatedCoordinators().get(NUM_SUBTASKS), internalCoordinator);
        Assert.assertEquals(bArr, internalCoordinator.getLastRestoredCheckpointState());
        Assert.assertEquals(testingEvent, internalCoordinator.getNextReceivedOperatorEvent());
        Assert.assertEquals(Collections.singletonList(Integer.valueOf(NUM_SUBTASKS)), internalCoordinator.getFailedTasks());
        Assert.assertEquals(1234L, internalCoordinator.getLastCheckpointComplete());
    }

    @Test(timeout = 30000)
    public void testConsecutiveResetToCheckpoint() throws Exception {
        TestingCoordinatorProvider testingCoordinatorProvider = new TestingCoordinatorProvider();
        RecreateOnResetOperatorCoordinator recreateOnResetOperatorCoordinator = (RecreateOnResetOperatorCoordinator) testingCoordinatorProvider.create(new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS), Long.MAX_VALUE);
        for (int i = 0; i < 1000; i += NUM_SUBTASKS) {
            recreateOnResetOperatorCoordinator.handleEventFromOperator(NUM_SUBTASKS, new TestingEvent(i));
            recreateOnResetOperatorCoordinator.subtaskFailed(i, new Exception());
            CompletableFuture completedFuture = CompletableFuture.completedFuture(new byte[i]);
            recreateOnResetOperatorCoordinator.checkpointCoordinator(i, completedFuture);
            int i2 = i;
            completedFuture.thenRun(() -> {
                recreateOnResetOperatorCoordinator.notifyCheckpointComplete(i2);
            });
            recreateOnResetOperatorCoordinator.resetToCheckpoint(i, new byte[i + NUM_SUBTASKS]);
        }
        recreateOnResetOperatorCoordinator.waitForAllAsyncCallsFinish();
        for (TestingOperatorCoordinator testingOperatorCoordinator : testingCoordinatorProvider.getCreatedCoordinators()) {
            byte[] lastRestoredCheckpointState = testingOperatorCoordinator.getLastRestoredCheckpointState();
            int length = lastRestoredCheckpointState != null ? lastRestoredCheckpointState.length : 0;
            TestingEvent testingEvent = (TestingEvent) testingOperatorCoordinator.getNextReceivedOperatorEvent();
            List<Integer> failedTasks = testingOperatorCoordinator.getFailedTasks();
            Assert.assertTrue(testingEvent == null || testingEvent.getId() == length);
            Assert.assertTrue(failedTasks.isEmpty() || (failedTasks.size() == NUM_SUBTASKS && failedTasks.get(0).intValue() == length));
            Assert.assertTrue(!testingOperatorCoordinator.hasCompleteCheckpoint() || testingOperatorCoordinator.getLastCheckpointComplete() == ((long) length));
            Assert.assertTrue(!testingOperatorCoordinator.hasTriggeredCheckpoint() || testingOperatorCoordinator.getLastTriggeredCheckpoint().get().length == length);
        }
        recreateOnResetOperatorCoordinator.close();
        TestingOperatorCoordinator internalCoordinator = getInternalCoordinator(recreateOnResetOperatorCoordinator);
        internalCoordinator.getClass();
        CommonTestUtils.waitUtil(internalCoordinator::isClosed, Duration.ofSeconds(5L), "Timed out when waiting for the coordinator to close.");
    }

    public void testFailureInCreateCoordinator() {
    }

    private RecreateOnResetOperatorCoordinator createCoordinator(TestingCoordinatorProvider testingCoordinatorProvider, OperatorCoordinator.Context context) throws Exception {
        return testingCoordinatorProvider.create(context);
    }

    private TestingOperatorCoordinator getInternalCoordinator(RecreateOnResetOperatorCoordinator recreateOnResetOperatorCoordinator) throws Exception {
        return (TestingOperatorCoordinator) recreateOnResetOperatorCoordinator.getInternalCoordinator();
    }
}
