blob: 57fdffbe9a1e42ec363bae38a879c1dd0b0b46e2 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import static org.apache.crunch.types.writable.Writables.bytes;
import static org.apache.crunch.types.writable.Writables.nulls;
import static org.apache.crunch.types.writable.Writables.tableOf;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.FilterFn;
import org.apache.crunch.GroupingOptions;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.dist.DistributedPipeline;
import org.apache.crunch.lib.sort.TotalOrderPartitioner;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class HFileUtils {
private static final Logger LOG = LoggerFactory.getLogger(HFileUtils.class);
/** Compares {@code KeyValue} by its family, qualifier, timestamp (reversely), type (reversely) and memstoreTS. */
private static final Comparator<KeyValue> KEY_VALUE_COMPARATOR = new Comparator<KeyValue>() {
public int compare(KeyValue l, KeyValue r) {
int cmp;
if ((cmp = compareFamily(l, r)) != 0) {
return cmp;
if ((cmp = compareQualifier(l, r)) != 0) {
return cmp;
if ((cmp = compareTimestamp(l, r)) != 0) {
return cmp;
if ((cmp = compareType(l, r)) != 0) {
return cmp;
return 0;
private int compareFamily(KeyValue l, KeyValue r) {
return Bytes.compareTo(
l.getBuffer(), l.getFamilyOffset(), l.getFamilyLength(),
r.getBuffer(), r.getFamilyOffset(), r.getFamilyLength());
private int compareQualifier(KeyValue l, KeyValue r) {
return Bytes.compareTo(
l.getBuffer(), l.getQualifierOffset(), l.getQualifierLength(),
r.getBuffer(), r.getQualifierOffset(), r.getQualifierLength());
private int compareTimestamp(KeyValue l, KeyValue r) {
// These arguments are intentionally reversed, with r then l, to sort
// the timestamps in descending order as is expected by HBase
return, l.getTimestamp());
private int compareType(KeyValue l, KeyValue r) {
return (int) r.getType() - (int) l.getType();
private static class FilterByFamilyFn<C extends Cell> extends FilterFn<C> {
private final byte[] family;
private FilterByFamilyFn(byte[] family) { = family;
public boolean accept(C input) {
return Bytes.equals(
input.getFamilyArray(), input.getFamilyOffset(), input.getFamilyLength(),
family, 0, family.length);
public boolean disableDeepCopy() {
return true;
private static class StartRowFilterFn<C extends Cell> extends FilterFn<C> {
private final byte[] startRow;
private StartRowFilterFn(byte[] startRow) {
this.startRow = startRow;
public boolean accept(C input) {
return Bytes.compareTo(
input.getRowArray(), input.getRowOffset(), input.getRowLength(),
startRow, 0, startRow.length) >= 0;
private static class StopRowFilterFn<C extends Cell> extends FilterFn<C> {
private final byte[] stopRow;
private StopRowFilterFn(byte[] stopRow) {
this.stopRow = stopRow;
public boolean accept(C input) {
return Bytes.compareTo(
input.getRowArray(), input.getRowOffset(), input.getRowLength(),
stopRow, 0, stopRow.length) < 0;
private static class FamilyMapFilterFn<C extends Cell> extends FilterFn<C> {
private static class Column implements Serializable {
private final byte[] family;
private final byte[] qualifier;
private Column(byte[] family, byte[] qualifier) { = family;
this.qualifier = qualifier;
private byte[] getFamily() {
return family;
private byte[] getQualifier() {
return qualifier;
private final List<byte[]> families = Lists.newArrayList();
private final List<Column> qualifiers = Lists.newArrayList();
private transient Set<ByteBuffer> familySet;
private transient Set<Pair<ByteBuffer, ByteBuffer>> qualifierSet;
private FamilyMapFilterFn(Map<byte[], NavigableSet<byte[]>> familyMap) {
// Holds good families and qualifiers in Lists, as ByteBuffer is not Serializable.
for (Map.Entry<byte[], NavigableSet<byte[]>> e : familyMap.entrySet()) {
byte[] f = e.getKey();
if (e.getValue() == null) {
} else {
for (byte[] q : e.getValue()) {
qualifiers.add(new Column(f, q));
public void initialize() {
ImmutableSet.Builder<ByteBuffer> familiySetBuilder = ImmutableSet.builder();
ImmutableSet.Builder<Pair<ByteBuffer, ByteBuffer>> qualifierSetBuilder
= ImmutableSet.builder();
for (byte[] f : families) {
for (Column e : qualifiers) {
byte[] f = e.getFamily();
byte[] q = e.getQualifier();
qualifierSetBuilder.add(Pair.of(ByteBuffer.wrap(f), ByteBuffer.wrap(q)));
this.familySet =;
this.qualifierSet =;
public boolean accept(C input) {
ByteBuffer f = ByteBuffer.wrap(input.getFamilyArray(), input.getFamilyOffset(), input.getFamilyLength());
ByteBuffer q = ByteBuffer.wrap(input.getQualifierArray(), input.getQualifierOffset(), input.getQualifierLength());
return familySet.contains(f) || qualifierSet.contains(Pair.of(f, q));
private static class TimeRangeFilterFn<C extends Cell> extends FilterFn<C> {
private final long minTimestamp;
private final long maxTimestamp;
private TimeRangeFilterFn(TimeRange timeRange) {
// Can't save TimeRange to member directly, as it is not Serializable.
this.minTimestamp = timeRange.getMin();
this.maxTimestamp = timeRange.getMax();
public boolean accept(C input) {
return (minTimestamp <= input.getTimestamp() && input.getTimestamp() < maxTimestamp);
public static class KeyValueComparator implements RawComparator<BytesWritable> {
public int compare(byte[] left, int loffset, int llength, byte[] right, int roffset, int rlength) {
// BytesWritable serialize length in first 4 bytes.
// We simply ignore it here, because KeyValue has its own size serialized.
if (llength < 4) {
throw new AssertionError("Too small llength: " + llength);
if (rlength < 4) {
throw new AssertionError("Too small rlength: " + rlength);
Cell leftKey = HBaseTypes.bytesToKeyValue(left, loffset + 4, llength - 4);
Cell rightKey = HBaseTypes.bytesToKeyValue(right, roffset + 4, rlength - 4);
byte[] lRow = leftKey.getRow();
byte[] rRow = rightKey.getRow();
int rowCmp = Bytes.compareTo(lRow, rRow);
if (rowCmp != 0) {
return rowCmp;
} else {
return, rightKey);
public int compare(BytesWritable left, BytesWritable right) {
private static class ExtractRowFn<C extends Cell> extends MapFn<C, ByteBuffer> {
public ByteBuffer map(Cell input) {
// we have to make a copy of row, because the buffer may be changed after this call
return ByteBuffer.wrap(CellUtil.cloneRow(input));
public static PCollection<Result> scanHFiles(Pipeline pipeline, Path path) {
return scanHFiles(pipeline, path, new Scan());
* Scans HFiles with filter conditions.
* @param pipeline the pipeline
* @param path path to HFiles
* @param scan filtering conditions
* @return {@code Result}s
* @see #combineIntoRow(org.apache.crunch.PCollection, org.apache.hadoop.hbase.client.Scan)
public static PCollection<Result> scanHFiles(Pipeline pipeline, Path path, Scan scan) {
return scanHFiles(pipeline, ImmutableList.of(path), scan);
public static PCollection<Result> scanHFiles(Pipeline pipeline, List<Path> paths, Scan scan) {
PCollection<KeyValue> in = HFileSource(paths, scan));
return combineIntoRow(in, scan);
public static <C extends Cell> PCollection<Result> combineIntoRow(PCollection<C> cells) {
return combineIntoRow(cells, new Scan());
* Converts a bunch of {@link KeyValue}s into {@link Result}.
* All {@code KeyValue}s belong to the same row are combined. Users may provide some filter
* conditions (specified by {@code scan}). Deletes are dropped and only a specified number
* of versions are kept.
* @param cells the input {@code KeyValue}s
* @param scan filter conditions, currently we support start row, stop row and family map
* @return {@code Result}s
public static <C extends Cell> PCollection<Result> combineIntoRow(PCollection<C> cells, Scan scan) {
if (!Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW)) {
cells = cells.filter(new StartRowFilterFn<C>(scan.getStartRow()));
if (!Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
cells = cells.filter(new StopRowFilterFn<C>(scan.getStopRow()));
if (scan.hasFamilies()) {
cells = cells.filter(new FamilyMapFilterFn<C>(scan.getFamilyMap()));
TimeRange timeRange = scan.getTimeRange();
if (timeRange != null && (timeRange.getMin() > 0 || timeRange.getMax() < Long.MAX_VALUE)) {
cells = cells.filter(new TimeRangeFilterFn<C>(timeRange));
// TODO(chaoshi): support Scan#getFilter
PTable<ByteBuffer, C> cellsByRow = ExtractRowFn<C>(), bytes());
final int versions = scan.getMaxVersions();
return cellsByRow.groupByKey().parallelDo("CombineKeyValueIntoRow",
new DoFn<Pair<ByteBuffer, Iterable<C>>, Result>() {
public void process(Pair<ByteBuffer, Iterable<C>> input, Emitter<Result> emitter) {
List<KeyValue> cells = Lists.newArrayList();
for (Cell kv : input.second()) {
try {
cells.add(KeyValue.cloneAndAddTags(kv, ImmutableList.<Tag>of())); // assuming the input fits into memory
} catch (Exception e) {
throw new RuntimeException(e);
Result result = doCombineIntoRow(cells, versions);
if (result == null) {
}, HBaseTypes.results());
public static <C extends Cell> void writeToHFilesForIncrementalLoad(
PCollection<C> cells,
HTable table,
Path outputPath) throws IOException {
writeToHFilesForIncrementalLoad(cells, table, outputPath, false);
* Writes out HFiles from the provided <code>cells</code> and <code>table</code>. <code>limitToAffectedRegions</code>
* is used to indicate that the regions the <code>cells</code> will be loaded into should be identified prior to writing
* HFiles. Identifying the regions ahead of time will reduce the number of reducers needed when writing. This is
* beneficial if the data to be loaded only touches a small enough subset of the total regions in the table. If set to
* false, the number of reducers will equal the number of regions in the table.
* @see <a href=''>CRUNCH-588</a>
public static <C extends Cell> void writeToHFilesForIncrementalLoad(
PCollection<C> cells,
HTable table,
Path outputPath,
boolean limitToAffectedRegions) throws IOException {
HColumnDescriptor[] families = table.getTableDescriptor().getColumnFamilies();
if (families.length == 0) {
LOG.warn("{} has no column families", table);
PCollection<C> partitioned = sortAndPartition(cells, table, limitToAffectedRegions);
for (HColumnDescriptor f : families) {
byte[] family = f.getName();
.filter(new FilterByFamilyFn<C>(family))
.write(new HFileTarget(new Path(outputPath, Bytes.toString(family)), f));
public static void writePutsToHFilesForIncrementalLoad(
PCollection<Put> puts,
HTable table,
Path outputPath) throws IOException {
writePutsToHFilesForIncrementalLoad(puts, table, outputPath, false);
* Writes out HFiles from the provided <code>puts</code> and <code>table</code>. <code>limitToAffectedRegions</code>
* is used to indicate that the regions the <code>puts</code> will be loaded into should be identified prior to writing
* HFiles. Identifying the regions ahead of time will reduce the number of reducers needed when writing. This is
* beneficial if the data to be loaded only touches a small enough subset of the total regions in the table. If set to
* false, the number of reducers will equal the number of regions in the table.
* @see <a href=''>CRUNCH-588</a>
public static void writePutsToHFilesForIncrementalLoad(
PCollection<Put> puts,
HTable table,
Path outputPath,
boolean limitToAffectedRegions) throws IOException {
PCollection<Cell> cells = puts.parallelDo("ConvertPutToCells", new DoFn<Put, Cell>() {
public void process(Put input, Emitter<Cell> emitter) {
for (Cell cell : Iterables.concat(input.getFamilyCellMap().values())) {
}, HBaseTypes.cells());
writeToHFilesForIncrementalLoad(cells, table, outputPath, limitToAffectedRegions);
public static <C extends Cell> PCollection<C> sortAndPartition(PCollection<C> cells, HTable table) throws IOException {
return sortAndPartition(cells, table, false);
* Sorts and partitions the provided <code>cells</code> for the given <code>table</code> to ensure all elements that belong
* in the same region end up in the same reducer. The flag <code>limitToAffectedRegions</code>, when set to true, will identify
* the regions the data in <code>cells</code> belongs to and will set the number of reducers equal to the number of identified
* affected regions. If set to false, then all regions will be used, and the number of reducers will be set to the number
* of regions in the table.
public static <C extends Cell> PCollection<C> sortAndPartition(PCollection<C> cells, HTable table, boolean limitToAffectedRegions) throws IOException {
Configuration conf = cells.getPipeline().getConfiguration();
PTable<C, Void> t = cells.parallelDo(
new MapFn<C, Pair<C, Void>>() {
public Pair<C, Void> map(C input) {
return Pair.of(input, (Void) null);
}, tableOf(cells.getPType(), nulls()));
List<KeyValue> splitPoints;
if(limitToAffectedRegions) {
splitPoints = getSplitPoints(table, t);
} else {
splitPoints = getSplitPoints(table);
Path partitionFile = new Path(((DistributedPipeline) cells.getPipeline()).createTempPath(), "partition");
writePartitionInfo(conf, partitionFile, splitPoints);
GroupingOptions options = GroupingOptions.builder()
.conf(TotalOrderPartitioner.PARTITIONER_PATH, partitionFile.toString())
.numReducers(splitPoints.size() + 1)
return t.groupByKey(options).ungroup().keys();
private static List<KeyValue> getSplitPoints(HTable table) throws IOException {
List<byte[]> startKeys = ImmutableList.copyOf(table.getStartKeys());
if (startKeys.isEmpty()) {
throw new AssertionError(table + " has no regions!");
List<KeyValue> splitPoints = Lists.newArrayList();
for (byte[] startKey : startKeys.subList(1, startKeys.size())) {
KeyValue kv = KeyValueUtil.createFirstOnRow(startKey);
LOG.debug("split row: " + Bytes.toString(CellUtil.cloneRow(kv)));
return splitPoints;
private static <C> List<KeyValue> getSplitPoints(HTable table, PTable<C, Void> affectedRows) throws IOException {
List<byte[]> startKeys;
try {
startKeys = Lists.newArrayList(table.getStartKeys());
if (startKeys.isEmpty()) {
throw new AssertionError(table + " has no regions!");
} catch (IOException e) {
throw new CrunchRuntimeException(e);
Collections.sort(startKeys, Bytes.BYTES_COMPARATOR);
Iterable<ByteBuffer> bufferedStartKeys = affectedRows
.parallelDo(new DetermineAffectedRegionsFn(startKeys), Writables.bytes()).materialize();
// set to get rid of the potential duplicate start keys emitted
ImmutableSet.Builder<KeyValue> startKeyBldr = ImmutableSet.builder();
for (final ByteBuffer bufferedStartKey : bufferedStartKeys) {
return ImmutableList.copyOf(;
* Spins through the {@link Cell}s and determines which regions the data
* will be loaded into. Searching the regions is done via a binary search. The
* region start key should be provided by the caller to cut down on calls to
* HMaster to get those start keys.
public static class DetermineAffectedRegionsFn<C extends Cell> extends DoFn<Pair<C, Void>, ByteBuffer> {
private final Set<Cell> startKeysToEmit = new HashSet<>();
List<byte[]> startKeys;
TotalOrderPartitioner.Node partitions;
List<Cell> regionStartKeys = Lists.newArrayList();
public DetermineAffectedRegionsFn(List<byte[]> startKeys) {
this.startKeys = startKeys;
public void initialize() {
for (byte[] startKey : startKeys.subList(1, startKeys.size())) {
Cell cell = KeyValueUtil.createFirstOnRow(startKey);
partitions = new TotalOrderPartitioner.BinarySearchNode<>(regionStartKeys.toArray(new Cell[regionStartKeys.size()]),
new KeyValue.KVComparator());
public void process(Pair<C, Void> input, Emitter<ByteBuffer> emitter) {
int position = partitions.findPartition(new KeyValue(input.first().getFamilyArray()));
// if the position is after the last key, use the last start key
// as the split for this key, since it should fall into that region
if (position >= regionStartKeys.size() && regionStartKeys.size() > 1) {
position = regionStartKeys.size() - 1;
Cell foundCell = regionStartKeys.get(position);
if (!startKeysToEmit.contains(foundCell)) {
private static void writePartitionInfo(
Configuration conf,
Path path,
List<KeyValue> splitPoints) throws IOException {"Writing {} split points to {}", splitPoints.size(), path);
SequenceFile.Writer writer = SequenceFile.createWriter(
for (KeyValue key : splitPoints) {
writer.append(NullWritable.get(), HBaseTypes.keyValueToBytes(key));
private static Result doCombineIntoRow(List<KeyValue> kvs, int versions) {
// shortcut for the common case
if (kvs.isEmpty()) {
return null;
if (kvs.size() == 1 && kvs.get(0).getType() == KeyValue.Type.Put.getCode()) {
return new Result(kvs);
kvs = maybeDeleteFamily(kvs);
// In-place sort KeyValues by family, qualifier and then timestamp reversely (whenever ties, deletes appear first).
Collections.sort(kvs, KEY_VALUE_COMPARATOR);
List<KeyValue> results = Lists.newArrayListWithCapacity(kvs.size());
for (int i = 0, j; i < kvs.size(); i = j) {
j = i + 1;
while (j < kvs.size() && hasSameFamilyAndQualifier(kvs.get(i), kvs.get(j))) {
results.addAll(getLatestKeyValuesOfColumn(kvs.subList(i, j), versions));
if (results.isEmpty()) {
return null;
return new Result(results);
* In-place removes any {@link KeyValue}s whose timestamp is less than or equal to the
* delete family timestamp. Also removes the delete family {@code KeyValue}s.
private static List<KeyValue> maybeDeleteFamily(List<KeyValue> kvs) {
long deleteFamilyCut = -1;
for (KeyValue kv : kvs) {
if (kv.getType() == KeyValue.Type.DeleteFamily.getCode()) {
deleteFamilyCut = Math.max(deleteFamilyCut, kv.getTimestamp());
if (deleteFamilyCut == 0) {
return kvs;
List<KeyValue> results = Lists.newArrayList();
for (KeyValue kv : kvs) {
if (kv.getType() == KeyValue.Type.DeleteFamily.getCode()) {
if (kv.getTimestamp() <= deleteFamilyCut) {
return results;
private static boolean hasSameFamilyAndQualifier(KeyValue l, KeyValue r) {
return Bytes.equals(
l.getBuffer(), l.getFamilyOffset(), l.getFamilyLength(),
r.getBuffer(), r.getFamilyOffset(), r.getFamilyLength())
&& Bytes.equals(
l.getBuffer(), l.getQualifierOffset(), l.getQualifierLength(),
r.getBuffer(), r.getQualifierOffset(), r.getQualifierLength());
* Goes over the given {@link KeyValue}s and remove {@code Delete}s and {@code DeleteColumn}s.
* @param kvs {@code KeyValue}s that of same row and column and sorted by timestamps in
* descending order
* @param versions the number of versions to keep
* @return the resulting {@code KeyValue}s that contains only {@code Put}s
private static List<KeyValue> getLatestKeyValuesOfColumn(List<KeyValue> kvs, int versions) {
if (kvs.isEmpty()) {
return kvs;
if (kvs.get(0).getType() == KeyValue.Type.Put.getCode()) {
return kvs; // shortcut for the common case
List<KeyValue> results = Lists.newArrayListWithCapacity(versions);
long previousDeleteTimestamp = -1;
for (KeyValue kv : kvs) {
if (results.size() >= versions) {
if (kv.getType() == KeyValue.Type.DeleteColumn.getCode()) {
} else if (kv.getType() == KeyValue.Type.Put.getCode()) {
if (kv.getTimestamp() != previousDeleteTimestamp) {
} else if (kv.getType() == KeyValue.Type.Delete.getCode()) {
previousDeleteTimestamp = kv.getTimestamp();
} else {
throw new AssertionError("Unexpected KeyValue type: " + kv.getType());
return results;