YARN-11228. [Federation] Add getAppAttempts, getAppAttempt REST APIs for Router. (#4695)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
index 173df28..65f2b68 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
@@ -45,6 +45,28 @@
   /**
    * Throws an exception due to an error.
    *
+   * @param t the throwable raised in the called class.
+   * @param errMsgFormat the error message format string.
+   * @param args referenced by the format specifiers in the format string.
+   * @throws YarnException on failure
+   */
+  @Public
+  @Unstable
+  public static void logAndThrowException(Throwable t, String errMsgFormat, Object... args)
+      throws YarnException {
+    String msg = String.format(errMsgFormat, args);
+    if (t != null) {
+      LOG.error(msg, t);
+      throw new YarnException(msg, t);
+    } else {
+      LOG.error(msg);
+      throw new YarnException(msg);
+    }
+  }
+
+  /**
+   * Throws an exception due to an error.
+   *
    * @param errMsg the error message
    * @param t the throwable raised in the called class.
    * @throws YarnException on failure
@@ -101,4 +123,26 @@
       throw new RuntimeException(errMsg);
     }
   }
+
+  /**
+   * Throws an RunTimeException due to an error.
+   *
+   * @param t the throwable raised in the called class.
+   * @param errMsgFormat the error message format string.
+   * @param args referenced by the format specifiers in the format string.
+   * @throws RuntimeException on failure
+   */
+  @Public
+  @Unstable
+  public static void logAndThrowRunTimeException(Throwable t, String errMsgFormat, Object... args)
+      throws RuntimeException {
+    String msg = String.format(errMsgFormat, args);
+    if (t != null) {
+      LOG.error(msg, t);
+      throw new RuntimeException(msg, t);
+    } else {
+      LOG.error(msg);
+      throw new RuntimeException(msg);
+    }
+  }
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
index 474d5fa..d2fda2b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
@@ -1360,7 +1360,24 @@
 
   @Override
   public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {
-    throw new NotImplementedException("Code is not implemented");
+    if (appId == null || appId.isEmpty()) {
+      throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
+    }
+
+    try {
+      SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
+
+      DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
+          subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
+      return interceptor.getAppAttempts(hsr, appId);
+    } catch (IllegalArgumentException e) {
+      RouterServerUtil.logAndThrowRunTimeException(e,
+          "Unable to get the AppAttempt appId: %s.", appId);
+    } catch (YarnException e) {
+      RouterServerUtil.logAndThrowRunTimeException("getContainer Failed.", e);
+    }
+
+    return null;
   }
 
   @Override
@@ -1372,7 +1389,28 @@
   @Override
   public AppAttemptInfo getAppAttempt(HttpServletRequest req,
       HttpServletResponse res, String appId, String appAttemptId) {
-    throw new NotImplementedException("Code is not implemented");
+
+    if (appId == null || appId.isEmpty()) {
+      throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
+    }
+    if (appAttemptId == null || appAttemptId.isEmpty()) {
+      throw new IllegalArgumentException("Parameter error, the appAttemptId is empty or null.");
+    }
+
+    try {
+      SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
+
+      DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
+          subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
+      return interceptor.getAppAttempt(req, res, appId, appAttemptId);
+    } catch (IllegalArgumentException e) {
+      RouterServerUtil.logAndThrowRunTimeException(e,
+          "Unable to get the AppAttempt appId: %s, appAttemptId: %s.", appId, appAttemptId);
+    } catch (YarnException e) {
+      RouterServerUtil.logAndThrowRunTimeException("getContainer Failed.", e);
+    }
+
+    return null;
   }
 
   @Override
@@ -1423,13 +1461,7 @@
     }
 
     try {
-      ApplicationId applicationId = ApplicationId.fromString(appId);
-      SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId);
-
-      if (subClusterInfo == null) {
-        RouterServerUtil.logAndThrowRunTimeException("Unable to get subCluster by applicationId = " +
-            applicationId, null);
-      }
+      SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
 
       DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
           subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
@@ -1483,12 +1515,7 @@
       ContainerId containerIdObj = ContainerId.fromString(containerId);
       ApplicationId applicationId = containerIdObj.getApplicationAttemptId().getApplicationId();
 
