Add errorMessage in taskStatus for task failures in middleManagers/indexers/peons (#11446)
* Add error message; add unit tests for ForkingTaskRunner
* Add tests
* Fix comment
* Remove unused import
* Add exit code to error message
* Fix test
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index 2d08338..49cae23 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -20,6 +20,7 @@
package org.apache.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CharMatcher;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
@@ -343,14 +344,11 @@
jsonMapper.writeValue(taskFile, task);
}
- LOGGER.info("Running command: %s", getMaskedCommand(startupLoggingConfig.getMaskProperties(), command));
- taskWorkItem.processHolder = new ProcessHolder(
- new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(),
- logFile,
- taskLocation.getHost(),
- taskLocation.getPort(),
- taskLocation.getTlsPort()
+ LOGGER.info(
+ "Running command: %s",
+ getMaskedCommand(startupLoggingConfig.getMaskProperties(), command)
);
+ taskWorkItem.processHolder = runTaskProcess(command, logFile, taskLocation);
processHolder = taskWorkItem.processHolder;
processHolder.registerWithCloser(closer);
@@ -364,38 +362,23 @@
);
LOGGER.info("Logging task %s output to: %s", task.getId(), logFile);
- boolean runFailed = true;
-
- final ByteSink logSink = Files.asByteSink(logFile, FileWriteMode.APPEND);
-
- // This will block for a while. So we append the thread information with more details
- final String priorThreadName = Thread.currentThread().getName();
- Thread.currentThread().setName(StringUtils.format("%s-[%s]", priorThreadName, task.getId()));
-
- try (final OutputStream toLogfile = logSink.openStream()) {
- ByteStreams.copy(processHolder.process.getInputStream(), toLogfile);
- final int statusCode = processHolder.process.waitFor();
- LOGGER.info("Process exited with status[%d] for task: %s", statusCode, task.getId());
- if (statusCode == 0) {
- runFailed = false;
- }
- }
- finally {
- Thread.currentThread().setName(priorThreadName);
- // Upload task logs
- taskLogPusher.pushTaskLog(task.getId(), logFile);
- if (reportsFile.exists()) {
- taskLogPusher.pushTaskReports(task.getId(), reportsFile);
- }
- }
-
- TaskStatus status;
- if (!runFailed) {
+ final int exitCode = waitForTaskProcessToComplete(task, processHolder, logFile, reportsFile);
+ final TaskStatus status;
+ if (exitCode == 0) {
+ LOGGER.info("Process exited successfully for task: %s", task.getId());
// Process exited successfully
status = jsonMapper.readValue(statusFile, TaskStatus.class);
} else {
+ LOGGER.error("Process exited with code[%d] for task: %s", exitCode, task.getId());
// Process exited unsuccessfully
- status = TaskStatus.failure(task.getId());
+ status = TaskStatus.failure(
+ task.getId(),
+ StringUtils.format(
+ "Task execution process exited unsuccessfully with code[%s]. "
+ + "See middleManager logs for more details.",
+ exitCode
+ )
+ );
}
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
@@ -417,7 +400,7 @@
synchronized (tasks) {
final ForkingTaskRunnerWorkItem taskWorkItem = tasks.remove(task.getId());
if (taskWorkItem != null && taskWorkItem.processHolder != null) {
- taskWorkItem.processHolder.process.destroy();
+ taskWorkItem.processHolder.shutdown();
}
if (!stopping) {
saveRunningTasks();
@@ -458,6 +441,42 @@
}
}
+ @VisibleForTesting
+ ProcessHolder runTaskProcess(List<String> command, File logFile, TaskLocation taskLocation) throws IOException
+ {
+ return new ProcessHolder(
+ new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(),
+ logFile,
+ taskLocation.getHost(),
+ taskLocation.getPort(),
+ taskLocation.getTlsPort()
+ );
+ }
+
+ @VisibleForTesting
+ int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile)
+ throws IOException, InterruptedException
+ {
+ final ByteSink logSink = Files.asByteSink(logFile, FileWriteMode.APPEND);
+
+ // This will block for a while, so append the task ID to the thread name to make the thread easier to identify.
+ final String priorThreadName = Thread.currentThread().getName();
+ Thread.currentThread().setName(StringUtils.format("%s-[%s]", priorThreadName, task.getId()));
+
+ try (final OutputStream toLogfile = logSink.openStream()) {
+ ByteStreams.copy(processHolder.process.getInputStream(), toLogfile);
+ return processHolder.process.waitFor();
+ }
+ finally {
+ Thread.currentThread().setName(priorThreadName);
+ // Upload task logs
+ taskLogPusher.pushTaskLog(task.getId(), logFile);
+ if (reportsFile.exists()) {
+ taskLogPusher.pushTaskReports(task.getId(), reportsFile);
+ }
+ }
+ }
+
@Override
@LifecycleStop
public void stop()
@@ -726,7 +745,8 @@
}
}
- private static class ProcessHolder
+ @VisibleForTesting
+ static class ProcessHolder
{
private final Process process;
private final File logFile;
@@ -743,11 +763,18 @@
this.tlsPort = tlsPort;
}
- private void registerWithCloser(Closer closer)
+ @VisibleForTesting
+ void registerWithCloser(Closer closer)
{
closer.register(process.getInputStream());
closer.register(process.getOutputStream());
}
+
+ @VisibleForTesting
+ void shutdown()
+ {
+ process.destroy();
+ }
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
index cfbbab4..24dba4f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
@@ -208,10 +208,31 @@
.emit();
log.warn(e, "Graceful shutdown of task[%s] aborted with exception.", task.getId());
error = true;
- TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.failure(task.getId()));
+ // Creating a new status only to feed listeners seems quite strange.
+ // This is currently OK because no listeners are registered in the peon yet.
+ // However, we should fix this in the near future by always retrieving the task status
+ // from a single source of truth that is also propagated to the overlord.
+ // See https://github.com/apache/druid/issues/11445.
+ TaskRunnerUtils.notifyStatusChanged(
+ listeners,
+ task.getId(),
+ TaskStatus.failure(
+ task.getId(),
+ "Failed to stop gracefully with exception. See task logs for more details."
+ )
+ );
}
} else {
- TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.failure(task.getId()));
+ // Creating a new status only to feed listeners seems quite strange.
+ // This is currently OK because no listeners are registered in the peon yet.
+ // However, we should fix this in the near future by always retrieving the task status
+ // from a single source of truth that is also propagated to the overlord.
+ // See https://github.com/apache/druid/issues/11445.
+ TaskRunnerUtils.notifyStatusChanged(
+ listeners,
+ task.getId(),
+ TaskStatus.failure(task.getId(), "Canceled as task execution process stopped")
+ );
}
elapsed = System.currentTimeMillis() - start;
@@ -417,7 +438,6 @@
{
return task.getDataSource();
}
-
}
private class SingleTaskBackgroundRunnerCallable implements Callable<TaskStatus>
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
index e463d83..84a414c 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
@@ -197,7 +197,7 @@
Thread.currentThread()
.setName(StringUtils.format("[%s]-%s", task.getId(), priorThreadName));
- TaskStatus taskStatus = null;
+ TaskStatus taskStatus;
final TaskToolbox toolbox = toolboxFactory.build(task);
TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), taskLocation);
TaskRunnerUtils.notifyStatusChanged(
@@ -212,12 +212,13 @@
}
catch (Throwable t) {
LOGGER.error(t, "Exception caught while running the task.");
+ taskStatus = TaskStatus.failure(
+ task.getId(),
+ "Failed with an exception. See indexer logs for more details."
+ );
}
finally {
taskWorkItem.setState(RunnerTaskState.NONE);
- if (taskStatus == null) {
- taskStatus = TaskStatus.failure(task.getId());
- }
Thread.currentThread().setName(priorThreadName);
if (reportsFile.exists()) {
taskLogPusher.pushTaskReports(task.getId(), reportsFile);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
index 19598d0..d0f2a3b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
@@ -251,7 +251,15 @@
@Override
public void onFailure(Throwable t)
{
- submitNoticeToExec(new StatusNotice(task, TaskStatus.failure(task.getId())));
+ submitNoticeToExec(
+ new StatusNotice(
+ task,
+ TaskStatus.failure(
+ task.getId(),
+ "Failed to run task with an exception. See middleManager or indexer logs for more details."
+ )
+ )
+ );
}
}
);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskMonitor.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskMonitor.java
index 23b1968..27fb31e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskMonitor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskMonitor.java
@@ -127,7 +127,10 @@
if (completedAnnouncement != null) {
completionStatus = completedAnnouncement.getTaskStatus();
} else if (!runningTasks.containsKey(announcement.getTaskStatus().getId())) {
- completionStatus = TaskStatus.failure(announcement.getTaskStatus().getId());
+ completionStatus = TaskStatus.failure(
+ announcement.getTaskStatus().getId(),
+ "Canceled as unknown task. See middleManager or indexer logs for more details."
+ );
}
if (completionStatus != null) {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
index b766bc7..06629b1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
@@ -19,18 +19,35 @@
package org.apache.druid.indexing.overlord;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
+import org.apache.druid.tasklogs.NoopTaskLogs;
import org.assertj.core.util.Lists;
+import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+import java.io.File;
+import java.io.IOException;
import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
public class ForkingTaskRunnerTest
{
@@ -145,12 +162,192 @@
"-Dsome.somepassword = secret=value",
"-Dsome.some=notasecret",
"-Dsome.otherSecret= =asfdhkj352872598====fasdlkjfa="
- ),
- "java -cp /path/to/somewhere:some-jars.jar /some===file /asecretFileNa=<masked> -Dsome.property=random -Dsome.otherproperty = random=random " +
- "-Dsome.somesecret =<masked> -Dsome.somesecret=<masked> -Dsome.somepassword =<masked> -Dsome.some=notasecret -Dsome.otherSecret=<masked>"
+ ),
+ "java -cp /path/to/somewhere:some-jars.jar /some===file /asecretFileNa=<masked> -Dsome.property=random -Dsome.otherproperty = random=random "
+ + "-Dsome.somesecret =<masked> -Dsome.somesecret=<masked> -Dsome.somepassword =<masked> -Dsome.some=notasecret -Dsome.otherSecret=<masked>"
);
StartupLoggingConfig startupLoggingConfig = new StartupLoggingConfig();
- ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(new ForkingTaskRunnerConfig(), null, new WorkerConfig(), null, null, null, null, startupLoggingConfig);
- Assert.assertEquals(originalAndExpectedCommand.rhs, forkingTaskRunner.getMaskedCommand(startupLoggingConfig.getMaskProperties(), originalAndExpectedCommand.lhs));
+ ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
+ new ForkingTaskRunnerConfig(),
+ null,
+ new WorkerConfig(),
+ null,
+ null,
+ null,
+ null,
+ startupLoggingConfig
+ );
+ Assert.assertEquals(
+ originalAndExpectedCommand.rhs,
+ forkingTaskRunner.getMaskedCommand(
+ startupLoggingConfig.getMaskProperties(),
+ originalAndExpectedCommand.lhs
+ )
+ );
+ }
+
+ @Test
+ public void testTaskStatusWhenTaskProcessFails() throws ExecutionException, InterruptedException
+ {
+ ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
+ new ForkingTaskRunnerConfig(),
+ new TaskConfig(
+ null,
+ null,
+ null,
+ null,
+ ImmutableList.of(),
+ false,
+ new Period("PT0S"),
+ new Period("PT10S"),
+ ImmutableList.of(),
+ false,
+ false
+ ),
+ new WorkerConfig(),
+ new Properties(),
+ new NoopTaskLogs(),
+ new DefaultObjectMapper(),
+ new DruidNode("middleManager", "host", false, 8091, null, true, false),
+ new StartupLoggingConfig()
+ )
+ {
+ @Override
+ ProcessHolder runTaskProcess(List<String> command, File logFile, TaskLocation taskLocation)
+ {
+ ProcessHolder processHolder = Mockito.mock(ProcessHolder.class);
+ Mockito.doNothing().when(processHolder).registerWithCloser(ArgumentMatchers.any());
+ Mockito.doNothing().when(processHolder).shutdown();
+ return processHolder;
+ }
+
+ @Override
+ int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile)
+ {
+ // Emulate task process failure
+ return 1;
+ }
+ };
+
+ final TaskStatus status = forkingTaskRunner.run(NoopTask.create()).get();
+ Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
+ Assert.assertEquals(
+ "Task execution process exited unsuccessfully with code[1]. See middleManager logs for more details.",
+ status.getErrorMsg()
+ );
+ }
+
+ @Test
+ public void testTaskStatusWhenTaskProcessSucceedsTaskSucceeds() throws ExecutionException, InterruptedException
+ {
+ ObjectMapper mapper = new DefaultObjectMapper();
+ Task task = NoopTask.create();
+ ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
+ new ForkingTaskRunnerConfig(),
+ new TaskConfig(
+ null,
+ null,
+ null,
+ null,
+ ImmutableList.of(),
+ false,
+ new Period("PT0S"),
+ new Period("PT10S"),
+ ImmutableList.of(),
+ false,
+ false
+ ),
+ new WorkerConfig(),
+ new Properties(),
+ new NoopTaskLogs(),
+ mapper,
+ new DruidNode("middleManager", "host", false, 8091, null, true, false),
+ new StartupLoggingConfig()
+ )
+ {
+ @Override
+ ProcessHolder runTaskProcess(List<String> command, File logFile, TaskLocation taskLocation) throws IOException
+ {
+ ProcessHolder processHolder = Mockito.mock(ProcessHolder.class);
+ Mockito.doNothing().when(processHolder).registerWithCloser(ArgumentMatchers.any());
+ Mockito.doNothing().when(processHolder).shutdown();
+
+ for (String param : command) {
+ if (param.endsWith("status.json")) {
+ mapper.writeValue(new File(param), TaskStatus.success(task.getId()));
+ break;
+ }
+ }
+
+ return processHolder;
+ }
+
+ @Override
+ int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile)
+ {
+ return 0;
+ }
+ };
+
+ final TaskStatus status = forkingTaskRunner.run(task).get();
+ Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
+ Assert.assertNull(status.getErrorMsg());
+ }
+
+ @Test
+ public void testTaskStatusWhenTaskProcessSucceedsTaskFails() throws ExecutionException, InterruptedException
+ {
+ ObjectMapper mapper = new DefaultObjectMapper();
+ Task task = NoopTask.create();
+ ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
+ new ForkingTaskRunnerConfig(),
+ new TaskConfig(
+ null,
+ null,
+ null,
+ null,
+ ImmutableList.of(),
+ false,
+ new Period("PT0S"),
+ new Period("PT10S"),
+ ImmutableList.of(),
+ false,
+ false
+ ),
+ new WorkerConfig(),
+ new Properties(),
+ new NoopTaskLogs(),
+ mapper,
+ new DruidNode("middleManager", "host", false, 8091, null, true, false),
+ new StartupLoggingConfig()
+ )
+ {
+ @Override
+ ProcessHolder runTaskProcess(List<String> command, File logFile, TaskLocation taskLocation) throws IOException
+ {
+ ProcessHolder processHolder = Mockito.mock(ProcessHolder.class);
+ Mockito.doNothing().when(processHolder).registerWithCloser(ArgumentMatchers.any());
+ Mockito.doNothing().when(processHolder).shutdown();
+
+ for (String param : command) {
+ if (param.endsWith("status.json")) {
+ mapper.writeValue(new File(param), TaskStatus.failure(task.getId(), "task failure test"));
+ break;
+ }
+ }
+
+ return processHolder;
+ }
+
+ @Override
+ int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile)
+ {
+ return 0;
+ }
+ };
+
+ final TaskStatus status = forkingTaskRunner.run(task).get();
+ Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
+ Assert.assertEquals("task failure test", status.getErrorMsg());
}
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index f1ea2a0..7b12c0d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -21,6 +21,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
+import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
@@ -35,6 +36,8 @@
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryRunner;
@@ -64,9 +67,11 @@
import java.io.File;
import java.io.IOException;
import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
public class SingleTaskBackgroundRunnerTest
{
@@ -94,6 +99,7 @@
false
);
final ServiceEmitter emitter = new NoopServiceEmitter();
+ EmittingLogger.registerEmitter(emitter);
final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory(
taskConfig,
null,
@@ -199,6 +205,111 @@
Assert.assertTrue(holder.get());
}
+ @Test
+ public void testStopRestorableTaskExceptionAfterStop()
+ {
+ // statusChanged callback can be called by multiple threads.
+ AtomicReference<TaskStatus> statusHolder = new AtomicReference<>();
+ runner.registerListener(
+ new TaskRunnerListener()
+ {
+ @Override
+ public String getListenerId()
+ {
+ return "testStopRestorableTaskExceptionAfterStop";
+ }
+
+ @Override
+ public void locationChanged(String taskId, TaskLocation newLocation)
+ {
+ // do nothing
+ }
+
+ @Override
+ public void statusChanged(String taskId, TaskStatus status)
+ {
+ statusHolder.set(status);
+ }
+ },
+ Execs.directExecutor()
+ );
+ runner.run(
+ new RestorableTask(new BooleanHolder())
+ {
+ @Override
+ public TaskStatus run(TaskToolbox toolbox)
+ {
+ throw new Error("task failure test");
+ }
+ }
+ );
+ runner.stop();
+ Assert.assertEquals(TaskState.FAILED, statusHolder.get().getStatusCode());
+ Assert.assertEquals(
+ "Failed to stop gracefully with exception. See task logs for more details.",
+ statusHolder.get().getErrorMsg()
+ );
+ }
+
+ @Test
+ public void testStopNonRestorableTask() throws InterruptedException
+ {
+ // Latch to wait for SingleTaskBackgroundRunnerCallable to be executed before stopping the task.
+ // We need this latch because TaskRunnerListener is currently racy.
+ // See https://github.com/apache/druid/issues/11445 for more details.
+ CountDownLatch runLatch = new CountDownLatch(1);
+ // statusChanged callback can be called by multiple threads.
+ AtomicReference<TaskStatus> statusHolder = new AtomicReference<>();
+ runner.registerListener(
+ new TaskRunnerListener()
+ {
+ @Override
+ public String getListenerId()
+ {
+ return "testStopNonRestorableTask";
+ }
+
+ @Override
+ public void locationChanged(String taskId, TaskLocation newLocation)
+ {
+ // do nothing
+ }
+
+ @Override
+ public void statusChanged(String taskId, TaskStatus status)
+ {
+ if (status.getStatusCode() == TaskState.RUNNING) {
+ runLatch.countDown();
+ } else {
+ statusHolder.set(status);
+ }
+ }
+ },
+ Execs.directExecutor()
+ );
+ runner.run(
+ new NoopTask(
+ null,
+ null,
+ "datasource",
+ 10000, // 10 sec
+ 0,
+ null,
+ null,
+ null
+ )
+ );
+
+ Assert.assertTrue(runLatch.await(1, TimeUnit.SECONDS));
+ runner.stop();
+
+ Assert.assertEquals(TaskState.FAILED, statusHolder.get().getStatusCode());
+ Assert.assertEquals(
+ "Canceled as task execution process stopped",
+ statusHolder.get().getErrorMsg()
+ );
+ }
+
private static class RestorableTask extends AbstractTask
{
private final BooleanHolder gracefullyStopped;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ThreadingTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ThreadingTaskRunnerTest.java
new file mode 100644
index 0000000..d8ddd51
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ThreadingTaskRunnerTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.TaskToolboxFactory;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.AbstractTask;
+import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.tasklogs.NoopTaskLogs;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+public class ThreadingTaskRunnerTest
+{
+
+ @Test
+ public void testTaskStatusWhenTaskThrowsExceptionWhileRunning() throws ExecutionException, InterruptedException
+ {
+ ThreadingTaskRunner runner = new ThreadingTaskRunner(
+ mockTaskToolboxFactory(),
+ new TaskConfig(
+ null,
+ null,
+ null,
+ null,
+ ImmutableList.of(),
+ false,
+ new Period("PT0S"),
+ new Period("PT10S"),
+ ImmutableList.of(),
+ false,
+ false
+ ),
+ new WorkerConfig(),
+ new NoopTaskLogs(),
+ new DefaultObjectMapper(),
+ new TestAppenderatorsManager(),
+ new MultipleFileTaskReportFileWriter(),
+ new DruidNode("middleManager", "host", false, 8091, null, true, false)
+ );
+
+ Future<TaskStatus> statusFuture = runner.run(new AbstractTask("id", "datasource", null)
+ {
+ @Override
+ public String getType()
+ {
+ return "test";
+ }
+
+ @Override
+ public boolean isReady(TaskActionClient taskActionClient)
+ {
+ return true;
+ }
+
+ @Override
+ public void stopGracefully(TaskConfig taskConfig)
+ {
+ }
+
+ @Override
+ public TaskStatus run(TaskToolbox toolbox)
+ {
+ throw new RuntimeException("Task failure test");
+ }
+ });
+
+ TaskStatus status = statusFuture.get();
+ Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
+ Assert.assertEquals(
+ "Failed with an exception. See indexer logs for more details.",
+ status.getErrorMsg()
+ );
+ }
+
+ private static TaskToolboxFactory mockTaskToolboxFactory()
+ {
+ TaskToolboxFactory factory = Mockito.mock(TaskToolboxFactory.class);
+ Mockito.when(factory.build(ArgumentMatchers.any())).thenReturn(Mockito.mock(TaskToolbox.class));
+ return factory;
+ }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
index b7a489f..389d380 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
@@ -25,8 +25,10 @@
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
+import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.TestTasks;
import org.apache.druid.indexing.common.TestUtils;
@@ -55,6 +57,7 @@
import org.junit.Test;
import java.io.File;
+import java.util.Map;
/**
*/
@@ -263,6 +266,35 @@
Assert.assertNotNull(update4.getTaskAnnouncement().getTaskLocation().getHost());
}
+ @Test(timeout = 30_000L)
+ public void testTaskStatusWhenTaskRunnerFutureThrowsException() throws Exception
+ {
+ Task task = new NoopTask("id", null, null, 100, 0, null, null, ImmutableMap.of(Tasks.PRIORITY_KEY, 0))
+ {
+ @Override
+ public TaskStatus run(TaskToolbox toolbox)
+ {
+ throw new Error("task failure test");
+ }
+ };
+ workerTaskManager.start();
+ workerTaskManager.assignTask(task);
+
+ Map<String, TaskAnnouncement> completeTasks;
+ do {
+ completeTasks = workerTaskManager.getCompletedTasks();
+ } while (completeTasks.isEmpty());
+
+ Assert.assertEquals(1, completeTasks.size());
+ TaskAnnouncement announcement = completeTasks.get(task.getId());
+ Assert.assertNotNull(announcement);
+ Assert.assertEquals(TaskState.FAILED, announcement.getStatus());
+ Assert.assertEquals(
+ "Failed to run task with an exception. See middleManager or indexer logs for more details.",
+ announcement.getTaskStatus().getErrorMsg()
+ );
+ }
+
private NoopTask createNoopTask(String id)
{
return new NoopTask(id, null, null, 100, 0, null, null, ImmutableMap.of(Tasks.PRIORITY_KEY, 0));
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
index dbc44f0..5a02700 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -361,6 +361,10 @@
Assert.assertEquals(1, announcements.size());
Assert.assertEquals(task.getId(), announcements.get(0).getTaskStatus().getId());
Assert.assertEquals(TaskState.FAILED, announcements.get(0).getTaskStatus().getStatusCode());
+ Assert.assertEquals(
+ "Canceled as unknown task. See middleManager or indexer logs for more details.",
+ announcements.get(0).getTaskStatus().getErrorMsg()
+ );
}
@Test(timeout = 60_000L)