blob: 109ae47c055842e95bcc425645b714a26f91cf23 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.packing.roundrobin;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.junit.Assert;
import org.junit.Test;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.api.utils.TopologyUtils;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.common.utils.topology.TopologyTests;
import org.apache.heron.packing.AssertPacking;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.packing.PackingException;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.packing.Resource;
import org.apache.heron.spi.utils.PackingTestUtils;
public class RoundRobinPackingTest {
private static final String BOLT_NAME = "bolt";
private static final String SPOUT_NAME = "spout";
private static final double DELTA = 0.1;
private TopologyAPI.Topology getTopology(
int spoutParallelism, int boltParallelism,
org.apache.heron.api.Config topologyConfig) {
return TopologyTests.createTopology("testTopology", topologyConfig, SPOUT_NAME, BOLT_NAME,
spoutParallelism, boltParallelism);
}
private PackingPlan getRoundRobinPackingPlan(TopologyAPI.Topology topology) {
Config config = PackingTestUtils.newTestConfig(topology);
RoundRobinPacking packing = new RoundRobinPacking();
packing.initialize(config, topology);
return packing.pack();
}
private PackingPlan getRoundRobinRePackingPlan(
TopologyAPI.Topology topology, Map<String, Integer> componentChanges) {
Config config = PackingTestUtils.newTestConfig(topology);
RoundRobinPacking packing = new RoundRobinPacking();
packing.initialize(config, topology);
PackingPlan pp = packing.pack();
return packing.repack(pp, componentChanges);
}
@Test(expected = PackingException.class)
public void testCheckInsufficientRamFailure() throws Exception {
int numContainers = 2;
int spoutParallelism = 4;
int boltParallelism = 3;
// Set up the topology and its config
org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
// Explicit set insufficient RAM for container
ByteAmount containerRam = ByteAmount.fromGigabytes(0);
topologyConfig.setContainerRamRequested(containerRam);
TopologyAPI.Topology topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
getRoundRobinPackingPlan(topology);
}
@Test(expected = PackingException.class)
public void testCheckInsufficientCpuFailure() throws Exception {
int numContainers = 2;
int spoutParallelism = 4;
int boltParallelism = 3;
// Set up the topology and its config
org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
// Explicit set insufficient CPU for container
double containerCpu = 1.0;
topologyConfig.setContainerCpuRequested(containerCpu);
TopologyAPI.Topology topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
getRoundRobinPackingPlan(topology);
}
@Test
public void testDefaultResources() throws Exception {
int numContainers = 2;
int spoutParallelism = 4;
int boltParallelism = 3;
Integer totalInstances = spoutParallelism + boltParallelism;
// Set up the topology and its config
org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
// No explicit resources required
TopologyAPI.Topology topologyNoExplicitResourcesConfig =
getTopology(spoutParallelism, boltParallelism, topologyConfig);
PackingPlan packingPlanNoExplicitResourcesConfig =
getRoundRobinPackingPlan(topologyNoExplicitResourcesConfig);
Assert.assertEquals(numContainers,
packingPlanNoExplicitResourcesConfig.getContainers().size());
Assert.assertEquals(totalInstances, packingPlanNoExplicitResourcesConfig.getInstanceCount());
}
/**
* Test the scenario container level resource config are set
*/
@Test
public void testContainerRequestedResources() throws Exception {
int numContainers = 2;
int spoutParallelism = 4;
int boltParallelism = 3;
Integer totalInstances = spoutParallelism + boltParallelism;
// Set up the topology and its config
org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
// Explicit set resources for container
ByteAmount containerRam = ByteAmount.fromGigabytes(10);
ByteAmount containerDisk = ByteAmount.fromGigabytes(20);
double containerCpu = 30;
topologyConfig.setContainerRamRequested(containerRam);
topologyConfig.setContainerDiskRequested(containerDisk);
topologyConfig.setContainerCpuRequested(containerCpu);
TopologyAPI.Topology topologyExplicitResourcesConfig =
getTopology(spoutParallelism, boltParallelism, topologyConfig);
PackingPlan packingPlanExplicitResourcesConfig =
getRoundRobinPackingPlan(topologyExplicitResourcesConfig);
Assert.assertEquals(numContainers,
packingPlanExplicitResourcesConfig.getContainers().size());
Assert.assertEquals(totalInstances, packingPlanExplicitResourcesConfig.getInstanceCount());
for (PackingPlan.ContainerPlan containerPlan
: packingPlanExplicitResourcesConfig.getContainers()) {
Assert.assertEquals(containerCpu, containerPlan.getRequiredResource().getCpu(), DELTA);
Assert.assertTrue(String.format(// due to round-off when using divide()
"expected: %s but was: %s", containerRam, containerPlan.getRequiredResource().getRam()),
Math.abs(
containerRam.minus(containerPlan.getRequiredResource().getRam()).asBytes()) <= 1);
Assert.assertEquals(containerDisk, containerPlan.getRequiredResource().getDisk());
// All instances' resource requirement should be equal
// So the size of set should be 1
Set<Resource> resources = new HashSet<>();
for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
resources.add(instancePlan.getResource());
}
Assert.assertEquals(1, resources.size());
int instancesCount = containerPlan.getInstances().size();
Assert.assertEquals(containerRam
.minus(RoundRobinPacking.DEFAULT_RAM_PADDING_PER_CONTAINER).divide(instancesCount),
resources.iterator().next().getRam());
Assert.assertEquals(
(containerCpu - RoundRobinPacking.DEFAULT_CPU_PADDING_PER_CONTAINER) / instancesCount,
resources.iterator().next().getCpu(), DELTA);
}
}
/**
* Test the scenario container level resource config are set
*/
@Test
public void testContainerRequestedResourcesWhenRamPaddingSet() throws Exception {
int numContainers = 2;
int spoutParallelism = 4;
int boltParallelism = 3;
Integer totalInstances = spoutParallelism + boltParallelism;
// Set up the topology and its config
org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
// Explicit set resources for container
ByteAmount containerRam = ByteAmount.fromGigabytes(10);
ByteAmount containerDisk = ByteAmount.fromGigabytes(20);
ByteAmount containerRamPadding = ByteAmount.fromMegabytes(512);
double containerCpu = 30;
topologyConfig.setContainerRamRequested(containerRam);
topologyConfig.setContainerDiskRequested(containerDisk);
topologyConfig.setContainerCpuRequested(containerCpu);
topologyConfig.setContainerRamPadding(containerRamPadding);
TopologyAPI.Topology topologyExplicitResourcesConfig =
getTopology(spoutParallelism, boltParallelism, topologyConfig);
PackingPlan packingPlanExplicitResourcesConfig =
getRoundRobinPackingPlan(topologyExplicitResourcesConfig);
Assert.assertEquals(numContainers,
packingPlanExplicitResourcesConfig.getContainers().size());
Assert.assertEquals(totalInstances, packingPlanExplicitResourcesConfig.getInstanceCount());
for (PackingPlan.ContainerPlan containerPlan
: packingPlanExplicitResourcesConfig.getContainers()) {
Assert.assertEquals(containerCpu, containerPlan.getRequiredResource().getCpu(), DELTA);
Assert.assertTrue(String.format(// due to round-off when using divide()
"expected: %s but was: %s", containerRam, containerPlan.getRequiredResource().getRam()),
Math.abs(
containerRam.minus(containerPlan.getRequiredResource().getRam()).asBytes()) <= 1);
Assert.assertEquals(containerDisk, containerPlan.getRequiredResource().getDisk());
// All instances' resource requirement should be equal
// So the size of set should be 1
Set<Resource> resources = new HashSet<>();
for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
resources.add(instancePlan.getResource());
}
Assert.assertEquals(1, resources.size());
int instancesCount = containerPlan.getInstances().size();
Assert.assertEquals(containerRam
.minus(containerRamPadding).divide(instancesCount),
resources.iterator().next().getRam());
Assert.assertEquals(
(containerCpu - RoundRobinPacking.DEFAULT_CPU_PADDING_PER_CONTAINER) / instancesCount,
resources.iterator().next().getCpu(), DELTA);
}
}
/**
* Test the scenario RAM map config is completely set
*/
@Test
public void testCompleteRamMapRequestedWithExactlyEnoughResource() throws Exception {
int numContainers = 2;
int spoutParallelism = 4;
int boltParallelism = 3;
Integer totalInstances = spoutParallelism + boltParallelism;
// Set up the topology and its config
org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
// Explicit set resources for container
// the value should be ignored, since we set the complete component RAM map
ByteAmount containerRam = ByteAmount.fromGigabytes(8);
// Explicit set component RAM map
ByteAmount boltRam = ByteAmount.fromGigabytes(1);
ByteAmount spoutRam = ByteAmount.fromGigabytes(2);
topologyConfig.setContainerRamRequested(containerRam);
topologyConfig.setComponentRam(BOLT_NAME, boltRam);
topologyConfig.setComponentRam(SPOUT_NAME, spoutRam);
TopologyAPI.Topology topologyExplicitRamMap =
getTopology(spoutParallelism, boltParallelism, topologyConfig);
PackingPlan packingPlanExplicitRamMap =
getRoundRobinPackingPlan(topologyExplicitRamMap);
AssertPacking.assertContainers(packingPlanExplicitRamMap.getContainers(),
BOLT_NAME, SPOUT_NAME, boltRam, spoutRam, null);
Assert.assertEquals(totalInstances, packingPlanExplicitRamMap.getInstanceCount());
}
/**
* Test the scenario RAM map config is completely set
*/
@Test(expected = PackingException.class)
public void testCompleteRamMapRequestedWithLessThanEnoughResource() throws Exception {
int numContainers = 2;
int spoutParallelism = 4;
int boltParallelism = 3;
// Set up the topology and its config
org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
// Explicit set resources for container
// the value should be ignored, since we set the complete component RAM map
ByteAmount containerRam = ByteAmount.fromGigabytes(2);
// Explicit set component RAM map
ByteAmount boltRam = ByteAmount.fromGigabytes(1);
ByteAmount spoutRam = ByteAmount.fromGigabytes(2);
topologyConfig.setContainerRamRequested(containerRam);
topologyConfig.setComponentRam(BOLT_NAME, boltRam);
topologyConfig.setComponentRam(SPOUT_NAME, spoutRam);
TopologyAPI.Topology topologyExplicitRamMap =
getTopology(spoutParallelism, boltParallelism, topologyConfig);
getRoundRobinPackingPlan(topologyExplicitRamMap);
}
/**
* Test the scenario RAM map config is completely set
*/
@Test
public void testCompleteRamMapRequestedWithMoreThanEnoughResource() throws Exception {
int numContainers = 2;
int spoutParallelism = 4;
int boltParallelism = 3;
Integer totalInstances = spoutParallelism + boltParallelism;
// Set up the topology and its config
org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
// Explicit set resources for container
// the value should be ignored, since we set the complete component RAM map
ByteAmount containerRam = ByteAmount.fromBytes(Long.MAX_VALUE);
// Explicit set component RAM map
ByteAmount boltRam = ByteAmount.fromGigabytes(1);
ByteAmount spoutRam = ByteAmount.fromGigabytes(2);
topologyConfig.setContainerRamRequested(containerRam);
topologyConfig.setComponentRam(BOLT_NAME, boltRam);
topologyConfig.setComponentRam(SPOUT_NAME, spoutRam);
TopologyAPI.Topology topologyExplicitRamMap =
getTopology(spoutParallelism, boltParallelism, topologyConfig);
PackingPlan packingPlanExplicitRamMap =
getRoundRobinPackingPlan(topologyExplicitRamMap);
AssertPacking.assertContainers(packingPlanExplicitRamMap.getContainers(),
BOLT_NAME, SPOUT_NAME, boltRam, spoutRam, containerRam);
Assert.assertEquals(totalInstances, packingPlanExplicitRamMap.getInstanceCount());
}
/**
* Test the scenario RAM map config is completely set
*/
@Test
public void testCompleteRamMapRequestedWithoutPaddingResource() throws Exception {
int numContainers = 2;
int spoutParallelism = 4;
int boltParallelism = 3;
Integer totalInstances = spoutParallelism + boltParallelism;
// Set up the topology and its config
org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
// Explicit set resources for container
// the value should be ignored, since we set the complete component RAM map
ByteAmount containerRam = ByteAmount.fromGigabytes(6);
// Explicit set component RAM map
ByteAmount boltRam = ByteAmount.fromGigabytes(1);
ByteAmount spoutRam = ByteAmount.fromGigabytes(2);
topologyConfig.setContainerRamRequested(containerRam);
topologyConfig.setComponentRam(BOLT_NAME, boltRam);
topologyConfig.setComponentRam(SPOUT_NAME, spoutRam);
TopologyAPI.Topology topologyExplicitRamMap =
getTopology(spoutParallelism, boltParallelism, topologyConfig);
PackingPlan packingPlanExplicitRamMap =
getRoundRobinPackingPlan(topologyExplicitRamMap);
AssertPacking.assertContainers(packingPlanExplicitRamMap.getContainers(),
BOLT_NAME, SPOUT_NAME, boltRam, spoutRam, null);
Assert.assertEquals(totalInstances, packingPlanExplicitRamMap.getInstanceCount());
}
/**
* Test the scenario CPU map config is completely set and there are exactly enough resource
*/
@Test
public void testCompleteCpuMapRequestedWithExactlyEnoughResource() throws Exception {
int numContainers = 2;
int spoutParallelism = 4;
int boltParallelism = 3;
Integer totalInstances = spoutParallelism + boltParallelism;
// Set up the topology and its config
org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
// Explicit set resources for container
double containerCpu = 17;
// Explicit set component CPU map
double boltCpu = 4;
double spoutCpu = 4;
topologyConfig.setContainerCpuRequested(containerCpu);
topologyConfig.setComponentCpu(BOLT_NAME, boltCpu);
topologyConfig.setComponentCpu(SPOUT_NAME, spoutCpu);
TopologyAPI.Topology topologyExplicitCpuMap =
getTopology(spoutParallelism, boltParallelism, topologyConfig);
PackingPlan packingPlanExplicitCpuMap =
getRoundRobinPackingPlan(topologyExplicitCpuMap);
AssertPacking.assertContainers(packingPlanExplicitCpuMap.getContainers(),
BOLT_NAME, SPOUT_NAME, boltCpu, spoutCpu, null);
Assert.assertEquals(totalInstances, packingPlanExplicitCpuMap.getInstanceCount());
}
/**
* Test the scenario CPU map config is completely set and there are more than enough resource
*/
@Test
public void testCompleteCpuMapRequestedWithMoreThanEnoughResource() throws Exception {
int numContainers = 2;
int spoutParallelism = 4;
int boltParallelism = 3;
Integer totalInstances = spoutParallelism + boltParallelism;
// Set up the topology and its config
org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
// Explicit set resources for container
double containerCpu = 30;
// Explicit set component CPU map
double boltCpu = 4;
double spoutCpu = 4;
topologyConfig.setContainerCpuRequested(containerCpu);
topologyConfig.setComponentCpu(BOLT_NAME, boltCpu);
topologyConfig.setComponentCpu(SPOUT_NAME, spoutCpu);
TopologyAPI.Topology topologyExplicitCpuMap =
getTopology(spoutParallelism, boltParallelism, topologyConfig);
PackingPlan packingPlanExplicitCpuMap =
getRoundRobinPackingPlan(topologyExplicitCpuMap);
AssertPacking.assertContainers(packingPlanExplicitCpuMap.getContainers(),
BOLT_NAME, SPOUT_NAME, boltCpu, spoutCpu, null);
Assert.assertEquals(totalInstances, packingPlanExplicitCpuMap.getInstanceCount());
}
/**
* Test the scenario CPU map config is completely set and there are less than enough resource
*/
@Test(expected = PackingException.class)
public void testCompleteCpuMapRequestedWithLessThanEnoughResource() throws Exception {
int numContainers = 2;
int spoutParallelism = 4;
int boltParallelism = 3;
Integer totalInstances = spoutParallelism + boltParallelism;
// Set up the topology and its config
org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
// Explicit set resources for container
double containerCpu = 10;
// Explicit set component CPU map
double boltCpu = 4;
double spoutCpu = 4;
topologyConfig.setContainerCpuRequested(containerCpu);
topologyConfig.setComponentCpu(BOLT_NAME, boltCpu);
topologyConfig.setComponentCpu(SPOUT_NAME, spoutCpu);
TopologyAPI.Topology topologyExplicitCpuMap =
getTopology(spoutParallelism, boltParallelism, topologyConfig);
PackingPlan packingPlanExplicitCpuMap = getRoundRobinPackingPlan(topologyExplicitCpuMap);
AssertPacking.assertContainers(packingPlanExplicitCpuMap.getContainers(),
BOLT_NAME, SPOUT_NAME, boltCpu, spoutCpu, null);
Assert.assertEquals(totalInstances, packingPlanExplicitCpuMap.getInstanceCount());
}
/**
* Test the scenario CPU map config is completely set
* and there are exactly enough resource for instances, but not enough for padding
*/
@Test
public void testCompleteCpuMapRequestedWithoutPaddingResource() throws Exception {
int numContainers = 2;
int spoutParallelism = 4;
int boltParallelism = 3;
// Set up the topology and its config
org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
// Explicit set resources for container
double containerCpu = 16;
// Explicit set component CPU map
double boltCpu = 4;
double spoutCpu = 4;
topologyConfig.setContainerCpuRequested(containerCpu);
topologyConfig.setComponentCpu(BOLT_NAME, boltCpu);
topologyConfig.setComponentCpu(SPOUT_NAME, spoutCpu);
TopologyAPI.Topology topologyExplicitCpuMap =
getTopology(spoutParallelism, boltParallelism, topologyConfig);
getRoundRobinPackingPlan(topologyExplicitCpuMap);
}
/**
* Test the scenario RAM map config is partially set
*/
@Test
public void testPartialRamMap() throws Exception {
int numContainers = 2;
int spoutParallelism = 4;
int boltParallelism = 3;
Integer totalInstances = spoutParallelism + boltParallelism;
// Set up the topology and its config
org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
// Explicit set resources for container
ByteAmount containerRam = ByteAmount.fromGigabytes(10);
// Explicit set component RAM map
ByteAmount boltRam = ByteAmount.fromGigabytes(1);
topologyConfig.setContainerRamRequested(containerRam);
topologyConfig.setComponentRam(BOLT_NAME, boltRam);
TopologyAPI.Topology topologyExplicitRamMap =
getTopology(spoutParallelism, boltParallelism, topologyConfig);
PackingPlan packingPlanExplicitRamMap =
getRoundRobinPackingPlan(topologyExplicitRamMap);
Assert.assertEquals(totalInstances, packingPlanExplicitRamMap.getInstanceCount());
// RAM for bolt should be the value in component RAM map
for (PackingPlan.ContainerPlan containerPlan : packingPlanExplicitRamMap.getContainers()) {
Assert.assertEquals(containerRam, containerPlan.getRequiredResource().getRam());
int boltCount = 0;
int instancesCount = containerPlan.getInstances().size();
for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
if (instancePlan.getComponentName().equals(BOLT_NAME)) {
Assert.assertEquals(boltRam, instancePlan.getResource().getRam());
boltCount++;
}
}
// Ram for spout should be:
// (containerRam - all RAM for bolt - RAM for padding) / (# of spouts)
int spoutCount = instancesCount - boltCount;
for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
if (instancePlan.getComponentName().equals(SPOUT_NAME)) {
Assert.assertEquals(
containerRam
.minus(boltRam.multiply(boltCount))
.minus(RoundRobinPacking.DEFAULT_RAM_PADDING_PER_CONTAINER)
.divide(spoutCount),
instancePlan.getResource().getRam());
}
}
}
}
/**
* Test the scenario CPU map config is partially set
*/
@Test
public void testPartialCpuMap() throws Exception {
int numContainers = 2;
int spoutParallelism = 4;
int boltParallelism = 3;
Integer totalInstances = spoutParallelism + boltParallelism;
// Set up the topology and its config
org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
// Explicit set resources for container
double containerCpu = 17;
// Explicit set component CPU map
double boltCpu = 4;
topologyConfig.setContainerCpuRequested(containerCpu);
topologyConfig.setComponentCpu(BOLT_NAME, boltCpu);
TopologyAPI.Topology topologyExplicitCpuMap =
getTopology(spoutParallelism, boltParallelism, topologyConfig);
PackingPlan packingPlanExplicitCpuMap =
getRoundRobinPackingPlan(topologyExplicitCpuMap);
Assert.assertEquals(totalInstances, packingPlanExplicitCpuMap.getInstanceCount());
// CPU for bolt should be the value in component CPU map
for (PackingPlan.ContainerPlan containerPlan : packingPlanExplicitCpuMap.getContainers()) {
Assert.assertEquals(containerCpu, containerPlan.getRequiredResource().getCpu(), DELTA);
int boltCount = 0;
int instancesCount = containerPlan.getInstances().size();
for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
if (instancePlan.getComponentName().equals(BOLT_NAME)) {
Assert.assertEquals(boltCpu, instancePlan.getResource().getCpu(), DELTA);
boltCount++;
}
}
// CPU for spout should be:
// (containerCpu - all CPU for bolt - CPU for padding) / (# of spouts)
int spoutCount = instancesCount - boltCount;
for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
if (instancePlan.getComponentName().equals(SPOUT_NAME)) {
Assert.assertEquals(
(containerCpu
- boltCpu * boltCount
- RoundRobinPacking.DEFAULT_CPU_PADDING_PER_CONTAINER)
/ spoutCount,
instancePlan.getResource().getCpu(), DELTA);
}
}
}
}
/**
* test even packing of instances
*/
@Test
public void testEvenPacking() throws Exception {
int numContainers = 2;
int componentParallelism = 4;
// Set up the topology and its config
org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
TopologyAPI.Topology topology =
getTopology(componentParallelism, componentParallelism, topologyConfig);
int numInstance = TopologyUtils.getTotalInstance(topology);
// Two components
Assert.assertEquals(2 * componentParallelism, numInstance);
PackingPlan output = getRoundRobinPackingPlan(topology);
Assert.assertEquals(numContainers, output.getContainers().size());
Assert.assertEquals((Integer) numInstance, output.getInstanceCount());
for (PackingPlan.ContainerPlan container : output.getContainers()) {
Assert.assertEquals(numInstance / numContainers, container.getInstances().size());
// Verify each container got 2 spout and 2 bolt and container 1 got
assertComponentCount(container, "spout", 2);
assertComponentCount(container, "bolt", 2);
}
}
/**
* test re-packing with same total instances
*/
@Test
public void testRepackingWithSameTotalInstances() throws Exception {
int numContainers = 2;
int componentParallelism = 4;
// Set up the topology and its config
org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
TopologyAPI.Topology topology =
getTopology(componentParallelism, componentParallelism, topologyConfig);
int numInstance = TopologyUtils.getTotalInstance(topology);
// Two components
Assert.assertEquals(2 * componentParallelism, numInstance);
Map<String, Integer> componentChanges = new HashMap<>();
componentChanges.put(SPOUT_NAME, -1);
componentChanges.put(BOLT_NAME, +1);
PackingPlan output = getRoundRobinRePackingPlan(topology, componentChanges);
Assert.assertEquals(numContainers, output.getContainers().size());
Assert.assertEquals((Integer) numInstance, output.getInstanceCount());
int spoutCount = 0;
int boltCount = 0;
for (PackingPlan.ContainerPlan container : output.getContainers()) {
Assert.assertEquals(numInstance / numContainers, container.getInstances().size());
for (PackingPlan.InstancePlan instancePlan : container.getInstances()) {
if (SPOUT_NAME.equals(instancePlan.getComponentName())) {
spoutCount++;
} else if (BOLT_NAME.equals(instancePlan.getComponentName())) {
boltCount++;
}
}
}
Assert.assertEquals(componentParallelism - 1, spoutCount);
Assert.assertEquals(componentParallelism + 1, boltCount);
}
/**
* test re-packing with more total instances
*/
@Test
public void testRepackingWithMoreTotalInstances() throws Exception {
int numContainers = 2;
int componentParallelism = 4;
// Set up the topology and its config
org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
TopologyAPI.Topology topology =
getTopology(componentParallelism, componentParallelism, topologyConfig);
int numInstance = TopologyUtils.getTotalInstance(topology);
// Two components
Assert.assertEquals(2 * componentParallelism, numInstance);
Map<String, Integer> componentChanges = new HashMap<>();
componentChanges.put(SPOUT_NAME, +1);
componentChanges.put(BOLT_NAME, +1);
PackingPlan output = getRoundRobinRePackingPlan(topology, componentChanges);
Assert.assertEquals(numContainers + 1, output.getContainers().size());
Assert.assertEquals((Integer) (numInstance + 2), output.getInstanceCount());
int spoutCount = 0;
int boltCount = 0;
for (PackingPlan.ContainerPlan container : output.getContainers()) {
Assert.assertTrue((double) container.getInstances().size()
<= (double) numInstance / numContainers);
for (PackingPlan.InstancePlan instancePlan : container.getInstances()) {
if (SPOUT_NAME.equals(instancePlan.getComponentName())) {
spoutCount++;
} else if (BOLT_NAME.equals(instancePlan.getComponentName())) {
boltCount++;
}
}
}
Assert.assertEquals(componentParallelism + 1, spoutCount);
Assert.assertEquals(componentParallelism + 1, boltCount);
}
/**
* test re-packing with fewer total instances
*/
@Test
public void testRepackingWithFewerTotalInstances() throws Exception {
int numContainers = 2;
int componentParallelism = 4;
// Set up the topology and its config
org.apache.heron.api.Config topologyConfig = new org.apache.heron.api.Config();
topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, numContainers);
TopologyAPI.Topology topology =
getTopology(componentParallelism, componentParallelism, topologyConfig);
int numInstance = TopologyUtils.getTotalInstance(topology);
// Two components
Assert.assertEquals(2 * componentParallelism, numInstance);
Map<String, Integer> componentChanges = new HashMap<>();
componentChanges.put(SPOUT_NAME, -2);
componentChanges.put(BOLT_NAME, -2);
PackingPlan output = getRoundRobinRePackingPlan(topology, componentChanges);
Assert.assertEquals(numContainers - 1, output.getContainers().size());
Assert.assertEquals((Integer) (numInstance - 4), output.getInstanceCount());
int spoutCount = 0;
int boltCount = 0;
for (PackingPlan.ContainerPlan container : output.getContainers()) {
Assert.assertTrue((double) container.getInstances().size()
<= (double) numInstance / numContainers);
for (PackingPlan.InstancePlan instancePlan : container.getInstances()) {
if (SPOUT_NAME.equals(instancePlan.getComponentName())) {
spoutCount++;
} else if (BOLT_NAME.equals(instancePlan.getComponentName())) {
boltCount++;
}
}
}
Assert.assertEquals(componentParallelism - 2, spoutCount);
Assert.assertEquals(componentParallelism - 2, boltCount);
}
private static void assertComponentCount(
PackingPlan.ContainerPlan containerPlan, String componentName, int expectedCount) {
int count = 0;
for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
if (componentName.equals(instancePlan.getComponentName())) {
count++;
}
}
Assert.assertEquals(expectedCount, count);
}
}