blob: 68651b78fa19cccf2c6eabeb3f6bfc0307b4d503 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.checkpointing;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.TestLogger;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import java.util.Arrays;
import java.util.Collections;
import static org.apache.flink.test.util.TestUtils.tryExecute;
/**
 * Tests approximate downstream failover.
 *
 * <p>If a task fails, all its downstream tasks restart, including itself.
 * - Test the failed task can reconnect successfully.
 * - Test partial records are cleaned up correctly.
 * - Test only the downstream tasks are restarted.
 */
@Ignore("Approximate local recovery has currently no scheduler support")
public class ApproximateLocalRecoveryDownstreamITCase extends TestLogger {
// Network memory segment size in bytes (applied via createConfig()); the long test string
// built by AppSourceFunction exceeds 2 * BUFFER_SIZE, so records span several buffers.
private static final int BUFFER_SIZE = 4096;

// 4 TaskManagers x 1 slot: with the distinct slot sharing groups used in the tests,
// source, map(s) and sink each end up in their own slot.
@Rule
public MiniClusterWithClientResource cluster =
        new MiniClusterWithClientResource(
                new MiniClusterResourceConfiguration.Builder()
                        .setConfiguration(createConfig())
                        .setNumberTaskManagers(4)
                        .setNumberSlotsPerTaskManager(1)
                        .build());

// Per-test safety timeout: 300000 ms = 5 minutes.
@Rule public final Timeout timeout = Timeout.millis(300000L);
/**
 * Test the following topology.
 *
 * <pre>
 *     (source1/1) -----> (map1/1) -----> (sink1/1)
 * </pre>
 *
 * <p>(map1/1) fails, (map1/1) and (sink1/1) restart.
 */
@Test
public void localTaskFailureRecoveryThreeTasks() throws Exception {
    final int failAfterElements = 150;

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.setBufferTimeout(0);
    env.setMaxParallelism(128);
    env.disableOperatorChaining();
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
    env.getCheckpointConfig().enableApproximateLocalRecovery(true);

    // Distinct slot sharing groups keep source, map and sink in separate slots.
    env.addSource(new AppSourceFunction())
            .slotSharingGroup("source")
            .map(new FailingMapper<>(failAfterElements))
            .slotSharingGroup("map")
            .addSink(new ValidatingAtMostOnceSink(300))
            .slotSharingGroup("sink");

    // Reset the shared flag so the mapper fails exactly once during this run.
    FailingMapper.failedBefore = false;
    tryExecute(env, "testThreeTasks");
}
/**
 * Test the following topology.
 *
 * <pre>
 *     (source1/1) -----> (map1/2) -----> (sink1/1)
 *          |                              ^
 *          -----------> (map2/2) --------|
 * </pre>
 *
 * <p>(map1/2) fails, (map1/2) and (sink1/1) restart.
 */
@Test
public void localTaskFailureRecoveryTwoMapTasks() throws Exception {
    final int failAfterElements = 20;
    final int keyByChannelNumber = 2;

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.setBufferTimeout(0);
    env.disableOperatorChaining();
    env.setMaxParallelism(128);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
    env.getCheckpointConfig().enableApproximateLocalRecovery(true);

    // The source emits long strings (> 2 network buffers) so that partial records
    // are in flight when the mapper fails.
    env.addSource(
                    new AppSourceFunction(
                            BUFFER_SIZE, env.getMaxParallelism(), keyByChannelNumber))
            .slotSharingGroup("source")
            .keyBy(InvertedKeyTuple::key)
            .map(new FailingMapper<>(failAfterElements))
            .setParallelism(keyByChannelNumber)
            .slotSharingGroup("map")
            .addSink(new ValidatingAtMostOnceSink(200, keyByChannelNumber))
            .slotSharingGroup("sink");

    // Reset the shared flag so the mapper fails exactly once during this run.
    FailingMapper.failedBefore = false;
    tryExecute(env, "testTwoMapTasks");
}
/**
 * Source function that generates {@code InvertedKeyTuple}s.
 *
 * <p>An {@code InvertedKeyTuple} carries four fields:
 *
 * <ul>
 *   <li>{@code index}: increased each time a tuple is generated.
 *   <li>{@code key}: the key by which the tuple is partitioned for downstream consumption.
 *   <li>{@code selectedChannel}: the downstream operator instance the tuple is sent to.
 *       Normally the channel is derived from the key via hash partitioning; here the process is
 *       inverted — the channel is chosen first and then a key mapping to it is looked up — so
 *       the test controls exactly how many tuples reach each downstream instance.
 *   <li>{@code longOrShortString}: a short or long payload used to test network transition
 *       (partial records) upon reconnection.
 * </ul>
 *
 * <p>Tuples are distributed evenly across the downstream operator instances
 * ({@code index % numberOfChannels}).
 */
private static class AppSourceFunction extends RichParallelSourceFunction<InvertedKeyTuple> {
    private final String shortString =
            "I am a very long string to test partial records hohoho hahaha ";
    private final String longOrShortString;
    private final int maxParallelism;
    private final int numberOfChannels;
    private final int[] keys;
    private int index = 0;
    private volatile boolean running = true;

