Make RoundRobin packing algo more forgivable when container resource is not set. Don't check if container resource is not set (#3273)

diff --git a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
index 3f87bf7..0b3f4c8 100644
--- a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
+++ b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
@@ -149,13 +149,17 @@
         getRoundRobinAllocation(numContainer, parallelismMap);
 
     Resource containerResourceHint = getContainerResourceHint(roundRobinAllocation);
+    int largestContainerSize = getLargestContainerSize(roundRobinAllocation);
 
     // Get the RAM map for every instance
+    ByteAmount containerRamDefault =
+        instanceRamDefault.multiply(largestContainerSize).plus(containerRamPadding);
     Map<Integer, Map<InstanceId, ByteAmount>> instancesRamMap =
         calculateInstancesResourceMapInContainer(
         roundRobinAllocation,
         TopologyUtils.getComponentRamMapConfig(topology),
         containerResourceHint.getRam(),
+        containerRamDefault,
         instanceRamDefault,
         containerRamPadding,
         ByteAmount.ZERO,
@@ -163,11 +167,14 @@
         RAM);
 
     // Get the CPU map for every instance
+    float containerCPUDefault =
+        Math.round(instanceCpuDefault * largestContainerSize + containerCpuPadding);
     Map<Integer, Map<InstanceId, CPUShare>> instancesCpuMap =
         calculateInstancesResourceMapInContainer(
         roundRobinAllocation,
         CPUShare.convertDoubleMapToCpuShareMap(TopologyUtils.getComponentCpuMapConfig(topology)),
         CPUShare.fromDouble(containerResourceHint.getCpu()),
+        CPUShare.fromDouble(containerCPUDefault),
         CPUShare.fromDouble(instanceCpuDefault),
         CPUShare.fromDouble(containerCpuPadding),
         CPUShare.fromDouble(0.0),
@@ -205,22 +212,26 @@
       }
 
       // finalize container resource
+      containerCpu += containerCpuPadding;
+      if (containerResourceHint.getCpu() != NOT_SPECIFIED_CPU_SHARE) {
+        containerCpu = Math.min(containerCpu, containerResourceHint.getCpu());
+      }
+
+      containerRam = containerRam.plus(containerRamPadding);
       if (!containerResourceHint.getRam().equals(NOT_SPECIFIED_BYTE_AMOUNT)) {
         containerRam = ByteAmount.fromBytes(
-            Math.min(containerRam.plus(containerRamPadding).asBytes(),
-                containerResourceHint.getRam().asBytes()));
-      } else {
-        containerRam = containerRam.plus(containerRamPadding);
+            Math.min(containerRam.asBytes(), containerResourceHint.getRam().asBytes()));
       }
 
-      if (containerResourceHint.getCpu() != NOT_SPECIFIED_CPU_SHARE) {
-        containerCpu = Math.min(containerCpu + containerCpuPadding, containerResourceHint.getCpu());
-      } else {
-        containerCpu += containerCpuPadding;
+      ByteAmount containerDisk = containerResourceHint.getDisk();
+      if (containerDisk.equals(NOT_SPECIFIED_BYTE_AMOUNT)) {
+        containerDisk = instanceDiskDefault
+            .multiply(largestContainerSize).plus(DEFAULT_DISK_PADDING_PER_CONTAINER);
       }
 
-      Resource resource = new Resource(Math.max(containerCpu, containerResourceHint.getCpu()),
-          containerRam, containerResourceHint.getDisk());
+      Resource resource = new Resource(
+          Math.max(containerCpu, containerResourceHint.getCpu()),
+          containerRam, containerDisk);
       PackingPlan.ContainerPlan containerPlan = new PackingPlan.ContainerPlan(
           containerId, new HashSet<>(instancePlanMap.values()), resource);
 
@@ -275,6 +286,7 @@
                 Map<Integer, List<InstanceId>> allocation,
                 Map<String, T> resMap,
                 T containerResHint,
+                T defaultContainerRes,
                 T instanceResDefault,
                 T containerResPadding,
                 T zero,
@@ -323,28 +335,30 @@
       }
 
       // calculate resource for the remaining unspecified instances if any
