| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.reservation; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ReservationId; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; |
| import org.apache.hadoop.yarn.util.Clock; |
| import org.apache.hadoop.yarn.util.resource.ResourceCalculator; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public abstract class AbstractSchedulerPlanFollower implements PlanFollower { |
| private static final Logger LOG = |
| LoggerFactory.getLogger(AbstractSchedulerPlanFollower.class); |
| |
| protected Collection<Plan> plans = new ArrayList<Plan>(); |
| protected YarnScheduler scheduler; |
| protected Clock clock; |
| |
| @Override |
| public void init(Clock clock, ResourceScheduler sched, |
| Collection<Plan> plans) { |
| this.clock = clock; |
| this.scheduler = sched; |
| this.plans.addAll(plans); |
| } |
| |
| @Override |
| public synchronized void run() { |
| for (Plan plan : plans) { |
| synchronizePlan(plan, true); |
| } |
| } |
| |
| @Override |
| public synchronized void setPlans(Collection<Plan> plans) { |
| this.plans.clear(); |
| this.plans.addAll(plans); |
| } |
| |
| @Override |
| public synchronized void synchronizePlan(Plan plan, boolean shouldReplan) { |
| String planQueueName = plan.getQueueName(); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Running plan follower edit policy for plan: " + planQueueName); |
| } |
| // align with plan step |
| long step = plan.getStep(); |
| long now = clock.getTime(); |
| if (now % step != 0) { |
| now += step - (now % step); |
| } |
| Queue planQueue = getPlanQueue(planQueueName); |
| if (planQueue == null) { |
| return; |
| } |
| |
| // first we publish to the plan the current availability of resources |
| Resource clusterResources = scheduler.getClusterResource(); |
| Resource planResources = |
| getPlanResources(plan, planQueue, clusterResources); |
| Set<ReservationAllocation> currentReservations = |
| plan.getReservationsAtTime(now); |
| Set<String> curReservationNames = new HashSet<String>(); |
| Resource reservedResources = Resource.newInstance(0, 0); |
| int numRes = getReservedResources(now, currentReservations, |
| curReservationNames, reservedResources); |
| // create the default reservation queue if it doesnt exist |
| String defReservationId = getReservationIdFromQueueName(planQueueName) |
| + ReservationConstants.DEFAULT_QUEUE_SUFFIX; |
| String defReservationQueue = |
| getReservationQueueName(planQueueName, defReservationId); |
| createDefaultReservationQueue(planQueueName, planQueue, defReservationId); |
| curReservationNames.add(defReservationId); |
| // if the resources dedicated to this plan has shrunk invoke replanner |
| boolean shouldResize = false; |
| if (arePlanResourcesLessThanReservations(plan.getResourceCalculator(), |
| clusterResources, planResources, reservedResources)) { |
| if (shouldReplan) { |
| try { |
| plan.getReplanner().plan(plan, null); |
| } catch (PlanningException e) { |
| LOG.warn("Exception while trying to replan: {}", planQueueName, e); |
| } |
| } else { |
| shouldResize = true; |
| } |
| } |
| // identify the reservations that have expired and new reservations that |
| // have to be activated |
| List<? extends Queue> resQueues = getChildReservationQueues(planQueue); |
| Set<String> expired = new HashSet<String>(); |
| for (Queue resQueue : resQueues) { |
| String resQueueName = resQueue.getQueueName(); |
| String reservationId = getReservationIdFromQueueName(resQueueName); |
| if (curReservationNames.contains(reservationId)) { |
| // it is already existing reservation, so needed not create new |
| // reservation queue |
| curReservationNames.remove(reservationId); |
| } else { |
| // the reservation has termination, mark for cleanup |
| expired.add(reservationId); |
| } |
| } |
| // garbage collect expired reservations |
| cleanupExpiredQueues(planQueueName, plan.getMoveOnExpiry(), expired, |
| defReservationQueue); |
| // Add new reservations and update existing ones |
| float totalAssignedCapacity = 0f; |
| if (currentReservations != null) { |
| // first release all excess capacity in default queue |
| try { |
| setQueueEntitlement(planQueueName, defReservationQueue, 0f, 1.0f); |
| } catch (YarnException e) { |
| LOG.warn( |
| "Exception while trying to release default queue capacity for plan: {}", |
| planQueueName, e); |
| } |
| // sort allocations from the one giving up the most resources, to the |
| // one asking for the most avoid order-of-operation errors that |
| // temporarily violate 100% capacity bound |
| List<ReservationAllocation> sortedAllocations = sortByDelta( |
| new ArrayList<ReservationAllocation>(currentReservations), now, plan); |
| for (ReservationAllocation res : sortedAllocations) { |
| String currResId = res.getReservationId().toString(); |
| if (curReservationNames.contains(currResId)) { |
| addReservationQueue(planQueueName, planQueue, currResId); |
| } |
| Resource capToAssign = res.getResourcesAtTime(now); |
| float targetCapacity = 0f; |
| if (planResources.getMemorySize() > 0 |
| && planResources.getVirtualCores() > 0) { |
| if (shouldResize) { |
| capToAssign = calculateReservationToPlanProportion( |
| plan.getResourceCalculator(), planResources, reservedResources, |
| capToAssign); |
| } |
| targetCapacity = |
| calculateReservationToPlanRatio(plan.getResourceCalculator(), |
| clusterResources, planResources, capToAssign); |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug( |
| "Assigning capacity of {} to queue {} with target capacity {}", |
| capToAssign, currResId, targetCapacity); |
| } |
| // set maxCapacity to 100% unless the job requires gang, in which |
| // case we stick to capacity (as running early/before is likely a |
| // waste of resources) |
| float maxCapacity = 1.0f; |
| if (res.containsGangs()) { |
| maxCapacity = targetCapacity; |
| } |
| try { |
| setQueueEntitlement(planQueueName, currResId, targetCapacity, |
| maxCapacity); |
| } catch (YarnException e) { |
| LOG.warn("Exception while trying to size reservation for plan: {}", |
| currResId, planQueueName, e); |
| } |
| totalAssignedCapacity += targetCapacity; |
| } |
| } |
| // compute the default queue capacity |
| float defQCap = 1.0f - totalAssignedCapacity; |
| if (LOG.isDebugEnabled()) { |
| LOG.debug( |
| "PlanFollowerEditPolicyTask: total Plan Capacity: {} " |
| + "currReservation: {} default-queue capacity: {}", |
| planResources, numRes, defQCap); |
| } |
| // set the default queue to eat-up all remaining capacity |
| try { |
| setQueueEntitlement(planQueueName, defReservationQueue, defQCap, 1.0f); |
| } catch (YarnException e) { |
| LOG.warn( |
| "Exception while trying to reclaim default queue capacity for plan: {}", |
| planQueueName, e); |
| } |
| // garbage collect finished reservations from plan |
| try { |
| plan.archiveCompletedReservations(now); |
| } catch (PlanningException e) { |
| LOG.error("Exception in archiving completed reservations: ", e); |
| } |
| LOG.info("Finished iteration of plan follower edit policy for plan: " |
| + planQueueName); |
| // Extension: update plan with app states, |
| // useful to support smart replanning |
| } |
| |
| protected String getReservationIdFromQueueName(String resQueueName) { |
| return resQueueName; |
| } |
| |
| protected void setQueueEntitlement(String planQueueName, String currResId, |
| float targetCapacity, float maxCapacity) throws YarnException { |
| String reservationQueueName = |
| getReservationQueueName(planQueueName, currResId); |
| scheduler.setEntitlement(reservationQueueName, |
| new QueueEntitlement(targetCapacity, maxCapacity)); |
| } |
| |
| // Schedulers have different ways of naming queues. See YARN-2773 |
| protected String getReservationQueueName(String planQueueName, |
| String reservationId) { |
| return reservationId; |
| } |
| |
| /** |
| * First sets entitlement of queues to zero to prevent new app submission. |
| * Then move all apps in the set of queues to the parent plan queue's default |
| * reservation queue if move is enabled. Finally cleanups the queue by killing |
| * any apps (if move is disabled or move failed) and removing the queue |
| * |
| * @param planQueueName the name of {@code PlanQueue} |
| * @param shouldMove flag to indicate if any running apps should be moved or |
| * killed |
| * @param toRemove the remnant apps to clean up |
| * @param defReservationQueue the default {@code ReservationQueue} of the |
| * {@link Plan} |
| */ |
| protected void cleanupExpiredQueues(String planQueueName, boolean shouldMove, |
| Set<String> toRemove, String defReservationQueue) { |
| for (String expiredReservationId : toRemove) { |
| try { |
| // reduce entitlement to 0 |
| String expiredReservation = |
| getReservationQueueName(planQueueName, expiredReservationId); |
| setQueueEntitlement(planQueueName, expiredReservation, 0.0f, 0.0f); |
| if (shouldMove) { |
| moveAppsInQueueSync(expiredReservation, defReservationQueue); |
| } |
| List<ApplicationAttemptId> appsInQueue = scheduler. |
| getAppsInQueue(expiredReservation); |
| int size = (appsInQueue == null ? 0 : appsInQueue.size()); |
| if (size > 0) { |
| scheduler.killAllAppsInQueue(expiredReservation); |
| LOG.info("Killing applications in queue: {}", expiredReservation); |
| } else { |
| scheduler.removeQueue(expiredReservation); |
| LOG.info("Queue: " + expiredReservation + " removed"); |
| } |
| } catch (YarnException e) { |
| LOG.warn("Exception while trying to expire reservation: {}", |
| expiredReservationId, e); |
| } |
| } |
| } |
| |
| /** |
| * Move all apps in the set of queues to the parent plan queue's default |
| * reservation queue in a synchronous fashion |
| */ |
| private void moveAppsInQueueSync(String expiredReservation, |
| String defReservationQueue) { |
| List<ApplicationAttemptId> activeApps = |
| scheduler.getAppsInQueue(expiredReservation); |
| if (activeApps.isEmpty()) { |
| return; |
| } |
| for (ApplicationAttemptId app : activeApps) { |
| // fallback to parent's default queue |
| try { |
| scheduler.moveApplication(app.getApplicationId(), defReservationQueue); |
| } catch (YarnException e) { |
| LOG.warn( |
| "Encountered unexpected error during migration of application: {}" |
| + " from reservation: {}", |
| app, expiredReservation, e); |
| } |
| } |
| } |
| |
| protected int getReservedResources(long now, |
| Set<ReservationAllocation> currentReservations, |
| Set<String> curReservationNames, Resource reservedResources) { |
| int numRes = 0; |
| if (currentReservations != null) { |
| numRes = currentReservations.size(); |
| for (ReservationAllocation reservation : currentReservations) { |
| curReservationNames.add(reservation.getReservationId().toString()); |
| Resources.addTo(reservedResources, reservation.getResourcesAtTime(now)); |
| } |
| } |
| return numRes; |
| } |
| |
| /** |
| * Sort in the order from the least new amount of resources asked (likely |
| * negative) to the highest. This prevents "order-of-operation" errors related |
| * to exceeding 100% capacity temporarily. |
| * |
| * @param currentReservations the currently active reservations |
| * @param now the current time |
| * @param plan the {@link Plan} that is being considered |
| * |
| * @return the sorted list of {@link ReservationAllocation}s |
| */ |
| protected List<ReservationAllocation> sortByDelta( |
| List<ReservationAllocation> currentReservations, long now, Plan plan) { |
| Collections.sort(currentReservations, |
| new ReservationAllocationComparator(now, this, plan)); |
| return currentReservations; |
| } |
| |
| /** |
| * Get queue associated with reservable queue named. |
| * |
| * @param planQueueName name of the reservable queue |
| * @return queue associated with the reservable queue |
| */ |
| protected abstract Queue getPlanQueue(String planQueueName); |
| |
| /** |
| * Resizes reservations based on currently available resources. |
| */ |
| private Resource calculateReservationToPlanProportion( |
| ResourceCalculator rescCalculator, Resource availablePlanResources, |
| Resource totalReservationResources, Resource reservationResources) { |
| return Resources.multiply(availablePlanResources, Resources.ratio( |
| rescCalculator, reservationResources, totalReservationResources)); |
| } |
| |
| /** |
| * Calculates ratio of reservationResources to planResources. |
| */ |
| private float calculateReservationToPlanRatio( |
| ResourceCalculator rescCalculator, Resource clusterResources, |
| Resource planResources, Resource reservationResources) { |
| return Resources.divide(rescCalculator, clusterResources, |
| reservationResources, planResources); |
| } |
| |
| /** |
| * Check if plan resources are less than expected reservation resources. |
| */ |
| private boolean arePlanResourcesLessThanReservations( |
| ResourceCalculator rescCalculator, Resource clusterResources, |
| Resource planResources, Resource reservedResources) { |
| return Resources.greaterThan(rescCalculator, clusterResources, |
| reservedResources, planResources); |
| } |
| |
| /** |
| * Get a list of reservation queues for this planQueue. |
| * |
| * @param planQueue the queue for the current {@link Plan} |
| * |
| * @return the queues corresponding to the reservations |
| */ |
| protected abstract List<? extends Queue> getChildReservationQueues( |
| Queue planQueue); |
| |
| /** |
| * Add a new reservation queue for reservation currResId for this planQueue. |
| */ |
| protected abstract void addReservationQueue(String planQueueName, Queue queue, |
| String currResId); |
| |
| /** |
| * Creates the default reservation queue for use when no reservation is used |
| * for applications submitted to this planQueue. |
| * |
| * @param planQueueName name of the reservable queue |
| * @param queue the queue for the current {@link Plan} |
| * @param defReservationQueue name of the default {@code ReservationQueue} |
| */ |
| protected abstract void createDefaultReservationQueue(String planQueueName, |
| Queue queue, String defReservationQueue); |
| |
| /** |
| * Get plan resources for this planQueue. |
| * |
| * @param plan the current {@link Plan} being considered |
| * @param clusterResources the resources available in the cluster |
| * |
| * @return the resources allocated to the specified {@link Plan} |
| */ |
| protected abstract Resource getPlanResources(Plan plan, Queue queue, |
| Resource clusterResources); |
| |
| /** |
| * Get reservation queue resources if it exists otherwise return null. |
| * |
| * @param plan the current {@link Plan} being considered |
| * @param reservationId the identifier of the reservation |
| * |
| * @return the resources allocated to the specified reservation |
| */ |
| protected abstract Resource getReservationQueueResourceIfExists(Plan plan, |
| ReservationId reservationId); |
| |
| private static class ReservationAllocationComparator |
| implements Comparator<ReservationAllocation> { |
| AbstractSchedulerPlanFollower planFollower; |
| long now; |
| Plan plan; |
| |
| ReservationAllocationComparator(long now, |
| AbstractSchedulerPlanFollower planFollower, Plan plan) { |
| this.now = now; |
| this.planFollower = planFollower; |
| this.plan = plan; |
| } |
| |
| private Resource getUnallocatedReservedResources( |
| ReservationAllocation reservation) { |
| Resource resResource; |
| Resource reservationResource = |
| planFollower.getReservationQueueResourceIfExists(plan, |
| reservation.getReservationId()); |
| if (reservationResource != null) { |
| resResource = Resources.subtract(reservation.getResourcesAtTime(now), |
| reservationResource); |
| } else { |
| resResource = reservation.getResourcesAtTime(now); |
| } |
| return resResource; |
| } |
| |
| @Override |
| public int compare(ReservationAllocation lhs, ReservationAllocation rhs) { |
| // compute delta between current and previous reservation, and compare |
| // based on that |
| Resource lhsRes = getUnallocatedReservedResources(lhs); |
| Resource rhsRes = getUnallocatedReservedResources(rhs); |
| return lhsRes.compareTo(rhsRes); |
| } |
| } |
| } |