blob: 0e7f48e412a546b5aa59b2e3ecf6d6b12205abe9 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.datagenerator;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.hbase.util.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
/**
 * Splits HBase tables into regions and deterministically assigns regions to region
 * servers. The table names to process are passed as program arguments to {@link #main}.
 */
class HBaseTestDataRegionAssigment {
  /** Thrown by {@link #performAssigment(String)} when the requested table does not exist. */
  public static class TableNotFoundException extends Exception {
    private static final long serialVersionUID = 1L;

    public TableNotFoundException(String s) {
      super(s);
    }
  }

  private static final Logger LOG = LoggerFactory.getLogger(
      HBaseTestDataRegionAssigment.class);

  private final Configuration conf;
  private final HBaseAdmin hbaseAdmin;
  // Region server names, sorted so that the region -> server mapping is deterministic.
  private final List<ServerName> sortedRS;
  // Row keys at which a single-region table is split; N split points yield N + 1 regions.
  private final String[] splitPoints = { "1", "3", "5", "7", "9"};

  // Maximum number of region-split/wait-for-split attempts per split point.
  private static final int MAX_SPLIT_ATTEMPTS = 10;

  // Maximum time in ms to wait for a region to be split.
  private static final int WAIT_FOR_SPLIT_TIMEOUT = 10000;

  /**
   * Creates an HBase admin client from a default {@link Configuration} and records the
   * region servers currently reported by the cluster, sorted by name.
   */
  public HBaseTestDataRegionAssigment() throws IOException {
    conf = new Configuration();
    hbaseAdmin = new HBaseAdmin(conf);
    ClusterStatus clusterStatus = hbaseAdmin.getClusterStatus();
    Collection<ServerName> regionServerNames = clusterStatus.getServers();
    sortedRS = new ArrayList<ServerName>(regionServerNames);
    Collections.sort(sortedRS);
  }

  /** Closes the underlying HBase admin client. */
  public void close() throws IOException {
    hbaseAdmin.close();
  }

  /**
   * Split the table regions according to splitPoints and pair up adjacent regions to the
   * same server. Each region pair in ([unbound:1,1:3], [3:5,5:7], [7:9,9:unbound])
   * will be on the same server.
   * The table must have data loaded and only a single region. A table that already has
   * more than one region is not split again, but its regions are still (re)assigned.
   * Finally the table is major compacted.
   *
   * @param tableName name of the HBase table; matched literally, not as a pattern
   * @throws TableNotFoundException if no table with that name exists
   * @throws IllegalStateException if no region servers are known, a region cannot be
   *     split within MAX_SPLIT_ATTEMPTS attempts, the table does not end up with
   *     splitPoints.length + 1 regions, or the regions do not reach their expected
   *     servers in time
   */
  public void performAssigment(String tableName) throws IOException,
      InterruptedException, TableNotFoundException {
    // listTables() interprets its argument as a regular expression; quote the name so
    // that characters such as '.' are matched literally.
    HTableDescriptor[] desc = hbaseAdmin.listTables(Pattern.quote(tableName));
    if (desc == null || desc.length == 0) {
      throw new TableNotFoundException("Table " + tableName + " not found.");
    }
    // Fail fast: the round-robin assignment takes the region index modulo the number of
    // region servers, which would otherwise divide by zero.
    Preconditions.checkState(!sortedRS.isEmpty(),
        "No region servers available to host table %s.", tableName);
    byte[] tableNameBytes = tableName.getBytes(StandardCharsets.UTF_8);
    if (hbaseAdmin.getTableRegions(tableNameBytes).size() == 1) {
      // The table has one region only to begin with. The logic of
      // blockUntilRegionSplit requires that the input regionName has performed a split.
      // If the table has already been split (i.e. regions count > 1), the same split
      // call will be a no-op and this will cause blockUntilRegionSplit to break.
      splitTable(tableName, tableNameBytes);
    }
    NavigableMap<HRegionInfo, ServerName> expectedLocs = assignRegions(tableNameBytes);
    waitForRegionLocations(tableName, expectedLocs);
    // Force a major compaction such that the HBase table is backed by deterministic
    // physical artifacts (files, WAL, etc.). Our #rows estimate relies on the sizes of
    // these physical artifacts.
    LOG.info("Major compacting HBase table: " + tableName);
    hbaseAdmin.majorCompact(tableName);
  }

