Add errorMessage in taskStatus for task failures in middleManagers/indexers/peons (#11446)

* Add error message; add unit tests for ForkingTaskRunner

* add tests

* fix comment

* remove unused import

* add exit code in error message

* fix test
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index 2d08338..49cae23 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.overlord;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.CharMatcher;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
@@ -343,14 +344,11 @@
                           jsonMapper.writeValue(taskFile, task);
                         }
 
-                        LOGGER.info("Running command: %s", getMaskedCommand(startupLoggingConfig.getMaskProperties(), command));
-                        taskWorkItem.processHolder = new ProcessHolder(
-                          new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(),
-                          logFile,
-                          taskLocation.getHost(),
-                          taskLocation.getPort(),
-                          taskLocation.getTlsPort()
+                        LOGGER.info(
+                            "Running command: %s",
+                            getMaskedCommand(startupLoggingConfig.getMaskProperties(), command)
                         );
+                        taskWorkItem.processHolder = runTaskProcess(command, logFile, taskLocation);
 
                         processHolder = taskWorkItem.processHolder;
                         processHolder.registerWithCloser(closer);
@@ -364,38 +362,23 @@
                       );
 
                       LOGGER.info("Logging task %s output to: %s", task.getId(), logFile);
-                      boolean runFailed = true;
-
-                      final ByteSink logSink = Files.asByteSink(logFile, FileWriteMode.APPEND);
-
-                      // This will block for a while. So we append the thread information with more details
-                      final String priorThreadName = Thread.currentThread().getName();
-                      Thread.currentThread().setName(StringUtils.format("%s-[%s]", priorThreadName, task.getId()));
-
-                      try (final OutputStream toLogfile = logSink.openStream()) {
-                        ByteStreams.copy(processHolder.process.getInputStream(), toLogfile);
-                        final int statusCode = processHolder.process.waitFor();
-                        LOGGER.info("Process exited with status[%d] for task: %s", statusCode, task.getId());
-                        if (statusCode == 0) {
-                          runFailed = false;
-                        }
-                      }
-                      finally {
-                        Thread.currentThread().setName(priorThreadName);
-                        // Upload task logs
-                        taskLogPusher.pushTaskLog(task.getId(), logFile);
-                        if (reportsFile.exists()) {
-                          taskLogPusher.pushTaskReports(task.getId(), reportsFile);
-                        }
-                      }
-
-                      TaskStatus status;
-                      if (!runFailed) {
+                      final int exitCode = waitForTaskProcessToComplete(task, processHolder, logFile, reportsFile);
+                      final TaskStatus status;
+                      if (exitCode == 0) {
+                        LOGGER.info("Process exited successfully for task: %s", task.getId());
                         // Process exited successfully
                         status = jsonMapper.readValue(statusFile, TaskStatus.class);
                       } else {
+                        LOGGER.error("Process exited with code[%d] for task: %s", exitCode, task.getId());
                         // Process exited unsuccessfully
-                        status = TaskStatus.failure(task.getId());
+                        status = TaskStatus.failure(
+                            task.getId(),
+                            StringUtils.format(
+                                "Task execution process exited unsuccessfully with code[%s]. "
+                                + "See middleManager logs for more details.",
+                                exitCode
+                            )
+                        );
                       }
 
                       TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
@@ -417,7 +400,7 @@
                       synchronized (tasks) {
                         final ForkingTaskRunnerWorkItem taskWorkItem = tasks.remove(task.getId());
                         if (taskWorkItem != null && taskWorkItem.processHolder != null) {
-                          taskWorkItem.processHolder.process.destroy();
+                          taskWorkItem.processHolder.shutdown();
                         }
                         if (!stopping) {
                           saveRunningTasks();
@@ -458,6 +441,42 @@
     }
   }
 
+  @VisibleForTesting
+  ProcessHolder runTaskProcess(List<String> command, File logFile, TaskLocation taskLocation) throws IOException
+  {
+    return new ProcessHolder(
+        new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(),
+        logFile,
+        taskLocation.getHost(),
+        taskLocation.getPort(),
+        taskLocation.getTlsPort()
+    );
+  }
+
+  @VisibleForTesting
+  int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile)
+      throws IOException, InterruptedException
+  {
+    final ByteSink logSink = Files.asByteSink(logFile, FileWriteMode.APPEND);
+
+    // Copying the process output blocks until the process exits, so tag the thread name with the task ID.
+    final String priorThreadName = Thread.currentThread().getName();
+    Thread.currentThread().setName(StringUtils.format("%s-[%s]", priorThreadName, task.getId()));
+
+    try (final OutputStream toLogfile = logSink.openStream()) {
+      ByteStreams.copy(processHolder.process.getInputStream(), toLogfile);
+      return processHolder.process.waitFor();
+    }
+    finally {
+      Thread.currentThread().setName(priorThreadName);
+      // Upload task logs
+      taskLogPusher.pushTaskLog(task.getId(), logFile);
+      if (reportsFile.exists()) {
+        taskLogPusher.pushTaskReports(task.getId(), reportsFile);
+      }
+    }
+  }
+
   @Override
   @LifecycleStop
   public void stop()
