blob: c322e3de04fff6afcdf6ed601d560c893e50a991 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.weightconversion;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
public class WeightToPercentConverter
implements CapacityConverter {
private static final BigDecimal HUNDRED = new BigDecimal(100).setScale(3);
private static final BigDecimal ZERO = new BigDecimal(0).setScale(3);
@Override
public void convertWeightsForChildQueues(FSQueue queue,
Configuration csConfig) {
List<FSQueue> children = queue.getChildQueues();
int totalWeight = getTotalWeight(children);
Pair<Map<String, BigDecimal>, Boolean> result =
getCapacities(totalWeight, children);
Map<String, BigDecimal> capacities = result.getLeft();
boolean shouldAllowZeroSumCapacity = result.getRight();
capacities
.forEach((key, value) -> csConfig.set(PREFIX + key +
".capacity", value.toString()));
if (shouldAllowZeroSumCapacity) {
String queueName = queue.getName();
csConfig.setBoolean(
PREFIX + queueName + ".allow-zero-capacity-sum", true);
}
}
private Pair<Map<String, BigDecimal>, Boolean> getCapacities(int totalWeight,
List<FSQueue> children) {
if (children.size() == 0) {
return Pair.of(new HashMap<>(), false);
} else if (children.size() == 1) {
Map<String, BigDecimal> capacity = new HashMap<>();
String queueName = children.get(0).getName();
capacity.put(queueName, HUNDRED);
return Pair.of(capacity, false);
} else {
Map<String, BigDecimal> capacities = new HashMap<>();
children
.stream()
.forEach(queue -> {
BigDecimal pct;
if (totalWeight == 0) {
pct = ZERO;
} else {
BigDecimal total = new BigDecimal(totalWeight);
BigDecimal weight = new BigDecimal(queue.getWeight());
pct = weight
.setScale(5)
.divide(total, RoundingMode.HALF_UP)
.multiply(HUNDRED)
.setScale(3);
}
capacities.put(queue.getName(), pct);
});
BigDecimal totalPct = ZERO;
for (Map.Entry<String, BigDecimal> entry : capacities.entrySet()) {
totalPct = totalPct.add(entry.getValue());
}
// fix capacities if total != 100.000
boolean shouldAllowZeroSumCapacity = false;
if (!totalPct.equals(HUNDRED)) {
shouldAllowZeroSumCapacity = fixCapacities(capacities, totalPct);
}
return Pair.of(capacities, shouldAllowZeroSumCapacity);
}
}
@VisibleForTesting
boolean fixCapacities(Map<String, BigDecimal> capacities,
BigDecimal totalPct) {
boolean shouldAllowZeroSumCapacity = false;
// Sort the list so we'll adjust the highest capacity value,
// because that will affected less by a small change.
// Also, it's legal to have weight = 0 and we have to avoid picking
// that value as well.
List<Map.Entry<String, BigDecimal>> sortedEntries = capacities
.entrySet()
.stream()
.sorted(new Comparator<Map.Entry<String, BigDecimal>>() {
@Override
public int compare(Map.Entry<String, BigDecimal> e1,
Map.Entry<String, BigDecimal> e2) {
return e2.getValue().compareTo(e1.getValue());
}
})
.collect(Collectors.toList());
String highestCapacityQueue = sortedEntries.get(0).getKey();
BigDecimal highestCapacity = sortedEntries.get(0).getValue();
if (highestCapacity.equals(ZERO)) {
// need to set allow-zero-capacity-sum on this queue
// because we have zero weights on this level
shouldAllowZeroSumCapacity = true;
} else {
BigDecimal diff = HUNDRED.subtract(totalPct);
BigDecimal correctedHighest = highestCapacity.add(diff);
capacities.put(highestCapacityQueue, correctedHighest);
}
return shouldAllowZeroSumCapacity;
}
private int getTotalWeight(List<FSQueue> children) {
double sum = children
.stream()
.mapToDouble(c -> c.getWeight())
.sum();
return (int) sum;
}
}