  /**
   * Splits the table once per entry of splitPoints, always splitting the last region
   * returned by getTableRegions(). Each split is retried up to MAX_SPLIT_ATTEMPTS times.
   */
  private void splitTable(String tableName, byte[] tableNameBytes)
      throws IOException, InterruptedException {
    for (String splitPoint : splitPoints) {
      hbaseAdmin.majorCompact(tableName);
      List<HRegionInfo> regions = hbaseAdmin.getTableRegions(tableNameBytes);
      // NOTE(review): presumes getTableRegions() lists regions in key order, so the last
      // one contains the next (larger) split point — confirm against the HBase version.
      HRegionInfo splitRegion = regions.get(regions.size() - 1);
      String regionName = splitRegion.getRegionNameAsString();
      int attempt = 0;
      boolean done = false;
      while (!done && attempt < MAX_SPLIT_ATTEMPTS) {
        ++attempt;
        // HBase seems to not always properly receive/process this split RPC,
        // so we need to retry the split/block several times.
        hbaseAdmin.split(regionName, splitPoint);
        done = blockUntilRegionSplit(conf, WAIT_FOR_SPLIT_TIMEOUT,
            splitRegion.getRegionName(), true);
        Thread.sleep(100);
      }
      if (!done) {
        throw new IllegalStateException(
            String.format("Failed to split region '%s' after %s attempts.",
                regionName, attempt));
      }
      LOG.info("Split region '{}' after {} attempts.", regionName, attempt);
    }
  }

  /**
   * Requests (asynchronously) that the table's regions, sorted by start key, be moved so
   * that each pair of adjacent regions lands on the same region server:
   *   region server 1 <- regions (unbound:1), (1:3)
   *   region server 2 <- regions (3:5), (5:7)
   *   region server 3 <- regions (7:9), (9:unbound)
   * Servers are reused round-robin when there are fewer servers than region pairs.
   *
   * @return the region -> server mapping expected once all moves have completed
   */
  private NavigableMap<HRegionInfo, ServerName> assignRegions(byte[] tableNameBytes)
      throws IOException {
    List<HRegionInfo> regions = hbaseAdmin.getTableRegions(tableNameBytes);
    Preconditions.checkState(regions.size() == splitPoints.length + 1,
        "Expected %s regions but found %s.", splitPoints.length + 1, regions.size());
    // Sort the regions by start key.
    Collections.sort(regions);
    NavigableMap<HRegionInfo, ServerName> expectedLocs = Maps.newTreeMap();
    for (int i = 0; i < regions.size(); ++i) {
      HRegionInfo regionInfo = regions.get(i);
      ServerName regionServerName = sortedRS.get((i / 2) % sortedRS.size());
      hbaseAdmin.move(regionInfo.getEncodedNameAsBytes(),
          regionServerName.getServerName().getBytes(StandardCharsets.UTF_8));
      expectedLocs.put(regionInfo, regionServerName);
    }
    return expectedLocs;
  }

  /**
   * Polls the table's region locations every 100ms, up to 100 times, until they match
   * expectedLocs, then logs the final start-key -> host:port mapping.
   *
   * @throws IllegalStateException if the locations still differ after polling
   */
  private void waitForRegionLocations(String tableName,
      NavigableMap<HRegionInfo, ServerName> expectedLocs)
      throws IOException, InterruptedException {
    // hbaseAdmin.move() is an asynchronous operation. HBase tests use sleep to wait for
    // the move to complete. It should be done in 10sec.
    HTable hbaseTable = new HTable(conf, tableName);
    try {
      NavigableMap<HRegionInfo, ServerName> actualLocs = hbaseTable.getRegionLocations();
      for (int sleepCnt = 0; !expectedLocs.equals(actualLocs) && sleepCnt < 100;
          ++sleepCnt) {
        Thread.sleep(100);
        actualLocs = hbaseTable.getRegionLocations();
      }
      Preconditions.checkState(expectedLocs.equals(actualLocs),
          "Regions of table %s did not reach their expected servers. Expected: %s "
              + "Actual: %s", tableName, expectedLocs, actualLocs);
      // Log the actual region location map
      for (Map.Entry<HRegionInfo, ServerName> entry : actualLocs.entrySet()) {
        LOG.info(printKey(entry.getKey().getStartKey()) + " -> " +
            entry.getValue().getHostAndPort());
      }
    } finally {
      IOUtils.closeQuietly(hbaseTable);
    }
  }

  /**
   * Renders a row key for logging. Each byte is interpreted as an unsigned value
   * (ISO-8859-1). ISO control characters (0x00-0x1F, 0x7F-0x9F) are emitted as a
   * backslash followed by their octal value. All other bytes are emitted as the
   * corresponding character.
   */
  public static String printKey(byte[] key) {
    StringBuilder result = new StringBuilder();
    for (byte b : key) {
      // Mask to 0..255. Otherwise bytes >= 0x80 sign-extend to negative ints, which
      // isISOControl() rejects and the (char) cast maps into U+FF80..U+FFFF.
      int value = b & 0xFF;
      if (Character.isISOControl(value)) {
        result.append('\\').append(Integer.toOctalString(value));
      } else {
        result.append((char) value);
      }
    }
    return result.toString();
  }

