Fix fetch of task location in SpecificTaskServiceLocator (#16462)

* Fix fetch of task location in SpecificTaskServiceLocator

* Resolve future if exception occurs while invoking API

* Remove unused import
diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java b/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java
index 3f54413..689cbdc 100644
--- a/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java
+++ b/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java
@@ -23,14 +23,13 @@
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 import org.apache.druid.client.indexing.TaskStatusResponse;
-import org.apache.druid.common.guava.FutureUtils;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.rpc.ServiceLocation;
@@ -57,7 +56,6 @@
 
   private final String taskId;
   private final OverlordClient overlordClient;
-  private final TaskLocationFetcher locationFetcher = new TaskLocationFetcher();
   private final Object lock = new Object();
 
   @GuardedBy("lock")
@@ -125,42 +123,15 @@
                     lastUpdateTime = System.currentTimeMillis();
 
                     final TaskStatus status = taskStatusMap.get(taskId);
-
                     if (status == null) {
                       // If the task status is unknown, we'll treat it as closed.
-                      lastKnownState = null;
-                      lastKnownLocation = null;
+                      resolvePendingFuture(null, null);
+                    } else if (TaskLocation.unknown().equals(status.getLocation())) {
+                      // Do not resolve the future just yet, try the fallback API instead
+                      fetchFallbackTaskLocation();
                     } else {
-                      lastKnownState = status.getStatusCode();
-                      final TaskLocation location;
-                      if (TaskLocation.unknown().equals(status.getLocation())) {
-                        location = locationFetcher.getLocation();
-                      } else {
-                        location = status.getLocation();
-                      }
-
-                      if (TaskLocation.unknown().equals(location)) {
-                        lastKnownLocation = null;
-                      } else {
-                        lastKnownLocation = new ServiceLocation(
-                            location.getHost(),
-                            location.getPort(),
-                            location.getTlsPort(),
-                            StringUtils.format("%s/%s", BASE_PATH, StringUtils.urlEncode(taskId))
-                        );
-                      }
+                      resolvePendingFuture(status.getStatusCode(), status.getLocation());
                     }
-
-                    if (lastKnownState != TaskState.RUNNING) {
-                      pendingFuture.set(ServiceLocations.closed());
-                    } else if (lastKnownLocation == null) {
-                      pendingFuture.set(ServiceLocations.forLocations(Collections.emptySet()));
-                    } else {
-                      pendingFuture.set(ServiceLocations.forLocation(lastKnownLocation));
-                    }
-
-                    // Clear pendingFuture once it has been set.
-                    pendingFuture = null;
                   }
                 }
               }
@@ -168,17 +139,10 @@
               @Override
               public void onFailure(Throwable t)
               {
-                synchronized (lock) {
-                  if (pendingFuture != null) {
-                    pendingFuture.setException(t);
-
-                    // Clear pendingFuture once it has been set.
-                    pendingFuture = null;
-                  }
-                }
+                resolvePendingFutureOnException(t);
               }
             },
-            MoreExecutors.directExecutor()
+            Execs.directExecutor()
         );
 
         return Futures.nonCancellationPropagating(retVal);
@@ -209,18 +173,104 @@
     }
   }
 
-  private class TaskLocationFetcher
+  private void resolvePendingFuture(TaskState state, TaskLocation location)
   {
-    TaskLocation getLocation()
-    {
-      final TaskStatusResponse statusResponse = FutureUtils.getUnchecked(
-          overlordClient.taskStatus(taskId),
-          true
-      );
-      if (statusResponse == null || statusResponse.getStatus() == null) {
-        return TaskLocation.unknown();
-      } else {
-        return statusResponse.getStatus().getLocation();
+    synchronized (lock) {
+      if (pendingFuture != null) {
+        lastKnownState = state;
+        lastKnownLocation = location == null ? null : new ServiceLocation(
+            location.getHost(),
+            location.getPort(),
+            location.getTlsPort(),
+            StringUtils.format("%s/%s", BASE_PATH, StringUtils.urlEncode(taskId))
+        );
+
+        if (lastKnownState != TaskState.RUNNING) {
+          pendingFuture.set(ServiceLocations.closed());
+        } else if (lastKnownLocation == null) {
+          pendingFuture.set(ServiceLocations.forLocations(Collections.emptySet()));
+        } else {
+          pendingFuture.set(ServiceLocations.forLocation(lastKnownLocation));
+        }
+
+        // Clear pendingFuture once it has been set.
+        pendingFuture = null;
+      }
+    }
+  }
+
+  private void resolvePendingFutureOnException(Throwable t)
+  {
+    synchronized (lock) {
+      if (pendingFuture != null) {
+        pendingFuture.setException(t);
+
+        // Clear pendingFuture once it has been set.
+        pendingFuture = null;
+      }
+    }
+  }
+
+  /**
+   * Invokes the single task status API {@link OverlordClient#taskStatus} if the
+   * multi-task status API returns an unknown location (this can happen if the
+   * Overlord is running on a version older than Druid 30.0.0 (pre #15724)).
+   */
+  private void fetchFallbackTaskLocation()
+  {
+    synchronized (lock) {
+      if (pendingFuture != null) {
+        final ListenableFuture<TaskStatusResponse> taskStatusFuture;
+        try {
+          taskStatusFuture = overlordClient.taskStatus(taskId);
+        }
+        catch (Exception e) {
+          resolvePendingFutureOnException(e);
+          return;
+        }
+
+        pendingFuture.addListener(
+            () -> {
+              if (!taskStatusFuture.isDone()) {
+                // pendingFuture may resolve without taskStatusFuture due to close().
+                taskStatusFuture.cancel(true);
+              }
+            },
+            Execs.directExecutor()
+        );
+
+        Futures.addCallback(
+            taskStatusFuture,
+            new FutureCallback<TaskStatusResponse>()
+            {
+              @Override
+              public void onSuccess(final TaskStatusResponse taskStatusResponse)
+              {
+                synchronized (lock) {
+                  if (pendingFuture != null) {
+                    lastUpdateTime = System.currentTimeMillis();
+
+                    final TaskStatusPlus status = taskStatusResponse.getStatus();
+                    if (status == null) {
+                      // If the task status is unknown, we'll treat it as closed.
+                      resolvePendingFuture(null, null);
+                    } else if (TaskLocation.unknown().equals(status.getLocation())) {
+                      resolvePendingFuture(status.getStatusCode(), null);
+                    } else {
+                      resolvePendingFuture(status.getStatusCode(), status.getLocation());
+                    }
+                  }
+                }
+              }
+
+              @Override
+              public void onFailure(Throwable t)
+              {
+                resolvePendingFutureOnException(t);
+              }
+            },
+            Execs.directExecutor()
+        );
       }
     }
   }