@@ -726,7 +745,8 @@
     }
   }
 
-  private static class ProcessHolder
+  @VisibleForTesting
+  static class ProcessHolder
   {
     private final Process process;
     private final File logFile;
@@ -743,11 +763,18 @@
       this.tlsPort = tlsPort;
     }
 
-    private void registerWithCloser(Closer closer)
+    @VisibleForTesting
+    void registerWithCloser(Closer closer)
     {
       closer.register(process.getInputStream());
       closer.register(process.getOutputStream());
     }
+
+    @VisibleForTesting
+    void shutdown()
+    {
+      process.destroy();
+    }
   }
 }
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
index cfbbab4..24dba4f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
@@ -208,10 +208,31 @@
              .emit();
           log.warn(e, "Graceful shutdown of task[%s] aborted with exception.", task.getId());
           error = true;
-          TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.failure(task.getId()));
+          // Creating a new status only to feed the listeners is odd.
+          // It is currently harmless because no listeners are registered in the peon yet.
+          // However, this should be fixed soon by always retrieving the task status
+          // from a single source of truth that is also propagated to the overlord.
+          // See https://github.com/apache/druid/issues/11445.
+          TaskRunnerUtils.notifyStatusChanged(
+              listeners,
+              task.getId(),
+              TaskStatus.failure(
+                  task.getId(),
+                  "Failed to stop gracefully with exception. See task logs for more details."
+              )
+          );
         }
       } else {
-        TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.failure(task.getId()));
+        // Creating a new status only to feed the listeners is odd.
+        // It is currently harmless because no listeners are registered in the peon yet.
+        // However, this should be fixed soon by always retrieving the task status
+        // from a single source of truth that is also propagated to the overlord.
+        // See https://github.com/apache/druid/issues/11445.
+        TaskRunnerUtils.notifyStatusChanged(
+            listeners,
+            task.getId(),
+            TaskStatus.failure(task.getId(), "Canceled as task execution process stopped")
+        );
       }
 
       elapsed = System.currentTimeMillis() - start;
@@ -417,7 +438,6 @@
     {
       return task.getDataSource();
     }
-
   }
 
   private class SingleTaskBackgroundRunnerCallable implements Callable<TaskStatus>
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
index e463d83..84a414c 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
@@ -197,7 +197,7 @@
                             Thread.currentThread()
                                   .setName(StringUtils.format("[%s]-%s", task.getId(), priorThreadName));
 
