package org.apache.flink.runtime.source.coordinator;

import java.net.URL;
import java.net.URLClassLoader;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumerator;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorCheckpointSerializer;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.class */
public class SourceCoordinatorTest extends SourceCoordinatorTestBase {

    /* loaded from: input_file:org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest$ClassLoaderTestEnumerator.class */
    /**
     * Split enumerator that records which context class loader is active when it is constructed
     * and when {@link #start()} is called. All other operations except {@link #close()} are
     * unsupported.
     */
    private static final class ClassLoaderTestEnumerator implements SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> {
        /** Completed by {@link #start()} with the calling thread's context class loader. */
        final CompletableFuture<ClassLoader> threadClassLoader = new CompletableFuture<>();

        /** Context class loader of the thread that constructed this enumerator. */
        final ClassLoader constructorClassLoader = Thread.currentThread().getContextClassLoader();

        /** Captures the context class loader of the thread invoking start(). */
        public void start() {
            this.threadClassLoader.complete(Thread.currentThread().getContextClassLoader());
        }

        public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
            throw new UnsupportedOperationException();
        }

        public void addSplitsBack(List<MockSourceSplit> splits, int subtaskId) {
            throw new UnsupportedOperationException();
        }

        public void addReader(int subtaskId) {
            throw new UnsupportedOperationException();
        }

        // The method must carry the interface's name (snapshotState) to implement it; the
        // previous synthetic name was a decompiler rename and left the interface method
        // unimplemented.
        public Set<MockSourceSplit> snapshotState(long checkpointId) throws Exception {
            throw new UnsupportedOperationException();
        }

