blob: de097a6b0c701e7ab54e894890badfb3d0070c15 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.scheduler.resource.normalization;
import com.codahale.metrics.Meter;
import java.util.Arrays;
import java.util.Map;
import org.apache.storm.Constants;
import org.apache.storm.generated.WorkerResources;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Resources that have been normalized. This class is intended as a delegate for more specific types of normalized resource set, since it
 * does not keep track of memory as a resource.
 *
 * <p>CPU is stored in a dedicated field; every other (generic) resource is stored in a {@code double[]} whose index-to-name mapping is
 * provided by the shared {@code ResourceMapArrayBridge}. Arrays held by different instances may have different lengths; an entry that is
 * missing from the end of an array is treated as 0.
 */
public class NormalizedResources {
    private static final Logger LOG = LoggerFactory.getLogger(NormalizedResources.class);
    public static ResourceNameNormalizer RESOURCE_NAME_NORMALIZER;
    private static ResourceMapArrayBridge RESOURCE_MAP_ARRAY_BRIDGE;

    static {
        resetResourceNames();
    }

    private double cpu;
    private double[] otherResources;

    /**
     * Create an empty NormalizedResources.
     */
    NormalizedResources() {
        cpu = 0.0;
        otherResources = RESOURCE_MAP_ARRAY_BRIDGE.empty();
    }

    /**
     * Copy constructor. The generic resource array is copied, so later changes to either instance do not affect the other.
     *
     * @param other the resources to copy.
     */
    public NormalizedResources(NormalizedResources other) {
        cpu = other.cpu;
        otherResources = Arrays.copyOf(other.otherResources, other.otherResources.length);
    }

    /**
     * Create a new normalized set of resources. Note that memory is not managed by this class, as it is not consistent in requests vs
     * offers because of how on heap vs off heap is used.
     *
     * @param normalizedResources the normalized resource map
     */
    public NormalizedResources(Map<String, Double> normalizedResources) {
        cpu = normalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
        otherResources = RESOURCE_MAP_ARRAY_BRIDGE.translateToResourceArray(normalizedResources);
    }

    /**
     * This is for testing only. It allows a test to reset the static state relating to resource names. We reset the mapping because some
     * algorithms sadly have different behavior if a resource exists or not.
     */
    @VisibleForTesting
    public static void resetResourceNames() {
        RESOURCE_NAME_NORMALIZER = new ResourceNameNormalizer();
        RESOURCE_MAP_ARRAY_BRIDGE = new ResourceMapArrayBridge();
    }

    /**
     * Get the total amount of cpu.
     *
     * @return the amount of cpu.
     */
    public double getTotalCpu() {
        return cpu;
    }

    /**
     * Grow the generic resource array to at least {@code requiredLength}, keeping existing values and filling new slots with 0.
     * Does nothing when the array is already long enough.
     */
    private void zeroPadOtherResourcesIfNecessary(int requiredLength) {
        if (requiredLength > otherResources.length) {
            double[] newResources = new double[requiredLength];
            System.arraycopy(otherResources, 0, newResources, 0, otherResources.length);
            otherResources = newResources;
        }
    }

    /**
     * Add each entry of {@code resourceArray} to the generic resource at the same index, growing this array first if needed.
     */
    private void add(double[] resourceArray) {
        int otherLength = resourceArray.length;
        zeroPadOtherResourcesIfNecessary(otherLength);
        for (int i = 0; i < otherLength; i++) {
            otherResources[i] += resourceArray[i];
        }
    }

    /**
     * Add the cpu and generic resources of other to this.
     *
     * @param other the resources that should be added to this.
     */
    public void add(NormalizedResources other) {
        this.cpu += other.cpu;
        add(other.otherResources);
    }

    /**
     * Add the resources from a worker to this.
     *
     * @param value the worker resources that should be added to this.
     */
    public void add(WorkerResources value) {
        Map<String, Double> workerNormalizedResources = value.get_resources();
        cpu += workerNormalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
        add(RESOURCE_MAP_ARRAY_BRIDGE.translateToResourceArray(workerNormalizedResources));
    }

    /**
     * Record one "negative resource" event on the metrics, if metrics were supplied.
     */
    private static void markNegativeResourceEvent(ResourceMetrics resourceMetrics) {
        if (resourceMetrics != null) {
            resourceMetrics.getNegativeResourceEventsMeter().mark();
        }
    }