-                            TaskStatus taskStatus = null;
+                            TaskStatus taskStatus;
                             final TaskToolbox toolbox = toolboxFactory.build(task);
                             TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), taskLocation);
                             TaskRunnerUtils.notifyStatusChanged(
@@ -212,12 +212,13 @@
                             }
                             catch (Throwable t) {
                               LOGGER.error(t, "Exception caught while running the task.");
+                              taskStatus = TaskStatus.failure(
+                                  task.getId(),
+                                  "Failed with an exception. See indexer logs for more details."
+                              );
                             }
                             finally {
                               taskWorkItem.setState(RunnerTaskState.NONE);
-                              if (taskStatus == null) {
-                                taskStatus = TaskStatus.failure(task.getId());
-                              }
                               Thread.currentThread().setName(priorThreadName);
                               if (reportsFile.exists()) {
                                 taskLogPusher.pushTaskReports(task.getId(), reportsFile);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
index 19598d0..d0f2a3b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
@@ -251,7 +251,15 @@
           @Override
           public void onFailure(Throwable t)
           {
-            submitNoticeToExec(new StatusNotice(task, TaskStatus.failure(task.getId())));
+            submitNoticeToExec(
+                new StatusNotice(
+                    task,
+                    TaskStatus.failure(
+                        task.getId(),
+                        "Failed to run task with an exception. See middleManager or indexer logs for more details."
+                    )
+                )
+            );
           }
         }
     );
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskMonitor.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskMonitor.java
index 23b1968..27fb31e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskMonitor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskMonitor.java
@@ -127,7 +127,10 @@
           if (completedAnnouncement != null) {
             completionStatus = completedAnnouncement.getTaskStatus();
           } else if (!runningTasks.containsKey(announcement.getTaskStatus().getId())) {
-            completionStatus = TaskStatus.failure(announcement.getTaskStatus().getId());
+            completionStatus = TaskStatus.failure(
+                announcement.getTaskStatus().getId(),
+                "Canceled as unknown task. See middleManager or indexer logs for more details."
+            );
           }
 
           if (completionStatus != null) {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
index b766bc7..06629b1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
@@ -19,18 +19,35 @@
 
 package org.apache.druid.indexing.overlord;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.log.StartupLoggingConfig;
+import org.apache.druid.tasklogs.NoopTaskLogs;
 import org.assertj.core.util.Lists;
+import org.joda.time.Period;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 
 public class ForkingTaskRunnerTest
 {
@@ -145,12 +162,141 @@
             "-Dsome.somepassword = secret=value",
             "-Dsome.some=notasecret",
             "-Dsome.otherSecret= =asfdhkj352872598====fasdlkjfa="
-            ),
-        "java -cp /path/to/somewhere:some-jars.jar /some===file /asecretFileNa=<masked> -Dsome.property=random -Dsome.otherproperty = random=random " +
-            "-Dsome.somesecret =<masked> -Dsome.somesecret=<masked> -Dsome.somepassword =<masked> -Dsome.some=notasecret -Dsome.otherSecret=<masked>"
+        ),
+        "java -cp /path/to/somewhere:some-jars.jar /some===file /asecretFileNa=<masked> -Dsome.property=random -Dsome.otherproperty = random=random "
+        + "-Dsome.somesecret =<masked> -Dsome.somesecret=<masked> -Dsome.somepassword =<masked> -Dsome.some=notasecret -Dsome.otherSecret=<masked>"
     );
     StartupLoggingConfig startupLoggingConfig = new StartupLoggingConfig();