        /** Nothing to release. */
        public void close() {
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest$EnumeratorCreatingSource.class */
    /**
     * Source whose enumerators come from a supplied factory. The enumerator handed out by
     * {@link #createEnumerator} or {@link #restoreEnumerator} is also published through the
     * corresponding future so a test can get hold of the instance.
     *
     * @param <T> record type of the source
     * @param <EnumT> concrete enumerator type produced by the factory
     */
    private static final class EnumeratorCreatingSource<T, EnumT extends SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>>> implements Source<T, MockSourceSplit, Set<MockSourceSplit>> {
        /** Completed with the enumerator returned from {@link #createEnumerator}. */
        final CompletableFuture<EnumT> createEnumeratorFuture = new CompletableFuture<>();

        /** Completed with the enumerator returned from {@link #restoreEnumerator}. */
        final CompletableFuture<EnumT> restoreEnumeratorFuture = new CompletableFuture<>();

        private final Supplier<EnumT> enumeratorFactory;

        public EnumeratorCreatingSource(Supplier<EnumT> supplier) {
            this.enumeratorFactory = supplier;
        }

        public Boundedness getBoundedness() {
            return Boundedness.CONTINUOUS_UNBOUNDED;
        }

        /** Readers are not needed by the tests using this source. */
        public SourceReader<T, MockSourceSplit> createReader(SourceReaderContext sourceReaderContext) {
            throw new UnsupportedOperationException();
        }

        /** Creates an enumerator via the factory and completes {@link #createEnumeratorFuture}. */
        public SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> createEnumerator(SplitEnumeratorContext<MockSourceSplit> splitEnumeratorContext) {
            EnumT enumerator = this.enumeratorFactory.get();
            this.createEnumeratorFuture.complete(enumerator);
            return enumerator;
        }

        /**
         * Creates an enumerator via the factory and completes {@link #restoreEnumeratorFuture}.
         * The checkpointed state passed in is ignored.
         */
        public SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> restoreEnumerator(SplitEnumeratorContext<MockSourceSplit> splitEnumeratorContext, Set<MockSourceSplit> set) {
            EnumT enumerator = this.enumeratorFactory.get();
            this.restoreEnumeratorFuture.complete(enumerator);
            return enumerator;
        }

        public SimpleVersionedSerializer<MockSourceSplit> getSplitSerializer() {
            return new MockSourceSplitSerializer();
        }

        public SimpleVersionedSerializer<Set<MockSourceSplit>> getEnumeratorCheckpointSerializer() {
            return new MockSplitEnumeratorCheckpointSerializer();
        }

        // No hand-written restoreEnumerator(SplitEnumeratorContext, Object) overload: the
        // compiler generates that bridge method itself, and declaring it explicitly clashes
        // with the generated one.
    }

    /**
     * Calls notifyCheckpointComplete, handleEventFromOperator, subtaskFailed and
     * checkpointCoordinator on a coordinator that was never started and expects each call to
     * fail with the "not started" error message.
     */
    @Test
    public void testThrowExceptionWhenNotStarted() {
        final String failureMessage = "Call should fail when source coordinator has not started yet.";
        final String expectedError = "The coordinator has not started yet.";

        CoordinatorTestUtils.verifyException(
                () -> sourceCoordinator.notifyCheckpointComplete(100L),
                failureMessage,
                expectedError);
        CoordinatorTestUtils.verifyException(
                () -> sourceCoordinator.handleEventFromOperator(0, (OperatorEvent) null),
                failureMessage,
                expectedError);
        CoordinatorTestUtils.verifyException(
                () -> sourceCoordinator.subtaskFailed(0, (Throwable) null),
                failureMessage,
                expectedError);
        CoordinatorTestUtils.verifyException(
                () -> sourceCoordinator.checkpointCoordinator(100L, new CompletableFuture<>()),
                failureMessage,
                expectedError);
    }

    /**
     * Starts the coordinator and then expects resetToCheckpoint to fail with the
     * "can only be reset if it was not yet started" error message.
     */
    @Test
    public void testRestCheckpointAfterCoordinatorStarted() throws Exception {
        sourceCoordinator.start();

        final String failureMessage = "Reset to checkpoint should fail after the coordinator has started";
        final String expectedError = "The coordinator can only be reset if it was not yet started";
        CoordinatorTestUtils.verifyException(
                () -> sourceCoordinator.resetToCheckpoint(0L, (byte[]) null),
                failureMessage,
                expectedError);
    }

    /** Starting the coordinator must leave the enumerator reporting that it was started. */
    @Test
    public void testStart() throws Exception {
        sourceCoordinator.start();
        // Let the coordinator work through its pending actions before inspecting the enumerator.
        waitForCoordinatorToProcessActions();

        final boolean enumeratorStarted = getEnumerator().isStarted();
        Assert.assertTrue(enumeratorStarted);
    }

    /** Closing a started coordinator must leave the enumerator reporting that it was closed. */
    @Test
    public void testClosed() throws Exception {
        sourceCoordinator.start();
        sourceCoordinator.close();

        final boolean enumeratorClosed = getEnumerator().isClosed();
        Assert.assertTrue(enumeratorClosed);
    }

    /**
     * Sends one wrapped source event from subtask 0 to the coordinator and checks that the
     * enumerator recorded exactly that event.
     */
    @Test
    public void testHandleSourceEvent() throws Exception {
        sourceReady();

        final SourceEvent event = new SourceEvent() {
        };
        final SourceEventWrapper wrappedEvent = new SourceEventWrapper(event);
        sourceCoordinator.handleEventFromOperator(0, wrappedEvent);
        waitForCoordinatorToProcessActions();

        // Exactly one event was handled, and it is the one sent above.
        Assert.assertEquals(1L, getEnumerator().getHandledSourceEvent().size());
        Assert.assertEquals(event, getEnumerator().getHandledSourceEvent().get(0));
    }

    /**
     * Checkpoints the coordinator after two assign-one-split requests for reader 0, resets a
     * fresh coordinator to that checkpoint, and checks that the restored enumerator has 4
     * unassigned splits and that no readers are registered after the restore.
     */
    @Test
    public void testCheckpointCoordinatorAndRestore() throws Exception {
        sourceReady();
        addTestingSplitSet(6);
        registerReader(0);
        for (int request = 0; request < 2; request++) {
            getEnumerator().executeAssignOneSplit(0);
        }

        // Take checkpoint 100 and wait for its serialized bytes.
        final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
        sourceCoordinator.checkpointCoordinator(100L, checkpointFuture);
        final byte[] checkpointBytes = checkpointFuture.get();

        // Restore the checkpoint into a brand-new coordinator.
        final SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>> restoredCoordinator = getNewSourceCoordinator();
        restoredCoordinator.resetToCheckpoint(100L, checkpointBytes);
        final TestingSplitEnumerator<?> restoredEnumerator = (TestingSplitEnumerator<?>) restoredCoordinator.getEnumerator();
        final SourceCoordinatorContext<?> restoredContext = restoredCoordinator.getContext();

        Assert.assertEquals(
                "2 splits should have been assigned to reader 0",
                4L,
                restoredEnumerator.getUnassignedSplits().size());
        Assert.assertTrue(restoredEnumerator.getContext().registeredReaders().isEmpty());
        Assert.assertEquals(
                "Registered readers should not be recovered by restoring",
                0L,
                restoredContext.registeredReaders().size());
    }

    /**
     * Records split assignments for reader 0 under two checkpoints (100 and 101) that are never
     * confirmed, then fails and resets subtask 0 to checkpoint 99. Afterwards the reader must be
     * unregistered, no tracked assignment may still reference subtask 0, and the enumerator must
     * report 7 unassigned splits.
     */
    @Test
    public void testSubtaskFailedAndRevertUncompletedAssignments() throws Exception {
        sourceReady();
        addTestingSplitSet(6);
        registerReader(0);

        // Two assign-one-split requests for reader 0, followed by checkpoint 100.
        getEnumerator().executeAssignOneSplit(0);
        getEnumerator().executeAssignOneSplit(0);
        sourceCoordinator.checkpointCoordinator(100L, new CompletableFuture<>());

        // One extra split and one more assign request, followed by checkpoint 101.
        getEnumerator().addNewSplits(new MockSourceSplit(6));
        getEnumerator().executeAssignOneSplit(0);
        sourceCoordinator.checkpointCoordinator(101L, new CompletableFuture<>());
        waitForCoordinatorToProcessActions();

        // State before the failure.
        Assert.assertEquals(4L, getEnumerator().getUnassignedSplits().size());
        Assert.assertTrue(splitSplitAssignmentTracker.uncheckpointedAssignments().isEmpty());
        final Map<?, ?> assignmentsAt100 = (Map<?, ?>) splitSplitAssignmentTracker.assignmentsByCheckpointId().get(100L);
        CoordinatorTestUtils.verifyAssignment(Arrays.asList("0", "1"), (Collection) assignmentsAt100.get(0));
        CoordinatorTestUtils.verifyAssignment(
                Collections.singletonList("2"),
                (Collection) splitSplitAssignmentTracker.assignmentsByCheckpointId(101L).get(0));

        // Fail subtask 0 and reset it to checkpoint 99.
        sourceCoordinator.subtaskFailed(0, (Throwable) null);
        sourceCoordinator.subtaskReset(0, 99L);
        waitForCoordinatorToProcessActions();

        Assert.assertFalse("Reader 0 should have been unregistered.", context.registeredReaders().containsKey(0));
        for (Object assignmentsOfCheckpoint : splitSplitAssignmentTracker.assignmentsByCheckpointId().values()) {
            Assert.assertFalse(
                    "Assignment in uncompleted checkpoint should have been reverted.",
                    ((Map<?, ?>) assignmentsOfCheckpoint).containsKey(0));
        }
        Assert.assertFalse(splitSplitAssignmentTracker.uncheckpointedAssignments().containsKey(0));
        Assert.assertEquals(7L, getEnumerator().getUnassignedSplits().size());
    }

    /**
     * Confirms checkpoint 100 before failing subtask 0 and checks that the enumerator saw the
     * successful checkpoint, the reader is unregistered, 4 splits are still unassigned, and the
     * assignment tracker holds nothing for subtask 0 or for any checkpoint.
     */
    @Test
    public void testFailedSubtaskDoNotRevertCompletedCheckpoint() throws Exception {
        sourceReady();
        addTestingSplitSet(6);
        registerReader(0);
        for (int request = 0; request < 2; request++) {
            getEnumerator().executeAssignOneSplit(0);
        }

        // Take and confirm the checkpoint, then fail the subtask.
        final long checkpointId = 100L;
        sourceCoordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>());
        sourceCoordinator.notifyCheckpointComplete(checkpointId);
        sourceCoordinator.subtaskFailed(0, (Throwable) null);
        waitForCoordinatorToProcessActions();

        Assert.assertEquals(checkpointId, getEnumerator().getSuccessfulCheckpoints().get(0).longValue());
        Assert.assertFalse(context.registeredReaders().containsKey(0));
        Assert.assertEquals(4L, getEnumerator().getUnassignedSplits().size());
        Assert.assertFalse(splitSplitAssignmentTracker.uncheckpointedAssignments().containsKey(0));
        Assert.assertTrue(splitSplitAssignmentTracker.assignmentsByCheckpointId().isEmpty());
    }

