YARN-6909. Use LightWeightResource when the number of resource types is more than two. (Sunil G via wangda)

Change-Id: I90e021c5dea7abd9ec6bd73b2287c8adebe14595
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
index 92137ad..f7c699f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
@@ -31,9 +31,9 @@
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
 import org.apache.hadoop.yarn.api.records.impl.LightWeightResource;
 import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
-import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 
 /**
@@ -76,34 +76,27 @@
   @Public
   @Stable
   public static Resource newInstance(int memory, int vCores) {
-    if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
-      Resource ret = Records.newRecord(Resource.class);
-      ret.setMemorySize(memory);
-      ret.setVirtualCores(vCores);
-      return ret;
-    }
     return new LightWeightResource(memory, vCores);
   }
 
   @Public
   @Stable
   public static Resource newInstance(long memory, int vCores) {
-    if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
-      Resource ret = Records.newRecord(Resource.class);
-      ret.setMemorySize(memory);
-      ret.setVirtualCores(vCores);
-      return ret;
-    }
     return new LightWeightResource(memory, vCores);
   }
 
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
   public static Resource newInstance(Resource resource) {
-    Resource ret = Resource.newInstance(resource.getMemorySize(),
-        resource.getVirtualCores());
-    if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
-      Resource.copy(resource, ret);
+    Resource ret;
+    int numberOfKnownResourceTypes = ResourceUtils
+        .getNumberOfKnownResourceTypes();
+    if (numberOfKnownResourceTypes > 2) {
+      ret = new LightWeightResource(resource.getMemorySize(),
+          resource.getVirtualCores(), resource.getResources());
+    } else {
+      ret = new LightWeightResource(resource.getMemorySize(),
+          resource.getVirtualCores());
     }
     return ret;
   }
@@ -428,7 +421,7 @@
     int arrLenOther = otherResources.length;
 
     // compare memory and vcores first(in that order) to preserve
-    // existing behaviour
+    // existing behavior.
     for (int i = 0; i < arrLenThis; i++) {
       ResourceInformation otherEntry;
       try {
@@ -500,4 +493,23 @@
     }
     return Long.valueOf(value).intValue();
   }
+
+  /**
+   * Create ResourceInformation with basic fields.
+   * @param name Resource Type Name
+   * @param unit Default unit of provided resource type
+   * @param value Value associated with given resource
+   * @return ResourceInformation object
+   */
+  protected static ResourceInformation newDefaultInformation(String name,
+      String unit, long value) {
+    ResourceInformation ri = new ResourceInformation();
+    ri.setName(name);
+    ri.setValue(value);
+    ri.setResourceType(ResourceTypes.COUNTABLE);
+    ri.setUnitsWithoutValidation(unit);
+    ri.setMinimumAllocation(0);
+    ri.setMaximumAllocation(Long.MAX_VALUE);
+    return ri;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/LightWeightResource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/LightWeightResource.java
index a64d242..34efb55 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/LightWeightResource.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/LightWeightResource.java
@@ -23,14 +23,13 @@
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 
-import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_MB;
-import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
-import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI;
+import static org.apache.hadoop.yarn.api.records.ResourceInformation.*;
 
 /**
  * <p>
- * <code>LightResource</code> extends Resource to handle base resources such
+ * <code>LightWeightResource</code> extends Resource to handle base resources such
  * as memory and CPU.
  * TODO: We have a long term plan to use AbstractResource when additional
  * resource types are to be handled as well.
@@ -66,27 +65,42 @@
   private ResourceInformation memoryResInfo;
   private ResourceInformation vcoresResInfo;
 
-  public LightWeightResource(long memory, long vcores) {
-    this.memoryResInfo = LightWeightResource.newDefaultInformation(MEMORY_URI,
-        MEMORY_MB.getUnits(), memory);
-    this.vcoresResInfo = LightWeightResource.newDefaultInformation(VCORES_URI,
-        "", vcores);
+  public LightWeightResource(long memory, int vcores) {
+    int numberOfKnownResourceTypes = ResourceUtils
+        .getNumberOfKnownResourceTypes();
+    initResourceInformations(memory, vcores, numberOfKnownResourceTypes);
 
-    resources = new ResourceInformation[NUM_MANDATORY_RESOURCES];
-    resources[MEMORY_INDEX] = memoryResInfo;
-    resources[VCORES_INDEX] = vcoresResInfo;
+    if (numberOfKnownResourceTypes > 2) {
+      ResourceInformation[] types = ResourceUtils.getResourceTypesArray();
+      for (int i = 2; i < numberOfKnownResourceTypes; i++) {
+        resources[i] = new ResourceInformation();
+        ResourceInformation.copy(types[i], resources[i]);
+      }
+    }
   }
 
-  private static ResourceInformation newDefaultInformation(String name,
-      String unit, long value) {
-    ResourceInformation ri = new ResourceInformation();
-    ri.setName(name);
-    ri.setValue(value);
-    ri.setResourceType(ResourceTypes.COUNTABLE);
-    ri.setUnitsWithoutValidation(unit);
-    ri.setMinimumAllocation(0);
-    ri.setMaximumAllocation(Long.MAX_VALUE);
-    return ri;
+  public LightWeightResource(long memory, int vcores,
+      ResourceInformation[] source) {
+    int numberOfKnownResourceTypes = ResourceUtils
+        .getNumberOfKnownResourceTypes();
+    initResourceInformations(memory, vcores, numberOfKnownResourceTypes);
+
+    for (int i = 2; i < numberOfKnownResourceTypes; i++) {
+      resources[i] = new ResourceInformation();
+      ResourceInformation.copy(source[i], resources[i]);
+    }
+  }
+
+  private void initResourceInformations(long memory, int vcores,
+      int numberOfKnownResourceTypes) {
+    this.memoryResInfo = newDefaultInformation(MEMORY_URI, MEMORY_MB.getUnits(),
+        memory);
+    this.vcoresResInfo = newDefaultInformation(VCORES_URI, VCORES.getUnits(),
+        vcores);
+
+    resources = new ResourceInformation[numberOfKnownResourceTypes];
+    resources[MEMORY_INDEX] = memoryResInfo;
+    resources[VCORES_INDEX] = vcoresResInfo;
   }
 
   @Override
@@ -135,21 +149,41 @@
       return false;
     }
 
+    if (resources.length > 2) {
+      ResourceInformation[] otherVectors = other.getResources();
+
+      if (resources.length != otherVectors.length) {
+        return false;
+      }
+
+      for (int i = 2; i < resources.length; i++) {
+        ResourceInformation a = resources[i];
+        ResourceInformation b = otherVectors[i];
+        if ((a != b) && ((a == null) || !a.equals(b))) {
+          return false;
+        }
+      }
+    }
+
     return true;
   }
 
   @Override
   public int compareTo(Resource other) {
     // compare memory and vcores first(in that order) to preserve
-    // existing behaviour
-    long diff = this.getMemorySize() - other.getMemorySize();
-    if (diff == 0) {
-      return this.getVirtualCores() - other.getVirtualCores();
-    } else if (diff > 0){
-      return 1;
-    } else {
-      return -1;
+    // existing behavior.
+    if (resources.length <= 2) {
+      long diff = this.getMemorySize() - other.getMemorySize();
+      if (diff == 0) {
+        return this.getVirtualCores() - other.getVirtualCores();
+      } else if (diff > 0) {
+        return 1;
+      } else {
+        return -1;
+      }
     }
+
+    return super.compareTo(other);
   }
 
   @Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java
index 4ae64c2..401e0c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java
@@ -35,7 +35,6 @@
 
 import java.util.Map;
 
-
 @Private
 @Unstable
 public class ResourcePBImpl extends Resource {
@@ -50,11 +49,14 @@
   static ResourceProto getProto(Resource r) {
     final ResourcePBImpl pb;
     if (r instanceof ResourcePBImpl) {
-      pb = (ResourcePBImpl)r;
+      pb = (ResourcePBImpl) r;
     } else {
       pb = new ResourcePBImpl();
       pb.setMemorySize(r.getMemorySize());
       pb.setVirtualCores(r.getVirtualCores());
+      for(ResourceInformation res : r.getResources()) {
+        pb.setResourceInformation(res.getName(), res);
+      }
     }
     return pb.getProto();
   }
@@ -111,7 +113,7 @@
   @Override
   public void setMemorySize(long memory) {
     maybeInitBuilder();
-    getResourceInformation(ResourceInformation.MEMORY_URI).setValue(memory);
+    resources[MEMORY_INDEX].setValue(memory);
   }
 
   @Override
@@ -123,7 +125,7 @@
   @Override
   public void setVirtualCores(int vCores) {
     maybeInitBuilder();
-    getResourceInformation(ResourceInformation.VCORES_URI).setValue(vCores);
+    resources[VCORES_INDEX].setValue(vCores);
   }
 
   private void initResources() {
@@ -131,31 +133,51 @@
       return;
     }
     ResourceProtoOrBuilder p = viaProto ? proto : builder;
-    initResourcesMap();
+    ResourceInformation[] types = ResourceUtils.getResourceTypesArray();
     Map<String, Integer> indexMap = ResourceUtils.getResourceTypeIndex();
-    for (ResourceInformationProto entry : p.getResourceValueMapList()) {
-      ResourceTypes type =
-          entry.hasType() ? ProtoUtils.convertFromProtoFormat(entry.getType()) :
-              ResourceTypes.COUNTABLE;
+    resources = new ResourceInformation[types.length];
 
-      // When unit not specified in proto, use the default unit.
-      String units =
-          entry.hasUnits() ? entry.getUnits() : ResourceUtils.getDefaultUnit(
-              entry.getKey());
-      long value = entry.hasValue() ? entry.getValue() : 0L;
-      ResourceInformation ri = ResourceInformation
-          .newInstance(entry.getKey(), units, value, type, 0L, Long.MAX_VALUE);
+    for (ResourceInformationProto entry : p.getResourceValueMapList()) {
       Integer index = indexMap.get(entry.getKey());
       if (index == null) {
-        LOG.warn("Got unknown resource type: " + ri.getName() + "; skipping");
+        LOG.warn("Got unknown resource type: " + entry.getKey() + "; skipping");
       } else {
-        resources[index].setResourceType(ri.getResourceType());
-        resources[index].setUnits(ri.getUnits());
-        resources[index].setValue(value);
+        resources[index] = newDefaultInformation(types[index], entry);
       }
     }
+
+    resources[MEMORY_INDEX] = ResourceInformation
+        .newInstance(ResourceInformation.MEMORY_MB);
+    resources[VCORES_INDEX] = ResourceInformation
+        .newInstance(ResourceInformation.VCORES);
     this.setMemorySize(p.getMemory());
     this.setVirtualCores(p.getVirtualCores());
+
+    // Populate any index still unset from the known type at that index.
+    updateResourceInformationMap(types);
+  }
+
+  private void updateResourceInformationMap(ResourceInformation[] types) {
+    for (int i = 0; i < types.length; i++) {
+      if (resources[i] == null) {
+        resources[i] = ResourceInformation.newInstance(types[i]);
+      }
+    }
+  }
+
+  private static ResourceInformation newDefaultInformation(
+      ResourceInformation resourceInformation, ResourceInformationProto entry) {
+    ResourceInformation ri = new ResourceInformation();
+    ri.setName(resourceInformation.getName());
+    ri.setMinimumAllocation(resourceInformation.getMinimumAllocation());
+    ri.setMaximumAllocation(resourceInformation.getMaximumAllocation());
+    ri.setResourceType(entry.hasType()
+        ? ProtoUtils.convertFromProtoFormat(entry.getType())
+        : ResourceTypes.COUNTABLE);
+    ri.setUnits(
+        entry.hasUnits() ? entry.getUnits() : resourceInformation.getUnits());
+    ri.setValue(entry.hasValue() ? entry.getValue() : 0L);
+    return ri;
   }
 
   @Override
@@ -166,10 +188,8 @@
       throw new IllegalArgumentException(
           "resource and/or resourceInformation cannot be null");
     }
-    if (!resource.equals(resourceInformation.getName())) {
-      resourceInformation.setName(resource);
-    }
-    ResourceInformation storedResourceInfo = getResourceInformation(resource);
+    ResourceInformation storedResourceInfo = super.getResourceInformation(
+        resource);
     ResourceInformation.copy(resourceInformation, storedResourceInfo);
   }
 
@@ -195,25 +215,9 @@
     return super.getResourceValue(resource);
   }
 
-  private void initResourcesMap() {
-    if (resources == null) {
-      ResourceInformation[] types = ResourceUtils.getResourceTypesArray();
-      if (types == null) {
-        throw new YarnRuntimeException(
-            "Got null return value from ResourceUtils.getResourceTypes()");
-      }
-
-      resources = new ResourceInformation[types.length];
-      for (ResourceInformation entry : types) {
-        int index = ResourceUtils.getResourceTypeIndex().get(entry.getName());
-        resources[index] = ResourceInformation.newInstance(entry);
-      }
-    }
-  }
-
   synchronized private void mergeLocalToBuilder() {
     builder.clearResourceValueMap();
-    if(resources != null && resources.length != 0) {
+    if (resources != null && resources.length != 0) {
       for (ResourceInformation resInfo : resources) {
         ResourceInformationProto.Builder e = ResourceInformationProto
             .newBuilder();
@@ -236,4 +240,4 @@
     proto = builder.build();
     viaProto = true;
   }
-}  
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
index 484faef..07b5f5c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
@@ -56,7 +56,7 @@
   private Map<String, N> nodeNameToNodeMap = new HashMap<>();
   private Map<String, List<N>> nodesPerRack = new HashMap<>();
 
-  private Resource clusterCapacity = Resources.clone(Resources.none());
+  private Resource clusterCapacity = Resources.createResource(0, 0);
   private Resource staleClusterCapacity = null;
 
   // Max allocation