[FLINK-14366][tests] Enable TaskFailureITCase to pass with NG scheduler
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/failing/TaskFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/failing/TaskFailureITCase.java
index b830508..525bd61 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/failing/TaskFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/failing/TaskFailureITCase.java
@@ -31,6 +31,9 @@
import java.util.List;
+import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
+import static org.junit.Assert.assertTrue;
+
/**
* Tests that both jobs, the failing and the working one, are handled correctly. The first (failing) job must be
* canceled and the client must report the failure. The second (working) job must finish successfully and compute the
@@ -51,13 +54,13 @@
Assert.fail();
}
// for collection execution, no restarts. So, exception should be appended with 0.
- Assert.assertEquals(EXCEPTION_STRING + ":0", e.getMessage());
+ assertTrue(findThrowableWithMessage(e, EXCEPTION_STRING + ":0").isPresent());
} catch (JobExecutionException e) { //expected for cluster execution
if (isCollectionExecution()) {
Assert.fail();
}
// for cluster execution, one restart. So, exception should be appended with 1.
- Assert.assertEquals(EXCEPTION_STRING + ":1", e.getCause().getMessage());
+ assertTrue(findThrowableWithMessage(e, EXCEPTION_STRING + ":1").isPresent());
}
//test correct version
executeTask(new TestMapper(), 0);
@@ -65,6 +68,7 @@
private void executeTask(MapFunction<Long, Long> mapper, int retries) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(retries, 0));
List<Long> result = env.generateSequence(1, 9)
.map(mapper)