    /**
     * Starts a coordinator whose enumerator throws from start() and waits (up to 10 seconds)
     * for the operator coordinator context to report the job as failed with that exception.
     */
    @Test
    public void testFailJobWhenExceptionThrownFromStart() throws Exception {
        final RuntimeException failureReason = new RuntimeException("Artificial Exception");

        // Enumerator that fails as soon as it is started.
        final SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> failingEnumerator =
                new MockSplitEnumerator(1, new MockSplitEnumeratorContext<>(1)) {
                    public void start() {
                        throw failureReason;
                    }
                };

        final SourceCoordinator<?, ?> coordinator =
                new SourceCoordinator<>(
                        "TestOperator",
                        coordinatorExecutor,
                        new EnumeratorCreatingSource<>(() -> failingEnumerator),
                        context);
        coordinator.start();

        CommonTestUtils.waitUtil(
                () -> operatorCoordinatorContext.isJobFailed(),
                Duration.ofSeconds(10L),
                "The job should have failed due to the artificial exception.");
        Assert.assertEquals(failureReason, operatorCoordinatorContext.getJobFailureReason());
    }

    /**
     * Sends a source event to a coordinator whose enumerator throws an {@link Error} from
     * handleSourceEvent and waits (up to 10 seconds) for the operator coordinator context to
     * report the job as failed with that error.
     */
    @Test
    public void testErrorThrownFromSplitEnumerator() throws Exception {
        final Error expectedError = new Error("Test Error");

        // Enumerator that throws the error whenever it receives a source event.
        final SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> failingEnumerator =
                new MockSplitEnumerator(1, new MockSplitEnumeratorContext<>(1)) {
                    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
                        throw expectedError;
                    }
                };

