| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.master.balancer; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.Deque; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import org.agrona.collections.Hashing; |
| import org.agrona.collections.Int2IntCounterMap; |
| import org.apache.hadoop.hbase.HDFSBlocksDistribution; |
| import org.apache.hadoop.hbase.ServerName; |
| import org.apache.hadoop.hbase.client.RegionInfo; |
| import org.apache.hadoop.hbase.client.RegionReplicaUtil; |
| import org.apache.hadoop.hbase.master.RackManager; |
| import org.apache.hadoop.hbase.net.Address; |
| import org.apache.hadoop.hbase.util.Pair; |
| import org.apache.yetus.audience.InterfaceAudience; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * An efficient array-based implementation, similar to ClusterState, for keeping the status of the |
| * cluster in terms of region assignment and distribution. LoadBalancers such as |
| * StochasticLoadBalancer use this object because hundreds of thousands of hashmap |
| * manipulations are very costly, which is why this class mostly uses indexes and arrays. |
| * <p/> |
| * BalancerClusterState tracks a list of unassigned regions, region assignments, and the server |
| * topology in terms of server names, hostnames and racks. |
| */ |
| @InterfaceAudience.Private |
| class BalancerClusterState { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(BalancerClusterState.class); |
| |
| ServerName[] servers; |
| // A ServerName uniquely identifies a region server. Multiple RS can run on the same host |
| String[] hosts; |
| String[] racks; |
| boolean multiServersPerHost = false; // whether or not any host has more than one server |
| |
| ArrayList<String> tables; |
| RegionInfo[] regions; |
| Deque<BalancerRegionLoad>[] regionLoads; |
| private RegionHDFSBlockLocationFinder regionFinder; |
| |
| int[][] regionLocations; // regionIndex -> list of serverIndex sorted by locality |
| |
| int[] serverIndexToHostIndex; // serverIndex -> host index |
| int[] serverIndexToRackIndex; // serverIndex -> rack index |
| |
| int[][] regionsPerServer; // serverIndex -> region list |
| int[] serverIndexToRegionsOffset; // serverIndex -> next write offset into its region list |
| int[][] regionsPerHost; // hostIndex -> list of regions |
| int[][] regionsPerRack; // rackIndex -> region list |
| Int2IntCounterMap[] colocatedReplicaCountsPerServer; // serverIndex -> counts of colocated |
| // replicas by primary region index |
| Int2IntCounterMap[] colocatedReplicaCountsPerHost; // hostIndex -> counts of colocated replicas by |
| // primary region index |
| Int2IntCounterMap[] colocatedReplicaCountsPerRack; // rackIndex -> counts of colocated replicas by |
| // primary region index |
| |
| int[][] serversPerHost; // hostIndex -> list of server indexes |
| int[][] serversPerRack; // rackIndex -> list of server indexes |
| int[] regionIndexToServerIndex; // regionIndex -> serverIndex |
| int[] initialRegionIndexToServerIndex; // regionIndex -> serverIndex (initial cluster state) |
| int[] regionIndexToTableIndex; // regionIndex -> tableIndex |
| int[][] numRegionsPerServerPerTable; // tableIndex -> serverIndex -> # regions |
| int[] numRegionsPerTable; // tableIndex -> region count |
| double[] meanRegionsPerTable; // tableIndex -> mean region count per server |
| int[] regionIndexToPrimaryIndex; // regionIndex -> regionIndex of the primary |
| boolean hasRegionReplicas = false; // whether there are regions with replicas |
| |
| Integer[] serverIndicesSortedByRegionCount; |
| Integer[] serverIndicesSortedByLocality; |
| |
| Map<Address, Integer> serversToIndex; |
| Map<String, Integer> hostsToIndex; |
| Map<String, Integer> racksToIndex; |
| Map<String, Integer> tablesToIndex; |
| Map<RegionInfo, Integer> regionsToIndex; |
| float[] localityPerServer; |
| |
| int numServers; |
| int numHosts; |
| int numRacks; |
| int numTables; |
| int numRegions; |
| |
| int numMovedRegions = 0; // num moved regions from the initial configuration |
| Map<ServerName, List<RegionInfo>> clusterState; |
| |
| private final RackManager rackManager; |
| // Maps region -> rackIndex -> locality of region on rack |
| private float[][] rackLocalities; |
| // Maps localityType -> region -> [server|rack]Index with highest locality |
| private int[][] regionsToMostLocalEntities; |
| // Maps (regionIndex, serverIndex) pair -> cache ratio of that region on that server |
| private Map<Pair<Integer, Integer>, Float> regionIndexServerIndexRegionCachedRatio; |
| // Maps regionIndex -> serverIndex with best region cache ratio |
| private int[] regionServerIndexWithBestRegionCachedRatio; |
| // Maps encoded region name -> (old server name, cache ratio of the region on that old server) |
| Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap; |
| |
| static class DefaultRackManager extends RackManager { |
| @Override |
| public String getRack(ServerName server) { |
| return UNKNOWN_RACK; |
| } |
| } |
| |
/**
 * Builds the cluster state from current assignments only. Delegates to the six-argument
 * constructor, passing {@code null} for both the unassigned regions and the old region cache
 * ratios.
 */
BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
  Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder,
  RackManager rackManager) {
  this(null, // unassignedRegions
    clusterState, loads, regionFinder, rackManager, null); // oldRegionServerRegionCacheRatio
}
| |
/**
 * Builds the cluster state from current assignments plus the cache ratios regions had on servers
 * that previously hosted them. Delegates to the six-argument constructor, passing {@code null}
 * for the unassigned regions.
 */
