package org.apache.flink.runtime.jobmanager;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.checkpoint.TestingRetrievableStateStorageHelper;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.persistence.IntegerResourceVersion;
import org.apache.flink.runtime.persistence.StateHandleStore;
import org.apache.flink.runtime.persistence.TestingStateHandleStore;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.class */
public class DefaultJobGraphStoreTest extends TestLogger {
    private final JobGraph testingJobGraph = JobGraphTestUtils.emptyJobGraph();
    private final long timeout = 100;
    private TestingStateHandleStore.Builder<JobGraph> builder;
    private TestingRetrievableStateStorageHelper<JobGraph> jobGraphStorageHelper;
    private TestingJobGraphStoreWatcher testingJobGraphStoreWatcher;
    private TestingJobGraphListener testingJobGraphListener;

    @Before
    public void setup() {
        this.builder = TestingStateHandleStore.builder();
        this.testingJobGraphStoreWatcher = new TestingJobGraphStoreWatcher();
        this.testingJobGraphListener = new TestingJobGraphListener();
        this.jobGraphStorageHelper = new TestingRetrievableStateStorageHelper<>();
    }

    @After
    public void teardown() {
        if (this.testingJobGraphStoreWatcher != null) {
            this.testingJobGraphStoreWatcher.stop();
        }
    }

    @Test
    public void testRecoverJobGraph() throws Exception {
        RetrievableStateHandle<JobGraph> store = this.jobGraphStorageHelper.store(this.testingJobGraph);
        JobGraph recoverJobGraph = createAndStartJobGraphStore(this.builder.setGetFunction(str -> {
            return store;
        }).build()).recoverJobGraph(this.testingJobGraph.getJobID());
        Assert.assertThat(recoverJobGraph, Matchers.is(Matchers.notNullValue()));
        Assert.assertThat(recoverJobGraph.getJobID(), Matchers.is(this.testingJobGraph.getJobID()));
    }

    @Test
    public void testRecoverJobGraphWhenNotExist() throws Exception {
        Assert.assertThat(createAndStartJobGraphStore(this.builder.setGetFunction(str -> {
            throw new StateHandleStore.NotExistException("Not exist exception.");
        }).build()).recoverJobGraph(this.testingJobGraph.getJobID()), Matchers.is(Matchers.nullValue()));
    }

