YARN-11226. [Federation] Add createNewReservation, submitReservation, updateReservation, deleteReservation REST APIs for Router. (#5175)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
index e1ebce8..8c36fba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
@@ -1062,4 +1062,93 @@ public void addOrUpdateApplicationHomeSubCluster(ApplicationId applicationId,
updateApplicationHomeSubCluster(subClusterId, applicationId, appHomeSubCluster);
}
}
+
+ /**
+ * Exists ReservationHomeSubCluster Mapping.
+ *
+ * @param reservationId reservationId
+ * @return true - exist, false - not exist
+ */
+ public boolean existsReservationHomeSubCluster(ReservationId reservationId) {
+ try {
+ SubClusterId subClusterId = getReservationHomeSubCluster(reservationId);
+ if (subClusterId != null) {
+ return true;
+ }
+ } catch (YarnException e) {
+ LOG.warn("get homeSubCluster by reservationId = {} error.", reservationId, e);
+ }
+ return false;
+ }
+
+ /**
+ * Save Reservation And HomeSubCluster Mapping.
+ *
+ * @param reservationId reservationId
+ * @param homeSubCluster homeSubCluster
+ * @throws YarnException on failure
+ */
+ public void addReservationHomeSubCluster(ReservationId reservationId,
+ ReservationHomeSubCluster homeSubCluster) throws YarnException {
+ try {
+ // persist the mapping of reservationId and the subClusterId which has
+ // been selected as its home
+ addReservationHomeSubCluster(homeSubCluster);
+ } catch (YarnException e) {
+ String msg = String.format(
+ "Unable to insert the ReservationId %s into the FederationStateStore.", reservationId);
+ throw new YarnException(msg, e);
+ }
+ }
+
+ /**
+ * Update Reservation And HomeSubCluster Mapping.
+ *
+ * @param subClusterId subClusterId
+ * @param reservationId reservationId
+ * @param homeSubCluster homeSubCluster
+ * @throws YarnException on failure
+ */
+ public void updateReservationHomeSubCluster(SubClusterId subClusterId,
+ ReservationId reservationId, ReservationHomeSubCluster homeSubCluster) throws YarnException {
+ try {
+ // update the mapping of reservationId and the home subClusterId to
+ // the new subClusterId we have selected
+ updateReservationHomeSubCluster(homeSubCluster);
+ } catch (YarnException e) {
+ SubClusterId subClusterIdInStateStore = getReservationHomeSubCluster(reservationId);
+ if (subClusterId == subClusterIdInStateStore) {
+ LOG.info("Reservation {} already submitted on SubCluster {}.", reservationId, subClusterId);
+ } else {
+ String msg = String.format(
+ "Unable to update the ReservationId %s into the FederationStateStore.", reservationId);
+ throw new YarnException(msg, e);
+ }
+ }
+ }
+
+ /**
+ * Add or Update ReservationHomeSubCluster.
+ *
+ * @param reservationId reservationId.
+ * @param subClusterId homeSubClusterId, this is selected by strategy.
+ * @param retryCount number of retries.
+ * @throws YarnException yarn exception.
+ */
+ public void addOrUpdateReservationHomeSubCluster(ReservationId reservationId,
+ SubClusterId subClusterId, int retryCount) throws YarnException {
+ Boolean exists = existsReservationHomeSubCluster(reservationId);
+ ReservationHomeSubCluster reservationHomeSubCluster =
+ ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
+ if (!exists || retryCount == 0) {
+ // persist the mapping of reservationId and the subClusterId which has
+ // been selected as its home.
+ addReservationHomeSubCluster(reservationId, reservationHomeSubCluster);
+ } else {
+ // update the mapping of reservationId and the home subClusterId to
+ // the new subClusterId we have selected.
+ updateReservationHomeSubCluster(subClusterId, reservationId,
+ reservationHomeSubCluster);
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
index 8c880f2..8fa6ca2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
@@ -27,6 +27,16 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
@@ -57,6 +67,8 @@ public final class RouterServerUtil {
private static final String EPOCH_PREFIX = "e";
+ private static final String RESERVEIDSTR_PREFIX = "reservation_";
+
/** Disable constructor. */
private RouterServerUtil() {
}
@@ -494,6 +506,15 @@ public static String getRenewerForToken(Token<RMDelegationTokenIdentifier> token
? token.decodeIdentifier().getRenewer().toString() : user.getShortUserName();
}
+ /**
+ * Set User information.
+ *
+ * If the username is empty, we will use the Yarn Router user directly.
+ * Do not create a proxy user if userName matches the userName on current UGI.
+ *
+ * @param userName userName.
+ * @return UserGroupInformation.
+ */
public static UserGroupInformation setupUser(final String userName) {
UserGroupInformation user = null;
try {
@@ -513,7 +534,94 @@ public static UserGroupInformation setupUser(final String userName) {
return user;
} catch (IOException e) {
throw RouterServerUtil.logAndReturnYarnRunTimeException(e,
- "Error while creating Router RMAdmin Service for user : %s.", user);
+ "Error while creating Router Service for user : %s.", user);
}
}
+
+ /**
+ * Check reservationId is accurate.
+ *
+ * We need to ensure that reservationId cannot be empty and
+ * can be converted to ReservationId object normally.
+ *
+ * @param reservationId reservationId.
+ * @throws IllegalArgumentException If the format of the reservationId is not accurate,
+ * an IllegalArgumentException needs to be thrown.
+ */
+ @Public
+ @Unstable
+ public static void validateReservationId(String reservationId) throws IllegalArgumentException {
+
+ if (reservationId == null || reservationId.isEmpty()) {
+ throw new IllegalArgumentException("Parameter error, the reservationId is empty or null.");
+ }
+
+ if (!reservationId.startsWith(RESERVEIDSTR_PREFIX)) {
+ throw new IllegalArgumentException("Invalid ReservationId: " + reservationId);
+ }
+
+ String[] resFields = reservationId.split("_");
+ if (resFields.length != 3) {
+ throw new IllegalArgumentException("Invalid ReservationId: " + reservationId);
+ }
+
+ String clusterTimestamp = resFields[1];
+ String id = resFields[2];
+ if (!NumberUtils.isDigits(id) || !NumberUtils.isDigits(clusterTimestamp)) {
+ throw new IllegalArgumentException("Invalid ReservationId: " + reservationId);
+ }
+ }
+
+ /**
+ * Convert ReservationDefinitionInfo to ReservationDefinition.
+ *
+ * @param definitionInfo ReservationDefinitionInfo Object.
+ * @return ReservationDefinition.
+ */
+ public static ReservationDefinition convertReservationDefinition(
+ ReservationDefinitionInfo definitionInfo) {
+ if (definitionInfo == null || definitionInfo.getReservationRequests() == null
+ || definitionInfo.getReservationRequests().getReservationRequest() == null
+ || definitionInfo.getReservationRequests().getReservationRequest().isEmpty()) {
+ throw new RuntimeException("definitionInfo Or ReservationRequests is Null.");
+ }
+
+ // basic variable
+ long arrival = definitionInfo.getArrival();
+ long deadline = definitionInfo.getDeadline();
+
+ // ReservationRequests reservationRequests
+ String name = definitionInfo.getReservationName();
+ String recurrenceExpression = definitionInfo.getRecurrenceExpression();
+ Priority priority = Priority.newInstance(definitionInfo.getPriority());
+
+ // reservation requests info
+ List<ReservationRequest> reservationRequestList = new ArrayList<>();
+
+ ReservationRequestsInfo reservationRequestsInfo = definitionInfo.getReservationRequests();
+
+ List<ReservationRequestInfo> reservationRequestInfos =
+ reservationRequestsInfo.getReservationRequest();
+
+ for (ReservationRequestInfo resRequestInfo : reservationRequestInfos) {
+ ResourceInfo resourceInfo = resRequestInfo.getCapability();
+ Resource capability =
+ Resource.newInstance(resourceInfo.getMemorySize(), resourceInfo.getvCores());
+ ReservationRequest reservationRequest = ReservationRequest.newInstance(capability,
+ resRequestInfo.getNumContainers(), resRequestInfo.getMinConcurrency(),
+ resRequestInfo.getDuration());
+ reservationRequestList.add(reservationRequest);
+ }
+
+ ReservationRequestInterpreter[] values = ReservationRequestInterpreter.values();
+ ReservationRequestInterpreter reservationRequestInterpreter =
+ values[reservationRequestsInfo.getReservationRequestsInterpreter()];
+ ReservationRequests reservationRequests = ReservationRequests.newInstance(
+ reservationRequestList, reservationRequestInterpreter);
+
+ ReservationDefinition definition = ReservationDefinition.newInstance(
+ arrival, deadline, reservationRequests, name, recurrenceExpression, priority);
+
+ return definition;
+ }
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/AbstractRESTRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/AbstractRESTRequestInterceptor.java
index 66b2495..baf931a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/AbstractRESTRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/AbstractRESTRequestInterceptor.java
@@ -20,9 +20,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-
-import java.io.IOException;
+import org.apache.hadoop.yarn.server.router.RouterServerUtil;
/**
* Extends the RequestInterceptor class and provides common functionality which
@@ -68,7 +66,7 @@ public Configuration getConf() {
*/
@Override
public void init(String userName) {
- setupUser(userName);
+ this.user = RouterServerUtil.setupUser(userName);
if (this.nextInterceptor != null) {
this.nextInterceptor.init(userName);
}
@@ -92,34 +90,6 @@ public RESTRequestInterceptor getNextInterceptor() {
return this.nextInterceptor;
}
- /**
- * Set User information.
- *
- * If the username is empty, we will use the Yarn Router user directly.
- * Do not create a proxy user if user name matches the user name on current UGI.
- * @param userName userName.
- */
- private void setupUser(final String userName) {
- try {
- if (userName == null || userName.isEmpty()) {
- user = UserGroupInformation.getCurrentUser();
- } else if (UserGroupInformation.isSecurityEnabled()) {
- user = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getLoginUser());
- } else if (userName.equalsIgnoreCase(UserGroupInformation.getCurrentUser().getUserName())) {
- user = UserGroupInformation.getCurrentUser();
- } else {
- user = UserGroupInformation.createProxyUser(userName,
- UserGroupInformation.getCurrentUser());
- }
- } catch (IOException e) {
- String message = "Error while creating Router RMAdmin Service for user:";
- if (user != null) {
- message += ", user: " + user;
- }
- throw new YarnRuntimeException(message, e);
- }
- }
-
public UserGroupInformation getUser() {
return user;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
index 61edfb3..93e7a16 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
@@ -51,11 +51,13 @@
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -101,6 +103,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
import org.apache.hadoop.yarn.server.router.RouterMetrics;
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
import org.apache.hadoop.yarn.server.router.clientrm.ClientMethod;
@@ -1588,28 +1591,239 @@ public Response cancelDelegationToken(HttpServletRequest hsr)
@Override
public Response createNewReservation(HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
- throw new NotImplementedException("Code is not implemented");
+ long startTime = clock.getTime();
+ try {
+ Map<SubClusterId, SubClusterInfo> subClustersActive =
+ federationFacade.getSubClusters(true);
+ // We declare blackList and retries.
+ List<SubClusterId> blackList = new ArrayList<>();
+ int actualRetryNums = federationFacade.getRetryNumbers(numSubmitRetries);
+ Response response = ((FederationActionRetry<Response>) (retryCount) ->
+ invokeCreateNewReservation(subClustersActive, blackList, hsr, retryCount)).
+ runWithRetries(actualRetryNums, submitIntervalTime);
+ // If the response is not empty and the status is SC_OK,
+ // this request can be returned directly.
+ if (response != null && response.getStatus() == HttpServletResponse.SC_OK) {
+ long stopTime = clock.getTime();
+ routerMetrics.succeededGetNewReservationRetrieved(stopTime - startTime);
+ return response;
+ }
+ } catch (FederationPolicyException e) {
+ // If a FederationPolicyException is thrown, the service is unavailable.
+ routerMetrics.incrGetNewReservationFailedRetrieved();
+ return Response.status(Status.SERVICE_UNAVAILABLE).entity(e.getLocalizedMessage()).build();
+ } catch (Exception e) {
+ routerMetrics.incrGetNewReservationFailedRetrieved();
+ return Response.status(Status.INTERNAL_SERVER_ERROR).entity(e.getLocalizedMessage()).build();
+ }
+
+ // return error message directly.
+ String errMsg = "Fail to create a new reservation.";
+ LOG.error(errMsg);
+ routerMetrics.incrGetNewReservationFailedRetrieved();
+ return Response.status(Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
+ }
+
+ private Response invokeCreateNewReservation(Map<SubClusterId, SubClusterInfo> subClustersActive,
+ List<SubClusterId> blackList, HttpServletRequest hsr, int retryCount)
+ throws YarnException, IOException, InterruptedException {
+ SubClusterId subClusterId =
+ federationFacade.getRandomActiveSubCluster(subClustersActive, blackList);
+ LOG.info("createNewReservation try #{} on SubCluster {}.", retryCount, subClusterId);
+ SubClusterInfo subClusterInfo = subClustersActive.get(subClusterId);
+ DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
+ subClusterId, subClusterInfo.getRMWebServiceAddress());
+ try {
+ Response response = interceptor.createNewReservation(hsr);
+ if (response != null && response.getStatus() == HttpServletResponse.SC_OK) {
+ return response;
+ }
+ } catch (Exception e) {
+ blackList.add(subClusterId);
+ RouterServerUtil.logAndThrowException(e.getMessage(), e);
+ }
+ // We need to throw the exception directly.
+ String msg = String.format("Unable to create a new ReservationId in SubCluster %s.",
+ subClusterId.getId());
+ throw new YarnException(msg);
}
@Override
public Response submitReservation(ReservationSubmissionRequestInfo resContext,
- HttpServletRequest hsr)
- throws AuthorizationException, IOException, InterruptedException {
- throw new NotImplementedException("Code is not implemented");
+ HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException {
+ long startTime = clock.getTime();
+ if (resContext == null || resContext.getReservationId() == null
+ || resContext.getReservationDefinition() == null || resContext.getQueue() == null) {
+ routerMetrics.incrSubmitReservationFailedRetrieved();
+ String errMsg = "Missing submitReservation resContext or reservationId " +
+ "or reservation definition or queue.";
+ return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
+ }
+
+ // Check that the resId format is accurate
+ String resId = resContext.getReservationId();
+ try {
+ RouterServerUtil.validateReservationId(resId);
+ } catch (IllegalArgumentException e) {
+ routerMetrics.incrSubmitReservationFailedRetrieved();
+ throw e;
+ }
+
+ List<SubClusterId> blackList = new ArrayList<>();
+ try {
+ int activeSubClustersCount = federationFacade.getActiveSubClustersCount();
+ int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries);
+ Response response = ((FederationActionRetry<Response>) (retryCount) ->
+ invokeSubmitReservation(resContext, blackList, hsr, retryCount)).
+ runWithRetries(actualRetryNums, submitIntervalTime);
+ if (response != null) {
+ long stopTime = clock.getTime();
+ routerMetrics.succeededSubmitReservationRetrieved(stopTime - startTime);
+ return response;
+ }
+ } catch (Exception e) {
+ routerMetrics.incrSubmitReservationFailedRetrieved();
+ return Response.status(Status.SERVICE_UNAVAILABLE).entity(e.getLocalizedMessage()).build();
+ }
+
+ routerMetrics.incrSubmitReservationFailedRetrieved();
+ String msg = String.format("Reservation %s failed to be submitted.", resId);
+ return Response.status(Status.SERVICE_UNAVAILABLE).entity(msg).build();
+ }
+
+ private Response invokeSubmitReservation(ReservationSubmissionRequestInfo requestContext,
+ List<SubClusterId> blackList, HttpServletRequest hsr, int retryCount)
+ throws YarnException, IOException, InterruptedException {
+ String resId = requestContext.getReservationId();
+ ReservationId reservationId = ReservationId.parseReservationId(resId);
+ ReservationDefinitionInfo definitionInfo = requestContext.getReservationDefinition();
+ ReservationDefinition definition =
+ RouterServerUtil.convertReservationDefinition(definitionInfo);
+
+ // First, Get SubClusterId according to specific strategy.
+ ReservationSubmissionRequest request = ReservationSubmissionRequest.newInstance(
+ definition, requestContext.getQueue(), reservationId);
+ SubClusterId subClusterId = null;
+
+ try {
+ // Get subClusterId from policy.
+ subClusterId = policyFacade.getReservationHomeSubCluster(request);
+
+ // Print the log of submitting the submitApplication.
+ LOG.info("submitReservation ReservationId {} try #{} on SubCluster {}.", reservationId,
+ retryCount, subClusterId);
+
+ // Step2. We Store the mapping relationship
+ // between Application and HomeSubCluster in stateStore.
+ federationFacade.addOrUpdateReservationHomeSubCluster(reservationId,
+ subClusterId, retryCount);
+
+ // Step3. We get subClusterInfo based on subClusterId.
+ SubClusterInfo subClusterInfo = federationFacade.getSubCluster(subClusterId);
+
+ DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
+ subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
+ HttpServletRequest hsrCopy = clone(hsr);
+ Response response = interceptor.submitReservation(requestContext, hsrCopy);
+ if (response != null && response.getStatus() == HttpServletResponse.SC_ACCEPTED) {
+ LOG.info("Reservation {} submitted on subCluster {}.", reservationId, subClusterId);
+ return response;
+ }
+ String msg = String.format("application %s failed to be submitted.", resId);
+ throw new YarnException(msg);
+ } catch (Exception e) {
+ LOG.warn("Unable to submit the reservation {} to SubCluster {}.", resId,
+ subClusterId, e);
+ if (subClusterId != null) {
+ blackList.add(subClusterId);
+ }
+ throw e;
+ }
}
@Override
public Response updateReservation(ReservationUpdateRequestInfo resContext,
- HttpServletRequest hsr)
- throws AuthorizationException, IOException, InterruptedException {
- throw new NotImplementedException("Code is not implemented");
+ HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException {
+
+ // parameter verification
+ if (resContext == null || resContext.getReservationId() == null
+ || resContext.getReservationDefinition() == null) {
+ routerMetrics.incrUpdateReservationFailedRetrieved();
+ String errMsg = "Missing updateReservation resContext or reservationId " +
+ "or reservation definition.";
+ return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
+ }
+
+ // get reservationId
+ String reservationId = resContext.getReservationId();
+
+ // Check that the reservationId format is accurate
+ try {
+ RouterServerUtil.validateReservationId(reservationId);
+ } catch (IllegalArgumentException e) {
+ routerMetrics.incrUpdateReservationFailedRetrieved();
+ throw e;
+ }
+
+ try {
+ SubClusterInfo subClusterInfo = getHomeSubClusterInfoByReservationId(reservationId);
+ DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
+ subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
+ HttpServletRequest hsrCopy = clone(hsr);
+ Response response = interceptor.updateReservation(resContext, hsrCopy);
+ if (response != null) {
+ return response;
+ }
+ } catch (Exception e) {
+ routerMetrics.incrUpdateReservationFailedRetrieved();
+ RouterServerUtil.logAndThrowRunTimeException("updateReservation Failed.", e);
+ }
+
+ // throw an exception
+ routerMetrics.incrUpdateReservationFailedRetrieved();
+ throw new YarnRuntimeException("updateReservation Failed, reservationId = " + reservationId);
}
@Override
public Response deleteReservation(ReservationDeleteRequestInfo resContext,
HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
- throw new NotImplementedException("Code is not implemented");
+
+ // parameter verification
+ if (resContext == null || resContext.getReservationId() == null) {
+ routerMetrics.incrDeleteReservationFailedRetrieved();
+ String errMsg = "Missing deleteReservation request or reservationId.";
+ return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
+ }
+
+ // get ReservationId
+ String reservationId = resContext.getReservationId();
+
+ // Check that the reservationId format is accurate
+ try {
+ RouterServerUtil.validateReservationId(reservationId);
+ } catch (IllegalArgumentException e) {
+ routerMetrics.incrDeleteReservationFailedRetrieved();
+ throw e;
+ }
+
+ try {
+ SubClusterInfo subClusterInfo = getHomeSubClusterInfoByReservationId(reservationId);
+ DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
+ subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
+ HttpServletRequest hsrCopy = clone(hsr);
+ Response response = interceptor.deleteReservation(resContext, hsrCopy);
+ if (response != null) {
+ return response;
+ }
+ } catch (Exception e) {
+ routerMetrics.incrDeleteReservationFailedRetrieved();
+ RouterServerUtil.logAndThrowRunTimeException("deleteReservation Failed.", e);
+ }
+
+ // throw an exception
+ routerMetrics.incrDeleteReservationFailedRetrieved();
+ throw new YarnRuntimeException("deleteReservation Failed, reservationId = " + reservationId);
}
@Override
@@ -1627,9 +1841,9 @@ public Response listReservation(String queue, String reservationId,
throw new IllegalArgumentException("Parameter error, the reservationId is empty or null.");
}
- // Check that the appId format is accurate
+ // Check that the reservationId format is accurate
try {
- ReservationId.parseReservationId(reservationId);
+ RouterServerUtil.validateReservationId(reservationId);
} catch (IllegalArgumentException e) {
routerMetrics.incrListReservationFailedRetrieved();
throw e;
@@ -2190,6 +2404,10 @@ public LRUCacheHashMap<RouterAppInfoCacheKey, AppsInfo> getAppInfosCaches() {
}
@VisibleForTesting
+ public Map<SubClusterId, DefaultRequestInterceptorREST> getInterceptors() {
+ return interceptors;
+ }
+
public void setAllowPartialResult(boolean allowPartialResult) {
this.allowPartialResult = allowPartialResult;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterServerUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterServerUtil.java
new file mode 100644
index 0000000..e82f67d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterServerUtil.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */package org.apache.hadoop.yarn.server.router;
+
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static org.apache.hadoop.yarn.server.router.webapp.TestFederationInterceptorREST.getReservationSubmissionRequestInfo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class TestRouterServerUtil {
+
+ public static final Logger LOG = LoggerFactory.getLogger(TestRouterServerUtil.class);
+
+ @Test
+ public void testConvertReservationDefinition() {
+ // Prepare parameters
+ ReservationId reservationId = ReservationId.newInstance(Time.now(), 1);
+ ReservationSubmissionRequestInfo requestInfo =
+ getReservationSubmissionRequestInfo(reservationId);
+ ReservationDefinitionInfo expectDefinitionInfo = requestInfo.getReservationDefinition();
+
+ // ReservationDefinitionInfo conversion ReservationDefinition
+ ReservationDefinition convertDefinition =
+ RouterServerUtil.convertReservationDefinition(expectDefinitionInfo);
+
+ // reservationDefinition is not null
+ assertNotNull(convertDefinition);
+ assertEquals(expectDefinitionInfo.getArrival(), convertDefinition.getArrival());
+ assertEquals(expectDefinitionInfo.getDeadline(), convertDefinition.getDeadline());
+
+ Priority priority = convertDefinition.getPriority();
+ assertNotNull(priority);
+ assertEquals(expectDefinitionInfo.getPriority(), priority.getPriority());
+ assertEquals(expectDefinitionInfo.getRecurrenceExpression(),
+ convertDefinition.getRecurrenceExpression());
+ assertEquals(expectDefinitionInfo.getReservationName(), convertDefinition.getReservationName());
+
+ ReservationRequestsInfo expectRequestsInfo = expectDefinitionInfo.getReservationRequests();
+ List<ReservationRequestInfo> expectRequestsInfoList =
+ expectRequestsInfo.getReservationRequest();
+
+ ReservationRequests convertReservationRequests =
+ convertDefinition.getReservationRequests();
+ assertNotNull(convertReservationRequests);
+
+ List<ReservationRequest> convertRequestList =
+ convertReservationRequests.getReservationResources();
+ assertNotNull(convertRequestList);
+ assertEquals(1, convertRequestList.size());
+
+ ReservationRequestInfo expectResRequestInfo = expectRequestsInfoList.get(0);
+ ReservationRequest convertResRequest = convertRequestList.get(0);
+ assertNotNull(convertResRequest);
+ assertEquals(expectResRequestInfo.getNumContainers(), convertResRequest.getNumContainers());
+ assertEquals(expectResRequestInfo.getDuration(), convertResRequest.getDuration());
+
+ ResourceInfo expectResourceInfo = expectResRequestInfo.getCapability();
+ Resource convertResource = convertResRequest.getCapability();
+ assertNotNull(expectResourceInfo);
+ assertEquals(expectResourceInfo.getMemorySize(), convertResource.getMemorySize());
+ assertEquals(expectResourceInfo.getvCores(), convertResource.getVirtualCores());
+ }
+
+ @Test
+ public void testConvertReservationDefinitionEmpty() throws Exception {
+
+ // param ReservationDefinitionInfo is Null
+ ReservationDefinitionInfo definitionInfo = null;
+
+ // null request1
+ LambdaTestUtils.intercept(RuntimeException.class,
+ "definitionInfo Or ReservationRequests is Null.",
+ () -> RouterServerUtil.convertReservationDefinition(definitionInfo));
+
+ // param ReservationRequests is Null
+ ReservationDefinitionInfo definitionInfo2 = new ReservationDefinitionInfo();
+
+ // null request2
+ LambdaTestUtils.intercept(RuntimeException.class,
+ "definitionInfo Or ReservationRequests is Null.",
+ () -> RouterServerUtil.convertReservationDefinition(definitionInfo2));
+
+ // param ReservationRequests is Null
+ ReservationDefinitionInfo definitionInfo3 = new ReservationDefinitionInfo();
+ ReservationRequestsInfo requestsInfo = new ReservationRequestsInfo();
+ definitionInfo3.setReservationRequests(requestsInfo);
+
+ // null request3
+ LambdaTestUtils.intercept(RuntimeException.class,
+ "definitionInfo Or ReservationRequests is Null.",
+ () -> RouterServerUtil.convertReservationDefinition(definitionInfo3));
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java
index a4294bc..9b086e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
@@ -74,10 +75,17 @@ public abstract class BaseRouterWebServicesTest {
private Router router;
public final static int TEST_MAX_CACHE_SIZE = 10;
+ public static final String QUEUE_DEFAULT = "default";
+ public static final String QUEUE_DEFAULT_FULL = CapacitySchedulerConfiguration.ROOT +
+ CapacitySchedulerConfiguration.DOT + QUEUE_DEFAULT;
+ public static final String QUEUE_DEDICATED = "dedicated";
+ public static final String QUEUE_DEDICATED_FULL = CapacitySchedulerConfiguration.ROOT +
+ CapacitySchedulerConfiguration.DOT + QUEUE_DEDICATED;
+
private RouterWebServices routerWebService;
@Before
- public void setUp() {
+ public void setUp() throws YarnException, IOException {
this.conf = createConfiguration();
router = spy(new Router());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
index 7f73434..e2ac5fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
@@ -27,7 +27,9 @@
import java.util.HashMap;
import java.util.Collections;
import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -40,6 +42,7 @@
import org.apache.commons.lang3.EnumUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
@@ -47,6 +50,11 @@
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -65,10 +73,11 @@
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.QueueACL;
-import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -78,7 +87,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -117,10 +125,19 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewReservation;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationListInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateResponseInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteResponseInfo;
+import org.apache.hadoop.yarn.server.router.RouterServerUtil;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.PartitionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RMQueueAclInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServices;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
@@ -134,6 +151,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEFAULT;
+import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEFAULT_FULL;
+import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEDICATED;
+import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEDICATED_FULL;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -153,19 +175,16 @@ public class MockDefaultRequestInterceptorREST
private Map<ApplicationId, ApplicationReport> applicationMap = new HashMap<>();
public static final String APP_STATE_RUNNING = "RUNNING";
- private static final String QUEUE_DEFAULT = "default";
- private static final String QUEUE_DEFAULT_FULL = CapacitySchedulerConfiguration.ROOT +
- CapacitySchedulerConfiguration.DOT + QUEUE_DEFAULT;
- private static final String QUEUE_DEDICATED = "dedicated";
- public static final String QUEUE_DEDICATED_FULL = CapacitySchedulerConfiguration.ROOT +
- CapacitySchedulerConfiguration.DOT + QUEUE_DEDICATED;
-
// duration(milliseconds), 1mins
public static final long DURATION = 60*1000;
// Containers 4
public static final int NUM_CONTAINERS = 4;
+ private Map<ReservationId, SubClusterId> reservationMap = new HashMap<>();
+ private AtomicLong resCounter = new AtomicLong();
+ private MockRM mockRM = null;
+
private void validateRunning() throws ConnectException {
if (!isRunning) {
throw new ConnectException("RM is stopped");
@@ -859,44 +878,191 @@ public Response listReservation(String queue, String reservationId, long startTi
" Please try again with a valid reservable queue.");
}
- MockRM mockRM = setupResourceManager();
+ ReservationId reservationID =
+ ReservationId.parseReservationId(reservationId);
- ReservationId reservationID = ReservationId.parseReservationId(reservationId);
- ReservationSystem reservationSystem = mockRM.getReservationSystem();
- reservationSystem.synchronizePlan(QUEUE_DEDICATED_FULL, true);
+ if (!reservationMap.containsKey(reservationID)) {
+ throw new NotFoundException("reservationId with id: " + reservationId + " not found");
+ }
- // Generate reserved resources
ClientRMService clientService = mockRM.getClientRMService();
- // arrival time from which the resource(s) can be allocated.
- long arrival = Time.now();
-
- // deadline by when the resource(s) must be allocated.
- // The reason for choosing 1.05 is because this gives an integer
- // DURATION * 0.05 = 3000(ms)
- // deadline = arrival + 3000ms
- long deadline = (long) (arrival + 1.05 * DURATION);
-
- // In this test of reserved resources, we will apply for 4 containers (1 core, 1GB memory)
- // arrival = Time.now(), and make sure deadline - arrival > duration,
- // the current setting is greater than 3000ms
- ReservationSubmissionRequest submissionRequest =
- ReservationSystemTestUtil.createSimpleReservationRequest(
- reservationID, NUM_CONTAINERS, arrival, deadline, DURATION);
- clientService.submitReservation(submissionRequest);
-
// listReservations
ReservationListRequest request = ReservationListRequest.newInstance(
- queue, reservationID.toString(), startTime, endTime, includeResourceAllocations);
+ queue, reservationId, startTime, endTime, includeResourceAllocations);
ReservationListResponse resRespInfo = clientService.listReservations(request);
ReservationListInfo resResponse =
new ReservationListInfo(resRespInfo, includeResourceAllocations);
- if (mockRM != null) {
- mockRM.stop();
+ return Response.status(Status.OK).entity(resResponse).build();
+ }
+
+ @Override
+ public Response createNewReservation(HttpServletRequest hsr)
+ throws AuthorizationException, IOException, InterruptedException {
+
+ if (!isRunning) {
+ throw new RuntimeException("RM is stopped");
}
- return Response.status(Status.OK).entity(resResponse).build();
+ ReservationId resId = ReservationId.newInstance(Time.now(), resCounter.incrementAndGet());
+ LOG.info("Allocated new reservationId: {}.", resId);
+
+ NewReservation reservationId = new NewReservation(resId.toString());
+ return Response.status(Status.OK).entity(reservationId).build();
+ }
+
+ @Override
+ public Response submitReservation(ReservationSubmissionRequestInfo resContext,
+ HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException {
+
+ if (!isRunning) {
+ throw new RuntimeException("RM is stopped");
+ }
+
+ ReservationId reservationId = ReservationId.parseReservationId(resContext.getReservationId());
+ ReservationDefinitionInfo definitionInfo = resContext.getReservationDefinition();
+ ReservationDefinition definition =
+ RouterServerUtil.convertReservationDefinition(definitionInfo);
+ ReservationSubmissionRequest request = ReservationSubmissionRequest.newInstance(
+ definition, resContext.getQueue(), reservationId);
+ submitReservation(request);
+
+ LOG.info("Reservation submitted: {}.", reservationId);
+
+ SubClusterId subClusterId = getSubClusterId();
+ reservationMap.put(reservationId, subClusterId);
+
+ return Response.status(Status.ACCEPTED).build();
+ }
+
+ private void submitReservation(ReservationSubmissionRequest request) {
+ try {
+ // synchronize plan
+ ReservationSystem reservationSystem = mockRM.getReservationSystem();
+ reservationSystem.synchronizePlan(QUEUE_DEDICATED_FULL, true);
+ // Generate reserved resources
+ ClientRMService clientService = mockRM.getClientRMService();
+ clientService.submitReservation(request);
+ } catch (IOException | YarnException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Response updateReservation(ReservationUpdateRequestInfo resContext,
+ HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException {
+
+ if (resContext == null || resContext.getReservationId() == null ||
+ resContext.getReservationDefinition() == null) {
+ return Response.status(Status.BAD_REQUEST).build();
+ }
+
+ String resId = resContext.getReservationId();
+ ReservationId reservationId = ReservationId.parseReservationId(resId);
+
+ if (!reservationMap.containsKey(reservationId)) {
+ throw new NotFoundException("reservationId with id: " + reservationId + " not found");
+ }
+
+ // Generate reserved resources
+ updateReservation(resContext);
+
+ ReservationUpdateResponseInfo resRespInfo = new ReservationUpdateResponseInfo();
+ return Response.status(Status.OK).entity(resRespInfo).build();
+ }
+
+ private void updateReservation(ReservationUpdateRequestInfo resContext) throws IOException {
+
+ if (resContext == null) {
+ throw new BadRequestException("Input ReservationSubmissionContext should not be null");
+ }
+
+ ReservationDefinitionInfo resInfo = resContext.getReservationDefinition();
+ if (resInfo == null) {
+ throw new BadRequestException("Input ReservationDefinition should not be null");
+ }
+
+ ReservationRequestsInfo resReqsInfo = resInfo.getReservationRequests();
+ if (resReqsInfo == null || resReqsInfo.getReservationRequest() == null
+ || resReqsInfo.getReservationRequest().isEmpty()) {
+ throw new BadRequestException("The ReservationDefinition should " +
+ "contain at least one ReservationRequest");
+ }
+
+ if (resContext.getReservationId() == null) {
+ throw new BadRequestException("Update operations must specify an existing ReservaitonId");
+ }
+
+ ReservationRequestInterpreter[] values = ReservationRequestInterpreter.values();
+ ReservationRequestInterpreter requestInterpreter =
+ values[resReqsInfo.getReservationRequestsInterpreter()];
+ List<ReservationRequest> list = new ArrayList<>();
+
+ for (ReservationRequestInfo resReqInfo : resReqsInfo.getReservationRequest()) {
+ ResourceInfo rInfo = resReqInfo.getCapability();
+ Resource capability = Resource.newInstance(rInfo.getMemorySize(), rInfo.getvCores());
+ int numContainers = resReqInfo.getNumContainers();
+ int minConcurrency = resReqInfo.getMinConcurrency();
+ long duration = resReqInfo.getDuration();
+ ReservationRequest rr = ReservationRequest.newInstance(
+ capability, numContainers, minConcurrency, duration);
+ list.add(rr);
+ }
+
+ ReservationRequests reqs = ReservationRequests.newInstance(list, requestInterpreter);
+ ReservationDefinition rDef = ReservationDefinition.newInstance(
+ resInfo.getArrival(), resInfo.getDeadline(), reqs,
+ resInfo.getReservationName(), resInfo.getRecurrenceExpression(),
+ Priority.newInstance(resInfo.getPriority()));
+ ReservationUpdateRequest request = ReservationUpdateRequest.newInstance(
+ rDef, ReservationId.parseReservationId(resContext.getReservationId()));
+
+ ClientRMService clientService = mockRM.getClientRMService();
+ try {
+ clientService.updateReservation(request);
+ } catch (YarnException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ public Response deleteReservation(ReservationDeleteRequestInfo resContext, HttpServletRequest hsr)
+ throws AuthorizationException, IOException, InterruptedException {
+ if (!isRunning) {
+ throw new RuntimeException("RM is stopped");
+ }
+
+ try {
+ String resId = resContext.getReservationId();
+ ReservationId reservationId = ReservationId.parseReservationId(resId);
+
+ if (!reservationMap.containsKey(reservationId)) {
+ throw new NotFoundException("reservationId with id: " + reservationId + " not found");
+ }
+
+ ReservationDeleteRequest reservationDeleteRequest =
+ ReservationDeleteRequest.newInstance(reservationId);
+ ClientRMService clientService = mockRM.getClientRMService();
+ clientService.deleteReservation(reservationDeleteRequest);
+
+ ReservationDeleteResponseInfo resRespInfo = new ReservationDeleteResponseInfo();
+ reservationMap.remove(reservationId);
+
+ return Response.status(Status.OK).entity(resRespInfo).build();
+ } catch (YarnException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @VisibleForTesting
+ public MockRM getMockRM() {
+ return mockRM;
+ }
+
+ @VisibleForTesting
+ public void setMockRM(MockRM mockResourceManager) {
+ this.mockRM = mockResourceManager;
}
@Override
@@ -939,7 +1105,7 @@ private MockRM setupResourceManager() throws Exception {
public RMQueueAclInfo checkUserAccessToQueue(String queue, String username,
String queueAclType, HttpServletRequest hsr) throws AuthorizationException {
- ResourceManager mockRM = mock(ResourceManager.class);
+ ResourceManager mockResourceManager = mock(ResourceManager.class);
Configuration conf = new YarnConfiguration();
ResourceScheduler mockScheduler = new CapacityScheduler() {
@@ -959,8 +1125,9 @@ public synchronized boolean checkAccess(UserGroupInformation callerUGI,
}
};
- when(mockRM.getResourceScheduler()).thenReturn(mockScheduler);
- MockRMWebServices webSvc = new MockRMWebServices(mockRM, conf, mock(HttpServletResponse.class));
+ when(mockResourceManager.getResourceScheduler()).thenReturn(mockScheduler);
+ MockRMWebServices webSvc = new MockRMWebServices(mockResourceManager, conf,
+ mock(HttpServletResponse.class));
return webSvc.checkUserAccessToQueue(queue, username, queueAclType, hsr);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
index 7c82e71..14533d1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -45,6 +46,11 @@
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -59,8 +65,6 @@
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
-import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
-import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
@@ -86,6 +90,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationListInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RMQueueAclInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
import org.apache.hadoop.yarn.server.router.webapp.cache.RouterAppInfoCacheKey;
@@ -94,6 +99,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewReservation;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo;
@@ -106,7 +114,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.hadoop.yarn.server.router.webapp.MockDefaultRequestInterceptorREST.QUEUE_DEDICATED_FULL;
+import static org.apache.hadoop.yarn.server.router.webapp.MockDefaultRequestInterceptorREST.DURATION;
+import static org.apache.hadoop.yarn.server.router.webapp.MockDefaultRequestInterceptorREST.NUM_CONTAINERS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -130,7 +139,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
private List<SubClusterId> subClusters;
@Override
- public void setUp() {
+ public void setUp() throws YarnException, IOException {
super.setUpConfig();
interceptor = new TestableFederationInterceptorREST();
@@ -156,6 +165,13 @@ public void setUp() {
Assert.fail();
}
+ for (SubClusterId subCluster : subClusters) {
+ SubClusterInfo subClusterInfo = stateStoreUtil.querySubClusterInfo(subCluster);
+ interceptor.getOrCreateInterceptorForSubCluster(
+ subCluster, subClusterInfo.getRMWebServiceAddress());
+ }
+
+ interceptor.setupResourceManager();
}
@Override
@@ -1100,14 +1116,9 @@ public void testGetAppActivities() throws IOException, InterruptedException {
@Test
public void testListReservation() throws Exception {
- // Add ReservationId In stateStore
+ // submitReservation
ReservationId reservationId = ReservationId.newInstance(Time.now(), 1);
- SubClusterId homeSubClusterId = subClusters.get(0);
- ReservationHomeSubCluster reservationHomeSubCluster =
- ReservationHomeSubCluster.newInstance(reservationId, homeSubClusterId);
- AddReservationHomeSubClusterRequest request =
- AddReservationHomeSubClusterRequest.newInstance(reservationHomeSubCluster);
- stateStore.addReservationHomeSubCluster(request);
+ submitReservation(reservationId);
// Call the listReservation method
String applyReservationId = reservationId.toString();
@@ -1158,6 +1169,199 @@ public void testListReservation() throws Exception {
}
@Test
+ public void testCreateNewReservation() throws Exception {
+ Response response = interceptor.createNewReservation(null);
+ Assert.assertNotNull(response);
+
+ Object entity = response.getEntity();
+ Assert.assertNotNull(entity);
+ Assert.assertTrue(entity instanceof NewReservation);
+
+ NewReservation newReservation = (NewReservation) entity;
+ Assert.assertNotNull(newReservation);
+ Assert.assertTrue(newReservation.getReservationId().contains("reservation"));
+ }
+
+ @Test
+ public void testSubmitReservation() throws Exception {
+
+ // submit reservation
+ ReservationId reservationId = ReservationId.newInstance(Time.now(), 2);
+ Response response = submitReservation(reservationId);
+ Assert.assertNotNull(response);
+ Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus());
+
+ String applyReservationId = reservationId.toString();
+ Response reservationResponse = interceptor.listReservation(
+ QUEUE_DEDICATED_FULL, applyReservationId, -1, -1, false, null);
+ Assert.assertNotNull(reservationResponse);
+
+ Object entity = reservationResponse.getEntity();
+ Assert.assertNotNull(entity);
+ Assert.assertNotNull(entity instanceof ReservationListInfo);
+
+ ReservationListInfo listInfo = (ReservationListInfo) entity;
+ Assert.assertNotNull(listInfo);
+
+ List<ReservationInfo> reservationInfos = listInfo.getReservations();
+ Assert.assertNotNull(reservationInfos);
+ Assert.assertEquals(1, reservationInfos.size());
+
+ ReservationInfo reservationInfo = reservationInfos.get(0);
+ Assert.assertNotNull(reservationInfo);
+ Assert.assertEquals(reservationInfo.getReservationId(), applyReservationId);
+ }
+
+ @Test
+ public void testUpdateReservation() throws Exception {
+ // submit reservation
+ ReservationId reservationId = ReservationId.newInstance(Time.now(), 3);
+ Response response = submitReservation(reservationId);
+ Assert.assertNotNull(response);
+ Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus());
+
+ // update reservation
+ ReservationSubmissionRequest resSubRequest =
+ getReservationSubmissionRequest(reservationId, 6, 2048, 2);
+ ReservationDefinition reservationDefinition = resSubRequest.getReservationDefinition();
+ ReservationDefinitionInfo reservationDefinitionInfo =
+ new ReservationDefinitionInfo(reservationDefinition);
+
+ ReservationUpdateRequestInfo updateRequestInfo = new ReservationUpdateRequestInfo();
+ updateRequestInfo.setReservationId(reservationId.toString());
+ updateRequestInfo.setReservationDefinition(reservationDefinitionInfo);
+ Response updateReservationResp = interceptor.updateReservation(updateRequestInfo, null);
+ Assert.assertNotNull(updateReservationResp);
+ Assert.assertEquals(Status.OK.getStatusCode(), updateReservationResp.getStatus());
+
+ String applyReservationId = reservationId.toString();
+ Response reservationResponse = interceptor.listReservation(
+ QUEUE_DEDICATED_FULL, applyReservationId, -1, -1, false, null);
+ Assert.assertNotNull(reservationResponse);
+
+ Object entity = reservationResponse.getEntity();
+ Assert.assertNotNull(entity);
+ Assert.assertNotNull(entity instanceof ReservationListInfo);
+
+ ReservationListInfo listInfo = (ReservationListInfo) entity;
+ Assert.assertNotNull(listInfo);
+
+ List<ReservationInfo> reservationInfos = listInfo.getReservations();
+ Assert.assertNotNull(reservationInfos);
+ Assert.assertEquals(1, reservationInfos.size());
+
+ ReservationInfo reservationInfo = reservationInfos.get(0);
+ Assert.assertNotNull(reservationInfo);
+ Assert.assertEquals(reservationInfo.getReservationId(), applyReservationId);
+
+ ReservationDefinitionInfo resDefinitionInfo = reservationInfo.getReservationDefinition();
+ Assert.assertNotNull(resDefinitionInfo);
+
+ ReservationRequestsInfo reservationRequestsInfo = resDefinitionInfo.getReservationRequests();
+ Assert.assertNotNull(reservationRequestsInfo);
+
+ ArrayList<ReservationRequestInfo> reservationRequestInfoList =
+ reservationRequestsInfo.getReservationRequest();
+ Assert.assertNotNull(reservationRequestInfoList);
+ Assert.assertEquals(1, reservationRequestInfoList.size());
+
+ ReservationRequestInfo reservationRequestInfo = reservationRequestInfoList.get(0);
+ Assert.assertNotNull(reservationRequestInfo);
+ Assert.assertEquals(6, reservationRequestInfo.getNumContainers());
+
+ ResourceInfo resourceInfo = reservationRequestInfo.getCapability();
+ Assert.assertNotNull(resourceInfo);
+
+ int vCore = resourceInfo.getvCores();
+ long memory = resourceInfo.getMemorySize();
+ Assert.assertEquals(2, vCore);
+ Assert.assertEquals(2048, memory);
+ }
+
+ @Test
+ public void testDeleteReservation() throws Exception {
+ // submit reservation
+ ReservationId reservationId = ReservationId.newInstance(Time.now(), 4);
+ Response response = submitReservation(reservationId);
+ Assert.assertNotNull(response);
+ Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus());
+
+ String applyResId = reservationId.toString();
+ Response reservationResponse = interceptor.listReservation(
+ QUEUE_DEDICATED_FULL, applyResId, -1, -1, false, null);
+ Assert.assertNotNull(reservationResponse);
+
+ ReservationDeleteRequestInfo deleteRequestInfo =
+ new ReservationDeleteRequestInfo();
+ deleteRequestInfo.setReservationId(applyResId);
+ Response delResponse = interceptor.deleteReservation(deleteRequestInfo, null);
+ Assert.assertNotNull(delResponse);
+
+ LambdaTestUtils.intercept(Exception.class,
+ "reservationId with id: " + reservationId + " not found",
+ () -> interceptor.listReservation(QUEUE_DEDICATED_FULL, applyResId, -1, -1, false, null));
+ }
+
+ private Response submitReservation(ReservationId reservationId)
+ throws IOException, InterruptedException {
+ ReservationSubmissionRequestInfo resSubmissionRequestInfo =
+ getReservationSubmissionRequestInfo(reservationId);
+ Response response = interceptor.submitReservation(resSubmissionRequestInfo, null);
+ return response;
+ }
+
+ public static ReservationSubmissionRequestInfo getReservationSubmissionRequestInfo(
+ ReservationId reservationId) {
+
+ ReservationSubmissionRequest resSubRequest =
+ getReservationSubmissionRequest(reservationId, NUM_CONTAINERS, 1024, 1);
+ ReservationDefinition reservationDefinition = resSubRequest.getReservationDefinition();
+
+ ReservationSubmissionRequestInfo resSubmissionRequestInfo =
+ new ReservationSubmissionRequestInfo();
+ resSubmissionRequestInfo.setQueue(resSubRequest.getQueue());
+ resSubmissionRequestInfo.setReservationId(reservationId.toString());
+ ReservationDefinitionInfo reservationDefinitionInfo =
+ new ReservationDefinitionInfo(reservationDefinition);
+ resSubmissionRequestInfo.setReservationDefinition(reservationDefinitionInfo);
+
+ return resSubmissionRequestInfo;
+ }
+
+ public static ReservationSubmissionRequest getReservationSubmissionRequest(
+ ReservationId reservationId, int numContainers, int memory, int vcore) {
+
+ // arrival time from which the resource(s) can be allocated.
+ long arrival = Time.now();
+
+ // deadline by when the resource(s) must be allocated.
+ // The reason for choosing 1.05 is because this gives an integer
+ // DURATION * 0.05 = 3000(ms)
+ // deadline = arrival + 3000ms
+ long deadline = (long) (arrival + 1.05 * DURATION);
+
+ ReservationSubmissionRequest submissionRequest = createSimpleReservationRequest(
+ reservationId, numContainers, arrival, deadline, DURATION, memory, vcore);
+
+ return submissionRequest;
+ }
+
+ public static ReservationSubmissionRequest createSimpleReservationRequest(
+ ReservationId reservationId, int numContainers, long arrival,
+ long deadline, long duration, int memory, int vcore) {
+ // create a request with a single atomic ask
+ ReservationRequest r = ReservationRequest.newInstance(
+ Resource.newInstance(memory, vcore), numContainers, 1, duration);
+ ReservationRequests reqs = ReservationRequests.newInstance(
+ Collections.singletonList(r), ReservationRequestInterpreter.R_ALL);
+ ReservationDefinition rDef = ReservationDefinition.newInstance(
+ arrival, deadline, reqs, "testClientRMService#reservation", "0", Priority.UNDEFINED);
+ ReservationSubmissionRequest request = ReservationSubmissionRequest.newInstance(
+ rDef, QUEUE_DEDICATED_FULL, reservationId);
+ return request;
+ }
+
+ @Test
public void testWebAddressWithScheme() {
// The style of the web address reported by the subCluster in the heartbeat is 0.0.0.0:8000
// We design the following 2 test cases:
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestableFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestableFederationInterceptorREST.java
index 7126ca5..31fd756 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestableFederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestableFederationInterceptorREST.java
@@ -18,10 +18,25 @@
package org.apache.hadoop.yarn.server.router.webapp;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEDICATED_FULL;
+import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEFAULT_FULL;
+import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEFAULT;
+import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEDICATED;
/**
* Extends the FederationInterceptorREST and overrides methods to provide a
@@ -30,7 +45,11 @@
public class TestableFederationInterceptorREST
extends FederationInterceptorREST {
- private List<SubClusterId> badSubCluster = new ArrayList<SubClusterId>();
+ private List<SubClusterId> badSubCluster = new ArrayList<>();
+ private MockRM mockRM = null;
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestableFederationInterceptorREST.class);
/**
* For testing purpose, some subclusters has to be down to simulate particular
@@ -51,4 +70,51 @@ protected void registerBadSubCluster(SubClusterId badSC) {
interceptor.setRunning(false);
}
+ protected void setupResourceManager() throws IOException {
+
+ if (mockRM != null) {
+ return;
+ }
+
+ try {
+
+ DefaultMetricsSystem.setMiniClusterMode(true);
+ CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+
+ // Define default queue
+ conf.setCapacity(QUEUE_DEFAULT_FULL, 20);
+ // Define dedicated queues
+ String[] queues = new String[]{QUEUE_DEFAULT, QUEUE_DEDICATED};
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT, queues);
+ conf.setCapacity(QUEUE_DEDICATED_FULL, 80);
+ conf.setReservable(QUEUE_DEDICATED_FULL, true);
+
+ conf.setClass(YarnConfiguration.RM_SCHEDULER,
+ CapacityScheduler.class, ResourceScheduler.class);
+ conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
+ conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
+
+ mockRM = new MockRM(conf);
+ mockRM.start();
+ mockRM.registerNode("127.0.0.1:5678", 100*1024, 100);
+
+ Map<SubClusterId, DefaultRequestInterceptorREST> interceptors = super.getInterceptors();
+ for (DefaultRequestInterceptorREST item : interceptors.values()) {
+ MockDefaultRequestInterceptorREST interceptor = (MockDefaultRequestInterceptorREST) item;
+ interceptor.setMockRM(mockRM);
+ }
+ } catch (Exception e) {
+ LOG.error("setupResourceManager failed.", e);
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ if (mockRM != null) {
+ mockRM.stop();
+ mockRM = null;
+ }
+ super.shutdown();
+ }
}
\ No newline at end of file