    /**
     * Remove the other resources from this. This is the same as subtracting the resources in other from this. Any value that would go
     * negative is clamped to 0, and one event is marked on the negative resource meter for each clamped value.
     *
     * @param other the resources we want removed.
     * @param resourceMetrics The resource related metrics, may be null in which case no meter is marked.
     * @return true if the resources would have gone negative, but were clamped to 0.
     */
    public boolean remove(NormalizedResources other, ResourceMetrics resourceMetrics) {
        boolean ret = false;
        this.cpu -= other.cpu;
        if (cpu < 0.0) {
            ret = true;
            markNegativeResourceEvent(resourceMetrics);
            cpu = 0.0;
        }
        int otherLength = other.otherResources.length;
        zeroPadOtherResourcesIfNecessary(otherLength);
        for (int i = 0; i < otherLength; i++) {
            otherResources[i] -= other.otherResources[i];
            if (otherResources[i] < 0.0) {
                ret = true;
                markNegativeResourceEvent(resourceMetrics);
                otherResources[i] = 0.0;
            }
        }
        return ret;
    }

    /**
     * Remove the resources of a worker from this. Unlike {@link #remove(NormalizedResources, ResourceMetrics)} the values are NOT clamped,
     * so cpu and generic resources may be left negative after this call.
     *
     * @param value the worker resources that should be removed from this.
     * @return true if cpu or any generic resource is negative after the removal.
     */
    public boolean remove(WorkerResources value) {
        Map<String, Double> workerNormalizedResources = value.get_resources();
        cpu -= workerNormalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
        return remove(RESOURCE_MAP_ARRAY_BRIDGE.translateToResourceArray(workerNormalizedResources)) || cpu < 0;
    }

    /**
     * Subtract each entry of {@code resourceArray} from the generic resource at the same index without clamping.
     *
     * @return true if any of the touched generic resources ended up negative.
     */
    private boolean remove(double[] resourceArray) {
        boolean ret = false;
        int otherLength = resourceArray.length;
        zeroPadOtherResourcesIfNecessary(otherLength);
        for (int i = 0; i < otherLength; i++) {
            otherResources[i] -= resourceArray[i];
            if (otherResources[i] < 0) {
                ret = true;
            }
        }
        return ret;
    }

    @Override
    public String toString() {
        return "Normalized resources: " + toNormalizedMap();
    }

    /**
     * Return a Map of the normalized resource name to a double. This should only be used when returning thrift resource requests to the end
     * user.
     *
     * @return the generic resources translated back to names, plus the cpu entry.
     */
    public Map<String, Double> toNormalizedMap() {
        Map<String, Double> ret = RESOURCE_MAP_ARRAY_BRIDGE.translateFromResourceArray(otherResources);
        ret.put(Constants.COMMON_CPU_RESOURCE_NAME, cpu);
        return ret;
    }

    /**
     * Get the generic resource at index, treating an index past the end of the array as 0.
     */
    private double getResourceAt(int index) {
        if (index >= otherResources.length) {
            return 0.0;
        }
        return otherResources[index];
    }

    /**
     * A simple sanity check to see if all of the resources in this would be large enough to hold the resources in other. Cpu, every
     * generic resource and the supplied total memory values are compared; shared memory is not taken into account because it is beyond
     * the scope of this.
     *
     * @param other the resources that we want to check if they would fit in this.
     * @param thisTotalMemoryMb The total memory in MB of this
     * @param otherTotalMemoryMb The total memory in MB of other
     * @return true if it might fit, else false if it could not possibly fit.
     */
    public boolean couldHoldIgnoringSharedMemory(NormalizedResources other, double thisTotalMemoryMb, double otherTotalMemoryMb) {
        if (this.cpu < other.getTotalCpu()) {
            return false;
        }
        return couldHoldIgnoringSharedMemoryAndCpu(other, thisTotalMemoryMb, otherTotalMemoryMb);
    }