-      SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId);
-
-      if (subClusterInfo == null) {
-        RouterServerUtil.logAndThrowRunTimeException("Unable to get subCluster by applicationId = " +
-            applicationId, null);
-      }
+      SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId.toString());
 
       DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
           subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
@@ -1556,24 +1583,26 @@
   /**
    * get the HomeSubCluster according to ApplicationId.
    *
-   * @param applicationId applicationId
+   * @param appId applicationId
    * @return HomeSubCluster
    * @throws YarnException on failure
    */
-  private SubClusterInfo getHomeSubClusterInfoByAppId(ApplicationId applicationId) throws YarnException {
+  private SubClusterInfo getHomeSubClusterInfoByAppId(String appId)
+      throws YarnException {
     SubClusterInfo subClusterInfo = null;
-    SubClusterId subClusterId = null;
     try {
-      subClusterId = federationFacade.getApplicationHomeSubCluster(applicationId);
+      ApplicationId applicationId = ApplicationId.fromString(appId);
+      SubClusterId subClusterId = federationFacade.getApplicationHomeSubCluster(applicationId);
       if (subClusterId == null) {
-        RouterServerUtil.logAndThrowException("Can't get HomeSubCluster by applicationId "
-            + applicationId, null);
+        RouterServerUtil.logAndThrowException(null,
+            "Can't get HomeSubCluster by applicationId %s", applicationId);
       }
       subClusterInfo = federationFacade.getSubCluster(subClusterId);
+      return subClusterInfo;
     } catch (YarnException e) {
-      RouterServerUtil.logAndThrowException("Get HomeSubClusterInfo by applicationId "
-          + applicationId + " failed.", e);
+      RouterServerUtil.logAndThrowException(e,
+          "Get HomeSubClusterInfo by applicationId %s failed.", appId);
     }
-    return subClusterInfo;
+    throw new YarnException("Unable to get subCluster by applicationId = " + appId);
   }
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
index 8a6aa6d..cedcae6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
@@ -49,6 +49,11 @@
 import org.apache.hadoop.yarn.api.records.ContainerReport;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@@ -67,6 +72,8 @@
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
+import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
 import org.apache.hadoop.yarn.webapp.NotFoundException;
@@ -412,4 +419,48 @@
 
     return Response.status(Status.OK).build();
   }
+
+  @Override
+  public AppAttemptInfo getAppAttempt(HttpServletRequest req, HttpServletResponse res,
+      String appId, String appAttemptId) {
+    if (!isRunning) {
+      throw new RuntimeException("RM is stopped");
+    }
+
+    ApplicationId applicationId = ApplicationId.fromString(appId);
+    if (!applicationMap.contains(applicationId)) {
+      throw new NotFoundException("app with id: " + appId + " not found");
+    }
+
+    ApplicationReport newApplicationReport = ApplicationReport.newInstance(
+        applicationId, ApplicationAttemptId.newInstance(applicationId, Integer.parseInt(appAttemptId)),
+        "user", "queue", "appname", "host", 124, null,
+        YarnApplicationState.RUNNING, "diagnostics", "url", 1, 2, 3, 4,
+        FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null);
+
+    ApplicationAttemptReport attempt = ApplicationAttemptReport.newInstance(
+        ApplicationAttemptId.newInstance(applicationId, Integer.parseInt(appAttemptId)),
+        "host", 124, "url", "oUrl", "diagnostics",
+        YarnApplicationAttemptState.FINISHED, ContainerId.newContainerId(
+        newApplicationReport.getCurrentApplicationAttemptId(), 1));
+
+    return new AppAttemptInfo(attempt);
+  }
+
+  @Override
+  public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {
+    if (!isRunning) {
+      throw new RuntimeException("RM is stopped");
+    }
+
+    ApplicationId applicationId = ApplicationId.fromString(appId);
+    if (!applicationMap.contains(applicationId)) {
+      throw new NotFoundException("app with id: " + appId + " not found");
+    }
+
+    AppAttemptsInfo infos = new AppAttemptsInfo();
+    infos.add(TestRouterWebServiceUtil.generateAppAttemptInfo(0));
+    infos.add(TestRouterWebServiceUtil.generateAppAttemptInfo(1));
+    return infos;
+  }
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
index 6233300..d3625ff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
@@ -56,6 +56,8 @@
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
@@ -717,4 +719,60 @@
         appId.toString(), appAttemptId.toString(), "0");
     Assert.assertNotNull(containerInfo);
   }