        final SourceCoordinator<?, ?> coordinator =
                new SourceCoordinator<>(
                        "TestOperator",
                        coordinatorExecutor,
                        new EnumeratorCreatingSource<>(() -> failingEnumerator),
                        context);
        coordinator.start();

        final SourceEvent event = new SourceEvent() {
        };
        coordinator.handleEventFromOperator(1, new SourceEventWrapper(event));

        CommonTestUtils.waitUtil(
                () -> operatorCoordinatorContext.isJobFailed(),
                Duration.ofSeconds(10L),
                "The job should have failed due to the artificial exception.");
        Assert.assertEquals(expectedError, operatorCoordinatorContext.getJobFailureReason());
    }

    /**
     * Checks that a freshly created enumerator sees the class loader passed to the operator
     * coordinator context as the thread context class loader, both while it is constructed and
     * while start() runs.
     *
     * <p>The class loader and the coordinator are released even when an assertion fails.
     */
    @Test
    public void testUserClassLoaderWhenCreatingNewEnumerator() throws Exception {
        try (URLClassLoader testClassLoader = new URLClassLoader(new URL[0])) {
            final MockOperatorCoordinatorContext coordinatorContext =
                    new MockOperatorCoordinatorContext(new OperatorID(), testClassLoader);
            final EnumeratorCreatingSource<?, ClassLoaderTestEnumerator> source =
                    new EnumeratorCreatingSource<>(ClassLoaderTestEnumerator::new);
            final OperatorCoordinator coordinator =
                    new SourceCoordinatorProvider<>("testOperator", coordinatorContext.getOperatorId(), source, 1)
                            .getCoordinator(coordinatorContext);
            try {
                coordinator.start();

                // Blocks until the source has handed out the newly created enumerator.
                final ClassLoaderTestEnumerator enumerator = source.createEnumeratorFuture.get();
                Assert.assertSame(testClassLoader, enumerator.constructorClassLoader);
                Assert.assertSame(testClassLoader, enumerator.threadClassLoader.get());
            } finally {
                // Close the coordinator even if one of the assertions above failed.
                coordinator.close();
            }
        }
    }

    /**
     * Checks that an enumerator restored from an (empty) checkpoint sees the class loader passed
     * to the operator coordinator context as the thread context class loader, both while it is
     * constructed and while start() runs.
     *
     * <p>The class loader and the coordinator are released even when an assertion fails.
     */
    @Test
    public void testUserClassLoaderWhenRestoringEnumerator() throws Exception {
        try (URLClassLoader testClassLoader = new URLClassLoader(new URL[0])) {
            final MockOperatorCoordinatorContext coordinatorContext =
                    new MockOperatorCoordinatorContext(new OperatorID(), testClassLoader);
            final EnumeratorCreatingSource<?, ClassLoaderTestEnumerator> source =
                    new EnumeratorCreatingSource<>(ClassLoaderTestEnumerator::new);
            final OperatorCoordinator coordinator =
                    new SourceCoordinatorProvider<>("testOperator", coordinatorContext.getOperatorId(), source, 1)
                            .getCoordinator(coordinatorContext);
            try {
                // Resetting before start makes the coordinator take the restore path.
                coordinator.resetToCheckpoint(1L, createEmptyCheckpoint());
                coordinator.start();

                // Blocks until the source has handed out the restored enumerator.
                final ClassLoaderTestEnumerator enumerator = source.restoreEnumeratorFuture.get();
                Assert.assertSame(testClassLoader, enumerator.constructorClassLoader);
                Assert.assertSame(testClassLoader, enumerator.threadClassLoader.get());
            } finally {
                // Close the coordinator even if one of the assertions above failed.
                coordinator.close();
            }
        }
    }

    /**
     * Builds checkpoint bytes with {@link #createCheckpointDataWithSerdeV0} from the current
     * enumerator snapshot, resets a fresh coordinator to them, and checks that the restored
     * enumerator holds the same unassigned splits, has handled no source events, and that no
     * readers are registered.
     */
    @Test
    public void testSerdeBackwardCompatibility() throws Exception {
        sourceReady();
        addTestingSplitSet(6);

        // Snapshot the enumerator state from within the enumerator thread.
        final TestingSplitEnumerator<MockSourceSplit> currentEnumerator = getEnumerator();
        final Set<MockSourceSplit> snapshot = new HashSet<>();
        // NOTE(review): m465snapshotState looks like a decompiler-renamed snapshotState - confirm
        // against TestingSplitEnumerator.
        currentEnumerator.runInEnumThreadAndSync(() -> {
            snapshot.addAll(currentEnumerator.m465snapshotState(1L));
        });

        final byte[] legacyCheckpoint = createCheckpointDataWithSerdeV0(snapshot);

        // Restore the hand-written checkpoint into a brand-new coordinator.
        final SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>> restoredCoordinator = getNewSourceCoordinator();
        restoredCoordinator.resetToCheckpoint(15213L, legacyCheckpoint);
        final TestingSplitEnumerator<?> restoredEnumerator = (TestingSplitEnumerator<?>) restoredCoordinator.getEnumerator();
        final SourceCoordinatorContext<?> restoredContext = restoredCoordinator.getContext();

        Assert.assertEquals(snapshot, restoredEnumerator.getUnassignedSplits());
        Assert.assertTrue(restoredEnumerator.getHandledSourceEvent().isEmpty());
        Assert.assertEquals(0L, restoredContext.registeredReaders().size());
    }

    /**
     * Hand-writes a coordinator checkpoint: a leading int 0, the enumerator checkpoint
     * serializer version, the length-prefixed serialized split set, and three trailing int 0s.
     *
     * @param set splits to serialize as the enumerator checkpoint
     * @return a copy of the written bytes
     * @throws Exception if serializing the splits or writing to the buffer fails
     */
    private byte[] createCheckpointDataWithSerdeV0(Set<MockSourceSplit> set) throws Exception {
        final MockSplitEnumeratorCheckpointSerializer checkpointSerializer = new MockSplitEnumeratorCheckpointSerializer();
        final int serializerVersion = checkpointSerializer.getVersion();
        final byte[] enumeratorState = checkpointSerializer.serialize(set);

        final DataOutputSerializer out = new DataOutputSerializer(32);
        // Presumably the coordinator serde version (0) - TODO confirm against the serde utilities.
        out.writeInt(0);
        out.writeInt(serializerVersion);
        out.writeInt(enumeratorState.length);
        out.write(enumeratorState);
        // Three trailing zero ints; presumably legacy fields of the version-0 layout - TODO confirm.
        for (int field = 0; field < 3; field++) {
            out.writeInt(0);
        }
        return out.getCopyOfBuffer();
    }

    /**
     * Runs the given action on the coordinator executor and waits for it to finish.
     *
     * @param runnable action to execute on the coordinator executor
     * @throws AssertionError if submitting or running the action fails; the original exception
     *     is attached as the cause
     */
    private void check(Runnable runnable) {
        try {
            this.coordinatorExecutor.submit(runnable).get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt status so callers further up can still observe it.
                Thread.currentThread().interrupt();
            }
            // Same failure type and message as before, but keep the cause for the stack trace.
            throw new AssertionError("Test failed due to " + e, e);
        }
    }

    /**
     * Serializes an empty split set with the mock enumerator checkpoint serializer.
     *
     * @return the bytes produced by {@code SourceCoordinator.writeCheckpointBytes}
     * @throws Exception if writing the checkpoint fails
     */
    private static byte[] createEmptyCheckpoint() throws Exception {
        final Set<MockSourceSplit> noSplits = Collections.emptySet();
        final SimpleVersionedSerializer<Set<MockSourceSplit>> serializer = new MockSplitEnumeratorCheckpointSerializer();
        return SourceCoordinator.writeCheckpointBytes(noSplits, serializer);
    }
}
