blob: 959a5dfeb86bdf93d432bbdf206f0bb4a46e5546 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.scheduling;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.partition.PartitionException;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
/** IT case for pipelined region scheduling. */
public class PipelinedRegionSchedulingITCase extends TestLogger {
@Test
public void testSuccessWithSlotsNoFewerThanTheMaxRegionRequired() throws Exception {
final JobResult jobResult = executeSchedulingTest(2);
assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));
}
@Test
public void testFailsOnInsufficientSlots() throws Exception {
final JobResult jobResult = executeSchedulingTest(1);
assertThat(jobResult.getSerializedThrowable().isPresent(), is(true));
final Throwable jobFailure =
jobResult
.getSerializedThrowable()
.get()
.deserializeError(ClassLoader.getSystemClassLoader());
final Optional<NoResourceAvailableException> cause =
ExceptionUtils.findThrowable(jobFailure, NoResourceAvailableException.class);
assertThat(cause.isPresent(), is(true));
assertThat(
cause.get().getMessage(), containsString("Slot request bulk is not fulfillable!"));
}
@Test(timeout = 120000)
public void testRecoverFromPartitionException() throws Exception {
final Configuration configuration = new Configuration();
configuration.setString(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
OneTimeFailingReceiverWithPartitionException.hasFailed.set(false);
final JobResult jobResult =
executeSchedulingTest(createJobGraphWithThreeStages(2), 2, configuration);
assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));
}
private JobResult executeSchedulingTest(int numSlots) throws Exception {
return executeSchedulingTest(createJobGraph(2), numSlots, new Configuration());
}
private JobResult executeSchedulingTest(
JobGraph jobGraph, int numSlots, Configuration configuration) throws Exception {
configuration.setString(RestOptions.BIND_PORT, "0");
configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 5000L);
final MiniClusterConfiguration miniClusterConfiguration =
new MiniClusterConfiguration.Builder()
.setConfiguration(configuration)
.setNumTaskManagers(1)
.setNumSlotsPerTaskManager(numSlots)
.build();
try (MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration)) {
miniCluster.start();
final MiniClusterClient miniClusterClient =
new MiniClusterClient(configuration, miniCluster);
// wait for the submission to succeed
final JobID jobID = miniClusterClient.submitJob(jobGraph).get();
final CompletableFuture<JobResult> resultFuture =
miniClusterClient.requestJobResult(jobID);
final JobResult jobResult = resultFuture.get();
return jobResult;
}
}
private JobGraph createJobGraph(final int parallelism) {
final SlotSharingGroup group1 = new SlotSharingGroup();
final JobVertex source1 = new JobVertex("source1");
source1.setInvokableClass(PipelinedSender.class);
source1.setParallelism(parallelism * 2);
source1.setSlotSharingGroup(group1);
final SlotSharingGroup group2 = new SlotSharingGroup();
final JobVertex source2 = new JobVertex("source2");
source2.setInvokableClass(NoOpInvokable.class);
source2.setParallelism(parallelism);
source2.setSlotSharingGroup(group2);
final JobVertex sink = new JobVertex("sink");
sink.setInvokableClass(Receiver.class);
sink.setParallelism(parallelism);
sink.setSlotSharingGroup(group1);
sink.connectNewDataSetAsInput(
source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
sink.connectNewDataSetAsInput(
source2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
return JobGraphTestUtils.batchJobGraph(source1, source2, sink);
}
private JobGraph createJobGraphWithThreeStages(final int parallelism) {
final SlotSharingGroup group1 = new SlotSharingGroup();
final JobVertex source = new JobVertex("source");
source.setInvokableClass(NoOpInvokable.class);
source.setParallelism(parallelism);
source.setSlotSharingGroup(group1);
final SlotSharingGroup group2 = new SlotSharingGroup();
final JobVertex map = new JobVertex("map");
map.setInvokableClass(NoOpInvokable.class);
map.setParallelism(parallelism);
map.setSlotSharingGroup(group2);
final SlotSharingGroup group3 = new SlotSharingGroup();
final JobVertex sink = new JobVertex("sink");
sink.setInvokableClass(OneTimeFailingReceiverWithPartitionException.class);
sink.setParallelism(parallelism);
sink.setSlotSharingGroup(group3);
map.connectNewDataSetAsInput(
source, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
sink.connectNewDataSetAsInput(
map, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
return JobGraphTestUtils.batchJobGraph(source, map, sink);
}
/**
* This invokable is used by source1. It sends data to trigger the scheduling of the sink task.
* It will also wait for a bit time before finishing itself, so that the scheduled sink task can
* directly use its slot.
*/
public static class PipelinedSender extends AbstractInvokable {
public PipelinedSender(Environment environment) {
super(environment);
}
@Override
public void invoke() throws Exception {
if (getEnvironment().getAllWriters().length < 1) {
throw new IllegalStateException();
}
final RecordWriter<IntValue> writer =
new RecordWriterBuilder<IntValue>().build(getEnvironment().getWriter(0));
try {
writer.emit(new IntValue(42));
writer.flushAll();
} finally {
writer.close();
}
if (getIndexInSubtaskGroup() == 0) {
Thread.sleep(2000);
}
}
}
/**
* This invokable finishes only after all its upstream task finishes. Unexpected result
* partition errors can happen if a task finished later than its consumer task.
*/
public static class Receiver extends AbstractInvokable {
public Receiver(Environment environment) {
super(environment);
}
@Override
public void invoke() throws Exception {
if (getEnvironment().getAllInputGates().length < 2) {
throw new IllegalStateException();
}
final String[] tmpDirs = getEnvironment().getTaskManagerInfo().getTmpDirectories();
final List<RecordReader<IntValue>> readers =
Arrays.asList(getEnvironment().getAllInputGates()).stream()
.map(
inputGate ->
new RecordReader<>(inputGate, IntValue.class, tmpDirs))
.collect(Collectors.toList());
for (RecordReader<IntValue> reader : readers) {
while (reader.hasNext()) {
reader.next();
}
}
}
}
/** Invokable which fails exactly once with a {@link PartitionException}. */
public static class OneTimeFailingReceiverWithPartitionException extends AbstractInvokable {
private static final AtomicBoolean hasFailed = new AtomicBoolean(false);
public OneTimeFailingReceiverWithPartitionException(Environment environment) {
super(environment);
}
@Override
public void invoke() throws Exception {
if (hasFailed.compareAndSet(false, true)) {
throw new PartitionNotFoundException(
getEnvironment().getInputGate(0).getChannel(1).getPartitionId());
}
}
}
}