blob: d73769a3971bcfc242fbcb732fe72f62b088221f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
/** An implementation of the {@link org.apache.hadoop.hbase.master.LoadBalancer} that assigns regions
 * based on how much of each region is cached on a given server. A region can move across the region
 * servers whenever a region server shuts down or crashes. The region server persists the cache
 * periodically and restores the cache when it is restarted. This balancer implements a mechanism
 * that tracks how much of a region is cached on each region server. During a balancer run, a
 * region plan is generated that takes this cache information into account and tries to move the
 * regions so that the cache is minimally impacted.
 */
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Size;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Stochastic load balancer whose candidate generators and cost functions are driven by region
 * block-cache information.
 * <p>
 * Two candidate generators are registered, at the positions given by
 * {@link GeneratorFunctionType}:
 * <ul>
 * <li>{@code LOAD}: first moves regions back to the server they were previously cached on;
 * otherwise moves the least cached region from {@code thisServer} to {@code otherServer} as
 * chosen by the parent {@code LoadCandidateGenerator}.</li>
 * <li>{@code CACHE_RATIO}: moves a region back to its previous server when its cache ratio there
 * is sufficiently better than on its current server.</li>
 * </ul>
 * The cost is made up of a region-count skew cost and a cache-ratio cost. The cache-ratio cost is
 * only enabled when {@code BUCKET_CACHE_PERSISTENT_PATH_KEY} is configured.
 */
@InterfaceAudience.Private
public class CacheAwareLoadBalancer extends StochasticLoadBalancer {
  private static final Logger LOG = LoggerFactory.getLogger(CacheAwareLoadBalancer.class);

  /**
   * A region that is not fully cached on either server is moved back to its old server when
   * {@code cacheRatioOnCurrentServer / cacheRatioOnOldServer} is below this value.
   */
  private static final float CACHE_RATIO_DIFF_THRESHOLD = 0.6f;

  private Configuration configuration;

  /**
   * Positions of the candidate generators. Each cost function's {@code updateWeight} adds its cost
   * to the weight at the same position.
   */
  public enum GeneratorFunctionType {
    LOAD,
    CACHE_RATIO
  }

  /**
   * Stores the configuration, resets the cost function list and delegates to the parent.
   * NOTE(review): presumably the parent rebuilds {@code costFunctions} via
   * {@link #createCostFunctions(Configuration)}; confirm.
   */
  @Override
  public synchronized void loadConf(Configuration configuration) {
    this.configuration = configuration;
    this.costFunctions = new ArrayList<>();
    super.loadConf(configuration);
  }

  /**
   * Creates the two cache-aware candidate generators. Each one is inserted at the index given by
   * its {@link GeneratorFunctionType} ordinal.
   */
  @Override
  protected List<CandidateGenerator> createCandidateGenerators() {
    List<CandidateGenerator> candidateGenerators = new ArrayList<>(2);
    candidateGenerators.add(GeneratorFunctionType.LOAD.ordinal(),
      new CacheAwareSkewnessCandidateGenerator());
    candidateGenerators.add(GeneratorFunctionType.CACHE_RATIO.ordinal(),
      new CacheAwareCandidateGenerator());
    return candidateGenerators;
  }

  /**
   * Creates the region-count skew and cache-ratio cost functions, keeping only those with a
   * positive multiplier.
   */
  @Override
  protected List<CostFunction> createCostFunctions(Configuration configuration) {
    List<CostFunction> costFunctions = new ArrayList<>();
    addCostFunction(costFunctions, new CacheAwareRegionSkewnessCostFunction(configuration));
    addCostFunction(costFunctions, new CacheAwareCostFunction(configuration));
    return costFunctions;
  }

  /** Adds {@code costFunction} to {@code costFunctions} only if its multiplier is positive. */
  private void addCostFunction(List<CostFunction> costFunctions, CostFunction costFunction) {
    if (costFunction.getMultiplier() > 0) {
      costFunctions.add(costFunction);
    }
  }

  /**
   * Records the latest cluster metrics and rebuilds the region load and old-server cache-ratio
   * maps from them.
   */
  @Override
  public void updateClusterMetrics(ClusterMetrics clusterMetrics) {
    this.clusterStatus = clusterMetrics;
    updateRegionLoad();
  }

