/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.checkpointing;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.RestClientConfiguration;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.EntropyInjectingTestFileSystem;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.concurrent.CompletableFuture.allOf;
import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN;
import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/** Integration test for triggering and resuming from savepoints. */
@SuppressWarnings("serial")
public class SavepointITCase extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(SavepointITCase.class);
@Rule public final TemporaryFolder folder = new TemporaryFolder();
private File checkpointDir;
private File savepointDir;
@Before
public void setUp() throws Exception {
final File testRoot = folder.newFolder();
checkpointDir = new File(testRoot, "checkpoints");
savepointDir = new File(testRoot, "savepoints");
if (!checkpointDir.mkdir() || !savepointDir.mkdirs()) {
fail("Test setup failed: failed to create temporary directories.");
}
}
@Test
public void testStopWithSavepointForFlip27SourceWithDrain() throws Exception {
testStopWithSavepointForFlip27Source(true);
}
@Test
public void testStopWithSavepointForFlip27SourceWithoutDrain() throws Exception {
testStopWithSavepointForFlip27Source(false);
}
private void testStopWithSavepointForFlip27Source(boolean drain) throws Exception {
final int numTaskManagers = 2;
final int numSlotsPerTaskManager = 2;
final MiniClusterResourceFactory clusterFactory =
new MiniClusterResourceFactory(
numTaskManagers, numSlotsPerTaskManager, getFileBasedCheckpointsConfig());
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
BoundedPassThroughOperator<Long> operator =
new BoundedPassThroughOperator<>(ChainingStrategy.ALWAYS);
DataStream<Long> stream =
env.fromSequence(0, Long.MAX_VALUE)
.transform("pass-through", BasicTypeInfo.LONG_TYPE_INFO, operator);
stream.addSink(new DiscardingSink<>());
final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
final JobID jobId = jobGraph.getJobID();
MiniClusterWithClientResource cluster = clusterFactory.get();
cluster.before();
ClusterClient<?> client = cluster.getClusterClient();
try {
BoundedPassThroughOperator.resetForTest(1, true);
client.submitJob(jobGraph).get();
BoundedPassThroughOperator.getProgressLatch().await();
client.stopWithSavepoint(jobId, drain, null).get();
if (drain) {
Assert.assertTrue(BoundedPassThroughOperator.inputEnded);
} else {
Assert.assertFalse(BoundedPassThroughOperator.inputEnded);
}
} finally {
cluster.after();
}
}
/**
* Triggers a savepoint for a job that uses the FsStateBackend. We expect that all checkpoint
* files are written to a new savepoint directory.
*
* <ol>
 * <li>Submit the job, wait for some progress
 * <li>Trigger a savepoint and verify that the savepoint has been created
 * <li>Shut down the cluster, re-submit the job from the savepoint, and verify that the
 *     state has been restored and all tasks are running again
 * <li>Cancel the job, dispose the savepoint, and verify that everything has been cleaned up
* </ol>
*/
@Test
public void testTriggerSavepointAndResumeWithFileBasedCheckpoints() throws Exception {
final int numTaskManagers = 2;
final int numSlotsPerTaskManager = 2;
final int parallelism = numTaskManagers * numSlotsPerTaskManager;
final MiniClusterResourceFactory clusterFactory =
new MiniClusterResourceFactory(
numTaskManagers, numSlotsPerTaskManager, getFileBasedCheckpointsConfig());
final String savepointPath = submitJobAndTakeSavepoint(clusterFactory, parallelism);
verifySavepoint(parallelism, savepointPath);
restoreJobAndVerifyState(savepointPath, clusterFactory, parallelism);
}
@Test
public void testTriggerSavepointAndResumeWithFileBasedCheckpointsAndRelocateBasePath()
throws Exception {
final int numTaskManagers = 2;
final int numSlotsPerTaskManager = 2;
final int parallelism = numTaskManagers * numSlotsPerTaskManager;
final MiniClusterResourceFactory clusterFactory =
new MiniClusterResourceFactory(
numTaskManagers, numSlotsPerTaskManager, getFileBasedCheckpointsConfig());
final String savepointPath = submitJobAndTakeSavepoint(clusterFactory, parallelism);
final org.apache.flink.core.fs.Path oldPath =
new org.apache.flink.core.fs.Path(savepointPath);
final org.apache.flink.core.fs.Path newPath =
new org.apache.flink.core.fs.Path(folder.newFolder().toURI().toString());
(new org.apache.flink.core.fs.Path(savepointPath).getFileSystem()).rename(oldPath, newPath);
verifySavepoint(parallelism, newPath.toUri().toString());
restoreJobAndVerifyState(newPath.toUri().toString(), clusterFactory, parallelism);
}
@Test
public void testShouldAddEntropyToSavepointPath() throws Exception {
final int numTaskManagers = 2;
final int numSlotsPerTaskManager = 2;
final int parallelism = numTaskManagers * numSlotsPerTaskManager;
final MiniClusterResourceFactory clusterFactory =
new MiniClusterResourceFactory(
numTaskManagers,
numSlotsPerTaskManager,
getCheckpointingWithEntropyConfig());
final String savepointPath = submitJobAndTakeSavepoint(clusterFactory, parallelism);
assertThat(savepointDir, hasEntropyInFileStateHandlePaths());
restoreJobAndVerifyState(savepointPath, clusterFactory, parallelism);
}
private Configuration getCheckpointingWithEntropyConfig() {
final String savepointPathWithEntropyPlaceholder =
new File(savepointDir, EntropyInjectingTestFileSystem.ENTROPY_INJECTION_KEY)
.getPath();
final Configuration config =
getFileBasedCheckpointsConfig(
"test-entropy://" + savepointPathWithEntropyPlaceholder);
config.setString("s3.entropy.key", EntropyInjectingTestFileSystem.ENTROPY_INJECTION_KEY);
return config;
}
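    /**
     * Submits the test job, waits until all tasks are running and have made some progress, and
     * then cancels the job with a savepoint.
     *
     * @return the path of the created savepoint.
     */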
private String submitJobAndTakeSavepoint(
MiniClusterResourceFactory clusterFactory, int parallelism) throws Exception {
final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000);
final JobID jobId = jobGraph.getJobID();
StatefulCounter.resetForTest(parallelism);
MiniClusterWithClientResource cluster = clusterFactory.get();
cluster.before();
ClusterClient<?> client = cluster.getClusterClient();
try {
client.submitJob(jobGraph).get();
waitForAllTaskRunning(cluster.getMiniCluster(), jobId);
StatefulCounter.getProgressLatch().await();
return client.cancelWithSavepoint(jobId, null).get();
} finally {
cluster.after();
StatefulCounter.resetForTest(parallelism);
}
}
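    /**
     * Verifies that the savepoint was written as a self-contained directory containing one
     * metadata file plus one state file per stateful parallel subtask.
     */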
private void verifySavepoint(final int parallelism, final String savepointPath)
throws URISyntaxException {
// Only one savepoint should exist
File savepointDir = new File(new URI(savepointPath));
assertTrue("Savepoint directory does not exist.", savepointDir.exists());
assertTrue(
"Savepoint did not create self-contained directory.", savepointDir.isDirectory());
File[] savepointFiles = savepointDir.listFiles();
if (savepointFiles != null) {
// Expect one metadata file and one checkpoint file per stateful
// parallel subtask
String errMsg =
"Did not write expected number of savepoint/checkpoint files to directory: "
+ Arrays.toString(savepointFiles);
assertEquals(errMsg, 1 + parallelism, savepointFiles.length);
} else {
fail(String.format("Returned savepoint path (%s) is not valid.", savepointPath));
}
}
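    /**
     * Re-submits the test job from the given savepoint, waits until the state has been restored
     * and further progress has been made, then cancels the job and disposes of the savepoint.
     */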
private void restoreJobAndVerifyState(
String savepointPath, MiniClusterResourceFactory clusterFactory, int parallelism)
throws Exception {
final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000);
jobGraph.setSavepointRestoreSettings(
SavepointRestoreSettings.forPath(savepointPath, false));
final JobID jobId = jobGraph.getJobID();
StatefulCounter.resetForTest(parallelism);
MiniClusterWithClientResource cluster = clusterFactory.get();
cluster.before();
ClusterClient<?> client = cluster.getClusterClient();
try {
client.submitJob(jobGraph).get();
            // Wait until the state has been restored
StatefulCounter.getRestoreLatch().await();
// Await some progress after restore
StatefulCounter.getProgressLatch().await();
client.cancel(jobId).get();
FutureUtils.retrySuccessfulWithDelay(
() -> client.getJobStatus(jobId),
Time.milliseconds(50),
Deadline.now().plus(Duration.ofSeconds(30)),
status -> status == JobStatus.CANCELED,
TestingUtils.defaultScheduledExecutor());
client.disposeSavepoint(savepointPath).get();
assertFalse("Savepoint not properly cleaned up.", new File(savepointPath).exists());
} finally {
cluster.after();
StatefulCounter.resetForTest(parallelism);
}
}
@Test
public void testTriggerSavepointForNonExistingJob() throws Exception {
// Config
final int numTaskManagers = 1;
final int numSlotsPerTaskManager = 1;
final Configuration config = new Configuration();
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
final MiniClusterWithClientResource cluster =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(config)
.setNumberTaskManagers(numTaskManagers)
.setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
.build());
cluster.before();
final ClusterClient<?> client = cluster.getClusterClient();
final JobID jobID = new JobID();
try {
client.triggerSavepoint(jobID, null).get();
fail();
} catch (ExecutionException e) {
assertTrue(
ExceptionUtils.findThrowable(e, FlinkJobNotFoundException.class).isPresent());
assertTrue(ExceptionUtils.findThrowableWithMessage(e, jobID.toString()).isPresent());
} finally {
cluster.after();
}
}
@Test
public void testTriggerSavepointWithCheckpointingDisabled() throws Exception {
// Config
final int numTaskManagers = 1;
final int numSlotsPerTaskManager = 1;
final Configuration config = new Configuration();
final MiniClusterWithClientResource cluster =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(config)
.setNumberTaskManagers(numTaskManagers)
.setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
.build());
cluster.before();
final ClusterClient<?> client = cluster.getClusterClient();
final JobVertex vertex = new JobVertex("Blocking vertex");
vertex.setInvokableClass(BlockingNoOpInvokable.class);
vertex.setParallelism(1);
final JobGraph graph = JobGraphTestUtils.streamingJobGraph(vertex);
try {
client.submitJob(graph).get();
// triggerSavepoint is only available after all tasks are running
waitForAllTaskRunning(cluster.getMiniCluster(), graph.getJobID());
client.triggerSavepoint(graph.getJobID(), null).get();
fail();
} catch (ExecutionException e) {
assertTrue(ExceptionUtils.findThrowable(e, IllegalStateException.class).isPresent());
assertTrue(
ExceptionUtils.findThrowableWithMessage(e, graph.getJobID().toString())
.isPresent());
assertTrue(
ExceptionUtils.findThrowableWithMessage(e, "is not a streaming job")
.isPresent());
} finally {
cluster.after();
}
}
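    /**
     * A pass-through operator that signals progress once it has processed its first element,
     * records in {@code inputEnded} whether {@code endInput()} was called, and blocks in {@code
     * snapshotState} until snapshots are explicitly allowed via the corresponding latch.
     */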
static class BoundedPassThroughOperator<T> extends AbstractStreamOperator<T>
implements OneInputStreamOperator<T, T>, BoundedOneInput {
static volatile CountDownLatch progressLatch;
static volatile CountDownLatch snapshotAllowedLatch;
static volatile CountDownLatch snapshotStartedLatch;
static volatile boolean inputEnded;
private transient boolean processed;
BoundedPassThroughOperator(ChainingStrategy chainingStrategy) {
this.chainingStrategy = chainingStrategy;
}
private static void allowSnapshots() {
snapshotAllowedLatch.countDown();
}
public static void awaitSnapshotStarted() throws InterruptedException {
snapshotStartedLatch.await();
}
@Override
public void endInput() throws Exception {
inputEnded = true;
}
@Override
public void processElement(StreamRecord<T> element) throws Exception {
output.collect(element);
if (!processed) {
processed = true;
progressLatch.countDown();
}
}
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
snapshotStartedLatch.countDown();
snapshotAllowedLatch.await();
super.snapshotState(context);
}
// --------------------------------------------------------------------
static CountDownLatch getProgressLatch() {
return progressLatch;
}
static void resetForTest(int parallelism, boolean allowSnapshots) {
progressLatch = new CountDownLatch(parallelism);
snapshotAllowedLatch = new CountDownLatch(allowSnapshots ? 0 : 1);
snapshotStartedLatch = new CountDownLatch(parallelism);
inputEnded = false;
}
}
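    /**
     * Checks whether the given throwable was caused by a {@link CheckpointException} with failure
     * reason {@code CHECKPOINT_COORDINATOR_SHUTDOWN}.
     */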
    private static boolean isCheckpointCoordinatorShutdownError(Throwable throwable) {
return ExceptionUtils.findThrowable(throwable, CheckpointException.class)
.filter(e -> e.getCheckpointFailureReason() == CHECKPOINT_COORDINATOR_SHUTDOWN)
.isPresent();
}
@Test
public void testStopSavepointWithBoundedInput() throws Exception {
final int numTaskManagers = 2;
final int numSlotsPerTaskManager = 2;
for (ChainingStrategy chainingStrategy : ChainingStrategy.values()) {
final MiniClusterResourceFactory clusterFactory =
new MiniClusterResourceFactory(
numTaskManagers,
numSlotsPerTaskManager,
getFileBasedCheckpointsConfig());
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
BoundedPassThroughOperator<Integer> operator =
new BoundedPassThroughOperator<>(chainingStrategy);
DataStream<Integer> stream =
env.addSource(new InfiniteTestSource())
.transform("pass-through", BasicTypeInfo.INT_TYPE_INFO, operator);
stream.addSink(new DiscardingSink<>());
final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
final JobID jobId = jobGraph.getJobID();
MiniClusterWithClientResource cluster = clusterFactory.get();
cluster.before();
ClusterClient<?> client = cluster.getClusterClient();
try {
BoundedPassThroughOperator.resetForTest(1, true);
client.submitJob(jobGraph).get();
BoundedPassThroughOperator.getProgressLatch().await();
client.stopWithSavepoint(jobId, false, null).get();
Assert.assertFalse(
"input ended with chainingStrategy " + chainingStrategy,
BoundedPassThroughOperator.inputEnded);
} finally {
cluster.after();
}
}
}
@Test
public void testSubmitWithUnknownSavepointPath() throws Exception {
// Config
int numTaskManagers = 1;
int numSlotsPerTaskManager = 1;
int parallelism = numTaskManagers * numSlotsPerTaskManager;
final Configuration config = new Configuration();
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
MiniClusterWithClientResource cluster =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(config)
.setNumberTaskManagers(numTaskManagers)
.setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
.build());
cluster.before();
ClusterClient<?> client = cluster.getClusterClient();
try {
// High value to ensure timeouts if restarted.
int numberOfRetries = 1000;
// Submit the job
// Long delay to ensure that the test times out if the job
// manager tries to restart the job.
final JobGraph jobGraph = createJobGraph(parallelism, numberOfRetries, 3600000);
// Set non-existing savepoint path
jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath("unknown path"));
assertEquals("unknown path", jobGraph.getSavepointRestoreSettings().getRestorePath());
LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode.");
try {
submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
} catch (Exception e) {
Optional<JobExecutionException> expectedJobExecutionException =
ExceptionUtils.findThrowable(e, JobExecutionException.class);
Optional<FileNotFoundException> expectedFileNotFoundException =
ExceptionUtils.findThrowable(e, FileNotFoundException.class);
if (!(expectedJobExecutionException.isPresent()
&& expectedFileNotFoundException.isPresent())) {
throw e;
}
}
} finally {
cluster.after();
}
}
@Test
public void testStopWithSavepointFailingInSnapshotCreation() throws Exception {
testStopWithFailingSourceInOnePipeline(
new SnapshotFailingInfiniteTestSource(),
folder.newFolder(),
// two restarts expected:
// 1. task failure restart
// 2. job failover triggered by the CheckpointFailureManager
2,
assertInSnapshotCreationFailure());
}
@Test
public void testStopWithSavepointFailingAfterSnapshotCreation() throws Exception {
        // the trigger needs to be reset in case the test is run multiple times
CancelFailingInfiniteTestSource.cancelTriggered = false;
testStopWithFailingSourceInOnePipeline(
new CancelFailingInfiniteTestSource(),
folder.newFolder(),
// two restarts expected:
// 1. task failure restart
// 2. job failover triggered by SchedulerBase.stopWithSavepoint
2,
assertAfterSnapshotCreationFailure());
}
private static BiConsumer<JobID, ExecutionException> assertAfterSnapshotCreationFailure() {
return (jobId, actualException) -> {
if (ClusterOptions.isAdaptiveSchedulerEnabled(new Configuration())) {
assertThat(
actualException,
containsMessage("Stop with savepoint operation could not be completed"));
} else {
Optional<FlinkException> actualFlinkException =
ExceptionUtils.findThrowable(actualException, FlinkException.class);
assertTrue(actualFlinkException.isPresent());
assertThat(
actualFlinkException.get(),
containsMessage(
String.format(
"A global fail-over is triggered to recover the job %s.",
jobId)));
}
};
}
private static BiConsumer<JobID, ExecutionException> assertInSnapshotCreationFailure() {
return (ignored, actualException) -> {
if (ClusterOptions.isAdaptiveSchedulerEnabled(new Configuration())) {
assertThat(actualException, containsCause(FlinkException.class));
} else {
Optional<CheckpointException> actualFailureCause =
ExceptionUtils.findThrowable(actualException, CheckpointException.class);
assertTrue(actualFailureCause.isPresent());
}
};
}
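    // Latches triggered by the map functions once the failing / succeeding pipeline has started
    // processing records.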
private static OneShotLatch failingPipelineLatch;
private static OneShotLatch succeedingPipelineLatch;
    /**
     * FLINK-21030
     *
     * <p>Tests the handling of a failure that happens while stopping an embarrassingly parallel
     * job with a savepoint. The test expects the stop-with-savepoint action to fail and all
     * executions to be in state {@code RUNNING} afterwards.
     *
     * @param failingSource the failing {@link SourceFunction} used in one of the two pipelines.
     * @param savepointDir the directory into which the savepoint is written.
     * @param expectedMaximumNumberOfRestarts the maximum number of restarts allowed by the restart
     *     strategy.
     * @param exceptionAssertion asserts the client-call exception to verify that the right error
     *     was handled.
     * @see SavepointITCase#failingPipelineLatch The latch that is triggered once the pipeline that
     *     fails later on has started processing records.
     * @see SavepointITCase#succeedingPipelineLatch The latch that is triggered once the succeeding
     *     pipeline has started processing records.
     * @throws Exception if an error occurred while running the test.
     */
private static void testStopWithFailingSourceInOnePipeline(
InfiniteTestSource failingSource,
File savepointDir,
int expectedMaximumNumberOfRestarts,
BiConsumer<JobID, ExecutionException> exceptionAssertion)
throws Exception {
MiniClusterWithClientResource cluster =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder().build());
failingPipelineLatch = new OneShotLatch();
succeedingPipelineLatch = new OneShotLatch();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.getConfig()
.setRestartStrategy(
RestartStrategies.fixedDelayRestart(expectedMaximumNumberOfRestarts, 0));
env.addSource(failingSource)
.name("Failing Source")
.map(
value -> {
failingPipelineLatch.trigger();
return value;
})
.addSink(new DiscardingSink<>());
env.addSource(new InfiniteTestSource())
.name("Succeeding Source")
.map(
value -> {
succeedingPipelineLatch.trigger();
return value;
})
.addSink(new DiscardingSink<>());
final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
cluster.before();
try {
ClusterClient<?> client = cluster.getClusterClient();
client.submitJob(jobGraph).get();
// we need to wait for both pipelines to be in state RUNNING because that's the only
// state which allows creating a savepoint
failingPipelineLatch.await();
succeedingPipelineLatch.await();
try {
client.stopWithSavepoint(jobGraph.getJobID(), false, savepointDir.getAbsolutePath())
.get();
fail("The future should fail exceptionally.");
} catch (ExecutionException e) {
exceptionAssertion.accept(jobGraph.getJobID(), e);
}
waitUntilAllTasksAreRunning(cluster.getRestAddres(), jobGraph.getJobID());
} finally {
cluster.after();
}
}
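    /**
     * Polls the job details via the REST API until every execution vertex of the given job is in
     * state {@code RUNNING}.
     */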
public static void waitUntilAllTasksAreRunning(URI restAddress, JobID jobId) throws Exception {
// access the REST endpoint of the cluster to determine the state of each
// ExecutionVertex
final RestClient restClient =
new RestClient(
RestClientConfiguration.fromConfiguration(
new UnmodifiableConfiguration(new Configuration())),
TestingUtils.defaultExecutor());
final JobDetailsHeaders detailsHeaders = JobDetailsHeaders.getInstance();
final JobMessageParameters params = detailsHeaders.getUnresolvedMessageParameters();
params.jobPathParameter.resolve(jobId);
CommonTestUtils.waitUntilCondition(
() -> {
JobDetailsInfo detailsInfo =
restClient
.sendRequest(
restAddress.getHost(),
restAddress.getPort(),
detailsHeaders,
params,
EmptyRequestBody.getInstance())
.get();
return allVerticesRunning(detailsInfo.getJobVerticesPerState());
},
Deadline.fromNow(Duration.ofSeconds(10)));
}
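    /**
     * Returns true if the given per-state vertex counts show running vertices and none in any
     * other state.
     */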
private static boolean allVerticesRunning(Map<ExecutionState, Integer> states) {
return states.entrySet().stream()
.allMatch(
entry -> {
if (entry.getKey() == ExecutionState.RUNNING) {
return entry.getValue() > 0;
} else {
return entry.getValue() == 0; // no vertices in non-running state.
}
});
}
/**
* FLINK-5985
*
* <p>This test ensures we can restore from a savepoint under modifications to the job graph
* that only concern stateless operators.
*/
@Test
public void testCanRestoreWithModifiedStatelessOperators() throws Exception {
// Config
int numTaskManagers = 2;
int numSlotsPerTaskManager = 2;
int parallelism = 2;
// Test deadline
final Deadline deadline = Deadline.now().plus(Duration.ofMinutes(5));
// Flink configuration
final Configuration config = new Configuration();
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
String savepointPath;
LOG.info("Flink configuration: " + config + ".");
// Start Flink
MiniClusterWithClientResource cluster =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(config)
.setNumberTaskManagers(numTaskManagers)
.setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
.build());
LOG.info("Shutting down Flink cluster.");
cluster.before();
ClusterClient<?> client = cluster.getClusterClient();
try {
final StatefulCounter statefulCounter = new StatefulCounter();
StatefulCounter.resetForTest(parallelism);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
env.addSource(new InfiniteTestSource())
.shuffle()
.map(value -> 4 * value)
.shuffle()
.map(statefulCounter)
.uid("statefulCounter")
.shuffle()
.map(value -> 2 * value)
.addSink(new DiscardingSink<>());
JobGraph originalJobGraph = env.getStreamGraph().getJobGraph();
JobID jobID = client.submitJob(originalJobGraph).get();
// wait for the Tasks to be ready
waitForAllTaskRunning(cluster.getMiniCluster(), jobID);
assertTrue(
StatefulCounter.getProgressLatch()
.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
savepointPath = client.triggerSavepoint(jobID, null).get();
LOG.info("Retrieved savepoint: " + savepointPath + ".");
} finally {
// Shut down the Flink cluster (thereby canceling the job)
LOG.info("Shutting down Flink cluster.");
cluster.after();
}
// create a new MiniCluster to make sure we start with completely
// new resources
cluster =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(config)
.setNumberTaskManagers(numTaskManagers)
.setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
.build());
LOG.info("Restarting Flink cluster.");
cluster.before();
client = cluster.getClusterClient();
try {
// Reset static test helpers
StatefulCounter.resetForTest(parallelism);
            // Build the modified program on a fresh environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
            // generate a modified job graph in which only the stateless operators differ from the
            // original
env.addSource(new InfiniteTestSource())
.shuffle()
.map(new StatefulCounter())
.uid("statefulCounter")
.shuffle()
.map(value -> value)
.addSink(new DiscardingSink<>());
JobGraph modifiedJobGraph = env.getStreamGraph().getJobGraph();
// Set the savepoint path
modifiedJobGraph.setSavepointRestoreSettings(
SavepointRestoreSettings.forPath(savepointPath));
LOG.info(
"Resubmitting job "
+ modifiedJobGraph.getJobID()
+ " with "
+ "savepoint path "
+ savepointPath
+ " in detached mode.");
// Submit the job
client.submitJob(modifiedJobGraph).get();
            // Wait until the state has been restored
assertTrue(
StatefulCounter.getRestoreLatch()
.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
// Await some progress after restore
assertTrue(
StatefulCounter.getProgressLatch()
.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
} finally {
cluster.after();
}
}
// ------------------------------------------------------------------------
// Test program
// ------------------------------------------------------------------------
    /**
     * Creates a streaming JobGraph (infinite source, shuffle, stateful counter, discarding sink)
     * with the given parallelism and fixed-delay restart strategy.
     */
private JobGraph createJobGraph(int parallelism, int numberOfRetries, long restartDelay) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
env.disableOperatorChaining();
env.getConfig()
.setRestartStrategy(
RestartStrategies.fixedDelayRestart(numberOfRetries, restartDelay));
DataStream<Integer> stream =
env.addSource(new InfiniteTestSource()).shuffle().map(new StatefulCounter());
stream.addSink(new DiscardingSink<>());
return env.getStreamGraph().getJobGraph();
}
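    /**
     * A source that keeps emitting the value 1 until it is cancelled (emission can be suspended).
     * All created instances are tracked so that tests can suspend or cancel them collectively and
     * wait for their run loops to terminate.
     */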
private static class InfiniteTestSource implements SourceFunction<Integer> {
private static final long serialVersionUID = 1L;
private volatile boolean running = true;
private volatile boolean suspended = false;
private static final Collection<InfiniteTestSource> createdSources =
new CopyOnWriteArrayList<>();
private transient volatile CompletableFuture<Void> completeFuture;
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
completeFuture = new CompletableFuture<>();
createdSources.add(this);
try {
while (running) {
if (!suspended) {
synchronized (ctx.getCheckpointLock()) {
ctx.collect(1);
}
}
Thread.sleep(1);
}
completeFuture.complete(null);
} catch (Exception e) {
completeFuture.completeExceptionally(e);
throw e;
}
}
@Override
public void cancel() {
running = false;
}
public void suspend() {
suspended = true;
}
public static void resetForTest() {
createdSources.clear();
}
public CompletableFuture<Void> getCompleteFuture() {
return completeFuture;
}
public static void cancelAllAndAwait() throws ExecutionException, InterruptedException {
createdSources.forEach(InfiniteTestSource::cancel);
allOf(
createdSources.stream()
.map(InfiniteTestSource::getCompleteFuture)
.toArray(CompletableFuture[]::new))
.get();
}
public static void suspendAll() {
createdSources.forEach(InfiniteTestSource::suspend);
}
}
/**
* An {@link InfiniteTestSource} implementation that fails when cancel is called for the first
* time.
*/
private static class CancelFailingInfiniteTestSource extends InfiniteTestSource {
private static volatile boolean cancelTriggered = false;
@Override
public void cancel() {
if (!cancelTriggered) {
cancelTriggered = true;
throw new RuntimeException("Expected RuntimeException after snapshot creation.");
}
super.cancel();
}
}
/** An {@link InfiniteTestSource} implementation that fails while creating a snapshot. */
private static class SnapshotFailingInfiniteTestSource extends InfiniteTestSource
implements CheckpointedFunction {
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
throw new Exception(
"Expected Exception happened during snapshot creation within test source");
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// all good here
}
}
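    /**
     * A map function whose state is a byte array larger than the default small-file threshold, so
     * that the state is written to files rather than inlined into the savepoint metadata. It
     * counts down the progress latch after processing a handful of elements and the restore latch
     * once its state has been restored.
     */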
private static class StatefulCounter extends RichMapFunction<Integer, Integer>
implements ListCheckpointed<byte[]> {
private static volatile CountDownLatch progressLatch = new CountDownLatch(0);
private static volatile CountDownLatch restoreLatch = new CountDownLatch(0);
private int numCollectedElements = 0;
private static final long serialVersionUID = 7317800376639115920L;
private byte[] data;
@Override
public void open(Configuration parameters) throws Exception {
if (data == null) {
// We need this to be large, because we want to test with files
Random rand = new Random(getRuntimeContext().getIndexOfThisSubtask());
data =
new byte
[(int)
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD
.defaultValue()
.getBytes()
+ 1];
rand.nextBytes(data);
}
}
@Override
public Integer map(Integer value) throws Exception {
for (int i = 0; i < data.length; i++) {
data[i] += 1;
}
if (numCollectedElements++ > 10) {
progressLatch.countDown();
}
return value;
}
@Override
public List<byte[]> snapshotState(long checkpointId, long timestamp) throws Exception {
return Collections.singletonList(data);
}
@Override
public void restoreState(List<byte[]> state) throws Exception {
if (state.isEmpty() || state.size() > 1) {
throw new RuntimeException(
"Test failed due to unexpected recovered state size " + state.size());
}
this.data = state.get(0);
restoreLatch.countDown();
}
// --------------------------------------------------------------------
static CountDownLatch getProgressLatch() {
return progressLatch;
}
static CountDownLatch getRestoreLatch() {
return restoreLatch;
}
static void resetForTest(int parallelism) {
progressLatch = new CountDownLatch(parallelism);
restoreLatch = new CountDownLatch(parallelism);
}
}
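    // Per-subtask latches and expected checkpoint values used by the iteration savepoint test.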
private static final int ITER_TEST_PARALLELISM = 1;
private static OneShotLatch[] iterTestSnapshotWait = new OneShotLatch[ITER_TEST_PARALLELISM];
private static OneShotLatch[] iterTestRestoreWait = new OneShotLatch[ITER_TEST_PARALLELISM];
private static int[] iterTestCheckpointVerify = new int[ITER_TEST_PARALLELISM];
@Test
public void testSavepointForJobWithIteration() throws Exception {
for (int i = 0; i < ITER_TEST_PARALLELISM; ++i) {
iterTestSnapshotWait[i] = new OneShotLatch();
iterTestRestoreWait[i] = new OneShotLatch();
iterTestCheckpointVerify[i] = 0;
}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final IntegerStreamSource source = new IntegerStreamSource();
IterativeStream<Integer> iteration =
env.addSource(source)
.flatMap(
new RichFlatMapFunction<Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(Integer in, Collector<Integer> clctr)
throws Exception {
clctr.collect(in);
}
})
.setParallelism(ITER_TEST_PARALLELISM)
.keyBy(
new KeySelector<Integer, Object>() {
private static final long serialVersionUID = 1L;
@Override
public Object getKey(Integer value) throws Exception {
return value;
}
})
.flatMap(new DuplicateFilter())
.setParallelism(ITER_TEST_PARALLELISM)
.iterate();
DataStream<Integer> iterationBody =
iteration
.map(
new MapFunction<Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer map(Integer value) throws Exception {
return value;
}
})
.setParallelism(ITER_TEST_PARALLELISM);
iteration.closeWith(iterationBody);
StreamGraph streamGraph = env.getStreamGraph("Test");
JobGraph jobGraph = streamGraph.getJobGraph();
Configuration config = getFileBasedCheckpointsConfig();
config.addAll(jobGraph.getJobConfiguration());
config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.ZERO);
MiniClusterWithClientResource cluster =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(config)
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(2 * jobGraph.getMaximumParallelism())
.build());
cluster.before();
ClusterClient<?> client = cluster.getClusterClient();
String savepointPath = null;
try {
client.submitJob(jobGraph).get();
waitForAllTaskRunning(cluster.getMiniCluster(), jobGraph.getJobID());
for (OneShotLatch latch : iterTestSnapshotWait) {
latch.await();
}
savepointPath = client.triggerSavepoint(jobGraph.getJobID(), null).get();
client.cancel(jobGraph.getJobID()).get();
while (!client.getJobStatus(jobGraph.getJobID()).get().isGloballyTerminalState()) {
Thread.sleep(100);
}
jobGraph = streamGraph.getJobGraph();
jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
client.submitJob(jobGraph).get();
for (OneShotLatch latch : iterTestRestoreWait) {
latch.await();
}
client.cancel(jobGraph.getJobID()).get();
while (!client.getJobStatus(jobGraph.getJobID()).get().isGloballyTerminalState()) {
Thread.sleep(100);
}
} finally {
if (null != savepointPath) {
client.disposeSavepoint(savepointPath);
}
cluster.after();
}
}
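    /**
     * A source that emits an increasing counter (wrapping back to 0 after 100), records the value
     * captured by the latest snapshot for later verification, and triggers the per-subtask restore
     * latch once its state has been restored.
     */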
private static final class IntegerStreamSource extends RichSourceFunction<Integer>
implements ListCheckpointed<Integer> {
private static final long serialVersionUID = 1L;
private volatile boolean running;
private volatile boolean isRestored;
private int emittedCount;
public IntegerStreamSource() {
this.running = true;
this.isRestored = false;
this.emittedCount = 0;
}
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
while (running) {
synchronized (ctx.getCheckpointLock()) {
ctx.collect(emittedCount);
}
if (emittedCount < 100) {
++emittedCount;
} else {
emittedCount = 0;
}
Thread.sleep(1);
}
}
@Override
public void cancel() {
running = false;
}
@Override
public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
iterTestCheckpointVerify[getRuntimeContext().getIndexOfThisSubtask()] = emittedCount;
return Collections.singletonList(emittedCount);
}
@Override
public void restoreState(List<Integer> state) throws Exception {
if (!state.isEmpty()) {
this.emittedCount = state.get(0);
}
Assert.assertEquals(
iterTestCheckpointVerify[getRuntimeContext().getIndexOfThisSubtask()],
emittedCount);
iterTestRestoreWait[getRuntimeContext().getIndexOfThisSubtask()].trigger();
}
}
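    /**
     * A keyed flat map that forwards each distinct value exactly once, tracking seen values in
     * keyed state, and triggers the per-subtask snapshot latch once the value 30 has been seen.
     */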
private static class DuplicateFilter extends RichFlatMapFunction<Integer, Integer> {
static final ValueStateDescriptor<Boolean> DESCRIPTOR =
new ValueStateDescriptor<>("seen", Boolean.class, false);
private static final long serialVersionUID = 1L;
private ValueState<Boolean> operatorState;
@Override
public void open(Configuration configuration) {
operatorState = this.getRuntimeContext().getState(DESCRIPTOR);
}
@Override
public void flatMap(Integer value, Collector<Integer> out) throws Exception {
if (!operatorState.value()) {
out.collect(value);
operatorState.update(true);
}
if (30 == value) {
iterTestSnapshotWait[getRuntimeContext().getIndexOfThisSubtask()].trigger();
}
}
}
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
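    /** Factory for {@link MiniClusterWithClientResource} instances with a fixed configuration. */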
private static class MiniClusterResourceFactory {
private final int numTaskManagers;
private final int numSlotsPerTaskManager;
private final Configuration config;
private MiniClusterResourceFactory(
int numTaskManagers, int numSlotsPerTaskManager, Configuration config) {
this.numTaskManagers = numTaskManagers;
this.numSlotsPerTaskManager = numSlotsPerTaskManager;
this.config = config;
}
MiniClusterWithClientResource get() {
return new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(config)
.setNumberTaskManagers(numTaskManagers)
.setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
.build());
}
}
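    /**
     * Creates a configuration that uses the filesystem state backend with the test checkpoint
     * directory, a zero small-file threshold (so all state is written to files), and the given
     * savepoint directory.
     */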
private Configuration getFileBasedCheckpointsConfig(final String savepointDir) {
final Configuration config = new Configuration();
config.setString(StateBackendOptions.STATE_BACKEND, "filesystem");
config.setString(
CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
return config;
}
private Configuration getFileBasedCheckpointsConfig() {
return getFileBasedCheckpointsConfig(savepointDir.toURI().toString());
}
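    /**
     * Matcher that verifies entropy injection took place: no savepoint files remain under the
     * unresolved entropy placeholder path and at least one file exists under the resolved entropy
     * path.
     */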
private static Matcher<File> hasEntropyInFileStateHandlePaths() {
return new TypeSafeDiagnosingMatcher<File>() {
@Override
protected boolean matchesSafely(
final File savepointDir, final Description mismatchDescription) {
if (savepointDir == null) {
mismatchDescription.appendText("savepoint dir must not be null");
return false;
}
final List<Path> filesWithoutEntropy =
listRecursively(
savepointDir
.toPath()
.resolve(
EntropyInjectingTestFileSystem
.ENTROPY_INJECTION_KEY));
final Path savepointDirWithEntropy =
savepointDir.toPath().resolve(EntropyInjectingTestFileSystem.ENTROPY);
final List<Path> filesWithEntropy = listRecursively(savepointDirWithEntropy);
if (!filesWithoutEntropy.isEmpty()) {
mismatchDescription.appendText(
"there are savepoint files with unresolved entropy placeholders");
return false;
}
if (!Files.exists(savepointDirWithEntropy) || filesWithEntropy.isEmpty()) {
mismatchDescription.appendText(
"there are no savepoint files with added entropy");
return false;
}
return true;
}
@Override
public void describeTo(final Description description) {
description.appendText("all savepoint files should have added entropy");
}
};
}
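    /**
     * Recursively lists all regular files under the given directory; returns an empty list if the
     * directory does not exist.
     */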
private static List<Path> listRecursively(final Path dir) {
try {
if (!Files.exists(dir)) {
return Collections.emptyList();
} else {
try (Stream<Path> files = Files.walk(dir, FileVisitOption.FOLLOW_LINKS)) {
return files.filter(Files::isRegularFile).collect(Collectors.toList());
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}