    @Test
    public void testRecoverJobGraphFailedShouldReleaseHandle() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        FlinkException flinkException = new FlinkException("Test exception.");
        TestingStateHandleStore.Builder<JobGraph> getFunction = this.builder.setGetFunction(str -> {
            throw flinkException;
        });
        completableFuture.getClass();
        try {
            createAndStartJobGraphStore(getFunction.setReleaseConsumer((v1) -> {
                r1.complete(v1);
            }).build()).recoverJobGraph(this.testingJobGraph.getJobID());
            Assert.fail("recoverJobGraph should fail when there is exception in getting the state handle.");
        } catch (Exception e) {
            Assert.assertThat(e, FlinkMatchers.containsCause(flinkException));
            Assert.assertThat((String) completableFuture.get(100L, TimeUnit.MILLISECONDS), Matchers.is(this.testingJobGraph.getJobID().toString()));
        }
    }

    @Test
    public void testPutJobGraphWhenNotExist() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        createAndStartJobGraphStore(this.builder.setExistsFunction(str -> {
            return IntegerResourceVersion.notExisting();
        }).setAddFunction((str2, jobGraph) -> {
            completableFuture.complete(jobGraph);
            return this.jobGraphStorageHelper.store(jobGraph);
        }).build()).putJobGraph(this.testingJobGraph);
        Assert.assertThat(((JobGraph) completableFuture.get(100L, TimeUnit.MILLISECONDS)).getJobID(), Matchers.is(this.testingJobGraph.getJobID()));
    }

    @Test
    public void testPutJobGraphWhenAlreadyExist() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        TestingStateHandleStore.Builder<JobGraph> addFunction = this.builder.setExistsFunction(str -> {
            if (atomicBoolean.get()) {
                return IntegerResourceVersion.valueOf(100);
            }
            atomicBoolean.set(true);
            return IntegerResourceVersion.notExisting();
        }).setAddFunction((str2, jobGraph) -> {
            return this.jobGraphStorageHelper.store(jobGraph);
        });
        completableFuture.getClass();
        JobGraphStore createAndStartJobGraphStore = createAndStartJobGraphStore(addFunction.setReplaceConsumer((v1) -> {
            r1.complete(v1);
        }).build());
        createAndStartJobGraphStore.putJobGraph(this.testingJobGraph);
        createAndStartJobGraphStore.putJobGraph(this.testingJobGraph);
        Tuple3 tuple3 = (Tuple3) completableFuture.get(100L, TimeUnit.MILLISECONDS);
        Assert.assertThat(tuple3.f0, Matchers.is(this.testingJobGraph.getJobID().toString()));
        Assert.assertThat(tuple3.f1, Matchers.is(IntegerResourceVersion.valueOf(100)));
        Assert.assertThat(((JobGraph) tuple3.f2).getJobID(), Matchers.is(this.testingJobGraph.getJobID()));
    }

    @Test
    public void testRemoveJobGraph() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        JobGraphStore createAndStartJobGraphStore = createAndStartJobGraphStore(this.builder.setAddFunction((str, jobGraph) -> {
            return this.jobGraphStorageHelper.store(jobGraph);
        }).setRemoveFunction(str2 -> {
            return Boolean.valueOf(completableFuture.complete(JobID.fromHexString(str2)));
        }).build());
        createAndStartJobGraphStore.putJobGraph(this.testingJobGraph);
        createAndStartJobGraphStore.removeJobGraph(this.testingJobGraph.getJobID());
        Assert.assertThat((JobID) completableFuture.get(100L, TimeUnit.MILLISECONDS), Matchers.is(this.testingJobGraph.getJobID()));
    }

    @Test
    public void testRemoveJobGraphWithNonExistName() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        createAndStartJobGraphStore(this.builder.setRemoveFunction(str -> {
            return Boolean.valueOf(completableFuture.complete(JobID.fromHexString(str)));
        }).build()).removeJobGraph(this.testingJobGraph.getJobID());
        try {
            completableFuture.get(100L, TimeUnit.MILLISECONDS);
            Assert.fail("We should get an expected timeout because we are removing a non-existed job graph.");
        } catch (TimeoutException e) {
        }
        Assert.assertThat(Boolean.valueOf(completableFuture.isDone()), Matchers.is(false));
    }

    @Test
    public void testGetJobIds() throws Exception {
        List asList = Arrays.asList(new JobID(0L, 0L), new JobID(0L, 1L));
        Assert.assertThat(createAndStartJobGraphStore(this.builder.setGetAllHandlesSupplier(() -> {
            return (List) asList.stream().map((v0) -> {
                return v0.toString();
            }).collect(Collectors.toList());
        }).build()).getJobIds(), Matchers.contains(asList.toArray()));
    }

    @Test
    public void testOnAddedJobGraphShouldNotProcessKnownJobGraphs() throws Exception {
        createAndStartJobGraphStore(this.builder.setAddFunction((str, jobGraph) -> {
            return this.jobGraphStorageHelper.store(jobGraph);
        }).build()).putJobGraph(this.testingJobGraph);
        this.testingJobGraphStoreWatcher.addJobGraph(this.testingJobGraph.getJobID());
        Assert.assertThat(Integer.valueOf(this.testingJobGraphListener.getAddedJobGraphs().size()), Matchers.is(0));
    }

    @Test
    public void testOnAddedJobGraphShouldOnlyProcessUnknownJobGraphs() throws Exception {
        RetrievableStateHandle<JobGraph> store = this.jobGraphStorageHelper.store(this.testingJobGraph);
        createAndStartJobGraphStore(this.builder.setGetFunction(str -> {
            return store;
        }).setAddFunction((str2, jobGraph) -> {
            return this.jobGraphStorageHelper.store(jobGraph);
        }).build()).recoverJobGraph(this.testingJobGraph.getJobID());
        this.testingJobGraphStoreWatcher.addJobGraph(this.testingJobGraph.getJobID());
        JobID generate = JobID.generate();
        this.testingJobGraphStoreWatcher.addJobGraph(generate);
        Assert.assertThat(Integer.valueOf(this.testingJobGraphListener.getAddedJobGraphs().size()), Matchers.is(1));
        Assert.assertThat(this.testingJobGraphListener.getAddedJobGraphs(), Matchers.contains(new JobID[]{generate}));
    }

    @Test
    public void testOnRemovedJobGraphShouldOnlyProcessKnownJobGraphs() throws Exception {
        createAndStartJobGraphStore(this.builder.setAddFunction((str, jobGraph) -> {
            return this.jobGraphStorageHelper.store(jobGraph);
        }).build()).putJobGraph(this.testingJobGraph);
        this.testingJobGraphStoreWatcher.removeJobGraph(JobID.generate());
        this.testingJobGraphStoreWatcher.removeJobGraph(this.testingJobGraph.getJobID());
        Assert.assertThat(Integer.valueOf(this.testingJobGraphListener.getRemovedJobGraphs().size()), Matchers.is(1));
        Assert.assertThat(this.testingJobGraphListener.getRemovedJobGraphs(), Matchers.contains(new JobID[]{this.testingJobGraph.getJobID()}));
    }

    @Test
    public void testOnRemovedJobGraphShouldNotProcessUnknownJobGraphs() throws Exception {
        createAndStartJobGraphStore(this.builder.setAddFunction((str, jobGraph) -> {
            return this.jobGraphStorageHelper.store(jobGraph);
        }).build());
        this.testingJobGraphStoreWatcher.removeJobGraph(this.testingJobGraph.getJobID());
        Assert.assertThat(Integer.valueOf(this.testingJobGraphListener.getRemovedJobGraphs().size()), Matchers.is(0));
    }

    @Test
    public void testOnAddedJobGraphIsIgnoredAfterBeingStop() throws Exception {
        createAndStartJobGraphStore(this.builder.setAddFunction((str, jobGraph) -> {
            return this.jobGraphStorageHelper.store(jobGraph);
        }).build()).stop();
        this.testingJobGraphStoreWatcher.addJobGraph(this.testingJobGraph.getJobID());
        Assert.assertThat(Integer.valueOf(this.testingJobGraphListener.getAddedJobGraphs().size()), Matchers.is(0));
    }

    @Test
    public void testOnRemovedJobGraphIsIgnoredAfterBeingStop() throws Exception {
        JobGraphStore createAndStartJobGraphStore = createAndStartJobGraphStore(this.builder.setAddFunction((str, jobGraph) -> {
            return this.jobGraphStorageHelper.store(jobGraph);
        }).build());
        createAndStartJobGraphStore.putJobGraph(this.testingJobGraph);
        createAndStartJobGraphStore.stop();
        this.testingJobGraphStoreWatcher.removeJobGraph(this.testingJobGraph.getJobID());
        Assert.assertThat(Integer.valueOf(this.testingJobGraphListener.getRemovedJobGraphs().size()), Matchers.is(0));
    }

    @Test
    public void testStoppingJobGraphStoreShouldReleaseAllHandles() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        createAndStartJobGraphStore(this.builder.setReleaseAllHandlesRunnable(() -> {
            completableFuture.complete(null);
        }).build()).stop();
        Assert.assertThat(Boolean.valueOf(completableFuture.isDone()), Matchers.is(true));
    }

    @Test
    public void testReleasingJobGraphShouldReleaseHandle() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        TestingStateHandleStore.Builder<JobGraph> builder = this.builder;
        completableFuture.getClass();
        JobGraphStore createAndStartJobGraphStore = createAndStartJobGraphStore(builder.setReleaseConsumer((v1) -> {
            r1.complete(v1);
        }).build());
        createAndStartJobGraphStore.putJobGraph(this.testingJobGraph);
        createAndStartJobGraphStore.releaseJobGraph(this.testingJobGraph.getJobID());
        Assert.assertThat((String) completableFuture.get(), Matchers.is(this.testingJobGraph.getJobID().toString()));
    }

    private JobGraphStore createAndStartJobGraphStore(TestingStateHandleStore<JobGraph> testingStateHandleStore) throws Exception {
        DefaultJobGraphStore defaultJobGraphStore = new DefaultJobGraphStore(testingStateHandleStore, this.testingJobGraphStoreWatcher, new JobGraphStoreUtil() { // from class: org.apache.flink.runtime.jobmanager.DefaultJobGraphStoreTest.1
            public String jobIDToName(JobID jobID) {
                return jobID.toString();
            }

            public JobID nameToJobID(String str) {
                return JobID.fromHexString(str);
            }
        });
        defaultJobGraphStore.start(this.testingJobGraphListener);
        return defaultJobGraphStore;
    }
}
