/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.scheduler.adaptive;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.TestLogger;
import org.junit.ClassRule;
import org.junit.Test;
import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import static org.junit.Assert.assertTrue;
/**
 * Integration tests for the adaptive scheduler.
 *
 * <p>All tests share one {@link MiniCluster} (see {@link #MINI_CLUSTER_RESOURCE}) that is
 * configured to use {@link JobManagerOptions.SchedulerType#Adaptive} and that is started with
 * {@link #NUMBER_TASK_MANAGERS} task managers offering {@link #NUMBER_SLOTS_PER_TASK_MANAGER}
 * slots each.
 */
public class AdaptiveSchedulerSimpleITCase extends TestLogger {

    private static final int NUMBER_TASK_MANAGERS = 2;
    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;

    /**
     * Parallelism requested for the vertices of the simple job. Note that this exceeds the
     * {@code NUMBER_TASK_MANAGERS * NUMBER_SLOTS_PER_TASK_MANAGER} (= 4) slots the mini cluster is
     * configured with.
     */
    private static final int PARALLELISM = 10;

    private static final Configuration configuration = getConfiguration();

    /**
     * Builds the cluster configuration used by all tests: selects the adaptive scheduler and sets
     * the resource stabilization timeout to 100 ms.
     *
     * @return the configuration handed to the shared mini cluster
     */
    private static Configuration getConfiguration() {
        // Named differently from the static field to avoid shadowing it.
        final Configuration conf = new Configuration();

        conf.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
        conf.set(JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT, Duration.ofMillis(100L));

        return conf;
    }