-    ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(new ForkingTaskRunnerConfig(), null, new WorkerConfig(), null, null, null, null, startupLoggingConfig);
-    Assert.assertEquals(originalAndExpectedCommand.rhs, forkingTaskRunner.getMaskedCommand(startupLoggingConfig.getMaskProperties(), originalAndExpectedCommand.lhs));
+    ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
+        new ForkingTaskRunnerConfig(),
+        null,
+        new WorkerConfig(),
+        null,
+        null,
+        null,
+        null,
+        startupLoggingConfig
+    );
+    Assert.assertEquals(
+        originalAndExpectedCommand.rhs,
+        forkingTaskRunner.getMaskedCommand(
+            startupLoggingConfig.getMaskProperties(),
+            originalAndExpectedCommand.lhs
+        )
+    );
+  }
+
+  @Test
+  public void testTaskStatusWhenTaskProcessFails() throws ExecutionException, InterruptedException
+  {
+    // Emulate task process failure: non-zero exit code and no status file written.
+    final ForkingTaskRunner forkingTaskRunner = createRunnerWithStubbedProcess(
+        new DefaultObjectMapper(),
+        1,
+        null
+    );
+
+    final TaskStatus status = forkingTaskRunner.run(NoopTask.create()).get();
+    Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
+    Assert.assertEquals(
+        "Task execution process exited unsuccessfully with code[1]. See middleManager logs for more details.",
+        status.getErrorMsg()
+    );
+  }
+
+  @Test
+  public void testTaskStatusWhenTaskProcessSucceedsTaskSucceeds() throws ExecutionException, InterruptedException
+  {
+    final Task task = NoopTask.create();
+    final ForkingTaskRunner forkingTaskRunner = createRunnerWithStubbedProcess(
+        new DefaultObjectMapper(),
+        0,
+        TaskStatus.success(task.getId())
+    );
+
+    final TaskStatus status = forkingTaskRunner.run(task).get();
+    Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
+    Assert.assertNull(status.getErrorMsg());
+  }
+
+  @Test
+  public void testTaskStatusWhenTaskProcessSucceedsTaskFails() throws ExecutionException, InterruptedException
+  {
+    final Task task = NoopTask.create();
+    final ForkingTaskRunner forkingTaskRunner = createRunnerWithStubbedProcess(
+        new DefaultObjectMapper(),
+        0,
+        TaskStatus.failure(task.getId(), "task failure test")
+    );
+
+    final TaskStatus status = forkingTaskRunner.run(task).get();
+    Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
+    Assert.assertEquals("task failure test", status.getErrorMsg());
+  }
+
+  /**
+   * Creates a {@link ForkingTaskRunner} whose task process is stubbed out: {@code runTaskProcess} returns a
+   * mocked {@code ProcessHolder} and {@code waitForTaskProcessToComplete} returns the given exit code.
+   *
+   * @param mapper        JSON mapper handed to the runner and used to write {@code statusToWrite}
+   * @param exitCode      exit code to report for the emulated task process
+   * @param statusToWrite status written to the command argument ending with "status.json" when the process
+   *                      is "started"; nothing is written if null
+   */
+  private static ForkingTaskRunner createRunnerWithStubbedProcess(
+      final ObjectMapper mapper,
+      final int exitCode,
+      final TaskStatus statusToWrite
+  )
+  {
+    return new ForkingTaskRunner(
+        new ForkingTaskRunnerConfig(),
+        new TaskConfig(
+            null,
+            null,
+            null,
+            null,
+            ImmutableList.of(),
+            false,
+            new Period("PT0S"),
+            new Period("PT10S"),
+            ImmutableList.of(),
+            false,
+            false
+        ),
+        new WorkerConfig(),
+        new Properties(),
+        new NoopTaskLogs(),
+        mapper,
+        new DruidNode("middleManager", "host", false, 8091, null, true, false),
+        new StartupLoggingConfig()
+    )
+    {
+      @Override
+      ProcessHolder runTaskProcess(List<String> command, File logFile, TaskLocation taskLocation) throws IOException
+      {
+        ProcessHolder processHolder = Mockito.mock(ProcessHolder.class);
+        Mockito.doNothing().when(processHolder).registerWithCloser(ArgumentMatchers.any());
+        Mockito.doNothing().when(processHolder).shutdown();
+
+        if (statusToWrite != null) {
+          // The test relies on the status file path appearing among the command arguments.
+          for (String param : command) {
+            if (param.endsWith("status.json")) {
+              mapper.writeValue(new File(param), statusToWrite);
+              break;
+            }
+          }
+        }
+
+        return processHolder;
+      }
+
+      @Override
+      int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile)
+      {
+        return exitCode;
+      }
+    };
   }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index f1ea2a0..7b12c0d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -21,6 +21,7 @@
 
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.client.indexing.NoopIndexingServiceClient;
+import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
@@ -35,6 +36,8 @@
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.QueryRunner;
@@ -64,9 +67,11 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class SingleTaskBackgroundRunnerTest
 {
@@ -94,6 +99,7 @@
         false
     );
     final ServiceEmitter emitter = new NoopServiceEmitter();
+    EmittingLogger.registerEmitter(emitter);
     final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory(
         taskConfig,
         null,
@@ -199,6 +205,111 @@
     Assert.assertTrue(holder.get());
   }
 
