Make RoundRobin packing algo more forgivable when container resource is not set. Don't check if container resource is not set (#3273)
diff --git a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
index 3f87bf7..0b3f4c8 100644
--- a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
+++ b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
@@ -149,13 +149,17 @@
getRoundRobinAllocation(numContainer, parallelismMap);
Resource containerResourceHint = getContainerResourceHint(roundRobinAllocation);
+ int largestContainerSize = getLargestContainerSize(roundRobinAllocation);
// Get the RAM map for every instance
+ ByteAmount containerRamDefault =
+ instanceRamDefault.multiply(largestContainerSize).plus(containerRamPadding);
Map<Integer, Map<InstanceId, ByteAmount>> instancesRamMap =
calculateInstancesResourceMapInContainer(
roundRobinAllocation,
TopologyUtils.getComponentRamMapConfig(topology),
containerResourceHint.getRam(),
+ containerRamDefault,
instanceRamDefault,
containerRamPadding,
ByteAmount.ZERO,
@@ -163,11 +167,14 @@
RAM);
// Get the CPU map for every instance
+ float containerCPUDefault =
+ Math.round(instanceCpuDefault * largestContainerSize + containerCpuPadding);
Map<Integer, Map<InstanceId, CPUShare>> instancesCpuMap =
calculateInstancesResourceMapInContainer(
roundRobinAllocation,
CPUShare.convertDoubleMapToCpuShareMap(TopologyUtils.getComponentCpuMapConfig(topology)),
CPUShare.fromDouble(containerResourceHint.getCpu()),
+ CPUShare.fromDouble(containerCPUDefault),
CPUShare.fromDouble(instanceCpuDefault),
CPUShare.fromDouble(containerCpuPadding),
CPUShare.fromDouble(0.0),
@@ -205,22 +212,26 @@
}
// finalize container resource
+ containerCpu += containerCpuPadding;
+ if (containerResourceHint.getCpu() != NOT_SPECIFIED_CPU_SHARE) {
+ containerCpu = Math.min(containerCpu, containerResourceHint.getCpu());
+ }
+
+ containerRam = containerRam.plus(containerRamPadding);
if (!containerResourceHint.getRam().equals(NOT_SPECIFIED_BYTE_AMOUNT)) {
containerRam = ByteAmount.fromBytes(
- Math.min(containerRam.plus(containerRamPadding).asBytes(),
- containerResourceHint.getRam().asBytes()));
- } else {
- containerRam = containerRam.plus(containerRamPadding);
+ Math.min(containerRam.asBytes(), containerResourceHint.getRam().asBytes()));
}
- if (containerResourceHint.getCpu() != NOT_SPECIFIED_CPU_SHARE) {
- containerCpu = Math.min(containerCpu + containerCpuPadding, containerResourceHint.getCpu());
- } else {
- containerCpu += containerCpuPadding;
+ ByteAmount containerDisk = containerResourceHint.getDisk();
+ if (containerDisk.equals(NOT_SPECIFIED_BYTE_AMOUNT)) {
+ containerDisk = instanceDiskDefault
+ .multiply(largestContainerSize).plus(DEFAULT_DISK_PADDING_PER_CONTAINER);
}
- Resource resource = new Resource(Math.max(containerCpu, containerResourceHint.getCpu()),
- containerRam, containerResourceHint.getDisk());
+ Resource resource = new Resource(
+ Math.max(containerCpu, containerResourceHint.getCpu()),
+ containerRam, containerDisk);
PackingPlan.ContainerPlan containerPlan = new PackingPlan.ContainerPlan(
containerId, new HashSet<>(instancePlanMap.values()), resource);
@@ -275,6 +286,7 @@
Map<Integer, List<InstanceId>> allocation,
Map<String, T> resMap,
T containerResHint,
+ T defaultContainerRes,
T instanceResDefault,
T containerResPadding,
T zero,
@@ -323,28 +335,30 @@
}
// calculate resource for the remaining unspecified instances if any
+ T containerRes = containerResHint;
+ if (containerResHint.equals(notSpecified)) {
+ containerRes = defaultContainerRes;
+ }
+
if (!unspecifiedInstances.isEmpty()) {
T individualInstanceRes = instanceResDefault;
- // If container resource is specified
- if (!containerResHint.equals(notSpecified)) {
- // discount resource for heron internal process (padding) and used (usedRes)
- T remainingRes;
- if (paddingThrottling) {
- remainingRes = (T) containerResHint.minus(usedRes);
- } else {
- remainingRes = (T) containerResHint.minus(containerResPadding).minus(usedRes);
- }
-
- if (remainingRes.lessOrEqual(zero)) {
- throw new PackingException(String.format("Invalid packing plan generated. "
- + "No enough %s to allocate for unspecified instances", resourceType));
- }
-
- // Split remaining resource evenly
- individualInstanceRes = (T) remainingRes.divide(unspecifiedInstances.size());
+ // discount resource for heron internal process (padding) and used (usedRes)
+ T remainingRes;
+ if (paddingThrottling) {
+ remainingRes = (T) containerRes.minus(usedRes);
+ } else {
+ remainingRes = (T) containerRes.minus(containerResPadding).minus(usedRes);
}
+ if (remainingRes.lessOrEqual(zero)) {
+ throw new PackingException(String.format("Invalid packing plan generated. "
+ + "No enough %s to allocate for unspecified instances", resourceType));
+ }
+
+ // Split remaining resource evenly
+ individualInstanceRes = (T) remainingRes.divide(unspecifiedInstances.size());
+
// Put the results in resInsideContainer
for (InstanceId instanceId : unspecifiedInstances) {
resInsideContainer.put(instanceId, individualInstanceRes);
@@ -434,12 +448,11 @@
return new Resource(
TopologyUtils.getConfigWithDefault(topologyConfig, TOPOLOGY_CONTAINER_CPU_REQUESTED,
- (double) Math.round(instanceCpuDefault * largestContainerSize + containerCpuPadding)),
+ NOT_SPECIFIED_CPU_SHARE),
TopologyUtils.getConfigWithDefault(topologyConfig, TOPOLOGY_CONTAINER_RAM_REQUESTED,
- instanceRamDefault.multiply(largestContainerSize).plus(containerRamPadding)),
+ NOT_SPECIFIED_BYTE_AMOUNT),
TopologyUtils.getConfigWithDefault(topologyConfig, TOPOLOGY_CONTAINER_DISK_REQUESTED,
- instanceDiskDefault.multiply(largestContainerSize)
- .plus(DEFAULT_DISK_PADDING_PER_CONTAINER)));
+ NOT_SPECIFIED_BYTE_AMOUNT));
}
/**
diff --git a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
index 5c3d3ac..a642a60 100644
--- a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
+++ b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
@@ -390,6 +390,160 @@
numContainers, getDefaultPadding()).cloneWithCpu(containerCpu));
}
+ @Test
+ public void testFullRamMapWithoutContainerRequestedResources() throws Exception {
+ // Explicit set resources for container
+ ByteAmount containerRam = ByteAmount.fromGigabytes(6); // max container resource is 6G
+ ByteAmount containerDisk = ByteAmount.fromGigabytes(20);
+ double containerCpu = 30;
+ ByteAmount spoutRam = ByteAmount.fromMegabytes(500);
+ ByteAmount boltRam = ByteAmount.fromMegabytes(1000);
+ Resource containerResource = new Resource(containerCpu, containerRam, containerDisk);
+
+ // Don't set container RAM
+ topologyConfig.setContainerDiskRequested(containerDisk);
+ topologyConfig.setContainerCpuRequested(containerCpu);
+ topologyConfig.setComponentRam(SPOUT_NAME, spoutRam);
+ topologyConfig.setComponentRam(BOLT_NAME, boltRam);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ PackingPlan packingPlan = doPackingTestWithPartialResource(topology,
+ Optional.of(boltRam), Optional.empty(), boltParallelism,
+ Optional.of(spoutRam), Optional.empty(), spoutParallelism,
+ numContainers, getDefaultPadding(), containerResource);
+
+ for (PackingPlan.ContainerPlan containerPlan : packingPlan.getContainers()) {
+ // All instances' resource requirement should be equal
+ // So the size of set should be 1
+ Set<Resource> differentResources = new HashSet<>();
+ for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+ differentResources.add(instancePlan.getResource());
+ }
+
+ // Bolt and spout ram sizes are both fixed.
+ Assert.assertEquals(2, differentResources.size());
+ }
+ }
+
+ @Test
+ public void testNoRamMapWithoutContainerRequestedResources() throws Exception {
+ // Explicit set resources for container
+ ByteAmount containerRam = ByteAmount.fromGigabytes(6); // max container resource is 6G
+ ByteAmount containerDisk = ByteAmount.fromGigabytes(20);
+ double containerCpu = 30;
+ Resource containerResource = new Resource(containerCpu, containerRam, containerDisk);
+
+ // Container RAM is not set in config
+ topologyConfig.setContainerDiskRequested(containerDisk);
+ topologyConfig.setContainerCpuRequested(containerCpu);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ PackingPlan packingPlan = doPackingTestWithPartialResource(topology,
+ Optional.empty(), Optional.empty(), boltParallelism,
+ Optional.empty(), Optional.empty(), spoutParallelism,
+ numContainers, getDefaultPadding(), containerResource);
+
+ for (PackingPlan.ContainerPlan containerPlan : packingPlan.getContainers()) {
+ // All instances' resource requirement should be equal
+ // So the size of set should be 1
+ Set<Resource> differentResources = new HashSet<>();
+ for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+ differentResources.add(instancePlan.getResource());
+ }
+
+ Assert.assertEquals(1, differentResources.size());
+ int instancesCount = containerPlan.getInstances().size();
+ Assert.assertEquals(containerRam
+ .minus(RoundRobinPacking.DEFAULT_RAM_PADDING_PER_CONTAINER).divide(instancesCount),
+ differentResources.iterator().next().getRam());
+
+ Assert.assertEquals(
+ (containerCpu - RoundRobinPacking.DEFAULT_CPU_PADDING_PER_CONTAINER) / instancesCount,
+ differentResources.iterator().next().getCpu(), DELTA);
+ }
+ }
+
+ @Test
+ public void testPartialRamMapWithoutContainerRequestedResources() throws Exception {
+ // Explicit set resources for container
+ ByteAmount containerRam = ByteAmount.fromGigabytes(6); // max container resource is 6G
+ ByteAmount containerDisk = ByteAmount.fromGigabytes(20);
+ double containerCpu = 30;
+ ByteAmount boltRam = ByteAmount.fromGigabytes(1);
+ Resource containerResource = new Resource(containerCpu, containerRam, containerDisk);
+
+ // Don't set container RAM
+ topologyConfig.setContainerDiskRequested(containerDisk);
+ topologyConfig.setContainerCpuRequested(containerCpu);
+ topologyConfig.setComponentRam(BOLT_NAME, boltRam);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ PackingPlan packingPlan = doPackingTestWithPartialResource(topology,
+ Optional.of(boltRam), Optional.empty(), boltParallelism,
+ Optional.empty(), Optional.empty(), spoutParallelism,
+ numContainers, getDefaultPadding(), containerResource);
+
+ for (PackingPlan.ContainerPlan containerPlan : packingPlan.getContainers()) {
+ // All instances' resource requirement should be equal
+ // So the size of set should be 1
+ Set<Resource> differentResources = new HashSet<>();
+ for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+ differentResources.add(instancePlan.getResource());
+ }
+
+ int instancesCount = containerPlan.getInstances().size();
+ if (instancesCount == 4) {
+ // Biggest container
+ Assert.assertEquals(1, differentResources.size());
+ } else {
+ // Smaller container
+ Assert.assertEquals(2, differentResources.size());
+ }
+ }
+ }
+
+ // Throw an error if default container resource (default instance resource * number of instances
+ // + padding) is not enough.
+ @Test(expected = PackingException.class)
+ public void testHugePartialRamMapWithoutContainerRequestedResources() throws Exception {
+ // Explicit set resources for container
+ ByteAmount containerRam = ByteAmount.fromGigabytes(10);
+ ByteAmount containerDisk = ByteAmount.fromGigabytes(20);
+ double containerCpu = 30;
+ Resource containerResource = new Resource(containerCpu, containerRam, containerDisk);
+ ByteAmount boltRam = ByteAmount.fromGigabytes(10);
+
+ // Don't set container RAM
+ topologyConfig.setContainerDiskRequested(containerDisk);
+ topologyConfig.setContainerCpuRequested(containerCpu);
+ topologyConfig.setComponentRam(BOLT_NAME, boltRam);
+ topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+ PackingPlan packingPlan = doPackingTestWithPartialResource(topology,
+ Optional.of(boltRam), Optional.empty(), boltParallelism,
+ Optional.empty(), Optional.empty(), spoutParallelism,
+ numContainers, getDefaultPadding(), containerResource);
+
+ for (PackingPlan.ContainerPlan containerPlan : packingPlan.getContainers()) {
+ // All instances' resource requirement should be equal
+ // So the size of set should be 1
+ Set<Resource> differentResources = new HashSet<>();
+ for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
+ differentResources.add(instancePlan.getResource());
+ }
+
+ Assert.assertEquals(1, differentResources.size());
+ int instancesCount = containerPlan.getInstances().size();
+ Assert.assertEquals(containerRam
+ .minus(RoundRobinPacking.DEFAULT_RAM_PADDING_PER_CONTAINER).divide(instancesCount),
+ differentResources.iterator().next().getRam());
+
+ Assert.assertEquals(
+ (containerCpu - RoundRobinPacking.DEFAULT_CPU_PADDING_PER_CONTAINER) / instancesCount,
+ differentResources.iterator().next().getCpu(), DELTA);
+ }
+ }
+
/**
* Test the scenario RAM map config is partially set
*/