Add the error message in taskStatus for task failures in overlord (#11419)
* add error messages in taskStatus for task failures in overlord
* unused imports
* add helper message for logs to look up
* fix tests
* fix counting the same task failures more than once
* same fix for HttpRemoteTaskRunner
diff --git a/core/src/main/java/org/apache/druid/indexer/TaskStatus.java b/core/src/main/java/org/apache/druid/indexer/TaskStatus.java
index 6bef27e..00d072c 100644
--- a/core/src/main/java/org/apache/druid/indexer/TaskStatus.java
+++ b/core/src/main/java/org/apache/druid/indexer/TaskStatus.java
@@ -47,11 +47,21 @@
return new TaskStatus(taskId, TaskState.SUCCESS, -1, null, null);
}
+ /**
+ * The succeeded task status should not have error messages.
+ * Use {@link #success(String)} instead.
+ */
+ @Deprecated
public static TaskStatus success(String taskId, String errorMsg)
{
return new TaskStatus(taskId, TaskState.SUCCESS, -1, errorMsg, null);
}
+ /**
+ * All failed task status must have a non-null error message.
+ * Use {@link #failure(String, String)} instead.
+ */
+ @Deprecated
public static TaskStatus failure(String taskId)
{
return new TaskStatus(taskId, TaskState.FAILED, -1, null, null);
@@ -62,6 +72,11 @@
return new TaskStatus(taskId, TaskState.FAILED, -1, errorMsg, null);
}
+ /**
+   * This method is deprecated because it cannot set a proper error message for a failed task status.
+ * Use {@link #success(String)} or {@link #failure(String, String)} instead.
+ */
+ @Deprecated
public static TaskStatus fromCode(String taskId, TaskState code)
{
return new TaskStatus(taskId, code, -1, null, null);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index 6fd3f7f..de0713b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -52,6 +52,7 @@
import org.apache.druid.curator.cache.PathChildrenCacheFactory;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService;
@@ -108,6 +109,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
* The RemoteTaskRunner's primary responsibility is to assign tasks to worker nodes.
@@ -540,7 +542,9 @@
} else if ((completeTask = completeTasks.get(task.getId())) != null) {
return completeTask.getResult();
} else {
- return addPendingTask(task).getResult();
+ RemoteTaskRunnerWorkItem workItem = addPendingTask(task);
+ runPendingTasks();
+ return workItem.getResult();
}
}
@@ -681,7 +685,8 @@
}
/**
- * Adds a task to the pending queue
+ * Adds a task to the pending queue.
+ * {@link #runPendingTasks()} should be called to run the pending task.
*/
@VisibleForTesting
RemoteTaskRunnerWorkItem addPendingTask(final Task task)
@@ -696,7 +701,6 @@
);
pendingTaskPayloads.put(task.getId(), task);
pendingTasks.put(task.getId(), taskRunnerWorkItem);
- runPendingTasks();
return taskRunnerWorkItem;
}
@@ -705,7 +709,8 @@
* are successfully assigned to a worker will be moved from pendingTasks to runningTasks. This method is thread-safe.
* This method should be run each time there is new worker capacity or if new tasks are assigned.
*/
- private void runPendingTasks()
+ @VisibleForTesting
+ void runPendingTasks()
{
runPendingTasksExec.submit(
(Callable<Void>) () -> {
@@ -716,30 +721,7 @@
sortByInsertionTime(copy);
for (RemoteTaskRunnerWorkItem taskRunnerWorkItem : copy) {
- String taskId = taskRunnerWorkItem.getTaskId();
- if (tryAssignTasks.putIfAbsent(taskId, taskId) == null) {
- try {
- //this can still be null due to race from explicit task shutdown request
- //or if another thread steals and completes this task right after this thread makes copy
- //of pending tasks. See https://github.com/apache/druid/issues/2842 .
- Task task = pendingTaskPayloads.get(taskId);
- if (task != null && tryAssignTask(task, taskRunnerWorkItem)) {
- pendingTaskPayloads.remove(taskId);
- }
- }
- catch (Exception e) {
- log.makeAlert(e, "Exception while trying to assign task")
- .addData("taskId", taskRunnerWorkItem.getTaskId())
- .emit();
- RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(taskId);
- if (workItem != null) {
- taskComplete(workItem, null, TaskStatus.failure(taskId));
- }
- }
- finally {
- tryAssignTasks.remove(taskId);
- }
- }
+ runPendingTask(taskRunnerWorkItem);
}
}
catch (Exception e) {
@@ -751,6 +733,45 @@
);
}
+ /**
+   * Runs a single pending task. This method must only be called from within this class, except in unit tests.
+ */
+ @VisibleForTesting
+ void runPendingTask(RemoteTaskRunnerWorkItem taskRunnerWorkItem)
+ {
+ String taskId = taskRunnerWorkItem.getTaskId();
+ if (tryAssignTasks.putIfAbsent(taskId, taskId) == null) {
+ try {
+ //this can still be null due to race from explicit task shutdown request
+ //or if another thread steals and completes this task right after this thread makes copy
+ //of pending tasks. See https://github.com/apache/druid/issues/2842 .
+ Task task = pendingTaskPayloads.get(taskId);
+ if (task != null && tryAssignTask(task, taskRunnerWorkItem)) {
+ pendingTaskPayloads.remove(taskId);
+ }
+ }
+ catch (Exception e) {
+ log.makeAlert(e, "Exception while trying to assign task")
+ .addData("taskId", taskRunnerWorkItem.getTaskId())
+ .emit();
+ RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(taskId);
+ if (workItem != null) {
+ taskComplete(
+ workItem,
+ null,
+ TaskStatus.failure(
+ taskId,
+                  "Failed to assign this task. See overlord logs for more details."
+ )
+ );
+ }
+ }
+ finally {
+ tryAssignTasks.remove(taskId);
+ }
+ }
+ }
+
@VisibleForTesting
static void sortByInsertionTime(List<RemoteTaskRunnerWorkItem> tasks)
{
@@ -930,7 +951,18 @@
elapsed,
config.getTaskAssignmentTimeout()
).emit();
- taskComplete(taskRunnerWorkItem, theZkWorker, TaskStatus.failure(task.getId()));
+ taskComplete(
+ taskRunnerWorkItem,
+ theZkWorker,
+ TaskStatus.failure(
+ task.getId(),
+ StringUtils.format(
+ "The worker that this task is assigned did not start it in timeout[%s]. "
+ + "See overlord logs for more details.",
+ config.getTaskAssignmentTimeout()
+ )
+ )
+ );
break;
}
}
@@ -1066,9 +1098,13 @@
taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
taskRunnerWorkItem = runningTasks.remove(taskId);
if (taskRunnerWorkItem != null) {
- log.info("Task[%s] just disappeared!", taskId);
- taskRunnerWorkItem.setResult(TaskStatus.failure(taskId));
- TaskRunnerUtils.notifyStatusChanged(listeners, taskId, TaskStatus.failure(taskId));
+ log.warn("Task[%s] just disappeared!", taskId);
+ final TaskStatus taskStatus = TaskStatus.failure(
+ taskId,
+ "The worker that this task was assigned disappeared. See overlord logs for more details."
+ );
+ taskRunnerWorkItem.setResult(taskStatus);
+ TaskRunnerUtils.notifyStatusChanged(listeners, taskId, taskStatus);
} else {
log.info("Task[%s] went bye bye.", taskId);
}
@@ -1189,8 +1225,12 @@
log.info("Failing task[%s]", assignedTask);
RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask);
if (taskRunnerWorkItem != null) {
- taskRunnerWorkItem.setResult(TaskStatus.failure(assignedTask));
- TaskRunnerUtils.notifyStatusChanged(listeners, assignedTask, TaskStatus.failure(assignedTask));
+ final TaskStatus taskStatus = TaskStatus.failure(
+ assignedTask,
+            "Canceled for worker cleanup. See overlord logs for more details."
+ );
+ taskRunnerWorkItem.setResult(taskStatus);
+ TaskRunnerUtils.notifyStatusChanged(listeners, assignedTask, taskStatus);
} else {
log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask);
}
@@ -1235,7 +1275,7 @@
private void taskComplete(
RemoteTaskRunnerWorkItem taskRunnerWorkItem,
- ZkWorker zkWorker,
+ @Nullable ZkWorker zkWorker,
TaskStatus taskStatus
)
{
@@ -1255,41 +1295,76 @@
}
// Move from running -> complete
- completeTasks.put(taskStatus.getId(), taskRunnerWorkItem);
- runningTasks.remove(taskStatus.getId());
+ // If the task was running and this is the first complete event,
+ // previousComplete should be null and removedRunning should not.
+ final RemoteTaskRunnerWorkItem previousComplete = completeTasks.put(taskStatus.getId(), taskRunnerWorkItem);
+ final RemoteTaskRunnerWorkItem removedRunning = runningTasks.remove(taskStatus.getId());
- // Update success/failure counters
- if (zkWorker != null) {
- if (taskStatus.isSuccess()) {
- zkWorker.resetContinuouslyFailedTasksCount();
- if (blackListedWorkers.remove(zkWorker)) {
- zkWorker.setBlacklistedUntil(null);
- log.info("[%s] removed from blacklist because a task finished with SUCCESS", zkWorker.getWorker());
+ if (previousComplete != null && removedRunning != null) {
+ log.warn(
+ "This is not the first complete event for task[%s], but it was still known as running. "
+ + "Ignoring the previously known running status.",
+ taskStatus.getId()
+ );
+ }
+
+ if (previousComplete != null) {
+ // This is not the first complete event for the same task.
+ try {
+ // getResult().get() must return immediately.
+ TaskState lastKnownState = previousComplete.getResult().get(1, TimeUnit.MILLISECONDS).getStatusCode();
+ if (taskStatus.getStatusCode() != lastKnownState) {
+ log.warn(
+ "The state of the new task complete event is different from its last known state. "
+ + "New state[%s], last known state[%s]",
+ taskStatus.getStatusCode(),
+ lastKnownState
+ );
}
- } else if (taskStatus.isFailure()) {
- zkWorker.incrementContinuouslyFailedTasksCount();
}
+ catch (InterruptedException e) {
+ log.warn(e, "Interrupted while getting the last known task status.");
+ Thread.currentThread().interrupt();
+ }
+ catch (ExecutionException | TimeoutException e) {
+ // This case should not really happen.
+ log.warn(e, "Failed to get the last known task status. Ignoring this failure.");
+ }
+ } else {
+ // This is the first complete event for this task.
+ // Update success/failure counters
+ if (zkWorker != null) {
+ if (taskStatus.isSuccess()) {
+ zkWorker.resetContinuouslyFailedTasksCount();
+ if (blackListedWorkers.remove(zkWorker)) {
+ zkWorker.setBlacklistedUntil(null);
+ log.info("[%s] removed from blacklist because a task finished with SUCCESS", zkWorker.getWorker());
+ }
+ } else if (taskStatus.isFailure()) {
+ zkWorker.incrementContinuouslyFailedTasksCount();
+ }
- // Blacklist node if there are too many failures.
- synchronized (blackListedWorkers) {
- if (zkWorker.getContinuouslyFailedTasksCount() > config.getMaxRetriesBeforeBlacklist() &&
- blackListedWorkers.size() <= zkWorkers.size() * (config.getMaxPercentageBlacklistWorkers() / 100.0) - 1) {
- zkWorker.setBlacklistedUntil(DateTimes.nowUtc().plus(config.getWorkerBlackListBackoffTime()));
- if (blackListedWorkers.add(zkWorker)) {
- log.info(
- "Blacklisting [%s] until [%s] after [%,d] failed tasks in a row.",
- zkWorker.getWorker(),
- zkWorker.getBlacklistedUntil(),
- zkWorker.getContinuouslyFailedTasksCount()
- );
+ // Blacklist node if there are too many failures.
+ synchronized (blackListedWorkers) {
+ if (zkWorker.getContinuouslyFailedTasksCount() > config.getMaxRetriesBeforeBlacklist() &&
+ blackListedWorkers.size() <= zkWorkers.size() * (config.getMaxPercentageBlacklistWorkers() / 100.0) - 1) {
+ zkWorker.setBlacklistedUntil(DateTimes.nowUtc().plus(config.getWorkerBlackListBackoffTime()));
+ if (blackListedWorkers.add(zkWorker)) {
+ log.info(
+ "Blacklisting [%s] until [%s] after [%,d] failed tasks in a row.",
+ zkWorker.getWorker(),
+ zkWorker.getBlacklistedUntil(),
+ zkWorker.getContinuouslyFailedTasksCount()
+ );
+ }
}
}
}
- }
- // Notify interested parties
- taskRunnerWorkItem.setResult(taskStatus);
- TaskRunnerUtils.notifyStatusChanged(listeners, taskStatus.getId(), taskStatus);
+ // Notify interested parties
+ taskRunnerWorkItem.setResult(taskStatus);
+ TaskRunnerUtils.notifyStatusChanged(listeners, taskStatus.getId(), taskStatus);
+ }
}
@Override
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index d341663..d9e582c 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -288,7 +288,9 @@
}
catch (Exception e) {
log.warn(e, "Exception thrown during isReady for task: %s", task.getId());
- notifyStatus(task, TaskStatus.failure(task.getId()), "failed because of exception[%s]", e.getClass());
+ final String errorMessage = "Failed while waiting for the task to be ready to run. "
+ + "See overlord logs for more details.";
+ notifyStatus(task, TaskStatus.failure(task.getId(), errorMessage), errorMessage);
continue;
}
if (taskIsReady) {
@@ -412,7 +414,7 @@
Preconditions.checkNotNull(taskId, "taskId");
for (final Task task : tasks) {
if (task.getId().equals(taskId)) {
- notifyStatus(task, TaskStatus.failure(taskId), reasonFormat, args);
+ notifyStatus(task, TaskStatus.failure(taskId, StringUtils.format(reasonFormat, args)), reasonFormat, args);
break;
}
}
@@ -556,7 +558,9 @@
.addData("type", task.getType())
.addData("dataSource", task.getDataSource())
.emit();
- handleStatus(TaskStatus.failure(task.getId()));
+ handleStatus(
+ TaskStatus.failure(task.getId(), "Failed to run this task. See overlord logs for more details.")
+ );
}
private void handleStatus(final TaskStatus status)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 623458f..59085a2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -47,6 +47,7 @@
import org.apache.druid.discovery.WorkerNodeService;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
@@ -68,6 +69,7 @@
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
@@ -424,7 +426,18 @@
config.getTaskAssignmentTimeout()
).emit();
// taskComplete(..) must be called outside of statusLock, see comments on method.
- taskComplete(workItem, workerHolder, TaskStatus.failure(taskId));
+ taskComplete(
+ workItem,
+ workerHolder,
+ TaskStatus.failure(
+ taskId,
+ StringUtils.format(
+ "The worker that this task is assigned did not start it in timeout[%s]. "
+ + "See overlord and middleManager/indexer logs for more details.",
+ config.getTaskAssignmentTimeout()
+ )
+ )
+ );
}
return true;
@@ -456,13 +469,36 @@
workerHolder.setLastCompletedTaskTime(DateTimes.nowUtc());
}
- // Notify interested parties
- taskRunnerWorkItem.setResult(taskStatus);
- TaskRunnerUtils.notifyStatusChanged(listeners, taskStatus.getId(), taskStatus);
+ if (taskRunnerWorkItem.getResult().isDone()) {
+ // This is not the first complete event.
+ try {
+ TaskState lastKnownState = taskRunnerWorkItem.getResult().get().getStatusCode();
+ if (taskStatus.getStatusCode() != lastKnownState) {
+ log.warn(
+ "The state of the new task complete event is different from its last known state. "
+ + "New state[%s], last known state[%s]",
+ taskStatus.getStatusCode(),
+ lastKnownState
+ );
+ }
+ }
+ catch (InterruptedException e) {
+ log.warn(e, "Interrupted while getting the last known task status.");
+ Thread.currentThread().interrupt();
+ }
+ catch (ExecutionException e) {
+ // This case should not really happen.
+ log.warn(e, "Failed to get the last known task status. Ignoring this failure.");
+ }
+ } else {
+ // Notify interested parties
+ taskRunnerWorkItem.setResult(taskStatus);
+ TaskRunnerUtils.notifyStatusChanged(listeners, taskStatus.getId(), taskStatus);
- // Update success/failure counters, Blacklist node if there are too many failures.
- if (workerHolder != null) {
- blacklistWorkerIfNeeded(taskStatus, workerHolder);
+ // Update success/failure counters, Blacklist node if there are too many failures.
+ if (workerHolder != null) {
+ blacklistWorkerIfNeeded(taskStatus, workerHolder);
+ }
}
synchronized (statusLock) {
@@ -647,14 +683,26 @@
for (HttpRemoteTaskRunnerWorkItem taskItem : tasksToFail) {
if (!taskItem.getResult().isDone()) {
-          log.info(
+          log.warn(
            "Failing task[%s] because worker[%s] disappeared and did not report within cleanup timeout[%s].",
-            workerHostAndPort,
-            taskItem.getTaskId(),
+            taskItem.getTaskId(),
+            workerHostAndPort,
config.getTaskCleanupTimeout()
);
// taskComplete(..) must be called outside of statusLock, see comments on method.
- taskComplete(taskItem, null, TaskStatus.failure(taskItem.getTaskId()));
+ taskComplete(
+ taskItem,
+ null,
+ TaskStatus.failure(
+ taskItem.getTaskId(),
+ StringUtils.format(
+ "The worker that this task was assigned disappeared and "
+ + "did not report cleanup within timeout[%s]. "
+ + "See overlord and middleManager/indexer logs for more details.",
+ config.getTaskCleanupTimeout()
+ )
+ )
+ );
}
}
}
@@ -1179,7 +1227,15 @@
if (taskItem.getTask() == null) {
log.makeAlert("No Task obj found in TaskItem for taskID[%s]. Failed.", taskId).emit();
// taskComplete(..) must be called outside of statusLock, see comments on method.
- taskComplete(taskItem, null, TaskStatus.failure(taskId));
+ taskComplete(
+ taskItem,
+ null,
+ TaskStatus.failure(
+ taskId,
+ "No payload found for this task. "
+ + "See overlord logs and middleManager/indexer logs for more details."
+ )
+ );
continue;
}
@@ -1205,7 +1261,11 @@
.emit();
// taskComplete(..) must be called outside of statusLock, see comments on method.
- taskComplete(taskItem, null, TaskStatus.failure(taskId));
+ taskComplete(
+ taskItem,
+ null,
+ TaskStatus.failure(taskId, "Failed to assign this task. See overlord logs for more details.")
+ );
}
finally {
synchronized (statusLock) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
index 630b0f0..ff9aab1 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
@@ -390,14 +390,20 @@
announcement.getStatus(),
worker.getHost()
);
- delta.add(TaskAnnouncement.create(
- announcement.getTaskId(),
- announcement.getTaskType(),
- announcement.getTaskResource(),
- TaskStatus.failure(announcement.getTaskId()),
- announcement.getTaskLocation(),
- announcement.getTaskDataSource()
- ));
+ delta.add(
+ TaskAnnouncement.create(
+ announcement.getTaskId(),
+ announcement.getTaskType(),
+ announcement.getTaskResource(),
+ TaskStatus.failure(
+ announcement.getTaskId(),
+ "This task disappeared on the worker where it was assigned. "
+ + "See overlord logs for more details."
+ ),
+ announcement.getTaskLocation(),
+ announcement.getTaskDataSource()
+ )
+ );
}
}
@@ -427,14 +433,20 @@
announcement.getStatus(),
worker.getHost()
);
- delta.add(TaskAnnouncement.create(
- announcement.getTaskId(),
- announcement.getTaskType(),
- announcement.getTaskResource(),
- TaskStatus.failure(announcement.getTaskId()),
- announcement.getTaskLocation(),
- announcement.getTaskDataSource()
- ));
+ delta.add(
+ TaskAnnouncement.create(
+ announcement.getTaskId(),
+ announcement.getTaskType(),
+ announcement.getTaskResource(),
+ TaskStatus.failure(
+ announcement.getTaskId(),
+ "This task disappeared on the worker where it was assigned. "
+ + "See overlord logs for more details."
+ ),
+ announcement.getTaskLocation(),
+ announcement.getTaskDataSource()
+ )
+ );
}
} else if (change instanceof WorkerHistoryItem.Metadata) {
isWorkerDisabled = ((WorkerHistoryItem.Metadata) change).isDisabled();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
index e7bf022..589c6e1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
@@ -44,6 +44,7 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.testing.DeadlockDetectingTimeout;
import org.easymock.Capture;
import org.easymock.EasyMock;
@@ -54,6 +55,7 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
+import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.Collection;
@@ -88,6 +90,7 @@
cf = rtrTestUtils.getCuratorFramework();
task = TestTasks.unending("task id with spaces");
+ EmittingLogger.registerEmitter(new NoopServiceEmitter());
}
@After
@@ -131,6 +134,7 @@
{
doSetup();
remoteTaskRunner.addPendingTask(task);
+ remoteTaskRunner.runPendingTasks();
Assert.assertFalse(workerRunningTask(task.getId()));
ListenableFuture<TaskStatus> result = remoteTaskRunner.run(task);
@@ -352,6 +356,8 @@
TaskStatus status = future.get();
Assert.assertEquals(status.getStatusCode(), TaskState.FAILED);
+ Assert.assertNotNull(status.getErrorMsg());
+ Assert.assertTrue(status.getErrorMsg().contains("The worker that this task was assigned disappeared"));
}
@Test
@@ -446,6 +452,8 @@
TaskStatus status = future.get();
Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
+ Assert.assertNotNull(status.getErrorMsg());
+ Assert.assertTrue(status.getErrorMsg().contains("Canceled for worker cleanup"));
RemoteTaskRunnerConfig config = remoteTaskRunner.getRemoteTaskRunnerConfig();
Assert.assertTrue(
TestUtils.conditionValid(
@@ -517,6 +525,38 @@
Assert.assertEquals(TaskState.SUCCESS, result.get().getStatusCode());
}
+ @Test
+ public void testRunPendingTaskFailToAssignTask() throws Exception
+ {
+ doSetup();
+ RemoteTaskRunnerWorkItem originalItem = remoteTaskRunner.addPendingTask(task);
+ // modify taskId to make task assignment failed
+ RemoteTaskRunnerWorkItem wankyItem = Mockito.mock(RemoteTaskRunnerWorkItem.class);
+ Mockito.when(wankyItem.getTaskId()).thenReturn(originalItem.getTaskId()).thenReturn("wrongId");
+ remoteTaskRunner.runPendingTask(wankyItem);
+ TaskStatus taskStatus = originalItem.getResult().get(0, TimeUnit.MILLISECONDS);
+ Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode());
+ Assert.assertEquals(
+ "Failed to assign this task. See overlord logs for more details.",
+ taskStatus.getErrorMsg()
+ );
+ }
+
+ @Test
+ public void testRunPendingTaskTimeoutToAssign() throws Exception
+ {
+ makeWorker();
+ makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(new Period("PT1S")));
+ RemoteTaskRunnerWorkItem workItem = remoteTaskRunner.addPendingTask(task);
+ remoteTaskRunner.runPendingTask(workItem);
+ TaskStatus taskStatus = workItem.getResult().get(0, TimeUnit.MILLISECONDS);
+ Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode());
+ Assert.assertNotNull(taskStatus.getErrorMsg());
+ Assert.assertTrue(
+ taskStatus.getErrorMsg().startsWith("The worker that this task is assigned did not start it in timeout")
+ );
+ }
+
private void doSetup() throws Exception
{
makeWorker();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
index 2dbd9d1..0b3d872 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
@@ -24,6 +24,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
@@ -41,6 +42,7 @@
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.metadata.EntryExistsException;
@@ -146,8 +148,14 @@
Assert.assertEquals(task.interval, locksForTask.get(0).getInterval());
// Verify that locks are removed on calling shutdown
- taskQueue.shutdown(task.getId(), "Shutdown Task");
+ taskQueue.shutdown(task.getId(), "Shutdown Task test");
Assert.assertTrue(getLockbox().findLocksForTask(task).isEmpty());
+
+ Optional<TaskStatus> statusOptional = getTaskStorage().getStatus(task.getId());
+ Assert.assertTrue(statusOptional.isPresent());
+ Assert.assertEquals(TaskState.FAILED, statusOptional.get().getStatusCode());
+ Assert.assertNotNull(statusOptional.get().getErrorMsg());
+ Assert.assertEquals("Shutdown Task test", statusOptional.get().getErrorMsg());
}
@Test
@@ -305,6 +313,42 @@
Assert.assertFalse(queuedTask.getContextValue(Tasks.FORCE_TIME_CHUNK_LOCK_KEY));
}
+ @Test
+ public void testTaskStatusWhenExceptionIsThrownInIsReady() throws EntryExistsException
+ {
+ final TaskActionClientFactory actionClientFactory = createActionClientFactory();
+ final TaskQueue taskQueue = new TaskQueue(
+ new TaskLockConfig(),
+ new TaskQueueConfig(null, null, null, null),
+ new DefaultTaskConfig(),
+ getTaskStorage(),
+ new SimpleTaskRunner(actionClientFactory),
+ actionClientFactory,
+ getLockbox(),
+ new NoopServiceEmitter()
+ );
+ taskQueue.setActive(true);
+ final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D"))
+ {
+ @Override
+ public boolean isReady(TaskActionClient taskActionClient)
+ {
+ throw new RuntimeException("isReady failure test");
+ }
+ };
+ taskQueue.add(task);
+ taskQueue.manageInternal();
+
+ Optional<TaskStatus> statusOptional = getTaskStorage().getStatus(task.getId());
+ Assert.assertTrue(statusOptional.isPresent());
+ Assert.assertEquals(TaskState.FAILED, statusOptional.get().getStatusCode());
+ Assert.assertNotNull(statusOptional.get().getErrorMsg());
+ Assert.assertTrue(
+ StringUtils.format("Actual message is: %s", statusOptional.get().getErrorMsg()),
+ statusOptional.get().getErrorMsg().startsWith("Failed while waiting for the task to be ready to run")
+ );
+ }
+
private static class TestTask extends AbstractBatchIndexTask
{
private final Interval interval;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
index 2ca1dff..aa3f14e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
@@ -49,16 +49,20 @@
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.initialization.IndexerZkConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.easymock.EasyMock;
import org.eclipse.jetty.util.ConcurrentHashSet;
import org.joda.time.Period;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
@@ -79,6 +83,12 @@
*/
public class HttpRemoteTaskRunnerTest
{
+ @Before
+ public void setup()
+ {
+ EmittingLogger.registerEmitter(new NoopServiceEmitter());
+ }
+
/*
Simulates startup of Overlord and Workers being discovered with no previously known tasks. Fresh tasks are given
and expected to be completed.
@@ -733,6 +743,18 @@
Assert.assertTrue(future1.get().isFailure());
Assert.assertTrue(future2.get().isFailure());
+ Assert.assertNotNull(future1.get().getErrorMsg());
+ Assert.assertNotNull(future2.get().getErrorMsg());
+ Assert.assertTrue(
+ future1.get().getErrorMsg().startsWith(
+ "The worker that this task was assigned disappeared and did not report cleanup within timeout"
+ )
+ );
+ Assert.assertTrue(
+ future2.get().getErrorMsg().startsWith(
+ "The worker that this task was assigned disappeared and did not report cleanup within timeout"
+ )
+ );
AtomicInteger ticks = new AtomicInteger();
Set<String> actualShutdowns = new ConcurrentHashSet<>();
@@ -1254,6 +1276,230 @@
);
}
+ @Test
+ public void testTimeoutInAssigningTasks() throws Exception
+ {
+ TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+ DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+ EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+ .andReturn(druidNodeDiscovery);
+ EasyMock.replay(druidNodeDiscoveryProvider);
+
+ HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
+ TestHelper.makeJsonMapper(),
+ new HttpRemoteTaskRunnerConfig()
+ {
+ @Override
+ public int getPendingTasksRunnerNumThreads()
+ {
+ return 1;
+ }
+
+ @Override
+ public Period getTaskAssignmentTimeout()
+ {
+ return new Period("PT1S");
+ }
+ },
+ EasyMock.createNiceMock(HttpClient.class),
+ DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
+ new NoopProvisioningStrategy<>(),
+ druidNodeDiscoveryProvider,
+ EasyMock.createNiceMock(TaskStorage.class),
+ EasyMock.createNiceMock(CuratorFramework.class),
+ new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
+ )
+ {
+ @Override
+ protected WorkerHolder createWorkerHolder(
+ ObjectMapper smileMapper,
+ HttpClient httpClient,
+ HttpRemoteTaskRunnerConfig config,
+ ScheduledExecutorService workersSyncExec,
+ WorkerHolder.Listener listener,
+ Worker worker,
+ List<TaskAnnouncement> knownAnnouncements
+ )
+ {
+ return new WorkerHolder(
+ smileMapper,
+ httpClient,
+ config,
+ workersSyncExec,
+ listener,
+ worker,
+ ImmutableList.of()
+ )
+ {
+ @Override
+ public void start()
+ {
+ disabled.set(false);
+ }
+
+ @Override
+ public void stop()
+ {
+ }
+
+ @Override
+ public boolean isInitialized()
+ {
+ return true;
+ }
+
+ @Override
+ public void waitForInitialization()
+ {
+ }
+
+ @Override
+ public boolean assignTask(Task task)
+ {
+ // Always returns true
+ return true;
+ }
+
+ @Override
+ public void shutdownTask(String taskId)
+ {
+ }
+ };
+ }
+ };
+
+ taskRunner.start();
+
+ DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode(
+ new DruidNode("service", "host1", false, 8080, null, true, false),
+ NodeRole.MIDDLE_MANAGER,
+ ImmutableMap.of(
+ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
+ )
+ );
+
+ druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode1));
+
+ Future<TaskStatus> future = taskRunner.run(NoopTask.create("task-id", 0));
+ Assert.assertTrue(future.get().isFailure());
+ Assert.assertNotNull(future.get().getErrorMsg());
+ Assert.assertTrue(
+ future.get().getErrorMsg().startsWith("The worker that this task is assigned did not start it in timeout")
+ );
+ }
+
+ @Test
+ public void testExceptionThrownInAssigningTasks() throws Exception
+ {
+ TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+ DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+ EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+ .andReturn(druidNodeDiscovery);
+ EasyMock.replay(druidNodeDiscoveryProvider);
+
+ HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
+ TestHelper.makeJsonMapper(),
+ new HttpRemoteTaskRunnerConfig()
+ {
+ @Override
+ public int getPendingTasksRunnerNumThreads()
+ {
+ return 1;
+ }
+
+ @Override
+ public Period getTaskAssignmentTimeout()
+ {
+ return new Period("PT1S");
+ }
+ },
+ EasyMock.createNiceMock(HttpClient.class),
+ DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
+ new NoopProvisioningStrategy<>(),
+ druidNodeDiscoveryProvider,
+ EasyMock.createNiceMock(TaskStorage.class),
+ EasyMock.createNiceMock(CuratorFramework.class),
+ new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
+ )
+ {
+ @Override
+ protected WorkerHolder createWorkerHolder(
+ ObjectMapper smileMapper,
+ HttpClient httpClient,
+ HttpRemoteTaskRunnerConfig config,
+ ScheduledExecutorService workersSyncExec,
+ WorkerHolder.Listener listener,
+ Worker worker,
+ List<TaskAnnouncement> knownAnnouncements
+ )
+ {
+ return new WorkerHolder(
+ smileMapper,
+ httpClient,
+ config,
+ workersSyncExec,
+ listener,
+ worker,
+ ImmutableList.of()
+ )
+ {
+ @Override
+ public void start()
+ {
+ disabled.set(false);
+ }
+
+ @Override
+ public void stop()
+ {
+ }
+
+ @Override
+ public boolean isInitialized()
+ {
+ return true;
+ }
+
+ @Override
+ public void waitForInitialization()
+ {
+ }
+
+ @Override
+ public boolean assignTask(Task task)
+ {
+ throw new RuntimeException("Assign failure test");
+ }
+
+ @Override
+ public void shutdownTask(String taskId)
+ {
+ }
+ };
+ }
+ };
+
+ taskRunner.start();
+
+ DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode(
+ new DruidNode("service", "host1", false, 8080, null, true, false),
+ NodeRole.MIDDLE_MANAGER,
+ ImmutableMap.of(
+ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
+ )
+ );
+
+ druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode1));
+
+ Future<TaskStatus> future = taskRunner.run(NoopTask.create("task-id", 0));
+ Assert.assertTrue(future.get().isFailure());
+ Assert.assertNotNull(future.get().getErrorMsg());
+ Assert.assertTrue(
+ StringUtils.format("Actual message is: %s", future.get().getErrorMsg()),
+ future.get().getErrorMsg().startsWith("Failed to assign this task")
+ );
+ }
+
private HttpRemoteTaskRunner createTaskRunnerForTestTaskAddedOrUpdated(
TaskStorage taskStorage,
List<Object> listenerNotificationsAccumulator
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java
index 3319d81..5f4a9b8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java
@@ -115,6 +115,12 @@
Assert.assertEquals(task0.getId(), updates.get(3).getTaskId());
Assert.assertTrue(updates.get(3).getTaskStatus().isFailure());
+ Assert.assertNotNull(updates.get(3).getTaskStatus().getErrorMsg());
+ Assert.assertTrue(
+ updates.get(3).getTaskStatus().getErrorMsg().startsWith(
+ "This task disappeared on the worker where it was assigned"
+ )
+ );
updates.clear();
@@ -138,6 +144,12 @@
Assert.assertEquals(task2.getId(), updates.get(0).getTaskId());
Assert.assertTrue(updates.get(0).getTaskStatus().isFailure());
+ Assert.assertNotNull(updates.get(0).getTaskStatus().getErrorMsg());
+ Assert.assertTrue(
+ updates.get(0).getTaskStatus().getErrorMsg().startsWith(
+ "This task disappeared on the worker where it was assigned"
+ )
+ );
Assert.assertEquals(task3.getId(), updates.get(1).getTaskId());
Assert.assertTrue(updates.get(1).getTaskStatus().isRunnable());