  /**
   * Collect the amount of region cached for all the regions from all the active region servers.
   * <p>
   * Replaces {@code loads} with one {@link BalancerRegionLoad} per region, and replaces
   * {@code regionCacheRatioOnOldServerMap} with, for every region that has cached data on a live
   * server other than its current host, that server and the cached fraction of the region. If a
   * region is cached on several such servers, the last one visited wins.
   */
  private void updateRegionLoad() {
    loads = new HashMap<>();
    regionCacheRatioOnOldServerMap = new HashMap<>();
    // encoded region name -> (server currently hosting the region, region size in MB)
    Map<String, Pair<ServerName, Integer>> regionCacheRatioOnCurrentServerMap = new HashMap<>();

    // Build current region cache statistics
    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
        String regionEncodedName = RegionInfo.encodeRegionName(regionName);
        Deque<BalancerRegionLoad> rload = new ArrayDeque<>();
        // Total size of the HFiles in this region
        int regionSizeMB = (int) rm.getRegionSizeMB().get(Size.Unit.MEGABYTE);
        rload.add(new BalancerRegionLoad(rm));
        // The region size is needed to compute the cache ratio on old region servers below
        regionCacheRatioOnCurrentServerMap.put(regionEncodedName, new Pair<>(sn, regionSizeMB));
        loads.put(regionEncodedName, rload);
      });
    });

    // Build cache statistics for the regions hosted previously on other region servers
    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
      // NOTE(review): the cached size is assumed to be in MB, since it is divided by the
      // region size in MB; confirm against ServerMetrics#getRegionCachedInfo.
      sm.getRegionCachedInfo().forEach((String regionEncodedName, Integer regionSizeInCache) -> {
        Pair<ServerName, Integer> currentHostAndSize =
          regionCacheRatioOnCurrentServerMap.get(regionEncodedName);
        // Skip regions that are not hosted anywhere, or that are hosted on this very server
        if (
          currentHostAndSize == null
            || ServerName.isSameAddress(currentHostAndSize.getFirst(), sn)
        ) {
          return;
        }
        int regionSizeMB = currentHostAndSize.getSecond();
        float regionCacheRatioOnOldServer =
          regionSizeMB == 0 ? 0.0f : (float) regionSizeInCache / regionSizeMB;
        regionCacheRatioOnOldServerMap.put(regionEncodedName,
          new Pair<>(sn, regionCacheRatioOnOldServer));
      });
    });
  }

  /**
   * Finds the region with the given encoded name by a linear scan of {@code cluster.regions}.
   * @return the matching region, or {@code null} if the cluster does not contain it
   */
  private RegionInfo getRegionInfoByEncodedName(BalancerClusterState cluster, String regionName) {
    Optional<RegionInfo> regionInfoOptional = Arrays.stream(cluster.regions)
      .filter(ri -> regionName.equals(ri.getEncodedName())).findFirst();
    return regionInfoOptional.orElse(null);
  }

  /**
   * Looks up the index of {@code serverName} in {@code cluster}, matching by address.
   * @return the server index, or -1 if the server is not part of {@code cluster}. An absent key
   *         must not be unboxed directly, as that would throw a NullPointerException.
   */
  private static int getServerIndex(BalancerClusterState cluster, ServerName serverName) {
    Integer serverIndex = cluster.serversToIndex.get(serverName.getAddress());
    return serverIndex == null ? -1 : serverIndex;
  }

  /**
   * Candidate generator that moves a region back to the server it was previously hosted on when
   * the cache ratio there justifies it. Each call consumes one entry of
   * {@code regionCacheRatioOnOldServerMap}.
   */
  private class CacheAwareCandidateGenerator extends CandidateGenerator {
    @Override
    protected BalanceAction generate(BalancerClusterState cluster) {
      if (regionCacheRatioOnOldServerMap.isEmpty()) {
        return BalanceAction.NULL_ACTION;
      }
      Map.Entry<String, Pair<ServerName, Float>> regionCacheRatioServerEntry =
        regionCacheRatioOnOldServerMap.entrySet().iterator().next();
      String regionEncodedName = regionCacheRatioServerEntry.getKey();
      // The server where this region was previously hosted
      ServerName oldServer = regionCacheRatioServerEntry.getValue().getFirst();

      RegionInfo regionInfo = getRegionInfoByEncodedName(cluster, regionEncodedName);
      if (regionInfo == null) {
        LOG.warn("Region {} not found", regionEncodedName);
        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
        return BalanceAction.NULL_ACTION;
      }
      if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
        return BalanceAction.NULL_ACTION;
      }
      int regionIndex = cluster.regionsToIndex.get(regionInfo);
      int oldServerIndex = getServerIndex(cluster, oldServer);
      if (oldServerIndex < 0) {
        LOG.warn("Server previously hosting region {} not found", regionEncodedName);
        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
        return BalanceAction.NULL_ACTION;
      }
      int currentServerIndex = cluster.regionIndexToServerIndex[regionIndex];
      if (currentServerIndex < 0) {
        // The region is not assigned to any server in this cluster state
        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
        return BalanceAction.NULL_ACTION;
      }
      // NOTE(review): the entry is removed only after the cache-ratio lookups, because the
      // cluster state may consult this same map while computing them; keep this ordering.
      float oldRegionCacheRatio =
        cluster.getOrComputeRegionCacheRatio(regionIndex, oldServerIndex);
      float currentRegionCacheRatio =
        cluster.getOrComputeRegionCacheRatio(regionIndex, currentServerIndex);
      BalanceAction action = generatePlan(cluster, regionIndex, currentServerIndex,
        currentRegionCacheRatio, oldServerIndex, oldRegionCacheRatio);
      regionCacheRatioOnOldServerMap.remove(regionEncodedName);
      return action;
    }

    /** Returns a move to the old server if {@link #moveRegionToOldServer} approves it. */
    private BalanceAction generatePlan(BalancerClusterState cluster, int regionIndex,
      int currentServerIndex, float cacheRatioOnCurrentServer, int oldServerIndex,
      float cacheRatioOnOldServer) {
      return moveRegionToOldServer(cluster, regionIndex, currentServerIndex,
        cacheRatioOnCurrentServer, oldServerIndex, cacheRatioOnOldServer)
          ? getAction(currentServerIndex, regionIndex, oldServerIndex, -1)
          : BalanceAction.NULL_ACTION;
    }

    /**
     * Decides whether the region should go back to its old server. That happens when it is fully
     * cached there, when it is cached equally on both servers, or when the current/old cache-ratio
     * quotient is below {@link #CACHE_RATIO_DIFF_THRESHOLD}.
     */
    private boolean moveRegionToOldServer(BalancerClusterState cluster, int regionIndex,
      int currentServerIndex, float cacheRatioOnCurrentServer, int oldServerIndex,
      float cacheRatioOnOldServer) {
      // Nothing to do if either server is unknown, or if the region is already back on its old
      // server. The latter can happen when another candidate generator has moved the region.
      if (currentServerIndex < 0 || oldServerIndex < 0 || currentServerIndex == oldServerIndex) {
        return false;
      }
      // If the region is fully cached on the old server, move the region back
      if (cacheRatioOnOldServer == 1.0f) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Region {} moved to the old server {} as it is fully cached there",
            cluster.regions[regionIndex].getEncodedName(), cluster.servers[oldServerIndex]);
        }
        return true;
      }
      // Move the region back to the old server if it is cached equally on both the servers
      if (cacheRatioOnCurrentServer == cacheRatioOnOldServer) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "Region {} moved from {} to {} as the region is cached {} equally on both servers",
            cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
            cluster.servers[oldServerIndex], cacheRatioOnCurrentServer);
        }
        return true;
      }
      // If the region is not fully cached on either of the servers, move the region back to the
      // old server if the region cache ratio on the current server is still much less than the
      // old server
      if (
        cacheRatioOnOldServer > 0.0f
          && cacheRatioOnCurrentServer / cacheRatioOnOldServer < CACHE_RATIO_DIFF_THRESHOLD
      ) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "Region {} moved from {} to {} as region cache ratio {} is better than the current "
              + "cache ratio {}",
            cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
            cluster.servers[oldServerIndex], cacheRatioOnOldServer, cacheRatioOnCurrentServer);
        }
        return true;
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug(
          "Region {} not moved from {} to {} with current cache ratio {} and old cache ratio {}",
          cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
          cluster.servers[oldServerIndex], cacheRatioOnCurrentServer, cacheRatioOnOldServer);
      }
      return false;
    }
  }

  /**
   * Load candidate generator that first drains {@code regionCacheRatioOnOldServerMap} by moving
   * regions back to their old servers. After that, it moves the least cached region from
   * {@code thisServer} to {@code otherServer}.
   */
  private class CacheAwareSkewnessCandidateGenerator extends LoadCandidateGenerator {
    @Override
    BalanceAction pickRandomRegions(BalancerClusterState cluster, int thisServer, int otherServer) {
      // First move all the regions which were hosted previously on some other server back to
      // their old servers
      if (!regionCacheRatioOnOldServerMap.isEmpty()) {
        return moveRegionBackToOldServer(cluster);
      }
      if (thisServer < 0 || otherServer < 0) {
        return BalanceAction.NULL_ACTION;
      }
      int regionIndexToMove = pickLeastCachedRegion(cluster, thisServer);
      if (regionIndexToMove < 0) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("CacheAwareSkewnessCandidateGenerator: No region found for movement");
        }
        return BalanceAction.NULL_ACTION;
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug(
          "CacheAwareSkewnessCandidateGenerator: Region {} moved from {} to {} as it is "
            + "least cached on current server",
          cluster.regions[regionIndexToMove].getEncodedName(),
          cluster.servers[thisServer].getHostname(), cluster.servers[otherServer].getHostname());
      }
      return getAction(thisServer, regionIndexToMove, otherServer, -1);
    }

    /**
     * Consumes the first entry of {@code regionCacheRatioOnOldServerMap} and returns a move of that
     * region back to its old server. Returns NULL_ACTION if the region or the server is unknown,
     * if the region is meta or belongs to a system table, or if the region is already on its old
     * server.
     */
    private BalanceAction moveRegionBackToOldServer(BalancerClusterState cluster) {
      Map.Entry<String, Pair<ServerName, Float>> regionEntry =
        regionCacheRatioOnOldServerMap.entrySet().iterator().next();
      String regionEncodedName = regionEntry.getKey();
      ServerName oldServer = regionEntry.getValue().getFirst();

      RegionInfo regionInfo = getRegionInfoByEncodedName(cluster, regionEncodedName);
      if (regionInfo == null) {
        LOG.warn("Region {} does not exist", regionEncodedName);
        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
        return BalanceAction.NULL_ACTION;
      }
      if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
        return BalanceAction.NULL_ACTION;
      }
      int regionIndex = cluster.regionsToIndex.get(regionInfo);
      int currentServerIndex = cluster.regionIndexToServerIndex[regionIndex];
      int oldServerIndex = getServerIndex(cluster, oldServer);
      regionCacheRatioOnOldServerMap.remove(regionEncodedName);
      if (oldServerIndex < 0) {
        // The old server is not part of this cluster state, so the region cannot be moved back
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "CacheAwareSkewnessCandidateGenerator: Region {} not moved to the old "
              + "server {} as the server does not exist",
            regionEncodedName, oldServer.getHostname());
        }
        return BalanceAction.NULL_ACTION;
      }
      if (currentServerIndex < 0 || currentServerIndex == oldServerIndex) {
        // Either unassigned in this cluster state, or already back on its old server
        return BalanceAction.NULL_ACTION;
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug(
          "CacheAwareSkewnessCandidateGenerator: Region {} moved from {} to {} as it "
            + "was hosted there earlier",
          regionEncodedName, cluster.servers[currentServerIndex].getHostname(),
          cluster.servers[oldServerIndex].getHostname());
      }
      return getAction(currentServerIndex, regionIndex, oldServerIndex, -1);
    }

    /**
     * Returns the index of the region on {@code thisServer} with the lowest cache ratio there. If
     * several regions tie, the first one wins. Returns -1 if the server hosts no regions.
     */
    private int pickLeastCachedRegion(BalancerClusterState cluster, int thisServer) {
      float minCacheRatio = Float.MAX_VALUE;
      int leastCachedRegion = -1;
      for (int regionIndex : cluster.regionsPerServer[thisServer]) {
        float cacheRatioOnCurrentServer =
          cluster.getOrComputeRegionCacheRatio(regionIndex, thisServer);
        if (cacheRatioOnCurrentServer < minCacheRatio) {
          minCacheRatio = cacheRatioOnCurrentServer;
          leastCachedRegion = regionIndex;
        }
      }
      return leastCachedRegion;
    }
  }

  /**
   * Cost function based on the skew of region counts across servers. It adds its cost to the
   * {@code LOAD} generator weight.
   */
  static class CacheAwareRegionSkewnessCostFunction extends CostFunction {
    static final String REGION_COUNT_SKEW_COST_KEY =
      "hbase.master.balancer.stochastic.regionCountCost";
    static final float DEFAULT_REGION_COUNT_SKEW_COST = 20;
    private final DoubleArrayCost cost = new DoubleArrayCost();

    CacheAwareRegionSkewnessCostFunction(Configuration conf) {
      // Load multiplier should be the greatest as it is the most general way to balance data.
      this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
    }

    /** Seeds the per-server cost array with each server's current region count. */
    @Override
    void prepare(BalancerClusterState cluster) {
      super.prepare(cluster);
      cost.prepare(cluster.numServers);
      cost.applyCostsChange(costs -> {
        for (int i = 0; i < cluster.numServers; i++) {
          costs[i] = cluster.regionsPerServer[i].length;
        }
      });
    }

    @Override
    protected double cost() {
      return cost.cost();
    }

    /** Refreshes the region counts of the two servers involved in a move. */
    @Override
    protected void regionMoved(int region, int oldServer, int newServer) {
      cost.applyCostsChange(costs -> {
        costs[oldServer] = cluster.regionsPerServer[oldServer].length;
        costs[newServer] = cluster.regionsPerServer[newServer].length;
      });
    }

    public final void updateWeight(double[] weights) {
      weights[GeneratorFunctionType.LOAD.ordinal()] += cost();
    }
  }

  /**
   * Cost function comparing the summed weighted cache ratio of the current assignment with that of
   * the best achievable assignment. It is disabled (multiplier 0) unless
   * {@code BUCKET_CACHE_PERSISTENT_PATH_KEY} is set, and it adds its cost to the
   * {@code CACHE_RATIO} generator weight.
   */
  static class CacheAwareCostFunction extends CostFunction {
    private static final String CACHE_COST_KEY = "hbase.master.balancer.stochastic.cacheCost";
    // Current weighted cache ratio, normalized by bestCacheRatio after prepare()
    private double cacheRatio;
    // Sum of weighted cache ratios if every region sat on its best-cached server
    private double bestCacheRatio;
    private static final float DEFAULT_CACHE_COST = 20;

    CacheAwareCostFunction(Configuration conf) {
      boolean isPersistentCache = conf.get(BUCKET_CACHE_PERSISTENT_PATH_KEY) != null;
      // Disable the CacheAwareCostFunction if the cached file list persistence is not enabled
      this.setMultiplier(
        !isPersistentCache ? 0.0f : conf.getFloat(CACHE_COST_KEY, DEFAULT_CACHE_COST));
      bestCacheRatio = 0.0;
      cacheRatio = 0.0;
    }

    /**
     * Computes the current weighted cache ratio as a fraction of the best achievable one. The
     * fraction is 1.0 when nothing is cached anywhere.
     */
    @Override
    void prepare(BalancerClusterState cluster) {
      super.prepare(cluster);
      cacheRatio = 0.0;
      bestCacheRatio = 0.0;
      for (int region = 0; region < cluster.numRegions; region++) {
        cacheRatio += cluster.getOrComputeWeightedRegionCacheRatio(region,
          cluster.regionIndexToServerIndex[region]);
        bestCacheRatio += cluster.getOrComputeWeightedRegionCacheRatio(region,
          getServerWithBestCacheRatioForRegion(region));
      }
      cacheRatio = bestCacheRatio == 0 ? 1.0 : cacheRatio / bestCacheRatio;
      if (LOG.isDebugEnabled()) {
        LOG.debug("CacheAwareCostFunction: Cost: {}", 1 - cacheRatio);
      }
    }

    @Override
    protected double cost() {
      return scale(0, 1, 1 - cacheRatio);
    }

    /**
     * Incrementally adjusts {@code cacheRatio} by the normalized change in the region's weighted
     * cache ratio. Values outside [0, 1] are logged at debug level, not clamped.
     */
    @Override
    protected void regionMoved(int region, int oldServer, int newServer) {
      double regionCacheRatioOnOldServer =
        cluster.getOrComputeWeightedRegionCacheRatio(region, oldServer);
      double regionCacheRatioOnNewServer =
        cluster.getOrComputeWeightedRegionCacheRatio(region, newServer);
      double cacheRatioDiff = regionCacheRatioOnNewServer - regionCacheRatioOnOldServer;
      double normalizedDelta = bestCacheRatio == 0.0 ? 0.0 : cacheRatioDiff / bestCacheRatio;
      cacheRatio += normalizedDelta;
      if (LOG.isDebugEnabled() && (cacheRatio < 0.0 || cacheRatio > 1.0)) {
        LOG.debug(
          "CacheAwareCostFunction:regionMoved:region:{}:from:{}:to:{}:regionCacheRatioOnOldServer:{}:"
            + "regionCacheRatioOnNewServer:{}:bestRegionCacheRatio:{}:cacheRatio:{}",
          cluster.regions[region].getEncodedName(), cluster.servers[oldServer].getHostname(),
          cluster.servers[newServer].getHostname(), regionCacheRatioOnOldServer,
          regionCacheRatioOnNewServer, bestCacheRatio, cacheRatio);
      }
    }

    private int getServerWithBestCacheRatioForRegion(int region) {
      return cluster.getOrComputeServerWithBestRegionCachedRatio()[region];
    }

    public final void updateWeight(double[] weights) {
      weights[GeneratorFunctionType.CACHE_RATIO.ordinal()] += cost();
    }
  }
}