protected BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
  Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder,
  RackManager rackManager, Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) {
  this(null, // unassignedRegions
    clusterState, loads, regionFinder, rackManager, oldRegionServerRegionCacheRatio);
}
| |
| @SuppressWarnings("unchecked") |
| BalancerClusterState(Collection<RegionInfo> unassignedRegions, |
| Map<ServerName, List<RegionInfo>> clusterState, Map<String, Deque<BalancerRegionLoad>> loads, |
| RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager, |
| Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) { |
| if (unassignedRegions == null) { |
| unassignedRegions = Collections.emptyList(); |
| } |
| |
| serversToIndex = new HashMap<>(); |
| hostsToIndex = new HashMap<>(); |
| racksToIndex = new HashMap<>(); |
| tablesToIndex = new HashMap<>(); |
| |
| // TODO: We should get the list of tables from master |
| tables = new ArrayList<>(); |
| this.rackManager = rackManager != null ? rackManager : new DefaultRackManager(); |
| |
| this.regionCacheRatioOnOldServerMap = oldRegionServerRegionCacheRatio; |
| |
| numRegions = 0; |
| |
| List<List<Integer>> serversPerHostList = new ArrayList<>(); |
| List<List<Integer>> serversPerRackList = new ArrayList<>(); |
| this.clusterState = clusterState; |
| this.regionFinder = regionFinder; |
| |
| // Use servername and port as there can be dead servers in this list. We want everything with |
| // a matching hostname and port to have the same index. |
| for (ServerName sn : clusterState.keySet()) { |
| if (sn == null) { |
| LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); " |
| + "skipping; unassigned regions?"); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("EMPTY SERVERNAME " + clusterState.toString()); |
| } |
| continue; |
| } |
| if (serversToIndex.get(sn.getAddress()) == null) { |
| serversToIndex.put(sn.getAddress(), numServers++); |
| } |
| if (!hostsToIndex.containsKey(sn.getHostname())) { |
| hostsToIndex.put(sn.getHostname(), numHosts++); |
| serversPerHostList.add(new ArrayList<>(1)); |
| } |
| |
| int serverIndex = serversToIndex.get(sn.getAddress()); |
| int hostIndex = hostsToIndex.get(sn.getHostname()); |
| serversPerHostList.get(hostIndex).add(serverIndex); |
| |
| String rack = this.rackManager.getRack(sn); |
| |
| if (!racksToIndex.containsKey(rack)) { |
| racksToIndex.put(rack, numRacks++); |
| serversPerRackList.add(new ArrayList<>()); |
| } |
| int rackIndex = racksToIndex.get(rack); |
| serversPerRackList.get(rackIndex).add(serverIndex); |
| } |
| |
| LOG.debug("Hosts are {} racks are {}", hostsToIndex, racksToIndex); |
| // Count how many regions there are. |
| for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) { |
| numRegions += entry.getValue().size(); |
| } |
| numRegions += unassignedRegions.size(); |
| |
| regionsToIndex = new HashMap<>(numRegions); |
| servers = new ServerName[numServers]; |
| serversPerHost = new int[numHosts][]; |
| serversPerRack = new int[numRacks][]; |
| regions = new RegionInfo[numRegions]; |
| regionIndexToServerIndex = new int[numRegions]; |
| initialRegionIndexToServerIndex = new int[numRegions]; |
| regionIndexToTableIndex = new int[numRegions]; |
| regionIndexToPrimaryIndex = new int[numRegions]; |
| regionLoads = new Deque[numRegions]; |
| |
| regionLocations = new int[numRegions][]; |
| serverIndicesSortedByRegionCount = new Integer[numServers]; |
| serverIndicesSortedByLocality = new Integer[numServers]; |
| localityPerServer = new float[numServers]; |
| |
| serverIndexToHostIndex = new int[numServers]; |
| serverIndexToRackIndex = new int[numServers]; |
| regionsPerServer = new int[numServers][]; |
| serverIndexToRegionsOffset = new int[numServers]; |
| regionsPerHost = new int[numHosts][]; |
| regionsPerRack = new int[numRacks][]; |
| colocatedReplicaCountsPerServer = new Int2IntCounterMap[numServers]; |
| colocatedReplicaCountsPerHost = new Int2IntCounterMap[numHosts]; |
| colocatedReplicaCountsPerRack = new Int2IntCounterMap[numRacks]; |
| |
| int regionIndex = 0, regionPerServerIndex = 0; |
| |
| for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) { |
| if (entry.getKey() == null) { |
| LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue()); |
| continue; |
| } |
| int serverIndex = serversToIndex.get(entry.getKey().getAddress()); |
| |
| // keep the servername if this is the first server name for this hostname |
| // or this servername has the newest startcode. |
| if ( |
| servers[serverIndex] == null |
| || servers[serverIndex].getStartcode() < entry.getKey().getStartcode() |
| ) { |
| servers[serverIndex] = entry.getKey(); |
| } |
| |
| if (regionsPerServer[serverIndex] != null) { |
| // there is another server with the same hostAndPort in ClusterState. |
| // allocate the array for the total size |
| regionsPerServer[serverIndex] = |
| new int[entry.getValue().size() + regionsPerServer[serverIndex].length]; |
| } else { |
| regionsPerServer[serverIndex] = new int[entry.getValue().size()]; |
| } |
| colocatedReplicaCountsPerServer[serverIndex] = |
| new Int2IntCounterMap(regionsPerServer[serverIndex].length, Hashing.DEFAULT_LOAD_FACTOR, 0); |
| serverIndicesSortedByRegionCount[serverIndex] = serverIndex; |
| serverIndicesSortedByLocality[serverIndex] = serverIndex; |
| } |
| |
| hosts = new String[numHosts]; |
| for (Map.Entry<String, Integer> entry : hostsToIndex.entrySet()) { |
| hosts[entry.getValue()] = entry.getKey(); |
| } |
| racks = new String[numRacks]; |
| for (Map.Entry<String, Integer> entry : racksToIndex.entrySet()) { |
| racks[entry.getValue()] = entry.getKey(); |
| } |
| |
| for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) { |
| int serverIndex = serversToIndex.get(entry.getKey().getAddress()); |
| regionPerServerIndex = serverIndexToRegionsOffset[serverIndex]; |
| |
| int hostIndex = hostsToIndex.get(entry.getKey().getHostname()); |
| serverIndexToHostIndex[serverIndex] = hostIndex; |
| |
| int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey())); |
| serverIndexToRackIndex[serverIndex] = rackIndex; |
| |
| for (RegionInfo region : entry.getValue()) { |
| registerRegion(region, regionIndex, serverIndex, loads, regionFinder); |
| regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex; |
| regionIndex++; |
| } |
| serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex; |
| } |
| |
| for (RegionInfo region : unassignedRegions) { |
| registerRegion(region, regionIndex, -1, loads, regionFinder); |
| regionIndex++; |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| for (int i = 0; i < numServers; i++) { |
| LOG.debug("server {} has {} regions", i, regionsPerServer[i].length); |
| } |
| } |
| for (int i = 0; i < serversPerHostList.size(); i++) { |
| serversPerHost[i] = new int[serversPerHostList.get(i).size()]; |
| for (int j = 0; j < serversPerHost[i].length; j++) { |
| serversPerHost[i][j] = serversPerHostList.get(i).get(j); |
| LOG.debug("server {} is on host {}", serversPerHostList.get(i).get(j), i); |
| } |
| if (serversPerHost[i].length > 1) { |
| multiServersPerHost = true; |
| } |
| } |
| |
| for (int i = 0; i < serversPerRackList.size(); i++) { |
| serversPerRack[i] = new int[serversPerRackList.get(i).size()]; |
| for (int j = 0; j < serversPerRack[i].length; j++) { |
| serversPerRack[i][j] = serversPerRackList.get(i).get(j); |
| LOG.info("server {} is on rack {}", serversPerRackList.get(i).get(j), i); |
| } |
| } |
| |
| numTables = tables.size(); |
| LOG.debug("Number of tables={}, number of hosts={}, number of racks={}", numTables, numHosts, |
| numRacks); |
| numRegionsPerServerPerTable = new int[numTables][numServers]; |
| numRegionsPerTable = new int[numTables]; |
| |
| for (int i = 0; i < numTables; i++) { |
| for (int j = 0; j < numServers; j++) { |
| numRegionsPerServerPerTable[i][j] = 0; |
| } |
| } |
| |
| for (int i = 0; i < regionIndexToServerIndex.length; i++) { |
| if (regionIndexToServerIndex[i] >= 0) { |
| numRegionsPerServerPerTable[regionIndexToTableIndex[i]][regionIndexToServerIndex[i]]++; |
| numRegionsPerTable[regionIndexToTableIndex[i]]++; |
| } |
| } |
| |
| // Avoid repeated computation for planning |
| meanRegionsPerTable = new double[numTables]; |
| |
| for (int i = 0; i < numTables; i++) { |
| meanRegionsPerTable[i] = Double.valueOf(numRegionsPerTable[i]) / numServers; |
| } |
| |
| for (int i = 0; i < regions.length; i++) { |
| RegionInfo info = regions[i]; |
| if (RegionReplicaUtil.isDefaultReplica(info)) { |
| regionIndexToPrimaryIndex[i] = i; |
| } else { |
| hasRegionReplicas = true; |
| RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info); |
| regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1); |
| } |
| } |
| |
| for (int i = 0; i < regionsPerServer.length; i++) { |
| colocatedReplicaCountsPerServer[i] = |
| new Int2IntCounterMap(regionsPerServer[i].length, Hashing.DEFAULT_LOAD_FACTOR, 0); |
| for (int j = 0; j < regionsPerServer[i].length; j++) { |
| int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]]; |
| colocatedReplicaCountsPerServer[i].getAndIncrement(primaryIndex); |
| } |
| } |
| // compute regionsPerHost |
| if (multiServersPerHost) { |
| populateRegionPerLocationFromServer(regionsPerHost, colocatedReplicaCountsPerHost, |
| serversPerHost); |
| } |
| |
| // compute regionsPerRack |
| if (numRacks > 1) { |
| populateRegionPerLocationFromServer(regionsPerRack, colocatedReplicaCountsPerRack, |
| serversPerRack); |
| } |
| } |
| |
| private void populateRegionPerLocationFromServer(int[][] regionsPerLocation, |
| Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int[][] serversPerLocation) { |
| for (int i = 0; i < serversPerLocation.length; i++) { |
| int numRegionsPerLocation = 0; |
| for (int j = 0; j < serversPerLocation[i].length; j++) { |
| numRegionsPerLocation += regionsPerServer[serversPerLocation[i][j]].length; |
| } |
| regionsPerLocation[i] = new int[numRegionsPerLocation]; |
| colocatedReplicaCountsPerLocation[i] = |
| new Int2IntCounterMap(numRegionsPerLocation, Hashing.DEFAULT_LOAD_FACTOR, 0); |
| } |
| |
| for (int i = 0; i < serversPerLocation.length; i++) { |
| int numRegionPerLocationIndex = 0; |
| for (int j = 0; j < serversPerLocation[i].length; j++) { |
| for (int k = 0; k < regionsPerServer[serversPerLocation[i][j]].length; k++) { |
| int region = regionsPerServer[serversPerLocation[i][j]][k]; |
| regionsPerLocation[i][numRegionPerLocationIndex] = region; |
| int primaryIndex = regionIndexToPrimaryIndex[region]; |
| colocatedReplicaCountsPerLocation[i].getAndIncrement(primaryIndex); |
| numRegionPerLocationIndex++; |
| } |
| } |
| } |
| |
| } |
| |
| /** Helper for Cluster constructor to handle a region */ |
| private void registerRegion(RegionInfo region, int regionIndex, int serverIndex, |
| Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder) { |
| String tableName = region.getTable().getNameAsString(); |
| if (!tablesToIndex.containsKey(tableName)) { |
| tables.add(tableName); |
| tablesToIndex.put(tableName, tablesToIndex.size()); |
| } |
| int tableIndex = tablesToIndex.get(tableName); |
| |
| regionsToIndex.put(region, regionIndex); |
| regions[regionIndex] = region; |
| regionIndexToServerIndex[regionIndex] = serverIndex; |
| initialRegionIndexToServerIndex[regionIndex] = serverIndex; |
| regionIndexToTableIndex[regionIndex] = tableIndex; |
| |
| // region load |
| if (loads != null) { |
| Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString()); |
| // That could have failed if the RegionLoad is using the other regionName |
| if (rl == null) { |
| // Try getting the region load using encoded name. |
| rl = loads.get(region.getEncodedName()); |
| } |
| regionLoads[regionIndex] = rl; |
| } |
| |
| if (regionFinder != null) { |
| // region location |
| List<ServerName> loc = regionFinder.getTopBlockLocations(region); |
| regionLocations[regionIndex] = new int[loc.size()]; |
| for (int i = 0; i < loc.size(); i++) { |
| regionLocations[regionIndex][i] = loc.get(i) == null |
| ? -1 |
| : (serversToIndex.get(loc.get(i).getAddress()) == null |
| ? -1 |
| : serversToIndex.get(loc.get(i).getAddress())); |
| } |
| } |
| } |
| |
| /** |
| * Returns true iff a given server has less regions than the balanced amount |
| */ |
| public boolean serverHasTooFewRegions(int server) { |
| int minLoad = this.numRegions / numServers; |
| int numRegions = getNumRegions(server); |
| return numRegions < minLoad; |
| } |
| |
| /** |
| * Retrieves and lazily initializes a field storing the locality of every region/server |
| * combination |
| */ |
| public float[][] getOrComputeRackLocalities() { |
| if (rackLocalities == null || regionsToMostLocalEntities == null) { |
| computeCachedLocalities(); |
| } |
| return rackLocalities; |
| } |
| |
| /** |
| * Lazily initializes and retrieves a mapping of region -> server for which region has the highest |
| * the locality |
| */ |
| public int[] getOrComputeRegionsToMostLocalEntities(BalancerClusterState.LocalityType type) { |
| if (rackLocalities == null || regionsToMostLocalEntities == null) { |
| computeCachedLocalities(); |
| } |
| return regionsToMostLocalEntities[type.ordinal()]; |
| } |
| |
| /** |
| * Looks up locality from cache of localities. Will create cache if it does not already exist. |
| */ |
| public float getOrComputeLocality(int region, int entity, |
| BalancerClusterState.LocalityType type) { |
| switch (type) { |
| case SERVER: |
| return getLocalityOfRegion(region, entity); |
| case RACK: |
| return getOrComputeRackLocalities()[region][entity]; |
| default: |
| throw new IllegalArgumentException("Unsupported LocalityType: " + type); |
| } |
| } |
| |
| /** |
| * Returns locality weighted by region size in MB. Will create locality cache if it does not |
| * already exist. |
| */ |
| public double getOrComputeWeightedLocality(int region, int server, |
| BalancerClusterState.LocalityType type) { |
| return getRegionSizeMB(region) * getOrComputeLocality(region, server, type); |
| } |
| |
| /** |
| * Returns the size in MB from the most recent RegionLoad for region |
| */ |
| public int getRegionSizeMB(int region) { |
| Deque<BalancerRegionLoad> load = regionLoads[region]; |
| // This means regions have no actual data on disk |
| if (load == null) { |
| return 0; |
| } |
| return regionLoads[region].getLast().getStorefileSizeMB(); |
| } |
| |
| /** |
| * Computes and caches the locality for each region/rack combinations, as well as storing a |
| * mapping of region -> server and region -> rack such that server and rack have the highest |
| * locality for region |
| */ |
| private void computeCachedLocalities() { |
| rackLocalities = new float[numRegions][numRacks]; |
| regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions]; |
| |
| // Compute localities and find most local server per region |
| for (int region = 0; region < numRegions; region++) { |
| int serverWithBestLocality = 0; |
| float bestLocalityForRegion = 0; |
| for (int server = 0; server < numServers; server++) { |
| // Aggregate per-rack locality |
| float locality = getLocalityOfRegion(region, server); |
| int rack = serverIndexToRackIndex[server]; |
| int numServersInRack = serversPerRack[rack].length; |
| rackLocalities[region][rack] += locality / numServersInRack; |
| |
| if (locality > bestLocalityForRegion) { |
| serverWithBestLocality = server; |
| bestLocalityForRegion = locality; |
| } |
| } |
| regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality; |
| |
| // Find most local rack per region |
| int rackWithBestLocality = 0; |
| float bestRackLocalityForRegion = 0.0f; |
| for (int rack = 0; rack < numRacks; rack++) { |
| float rackLocality = rackLocalities[region][rack]; |
| if (rackLocality > bestRackLocalityForRegion) { |
| bestRackLocalityForRegion = rackLocality; |
| rackWithBestLocality = rack; |
| } |
| } |
| regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality; |
| } |
| |
| } |
| |
| /** |
| * Returns the size of hFiles from the most recent RegionLoad for region |
| */ |
| public int getTotalRegionHFileSizeMB(int region) { |
| Deque<BalancerRegionLoad> load = regionLoads[region]; |
| if (load == null) { |
| // This means, that the region has no actual data on disk |
| return 0; |
| } |
| return regionLoads[region].getLast().getRegionSizeMB(); |
| } |
| |
| /** |
| * Returns the weighted cache ratio of a region on the given region server |
| */ |
| public float getOrComputeWeightedRegionCacheRatio(int region, int server) { |
| return getTotalRegionHFileSizeMB(region) * getOrComputeRegionCacheRatio(region, server); |
| } |
| |
| /** |
| * Returns the amount by which a region is cached on a given region server. If the region is not |
| * currently hosted on the given region server, then find out if it was previously hosted there |
| * and return the old cache ratio. |
| */ |
| protected float getRegionCacheRatioOnRegionServer(int region, int regionServerIndex) { |
| float regionCacheRatio = 0.0f; |
| |
| // Get the current region cache ratio if the region is hosted on the server regionServerIndex |
| for (int regionIndex : regionsPerServer[regionServerIndex]) { |
| if (region != regionIndex) { |
| continue; |
| } |
| |
| Deque<BalancerRegionLoad> regionLoadList = regionLoads[regionIndex]; |
| |
| // The region is currently hosted on this region server. Get the region cache ratio for this |
| // region on this server |
| regionCacheRatio = |
| regionLoadList == null ? 0.0f : regionLoadList.getLast().getCurrentRegionCacheRatio(); |
| |
| return regionCacheRatio; |
| } |
| |
| // Region is not currently hosted on this server. Check if the region was cached on this |
| // server earlier. This can happen when the server was shutdown and the cache was persisted. |
| // Search using the region name and server name and not the index id and server id as these ids |
| // may change when a server is marked as dead or a new server is added. |
| String regionEncodedName = regions[region].getEncodedName(); |
| ServerName serverName = servers[regionServerIndex]; |
| if ( |
| regionCacheRatioOnOldServerMap != null |
| && regionCacheRatioOnOldServerMap.containsKey(regionEncodedName) |
| ) { |
| Pair<ServerName, Float> cacheRatioOfRegionOnServer = |
| regionCacheRatioOnOldServerMap.get(regionEncodedName); |
| if (ServerName.isSameAddress(cacheRatioOfRegionOnServer.getFirst(), serverName)) { |
| regionCacheRatio = cacheRatioOfRegionOnServer.getSecond(); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Old cache ratio found for region {} on server {}: {}", regionEncodedName, |
| serverName, regionCacheRatio); |
| } |
| } |
| } |
| return regionCacheRatio; |
| } |
| |
| /** |
| * Populate the maps containing information about how much a region is cached on a region server. |
| */ |
| private void computeRegionServerRegionCacheRatio() { |
| regionIndexServerIndexRegionCachedRatio = new HashMap<>(); |
| regionServerIndexWithBestRegionCachedRatio = new int[numRegions]; |
| |
| for (int region = 0; region < numRegions; region++) { |
| float bestRegionCacheRatio = 0.0f; |
| int serverWithBestRegionCacheRatio = 0; |
| for (int server = 0; server < numServers; server++) { |
| float regionCacheRatio = getRegionCacheRatioOnRegionServer(region, server); |
| if (regionCacheRatio > 0.0f || server == regionIndexToServerIndex[region]) { |
| // A region with cache ratio 0 on a server means nothing. Hence, just make a note of |
| // cache ratio only if the cache ratio is greater than 0. |
| Pair<Integer, Integer> regionServerPair = new Pair<>(region, server); |
| regionIndexServerIndexRegionCachedRatio.put(regionServerPair, regionCacheRatio); |
| } |
| if (regionCacheRatio > bestRegionCacheRatio) { |
| serverWithBestRegionCacheRatio = server; |
| // If the server currently hosting the region has equal cache ratio to a historical |
| // server, consider the current server to keep hosting the region |
| bestRegionCacheRatio = regionCacheRatio; |
| } else if ( |
| regionCacheRatio == bestRegionCacheRatio && server == regionIndexToServerIndex[region] |
| ) { |
| // If two servers have same region cache ratio, then the server currently hosting the |
| // region |
| // should retain the region |
| serverWithBestRegionCacheRatio = server; |
| } |
| } |
| regionServerIndexWithBestRegionCachedRatio[region] = serverWithBestRegionCacheRatio; |
| Pair<Integer, Integer> regionServerPair = |
| new Pair<>(region, regionIndexToServerIndex[region]); |
| float tempRegionCacheRatio = regionIndexServerIndexRegionCachedRatio.get(regionServerPair); |
| if (tempRegionCacheRatio > bestRegionCacheRatio) { |
| LOG.warn( |
| "INVALID CONDITION: region {} on server {} cache ratio {} is greater than the " |
| + "best region cache ratio {} on server {}", |
| regions[region].getEncodedName(), servers[regionIndexToServerIndex[region]], |
| tempRegionCacheRatio, bestRegionCacheRatio, servers[serverWithBestRegionCacheRatio]); |
| } |
| } |
| } |
| |
| protected float getOrComputeRegionCacheRatio(int region, int server) { |
| if ( |
| regionServerIndexWithBestRegionCachedRatio == null |
| || regionIndexServerIndexRegionCachedRatio.isEmpty() |
| ) { |
| computeRegionServerRegionCacheRatio(); |
| } |
| |
| Pair<Integer, Integer> regionServerPair = new Pair<>(region, server); |
| return regionIndexServerIndexRegionCachedRatio.containsKey(regionServerPair) |
| ? regionIndexServerIndexRegionCachedRatio.get(regionServerPair) |
| : 0.0f; |
| } |
| |
| public int[] getOrComputeServerWithBestRegionCachedRatio() { |
| if ( |
| regionServerIndexWithBestRegionCachedRatio == null |
| || regionIndexServerIndexRegionCachedRatio.isEmpty() |
| ) { |
| computeRegionServerRegionCacheRatio(); |
| } |
| return regionServerIndexWithBestRegionCachedRatio; |
| } |
| |
| /** |
| * Maps region index to rack index |
| */ |
| public int getRackForRegion(int region) { |
| return serverIndexToRackIndex[regionIndexToServerIndex[region]]; |
| } |
| |
/**
 * Granularity at which locality is looked up. The ordinal of each constant is used as the first
 * index of the most-local-entity cache, so the declaration order matters.
 */
