| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor; |
| |
| import org.apache.hadoop.thirdparty.com.google.common.collect.Lists; |
| import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext; |
| import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor; |
| import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils; |
| import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest; |
| import org.apache.hadoop.yarn.api.records.RejectionReason; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceSizing; |
| import org.apache.hadoop.yarn.api.records.SchedulingRequest; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingRequestWithPlacementAttempt; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingResponse; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.stream.Collectors; |
| |
/**
 * An ApplicationMasterServiceProcessor that performs constrained placement of
 * Scheduling Requests. It does the following:
 * 1. Performs all initialization.
 * 2. Intercepts placement constraints from the register call and adds them to
 *    the placement constraint manager.
 * 3. Dispatches Scheduling Requests to the Planner.
 */
| public class PlacementConstraintProcessor extends AbstractPlacementProcessor { |
| |
| /** |
| * Wrapper over the SchedulingResponse that wires in the placement attempt |
| * and last attempted Node. |
| */ |
| static final class Response extends SchedulingResponse { |
| |
| private final int placementAttempt; |
| private final SchedulerNode attemptedNode; |
| |
| private Response(boolean isSuccess, ApplicationId applicationId, |
| SchedulingRequest schedulingRequest, int placementAttempt, |
| SchedulerNode attemptedNode) { |
| super(isSuccess, applicationId, schedulingRequest); |
| this.placementAttempt = placementAttempt; |
| this.attemptedNode = attemptedNode; |
| } |
| } |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(PlacementConstraintProcessor.class); |
| |
| private ExecutorService schedulingThreadPool; |
| private int retryAttempts; |
| private Map<ApplicationId, List<BatchedRequests>> requestsToRetry = |
| new ConcurrentHashMap<>(); |
| private Map<ApplicationId, List<SchedulingRequest>> requestsToReject = |
| new ConcurrentHashMap<>(); |
| |
| private BatchedRequests.IteratorType iteratorType; |
| private PlacementDispatcher placementDispatcher; |
| |
| |
| @Override |
| public void init(ApplicationMasterServiceContext amsContext, |
| ApplicationMasterServiceProcessor nextProcessor) { |
| LOG.info("Initializing Constraint Placement Processor:"); |
| super.init(amsContext, nextProcessor); |
| |
| // Only the first class is considered - even if a comma separated |
| // list is provided. (This is for simplicity, since getInstances does a |
| // lot of good things by handling things correctly) |
| List<ConstraintPlacementAlgorithm> instances = |
| ((RMContextImpl) amsContext).getYarnConfiguration().getInstances( |
| YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_CLASS, |
| ConstraintPlacementAlgorithm.class); |
| ConstraintPlacementAlgorithm algorithm = null; |
| if (instances != null && !instances.isEmpty()) { |
| algorithm = instances.get(0); |
| } else { |
| algorithm = new DefaultPlacementAlgorithm(); |
| } |
| LOG.info("Placement Algorithm [{}]", algorithm.getClass().getName()); |
| |
| String iteratorName = ((RMContextImpl) amsContext).getYarnConfiguration() |
| .get(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_ITERATOR, |
| BatchedRequests.IteratorType.SERIAL.name()); |
| LOG.info("Placement Algorithm Iterator[{}]", iteratorName); |
| try { |
| iteratorType = BatchedRequests.IteratorType.valueOf(iteratorName); |
| } catch (IllegalArgumentException e) { |
| throw new YarnRuntimeException( |
| "Could not instantiate Placement Algorithm Iterator: ", e); |
| } |
| |
| int algoPSize = ((RMContextImpl) amsContext).getYarnConfiguration().getInt( |
| YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_POOL_SIZE, |
| YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_ALGORITHM_POOL_SIZE); |
| this.placementDispatcher = new PlacementDispatcher(); |
| this.placementDispatcher.init( |
| ((RMContextImpl)amsContext), algorithm, algoPSize); |
| LOG.info("Planning Algorithm pool size [{}]", algoPSize); |
| |
| int schedPSize = ((RMContextImpl) amsContext).getYarnConfiguration().getInt( |
| YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_SCHEDULER_POOL_SIZE, |
| YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_SCHEDULER_POOL_SIZE); |
| this.schedulingThreadPool = Executors.newFixedThreadPool(schedPSize); |
| LOG.info("Scheduler pool size [{}]", schedPSize); |
| |
| // Number of times a request that is not satisfied by the scheduler |
| // can be retried. |
| this.retryAttempts = |
| ((RMContextImpl) amsContext).getYarnConfiguration().getInt( |
| YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS, |
| YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS); |
| LOG.info("Num retry attempts [{}]", this.retryAttempts); |
| } |
| |
| @Override |
| public void allocate(ApplicationAttemptId appAttemptId, |
| AllocateRequest request, AllocateResponse response) throws YarnException { |
| // Copy the scheduling request since we will clear it later after sending |
| // to dispatcher |
| List<SchedulingRequest> schedulingRequests = |
| new ArrayList<>(request.getSchedulingRequests()); |
| dispatchRequestsForPlacement(appAttemptId, schedulingRequests); |
| reDispatchRetryableRequests(appAttemptId); |
| schedulePlacedRequests(appAttemptId); |
| |
| // Remove SchedulingRequest from AllocateRequest to avoid SchedulingRequest |
| // added to scheduler. |
| request.setSchedulingRequests(Collections.emptyList()); |
| |
| nextAMSProcessor.allocate(appAttemptId, request, response); |
| |
| handleRejectedRequests(appAttemptId, response); |
| } |
| |
| private void dispatchRequestsForPlacement(ApplicationAttemptId appAttemptId, |
| List<SchedulingRequest> schedulingRequests) { |
| if (schedulingRequests != null && !schedulingRequests.isEmpty()) { |
| SchedulerApplicationAttempt appAttempt = |
| scheduler.getApplicationAttempt(appAttemptId); |
| String queueName = null; |
| if(appAttempt != null) { |
| queueName = appAttempt.getQueueName(); |
| } |
| Resource maxAllocation = |
| scheduler.getMaximumResourceCapability(queueName); |
| // Normalize the Requests before dispatching |
| schedulingRequests.forEach(req -> { |
| Resource reqResource = req.getResourceSizing().getResources(); |
| req.getResourceSizing().setResources( |
| this.scheduler.getNormalizedResource(reqResource, maxAllocation)); |
| }); |
| this.placementDispatcher.dispatch(new BatchedRequests(iteratorType, |
| appAttemptId.getApplicationId(), schedulingRequests, 1)); |
| } |
| } |
| |
| private void reDispatchRetryableRequests(ApplicationAttemptId appAttId) { |
| List<BatchedRequests> reqsToRetry = |
| this.requestsToRetry.get(appAttId.getApplicationId()); |
| if (reqsToRetry != null && !reqsToRetry.isEmpty()) { |
| synchronized (reqsToRetry) { |
| for (BatchedRequests bReq: reqsToRetry) { |
| this.placementDispatcher.dispatch(bReq); |
| } |
| reqsToRetry.clear(); |
| } |
| } |
| } |
| |
| private void schedulePlacedRequests(ApplicationAttemptId appAttemptId) { |
| ApplicationId applicationId = appAttemptId.getApplicationId(); |
| List<PlacedSchedulingRequest> placedSchedulingRequests = |
| this.placementDispatcher.pullPlacedRequests(applicationId); |
| for (PlacedSchedulingRequest placedReq : placedSchedulingRequests) { |
| SchedulingRequest sReq = placedReq.getSchedulingRequest(); |
| for (SchedulerNode node : placedReq.getNodes()) { |
| final SchedulingRequest sReqClone = |
| SchedulingRequest.newInstance(sReq.getAllocationRequestId(), |
| sReq.getPriority(), sReq.getExecutionType(), |
| sReq.getAllocationTags(), |
| ResourceSizing.newInstance( |
| sReq.getResourceSizing().getResources()), |
| sReq.getPlacementConstraint()); |
| SchedulerApplicationAttempt applicationAttempt = |
| this.scheduler.getApplicationAttempt(appAttemptId); |
| Runnable task = () -> { |
| boolean success = |
| scheduler.attemptAllocationOnNode( |
| applicationAttempt, sReqClone, node); |
| if (!success) { |
| LOG.warn("Unsuccessful allocation attempt [{}] for [{}]", |
| placedReq.getPlacementAttempt(), sReqClone); |
| } |
| handleSchedulingResponse( |
| new Response(success, applicationId, sReqClone, |
| placedReq.getPlacementAttempt(), node)); |
| }; |
| this.schedulingThreadPool.submit(task); |
| } |
| } |
| } |
| |
| private void handleRejectedRequests(ApplicationAttemptId appAttemptId, |
| AllocateResponse response) { |
| List<SchedulingRequestWithPlacementAttempt> rejectedAlgoRequests = |
| this.placementDispatcher.pullRejectedRequests( |
| appAttemptId.getApplicationId()); |
| if (rejectedAlgoRequests != null && !rejectedAlgoRequests.isEmpty()) { |
| LOG.warn("Following requests of [{}] were rejected by" + |
| " the PlacementAlgorithmOutput Algorithm: {}", |
| appAttemptId.getApplicationId(), rejectedAlgoRequests); |
| rejectedAlgoRequests.stream() |
| .filter(req -> req.getPlacementAttempt() < retryAttempts) |
| .forEach(req -> handleSchedulingResponse( |
| new Response(false, appAttemptId.getApplicationId(), |
| req.getSchedulingRequest(), req.getPlacementAttempt(), |
| null))); |
| ApplicationMasterServiceUtils.addToRejectedSchedulingRequests(response, |
| rejectedAlgoRequests.stream() |
| .filter(req -> req.getPlacementAttempt() >= retryAttempts) |
| .map(sr -> RejectedSchedulingRequest.newInstance( |
| RejectionReason.COULD_NOT_PLACE_ON_NODE, |
| sr.getSchedulingRequest())) |
| .collect(Collectors.toList())); |
| } |
| List<SchedulingRequest> rejectedRequests = |
| this.requestsToReject.get(appAttemptId.getApplicationId()); |
| if (rejectedRequests != null && !rejectedRequests.isEmpty()) { |
| synchronized (rejectedRequests) { |
| LOG.warn("Following requests of [{}] exhausted all retry attempts " + |
| "trying to schedule on placed node: {}", |
| appAttemptId.getApplicationId(), rejectedRequests); |
| ApplicationMasterServiceUtils.addToRejectedSchedulingRequests(response, |
| rejectedRequests.stream() |
| .map(sr -> RejectedSchedulingRequest.newInstance( |
| RejectionReason.COULD_NOT_SCHEDULE_ON_NODE, sr)) |
| .collect(Collectors.toList())); |
| rejectedRequests.clear(); |
| } |
| } |
| } |
| |
| @Override |
| public void finishApplicationMaster(ApplicationAttemptId appAttemptId, |
| FinishApplicationMasterRequest request, |
| FinishApplicationMasterResponse response) { |
| placementDispatcher.clearApplicationState(appAttemptId.getApplicationId()); |
| requestsToReject.remove(appAttemptId.getApplicationId()); |
| requestsToRetry.remove(appAttemptId.getApplicationId()); |
| super.finishApplicationMaster(appAttemptId, request, response); |
| } |
| |
| private void handleSchedulingResponse(SchedulingResponse schedulerResponse) { |
| int placementAttempt = ((Response)schedulerResponse).placementAttempt; |
| // Retry this placement as it is not successful and we are still |
| // under max retry. The req is batched with other unsuccessful |
| // requests from the same app |
| if (!schedulerResponse.isSuccess() && placementAttempt < retryAttempts) { |
| List<BatchedRequests> reqsToRetry = |
| requestsToRetry.computeIfAbsent( |
| schedulerResponse.getApplicationId(), |
| k -> new ArrayList<>()); |
| synchronized (reqsToRetry) { |
| addToRetryList(schedulerResponse, placementAttempt, reqsToRetry); |
| } |
| LOG.warn("Going to retry request for application [{}] after [{}]" + |
| " attempts: [{}]", schedulerResponse.getApplicationId(), |
| placementAttempt, schedulerResponse.getSchedulingRequest()); |
| } else { |
| if (!schedulerResponse.isSuccess()) { |
| LOG.warn("Not retrying request for application [{}] after [{}]" + |
| " attempts: [{}]", schedulerResponse.getApplicationId(), |
| placementAttempt, schedulerResponse.getSchedulingRequest()); |
| List<SchedulingRequest> reqsToReject = |
| requestsToReject.computeIfAbsent( |
| schedulerResponse.getApplicationId(), |
| k -> new ArrayList<>()); |
| synchronized (reqsToReject) { |
| reqsToReject.add(schedulerResponse.getSchedulingRequest()); |
| } |
| } |
| } |
| } |
| |
| private void addToRetryList(SchedulingResponse schedulerResponse, |
| int placementAttempt, List<BatchedRequests> reqsToRetry) { |
| boolean isAdded = false; |
| for (BatchedRequests br : reqsToRetry) { |
| if (br.getPlacementAttempt() == placementAttempt + 1) { |
| br.addToBatch(schedulerResponse.getSchedulingRequest()); |
| br.addToBlacklist( |
| schedulerResponse.getSchedulingRequest().getAllocationTags(), |
| ((Response) schedulerResponse).attemptedNode); |
| isAdded = true; |
| break; |
| } |
| } |
| if (!isAdded) { |
| BatchedRequests br = new BatchedRequests(iteratorType, |
| schedulerResponse.getApplicationId(), |
| Lists.newArrayList(schedulerResponse.getSchedulingRequest()), |
| placementAttempt + 1); |
| reqsToRetry.add(br); |
| br.addToBlacklist( |
| schedulerResponse.getSchedulingRequest().getAllocationTags(), |
| ((Response) schedulerResponse).attemptedNode); |
| } |
| } |
| } |