Add the error message in taskStatus for task failures in overlord (#11419)

* add error messages in taskStatus for task failures in overlord

* remove unused imports

* add a helper message telling which logs to look up

* fix tests

* fix counting the same task failure more than once

* apply the same fix to HttpRemoteTaskRunner
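
In short: failed TaskStatus instances now always carry a human-readable error message that points operators at the relevant logs, and the message-less factory methods are deprecated. A minimal usage sketch of the resulting API follows; it is not part of this patch, and the task id and message text are made up for illustration:

    // Illustrative sketch only. Assumes org.apache.druid.indexer.TaskStatus
    // (druid-core) is on the classpath.
    import org.apache.druid.indexer.TaskStatus;

    public class TaskStatusErrorMsgExample
    {
      public static void main(String[] args)
      {
        // Preferred after this change: attach a reason so it surfaces through the
        // overlord task API instead of a bare FAILED state.
        TaskStatus failed = TaskStatus.failure(
            "index_example_task", // hypothetical task id
            "Failed to assign this task. See overlord logs for more details."
        );
        System.out.println(failed.getErrorMsg()); // prints the message above

        // Deprecated by this change: failing without an error message leaves
        // getErrorMsg() null and operators guessing.
        TaskStatus failedWithoutReason = TaskStatus.failure("index_example_task");
        System.out.println(failedWithoutReason.getErrorMsg()); // expected to be null
      }
    }
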
diff --git a/core/src/main/java/org/apache/druid/indexer/TaskStatus.java b/core/src/main/java/org/apache/druid/indexer/TaskStatus.java
index 6bef27e..00d072c 100644
--- a/core/src/main/java/org/apache/druid/indexer/TaskStatus.java
+++ b/core/src/main/java/org/apache/druid/indexer/TaskStatus.java
@@ -47,11 +47,21 @@
     return new TaskStatus(taskId, TaskState.SUCCESS, -1, null, null);
   }
 
+  /**
+   * A succeeded task status should not have an error message.
+   * Use {@link #success(String)} instead.
+   */
+  @Deprecated
   public static TaskStatus success(String taskId, String errorMsg)
   {
     return new TaskStatus(taskId, TaskState.SUCCESS, -1, errorMsg, null);
   }
 
+  /**
+   * All failed task statuses must have a non-null error message.
+   * Use {@link #failure(String, String)} instead.
+   */
+  @Deprecated
   public static TaskStatus failure(String taskId)
   {
     return new TaskStatus(taskId, TaskState.FAILED, -1, null, null);
@@ -62,6 +72,11 @@
     return new TaskStatus(taskId, TaskState.FAILED, -1, errorMsg, null);
   }
 
+  /**
+   * This method is deprecated because it cannot set a proper error message for a failed task status.
+   * Use {@link #success(String)} or {@link #failure(String, String)} instead.
+   */
+  @Deprecated
   public static TaskStatus fromCode(String taskId, TaskState code)
   {
     return new TaskStatus(taskId, code, -1, null, null);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index 6fd3f7f..de0713b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -52,6 +52,7 @@
 import org.apache.druid.curator.cache.PathChildrenCacheFactory;
 import org.apache.druid.indexer.RunnerTaskState;
 import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService;
@@ -108,6 +109,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
  * The RemoteTaskRunner's primary responsibility is to assign tasks to worker nodes.
@@ -540,7 +542,9 @@
     } else if ((completeTask = completeTasks.get(task.getId())) != null) {
       return completeTask.getResult();
     } else {
-      return addPendingTask(task).getResult();
+      RemoteTaskRunnerWorkItem workItem = addPendingTask(task);
+      runPendingTasks();
+      return workItem.getResult();
     }
   }
 
@@ -681,7 +685,8 @@
   }
 
   /**
-   * Adds a task to the pending queue
+   * Adds a task to the pending queue.
+   * {@link #runPendingTasks()} should be called to run the pending task.
    */
   @VisibleForTesting
   RemoteTaskRunnerWorkItem addPendingTask(final Task task)
@@ -696,7 +701,6 @@
     );
     pendingTaskPayloads.put(task.getId(), task);
     pendingTasks.put(task.getId(), taskRunnerWorkItem);
-    runPendingTasks();
     return taskRunnerWorkItem;
   }
 
@@ -705,7 +709,8 @@
    * are successfully assigned to a worker will be moved from pendingTasks to runningTasks. This method is thread-safe.
    * This method should be run each time there is new worker capacity or if new tasks are assigned.
    */
-  private void runPendingTasks()
+  @VisibleForTesting
+  void runPendingTasks()
   {
     runPendingTasksExec.submit(
         (Callable<Void>) () -> {
@@ -716,30 +721,7 @@
             sortByInsertionTime(copy);
 
             for (RemoteTaskRunnerWorkItem taskRunnerWorkItem : copy) {
-              String taskId = taskRunnerWorkItem.getTaskId();
-              if (tryAssignTasks.putIfAbsent(taskId, taskId) == null) {
-                try {
-                  //this can still be null due to race from explicit task shutdown request
-                  //or if another thread steals and completes this task right after this thread makes copy
-                  //of pending tasks. See https://github.com/apache/druid/issues/2842 .
-                  Task task = pendingTaskPayloads.get(taskId);
-                  if (task != null && tryAssignTask(task, taskRunnerWorkItem)) {
-                    pendingTaskPayloads.remove(taskId);
-                  }
-                }
-                catch (Exception e) {
-                  log.makeAlert(e, "Exception while trying to assign task")
-                     .addData("taskId", taskRunnerWorkItem.getTaskId())
-                     .emit();
-                  RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(taskId);
-                  if (workItem != null) {
-                    taskComplete(workItem, null, TaskStatus.failure(taskId));
-                  }
-                }
-                finally {
-                  tryAssignTasks.remove(taskId);
-                }
-              }
+              runPendingTask(taskRunnerWorkItem);
             }
           }
           catch (Exception e) {
@@ -751,6 +733,45 @@
     );
   }
 
+  /**
+   * Runs one pending task. This method must only be called from within this class, except in unit tests.
+   */
+  @VisibleForTesting
+  void runPendingTask(RemoteTaskRunnerWorkItem taskRunnerWorkItem)
+  {
+    String taskId = taskRunnerWorkItem.getTaskId();
+    if (tryAssignTasks.putIfAbsent(taskId, taskId) == null) {
+      try {
+        //this can still be null due to race from explicit task shutdown request
+        //or if another thread steals and completes this task right after this thread makes copy
+        //of pending tasks. See https://github.com/apache/druid/issues/2842 .
+        Task task = pendingTaskPayloads.get(taskId);
+        if (task != null && tryAssignTask(task, taskRunnerWorkItem)) {
+          pendingTaskPayloads.remove(taskId);
+        }
+      }
+      catch (Exception e) {
+        log.makeAlert(e, "Exception while trying to assign task")
+           .addData("taskId", taskRunnerWorkItem.getTaskId())
+           .emit();
+        RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(taskId);
+        if (workItem != null) {
+          taskComplete(
+              workItem,
+              null,
+              TaskStatus.failure(
+                  taskId,
+                  StringUtils.format("Failed to assign this task. See overlord logs for more details.")
+              )
+          );
+        }
+      }
+      finally {
+        tryAssignTasks.remove(taskId);
+      }
+    }
+  }
+
   @VisibleForTesting
   static void sortByInsertionTime(List<RemoteTaskRunnerWorkItem> tasks)
   {
@@ -930,7 +951,18 @@
               elapsed,
               config.getTaskAssignmentTimeout()
           ).emit();
-          taskComplete(taskRunnerWorkItem, theZkWorker, TaskStatus.failure(task.getId()));
+          taskComplete(
+              taskRunnerWorkItem,
+              theZkWorker,
+              TaskStatus.failure(
+                  task.getId(),
+                  StringUtils.format(
+                      "The worker that this task is assigned did not start it in timeout[%s]. "
+                      + "See overlord logs for more details.",
+                      config.getTaskAssignmentTimeout()
+                  )
+              )
+          );
           break;
         }
       }
@@ -1066,9 +1098,13 @@
               taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
               taskRunnerWorkItem = runningTasks.remove(taskId);
               if (taskRunnerWorkItem != null) {
-                log.info("Task[%s] just disappeared!", taskId);
-                taskRunnerWorkItem.setResult(TaskStatus.failure(taskId));
-                TaskRunnerUtils.notifyStatusChanged(listeners, taskId, TaskStatus.failure(taskId));
+                log.warn("Task[%s] just disappeared!", taskId);
+                final TaskStatus taskStatus = TaskStatus.failure(
+                    taskId,
+                    "The worker that this task was assigned disappeared. See overlord logs for more details."
+                );
+                taskRunnerWorkItem.setResult(taskStatus);
+                TaskRunnerUtils.notifyStatusChanged(listeners, taskId, taskStatus);
               } else {
                 log.info("Task[%s] went bye bye.", taskId);
               }
@@ -1189,8 +1225,12 @@
               log.info("Failing task[%s]", assignedTask);
               RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask);
               if (taskRunnerWorkItem != null) {
-                taskRunnerWorkItem.setResult(TaskStatus.failure(assignedTask));
-                TaskRunnerUtils.notifyStatusChanged(listeners, assignedTask, TaskStatus.failure(assignedTask));
+                final TaskStatus taskStatus = TaskStatus.failure(
+                    assignedTask,
+                    StringUtils.format("Canceled for worker cleanup. See overlord logs for more details.")
+                );
+                taskRunnerWorkItem.setResult(taskStatus);
+                TaskRunnerUtils.notifyStatusChanged(listeners, assignedTask, taskStatus);
               } else {
                 log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask);
               }
@@ -1235,7 +1275,7 @@
 
   private void taskComplete(
       RemoteTaskRunnerWorkItem taskRunnerWorkItem,
-      ZkWorker zkWorker,
+      @Nullable ZkWorker zkWorker,
       TaskStatus taskStatus
   )
   {
@@ -1255,41 +1295,76 @@
     }
 
     // Move from running -> complete
-    completeTasks.put(taskStatus.getId(), taskRunnerWorkItem);
-    runningTasks.remove(taskStatus.getId());
+    // If the task was running and this is the first complete event,
+    // previousComplete should be null and removedRunning should be non-null.
+    final RemoteTaskRunnerWorkItem previousComplete = completeTasks.put(taskStatus.getId(), taskRunnerWorkItem);
+    final RemoteTaskRunnerWorkItem removedRunning = runningTasks.remove(taskStatus.getId());
 
-    // Update success/failure counters
-    if (zkWorker != null) {
-      if (taskStatus.isSuccess()) {
-        zkWorker.resetContinuouslyFailedTasksCount();
-        if (blackListedWorkers.remove(zkWorker)) {
-          zkWorker.setBlacklistedUntil(null);
-          log.info("[%s] removed from blacklist because a task finished with SUCCESS", zkWorker.getWorker());
+    if (previousComplete != null && removedRunning != null) {
+      log.warn(
+          "This is not the first complete event for task[%s], but it was still known as running. "
+          + "Ignoring the previously known running status.",
+          taskStatus.getId()
+      );
+    }
+
+    if (previousComplete != null) {
+      // This is not the first complete event for the same task.
+      try {
+        // getResult().get() must return immediately.
+        TaskState lastKnownState = previousComplete.getResult().get(1, TimeUnit.MILLISECONDS).getStatusCode();
+        if (taskStatus.getStatusCode() != lastKnownState) {
+          log.warn(
+              "The state of the new task complete event is different from its last known state. "
+              + "New state[%s], last known state[%s]",
+              taskStatus.getStatusCode(),
+              lastKnownState
+          );
         }
-      } else if (taskStatus.isFailure()) {
-        zkWorker.incrementContinuouslyFailedTasksCount();
       }
+      catch (InterruptedException e) {
+        log.warn(e, "Interrupted while getting the last known task status.");
+        Thread.currentThread().interrupt();
+      }
+      catch (ExecutionException | TimeoutException e) {
+        // This case should not really happen.
+        log.warn(e, "Failed to get the last known task status. Ignoring this failure.");
+      }
+    } else {
+      // This is the first complete event for this task.
+      // Update success/failure counters
+      if (zkWorker != null) {
+        if (taskStatus.isSuccess()) {
+          zkWorker.resetContinuouslyFailedTasksCount();
+          if (blackListedWorkers.remove(zkWorker)) {
+            zkWorker.setBlacklistedUntil(null);
+            log.info("[%s] removed from blacklist because a task finished with SUCCESS", zkWorker.getWorker());
+          }
+        } else if (taskStatus.isFailure()) {
+          zkWorker.incrementContinuouslyFailedTasksCount();
+        }
 
-      // Blacklist node if there are too many failures.
-      synchronized (blackListedWorkers) {
-        if (zkWorker.getContinuouslyFailedTasksCount() > config.getMaxRetriesBeforeBlacklist() &&
-            blackListedWorkers.size() <= zkWorkers.size() * (config.getMaxPercentageBlacklistWorkers() / 100.0) - 1) {
-          zkWorker.setBlacklistedUntil(DateTimes.nowUtc().plus(config.getWorkerBlackListBackoffTime()));
-          if (blackListedWorkers.add(zkWorker)) {
-            log.info(
-                "Blacklisting [%s] until [%s] after [%,d] failed tasks in a row.",
-                zkWorker.getWorker(),
-                zkWorker.getBlacklistedUntil(),
-                zkWorker.getContinuouslyFailedTasksCount()
-            );
+        // Blacklist node if there are too many failures.
+        synchronized (blackListedWorkers) {
+          if (zkWorker.getContinuouslyFailedTasksCount() > config.getMaxRetriesBeforeBlacklist() &&
+              blackListedWorkers.size() <= zkWorkers.size() * (config.getMaxPercentageBlacklistWorkers() / 100.0) - 1) {
+            zkWorker.setBlacklistedUntil(DateTimes.nowUtc().plus(config.getWorkerBlackListBackoffTime()));
+            if (blackListedWorkers.add(zkWorker)) {
+              log.info(
+                  "Blacklisting [%s] until [%s] after [%,d] failed tasks in a row.",
+                  zkWorker.getWorker(),
+                  zkWorker.getBlacklistedUntil(),
+                  zkWorker.getContinuouslyFailedTasksCount()
+              );
+            }
           }
         }
       }
-    }
 
-    // Notify interested parties
-    taskRunnerWorkItem.setResult(taskStatus);
-    TaskRunnerUtils.notifyStatusChanged(listeners, taskStatus.getId(), taskStatus);
+      // Notify interested parties
+      taskRunnerWorkItem.setResult(taskStatus);
+      TaskRunnerUtils.notifyStatusChanged(listeners, taskStatus.getId(), taskStatus);
+    }
   }
 
   @Override
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index d341663..d9e582c 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -288,7 +288,9 @@
           }
           catch (Exception e) {
             log.warn(e, "Exception thrown during isReady for task: %s", task.getId());
-            notifyStatus(task, TaskStatus.failure(task.getId()), "failed because of exception[%s]", e.getClass());
+            final String errorMessage = "Failed while waiting for the task to be ready to run. "
+                                        + "See overlord logs for more details.";
+            notifyStatus(task, TaskStatus.failure(task.getId(), errorMessage), errorMessage);
             continue;
           }
           if (taskIsReady) {
@@ -412,7 +414,7 @@
       Preconditions.checkNotNull(taskId, "taskId");
       for (final Task task : tasks) {
         if (task.getId().equals(taskId)) {
-          notifyStatus(task, TaskStatus.failure(taskId), reasonFormat, args);
+          notifyStatus(task, TaskStatus.failure(taskId, StringUtils.format(reasonFormat, args)), reasonFormat, args);
           break;
         }
       }
@@ -556,7 +558,9 @@
                .addData("type", task.getType())
                .addData("dataSource", task.getDataSource())
                .emit();
-            handleStatus(TaskStatus.failure(task.getId()));
+            handleStatus(
+                TaskStatus.failure(task.getId(), "Failed to run this task. See overlord logs for more details.")
+            );
           }
 
           private void handleStatus(final TaskStatus status)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 623458f..59085a2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -47,6 +47,7 @@
 import org.apache.druid.discovery.WorkerNodeService;
 import org.apache.druid.indexer.RunnerTaskState;
 import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
@@ -68,6 +69,7 @@
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
@@ -424,7 +426,18 @@
             config.getTaskAssignmentTimeout()
         ).emit();
         // taskComplete(..) must be called outside of statusLock, see comments on method.
-        taskComplete(workItem, workerHolder, TaskStatus.failure(taskId));
+        taskComplete(
+            workItem,
+            workerHolder,
+            TaskStatus.failure(
+                taskId,
+                StringUtils.format(
+                    "The worker that this task is assigned did not start it in timeout[%s]. "
+                    + "See overlord and middleManager/indexer logs for more details.",
+                    config.getTaskAssignmentTimeout()
+                )
+            )
+        );
       }
 
       return true;
@@ -456,13 +469,36 @@
       workerHolder.setLastCompletedTaskTime(DateTimes.nowUtc());
     }
 
-    // Notify interested parties
-    taskRunnerWorkItem.setResult(taskStatus);
-    TaskRunnerUtils.notifyStatusChanged(listeners, taskStatus.getId(), taskStatus);
+    if (taskRunnerWorkItem.getResult().isDone()) {
+      // This is not the first complete event.
+      try {
+        TaskState lastKnownState = taskRunnerWorkItem.getResult().get().getStatusCode();
+        if (taskStatus.getStatusCode() != lastKnownState) {
+          log.warn(
+              "The state of the new task complete event is different from its last known state. "
+              + "New state[%s], last known state[%s]",
+              taskStatus.getStatusCode(),
+              lastKnownState
+          );
+        }
+      }
+      catch (InterruptedException e) {
+        log.warn(e, "Interrupted while getting the last known task status.");
+        Thread.currentThread().interrupt();
+      }
+      catch (ExecutionException e) {
+        // This case should not really happen.
+        log.warn(e, "Failed to get the last known task status. Ignoring this failure.");
+      }
+    } else {
+      // Notify interested parties
+      taskRunnerWorkItem.setResult(taskStatus);
+      TaskRunnerUtils.notifyStatusChanged(listeners, taskStatus.getId(), taskStatus);
 
-    // Update success/failure counters, Blacklist node if there are too many failures.
-    if (workerHolder != null) {
-      blacklistWorkerIfNeeded(taskStatus, workerHolder);
+      // Update success/failure counters, Blacklist node if there are too many failures.
+      if (workerHolder != null) {
+        blacklistWorkerIfNeeded(taskStatus, workerHolder);
+      }
     }
 
     synchronized (statusLock) {
@@ -647,14 +683,26 @@
 
             for (HttpRemoteTaskRunnerWorkItem taskItem : tasksToFail) {
               if (!taskItem.getResult().isDone()) {
-                log.info(
+                log.warn(
                     "Failing task[%s] because worker[%s] disappeared and did not report within cleanup timeout[%s].",
                     workerHostAndPort,
                     taskItem.getTaskId(),
                     config.getTaskCleanupTimeout()
                 );
                 // taskComplete(..) must be called outside of statusLock, see comments on method.
-                taskComplete(taskItem, null, TaskStatus.failure(taskItem.getTaskId()));
+                taskComplete(
+                    taskItem,
+                    null,
+                    TaskStatus.failure(
+                        taskItem.getTaskId(),
+                        StringUtils.format(
+                            "The worker that this task was assigned disappeared and "
+                            + "did not report cleanup within timeout[%s]. "
+                            + "See overlord and middleManager/indexer logs for more details.",
+                            config.getTaskCleanupTimeout()
+                        )
+                    )
+                );
               }
             }
           }
@@ -1179,7 +1227,15 @@
         if (taskItem.getTask() == null) {
           log.makeAlert("No Task obj found in TaskItem for taskID[%s]. Failed.", taskId).emit();
           // taskComplete(..) must be called outside of statusLock, see comments on method.
-          taskComplete(taskItem, null, TaskStatus.failure(taskId));
+          taskComplete(
+              taskItem,
+              null,
+              TaskStatus.failure(
+                  taskId,
+                  "No payload found for this task. "
+                  + "See overlord logs and middleManager/indexer logs for more details."
+              )
+          );
           continue;
         }
 
@@ -1205,7 +1261,11 @@
              .emit();
 
           // taskComplete(..) must be called outside of statusLock, see comments on method.
-          taskComplete(taskItem, null, TaskStatus.failure(taskId));
+          taskComplete(
+              taskItem,
+              null,
+              TaskStatus.failure(taskId, "Failed to assign this task. See overlord logs for more details.")
+          );
         }
         finally {
           synchronized (statusLock) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
index 630b0f0..ff9aab1 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
@@ -390,14 +390,20 @@
                 announcement.getStatus(),
                 worker.getHost()
             );
-            delta.add(TaskAnnouncement.create(
-                announcement.getTaskId(),
-                announcement.getTaskType(),
-                announcement.getTaskResource(),
-                TaskStatus.failure(announcement.getTaskId()),
-                announcement.getTaskLocation(),
-                announcement.getTaskDataSource()
-            ));
+            delta.add(
+                TaskAnnouncement.create(
+                    announcement.getTaskId(),
+                    announcement.getTaskType(),
+                    announcement.getTaskResource(),
+                    TaskStatus.failure(
+                        announcement.getTaskId(),
+                        "This task disappeared on the worker where it was assigned. "
+                        + "See overlord logs for more details."
+                    ),
+                    announcement.getTaskLocation(),
+                    announcement.getTaskDataSource()
+                )
+            );
           }
         }
 
@@ -427,14 +433,20 @@
                   announcement.getStatus(),
                   worker.getHost()
               );
-              delta.add(TaskAnnouncement.create(
-                  announcement.getTaskId(),
-                  announcement.getTaskType(),
-                  announcement.getTaskResource(),
-                  TaskStatus.failure(announcement.getTaskId()),
-                  announcement.getTaskLocation(),
-                  announcement.getTaskDataSource()
-              ));
+              delta.add(
+                  TaskAnnouncement.create(
+                      announcement.getTaskId(),
+                      announcement.getTaskType(),
+                      announcement.getTaskResource(),
+                      TaskStatus.failure(
+                          announcement.getTaskId(),
+                          "This task disappeared on the worker where it was assigned. "
+                          + "See overlord logs for more details."
+                      ),
+                      announcement.getTaskLocation(),
+                      announcement.getTaskDataSource()
+                  )
+              );
             }
           } else if (change instanceof WorkerHistoryItem.Metadata) {
             isWorkerDisabled = ((WorkerHistoryItem.Metadata) change).isDisabled();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
index e7bf022..589c6e1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
@@ -44,6 +44,7 @@
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.testing.DeadlockDetectingTimeout;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
@@ -54,6 +55,7 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestRule;
+import org.mockito.Mockito;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -88,6 +90,7 @@
     cf = rtrTestUtils.getCuratorFramework();
 
     task = TestTasks.unending("task id with spaces");
+    EmittingLogger.registerEmitter(new NoopServiceEmitter());
   }
 
   @After
@@ -131,6 +134,7 @@
   {
     doSetup();
     remoteTaskRunner.addPendingTask(task);
+    remoteTaskRunner.runPendingTasks();
     Assert.assertFalse(workerRunningTask(task.getId()));
 
     ListenableFuture<TaskStatus> result = remoteTaskRunner.run(task);
@@ -352,6 +356,8 @@
     TaskStatus status = future.get();
 
     Assert.assertEquals(status.getStatusCode(), TaskState.FAILED);
+    Assert.assertNotNull(status.getErrorMsg());
+    Assert.assertTrue(status.getErrorMsg().contains("The worker that this task was assigned to disappeared"));
   }
 
   @Test
@@ -446,6 +452,8 @@
     TaskStatus status = future.get();
 
     Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
+    Assert.assertNotNull(status.getErrorMsg());
+    Assert.assertTrue(status.getErrorMsg().contains("Canceled for worker cleanup"));
     RemoteTaskRunnerConfig config = remoteTaskRunner.getRemoteTaskRunnerConfig();
     Assert.assertTrue(
         TestUtils.conditionValid(
@@ -517,6 +525,38 @@
     Assert.assertEquals(TaskState.SUCCESS, result.get().getStatusCode());
   }
 
+  @Test
+  public void testRunPendingTaskFailToAssignTask() throws Exception
+  {
+    doSetup();
+    RemoteTaskRunnerWorkItem originalItem = remoteTaskRunner.addPendingTask(task);
+    // return a wrong taskId after the first call to make task assignment fail
+    RemoteTaskRunnerWorkItem wonkyItem = Mockito.mock(RemoteTaskRunnerWorkItem.class);
+    Mockito.when(wonkyItem.getTaskId()).thenReturn(originalItem.getTaskId()).thenReturn("wrongId");
+    remoteTaskRunner.runPendingTask(wonkyItem);
+    TaskStatus taskStatus = originalItem.getResult().get(0, TimeUnit.MILLISECONDS);
+    Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode());
+    Assert.assertEquals(
+        "Failed to assign this task. See overlord logs for more details.",
+        taskStatus.getErrorMsg()
+    );
+  }
+
+  @Test
+  public void testRunPendingTaskTimeoutToAssign() throws Exception
+  {
+    makeWorker();
+    makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(new Period("PT1S")));
+    RemoteTaskRunnerWorkItem workItem = remoteTaskRunner.addPendingTask(task);
+    remoteTaskRunner.runPendingTask(workItem);
+    TaskStatus taskStatus = workItem.getResult().get(0, TimeUnit.MILLISECONDS);
+    Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode());
+    Assert.assertNotNull(taskStatus.getErrorMsg());
+    Assert.assertTrue(
+        taskStatus.getErrorMsg().startsWith("The worker that this task is assigned to did not start it within timeout")
+    );
+  }
+
   private void doSetup() throws Exception
   {
     makeWorker();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
index 2dbd9d1..0b3d872 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
@@ -24,6 +24,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskLockType;
@@ -41,6 +42,7 @@
 import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.metadata.EntryExistsException;
@@ -146,8 +148,14 @@
     Assert.assertEquals(task.interval, locksForTask.get(0).getInterval());
 
     // Verify that locks are removed on calling shutdown
-    taskQueue.shutdown(task.getId(), "Shutdown Task");
+    taskQueue.shutdown(task.getId(), "Shutdown Task test");
     Assert.assertTrue(getLockbox().findLocksForTask(task).isEmpty());
+
+    Optional<TaskStatus> statusOptional = getTaskStorage().getStatus(task.getId());
+    Assert.assertTrue(statusOptional.isPresent());
+    Assert.assertEquals(TaskState.FAILED, statusOptional.get().getStatusCode());
+    Assert.assertNotNull(statusOptional.get().getErrorMsg());
+    Assert.assertEquals("Shutdown Task test", statusOptional.get().getErrorMsg());
   }
 
   @Test
@@ -305,6 +313,42 @@
     Assert.assertFalse(queuedTask.getContextValue(Tasks.FORCE_TIME_CHUNK_LOCK_KEY));
   }
 
+  @Test
+  public void testTaskStatusWhenExceptionIsThrownInIsReady() throws EntryExistsException
+  {
+    final TaskActionClientFactory actionClientFactory = createActionClientFactory();
+    final TaskQueue taskQueue = new TaskQueue(
+        new TaskLockConfig(),
+        new TaskQueueConfig(null, null, null, null),
+        new DefaultTaskConfig(),
+        getTaskStorage(),
+        new SimpleTaskRunner(actionClientFactory),
+        actionClientFactory,
+        getLockbox(),
+        new NoopServiceEmitter()
+    );
+    taskQueue.setActive(true);
+    final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D"))
+    {
+      @Override
+      public boolean isReady(TaskActionClient taskActionClient)
+      {
+        throw new RuntimeException("isReady failure test");
+      }
+    };
+    taskQueue.add(task);
+    taskQueue.manageInternal();
+
+    Optional<TaskStatus> statusOptional = getTaskStorage().getStatus(task.getId());
+    Assert.assertTrue(statusOptional.isPresent());
+    Assert.assertEquals(TaskState.FAILED, statusOptional.get().getStatusCode());
+    Assert.assertNotNull(statusOptional.get().getErrorMsg());
+    Assert.assertTrue(
+        StringUtils.format("Actual message is: %s", statusOptional.get().getErrorMsg()),
+        statusOptional.get().getErrorMsg().startsWith("Failed while waiting for the task to be ready to run")
+    );
+  }
+
   private static class TestTask extends AbstractBatchIndexTask
   {
     private final Interval interval;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
index 2ca1dff..aa3f14e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
@@ -49,16 +49,20 @@
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.initialization.IndexerZkConfig;
 import org.apache.druid.server.initialization.ZkPathsConfig;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.easymock.EasyMock;
 import org.eclipse.jetty.util.ConcurrentHashSet;
 import org.joda.time.Period;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -79,6 +83,12 @@
  */
 public class HttpRemoteTaskRunnerTest
 {
+  @Before
+  public void setup()
+  {
+    EmittingLogger.registerEmitter(new NoopServiceEmitter());
+  }
+
   /*
   Simulates startup of Overlord and Workers being discovered with no previously known tasks. Fresh tasks are given
   and expected to be completed.
@@ -733,6 +743,18 @@
 
     Assert.assertTrue(future1.get().isFailure());
     Assert.assertTrue(future2.get().isFailure());
+    Assert.assertNotNull(future1.get().getErrorMsg());
+    Assert.assertNotNull(future2.get().getErrorMsg());
+    Assert.assertTrue(
+        future1.get().getErrorMsg().startsWith(
+            "The worker that this task was assigned disappeared and did not report cleanup within timeout"
+        )
+    );
+    Assert.assertTrue(
+        future2.get().getErrorMsg().startsWith(
+            "The worker that this task was assigned disappeared and did not report cleanup within timeout"
+        )
+    );
 
     AtomicInteger ticks = new AtomicInteger();
     Set<String> actualShutdowns = new ConcurrentHashSet<>();
@@ -1254,6 +1276,230 @@
     );
   }
 
+  @Test
+  public void testTimeoutInAssigningTasks() throws Exception
+  {
+    TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+            .andReturn(druidNodeDiscovery);
+    EasyMock.replay(druidNodeDiscoveryProvider);
+
+    HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
+        TestHelper.makeJsonMapper(),
+        new HttpRemoteTaskRunnerConfig()
+        {
+          @Override
+          public int getPendingTasksRunnerNumThreads()
+          {
+            return 1;
+          }
+
+          @Override
+          public Period getTaskAssignmentTimeout()
+          {
+            return new Period("PT1S");
+          }
+        },
+        EasyMock.createNiceMock(HttpClient.class),
+        DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
+        new NoopProvisioningStrategy<>(),
+        druidNodeDiscoveryProvider,
+        EasyMock.createNiceMock(TaskStorage.class),
+        EasyMock.createNiceMock(CuratorFramework.class),
+        new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
+    )
+    {
+      @Override
+      protected WorkerHolder createWorkerHolder(
+          ObjectMapper smileMapper,
+          HttpClient httpClient,
+          HttpRemoteTaskRunnerConfig config,
+          ScheduledExecutorService workersSyncExec,
+          WorkerHolder.Listener listener,
+          Worker worker,
+          List<TaskAnnouncement> knownAnnouncements
+      )
+      {
+        return new WorkerHolder(
+            smileMapper,
+            httpClient,
+            config,
+            workersSyncExec,
+            listener,
+            worker,
+            ImmutableList.of()
+        )
+        {
+          @Override
+          public void start()
+          {
+            disabled.set(false);
+          }
+
+          @Override
+          public void stop()
+          {
+          }
+
+          @Override
+          public boolean isInitialized()
+          {
+            return true;
+          }
+
+          @Override
+          public void waitForInitialization()
+          {
+          }
+
+          @Override
+          public boolean assignTask(Task task)
+          {
+            // Always returns true
+            return true;
+          }
+
+          @Override
+          public void shutdownTask(String taskId)
+          {
+          }
+        };
+      }
+    };
+
+    taskRunner.start();
+
+    DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode(
+        new DruidNode("service", "host1", false, 8080, null, true, false),
+        NodeRole.MIDDLE_MANAGER,
+        ImmutableMap.of(
+            WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
+        )
+    );
+
+    druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode1));
+
+    Future<TaskStatus> future = taskRunner.run(NoopTask.create("task-id", 0));
+    Assert.assertTrue(future.get().isFailure());
+    Assert.assertNotNull(future.get().getErrorMsg());
+    Assert.assertTrue(
+        future.get().getErrorMsg().startsWith("The worker that this task is assigned to did not start it within timeout")
+    );
+  }
+
+  @Test
+  public void testExceptionThrownInAssigningTasks() throws Exception
+  {
+    TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+            .andReturn(druidNodeDiscovery);
+    EasyMock.replay(druidNodeDiscoveryProvider);
+
+    HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
+        TestHelper.makeJsonMapper(),
+        new HttpRemoteTaskRunnerConfig()
+        {
+          @Override
+          public int getPendingTasksRunnerNumThreads()
+          {
+            return 1;
+          }
+
+          @Override
+          public Period getTaskAssignmentTimeout()
+          {
+            return new Period("PT1S");
+          }
+        },
+        EasyMock.createNiceMock(HttpClient.class),
+        DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
+        new NoopProvisioningStrategy<>(),
+        druidNodeDiscoveryProvider,
+        EasyMock.createNiceMock(TaskStorage.class),
+        EasyMock.createNiceMock(CuratorFramework.class),
+        new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
+    )
+    {
+      @Override
+      protected WorkerHolder createWorkerHolder(
+          ObjectMapper smileMapper,
+          HttpClient httpClient,
+          HttpRemoteTaskRunnerConfig config,
+          ScheduledExecutorService workersSyncExec,
+          WorkerHolder.Listener listener,
+          Worker worker,
+          List<TaskAnnouncement> knownAnnouncements
+      )
+      {
+        return new WorkerHolder(
+            smileMapper,
+            httpClient,
+            config,
+            workersSyncExec,
+            listener,
+            worker,
+            ImmutableList.of()
+        )
+        {
+          @Override
+          public void start()
+          {
+            disabled.set(false);
+          }
+
+          @Override
+          public void stop()
+          {
+          }
+
+          @Override
+          public boolean isInitialized()
+          {
+            return true;
+          }
+
+          @Override
+          public void waitForInitialization()
+          {
+          }
+
+          @Override
+          public boolean assignTask(Task task)
+          {
+            throw new RuntimeException("Assign failure test");
+          }
+
+          @Override
+          public void shutdownTask(String taskId)
+          {
+          }
+        };
+      }
+    };
+
+    taskRunner.start();
+
+    DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode(
+        new DruidNode("service", "host1", false, 8080, null, true, false),
+        NodeRole.MIDDLE_MANAGER,
+        ImmutableMap.of(
+            WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
+        )
+    );
+
+    druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode1));
+
+    Future<TaskStatus> future = taskRunner.run(NoopTask.create("task-id", 0));
+    Assert.assertTrue(future.get().isFailure());
+    Assert.assertNotNull(future.get().getErrorMsg());
+    Assert.assertTrue(
+        StringUtils.format("Actual message is: %s", future.get().getErrorMsg()),
+        future.get().getErrorMsg().startsWith("Failed to assign this task")
+    );
+  }
+
   private HttpRemoteTaskRunner createTaskRunnerForTestTaskAddedOrUpdated(
       TaskStorage taskStorage,
       List<Object> listenerNotificationsAccumulator
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java
index 3319d81..5f4a9b8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java
@@ -115,6 +115,12 @@
 
     Assert.assertEquals(task0.getId(), updates.get(3).getTaskId());
     Assert.assertTrue(updates.get(3).getTaskStatus().isFailure());
+    Assert.assertNotNull(updates.get(3).getTaskStatus().getErrorMsg());
+    Assert.assertTrue(
+        updates.get(3).getTaskStatus().getErrorMsg().startsWith(
+            "This task disappeared on the worker where it was assigned"
+        )
+    );
 
     updates.clear();
 
@@ -138,6 +144,12 @@
 
     Assert.assertEquals(task2.getId(), updates.get(0).getTaskId());
     Assert.assertTrue(updates.get(0).getTaskStatus().isFailure());
+    Assert.assertNotNull(updates.get(0).getTaskStatus().getErrorMsg());
+    Assert.assertTrue(
+        updates.get(0).getTaskStatus().getErrorMsg().startsWith(
+            "This task disappeared on the worker where it was assigned"
+        )
+    );
 
     Assert.assertEquals(task3.getId(), updates.get(1).getTaskId());
     Assert.assertTrue(updates.get(1).getTaskStatus().isRunnable());