[FLINK-15917][runtime] Fix that the root exception is not shown in Web UI
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index c107174..9779dac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -182,7 +182,8 @@
}
}
- private void handleTaskFailure(final ExecutionVertexID executionVertexId, final Throwable error) {
+ private void handleTaskFailure(final ExecutionVertexID executionVertexId, @Nullable final Throwable error) {
+ setGlobalFailureCause(error);
final FailureHandlingResult failureHandlingResult = executionFailureHandler.getFailureHandlingResult(executionVertexId, error);
maybeRestartTasks(failureHandlingResult);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index d31e5e4..bd62c1e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -94,6 +94,8 @@
import org.slf4j.Logger;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
@@ -348,8 +350,10 @@
.transitionState(ExecutionState.SCHEDULED));
}
- protected void setGlobalFailureCause(final Throwable cause) {
- getExecutionGraph().initFailureCause(cause);
+ protected void setGlobalFailureCause(@Nullable final Throwable cause) {
+ if (cause != null) {
+ getExecutionGraph().initFailureCause(cause);
+ }
}
protected ComponentMainThreadExecutor getMainThreadExecutor() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 75ff382..40126cf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -34,6 +34,7 @@
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
@@ -89,10 +90,12 @@
import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
@@ -596,6 +599,23 @@
assertTrue(executionVertexVersioner.isModified(executionVertexVersion));
}
+ @Test
+ public void failureInfoIsSetAfterTaskFailure() {
+ final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+ final JobID jobId = jobGraph.getJobID();
+ final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+
+ final ArchivedExecutionVertex onlyExecutionVertex = Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
+ final ExecutionAttemptID attemptId = onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
+
+ final String exceptionMessage = "expected exception";
+ scheduler.updateTaskExecutionState(new TaskExecutionState(jobId, attemptId, ExecutionState.FAILED, new RuntimeException(exceptionMessage)));
+
+ final ErrorInfo failureInfo = scheduler.requestJob().getFailureInfo();
+ assertThat(failureInfo, is(notNullValue()));
+ assertThat(failureInfo.getExceptionAsString(), containsString(exceptionMessage));
+ }
+
private static JobVertex createVertexWithAllInputConstraints(String name, int parallelism) {
final JobVertex v = new JobVertex(name);
v.setParallelism(parallelism);