    /**
     * A simple sanity check to see if all of the resources in this would be large enough to hold the resources in other. Every generic
     * resource and the supplied total memory values are compared; shared memory is not taken into account because it is beyond the scope
     * of this. It also does not check CPU.
     *
     * @param other the resources that we want to check if they would fit in this.
     * @param thisTotalMemoryMb The total memory in MB of this
     * @param otherTotalMemoryMb The total memory in MB of other
     * @return true if it might fit, else false if it could not possibly fit.
     */
    public boolean couldHoldIgnoringSharedMemoryAndCpu(NormalizedResources other, double thisTotalMemoryMb, double otherTotalMemoryMb) {
        // The arrays may differ in length; getResourceAt treats the missing tail of the shorter one as 0.
        int length = Math.max(this.otherResources.length, other.otherResources.length);
        for (int i = 0; i < length; i++) {
            if (getResourceAt(i) < other.getResourceAt(i)) {
                return false;
            }
        }
        return thisTotalMemoryMb >= otherTotalMemoryMb;
    }

    /**
     * Reverse lookup of a generic resource name from its array index.
     *
     * @return the resource name, or null if no name is mapped to that index.
     */
    private String getResourceNameForResourceIndex(int resourceIndex) {
        for (Map.Entry<String, Integer> entry : RESOURCE_MAP_ARRAY_BRIDGE.getResourceNamesToArrayIndex().entrySet()) {
            int index = entry.getValue();
            if (index == resourceIndex) {
                return entry.getKey();
            }
        }
        return null;
    }

    /**
     * Always throws an IllegalArgumentException describing used vs total, including extra detail about the offending resource.
     */
    private void throwBecauseUsedIsNotSubsetOfTotal(NormalizedResources used, double totalMemoryMb, double usedMemoryMb, String info) {
        throw new IllegalArgumentException(String.format("The used resources must be a subset of the total resources."
                                                         + " Used: '%s', Total: '%s', Used Mem: '%f', Total Mem: '%f', additionalInfo: '%s'",
                                                         used.toNormalizedMap(), this.toNormalizedMap(), usedMemoryMb, totalMemoryMb, info));
    }

    /**
     * Always throws an IllegalArgumentException describing used vs total.
     */
    private void throwBecauseUsedIsNotSubsetOfTotal(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
        throw new IllegalArgumentException(String.format("The used resources must be a subset of the total resources."
                                                         + " Used: '%s', Total: '%s', Used Mem: '%f', Total Mem: '%f'",
                                                         used.toNormalizedMap(), this.toNormalizedMap(), usedMemoryMb, totalMemoryMb));
    }

    /**
     * Calculate the average resource usage percentage with this being the total resources and used being the amounts used. Used must be a
     * subset of the total resources. If a resource in the total has a value of zero, it will be skipped in the calculation to avoid
     * division by 0. If all resources are skipped the result is defined to be 100.0.
     *
     * @param used the amount of resources used.
     * @param totalMemoryMb The total memory in MB
     * @param usedMemoryMb The used memory in MB
     * @return the average percentage used 0.0 to 100.0.
     *
     * @throws IllegalArgumentException if any resource in used has a greater value than the same resource in the total, or used has generic
     *     resources that are not present in the total.
     */
    public double calculateAveragePercentageUsedBy(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
        int skippedResourceTypes = 0;
        double total = 0.0;

        // Memory
        if (usedMemoryMb > totalMemoryMb) {
            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
        }
        if (totalMemoryMb != 0.0) {
            total += usedMemoryMb / totalMemoryMb;
        } else {
            skippedResourceTypes++;
        }

        // CPU
        double totalCpu = getTotalCpu();
        double usedCpu = used.getTotalCpu();
        if (usedCpu > totalCpu) {
            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
        }
        if (totalCpu != 0.0) {
            total += usedCpu / totalCpu;
        } else {
            skippedResourceTypes++;
        }

        // Generic resources
        if (used.otherResources.length > otherResources.length) {
            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
        }
        for (int i = 0; i < otherResources.length; i++) {
            double totalValue = otherResources[i];
            //Resources missing from used are using none of that resource
            double usedValue = used.getResourceAt(i);
            if (usedValue > totalValue) {
                throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
            }
            if (totalValue == 0.0) {
                //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
                //We fall back to prioritizing by cpu, memory and any other resources by ignoring this value
                skippedResourceTypes++;
                continue;
            }
            total += usedValue / totalValue;
        }

        //Adjust the divisor for the average to account for any skipped resources (those where the total was 0).
        //The constant 2 accounts for memory and cpu.
        int divisor = 2 + otherResources.length - skippedResourceTypes;
        if (divisor == 0) {
            /*
             * This is an arbitrary choice to make the result consistent with calculateMin. Any value would be valid here, because there
             * are no (non-zero) resources in the total set of resources, so we're trying to average 0 values.
             */
            return 100.0;
        } else {
            return (total * 100.0) / divisor;
        }
    }

