| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.classification.InterfaceStability.Evolving; |
| import org.apache.hadoop.conf.Configurable; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.security.AccessControlException; |
| import org.apache.hadoop.security.Groups; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.util.Time; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerExitStatus; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; |
| import org.apache.hadoop.yarn.api.records.ContainerStatus; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.NodeState; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.QueueACL; |
| import org.apache.hadoop.yarn.api.records.QueueInfo; |
| import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; |
| import org.apache.hadoop.yarn.api.records.ReservationId; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceOption; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; |
| import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; |
| import org.apache.hadoop.yarn.security.Permission; |
| import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMContext; |
| import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule; |
| import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule; |
| import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueInvalidException; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; |
| import org.apache.hadoop.yarn.server.utils.Lock; |
| import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; |
| import org.apache.hadoop.yarn.util.resource.ResourceCalculator; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| |
| @LimitedPrivate("yarn") |
| @Evolving |
| @SuppressWarnings("unchecked") |
| public class CapacityScheduler extends |
| AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> implements |
| PreemptableResourceScheduler, CapacitySchedulerContext, Configurable { |
| |
| private static final Log LOG = LogFactory.getLog(CapacityScheduler.class); |
| private YarnAuthorizationProvider authorizer; |
| |
| private CSQueue root; |
| // timeout to join when we stop this service |
| protected final long THREAD_JOIN_TIMEOUT_MS = 1000; |
| |
| static final Comparator<CSQueue> nonPartitionedQueueComparator = |
| new Comparator<CSQueue>() { |
| @Override |
| public int compare(CSQueue q1, CSQueue q2) { |
| if (q1.getUsedCapacity() < q2.getUsedCapacity()) { |
| return -1; |
| } else if (q1.getUsedCapacity() > q2.getUsedCapacity()) { |
| return 1; |
| } |
| |
| return q1.getQueuePath().compareTo(q2.getQueuePath()); |
| } |
| }; |
| |
| static final PartitionedQueueComparator partitionedQueueComparator = |
| new PartitionedQueueComparator(); |
| |
| @Override |
| public void setConf(Configuration conf) { |
| yarnConf = conf; |
| } |
| |
| private void validateConf(Configuration conf) { |
| // validate scheduler memory allocation setting |
| int minMem = conf.getInt( |
| YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); |
| int maxMem = conf.getInt( |
| YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); |
| |
| if (minMem <= 0 || minMem > maxMem) { |
| throw new YarnRuntimeException("Invalid resource scheduler memory" |
| + " allocation configuration" |
| + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB |
| + "=" + minMem |
| + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB |
| + "=" + maxMem + ", min and max should be greater than 0" |
| + ", max should be no smaller than min."); |
| } |
| |
| // validate scheduler vcores allocation setting |
| int minVcores = conf.getInt( |
| YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); |
| int maxVcores = conf.getInt( |
| YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); |
| |
| if (minVcores <= 0 || minVcores > maxVcores) { |
| throw new YarnRuntimeException("Invalid resource scheduler vcores" |
| + " allocation configuration" |
| + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES |
| + "=" + minVcores |
| + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES |
| + "=" + maxVcores + ", min and max should be greater than 0" |
| + ", max should be no smaller than min."); |
| } |
| } |
| |
| @Override |
| public Configuration getConf() { |
| return yarnConf; |
| } |
| |
| private CapacitySchedulerConfiguration conf; |
| private Configuration yarnConf; |
| |
| private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>(); |
| |
| private ResourceCalculator calculator; |
| private boolean usePortForNodeName; |
| |
| private boolean scheduleAsynchronously; |
| private AsyncScheduleThread asyncSchedulerThread; |
| private RMNodeLabelsManager labelManager; |
| private SchedulerHealth schedulerHealth = new SchedulerHealth(); |
| volatile long lastNodeUpdateTime; |
| |
| /** |
| * EXPERT |
| */ |
| private long asyncScheduleInterval; |
| private static final String ASYNC_SCHEDULER_INTERVAL = |
| CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX |
| + ".scheduling-interval-ms"; |
| private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5; |
| |
| public CapacityScheduler() { |
| super(CapacityScheduler.class.getName()); |
| } |
| |
| @Override |
| public QueueMetrics getRootQueueMetrics() { |
| return root.getMetrics(); |
| } |
| |
| public CSQueue getRootQueue() { |
| return root; |
| } |
| |
| @Override |
| public CapacitySchedulerConfiguration getConfiguration() { |
| return conf; |
| } |
| |
| @Override |
| public synchronized RMContainerTokenSecretManager |
| getContainerTokenSecretManager() { |
| return this.rmContext.getContainerTokenSecretManager(); |
| } |
| |
| @Override |
| public ResourceCalculator getResourceCalculator() { |
| return calculator; |
| } |
| |
| @Override |
| public Comparator<CSQueue> getNonPartitionedQueueComparator() { |
| return nonPartitionedQueueComparator; |
| } |
| |
| @Override |
| public PartitionedQueueComparator getPartitionedQueueComparator() { |
| return partitionedQueueComparator; |
| } |
| |
| @Override |
| public int getNumClusterNodes() { |
| return nodeTracker.nodeCount(); |
| } |
| |
| @Override |
| public synchronized RMContext getRMContext() { |
| return this.rmContext; |
| } |
| |
| @Override |
| public synchronized void setRMContext(RMContext rmContext) { |
| this.rmContext = rmContext; |
| } |
| |
| private synchronized void initScheduler(Configuration configuration) throws |
| IOException { |
| this.conf = loadCapacitySchedulerConfiguration(configuration); |
| validateConf(this.conf); |
| this.minimumAllocation = this.conf.getMinimumAllocation(); |
| initMaximumResourceCapability(this.conf.getMaximumAllocation()); |
| this.calculator = this.conf.getResourceCalculator(); |
| this.usePortForNodeName = this.conf.getUsePortForNodeName(); |
| this.applications = |
| new ConcurrentHashMap<ApplicationId, |
| SchedulerApplication<FiCaSchedulerApp>>(); |
| this.labelManager = rmContext.getNodeLabelManager(); |
| authorizer = YarnAuthorizationProvider.getInstance(yarnConf); |
| initializeQueues(this.conf); |
| |
| scheduleAsynchronously = this.conf.getScheduleAynschronously(); |
| asyncScheduleInterval = |
| this.conf.getLong(ASYNC_SCHEDULER_INTERVAL, |
| DEFAULT_ASYNC_SCHEDULER_INTERVAL); |
| if (scheduleAsynchronously) { |
| asyncSchedulerThread = new AsyncScheduleThread(this); |
| } |
| |
| LOG.info("Initialized CapacityScheduler with " + |
| "calculator=" + getResourceCalculator().getClass() + ", " + |
| "minimumAllocation=<" + getMinimumResourceCapability() + ">, " + |
| "maximumAllocation=<" + getMaximumResourceCapability() + ">, " + |
| "asynchronousScheduling=" + scheduleAsynchronously + ", " + |
| "asyncScheduleInterval=" + asyncScheduleInterval + "ms"); |
| } |
| |
| private synchronized void startSchedulerThreads() { |
| if (scheduleAsynchronously) { |
| Preconditions.checkNotNull(asyncSchedulerThread, |
| "asyncSchedulerThread is null"); |
| asyncSchedulerThread.start(); |
| } |
| } |
| |
| @Override |
| public void serviceInit(Configuration conf) throws Exception { |
| Configuration configuration = new Configuration(conf); |
| super.serviceInit(conf); |
| initScheduler(configuration); |
| } |
| |
| @Override |
| public void serviceStart() throws Exception { |
| startSchedulerThreads(); |
| super.serviceStart(); |
| } |
| |
| @Override |
| public void serviceStop() throws Exception { |
| synchronized (this) { |
| if (scheduleAsynchronously && asyncSchedulerThread != null) { |
| asyncSchedulerThread.interrupt(); |
| asyncSchedulerThread.join(THREAD_JOIN_TIMEOUT_MS); |
| } |
| } |
| super.serviceStop(); |
| } |
| |
| @Override |
| public synchronized void |
| reinitialize(Configuration conf, RMContext rmContext) throws IOException { |
| Configuration configuration = new Configuration(conf); |
| CapacitySchedulerConfiguration oldConf = this.conf; |
| this.conf = loadCapacitySchedulerConfiguration(configuration); |
| validateConf(this.conf); |
| try { |
| LOG.info("Re-initializing queues..."); |
| refreshMaximumAllocation(this.conf.getMaximumAllocation()); |
| reinitializeQueues(this.conf); |
| } catch (Throwable t) { |
| this.conf = oldConf; |
| refreshMaximumAllocation(this.conf.getMaximumAllocation()); |
| throw new IOException("Failed to re-init queues", t); |
| } |
| } |
| |
| long getAsyncScheduleInterval() { |
| return asyncScheduleInterval; |
| } |
| |
| private final static Random random = new Random(System.currentTimeMillis()); |
| |
| /** |
| * Schedule on all nodes by starting at a random point. |
| * @param cs |
| */ |
| static void schedule(CapacityScheduler cs) { |
| // First randomize the start point |
| int current = 0; |
| Collection<FiCaSchedulerNode> nodes = cs.nodeTracker.getAllNodes(); |
| int start = random.nextInt(nodes.size()); |
| for (FiCaSchedulerNode node : nodes) { |
| if (current++ >= start) { |
| cs.allocateContainersToNode(node); |
| } |
| } |
| // Now, just get everyone to be safe |
| for (FiCaSchedulerNode node : nodes) { |
| cs.allocateContainersToNode(node); |
| } |
| try { |
| Thread.sleep(cs.getAsyncScheduleInterval()); |
| } catch (InterruptedException e) {} |
| } |
| |
| static class AsyncScheduleThread extends Thread { |
| |
| private final CapacityScheduler cs; |
| private AtomicBoolean runSchedules = new AtomicBoolean(false); |
| |
| public AsyncScheduleThread(CapacityScheduler cs) { |
| this.cs = cs; |
| setDaemon(true); |
| } |
| |
| @Override |
| public void run() { |
| while (true) { |
| if (!runSchedules.get()) { |
| try { |
| Thread.sleep(100); |
| } catch (InterruptedException ie) {} |
| } else { |
| schedule(cs); |
| } |
| } |
| } |
| |
| public void beginSchedule() { |
| runSchedules.set(true); |
| } |
| |
| public void suspendSchedule() { |
| runSchedules.set(false); |
| } |
| |
| } |
| |
| @Private |
| public static final String ROOT_QUEUE = |
| CapacitySchedulerConfiguration.PREFIX + CapacitySchedulerConfiguration.ROOT; |
| |
| static class QueueHook { |
| public CSQueue hook(CSQueue queue) { |
| return queue; |
| } |
| } |
| private static final QueueHook noop = new QueueHook(); |
| |
| @VisibleForTesting |
| public synchronized UserGroupMappingPlacementRule |
| getUserGroupMappingPlacementRule() throws IOException { |
| boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings(); |
| LOG.info("Initialized queue mappings, override: " |
| + overrideWithQueueMappings); |
| |
| // Get new user/group mappings |
| List<UserGroupMappingPlacementRule.QueueMapping> newMappings = |
| conf.getQueueMappings(); |
| // check if mappings refer to valid queues |
| for (QueueMapping mapping : newMappings) { |
| String mappingQueue = mapping.getQueue(); |
| if (!mappingQueue |
| .equals(UserGroupMappingPlacementRule.CURRENT_USER_MAPPING) |
| && !mappingQueue |
| .equals(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING)) { |
| CSQueue queue = queues.get(mappingQueue); |
| if (queue == null || !(queue instanceof LeafQueue)) { |
| throw new IOException("mapping contains invalid or non-leaf queue " |
| + mappingQueue); |
| } |
| } |
| } |
| |
| // initialize groups if mappings are present |
| if (newMappings.size() > 0) { |
| Groups groups = new Groups(conf); |
| return new UserGroupMappingPlacementRule(overrideWithQueueMappings, |
| newMappings, groups); |
| } |
| |
| return null; |
| } |
| |
| private void updatePlacementRules() throws IOException { |
| List<PlacementRule> placementRules = new ArrayList<>(); |
| |
| // Initialize UserGroupMappingPlacementRule |
| // TODO, need make this defineable by configuration. |
| UserGroupMappingPlacementRule ugRule = getUserGroupMappingPlacementRule(); |
| if (null != ugRule) { |
| placementRules.add(ugRule); |
| } |
| |
| rmContext.getQueuePlacementManager().updateRules(placementRules); |
| } |
| |
| @Lock(CapacityScheduler.class) |
| private void initializeQueues(CapacitySchedulerConfiguration conf) |
| throws IOException { |
| |
| root = |
| parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, |
| queues, queues, noop); |
| labelManager.reinitializeQueueLabels(getQueueToLabels()); |
| LOG.info("Initialized root queue " + root); |
| updatePlacementRules(); |
| setQueueAcls(authorizer, queues); |
| } |
| |
| @Lock(CapacityScheduler.class) |
| private void reinitializeQueues(CapacitySchedulerConfiguration conf) |
| throws IOException { |
| // Parse new queues |
| Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>(); |
| CSQueue newRoot = |
| parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, |
| newQueues, queues, noop); |
| |
| // Ensure all existing queues are still present |
| validateExistingQueues(queues, newQueues); |
| |
| // Add new queues |
| addNewQueues(queues, newQueues); |
| |
| // Re-configure queues |
| root.reinitialize(newRoot, getClusterResource()); |
| updatePlacementRules(); |
| |
| // Re-calculate headroom for active applications |
| Resource clusterResource = getClusterResource(); |
| root.updateClusterResource(clusterResource, new ResourceLimits( |
| clusterResource)); |
| |
| labelManager.reinitializeQueueLabels(getQueueToLabels()); |
| setQueueAcls(authorizer, queues); |
| } |
| |
| @VisibleForTesting |
| public static void setQueueAcls(YarnAuthorizationProvider authorizer, |
| Map<String, CSQueue> queues) throws IOException { |
| List<Permission> permissions = new ArrayList<>(); |
| for (CSQueue queue : queues.values()) { |
| AbstractCSQueue csQueue = (AbstractCSQueue) queue; |
| permissions.add( |
| new Permission(csQueue.getPrivilegedEntity(), csQueue.getACLs())); |
| } |
| authorizer.setPermission(permissions, UserGroupInformation.getCurrentUser()); |
| } |
| |
| private Map<String, Set<String>> getQueueToLabels() { |
| Map<String, Set<String>> queueToLabels = new HashMap<String, Set<String>>(); |
| for (CSQueue queue : queues.values()) { |
| queueToLabels.put(queue.getQueueName(), queue.getAccessibleNodeLabels()); |
| } |
| return queueToLabels; |
| } |
| |
| /** |
| * Ensure all existing queues are present. Queues cannot be deleted |
| * @param queues existing queues |
| * @param newQueues new queues |
| */ |
| @Lock(CapacityScheduler.class) |
| private void validateExistingQueues( |
| Map<String, CSQueue> queues, Map<String, CSQueue> newQueues) |
| throws IOException { |
| // check that all static queues are included in the newQueues list |
| for (Map.Entry<String, CSQueue> e : queues.entrySet()) { |
| if (!(e.getValue() instanceof ReservationQueue)) { |
| String queueName = e.getKey(); |
| CSQueue oldQueue = e.getValue(); |
| CSQueue newQueue = newQueues.get(queueName); |
| if (null == newQueue) { |
| throw new IOException(queueName + " cannot be found during refresh!"); |
| } else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) { |
| throw new IOException(queueName + " is moved from:" |
| + oldQueue.getQueuePath() + " to:" + newQueue.getQueuePath() |
| + " after refresh, which is not allowed."); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Add the new queues (only) to our list of queues... |
| * ... be careful, do not overwrite existing queues. |
| * @param queues |
| * @param newQueues |
| */ |
| @Lock(CapacityScheduler.class) |
| private void addNewQueues( |
| Map<String, CSQueue> queues, Map<String, CSQueue> newQueues) |
| { |
| for (Map.Entry<String, CSQueue> e : newQueues.entrySet()) { |
| String queueName = e.getKey(); |
| CSQueue queue = e.getValue(); |
| if (!queues.containsKey(queueName)) { |
| queues.put(queueName, queue); |
| } |
| } |
| } |
| |
| @Lock(CapacityScheduler.class) |
| static CSQueue parseQueue( |
| CapacitySchedulerContext csContext, |
| CapacitySchedulerConfiguration conf, |
| CSQueue parent, String queueName, Map<String, CSQueue> queues, |
| Map<String, CSQueue> oldQueues, |
| QueueHook hook) throws IOException { |
| CSQueue queue; |
| String fullQueueName = |
| (parent == null) ? queueName |
| : (parent.getQueuePath() + "." + queueName); |
| String[] childQueueNames = |
| conf.getQueues(fullQueueName); |
| boolean isReservableQueue = conf.isReservable(fullQueueName); |
| if (childQueueNames == null || childQueueNames.length == 0) { |
| if (null == parent) { |
| throw new IllegalStateException( |
| "Queue configuration missing child queue names for " + queueName); |
| } |
| // Check if the queue will be dynamically managed by the Reservation |
| // system |
| if (isReservableQueue) { |
| queue = |
| new PlanQueue(csContext, queueName, parent, |
| oldQueues.get(queueName)); |
| } else { |
| queue = |
| new LeafQueue(csContext, queueName, parent, |
| oldQueues.get(queueName)); |
| |
| // Used only for unit tests |
| queue = hook.hook(queue); |
| } |
| } else { |
| if (isReservableQueue) { |
| throw new IllegalStateException( |
| "Only Leaf Queues can be reservable for " + queueName); |
| } |
| ParentQueue parentQueue = |
| new ParentQueue(csContext, queueName, parent, oldQueues.get(queueName)); |
| |
| // Used only for unit tests |
| queue = hook.hook(parentQueue); |
| |
| List<CSQueue> childQueues = new ArrayList<CSQueue>(); |
| for (String childQueueName : childQueueNames) { |
| CSQueue childQueue = |
| parseQueue(csContext, conf, queue, childQueueName, |
| queues, oldQueues, hook); |
| childQueues.add(childQueue); |
| } |
| parentQueue.setChildQueues(childQueues); |
| } |
| |
| if (queue instanceof LeafQueue && queues.containsKey(queueName) |
| && queues.get(queueName) instanceof LeafQueue) { |
| throw new IOException("Two leaf queues were named " + queueName |
| + ". Leaf queue names must be distinct"); |
| } |
| queues.put(queueName, queue); |
| |
| LOG.info("Initialized queue: " + queue); |
| return queue; |
| } |
| |
| public CSQueue getQueue(String queueName) { |
| if (queueName == null) { |
| return null; |
| } |
| return queues.get(queueName); |
| } |
| |
| private synchronized void addApplicationOnRecovery( |
| ApplicationId applicationId, String queueName, String user, |
| Priority priority) { |
| CSQueue queue = getQueue(queueName); |
| if (queue == null) { |
| //During a restart, this indicates a queue was removed, which is |
| //not presently supported |
| if (!YarnConfiguration.shouldRMFailFast(getConfig())) { |
| this.rmContext.getDispatcher().getEventHandler().handle( |
| new RMAppEvent(applicationId, RMAppEventType.KILL, |
| "Application killed on recovery as it was submitted to queue " + |
| queueName + " which no longer exists after restart.")); |
| return; |
| } else { |
| String queueErrorMsg = "Queue named " + queueName |
| + " missing during application recovery." |
| + " Queue removal during recovery is not presently supported by the" |
| + " capacity scheduler, please restart with all queues configured" |
| + " which were present before shutdown/restart."; |
| LOG.fatal(queueErrorMsg); |
| throw new QueueInvalidException(queueErrorMsg); |
| } |
| } |
| if (!(queue instanceof LeafQueue)) { |
| // During RM restart, this means leaf queue was converted to a parent |
| // queue, which is not supported for running apps. |
| if (!YarnConfiguration.shouldRMFailFast(getConfig())) { |
| this.rmContext.getDispatcher().getEventHandler().handle( |
| new RMAppEvent(applicationId, RMAppEventType.KILL, |
| "Application killed on recovery as it was submitted to queue " + |
| queueName + " which is no longer a leaf queue after restart.")); |
| return; |
| } else { |
| String queueErrorMsg = "Queue named " + queueName |
| + " is no longer a leaf queue during application recovery." |
| + " Changing a leaf queue to a parent queue during recovery is" |
| + " not presently supported by the capacity scheduler. Please" |
| + " restart with leaf queues before shutdown/restart continuing" |
| + " as leaf queues."; |
| LOG.fatal(queueErrorMsg); |
| throw new QueueInvalidException(queueErrorMsg); |
| } |
| } |
| // Submit to the queue |
| try { |
| queue.submitApplication(applicationId, user, queueName); |
| } catch (AccessControlException ace) { |
| // Ignore the exception for recovered app as the app was previously |
| // accepted. |
| } |
| queue.getMetrics().submitApp(user); |
| SchedulerApplication<FiCaSchedulerApp> application = |
| new SchedulerApplication<FiCaSchedulerApp>(queue, user, priority); |
| applications.put(applicationId, application); |
| LOG.info("Accepted application " + applicationId + " from user: " + user |
| + ", in queue: " + queueName); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED"); |
| } |
| } |
| |
| private synchronized void addApplication(ApplicationId applicationId, |
| String queueName, String user, Priority priority) { |
| // Sanity checks. |
| CSQueue queue = getQueue(queueName); |
| if (queue == null) { |
| String message = "Application " + applicationId + |
| " submitted by user " + user + " to unknown queue: " + queueName; |
| this.rmContext.getDispatcher().getEventHandler() |
| .handle(new RMAppEvent(applicationId, |
| RMAppEventType.APP_REJECTED, message)); |
| return; |
| } |
| if (!(queue instanceof LeafQueue)) { |
| String message = "Application " + applicationId + |
| " submitted by user " + user + " to non-leaf queue: " + queueName; |
| this.rmContext.getDispatcher().getEventHandler() |
| .handle(new RMAppEvent(applicationId, |
| RMAppEventType.APP_REJECTED, message)); |
| return; |
| } |
| // Submit to the queue |
| try { |
| queue.submitApplication(applicationId, user, queueName); |
| } catch (AccessControlException ace) { |
| LOG.info("Failed to submit application " + applicationId + " to queue " |
| + queueName + " from user " + user, ace); |
| this.rmContext.getDispatcher().getEventHandler() |
| .handle(new RMAppEvent(applicationId, |
| RMAppEventType.APP_REJECTED, ace.toString())); |
| return; |
| } |
| // update the metrics |
| queue.getMetrics().submitApp(user); |
| SchedulerApplication<FiCaSchedulerApp> application = |
| new SchedulerApplication<FiCaSchedulerApp>(queue, user, priority); |
| applications.put(applicationId, application); |
| LOG.info("Accepted application " + applicationId + " from user: " + user |
| + ", in queue: " + queueName); |
| rmContext.getDispatcher().getEventHandler() |
| .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); |
| } |
| |
| private synchronized void addApplicationAttempt( |
| ApplicationAttemptId applicationAttemptId, |
| boolean transferStateFromPreviousAttempt, |
| boolean isAttemptRecovering) { |
| SchedulerApplication<FiCaSchedulerApp> application = |
| applications.get(applicationAttemptId.getApplicationId()); |
| if (application == null) { |
| LOG.warn("Application " + applicationAttemptId.getApplicationId() + |
| " cannot be found in scheduler."); |
| return; |
| } |
| CSQueue queue = (CSQueue) application.getQueue(); |
| |
| FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId, |
| application.getUser(), queue, queue.getActiveUsersManager(), rmContext, |
| application.getPriority(), isAttemptRecovering); |
| if (transferStateFromPreviousAttempt) { |
| attempt.transferStateFromPreviousAttempt( |
| application.getCurrentAppAttempt()); |
| } |
| application.setCurrentAppAttempt(attempt); |
| |
| // Update attempt priority to the latest to avoid race condition i.e |
| // SchedulerApplicationAttempt is created with old priority but it is not |
| // set to SchedulerApplication#setCurrentAppAttempt. |
| // Scenario would occur is |
| // 1. SchdulerApplicationAttempt is created with old priority. |
| // 2. updateApplicationPriority() updates SchedulerApplication. Since |
| // currentAttempt is null, it just return. |
| // 3. ScheduelerApplcationAttempt is set in |
| // SchedulerApplication#setCurrentAppAttempt. |
| attempt.setPriority(application.getPriority()); |
| |
| queue.submitApplicationAttempt(attempt, application.getUser()); |
| LOG.info("Added Application Attempt " + applicationAttemptId |
| + " to scheduler from user " + application.getUser() + " in queue " |
| + queue.getQueueName()); |
| if (isAttemptRecovering) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(applicationAttemptId |
| + " is recovering. Skipping notifying ATTEMPT_ADDED"); |
| } |
| } else { |
| rmContext.getDispatcher().getEventHandler().handle( |
| new RMAppAttemptEvent(applicationAttemptId, |
| RMAppAttemptEventType.ATTEMPT_ADDED)); |
| } |
| } |
| |
| private synchronized void doneApplication(ApplicationId applicationId, |
| RMAppState finalState) { |
| SchedulerApplication<FiCaSchedulerApp> application = |
| applications.get(applicationId); |
| if (application == null){ |
| // The AppRemovedSchedulerEvent maybe sent on recovery for completed apps, |
| // ignore it. |
| LOG.warn("Couldn't find application " + applicationId); |
| return; |
| } |
| CSQueue queue = (CSQueue) application.getQueue(); |
| if (!(queue instanceof LeafQueue)) { |
| LOG.error("Cannot finish application " + "from non-leaf queue: " |
| + queue.getQueueName()); |
| } else { |
| queue.finishApplication(applicationId, application.getUser()); |
| } |
| application.stop(finalState); |
| applications.remove(applicationId); |
| } |
| |
| private synchronized void doneApplicationAttempt( |
| ApplicationAttemptId applicationAttemptId, |
| RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) { |
| LOG.info("Application Attempt " + applicationAttemptId + " is done." + |
| " finalState=" + rmAppAttemptFinalState); |
| |
| FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId); |
| SchedulerApplication<FiCaSchedulerApp> application = |
| applications.get(applicationAttemptId.getApplicationId()); |
| |
| if (application == null || attempt == null) { |
| LOG.info("Unknown application " + applicationAttemptId + " has completed!"); |
| return; |
| } |
| |
| // Release all the allocated, acquired, running containers |
| for (RMContainer rmContainer : attempt.getLiveContainers()) { |
| if (keepContainers |
| && rmContainer.getState().equals(RMContainerState.RUNNING)) { |
| // do not kill the running container in the case of work-preserving AM |
| // restart. |
| LOG.info("Skip killing " + rmContainer.getContainerId()); |
| continue; |
| } |
| super.completedContainer( |
| rmContainer, |
| SchedulerUtils.createAbnormalContainerStatus( |
| rmContainer.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION), |
| RMContainerEventType.KILL); |
| } |
| |
| // Release all reserved containers |
| for (RMContainer rmContainer : attempt.getReservedContainers()) { |
| super.completedContainer( |
| rmContainer, |
| SchedulerUtils.createAbnormalContainerStatus( |
| rmContainer.getContainerId(), "Application Complete"), |
| RMContainerEventType.KILL); |
| } |
| |
| // Clean up pending requests, metrics etc. |
| attempt.stop(rmAppAttemptFinalState); |
| |
| // Inform the queue |
| String queueName = attempt.getQueue().getQueueName(); |
| CSQueue queue = queues.get(queueName); |
| if (!(queue instanceof LeafQueue)) { |
| LOG.error("Cannot finish application " + "from non-leaf queue: " |
| + queueName); |
| } else { |
| queue.finishApplicationAttempt(attempt, queue.getQueueName()); |
| } |
| } |
| |
| // It is crucial to acquire leaf queue lock first to prevent: |
| // 1. Race condition when calculating the delta resource in |
| // SchedContainerChangeRequest |
| // 2. Deadlock with the scheduling thread. |
| private LeafQueue updateIncreaseRequests( |
| List<ContainerResourceChangeRequest> increaseRequests, |
| FiCaSchedulerApp app) { |
| if (null == increaseRequests || increaseRequests.isEmpty()) { |
| return null; |
| } |
| // Pre-process increase requests |
| List<SchedContainerChangeRequest> schedIncreaseRequests = |
| createSchedContainerChangeRequests(increaseRequests, true); |
| LeafQueue leafQueue = (LeafQueue) app.getQueue(); |
| synchronized(leafQueue) { |
| // make sure we aren't stopping/removing the application |
| // when the allocate comes in |
| if (app.isStopped()) { |
| return null; |
| } |
| // Process increase resource requests |
| if (app.updateIncreaseRequests(schedIncreaseRequests)) { |
| return leafQueue; |
| } |
| return null; |
| } |
| } |
| |
| @Override |
| @Lock(Lock.NoLock.class) |
| public Allocation allocate(ApplicationAttemptId applicationAttemptId, |
| List<ResourceRequest> ask, List<ContainerId> release, |
| List<String> blacklistAdditions, List<String> blacklistRemovals, |
| List<ContainerResourceChangeRequest> increaseRequests, |
| List<ContainerResourceChangeRequest> decreaseRequests) { |
| |
| FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId); |
| if (application == null) { |
| return EMPTY_ALLOCATION; |
| } |
| |
| // Release containers |
| releaseContainers(release, application); |
| |
| // update increase requests |
| LeafQueue updateDemandForQueue = |
| updateIncreaseRequests(increaseRequests, application); |
| |
| // Decrease containers |
| decreaseContainers(decreaseRequests, application); |
| |
| // Sanity check for new allocation requests |
| SchedulerUtils.normalizeRequests( |
| ask, getResourceCalculator(), getClusterResource(), |
| getMinimumResourceCapability(), getMaximumResourceCapability()); |
| |
| Allocation allocation; |
| |
| synchronized (application) { |
| |
| // make sure we aren't stopping/removing the application |
| // when the allocate comes in |
| if (application.isStopped()) { |
| return EMPTY_ALLOCATION; |
| } |
| |
| // Process resource requests |
| if (!ask.isEmpty()) { |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("allocate: pre-update " + applicationAttemptId + |
| " ask size =" + ask.size()); |
| application.showRequests(); |
| } |
| |
| // Update application requests |
| if (application.updateResourceRequests(ask) |
| && (updateDemandForQueue == null)) { |
| updateDemandForQueue = (LeafQueue) application.getQueue(); |
| } |
| |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("allocate: post-update"); |
| application.showRequests(); |
| } |
| } |
| |
| if (application.isWaitingForAMContainer()) { |
| // Allocate is for AM and update AM blacklist for this |
| application.updateAMBlacklist( |
| blacklistAdditions, blacklistRemovals); |
| } else { |
| application.updateBlacklist(blacklistAdditions, blacklistRemovals); |
| } |
| |
| |
| allocation = application.getAllocation(getResourceCalculator(), |
| getClusterResource(), getMinimumResourceCapability()); |
| } |
| |
| if (updateDemandForQueue != null && !application |
| .isWaitingForAMContainer()) { |
| updateDemandForQueue.getOrderingPolicy().demandUpdated(application); |
| } |
| |
| return allocation; |
| |
| } |
| |
| @Override |
| @Lock(Lock.NoLock.class) |
| public QueueInfo getQueueInfo(String queueName, |
| boolean includeChildQueues, boolean recursive) |
| throws IOException { |
| CSQueue queue = null; |
| queue = this.queues.get(queueName); |
| if (queue == null) { |
| throw new IOException("Unknown queue: " + queueName); |
| } |
| return queue.getQueueInfo(includeChildQueues, recursive); |
| } |
| |
| @Override |
| @Lock(Lock.NoLock.class) |
| public List<QueueUserACLInfo> getQueueUserAclInfo() { |
| UserGroupInformation user = null; |
| try { |
| user = UserGroupInformation.getCurrentUser(); |
| } catch (IOException ioe) { |
| // should never happen |
| return new ArrayList<QueueUserACLInfo>(); |
| } |
| |
| return root.getQueueUserAclInfo(user); |
| } |
| |
| private synchronized void nodeUpdate(RMNode nm) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("nodeUpdate: " + nm + |
| " clusterResources: " + getClusterResource()); |
| } |
| |
| Resource releaseResources = Resource.newInstance(0, 0); |
| |
| FiCaSchedulerNode node = getNode(nm.getNodeID()); |
| |
| List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates(); |
| List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>(); |
| List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>(); |
| for(UpdatedContainerInfo containerInfo : containerInfoList) { |
| newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers()); |
| completedContainers.addAll(containerInfo.getCompletedContainers()); |
| } |
| |
| // Processing the newly launched containers |
| for (ContainerStatus launchedContainer : newlyLaunchedContainers) { |
| containerLaunchedOnNode(launchedContainer.getContainerId(), node); |
| } |
| |
| // Processing the newly increased containers |
| List<Container> newlyIncreasedContainers = |
| nm.pullNewlyIncreasedContainers(); |
| for (Container container : newlyIncreasedContainers) { |
| containerIncreasedOnNode(container.getId(), node, container); |
| } |
| |
| // Process completed containers |
| int releasedContainers = 0; |
| for (ContainerStatus completedContainer : completedContainers) { |
| ContainerId containerId = completedContainer.getContainerId(); |
| RMContainer container = getRMContainer(containerId); |
| super.completedContainer(container, completedContainer, |
| RMContainerEventType.FINISHED); |
| if (container != null) { |
| releasedContainers++; |
| Resource rs = container.getAllocatedResource(); |
| if (rs != null) { |
| Resources.addTo(releaseResources, rs); |
| } |
| rs = container.getReservedResource(); |
| if (rs != null) { |
| Resources.addTo(releaseResources, rs); |
| } |
| } |
| } |
| |
| // If the node is decommissioning, send an update to have the total |
| // resource equal to the used resource, so no available resource to |
| // schedule. |
| // TODO: Fix possible race-condition when request comes in before |
| // update is propagated |
| if (nm.getState() == NodeState.DECOMMISSIONING) { |
| this.rmContext |
| .getDispatcher() |
| .getEventHandler() |
| .handle( |
| new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption |
| .newInstance(getSchedulerNode(nm.getNodeID()) |
| .getAllocatedResource(), 0))); |
| } |
| schedulerHealth.updateSchedulerReleaseDetails(lastNodeUpdateTime, |
| releaseResources); |
| schedulerHealth.updateSchedulerReleaseCounts(releasedContainers); |
| |
| // Updating node resource utilization |
| node.setAggregatedContainersUtilization( |
| nm.getAggregatedContainersUtilization()); |
| node.setNodeUtilization(nm.getNodeUtilization()); |
| |
| // Now node data structures are upto date and ready for scheduling. |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("Node being looked for scheduling " + nm + |
| " availableResource: " + node.getUnallocatedResource()); |
| } |
| } |
| |
| /** |
| * Process resource update on a node. |
| */ |
| private synchronized void updateNodeAndQueueResource(RMNode nm, |
| ResourceOption resourceOption) { |
| updateNodeResource(nm, resourceOption); |
| Resource clusterResource = getClusterResource(); |
| root.updateClusterResource(clusterResource, new ResourceLimits( |
| clusterResource)); |
| } |
| |
| /** |
| * Process node labels update on a node. |
| */ |
| private synchronized void updateLabelsOnNode(NodeId nodeId, |
| Set<String> newLabels) { |
| FiCaSchedulerNode node = nodeTracker.getNode(nodeId); |
| if (null == node) { |
| return; |
| } |
| |
| // Get new partition, we have only one partition per node |
| String newPartition; |
| if (newLabels.isEmpty()) { |
| newPartition = RMNodeLabelsManager.NO_LABEL; |
| } else { |
| newPartition = newLabels.iterator().next(); |
| } |
| |
| // old partition as well |
| String oldPartition = node.getPartition(); |
| |
| // Update resources of these containers |
| for (RMContainer rmContainer : node.getRunningContainers()) { |
| FiCaSchedulerApp application = |
| getApplicationAttempt(rmContainer.getApplicationAttemptId()); |
| if (null != application) { |
| application.nodePartitionUpdated(rmContainer, oldPartition, |
| newPartition); |
| } else { |
| LOG.warn("There's something wrong, some RMContainers running on" |
| + " a node, but we cannot find SchedulerApplicationAttempt for it. Node=" |
| + node.getNodeID() + " applicationAttemptId=" |
| + rmContainer.getApplicationAttemptId()); |
| continue; |
| } |
| } |
| |
| // Unreserve container on this node |
| RMContainer reservedContainer = node.getReservedContainer(); |
| if (null != reservedContainer) { |
| killReservedContainer(reservedContainer); |
| } |
| |
| // Update node labels after we've done this |
| node.updateLabels(newLabels); |
| } |
| |
| private void updateSchedulerHealth(long now, FiCaSchedulerNode node, |
| CSAssignment assignment) { |
| |
| NodeId nodeId = node.getNodeID(); |
| List<AssignmentInformation.AssignmentDetails> allocations = |
| assignment.getAssignmentInformation().getAllocationDetails(); |
| List<AssignmentInformation.AssignmentDetails> reservations = |
| assignment.getAssignmentInformation().getReservationDetails(); |
| if (!allocations.isEmpty()) { |
| ContainerId allocatedContainerId = |
| allocations.get(allocations.size() - 1).containerId; |
| String allocatedQueue = allocations.get(allocations.size() - 1).queue; |
| schedulerHealth.updateAllocation(now, nodeId, allocatedContainerId, |
| allocatedQueue); |
| } |
| if (!reservations.isEmpty()) { |
| ContainerId reservedContainerId = |
| reservations.get(reservations.size() - 1).containerId; |
| String reservedQueue = reservations.get(reservations.size() - 1).queue; |
| schedulerHealth.updateReservation(now, nodeId, reservedContainerId, |
| reservedQueue); |
| } |
| schedulerHealth.updateSchedulerReservationCounts(assignment |
| .getAssignmentInformation().getNumReservations()); |
| schedulerHealth.updateSchedulerAllocationCounts(assignment |
| .getAssignmentInformation().getNumAllocations()); |
| schedulerHealth.updateSchedulerRunDetails(now, assignment |
| .getAssignmentInformation().getAllocated(), assignment |
| .getAssignmentInformation().getReserved()); |
| } |
| |
| @VisibleForTesting |
| protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) { |
| if (rmContext.isWorkPreservingRecoveryEnabled() |
| && !rmContext.isSchedulerReadyForAllocatingContainers()) { |
| return; |
| } |
| // reset allocation and reservation stats before we start doing any work |
| updateSchedulerHealth(lastNodeUpdateTime, node, |
| new CSAssignment(Resources.none(), NodeType.NODE_LOCAL)); |
| |
| CSAssignment assignment; |
| |
| // Assign new containers... |
| // 1. Check for reserved applications |
| // 2. Schedule if there are no reservations |
| |
| RMContainer reservedContainer = node.getReservedContainer(); |
| if (reservedContainer != null) { |
| FiCaSchedulerApp reservedApplication = |
| getCurrentAttemptForContainer(reservedContainer.getContainerId()); |
| |
| // Try to fulfill the reservation |
| LOG.info("Trying to fulfill reservation for application " |
| + reservedApplication.getApplicationId() + " on node: " |
| + node.getNodeID()); |
| |
| LeafQueue queue = ((LeafQueue) reservedApplication.getQueue()); |
| assignment = |
| queue.assignContainers( |
| getClusterResource(), |
| node, |
| // TODO, now we only consider limits for parent for non-labeled |
| // resources, should consider labeled resources as well. |
| new ResourceLimits(labelManager.getResourceByLabel( |
| RMNodeLabelsManager.NO_LABEL, getClusterResource())), |
| SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); |
| if (assignment.isFulfilledReservation()) { |
| CSAssignment tmp = |
| new CSAssignment(reservedContainer.getReservedResource(), |
| assignment.getType()); |
| Resources.addTo(assignment.getAssignmentInformation().getAllocated(), |
| reservedContainer.getReservedResource()); |
| tmp.getAssignmentInformation().addAllocationDetails( |
| reservedContainer.getContainerId(), queue.getQueuePath()); |
| tmp.getAssignmentInformation().incrAllocations(); |
| updateSchedulerHealth(lastNodeUpdateTime, node, tmp); |
| schedulerHealth.updateSchedulerFulfilledReservationCounts(1); |
| } |
| } |
| |
| // Try to schedule more if there are no reservations to fulfill |
| if (node.getReservedContainer() == null) { |
| if (calculator.computeAvailableContainers(node.getUnallocatedResource(), |
| minimumAllocation) > 0) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Trying to schedule on node: " + node.getNodeName() + |
| ", available: " + node.getUnallocatedResource()); |
| } |
| |
| assignment = root.assignContainers( |
| getClusterResource(), |
| node, |
| // TODO, now we only consider limits for parent for non-labeled |
| // resources, should consider labeled resources as well. |
| new ResourceLimits(labelManager.getResourceByLabel( |
| RMNodeLabelsManager.NO_LABEL, getClusterResource())), |
| SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); |
| if (Resources.greaterThan(calculator, getClusterResource(), |
| assignment.getResource(), Resources.none())) { |
| updateSchedulerHealth(lastNodeUpdateTime, node, assignment); |
| return; |
| } |
| |
| // Only do non-exclusive allocation when node has node-labels. |
| if (StringUtils.equals(node.getPartition(), |
| RMNodeLabelsManager.NO_LABEL)) { |
| return; |
| } |
| |
| // Only do non-exclusive allocation when the node-label supports that |
| try { |
| if (rmContext.getNodeLabelManager().isExclusiveNodeLabel( |
| node.getPartition())) { |
| return; |
| } |
| } catch (IOException e) { |
| LOG.warn("Exception when trying to get exclusivity of node label=" |
| + node.getPartition(), e); |
| return; |
| } |
| |
| // Try to use NON_EXCLUSIVE |
| assignment = root.assignContainers( |
| getClusterResource(), |
| node, |
| // TODO, now we only consider limits for parent for non-labeled |
| // resources, should consider labeled resources as well. |
| new ResourceLimits(labelManager.getResourceByLabel( |
| RMNodeLabelsManager.NO_LABEL, getClusterResource())), |
| SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY); |
| updateSchedulerHealth(lastNodeUpdateTime, node, assignment); |
| } |
| } else { |
| LOG.info("Skipping scheduling since node " |
| + node.getNodeID() |
| + " is reserved by application " |
| + node.getReservedContainer().getContainerId() |
| .getApplicationAttemptId()); |
| } |
| } |
| |
| @Override |
| public void handle(SchedulerEvent event) { |
| switch(event.getType()) { |
| case NODE_ADDED: |
| { |
| NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event; |
| addNode(nodeAddedEvent.getAddedRMNode()); |
| recoverContainersOnNode(nodeAddedEvent.getContainerReports(), |
| nodeAddedEvent.getAddedRMNode()); |
| } |
| break; |
| case NODE_REMOVED: |
| { |
| NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event; |
| removeNode(nodeRemovedEvent.getRemovedRMNode()); |
| } |
| break; |
| case NODE_RESOURCE_UPDATE: |
| { |
| NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = |
| (NodeResourceUpdateSchedulerEvent)event; |
| updateNodeAndQueueResource(nodeResourceUpdatedEvent.getRMNode(), |
| nodeResourceUpdatedEvent.getResourceOption()); |
| } |
| break; |
| case NODE_LABELS_UPDATE: |
| { |
| NodeLabelsUpdateSchedulerEvent labelUpdateEvent = |
| (NodeLabelsUpdateSchedulerEvent) event; |
| |
| for (Entry<NodeId, Set<String>> entry : labelUpdateEvent |
| .getUpdatedNodeToLabels().entrySet()) { |
| NodeId id = entry.getKey(); |
| Set<String> labels = entry.getValue(); |
| updateLabelsOnNode(id, labels); |
| } |
| } |
| break; |
| case NODE_UPDATE: |
| { |
| NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; |
| RMNode node = nodeUpdatedEvent.getRMNode(); |
| setLastNodeUpdateTime(Time.now()); |
| nodeUpdate(node); |
| if (!scheduleAsynchronously) { |
| allocateContainersToNode(getNode(node.getNodeID())); |
| } |
| } |
| break; |
| case APP_ADDED: |
| { |
| AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; |
| String queueName = resolveReservationQueueName(appAddedEvent.getQueue(), |
| appAddedEvent.getApplicationId(), appAddedEvent.getReservationID(), |
| appAddedEvent.getIsAppRecovering()); |
| if (queueName != null) { |
| if (!appAddedEvent.getIsAppRecovering()) { |
| addApplication(appAddedEvent.getApplicationId(), queueName, |
| appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority()); |
| } else { |
| addApplicationOnRecovery(appAddedEvent.getApplicationId(), queueName, |
| appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority()); |
| } |
| } |
| } |
| break; |
| case APP_REMOVED: |
| { |
| AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event; |
| doneApplication(appRemovedEvent.getApplicationID(), |
| appRemovedEvent.getFinalState()); |
| } |
| break; |
| case APP_ATTEMPT_ADDED: |
| { |
| AppAttemptAddedSchedulerEvent appAttemptAddedEvent = |
| (AppAttemptAddedSchedulerEvent) event; |
| addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), |
| appAttemptAddedEvent.getTransferStateFromPreviousAttempt(), |
| appAttemptAddedEvent.getIsAttemptRecovering()); |
| } |
| break; |
| case APP_ATTEMPT_REMOVED: |
| { |
| AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent = |
| (AppAttemptRemovedSchedulerEvent) event; |
| doneApplicationAttempt(appAttemptRemovedEvent.getApplicationAttemptID(), |
| appAttemptRemovedEvent.getFinalAttemptState(), |
| appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts()); |
| } |
| break; |
| case CONTAINER_EXPIRED: |
| { |
| ContainerExpiredSchedulerEvent containerExpiredEvent = |
| (ContainerExpiredSchedulerEvent) event; |
| ContainerId containerId = containerExpiredEvent.getContainerId(); |
| if (containerExpiredEvent.isIncrease()) { |
| rollbackContainerResource(containerId); |
| } else { |
| completedContainer(getRMContainer(containerId), |
| SchedulerUtils.createAbnormalContainerStatus( |
| containerId, |
| SchedulerUtils.EXPIRED_CONTAINER), |
| RMContainerEventType.EXPIRE); |
| } |
| } |
| break; |
| case KILL_RESERVED_CONTAINER: |
| { |
| ContainerPreemptEvent killReservedContainerEvent = |
| (ContainerPreemptEvent) event; |
| RMContainer container = killReservedContainerEvent.getContainer(); |
| killReservedContainer(container); |
| } |
| break; |
| case MARK_CONTAINER_FOR_PREEMPTION: |
| { |
| ContainerPreemptEvent preemptContainerEvent = |
| (ContainerPreemptEvent)event; |
| ApplicationAttemptId aid = preemptContainerEvent.getAppId(); |
| RMContainer containerToBePreempted = preemptContainerEvent.getContainer(); |
| markContainerForPreemption(aid, containerToBePreempted); |
| } |
| break; |
| case KILL_PREEMPTED_CONTAINER: |
| { |
| ContainerPreemptEvent killContainerEvent = (ContainerPreemptEvent)event; |
| RMContainer containerToBeKilled = killContainerEvent.getContainer(); |
| killPreemptedContainer(containerToBeKilled); |
| } |
| break; |
| default: |
| LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); |
| } |
| } |
| |
| private synchronized void addNode(RMNode nodeManager) { |
| FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager, |
| usePortForNodeName, nodeManager.getNodeLabels()); |
| nodeTracker.addNode(schedulerNode); |
| |
| // update this node to node label manager |
| if (labelManager != null) { |
| labelManager.activateNode(nodeManager.getNodeID(), |
| schedulerNode.getTotalResource()); |
| } |
| |
| Resource clusterResource = getClusterResource(); |
| root.updateClusterResource(clusterResource, new ResourceLimits( |
| clusterResource)); |
| |
| LOG.info("Added node " + nodeManager.getNodeAddress() + |
| " clusterResource: " + clusterResource); |
| |
| if (scheduleAsynchronously && getNumClusterNodes() == 1) { |
| asyncSchedulerThread.beginSchedule(); |
| } |
| } |
| |
| private synchronized void removeNode(RMNode nodeInfo) { |
| // update this node to node label manager |
| if (labelManager != null) { |
| labelManager.deactivateNode(nodeInfo.getNodeID()); |
| } |
| |
| NodeId nodeId = nodeInfo.getNodeID(); |
| FiCaSchedulerNode node = nodeTracker.getNode(nodeId); |
| if (node == null) { |
| LOG.error("Attempting to remove non-existent node " + nodeId); |
| return; |
| } |
| |
| // Remove running containers |
| List<RMContainer> runningContainers = node.getRunningContainers(); |
| for (RMContainer container : runningContainers) { |
| super.completedContainer(container, |
| SchedulerUtils.createAbnormalContainerStatus( |
| container.getContainerId(), |
| SchedulerUtils.LOST_CONTAINER), |
| RMContainerEventType.KILL); |
| } |
| |
| // Remove reservations, if any |
| RMContainer reservedContainer = node.getReservedContainer(); |
| if (reservedContainer != null) { |
| super.completedContainer(reservedContainer, |
| SchedulerUtils.createAbnormalContainerStatus( |
| reservedContainer.getContainerId(), |
| SchedulerUtils.LOST_CONTAINER), |
| RMContainerEventType.KILL); |
| } |
| |
| nodeTracker.removeNode(nodeId); |
| Resource clusterResource = getClusterResource(); |
| root.updateClusterResource(clusterResource, new ResourceLimits( |
| clusterResource)); |
| int numNodes = nodeTracker.nodeCount(); |
| |
| if (scheduleAsynchronously && numNodes == 0) { |
| asyncSchedulerThread.suspendSchedule(); |
| } |
| |
| LOG.info("Removed node " + nodeInfo.getNodeAddress() + |
| " clusterResource: " + getClusterResource()); |
| } |
| |
| private void rollbackContainerResource( |
| ContainerId containerId) { |
| RMContainer rmContainer = getRMContainer(containerId); |
| if (rmContainer == null) { |
| LOG.info("Cannot rollback resource for container " + containerId + |
| ". The container does not exist."); |
| return; |
| } |
| FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId); |
| if (application == null) { |
| LOG.info("Cannot rollback resource for container " + containerId + |
| ". The application that the container belongs to does not exist."); |
| return; |
| } |
| LOG.info("Roll back resource for container " + containerId); |
| LeafQueue leafQueue = (LeafQueue) application.getQueue(); |
| synchronized(leafQueue) { |
| SchedulerNode schedulerNode = |
| getSchedulerNode(rmContainer.getAllocatedNode()); |
| SchedContainerChangeRequest decreaseRequest = |
| new SchedContainerChangeRequest(this.rmContext, schedulerNode, |
| rmContainer, rmContainer.getLastConfirmedResource()); |
| decreaseContainer(decreaseRequest, application); |
| } |
| } |
| |
| @Override |
| protected void completedContainerInternal( |
| RMContainer rmContainer, ContainerStatus containerStatus, |
| RMContainerEventType event) { |
| |
| Container container = rmContainer.getContainer(); |
| |
| // Get the application for the finished container |
| FiCaSchedulerApp application = |
| getCurrentAttemptForContainer(container.getId()); |
| ApplicationId appId = |
| container.getId().getApplicationAttemptId().getApplicationId(); |
| if (application == null) { |
| LOG.info("Container " + container + " of" + " finished application " |
| + appId + " completed with event " + event); |
| return; |
| } |
| |
| // Get the node on which the container was allocated |
| FiCaSchedulerNode node = getNode(container.getNodeId()); |
| |
| // Inform the queue |
| LeafQueue queue = (LeafQueue)application.getQueue(); |
| queue.completedContainer(getClusterResource(), application, node, |
| rmContainer, containerStatus, event, null, true); |
| |
| if (containerStatus.getExitStatus() == ContainerExitStatus.PREEMPTED) { |
| schedulerHealth.updatePreemption(Time.now(), container.getNodeId(), |
| container.getId(), queue.getQueuePath()); |
| schedulerHealth.updateSchedulerPreemptionCounts(1); |
| } else { |
| schedulerHealth.updateRelease(lastNodeUpdateTime, container.getNodeId(), |
| container.getId(), queue.getQueuePath()); |
| } |
| } |
| |
| @Override |
| protected void decreaseContainer(SchedContainerChangeRequest decreaseRequest, |
| SchedulerApplicationAttempt attempt) { |
| RMContainer rmContainer = decreaseRequest.getRMContainer(); |
| // Check container status before doing decrease |
| if (rmContainer.getState() != RMContainerState.RUNNING) { |
| LOG.info("Trying to decrease a container not in RUNNING state, container=" |
| + rmContainer + " state=" + rmContainer.getState().name()); |
| return; |
| } |
| FiCaSchedulerApp app = (FiCaSchedulerApp)attempt; |
| LeafQueue queue = (LeafQueue) attempt.getQueue(); |
| try { |
| queue.decreaseContainer(getClusterResource(), decreaseRequest, app); |
| // Notify RMNode that the container can be pulled by NodeManager in the |
| // next heartbeat |
| this.rmContext.getDispatcher().getEventHandler() |
| .handle(new RMNodeDecreaseContainerEvent( |
| decreaseRequest.getNodeId(), |
| Collections.singletonList(rmContainer.getContainer()))); |
| } catch (InvalidResourceRequestException e) { |
| LOG.warn("Error happens when checking decrease request, Ignoring.." |
| + " exception=", e); |
| } |
| } |
| |
| @Lock(Lock.NoLock.class) |
| @VisibleForTesting |
| @Override |
| public FiCaSchedulerApp getApplicationAttempt( |
| ApplicationAttemptId applicationAttemptId) { |
| return super.getApplicationAttempt(applicationAttemptId); |
| } |
| |
| @Lock(Lock.NoLock.class) |
| public FiCaSchedulerNode getNode(NodeId nodeId) { |
| return nodeTracker.getNode(nodeId); |
| } |
| |
| @Override |
| @Lock(Lock.NoLock.class) |
| public void recover(RMState state) throws Exception { |
| // NOT IMPLEMENTED |
| } |
| |
| @Override |
| public void killReservedContainer(RMContainer container) { |
| if(LOG.isDebugEnabled()){ |
| LOG.debug(SchedulerEventType.KILL_RESERVED_CONTAINER + ":" |
| + container.toString()); |
| } |
| // To think: What happens if this is no longer a reserved container, for |
| // e.g if the reservation became an allocation. |
| super.completedContainer(container, |
| SchedulerUtils.createAbnormalContainerStatus( |
| container.getContainerId(), |
| SchedulerUtils.UNRESERVED_CONTAINER), |
| RMContainerEventType.KILL); |
| } |
| |
| @Override |
| public void markContainerForPreemption(ApplicationAttemptId aid, |
| RMContainer cont) { |
| if(LOG.isDebugEnabled()){ |
| LOG.debug(SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION |
| + ": appAttempt:" + aid.toString() + " container: " |
| + cont.toString()); |
| } |
| FiCaSchedulerApp app = getApplicationAttempt(aid); |
| if (app != null) { |
| app.markContainerForPreemption(cont.getContainerId()); |
| } |
| } |
| |
| @Override |
| public void killPreemptedContainer(RMContainer cont) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(SchedulerEventType.KILL_PREEMPTED_CONTAINER + ": container" |
| + cont.toString()); |
| } |
| super.completedContainer(cont, SchedulerUtils |
| .createPreemptedContainerStatus(cont.getContainerId(), |
| SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL); |
| } |
| |
| @Override |
| public synchronized boolean checkAccess(UserGroupInformation callerUGI, |
| QueueACL acl, String queueName) { |
| CSQueue queue = getQueue(queueName); |
| if (queue == null) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("ACL not found for queue access-type " + acl |
| + " for queue " + queueName); |
| } |
| return false; |
| } |
| return queue.hasAccess(acl, callerUGI); |
| } |
| |
| @Override |
| public List<ApplicationAttemptId> getAppsInQueue(String queueName) { |
| CSQueue queue = queues.get(queueName); |
| if (queue == null) { |
| return null; |
| } |
| List<ApplicationAttemptId> apps = new ArrayList<ApplicationAttemptId>(); |
| queue.collectSchedulerApplications(apps); |
| return apps; |
| } |
| |
| private CapacitySchedulerConfiguration loadCapacitySchedulerConfiguration( |
| Configuration configuration) throws IOException { |
| try { |
| InputStream CSInputStream = |
| this.rmContext.getConfigurationProvider() |
| .getConfigurationInputStream(configuration, |
| YarnConfiguration.CS_CONFIGURATION_FILE); |
| if (CSInputStream != null) { |
| configuration.addResource(CSInputStream); |
| return new CapacitySchedulerConfiguration(configuration, false); |
| } |
| return new CapacitySchedulerConfiguration(configuration, true); |
| } catch (Exception e) { |
| throw new IOException(e); |
| } |
| } |
| |
| private String getDefaultReservationQueueName(String planQueueName) { |
| return planQueueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX; |
| } |
| |
| private synchronized String resolveReservationQueueName(String queueName, |
| ApplicationId applicationId, ReservationId reservationID, |
| boolean isRecovering) { |
| CSQueue queue = getQueue(queueName); |
| // Check if the queue is a plan queue |
| if ((queue == null) || !(queue instanceof PlanQueue)) { |
| return queueName; |
| } |
| if (reservationID != null) { |
| String resQName = reservationID.toString(); |
| queue = getQueue(resQName); |
| if (queue == null) { |
| // reservation has terminated during failover |
| if (isRecovering |
| && conf.getMoveOnExpiry(getQueue(queueName).getQueuePath())) { |
| // move to the default child queue of the plan |
| return getDefaultReservationQueueName(queueName); |
| } |
| String message = |
| "Application " + applicationId |
| + " submitted to a reservation which is not currently active: " |
| + resQName; |
| this.rmContext.getDispatcher().getEventHandler() |
| .handle(new RMAppEvent(applicationId, |
| RMAppEventType.APP_REJECTED, message)); |
| return null; |
| } |
| if (!queue.getParent().getQueueName().equals(queueName)) { |
| String message = |
| "Application: " + applicationId + " submitted to a reservation " |
| + resQName + " which does not belong to the specified queue: " |
| + queueName; |
| this.rmContext.getDispatcher().getEventHandler() |
| .handle(new RMAppEvent(applicationId, |
| RMAppEventType.APP_REJECTED, message)); |
| return null; |
| } |
| // use the reservation queue to run the app |
| queueName = resQName; |
| } else { |
| // use the default child queue of the plan for unreserved apps |
| queueName = getDefaultReservationQueueName(queueName); |
| } |
| return queueName; |
| } |
| |
| @Override |
| public synchronized void removeQueue(String queueName) |
| throws SchedulerDynamicEditException { |
| LOG.info("Removing queue: " + queueName); |
| CSQueue q = this.getQueue(queueName); |
| if (!(q instanceof ReservationQueue)) { |
| throw new SchedulerDynamicEditException("The queue that we are asked " |
| + "to remove (" + queueName + ") is not a ReservationQueue"); |
| } |
| ReservationQueue disposableLeafQueue = (ReservationQueue) q; |
| // at this point we should have no more apps |
| if (disposableLeafQueue.getNumApplications() > 0) { |
| throw new SchedulerDynamicEditException("The queue " + queueName |
| + " is not empty " + disposableLeafQueue.getApplications().size() |
| + " active apps " + disposableLeafQueue.getPendingApplications().size() |
| + " pending apps"); |
| } |
| |
| ((PlanQueue) disposableLeafQueue.getParent()).removeChildQueue(q); |
| this.queues.remove(queueName); |
| LOG.info("Removal of ReservationQueue " + queueName + " has succeeded"); |
| } |
| |
| @Override |
| public synchronized void addQueue(Queue queue) |
| throws SchedulerDynamicEditException { |
| |
| if (!(queue instanceof ReservationQueue)) { |
| throw new SchedulerDynamicEditException("Queue " + queue.getQueueName() |
| + " is not a ReservationQueue"); |
| } |
| |
| ReservationQueue newQueue = (ReservationQueue) queue; |
| |
| if (newQueue.getParent() == null |
| || !(newQueue.getParent() instanceof PlanQueue)) { |
| throw new SchedulerDynamicEditException("ParentQueue for " |
| + newQueue.getQueueName() |
| + " is not properly set (should be set and be a PlanQueue)"); |
| } |
| |
| PlanQueue parentPlan = (PlanQueue) newQueue.getParent(); |
| String queuename = newQueue.getQueueName(); |
| parentPlan.addChildQueue(newQueue); |
| this.queues.put(queuename, newQueue); |
| LOG.info("Creation of ReservationQueue " + newQueue + " succeeded"); |
| } |
| |
| @Override |
| public synchronized void setEntitlement(String inQueue, |
| QueueEntitlement entitlement) throws SchedulerDynamicEditException, |
| YarnException { |
| LeafQueue queue = getAndCheckLeafQueue(inQueue); |
| ParentQueue parent = (ParentQueue) queue.getParent(); |
| |
| if (!(queue instanceof ReservationQueue)) { |
| throw new SchedulerDynamicEditException("Entitlement can not be" |
| + " modified dynamically since queue " + inQueue |
| + " is not a ReservationQueue"); |
| } |
| |
| if (!(parent instanceof PlanQueue)) { |
| throw new SchedulerDynamicEditException("The parent of ReservationQueue " |
| + inQueue + " must be an PlanQueue"); |
| } |
| |
| ReservationQueue newQueue = (ReservationQueue) queue; |
| |
| float sumChilds = ((PlanQueue) parent).sumOfChildCapacities(); |
| float newChildCap = sumChilds - queue.getCapacity() + entitlement.getCapacity(); |
| |
| if (newChildCap >= 0 && newChildCap < 1.0f + CSQueueUtils.EPSILON) { |
| // note: epsilon checks here are not ok, as the epsilons might accumulate |
| // and become a problem in aggregate |
| if (Math.abs(entitlement.getCapacity() - queue.getCapacity()) == 0 |
| && Math.abs(entitlement.getMaxCapacity() - queue.getMaximumCapacity()) == 0) { |
| return; |
| } |
| newQueue.setEntitlement(entitlement); |
| } else { |
| throw new SchedulerDynamicEditException( |
| "Sum of child queues would exceed 100% for PlanQueue: " |
| + parent.getQueueName()); |
| } |
| LOG.info("Set entitlement for ReservationQueue " + inQueue + " to " |
| + queue.getCapacity() + " request was (" + entitlement.getCapacity() + ")"); |
| } |
| |
| @Override |
| public synchronized String moveApplication(ApplicationId appId, |
| String targetQueueName) throws YarnException { |
| FiCaSchedulerApp app = |
| getApplicationAttempt(ApplicationAttemptId.newInstance(appId, 0)); |
| String sourceQueueName = app.getQueue().getQueueName(); |
| LeafQueue source = getAndCheckLeafQueue(sourceQueueName); |
| String destQueueName = handleMoveToPlanQueue(targetQueueName); |
| LeafQueue dest = getAndCheckLeafQueue(destQueueName); |
| // Validation check - ACLs, submission limits for user & queue |
| String user = app.getUser(); |
| try { |
| dest.submitApplication(appId, user, destQueueName); |
| } catch (AccessControlException e) { |
| throw new YarnException(e); |
| } |
| // Move all live containers |
| for (RMContainer rmContainer : app.getLiveContainers()) { |
| source.detachContainer(getClusterResource(), app, rmContainer); |
| // attach the Container to another queue |
| dest.attachContainer(getClusterResource(), app, rmContainer); |
| } |
| // Detach the application.. |
| source.finishApplicationAttempt(app, sourceQueueName); |
| source.getParent().finishApplication(appId, app.getUser()); |
| // Finish app & update metrics |
| app.move(dest); |
| // Submit to a new queue |
| dest.submitApplicationAttempt(app, user); |
| applications.get(appId).setQueue(dest); |
| LOG.info("App: " + app.getApplicationId() + " successfully moved from " |
| + sourceQueueName + " to: " + destQueueName); |
| return targetQueueName; |
| } |
| |
| /** |
| * Check that the String provided in input is the name of an existing, |
| * LeafQueue, if successful returns the queue. |
| * |
| * @param queue |
| * @return the LeafQueue |
| * @throws YarnException |
| */ |
| private LeafQueue getAndCheckLeafQueue(String queue) throws YarnException { |
| CSQueue ret = this.getQueue(queue); |
| if (ret == null) { |
| throw new YarnException("The specified Queue: " + queue |
| + " doesn't exist"); |
| } |
| if (!(ret instanceof LeafQueue)) { |
| throw new YarnException("The specified Queue: " + queue |
| + " is not a Leaf Queue. Move is supported only for Leaf Queues."); |
| } |
| return (LeafQueue) ret; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes() { |
| if (calculator.getClass().getName() |
| .equals(DefaultResourceCalculator.class.getName())) { |
| return EnumSet.of(SchedulerResourceTypes.MEMORY); |
| } |
| return EnumSet.of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU); |
| } |
| |
| @Override |
| public Resource getMaximumResourceCapability(String queueName) { |
| CSQueue queue = getQueue(queueName); |
| if (queue == null) { |
| LOG.error("Unknown queue: " + queueName); |
| return getMaximumResourceCapability(); |
| } |
| if (!(queue instanceof LeafQueue)) { |
| LOG.error("queue " + queueName + " is not an leaf queue"); |
| return getMaximumResourceCapability(); |
| } |
| return ((LeafQueue)queue).getMaximumAllocation(); |
| } |
| |
| private String handleMoveToPlanQueue(String targetQueueName) { |
| CSQueue dest = getQueue(targetQueueName); |
| if (dest != null && dest instanceof PlanQueue) { |
| // use the default child reservation queue of the plan |
| targetQueueName = targetQueueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX; |
| } |
| return targetQueueName; |
| } |
| |
| @Override |
| public Set<String> getPlanQueues() { |
| Set<String> ret = new HashSet<String>(); |
| for (Map.Entry<String, CSQueue> l : queues.entrySet()) { |
| if (l.getValue() instanceof PlanQueue) { |
| ret.add(l.getKey()); |
| } |
| } |
| return ret; |
| } |
| |
| public SchedulerHealth getSchedulerHealth() { |
| return this.schedulerHealth; |
| } |
| |
| private void setLastNodeUpdateTime(long time) { |
| this.lastNodeUpdateTime = time; |
| } |
| |
| @Override |
| public Priority checkAndGetApplicationPriority(Priority priorityFromContext, |
| String user, String queueName, ApplicationId applicationId) |
| throws YarnException { |
| Priority appPriority = null; |
| |
| // ToDo: Verify against priority ACLs |
| |
| // Verify the scenario where priority is null from submissionContext. |
| if (null == priorityFromContext) { |
| // Get the default priority for the Queue. If Queue is non-existent, then |
| // use default priority |
| priorityFromContext = getDefaultPriorityForQueue(queueName); |
| |
| LOG.info("Application '" + applicationId |
| + "' is submitted without priority " |
| + "hence considering default queue/cluster priority: " |
| + priorityFromContext.getPriority()); |
| } |
| |
| // Verify whether submitted priority is lesser than max priority |
| // in the cluster. If it is out of found, defining a max cap. |
| if (priorityFromContext.compareTo(getMaxClusterLevelAppPriority()) < 0) { |
| priorityFromContext = Priority |
| .newInstance(getMaxClusterLevelAppPriority().getPriority()); |
| } |
| |
| appPriority = priorityFromContext; |
| |
| LOG.info("Priority '" + appPriority.getPriority() |
| + "' is acceptable in queue : " + queueName + " for application: " |
| + applicationId + " for the user: " + user); |
| |
| return appPriority; |
| } |
| |
| private Priority getDefaultPriorityForQueue(String queueName) { |
| Queue queue = getQueue(queueName); |
| if (null == queue || null == queue.getDefaultApplicationPriority()) { |
| // Return with default application priority |
| return Priority.newInstance(CapacitySchedulerConfiguration |
| .DEFAULT_CONFIGURATION_APPLICATION_PRIORITY); |
| } |
| |
| return Priority.newInstance(queue.getDefaultApplicationPriority() |
| .getPriority()); |
| } |
| |
| @Override |
| public void updateApplicationPriority(Priority newPriority, |
| ApplicationId applicationId) throws YarnException { |
| Priority appPriority = null; |
| SchedulerApplication<FiCaSchedulerApp> application = applications |
| .get(applicationId); |
| |
| if (application == null) { |
| throw new YarnException("Application '" + applicationId |
| + "' is not present, hence could not change priority."); |
| } |
| |
| RMApp rmApp = rmContext.getRMApps().get(applicationId); |
| appPriority = checkAndGetApplicationPriority(newPriority, rmApp.getUser(), |
| rmApp.getQueue(), applicationId); |
| |
| if (application.getPriority().equals(appPriority)) { |
| return; |
| } |
| |
| // Update new priority in Submission Context to keep track in HA |
| rmApp.getApplicationSubmissionContext().setPriority(appPriority); |
| |
| // Update to state store |
| ApplicationStateData appState = |
| ApplicationStateData.newInstance(rmApp.getSubmitTime(), |
| rmApp.getStartTime(), rmApp.getApplicationSubmissionContext(), |
| rmApp.getUser(), rmApp.getCallerContext()); |
| rmContext.getStateStore().updateApplicationStateSynchronously(appState, |
| false); |
| |
| // As we use iterator over a TreeSet for OrderingPolicy, once we change |
| // priority then reinsert back to make order correct. |
| LeafQueue queue = (LeafQueue) getQueue(rmApp.getQueue()); |
| synchronized (queue) { |
| queue.getOrderingPolicy().removeSchedulableEntity( |
| application.getCurrentAppAttempt()); |
| |
| // Update new priority in SchedulerApplication |
| application.setPriority(appPriority); |
| |
| queue.getOrderingPolicy().addSchedulableEntity( |
| application.getCurrentAppAttempt()); |
| } |
| |
| // Update the changed application state to timeline server |
| rmContext.getSystemMetricsPublisher().appUpdated(rmApp, |
| System.currentTimeMillis()); |
| |
| LOG.info("Priority '" + appPriority + "' is updated in queue :" |
| + rmApp.getQueue() + " for application: " + applicationId |
| + " for the user: " + rmApp.getUser()); |
| } |
| } |