blob: 1dcb900ec634a46cb9a3b488ff9b5028ed80f29e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.packing.roundrobin;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.api.utils.TopologyUtils;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.packing.builder.Container;
import org.apache.heron.packing.builder.ContainerIdScorer;
import org.apache.heron.packing.builder.HomogeneityScorer;
import org.apache.heron.packing.builder.InstanceCountScorer;
import org.apache.heron.packing.builder.PackingPlanBuilder;
import org.apache.heron.packing.builder.Scorer;
import org.apache.heron.packing.exceptions.ResourceExceededException;
import org.apache.heron.packing.utils.PackingUtils;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.common.Context;
import org.apache.heron.spi.packing.IPacking;
import org.apache.heron.spi.packing.IRepacking;
import org.apache.heron.spi.packing.PackingException;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.packing.Resource;
import static org.apache.heron.api.Config.TOPOLOGY_CONTAINER_CPU_REQUESTED;
import static org.apache.heron.api.Config.TOPOLOGY_CONTAINER_DISK_REQUESTED;
import static org.apache.heron.api.Config.TOPOLOGY_CONTAINER_PADDING_PERCENTAGE;
import static org.apache.heron.api.Config.TOPOLOGY_CONTAINER_RAM_REQUESTED;
/**
* ResourceCompliantRoundRobin packing algorithm
* <p>
* This IPacking implementation generates a PackingPlan using a round robin algorithm.
* <p>
* Following semantics are guaranteed:
* 1. Supports heterogeneous containers.
* The user provides the number of containers to use as well as
* the maximum container size and a padding percentage.
* The padding percentage whose values range from [0, 100], determines the per container
* resources allocated for system-related processes (e.g., the stream manager).
* <p>
* 2. The user provides the maximum CPU, RAM and Disk that can be used by each container through
* the org.apache.heron.api.Config.TOPOLOGY_CONTAINER_CPU_REQUESTED,
* org.apache.heron.api.Config.TOPOLOGY_CONTAINER_RAM_REQUESTED,
* org.apache.heron.api.Config.TOPOLOGY_CONTAINER_DISK_REQUESTED parameters.
* If the parameters are not specified then a default value is used for the maximum container
* size.
* <p>
* 3. The user provides a percentage of each container size that will be used for padding
* through the org.apache.heron.api.Config.TOPOLOGY_CONTAINER_PADDING_PERCENTAGE
* If the parameter is not specified then a default value of 10 is used (10% of the container size)
* <p>
* 4. The RAM required for one instance is calculated as:
* value in org.apache.heron.api.Config.TOPOLOGY_COMPONENT_RAMMAP if exists, otherwise,
* the default RAM value for one instance.
* <p>
* 5. The CPU required for one instance is calculated as the default CPU value for one instance.
* <p>
* 6. The disk required for one instance is calculated as the default disk value for one instance.
* <p>
* 7. The RAM required for a container is calculated as:
* (RAM for instances in container) + (paddingPercentage * RAM for instances in container)
* <p>
* 8. The CPU required for a container is calculated as:
* (CPU for instances in container) + (paddingPercentage * CPU for instances in container)
* <p>
* 9. The disk required for a container is calculated as:
* (disk for instances in container) + ((paddingPercentage * disk for instances in container)
* <p>
* 10. The pack() return null if PackingPlan fails to pass the safe check, for instance,
* the size of RAM for an instance is less than the minimal required value.
*/
public class ResourceCompliantRRPacking implements IPacking, IRepacking {
static final int DEFAULT_CONTAINER_PADDING_PERCENTAGE = 10;
private static final int DEFAULT_NUMBER_INSTANCES_PER_CONTAINER = 4;
private static final Logger LOG = Logger.getLogger(ResourceCompliantRRPacking.class.getName());
private TopologyAPI.Topology topology;
private Resource defaultInstanceResources;
private int numContainers;
//ContainerId to examine next. It is set to 1 when the
//algorithm restarts with a new number of containers
private int containerId;
private void increaseNumContainers(int additionalContainers) {
this.numContainers += additionalContainers;
}
private void resetToFirstContainer() {
this.containerId = 1;
}
private int nextContainerId(int afterId) {
return (afterId == numContainers) ? 1 : afterId + 1;
}
@Override
public void initialize(Config config, TopologyAPI.Topology inputTopology) {
this.topology = inputTopology;
this.numContainers = TopologyUtils.getNumContainers(topology);
this.defaultInstanceResources = new Resource(
Context.instanceCpu(config),
Context.instanceRam(config),
Context.instanceDisk(config));
resetToFirstContainer();
LOG.info(String.format("Initializing ResourceCompliantRRPacking. "
+ "CPU default: %f, RAM default: %s, DISK default: %s.",
this.defaultInstanceResources.getCpu(),
this.defaultInstanceResources.getRam().toString(),
this.defaultInstanceResources.getDisk().toString()));
}
private PackingPlanBuilder newPackingPlanBuilder(PackingPlan existingPackingPlan) {
List<TopologyAPI.Config.KeyValue> topologyConfig = topology.getTopologyConfig().getKvsList();
double defaultCpu = this.defaultInstanceResources.getCpu()
* DEFAULT_NUMBER_INSTANCES_PER_CONTAINER;
ByteAmount defaultRam = this.defaultInstanceResources.getRam()
.multiply(DEFAULT_NUMBER_INSTANCES_PER_CONTAINER);
ByteAmount defaultDisk = this.defaultInstanceResources.getDisk()
.multiply(DEFAULT_NUMBER_INSTANCES_PER_CONTAINER);
int paddingPercentage = TopologyUtils.getConfigWithDefault(topologyConfig,
TOPOLOGY_CONTAINER_PADDING_PERCENTAGE, DEFAULT_CONTAINER_PADDING_PERCENTAGE);
Resource maxContainerResources = new Resource(
TopologyUtils.getConfigWithDefault(topologyConfig, TOPOLOGY_CONTAINER_CPU_REQUESTED,
(double) Math.round(PackingUtils.increaseBy(defaultCpu, paddingPercentage))),
TopologyUtils.getConfigWithDefault(topologyConfig, TOPOLOGY_CONTAINER_RAM_REQUESTED,
defaultRam.increaseBy(paddingPercentage)),
TopologyUtils.getConfigWithDefault(topologyConfig, TOPOLOGY_CONTAINER_DISK_REQUESTED,
defaultDisk.increaseBy(paddingPercentage)));
LOG.info(String.format("ResourceCompliantRRPacking newPackingPlanBuilder. "
+ "CPU max: %f, RAMmaxMax: %s, DISK max: %s, Padding percentage: %d.",
maxContainerResources.getCpu(),
maxContainerResources.getRam().toString(),
maxContainerResources.getDisk().toString(),
paddingPercentage));
return new PackingPlanBuilder(topology.getId(), existingPackingPlan)
.setMaxContainerResource(maxContainerResources)
.setDefaultInstanceResource(defaultInstanceResources)
.setRequestedContainerPadding(paddingPercentage)
.setRequestedComponentRam(TopologyUtils.getComponentRamMapConfig(topology));
}
@Override
public PackingPlan pack() {
while (true) {
try {
PackingPlanBuilder planBuilder = newPackingPlanBuilder(null);
planBuilder.updateNumContainers(numContainers);
planBuilder = getResourceCompliantRRAllocation(planBuilder);
return planBuilder.build();
} catch (ResourceExceededException e) {
//Not enough containers. Adjust the number of containers.
LOG.finest(String.format(
"%s Increasing the number of containers to %s and attempting to place again.",
e.getMessage(), this.numContainers + 1));
increaseNumContainers(1);
resetToFirstContainer();
}
}
}
/**
* Get a new packing plan given an existing packing plan and component-level changes.
*
* @return new packing plan
*/
@Override
public PackingPlan repack(PackingPlan currentPackingPlan, Map<String, Integer> componentChanges) {
this.numContainers = currentPackingPlan.getContainers().size();
resetToFirstContainer();
int additionalContainers = computeNumAdditionalContainers(componentChanges, currentPackingPlan);
if (additionalContainers > 0) {
increaseNumContainers(additionalContainers);
LOG.info(String.format(
"Allocated %s additional containers for repack bring the number of containers to %s.",
additionalContainers, this.numContainers));
}
while (true) {
try {
PackingPlanBuilder planBuilder = newPackingPlanBuilder(currentPackingPlan);
planBuilder.updateNumContainers(numContainers);
planBuilder = getResourceCompliantRRAllocation(planBuilder, componentChanges);
return planBuilder.build();
} catch (ResourceExceededException e) {
//Not enough containers. Adjust the number of containers.
increaseNumContainers(1);
resetToFirstContainer();
LOG.info(String.format(
"%s Increasing the number of containers to %s and attempting packing again.",
e.getMessage(), this.numContainers));
}
}
}
@Override
public PackingPlan repack(PackingPlan currentPackingPlan, int containers,
Map<String, Integer> componentChanges)
throws PackingException, UnsupportedOperationException {
throw new UnsupportedOperationException("ResourceCompliantRRPacking does not "
+ "currently support creating a new packing plan with a new number of containers.");
}
@Override
public void close() {
}
/**
* Computes the additional number of containers needed to accommodate a scale up/down operation
*
* @param componentChanges parallelism changes for scale up/down
* @param packingPlan existing packing plan
* @return additional number of containers needed
*/
private int computeNumAdditionalContainers(Map<String, Integer> componentChanges,
PackingPlan packingPlan) {
Resource scaleDownResource = PackingUtils.computeTotalResourceChange(topology, componentChanges,
defaultInstanceResources, PackingUtils.ScalingDirection.DOWN);
Resource scaleUpResource = PackingUtils.computeTotalResourceChange(topology, componentChanges,
defaultInstanceResources, PackingUtils.ScalingDirection.UP);
Resource additionalResource = scaleUpResource.subtractAbsolute(scaleDownResource);
return (int) additionalResource.divideBy(packingPlan.getMaxContainerResources());
}
private PackingPlanBuilder getResourceCompliantRRAllocation(
PackingPlanBuilder planBuilder) throws ResourceExceededException {
Map<String, Integer> parallelismMap = TopologyUtils.getComponentParallelism(topology);
int totalInstances = TopologyUtils.getTotalInstance(topology);
if (numContainers > totalInstances) {
LOG.warning(String.format(
"More containers requested (%s) than total instances (%s). Reducing containers to %s",
numContainers, totalInstances, totalInstances));
numContainers = totalInstances;
planBuilder.updateNumContainers(numContainers);
}
assignInstancesToContainers(planBuilder, parallelismMap, PolicyType.STRICT);
return planBuilder;
}
/**
* Get the instances' allocation based on the ResourceCompliantRR packing algorithm
*
* @return Map &lt; containerId, list of InstanceId belonging to this container &gt;
*/
private PackingPlanBuilder getResourceCompliantRRAllocation(
PackingPlanBuilder planBuilder, Map<String, Integer> componentChanges)
throws ResourceExceededException {
Map<String, Integer> componentsToScaleDown =
PackingUtils.getComponentsToScale(componentChanges, PackingUtils.ScalingDirection.DOWN);
Map<String, Integer> componentsToScaleUp =
PackingUtils.getComponentsToScale(componentChanges, PackingUtils.ScalingDirection.UP);
if (!componentsToScaleDown.isEmpty()) {
resetToFirstContainer();
removeInstancesFromContainers(planBuilder, componentsToScaleDown);
}
if (!componentsToScaleUp.isEmpty()) {
resetToFirstContainer();
assignInstancesToContainers(planBuilder, componentsToScaleUp, PolicyType.FLEXIBLE);
}
return planBuilder;
}
/**
* Assigns instances to containers.
*
* @param planBuilder packing plan builder
* @param parallelismMap component parallelism
*/
private void assignInstancesToContainers(PackingPlanBuilder planBuilder,
Map<String, Integer> parallelismMap,
PolicyType policyType) throws ResourceExceededException {
for (String componentName : parallelismMap.keySet()) {
int numInstance = parallelismMap.get(componentName);
for (int i = 0; i < numInstance; ++i) {
policyType.assignInstance(planBuilder, componentName, this);
}
}
}
/**
* Attempts to place the instance the current containerId.
*
* @param planBuilder packing plan builder
* @param componentName the component name of the instance that needs to be placed in the container
* @throws ResourceExceededException if there is no room on the current container for the instance
*/
private void strictRRpolicy(PackingPlanBuilder planBuilder,
String componentName) throws ResourceExceededException {
planBuilder.addInstance(this.containerId, componentName);
this.containerId = nextContainerId(this.containerId);
}
/**
* Performs a RR placement. Tries to place the instance on any container with space, starting at
* containerId and cycling through the container set until it can be placed.
*
* @param planBuilder packing plan builder
* @param componentName the component name of the instance that needs to be placed in the container
* @throws ResourceExceededException if there is no room on any container to place the instance
*/
private void flexibleRRpolicy(PackingPlanBuilder planBuilder,
String componentName) throws ResourceExceededException {
// If there is not enough space on containerId look at other containers in a RR fashion
// starting from containerId.
ContainerIdScorer scorer = new ContainerIdScorer(this.containerId, this.numContainers);
this.containerId = nextContainerId(planBuilder.addInstance(scorer, componentName));
}
/**
* Removes instances from containers during scaling down
*
* @param packingPlanBuilder packing plan builder
* @param componentsToScaleDown scale down factor for the components.
*/
private void removeInstancesFromContainers(PackingPlanBuilder packingPlanBuilder,
Map<String, Integer> componentsToScaleDown) {
for (String componentName : componentsToScaleDown.keySet()) {
int numInstancesToRemove = -componentsToScaleDown.get(componentName);
for (int j = 0; j < numInstancesToRemove; j++) {
removeRRInstance(packingPlanBuilder, componentName);
}
}
}
/**
* Remove an instance of a particular component from the containers
*/
private void removeRRInstance(PackingPlanBuilder packingPlanBuilder,
String componentName) throws RuntimeException {
List<Scorer<Container>> scorers = new ArrayList<>();
scorers.add(new HomogeneityScorer(componentName, true)); // all-same-component containers first
scorers.add(new InstanceCountScorer()); // then fewest instances
scorers.add(new HomogeneityScorer(componentName, false)); // then most homogeneous
scorers.add(new ContainerIdScorer(false)); // then highest container id
this.containerId = nextContainerId(packingPlanBuilder.removeInstance(scorers, componentName));
}
private enum PolicyType {
STRICT, FLEXIBLE;
private void assignInstance(PackingPlanBuilder planBuilder,
String componentName,
ResourceCompliantRRPacking packing)
throws ResourceExceededException, RuntimeException {
switch (this) {
case STRICT:
packing.strictRRpolicy(planBuilder, componentName);
break;
case FLEXIBLE:
packing.flexibleRRpolicy(planBuilder, componentName);
break;
default:
throw new RuntimeException("Not valid policy type");
}
}
}
}