blob: 7074059ecf4e1ded168171495115583ee626b7a2 [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
 * Represents a YARN Cluster Node from the viewpoint of the scheduler.
 */
public abstract class SchedulerNode {
// Class-wide logger.
private static final Log LOG = LogFactory.getLog(SchedulerNode.class);
// Resources still free on this node. The constructor replaces this initial
// Resource.newInstance(0, 0) value with a clone of the node's total capability.
private Resource availableResource = Resource.newInstance(0, 0);
// Resources currently accounted as in use on this node.
private Resource usedResource = Resource.newInstance(0, 0);
// Total capability of the node, cloned from the RMNode in the constructor.
private Resource totalResourceCapability;
// Container reserved on this node; accessed through
// getReservedContainer() / setReservedContainer().
private RMContainer reservedContainer;
// Container count returned by getNumContainers(); declared volatile and read
// by that getter without synchronization.
private volatile int numContainers;
/* Containers allocated on this node, keyed by their container ID. */
private final Map<ContainerId, RMContainer> launchedContainers =
new HashMap<ContainerId, RMContainer>();
// The RMNode this scheduler-side view wraps.
private final RMNode rmNode;
// Name used for scheduling matching decisions: hostname or hostname:port,
// chosen once in the constructor.
private final String nodeName;
/**
 * Create the scheduler's view of the given node.
 *
 * @param node
 *          the RMNode backing this scheduler node
 * @param usePortForNodeName
 *          if true the node name is "hostname:port", otherwise it is just
 *          the hostname
 */
public SchedulerNode(RMNode node, boolean usePortForNodeName) {
  this.rmNode = node;
  // Keep separate copies: these fields are later modified in place through
  // Resources.addTo / Resources.subtractFrom.
  this.availableResource = Resources.clone(node.getTotalCapability());
  this.totalResourceCapability = Resources.clone(node.getTotalCapability());
  if (usePortForNodeName) {
    nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
  } else {
    nodeName = rmNode.getHostName();
  }
}
public RMNode getRMNode() {
return this.rmNode;
* Get the ID of the node which contains both its hostname and port.
* @return the ID of the node
public NodeId getNodeID() {
return this.rmNode.getNodeID();
public String getHttpAddress() {
return this.rmNode.getHttpAddress();
* Get the name of the node for scheduling matching decisions.
* <p/>
* Typically this is the 'hostname' reported by the node, but it could be
* configured to be 'hostname:port' reported by the node via the
* {@link YarnConfiguration#RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME} constant.
* The main usecase of this is Yarn minicluster to be able to differentiate
* node manager instances by their port number.
* @return name of the node for scheduling matching decisions.
public String getNodeName() {
return nodeName;
* Get rackname.
* @return rackname
public String getRackName() {
return this.rmNode.getRackName();
* The Scheduler has allocated containers on this node to the given
* application.
* @param rmContainer
* allocated container
public synchronized void allocateContainer(RMContainer rmContainer) {
Container container = rmContainer.getContainer();
launchedContainers.put(container.getId(), rmContainer);"Assigned container " + container.getId() + " of capacity "
+ container.getResource() + " on host " + rmNode.getNodeAddress()
+ ", which has " + numContainers + " containers, "
+ getUsedResource() + " used and " + getAvailableResource()
+ " available after allocation");
* Get available resources on the node.
* @return available resources on the node
public synchronized Resource getAvailableResource() {
return this.availableResource;
* Get used resources on the node.
* @return used resources on the node
public synchronized Resource getUsedResource() {
return this.usedResource;
* Get total resources on the node.
* @return total resources on the node.
public Resource getTotalResource() {
return this.totalResourceCapability;
public synchronized boolean isValidContainer(ContainerId containerId) {
if (launchedContainers.containsKey(containerId)) {
return true;
return false;
private synchronized void updateResource(Container container) {
* Release an allocated container on this node.
* @param container
* container to be released
public synchronized void releaseContainer(Container container) {
if (!isValidContainer(container.getId())) {
LOG.error("Invalid container released " + container);
/* remove the containers from the nodemanger */
if (null != launchedContainers.remove(container.getId())) {
}"Released container " + container.getId() + " of capacity "
+ container.getResource() + " on host " + rmNode.getNodeAddress()
+ ", which currently has " + numContainers + " containers, "
+ getUsedResource() + " used and " + getAvailableResource()
+ " available" + ", release resources=" + true);
private synchronized void addAvailableResource(Resource resource) {
if (resource == null) {
LOG.error("Invalid resource addition of null resource for "
+ rmNode.getNodeAddress());
Resources.addTo(availableResource, resource);
Resources.subtractFrom(usedResource, resource);
private synchronized void deductAvailableResource(Resource resource) {
if (resource == null) {
LOG.error("Invalid deduction of null resource for "
+ rmNode.getNodeAddress());
Resources.subtractFrom(availableResource, resource);
Resources.addTo(usedResource, resource);
* Reserve container for the attempt on this node.
public abstract void reserveResource(SchedulerApplicationAttempt attempt,
Priority priority, RMContainer container);
* Unreserve resources on this node.
public abstract void unreserveResource(SchedulerApplicationAttempt attempt);
public String toString() {
return "host: " + rmNode.getNodeAddress() + " #containers="
+ getNumContainers() + " available="
+ getAvailableResource().getMemory() + " used="
+ getUsedResource().getMemory();
* Get number of active containers on the node.
* @return number of active containers on the node
public int getNumContainers() {
return numContainers;
public synchronized List<RMContainer> getRunningContainers() {
return new ArrayList<RMContainer>(launchedContainers.values());
public synchronized RMContainer getReservedContainer() {
return reservedContainer;
protected synchronized void
setReservedContainer(RMContainer reservedContainer) {
this.reservedContainer = reservedContainer;
* Apply delta resource on node's available resource.
* @param deltaResource
* the delta of resource need to apply to node
public synchronized void
applyDeltaOnAvailableResource(Resource deltaResource) {
// we can only adjust available resource if total resource is changed.
Resources.addTo(this.availableResource, deltaResource);
public synchronized void recoverContainer(RMContainer rmContainer) {
if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {