Surface lock revocation exceptions in task status (#16325)
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
index 7bb57c5..9cc9e4da 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
@@ -42,8 +42,6 @@
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.Tasks;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.exec.Controller;
import org.apache.druid.msq.exec.ControllerContext;
@@ -232,9 +230,8 @@
if (taskLock == null) {
return false;
- } else if (taskLock.isRevoked()) {
- throw new ISE(StringUtils.format("Lock for interval [%s] was revoked", interval));
}
+ taskLock.assertNotRevoked();
}
}
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
index 309eb38..e0eee25 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
@@ -20,6 +20,14 @@
package org.apache.druid.msq.indexing;
import com.google.common.collect.ImmutableList;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.TimeChunkLock;
+import org.apache.druid.indexing.common.actions.TaskAction;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
+import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
@@ -28,27 +36,31 @@
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.sql.calcite.planner.ColumnMapping;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
+import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
+import java.util.List;
public class MSQControllerTaskTest
{
+ private final List<Interval> INTERVALS =
+ Collections.singletonList(Intervals.of(
+ "2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z"));
+
private final MSQSpec MSQ_SPEC = MSQSpec
.builder()
.destination(new DataSourceMSQDestination(
"target",
Granularities.DAY,
null,
- null
+ INTERVALS
))
.query(new Druids.ScanQueryBuilder()
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.legacy(false)
- .intervals(new MultipleIntervalSegmentSpec(
- Collections.singletonList(Intervals.of(
- "2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z"))))
+ .intervals(new MultipleIntervalSegmentSpec(INTERVALS))
.dataSource("target")
.build()
)
@@ -88,4 +100,84 @@
);
Assert.assertEquals(taskId, controllerTask.getTaskAllocatorId());
}
+
+ @Test
+ public void testIsReady() throws Exception
+ {
+ final String taskId = "taskId";
+ MSQControllerTask controllerTask = new MSQControllerTask(
+ taskId,
+ MSQ_SPEC,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ );
+ TestTaskActionClient taskActionClient = new TestTaskActionClient(
+ new TimeChunkLock(
+ TaskLockType.REPLACE,
+ "groupId",
+ "dataSource",
+ INTERVALS.get(0),
+ "0",
+ 0
+ )
+ );
+ Assert.assertTrue(controllerTask.isReady(taskActionClient));
+ }
+
+ @Test
+ public void testIsReadyWithRevokedLock()
+ {
+ final String taskId = "taskId";
+ MSQControllerTask controllerTask = new MSQControllerTask(
+ taskId,
+ MSQ_SPEC,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ );
+ TestTaskActionClient taskActionClient = new TestTaskActionClient(
+ new TimeChunkLock(
+ TaskLockType.REPLACE,
+ "groupId",
+ "dataSource",
+ INTERVALS.get(0),
+ "0",
+ 0,
+ true
+ )
+ );
+ DruidException exception = Assert.assertThrows(
+ DruidException.class,
+ () -> controllerTask.isReady(taskActionClient));
+ Assert.assertEquals(
+ "Lock of type[REPLACE] for interval[2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z] was revoked",
+ exception.getMessage());
+ }
+
+ private static class TestTaskActionClient implements TaskActionClient
+ {
+ private final TaskLock taskLock;
+
+ TestTaskActionClient(TaskLock taskLock)
+ {
+ this.taskLock = taskLock;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <RetType> RetType submit(TaskAction<RetType> taskAction)
+ {
+ if (!(taskAction instanceof TimeChunkLockTryAcquireAction)) {
+ throw new ISE("action[%s] is not supported", taskAction);
+ }
+ return (RetType) taskLock;
+ }
+ }
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java
index 8c1e5d2..eb96eb4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java
@@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
+import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.overlord.LockRequest;
import org.joda.time.Interval;
@@ -62,4 +63,18 @@
boolean isRevoked();
boolean conflict(LockRequest request);
+
+ /**
+ * Checks if the lock is revoked and throws a {@link DruidException} if so.
+ *
+ * @throws DruidException if the lock is revoked.
+ */
+ default void assertNotRevoked()
+ {
+ if (isRevoked()) {
+ throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+ .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+ .build("Lock of type[%s] for interval[%s] was revoked", getType(), getInterval());
+ }
+ }
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index 53daa6c..9fe3b78 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -59,7 +59,6 @@
import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.Stopwatch;
-import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
@@ -486,9 +485,7 @@
if (lock == null) {
return false;
}
- if (lock.isRevoked()) {
- throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", cur));
- }
+ lock.assertNotRevoked();
locksAcquired++;
intervalToLockVersion.put(cur, lock.getVersion());
}
@@ -829,9 +826,7 @@
"Cannot acquire a lock for interval[%s]",
interval
);
- if (lock.isRevoked()) {
- throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", interval));
- }
+ lock.assertNotRevoked();
version = lock.getVersion();
} else {
version = existingLockVersion;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractFixedIntervalTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractFixedIntervalTask.java
index 2b527c8..0c2a6ca 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractFixedIntervalTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractFixedIntervalTask.java
@@ -30,7 +30,6 @@
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.StringUtils;
import org.joda.time.Interval;
import java.io.IOException;
@@ -88,9 +87,7 @@
if (lock == null) {
return false;
}
- if (lock.isRevoked()) {
- throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", interval));
- }
+ lock.assertNotRevoked();
return true;
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index 81abc86..1230711 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -337,9 +337,7 @@
if (lock == null) {
return false;
}
- if (lock.isRevoked()) {
- throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", segmentId.getInterval()));
- }
+ lock.assertNotRevoked();
return true;
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index 21b1783..4a31be2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -52,7 +52,6 @@
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.hadoop.OverlordActionBasedUsedSegmentsRetriever;
-import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
@@ -221,9 +220,7 @@
if (lock == null) {
return false;
}
- if (lock.isRevoked()) {
- throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", interval));
- }
+ lock.assertNotRevoked();
return true;
} else {
return true;
@@ -420,9 +417,7 @@
),
"Cannot acquire a lock for interval[%s]", interval
);
- if (lock.isRevoked()) {
- throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", interval));
- }
+ lock.assertNotRevoked();
version = lock.getVersion();
} else {
Iterable<TaskLock> locks = getTaskLocks(toolbox.getTaskActionClient());
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java
index 9ffebac..a32ef91 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java
@@ -42,7 +42,6 @@
import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -263,9 +262,7 @@
"Cannot acquire a lock for interval[%s]",
segment.getInterval()
);
- if (lock.isRevoked()) {
- throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", segment.getInterval()));
- }
+ lock.assertNotRevoked();
toolbox.getSegmentAnnouncer().announceSegment(segment);
}
@@ -292,9 +289,7 @@
"Cannot acquire a lock for interval[%s]",
segment.getInterval()
);
- if (lock.isRevoked()) {
- throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", segment.getInterval()));
- }
+ lock.assertNotRevoked();
}
toolbox.getSegmentAnnouncer().announceSegments(segments);
}
@@ -346,9 +341,7 @@
"Cannot acquire a lock for interval[%s]",
interval
);
- if (lock.isRevoked()) {
- throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", interval));
- }
+ lock.assertNotRevoked();
return lock.getVersion();
}
catch (IOException e) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index 70a8869..6036fe6 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -424,15 +424,16 @@
catch (Exception e) {
log.warn(e, "Exception thrown during isReady for task: %s", task.getId());
final String errorMessage;
- if (e instanceof MaxAllowedLocksExceededException) {
+ if (e instanceof MaxAllowedLocksExceededException || e instanceof DruidException) {
errorMessage = e.getMessage();
} else {
errorMessage = StringUtils.format(
"Encountered error[%s] while waiting for task to be ready. See Overlord logs for more details.",
- StringUtils.chop(e.getMessage(), 100)
+ e.getMessage()
);
}
- notifyStatus(task, TaskStatus.failure(task.getId(), errorMessage), errorMessage);
+ TaskStatus taskStatus = TaskStatus.failure(task.getId(), errorMessage);
+ notifyStatus(task, taskStatus, taskStatus.getErrorMsg());
continue;
}
if (taskIsReady) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 94ce367..b5eefcd 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -464,9 +464,7 @@
if (lock == null) {
return false;
}
- if (lock.isRevoked()) {
- throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", segmentId.getInterval()));
- }
+ lock.assertNotRevoked();
return true;
}
}
diff --git a/processing/src/main/java/org/apache/druid/indexer/TaskStatus.java b/processing/src/main/java/org/apache/druid/indexer/TaskStatus.java
index 7006f13..987d12d 100644
--- a/processing/src/main/java/org/apache/druid/indexer/TaskStatus.java
+++ b/processing/src/main/java/org/apache/druid/indexer/TaskStatus.java
@@ -37,7 +37,7 @@
*/
public class TaskStatus
{
- public static final int MAX_ERROR_MSG_LENGTH = 100;
+ public static final int MAX_ERROR_MSG_TRUNCATION_LIMIT = 1024;
public static TaskStatus running(String taskId)
{
@@ -88,8 +88,10 @@
*/
private static @Nullable String truncateErrorMsg(@Nullable String errorMsg)
{
- if (errorMsg != null && errorMsg.length() > MAX_ERROR_MSG_LENGTH) {
- return errorMsg.substring(0, MAX_ERROR_MSG_LENGTH) + "...";
+ if (errorMsg != null && errorMsg.length() > MAX_ERROR_MSG_TRUNCATION_LIMIT) {
+ return errorMsg.substring(0, MAX_ERROR_MSG_TRUNCATION_LIMIT / 2)
+ + "..." + (errorMsg.length() - MAX_ERROR_MSG_TRUNCATION_LIMIT) + " characters omitted..."
+ + errorMsg.substring(errorMsg.length() - MAX_ERROR_MSG_TRUNCATION_LIMIT / 2);
} else {
return errorMsg;
}
diff --git a/processing/src/test/java/org/apache/druid/indexer/TaskStatusTest.java b/processing/src/test/java/org/apache/druid/indexer/TaskStatusTest.java
index d0cf6b3..a8a320d 100644
--- a/processing/src/test/java/org/apache/druid/indexer/TaskStatusTest.java
+++ b/processing/src/test/java/org/apache/druid/indexer/TaskStatusTest.java
@@ -27,6 +27,32 @@
public class TaskStatusTest
{
+ static final String STACK_TRACE =
+ "org.apache.druid.java.util.common.ISE: Lock for interval [2024-04-23T00:00:00.000Z/2024-04-24T00:00:00.000Z] was revoked.\n"
+ + "\tat org.apache.druid.indexing.common.task.AbstractBatchIndexTask.tryTimeChunkLock(AbstractBatchIndexTask.java:465) ~[druid-indexing-service-2024.03.0-iap.jar:2024.03.0-iap]\n"
+ + "\tat org.apache.druid.indexing.common.task.batch.parallel.PartialHashSegmentGenerateTask.isReady(PartialHashSegmentGenerateTask.java:152) ~[druid-indexing-service-2024.03.0-iap.jar:2024.03.0-iap]\n"
+ + "\tat org.apache.druid.indexing.overlord.TaskQueue.manageInternalCritical(TaskQueue.java:420) [druid-indexing-service-2024.03.0-iap.jar:2024.03.0-iap]\n"
+ + "\tat org.apache.druid.indexing.overlord.TaskQueue.manageInternal(TaskQueue.java:373) [druid-indexing-service-2024.03.0-iap.jar:2024.03.0-iap]\n"
+ + "\tat org.apache.druid.indexing.overlord.TaskQueue.manage(TaskQueue.java:356) [druid-indexing-service-2024.03.0-iap.jar:2024.03.0-iap]\n"
+ + "\tat org.apache.druid.indexing.overlord.TaskQueue.access$000(TaskQueue.java:91) [druid-indexing-service-2024.03.0-iap.jar:2024.03.0-iap]\n"
+ + "\tat org.apache.druid.indexing.overlord.TaskQueue$1.run(TaskQueue.java:212) [druid-indexing-service-2024.03.0-iap.jar:2024.03.0-iap]\n"
+ + "\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]\n"
+ + "\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]\n"
+ + "\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]\n"
+ + "\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]\n"
+ + "\tat java.base/java.lang.Thread.run(Thread.java:829) [?:?]\n";
+
+ static final String EXPECTED_ERROR_MESSAGE =
+ "org.apache.druid.java.util.common.ISE: Lock for interval [2024-04-23T00:00:00.000Z/2024-04-24T00:00:00.000Z] was revoked.\n"
+ + "\tat org.apache.druid.indexing.common.task.AbstractBatchIndexTask.tryTimeChunkLock(AbstractBatchIndexTask.java:465) ~[druid-indexing-service-2024.03.0-iap.jar:2024.03.0-iap]\n"
+ + "\tat org.apache.druid.indexing.common.task.batch.parallel.PartialHashSegmentGenerateTask.isReady(PartialHashSegmentGenerateTask.java:152) ~[druid-indexing-service-2024.03.0-iap.jar:2024.03.0-iap]\n"
+ + "\tat org.apache.druid.i...584 characters omitted...e$1.run(TaskQueue.java:212) [druid-indexing-service-2024.03.0-iap.jar:2024.03.0-iap]\n"
+ + "\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]\n"
+ + "\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]\n"
+ + "\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]\n"
+ + "\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]\n"
+ + "\tat java.base/java.lang.Thread.run(Thread.java:829) [?:?]\n";
+
@Test
public void testSerde() throws IOException
{
@@ -64,4 +90,11 @@
Assert.assertEquals(success.getLocation().getPort(), 0);
Assert.assertEquals(success.getLocation().getTlsPort(), 1);
}
+
+ @Test
+ public void testTruncation()
+ {
+ final TaskStatus status = TaskStatus.failure("testId", STACK_TRACE);
+ Assert.assertEquals(status.getErrorMsg(), EXPECTED_ERROR_MESSAGE);
+ }
}