blob: 633a0f4d62f287c8ed157123e061561b17a3f275 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.packing.builder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.logging.Logger;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.packing.exceptions.ResourceExceededException;
import org.apache.heron.packing.utils.PackingUtils;
import org.apache.heron.spi.packing.InstanceId;
import org.apache.heron.spi.packing.PackingException;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.packing.Resource;
/**
* Class the help with building packing plans.
*/
public class PackingPlanBuilder {
private static final Logger LOG = Logger.getLogger(PackingPlanBuilder.class.getName());
private final String topologyId;
private final PackingPlan existingPacking;
private Resource defaultInstanceResource;
private Resource maxContainerResource;
private Map<String, ByteAmount> componentRamMap;
private int requestedContainerPadding;
private int numContainers;
private Map<Integer, Container> containers;
private TreeSet<Integer> taskIds; // globally unique ids assigned to instances
private HashMap<String, TreeSet<Integer>> componentIndexes; // componentName -> componentIndexes
public PackingPlanBuilder(String topologyId) {
this(topologyId, null);
}
public PackingPlanBuilder(String topologyId, PackingPlan existingPacking) {
this.topologyId = topologyId;
this.existingPacking = existingPacking;
this.numContainers = 0;
this.requestedContainerPadding = 0;
this.componentRamMap = new HashMap<>();
}
// set resource settings
public PackingPlanBuilder setDefaultInstanceResource(Resource resource) {
this.defaultInstanceResource = resource;
return this;
}
public PackingPlanBuilder setMaxContainerResource(Resource resource) {
this.maxContainerResource = resource;
return this;
}
public PackingPlanBuilder setRequestedComponentRam(Map<String, ByteAmount> ramMap) {
this.componentRamMap = ramMap;
return this;
}
public PackingPlanBuilder setRequestedContainerPadding(int percent) {
this.requestedContainerPadding = percent;
return this;
}
// Calling updateNumContainers will produce that many containers, starting with id 1. The build()
// method will prune out empty containers from the plan.
public PackingPlanBuilder updateNumContainers(int count) {
this.numContainers = count;
return this;
}
// adds an instance to a container with id containerId. If that container does not exist, it will
// be lazily initialized, which could result in more containers than those requested using the
// updateNumContainers method
public PackingPlanBuilder addInstance(Integer containerId,
String componentName) throws ResourceExceededException {
initContainer(containerId);
Integer taskId = taskIds.isEmpty() ? 1 : taskIds.last() + 1;
Integer componentIndex = componentIndexes.get(componentName) != null
? componentIndexes.get(componentName).last() + 1 : 0;
InstanceId instanceId = new InstanceId(componentName, taskId, componentIndex);
Resource instanceResource = PackingUtils.getResourceRequirement(
componentName, this.componentRamMap, this.defaultInstanceResource,
this.maxContainerResource, this.requestedContainerPadding);
try {
addToContainer(containers.get(containerId),
new PackingPlan.InstancePlan(instanceId, instanceResource),
this.componentIndexes, this.taskIds);
} catch (ResourceExceededException e) {
throw new ResourceExceededException(String.format(
"Insufficient container resources to add instance %s with resources %s to container %d.",
instanceId, instanceResource, containerId), e);
}
LOG.finest(String.format("Added to container %d instance %s", containerId, instanceId));
return this;
}
@SuppressWarnings("JavadocMethod")
/**
* Add an instance to the first container possible ranked by score.
* @return containerId of the container the instance was added to
* @throws org.apache.heron.packing.ResourceExceededException if the instance could not be added
*/
public int addInstance(Scorer<Container> scorer,
String componentName) throws ResourceExceededException {
List<Scorer<Container>> scorers = new LinkedList<>();
scorers.add(scorer);
return addInstance(scorers, componentName);
}
@SuppressWarnings("JavadocMethod")
/**
* Add an instance to the first container possible ranked by score. If a scoring tie exists,
* uses the next scorer in the scorers list to break the tie.
* @return containerId of the container the instance was added to
* @throws org.apache.heron.packing.ResourceExceededException if the instance could not be added
*/
private int addInstance(List<Scorer<Container>> scorers, String componentName)
throws ResourceExceededException {
initContainers();
for (Container container : sortContainers(scorers, this.containers.values())) {
try {
addInstance(container.getContainerId(), componentName);
return container.getContainerId();
} catch (ResourceExceededException e) {
// ignore since we'll continue trying
}
}
//Not enough containers.
throw new ResourceExceededException(String.format(
"Insufficient resources to add '%s' instance to any of the %d containers.",
componentName, this.containers.size()));
}
public void removeInstance(Integer containerId, String componentName) throws PackingException {
initContainers();
Container container = containers.get(containerId);
if (container == null) {
throw new PackingException(String.format("Failed to remove component '%s' because container "
+ "with id %d does not exist.", componentName, containerId));
}
Optional<PackingPlan.InstancePlan> instancePlan =
container.removeAnyInstanceOfComponent(componentName);
if (instancePlan.isPresent()) {
taskIds.remove(instancePlan.get().getTaskId());
if (componentIndexes.get(componentName) != null) {
componentIndexes.get(componentName).remove(instancePlan.get().getComponentIndex());
}
} else {
throw new PackingException(String.format("Failed to remove component '%s' because container "
+ "with id %d does not include that component'", componentName, containerId));
}
}
@SuppressWarnings("JavadocMethod")
/**
* Remove an instance from the first container possible ranked by score.
* @return containerId of the container the instance was removed from
* @throws org.apache.heron.spi.packing.PackingException if the instance could not be removed
*/
public int removeInstance(Scorer<Container> scorer, String componentName) {
List<Scorer<Container>> scorers = new LinkedList<>();
scorers.add(scorer);
return removeInstance(scorers, componentName);
}
@SuppressWarnings("JavadocMethod")
/**
* Remove an instance from the first container possible ranked by score. If a scoring tie exists,
* uses the next scorer in the scorers list to break the tie.
* @return containerId of the container the instance was removed from
* @throws org.apache.heron.spi.packing.PackingException if the instance could not be removed
*/
public int removeInstance(List<Scorer<Container>> scorers, String componentName) {
initContainers();
for (Container container : sortContainers(scorers, this.containers.values())) {
try {
removeInstance(container.getContainerId(), componentName);
return container.getContainerId();
} catch (PackingException e) {
// ignore since we keep trying
}
}
throw new PackingException("Cannot remove instance. No more instances of component "
+ componentName + " exist in the containers.");
}
// build container plan sets by summing up instance resources
public PackingPlan build() {
assertResourceSettings();
Set<PackingPlan.ContainerPlan> containerPlans = buildContainerPlans(
this.containers, this.componentRamMap,
this.defaultInstanceResource, this.requestedContainerPadding);
return new PackingPlan(topologyId, containerPlans);
}
private void initContainers() {
assertResourceSettings();
Map<Integer, Container> newContainerMap = this.containers;
HashMap<String, TreeSet<Integer>> newComponentIndexes = this.componentIndexes;
TreeSet<Integer> newTaskIds = this.taskIds;
if (newComponentIndexes == null) {
newComponentIndexes = new HashMap<>();
}
if (newTaskIds == null) {
newTaskIds = new TreeSet<>();
}
// if this is the first time called, initialize container map with empty or existing containers
if (newContainerMap == null) {
if (this.existingPacking == null) {
newContainerMap = new HashMap<>();
for (int containerId = 1; containerId <= numContainers; containerId++) {
newContainerMap.put(containerId, new Container(
containerId, this.maxContainerResource, this.requestedContainerPadding));
}
} else {
try {
newContainerMap = getContainers(this.existingPacking, this.requestedContainerPadding,
newComponentIndexes, newTaskIds);
} catch (ResourceExceededException e) {
throw new PackingException(
"Could not initialize containers using existing packing plan", e);
}
}
}
if (this.numContainers > newContainerMap.size()) {
List<Scorer<Container>> scorers = new ArrayList<>();
scorers.add(new ContainerIdScorer());
List<Container> sortedContainers = sortContainers(scorers, newContainerMap.values());
int nextContainerId = sortedContainers.get(sortedContainers.size() - 1).getContainerId() + 1;
Resource capacity =
newContainerMap.get(sortedContainers.get(0).getContainerId()).getCapacity();
for (int i = 0; i < numContainers - newContainerMap.size(); i++) {
newContainerMap.put(nextContainerId,
new Container(nextContainerId, capacity, this.requestedContainerPadding));
nextContainerId++;
}
}
this.containers = newContainerMap;
this.componentIndexes = newComponentIndexes;
this.taskIds = newTaskIds;
}
private void initContainer(int containerId) {
initContainers();
if (this.containers.get(containerId) == null) {
this.containers.put(containerId, new Container(
containerId, this.maxContainerResource, this.requestedContainerPadding));
}
}
private void assertResourceSettings() {
if (this.defaultInstanceResource == null) {
throw new PackingException(
"defaultInstanceResource must be set on PackingPlanBuilder before modifying containers");
}
if (this.maxContainerResource == null) {
throw new PackingException(
"maxContainerResource must be set on PackingPlanBuilder before modifying containers");
}
}
/**
* Estimate the per instance and topology resources for the packing plan based on the ramMap,
* instance defaults and paddingPercentage.
*
* @return container plans
*/
private static Set<PackingPlan.ContainerPlan> buildContainerPlans(
Map<Integer, Container> containerInstances,
Map<String, ByteAmount> ramMap,
Resource instanceDefaults,
int paddingPercentage) {
Set<PackingPlan.ContainerPlan> containerPlans = new LinkedHashSet<>();
for (Integer containerId : containerInstances.keySet()) {
Container container = containerInstances.get(containerId);
if (container.getInstances().size() == 0) {
continue;
}
ByteAmount containerRam = ByteAmount.ZERO;
ByteAmount containerDiskInBytes = ByteAmount.ZERO;
double containerCpu = 0;
// Calculate the resource required for single instance
Set<PackingPlan.InstancePlan> instancePlans = new HashSet<>();
for (PackingPlan.InstancePlan instancePlan : container.getInstances()) {
InstanceId instanceId = new InstanceId(instancePlan.getComponentName(),
instancePlan.getTaskId(), instancePlan.getComponentIndex());
ByteAmount instanceRam;
if (ramMap.containsKey(instanceId.getComponentName())) {
instanceRam = ramMap.get(instanceId.getComponentName());
} else {
instanceRam = instanceDefaults.getRam();
}
containerRam = containerRam.plus(instanceRam);
// Currently not yet support disk or CPU config for different components,
// so just use the default value.
ByteAmount instanceDisk = instanceDefaults.getDisk();
containerDiskInBytes = containerDiskInBytes.plus(instanceDisk);
double instanceCpu = instanceDefaults.getCpu();
containerCpu += instanceCpu;
// Insert it into the map
instancePlans.add(new PackingPlan.InstancePlan(instanceId,
new Resource(instanceCpu, instanceRam, instanceDisk)));
}
containerCpu += (paddingPercentage * containerCpu) / 100;
containerRam = containerRam.increaseBy(paddingPercentage);
containerDiskInBytes = containerDiskInBytes.increaseBy(paddingPercentage);
Resource resource =
new Resource(Math.round(containerCpu), containerRam, containerDiskInBytes);
PackingPlan.ContainerPlan containerPlan =
new PackingPlan.ContainerPlan(containerId, instancePlans, resource);
containerPlans.add(containerPlan);
}
return containerPlans;
}
/**
* Generates the containers that correspond to the current packing plan
* along with their associated instances.
*
* @return Map of containers for the current packing plan, keyed by containerId
*/
@VisibleForTesting
static Map<Integer, Container> getContainers(
PackingPlan currentPackingPlan, int paddingPercentage,
Map<String, TreeSet<Integer>> componentIndexes, TreeSet<Integer> taskIds)
throws ResourceExceededException {
Map<Integer, Container> containers = new HashMap<>();
Resource capacity = currentPackingPlan.getMaxContainerResources();
for (PackingPlan.ContainerPlan currentContainerPlan : currentPackingPlan.getContainers()) {
Container container =
new Container(currentContainerPlan.getId(), capacity, paddingPercentage);
for (PackingPlan.InstancePlan instancePlan : currentContainerPlan.getInstances()) {
try {
addToContainer(container, instancePlan, componentIndexes, taskIds);
} catch (ResourceExceededException e) {
throw new ResourceExceededException(String.format(
"Insufficient container resources to add instancePlan %s to container %s",
instancePlan, container), e);
}
}
containers.put(currentContainerPlan.getId(), container);
}
return containers;
}
@VisibleForTesting
static List<Container> sortContainers(List<Scorer<Container>> scorers,
Collection<Container> containers) {
List<Container> sorted = new ArrayList<>(containers);
Collections.sort(sorted, new ChainedContainerComparator<>(scorers));
return sorted;
}
/**
* Add instancePlan to container and update the componentIndexes and taskIds indexes
*/
private static void addToContainer(Container container,
PackingPlan.InstancePlan instancePlan,
Map<String, TreeSet<Integer>> componentIndexes,
Set<Integer> taskIds) throws ResourceExceededException {
container.add(instancePlan);
String componentName = instancePlan.getComponentName();
if (componentIndexes.get(componentName) == null) {
componentIndexes.put(componentName, new TreeSet<Integer>());
}
componentIndexes.get(componentName).add(instancePlan.getComponentIndex());
taskIds.add(instancePlan.getTaskId());
}
private static class ChainedContainerComparator<T> implements Comparator<T> {
private final Comparator<T> comparator;
private final ChainedContainerComparator<T> tieBreaker;
ChainedContainerComparator(List<Scorer<T>> scorers) {
this((Queue<Scorer<T>>) new LinkedList<Scorer<T>>(scorers));
}
ChainedContainerComparator(Queue<Scorer<T>> scorers) {
if (scorers.isEmpty()) {
this.comparator = new EqualsComparator<T>();
this.tieBreaker = null;
} else {
this.comparator = new ContainerComparator<T>(scorers.remove());
this.tieBreaker = new ChainedContainerComparator<T>(scorers);
}
}
@Override
public int compare(T thisOne, T thatOne) {
int delta = comparator.compare(thisOne, thatOne);
if (delta != 0 || this.tieBreaker == null) {
return delta;
}
return tieBreaker.compare(thisOne, thatOne);
}
}
private static class ContainerComparator<T> implements Comparator<T> {
private Scorer<T> scorer;
ContainerComparator(Scorer<T> scorer) {
this.scorer = scorer;
}
@Override
public int compare(T thisOne, T thatOne) {
int sign = 1;
if (!scorer.sortAscending()) {
sign = -1;
}
return sign * (getScore(thisOne) - getScore(thatOne));
}
private int getScore(T container) {
return (int) (1000 * scorer.getScore(container));
}
}
private static class EqualsComparator<T> implements Comparator<T> {
@Override
public int compare(T thisOne, T thatOne) {
return 0;
}
}
}