// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.impala.common.Pair;
import org.apache.impala.thrift.TColumn;
import org.apache.impala.thrift.THBaseTable;
import org.apache.impala.thrift.TResultSet;
import org.apache.impala.thrift.TResultSetMetadata;
import org.apache.impala.util.StatsHelper;
import org.apache.impala.util.TResultRowBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
public interface FeHBaseTable extends FeTable {
/**
* @see Util#getEstimatedRowStats(byte[], byte[])
*/
Pair<Long, Long> getEstimatedRowStats(byte[] startRowKey, byte[] endRowKey);
/**
* @see Util#getHBaseTableName(Table)
*/
String getHBaseTableName();
/**
* @see Util#getTableStats(FeHBaseTable)
*/
TResultSet getTableStats();
/**
* Implementations may want to cache the column families. This getter lets the static
* functions in {@link Util} access that potentially cached data.
*/
HColumnDescriptor[] getColumnFamilies() throws IOException;
/**
* Utility functions for acting on FeHBaseTable.
* When we fully move to Java 8, these can become default methods of the interface.
*/
abstract class Util {
// Storage handler class for HBase tables read by Hive.
public static final String HBASE_STORAGE_HANDLER =
"org.apache.hadoop.hive.hbase.HBaseStorageHandler";
// Column family of HBase row key
public static final String ROW_KEY_COLUMN_FAMILY = ":key";
// Copied from Hive's HBaseStorageHandler.java.
static final String DEFAULT_PREFIX = "default.";
// Number of rows fetched during the row count estimation per region
static final int ROW_COUNT_ESTIMATE_BATCH_SIZE = 10;
// Keep the conf around
static final Configuration HBASE_CONF = HBaseConfiguration.create();
private static final Logger LOG = LoggerFactory.getLogger(FeHBaseTable.class);
// Maximum relative deviation (stddev / mean) of the sampled per-region row size at
// which we stop querying more regions to estimate the row count
private static final double DELTA_FROM_AVERAGE = 0.15;
// Minimum number of regions that are checked to estimate the row count
private static final int MIN_NUM_REGIONS_TO_CHECK = 5;
// Constants from Hive's HBaseSerDe.java, copied here to avoid depending on
// hive-hbase-handler (and its transitive dependencies). These are user-facing
// properties and are pretty much guaranteed not to change without breaking backwards
// compatibility, so it is safe to copy them here.
private static final String HBASE_COLUMNS_MAPPING = "hbase.columns.mapping";
private static final String HBASE_TABLE_DEFAULT_STORAGE_TYPE =
"hbase.table.default.storage.type";
private static final String HBASE_KEY_COL = ":key";
private static final String HBASE_TABLE_NAME = "hbase.table.name";
/**
* Table client objects are thread-unsafe and cheap to create. The HBase docs
* recommend creating a new one for each task and closing it when done.
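* <p>
* A typical call pattern (sketched for illustration; 'name' is a placeholder for an
* HBase table name) wraps the returned table in try-with-resources so it is closed
* promptly:
* <pre>
*   try (org.apache.hadoop.hbase.client.Table t = getHBaseTable(name)) {
*     // scan or get rows using 't'
*   }
* </pre>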
*/
public static org.apache.hadoop.hbase.client.Table getHBaseTable(
String hbaseTableName) throws IOException {
return ConnectionHolder.getConnection().getTable(TableName.valueOf(hbaseTableName));
}
static org.apache.hadoop.hbase.client.Table getHBaseTable(FeHBaseTable tbl)
throws IOException {
return getHBaseTable(tbl.getHBaseTableName());
}
/**
* Load columns from msTable in Hive order. No IO is involved.
*/
public static List<Column> loadColumns(
org.apache.hadoop.hive.metastore.api.Table msTable)
throws MetaException, SerDeException {
Map<String, String> serdeParams = msTable.getSd().getSerdeInfo().getParameters();
String hbaseColumnsMapping = serdeParams.get(HBASE_COLUMNS_MAPPING);
if (hbaseColumnsMapping == null) {
throw new MetaException("No hbase.columns.mapping defined in Serde.");
}
String hbaseTableDefaultStorageType =
msTable.getParameters().get(HBASE_TABLE_DEFAULT_STORAGE_TYPE);
boolean tableDefaultStorageIsBinary = false;
if (hbaseTableDefaultStorageType != null &&
!hbaseTableDefaultStorageType.isEmpty()) {
if (hbaseTableDefaultStorageType.equalsIgnoreCase("binary")) {
tableDefaultStorageIsBinary = true;
} else if (!hbaseTableDefaultStorageType.equalsIgnoreCase("string")) {
throw new SerDeException(
"Error: " + HBASE_TABLE_DEFAULT_STORAGE_TYPE +
" parameter must be specified as" + " 'string' or 'binary'; '" +
hbaseTableDefaultStorageType +
"' is not a valid specification for this table/serde property.");
}
}
// Parse HBase column-mapping string.
List<FieldSchema> fieldSchemas = msTable.getSd().getCols();
List<String> hbaseColumnFamilies = new ArrayList<>();
List<String> hbaseColumnQualifiers = new ArrayList<>();
List<Boolean> hbaseColumnBinaryEncodings = new ArrayList<>();
parseColumnMapping(tableDefaultStorageIsBinary, hbaseColumnsMapping,
msTable.getTableName(), fieldSchemas, hbaseColumnFamilies,
hbaseColumnQualifiers, hbaseColumnBinaryEncodings);
Preconditions
.checkState(hbaseColumnFamilies.size() == hbaseColumnQualifiers.size());
Preconditions.checkState(fieldSchemas.size() == hbaseColumnFamilies.size());
// Populate tmp cols in the order they appear in the Hive metastore.
// We will reorder the cols below.
List<HBaseColumn> tmpCols = new ArrayList<>();
// Store the key column separately.
// TODO: Change this to an ArrayList once we support composite row keys.
HBaseColumn keyCol = null;
for (int i = 0; i < fieldSchemas.size(); ++i) {
FieldSchema s = fieldSchemas.get(i);
Type t = Type.INVALID;
try {
t = FeCatalogUtils.parseColumnType(s, msTable.getTableName());
} catch (TableLoadingException e) {
// Ignore hbase types we don't support yet. We can load the metadata
// but won't be able to select from it.
}
HBaseColumn col = new HBaseColumn(s.getName(), hbaseColumnFamilies.get(i),
hbaseColumnQualifiers.get(i), hbaseColumnBinaryEncodings.get(i), t,
s.getComment(), -1);
if (col.getColumnFamily().equals(ROW_KEY_COLUMN_FAMILY)) {
// Store the row key column separately from the rest
keyCol = col;
} else {
tmpCols.add(col);
}
}
Preconditions.checkState(keyCol != null);
// The backend assumes that the row key column is always first and
// that the remaining HBase columns are ordered by columnFamily,columnQualifier,
// so the final position depends on the other mapped HBase columns.
// Sort columns and update positions.
Collections.sort(tmpCols);
keyCol.setPosition(0);
List<Column> cols = new ArrayList<>();
cols.add(keyCol);
// Update the positions of the remaining columns
for (int i = 0; i < tmpCols.size(); ++i) {
HBaseColumn col = tmpCols.get(i);
col.setPosition(i + 1);
cols.add(col);
}
return cols;
}
/**
* Parse the column description string into the column families and column
* qualifiers. This is a copy of HBaseSerDe.parseColumnMapping and
* parseColumnStorageTypes with the parts we don't use removed. The Hive functions
* are not public.
* tableDefaultStorageIsBinary - true if the table defaults to binary encoding
* columnsMappingSpec - input string describing the table's column mapping
* fieldSchemas - input field schemas from the metastore table
* columnFamilies/columnQualifiers/columnBinaryEncodings - out parameters that will be
* filled with the column family, column qualifier and encoding for each column.
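* <p>
* For example, a mapping spec of ":key,cf1:c1,cf1:c2#b" maps the first field schema
* to the HBase row key and the next two to column family 'cf1', with 'c2' stored
* using binary encoding.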
*/
static void parseColumnMapping(boolean tableDefaultStorageIsBinary,
String columnsMappingSpec, String tblName, List<FieldSchema> fieldSchemas,
List<String> columnFamilies, List<String> columnQualifiers,
List<Boolean> colIsBinaryEncoded) throws SerDeException {
if (columnsMappingSpec == null) {
throw new SerDeException(
"Error: hbase.columns.mapping missing for this HBase table.");
}
if (columnsMappingSpec.equals("") ||
columnsMappingSpec.equals(HBASE_KEY_COL)) {
throw new SerDeException("Error: hbase.columns.mapping specifies only " +
"the HBase table row key. A valid Hive-HBase table must specify at " +
"least one additional column.");
}
int rowKeyIndex = -1;
String[] columnSpecs = columnsMappingSpec.split(",");
// If there was an implicit key column mapping, the number of columns (fieldSchemas)
// will be one more than the number of column mapping specs.
int fsStartIdxOffset = fieldSchemas.size() - columnSpecs.length;
if (fsStartIdxOffset != 0 && fsStartIdxOffset != 1) {
// This should never happen - Hive blocks creating a mismatched table and both
// Hive and Impala currently block all column-level DDL on HBase tables.
throw new SerDeException(String.format("Number of entries in " +
"'hbase.columns.mapping' does not match the number of columns in the " +
"table: %d != %d (counting the key if implicit)", columnSpecs.length,
fieldSchemas.size()));
}
for (int i = 0; i < columnSpecs.length; ++i) {
String mappingSpec = columnSpecs[i];
String[] mapInfo = mappingSpec.split("#");
// Trim column info so that serdeproperties with new lines still parse correctly.
String colInfo = mapInfo[0].trim();
int idxFirst = colInfo.indexOf(":");
int idxLast = colInfo.lastIndexOf(":");
if (idxFirst < 0 || idxFirst != idxLast) {
throw new SerDeException("Error: the HBase columns mapping contains a " +
"badly formed column family, column qualifier specification.");
}
if (colInfo.equals(HBASE_KEY_COL)) {
Preconditions.checkState(fsStartIdxOffset == 0);
rowKeyIndex = i;
columnFamilies.add(colInfo);
columnQualifiers.add(null);
} else {
String[] parts = colInfo.split(":");
Preconditions.checkState(parts.length > 0 && parts.length <= 2);
columnFamilies.add(parts[0]);
if (parts.length == 2) {
columnQualifiers.add(parts[1]);
} else {
columnQualifiers.add(null);
}
}
// Set column binary encoding
FieldSchema fieldSchema = fieldSchemas.get(i + fsStartIdxOffset);
boolean supportsBinaryEncoding = supportsBinaryEncoding(fieldSchema, tblName);
if (mapInfo.length == 1) {
// There is no column level storage specification. Use the table storage spec.
colIsBinaryEncoded.add(tableDefaultStorageIsBinary && supportsBinaryEncoding);
} else if (mapInfo.length == 2) {
// There is a storage specification for the column
String storageOption = mapInfo[1];
if (!(storageOption.equals("-") || "string".startsWith(storageOption) ||
"binary".startsWith(storageOption))) {
throw new SerDeException("Error: A column storage specification is one of" +
" the following: '-', a prefix of 'string', or a prefix of 'binary'. " +
storageOption + " is not a valid storage option specification for " +
fieldSchema.getName());
}
boolean isBinaryEncoded = false;
if ("-".equals(storageOption)) {
isBinaryEncoded = tableDefaultStorageIsBinary;
} else if ("binary".startsWith(storageOption)) {
isBinaryEncoded = true;
}
if (isBinaryEncoded && !supportsBinaryEncoding) {
// Use string encoding and log a warning if the column spec is binary but the
// column type does not support it.
// TODO: Hive/HBase does not raise an exception, but should we?
LOG.warn("Column storage specification for column " + fieldSchema.getName() +
" is binary" + " but the column type " + fieldSchema.getType() +
" does not support binary encoding. Fallback to string format.");
isBinaryEncoded = false;
}
colIsBinaryEncoded.add(isBinaryEncoded);
} else {
// error in storage specification
throw new SerDeException(
"Error: " + HBASE_COLUMNS_MAPPING + " storage specification " +
mappingSpec + " is not valid for column: " + fieldSchema.getName());
}
}
if (rowKeyIndex == -1) {
columnFamilies.add(0, HBASE_KEY_COL);
columnQualifiers.add(0, null);
colIsBinaryEncoded.add(0, supportsBinaryEncoding(fieldSchemas.get(0), tblName) &&
tableDefaultStorageIsBinary);
}
}
/**
* Get an estimate of the number of rows and bytes per row in regions between
* startRowKey and endRowKey.
* <p>
* This number is calculated by incrementally checking as many region servers as
* necessary until we observe a relatively constant row size per region on average.
* Depending on the skew of data in the regions this can either mean that we need
* to check only a minimal number of regions or that we will scan all regions.
* <p>
* The HBase region servers periodically update the master with their metrics,
* including storefile size. We get the size of the storefiles for all regions in
* the cluster with a single call to getClusterStatus from the master.
* <p>
* The accuracy of the estimate depends on how many rows are still held in the
* memstore and have not yet been flushed. A large number of key-value pairs in the
* memstore leads to poor estimates, because their size is not reflected in the
* storefile size the estimate is based on.
* <p>
* Currently, the algorithm does not account for the possibility that rows in the
* given key range differ in size from the rest of the region.
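* <p>
* As a rough illustration of the math: if the sampled regions average 100 bytes per
* row and the storefiles covering the key range total 1 GB, the returned row count
* estimate is on the order of 10 million.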
*
* @param startRowKey First row key in the range
* @param endRowKey Last row key in the range
* @return The estimated number of rows in the regions between the row keys (first)
* and the estimated row size in bytes (second).
*/
public static Pair<Long, Long> getEstimatedRowStats(FeHBaseTable tbl,
byte[] startRowKey, byte[] endRowKey) {
Preconditions.checkNotNull(startRowKey);
Preconditions.checkNotNull(endRowKey);
if (LOG.isTraceEnabled()) {
LOG.trace("getEstimatedRowStats for {} for key range ('{}', '{}')",
tbl.getHBaseTableName(), Bytes.toString(startRowKey),
Bytes.toString(endRowKey));
}
long startTime = System.currentTimeMillis();
boolean isCompressed = false;
long rowCount;
long rowSize;
try {
ClusterStatus clusterStatus = getClusterStatus();
// Check to see if things are compressed.
// If they are, we'll estimate a compression factor.
HColumnDescriptor[] columnFamilies = tbl.getColumnFamilies();
Preconditions.checkNotNull(columnFamilies);
for (HColumnDescriptor desc : columnFamilies) {
isCompressed |= desc.getCompression() != Compression.Algorithm.NONE;
}
// Fetch all regions for the key range
List<HRegionLocation> locations = getRegionsInRange(tbl, startRowKey, endRowKey);
Collections.shuffle(locations);
// The following variables track the number and size of 'rows' in
// HBase and allow incremental calculation of the average and standard
// deviation.
StatsHelper<Long> statsSize = new StatsHelper<>();
long totalEstimatedRows = 0;
// Collects stats samples from at least MIN_NUM_REGIONS_TO_CHECK
// and at most all regions until the delta is small enough.
if (LOG.isTraceEnabled()) {
LOG.trace("Start rows sampling on " + locations.size() + " regions");
}
while ((statsSize.count() < MIN_NUM_REGIONS_TO_CHECK ||
statsSize.stddev() > statsSize.mean() * DELTA_FROM_AVERAGE) &&
statsSize.count() < locations.size()) {
HRegionLocation currentLocation = locations.get((int) statsSize.count());
Pair<Long, Long> tmp =
getEstimatedRowStatsForRegion(tbl, currentLocation, isCompressed,
clusterStatus);
totalEstimatedRows += tmp.first;
statsSize.addSample(tmp.second);
if (LOG.isTraceEnabled()) {
LOG.trace("Estimation state: totalEstimatedRows={}, statsSize.count={}, " +
"statsSize.stddev={}, statsSize.mean={}",
totalEstimatedRows, statsSize.count(), statsSize.stddev(),
statsSize.mean());
}
}
// Sum up the total size for all regions in range.
long totalSize = 0;
for (final HRegionLocation location : locations) {
totalSize += getRegionSize(location, clusterStatus);
}
if (totalSize == 0) {
rowCount = totalEstimatedRows;
} else if (statsSize.mean() < 1) {
// No meaningful row width found. The < 1 handles both the
// no row case and the potential case where the average is
// too small to be meaningful.
LOG.warn("Table {}: no data available to compute " +
"row count estimate for key range ('{}', '{}')",
tbl.getFullName(), Bytes.toString(startRowKey), Bytes.toString(endRowKey));
return new Pair<>(-1L, -1L);
} else {
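// Extrapolate the row count: total storefile bytes in the key range divided by
// the sampled average bytes per row.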
rowCount = (long) (totalSize / statsSize.mean());
}
rowSize = (long) statsSize.mean();
if (LOG.isTraceEnabled()) {
LOG.trace("getEstimatedRowStats results: rowCount={}, rowSize={}, " +
"timeElapsed={}ms", rowCount, rowSize,
System.currentTimeMillis() - startTime);
}
return new Pair<>(rowCount, rowSize);
} catch (IOException ioe) {
// Log the error, but otherwise ignore it as this is just an estimate.
// TODO: Put this into the per query log.
LOG.error("Error computing HBase row count estimate", ioe);
return new Pair<>(-1L, -1L);
}
}
/**
* Returns statistics on this table as a tabular result set. Used for the
* SHOW TABLE STATS statement. The schema of the returned TResultSet is set
* inside this method.
*/
public static TResultSet getTableStats(FeHBaseTable tbl) {
TResultSet result = new TResultSet();
TResultSetMetadata resultSchema = new TResultSetMetadata();
result.setSchema(resultSchema);
resultSchema.addToColumns(new TColumn("Region Location", Type.STRING.toThrift()));
resultSchema.addToColumns(new TColumn("Start RowKey", Type.STRING.toThrift()));
resultSchema.addToColumns(new TColumn("Est. #Rows", Type.BIGINT.toThrift()));
resultSchema.addToColumns(new TColumn("Size", Type.STRING.toThrift()));
// TODO: Consider fancier stats maintenance techniques for speeding up this process.
// Currently, we list all regions and perform a mini-scan of each of them to
// estimate the number of rows, the data size, etc., which is rather expensive.
try {
ClusterStatus clusterStatus = getClusterStatus();
long totalNumRows = 0;
long totalSize = 0;
List<HRegionLocation> regions =
getRegionsInRange(tbl, HConstants.EMPTY_END_ROW, HConstants.EMPTY_START_ROW);
for (HRegionLocation region : regions) {
TResultRowBuilder rowBuilder = new TResultRowBuilder();
HRegionInfo regionInfo = region.getRegionInfo();
Pair<Long, Long> estRowStats =
getEstimatedRowStatsForRegion(tbl, region, false, clusterStatus);
long numRows = estRowStats.first;
long regionSize = getRegionSize(region, clusterStatus);
totalNumRows += numRows;
totalSize += regionSize;
// Add the region location, start rowkey, number of rows and raw size.
rowBuilder.add(String.valueOf(region.getHostname()))
.add(Bytes.toString(regionInfo.getStartKey())).add(numRows)
.addBytes(regionSize);
result.addToRows(rowBuilder.get());
}
// Total num rows and raw region size.
if (regions.size() > 1) {
TResultRowBuilder rowBuilder = new TResultRowBuilder();
rowBuilder.add("Total").add("").add(totalNumRows).addBytes(totalSize);
result.addToRows(rowBuilder.get());
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return result;
}
/**
* This method is completely copied from Hive's HBaseStorageHandler.java.
*/
public static String getHBaseTableName(
org.apache.hadoop.hive.metastore.api.Table tbl) {
// Give preference to TBLPROPERTIES over SERDEPROPERTIES
// (really we should only use TBLPROPERTIES, so this is just
// for backwards compatibility with the original specs).
String tableName = tbl.getParameters().get(HBASE_TABLE_NAME);
if (tableName == null) {
tableName =
tbl.getSd().getSerdeInfo().getParameters().get(HBASE_TABLE_NAME);
}
if (tableName == null) {
tableName = tbl.getDbName() + "." + tbl.getTableName();
if (tableName.startsWith(DEFAULT_PREFIX)) {
tableName = tableName.substring(DEFAULT_PREFIX.length());
}
}
return tableName;
}
/**
* Estimates the number of rows for a single region and returns a pair with
* the estimated row count and the estimated size in bytes per row.
*/
private static Pair<Long, Long> getEstimatedRowStatsForRegion(FeHBaseTable tbl,
HRegionLocation location, boolean isCompressed, ClusterStatus clusterStatus)
throws IOException {
HRegionInfo info = location.getRegionInfo();
Scan s = new Scan(info.getStartKey());
// Get a small sample of rows
s.setBatch(ROW_COUNT_ESTIMATE_BATCH_SIZE);
// Try and get every version so the row's size can be used to estimate.
s.setMaxVersions(Short.MAX_VALUE);
// Don't cache the blocks as we don't think these are
// necessarily important blocks.
s.setCacheBlocks(false);
// Note: raw scan mode is left disabled, so delete markers are not included in the
// sampled row sizes.
s.setRaw(false);
long currentRowSize = 0;
long currentRowCount = 0;
try (org.apache.hadoop.hbase.client.Table table = getHBaseTable(tbl);
ResultScanner rs = table.getScanner(s)) {
// Fetch up to ROW_COUNT_ESTIMATE_BATCH_SIZE rows
// as a representative sample.
for (int i = 0; i < ROW_COUNT_ESTIMATE_BATCH_SIZE; ++i) {
Result r = rs.next();
if (r == null) break;
// Check for empty rows, see IMPALA-1451
if (r.isEmpty()) continue;
++currentRowCount;
// To estimate the bytes per row we simply sum the sizes of the KeyValue
// structures returned from the underlying buffer. Since HBase works with these
// same structures internally, this gives us reasonable estimates.
Cell[] cells = r.rawCells();
for (Cell c : cells) {
if (c instanceof KeyValue) {
currentRowSize += KeyValue
.getKeyValueDataStructureSize(c.getRowLength(), c.getFamilyLength(),
c.getQualifierLength(), c.getValueLength(), c.getTagsLength());
} else {
throw new IllegalStateException(
"Celltype " + c.getClass().getName() + " not supported.");
}
}
}
}
// If there are no rows then no need to estimate.
if (currentRowCount == 0) return new Pair<>(0L, 0L);
// Get the size.
long currentSize = getRegionSize(location, clusterStatus);
// estimate the number of rows.
double bytesPerRow = currentRowSize / (double) currentRowCount;
if (currentSize == 0) {
return new Pair<>(currentRowCount, (long) bytesPerRow);
}
// A compression factor of two is only a best-effort guess.
long estimatedRowCount =
(long) ((isCompressed ? 2 : 1) * (currentSize / bytesPerRow));
return new Pair<>(estimatedRowCount, (long) bytesPerRow);
}
/**
* Returns the size of the given region in bytes. Simply returns the storefile size
* for this region from the ClusterStatus. Returns 0 in case of an error.
*/
private static long getRegionSize(HRegionLocation location,
ClusterStatus clusterStatus) {
HRegionInfo info = location.getRegionInfo();
ServerLoad serverLoad = clusterStatus.getLoad(location.getServerName());
// If the serverLoad is null, the master doesn't have information for this region's
// server. This shouldn't normally happen.
if (serverLoad == null) {
LOG.error("Unable to find server load for server: " + location.getServerName() +
" for location " + info.getRegionNameAsString());
return 0;
}
RegionLoad regionLoad = serverLoad.getRegionsLoad().get(info.getRegionName());
if (regionLoad == null) {
LOG.error("Unable to find regions load for server: " + location.getServerName() +
" for location " + info.getRegionNameAsString());
return 0;
}
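// RegionLoad reports storefile sizes in megabytes; convert to bytes.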
final long megaByte = 1024L * 1024L;
return regionLoad.getStorefileSizeMB() * megaByte;
}
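/**
* Serializes the HBase mapping of 'table' (the HBase table name plus each column's
* family, qualifier and binary-encoding flag) into a THBaseTable thrift structure
* for the backend.
*/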
public static THBaseTable getTHBaseTable(FeHBaseTable table) {
THBaseTable tHbaseTable = new THBaseTable();
tHbaseTable.setTableName(table.getHBaseTableName());
for (Column c : table.getColumns()) {
HBaseColumn hbaseCol = (HBaseColumn) c;
tHbaseTable.addToFamilies(hbaseCol.getColumnFamily());
if (hbaseCol.getColumnQualifier() != null) {
tHbaseTable.addToQualifiers(hbaseCol.getColumnQualifier());
} else {
tHbaseTable.addToQualifiers("");
}
tHbaseTable.addToBinary_encoded(hbaseCol.isBinaryEncoded());
}
return tHbaseTable;
}
/**
* Get the corresponding regions for an arbitrary range of keys.
* This is copied from org.apache.hadoop.hbase.client.HTable in HBase 0.95. The
* difference is that it does not use cache when calling getRegionLocation.
*
* @param tbl An FeHBaseTable in the catalog
* @param startKey Starting key in range, inclusive
* @param endKey Ending key in range, exclusive
* @return A list of HRegionLocations corresponding to the regions that
* contain the specified range
* @throws IOException if a remote or network exception occurs
*/
public static List<HRegionLocation> getRegionsInRange(FeHBaseTable tbl,
final byte[] startKey, final byte[] endKey) throws IOException {
long startTime = System.currentTimeMillis();
try (org.apache.hadoop.hbase.client.Table hbaseTbl = getHBaseTable(tbl)) {
final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW);
if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
throw new IllegalArgumentException(
"Invalid range: " + Bytes.toStringBinary(startKey) + " > " +
Bytes.toStringBinary(endKey));
}
final List<HRegionLocation> regionList = new ArrayList<>();
byte[] currentKey = startKey;
Connection connection = ConnectionHolder.getConnection();
RegionLocator locator = connection.getRegionLocator(hbaseTbl.getName());
do {
// always reload region location info.
HRegionLocation regionLocation = locator.getRegionLocation(currentKey, true);
regionList.add(regionLocation);
currentKey = regionLocation.getRegionInfo().getEndKey();
} while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) &&
(endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0));
if (LOG.isTraceEnabled()) {
LOG.trace("getRegionsInRange timeElapsed={}ms",
System.currentTimeMillis() - startTime);
}
return regionList;
}
}
/**
* Get the cluster status, making sure we close the admin client afterwards.
*/
static ClusterStatus getClusterStatus() throws IOException {
try (Admin admin = ConnectionHolder.getConnection().getAdmin()) {
return admin.getClusterStatus();
}
}
private static boolean supportsBinaryEncoding(FieldSchema fs, String tblName) {
try {
Type colType = FeCatalogUtils.parseColumnType(fs, tblName);
// Only boolean, integer and floating point types can use binary storage.
return colType.isBoolean() || colType.isIntegerType() ||
colType.isFloatingPointType();
} catch (TableLoadingException e) {
return false;
}
}
/**
* Connection instances are expensive to create. The HBase documentation recommends
* creating a single Connection and sharing it among threads. All operations on a
* connection are thread-safe.
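* Callers should therefore obtain the shared connection via getConnection() rather
* than creating their own.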
*/
static class ConnectionHolder {
private static Connection connection_ = null;
static synchronized Connection getConnection() throws IOException {
if (connection_ == null || connection_.isClosed()) {
connection_ = ConnectionFactory.createConnection(Util.HBASE_CONF);
}
return connection_;
}
}
}
}