    /**
     * Calculate the minimum resource usage percentage with this being the total resources and used being the amounts used. Used must be a
     * subset of the total resources. If a resource in the total has a value of zero, it will be skipped in the calculation to avoid
     * division by 0. If all resources are skipped the result is defined to be 100.0.
     *
     * @param used the amount of resources used.
     * @param totalMemoryMb The total memory in MB
     * @param usedMemoryMb The used memory in MB
     * @return the minimum percentage used 0.0 to 100.0.
     *
     * @throws IllegalArgumentException if any resource in used has a greater value than the same resource in the total, or used has generic
     *     resources that are not present in the total.
     */
    public double calculateMinPercentageUsedBy(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
        if (LOG.isTraceEnabled()) {
            // Argument order must match the placeholders: used first, then total (this object is the total).
            LOG.trace("Calculating min percentage used by. Used Mem: {} Total Mem: {}"
                      + " Used Normalized Resources: {} Total Normalized Resources: {}", usedMemoryMb, totalMemoryMb,
                      used.toNormalizedMap(), toNormalizedMap());
        }

        // Start at 100%; each non-zero total resource can only lower it.
        double min = 1.0;
        if (usedMemoryMb > totalMemoryMb) {
            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
        }
        if (totalMemoryMb != 0.0) {
            min = Math.min(min, usedMemoryMb / totalMemoryMb);
        }

        double totalCpu = getTotalCpu();
        if (used.getTotalCpu() > totalCpu) {
            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
        }
        if (totalCpu != 0.0) {
            min = Math.min(min, used.getTotalCpu() / totalCpu);
        }

        if (used.otherResources.length > otherResources.length) {
            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
        }
        for (int i = 0; i < otherResources.length; i++) {
            if (otherResources[i] == 0.0) {
                //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
                //We fall back to prioritizing by cpu, memory and any other resources by ignoring this value
                continue;
            }
            if (i >= used.otherResources.length) {
                //Resources missing from used are using none of that resource
                return 0;
            }
            if (used.otherResources[i] > otherResources[i]) {
                String info = String.format("%s, %f > %f", getResourceNameForResourceIndex(i), used.otherResources[i], otherResources[i]);
                throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb, info);
            }
            min = Math.min(min, used.otherResources[i] / otherResources[i]);
        }
        return min * 100.0;
    }

    /**
     * If a node or rack has a kind of resource not in a request, make that resource negative so when sorting that node or rack will
     * be less likely to be selected. If the resource is in the request, make that resource positive.
     *
     * <p>Every generic resource of this is visited. A resource whose index lies past the end of the request's array is not in the
     * request and is therefore treated as requested amount 0.
     *
     * @param request the requested resources.
     */
    public void updateForRareResourceAffinity(NormalizedResources request) {
        for (int i = 0; i < otherResources.length; i++) {
            double requested = request.getResourceAt(i);
            double available = otherResources[i];
            if (requested == 0.0 && available > 0.0) {
                otherResources[i] = -available; // make negative
            } else if (requested > 0.0 && available < 0.0) {
                otherResources[i] = -available; // make positive
            }
        }
    }

    /**
     * Set all resources to 0.
     */
    public void clear() {
        this.cpu = 0.0;
        Arrays.fill(otherResources, 0.0);
    }

    /**
     * Check whether any generic resource, and optionally cpu, is strictly positive.
     *
     * @param skipCpuCheck when true cpu is not considered.
     */
    private boolean areAnyOverZero(boolean skipCpuCheck) {
        for (double resource : otherResources) {
            if (resource > 0) {
                return true;
            }
        }
        return !skipCpuCheck && cpu > 0;
    }

    /**
     * Are any of the resources positive.
     *
     * @return true if any of the resources are positive. False if they are all <= 0.
     */
    public boolean areAnyOverZero() {
        return areAnyOverZero(false);
    }

    /**
     * Are any of the non cpu resources positive.
     *
     * @return true if any of the non cpu resources are positive. False if they are all <= 0.
     */
    public boolean anyNonCpuOverZero() {
        return areAnyOverZero(true);
    }
}