[FLINK-15917][runtime] Fix that the root exception is not shown in Web UI
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index c107174..9779dac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -182,7 +182,8 @@
 		}
 	}
 
-	private void handleTaskFailure(final ExecutionVertexID executionVertexId, final Throwable error) {
+	private void handleTaskFailure(final ExecutionVertexID executionVertexId, @Nullable final Throwable error) {
+		setGlobalFailureCause(error);
 		final FailureHandlingResult failureHandlingResult = executionFailureHandler.getFailureHandlingResult(executionVertexId, error);
 		maybeRestartTasks(failureHandlingResult);
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index d31e5e4..bd62c1e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -94,6 +94,8 @@
 
 import org.slf4j.Logger;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Collection;
@@ -348,8 +350,10 @@
 			.transitionState(ExecutionState.SCHEDULED));
 	}
 
-	protected void setGlobalFailureCause(final Throwable cause) {
-		getExecutionGraph().initFailureCause(cause);
+	protected void setGlobalFailureCause(@Nullable final Throwable cause) {
+		if (cause != null) {
+			getExecutionGraph().initFailureCause(cause);
+		}
 	}
 
 	protected ComponentMainThreadExecutor getMainThreadExecutor() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 75ff382..40126cf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -34,6 +34,7 @@
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
@@ -89,10 +90,12 @@
 import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -596,6 +599,23 @@
 		assertTrue(executionVertexVersioner.isModified(executionVertexVersion));
 	}
 
+	@Test
+	public void failureInfoIsSetAfterTaskFailure() {
+		final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+		final JobID jobId = jobGraph.getJobID();
+		final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+
+		final ArchivedExecutionVertex onlyExecutionVertex = Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
+		final ExecutionAttemptID attemptId = onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
+
+		final String exceptionMessage = "expected exception";
+		scheduler.updateTaskExecutionState(new TaskExecutionState(jobId, attemptId, ExecutionState.FAILED, new RuntimeException(exceptionMessage)));
+
+		final ErrorInfo failureInfo = scheduler.requestJob().getFailureInfo();
+		assertThat(failureInfo, is(notNullValue()));
+		assertThat(failureInfo.getExceptionAsString(), containsString(exceptionMessage));
+	}
+
 	private static JobVertex createVertexWithAllInputConstraints(String name, int parallelism) {
 		final JobVertex v = new JobVertex(name);
 		v.setParallelism(parallelism);