[FLINK-38403][tests] Stabilize UnalignedCheckpoint ITs
To take a checkpoint, UC ITs relied on a limited number of retries after failing artificially to produce a checkpoint with in-flight data. Naturally, this approach is flaky when the actual test environment is flaky and retry attempts are used up quickly for unplanned restarts.
The solution is to stop fiddling around with the number of retries and instead use NonRecoverableError to force a failure of the job independent of the retries.
This commit also switches to a better way to retrieve the path of that checkpoint (through JobManager API).
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
index 35d33d7..d2de69d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
@@ -18,6 +18,7 @@
package org.apache.flink.test.checkpointing;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.LongCounter;
@@ -54,15 +55,16 @@
import org.apache.flink.runtime.shuffle.ShuffleServiceOptions;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.throwable.ThrowableAnnotation;
+import org.apache.flink.runtime.throwable.ThrowableType;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.test.util.MiniClusterWithClientResource;
-import org.apache.flink.test.util.TestUtils;
import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
@@ -184,24 +186,28 @@
final CompletableFuture<JobSubmissionResult> result =
miniCluster.getMiniCluster().submitJob(streamGraph.getJobGraph());
+ final JobID jobID = result.get().getJobID();
checkCounters(
miniCluster
.getMiniCluster()
- .requestJobResult(result.get().getJobID())
+ .requestJobResult(jobID)
.get()
.toJobExecutionResult(getClass().getClassLoader()));
System.out.println(
"Finished " + getClass().getCanonicalName() + "#" + name.getMethodName() + ".");
+ if (settings.generateCheckpoint) {
+ return CommonTestUtils.getLatestCompletedCheckpointPath(
+ jobID, miniCluster.getMiniCluster())
+ .map(File::new)
+ .orElseThrow(() -> new AssertionError("Could not generate checkpoint"));
+ }
} catch (Exception e) {
- if (!ExceptionUtils.findThrowable(e, TestException.class).isPresent()) {
+ if (ExceptionUtils.findThrowable(e, TestException.class).isEmpty()) {
throw e;
}
} finally {
miniCluster.after();
}
- if (settings.generateCheckpoint) {
- return TestUtils.getMostRecentCompletedCheckpoint(checkpointDir);
- }
return null;
}
@@ -748,8 +754,6 @@
env.getCheckpointConfig()
.setTolerableCheckpointFailureNumber(tolerableCheckpointFailures);
env.setParallelism(parallelism);
- RestartStrategyUtils.configureFixedDelayRestartStrategy(
- env, generateCheckpoint ? expectedFailures / 2 : expectedFailures, 100L);
env.getCheckpointConfig().enableUnalignedCheckpoints(true);
// for custom partitioner
env.getCheckpointConfig().setForceUnalignedCheckpoints(true);
@@ -1128,6 +1132,7 @@
return value;
}
+ @ThrowableAnnotation(ThrowableType.NonRecoverableError)
static class TestException extends Exception {
public TestException(String s) {
super(s);