blob: 21bfa3f90950ae8fb4fca0157d14c4b9fcd00826 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.checkpointing;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.test.checkpointing.utils.AccumulatingIntegerSink;
import org.apache.flink.test.checkpointing.utils.CancellingIntegerSource;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static java.util.Collections.emptyMap;
import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTS_DIRECTORY;
import static org.apache.flink.configuration.CheckpointingOptions.MAX_RETAINED_CHECKPOINTS;
import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT;
import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
import static org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION;
import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.Assert.assertEquals;
/**
* Tests recovery from a snapshot created in different UC mode (i.e. unaligned checkpoints
* enabled/disabled).
*/
@RunWith(Parameterized.class)
public class UnalignedCheckpointCompatibilityITCase extends TestLogger {
@ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
private static final int TOTAL_ELEMENTS = 20;
private static final int FIRST_RUN_EL_COUNT = TOTAL_ELEMENTS / 2;
private static final int FIRST_RUN_BACKPRESSURE_MS = 100; // per element
private static final int PARALLELISM = 1;
private final boolean startAligned;
private final CheckpointType type;
private static MiniClusterWithClientResource miniCluster;
@Parameterized.Parameters(name = "type: {0}, startAligned: {1}")
public static Object[][] parameters() {
return new Object[][] {
{CHECKPOINT, true},
{CHECKPOINT, false},
{SAVEPOINT, true},
{SAVEPOINT, false},
};
}
public UnalignedCheckpointCompatibilityITCase(CheckpointType type, boolean startAligned) {
this.startAligned = startAligned;
this.type = type;
}
@BeforeClass
public static void setupMiniCluster() throws Exception {
File folder = temporaryFolder.getRoot();
final Configuration conf = new Configuration();
conf.set(CHECKPOINTS_DIRECTORY, folder.toURI().toString());
// prevent deletion of checkpoint files while it's being checked and used
conf.set(MAX_RETAINED_CHECKPOINTS, Integer.MAX_VALUE);
miniCluster =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(conf)
.build());
miniCluster.before();
}
@AfterClass
public static void teardownMiniCluster() {
miniCluster.after();
}
@Before
public void cleanDirectory() throws IOException {
FileUtils.cleanDirectory(temporaryFolder.getRoot());
}
@Test
@SuppressWarnings("unchecked")
public void test() throws Exception {
Tuple2<String, Map<String, Object>> pathAndAccumulators =
type.isSavepoint() ? runAndTakeSavepoint() : runAndTakeExternalCheckpoint();
String savepointPath = pathAndAccumulators.f0;
Map<String, Object> accumulatorsBeforeBarrier = pathAndAccumulators.f1;
Map<String, Object> accumulatorsAfterBarrier =
runFromSavepoint(savepointPath, !startAligned, TOTAL_ELEMENTS);
if (type.isSavepoint()) { // consistency can only be checked for savepoints because output
// is lost for ext. checkpoints
assertEquals(
intRange(0, TOTAL_ELEMENTS),
extractAndConcat(accumulatorsBeforeBarrier, accumulatorsAfterBarrier));
}
}
private Tuple2<String, Map<String, Object>> runAndTakeSavepoint() throws Exception {
JobClient jobClient = submitJobInitially(env(startAligned, 0));
waitForAllTaskRunning(
() -> miniCluster.getMiniCluster().getExecutionGraph(jobClient.getJobID()).get());
Thread.sleep(FIRST_RUN_BACKPRESSURE_MS); // wait for some backpressure from sink
Future<Map<String, Object>> accFuture =
jobClient
.getJobExecutionResult()
.thenApply(JobExecutionResult::getAllAccumulatorResults);
Future<String> savepointFuture =
jobClient.stopWithSavepoint(false, tempFolder().toURI().toString());
return new Tuple2<>(savepointFuture.get(), accFuture.get());
}
private Tuple2<String, Map<String, Object>> runAndTakeExternalCheckpoint() throws Exception {
JobClient jobClient = submitJobInitially(env(startAligned, 100));
// structure: root/attempt/checkpoint/_metadata
File attemptDir = waitForChild(temporaryFolder.getRoot(), (dir, name) -> true);
File checkpointDir = waitForChild(attemptDir, (dir, name) -> name.startsWith("chk-"));
File metadata = waitForChild(checkpointDir, (dir, name) -> name.equals("_metadata"));
cancelJob(jobClient);
return new Tuple2<>(metadata.getParentFile().toString(), emptyMap());
}
private static JobClient submitJobInitially(StreamExecutionEnvironment env) throws Exception {
return env.executeAsync(dag(FIRST_RUN_EL_COUNT, true, FIRST_RUN_BACKPRESSURE_MS, env));
}
private Map<String, Object> runFromSavepoint(String path, boolean isAligned, int totalCount)
throws Exception {
StreamExecutionEnvironment env = env(isAligned, 50);
final StreamGraph streamGraph = dag(totalCount, false, 0, env);
streamGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(path));
return env.execute(streamGraph).getJobExecutionResult().getAllAccumulatorResults();
}
@SuppressWarnings({"ConstantConditions"})
private static File waitForChild(File dir, FilenameFilter filenameFilter)
throws InterruptedException {
File[] files = dir.listFiles(filenameFilter);
while (files.length == 0) {
Thread.sleep(50);
files = dir.listFiles(filenameFilter);
}
return Arrays.stream(files).max(Comparator.naturalOrder()).get();
}
private void cancelJob(JobClient jobClient)
throws InterruptedException, java.util.concurrent.ExecutionException {
jobClient.cancel().get();
try {
jobClient.getJobExecutionResult(); // wait for cancellation
} catch (Exception e) {
// ignore cancellation exception
}
}
@SuppressWarnings("unchecked")
private StreamExecutionEnvironment env(boolean isAligned, int checkpointingInterval) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARALLELISM);
env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
env.getCheckpointConfig().enableUnalignedCheckpoints(!isAligned);
env.getCheckpointConfig().setAlignmentTimeout(Duration.ZERO);
env.getCheckpointConfig().enableExternalizedCheckpoints(RETAIN_ON_CANCELLATION);
if (checkpointingInterval > 0) {
env.enableCheckpointing(checkpointingInterval);
}
return env;
}
private static StreamGraph dag(
int numElements,
boolean continueAfterNumElementsReached,
int sinkDelayMillis,
StreamExecutionEnvironment env) {
env.addSource(CancellingIntegerSource.upTo(numElements, continueAfterNumElementsReached))
.addSink(new AccumulatingIntegerSink(sinkDelayMillis));
return env.getStreamGraph();
}
private static List<Integer> intRange(int from, int to) {
return IntStream.range(from, to).boxed().collect(Collectors.toList());
}
private static List<Integer> extractAndConcat(Map<String, Object>... accumulators) {
return Stream.of(accumulators)
.map(AccumulatingIntegerSink::getOutput)
.peek(l -> checkState(!l.isEmpty()))
.flatMap(Collection::stream)
.collect(Collectors.toList());
}
private File tempFolder() throws IOException {
return temporaryFolder.newFolder();
}
}