/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Deque;
import java.util.LinkedList;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.ServerCallable;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.Reference.Range;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* Tool to load the output of HFileOutputFormat into an existing table.
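* <p>
* Typical command-line invocation (matching {@link #usage()}):
* <pre>
*   completebulkload /path/to/hfileoutputformat-output tablename
* </pre>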
* @see #usage()
*/
public class LoadIncrementalHFiles extends Configured implements Tool {
static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
public static String NAME = "completebulkload";
public LoadIncrementalHFiles(Configuration conf) {
super(conf);
}
public LoadIncrementalHFiles() {
super();
}
private void usage() {
System.err.println("usage: " + NAME +
" /path/to/hfileoutputformat-output " +
"tablename");
}
/**
* Represents an HFile waiting to be loaded. A queue is used
* in this class to support the case where a region has
* split during the process of the load. When this happens,
* the HFile is split into two physical parts across the new
* region boundary, and each part is added back into the queue.
* The import process finishes when the queue is empty.
*/
private static class LoadQueueItem {
final byte[] family;
final Path hfilePath;
public LoadQueueItem(byte[] family, Path hfilePath) {
this.family = family;
this.hfilePath = hfilePath;
}
}
/**
* Walk the given directory for all HFiles, and return a Queue
* containing all such files.
*/
private Deque<LoadQueueItem> discoverLoadQueue(Path hfofDir)
throws IOException {
FileSystem fs = hfofDir.getFileSystem(getConf());
if (!fs.exists(hfofDir)) {
throw new FileNotFoundException("HFileOutputFormat dir " +
hfofDir + " not found");
}
FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);
if (familyDirStatuses == null) {
throw new FileNotFoundException("No families found in " + hfofDir);
}
Deque<LoadQueueItem> ret = new LinkedList<LoadQueueItem>();
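// HFileOutputFormat writes one subdirectory per column family under the
// output directory; each subdirectory holds the hfiles for that family.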
for (FileStatus stat : familyDirStatuses) {
if (!stat.isDir()) {
LOG.warn("Skipping non-directory " + stat.getPath());
continue;
}
Path familyDir = stat.getPath();
// Skip _logs, etc
if (familyDir.getName().startsWith("_")) continue;
byte[] family = Bytes.toBytes(familyDir.getName());
Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
for (Path hfile : hfiles) {
if (hfile.getName().startsWith("_")) continue;
ret.add(new LoadQueueItem(family, hfile));
}
}
return ret;
}
/**
* Perform a bulk load of the given directory into the given
* pre-existing table.
* @param hfofDir the directory that was provided as the output path
* of a job using HFileOutputFormat
* @param table the table to load into
* @throws TableNotFoundException if table does not yet exist
*/
public void doBulkLoad(Path hfofDir, HTable table)
throws TableNotFoundException, IOException
{
HConnection conn = table.getConnection();
if (!conn.isTableAvailable(table.getTableName())) {
throw new TableNotFoundException("Table " +
Bytes.toStringBinary(table.getTableName()) +
" is not currently available.");
}
Deque<LoadQueueItem> queue = null;
try {
queue = discoverLoadQueue(hfofDir);
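// tryLoad() may split an hfile and push both halves back onto the queue,
// so keep draining until it is empty.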
while (!queue.isEmpty()) {
LoadQueueItem item = queue.remove();
tryLoad(item, conn, table.getTableName(), queue);
}
} finally {
if (queue != null && !queue.isEmpty()) {
StringBuilder err = new StringBuilder();
err.append("-------------------------------------------------\n");
err.append("Bulk load aborted with some files not yet loaded:\n");
err.append("-------------------------------------------------\n");
for (LoadQueueItem q : queue) {
err.append(" ").append(q.hfilePath).append('\n');
}
LOG.error(err);
}
}
}
/**
* Attempt to load the given load queue item into its target region server.
* If the hfile's key range no longer fits within a single region, physically
* splits the hfile at the region's end key and adds the two resulting
* hfiles back onto the load queue.
*/
private void tryLoad(final LoadQueueItem item,
HConnection conn, final byte[] table,
final Deque<LoadQueueItem> queue)
throws IOException {
final Path hfilePath = item.hfilePath;
final FileSystem fs = hfilePath.getFileSystem(getConf());
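// Open the hfile just long enough to read its first and last row keys;
// these determine which region the file belongs to.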
HFile.Reader hfr = new HFile.Reader(fs, hfilePath, null, false);
final byte[] first, last;
try {
hfr.loadFileInfo();
first = hfr.getFirstRowKey();
last = hfr.getLastRowKey();
} finally {
hfr.close();
}
LOG.info("Trying to load hfile=" + hfilePath +
" first=" + Bytes.toStringBinary(first) +
" last=" + Bytes.toStringBinary(last));
if (first == null || last == null) {
assert first == null && last == null;
LOG.info("hfile " + hfilePath + " has no entries, skipping");
return;
}
// We use a '_' prefix which is ignored when walking directory trees
// above.
final Path tmpDir = new Path(item.hfilePath.getParent(), "_tmp");
conn.getRegionServerWithRetries(
new ServerCallable<Void>(conn, table, first) {
@Override
public Void call() throws Exception {
LOG.debug("Going to connect to server " + location +
" for row " + Bytes.toStringBinary(row));
HRegionInfo hri = location.getRegionInfo();
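// If the region has split since the hfile was written, the hfile's key
// range may straddle the new region boundary and must itself be split.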
if (!hri.containsRange(first, last)) {
LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
"region. Splitting...");
HColumnDescriptor familyDesc = hri.getTableDesc().getFamily(item.family);
Path botOut = new Path(tmpDir, hri.getEncodedName() + ".bottom");
Path topOut = new Path(tmpDir, hri.getEncodedName() + ".top");
splitStoreFile(getConf(), hfilePath, familyDesc, hri.getEndKey(),
botOut, topOut);
// Add these back at the *front* of the queue, so there's a lower
// chance that the region will just split again before we get there.
queue.addFirst(new LoadQueueItem(item.family, botOut));
queue.addFirst(new LoadQueueItem(item.family, topOut));
LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
return null;
}
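// The hfile fits entirely within this region: hand it to the hosting
// region server, which adopts it directly into the region's store.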
byte[] regionName = location.getRegionInfo().getRegionName();
server.bulkLoadHFile(hfilePath.toString(), regionName, item.family);
return null;
}
});
}
/**
* Split a storefile into a top and bottom half, maintaining
* the metadata, recreating bloom filters, etc.
*/
static void splitStoreFile(
Configuration conf, Path inFile,
HColumnDescriptor familyDesc, byte[] splitKey,
Path bottomOut, Path topOut) throws IOException
{
// Create Reference markers for the two halves; copyHFileHalf() opens the
// source file (with no block cache) and writes each half out separately.
Reference topReference = new Reference(splitKey, Range.top);
Reference bottomReference = new Reference(splitKey, Range.bottom);
copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
}
/**
* Copy half of an HFile into a new HFile.
*/
private static void copyHFileHalf(
Configuration conf, Path inFile, Path outFile, Reference reference,
HColumnDescriptor familyDescriptor)
throws IOException {
FileSystem fs = inFile.getFileSystem(conf);
HalfStoreFileReader halfReader = null;
StoreFile.Writer halfWriter = null;
try {
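// HalfStoreFileReader presents only the cells on one side of the
// reference's split key, so the copy below sees exactly one half.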
halfReader = new HalfStoreFileReader(fs, inFile, null, reference);
Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();
int blocksize = familyDescriptor.getBlocksize();
Algorithm compression = familyDescriptor.getCompression();
BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
halfWriter = new StoreFile.Writer(
fs, outFile, blocksize, compression, conf, KeyValue.COMPARATOR,
bloomFilterType, 0);
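// Stream every KeyValue visible through the half-reader into the new
// writer, which rebuilds the block index and bloom filter as it goes.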
HFileScanner scanner = halfReader.getScanner(false, false);
// Guard against a half that contains no cells at all.
if (scanner.seekTo()) {
do {
KeyValue kv = scanner.getKeyValue();
halfWriter.append(kv);
} while (scanner.next());
}
for (Map.Entry<byte[],byte[]> entry : fileInfo.entrySet()) {
if (shouldCopyHFileMetaKey(entry.getKey())) {
halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
}
}
} finally {
if (halfWriter != null) halfWriter.close();
if (halfReader != null) halfReader.close();
}
}
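/**
* Reserved HFile file-info keys are managed by the HFile writer itself and
* are regenerated for each half rather than copied from the source file.
*/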
private static boolean shouldCopyHFileMetaKey(byte[] key) {
return !HFile.isReservedFileInfoKey(key);
}
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
usage();
return -1;
}
Path hfofDir = new Path(args[0]);
HTable table = new HTable(getConf(), args[1]);
doBulkLoad(hfofDir, table);
return 0;
}
public static void main(String[] args) throws Exception {
int ret = ToolRunner.run(new LoadIncrementalHFiles(), args);
System.exit(ret);
}
}