package org.apache.flink.runtime.scheduler.adaptive;

import java.time.Duration;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerSlotSharingITCase.class */
/**
 * Integration test that submits a job whose two vertices are placed in the same
 * {@link SlotSharingGroup} to a {@link MiniCluster} configured with the adaptive scheduler.
 *
 * <p>The cluster is sized by {@link #NUMBER_TASK_MANAGERS} and
 * {@link #NUMBER_SLOTS_PER_TASK_MANAGER}; each vertex runs with {@link #PARALLELISM}.
 */
public class AdaptiveSchedulerSlotSharingITCase extends TestLogger {
    /** Number of task managers started by the shared mini cluster. */
    private static final int NUMBER_TASK_MANAGERS = 1;

    /** Number of slots offered by each task manager. */
    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 1;

    /** Parallelism assigned to every vertex of the test job. */
    private static final int PARALLELISM = 1;

    /** Mini cluster shared by all tests of this class. */
    @ClassRule
    public static final MiniClusterResource MINI_CLUSTER_RESOURCE =
            new MiniClusterResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(getConfiguration())
                            .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
                            .setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
                            .build());

    /**
     * Builds the cluster configuration: adaptive scheduler, declarative resource management
     * enabled, and a task manager slot timeout of five seconds.
     *
     * @return the configuration handed to the mini cluster
     */
    private static Configuration getConfiguration() {
        final Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
        configuration.set(ClusterOptions.ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT, true);
        configuration.set(TaskManagerOptions.SLOT_TIMEOUT, Duration.ofSeconds(5L));
        return configuration;
    }

    /**
     * Runs the slot-sharing job twice, back to back, on the same mini cluster.
     *
     * @throws Exception if either run fails
     */
    @Test
    public void testSchedulingOfJobRequiringSlotSharing() throws Exception {
        // NOTE(review): the second run presumably checks that the slot used by the first
        // run was released and can be reused; confirm the intent with the test's author.
        runJob();
        runJob();
    }

    /**
     * Submits a freshly built job graph, waits for its result and asserts that it succeeded.
     *
     * @throws Exception if submission, result retrieval or result conversion fails
     */
    private void runJob() throws Exception {
        final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
        final JobGraph jobGraph = createJobGraphWithSlotSharingGroup();

        miniCluster.submitJob(jobGraph).join();

        final JobResult jobResult = miniCluster.requestJobResult(jobGraph.getJobID()).join();

        // The return value is intentionally discarded. NOTE(review): the conversion is
        // expected to throw when the job failed, surfacing the cause; confirm against JobResult.
        jobResult.toJobExecutionResult(getClass().getClassLoader());

        Assert.assertTrue(jobResult.isSuccess());
    }

    /**
     * Creates a streaming job graph with a "Source" vertex and a "sink" vertex that share one
     * {@link SlotSharingGroup} and are connected pointwise through a pipelined result partition.
     *
     * @return the job graph to submit
     */
    private static JobGraph createJobGraphWithSlotSharingGroup() {
        final SlotSharingGroup slotSharingGroup = new SlotSharingGroup();

        final JobVertex source = new JobVertex("Source");
        source.setInvokableClass(NoOpInvokable.class);
        source.setParallelism(PARALLELISM);
        source.setSlotSharingGroup(slotSharingGroup);

        final JobVertex sink = new JobVertex("sink");
        sink.setInvokableClass(NoOpInvokable.class);
        sink.setParallelism(PARALLELISM);
        sink.setSlotSharingGroup(slotSharingGroup);

        sink.connectNewDataSetAsInput(
                source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

        return JobGraphTestUtils.streamingJobGraph(source, sink);
    }
}