enum LocalityType {
  /** Locality of a region with respect to a single server. */
  SERVER,
  /** Locality of a region with respect to a rack. */
  RACK;
}
| |
| public void doAction(BalanceAction action) { |
| switch (action.getType()) { |
| case NULL: |
| break; |
| case ASSIGN_REGION: |
| // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings |
| assert action instanceof AssignRegionAction : action.getClass(); |
| AssignRegionAction ar = (AssignRegionAction) action; |
| regionsPerServer[ar.getServer()] = |
| addRegion(regionsPerServer[ar.getServer()], ar.getRegion()); |
| regionMoved(ar.getRegion(), -1, ar.getServer()); |
| break; |
| case MOVE_REGION: |
| assert action instanceof MoveRegionAction : action.getClass(); |
| MoveRegionAction mra = (MoveRegionAction) action; |
| regionsPerServer[mra.getFromServer()] = |
| removeRegion(regionsPerServer[mra.getFromServer()], mra.getRegion()); |
| regionsPerServer[mra.getToServer()] = |
| addRegion(regionsPerServer[mra.getToServer()], mra.getRegion()); |
| regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer()); |
| break; |
| case SWAP_REGIONS: |
| assert action instanceof SwapRegionsAction : action.getClass(); |
| SwapRegionsAction a = (SwapRegionsAction) action; |
| regionsPerServer[a.getFromServer()] = |
| replaceRegion(regionsPerServer[a.getFromServer()], a.getFromRegion(), a.getToRegion()); |
| regionsPerServer[a.getToServer()] = |
| replaceRegion(regionsPerServer[a.getToServer()], a.getToRegion(), a.getFromRegion()); |
| regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer()); |
| regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer()); |
| break; |
| default: |
| throw new RuntimeException("Uknown action:" + action.getType()); |
| } |
| } |
| |
| /** |
| * Return true if the placement of region on server would lower the availability of the region in |
| * question |
| * @return true or false |
| */ |
| boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) { |
| if (!serversToIndex.containsKey(serverName.getAddress())) { |
| return false; // safeguard against race between cluster.servers and servers from LB method |
| // args |
| } |
| int server = serversToIndex.get(serverName.getAddress()); |
| int region = regionsToIndex.get(regionInfo); |
| |
| // Region replicas for same region should better assign to different servers |
| for (int i : regionsPerServer[server]) { |
| RegionInfo otherRegionInfo = regions[i]; |
| if (RegionReplicaUtil.isReplicasForSameRegion(regionInfo, otherRegionInfo)) { |
| return true; |
| } |
| } |
| |
| int primary = regionIndexToPrimaryIndex[region]; |
| if (primary == -1) { |
| return false; |
| } |
| // there is a subset relation for server < host < rack |
| // check server first |
| int result = checkLocationForPrimary(server, colocatedReplicaCountsPerServer, primary); |
| if (result != 0) { |
| return result > 0; |
| } |
| |
| // check host |
| if (multiServersPerHost) { |
| result = checkLocationForPrimary(serverIndexToHostIndex[server], |
| colocatedReplicaCountsPerHost, primary); |
| if (result != 0) { |
| return result > 0; |
| } |
| } |
| |
| // check rack |
| if (numRacks > 1) { |
| result = checkLocationForPrimary(serverIndexToRackIndex[server], |
| colocatedReplicaCountsPerRack, primary); |
| if (result != 0) { |
| return result > 0; |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * Common method for better solution check. |
| * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or |
| * colocatedReplicaCountsPerRack |
| * @return 1 for better, -1 for no better, 0 for unknown |
| */ |
| private int checkLocationForPrimary(int location, |
| Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int primary) { |
| if (colocatedReplicaCountsPerLocation[location].containsKey(primary)) { |
| // check for whether there are other Locations that we can place this region |
| for (int i = 0; i < colocatedReplicaCountsPerLocation.length; i++) { |
| if (i != location && !colocatedReplicaCountsPerLocation[i].containsKey(primary)) { |
| return 1; // meaning there is a better Location |
| } |
| } |
| return -1; // there is not a better Location to place this |
| } |
| return 0; |
| } |
| |
| void doAssignRegion(RegionInfo regionInfo, ServerName serverName) { |
| if (!serversToIndex.containsKey(serverName.getAddress())) { |
| return; |
| } |
| int server = serversToIndex.get(serverName.getAddress()); |
| int region = regionsToIndex.get(regionInfo); |
| doAction(new AssignRegionAction(region, server)); |
| } |
| |
| void regionMoved(int region, int oldServer, int newServer) { |
| regionIndexToServerIndex[region] = newServer; |
| if (initialRegionIndexToServerIndex[region] == newServer) { |
| numMovedRegions--; // region moved back to original location |
| } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) { |
| numMovedRegions++; // region moved from original location |
| } |
| int tableIndex = regionIndexToTableIndex[region]; |
| if (oldServer >= 0) { |
| numRegionsPerServerPerTable[tableIndex][oldServer]--; |
| } |
| numRegionsPerServerPerTable[tableIndex][newServer]++; |
| |
| // update for servers |
| int primary = regionIndexToPrimaryIndex[region]; |
| if (oldServer >= 0) { |
| colocatedReplicaCountsPerServer[oldServer].getAndDecrement(primary); |
| } |
| colocatedReplicaCountsPerServer[newServer].getAndIncrement(primary); |
| |
| // update for hosts |
| if (multiServersPerHost) { |
| updateForLocation(serverIndexToHostIndex, regionsPerHost, colocatedReplicaCountsPerHost, |
| oldServer, newServer, primary, region); |
| } |
| |
| // update for racks |
| if (numRacks > 1) { |
| updateForLocation(serverIndexToRackIndex, regionsPerRack, colocatedReplicaCountsPerRack, |
| oldServer, newServer, primary, region); |
| } |
| } |
| |
| /** |
| * Common method for per host and per Location region index updates when a region is moved. |
| * @param serverIndexToLocation serverIndexToHostIndex or serverIndexToLocationIndex |
| * @param regionsPerLocation regionsPerHost or regionsPerLocation |
| * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or |
| * colocatedReplicaCountsPerRack |
| */ |
| private void updateForLocation(int[] serverIndexToLocation, int[][] regionsPerLocation, |
| Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int oldServer, int newServer, |
| int primary, int region) { |
| int oldLocation = oldServer >= 0 ? serverIndexToLocation[oldServer] : -1; |
| int newLocation = serverIndexToLocation[newServer]; |
| if (newLocation != oldLocation) { |
| regionsPerLocation[newLocation] = addRegion(regionsPerLocation[newLocation], region); |
| colocatedReplicaCountsPerLocation[newLocation].getAndIncrement(primary); |
| if (oldLocation >= 0) { |
| regionsPerLocation[oldLocation] = removeRegion(regionsPerLocation[oldLocation], region); |
| colocatedReplicaCountsPerLocation[oldLocation].getAndDecrement(primary); |
| } |
| } |
| |
| } |
| |
| int[] removeRegion(int[] regions, int regionIndex) { |
| // TODO: this maybe costly. Consider using linked lists |
| int[] newRegions = new int[regions.length - 1]; |
| int i = 0; |
| for (i = 0; i < regions.length; i++) { |
| if (regions[i] == regionIndex) { |
| break; |
| } |
| newRegions[i] = regions[i]; |
| } |
| System.arraycopy(regions, i + 1, newRegions, i, newRegions.length - i); |
| return newRegions; |
| } |
| |
| int[] addRegion(int[] regions, int regionIndex) { |
| int[] newRegions = new int[regions.length + 1]; |
| System.arraycopy(regions, 0, newRegions, 0, regions.length); |
| newRegions[newRegions.length - 1] = regionIndex; |
| return newRegions; |
| } |
| |
| int[] addRegionSorted(int[] regions, int regionIndex) { |
| int[] newRegions = new int[regions.length + 1]; |
| int i = 0; |
| for (i = 0; i < regions.length; i++) { // find the index to insert |
| if (regions[i] > regionIndex) { |
| break; |
| } |
| } |
| System.arraycopy(regions, 0, newRegions, 0, i); // copy first half |
| System.arraycopy(regions, i, newRegions, i + 1, regions.length - i); // copy second half |
| newRegions[i] = regionIndex; |
| |
| return newRegions; |
| } |
| |
| int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) { |
| int i = 0; |
| for (i = 0; i < regions.length; i++) { |
| if (regions[i] == regionIndex) { |
| regions[i] = newRegionIndex; |
| break; |
| } |
| } |
| return regions; |
| } |
| |
| void sortServersByRegionCount() { |
| Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator); |
| } |
| |
| int getNumRegions(int server) { |
| return regionsPerServer[server].length; |
| } |
| |
| boolean contains(int[] arr, int val) { |
| return Arrays.binarySearch(arr, val) >= 0; |
| } |
| |
| private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions); |
| |
| public Comparator<Integer> getNumRegionsComparator() { |
| return numRegionsComparator; |
| } |
| |
| int getLowestLocalityRegionOnServer(int serverIndex) { |
| if (regionFinder != null) { |
| float lowestLocality = 1.0f; |
| int lowestLocalityRegionIndex = -1; |
| if (regionsPerServer[serverIndex].length == 0) { |
| // No regions on that region server |
| return -1; |
| } |
| for (int j = 0; j < regionsPerServer[serverIndex].length; j++) { |
| int regionIndex = regionsPerServer[serverIndex][j]; |
| HDFSBlocksDistribution distribution = |
| regionFinder.getBlockDistribution(regions[regionIndex]); |
| float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname()); |
| // skip empty region |
| if (distribution.getUniqueBlocksTotalWeight() == 0) { |
| continue; |
| } |
| if (locality < lowestLocality) { |
| lowestLocality = locality; |
| lowestLocalityRegionIndex = j; |
| } |
| } |
| if (lowestLocalityRegionIndex == -1) { |
| return -1; |
| } |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Lowest locality region is " |
| + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]] |
| .getRegionNameAsString() |
| + " with locality " + lowestLocality + " and its region server contains " |
| + regionsPerServer[serverIndex].length + " regions"); |
| } |
| return regionsPerServer[serverIndex][lowestLocalityRegionIndex]; |
| } else { |
| return -1; |
| } |
| } |
| |
| float getLocalityOfRegion(int region, int server) { |
| if (regionFinder != null) { |
| HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]); |
| return distribution.getBlockLocalityIndex(servers[server].getHostname()); |
| } else { |
| return 0f; |
| } |
| } |
| |
| void setNumRegions(int numRegions) { |
| this.numRegions = numRegions; |
| } |
| |
| void setNumMovedRegions(int numMovedRegions) { |
| this.numMovedRegions = numMovedRegions; |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder desc = new StringBuilder("Cluster={servers=["); |
| for (ServerName sn : servers) { |
| desc.append(sn.getAddress().toString()).append(", "); |
| } |
| desc.append("], serverIndicesSortedByRegionCount=") |
| .append(Arrays.toString(serverIndicesSortedByRegionCount)).append(", regionsPerServer=") |
| .append(Arrays.deepToString(regionsPerServer)); |
| |
| desc.append(", numRegions=").append(numRegions).append(", numServers=").append(numServers) |
| .append(", numTables=").append(numTables).append(", numMovedRegions=").append(numMovedRegions) |
| .append('}'); |
| return desc.toString(); |
| } |
| } |