/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import java.io.Closeable;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Scanner;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper;
import org.apache.hadoop.hbase.favored.FavoredNodesPlan;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.MunkresAssignment;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
/**
* A tool for viewing and manipulating the favored nodes information for regions.
* Run with -h to get a list of the options.
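*
* <p>A minimal programmatic sketch (not part of this class), assuming a default client
* configuration, that generates and prints a new assignment plan:
* <pre>{@code
* try (RegionPlacementMaintainer rpm = new RegionPlacementMaintainer(HBaseConfiguration.create())) {
*   FavoredNodesPlan plan = rpm.getNewAssignmentPlan();
*   RegionPlacementMaintainer.printAssignmentPlan(plan);
* }
* }</pre>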
*/
@InterfaceAudience.Private
// TODO: Remove? Unused. Partially implemented only.
public class RegionPlacementMaintainer implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(RegionPlacementMaintainer.class
.getName());
// The cost of a placement that should never be assigned.
private static final float MAX_COST = Float.POSITIVE_INFINITY;
// The cost of a placement that is undesirable but acceptable.
private static final float AVOID_COST = 100000f;
// The amount by which the cost of a placement is increased if it is the
// last slot of the server. This is done to more evenly distribute the slop
// amongst servers.
private static final float LAST_SLOT_COST_PENALTY = 0.5f;
// The amount by which the cost of a primary placement is penalized if it is
// not the host currently serving the region. This is done to minimize moves.
private static final float NOT_CURRENT_HOST_PENALTY = 0.1f;
private static boolean USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = false;
private Configuration conf;
private final boolean enforceLocality;
private final boolean enforceMinAssignmentMove;
private RackManager rackManager;
private Set<TableName> targetTableSet;
private AsyncClusterConnection connection;
public RegionPlacementMaintainer(Configuration conf) throws IOException {
this(conf, true, true);
}
public RegionPlacementMaintainer(Configuration conf, boolean enforceLocality,
boolean enforceMinAssignmentMove) {
this.conf = conf;
this.enforceLocality = enforceLocality;
this.enforceMinAssignmentMove = enforceMinAssignmentMove;
this.targetTableSet = new HashSet<>();
this.rackManager = new RackManager(conf);
}
private static void printHelp(Options opt) {
new HelpFormatter().printHelp(
"RegionPlacement < -w | -u | -n | -v | -t | -h | -overwrite -r regionName -f favoredNodes " +
"-diff>" +
" [-l false] [-m false] [-d] [-tables t1,t2,...tn] [-zk zk1,zk2,zk3]" +
" [-fs hdfs://a.b.c.d:9000] [-hbase_root /HBASE]", opt);
}
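/**
* Lazily create and cache the async cluster connection used for admin RPCs to the cluster and
* the region servers.
*/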
private AsyncClusterConnection getConnection() throws IOException {
if (connection == null) {
connection =
ClusterConnectionFactory.createAsyncClusterConnection(this.conf, null, User.getCurrent());
}
return connection;
}
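/**
* Restrict subsequent operations to the given tables. If this is never called, or called with
* null, all tables found in the assignment snapshot are processed.
*/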
public void setTargetTableName(String[] tableNames) {
if (tableNames != null) {
for (String table : tableNames) {
this.targetTableSet.add(TableName.valueOf(table));
}
}
}
/**
* @return a new snapshot of the current region assignments, read from hbase:meta
*/
public SnapshotOfRegionAssignmentFromMeta getRegionAssignmentSnapshot() throws IOException {
SnapshotOfRegionAssignmentFromMeta currentAssignmentSnapshot =
new SnapshotOfRegionAssignmentFromMeta(ConnectionFactory.createConnection(conf));
currentAssignmentSnapshot.initialize();
return currentAssignmentSnapshot;
}
/**
* Verify the region placement is consistent with the assignment plan
*/
public List<AssignmentVerificationReport> verifyRegionPlacement(boolean isDetailMode)
throws IOException {
System.out.println("Start to verify the region assignment and " +
"generate the verification report");
// Get the region assignment snapshot
SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
// Get all the tables
Set<TableName> tables = snapshot.getTableSet();
// Get the region locality map
Map<String, Map<String, Float>> regionLocalityMap = null;
if (this.enforceLocality) {
regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
}
List<AssignmentVerificationReport> reports = new ArrayList<>();
// Iterate all the tables to fill up the verification report
for (TableName table : tables) {
if (!this.targetTableSet.isEmpty() &&
!this.targetTableSet.contains(table)) {
continue;
}
AssignmentVerificationReport report = new AssignmentVerificationReport();
report.fillUp(table, snapshot, regionLocalityMap);
report.print(isDetailMode);
reports.add(report);
}
return reports;
}
/**
* Generate the assignment plan for an existing table.
*
* @param tableName the table to generate the plan for
* @param assignmentSnapshot the current assignment snapshot taken from hbase:meta
* @param regionLocalityMap the per-region locality map read from the filesystem
* @param plan the favored nodes plan to fill in
* @param munkresForSecondaryAndTertiary if set to true, the secondary and tertiary
* assignments are generated with the Munkres algorithm; otherwise they are generated
* using placeSecondaryAndTertiaryWithRestrictions
* @throws IOException if the list of region servers cannot be fetched
*/
private void genAssignmentPlan(TableName tableName,
SnapshotOfRegionAssignmentFromMeta assignmentSnapshot,
Map<String, Map<String, Float>> regionLocalityMap, FavoredNodesPlan plan,
boolean munkresForSecondaryAndTertiary) throws IOException {
// Get all the regions for the current table
List<RegionInfo> regions =
assignmentSnapshot.getTableToRegionMap().get(tableName);
int numRegions = regions.size();
// Get the current assignment map
Map<RegionInfo, ServerName> currentAssignmentMap =
assignmentSnapshot.getRegionToRegionServerMap();
// Get all the region servers
List<ServerName> servers = new ArrayList<>();
servers.addAll(FutureUtils.get(getConnection().getAdmin().getRegionServers()));
LOG.info("Start to generate assignment plan for " + numRegions +
" regions from table " + tableName + " with " +
servers.size() + " region servers");
int slotsPerServer = (int) Math.ceil((float) numRegions /
servers.size());
int regionSlots = slotsPerServer * servers.size();
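// Layout of the cost matrices below: each server owns slotsPerServer consecutive columns,
// so column j corresponds to server j / slotsPerServer, and assigning a region to slot j
// places it on that server.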
// Compute the primary, secondary and tertiary costs for each region/server
// pair. These costs are based only on node locality and rack locality, and
// will be modified later.
float[][] primaryCost = new float[numRegions][regionSlots];
float[][] secondaryCost = new float[numRegions][regionSlots];
float[][] tertiaryCost = new float[numRegions][regionSlots];
if (this.enforceLocality && regionLocalityMap != null) {
// Transform the locality mapping into a 2D array, assuming that any
// unspecified locality value is 0.
float[][] localityPerServer = new float[numRegions][regionSlots];
for (int i = 0; i < numRegions; i++) {
Map<String, Float> serverLocalityMap =
regionLocalityMap.get(regions.get(i).getEncodedName());
if (serverLocalityMap == null) {
continue;
}
for (int j = 0; j < servers.size(); j++) {
String serverName = servers.get(j).getHostname();
if (serverName == null) {
continue;
}
Float locality = serverLocalityMap.get(serverName);
if (locality == null) {
continue;
}
for (int k = 0; k < slotsPerServer; k++) {
// If we can't find the locality of a region to a server, which occurs
// because locality is only reported for servers which have some
// blocks of a region local, then the locality for that pair is 0.
localityPerServer[i][j * slotsPerServer + k] = locality.floatValue();
}
}
}
// Compute the total rack locality for each region in each rack. The total
// rack locality is the sum of the localities of a region on all servers in
// a rack.
Map<String, Map<RegionInfo, Float>> rackRegionLocality = new HashMap<>();
for (int i = 0; i < numRegions; i++) {
RegionInfo region = regions.get(i);
for (int j = 0; j < regionSlots; j += slotsPerServer) {
String rack = rackManager.getRack(servers.get(j / slotsPerServer));
Map<RegionInfo, Float> rackLocality = rackRegionLocality.get(rack);
if (rackLocality == null) {
rackLocality = new HashMap<>();
rackRegionLocality.put(rack, rackLocality);
}
Float localityObj = rackLocality.get(region);
float locality = localityObj == null ? 0 : localityObj.floatValue();
locality += localityPerServer[i][j];
rackLocality.put(region, locality);
}
}
for (int i = 0; i < numRegions; i++) {
for (int j = 0; j < regionSlots; j++) {
String rack = rackManager.getRack(servers.get(j / slotsPerServer));
Float totalRackLocalityObj =
rackRegionLocality.get(rack).get(regions.get(i));
float totalRackLocality = totalRackLocalityObj == null ?
0 : totalRackLocalityObj.floatValue();
// Primary cost aims to favor servers with high node locality and low
// rack locality, so that secondaries and tertiaries can be chosen for
// nodes with high rack locality. This might give primaries with
// slightly less locality at first compared to a cost which only
// considers the node locality, but should be better in the long run.
primaryCost[i][j] = 1 - (2 * localityPerServer[i][j] -
totalRackLocality);
// Secondary cost aims to favor servers with high node locality and high
// rack locality since the tertiary will be chosen from the same rack as
// the secondary. This could be negative, but that is okay.
secondaryCost[i][j] = 2 - (localityPerServer[i][j] + totalRackLocality);
// Tertiary cost is only concerned with the node locality. It will later
// be restricted to only hosts on the same rack as the secondary.
tertiaryCost[i][j] = 1 - localityPerServer[i][j];
}
}
}
if (this.enforceMinAssignmentMove && currentAssignmentMap != null) {
// We want to minimize the number of regions which move as the result of a
// new assignment. Therefore, slightly penalize any placement which is for
// a host that is not currently serving the region.
for (int i = 0; i < numRegions; i++) {
for (int j = 0; j < servers.size(); j++) {
ServerName currentAddress = currentAssignmentMap.get(regions.get(i));
if (currentAddress != null &&
!currentAddress.equals(servers.get(j))) {
for (int k = 0; k < slotsPerServer; k++) {
primaryCost[i][j * slotsPerServer + k] += NOT_CURRENT_HOST_PENALTY;
}
}
}
}
}
// Artificially increase cost of last slot of each server to evenly
// distribute the slop, otherwise there will be a few servers with too few
// regions and many servers with the max number of regions.
for (int i = 0; i < numRegions; i++) {
for (int j = 0; j < regionSlots; j += slotsPerServer) {
primaryCost[i][j] += LAST_SLOT_COST_PENALTY;
secondaryCost[i][j] += LAST_SLOT_COST_PENALTY;
tertiaryCost[i][j] += LAST_SLOT_COST_PENALTY;
}
}
RandomizedMatrix randomizedMatrix = new RandomizedMatrix(numRegions,
regionSlots);
primaryCost = randomizedMatrix.transform(primaryCost);
int[] primaryAssignment = new MunkresAssignment(primaryCost).solve();
primaryAssignment = randomizedMatrix.invertIndices(primaryAssignment);
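// After inverting the randomization, primaryAssignment[i] is the slot chosen for region i in
// the original matrix indices; its primary server is servers.get(primaryAssignment[i] / slotsPerServer).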
// Modify the secondary and tertiary costs for each region/server pair to
// prevent a region from being assigned to the same rack for both primary
// and either one of secondary or tertiary.
for (int i = 0; i < numRegions; i++) {
int slot = primaryAssignment[i];
String rack = rackManager.getRack(servers.get(slot / slotsPerServer));
for (int k = 0; k < servers.size(); k++) {
if (!rackManager.getRack(servers.get(k)).equals(rack)) {
continue;
}
if (k == slot / slotsPerServer) {
// Same node, do not place secondary or tertiary here ever.
for (int m = 0; m < slotsPerServer; m++) {
secondaryCost[i][k * slotsPerServer + m] = MAX_COST;
tertiaryCost[i][k * slotsPerServer + m] = MAX_COST;
}
} else {
// Same rack, do not place secondary or tertiary here if possible.
for (int m = 0; m < slotsPerServer; m++) {
secondaryCost[i][k * slotsPerServer + m] = AVOID_COST;
tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST;
}
}
}
}
if (munkresForSecondaryAndTertiary) {
randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots);
secondaryCost = randomizedMatrix.transform(secondaryCost);
int[] secondaryAssignment = new MunkresAssignment(secondaryCost).solve();
secondaryAssignment = randomizedMatrix.invertIndices(secondaryAssignment);
// Modify the tertiary costs for each region/server pair to ensure that a
// region is assigned to a tertiary server on the same rack as its secondary
// server, but not the same server in that rack.
for (int i = 0; i < numRegions; i++) {
int slot = secondaryAssignment[i];
String rack = rackManager.getRack(servers.get(slot / slotsPerServer));
for (int k = 0; k < servers.size(); k++) {
if (k == slot / slotsPerServer) {
// Same node, do not place tertiary here ever.
for (int m = 0; m < slotsPerServer; m++) {
tertiaryCost[i][k * slotsPerServer + m] = MAX_COST;
}
} else {
if (rackManager.getRack(servers.get(k)).equals(rack)) {
continue;
}
// Different rack, do not place tertiary here if possible.
for (int m = 0; m < slotsPerServer; m++) {
tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST;
}
}
}
}
randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots);
tertiaryCost = randomizedMatrix.transform(tertiaryCost);
int[] tertiaryAssignment = new MunkresAssignment(tertiaryCost).solve();
tertiaryAssignment = randomizedMatrix.invertIndices(tertiaryAssignment);
for (int i = 0; i < numRegions; i++) {
List<ServerName> favoredServers
= new ArrayList<>(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM);
ServerName s = servers.get(primaryAssignment[i] / slotsPerServer);
favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
ServerName.NON_STARTCODE));
s = servers.get(secondaryAssignment[i] / slotsPerServer);
favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
ServerName.NON_STARTCODE));
s = servers.get(tertiaryAssignment[i] / slotsPerServer);
favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
ServerName.NON_STARTCODE));
// Update the assignment plan
plan.updateFavoredNodesMap(regions.get(i), favoredServers);
}
LOG.info("Generated the assignment plan for " + numRegions +
" regions from table " + tableName + " with " +
servers.size() + " region servers");
LOG.info("Assignment plan for secondary and tertiary generated " +
"using MunkresAssignment");
} else {
Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>();
for (int i = 0; i < numRegions; i++) {
primaryRSMap.put(regions.get(i), servers.get(primaryAssignment[i] / slotsPerServer));
}
FavoredNodeAssignmentHelper favoredNodeHelper =
new FavoredNodeAssignmentHelper(servers, conf);
favoredNodeHelper.initialize();
Map<RegionInfo, ServerName[]> secondaryAndTertiaryMap =
favoredNodeHelper.placeSecondaryAndTertiaryWithRestrictions(primaryRSMap);
for (int i = 0; i < numRegions; i++) {
List<ServerName> favoredServers
= new ArrayList<>(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM);
RegionInfo currentRegion = regions.get(i);
ServerName s = primaryRSMap.get(currentRegion);
favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
ServerName.NON_STARTCODE));
ServerName[] secondaryAndTertiary =
secondaryAndTertiaryMap.get(currentRegion);
s = secondaryAndTertiary[0];
favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
ServerName.NON_STARTCODE));
s = secondaryAndTertiary[1];
favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
ServerName.NON_STARTCODE));
// Update the assignment plan
plan.updateFavoredNodesMap(regions.get(i), favoredServers);
}
LOG.info("Generated the assignment plan for " + numRegions +
" regions from table " + tableName + " with " +
servers.size() + " region servers");
LOG.info("Assignment plan for secondary and tertiary generated " +
"using placeSecondaryAndTertiaryWithRestrictions method");
}
}
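/**
* Build a new favored nodes plan for every targeted table found in the current assignment
* snapshot taken from hbase:meta.
*/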
public FavoredNodesPlan getNewAssignmentPlan() throws IOException {
// Get the current region assignment snapshot by scanning from the META
SnapshotOfRegionAssignmentFromMeta assignmentSnapshot =
this.getRegionAssignmentSnapshot();
// Get the region locality map
Map<String, Map<String, Float>> regionLocalityMap = null;
if (this.enforceLocality) {
regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
}
// Initialize the assignment plan
FavoredNodesPlan plan = new FavoredNodesPlan();
// Get the table to region mapping
Map<TableName, List<RegionInfo>> tableToRegionMap =
assignmentSnapshot.getTableToRegionMap();
LOG.info("Start to generate the new assignment plan for the " +
+ tableToRegionMap.keySet().size() + " tables" );
for (TableName table : tableToRegionMap.keySet()) {
try {
if (!this.targetTableSet.isEmpty() &&
!this.targetTableSet.contains(table)) {
continue;
}
// TODO: maybe run the placement in parallel for each table
genAssignmentPlan(table, assignmentSnapshot, regionLocalityMap, plan,
USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY);
} catch (Exception e) {
LOG.error("Get some exceptions for placing primary region server" +
"for table " + table + " because " + e);
}
}
LOG.info("Finish to generate the new assignment plan for the " +
+ tableToRegionMap.keySet().size() + " tables" );
return plan;
}
@Override
public void close() throws IOException {
Closeables.close(connection, true);
}
/**
* Some algorithms for solving the assignment problem may traverse workers or
* jobs in linear order which may result in skewing the assignments of the
* first jobs in the matrix toward the last workers in the matrix if the
* costs are uniform. To avoid this kind of clumping, we can randomize the
* rows and columns of the cost matrix in a reversible way, such that the
* solution to the assignment problem can be interpreted in terms of the
* original untransformed cost matrix. Rows and columns are transformed
* independently such that the elements contained in any row of the input
* matrix are the same as the elements in the corresponding output matrix,
* and each row has its elements transformed in the same way. Similarly for
* columns.
*/
protected static class RandomizedMatrix {
private final int rows;
private final int cols;
private final int[] rowTransform;
private final int[] rowInverse;
private final int[] colTransform;
private final int[] colInverse;
/**
* Create a randomization scheme for a matrix of a given size.
* @param rows the number of rows in the matrix
* @param cols the number of columns in the matrix
*/
public RandomizedMatrix(int rows, int cols) {
this.rows = rows;
this.cols = cols;
Random random = new Random();
rowTransform = new int[rows];
rowInverse = new int[rows];
for (int i = 0; i < rows; i++) {
rowTransform[i] = i;
}
// Shuffle the row indices.
for (int i = rows - 1; i >= 0; i--) {
int r = random.nextInt(i + 1);
int temp = rowTransform[r];
rowTransform[r] = rowTransform[i];
rowTransform[i] = temp;
}
// Generate the inverse row indices.
for (int i = 0; i < rows; i++) {
rowInverse[rowTransform[i]] = i;
}
colTransform = new int[cols];
colInverse = new int[cols];
for (int i = 0; i < cols; i++) {
colTransform[i] = i;
}
// Shuffle the column indices.
for (int i = cols - 1; i >= 0; i--) {
int r = random.nextInt(i + 1);
int temp = colTransform[r];
colTransform[r] = colTransform[i];
colTransform[i] = temp;
}
// Generate the inverse column indices.
for (int i = 0; i < cols; i++) {
colInverse[colTransform[i]] = i;
}
}
/**
* Copy a given matrix into a new matrix, transforming each row index and
* each column index according to the randomization scheme that was created
* at construction time.
* @param matrix the cost matrix to transform
* @return a new matrix with row and column indices transformed
*/
public float[][] transform(float[][] matrix) {
float[][] result = new float[rows][cols];
for (int i = 0; i < rows; i++) {
for (int j = 0; j < cols; j++) {
result[rowTransform[i]][colTransform[j]] = matrix[i][j];
}
}
return result;
}
/**
* Copy a given matrix into a new matrix, transforming each row index and
* each column index according to the inverse of the randomization scheme
* that was created at construction time.
* @param matrix the cost matrix to be inverted
* @return a new matrix with row and column indices inverted
*/
public float[][] invert(float[][] matrix) {
float[][] result = new float[rows][cols];
for (int i = 0; i < rows; i++) {
for (int j = 0; j < cols; j++) {
result[rowInverse[i]][colInverse[j]] = matrix[i][j];
}
}
return result;
}
/**
* Given an array where each element {@code indices[i]} represents the
* randomized column index corresponding to randomized row index {@code i},
* create a new array with the corresponding inverted indices.
* @param indices an array of transformed indices to be inverted
* @return an array of inverted indices
*/
public int[] invertIndices(int[] indices) {
int[] result = new int[indices.length];
for (int i = 0; i < indices.length; i++) {
result[rowInverse[i]] = colInverse[indices[i]];
}
return result;
}
}
/**
* Print the given assignment plan through the logger, sorted by region name.
* @param plan the assignment plan to print; a null plan is ignored
*/
public static void printAssignmentPlan(FavoredNodesPlan plan) {
if (plan == null) {
return;
}
LOG.info("========== Start to print the assignment plan ================");
// sort the map based on region info
Map<String, List<ServerName>> assignmentMap = new TreeMap<>(plan.getAssignmentMap());
for (Map.Entry<String, List<ServerName>> entry : assignmentMap.entrySet()) {
String serverList = FavoredNodeAssignmentHelper.getFavoredNodesAsString(entry.getValue());
String regionName = entry.getKey();
LOG.info("Region: " + regionName );
LOG.info("Its favored nodes: " + serverList);
}
LOG.info("========== Finish to print the assignment plan ================");
}
/**
* Update the assignment plan in hbase:meta.
* @param plan the assignment plan to be written to hbase:meta
* @throws IOException if the assignment plan cannot be updated in hbase:meta
*/
public void updateAssignmentPlanToMeta(FavoredNodesPlan plan)
throws IOException {
try {
LOG.info("Start to update the hbase:meta with the new assignment plan");
Map<String, List<ServerName>> assignmentMap = plan.getAssignmentMap();
Map<RegionInfo, List<ServerName>> planToUpdate = new HashMap<>(assignmentMap.size());
Map<String, RegionInfo> regionToRegionInfoMap =
getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap();
for (Map.Entry<String, List<ServerName>> entry : assignmentMap.entrySet()) {
planToUpdate.put(regionToRegionInfoMap.get(entry.getKey()), entry.getValue());
}
FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(planToUpdate, conf);
LOG.info("Updated the hbase:meta with the new assignment plan");
} catch (Exception e) {
LOG.error("Failed to update hbase:meta with the new assignment" +
"plan because " + e.getMessage());
}
}
/**
* Update the assignment plan on all the region servers.
* @param plan the assignment plan to push to the region servers
* @throws IOException if the current region assignment snapshot cannot be taken
*/
private void updateAssignmentPlanToRegionServers(FavoredNodesPlan plan)
throws IOException {
LOG.info("Starting to update the region servers with the new assignment plan");
// Get the region to region server map
Map<ServerName, List<RegionInfo>> currentAssignment =
this.getRegionAssignmentSnapshot().getRegionServerToRegionMap();
// Keep track of the failed and succeeded updates
int succeededNum = 0;
Map<ServerName, Exception> failedUpdateMap = new HashMap<>();
for (Map.Entry<ServerName, List<RegionInfo>> entry :
currentAssignment.entrySet()) {
List<Pair<RegionInfo, List<ServerName>>> regionUpdateInfos = new ArrayList<>();
try {
// Keep track of the favored updates for the current region server
FavoredNodesPlan singleServerPlan = null;
// Find out all the updates for the current region server
for (RegionInfo region : entry.getValue()) {
List<ServerName> favoredServerList = plan.getFavoredNodes(region);
if (favoredServerList != null &&
favoredServerList.size() == FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) {
// Create the single server plan if necessary
if (singleServerPlan == null) {
singleServerPlan = new FavoredNodesPlan();
}
// Update the single server update
singleServerPlan.updateFavoredNodesMap(region, favoredServerList);
regionUpdateInfos.add(new Pair<>(region, favoredServerList));
}
}
if (singleServerPlan != null) {
// Update the current region server with its updated favored nodes
AsyncRegionServerAdmin rsAdmin = getConnection().getRegionServerAdmin(entry.getKey());
UpdateFavoredNodesRequest request =
RequestConverter.buildUpdateFavoredNodesRequest(regionUpdateInfos);
UpdateFavoredNodesResponse updateFavoredNodesResponse =
FutureUtils.get(rsAdmin.updateFavoredNodes(request));
LOG.info("Region server " +
FutureUtils.get(rsAdmin.getServerInfo(RequestConverter.buildGetServerInfoRequest()))
.getServerInfo() +
" has updated " + updateFavoredNodesResponse.getResponse() + " / " +
singleServerPlan.getAssignmentMap().size() + " regions with the assignment plan");
succeededNum++;
}
} catch (Exception e) {
failedUpdateMap.put(entry.getKey(), e);
}
}
// log the succeeded updates
LOG.info("Updated " + succeededNum + " region servers with " +
"the new assignment plan");
// log the failed updates
int failedNum = failedUpdateMap.size();
if (failedNum != 0) {
LOG.error("Failed to update the following + " + failedNum +
" region servers with its corresponding favored nodes");
for (Map.Entry<ServerName, Exception> entry :
failedUpdateMap.entrySet() ) {
LOG.error("Failed to update " + entry.getKey().getAddress() +
" because of " + entry.getValue().getMessage());
}
}
}
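/**
* Update the given assignment plan both in hbase:meta and on all the region servers.
*/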
public void updateAssignmentPlan(FavoredNodesPlan plan)
throws IOException {
LOG.info("Start to update the new assignment plan for the hbase:meta table and" +
" the region servers");
// Update the new assignment plan to META
updateAssignmentPlanToMeta(plan);
// Update the new assignment plan to Region Servers
updateAssignmentPlanToRegionServers(plan);
LOG.info("Finish to update the new assignment plan for the hbase:meta table and" +
" the region servers");
}
/**
* Return how many regions will move per table since their primary RS will
* change
*
* @param newPlan - new AssignmentPlan
* @return how many primaries will move per table
*/
public Map<TableName, Integer> getRegionsMovement(FavoredNodesPlan newPlan)
throws IOException {
Map<TableName, Integer> movesPerTable = new HashMap<>();
SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
Map<TableName, List<RegionInfo>> tableToRegions = snapshot
.getTableToRegionMap();
FavoredNodesPlan oldPlan = snapshot.getExistingAssignmentPlan();
Set<TableName> tables = snapshot.getTableSet();
for (TableName table : tables) {
int movedPrimaries = 0;
if (!this.targetTableSet.isEmpty()
&& !this.targetTableSet.contains(table)) {
continue;
}
List<RegionInfo> regions = tableToRegions.get(table);
for (RegionInfo region : regions) {
List<ServerName> oldServers = oldPlan.getFavoredNodes(region);
List<ServerName> newServers = newPlan.getFavoredNodes(region);
if (oldServers != null && newServers != null) {
ServerName oldPrimary = oldServers.get(0);
ServerName newPrimary = newServers.get(0);
if (oldPrimary.compareTo(newPrimary) != 0) {
movedPrimaries++;
}
}
}
movesPerTable.put(table, movedPrimaries);
}
return movesPerTable;
}
/**
* Compare the new plan with the plan currently in hbase:meta and print, for each table,
* whether the primary, secondary and tertiary locality would drop or increase, together
* with the baseline locality and dispersion.
*
* @param movesPerTable how many primary regions will move per table
* @param regionLocalityMap the locality map read from the filesystem
* @param newPlan the new assignment plan
* @throws IOException if the current region assignment snapshot cannot be taken
*/
public void checkDifferencesWithOldPlan(Map<TableName, Integer> movesPerTable,
Map<String, Map<String, Float>> regionLocalityMap, FavoredNodesPlan newPlan)
throws IOException {
// localities for primary, secondary and tertiary
SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
FavoredNodesPlan oldPlan = snapshot.getExistingAssignmentPlan();
Set<TableName> tables = snapshot.getTableSet();
Map<TableName, List<RegionInfo>> tableToRegionsMap = snapshot.getTableToRegionMap();
for (TableName table : tables) {
float[] deltaLocality = new float[3];
float[] locality = new float[3];
if (!this.targetTableSet.isEmpty()
&& !this.targetTableSet.contains(table)) {
continue;
}
List<RegionInfo> regions = tableToRegionsMap.get(table);
System.out.println("==================================================");
System.out.println("Assignment Plan Projection Report For Table: " + table);
System.out.println("\t Total regions: " + regions.size());
System.out.println("\t" + movesPerTable.get(table)
+ " primaries will move due to their primary has changed");
for (RegionInfo currentRegion : regions) {
Map<String, Float> regionLocality = regionLocalityMap.get(currentRegion
.getEncodedName());
if (regionLocality == null) {
continue;
}
List<ServerName> oldServers = oldPlan.getFavoredNodes(currentRegion);
List<ServerName> newServers = newPlan.getFavoredNodes(currentRegion);
if (newServers != null && oldServers != null) {
int i=0;
for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) {
ServerName newServer = newServers.get(p.ordinal());
ServerName oldServer = oldServers.get(p.ordinal());
Float oldLocality = 0f;
if (oldServers != null) {
oldLocality = regionLocality.get(oldServer.getHostname());
if (oldLocality == null) {
oldLocality = 0f;
}
locality[i] += oldLocality;
}
Float newLocality = regionLocality.get(newServer.getHostname());
if (newLocality == null) {
newLocality = 0f;
}
deltaLocality[i] += newLocality - oldLocality;
i++;
}
}
}
DecimalFormat df = new DecimalFormat("#.##");
for (int i = 0; i < deltaLocality.length; i++) {
System.out.print("\t\t Baseline locality for ");
if (i == 0) {
System.out.print("primary ");
} else if (i == 1) {
System.out.print("secondary ");
} else if (i == 2) {
System.out.print("tertiary ");
}
System.out.println(df.format(100 * locality[i] / regions.size()) + "%");
System.out.print("\t\t Locality will change with the new plan: ");
System.out.println(df.format(100 * deltaLocality[i] / regions.size())
+ "%");
}
System.out.println("\t Baseline dispersion");
printDispersionScores(table, snapshot, regions.size(), null, true);
System.out.println("\t Projected dispersion");
printDispersionScores(table, snapshot, regions.size(), newPlan, true);
}
}
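/**
* Print the dispersion scores for the given table: the average, maximum and minimum scores on
* stdout when simplePrint is set, otherwise only the average score through the logger.
*/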
public void printDispersionScores(TableName table,
SnapshotOfRegionAssignmentFromMeta snapshot, int numRegions, FavoredNodesPlan newPlan,
boolean simplePrint) {
if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
return;
}
AssignmentVerificationReport report = new AssignmentVerificationReport();
report.fillUpDispersion(table, snapshot, newPlan);
List<Float> dispersion = report.getDispersionInformation();
if (simplePrint) {
DecimalFormat df = new DecimalFormat("#.##");
System.out.println("\tAvg dispersion score: "
+ df.format(dispersion.get(0)) + " hosts;\tMax dispersion score: "
+ df.format(dispersion.get(1)) + " hosts;\tMin dispersion score: "
+ df.format(dispersion.get(2)) + " hosts;");
} else {
LOG.info("For Table: " + table + " ; #Total Regions: " + numRegions
+ " ; The average dispersion score is " + dispersion.get(0));
}
}
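/**
* For every targeted table, print the average primary, secondary and tertiary locality of the
* assignment plan currently stored in hbase:meta, followed by its dispersion scores.
*/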
public void printLocalityAndDispersionForCurrentPlan(
Map<String, Map<String, Float>> regionLocalityMap) throws IOException {
SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
FavoredNodesPlan assignmentPlan = snapshot.getExistingAssignmentPlan();
Set<TableName> tables = snapshot.getTableSet();
Map<TableName, List<RegionInfo>> tableToRegionsMap = snapshot
.getTableToRegionMap();
for (TableName table : tables) {
float[] locality = new float[3];
if (!this.targetTableSet.isEmpty()
&& !this.targetTableSet.contains(table)) {
continue;
}
List<RegionInfo> regions = tableToRegionsMap.get(table);
for (RegionInfo currentRegion : regions) {
Map<String, Float> regionLocality = regionLocalityMap.get(currentRegion
.getEncodedName());
if (regionLocality == null) {
continue;
}
List<ServerName> servers = assignmentPlan.getFavoredNodes(currentRegion);
if (servers != null) {
int i = 0;
for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) {
ServerName server = servers.get(p.ordinal());
Float currentLocality = 0f;
if (servers != null) {
currentLocality = regionLocality.get(server.getHostname());
if (currentLocality == null) {
currentLocality = 0f;
}
locality[i] += currentLocality;
}
i++;
}
}
}
for (int i = 0; i < locality.length; i++) {
String copy = null;
if (i == 0) {
copy = "primary";
} else if (i == 1) {
copy = "secondary";
} else if (i == 2) {
copy = "tertiary" ;
}
float avgLocality = 100 * locality[i] / regions.size();
LOG.info("For Table: " + table + " ; #Total Regions: " + regions.size()
+ " ; The average locality for " + copy+ " is " + avgLocality + " %");
}
printDispersionScores(table, snapshot, regions.size(), null, false);
}
}
/**
* @param favoredNodesStr a comma-separated list of favored nodes, each given as host:port
* @return the corresponding list of ServerName, or null if the string cannot be split
*/
public static List<ServerName> getFavoredNodeList(String favoredNodesStr) {
String[] favoredNodesArray = StringUtils.split(favoredNodesStr, ",");
if (favoredNodesArray == null) {
return null;
}
List<ServerName> serverList = new ArrayList<>();
for (String hostNameAndPort : favoredNodesArray) {
serverList.add(ServerName.valueOf(hostNameAndPort, ServerName.NON_STARTCODE));
}
return serverList;
}
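/**
* Command line entry point. Parses the options documented in printHelp and dispatches to the
* requested action (verify, dry-run, write, update, diff, print, overwrite, ...).
*/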
public static void main(String[] args) throws IOException {
Options opt = new Options();
opt.addOption("w", "write", false, "write the assignments to hbase:meta only");
opt.addOption("u", "update", false,
"update the assignments to hbase:meta and RegionServers together");
opt.addOption("n", "dry-run", false, "do not write assignments to META");
opt.addOption("v", "verify", false, "verify current assignments against META");
opt.addOption("p", "print", false, "print the current assignment plan in META");
opt.addOption("h", "help", false, "print usage");
opt.addOption("d", "verification-details", false,
"print the details of verification report");
opt.addOption("zk", true, "to set the zookeeper quorum");
opt.addOption("fs", true, "to set HDFS");
opt.addOption("hbase_root", true, "to set hbase_root directory");
opt.addOption("overwrite", false,
"overwrite the favored nodes for a single region," +
"for example: -update -r regionName -f server1:port,server2:port,server3:port");
opt.addOption("r", true, "The region name that needs to be updated");
opt.addOption("f", true, "The new favored nodes");
opt.addOption("tables", true,
"The list of table names splitted by ',' ;" +
"For example: -tables: t1,t2,...,tn");
opt.addOption("l", "locality", true, "enforce the maximum locality");
opt.addOption("m", "min-move", true, "enforce minimum assignment move");
opt.addOption("diff", false, "calculate difference between assignment plans");
opt.addOption("munkres", false,
"use munkres to place secondaries and tertiaries");
opt.addOption("ld", "locality-dispersion", false, "print locality and dispersion " +
"information for current plan");
try {
CommandLine cmd = new GnuParser().parse(opt, args);
Configuration conf = HBaseConfiguration.create();
boolean enforceMinAssignmentMove = true;
boolean enforceLocality = true;
boolean verificationDetails = false;
// Read all the options
if ((cmd.hasOption("l") &&
cmd.getOptionValue("l").equalsIgnoreCase("false")) ||
(cmd.hasOption("locality") &&
cmd.getOptionValue("locality").equalsIgnoreCase("false"))) {
enforceLocality = false;
}
if ((cmd.hasOption("m") &&
cmd.getOptionValue("m").equalsIgnoreCase("false")) ||
(cmd.hasOption("min-move") &&
cmd.getOptionValue("min-move").equalsIgnoreCase("false"))) {
enforceMinAssignmentMove = false;
}
if (cmd.hasOption("zk")) {
conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue("zk"));
LOG.info("Setting the zk quorum: " + conf.get(HConstants.ZOOKEEPER_QUORUM));
}
if (cmd.hasOption("fs")) {
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, cmd.getOptionValue("fs"));
LOG.info("Setting the HDFS: " + conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
}
if (cmd.hasOption("hbase_root")) {
conf.set(HConstants.HBASE_DIR, cmd.getOptionValue("hbase_root"));
LOG.info("Setting the hbase root directory: " + conf.get(HConstants.HBASE_DIR));
}
// Create the region placement obj
try (RegionPlacementMaintainer rp =
new RegionPlacementMaintainer(conf, enforceLocality, enforceMinAssignmentMove)) {
if (cmd.hasOption("d") || cmd.hasOption("verification-details")) {
verificationDetails = true;
}
if (cmd.hasOption("tables")) {
String tableNameListStr = cmd.getOptionValue("tables");
String[] tableNames = StringUtils.split(tableNameListStr, ",");
rp.setTargetTableName(tableNames);
}
if (cmd.hasOption("munkres")) {
USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = true;
}
// Read all the modes
if (cmd.hasOption("v") || cmd.hasOption("verify")) {
// Verify the region placement.
rp.verifyRegionPlacement(verificationDetails);
} else if (cmd.hasOption("n") || cmd.hasOption("dry-run")) {
// Generate the assignment plan only without updating the hbase:meta and RS
FavoredNodesPlan plan = rp.getNewAssignmentPlan();
printAssignmentPlan(plan);
} else if (cmd.hasOption("w") || cmd.hasOption("write")) {
// Generate the new assignment plan
FavoredNodesPlan plan = rp.getNewAssignmentPlan();
// Print the new assignment plan
printAssignmentPlan(plan);
// Write the new assignment plan to META
rp.updateAssignmentPlanToMeta(plan);
} else if (cmd.hasOption("u") || cmd.hasOption("update")) {
// Generate the new assignment plan
FavoredNodesPlan plan = rp.getNewAssignmentPlan();
// Print the new assignment plan
printAssignmentPlan(plan);
// Update the assignment to hbase:meta and Region Servers
rp.updateAssignmentPlan(plan);
} else if (cmd.hasOption("diff")) {
FavoredNodesPlan newPlan = rp.getNewAssignmentPlan();
Map<String, Map<String, Float>> locality =
FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
Map<TableName, Integer> movesPerTable = rp.getRegionsMovement(newPlan);
rp.checkDifferencesWithOldPlan(movesPerTable, locality, newPlan);
System.out.println("Do you want to update the assignment plan? [y/n]");
Scanner s = new Scanner(System.in);
String input = s.nextLine().trim();
if (input.equals("y")) {
System.out.println("Updating assignment plan...");
rp.updateAssignmentPlan(newPlan);
}
s.close();
} else if (cmd.hasOption("ld")) {
Map<String, Map<String, Float>> locality =
FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
rp.printLocalityAndDispersionForCurrentPlan(locality);
} else if (cmd.hasOption("p") || cmd.hasOption("print")) {
FavoredNodesPlan plan = rp.getRegionAssignmentSnapshot().getExistingAssignmentPlan();
printAssignmentPlan(plan);
} else if (cmd.hasOption("overwrite")) {
if (!cmd.hasOption("f") || !cmd.hasOption("r")) {
throw new IllegalArgumentException("Please specify: " +
"-overwrite -r regionName -f server1:port,server2:port,server3:port");
}
String regionName = cmd.getOptionValue("r");
String favoredNodesStr = cmd.getOptionValue("f");
LOG.info("Going to update the region " + regionName + " with the new favored nodes " +
favoredNodesStr);
List<ServerName> favoredNodes = null;
RegionInfo regionInfo =
rp.getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap().get(regionName);
if (regionInfo == null) {
LOG.error("Cannot find the region " + regionName + " from the META");
} else {
try {
favoredNodes = getFavoredNodeList(favoredNodesStr);
} catch (IllegalArgumentException e) {
LOG.error("Cannot parse the invalid favored nodes because " + e);
}
FavoredNodesPlan newPlan = new FavoredNodesPlan();
newPlan.updateFavoredNodesMap(regionInfo, favoredNodes);
rp.updateAssignmentPlan(newPlan);
}
} else {
printHelp(opt);
}
}
} catch (ParseException e) {
printHelp(opt);
}
}
}