blob: 45b51385a63e816cc0633850e838dcf06f6baab1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.checkpointing;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.test.state.ManualWindowSpeedITCase;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.util.TestLogger;
import org.apache.curator.test.TestingServer;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import static org.junit.Assert.assertNotNull;
/**
* IT case for resuming from checkpoints manually via their external pointer, rather than automatic
* failover through the checkpoint coordinator. This test checks that this works properly with the
* common state backends and checkpoint stores, in combination with asynchronous and incremental
* snapshots.
*
* <p>This tests considers full and incremental checkpoints and was introduced to guard against problems like FLINK-6964.
*/
@Category(AlsoRunWithSchedulerNG.class)
public class ResumeCheckpointManuallyITCase extends TestLogger {

	/** Parallelism of the test job; also the number of source subtasks we wait on. */
	private static final int PARALLELISM = 2;

	private static final int NUM_TASK_MANAGERS = 2;
	private static final int SLOTS_PER_TASK_MANAGER = 2;

	@ClassRule
	public static TemporaryFolder temporaryFolder = new TemporaryFolder();

	/**
	 * Creates the state backend under test, rooted at the given (freshly created)
	 * checkpoint directory. Factored out so each parameterized test variant can be
	 * expressed as a one-liner against {@link #runStandalone} / {@link #runWithZooKeeper}.
	 */
	@FunctionalInterface
	private interface BackendFactory {
		StateBackend create(File checkpointDir) throws IOException;
	}

	// ------------------------------------------------------------------------
	//  Standalone (no HA) variants
	// ------------------------------------------------------------------------

	@Test
	public void testExternalizedIncrementalRocksDBCheckpointsStandalone() throws Exception {
		runStandalone(dir -> createRocksDBStateBackend(dir, true), false);
	}

	@Test
	public void testExternalizedFullRocksDBCheckpointsStandalone() throws Exception {
		runStandalone(dir -> createRocksDBStateBackend(dir, false), false);
	}

	@Test
	public void testExternalizedIncrementalRocksDBCheckpointsWithLocalRecoveryStandalone() throws Exception {
		runStandalone(dir -> createRocksDBStateBackend(dir, true), true);
	}

	@Test
	public void testExternalizedFullRocksDBCheckpointsWithLocalRecoveryStandalone() throws Exception {
		runStandalone(dir -> createRocksDBStateBackend(dir, false), true);
	}

	@Test
	public void testExternalizedFSCheckpointsStandalone() throws Exception {
		runStandalone(this::createFsStateBackend, false);
	}

	@Test
	public void testExternalizedFSCheckpointsWithLocalRecoveryStandalone() throws Exception {
		runStandalone(this::createFsStateBackend, true);
	}

	// ------------------------------------------------------------------------
	//  ZooKeeper HA variants
	// ------------------------------------------------------------------------

	@Test
	public void testExternalizedIncrementalRocksDBCheckpointsZookeeper() throws Exception {
		runWithZooKeeper(dir -> createRocksDBStateBackend(dir, true), false);
	}

	@Test
	public void testExternalizedFullRocksDBCheckpointsZookeeper() throws Exception {
		runWithZooKeeper(dir -> createRocksDBStateBackend(dir, false), false);
	}

	@Test
	public void testExternalizedIncrementalRocksDBCheckpointsWithLocalRecoveryZookeeper() throws Exception {
		runWithZooKeeper(dir -> createRocksDBStateBackend(dir, true), true);
	}

	@Test
	public void testExternalizedFullRocksDBCheckpointsWithLocalRecoveryZookeeper() throws Exception {
		runWithZooKeeper(dir -> createRocksDBStateBackend(dir, false), true);
	}

	@Test
	public void testExternalizedFSCheckpointsZookeeper() throws Exception {
		runWithZooKeeper(this::createFsStateBackend, false);
	}

	@Test
	public void testExternalizedFSCheckpointsWithLocalRecoveryZookeeper() throws Exception {
		runWithZooKeeper(this::createFsStateBackend, true);
	}

	// ------------------------------------------------------------------------
	//  Shared test drivers
	// ------------------------------------------------------------------------

	/** Runs one test variant without high availability (standalone checkpoint store). */
	private void runStandalone(BackendFactory backendFactory, boolean localRecovery) throws Exception {
		final File checkpointDir = temporaryFolder.newFolder();
		testExternalizedCheckpoints(
			checkpointDir,
			null,
			backendFactory.create(checkpointDir),
			localRecovery);
	}