    // Emits the short base string to a single downstream channel.
    AppSourceFunction() {
        this.longOrShortString = shortString;
        this.maxParallelism = 128;
        this.numberOfChannels = 1;
        this.keys = initKeys(numberOfChannels);
    }

    // Emits a string longer than two network buffers so records span buffer boundaries.
    AppSourceFunction(int bufferSize, int maxParallelism, int numberOfChannels) {
        this.maxParallelism = maxParallelism;
        this.numberOfChannels = numberOfChannels;
        this.keys = initKeys(numberOfChannels);
        StringBuilder builder = new StringBuilder(shortString);
        for (int i = 0; i <= 2 * bufferSize / shortString.length() + 1; i++) {
            builder.append(shortString);
        }
        this.longOrShortString = builder.toString();
    }

    @Override
    public void run(SourceContext<InvertedKeyTuple> ctx) throws Exception {
        while (running) {
            // Throttle OUTSIDE the checkpoint lock: sleeping while holding the lock
            // would block checkpointing for the duration of the sleep.
            if (index % 100 == 0) {
                Thread.sleep(50);
            }
            synchronized (ctx.getCheckpointLock()) {
                int selectedChannel = index % numberOfChannels;
                int key = keys[selectedChannel];
                ctx.collect(
                        new InvertedKeyTuple(index, key, selectedChannel, longOrShortString));
                // Advance the counter under the same lock as the emission so that state
                // update and record emission are atomic with respect to checkpoints
                // (previously the increment happened outside the synchronized block).
                index++;
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    // Find the first key which falls into the key group of each downstream channel.
    private int[] initKeys(int numberOfChannels) {
        int[] keys = new int[numberOfChannels];
        for (int i = 0; i < numberOfChannels; i++) {
            int key = 0;
            while (key < 1000 && selectedChannel(key) != i) {
                key++;
            }
            assert key < 1000 : "Can not find a key within number 1000";
            keys[i] = key;
        }
        return keys;
    }

    // Channel (downstream subtask index) that Flink's key-group assignment maps the key to.
    private int selectedChannel(int key) {
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(
                key, maxParallelism, numberOfChannels);
    }
}
/** Mapper that throws once the first subtask has processed {@code failCount} elements. */
private static class FailingMapper<T> extends RichMapFunction<T, T> {
    private static final long serialVersionUID = 6334389850158703L;

    /** Shared flag set after the artificial failure fired; reset by each test before running. */
    private static volatile boolean failedBefore;

    private final int failCount;
    private int numElementsTotal;
    private boolean failer;

    FailingMapper(int failCount) {
        this.failCount = failCount;
    }

    @Override
    public void open(Configuration parameters) {
        // Only subtask 0 is allowed to trigger the artificial failure.
        failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
    }

    @Override
    public T map(T value) throws Exception {
        numElementsTotal++;
        if (failedBefore) {
            // Failure already happened earlier in this run; pass everything through.
            return value;
        }
        // Slow the pipeline down slightly until the failure is triggered.
        Thread.sleep(10);
        if (failer && numElementsTotal >= failCount) {
            failedBefore = true;
            throw new Exception("Artificial Test Failure");
        }
        return value;
    }
}
/**
 * Validating sink: succeeds (throws {@code SuccessException}) once every upstream map channel
 * has delivered {@code numElementsTotal} elements.
 *
 * <p>The source generates tuples evenly distributed across the downstream (map) instances,
 * and the index it assigns is continuous and monotonically increasing as long as the source
 * itself is not restarted. Hence:
 *
 * <ul>
 *   <li>Without approximate local recovery the entire job, including the source, restarts
 *       after the map fails and tuples are regenerated from index 0; when each channel reaches
 *       {@code numElementsTotal}, the maximal possible index is
 *       {@code numElementsTotal * numberOfInputChannels - 1}.
 *   <li>With approximate local recovery only the downstream of the failed task restarts and
 *       the source keeps counting, so the maximal index exceeds
 *       {@code numElementsTotal * numberOfInputChannels - 1}.
 * </ul>
 */
private static class ValidatingAtMostOnceSink extends RichSinkFunction<InvertedKeyTuple> {
    private static final long serialVersionUID = 1748426382527469932L;

    /** Number of elements each channel must deliver before the test succeeds. */
    private final int numElementsTotal;

    /** Per-channel count of elements received so far. */
    private final int[] numElements;

    /** Index carried by the tuple with which each channel reached numElementsTotal. */
    private final Integer[] indexReachingNumElements;

    private final int numberOfInputChannels;

    ValidatingAtMostOnceSink(int numElementsTotal, int numberOfInputChannels) {
        this.numElementsTotal = numElementsTotal;
        this.numberOfInputChannels = numberOfInputChannels;
        this.numElements = new int[numberOfInputChannels];
        this.indexReachingNumElements = new Integer[numberOfInputChannels];
    }

    // Single-channel convenience constructor; delegates instead of duplicating field setup.
    ValidatingAtMostOnceSink(int numElementsTotal) {
        this(numElementsTotal, 1);
    }

    @Override
    public void invoke(InvertedKeyTuple tuple, Context ctx) throws Exception {
        assert tuple.selectedChannel < numberOfInputChannels;
        numElements[tuple.selectedChannel]++;

        boolean allReachNumElementsTotal = true;
        for (int i = 0; i < numberOfInputChannels; i++) {
            if (numElements[i] == numElementsTotal) {
                // Remember the index observed when this channel hit the target count.
                indexReachingNumElements[i] = tuple.index;
            } else if (numElements[i] < numElementsTotal) {
                allReachNumElementsTotal = false;
            }
        }
        if (allReachNumElementsTotal) {
            // An index this large is only possible when the source was NOT restarted,
            // i.e. approximate local recovery restarted merely the downstream tasks.
            assert Collections.max(Arrays.asList(indexReachingNumElements))
                    >= numElementsTotal * numberOfInputChannels;
            throw new SuccessException();
        }
    }
}
/** Builds the cluster configuration, pinning the network memory segment size to BUFFER_SIZE. */
private static Configuration createConfig() {
    final Configuration configuration = new Configuration();
    final MemorySize segmentSize = MemorySize.parse(String.valueOf(BUFFER_SIZE));
    configuration.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, segmentSize);
    return configuration;
}
/** Record flowing source -> map -> sink; carries its own pre-computed target channel. */
private static class InvertedKeyTuple {
    // Sequence number assigned by the source; increases monotonically per source run.
    int index;
    /** Key based on which the tuple is partitioned. */
    int key;
    /** The selected channel of this tuple after key-partitioned. */
    int selectedChannel;
    // Payload: either the short base string or one spanning several network buffers.
    String longOrShortString;

    InvertedKeyTuple(int index, int key, int selectedChannel, String longOrShortString) {
        this.index = index;
        this.key = key;
        this.selectedChannel = selectedChannel;
        this.longOrShortString = longOrShortString;
    }

    // Key extractor referenced as InvertedKeyTuple::key in keyBy().
    int key() {
        return key;
    }
}
}