  /**
   * The following static methods blockUntilRegionSplit, getRegionRow,
   * blockUntilRegionIsOpened and blockUntilRegionIsInMeta are copied from
   * org.apache.hadoop.hbase.regionserver.TestEndToEndSplitTransaction
   * to help block until a region split is completed.
   *
   * The original code was modified to return a true/false in case of success/failure.
   *
   * Blocks until the region split is complete in META and, if waitForDaughters is set,
   * until the daughter regions are in META and being served.
   *
   * @return true if the region was observed as a split parent within the timeout,
   *     false otherwise
   */
  private static boolean blockUntilRegionSplit(Configuration conf, long timeout,
      final byte[] regionName, boolean waitForDaughters)
      throws IOException, InterruptedException {
    long start = System.currentTimeMillis();
    HRegionInfo daughterA = null, daughterB = null;
    HTable metaTable = new HTable(conf, TableName.META_TABLE_NAME);
    try {
      while (System.currentTimeMillis() - start < timeout) {
        Result result = getRegionRow(metaTable, regionName);
        if (result == null) {
          break;
        }
        HRegionInfo region = HRegionInfo.getHRegionInfo(result);
        if (region == null) {
          // The META row carries no region info; give up like for a missing row instead
          // of failing with a NullPointerException below.
          break;
        }
        if (region.isSplitParent()) {
          PairOfSameType<HRegionInfo> pair = HRegionInfo.getDaughterRegions(result);
          daughterA = pair.getFirst();
          daughterB = pair.getSecond();
          break;
        }
        Threads.sleep(100);
      }
      if (daughterA == null || daughterB == null) {
        return false;
      }
      // The split is complete; optionally wait (within the remaining time budget) for
      // both daughters to appear in META and to become readable.
      if (waitForDaughters) {
        long rem = timeout - (System.currentTimeMillis() - start);
        blockUntilRegionIsInMeta(metaTable, rem, daughterA);
        rem = timeout - (System.currentTimeMillis() - start);
        blockUntilRegionIsInMeta(metaTable, rem, daughterB);
        rem = timeout - (System.currentTimeMillis() - start);
        blockUntilRegionIsOpened(conf, rem, daughterA);
        rem = timeout - (System.currentTimeMillis() - start);
        blockUntilRegionIsOpened(conf, rem, daughterB);
      }
    } finally {
      IOUtils.closeQuietly(metaTable);
    }
    return true;
  }

  /** Reads the META row whose row key is the given region name. */
  private static Result getRegionRow(HTable metaTable, byte[] regionName)
      throws IOException {
    Get get = new Get(regionName);
    return metaTable.get(get);
  }

  /**
   * Polls META every 10ms until the given region's row has region info that is not
   * offline, or until the timeout expires. Returns silently on timeout.
   */
  private static void blockUntilRegionIsInMeta(HTable metaTable, long timeout,
      HRegionInfo hri) throws IOException, InterruptedException {
    long start = System.currentTimeMillis();
    while (System.currentTimeMillis() - start < timeout) {
      Result result = getRegionRow(metaTable, hri.getRegionName());
      if (result != null) {
        HRegionInfo info = HRegionInfo.getHRegionInfo(result);
        if (info != null && !info.isOffline()) {
          break;
        }
      }
      Threads.sleep(10);
    }
  }

  /**
   * Repeatedly issues a Get for the region's start key (every 10ms) until one succeeds,
   * i.e. the region can be read, or until the timeout expires. Returns silently on
   * timeout.
   *
   * Starting with HBase 0.95.2 the Get constructor no longer accepts an empty row key.
   * The first region has an empty start key, so the key '0' is probed instead. That key
   * is not guaranteed to lie in the region, in which case the Get may keep failing and
   * this method only waits out the timeout. The end key cannot be used instead because
   * it is an exclusive upper bound.
   */
  private static void blockUntilRegionIsOpened(Configuration conf, long timeout,
      HRegionInfo hri) throws IOException, InterruptedException {
    long start = System.currentTimeMillis();
    HTable table = new HTable(conf, hri.getTableName());
    try {
      byte [] row = hri.getStartKey();
      // Check for null/empty row. If we find one, use a key that is likely to
      // be in first region. If key '0' happens not to be in the given region
      // then an exception will be thrown.
      if (row == null || row.length <= 0) row = new byte [] {'0'};
      Get get = new Get(row);
      while (System.currentTimeMillis() - start < timeout) {
        try {
          table.get(get);
          break;
        } catch (IOException ex) {
          // Region not readable yet; wait some more.
        }
        Threads.sleep(10);
      }
    } finally {
      IOUtils.closeQuietly(table);
    }
  }

  /**
   * args contains a list of hbase table names. This program will split the hbase tables
   * into regions and assign each region to a specific region server.
   */
  public static void main(String[] args) throws IOException, InterruptedException,
      TableNotFoundException {
    HBaseTestDataRegionAssigment assignment = new HBaseTestDataRegionAssigment();
    try {
      for (String htable : args) {
        assignment.performAssigment(htable);
      }
    } finally {
      assignment.close();
    }
    // Exit forcefully because of HDFS-6057. Otherwise, the JVM won't exit due to a
    // non-daemon thread still being up.
    System.exit(0);
  }
}