	/**
	 * Runs one test variant against an embedded ZooKeeper, so the ZooKeeper-backed
	 * completed-checkpoint store is exercised. The server is stopped (not closed)
	 * in the finally block, mirroring the cleanup semantics of a plain stop.
	 */
	private void runWithZooKeeper(BackendFactory backendFactory, boolean localRecovery) throws Exception {
		final TestingServer zkServer = new TestingServer();
		zkServer.start();
		try {
			final File checkpointDir = temporaryFolder.newFolder();
			testExternalizedCheckpoints(
				checkpointDir,
				zkServer.getConnectString(),
				backendFactory.create(checkpointDir),
				localRecovery);
		} finally {
			zkServer.stop();
		}
	}

	/** Creates an asynchronous-snapshot {@link FsStateBackend} writing under {@code checkpointDir}. */
	private FsStateBackend createFsStateBackend(File checkpointDir) throws IOException {
		return new FsStateBackend(checkpointDir.toURI().toString(), true);
	}

	/** Creates a {@link RocksDBStateBackend}, optionally with incremental checkpoints enabled. */
	private RocksDBStateBackend createRocksDBStateBackend(
			File checkpointDir,
			boolean incrementalCheckpointing) throws IOException {
		return new RocksDBStateBackend(checkpointDir.toURI().toString(), incrementalCheckpointing);
	}

	/**
	 * Core test sequence: brings up a mini cluster, then runs the job three times,
	 * each restart resuming manually from the externalized checkpoint retained by
	 * the previous run (start -> eCP -> restore -> eCP -> restore).
	 *
	 * @param checkpointDir directory where externalized checkpoints are written
	 * @param zooKeeperQuorum ZooKeeper connect string, or {@code null} for standalone mode
	 * @param backend the state backend under test
	 * @param localRecovery whether task-local recovery is enabled
	 */
	private void testExternalizedCheckpoints(
			File checkpointDir,
			String zooKeeperQuorum,
			StateBackend backend,
			boolean localRecovery) throws Exception {

		final Configuration config = new Configuration();

		final File savepointDir = temporaryFolder.newFolder();
		config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
		config.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, localRecovery);

