/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
/**
* Test cases for the "load" half of the HFileOutputFormat bulk load
* functionality. These tests run faster than the full MR cluster
 * tests in TestHFileOutputFormat.
*/
public class TestLoadIncrementalHFiles {
private static final byte[] TABLE = Bytes.toBytes("mytable");
private static final byte[] QUALIFIER = Bytes.toBytes("myqual");
private static final byte[] FAMILY = Bytes.toBytes("myfam");
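  // SPLIT_KEYS pre-splits the table into three regions: [, ddd), [ddd, ppp) and [ppp, ).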
private static final byte[][] SPLIT_KEYS = new byte[][] {
Bytes.toBytes("ddd"),
Bytes.toBytes("ppp")
};
public static int BLOCKSIZE = 64*1024;
public static String COMPRESSION =
Compression.Algorithm.NONE.getName();
private HBaseTestingUtility util = new HBaseTestingUtility();
/**
* Test case that creates some regions and loads
* HFiles that fit snugly inside those regions
*/
@Test
public void testSimpleLoad() throws Exception {
runTest("testSimpleLoad",
new byte[][][] {
new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
});
}
/**
* Test case that creates some regions and loads
* HFiles that cross the boundaries of those regions
*/
@Test
public void testRegionCrossingLoad() throws Exception {
runTest("testRegionCrossingLoad",
new byte[][][] {
new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
});
}
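
  /**
   * Writes one HFile per key range under a temporary family directory, brings up a
   * mini cluster with the table pre-split on SPLIT_KEYS, bulk loads the files with
   * LoadIncrementalHFiles, and asserts that every written row is readable.
   */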
private void runTest(String testName, byte[][][] hfileRanges)
throws Exception {
Path dir = HBaseTestingUtility.getTestDir(testName);
FileSystem fs = util.getTestFileSystem();
dir = dir.makeQualified(fs);
Path familyDir = new Path(dir, Bytes.toString(FAMILY));
int hfileIdx = 0;
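    // Write one HFile of 1000 rows for each requested [from, to] key range.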
for (byte[][] range : hfileRanges) {
byte[] from = range[0];
byte[] to = range[1];
createHFile(fs, new Path(familyDir, "hfile_" + hfileIdx++),
FAMILY, QUALIFIER, from, to, 1000);
}
int expectedRows = hfileIdx * 1000;
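    // Bring up a mini cluster and create the target table, pre-split on SPLIT_KEYS.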
util.startMiniCluster();
try {
HBaseAdmin admin = new HBaseAdmin(util.getConfiguration());
HTableDescriptor htd = new HTableDescriptor(TABLE);
htd.addFamily(new HColumnDescriptor(FAMILY));
admin.createTable(htd, SPLIT_KEYS);
HTable table = new HTable(util.getConfiguration(), TABLE);
util.waitTableAvailable(TABLE, 30000);
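      // Bulk load the prepared HFiles and check that every written row landed in the table.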
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(
util.getConfiguration());
loader.doBulkLoad(dir, table);
assertEquals(expectedRows, util.countRows(table));
} finally {
util.shutdownMiniCluster();
}
}
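
  /**
   * Test that a single HFile spanning the whole key space can be split at a given
   * row key and that the two halves together still contain every original row.
   */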
@Test
public void testSplitStoreFile() throws IOException {
Path dir = HBaseTestingUtility.getTestDir("testSplitHFile");
FileSystem fs = util.getTestFileSystem();
Path testIn = new Path(dir, "testhfile");
HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
createHFile(fs, testIn, FAMILY, QUALIFIER,
Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
Path bottomOut = new Path(dir, "bottom.out");
Path topOut = new Path(dir, "top.out");
LoadIncrementalHFiles.splitStoreFile(
util.getConfiguration(), testIn,
familyDesc, Bytes.toBytes("ggg"),
bottomOut,
topOut);
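    // Both halves together must still contain all 1000 rows.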
int rowCount = verifyHFile(bottomOut);
rowCount += verifyHFile(topOut);
assertEquals(1000, rowCount);
}
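
  /**
   * Opens the HFile at the given path, counts its KeyValues, and asserts that it is
   * non-empty. Returns the number of entries found.
   */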
private int verifyHFile(Path p) throws IOException {
Configuration conf = util.getConfiguration();
    HFile.Reader reader = new HFile.Reader(
        p.getFileSystem(conf), p, null, false);
    try {
      reader.loadFileInfo();
      HFileScanner scanner = reader.getScanner(false, false);
      // seekTo() positions the scanner on the first entry; it returns false for an empty file.
      assertTrue(scanner.seekTo());
      int count = 0;
      do {
        count++;
      } while (scanner.next());
      assertTrue(count > 0);
      return count;
    } finally {
      reader.close();
    }
}
/**
* Create an HFile with the given number of rows between a given
* start key and end key.
* TODO put me in an HFileTestUtil or something?
*/
static void createHFile(
FileSystem fs, Path path,
byte[] family, byte[] qualifier,
byte[] startKey, byte[] endKey, int numRows) throws IOException
{
HFile.Writer writer = new HFile.Writer(fs, path, BLOCKSIZE, COMPRESSION,
KeyValue.KEY_COMPARATOR);
long now = System.currentTimeMillis();
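    // Each KeyValue reuses its row key as the value and shares a single timestamp.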
try {
// subtract 2 since iterateOnSplits doesn't include boundary keys
for (byte[] key : Bytes.iterateOnSplits(startKey, endKey, numRows-2)) {
KeyValue kv = new KeyValue(key, family, qualifier, now, key);
writer.append(kv);
}
} finally {
writer.close();
}
}
}