blob: 05c6a42d665ad25166373d886fc062fc3ae999a2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.io.hbase;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.To;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import static org.apache.crunch.types.writable.Writables.strings;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class HFileSourceIT implements Serializable {
private static byte[] ROW1 = Bytes.toBytes("row1");
private static byte[] ROW2 = Bytes.toBytes("row2");
private static byte[] ROW3 = Bytes.toBytes("row3");
private static byte[] FAMILY1 = Bytes.toBytes("family1");
private static byte[] FAMILY2 = Bytes.toBytes("family2");
private static byte[] FAMILY3 = Bytes.toBytes("family3");
private static byte[] QUALIFIER1 = Bytes.toBytes("qualifier1");
private static byte[] QUALIFIER2 = Bytes.toBytes("qualifier2");
private static byte[] QUALIFIER3 = Bytes.toBytes("qualifier3");
private static byte[] QUALIFIER4 = Bytes.toBytes("qualifier4");
private static byte[] VALUE1 = Bytes.toBytes("value1");
private static byte[] VALUE2 = Bytes.toBytes("value2");
private static byte[] VALUE3 = Bytes.toBytes("value3");
private static byte[] VALUE4 = Bytes.toBytes("value4");
@Rule
public transient TemporaryPath tmpDir = TemporaryPaths.create();
private transient Configuration conf;
@Before
public void setUp() {
conf = tmpDir.getDefaultConfiguration();
}
@Test
public void testHFileSource() throws IOException {
List<KeyValue> kvs = generateKeyValues(100);
Path inputPath = tmpDir.getPath("in");
Path outputPath = tmpDir.getPath("out");
writeKeyValuesToHFile(inputPath, kvs);
Pipeline pipeline = new MRPipeline(HFileSourceIT.class, conf);
PCollection<KeyValue> in = pipeline.read(FromHBase.hfile(inputPath));
PCollection<String> texts = in.parallelDo(new MapFn<KeyValue, String>() {
@Override
public String map(KeyValue input) {
return input.toString();
}
}, strings());
texts.write(To.textFile(outputPath));
PipelineResult result = pipeline.run();
assertTrue(result.succeeded());
List<String> lines = FileUtils.readLines(new File(outputPath.toString(), "part-m-00000"));
assertEquals(kvs.size(), lines.size());
for (int i = 0; i < kvs.size(); i++) {
assertEquals(kvs.get(i).toString(), lines.get(i));
}
}
@Test
public void testReadHFile() throws Exception {
List<KeyValue> kvs = generateKeyValues(100);
assertEquals(kvs, doTestReadHFiles(kvs, new Scan()));
}
@Test
public void testScanHFiles() throws IOException {
List<KeyValue> kvs = ImmutableList.of(
new KeyValue(ROW1, FAMILY1, QUALIFIER1, 0, VALUE1),
new KeyValue(ROW1, FAMILY1, QUALIFIER2, 0, VALUE2));
List<Result> results = doTestScanHFiles(kvs, new Scan());
assertEquals(1, results.size());
Result result = Iterables.getOnlyElement(results);
assertArrayEquals(ROW1, result.getRow());
assertEquals(2, result.raw().length);
assertArrayEquals(VALUE1, result.getColumnLatest(FAMILY1, QUALIFIER1).getValue());
assertArrayEquals(VALUE2, result.getColumnLatest(FAMILY1, QUALIFIER2).getValue());
}
@Test
public void testScanHFiles_maxVersions() throws IOException {
List<KeyValue> kvs = ImmutableList.of(
new KeyValue(ROW1, FAMILY1, QUALIFIER1, 1, VALUE1),
new KeyValue(ROW1, FAMILY1, QUALIFIER1, 3, VALUE3),
new KeyValue(ROW1, FAMILY1, QUALIFIER1, 2, VALUE2));
Scan scan = new Scan();
scan.setMaxVersions(2);
List<Result> results = doTestScanHFiles(kvs, scan);
assertEquals(1, results.size());
Result result = Iterables.getOnlyElement(results);
List<KeyValue> kvs2 = result.getColumn(FAMILY1, QUALIFIER1);
assertEquals(3, kvs2.size());
assertArrayEquals(VALUE3, kvs2.get(0).getValue());
assertArrayEquals(VALUE2, kvs2.get(1).getValue());
assertArrayEquals(VALUE1, kvs2.get(2).getValue());
}
@Test
public void testScanHFiles_startStopRows() throws IOException {
List<KeyValue> kvs = ImmutableList.of(
new KeyValue(ROW1, FAMILY1, QUALIFIER1, 0, VALUE1),
new KeyValue(ROW2, FAMILY1, QUALIFIER1, 0, VALUE1),
new KeyValue(ROW3, FAMILY1, QUALIFIER1, 0, VALUE1));
Scan scan = new Scan();
scan.setStartRow(ROW2);
scan.setStopRow(ROW3);
List<Result> results = doTestScanHFiles(kvs, scan);
assertEquals(1, results.size());
Result result = Iterables.getOnlyElement(results);
assertArrayEquals(ROW2, result.getRow());
}
@Test
public void testScanHFiles_startRowIsTooSmall() throws IOException {
List<KeyValue> kvs = ImmutableList.of(
new KeyValue(ROW2, FAMILY1, QUALIFIER1, 0, VALUE1),
new KeyValue(ROW3, FAMILY1, QUALIFIER1, 0, VALUE1));
Scan scan = new Scan();
scan.setStartRow(ROW1);
List<Result> results = doTestScanHFiles(kvs, scan);
assertEquals(2, results.size());
assertArrayEquals(ROW2, kvs.get(0).getRow());
assertArrayEquals(ROW3, kvs.get(1).getRow());
}
//@Test
public void testScanHFiles_startRowIsTooLarge() throws IOException {
List<KeyValue> kvs = ImmutableList.of(
new KeyValue(ROW1, FAMILY1, QUALIFIER1, 0, VALUE1),
new KeyValue(ROW2, FAMILY1, QUALIFIER1, 0, VALUE1));
Scan scan = new Scan();
scan.setStartRow(ROW3);
List<Result> results = doTestScanHFiles(kvs, scan);
assertEquals(0, results.size());
}
@Test
public void testScanHFiles_startRowDoesNotExist() throws IOException {
List<KeyValue> kvs = ImmutableList.of(
new KeyValue(ROW1, FAMILY1, QUALIFIER1, 0, VALUE1),
new KeyValue(ROW3, FAMILY3, QUALIFIER3, 0, VALUE3));
Scan scan = new Scan();
scan.setStartRow(ROW2);
List<Result> results = doTestScanHFiles(kvs, scan);
assertEquals(1, results.size());
assertArrayEquals(ROW3, results.get(0).getRow());
}
@Test
public void testScanHFiles_familyMap() throws IOException {
List<KeyValue> kvs = ImmutableList.of(
new KeyValue(ROW1, FAMILY1, QUALIFIER1, 0, VALUE1),
new KeyValue(ROW1, FAMILY2, QUALIFIER2, 0, VALUE2),
new KeyValue(ROW1, FAMILY2, QUALIFIER3, 0, VALUE3),
new KeyValue(ROW1, FAMILY3, QUALIFIER4, 0, VALUE4));
Scan scan = new Scan();
scan.addFamily(FAMILY1);
scan.addColumn(FAMILY2, QUALIFIER2);
List<Result> results = doTestScanHFiles(kvs, scan);
assertEquals(1, results.size());
Result result = Iterables.getOnlyElement(results);
assertEquals(2, result.size());
assertNotNull(result.getColumnLatest(FAMILY1, QUALIFIER1));
assertNotNull(result.getColumnLatest(FAMILY2, QUALIFIER2));
}
@Test
public void testScanHFiles_timeRange() throws IOException {
List<KeyValue> kvs = ImmutableList.of(
new KeyValue(ROW1, FAMILY1, QUALIFIER1, 1, VALUE1),
new KeyValue(ROW1, FAMILY1, QUALIFIER2, 2, VALUE2),
new KeyValue(ROW1, FAMILY1, QUALIFIER2, 3, VALUE3));
Scan scan = new Scan();
scan.setTimeRange(2, 3);
List<Result> results = doTestScanHFiles(kvs, scan);
assertEquals(1, results.size());
Result result = Iterables.getOnlyElement(results);
assertEquals(1, result.size());
assertNotNull(result.getColumnLatest(FAMILY1, QUALIFIER2));
}
@Test
public void testScanHFiles_delete() throws IOException {
List<KeyValue> kvs = ImmutableList.of(
new KeyValue(ROW1, FAMILY1, QUALIFIER1, 1, VALUE1),
new KeyValue(ROW1, FAMILY1, QUALIFIER1, 2, VALUE2),
new KeyValue(ROW1, FAMILY1, QUALIFIER1, 2, KeyValue.Type.Delete));
List<Result> results = doTestScanHFiles(kvs, new Scan());
assertEquals(1, results.size());
assertArrayEquals(VALUE1, results.get(0).getValue(FAMILY1, QUALIFIER1));
}
@Test
public void testScanHFiles_deleteColumn() throws IOException {
List<KeyValue> kvs = ImmutableList.of(
new KeyValue(ROW1, FAMILY1, QUALIFIER1, 1, VALUE1),
new KeyValue(ROW1, FAMILY1, QUALIFIER1, 2, VALUE2),
new KeyValue(ROW1, FAMILY1, QUALIFIER1, 2, KeyValue.Type.DeleteColumn));
List<Result> results = doTestScanHFiles(kvs, new Scan());
assertEquals(0, results.size());
}
@Test
public void testScanHFiles_deleteFamily() throws IOException {
List<KeyValue> kvs = ImmutableList.of(
new KeyValue(ROW1, FAMILY1, QUALIFIER1, 1, VALUE1),
new KeyValue(ROW1, FAMILY1, QUALIFIER2, 2, VALUE2),
new KeyValue(ROW1, FAMILY1, QUALIFIER3, 3, VALUE3),
new KeyValue(ROW1, FAMILY1, QUALIFIER1, 2, KeyValue.Type.DeleteFamily));
List<Result> results = doTestScanHFiles(kvs, new Scan());
assertEquals(1, results.size());
assertNull(results.get(0).getValue(FAMILY1, QUALIFIER1));
assertNull(results.get(0).getValue(FAMILY1, QUALIFIER2));
assertArrayEquals(VALUE3, results.get(0).getValue(FAMILY1, QUALIFIER3));
}
private List<Result> doTestScanHFiles(List<KeyValue> kvs, Scan scan) throws IOException {
Path inputPath = tmpDir.getPath("in");
writeKeyValuesToHFile(inputPath, kvs);
Pipeline pipeline = new MRPipeline(HFileSourceIT.class, conf);
PCollection<Result> results = HFileUtils.scanHFiles(pipeline, inputPath, scan);
return ImmutableList.copyOf(results.materialize());
}
private List<KeyValue> doTestReadHFiles(List<KeyValue> kvs, Scan scan) throws IOException {
Path inputPath = tmpDir.getPath("in");
writeKeyValuesToHFile(inputPath, kvs);
Pipeline pipeline = new MRPipeline(HFileSourceIT.class, conf);
PCollection<KeyValue> results = pipeline.read(FromHBase.hfile(inputPath));
return ImmutableList.copyOf(results.materialize());
}
private List<KeyValue> generateKeyValues(int count) {
List<KeyValue> kvs = Lists.newArrayList();
for (int i = 0; i < count; i++) {
kvs.add(new KeyValue(
Bytes.toBytes("row_" + i),
Bytes.toBytes("family"),
Bytes.toBytes("qualifier_" + i)));
}
Collections.sort(kvs, KeyValue.COMPARATOR);
return kvs;
}
private Path writeKeyValuesToHFile(Path inputPath, List<KeyValue> kvs) throws IOException {
HFile.Writer w = null;
try {
List<KeyValue> sortedKVs = Lists.newArrayList(kvs);
Collections.sort(sortedKVs, KeyValue.COMPARATOR);
FileSystem fs = FileSystem.get(conf);
w = HFile.getWriterFactory(conf, new CacheConfig(conf))
.withPath(fs, inputPath)
.withComparator(KeyValue.COMPARATOR)
.create();
for (KeyValue kv : sortedKVs) {
w.append(kv);
}
return inputPath;
} finally {
IOUtils.closeQuietly(w);
}
}
}