YARN-11227. [Federation] Add getAppTimeout, getAppTimeouts, updateApplicationTimeout REST APIs for Router. (#4715)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppTimeoutInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppTimeoutInfo.java
index 140857e..460309b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppTimeoutInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppTimeoutInfo.java
@@ -22,6 +22,7 @@
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
+import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
/**
@@ -45,6 +46,12 @@
remainingTimeInSec = -1;
}
+ public AppTimeoutInfo(ApplicationTimeout applicationTimeout) {
+ this.expiryTime = applicationTimeout.getExpiryTime();
+ this.remainingTimeInSec = applicationTimeout.getRemainingTime();
+ this.timeoutType = applicationTimeout.getTimeoutType();
+ }
+
public ApplicationTimeoutType getTimeoutType() {
return timeoutType;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
index d2fda2b..51adf2f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
@@ -1342,31 +1342,87 @@
@Override
public AppTimeoutInfo getAppTimeout(HttpServletRequest hsr, String appId,
String type) throws AuthorizationException {
- throw new NotImplementedException("Code is not implemented");
+
+ if (appId == null || appId.isEmpty()) {
+ throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
+ }
+
+ if (type == null || type.isEmpty()) {
+ throw new IllegalArgumentException("Parameter error, the type is empty or null.");
+ }
+
+ try {
+ SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
+ DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
+ subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
+ return interceptor.getAppTimeout(hsr, appId, type);
+ } catch (IllegalArgumentException e) {
+ RouterServerUtil.logAndThrowRunTimeException(e,
+ "Unable to get the getAppTimeout appId: %s.", appId);
+ } catch (YarnException e) {
+ RouterServerUtil.logAndThrowRunTimeException("getAppTimeout Failed.", e);
+ }
+ return null;
}
@Override
public AppTimeoutsInfo getAppTimeouts(HttpServletRequest hsr, String appId)
throws AuthorizationException {
- throw new NotImplementedException("Code is not implemented");
- }
- @Override
- public Response updateApplicationTimeout(AppTimeoutInfo appTimeout,
- HttpServletRequest hsr, String appId) throws AuthorizationException,
- YarnException, InterruptedException, IOException {
- throw new NotImplementedException("Code is not implemented");
- }
-
- @Override
- public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {
if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
}
try {
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
+ DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
+ subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
+ return interceptor.getAppTimeouts(hsr, appId);
+ } catch (IllegalArgumentException e) {
+ RouterServerUtil.logAndThrowRunTimeException(e,
+ "Unable to get the getAppTimeouts appId: %s.", appId);
+ } catch (YarnException e) {
+ RouterServerUtil.logAndThrowRunTimeException("getAppTimeouts Failed.", e);
+ }
+ return null;
+ }
+ @Override
+ public Response updateApplicationTimeout(AppTimeoutInfo appTimeout,
+ HttpServletRequest hsr, String appId) throws AuthorizationException,
+ YarnException, InterruptedException, IOException {
+
+ if (appId == null || appId.isEmpty()) {
+ throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
+ }
+
+ if (appTimeout == null) {
+ throw new IllegalArgumentException("Parameter error, the appTimeout is null.");
+ }
+
+ try {
+ SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
+ DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
+ subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
+ return interceptor.updateApplicationTimeout(appTimeout, hsr, appId);
+ } catch (IllegalArgumentException e) {
+ RouterServerUtil.logAndThrowRunTimeException(e,
+ "Unable to get the updateApplicationTimeout appId: %s.", appId);
+ } catch (YarnException e) {
+ RouterServerUtil.logAndThrowRunTimeException("updateApplicationTimeout Failed.", e);
+ }
+ return null;
+ }
+
+ @Override
+ public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {
+
+ if (appId == null || appId.isEmpty()) {
+ throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
+ }
+
+ try {
+ SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.getAppAttempts(hsr, appId);
@@ -1374,9 +1430,8 @@
RouterServerUtil.logAndThrowRunTimeException(e,
"Unable to get the AppAttempt appId: %s.", appId);
} catch (YarnException e) {
- RouterServerUtil.logAndThrowRunTimeException("getContainer Failed.", e);
+ RouterServerUtil.logAndThrowRunTimeException("getAppAttempts Failed.", e);
}
-
return null;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
index cedcae6..d5c1106 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
@@ -21,7 +21,6 @@
import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.Set;
import java.util.Map;
import java.util.HashMap;
@@ -54,6 +53,8 @@
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
+import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@@ -73,6 +74,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
@@ -93,7 +96,7 @@
// This property allows us to write tests for specific scenario as YARN RM
// down e.g. network issue, failover.
private boolean isRunning = true;
- private HashSet<ApplicationId> applicationMap = new HashSet<>();
+ private Map<ApplicationId, ApplicationReport> applicationMap = new HashMap<>();
public static final String APP_STATE_RUNNING = "RUNNING";
private void validateRunning() throws ConnectException {
@@ -123,7 +126,22 @@
ApplicationId appId = ApplicationId.fromString(newApp.getApplicationId());
LOG.info("Application submitted: " + appId);
- applicationMap.add(appId);
+
+ // Initialize appReport
+ ApplicationReport appReport = ApplicationReport.newInstance(
+ appId, ApplicationAttemptId.newInstance(appId, 1), null, newApp.getQueue(), null, null, 0,
+ null, YarnApplicationState.ACCEPTED, "", null, 0, 0, null, null, null, 0, null, null, null,
+ false, Priority.newInstance(newApp.getPriority()), null, null);
+
+ // Initialize appTimeoutsMap
+ HashMap<ApplicationTimeoutType, ApplicationTimeout> appTimeoutsMap = new HashMap<>();
+ ApplicationTimeoutType timeoutType = ApplicationTimeoutType.LIFETIME;
+ ApplicationTimeout appTimeOut =
+ ApplicationTimeout.newInstance(ApplicationTimeoutType.LIFETIME, "UNLIMITED", 10);
+ appTimeoutsMap.put(timeoutType, appTimeOut);
+ appReport.setApplicationTimeouts(appTimeoutsMap);
+
+ applicationMap.put(appId, appReport);
return Response.status(Status.ACCEPTED).header(HttpHeaders.LOCATION, "")
.entity(getSubClusterId()).build();
}
@@ -136,7 +154,7 @@
}
ApplicationId applicationId = ApplicationId.fromString(appId);
- if (!applicationMap.contains(applicationId)) {
+ if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
@@ -171,7 +189,7 @@
validateRunning();
ApplicationId applicationId = ApplicationId.fromString(appId);
- if (!applicationMap.remove(applicationId)) {
+ if (applicationMap.remove(applicationId) == null) {
throw new ApplicationNotFoundException(
"Trying to kill an absent application: " + appId);
}
@@ -244,7 +262,7 @@
}
ApplicationId applicationId = ApplicationId.fromString(appId);
- if (!applicationMap.contains(applicationId)) {
+ if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
@@ -428,7 +446,7 @@
}
ApplicationId applicationId = ApplicationId.fromString(appId);
- if (!applicationMap.contains(applicationId)) {
+ if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
@@ -454,7 +472,7 @@
}
ApplicationId applicationId = ApplicationId.fromString(appId);
- if (!applicationMap.contains(applicationId)) {
+ if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
@@ -463,4 +481,102 @@
infos.add(TestRouterWebServiceUtil.generateAppAttemptInfo(1));
return infos;
}
+
+ @Override
+ public AppTimeoutInfo getAppTimeout(HttpServletRequest hsr,
+ String appId, String type) throws AuthorizationException {
+
+ if (!isRunning) {
+ throw new RuntimeException("RM is stopped");
+ }
+
+ ApplicationId applicationId = ApplicationId.fromString(appId);
+ if (!applicationMap.containsKey(applicationId)) {
+ throw new NotFoundException("app with id: " + appId + " not found");
+ }
+
+ ApplicationReport appReport = applicationMap.get(applicationId);
+ Map<ApplicationTimeoutType, ApplicationTimeout> timeouts = appReport.getApplicationTimeouts();
+ ApplicationTimeoutType paramType = ApplicationTimeoutType.valueOf(type);
+
+ if (paramType == null) {
+ throw new NotFoundException("application timeout type not found");
+ }
+
+ if (!timeouts.containsKey(paramType)) {
+ throw new NotFoundException("timeout with id: " + appId + " not found");
+ }
+
+ ApplicationTimeout applicationTimeout = timeouts.get(paramType);
+
+ AppTimeoutInfo timeoutInfo = new AppTimeoutInfo();
+ timeoutInfo.setExpiryTime(applicationTimeout.getExpiryTime());
+ timeoutInfo.setTimeoutType(applicationTimeout.getTimeoutType());
+ timeoutInfo.setRemainingTime(applicationTimeout.getRemainingTime());
+
+ return timeoutInfo;
+ }
+
+ @Override
+ public AppTimeoutsInfo getAppTimeouts(HttpServletRequest hsr, String appId)
+ throws AuthorizationException {
+
+ if (!isRunning) {
+ throw new RuntimeException("RM is stopped");
+ }
+
+ ApplicationId applicationId = ApplicationId.fromString(appId);
+
+ if (!applicationMap.containsKey(applicationId)) {
+ throw new NotFoundException("app with id: " + appId + " not found");
+ }
+
+ ApplicationReport appReport = applicationMap.get(applicationId);
+ Map<ApplicationTimeoutType, ApplicationTimeout> timeouts = appReport.getApplicationTimeouts();
+
+ AppTimeoutsInfo timeoutsInfo = new AppTimeoutsInfo();
+
+ for (ApplicationTimeout timeout : timeouts.values()) {
+ AppTimeoutInfo timeoutInfo = new AppTimeoutInfo();
+ timeoutInfo.setExpiryTime(timeout.getExpiryTime());
+ timeoutInfo.setTimeoutType(timeout.getTimeoutType());
+ timeoutInfo.setRemainingTime(timeout.getRemainingTime());
+ timeoutsInfo.add(timeoutInfo);
+ }
+
+ return timeoutsInfo;
+ }
+
+ @Override
+ public Response updateApplicationTimeout(AppTimeoutInfo appTimeout, HttpServletRequest hsr,
+ String appId) throws AuthorizationException,
+ YarnException, InterruptedException, IOException {
+
+ if (!isRunning) {
+ throw new RuntimeException("RM is stopped");
+ }
+
+ ApplicationId applicationId = ApplicationId.fromString(appId);
+
+ if (!applicationMap.containsKey(applicationId)) {
+ throw new NotFoundException("app with id: " + appId + " not found");
+ }
+
+ ApplicationReport appReport = applicationMap.get(applicationId);
+ Map<ApplicationTimeoutType, ApplicationTimeout> timeouts = appReport.getApplicationTimeouts();
+
+ ApplicationTimeoutType paramTimeoutType = appTimeout.getTimeoutType();
+ if (!timeouts.containsKey(paramTimeoutType)) {
+ throw new NotFoundException("TimeOutType with id: " + appId + " not found");
+ }
+
+ ApplicationTimeout applicationTimeout = timeouts.get(paramTimeoutType);
+ applicationTimeout.setTimeoutType(appTimeout.getTimeoutType());
+ applicationTimeout.setExpiryTime(appTimeout.getExpireTime());
+ applicationTimeout.setRemainingTime(appTimeout.getRemainingTimeInSec());
+
+ AppTimeoutInfo result = new AppTimeoutInfo(applicationTimeout);
+
+ return Response.status(Status.OK).entity(result).build();
+ }
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
index d3625ff..4bfb8af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
@@ -58,10 +59,13 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.util.MonotonicClock;
+import org.apache.hadoop.yarn.util.Times;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
@@ -757,7 +761,7 @@
throws IOException, InterruptedException, YarnException {
// Generate ApplicationId information
- ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
@@ -775,4 +779,80 @@
Assert.assertEquals(124, appAttemptInfo.getRpcPort());
Assert.assertEquals("host", appAttemptInfo.getHost());
}
+
+ @Test
+ public void testGetAppTimeout() throws IOException, InterruptedException, YarnException {
+
+ // Generate ApplicationId information
+ ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
+ ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
+ context.setApplicationId(appId.toString());
+
+ // Generate ApplicationAttemptId information
+ Assert.assertNotNull(interceptor.submitApplication(context, null));
+
+ ApplicationTimeoutType appTimeoutType = ApplicationTimeoutType.LIFETIME;
+ AppTimeoutInfo appTimeoutInfo =
+ interceptor.getAppTimeout(null, appId.toString(), appTimeoutType.toString());
+ Assert.assertNotNull(appTimeoutInfo);
+ Assert.assertEquals(10, appTimeoutInfo.getRemainingTimeInSec());
+ Assert.assertEquals("UNLIMITED", appTimeoutInfo.getExpireTime());
+ Assert.assertEquals(appTimeoutType, appTimeoutInfo.getTimeoutType());
+ }
+
+ @Test
+ public void testGetAppTimeouts() throws IOException, InterruptedException, YarnException {
+
+ // Generate ApplicationId information
+ ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
+ ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
+ context.setApplicationId(appId.toString());
+
+ // Generate ApplicationAttemptId information
+ Assert.assertNotNull(interceptor.submitApplication(context, null));
+
+ AppTimeoutsInfo appTimeoutsInfo = interceptor.getAppTimeouts(null, appId.toString());
+ Assert.assertNotNull(appTimeoutsInfo);
+
+ List<AppTimeoutInfo> timeouts = appTimeoutsInfo.getAppTimeouts();
+ Assert.assertNotNull(timeouts);
+ Assert.assertEquals(1, timeouts.size());
+
+ AppTimeoutInfo resultAppTimeout = timeouts.get(0);
+ Assert.assertNotNull(resultAppTimeout);
+ Assert.assertEquals(10, resultAppTimeout.getRemainingTimeInSec());
+ Assert.assertEquals("UNLIMITED", resultAppTimeout.getExpireTime());
+ Assert.assertEquals(ApplicationTimeoutType.LIFETIME, resultAppTimeout.getTimeoutType());
+ }
+
+ @Test
+ public void testUpdateApplicationTimeout() throws IOException, InterruptedException,
+ YarnException {
+
+ // Generate ApplicationId information
+ ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
+ ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
+ context.setApplicationId(appId.toString());
+
+ // Generate ApplicationAttemptId information
+ Assert.assertNotNull(interceptor.submitApplication(context, null));
+
+ long newLifetime = 10L;
+ // update 10L seconds more to timeout
+ String timeout = Times.formatISO8601(Time.now() + newLifetime * 1000);
+ AppTimeoutInfo paramAppTimeOut = new AppTimeoutInfo();
+ paramAppTimeOut.setExpiryTime(timeout);
+ // RemainingTime = Math.max((timeoutInMillis - System.currentTimeMillis()) / 1000, 0))
+ paramAppTimeOut.setRemainingTime(newLifetime);
+ paramAppTimeOut.setTimeoutType(ApplicationTimeoutType.LIFETIME);
+
+ Response response =
+ interceptor.updateApplicationTimeout(paramAppTimeOut, null, appId.toString());
+ Assert.assertNotNull(response);
+ AppTimeoutInfo entity = (AppTimeoutInfo) response.getEntity();
+ Assert.assertNotNull(entity);
+ Assert.assertEquals(paramAppTimeOut.getExpireTime(), entity.getExpireTime());
+ Assert.assertEquals(paramAppTimeOut.getTimeoutType(), entity.getTimeoutType());
+ Assert.assertEquals(paramAppTimeOut.getRemainingTimeInSec(), entity.getRemainingTimeInSec());
+ }
}