+  @Test
+  public void testStopRestorableTaskExceptionAfterStop()
+  {
+    // statusChanged callback can be called by multiple threads.
+    AtomicReference<TaskStatus> statusHolder = new AtomicReference<>();
+    runner.registerListener(
+        new TaskRunnerListener()
+        {
+          @Override
+          public String getListenerId()
+          {
+            return "testStopRestorableTaskExceptionAfterStop";
+          }
+
+          @Override
+          public void locationChanged(String taskId, TaskLocation newLocation)
+          {
+            // do nothing
+          }
+
+          @Override
+          public void statusChanged(String taskId, TaskStatus status)
+          {
+            statusHolder.set(status);
+          }
+        },
+        Execs.directExecutor()
+    );
+    runner.run(
+        new RestorableTask(new BooleanHolder())
+        {
+          @Override
+          public TaskStatus run(TaskToolbox toolbox)
+          {
+            throw new Error("task failure test");
+          }
+        }
+    );
+    runner.stop();
+    Assert.assertEquals(TaskState.FAILED, statusHolder.get().getStatusCode());
+    Assert.assertEquals(
+        "Failed to stop gracefully with exception. See task logs for more details.",
+        statusHolder.get().getErrorMsg()
+    );
+  }
+
+  @Test
+  public void testStopNonRestorableTask() throws InterruptedException
+  {
+    // Latch to wait until SingleTaskBackgroundRunnerCallable is executing (signalled by the RUNNING status)
+    // before stopping the task. We need this latch because TaskRunnerListener is currently racy.
+    // See https://github.com/apache/druid/issues/11445 for more details.
+    CountDownLatch runLatch = new CountDownLatch(1);
+    // statusChanged callback can be called by multiple threads.
+    AtomicReference<TaskStatus> statusHolder = new AtomicReference<>();
+    runner.registerListener(
+        new TaskRunnerListener()
+        {
+          @Override
+          public String getListenerId()
+          {
+            return "testStopNonRestorableTask";
+          }
+
+          @Override
+          public void locationChanged(String taskId, TaskLocation newLocation)
+          {
+            // do nothing
+          }
+
+          @Override
+          public void statusChanged(String taskId, TaskStatus status)
+          {
+            if (status.getStatusCode() == TaskState.RUNNING) {
+              runLatch.countDown();
+            } else {
+              statusHolder.set(status);
+            }
+          }
+        },
+        Execs.directExecutor()
+    );
+    runner.run(
+        new NoopTask(
+            null,
+            null,
+            "datasource",
+            10000, // 10 sec
+            0,
+            null,
+            null,
+            null
+        )
+    );
+
+    Assert.assertTrue(runLatch.await(1, TimeUnit.SECONDS));
+    runner.stop();
+
+    Assert.assertEquals(TaskState.FAILED, statusHolder.get().getStatusCode());
+    Assert.assertEquals(
+        "Canceled as task execution process stopped",
+        statusHolder.get().getErrorMsg()
+    );
+  }
+
   private static class RestorableTask extends AbstractTask
   {
     private final BooleanHolder gracefullyStopped;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ThreadingTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ThreadingTaskRunnerTest.java
new file mode 100644
index 0000000..d8ddd51
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ThreadingTaskRunnerTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.TaskToolboxFactory;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.AbstractTask;
+import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.tasklogs.NoopTaskLogs;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+public class ThreadingTaskRunnerTest
+{
+
+  @Test
+  public void testTaskStatusWhenTaskThrowsExceptionWhileRunning() throws ExecutionException, InterruptedException
+  {
+    ThreadingTaskRunner runner = new ThreadingTaskRunner(
+        mockTaskToolboxFactory(),
+        new TaskConfig(
+            null,
+            null,
+            null,
+            null,
+            ImmutableList.of(),
+            false,
+            new Period("PT0S"),
+            new Period("PT10S"),
+            ImmutableList.of(),
+            false,
+            false
+        ),
+        new WorkerConfig(),
+        new NoopTaskLogs(),
+        new DefaultObjectMapper(),
+        new TestAppenderatorsManager(),
+        new MultipleFileTaskReportFileWriter(),
+        new DruidNode("middleManager", "host", false, 8091, null, true, false)
+    );
+
+    Future<TaskStatus> statusFuture = runner.run(new AbstractTask("id", "datasource", null)
+    {
+      @Override
+      public String getType()
+      {
+        return "test";
+      }
+
+      @Override
+      public boolean isReady(TaskActionClient taskActionClient)
+      {
+        return true;
+      }
+
+      @Override
+      public void stopGracefully(TaskConfig taskConfig)
+      {
+      }
+
+      @Override
+      public TaskStatus run(TaskToolbox toolbox)
+      {
+        throw new RuntimeException("Task failure test");
+      }
+    });
+
+    TaskStatus status = statusFuture.get();
+    Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
+    Assert.assertEquals(
+        "Failed with an exception. See indexer logs for more details.",
+        status.getErrorMsg()
+    );
+  }
+
+  private static TaskToolboxFactory mockTaskToolboxFactory()
+  {
+    TaskToolboxFactory factory = Mockito.mock(TaskToolboxFactory.class);
+    Mockito.when(factory.build(ArgumentMatchers.any())).thenReturn(Mockito.mock(TaskToolbox.class));
+    return factory;
+  }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
index b7a489f..389d380 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
@@ -25,8 +25,10 @@
 import org.apache.druid.client.indexing.NoopIndexingServiceClient;
 import org.apache.druid.discovery.DruidLeaderClient;
 import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
+import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TaskToolboxFactory;
 import org.apache.druid.indexing.common.TestTasks;
 import org.apache.druid.indexing.common.TestUtils;
@@ -55,6 +57,7 @@
 import org.junit.Test;
 
 import java.io.File;
+import java.util.Map;
 
 /**
  */
@@ -263,6 +266,42 @@
     Assert.assertNotNull(update4.getTaskAnnouncement().getTaskLocation().getHost());
   }
 
+  /**
+   * Verifies that a task whose {@code run} method throws is reported as FAILED with an error message
+   * pointing at the middleManager or indexer logs.
+   */
+  @Test(timeout = 30_000L)
+  public void testTaskStatusWhenTaskRunnerFutureThrowsException() throws Exception
+  {
+    Task task = new NoopTask("id", null, null, 100, 0, null, null, ImmutableMap.of(Tasks.PRIORITY_KEY, 0))
+    {
+      @Override
+      public TaskStatus run(TaskToolbox toolbox)
+      {
+        throw new Error("task failure test");
+      }
+    };
+    workerTaskManager.start();
+    workerTaskManager.assignTask(task);
+
+    // Poll until the failure is recorded, sleeping between checks so the loop does not spin a CPU core.
+    // The test timeout bounds the total wait.
+    Map<String, TaskAnnouncement> completeTasks = workerTaskManager.getCompletedTasks();
+    while (completeTasks.isEmpty()) {
+      Thread.sleep(10);
+      completeTasks = workerTaskManager.getCompletedTasks();
+    }
+
+    Assert.assertEquals(1, completeTasks.size());
+    TaskAnnouncement announcement = completeTasks.get(task.getId());
+    Assert.assertNotNull(announcement);
+    Assert.assertEquals(TaskState.FAILED, announcement.getStatus());
+    Assert.assertEquals(
+        "Failed to run task with an exception. See middleManager or indexer logs for more details.",
+        announcement.getTaskStatus().getErrorMsg()
+    );
+  }
+
   private NoopTask createNoopTask(String id)
   {
     return new NoopTask(id, null, null, 100, 0, null, null, ImmutableMap.of(Tasks.PRIORITY_KEY, 0));
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
index dbc44f0..5a02700 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -361,6 +361,10 @@
     Assert.assertEquals(1, announcements.size());
     Assert.assertEquals(task.getId(), announcements.get(0).getTaskStatus().getId());
     Assert.assertEquals(TaskState.FAILED, announcements.get(0).getTaskStatus().getStatusCode());
+    Assert.assertEquals(
+        "Canceled as unknown task. See middleManager or indexer logs for more details.",
+        announcements.get(0).getTaskStatus().getErrorMsg()
+    );
   }
 
   @Test(timeout = 60_000L)