/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.test.recovery;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;

/**
* Test for streaming program behaviour in case of TaskManager failure, based on {@link
* AbstractTaskManagerProcessFailureRecoveryTest}.
*
* <p>The logic in this test is as follows:
*
* <ul>
*   <li>The source emits records slowly (sleeping 50 ms between records) until the test driver
*       gives the "go" for regular execution.
*   <li>The "go" is given only after the first TaskManager has been killed, so it can only happen
*       in the recovery run.
*   <li>The mapper must not be slow, because otherwise the checkpoint barrier cannot pass the
*       mapper and no checkpoint would be completed before the first TaskManager is killed.
* </ul>
*/
@SuppressWarnings("serial")
public class TaskManagerProcessFailureStreamingRecoveryITCase
extends AbstractTaskManagerProcessFailureRecoveryTest {
@Rule public TemporaryFolder tempFolder = new TemporaryFolder();

private static final int DATA_COUNT = 10000;

@Override
public void testTaskManagerFailure(Configuration configuration, final File coordinateDir)
throws Exception {
final File tempCheckpointDir = tempFolder.newFolder();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createRemoteEnvironment(
"localhost",
1337, // not needed since we use ZooKeeper
configuration);
env.setParallelism(PARALLELISM);
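// allow one automatic restart with a short delay, enough to recover from the TaskManager kill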
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000));
env.enableCheckpointing(200);
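// write checkpoints to the local file system so that state survives the killed TaskManager process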
env.setStateBackend(new FsStateBackend(tempCheckpointDir.getAbsoluteFile().toURI()));
DataStream<Long> result =
env.addSource(new SleepyDurableGenerateSequence(coordinateDir, DATA_COUNT))
// add a non-chained no-op map to test the chain state restore logic
.map(
new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
return value;
}
})
.startNewChain()
// populate the coordinate directory so we can proceed to TaskManager
// failure
.map(new Mapper(coordinateDir));
// write result to temporary file
result.addSink(new CheckpointedSink(DATA_COUNT));
try {
// blocking call until execution is done
env.execute();
// TODO: Figure out why this fails when ran with other tests
// Check whether checkpoints have been cleaned up properly
// assertDirectoryEmpty(tempCheckpointDir);
} finally {
// clean up
if (tempCheckpointDir.exists()) {
FileUtils.deleteDirectory(tempCheckpointDir);
}
}
}
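
// Source that emits the values 0..(end - 1), partitioned across the parallel subtasks. It sleeps
// between records until the proceed marker file appears, and checkpoints how many records it has
// emitted so that the sequence resumes where it left off after recovery.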
private static class SleepyDurableGenerateSequence extends RichParallelSourceFunction<Long>
implements ListCheckpointed<Long> {
private static final long SLEEP_TIME = 50;
private final File coordinateDir;
private final long end;
private volatile boolean isRunning = true;
private long collected;
public SleepyDurableGenerateSequence(File coordinateDir, long end) {
this.coordinateDir = coordinateDir;
this.end = end;
}
@Override
public void run(SourceContext<Long> sourceCtx) throws Exception {
final Object checkpointLock = sourceCtx.getCheckpointLock();
RuntimeContext runtimeCtx = getRuntimeContext();
final long stepSize = runtimeCtx.getNumberOfParallelSubtasks();
final long congruence = runtimeCtx.getIndexOfThisSubtask();
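// each subtask emits the values congruent to its index modulo the parallelism, e.g. with
// parallelism 4 and end = 10: subtask 0 emits {0, 4, 8}, subtask 1 emits {1, 5, 9},
// subtask 2 emits {2, 6}, and subtask 3 emits {3, 7}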
final long toCollect =
(end % stepSize > congruence) ? (end / stepSize + 1) : (end / stepSize);
final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE);
boolean checkForProceedFile = true;
while (isRunning && collected < toCollect) {
// check if the proceed file exists (then we go full speed)
// if not, we always recheck and sleep
if (checkForProceedFile) {
if (proceedFile.exists()) {
checkForProceedFile = false;
} else {
// otherwise wait so that we make slow progress
Thread.sleep(SLEEP_TIME);
}
}
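// emit the value and advance the counter under the checkpoint lock, so snapshots always
// see a count that matches what has actually been emitted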
synchronized (checkpointLock) {
sourceCtx.collect(collected * stepSize + congruence);
collected++;
}
}
}
@Override
public void cancel() {
isRunning = false;
}
@Override
public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
return Collections.singletonList(this.collected);
}
@Override
public void restoreState(List<Long> state) throws Exception {
if (state.isEmpty() || state.size() > 1) {
throw new RuntimeException(
"Test failed due to unexpected recovered state size " + state.size());
}
this.collected = state.get(0);
}
}
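
// Forwarding mapper that touches the per-subtask READY_MARKER file on its first record, telling
// the test driver that the pipeline is up and the first TaskManager may be killed.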
private static class Mapper extends RichMapFunction<Long, Long> {
private boolean markerCreated = false;
private File coordinateDir;
public Mapper(File coordinateDir) {
this.coordinateDir = coordinateDir;
}
@Override
public Long map(Long value) throws Exception {
if (!markerCreated) {
int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
touchFile(new File(coordinateDir, READY_MARKER_FILE_PREFIX + taskIndex));
markerCreated = true;
}
return value;
}
}
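
// Sink that recomputes the per-subtask sequence the source produces and asserts that every value
// arrives exactly in order; its counter is checkpointed so the check continues across recovery.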
private static class CheckpointedSink extends RichSinkFunction<Long>
implements ListCheckpointed<Long> {
private long stepSize;
private long congruence;
private long toCollect;
private Long collected = 0L;
private long end;
public CheckpointedSink(long end) {
this.end = end;
}
@Override
public void open(Configuration parameters) throws IOException {
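// mirror the source's partitioning so this subtask knows exactly which values it must receive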
stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
congruence = getRuntimeContext().getIndexOfThisSubtask();
toCollect = (end % stepSize > congruence) ? (end / stepSize + 1) : (end / stepSize);
}
@Override
public void invoke(Long value) throws Exception {
long expected = collected * stepSize + congruence;
Assert.assertTrue(
"Value did not match expected value. " + expected + " != " + value,
value.equals(expected));
collected++;
if (collected > toCollect) {
Assert.fail("Collected more values than expected: " + collected + " > " + toCollect);
}
}
@Override
public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
return Collections.singletonList(this.collected);
}
@Override
public void restoreState(List<Long> state) throws Exception {
if (state.isEmpty() || state.size() > 1) {
throw new RuntimeException(
"Test failed due to unexpected recovered state size " + state.size());
}
this.collected = state.get(0);
}
}
}