		// ZooKeeper recovery mode?
		if (zooKeeperQuorum != null) {
			final File haDir = temporaryFolder.newFolder();
			config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
			config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperQuorum);
			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString());
		}

		MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(
			new MiniClusterResourceConfiguration.Builder()
				.setConfiguration(config)
				.setNumberTaskManagers(NUM_TASK_MANAGERS)
				.setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER)
				.build());

		cluster.before();

		ClusterClient<?> client = cluster.getClusterClient();
		client.setDetached(true);

		try {
			// main test sequence: start job -> eCP -> restore job -> eCP -> restore job
			String firstExternalCheckpoint = runJobAndGetExternalizedCheckpoint(backend, checkpointDir, null, client);
			assertNotNull(firstExternalCheckpoint);

			String secondExternalCheckpoint = runJobAndGetExternalizedCheckpoint(backend, checkpointDir, firstExternalCheckpoint, client);
			assertNotNull(secondExternalCheckpoint);

			String thirdExternalCheckpoint = runJobAndGetExternalizedCheckpoint(backend, checkpointDir, secondExternalCheckpoint, client);
			assertNotNull(thirdExternalCheckpoint);
		} finally {
			cluster.after();
		}
	}

	/**
	 * Submits the test job (optionally restoring from {@code externalCheckpoint}), waits
	 * until all source subtasks are running and an externalized checkpoint exists, cancels
	 * the job, and returns the path of the retained checkpoint.
	 */
	private static String runJobAndGetExternalizedCheckpoint(StateBackend backend, File checkpointDir, @Nullable String externalCheckpoint, ClusterClient<?> client) throws Exception {
		JobGraph initialJobGraph = getJobGraph(backend, externalCheckpoint);
		NotifyingInfiniteTupleSource.countDownLatch = new CountDownLatch(PARALLELISM);

		client.submitJob(initialJobGraph, ResumeCheckpointManuallyITCase.class.getClassLoader());

		// wait until all sources have been started
		NotifyingInfiniteTupleSource.countDownLatch.await();

		waitUntilExternalizedCheckpointCreated(checkpointDir, initialJobGraph.getJobID());
		client.cancel(initialJobGraph.getJobID());
		waitUntilCanceled(initialJobGraph.getJobID(), client);

		return getExternalizedCheckpointCheckpointPath(checkpointDir, initialJobGraph.getJobID());
	}

	/** Returns the path of a completed externalized checkpoint, or fails the test if none exists. */
	private static String getExternalizedCheckpointCheckpointPath(File checkpointDir, JobID jobId) throws IOException {
		Optional<Path> checkpoint = findExternalizedCheckpoint(checkpointDir, jobId);
		if (!checkpoint.isPresent()) {
			throw new AssertionError("No complete checkpoint could be found.");
		} else {
			return checkpoint.get().toString();
		}
	}

	/**
	 * Polls until an externalized checkpoint shows up for the given job.
	 * Termination relies on the surrounding test timeout if no checkpoint ever completes.
	 */
	private static void waitUntilExternalizedCheckpointCreated(File checkpointDir, JobID jobId) throws InterruptedException, IOException {
		while (true) {
			Thread.sleep(50);
			Optional<Path> externalizedCheckpoint = findExternalizedCheckpoint(checkpointDir, jobId);
			if (externalizedCheckpoint.isPresent()) {
				break;
			}
		}
	}

	/**
	 * Finds any "chk-*" directory for the job that already contains a metadata file,
	 * i.e. a checkpoint that completed. Directory listings are streamed in
	 * try-with-resources blocks because {@link Files#list} must be closed.
	 */
	private static Optional<Path> findExternalizedCheckpoint(File checkpointDir, JobID jobId) throws IOException {
		try (Stream<Path> checkpoints = Files.list(checkpointDir.toPath().resolve(jobId.toString()))) {
			return checkpoints
				.filter(path -> path.getFileName().toString().startsWith("chk-"))
				.filter(path -> {
					try (Stream<Path> checkpointFiles = Files.list(path)) {
						// a checkpoint is only usable once its metadata file was written
						return checkpointFiles.anyMatch(child -> child.getFileName().toString().contains("meta"));
					} catch (IOException ignored) {
						// directory may vanish or be incomplete while the job is checkpointing
						return false;
					}
				})
				.findAny();
		}
	}

	/** Polls the cluster until the job reports the {@code CANCELED} status. */
	private static void waitUntilCanceled(JobID jobId, ClusterClient<?> client) throws ExecutionException, InterruptedException {
		while (client.getJobStatus(jobId).get() != JobStatus.CANCELED) {
			Thread.sleep(50);
		}
	}

	/**
	 * Builds the test job: an infinite keyed tuple source into a 3s ingestion-time
	 * window with a reduce and filter, checkpointing every 500ms with checkpoints
	 * retained on cancellation so they can be resumed from manually.
	 */
	private static JobGraph getJobGraph(StateBackend backend, @Nullable String externalCheckpoint) {
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		env.enableCheckpointing(500);
		env.setStateBackend(backend);
		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
		env.setParallelism(PARALLELISM);
		env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

		env.addSource(new NotifyingInfiniteTupleSource(10_000))
			.keyBy(0)
			.timeWindow(Time.seconds(3))
			.reduce((value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1))
			.filter(value -> value.f0.startsWith("Tuple 0"));

		StreamGraph streamGraph = env.getStreamGraph("Test");

		JobGraph jobGraph = streamGraph.getJobGraph();

		// recover from previous iteration?
		if (externalCheckpoint != null) {
			jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(externalCheckpoint));
		}

		return jobGraph;
	}

	/**
	 * Infinite source which notifies when all of its sub tasks have been started via the count down latch.
	 */
	public static class NotifyingInfiniteTupleSource extends ManualWindowSpeedITCase.InfiniteTupleSource {

		private static final long serialVersionUID = 8120981235081181746L;

		// Re-initialized by the test before each job submission; counted down once per subtask.
		private static CountDownLatch countDownLatch;

		public NotifyingInfiniteTupleSource(int numKeys) {
			super(numKeys);
		}

		@Override
		public void run(SourceContext<Tuple2<String, Integer>> out) throws Exception {
			// signal that this subtask is running before emitting any data
			if (countDownLatch != null) {
				countDownLatch.countDown();
			}
			super.run(out);
		}
	}
}