/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.io.hbase;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.Resources;
import org.apache.commons.io.IOUtils;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.FilterFn;
import org.apache.crunch.GroupingOptions;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.fn.FilterFns;
import org.apache.crunch.impl.dist.DistributedPipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.At;
import org.apache.crunch.io.CompositePathIterable;
import org.apache.crunch.io.seq.SeqFileReaderFactory;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
import org.apache.crunch.types.writable.WritableDeepCopier;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.regionserver.KeyValueHeap;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import static org.apache.crunch.types.writable.Writables.nulls;
import static org.apache.crunch.types.writable.Writables.tableOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
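/**
* Integration tests for writing Crunch outputs as HFiles and bulk-loading them into HBase,
* using a word count over shakes.txt as the workload.
*/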
public class HFileTargetIT implements Serializable {
private static HBaseTestingUtility HBASE_TEST_UTILITY;
private static final byte[] TEST_FAMILY = Bytes.toBytes("test_family");
private static final byte[] TEST_QUALIFIER = Bytes.toBytes("count");
private static final Path TEMP_DIR = new Path("/tmp");
private static final Random RANDOM = new Random();
private static final FilterFn<String> SHORT_WORD_FILTER = new FilterFn<String>() {
@Override
public boolean accept(String input) {
return input.length() <= 2;
}
};
@Rule
public transient TemporaryPath tmpDir = TemporaryPaths.create();
@BeforeClass
public static void setUpClass() throws Exception {
// We have to use the mini MapReduce cluster, because LocalJobRunner allows only a single reducer
// (and we need multiple reducers to test bulk load against multiple regions).
Configuration conf = HBaseConfiguration.create();
// Workaround for HBASE-5711, we need to set config value dfs.datanode.data.dir.perm
// equal to the permissions of the temp dirs on the filesystem. These temp dirs were
// probably created using this process' umask. So we guess the temp dir permissions as
// 0777 & ~umask, and use that to set the config value.
Process process = Runtime.getRuntime().exec("/bin/sh -c umask");
BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream(), Charset.forName("UTF-8")));
int rc = process.waitFor();
if (rc == 0) {
String umask = br.readLine();
int umaskBits = Integer.parseInt(umask, 8);
int permBits = 0777 & ~umaskBits;
String perms = Integer.toString(permBits, 8);
conf.set("dfs.datanode.data.dir.perm", perms);
}
br.close();
HBASE_TEST_UTILITY = new HBaseTestingUtility(conf);
HBASE_TEST_UTILITY.startMiniCluster(1);
}
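/** Creates a pre-split test table with the default test column family. */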
private static HTable createTable(int splits) throws Exception {
HColumnDescriptor hcol = new HColumnDescriptor(TEST_FAMILY);
return createTable(splits, hcol);
}
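/** Creates a randomly named table with the given column families, pre-split on row keys between "a" and "z". */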
private static HTable createTable(int splits, HColumnDescriptor... hcols) throws Exception {
byte[] tableName = Bytes.toBytes("test_table_" + RANDOM.nextInt(1000000000));
HBaseAdmin admin = HBASE_TEST_UTILITY.getHBaseAdmin();
HTableDescriptor htable = new HTableDescriptor(tableName);
for (HColumnDescriptor hcol : hcols) {
htable.addFamily(hcol);
}
admin.createTable(htable, Bytes.split(Bytes.toBytes("a"), Bytes.toBytes("z"), splits));
HBASE_TEST_UTILITY.waitTableAvailable(tableName, 30000);
return new HTable(HBASE_TEST_UTILITY.getConfiguration(), tableName);
}
@AfterClass
public static void tearDownClass() throws Exception {
HBASE_TEST_UTILITY.shutdownMiniCluster();
}
@Before
public void setUp() throws IOException {
FileSystem fs = HBASE_TEST_UTILITY.getTestFileSystem();
fs.delete(TEMP_DIR, true);
}
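/** Writes word counts from shakes.txt to HFiles as KeyValues and verifies the count for "and" by reading the HFiles back directly. */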
@Test
public void testHFileTarget() throws Exception {
Pipeline pipeline = new MRPipeline(HFileTargetIT.class, HBASE_TEST_UTILITY.getConfiguration());
Path inputPath = copyResourceFileToHDFS("shakes.txt");
Path outputPath = getTempPathOnHDFS("out");
PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
PCollection<String> words = split(shakespeare, "\\s+");
PTable<String, Long> wordCounts = words.count();
pipeline.write(convertToKeyValues(wordCounts), ToHBase.hfile(outputPath));
PipelineResult result = pipeline.run();
assertTrue(result.succeeded());
FileSystem fs = FileSystem.get(HBASE_TEST_UTILITY.getConfiguration());
KeyValue kv = readFromHFiles(fs, outputPath, "and");
assertEquals(427L, Bytes.toLong(kv.getValue()));
}
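/** Writes word-count Puts for two column families to HFiles, bulk-loads them into a pre-split table, and checks a few expected counts. */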
@Test
public void testBulkLoad() throws Exception {
Pipeline pipeline = new MRPipeline(HFileTargetIT.class, HBASE_TEST_UTILITY.getConfiguration());
Path inputPath = copyResourceFileToHDFS("shakes.txt");
Path outputPath = getTempPathOnHDFS("out");
byte[] columnFamilyA = Bytes.toBytes("colfamA");
byte[] columnFamilyB = Bytes.toBytes("colfamB");
HTable testTable = createTable(26, new HColumnDescriptor(columnFamilyA), new HColumnDescriptor(columnFamilyB));
PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
PCollection<String> words = split(shakespeare, "\\s+");
PTable<String,Long> wordCounts = words.count();
PCollection<Put> wordCountPuts = convertToPuts(wordCounts, columnFamilyA, columnFamilyB);
HFileUtils.writePutsToHFilesForIncrementalLoad(
wordCountPuts,
testTable,
outputPath);
PipelineResult result = pipeline.run();
assertTrue(result.succeeded());
new LoadIncrementalHFiles(HBASE_TEST_UTILITY.getConfiguration())
.doBulkLoad(outputPath, testTable);
Map<String, Long> expected = ImmutableMap.<String, Long>builder()
.put("__EMPTY__", 1470L)
.put("the", 620L)
.put("and", 427L)
.put("of", 396L)
.put("to", 367L)
.build();
for (Map.Entry<String, Long> e : expected.entrySet()) {
assertEquals((long) e.getValue(), getWordCountFromTable(testTable, columnFamilyA, e.getKey()));
assertEquals((long) e.getValue(), getWordCountFromTable(testTable, columnFamilyB, e.getKey()));
}
}
/** Writes two HFile outputs from a single pipeline and bulk-loads each into its own table. See CRUNCH-251. */
@Test
public void testMultipleHFileTargets() throws Exception {
Pipeline pipeline = new MRPipeline(HFileTargetIT.class, HBASE_TEST_UTILITY.getConfiguration());
Path inputPath = copyResourceFileToHDFS("shakes.txt");
Path outputPath1 = getTempPathOnHDFS("out1");
Path outputPath2 = getTempPathOnHDFS("out2");
HTable table1 = createTable(26);
HTable table2 = createTable(26);
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(HBASE_TEST_UTILITY.getConfiguration());
boolean onlyAffectedRegions = true;
PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
PCollection<String> words = split(shakespeare, "\\s+");
PCollection<String> shortWords = words.filter(SHORT_WORD_FILTER);
PCollection<String> longWords = words.filter(FilterFns.not(SHORT_WORD_FILTER));
PTable<String, Long> shortWordCounts = shortWords.count();
PTable<String, Long> longWordCounts = longWords.count();
HFileUtils.writePutsToHFilesForIncrementalLoad(
convertToPuts(shortWordCounts),
table1,
outputPath1);
HFileUtils.writePutsToHFilesForIncrementalLoad(
convertToPuts(longWordCounts),
table2,
outputPath2,
onlyAffectedRegions);
PipelineResult result = pipeline.run();
assertTrue(result.succeeded());
loader.doBulkLoad(outputPath1, table1);
loader.doBulkLoad(outputPath2, table2);
assertEquals(396L, getWordCountFromTable(table1, "of"));
assertEquals(427L, getWordCountFromTable(table2, "and"));
}
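/** Verifies that column family settings (here, PREFIX data block encoding) are applied to the HFiles written for incremental load. */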
@Test
public void testHFileUsesFamilyConfig() throws Exception {
DataBlockEncoding newBlockEncoding = DataBlockEncoding.PREFIX;
assertNotSame(newBlockEncoding, DataBlockEncoding.valueOf(HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING));
Pipeline pipeline = new MRPipeline(HFileTargetIT.class, HBASE_TEST_UTILITY.getConfiguration());
Path inputPath = copyResourceFileToHDFS("shakes.txt");
Path outputPath = getTempPathOnHDFS("out");
HColumnDescriptor hcol = new HColumnDescriptor(TEST_FAMILY);
hcol.setDataBlockEncoding(newBlockEncoding);
HTable testTable = createTable(26, hcol);
PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
PCollection<String> words = split(shakespeare, "\\s+");
PTable<String,Long> wordCounts = words.count();
PCollection<Put> wordCountPuts = convertToPuts(wordCounts);
HFileUtils.writePutsToHFilesForIncrementalLoad(
wordCountPuts,
testTable,
outputPath);
PipelineResult result = pipeline.run();
assertTrue(result.succeeded());
int hfilesCount = 0;
Configuration conf = HBASE_TEST_UTILITY.getConfiguration();
FileSystem fs = outputPath.getFileSystem(conf);
for (FileStatus e : fs.listStatus(new Path(outputPath, Bytes.toString(TEST_FAMILY)))) {
Path f = e.getPath();
if (!f.getName().startsWith("part-")) { // filter out "_SUCCESS"
continue;
}
HFile.Reader reader = null;
try {
reader = HFile.createReader(fs, f, new CacheConfig(conf), conf);
assertEquals(DataBlockEncoding.PREFIX, reader.getDataBlockEncoding());
} finally {
if (reader != null) {
reader.close();
}
}
hfilesCount++;
}
assertTrue(hfilesCount > 0);
}
/**
* Verifies that, when onlyAffectedRegions is enabled, the partition file contains fewer split
* points than the table has regions, and that every split point matches a region start key.
* @see <a href='https://issues.apache.org/jira/browse/CRUNCH-588'>CRUNCH-588</a>
*/
@Test
public void testOnlyAffectedRegionsWhenWritingHFiles() throws Exception {
Pipeline pipeline = new MRPipeline(HFileTargetIT.class, HBASE_TEST_UTILITY.getConfiguration());
Path inputPath = copyResourceFileToHDFS("shakes.txt");
Path outputPath1 = getTempPathOnHDFS("out1");
HTable table1 = createTable(26);
PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
PCollection<String> words = split(shakespeare, "\\s+");
// take the top 5 here to reduce the number of affected regions in the table
PTable<String, Long> count = words.filter(SHORT_WORD_FILTER).count().top(5);
boolean onlyAffectedRegions = true;
PCollection<Put> wordPuts = convertToPuts(count);
HFileUtils.writePutsToHFilesForIncrementalLoad(
wordPuts,
table1,
outputPath1,
onlyAffectedRegions);
// locate partition file directory and read it in to verify
// the number of regions to be written to are less than the
// number of regions in the table
String tempPath = ((DistributedPipeline) pipeline).createTempPath().toString();
Path temp = new Path(tempPath.substring(0, tempPath.lastIndexOf("/")));
FileSystem fs = FileSystem.get(pipeline.getConfiguration());
Path partitionPath = null;
for (final FileStatus fileStatus : fs.listStatus(temp)) {
RemoteIterator<LocatedFileStatus> remoteFiles = fs.listFiles(fileStatus.getPath(), true);
while (remoteFiles.hasNext()) {
LocatedFileStatus file = remoteFiles.next();
if (file.getPath().toString().contains("partition")) {
partitionPath = file.getPath();
System.out.println("found written partitions in path: " + partitionPath.toString());
break;
}
}
if (partitionPath != null) {
break;
}
}
if (partitionPath == null) {
throw new AssertionError("Partition path was not found");
}
Class<BytesWritable> keyClass = BytesWritable.class;
List<BytesWritable> writtenPartitions = new ArrayList<>();
WritableDeepCopier<BytesWritable> wdc = new WritableDeepCopier<>(keyClass);
SeqFileReaderFactory<BytesWritable> s = new SeqFileReaderFactory<>(keyClass);
// read back in the partition file
Iterator<BytesWritable> iter = CompositePathIterable.create(fs, partitionPath, s).iterator();
while (iter.hasNext()) {
BytesWritable next = iter.next();
writtenPartitions.add((BytesWritable) wdc.deepCopy(next));
}
ImmutableList<byte[]> startKeys = ImmutableList.copyOf(table1.getStartKeys());
// assert that only affected regions were loaded into
assertTrue(startKeys.size() > writtenPartitions.size());
// write out and read back in the start keys for each region.
// do this to get proper byte alignment
Path regionStartKeys = tmpDir.getPath("regionStartKeys");
List<KeyValue> startKeysToWrite = Lists.newArrayList();
for (final byte[] startKey : startKeys.subList(1, startKeys.size())) {
startKeysToWrite.add(KeyValueUtil.createFirstOnRow(startKey));
}
writeToSeqFile(pipeline.getConfiguration(), regionStartKeys, startKeysToWrite);
List<BytesWritable> writtenStartKeys = new ArrayList<>();
iter = CompositePathIterable.create(fs, regionStartKeys, s).iterator();
while (iter.hasNext()) {
BytesWritable next = iter.next();
writtenStartKeys.add((BytesWritable) wdc.deepCopy(next));
}
// assert the keys read back in match start keys for a region on the table
for (final BytesWritable writtenPartition : writtenPartitions) {
boolean foundMatchingKv = false;
for (final BytesWritable writtenStartKey : writtenStartKeys) {
if (writtenStartKey.equals(writtenPartition)) {
foundMatchingKv = true;
break;
}
}
if (!foundMatchingKv) {
throw new AssertionError("Written KeyValue: " + writtenPartition + " did not match any known start keys of the table");
}
}
pipeline.done();
}
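/** Writes the given KeyValues to a sequence file as NullWritable/BytesWritable pairs so they can be compared byte-for-byte with the partition file contents. */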
private static void writeToSeqFile(
Configuration conf,
Path path,
List<KeyValue> splitPoints) throws IOException {
SequenceFile.Writer writer = SequenceFile.createWriter(
path.getFileSystem(conf),
conf,
path,
NullWritable.class,
BytesWritable.class);
for (KeyValue key : splitPoints) {
writer.append(NullWritable.get(), HBaseTypes.keyValueToBytes(key));
}
writer.close();
}
private static PCollection<Put> convertToPuts(PTable<String, Long> in) {
return convertToPuts(in, TEST_FAMILY);
}
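/** Converts each (word, count) pair into a Put on the given column families; empty words are stored under the row key "__EMPTY__". */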
private static PCollection<Put> convertToPuts(PTable<String, Long> in, final byte[]...columnFamilies) {
return in.parallelDo(new MapFn<Pair<String, Long>, Put>() {
@Override
public Put map(Pair<String, Long> input) {
String w = input.first();
if (w.length() == 0) {
w = "__EMPTY__";
}
long c = input.second();
Put p = new Put(Bytes.toBytes(w));
for (byte[] columnFamily : columnFamilies) {
p.add(columnFamily, TEST_QUALIFIER, Bytes.toBytes(c));
}
return p;
}
}, HBaseTypes.puts());
}
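/** Converts each (word, count) pair into a KeyValue and sorts the collection with HFileUtils.KeyValueComparator so it can be written directly to HFiles. */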
private static PCollection<KeyValue> convertToKeyValues(PTable<String, Long> in) {
return in.parallelDo(new MapFn<Pair<String, Long>, Pair<KeyValue, Void>>() {
@Override
public Pair<KeyValue, Void> map(Pair<String, Long> input) {
String w = input.first();
if (w.length() == 0) {
w = "__EMPTY__";
}
long c = input.second();
Cell cell = CellUtil.createCell(Bytes.toBytes(w), Bytes.toBytes(c));
return Pair.of(KeyValue.cloneAndAddTags(cell, ImmutableList.<Tag>of()), null);
}
}, tableOf(HBaseTypes.keyValues(), nulls()))
.groupByKey(GroupingOptions.builder()
.sortComparatorClass(HFileUtils.KeyValueComparator.class)
.build())
.ungroup()
.keys();
}
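/** Splits each input string on the given regex and emits the individual tokens. */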
private static PCollection<String> split(PCollection<String> in, final String regex) {
return in.parallelDo(new DoFn<String, String>() {
@Override
public void process(String input, Emitter<String> emitter) {
for (String w : input.split(regex)) {
emitter.emit(w);
}
}
}, Writables.strings());
}
/** Reads the first KeyValue on the given row from the HFiles under the given output directory. */
private static KeyValue readFromHFiles(FileSystem fs, Path mrOutputPath, String row) throws IOException {
List<KeyValueScanner> scanners = Lists.newArrayList();
KeyValue fakeKV = KeyValue.createFirstOnRow(Bytes.toBytes(row));
for (FileStatus e : fs.listStatus(mrOutputPath)) {
Path f = e.getPath();
if (!f.getName().startsWith("part-")) { // filter out "_SUCCESS"
continue;
}
StoreFile.Reader reader = new StoreFile.Reader(
fs,
f,
new CacheConfig(fs.getConf()),
fs.getConf());
StoreFileScanner scanner = reader.getStoreFileScanner(false, false);
scanner.seek(fakeKV); // have to call seek on each underlying scanner, otherwise KeyValueHeap won't work
scanners.add(scanner);
}
assertTrue(!scanners.isEmpty());
KeyValueScanner kvh = new KeyValueHeap(scanners, KeyValue.COMPARATOR);
boolean seekOk = kvh.seek(fakeKV);
assertTrue(seekOk);
Cell kv = kvh.next();
kvh.close();
return KeyValue.cloneAndAddTags(kv, ImmutableList.<Tag>of());
}
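/** Copies a classpath resource to a temporary path on the test filesystem and returns that path. */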
private static Path copyResourceFileToHDFS(String resourceName) throws IOException {
Configuration conf = HBASE_TEST_UTILITY.getConfiguration();
FileSystem fs = FileSystem.get(conf);
Path resultPath = getTempPathOnHDFS(resourceName);
InputStream in = null;
OutputStream out = null;
try {
in = Resources.getResource(resourceName).openConnection().getInputStream();
out = fs.create(resultPath);
IOUtils.copy(in, out);
} finally {
IOUtils.closeQuietly(in);
IOUtils.closeQuietly(out);
}
return resultPath;
}
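/** Returns a fully qualified path for the given file name under the temp directory on the test filesystem. */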
private static Path getTempPathOnHDFS(String fileName) throws IOException {
Configuration conf = HBASE_TEST_UTILITY.getConfiguration();
FileSystem fs = FileSystem.get(conf);
Path result = new Path(TEMP_DIR, fileName);
return result.makeQualified(fs);
}
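/** Reads the stored count for the given word from the table, failing the test if the row does not exist. */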
private static long getWordCountFromTable(HTable table, String word) throws IOException {
return getWordCountFromTable(table, TEST_FAMILY, word);
}
private static long getWordCountFromTable(HTable table, byte[] columnFamily, String word) throws IOException {
Get get = new Get(Bytes.toBytes(word));
get.addFamily(columnFamily);
byte[] value = table.get(get).value();
if (value == null) {
fail("no such row: " + word);
}
return Bytes.toLong(value);
}
}