[FLINK-14366][tests] Enable TaskFailureITCase to pass with NG scheduler
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/failing/TaskFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/failing/TaskFailureITCase.java
index b830508..525bd61 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/failing/TaskFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/failing/TaskFailureITCase.java
@@ -31,6 +31,9 @@
 
 import java.util.List;
 
+import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Tests that both jobs, the failing and the working one, are handled correctly. The first (failing) job must be
  * canceled and the client must report the failure. The second (working) job must finish successfully and compute the
@@ -51,13 +54,13 @@
 				Assert.fail();
 			}
 			// for collection execution, no restarts. So, exception should be appended with 0.
-			Assert.assertEquals(EXCEPTION_STRING + ":0", e.getMessage());
+			assertTrue(findThrowableWithMessage(e, EXCEPTION_STRING + ":0").isPresent());
 		} catch (JobExecutionException e) { //expected for cluster execution
 			if (isCollectionExecution()) {
 				Assert.fail();
 			}
 			// for cluster execution, one restart. So, exception should be appended with 1.
-			Assert.assertEquals(EXCEPTION_STRING + ":1", e.getCause().getMessage());
+			assertTrue(findThrowableWithMessage(e, EXCEPTION_STRING + ":1").isPresent());
 		}
 		//test correct version
 		executeTask(new TestMapper(), 0);
@@ -65,6 +68,7 @@
 
 	private void executeTask(MapFunction<Long, Long> mapper, int retries) throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
 		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(retries, 0));
 		List<Long> result = env.generateSequence(1, 9)
 				.map(mapper)