+      T containerRes = containerResHint;
+      if (containerResHint.equals(notSpecified)) {
+        containerRes = defaultContainerRes;
+      }
+
       if (!unspecifiedInstances.isEmpty()) {
         T individualInstanceRes = instanceResDefault;
 
-        // If container resource is specified
-        if (!containerResHint.equals(notSpecified)) {
-          // discount resource for heron internal process (padding) and used (usedRes)
-          T remainingRes;
-          if (paddingThrottling) {
-            remainingRes = (T) containerResHint.minus(usedRes);
-          } else {
-            remainingRes = (T) containerResHint.minus(containerResPadding).minus(usedRes);
-          }
-
-          if (remainingRes.lessOrEqual(zero)) {
-            throw new PackingException(String.format("Invalid packing plan generated. "
-                + "No enough %s to allocate for unspecified instances", resourceType));
-          }
-
-          // Split remaining resource evenly
-          individualInstanceRes = (T) remainingRes.divide(unspecifiedInstances.size());
+        // discount resource for heron internal process (padding) and used (usedRes)
+        T remainingRes;
+        if (paddingThrottling) {
+          remainingRes = (T) containerRes.minus(usedRes);
+        } else {
+          remainingRes = (T) containerRes.minus(containerResPadding).minus(usedRes);
         }
 
+        if (remainingRes.lessOrEqual(zero)) {
+          throw new PackingException(String.format("Invalid packing plan generated. "
+              + "No enough %s to allocate for unspecified instances", resourceType));
+        }
+
+        // Split remaining resource evenly
+        individualInstanceRes = (T) remainingRes.divide(unspecifiedInstances.size());
+
         // Put the results in resInsideContainer
         for (InstanceId instanceId : unspecifiedInstances) {
           resInsideContainer.put(instanceId, individualInstanceRes);
@@ -434,12 +448,11 @@
 
     return new Resource(
         TopologyUtils.getConfigWithDefault(topologyConfig, TOPOLOGY_CONTAINER_CPU_REQUESTED,
-            (double) Math.round(instanceCpuDefault * largestContainerSize + containerCpuPadding)),
+            NOT_SPECIFIED_CPU_SHARE),
         TopologyUtils.getConfigWithDefault(topologyConfig, TOPOLOGY_CONTAINER_RAM_REQUESTED,
-            instanceRamDefault.multiply(largestContainerSize).plus(containerRamPadding)),
+            NOT_SPECIFIED_BYTE_AMOUNT),
         TopologyUtils.getConfigWithDefault(topologyConfig, TOPOLOGY_CONTAINER_DISK_REQUESTED,
-            instanceDiskDefault.multiply(largestContainerSize)
-                .plus(DEFAULT_DISK_PADDING_PER_CONTAINER)));
+            NOT_SPECIFIED_BYTE_AMOUNT));
   }
 
   /**
diff --git a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
index 5c3d3ac..a642a60 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
@@ -390,6 +390,160 @@
             numContainers, getDefaultPadding()).cloneWithCpu(containerCpu));
   }
 
+  @Test
+  public void testFullRamMapWithoutContainerRequestedResources() throws Exception {
+    // Explicit set resources for container
+    ByteAmount containerRam = ByteAmount.fromGigabytes(6); // max container resource is 6G
+    ByteAmount containerDisk = ByteAmount.fromGigabytes(20);
+    double containerCpu = 30;
+    ByteAmount spoutRam = ByteAmount.fromMegabytes(500);
+    ByteAmount boltRam = ByteAmount.fromMegabytes(1000);
+    Resource containerResource = new Resource(containerCpu, containerRam, containerDisk);
+
+    // Don't set container RAM
+    topologyConfig.setContainerDiskRequested(containerDisk);
+    topologyConfig.setContainerCpuRequested(containerCpu);
+    topologyConfig.setComponentRam(SPOUT_NAME, spoutRam);
+    topologyConfig.setComponentRam(BOLT_NAME, boltRam);
+    topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+    PackingPlan packingPlan = doPackingTestWithPartialResource(topology,
+        Optional.of(boltRam), Optional.empty(), boltParallelism,
+        Optional.of(spoutRam), Optional.empty(), spoutParallelism,
+        numContainers, getDefaultPadding(), containerResource);
+
+    for (PackingPlan.ContainerPlan containerPlan : packingPlan.getContainers()) {
+      // All instances' resource requirement should be equal
+      // So the size of set should be 1
+      Set<Resource> differentResources = new HashSet<>();
+      for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+        differentResources.add(instancePlan.getResource());
+      }
+
+      // Bolt and spout ram sizes are both fixed.
+      Assert.assertEquals(2, differentResources.size());
+    }
+  }
+
+  @Test
+  public void testNoRamMapWithoutContainerRequestedResources() throws Exception {
+    // Explicit set resources for container
+    ByteAmount containerRam = ByteAmount.fromGigabytes(6); // max container resource is 6G
+    ByteAmount containerDisk = ByteAmount.fromGigabytes(20);
+    double containerCpu = 30;
+    Resource containerResource = new Resource(containerCpu, containerRam, containerDisk);
+
+    // Container RAM is not set in config
+    topologyConfig.setContainerDiskRequested(containerDisk);
+    topologyConfig.setContainerCpuRequested(containerCpu);
+    topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+    PackingPlan packingPlan = doPackingTestWithPartialResource(topology,
+        Optional.empty(), Optional.empty(), boltParallelism,
+        Optional.empty(), Optional.empty(), spoutParallelism,
+        numContainers, getDefaultPadding(), containerResource);
+
+    for (PackingPlan.ContainerPlan containerPlan : packingPlan.getContainers()) {
+      // All instances' resource requirement should be equal
+      // So the size of set should be 1
+      Set<Resource> differentResources = new HashSet<>();
+      for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+        differentResources.add(instancePlan.getResource());
+      }
+
+      Assert.assertEquals(1, differentResources.size());
+      int instancesCount = containerPlan.getInstances().size();
+      Assert.assertEquals(containerRam
+          .minus(RoundRobinPacking.DEFAULT_RAM_PADDING_PER_CONTAINER).divide(instancesCount),
+          differentResources.iterator().next().getRam());
+
+      Assert.assertEquals(
+          (containerCpu - RoundRobinPacking.DEFAULT_CPU_PADDING_PER_CONTAINER) / instancesCount,
+          differentResources.iterator().next().getCpu(), DELTA);
+    }
+  }
+
+  @Test
+  public void testPartialRamMapWithoutContainerRequestedResources() throws Exception {
+    // Explicit set resources for container
+    ByteAmount containerRam = ByteAmount.fromGigabytes(6); // max container resource is 6G
+    ByteAmount containerDisk = ByteAmount.fromGigabytes(20);
+    double containerCpu = 30;
+    ByteAmount boltRam = ByteAmount.fromGigabytes(1);
+    Resource containerResource = new Resource(containerCpu, containerRam, containerDisk);
+
+    // Don't set container RAM
+    topologyConfig.setContainerDiskRequested(containerDisk);
+    topologyConfig.setContainerCpuRequested(containerCpu);
+    topologyConfig.setComponentRam(BOLT_NAME, boltRam);
+    topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+    PackingPlan packingPlan = doPackingTestWithPartialResource(topology,
+        Optional.of(boltRam), Optional.empty(), boltParallelism,
+        Optional.empty(), Optional.empty(), spoutParallelism,
+        numContainers, getDefaultPadding(), containerResource);
+
+    for (PackingPlan.ContainerPlan containerPlan : packingPlan.getContainers()) {
+      // All instances' resource requirement should be equal
+      // So the size of set should be 1
+      Set<Resource> differentResources = new HashSet<>();
+      for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+        differentResources.add(instancePlan.getResource());
+      }
+
+      int instancesCount = containerPlan.getInstances().size();
+      if (instancesCount == 4) {
+        // Biggest container
+        Assert.assertEquals(1, differentResources.size());
+      } else {
+        // Smaller container
+        Assert.assertEquals(2, differentResources.size());
+      }
+    }
+  }
+
+  // Throw an error if default container resource (default instance resource * number of instances
+  // + padding) is not enough.
+  @Test(expected = PackingException.class)
+  public void testHugePartialRamMapWithoutContainerRequestedResources() throws Exception {
+    // Explicit set resources for container
+    ByteAmount containerRam = ByteAmount.fromGigabytes(10);
+    ByteAmount containerDisk = ByteAmount.fromGigabytes(20);
+    double containerCpu = 30;
+    Resource containerResource = new Resource(containerCpu, containerRam, containerDisk);
+    ByteAmount boltRam = ByteAmount.fromGigabytes(10);
+
+    // Don't set container RAM
+    topologyConfig.setContainerDiskRequested(containerDisk);
+    topologyConfig.setContainerCpuRequested(containerCpu);
+    topologyConfig.setComponentRam(BOLT_NAME, boltRam);
+    topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+    PackingPlan packingPlan = doPackingTestWithPartialResource(topology,
+        Optional.of(boltRam), Optional.empty(), boltParallelism,
+        Optional.empty(), Optional.empty(), spoutParallelism,
+        numContainers, getDefaultPadding(), containerResource);
+
+    for (PackingPlan.ContainerPlan containerPlan : packingPlan.getContainers()) {
+      // All instances' resource requirement should be equal
+      // So the size of set should be 1
+      Set<Resource> differentResources = new HashSet<>();
+      for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+        differentResources.add(instancePlan.getResource());
+      }
+
+      Assert.assertEquals(1, differentResources.size());
+      int instancesCount = containerPlan.getInstances().size();
+      Assert.assertEquals(containerRam
+          .minus(RoundRobinPacking.DEFAULT_RAM_PADDING_PER_CONTAINER).divide(instancesCount),
+          differentResources.iterator().next().getRam());
+
+      Assert.assertEquals(
+          (containerCpu - RoundRobinPacking.DEFAULT_CPU_PADDING_PER_CONTAINER) / instancesCount,
+          differentResources.iterator().next().getCpu(), DELTA);
+    }
+  }
+
   /**
    * Test the scenario RAM map config is partially set
    */