    /** Mini cluster shared by all tests of this class. */
    @ClassRule
    public static final MiniClusterResource MINI_CLUSTER_RESOURCE =
            new MiniClusterResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(configuration)
                            .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
                            .setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
                            .build());

    /** Submits a two-vertex no-op job and asserts that its job result reports success. */
    @Test
    public void testSchedulingOfSimpleJob() throws Exception {
        final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
        final JobGraph jobGraph = createJobGraph();

        miniCluster.submitJob(jobGraph).join();

        final JobResult jobResult = miniCluster.requestJobResult(jobGraph.getJobID()).join();

        // NOTE(review): the converted result is not inspected further. Presumably the conversion
        // itself is the check (surfacing the job's failure cause if it did not succeed) - confirm
        // against JobResult#toJobExecutionResult before removing it.
        final JobExecutionResult jobExecutionResult =
                jobResult.toJobExecutionResult(getClass().getClassLoader());

        assertTrue(jobResult.isSuccess());
    }

    /**
     * Creates a streaming job graph consisting of a no-op source connected point-wise and
     * pipelined to a no-op sink, both with parallelism {@link #PARALLELISM}.
     *
     * @return the job graph for the simple scheduling test
     */
    private JobGraph createJobGraph() {
        final JobVertex source = new JobVertex("Source");
        source.setInvokableClass(NoOpInvokable.class);
        source.setParallelism(PARALLELISM);

        final JobVertex sink = new JobVertex("sink");
        sink.setInvokableClass(NoOpInvokable.class);
        sink.setParallelism(PARALLELISM);

        sink.connectNewDataSetAsInput(
                source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

        return JobGraphTestUtils.streamingJobGraph(source, sink);
    }

    /**
     * Submits an always-failing job with a long restart delay, waits until the job is reported as
     * {@link JobStatus#RESTARTING} and then cancels it. The test passes if the cancel request
     * completes without an exception.
     */
    @Test
    public void testJobCancellationWhileRestartingSucceeds() throws Exception {
        final long timeInRestartingState = 10000L;

        final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
        final JobVertex alwaysFailingOperator = new JobVertex("Always failing operator");
        alwaysFailingOperator.setInvokableClass(AlwaysFailingInvokable.class);
        alwaysFailingOperator.setParallelism(1);

        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(alwaysFailingOperator);
        ExecutionConfig executionConfig = new ExecutionConfig();
        // configure a high delay between attempts: We'll stay in RESTARTING for 10 seconds.
        executionConfig.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, timeInRestartingState));
        jobGraph.setExecutionConfig(executionConfig);

        miniCluster.submitJob(jobGraph).join();

        // wait until we are in RESTARTING state; the deadline equals the configured restart delay
        CommonTestUtils.waitUntilCondition(
                () -> miniCluster.getJobStatus(jobGraph.getJobID()).get() == JobStatus.RESTARTING,
                Deadline.fromNow(Duration.of(timeInRestartingState, ChronoUnit.MILLIS)),
                5);

        // now cancel while in RESTARTING state
        // NOTE(review): the test returns once the cancel request completes; it does not wait for
        // the job to reach a terminal state.
        miniCluster.cancelJob(jobGraph.getJobID()).get();
    }

    /**
     * Runs a job whose only task fails on its first invocation and expects the job to finish
     * successfully nevertheless. If the job result is not successful, the recorded failure cause
     * is rethrown so that it shows up as the test failure.
     */
    @Test
    public void testGlobalFailoverIfTaskFails() throws Throwable {
        final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
        final JobGraph jobGraph = createOnceFailingJobGraph();

        miniCluster.submitJob(jobGraph).join();

        final JobResult jobResult = miniCluster.requestJobResult(jobGraph.getJobID()).join();

        if (!jobResult.isSuccess()) {
            // Do not call Optional#get() blindly: an unsuccessful result without a recorded
            // throwable should fail the test with a clear message rather than with a
            // NoSuchElementException.
            throw jobResult
                    .getSerializedThrowable()
                    .orElseThrow(
                            () ->
                                    new AssertionError(
                                            "Job "
                                                    + jobGraph.getJobID()
                                                    + " did not succeed but reported no failure cause."))
                    .deserializeError(ClassLoader.getSystemClassLoader());
        }
    }

    /**
     * Creates a single-vertex streaming job graph running {@link OnceFailingInvokable} with
     * parallelism 1 and a fixed-delay restart strategy allowing one restart attempt without delay.
     * The invokable's failure flag is reset as part of the creation.
     *
     * @return the job graph for the failover test
     */
    private JobGraph createOnceFailingJobGraph() throws IOException {
        final JobVertex onceFailingOperator = new JobVertex("Once failing operator");

        // Clear the static flag so that the task fails again for this job graph.
        OnceFailingInvokable.reset();
        onceFailingOperator.setInvokableClass(OnceFailingInvokable.class);
        onceFailingOperator.setParallelism(1);

        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(onceFailingOperator);
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
        jobGraph.setExecutionConfig(executionConfig);

        return jobGraph;
    }

    /**
     * Once failing {@link AbstractInvokable}: the subtask with index 0 throws on its first
     * invocation after {@link #reset()}; every later invocation returns normally.
     */
    public static final class OnceFailingInvokable extends AbstractInvokable {

        /** Static flag remembering whether the failure was already triggered. */
        private static volatile boolean hasFailed = false;

        /**
         * Create an Invokable task and set its environment.
         *
         * @param environment The environment assigned to this invokable.
         */
        public OnceFailingInvokable(Environment environment) {
            super(environment);
        }

        @Override
        public void invoke() throws Exception {
            if (!hasFailed && getIndexInSubtaskGroup() == 0) {
                // Record the failure before throwing so that the next attempt passes.
                hasFailed = true;
                throw new FlinkRuntimeException("Test failure.");
            }
        }

        /** Re-arms the invokable so that the next invocation of subtask 0 fails again. */
        private static void reset() {
            hasFailed = false;
        }
    }

    /** Always failing {@link AbstractInvokable}. */
    public static final class AlwaysFailingInvokable extends AbstractInvokable {

        /**
         * Create an Invokable task and set its environment.
         *
         * @param environment The environment assigned to this invokable.
         */
        public AlwaysFailingInvokable(Environment environment) {
            super(environment);
        }

        @Override
        public void invoke() throws Exception {
            throw new FlinkRuntimeException("Test failure.");
        }
    }
}