| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; |
| |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor.ClusterNode; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor.LoadComparator; |
| |
| import java.util.List; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| /** |
| * This class interacts with the NodeQueueLoadMonitor to keep track of the |
| * mean and standard deviation of the configured metrics (queue length or queue |
| * wait time) used to characterize the queue load of a specific node. |
| * The NodeQueueLoadMonitor triggers an update (by calling the |
| * <code>update()</code> method) every time it performs a re-ordering of |
| * all nodes. |
| */ |
| public class QueueLimitCalculator { |
| |
| class Stats { |
| private final AtomicInteger mean = new AtomicInteger(0); |
| private final AtomicInteger stdev = new AtomicInteger(0); |
| |
| /** |
| * Not thread safe. Caller should synchronize on sorted nodes list. |
| */ |
| void update() { |
| List<NodeId> sortedNodes = nodeSelector.getSortedNodes(); |
| if (sortedNodes.size() > 0) { |
| // Calculate mean |
| int sum = 0; |
| for (NodeId n : sortedNodes) { |
| sum += getMetric(getNode(n)); |
| } |
| mean.set(sum / sortedNodes.size()); |
| |
| // Calculate stdev |
| int sqrSumMean = 0; |
| for (NodeId n : sortedNodes) { |
| int val = getMetric(getNode(n)); |
| sqrSumMean += Math.pow(val - mean.get(), 2); |
| } |
| stdev.set( |
| (int) Math.round(Math.sqrt( |
| sqrSumMean / (float) sortedNodes.size()))); |
| } |
| } |
| |
| private ClusterNode getNode(NodeId nId) { |
| return nodeSelector.getClusterNodes().get(nId); |
| } |
| |
| private int getMetric(ClusterNode cn) { |
| return (cn != null) ? ((LoadComparator)nodeSelector.getComparator()) |
| .getMetric(cn) : 0; |
| } |
| |
| public int getMean() { |
| return mean.get(); |
| } |
| |
| public int getStdev() { |
| return stdev.get(); |
| } |
| } |
| |
| private final NodeQueueLoadMonitor nodeSelector; |
| private final float sigma; |
| private final int rangeMin; |
| private final int rangeMax; |
| private final Stats stats = new Stats(); |
| |
| QueueLimitCalculator(NodeQueueLoadMonitor selector, float sigma, |
| int rangeMin, int rangeMax) { |
| this.nodeSelector = selector; |
| this.sigma = sigma; |
| this.rangeMax = rangeMax; |
| this.rangeMin = rangeMin; |
| } |
| |
| private int determineThreshold() { |
| return (int) (stats.getMean() + sigma * stats.getStdev()); |
| } |
| |
| void update() { |
| this.stats.update(); |
| } |
| |
| private int getThreshold() { |
| int thres = determineThreshold(); |
| return Math.min(rangeMax, Math.max(rangeMin, thres)); |
| } |
| |
| public ContainerQueuingLimit createContainerQueuingLimit() { |
| ContainerQueuingLimit containerQueuingLimit = |
| ContainerQueuingLimit.newInstance(); |
| if (nodeSelector.getComparator() == LoadComparator.QUEUE_WAIT_TIME) { |
| containerQueuingLimit.setMaxQueueWaitTimeInMs(getThreshold()); |
| containerQueuingLimit.setMaxQueueLength(-1); |
| } else { |
| containerQueuingLimit.setMaxQueueWaitTimeInMs(-1); |
| containerQueuingLimit.setMaxQueueLength(getThreshold()); |
| } |
| return containerQueuingLimit; |
| } |
| } |