/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.Ordering;
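/**
* Hash a table's data with MapReduce, producing one MD5 hash per batch of rows.
* The hashes are written as ordered MapFiles under the given output path, together with a
* manifest recording the scan parameters and a partitions file recording the hash file
* boundaries, so the output can later be compared against another copy of the table
* (for example by the companion SyncTable job).
*/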
public class HashTable extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(HashTable.class);
private static final int DEFAULT_BATCH_SIZE = 8000;
private final static String HASH_BATCH_SIZE_CONF_KEY = "hash.batch.size";
final static String PARTITIONS_FILE_NAME = "partitions";
final static String MANIFEST_FILE_NAME = "manifest";
final static String HASH_DATA_DIR = "hashes";
final static String OUTPUT_DATA_FILE_PREFIX = "part-r-";
private final static String TMP_MANIFEST_FILE_NAME = "manifest.tmp";
TableHash tableHash = new TableHash();
Path destPath;
public HashTable(Configuration conf) {
super(conf);
}
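/**
* Describes a single hash run: the scan parameters (table, families, row and time ranges,
* versions, batch size) and the partition keys that split the output into hash files.
* Written to and read back from the manifest and partitions files in the hash directory.
*/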
public static class TableHash {
Path hashDir;
String tableName;
String families = null;
long batchSize = DEFAULT_BATCH_SIZE;
int numHashFiles = 0;
byte[] startRow = HConstants.EMPTY_START_ROW;
byte[] stopRow = HConstants.EMPTY_END_ROW;
int scanBatch = 0;
int versions = -1;
long startTime = 0;
long endTime = 0;
List<ImmutableBytesWritable> partitions;
public static TableHash read(Configuration conf, Path hashDir) throws IOException {
TableHash tableHash = new TableHash();
FileSystem fs = hashDir.getFileSystem(conf);
tableHash.hashDir = hashDir;
tableHash.readPropertiesFile(fs, new Path(hashDir, MANIFEST_FILE_NAME));
tableHash.readPartitionFile(fs, conf, new Path(hashDir, PARTITIONS_FILE_NAME));
return tableHash;
}
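/** Serialize this hash run's parameters to a properties-format manifest file. */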
void writePropertiesFile(FileSystem fs, Path path) throws IOException {
Properties p = new Properties();
p.setProperty("table", tableName);
if (families != null) {
p.setProperty("columnFamilies", families);
}
p.setProperty("targetBatchSize", Long.toString(batchSize));
p.setProperty("numHashFiles", Integer.toString(numHashFiles));
if (!isTableStartRow(startRow)) {
p.setProperty("startRowHex", Bytes.toHex(startRow));
}
if (!isTableEndRow(stopRow)) {
p.setProperty("stopRowHex", Bytes.toHex(stopRow));
}
if (scanBatch > 0) {
p.setProperty("scanBatch", Integer.toString(scanBatch));
}
if (versions >= 0) {
p.setProperty("versions", Integer.toString(versions));
}
if (startTime != 0) {
p.setProperty("startTimestamp", Long.toString(startTime));
}
if (endTime != 0) {
p.setProperty("endTimestamp", Long.toString(endTime));
}
try (OutputStreamWriter osw = new OutputStreamWriter(fs.create(path), Charsets.UTF_8)) {
p.store(osw, null);
}
}
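/** Reload this hash run's parameters from a previously written manifest file. */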
void readPropertiesFile(FileSystem fs, Path path) throws IOException {
Properties p = new Properties();
try (FSDataInputStream in = fs.open(path)) {
try (InputStreamReader isr = new InputStreamReader(in, Charsets.UTF_8)) {
p.load(isr);
}
}
tableName = p.getProperty("table");
families = p.getProperty("columnFamilies");
batchSize = Long.parseLong(p.getProperty("targetBatchSize"));
numHashFiles = Integer.parseInt(p.getProperty("numHashFiles"));
String startRowHex = p.getProperty("startRowHex");
if (startRowHex != null) {
startRow = Bytes.fromHex(startRowHex);
}
String stopRowHex = p.getProperty("stopRowHex");
if (stopRowHex != null) {
stopRow = Bytes.fromHex(stopRowHex);
}
String scanBatchString = p.getProperty("scanBatch");
if (scanBatchString != null) {
scanBatch = Integer.parseInt(scanBatchString);
}
String versionString = p.getProperty("versions");
if (versionString != null) {
versions = Integer.parseInt(versionString);
}
String startTimeString = p.getProperty("startTimestamp");
if (startTimeString != null) {
startTime = Long.parseLong(startTimeString);
}
String endTimeString = p.getProperty("endTimestamp");
if (endTimeString != null) {
endTime = Long.parseLong(endTimeString);
}
}
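/** Build the Scan covering the configured row range, time range, versions, and families. */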
Scan initScan() throws IOException {
Scan scan = new Scan();
scan.setCacheBlocks(false);
if (startTime != 0 || endTime != 0) {
scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
}
if (scanBatch > 0) {
scan.setBatch(scanBatch);
}
if (versions >= 0) {
scan.setMaxVersions(versions);
}
if (!isTableStartRow(startRow)) {
scan.setStartRow(startRow);
}
if (!isTableEndRow(stopRow)) {
scan.setStopRow(stopRow);
}
if(families != null) {
for(String fam : families.split(",")) {
scan.addFamily(Bytes.toBytes(fam));
}
}
return scan;
}
/**
* Choose partitions between row ranges to hash to a single output file.
* Selects region boundaries that fall within the scan range and groups them
* into the desired number of partitions.
*/
void selectPartitions(Pair<byte[][], byte[][]> regionStartEndKeys) {
List<byte[]> startKeys = new ArrayList<byte[]>();
for (int i = 0; i < regionStartEndKeys.getFirst().length; i++) {
byte[] regionStartKey = regionStartEndKeys.getFirst()[i];
byte[] regionEndKey = regionStartEndKeys.getSecond()[i];
// drop this region if the scan begins after it ends, or ends before it starts
// in other words:
// IF (scan begins before the end of this region
// AND scan ends after the start of this region)
// THEN include this region
if ((isTableStartRow(startRow) || isTableEndRow(regionEndKey)
|| Bytes.compareTo(startRow, regionEndKey) < 0)
&& (isTableEndRow(stopRow) || isTableStartRow(regionStartKey)
|| Bytes.compareTo(stopRow, regionStartKey) > 0)) {
startKeys.add(regionStartKey);
}
}
int numRegions = startKeys.size();
if (numHashFiles == 0) {
numHashFiles = numRegions / 100;
}
if (numHashFiles == 0) {
numHashFiles = 1;
}
if (numHashFiles > numRegions) {
// can't partition within regions
numHashFiles = numRegions;
}
// choose a subset of start keys to group regions into ranges
partitions = new ArrayList<ImmutableBytesWritable>(numHashFiles - 1);
// skip the first start key as it is not a partition between ranges.
for (long i = 1; i < numHashFiles; i++) {
int splitIndex = (int) (numRegions * i / numHashFiles);
partitions.add(new ImmutableBytesWritable(startKeys.get(splitIndex)));
}
}
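/** Write the selected partition boundary keys to a SequenceFile for the TotalOrderPartitioner. */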
void writePartitionFile(Configuration conf, Path path) throws IOException {
FileSystem fs = path.getFileSystem(conf);
@SuppressWarnings("deprecation")
SequenceFile.Writer writer = SequenceFile.createWriter(
fs, conf, path, ImmutableBytesWritable.class, NullWritable.class);
for (int i = 0; i < partitions.size(); i++) {
writer.append(partitions.get(i), NullWritable.get());
}
writer.close();
}
private void readPartitionFile(FileSystem fs, Configuration conf, Path path)
throws IOException {
@SuppressWarnings("deprecation")
SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
ImmutableBytesWritable key = new ImmutableBytesWritable();
partitions = new ArrayList<ImmutableBytesWritable>();
while (reader.next(key)) {
partitions.add(new ImmutableBytesWritable(key.copyBytes()));
}
reader.close();
if (!Ordering.natural().isOrdered(partitions)) {
throw new IOException("Partitions are not ordered!");
}
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("tableName=").append(tableName);
if (families != null) {
sb.append(", families=").append(families);
}
sb.append(", batchSize=").append(batchSize);
sb.append(", numHashFiles=").append(numHashFiles);
if (!isTableStartRow(startRow)) {
sb.append(", startRowHex=").append(Bytes.toHex(startRow));
}
if (!isTableEndRow(stopRow)) {
sb.append(", stopRowHex=").append(Bytes.toHex(stopRow));
}
if (scanBatch > 0) {
sb.append(", scanBatch=").append(scanBatch);
}
if (versions >= 0) {
sb.append(", versions=").append(versions);
}
if (startTime != 0) {
sb.append(", startTime=").append(startTime);
}
if (endTime != 0) {
sb.append(", endTime=").append(endTime);
}
}
return sb.toString();
}
static String getDataFileName(int hashFileIndex) {
return String.format(HashTable.OUTPUT_DATA_FILE_PREFIX + "%05d", hashFileIndex);
}
/**
* Open a TableHash.Reader starting at the first hash at or after the given key.
* @throws IOException
*/
public Reader newReader(Configuration conf, ImmutableBytesWritable startKey)
throws IOException {
return new Reader(conf, startKey);
}
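/**
* Reads key/hash pairs in order, starting from a given row key and transparently advancing
* from one hash MapFile to the next as each is exhausted.
*/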
public class Reader implements java.io.Closeable {
private final Configuration conf;
private int hashFileIndex;
private MapFile.Reader mapFileReader;
private boolean cachedNext;
private ImmutableBytesWritable key;
private ImmutableBytesWritable hash;
Reader(Configuration conf, ImmutableBytesWritable startKey) throws IOException {
this.conf = conf;
int partitionIndex = Collections.binarySearch(partitions, startKey);
if (partitionIndex >= 0) {
// if the key is equal to a partition, then go to the file after that partition
hashFileIndex = partitionIndex+1;
} else {
// if the key is between partitions, then go to the file between those partitions
hashFileIndex = -1-partitionIndex;
}
openHashFile();
// MapFiles don't make it easy to seek() so that the subsequent next() returns
// the desired key/value pair. So we cache it for the first call of next().
hash = new ImmutableBytesWritable();
key = (ImmutableBytesWritable) mapFileReader.getClosest(startKey, hash);
if (key == null) {
cachedNext = false;
hash = null;
} else {
cachedNext = true;
}
}
/**
* Read the next key/hash pair.
* Returns true if such a pair exists and false when at the end of the data.
*/
public boolean next() throws IOException {
if (cachedNext) {
cachedNext = false;
return true;
}
key = new ImmutableBytesWritable();
hash = new ImmutableBytesWritable();
while (true) {
boolean hasNext = mapFileReader.next(key, hash);
if (hasNext) {
return true;
}
hashFileIndex++;
if (hashFileIndex < TableHash.this.numHashFiles) {
mapFileReader.close();
openHashFile();
} else {
key = null;
hash = null;
return false;
}
}
}
/**
* Get the current key
* @return the current key or null if there is no current key
*/
public ImmutableBytesWritable getCurrentKey() {
return key;
}
/**
* Get the current hash
* @return the current hash or null if there is no current hash
*/
public ImmutableBytesWritable getCurrentHash() {
return hash;
}
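/** Open the MapFile for the current hashFileIndex, closing any previously opened one. */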
private void openHashFile() throws IOException {
if (mapFileReader != null) {
mapFileReader.close();
}
Path dataDir = new Path(TableHash.this.hashDir, HASH_DATA_DIR);
Path dataFile = new Path(dataDir, getDataFileName(hashFileIndex));
mapFileReader = new MapFile.Reader(dataFile, conf);
}
@Override
public void close() throws IOException {
mapFileReader.close();
}
}
}
static boolean isTableStartRow(byte[] row) {
return Bytes.equals(HConstants.EMPTY_START_ROW, row);
}
static boolean isTableEndRow(byte[] row) {
return Bytes.equals(HConstants.EMPTY_END_ROW, row);
}
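/**
* Configure the MapReduce job: HashMapper over the table scan, a TotalOrderPartitioner keyed
* on the selected region boundaries, and an identity reducer writing one MapFile per hash file.
*/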
public Job createSubmittableJob(String[] args) throws IOException {
Path partitionsPath = new Path(destPath, PARTITIONS_FILE_NAME);
generatePartitions(partitionsPath);
Job job = Job.getInstance(getConf(),
getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName));
Configuration jobConf = job.getConfiguration();
jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize);
job.setJarByClass(HashTable.class);
TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(),
HashMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
// use a TotalOrderPartitioner and reducers to group region output into hash files
job.setPartitionerClass(TotalOrderPartitioner.class);
TotalOrderPartitioner.setPartitionFile(jobConf, partitionsPath);
job.setReducerClass(Reducer.class); // identity reducer
job.setNumReduceTasks(tableHash.numHashFiles);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(ImmutableBytesWritable.class);
job.setOutputFormatClass(MapFileOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(destPath, HASH_DATA_DIR));
return job;
}
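/** Fetch the table's region boundaries, select partition keys, and write the partitions file. */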
private void generatePartitions(Path partitionsPath) throws IOException {
Pair<byte[][], byte[][]> regionKeys;
// use try-with-resources so the connection is closed even if fetching region keys fails
try (Connection connection = ConnectionFactory.createConnection(getConf())) {
regionKeys = connection.getRegionLocator(TableName.valueOf(tableHash.tableName))
.getStartEndKeys();
}
tableHash.selectPartitions(regionKeys);
LOG.info("Writing " + tableHash.partitions.size() + " partition keys to " + partitionsPath);
tableHash.writePartitionFile(getConf(), partitionsPath);
}
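/**
* Accumulates an MD5 digest over the cells of consecutive Results, tracking each batch's
* start key and its approximate size in bytes.
*/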
static class ResultHasher {
private MessageDigest digest;
private boolean batchStarted = false;
private ImmutableBytesWritable batchStartKey;
private ImmutableBytesWritable batchHash;
private long batchSize = 0;
public ResultHasher() {
try {
digest = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
Throwables.propagate(e);
}
}
public void startBatch(ImmutableBytesWritable row) {
if (batchStarted) {
throw new RuntimeException("Cannot start new batch without finishing existing one.");
}
batchStarted = true;
batchSize = 0;
batchStartKey = row;
batchHash = null;
}
public void hashResult(Result result) {
if (!batchStarted) {
throw new RuntimeException("Cannot add to batch that has not been started.");
}
for (Cell cell : result.rawCells()) {
int rowLength = cell.getRowLength();
int familyLength = cell.getFamilyLength();
int qualifierLength = cell.getQualifierLength();
int valueLength = cell.getValueLength();
digest.update(cell.getRowArray(), cell.getRowOffset(), rowLength);
digest.update(cell.getFamilyArray(), cell.getFamilyOffset(), familyLength);
digest.update(cell.getQualifierArray(), cell.getQualifierOffset(), qualifierLength);
long ts = cell.getTimestamp();
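// fold the timestamp into the digest one byte at a time, least-significant byte first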
for (int i = 8; i > 0; i--) {
digest.update((byte) ts);
ts >>>= 8;
}
digest.update(cell.getValueArray(), cell.getValueOffset(), valueLength);
batchSize += rowLength + familyLength + qualifierLength + 8 + valueLength;
}
}
public void finishBatch() {
if (!batchStarted) {
throw new RuntimeException("Cannot finish batch that has not started.");
}
batchStarted = false;
batchHash = new ImmutableBytesWritable(digest.digest());
}
public boolean isBatchStarted() {
return batchStarted;
}
public ImmutableBytesWritable getBatchStartKey() {
return batchStartKey;
}
public ImmutableBytesWritable getBatchHash() {
return batchHash;
}
public long getBatchSize() {
return batchSize;
}
}
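/**
* Mapper that emits one (batch start key, MD5 hash) pair per batch of roughly targetBatchSize
* bytes. Batches only close at row boundaries, so a single row's cells are never split
* across two batches.
*/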
public static class HashMapper
extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
private ResultHasher hasher;
private long targetBatchSize;
private ImmutableBytesWritable currentRow;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
targetBatchSize = context.getConfiguration()
.getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
hasher = new ResultHasher();
TableSplit split = (TableSplit) context.getInputSplit();
hasher.startBatch(new ImmutableBytesWritable(split.getStartRow()));
}
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context)
throws IOException, InterruptedException {
if (currentRow == null || !currentRow.equals(key)) {
currentRow = new ImmutableBytesWritable(key); // copy the key; the framework reuses the object it passes in
if (hasher.getBatchSize() >= targetBatchSize) {
hasher.finishBatch();
context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
hasher.startBatch(currentRow);
}
}
hasher.hashResult(value);
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
hasher.finishBatch();
context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
}
}
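// The manifest is first written under a temporary name and only renamed into place by
// completeManifest() once the job has succeeded, so a partial run is not mistaken for a
// complete one.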
private void writeTempManifestFile() throws IOException {
Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
FileSystem fs = tempManifestPath.getFileSystem(getConf());
tableHash.writePropertiesFile(fs, tempManifestPath);
}
private void completeManifest() throws IOException {
Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
Path manifestPath = new Path(destPath, MANIFEST_FILE_NAME);
FileSystem fs = tempManifestPath.getFileSystem(getConf());
fs.rename(tempManifestPath, manifestPath);
}
private static final int NUM_ARGS = 2;
private static void printUsage(final String errorMsg) {
if (errorMsg != null && errorMsg.length() > 0) {
System.err.println("ERROR: " + errorMsg);
System.err.println();
}
System.err.println("Usage: HashTable [options] <tablename> <outputpath>");
System.err.println();
System.err.println("Options:");
System.err.println(" batchsize the target amount of bytes to hash in each batch");
System.err.println(" rows are added to the batch until this size is reached");
System.err.println(" (defaults to " + DEFAULT_BATCH_SIZE + " bytes)");
System.err.println(" numhashfiles the number of hash files to create");
System.err.println(" if set to fewer than number of regions then");
System.err.println(" the job will create this number of reducers");
System.err.println(" (defaults to 1/100 of regions -- at least 1)");
System.err.println(" startrow the start row");
System.err.println(" stoprow the stop row");
System.err.println(" starttime beginning of the time range (unixtime in millis)");
System.err.println(" without endtime means from starttime to forever");
System.err.println(" endtime end of the time range. Ignored if no starttime specified.");
System.err.println(" scanbatch scanner batch size to support intra row scans");
System.err.println(" versions number of cell versions to include");
System.err.println(" families comma-separated list of families to include");
System.err.println();
System.err.println("Args:");
System.err.println(" tablename Name of the table to hash");
System.err.println(" outputpath Filesystem path to put the output data");
System.err.println();
System.err.println("Examples:");
System.err.println(" To hash 'TestTable' in 32kB batches for a 1 hour window into 50 files:");
System.err.println(" $ bin/hbase " +
"org.apache.hadoop.hbase.mapreduce.HashTable --batchsize=32000 --numhashfiles=50"
+ " --starttime=1265875194289 --endtime=1265878794289 --families=cf2,cf3"
+ " TestTable /hashes/testTable");
}
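/** Parse command-line arguments into tableHash and destPath; prints usage and returns false on error. */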
private boolean doCommandLine(final String[] args) {
if (args.length < NUM_ARGS) {
printUsage(null);
return false;
}
try {
tableHash.tableName = args[args.length-2];
destPath = new Path(args[args.length-1]);
for (int i = 0; i < args.length - NUM_ARGS; i++) {
String cmd = args[i];
if (cmd.equals("-h") || cmd.startsWith("--h")) {
printUsage(null);
return false;
}
final String batchSizeArgKey = "--batchsize=";
if (cmd.startsWith(batchSizeArgKey)) {
tableHash.batchSize = Long.parseLong(cmd.substring(batchSizeArgKey.length()));
continue;
}
final String numHashFilesArgKey = "--numhashfiles=";
if (cmd.startsWith(numHashFilesArgKey)) {
tableHash.numHashFiles = Integer.parseInt(cmd.substring(numHashFilesArgKey.length()));
continue;
}
final String startRowArgKey = "--startrow=";
if (cmd.startsWith(startRowArgKey)) {
tableHash.startRow = Bytes.fromHex(cmd.substring(startRowArgKey.length()));
continue;
}
final String stopRowArgKey = "--stoprow=";
if (cmd.startsWith(stopRowArgKey)) {
tableHash.stopRow = Bytes.fromHex(cmd.substring(stopRowArgKey.length()));
continue;
}
final String startTimeArgKey = "--starttime=";
if (cmd.startsWith(startTimeArgKey)) {
tableHash.startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
continue;
}
final String endTimeArgKey = "--endtime=";
if (cmd.startsWith(endTimeArgKey)) {
tableHash.endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
continue;
}
final String scanBatchArgKey = "--scanbatch=";
if (cmd.startsWith(scanBatchArgKey)) {
tableHash.scanBatch = Integer.parseInt(cmd.substring(scanBatchArgKey.length()));
continue;
}
final String versionsArgKey = "--versions=";
if (cmd.startsWith(versionsArgKey)) {
tableHash.versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
continue;
}
final String familiesArgKey = "--families=";
if (cmd.startsWith(familiesArgKey)) {
tableHash.families = cmd.substring(familiesArgKey.length());
continue;
}
printUsage("Invalid argument '" + cmd + "'");
return false;
}
if ((tableHash.startTime != 0 || tableHash.endTime != 0)
&& (tableHash.startTime >= tableHash.endTime)) {
printUsage("Invalid time range filter: starttime="
+ tableHash.startTime + " >= endtime=" + tableHash.endTime);
return false;
}
} catch (Exception e) {
e.printStackTrace();
printUsage("Can't start because " + e.getMessage());
return false;
}
return true;
}
/**
* Main entry point.
*/
public static void main(String[] args) throws Exception {
int ret = ToolRunner.run(new HashTable(HBaseConfiguration.create()), args);
System.exit(ret);
}
@Override
public int run(String[] args) throws Exception {
String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
if (!doCommandLine(otherArgs)) {
return 1;
}
Job job = createSubmittableJob(otherArgs);
writeTempManifestFile();
if (!job.waitForCompletion(true)) {
LOG.info("Map-reduce job failed!");
return 1;
}
completeManifest();
return 0;
}
}