+
+  @Test
+  public void testGetAppAttempts()
+      throws IOException, InterruptedException, YarnException {
+    // Submit application to multiSubCluster
+    ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
+    ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
+    context.setApplicationId(appId.toString());
+
+    Assert.assertNotNull(interceptor.submitApplication(context, null));
+
+    AppAttemptsInfo appAttemptsInfo = interceptor.getAppAttempts(null, appId.toString());
+    Assert.assertNotNull(appAttemptsInfo);
+
+    ArrayList<AppAttemptInfo> attemptLists = appAttemptsInfo.getAttempts();
+    Assert.assertNotNull(appAttemptsInfo);
+    Assert.assertEquals(2, attemptLists.size());
+
+    AppAttemptInfo attemptInfo1 = attemptLists.get(0);
+    Assert.assertNotNull(attemptInfo1);
+    Assert.assertEquals(0, attemptInfo1.getAttemptId());
+    Assert.assertEquals("AppAttemptId_0", attemptInfo1.getAppAttemptId());
+    Assert.assertEquals("LogLink_0", attemptInfo1.getLogsLink());
+    Assert.assertEquals(1659621705L, attemptInfo1.getFinishedTime());
+
+    AppAttemptInfo attemptInfo2 = attemptLists.get(1);
+    Assert.assertNotNull(attemptInfo2);
+    Assert.assertEquals(0, attemptInfo2.getAttemptId());
+    Assert.assertEquals("AppAttemptId_1", attemptInfo2.getAppAttemptId());
+    Assert.assertEquals("LogLink_1", attemptInfo2.getLogsLink());
+    Assert.assertEquals(1659621705L, attemptInfo2.getFinishedTime());
+  }
+
+  @Test
+  public void testGetAppAttempt()
+      throws IOException, InterruptedException, YarnException {
+
+    // Generate ApplicationId information
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
+    context.setApplicationId(appId.toString());
+
+    // Generate ApplicationAttemptId information
+    Assert.assertNotNull(interceptor.submitApplication(context, null));
+    ApplicationAttemptId expectAppAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+
+    org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo
+        appAttemptInfo = interceptor.getAppAttempt(null, null, appId.toString(), "1");
+
+    Assert.assertNotNull(appAttemptInfo);
+    Assert.assertEquals(expectAppAttemptId.toString(), appAttemptInfo.getAppAttemptId());
+    Assert.assertEquals("url", appAttemptInfo.getTrackingUrl());
+    Assert.assertEquals("oUrl", appAttemptInfo.getOriginalTrackingUrl());
+    Assert.assertEquals(124, appAttemptInfo.getRpcPort());
+    Assert.assertEquals("host", appAttemptInfo.getHost());
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java
index edf3804..565074b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java
@@ -30,12 +30,16 @@
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceRequestInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
 import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 /**
  * Test class to validate RouterWebServiceUtil methods.
  */
@@ -579,4 +583,13 @@
     metrics.setActiveNodes(rand.nextInt(1000));
     metrics.setShutdownNodes(rand.nextInt(1000));
   }
+
+  public static AppAttemptInfo generateAppAttemptInfo(int attemptId) {
+    AppAttemptInfo appAttemptInfo = mock(AppAttemptInfo.class);
+    when(appAttemptInfo.getAppAttemptId()).thenReturn("AppAttemptId_" + attemptId);
+    when(appAttemptInfo.getAttemptId()).thenReturn(0);
+    when(appAttemptInfo.getFinishedTime()).thenReturn(1659621705L);
+    when(appAttemptInfo.getLogsLink()).thenReturn("LogLink_" + attemptId);
+    return appAttemptInfo;
+  }
 }
\ No newline at end of file