blob: 4c9e075a2c37b408c18dc5e1e95fac69115c614c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.distributedlog.service.balancer;
import java.util.Map;
/**
* Utils for balancer.
*/
public class BalancerUtils {
/**
* Util function to calculate how many streams to balance for <i>nodeToRebalance</i>,
* based on the load distribution <i>loadDistribution</i>.
*
* @param nodeToRebalance
* node to rebalance
* @param loadDistribution
* load distribution map
* @param rebalanceWaterMark
* if number of streams of <i>nodeToRebalance</i>
* is less than <i>rebalanceWaterMark</i>, no streams will be re-balanced.
* @param tolerancePercentage
* tolerance percentage for the balancer. if number of streams of <i>nodeToRebalance</i>
* is less than average + average * <i>tolerancePercentage</i> / 100.0, no streams will
* be re-balanced.
* @param <K>
* @return number of streams to rebalance
*/
public static <K> int calculateNumStreamsToRebalance(K nodeToRebalance,
Map<K, Integer> loadDistribution,
int rebalanceWaterMark,
double tolerancePercentage) {
Integer myLoad = loadDistribution.get(nodeToRebalance);
if (null == myLoad || myLoad <= rebalanceWaterMark) {
return 0;
}
long totalLoad = 0L;
int numNodes = loadDistribution.size();
for (Map.Entry<K, Integer> entry : loadDistribution.entrySet()) {
if (null == entry.getKey() || null == entry.getValue()) {
continue;
}
totalLoad += entry.getValue();
}
double averageLoad = ((double) totalLoad) / numNodes;
long permissibleLoad =
Math.max(1L, (long) Math.ceil(averageLoad + averageLoad * tolerancePercentage / 100.0f));
if (myLoad <= permissibleLoad) {
return 0;
}
return Math.max(0, myLoad - (int) Math.ceil(averageLoad));
}
}