blob: 316f124a4cd640cfebe7ba09b4e224459067dc8d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.spi.packing;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
import org.apache.heron.common.basics.ByteAmount;
public class PackingPlan {
private final String id;
private final Map<Integer, ContainerPlan> containersMap;
private final Set<ContainerPlan> containers;
public PackingPlan(String id, Set<ContainerPlan> containers) {
this.id = id;
this.containers = ImmutableSet.copyOf(containers);
containersMap = new HashMap<>();
for (ContainerPlan containerPlan : containers) {
containersMap.put(containerPlan.getId(), containerPlan);
}
}
/**
* Computes the maximum of all the resources required by the containers in the packing plan. If
* the PackingPlan has already been scheduled, the scheduled resources will be used over the
* required resources.
*
* @return maximum Resources found in all containers.
*/
public Resource getMaxContainerResources() {
double maxCpu = 0;
ByteAmount maxRam = ByteAmount.ZERO;
ByteAmount maxDisk = ByteAmount.ZERO;
for (ContainerPlan containerPlan : getContainers()) {
Resource containerResource =
containerPlan.getScheduledResource().or(containerPlan.getRequiredResource());
maxCpu = Math.max(maxCpu, containerResource.getCpu());
maxRam = maxRam.max(containerResource.getRam());
maxDisk = maxDisk.max(containerResource.getDisk());
}
return new Resource(maxCpu, maxRam, maxDisk);
}
/**
* Creates a clone of {@link PackingPlan}. It also computes the maximum of all the resources
* required by containers in the packing plan and updates the containers of the clone with the
* max resource information
*/
public PackingPlan cloneWithHomogeneousScheduledResource() {
Resource maxResource = getMaxContainerResources();
Set<ContainerPlan> updatedContainers = new LinkedHashSet<>();
for (ContainerPlan container : getContainers()) {
updatedContainers.add(container.cloneWithScheduledResource(maxResource));
}
return new PackingPlan(getId(), updatedContainers);
}
public String getId() {
return id;
}
public Set<ContainerPlan> getContainers() {
return containers;
}
public Map<Integer, ContainerPlan> getContainersMap() {
return containersMap;
}
public Optional<ContainerPlan> getContainer(int containerId) {
return Optional.fromNullable(this.containersMap.get(containerId));
}
public Integer getInstanceCount() {
Integer totalInstances = 0;
for (Integer count : getComponentCounts().values()) {
totalInstances += count;
}
return totalInstances;
}
/**
* Return a map containing the count of all of the components, keyed by name
*/
public Map<String, Integer> getComponentCounts() {
Map<String, Integer> componentCounts = new HashMap<>();
for (ContainerPlan containerPlan : getContainers()) {
for (InstancePlan instancePlan : containerPlan.getInstances()) {
Integer count = 0;
if (componentCounts.containsKey(instancePlan.getComponentName())) {
count = componentCounts.get(instancePlan.getComponentName());
}
componentCounts.put(instancePlan.getComponentName(), ++count);
}
}
return componentCounts;
}
/**
* Get the formatted String describing component RAM distribution from PackingPlan,
* used by executor
*
* @return String describing component RAM distribution
*/
public String getComponentRamDistribution() {
// Generate a map with the minimal RAM size for each component
Map<String, ByteAmount> ramMap = new HashMap<>();
for (ContainerPlan containerPlan : this.getContainers()) {
for (InstancePlan instancePlan : containerPlan.getInstances()) {
ByteAmount newRam = instancePlan.getResource().getRam();
ByteAmount currentRam = ramMap.get(instancePlan.getComponentName());
if (currentRam == null || currentRam.asBytes() > newRam.asBytes()) {
ramMap.put(instancePlan.getComponentName(), newRam);
}
}
}
// Convert it into a formatted String
StringBuilder ramMapBuilder = new StringBuilder();
for (String component : ramMap.keySet()) {
ramMapBuilder.append(String.format("%s:%d,", component, ramMap.get(component).asBytes()));
}
// Remove the duplicated "," at the end
ramMapBuilder.deleteCharAt(ramMapBuilder.length() - 1);
return ramMapBuilder.toString();
}
@Override
public String toString() {
return String.format("{plan-id: %s, containers-list: %s}",
getId(), getContainers().toString());
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PackingPlan that = (PackingPlan) o;
return getId().equals(that.getId())
&& getContainers().equals(that.getContainers());
}
@Override
public int hashCode() {
int result = getId().hashCode();
result = 31 * result + getContainers().hashCode();
return result;
}
public static class InstancePlan {
private final String componentName;
private final int taskId;
private final int componentIndex;
private final Resource resource;
public InstancePlan(InstanceId instanceId, Resource resource) {
this.componentName = instanceId.getComponentName();
this.taskId = instanceId.getTaskId();
this.componentIndex = instanceId.getComponentIndex();
this.resource = resource;
}
public String getComponentName() {
return componentName;
}
public int getTaskId() {
return taskId;
}
public int getComponentIndex() {
return componentIndex;
}
public Resource getResource() {
return resource;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
InstancePlan that = (InstancePlan) o;
return getComponentName().equals(that.getComponentName())
&& getTaskId() == that.getTaskId()
&& getComponentIndex() == that.getComponentIndex()
&& getResource().equals(that.getResource());
}
@Override
public int hashCode() {
int result = getComponentName().hashCode();
result = 31 * result + ((Integer) getTaskId()).hashCode();
result = 31 * result + ((Integer) getComponentIndex()).hashCode();
result = 31 * result + getResource().hashCode();
return result;
}
@Override
public String toString() {
return String.format(
"{component-name: %s, task-id: %s, component-index: %s, instance-resource: %s}",
getComponentName(), getTaskId(), getComponentIndex(), getResource().toString());
}
}
public static class ContainerPlan {
private final int id;
private final Set<InstancePlan> instances;
private final Resource requiredResource;
private final Optional<Resource> scheduledResource;
public ContainerPlan(int id, Set<InstancePlan> instances, Resource requiredResource) {
this(id, instances, requiredResource, null);
}
public ContainerPlan(int id,
Set<InstancePlan> instances,
Resource requiredResource,
Resource scheduledResource) {
this.id = id;
this.instances = ImmutableSet.copyOf(instances);
this.requiredResource = requiredResource;
this.scheduledResource = Optional.fromNullable(scheduledResource);
}
public int getId() {
return id;
}
public Set<InstancePlan> getInstances() {
return instances;
}
public Resource getRequiredResource() {
return requiredResource;
}
public Optional<Resource> getScheduledResource() {
return scheduledResource;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ContainerPlan that = (ContainerPlan) o;
return id == that.id
&& getInstances().equals(that.getInstances())
&& getRequiredResource().equals(that.getRequiredResource())
&& getScheduledResource().equals(that.getScheduledResource());
}
@Override
public int hashCode() {
int result = id;
result = 31 * result + getInstances().hashCode();
result = 31 * result + getRequiredResource().hashCode();
if (scheduledResource.isPresent()) {
result = 31 * result + getScheduledResource().get().hashCode();
}
return result;
}
/**
* Returns a {@link ContainerPlan} with updated scheduledResource
*/
private ContainerPlan cloneWithScheduledResource(Resource resource) {
return new ContainerPlan(getId(), getInstances(), getRequiredResource(), resource);
}
@Override
public String toString() {
String str = String.format("{container-id: %s, instances-list: %s, required-resource: %s",
id, getInstances().toString(), getRequiredResource());
if (scheduledResource.isPresent()) {
str = String.format("%s, scheduled-resource: %s", str, getScheduledResource().get());
}
return str + "}";
}
}
}