Fix fetch of task location in SpecificTaskServiceLocator (#16462)
* Fix fetch of task location in SpecificTaskServiceLocator
* Resolve future if exception occurs while invoking API
* Remove unused import
diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java b/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java
index 3f54413..689cbdc 100644
--- a/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java
+++ b/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java
@@ -23,14 +23,13 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.client.indexing.TaskStatusResponse;
-import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.rpc.ServiceLocation;
@@ -57,7 +56,6 @@
private final String taskId;
private final OverlordClient overlordClient;
- private final TaskLocationFetcher locationFetcher = new TaskLocationFetcher();
private final Object lock = new Object();
@GuardedBy("lock")
@@ -125,42 +123,15 @@
lastUpdateTime = System.currentTimeMillis();
final TaskStatus status = taskStatusMap.get(taskId);
-
if (status == null) {
// If the task status is unknown, we'll treat it as closed.
- lastKnownState = null;
- lastKnownLocation = null;
+ resolvePendingFuture(null, null);
+ } else if (TaskLocation.unknown().equals(status.getLocation())) {
+ // Do not resolve the future just yet, try the fallback API instead
+ fetchFallbackTaskLocation();
} else {
- lastKnownState = status.getStatusCode();
- final TaskLocation location;
- if (TaskLocation.unknown().equals(status.getLocation())) {
- location = locationFetcher.getLocation();
- } else {
- location = status.getLocation();
- }
-
- if (TaskLocation.unknown().equals(location)) {
- lastKnownLocation = null;
- } else {
- lastKnownLocation = new ServiceLocation(
- location.getHost(),
- location.getPort(),
- location.getTlsPort(),
- StringUtils.format("%s/%s", BASE_PATH, StringUtils.urlEncode(taskId))
- );
- }
+ resolvePendingFuture(status.getStatusCode(), status.getLocation());
}
-
- if (lastKnownState != TaskState.RUNNING) {
- pendingFuture.set(ServiceLocations.closed());
- } else if (lastKnownLocation == null) {
- pendingFuture.set(ServiceLocations.forLocations(Collections.emptySet()));
- } else {
- pendingFuture.set(ServiceLocations.forLocation(lastKnownLocation));
- }
-
- // Clear pendingFuture once it has been set.
- pendingFuture = null;
}
}
}
@@ -168,17 +139,10 @@
@Override
public void onFailure(Throwable t)
{
- synchronized (lock) {
- if (pendingFuture != null) {
- pendingFuture.setException(t);
-
- // Clear pendingFuture once it has been set.
- pendingFuture = null;
- }
- }
+ resolvePendingFutureOnException(t);
}
},
- MoreExecutors.directExecutor()
+ Execs.directExecutor()
);
return Futures.nonCancellationPropagating(retVal);
@@ -209,18 +173,104 @@
}
}
- private class TaskLocationFetcher
+ private void resolvePendingFuture(TaskState state, TaskLocation location)
{
- TaskLocation getLocation()
- {
- final TaskStatusResponse statusResponse = FutureUtils.getUnchecked(
- overlordClient.taskStatus(taskId),
- true
- );
- if (statusResponse == null || statusResponse.getStatus() == null) {
- return TaskLocation.unknown();
- } else {
- return statusResponse.getStatus().getLocation();
+ synchronized (lock) {
+ if (pendingFuture != null) {
+ lastKnownState = state;
+ lastKnownLocation = location == null ? null : new ServiceLocation(
+ location.getHost(),
+ location.getPort(),
+ location.getTlsPort(),
+ StringUtils.format("%s/%s", BASE_PATH, StringUtils.urlEncode(taskId))
+ );
+
+ if (lastKnownState != TaskState.RUNNING) {
+ pendingFuture.set(ServiceLocations.closed());
+ } else if (lastKnownLocation == null) {
+ pendingFuture.set(ServiceLocations.forLocations(Collections.emptySet()));
+ } else {
+ pendingFuture.set(ServiceLocations.forLocation(lastKnownLocation));
+ }
+
+ // Clear pendingFuture once it has been set.
+ pendingFuture = null;
+ }
+ }
+ }
+
+ private void resolvePendingFutureOnException(Throwable t)
+ {
+ synchronized (lock) {
+ if (pendingFuture != null) {
+ pendingFuture.setException(t);
+
+ // Clear pendingFuture once it has been set.
+ pendingFuture = null;
+ }
+ }
+ }
+
+ /**
+ * Invokes the single task status API {@link OverlordClient#taskStatus} if the
+ * multi-task status API returns an unknown location (this can happen if the
+ * Overlord is running on a version older than Druid 30.0.0 (pre #15724)).
+ */
+ private void fetchFallbackTaskLocation()
+ {
+ synchronized (lock) {
+ if (pendingFuture != null) {
+ final ListenableFuture<TaskStatusResponse> taskStatusFuture;
+ try {
+ taskStatusFuture = overlordClient.taskStatus(taskId);
+ }
+ catch (Exception e) {
+ resolvePendingFutureOnException(e);
+ return;
+ }
+
+ pendingFuture.addListener(
+ () -> {
+ if (!taskStatusFuture.isDone()) {
+ // pendingFuture may resolve without taskStatusFuture due to close().
+ taskStatusFuture.cancel(true);
+ }
+ },
+ Execs.directExecutor()
+ );
+
+ Futures.addCallback(
+ taskStatusFuture,
+ new FutureCallback<TaskStatusResponse>()
+ {
+ @Override
+ public void onSuccess(final TaskStatusResponse taskStatusResponse)
+ {
+ synchronized (lock) {
+ if (pendingFuture != null) {
+ lastUpdateTime = System.currentTimeMillis();
+
+ final TaskStatusPlus status = taskStatusResponse.getStatus();
+ if (status == null) {
+ // If the task status is unknown, we'll treat it as closed.
+ resolvePendingFuture(null, null);
+ } else if (TaskLocation.unknown().equals(status.getLocation())) {
+ resolvePendingFuture(status.getStatusCode(), null);
+ } else {
+ resolvePendingFuture(status.getStatusCode(), status.getLocation());
+ }
+ }
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t)
+ {
+ resolvePendingFutureOnException(t);
+ }
+ },
+ Execs.directExecutor()
+ );
}
}
}