| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.router.webapp; |
| |
| import java.io.IOException; |
| import java.security.Principal; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CompletionService; |
| import java.util.concurrent.ExecutorCompletionService; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Future; |
| |
| import javax.servlet.http.HttpServletRequest; |
| import javax.servlet.http.HttpServletRequestWrapper; |
| import javax.servlet.http.HttpServletResponse; |
| import javax.ws.rs.core.HttpHeaders; |
| import javax.ws.rs.core.Response; |
| import javax.ws.rs.core.Response.Status; |
| |
| import org.apache.commons.lang.NotImplementedException; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.security.authorize.AuthorizationException; |
| import org.apache.hadoop.util.ReflectionUtils; |
| import org.apache.hadoop.util.concurrent.HadoopExecutors; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; |
| import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; |
| import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade; |
| import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; |
| import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; |
| import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; |
| import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; |
| import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; |
| import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; |
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil; |
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo; |
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo; |
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo; |
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo; |
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority; |
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue; |
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState; |
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo; |
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo; |
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo; |
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo; |
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo; |
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo; |
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo; |
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.DelegationToken; |
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo; |
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo; |
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo; |
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsEntryList; |
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo; |
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo; |
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo; |
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo; |
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo; |
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo; |
| import org.apache.hadoop.yarn.server.router.RouterMetrics; |
| import org.apache.hadoop.yarn.server.router.RouterServerUtil; |
| import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo; |
| import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; |
| import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; |
| import org.apache.hadoop.yarn.util.Clock; |
| import org.apache.hadoop.yarn.util.MonotonicClock; |
| import org.apache.hadoop.yarn.webapp.NotFoundException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| |
| /** |
| * Extends the {@code AbstractRESTRequestInterceptor} class and provides an |
| * implementation for federation of YARN RM and scaling an application across |
| * multiple YARN SubClusters. All the federation specific implementation is |
| * encapsulated in this class. This is always the last intercepter in the chain. |
| */ |
| public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(FederationInterceptorREST.class); |
| |
| private int numSubmitRetries; |
| private FederationStateStoreFacade federationFacade; |
| private Random rand; |
| private RouterPolicyFacade policyFacade; |
| private RouterMetrics routerMetrics; |
| private final Clock clock = new MonotonicClock(); |
| private boolean returnPartialReport; |
| |
| private Map<SubClusterId, DefaultRequestInterceptorREST> interceptors; |
| |
| /** |
| * Thread pool used for asynchronous operations. |
| */ |
| private ExecutorService threadpool; |
| |
| @Override |
| public void init(String user) { |
| federationFacade = FederationStateStoreFacade.getInstance(); |
| rand = new Random(); |
| |
| final Configuration conf = this.getConf(); |
| |
| try { |
| SubClusterResolver subClusterResolver = |
| this.federationFacade.getSubClusterResolver(); |
| policyFacade = new RouterPolicyFacade( |
| conf, federationFacade, subClusterResolver, null); |
| } catch (FederationPolicyInitializationException e) { |
| throw new YarnRuntimeException(e); |
| } |
| |
| numSubmitRetries = conf.getInt( |
| YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY, |
| YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY); |
| |
| interceptors = new HashMap<>(); |
| routerMetrics = RouterMetrics.getMetrics(); |
| threadpool = HadoopExecutors.newCachedThreadPool( |
| new ThreadFactoryBuilder() |
| .setNameFormat("FederationInterceptorREST #%d") |
| .build()); |
| |
| returnPartialReport = conf.getBoolean( |
| YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED, |
| YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED); |
| } |
| |
| private SubClusterId getRandomActiveSubCluster( |
| Map<SubClusterId, SubClusterInfo> activeSubclusters, |
| List<SubClusterId> blackListSubClusters) throws YarnException { |
| |
| if (activeSubclusters == null || activeSubclusters.size() < 1) { |
| RouterServerUtil.logAndThrowException( |
| FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null); |
| } |
| List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet()); |
| |
| FederationPolicyUtils.validateSubClusterAvailability( |
| list, blackListSubClusters); |
| |
| if (blackListSubClusters != null) { |
| |
| // Remove from the active SubClusters from StateStore the blacklisted ones |
| for (SubClusterId scId : blackListSubClusters) { |
| list.remove(scId); |
| } |
| } |
| |
| return list.get(rand.nextInt(list.size())); |
| } |
| |
| @VisibleForTesting |
| protected DefaultRequestInterceptorREST getInterceptorForSubCluster( |
| SubClusterId subClusterId) { |
| if (interceptors.containsKey(subClusterId)) { |
| return interceptors.get(subClusterId); |
| } else { |
| LOG.error( |
| "The interceptor for SubCluster {} does not exist in the cache.", |
| subClusterId); |
| return null; |
| } |
| } |
| |
| private DefaultRequestInterceptorREST createInterceptorForSubCluster( |
| SubClusterId subClusterId, String webAppAddress) { |
| |
| final Configuration conf = this.getConf(); |
| |
| String interceptorClassName = conf.get( |
| YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS, |
| YarnConfiguration.DEFAULT_ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS); |
| DefaultRequestInterceptorREST interceptorInstance = null; |
| try { |
| Class<?> interceptorClass = conf.getClassByName(interceptorClassName); |
| if (DefaultRequestInterceptorREST.class |
| .isAssignableFrom(interceptorClass)) { |
| interceptorInstance = (DefaultRequestInterceptorREST) ReflectionUtils |
| .newInstance(interceptorClass, conf); |
| |
| } else { |
| throw new YarnRuntimeException( |
| "Class: " + interceptorClassName + " not instance of " |
| + DefaultRequestInterceptorREST.class.getCanonicalName()); |
| } |
| } catch (ClassNotFoundException e) { |
| throw new YarnRuntimeException( |
| "Could not instantiate ApplicationMasterRequestInterceptor: " |
| + interceptorClassName, |
| e); |
| } |
| |
| interceptorInstance.setWebAppAddress("http://" + webAppAddress); |
| interceptorInstance.setSubClusterId(subClusterId); |
| interceptors.put(subClusterId, interceptorInstance); |
| return interceptorInstance; |
| } |
| |
| @VisibleForTesting |
| protected DefaultRequestInterceptorREST getOrCreateInterceptorForSubCluster( |
| SubClusterId subClusterId, String webAppAddress) { |
| DefaultRequestInterceptorREST interceptor = |
| getInterceptorForSubCluster(subClusterId); |
| if (interceptor == null) { |
| interceptor = createInterceptorForSubCluster(subClusterId, webAppAddress); |
| } |
| return interceptor; |
| } |
| |
| /** |
| * Yarn Router forwards every getNewApplication requests to any RM. During |
| * this operation there will be no communication with the State Store. The |
| * Router will forward the requests to any SubCluster. The Router will retry |
| * to submit the request on #numSubmitRetries different SubClusters. The |
| * SubClusters are randomly chosen from the active ones. |
| * <p> |
| * Possible failures and behaviors: |
| * <p> |
| * Client: identical behavior as {@code RMWebServices}. |
| * <p> |
| * Router: the Client will timeout and resubmit. |
| * <p> |
| * ResourceManager: the Router will timeout and contacts another RM. |
| * <p> |
| * StateStore: not in the execution. |
| */ |
| @Override |
| public Response createNewApplication(HttpServletRequest hsr) |
| throws AuthorizationException, IOException, InterruptedException { |
| |
| long startTime = clock.getTime(); |
| |
| Map<SubClusterId, SubClusterInfo> subClustersActive; |
| try { |
| subClustersActive = federationFacade.getSubClusters(true); |
| } catch (YarnException e) { |
| routerMetrics.incrAppsFailedCreated(); |
| return Response.status(Status.INTERNAL_SERVER_ERROR) |
| .entity(e.getLocalizedMessage()).build(); |
| } |
| |
| List<SubClusterId> blacklist = new ArrayList<SubClusterId>(); |
| |
| for (int i = 0; i < numSubmitRetries; ++i) { |
| |
| SubClusterId subClusterId; |
| try { |
| subClusterId = getRandomActiveSubCluster(subClustersActive, blacklist); |
| } catch (YarnException e) { |
| routerMetrics.incrAppsFailedCreated(); |
| return Response.status(Status.SERVICE_UNAVAILABLE) |
| .entity(e.getLocalizedMessage()).build(); |
| } |
| |
| LOG.debug("getNewApplication try #{} on SubCluster {}", i, subClusterId); |
| |
| DefaultRequestInterceptorREST interceptor = |
| getOrCreateInterceptorForSubCluster(subClusterId, |
| subClustersActive.get(subClusterId).getRMWebServiceAddress()); |
| Response response = null; |
| try { |
| response = interceptor.createNewApplication(hsr); |
| } catch (Exception e) { |
| LOG.warn("Unable to create a new ApplicationId in SubCluster {}", |
| subClusterId.getId(), e); |
| } |
| |
| if (response != null && |
| response.getStatus() == HttpServletResponse.SC_OK) { |
| |
| long stopTime = clock.getTime(); |
| routerMetrics.succeededAppsCreated(stopTime - startTime); |
| |
| return response; |
| } else { |
| // Empty response from the ResourceManager. |
| // Blacklist this subcluster for this request. |
| blacklist.add(subClusterId); |
| } |
| } |
| |
| String errMsg = "Fail to create a new application."; |
| LOG.error(errMsg); |
| routerMetrics.incrAppsFailedCreated(); |
| return Response |
| .status(Status.INTERNAL_SERVER_ERROR) |
| .entity(errMsg) |
| .build(); |
| } |
| |
| /** |
| * Today, in YARN there are no checks of any applicationId submitted. |
| * <p> |
| * Base scenarios: |
| * <p> |
| * The Client submits an application to the Router. • The Router selects one |
| * SubCluster to forward the request. • The Router inserts a tuple into |
| * StateStore with the selected SubCluster (e.g. SC1) and the appId. • The |
| * State Store replies with the selected SubCluster (e.g. SC1). • The Router |
| * submits the request to the selected SubCluster. |
| * <p> |
| * In case of State Store failure: |
| * <p> |
| * The client submits an application to the Router. • The Router selects one |
| * SubCluster to forward the request. • The Router inserts a tuple into State |
| * Store with the selected SubCluster (e.g. SC1) and the appId. • Due to the |
| * State Store down the Router times out and it will retry depending on the |
| * FederationFacade settings. • The Router replies to the client with an error |
| * message. |
| * <p> |
| * If State Store fails after inserting the tuple: identical behavior as |
| * {@code RMWebServices}. |
| * <p> |
| * In case of Router failure: |
| * <p> |
| * Scenario 1 – Crash before submission to the ResourceManager |
| * <p> |
| * The Client submits an application to the Router. • The Router selects one |
| * SubCluster to forward the request. • The Router inserts a tuple into State |
| * Store with the selected SubCluster (e.g. SC1) and the appId. • The Router |
| * crashes. • The Client timeouts and resubmits the application. • The Router |
| * selects one SubCluster to forward the request. • The Router inserts a tuple |
| * into State Store with the selected SubCluster (e.g. SC2) and the appId. • |
| * Because the tuple is already inserted in the State Store, it returns the |
| * previous selected SubCluster (e.g. SC1). • The Router submits the request |
| * to the selected SubCluster (e.g. SC1). |
| * <p> |
| * Scenario 2 – Crash after submission to the ResourceManager |
| * <p> |
| * • The Client submits an application to the Router. • The Router selects one |
| * SubCluster to forward the request. • The Router inserts a tuple into State |
| * Store with the selected SubCluster (e.g. SC1) and the appId. • The Router |
| * submits the request to the selected SubCluster. • The Router crashes. • The |
| * Client timeouts and resubmit the application. • The Router selects one |
| * SubCluster to forward the request. • The Router inserts a tuple into State |
| * Store with the selected SubCluster (e.g. SC2) and the appId. • The State |
| * Store replies with the selected SubCluster (e.g. SC1). • The Router submits |
| * the request to the selected SubCluster (e.g. SC1). When a client re-submits |
| * the same application to the same RM, it does not raise an exception and |
| * replies with operation successful message. |
| * <p> |
| * In case of Client failure: identical behavior as {@code RMWebServices}. |
| * <p> |
| * In case of ResourceManager failure: |
| * <p> |
| * The Client submits an application to the Router. • The Router selects one |
| * SubCluster to forward the request. • The Router inserts a tuple into State |
| * Store with the selected SubCluster (e.g. SC1) and the appId. • The Router |
| * submits the request to the selected SubCluster. • The entire SubCluster is |
| * down – all the RMs in HA or the master RM is not reachable. • The Router |
| * times out. • The Router selects a new SubCluster to forward the request. • |
| * The Router update a tuple into State Store with the selected SubCluster |
| * (e.g. SC2) and the appId. • The State Store replies with OK answer. • The |
| * Router submits the request to the selected SubCluster (e.g. SC2). |
| */ |
| @Override |
| public Response submitApplication(ApplicationSubmissionContextInfo newApp, |
| HttpServletRequest hsr) |
| throws AuthorizationException, IOException, InterruptedException { |
| |
| long startTime = clock.getTime(); |
| |
| if (newApp == null || newApp.getApplicationId() == null) { |
| routerMetrics.incrAppsFailedSubmitted(); |
| String errMsg = "Missing ApplicationSubmissionContextInfo or " |
| + "applicationSubmissionContex information."; |
| return Response |
| .status(Status.BAD_REQUEST) |
| .entity(errMsg) |
| .build(); |
| } |
| |
| ApplicationId applicationId = null; |
| try { |
| applicationId = ApplicationId.fromString(newApp.getApplicationId()); |
| } catch (IllegalArgumentException e) { |
| routerMetrics.incrAppsFailedSubmitted(); |
| return Response |
| .status(Status.BAD_REQUEST) |
| .entity(e.getLocalizedMessage()) |
| .build(); |
| } |
| |
| List<SubClusterId> blacklist = new ArrayList<SubClusterId>(); |
| |
| for (int i = 0; i < numSubmitRetries; ++i) { |
| |
| ApplicationSubmissionContext context = |
| RMWebAppUtil.createAppSubmissionContext(newApp, this.getConf()); |
| |
| SubClusterId subClusterId = null; |
| try { |
| subClusterId = policyFacade.getHomeSubcluster(context, blacklist); |
| } catch (YarnException e) { |
| routerMetrics.incrAppsFailedSubmitted(); |
| return Response |
| .status(Status.SERVICE_UNAVAILABLE) |
| .entity(e.getLocalizedMessage()) |
| .build(); |
| } |
| LOG.info("submitApplication appId {} try #{} on SubCluster {}", |
| applicationId, i, subClusterId); |
| |
| ApplicationHomeSubCluster appHomeSubCluster = |
| ApplicationHomeSubCluster.newInstance(applicationId, subClusterId); |
| |
| if (i == 0) { |
| try { |
| // persist the mapping of applicationId and the subClusterId which has |
| // been selected as its home |
| subClusterId = |
| federationFacade.addApplicationHomeSubCluster(appHomeSubCluster); |
| } catch (YarnException e) { |
| routerMetrics.incrAppsFailedSubmitted(); |
| String errMsg = "Unable to insert the ApplicationId " + applicationId |
| + " into the FederationStateStore"; |
| return Response |
| .status(Status.SERVICE_UNAVAILABLE) |
| .entity(errMsg + " " + e.getLocalizedMessage()) |
| .build(); |
| } |
| } else { |
| try { |
| // update the mapping of applicationId and the home subClusterId to |
| // the new subClusterId we have selected |
| federationFacade.updateApplicationHomeSubCluster(appHomeSubCluster); |
| } catch (YarnException e) { |
| String errMsg = "Unable to update the ApplicationId " + applicationId |
| + " into the FederationStateStore"; |
| SubClusterId subClusterIdInStateStore; |
| try { |
| subClusterIdInStateStore = |
| federationFacade.getApplicationHomeSubCluster(applicationId); |
| } catch (YarnException e1) { |
| routerMetrics.incrAppsFailedSubmitted(); |
| return Response |
| .status(Status.SERVICE_UNAVAILABLE) |
| .entity(e1.getLocalizedMessage()) |
| .build(); |
| } |
| if (subClusterId == subClusterIdInStateStore) { |
| LOG.info("Application {} already submitted on SubCluster {}", |
| applicationId, subClusterId); |
| } else { |
| routerMetrics.incrAppsFailedSubmitted(); |
| return Response |
| .status(Status.SERVICE_UNAVAILABLE) |
| .entity(errMsg) |
| .build(); |
| } |
| } |
| } |
| |
| SubClusterInfo subClusterInfo; |
| try { |
| subClusterInfo = federationFacade.getSubCluster(subClusterId); |
| } catch (YarnException e) { |
| routerMetrics.incrAppsFailedSubmitted(); |
| return Response |
| .status(Status.SERVICE_UNAVAILABLE) |
| .entity(e.getLocalizedMessage()) |
| .build(); |
| } |
| |
| Response response = null; |
| try { |
| response = getOrCreateInterceptorForSubCluster(subClusterId, |
| subClusterInfo.getRMWebServiceAddress()).submitApplication(newApp, |
| hsr); |
| } catch (Exception e) { |
| LOG.warn("Unable to submit the application {} to SubCluster {}", |
| applicationId, subClusterId.getId(), e); |
| } |
| |
| if (response != null && |
| response.getStatus() == HttpServletResponse.SC_ACCEPTED) { |
| LOG.info("Application {} with appId {} submitted on {}", |
| context.getApplicationName(), applicationId, subClusterId); |
| |
| long stopTime = clock.getTime(); |
| routerMetrics.succeededAppsSubmitted(stopTime - startTime); |
| |
| return response; |
| } else { |
| // Empty response from the ResourceManager. |
| // Blacklist this subcluster for this request. |
| blacklist.add(subClusterId); |
| } |
| } |
| |
| routerMetrics.incrAppsFailedSubmitted(); |
| String errMsg = "Application " + newApp.getApplicationName() |
| + " with appId " + applicationId + " failed to be submitted."; |
| LOG.error(errMsg); |
| return Response |
| .status(Status.SERVICE_UNAVAILABLE) |
| .entity(errMsg) |
| .build(); |
| } |
| |
| /** |
| * The Yarn Router will forward to the respective Yarn RM in which the AM is |
| * running. |
| * <p> |
| * Possible failure: |
| * <p> |
| * Client: identical behavior as {@code RMWebServices}. |
| * <p> |
| * Router: the Client will timeout and resubmit the request. |
| * <p> |
| * ResourceManager: the Router will timeout and the call will fail. |
| * <p> |
| * State Store: the Router will timeout and it will retry depending on the |
| * FederationFacade settings - if the failure happened before the select |
| * operation. |
| */ |
| @Override |
| public AppInfo getApp(HttpServletRequest hsr, String appId, |
| Set<String> unselectedFields) { |
| |
| long startTime = clock.getTime(); |
| |
| ApplicationId applicationId = null; |
| try { |
| applicationId = ApplicationId.fromString(appId); |
| } catch (IllegalArgumentException e) { |
| routerMetrics.incrAppsFailedRetrieved(); |
| return null; |
| } |
| |
| SubClusterInfo subClusterInfo = null; |
| SubClusterId subClusterId = null; |
| try { |
| subClusterId = |
| federationFacade.getApplicationHomeSubCluster(applicationId); |
| if (subClusterId == null) { |
| routerMetrics.incrAppsFailedRetrieved(); |
| return null; |
| } |
| subClusterInfo = federationFacade.getSubCluster(subClusterId); |
| } catch (YarnException e) { |
| routerMetrics.incrAppsFailedRetrieved(); |
| return null; |
| } |
| |
| DefaultRequestInterceptorREST interceptor = |
| getOrCreateInterceptorForSubCluster( |
| subClusterId, subClusterInfo.getRMWebServiceAddress()); |
| AppInfo response = interceptor.getApp(hsr, appId, unselectedFields); |
| |
| long stopTime = clock.getTime(); |
| routerMetrics.succeededAppsRetrieved(stopTime - startTime); |
| |
| return response; |
| } |
| |
| /** |
| * The Yarn Router will forward to the respective Yarn RM in which the AM is |
| * running. |
| * <p> |
| * Possible failures and behaviors: |
| * <p> |
| * Client: identical behavior as {@code RMWebServices}. |
| * <p> |
| * Router: the Client will timeout and resubmit the request. |
| * <p> |
| * ResourceManager: the Router will timeout and the call will fail. |
| * <p> |
| * State Store: the Router will timeout and it will retry depending on the |
| * FederationFacade settings - if the failure happened before the select |
| * operation. |
| */ |
| @Override |
| public Response updateAppState(AppState targetState, HttpServletRequest hsr, |
| String appId) throws AuthorizationException, YarnException, |
| InterruptedException, IOException { |
| |
| long startTime = clock.getTime(); |
| |
| ApplicationId applicationId = null; |
| try { |
| applicationId = ApplicationId.fromString(appId); |
| } catch (IllegalArgumentException e) { |
| routerMetrics.incrAppsFailedKilled(); |
| return Response |
| .status(Status.BAD_REQUEST) |
| .entity(e.getLocalizedMessage()) |
| .build(); |
| } |
| |
| SubClusterInfo subClusterInfo = null; |
| SubClusterId subClusterId = null; |
| try { |
| subClusterId = |
| federationFacade.getApplicationHomeSubCluster(applicationId); |
| subClusterInfo = federationFacade.getSubCluster(subClusterId); |
| } catch (YarnException e) { |
| routerMetrics.incrAppsFailedKilled(); |
| return Response |
| .status(Status.BAD_REQUEST) |
| .entity(e.getLocalizedMessage()) |
| .build(); |
| } |
| |
| Response response = getOrCreateInterceptorForSubCluster(subClusterId, |
| subClusterInfo.getRMWebServiceAddress()).updateAppState(targetState, |
| hsr, appId); |
| |
| long stopTime = clock.getTime(); |
| routerMetrics.succeededAppsRetrieved(stopTime - startTime); |
| |
| return response; |
| } |
| |
| /** |
| * The Yarn Router will forward the request to all the Yarn RMs in parallel, |
| * after that it will group all the ApplicationReports by the ApplicationId. |
| * <p> |
| * Possible failure: |
| * <p> |
| * Client: identical behavior as {@code RMWebServices}. |
| * <p> |
| * Router: the Client will timeout and resubmit the request. |
| * <p> |
| * ResourceManager: the Router calls each Yarn RM in parallel by using one |
| * thread for each Yarn RM. In case a Yarn RM fails, a single call will |
| * timeout. However the Router will merge the ApplicationReports it got, and |
| * provides a partial list to the client. |
| * <p> |
| * State Store: the Router will timeout and it will retry depending on the |
| * FederationFacade settings - if the failure happened before the select |
| * operation. |
| */ |
| @Override |
| public AppsInfo getApps(final HttpServletRequest hsr, final String stateQuery, |
| final Set<String> statesQuery, final String finalStatusQuery, |
| final String userQuery, final String queueQuery, final String count, |
| final String startedBegin, final String startedEnd, |
| final String finishBegin, final String finishEnd, |
| final Set<String> applicationTypes, final Set<String> applicationTags, |
| final Set<String> unselectedFields) { |
| AppsInfo apps = new AppsInfo(); |
| long startTime = clock.getTime(); |
| |
| Map<SubClusterId, SubClusterInfo> subClustersActive = null; |
| try { |
| subClustersActive = federationFacade.getSubClusters(true); |
| } catch (YarnException e) { |
| routerMetrics.incrMultipleAppsFailedRetrieved(); |
| return null; |
| } |
| |
| // Send the requests in parallel |
| CompletionService<AppsInfo> compSvc = |
| new ExecutorCompletionService<>(this.threadpool); |
| |
| // HttpServletRequest does not work with ExecutorCompletionService. |
| // Create a duplicate hsr. |
| final HttpServletRequest hsrCopy = clone(hsr); |
| for (final SubClusterInfo info : subClustersActive.values()) { |
| compSvc.submit(new Callable<AppsInfo>() { |
| @Override |
| public AppsInfo call() { |
| DefaultRequestInterceptorREST interceptor = |
| getOrCreateInterceptorForSubCluster( |
| info.getSubClusterId(), info.getRMWebServiceAddress()); |
| AppsInfo rmApps = interceptor.getApps(hsrCopy, stateQuery, |
| statesQuery, finalStatusQuery, userQuery, queueQuery, count, |
| startedBegin, startedEnd, finishBegin, finishEnd, |
| applicationTypes, applicationTags, unselectedFields); |
| |
| if (rmApps == null) { |
| routerMetrics.incrMultipleAppsFailedRetrieved(); |
| LOG.error("Subcluster {} failed to return appReport.", |
| info.getSubClusterId()); |
| return null; |
| } |
| return rmApps; |
| } |
| }); |
| } |
| |
| // Collect all the responses in parallel |
| for (int i = 0; i < subClustersActive.size(); i++) { |
| try { |
| Future<AppsInfo> future = compSvc.take(); |
| AppsInfo appsResponse = future.get(); |
| |
| long stopTime = clock.getTime(); |
| routerMetrics.succeededMultipleAppsRetrieved(stopTime - startTime); |
| |
| if (appsResponse != null) { |
| apps.addAll(appsResponse.getApps()); |
| } |
| } catch (Throwable e) { |
| routerMetrics.incrMultipleAppsFailedRetrieved(); |
| LOG.warn("Failed to get application report", e); |
| } |
| } |
| |
| if (apps.getApps().isEmpty()) { |
| return null; |
| } |
| |
| // Merge all the application reports got from all the available Yarn RMs |
| return RouterWebServiceUtil.mergeAppsInfo( |
| apps.getApps(), returnPartialReport); |
| } |
| |
| /** |
| * Get a copy of a HTTP request. This is for thread safety. |
| * @param hsr HTTP servlet request to copy. |
| * @return Copy of the HTTP request. |
| */ |
| private HttpServletRequestWrapper clone(final HttpServletRequest hsr) { |
| if (hsr == null) { |
| return null; |
| } |
| @SuppressWarnings("unchecked") |
| final Map<String, String[]> parameterMap = |
| (Map<String, String[]>) hsr.getParameterMap(); |
| final String pathInfo = hsr.getPathInfo(); |
| final String user = hsr.getRemoteUser(); |
| final Principal principal = hsr.getUserPrincipal(); |
| final String mediaType = |
| RouterWebServiceUtil.getMediaTypeFromHttpServletRequest( |
| hsr, AppsInfo.class); |
| return new HttpServletRequestWrapper(hsr) { |
| @SuppressWarnings("unchecked") |
| public Map<String, String[]> getParameterMap() { |
| return parameterMap; |
| } |
| public String getPathInfo() { |
| return pathInfo; |
| } |
| public String getRemoteUser() { |
| return user; |
| } |
| public Principal getUserPrincipal() { |
| return principal; |
| } |
| public String getHeader(String value) { |
| // we override only Accept |
| if (value.equals(HttpHeaders.ACCEPT)) { |
| return mediaType; |
| } |
| return null; |
| } |
| }; |
| } |
| |
| /** |
| * The Yarn Router will forward to the request to all the SubClusters to find |
| * where the node is running. |
| * <p> |
| * Possible failure: |
| * <p> |
| * Client: identical behavior as {@code RMWebServices}. |
| * <p> |
| * Router: the Client will timeout and resubmit the request. |
| * <p> |
| * ResourceManager: the Router will timeout and the call will fail. |
| * <p> |
| * State Store: the Router will timeout and it will retry depending on the |
| * FederationFacade settings - if the failure happened before the select |
| * operation. |
| */ |
| @Override |
| public NodeInfo getNode(final String nodeId) { |
| Map<SubClusterId, SubClusterInfo> subClustersActive = null; |
| try { |
| subClustersActive = federationFacade.getSubClusters(true); |
| } catch (YarnException e) { |
| throw new NotFoundException(e.getMessage()); |
| } |
| |
| if (subClustersActive.isEmpty()) { |
| throw new NotFoundException( |
| FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE); |
| } |
| |
| // Send the requests in parallel |
| CompletionService<NodeInfo> compSvc = |
| new ExecutorCompletionService<NodeInfo>(this.threadpool); |
| |
| for (final SubClusterInfo info : subClustersActive.values()) { |
| compSvc.submit(new Callable<NodeInfo>() { |
| @Override |
| public NodeInfo call() { |
| DefaultRequestInterceptorREST interceptor = |
| getOrCreateInterceptorForSubCluster( |
| info.getSubClusterId(), info.getRMWebServiceAddress()); |
| try { |
| NodeInfo nodeInfo = interceptor.getNode(nodeId); |
| return nodeInfo; |
| } catch (Exception e) { |
| LOG.error("Subcluster {} failed to return nodeInfo.", |
| info.getSubClusterId()); |
| return null; |
| } |
| } |
| }); |
| } |
| |
| // Collect all the responses in parallel |
| NodeInfo nodeInfo = null; |
| for (int i = 0; i < subClustersActive.size(); i++) { |
| try { |
| Future<NodeInfo> future = compSvc.take(); |
| NodeInfo nodeResponse = future.get(); |
| |
| // Check if the node was found in this SubCluster |
| if (nodeResponse != null) { |
| // Check if the node was already found in a different SubCluster and |
| // it has an old health report |
| if (nodeInfo == null || nodeInfo.getLastHealthUpdate() < |
| nodeResponse.getLastHealthUpdate()) { |
| nodeInfo = nodeResponse; |
| } |
| } |
| } catch (Throwable e) { |
| LOG.warn("Failed to get node report ", e); |
| } |
| } |
| if (nodeInfo == null) { |
| throw new NotFoundException("nodeId, " + nodeId + ", is not found"); |
| } |
| return nodeInfo; |
| } |
| |
| /** |
| * The Yarn Router will forward the request to all the Yarn RMs in parallel, |
| * after that it will remove all the duplicated NodeInfo by using the NodeId. |
| * <p> |
| * Possible failure: |
| * <p> |
| * Client: identical behavior as {@code RMWebServices}. |
| * <p> |
| * Router: the Client will timeout and resubmit the request. |
| * <p> |
| * ResourceManager: the Router calls each Yarn RM in parallel by using one |
| * thread for each Yarn RM. In case a Yarn RM fails, a single call will |
| * timeout. However the Router will use the NodesInfo it got, and provides a |
| * partial list to the client. |
| * <p> |
| * State Store: the Router will timeout and it will retry depending on the |
| * FederationFacade settings - if the failure happened before the select |
| * operation. |
| */ |
| @Override |
| public NodesInfo getNodes(final String states) { |
| |
| NodesInfo nodes = new NodesInfo(); |
| |
| Map<SubClusterId, SubClusterInfo> subClustersActive = null; |
| try { |
| subClustersActive = federationFacade.getSubClusters(true); |
| } catch (YarnException e) { |
| LOG.error("Cannot get nodes: {}", e.getMessage()); |
| return new NodesInfo(); |
| } |
| |
| // Send the requests in parallel |
| CompletionService<NodesInfo> compSvc = |
| new ExecutorCompletionService<NodesInfo>(this.threadpool); |
| |
| for (final SubClusterInfo info : subClustersActive.values()) { |
| compSvc.submit(new Callable<NodesInfo>() { |
| @Override |
| public NodesInfo call() { |
| DefaultRequestInterceptorREST interceptor = |
| getOrCreateInterceptorForSubCluster( |
| info.getSubClusterId(), info.getRMWebServiceAddress()); |
| try { |
| NodesInfo nodesInfo = interceptor.getNodes(states); |
| return nodesInfo; |
| } catch (Exception e) { |
| LOG.error("Subcluster {} failed to return nodesInfo.", |
| info.getSubClusterId()); |
| return null; |
| } |
| } |
| }); |
| } |
| |
| // Collect all the responses in parallel |
| |
| for (int i = 0; i < subClustersActive.size(); i++) { |
| try { |
| Future<NodesInfo> future = compSvc.take(); |
| NodesInfo nodesResponse = future.get(); |
| |
| if (nodesResponse != null) { |
| nodes.addAll(nodesResponse.getNodes()); |
| } |
| } catch (Throwable e) { |
| LOG.warn("Failed to get nodes report ", e); |
| } |
| } |
| |
| // Delete duplicate from all the node reports got from all the available |
| // Yarn RMs. Nodes can be moved from one subclusters to another. In this |
| // operation they result LOST/RUNNING in the previous SubCluster and |
| // NEW/RUNNING in the new one. |
| |
| return RouterWebServiceUtil.deleteDuplicateNodesInfo(nodes.getNodes()); |
| } |
| |
| @Override |
| public ClusterMetricsInfo getClusterMetricsInfo() { |
| ClusterMetricsInfo metrics = new ClusterMetricsInfo(); |
| |
| Map<SubClusterId, SubClusterInfo> subClustersActive = null; |
| try { |
| subClustersActive = federationFacade.getSubClusters(true); |
| } catch (YarnException e) { |
| LOG.error(e.getLocalizedMessage()); |
| return metrics; |
| } |
| |
| // Send the requests in parallel |
| CompletionService<ClusterMetricsInfo> compSvc = |
| new ExecutorCompletionService<ClusterMetricsInfo>(this.threadpool); |
| |
| for (final SubClusterInfo info : subClustersActive.values()) { |
| compSvc.submit(new Callable<ClusterMetricsInfo>() { |
| @Override |
| public ClusterMetricsInfo call() { |
| DefaultRequestInterceptorREST interceptor = |
| getOrCreateInterceptorForSubCluster( |
| info.getSubClusterId(), info.getRMWebServiceAddress()); |
| try { |
| ClusterMetricsInfo metrics = interceptor.getClusterMetricsInfo(); |
| return metrics; |
| } catch (Exception e) { |
| LOG.error("Subcluster {} failed to return Cluster Metrics.", |
| info.getSubClusterId()); |
| return null; |
| } |
| } |
| }); |
| } |
| |
| // Collect all the responses in parallel |
| |
| for (int i = 0; i < subClustersActive.size(); i++) { |
| try { |
| Future<ClusterMetricsInfo> future = compSvc.take(); |
| ClusterMetricsInfo metricsResponse = future.get(); |
| |
| if (metricsResponse != null) { |
| RouterWebServiceUtil.mergeMetrics(metrics, metricsResponse); |
| } |
| } catch (Throwable e) { |
| LOG.warn("Failed to get nodes report ", e); |
| } |
| } |
| |
| return metrics; |
| } |
| |
| @Override |
| public ClusterInfo get() { |
| return getClusterInfo(); |
| } |
| |
| @Override |
| public ClusterInfo getClusterInfo() { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public SchedulerTypeInfo getSchedulerInfo() { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public String dumpSchedulerLogs(String time, HttpServletRequest hsr) |
| throws IOException { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId) { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public AppActivitiesInfo getAppActivities(HttpServletRequest hsr, |
| String appId, String time) { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public ApplicationStatisticsInfo getAppStatistics(HttpServletRequest hsr, |
| Set<String> stateQueries, Set<String> typeQueries) { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public AppState getAppState(HttpServletRequest hsr, String appId) |
| throws AuthorizationException { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public NodeToLabelsInfo getNodeToLabels(HttpServletRequest hsr) |
| throws IOException { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public LabelsToNodesInfo getLabelsToNodes(Set<String> labels) |
| throws IOException { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public Response replaceLabelsOnNodes(NodeToLabelsEntryList newNodeToLabels, |
| HttpServletRequest hsr) throws IOException { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public Response replaceLabelsOnNode(Set<String> newNodeLabelsName, |
| HttpServletRequest hsr, String nodeId) throws Exception { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public NodeLabelsInfo getClusterNodeLabels(HttpServletRequest hsr) |
| throws IOException { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public Response addToClusterNodeLabels(NodeLabelsInfo newNodeLabels, |
| HttpServletRequest hsr) throws Exception { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public Response removeFromCluserNodeLabels(Set<String> oldNodeLabels, |
| HttpServletRequest hsr) throws Exception { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public NodeLabelsInfo getLabelsOnNode(HttpServletRequest hsr, String nodeId) |
| throws IOException { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public AppPriority getAppPriority(HttpServletRequest hsr, String appId) |
| throws AuthorizationException { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public Response updateApplicationPriority(AppPriority targetPriority, |
| HttpServletRequest hsr, String appId) throws AuthorizationException, |
| YarnException, InterruptedException, IOException { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public AppQueue getAppQueue(HttpServletRequest hsr, String appId) |
| throws AuthorizationException { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr, |
| String appId) throws AuthorizationException, YarnException, |
| InterruptedException, IOException { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public Response postDelegationToken(DelegationToken tokenData, |
| HttpServletRequest hsr) throws AuthorizationException, IOException, |
| InterruptedException, Exception { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public Response postDelegationTokenExpiration(HttpServletRequest hsr) |
| throws AuthorizationException, IOException, InterruptedException, |
| Exception { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public Response cancelDelegationToken(HttpServletRequest hsr) |
| throws AuthorizationException, IOException, InterruptedException, |
| Exception { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public Response createNewReservation(HttpServletRequest hsr) |
| throws AuthorizationException, IOException, InterruptedException { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public Response submitReservation(ReservationSubmissionRequestInfo resContext, |
| HttpServletRequest hsr) |
| throws AuthorizationException, IOException, InterruptedException { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public Response updateReservation(ReservationUpdateRequestInfo resContext, |
| HttpServletRequest hsr) |
| throws AuthorizationException, IOException, InterruptedException { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public Response deleteReservation(ReservationDeleteRequestInfo resContext, |
| HttpServletRequest hsr) |
| throws AuthorizationException, IOException, InterruptedException { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public Response listReservation(String queue, String reservationId, |
| long startTime, long endTime, boolean includeResourceAllocations, |
| HttpServletRequest hsr) throws Exception { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public AppTimeoutInfo getAppTimeout(HttpServletRequest hsr, String appId, |
| String type) throws AuthorizationException { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public AppTimeoutsInfo getAppTimeouts(HttpServletRequest hsr, String appId) |
| throws AuthorizationException { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public Response updateApplicationTimeout(AppTimeoutInfo appTimeout, |
| HttpServletRequest hsr, String appId) throws AuthorizationException, |
| YarnException, InterruptedException, IOException { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public AppAttemptInfo getAppAttempt(HttpServletRequest req, |
| HttpServletResponse res, String appId, String appAttemptId) { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public ContainersInfo getContainers(HttpServletRequest req, |
| HttpServletResponse res, String appId, String appAttemptId) { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public ContainerInfo getContainer(HttpServletRequest req, |
| HttpServletResponse res, String appId, String appAttemptId, |
| String containerId) { |
| throw new NotImplementedException(); |
| } |
| |
| @Override |
| public void setNextInterceptor(RESTRequestInterceptor next) { |
| throw new YarnRuntimeException("setNextInterceptor is being called on " |
| + "FederationInterceptorREST, which should be the last one " |
| + "in the chain. Check if the interceptor pipeline configuration " |
| + "is correct"); |
| } |
| |
| @Override |
| public void shutdown() { |
| if (threadpool != null) { |
| threadpool.shutdown(); |
| } |
| } |
| } |