blob: b0358d2bf86177b28ccbf1435903f6c89ad40a40 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.test.recovery;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import java.util.Collections;
import java.util.List;
* Test for streaming program behaviour in case of TaskManager failure based on {@link
* AbstractTaskManagerProcessFailureRecoveryTest}.
* <p>The logic in this test is as follows: - The source slowly emits records (sleeping between records, see {@code SLEEP_TIME}) until
* the test driver gives the "go" for regular execution - The "go" is given after the first
* taskmanager has been killed, so it can only happen in the recovery run - The mapper must not be
* slow, because otherwise the checkpoint barrier cannot pass the mapper and no checkpoint will be
* completed before the killing of the first TaskManager.
// Streaming implementation of the recovery scenario defined by
// AbstractTaskManagerProcessFailureRecoveryTest: it supplies the job (a ListCheckpointed source and
// sink with two map stages in between) through testTaskManagerFailure.
public class TaskManagerProcessFailureStreamingRecoveryITCase
extends AbstractTaskManagerProcessFailureRecoveryTest {
// JUnit rule; testTaskManagerFailure creates its checkpoint directory inside this folder.
@Rule public TemporaryFolder tempFolder = new TemporaryFolder();
// Passed as the "end" value to both the source and the sink of the job.
private static final int DATA_COUNT = 10000;
/**
 * Assembles the streaming job under test: SleepyDurableGenerateSequence source, a no-op map, the
 * marker-writing Mapper, and the CheckpointedSink.
 *
 * <p>NOTE(review): several statements of this method are not visible in this excerpt (the
 * construction of the execution environment, the call that attaches the anonymous map, the
 * execute call inside the try block, and the cleanup inside the finally block).
 *
 * @param configuration configuration supplied by the caller; its use is not visible in this
 *     excerpt
 * @param coordinateDir directory handed to the source and to the Mapper, which use it for the
 *     marker files
 * @throws Exception declared by the signature; tempFolder.newFolder() is one visible source
 */
public void testTaskManagerFailure(Configuration configuration, final File coordinateDir)
throws Exception {
// directory that backs the file-system state backend configured below
final File tempCheckpointDir = tempFolder.newFolder();
// NOTE(review): the right-hand side of this assignment is truncated in this excerpt
StreamExecutionEnvironment env =
1337, // not needed since we use ZooKeeper
// fixedDelayRestart(1, 1000): presumably one restart attempt with a 1000 ms delay — confirm
// against RestartStrategies
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000));
// store checkpoint state on the file system, under the temporary checkpoint directory
env.setStateBackend(new FsStateBackend(tempCheckpointDir.getAbsoluteFile().toURI()));
DataStream<Long> result =
env.addSource(new SleepyDurableGenerateSequence(coordinateDir, DATA_COUNT))
// add a non-chained no-op map to test the chain state restore logic
new MapFunction<Long, Long>() {
// identity mapping: forwards every record unchanged
public Long map(Long value) throws Exception {
return value;
// populate the coordinate directory so we can proceed to TaskManager
// failure
.map(new Mapper(coordinateDir));
// hand the results to the checkpointed sink, which compares each value with the expected one
result.addSink(new CheckpointedSink(DATA_COUNT));
try {
// blocking call until execution is done
// TODO: Figure out why this fails when run with other tests
// Check whether checkpoints have been cleaned up properly
// assertDirectoryEmpty(tempCheckpointDir);
} finally {
// clean up
if (tempCheckpointDir.exists()) {
/**
 * Parallel source whose subtasks emit the values {@code collected * stepSize + congruence}, where
 * stepSize is the number of parallel subtasks and congruence is the index of this subtask. The
 * number of records handled so far ({@code collected}) is the checkpointed state.
 *
 * <p>While the proceed marker file is absent from the coordinate directory, the source re-checks
 * for it on every loop iteration; once it exists the check is switched off.
 */
private static class SleepyDurableGenerateSequence extends RichParallelSourceFunction<Long>
implements ListCheckpointed<Long> {
// presumably the pause in milliseconds while waiting for the proceed file; the sleep call
// itself is not visible in this excerpt — TODO confirm
private static final long SLEEP_TIME = 50;
// directory in which run() looks for the proceed marker file
private final File coordinateDir;
// exclusive upper bound of the overall sequence shared by all subtasks
private final long end;
// loop condition of run(); set to false by cancel()
private volatile boolean isRunning = true;
// per-subtask progress counter; snapshotted and restored as the only state element
private long collected;
/**
 * @param coordinateDir directory that is checked for the proceed marker file
 * @param end upper bound used to compute how many values each subtask emits
 */
public SleepyDurableGenerateSequence(File coordinateDir, long end) {
this.coordinateDir = coordinateDir;
this.end = end;
/**
 * Emits this subtask's share of the sequence until it is complete or the source is cancelled.
 *
 * @param sourceCtx context used to emit records and to obtain the checkpoint lock
 */
public void run(SourceContext<Long> sourceCtx) throws Exception {
final Object checkpointLock = sourceCtx.getCheckpointLock();
RuntimeContext runtimeCtx = getRuntimeContext();
final long stepSize = runtimeCtx.getNumberOfParallelSubtasks();
final long congruence = runtimeCtx.getIndexOfThisSubtask();
// number of values in [0, end) that are congruent to this subtask's index modulo stepSize
final long toCollect =
(end % stepSize > congruence) ? (end / stepSize + 1) : (end / stepSize);
final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE);
boolean checkForProceedFile = true;
while (isRunning && collected < toCollect) {
// check if the proceed file exists (then we go full speed)
// if not, we always recheck and sleep
if (checkForProceedFile) {
if (proceedFile.exists()) {
checkForProceedFile = false;
} else {
// otherwise wait so that we make slow progress
// emit while holding the checkpoint lock
// NOTE(review): the statement that advances 'collected' is not visible in this excerpt
synchronized (checkpointLock) {
sourceCtx.collect(collected * stepSize + congruence);
/** Requests termination of the loop in run() by clearing the running flag. */
public void cancel() {
isRunning = false;
/** Returns the current progress counter as a single-element state list. */
public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
return Collections.singletonList(this.collected);
/**
 * Restores the progress counter from the first state element.
 *
 * @throws RuntimeException if the state list does not contain exactly one element
 */
public void restoreState(List<Long> state) throws Exception {
if (state.isEmpty() || state.size() > 1) {
throw new RuntimeException(
"Test failed due to unexpected recovered state size " + state.size());
this.collected = state.get(0);
/**
 * Identity map that, on the first record it sees, touches a marker file named
 * {@code READY_MARKER_FILE_PREFIX + <subtask index>} in the coordinate directory.
 */
private static class Mapper extends RichMapFunction<Long, Long> {
// guards the marker creation so it happens at most once per instance
private boolean markerCreated = false;
// directory in which the ready marker file is placed
private File coordinateDir;
/** @param coordinateDir directory that receives this subtask's ready marker file */
public Mapper(File coordinateDir) {
this.coordinateDir = coordinateDir;
/** Forwards the value unchanged; creates the ready marker before the first value is returned. */
public Long map(Long value) throws Exception {
if (!markerCreated) {
int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
// touchFile is defined outside this excerpt; presumably it creates the file — TODO confirm
touchFile(new File(coordinateDir, READY_MARKER_FILE_PREFIX + taskIndex));
markerCreated = true;
return value;
/**
 * Sink that derives the value it expects next from its own progress counter
 * ({@code collected * stepSize + congruence}) and checkpoints that counter.
 *
 * <p>NOTE(review): the assertion calls in invoke() are only partially visible in this excerpt.
 */
private static class CheckpointedSink extends RichSinkFunction<Long>
implements ListCheckpointed<Long> {
// number of parallel subtasks, set in open()
private long stepSize;
// index of this subtask, set in open()
private long congruence;
// number of values in [0, end) congruent to this subtask's index modulo stepSize, set in open()
private long toCollect;
// progress counter; snapshotted and restored as the only state element
// NOTE(review): declared as boxed Long although it is only used arithmetically here
private Long collected = 0L;
// exclusive upper bound of the overall sequence
private long end;
/** @param end upper bound used in open() to compute how many values this subtask expects */
public CheckpointedSink(long end) {
this.end = end;
/** Reads the parallelism and subtask index from the runtime context and derives toCollect. */
public void open(Configuration parameters) throws IOException {
stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
congruence = getRuntimeContext().getIndexOfThisSubtask();
toCollect = (end % stepSize > congruence) ? (end / stepSize + 1) : (end / stepSize);
/**
 * Compares the incoming value with the expected one and checks the counter against toCollect.
 *
 * @param value record received by the sink
 */
public void invoke(Long value) throws Exception {
long expected = collected * stepSize + congruence;
// NOTE(review): the enclosing assertion call and the counter increment are not visible here
"Value did not match expected value. " + expected + " != " + value,
// NOTE(review): the message text says "<=" while the guarded condition is "collected > toCollect"
if (collected > toCollect) {"Collected <= toCollect: " + collected + " > " + toCollect);
/** Returns the current progress counter as a single-element state list. */
public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
return Collections.singletonList(this.collected);
/**
 * Restores the progress counter from the first state element.
 *
 * @throws RuntimeException if the state list does not contain exactly one element
 */
public void restoreState(List<Long> state) throws Exception {
if (state.isEmpty() || state.size() > 1) {
throw new RuntimeException(
"Test failed due to unexpected recovered state size " + state.size());
this.collected = state.get(0);