/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.pig.backend.hadoop.hbase;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Properties;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.pig.CollectableLoadFunc;
import org.apache.pig.LoadCaster;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadPushDown;
import org.apache.pig.LoadStoreCaster;
import org.apache.pig.OrderedLoadFunc;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.StoreResources;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.hbase.HBaseTableInputFormat.HBaseTableIFBuilder;
import org.apache.pig.builtin.FuncUtils;
import org.apache.pig.builtin.Utf8StorageConverter;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
import org.joda.time.DateTime;
import com.google.common.collect.Lists;
/**
* An HBase implementation of LoadFunc and StoreFunc.
* <P>
* Below is an example showing how to load data from HBase:
* <pre>{@code
* raw = LOAD 'hbase://SampleTable'
* USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
* 'info:first_name info:last_name friends:* info:*', '-loadKey true -limit 5')
* AS (id:bytearray, first_name:chararray, last_name:chararray, friends_map:map[], info_map:map[]);
* }</pre>
* This example loads data redundantly from the info column family just to
* illustrate usage. Note that the row key is inserted first in the result schema.
* To load only column names that start with a given prefix, specify the column
* name with a trailing '*'. For example, passing <code>friends:bob_*</code> to
* the constructor in the above example would cause only columns that start with
* <i>bob_</i> to be loaded.
* <P>
* Note that when using a prefix like <code>friends:bob_*</code>, explicit HBase filters are set for
* all columns and prefixes specified. Querying HBase with many filters can cause performance
* degradation. This is typically seen when mixing one or more prefixed descriptors with a large list
* of columns. In that case, better performance can be achieved by either loading the entire family via
* <code>friends:*</code> or by specifying explicit column descriptor names.
* <P>
* Below is an example showing how to store data into HBase:
* <pre>{@code
* copy = STORE raw INTO 'hbase://SampleTableCopy'
* USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
* 'info:first_name info:last_name friends:* info:*');
* }</pre>
* Note that STORE expects the first value in the tuple to be the row key.
* Scalar values must map to an explicit column descriptor and maps must
* map to a column family name. In the above example, the <code>friends</code>
* column family data from <code>SampleTable</code> will be written to the
* <code>friends</code> column family of the <code>SampleTableCopy</code> table.
*
*/
public class HBaseStorage extends LoadFunc implements StoreFuncInterface, LoadPushDown, OrderedLoadFunc, StoreResources,
CollectableLoadFunc {
private static final Log LOG = LogFactory.getLog(HBaseStorage.class);
private final static String STRING_CASTER = "UTF8StorageConverter";
private final static String BYTE_CASTER = "HBaseBinaryConverter";
private final static String CASTER_PROPERTY = "pig.hbase.caster";
private final static String ASTERISK = "*";
private final static String COLON = ":";
private final static String HBASE_SECURITY_CONF_KEY = "hbase.security.authentication";
private final static String HBASE_CONFIG_SET = "hbase.config.set";
private final static String HBASE_TOKEN_SET = "hbase.token.set";
private List<ColumnInfo> columnInfo_ = Lists.newArrayList();
//Use JobConf to store hbase delegation token
private JobConf m_conf;
private RecordReader reader;
private RecordWriter writer;
private TableOutputFormat outputFormat = null;
private Scan scan;
private String contextSignature = null;
private final CommandLine configuredOptions_;
private final static Options validOptions_ = new Options();
private final static CommandLineParser parser_ = new GnuParser();
private boolean loadRowKey_;
private String delimiter_;
private boolean ignoreWhitespace_;
private final long limit_;
private final boolean cacheBlocks_;
private final int caching_;
private boolean noWAL_;
private final long minTimestamp_;
private final long maxTimestamp_;
private final long timestamp_;
private boolean includeTimestamp_;
private boolean includeTombstone_;
protected transient byte[] gt_;
protected transient byte[] gte_;
protected transient byte[] lt_;
protected transient byte[] lte_;
private String regex_;
private LoadCaster caster_;
private ResourceSchema schema_;
private RequiredFieldList requiredFieldList;
private static void populateValidOptions() {
Option loadKey = OptionBuilder.hasOptionalArgs(1).withArgName("loadKey").withLongOpt("loadKey").withDescription("Load Key").create();
validOptions_.addOption(loadKey);
validOptions_.addOption("gt", true, "Row key must be greater than this value " +
"(binary, double-slash-escaped)");
validOptions_.addOption("lt", true, "Row key must be less than this value (binary, double-slash-escaped)");
validOptions_.addOption("gte", true, "Row key must be greater than or equal to this value");
validOptions_.addOption("lte", true, "Row key must be less than or equal to this value");
validOptions_.addOption("regex", true, "Row key must match this regular expression");
validOptions_.addOption("cacheBlocks", true, "Set whether blocks should be cached for the scan");
validOptions_.addOption("caching", true, "Number of rows scanners should cache");
validOptions_.addOption("limit", true, "Per-region limit on the number of rows to retrieve");
validOptions_.addOption("maxResultsPerColumnFamily", true, "Limit the maximum number of values returned per row per column family");
validOptions_.addOption("delim", true, "Column delimiter");
validOptions_.addOption("ignoreWhitespace", true, "Ignore spaces when parsing columns");
validOptions_.addOption("caster", true, "Caster to use for converting values. A class name, " +
"HBaseBinaryConverter, or Utf8StorageConverter. For storage, casters must implement LoadStoreCaster.");
Option noWal = OptionBuilder.hasOptionalArgs(1).withArgName("noWAL").withLongOpt("noWAL").withDescription("Sets writing to the write-ahead log (WAL) to false for faster loading. To be used with extreme caution since this could result in data loss (see http://hbase.apache.org/book.html#perf.hbase.client.putwal).").create();
validOptions_.addOption(noWal);
validOptions_.addOption("minTimestamp", true, "Record must have timestamp greater than or equal to this value");
validOptions_.addOption("maxTimestamp", true, "Record must have timestamp less than this value");
validOptions_.addOption("timestamp", true, "Record must have timestamp equal to this value");
validOptions_.addOption("includeTimestamp", false, "Record will include the timestamp after the rowkey on store (rowkey, timestamp, ...)");
validOptions_.addOption("includeTombstone", false, "Record will include a tombstone marker on store after the rowKey and timestamp (if included) (rowkey, [timestamp,] tombstone, ...)");
}
/**
* Constructor. Construct an HBase table LoadFunc and StoreFunc to load or store the cells of the
* provided columns.
*
* @param columnList
* column list presented as a string of column names delimited by spaces and/or
* commas. To retrieve all columns in a column family <code>Foo</code>,
* specify a column as either <code>Foo:</code> or <code>Foo:*</code>.
* To fetch only columns in the CF that start with <I>bar</I>, specify
* <code>Foo:bar*</code>. The resulting tuple will always be the size
* of the number of tokens in <code>columnList</code>. Items in the
* tuple will be scalar values when a full column descriptor is
* specified, or a map of column descriptors to values when a column
* family is specified.
*
* @throws ParseException when unable to parse arguments
* @throws IOException
*/
public HBaseStorage(String columnList) throws ParseException, IOException {
this(columnList,"");
}
/**
* Constructor. Construct an HBase table LoadFunc and StoreFunc to load or store.
* @param columnList columns to load or store, in the format described for {@link #HBaseStorage(String)}
* @param optString Loader options. Known options (see the example below):<ul>
* <li>-loadKey=(true|false) Load the row key as the first column
* <li>-gt=minKeyVal
* <li>-lt=maxKeyVal
* <li>-gte=minKeyVal
* <li>-lte=maxKeyVal
* <li>-regex=regex the row key must match
* <li>-limit=numRowsPerRegion max number of rows to retrieve per region
* <li>-maxResultsPerColumnFamily= Limit the maximum number of values returned per row per column family
* <li>-delim=char delimiter to use when parsing column names (default is space or comma)
* <li>-ignoreWhitespace=(true|false) ignore spaces when parsing column names (default true)
* <li>-cacheBlocks=(true|false) Set whether blocks should be cached for the scan (default false).
* <li>-caching=numRows number of rows to cache (faster scans, more memory).
* <li>-noWAL=(true|false) Sets writing to the write-ahead log (WAL) to false for faster loading.
* To be used with extreme caution, since this could result in data loss
* (see http://hbase.apache.org/book.html#perf.hbase.client.putwal).
* <li>-minTimestamp= Scan's timestamp for min timeRange
* <li>-maxTimestamp= Scan's timestamp for max timeRange
* <li>-timestamp= Scan's specified timestamp
* <li>-includeTimestamp= Record will include the timestamp after the rowkey on store (rowkey, timestamp, ...)
* <li>-includeTombstone= Record will include a tombstone marker on store after the rowKey and timestamp (if included) (rowkey, [timestamp,] tombstone, ...)
* <li>-caster=(HBaseBinaryConverter|Utf8StorageConverter) Utf8StorageConverter is the default.
* </ul>
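* <p>
* As an illustration (the table name, row-key bounds, and schema below are hypothetical), several
* of these options can be combined in a single load statement:
* <pre>{@code
* raw = LOAD 'hbase://SampleTable'
*       USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
*       'info:first_name info:last_name friends:*',
*       '-loadKey true -gte row_0100 -lt row_0200 -caching 500')
*       AS (id:bytearray, first_name:chararray, last_name:chararray, friends_map:map[]);
* }</pre>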
* @throws ParseException
* @throws IOException
*/
public HBaseStorage(String columnList, String optString) throws ParseException, IOException {
populateValidOptions();
String[] optsArr = optString.split(" ");
try {
configuredOptions_ = parser_.parse(validOptions_, optsArr);
} catch (ParseException e) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-regex] [-cacheBlocks] [-caching] [-caster] [-noWAL] [-limit] [-maxResultsPerColumnFamily] [-delim] [-ignoreWhitespace] [-minTimestamp] [-maxTimestamp] [-timestamp] [-includeTimestamp] [-includeTombstone]", validOptions_ );
throw e;
}
loadRowKey_ = false;
if (configuredOptions_.hasOption("loadKey")) {
String value = configuredOptions_.getOptionValue("loadKey");
if ("true".equalsIgnoreCase(value) || "".equalsIgnoreCase(value) || value == null ) {//the empty string and null check is for backward compat.
loadRowKey_ = true;
}
}
delimiter_ = ",";
if (configuredOptions_.getOptionValue("delim") != null) {
delimiter_ = configuredOptions_.getOptionValue("delim");
}
ignoreWhitespace_ = true;
if (configuredOptions_.hasOption("ignoreWhitespace")) {
String value = configuredOptions_.getOptionValue("ignoreWhitespace");
if (!"true".equalsIgnoreCase(value)) {
ignoreWhitespace_ = false;
}
}
columnInfo_ = parseColumnList(columnList, delimiter_, ignoreWhitespace_);
// In MR mode, UDFContext.deserialize is called before
// UDFContext.getUDFContext().getClientSystemProps(), so the client system properties are not null.
// In Spark mode, when the Spark executor first initializes its objects,
// UDFContext.getUDFContext().getClientSystemProps() is still null because UDFContext.deserialize
// has not been called yet. So we need to check whether the client system properties are null:
// if they are, defaultCaster is STRING_CASTER; otherwise it is
// UDFContext.getUDFContext().getClientSystemProps().getProperty(CASTER_PROPERTY, STRING_CASTER).
// See PIG-4920 for details.
String defaultCaster = UDFContext.getUDFContext().getClientSystemProps() != null ? UDFContext.getUDFContext().getClientSystemProps().getProperty(CASTER_PROPERTY, STRING_CASTER) : STRING_CASTER;
String casterOption = configuredOptions_.getOptionValue("caster", defaultCaster);
if (STRING_CASTER.equalsIgnoreCase(casterOption)) {
caster_ = new Utf8StorageConverter();
} else if (BYTE_CASTER.equalsIgnoreCase(casterOption)) {
caster_ = new HBaseBinaryConverter();
} else {
try {
caster_ = (LoadCaster) PigContext.instantiateFuncFromSpec(casterOption);
} catch (ClassCastException e) {
LOG.error("Configured caster does not implement LoadCaster interface.");
throw new IOException(e);
} catch (RuntimeException e) {
LOG.error("Configured caster class not found.", e);
throw new IOException(e);
}
}
LOG.debug("Using caster " + caster_.getClass());
caching_ = Integer.valueOf(configuredOptions_.getOptionValue("caching", "100"));
cacheBlocks_ = Boolean.valueOf(configuredOptions_.getOptionValue("cacheBlocks", "false"));
limit_ = Long.valueOf(configuredOptions_.getOptionValue("limit", "-1"));
noWAL_ = false;
if (configuredOptions_.hasOption("noWAL")) {
String value = configuredOptions_.getOptionValue("noWAL");
if ("true".equalsIgnoreCase(value) || "".equalsIgnoreCase(value) || value == null) {//the empty string and null check is for backward compat.
noWAL_ = true;
}
}
if (configuredOptions_.hasOption("minTimestamp")){
minTimestamp_ = Long.parseLong(configuredOptions_.getOptionValue("minTimestamp"));
} else {
minTimestamp_ = 0;
}
if (configuredOptions_.hasOption("maxTimestamp")){
maxTimestamp_ = Long.parseLong(configuredOptions_.getOptionValue("maxTimestamp"));
} else {
maxTimestamp_ = Long.MAX_VALUE;
}
if (configuredOptions_.hasOption("timestamp")){
timestamp_ = Long.parseLong(configuredOptions_.getOptionValue("timestamp"));
} else {
timestamp_ = 0;
}
includeTimestamp_ = false;
if (configuredOptions_.hasOption("includeTimestamp")) {
String value = configuredOptions_.getOptionValue("includeTimestamp");
if ("true".equalsIgnoreCase(value) || "".equalsIgnoreCase(value) || value == null ) {//the empty string and null check is for backward compat.
includeTimestamp_ = true;
}
}
includeTombstone_ = false;
if (configuredOptions_.hasOption("includeTombstone")) {
String value = configuredOptions_.getOptionValue("includeTombstone");
if ("true".equalsIgnoreCase(value) || "".equalsIgnoreCase(value) || value == null ) {
includeTombstone_ = true;
}
}
initScan();
}
/**
* Returns UDFProperties based on <code>contextSignature</code>.
*/
private Properties getUDFProperties() {
return UDFContext.getUDFContext()
.getUDFProperties(this.getClass(), new String[] {contextSignature});
}
/**
* @return <code> contextSignature + "_projectedFields" </code>
*/
private String projectedFieldsName() {
return contextSignature + "_projectedFields";
}
/**
* Parses the column list into ColumnInfo objects.
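* <p>
* For example (illustrative column names), with the default delimiter and
* <code>ignoreWhitespace=true</code>, the input <code>"info:first_name, friends:*"</code> yields
* two ColumnInfo entries: an explicit descriptor for <code>info:first_name</code> and a
* column-map entry for the <code>friends</code> family.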
* @param columnList column names delimited by <code>delimiter</code> and/or whitespace
* @param delimiter the column delimiter (a comma by default)
* @param ignoreWhitespace whether whitespace should also separate columns and be trimmed from names
* @return the list of parsed ColumnInfo objects
*/
private List<ColumnInfo> parseColumnList(String columnList,
String delimiter,
boolean ignoreWhitespace) {
List<ColumnInfo> columnInfo = new ArrayList<ColumnInfo>();
// Default behavior is to allow combinations of spaces and delimiter
// which defaults to a comma. Setting ignoreWhitespace to false will
// include the whitespace in the column names.
String[] colNames = columnList.split(delimiter);
if(ignoreWhitespace) {
List<String> columns = new ArrayList<String>();
for (String colName : colNames) {
String[] subColNames = colName.split(" ");
for (String subColName : subColNames) {
subColName = subColName.trim();
if (subColName.length() > 0) columns.add(subColName);
}
}
colNames = columns.toArray(new String[columns.size()]);
}
for (String colName : colNames) {
columnInfo.add(new ColumnInfo(colName));
}
return columnInfo;
}
private void initScan() throws IOException{
scan = new Scan();
scan.setCacheBlocks(cacheBlocks_);
scan.setCaching(caching_);
// Set filters, if any.
if (configuredOptions_.hasOption("gt")) {
gt_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("gt")));
addRowFilter(CompareOp.GREATER, gt_);
scan.setStartRow(gt_);
}
if (configuredOptions_.hasOption("lt")) {
lt_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("lt")));
addRowFilter(CompareOp.LESS, lt_);
scan.setStopRow(lt_);
}
if (configuredOptions_.hasOption("gte")) {
gte_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("gte")));
scan.setStartRow(gte_);
}
if (configuredOptions_.hasOption("lte")) {
lte_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("lte")));
byte[] lt = increment(lte_);
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Incrementing lte value of %s from bytes %s to %s to set stop row",
Bytes.toString(lte_), toString(lte_), toString(lt)));
}
if (lt != null) {
scan.setStopRow(lt);
}
// The WhileMatchFilter will short-circuit the scan after we no longer match. The
// setStopRow call will limit the number of regions we need to scan
addFilter(new WhileMatchFilter(new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator(lte_))));
}
if (configuredOptions_.hasOption("regex")) {
regex_ = Utils.slashisize(configuredOptions_.getOptionValue("regex"));
addFilter(new RowFilter(CompareOp.EQUAL, new RegexStringComparator(regex_)));
}
if (configuredOptions_.hasOption("minTimestamp") || configuredOptions_.hasOption("maxTimestamp")){
scan.setTimeRange(minTimestamp_, maxTimestamp_);
}
if (configuredOptions_.hasOption("timestamp")){
scan.setTimeStamp(timestamp_);
}
if (configuredOptions_.hasOption("maxResultsPerColumnFamily")){
int maxResultsPerColumnFamily_ = Integer.valueOf(configuredOptions_.getOptionValue("maxResultsPerColumnFamily"));
scan.setMaxResultsPerColumnFamily(maxResultsPerColumnFamily_);
}
// if the group of columnInfos for this family doesn't contain a prefix, we don't need
// to set any filters, we can just call addColumn or addFamily. See javadocs below.
boolean columnPrefixExists = false;
for (ColumnInfo columnInfo : columnInfo_) {
if (columnInfo.getColumnPrefix() != null) {
columnPrefixExists = true;
break;
}
}
if (!columnPrefixExists) {
addFiltersWithoutColumnPrefix(columnInfo_);
}
else {
addFiltersWithColumnPrefix(columnInfo_);
}
}
/**
* If there is no column with a prefix, we don't need filters, we can just call addColumn and
* addFamily on the scan
*/
private void addFiltersWithoutColumnPrefix(List<ColumnInfo> columnInfos) {
// Need to check for mixed types in a family, so we don't call addColumn
// after addFamily on the same family
Map<String, List<ColumnInfo>> groupedMap = groupByFamily(columnInfos);
for (Entry<String, List<ColumnInfo>> entrySet : groupedMap.entrySet()) {
boolean onlyColumns = true;
for (ColumnInfo columnInfo : entrySet.getValue()) {
if (columnInfo.isColumnMap()) {
onlyColumns = false;
break;
}
}
if (onlyColumns) {
for (ColumnInfo columnInfo : entrySet.getValue()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding column to scan via addColumn with cf:name = "
+ Bytes.toString(columnInfo.getColumnFamily()) + ":"
+ Bytes.toString(columnInfo.getColumnName()));
}
scan.addColumn(columnInfo.getColumnFamily(), columnInfo.getColumnName());
}
} else {
String family = entrySet.getKey();
if (LOG.isDebugEnabled()) {
LOG.debug("Adding column family to scan via addFamily with cf:name = "
+ family);
}
scan.addFamily(Bytes.toBytes(family));
}
}
}
/**
* If we have a qualifier with a prefix and a wildcard (i.e. cf:foo*), we need a filter on every
* possible column to be returned as shown below. This will become very inefficient for long
* lists of columns mixed with a prefixed wildcard.
*
* FilterList - must pass ALL of
* - FamilyFilter
* - AND a must pass ONE FilterList of
* - either Qualifier
* - or ColumnPrefixFilter
*
* If we have only column family filters (i.e. cf:*) or explicit column descriptors
* (i.e., cf:foo) or a mix of both then we don't need filters, since the scan will take
* care of that.
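* <p>
* As an illustration (hypothetical column list <code>friends:bob_* info:name</code>), the filter
* tree built by this method looks like:
* <pre>
* FilterList (MUST_PASS_ONE)
*   FilterList (MUST_PASS_ALL)            // friends family
*     FamilyFilter (EQUAL, friends)
*     FilterList (MUST_PASS_ONE)
*       ColumnPrefixFilter (bob_)
*   FilterList (MUST_PASS_ALL)            // info family
*     FamilyFilter (EQUAL, info)
*     FilterList (MUST_PASS_ONE)
*       QualifierFilter (EQUAL, name)
* </pre>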
*/
private void addFiltersWithColumnPrefix(List<ColumnInfo> columnInfos) {
// we need to apply a CF AND column list filter for each family
FilterList allColumnFilters = null;
Map<String, List<ColumnInfo>> groupedMap = groupByFamily(columnInfos);
for (String cfString : groupedMap.keySet()) {
List<ColumnInfo> columnInfoList = groupedMap.get(cfString);
byte[] cf = Bytes.toBytes(cfString);
// all filters roll up to one parent OR filter
if (allColumnFilters == null) {
allColumnFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
}
// each group contains a column family filter AND (all) and an OR (one of) of
// the column filters
FilterList thisColumnGroupFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL);
thisColumnGroupFilter.addFilter(new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(cf)));
FilterList columnFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
for (ColumnInfo colInfo : columnInfoList) {
if (colInfo.isColumnMap()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding family:prefix filters with values " +
Bytes.toString(colInfo.getColumnFamily()) + COLON +
Bytes.toString(colInfo.getColumnPrefix()));
}
// add a PrefixFilter to the list of column filters
if (colInfo.getColumnPrefix() != null) {
columnFilters.addFilter(new ColumnPrefixFilter(
colInfo.getColumnPrefix()));
}
}
else {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding family:descriptor filters with values " +
Bytes.toString(colInfo.getColumnFamily()) + COLON +
Bytes.toString(colInfo.getColumnName()));
}
// add a QualifierFilter to the list of column filters
columnFilters.addFilter(new QualifierFilter(CompareOp.EQUAL,
new BinaryComparator(colInfo.getColumnName())));
}
}
if (columnFilters.getFilters().size() != 0) {
thisColumnGroupFilter.addFilter(columnFilters);
}
allColumnFilters.addFilter(thisColumnGroupFilter);
}
if (allColumnFilters != null) {
addFilter(allColumnFilters);
}
}
private void addRowFilter(CompareOp op, byte[] val) {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding filter " + op.toString() +
" with value " + Bytes.toStringBinary(val));
}
addFilter(new RowFilter(op, new BinaryComparator(val)));
}
private void addFilter(Filter filter) {
FilterList scanFilter = (FilterList) scan.getFilter();
if (scanFilter == null) {
scanFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL);
}
scanFilter.addFilter(filter);
scan.setFilter(scanFilter);
}
/**
* Returns the ColumnInfo list so external objects can inspect it.
* @return List of ColumnInfo objects
*/
public List<ColumnInfo> getColumnInfoList() {
return columnInfo_;
}
/**
* Updates the ColumnInfo List. Use this if you need to implement custom projections
*/
protected void setColumnInfoList(List<ColumnInfo> columnInfoList) {
this.columnInfo_ = columnInfoList;
}
/**
* Stores the requiredFieldList as a serialized object so it can be fetched on the cluster. If
* you plan to override pushProjection, you need to call this with the requiredFieldList so
* it can be accessed on the cluster.
*/
protected void storeProjectedFieldNames(RequiredFieldList requiredFieldList) throws FrontendException {
try {
getUDFProperties().setProperty(projectedFieldsName(),
ObjectSerializer.serialize(requiredFieldList));
} catch (IOException e) {
throw new FrontendException(e);
}
}
@Override
public Tuple getNext() throws IOException {
try {
if (reader.nextKeyValue()) {
ImmutableBytesWritable rowKey = (ImmutableBytesWritable) reader
.getCurrentKey();
Result result = (Result) reader.getCurrentValue();
int tupleSize = columnInfo_.size();
// use a map of families -> qualifiers with the most recent
// version of the cell. Fetching multiple versions could be a
// useful feature.
NavigableMap<byte[], NavigableMap<byte[], byte[]>> resultsMap =
result.getNoVersionMap();
if (loadRowKey_){
tupleSize++;
}
Tuple tuple=TupleFactory.getInstance().newTuple(tupleSize);
int startIndex=0;
if (loadRowKey_){
tuple.set(0, new DataByteArray(rowKey.get()));
startIndex++;
}
for (int i = 0;i < columnInfo_.size(); ++i){
int currentIndex = startIndex + i;
ColumnInfo columnInfo = columnInfo_.get(i);
if (columnInfo.isColumnMap()) {
// It's a column family so we need to iterate and set all
// values found
NavigableMap<byte[], byte[]> cfResults =
resultsMap.get(columnInfo.getColumnFamily());
Map<String, DataByteArray> cfMap =
new HashMap<String, DataByteArray>();
if (cfResults != null) {
for (byte[] qualifier : cfResults.keySet()) {
// We need to check against the prefix filter to
// see if this value should be included. We can't
// just rely on the server-side filter, since a
// user could specify multiple CF filters for the
// same CF.
if (columnInfo.getColumnPrefix() == null ||
columnInfo.hasPrefixMatch(qualifier)) {
byte[] cell = cfResults.get(qualifier);
DataByteArray value =
cell == null ? null : new DataByteArray(cell);
cfMap.put(Bytes.toString(qualifier), value);
}
}
}
tuple.set(currentIndex, cfMap);
} else {
// It's a column so set the value
byte[] cell=result.getValue(columnInfo.getColumnFamily(),
columnInfo.getColumnName());
DataByteArray value =
cell == null ? null : new DataByteArray(cell);
tuple.set(currentIndex, value);
}
}
if (LOG.isDebugEnabled()) {
for (int i = 0; i < tuple.size(); i++) {
LOG.debug("tuple value:" + tuple.get(i));
}
}
return tuple;
}
} catch (InterruptedException e) {
throw new IOException(e);
}
return null;
}
@Override
public InputFormat getInputFormat() {
TableInputFormat inputFormat = new HBaseTableIFBuilder()
.withLimit(limit_)
.withGt(gt_)
.withGte(gte_)
.withLt(lt_)
.withLte(lte_)
.withConf(m_conf)
.build();
inputFormat.setScan(scan);
return inputFormat;
}
@Override
public void prepareToRead(RecordReader reader, PigSplit split) {
this.reader = reader;
}
@Override
public void setUDFContextSignature(String signature) {
this.contextSignature = signature;
}
@Override
public void setLocation(String location, Job job) throws IOException {
Properties udfProps = getUDFProperties();
job.getConfiguration().setBoolean("pig.noSplitCombination", true);
m_conf = initializeLocalJobConfig(job);
String tablename = location;
if (location.startsWith("hbase://")) {
tablename = location.substring(8);
}
m_conf.set(TableInputFormat.INPUT_TABLE, tablename);
String projectedFields = udfProps.getProperty( projectedFieldsName() );
if (projectedFields != null) {
// update columnInfo_
pushProjection((RequiredFieldList) ObjectSerializer.deserialize(projectedFields));
}
addFiltersWithoutColumnPrefix(columnInfo_);
if (requiredFieldList != null) {
Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(),
new String[] {contextSignature});
p.setProperty(contextSignature + "_projectedFields", ObjectSerializer.serialize(requiredFieldList));
}
}
@Override
public List<String> getShipFiles() {
// Depend on HBase to do the right thing when available, as of HBASE-9165
try {
Configuration conf = new Configuration();
TableMapReduceUtil.addHBaseDependencyJars(conf);
if (conf.get("tmpjars") != null) {
String[] tmpjars = conf.getStrings("tmpjars");
List<String> shipFiles = new ArrayList<String>(tmpjars.length);
for (String tmpjar : tmpjars) {
shipFiles.add(new URL(tmpjar).getPath());
}
return shipFiles;
}
} catch (IOException e) {
if (e instanceof MalformedURLException) {
LOG.debug("TableMapReduceUtil#addHBaseDependencyJars tmpjars"
+ " had a malformed url. Falling back to previous logic.", e);
} else {
LOG.debug("TableMapReduceUtil#addHBaseDependencyJars invocation"
+ " failed. Falling back to previous logic.", e);
}
}
List<Class> classList = new ArrayList<Class>();
classList.add(org.apache.hadoop.hbase.client.HTable.class); // main hbase jar or hbase-client
classList.add(org.apache.hadoop.hbase.mapreduce.TableSplit.class); // main hbase jar or hbase-server
classList.add(org.apache.zookeeper.ZooKeeper.class); // zookeeper
// Additional jars that are specific to v0.95.0+
addClassToList("org.cloudera.htrace.Trace", classList); // htrace
addClassToList("org.apache.hadoop.hbase.protobuf.generated.HBaseProtos", classList); // hbase-protocol
addClassToList("org.apache.hadoop.hbase.TableName", classList); // hbase-common
addClassToList("org.apache.hadoop.hbase.CompatibilityFactory", classList); // hbase-hadoop-compat
addClassToList("org.jboss.netty.channel.ChannelFactory", classList); // netty
return FuncUtils.getShipFiles(classList);
}
@Override
public void addCredentials(Credentials credentials, Configuration conf) {
JobConf jobConf = initializeLocalJobConfig(conf);
if ("kerberos".equalsIgnoreCase(jobConf.get(HBASE_SECURITY_CONF_KEY))) {
LOG.info("hbase is configured to use Kerberos, attempting to fetch delegation token.");
try {
User currentUser = User.getCurrent();
UserGroupInformation currentUserGroupInformation = currentUser.getUGI();
if (currentUserGroupInformation.hasKerberosCredentials()) {
try (Connection connection = ConnectionFactory.createConnection(jobConf, currentUser)) {
TokenUtil.obtainTokenForJob(connection, jobConf, currentUser);
LOG.info("Token retrieval succeeded for user " + currentUser.getName());
credentials.addAll(jobConf.getCredentials());
}
} else {
LOG.info("Not fetching hbase delegation token as no Kerberos TGT is available for user " + currentUser.getName());
}
} catch (RuntimeException re) {
throw re;
} catch (Exception e) {
throw new UndeclaredThrowableException(e,
"Unexpected error calling TokenUtil.obtainTokenForJob()");
}
} else {
LOG.info("hbase is not configured to use kerberos, skipping delegation token");
}
}
private void addClassToList(String className, List<Class> classList) {
try {
Class klass = Class.forName(className);
classList.add(klass);
} catch (ClassNotFoundException e) {
LOG.debug("Skipping adding jar for class: " + className);
}
}
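/**
* Merges the HBase configuration into the given job configuration and returns the result as a
* JobConf. The merged HBase-only properties are cached in the UDFContext (guarded by
* HBASE_CONFIG_SET) so that later calls and backend tasks see the same settings.
*/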
private JobConf initializeLocalJobConfig(Configuration jobConf) {
Properties udfProps = getUDFProperties();
JobConf localConf = new JobConf(jobConf);
if (udfProps.containsKey(HBASE_CONFIG_SET)) {
for (Entry<Object, Object> entry : udfProps.entrySet()) {
localConf.set((String) entry.getKey(), (String) entry.getValue());
}
} else {
Configuration hbaseConf = HBaseConfiguration.create();
for (Entry<String, String> entry : hbaseConf) {
// JobConf may have some conf overriding ones in hbase-site.xml
// So only copy hbase config not in job config to UDFContext
// Also avoids copying core-default.xml and core-site.xml
// props in hbaseConf to UDFContext which would be redundant.
if (jobConf.get(entry.getKey()) == null) {
udfProps.setProperty(entry.getKey(), entry.getValue());
localConf.set(entry.getKey(), entry.getValue());
}
}
udfProps.setProperty(HBASE_CONFIG_SET, "true");
}
return localConf;
}
private JobConf initializeLocalJobConfig(Job job) {
return initializeLocalJobConfig(job.getConfiguration());
}
@Override
public String relativeToAbsolutePath(String location, Path curDir)
throws IOException {
return location;
}
/**
* Set up the caster to use for reading values out of, and writing to, HBase.
*/
@Override
public LoadCaster getLoadCaster() throws IOException {
return caster_;
}
/*
* StoreFunc Methods
* @see org.apache.pig.StoreFuncInterface#getOutputFormat()
*/
@Override
public OutputFormat getOutputFormat() throws IOException {
if (outputFormat == null) {
if (m_conf == null) {
throw new IllegalStateException("setStoreLocation has not been called");
} else {
this.outputFormat = new TableOutputFormat();
this.outputFormat.setConf(m_conf);
}
}
return outputFormat;
}
@Override
public void checkSchema(ResourceSchema s) throws IOException {
if (! (caster_ instanceof LoadStoreCaster)) {
LOG.error("Caster must implement LoadStoreCaster for writing to HBase.");
throw new IOException("Bad Caster " + caster_.getClass());
}
schema_ = s;
getUDFProperties().setProperty(contextSignature + "_schema",
ObjectSerializer.serialize(schema_));
}
// Suppressing unchecked warnings for RecordWriter, which is not parameterized by StoreFuncInterface
@Override
public void prepareToWrite(@SuppressWarnings("rawtypes") RecordWriter writer) throws IOException {
this.writer = writer;
}
// Suppressing unchecked warnings for RecordWriter, which is not parameterized by StoreFuncInterface
@SuppressWarnings("unchecked")
@Override
public void putNext(Tuple t) throws IOException {
ResourceFieldSchema[] fieldSchemas = (schema_ == null) ? null : schema_.getFields();
byte type = (fieldSchemas == null) ? DataType.findType(t.get(0)) : fieldSchemas[0].getType();
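// Expected tuple layout on store: (rowkey, [timestamp,] [tombstone,] col1, col2, ...),
// where the timestamp is present only with -includeTimestamp and the tombstone only with
// -includeTombstone; the remaining values line up with columnInfo_ in order.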
long ts;
int startIndex=1;
if (includeTimestamp_) {
byte timestampType = (fieldSchemas == null) ? DataType.findType(t.get(startIndex)) : fieldSchemas[startIndex].getType();
LoadStoreCaster caster = (LoadStoreCaster) caster_;
switch (timestampType) {
case DataType.BYTEARRAY: ts = caster.bytesToLong(((DataByteArray)t.get(startIndex)).get()); break;
case DataType.LONG: ts = ((Long)t.get(startIndex)).longValue(); break;
case DataType.DATETIME: ts = ((DateTime)t.get(startIndex)).getMillis(); break;
default: throw new IOException("Unable to find a converter for timestamp field " + t.get(startIndex));
}
startIndex++;
} else {
ts = System.currentTimeMillis();
}
// check for deletes
if (includeTombstone_) {
if (((Boolean)t.get(startIndex)).booleanValue()) {
Delete delete = createDelete(t.get(0), type, ts);
try {
// this is a delete so there will be
// no put and we are done here
writer.write(null, delete);
return;
} catch (InterruptedException e) {
throw new IOException(e);
}
}
startIndex++;
}
Put put = createPut(t.get(0), type);
if (LOG.isDebugEnabled()) {
LOG.debug("putNext -- WAL disabled: " + noWAL_);
for (ColumnInfo columnInfo : columnInfo_) {
LOG.debug("putNext -- col: " + columnInfo);
}
}
for (int i=startIndex;i<t.size();++i){
ColumnInfo columnInfo = columnInfo_.get(i-startIndex);
if (LOG.isDebugEnabled()) {
LOG.debug("putNext - tuple: " + i + ", value=" + t.get(i) +
", cf:column=" + columnInfo);
}
if (!columnInfo.isColumnMap()) {
put.addColumn(columnInfo.getColumnFamily(), columnInfo.getColumnName(),
ts, objToBytes(t.get(i), (fieldSchemas == null) ?
DataType.findType(t.get(i)) : fieldSchemas[i].getType()));
} else {
Map<String, Object> cfMap = (Map<String, Object>) t.get(i);
if (cfMap!=null) {
for (String colName : cfMap.keySet()) {
if (LOG.isDebugEnabled()) {
LOG.debug("putNext - colName=" + colName +
", class: " + colName.getClass());
}
// TODO deal with the fact that maps can have types now. Currently we detect types at
// runtime in the case of storing to a cf, which is suboptimal.
put.addColumn(columnInfo.getColumnFamily(), Bytes.toBytes(colName.toString()), ts,
objToBytes(cfMap.get(colName), DataType.findType(cfMap.get(colName))));
}
}
}
}
try {
if (!put.isEmpty()) {
writer.write(null, put);
}
} catch (InterruptedException e) {
throw new IOException(e);
}
}
/**
* Public method to initialize a Delete.
*
* @param key
* @param type
* @param timestamp
* @return new delete
* @throws IOException
*/
public Delete createDelete(Object key, byte type, long timestamp) throws IOException {
Delete delete = new Delete(objToBytes(key, type));
delete.setTimestamp(timestamp);
if(noWAL_) {
delete.setDurability(Durability.SKIP_WAL);
}
return delete;
}
/**
* Public method to initialize a Put. Used to allow assertions of how Puts
* are initialized by unit tests.
*
* @param key
* @param type
* @return new put
* @throws IOException
*/
public Put createPut(Object key, byte type) throws IOException {
Put put = new Put(objToBytes(key, type));
if(noWAL_) {
put.setDurability(Durability.SKIP_WAL);
}
return put;
}
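/**
* Serializes a single tuple field to its HBase byte representation using the configured
* LoadStoreCaster, dispatching on the Pig DataType of the value.
*/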
@SuppressWarnings("unchecked")
private byte[] objToBytes(Object o, byte type) throws IOException {
LoadStoreCaster caster = (LoadStoreCaster) caster_;
if (o == null) return null;
switch (type) {
case DataType.BYTEARRAY: return ((DataByteArray) o).get();
case DataType.BAG: return caster.toBytes((DataBag) o);
case DataType.CHARARRAY: return caster.toBytes((String) o);
case DataType.DOUBLE: return caster.toBytes((Double) o);
case DataType.FLOAT: return caster.toBytes((Float) o);
case DataType.INTEGER: return caster.toBytes((Integer) o);
case DataType.LONG: return caster.toBytes((Long) o);
case DataType.BIGINTEGER: return caster.toBytes((BigInteger) o);
case DataType.BIGDECIMAL: return caster.toBytes((BigDecimal) o);
case DataType.BOOLEAN: return caster.toBytes((Boolean) o);
case DataType.DATETIME: return caster.toBytes((DateTime) o);
// The type conversion here is unchecked.
// Relying on DataType.findType to do the right thing.
case DataType.MAP: return caster.toBytes((Map<String, Object>) o);
case DataType.NULL: return null;
case DataType.TUPLE: return caster.toBytes((Tuple) o);
case DataType.ERROR: throw new IOException("Unable to determine type of " + o.getClass());
default: throw new IOException("Unable to find a converter for tuple field " + o);
}
}
@Override
public String relToAbsPathForStoreLocation(String location, Path curDir)
throws IOException {
return location;
}
@Override
public void setStoreFuncUDFContextSignature(String signature) {
this.contextSignature = signature;
}
@Override
public void setStoreLocation(String location, Job job) throws IOException {
if (location.startsWith("hbase://")){
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, location.substring(8));
}else{
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, location);
}
String serializedSchema = getUDFProperties().getProperty(contextSignature + "_schema");
if (serializedSchema!= null) {
schema_ = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
}
m_conf = initializeLocalJobConfig(job);
}
@Override
public void cleanupOnFailure(String location, Job job) throws IOException {
}
@Override
public void cleanupOnSuccess(String location, Job job) throws IOException {
}
/*
* LoadPushDown Methods.
*/
@Override
public List<OperatorSet> getFeatures() {
return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
}
@Override
public RequiredFieldResponse pushProjection(
RequiredFieldList requiredFieldList) throws FrontendException {
List<RequiredField> requiredFields = requiredFieldList.getFields();
List<ColumnInfo> newColumns = Lists.newArrayListWithExpectedSize(requiredFields.size());
if (this.requiredFieldList != null) {
// in addition to PIG, this is also called by this.setLocation().
LOG.debug("projection is already set. skipping.");
return new RequiredFieldResponse(true);
}
/* How projection is handled:
* - pushProjection() is invoked by Pig on the front end.
* - pushProjection here both stores the serialized projection in the
* context and adjusts columnInfo_.
* - setLocation() is invoked on the backend and reads the
* projection from the context. setLocation invokes this method again
* so that columnInfo_ is adjusted.
*/
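// Illustrative example (hypothetical columns): with -loadKey true and a column list of
// [info:a, info:b, friends:*], the loaded schema is (rowkey, info:a, info:b, friends_map).
// A projection of fields {0, 2} keeps the row key and maps field index 2 to
// columnInfo_.get(2 - 1) = info:b, so columnInfo_ becomes [info:b]. A projection of {1}
// drops the row key (loadRowKey_ becomes false) and maps field index 1 to
// columnInfo_.get(0) = info:a.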
// colOffset is the offset in our columnList that we need to apply to indexes we get from requiredFields
// (row key is not a real column)
int colOffset = loadRowKey_ ? 1 : 0;
// projOffset is the offset to the requiredFieldList we need to apply when figuring out which columns to prune.
// (if key is pruned, we should skip row key's element in this list when trimming colList)
int projOffset = colOffset;
this.requiredFieldList = requiredFieldList;
if (requiredFieldList != null && requiredFields.size() > (columnInfo_.size() + colOffset)) {
throw new FrontendException("The list of columns to project from HBase (" + requiredFields.size() + ") is larger than HBaseStorage is configured to load (" + (columnInfo_.size() + colOffset) + ").");
}
// remember the projection
storeProjectedFieldNames(requiredFieldList);
if (loadRowKey_ &&
( requiredFields.size() < 1 || requiredFields.get(0).getIndex() != 0)) {
loadRowKey_ = false;
projOffset = 0;
}
for (int i = projOffset; i < requiredFields.size(); i++) {
int fieldIndex = requiredFields.get(i).getIndex();
newColumns.add(columnInfo_.get(fieldIndex - colOffset));
}
if (LOG.isDebugEnabled()) {
LOG.debug("pushProjection After Projection: loadRowKey is " + loadRowKey_) ;
for (ColumnInfo colInfo : newColumns) {
LOG.debug("pushProjection -- col: " + colInfo);
}
}
setColumnInfoList(newColumns);
return new RequiredFieldResponse(true);
}
@Override
public void ensureAllKeyInstancesInSameSplit() throws IOException {
/**
* no-op because hbase keys are unique
* This will also work with things like DelimitedKeyPrefixRegionSplitPolicy
* if you need a partial key match to be included in the split
*/
LOG.debug("ensureAllKeyInstancesInSameSplit");
}
@Override
public WritableComparable<TableSplit> getSplitComparable(InputSplit split) throws IOException {
if (split instanceof TableSplit) {
return new TableSplitComparable((TableSplit) split);
} else {
throw new RuntimeException("LoadFunc expected split of type TableSplit but was " + split.getClass().getName());
}
}
/**
* Class to encapsulate logic around which column names were specified in each
* position of the column list. Users can specify column names in one of 4
* ways: 'Foo:', 'Foo:*', 'Foo:bar*' or 'Foo:bar'. The first 3 result in a
* Map being added to the tuple, while the last results in a scalar. The 3rd
* form results in a prefix-filtered Map.
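* <p>
* For example (illustrative family and column names):
* <pre>
* 'Foo:'     -> columnFamily=Foo, columnName=null, columnPrefix=null  (map of the whole family)
* 'Foo:*'    -> columnFamily=Foo, columnName=null, columnPrefix=null  (map of the whole family)
* 'Foo:bar*' -> columnFamily=Foo, columnName=null, columnPrefix=bar   (prefix-filtered map)
* 'Foo:bar'  -> columnFamily=Foo, columnName=bar,  columnPrefix=null  (scalar)
* </pre>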
*/
public class ColumnInfo {
final String originalColumnName; // always set
final byte[] columnFamily; // always set
final byte[] columnName; // set if it exists and doesn't contain '*'
final byte[] columnPrefix; // set if contains a prefix followed by '*'
public ColumnInfo(String colName) {
originalColumnName = colName;
String[] cfAndColumn = colName.split(COLON, 2);
// cfAndColumn[0] is the column family; cfAndColumn[1], if present, is the column name or prefix
columnFamily = Bytes.toBytes(cfAndColumn[0]);
if (cfAndColumn.length > 1 &&
cfAndColumn[1].length() > 0 && !ASTERISK.equals(cfAndColumn[1])) {
if (cfAndColumn[1].endsWith(ASTERISK)) {
columnPrefix = Bytes.toBytes(cfAndColumn[1].substring(0,
cfAndColumn[1].length() - 1));
columnName = null;
}
else {
columnName = Bytes.toBytes(cfAndColumn[1]);
columnPrefix = null;
}
} else {
columnPrefix = null;
columnName = null;
}
}
public byte[] getColumnFamily() { return columnFamily; }
public byte[] getColumnName() { return columnName; }
public byte[] getColumnPrefix() { return columnPrefix; }
public boolean isColumnMap() { return columnName == null; }
public boolean hasPrefixMatch(byte[] qualifier) {
return Bytes.startsWith(qualifier, columnPrefix);
}
@Override
public String toString() { return originalColumnName; }
}
/**
* Group the list of ColumnInfo objects by their column family and returns a map of CF to its
* list of ColumnInfo objects. Using String as key since it implements Comparable.
* @param columnInfos the columnInfo list to group
* @return a Map of lists, keyed by their column family.
*/
static Map<String, List<ColumnInfo>> groupByFamily(List<ColumnInfo> columnInfos) {
Map<String, List<ColumnInfo>> groupedMap = new HashMap<String, List<ColumnInfo>>();
for (ColumnInfo columnInfo : columnInfos) {
String cf = Bytes.toString(columnInfo.getColumnFamily());
List<ColumnInfo> columnInfoList = groupedMap.get(cf);
if (columnInfoList == null) {
columnInfoList = new ArrayList<ColumnInfo>();
}
columnInfoList.add(columnInfo);
groupedMap.put(cf, columnInfoList);
}
return groupedMap;
}
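/**
* Renders a byte array as its signed byte values separated by '|', for debug logging.
*/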
static String toString(byte[] bytes) {
if (bytes == null) { return null; }
StringBuffer sb = new StringBuffer();
for (int i = 0; i < bytes.length; i++) {
if (i > 0) { sb.append("|"); }
sb.append(bytes[i]);
}
return sb.toString();
}
/**
* Increments the byte array by one for use with setting stopRow. If all bytes in the array are
* set to the maximum byte value, then a copy of the original array with a 0 byte appended
* is returned. This is because HBase compares bytes from left to right: if byte array B is equal to
* byte array A but with an extra byte appended, then A < B. For example,
* <code>A = byte[] {-1}</code> increments to
* <code>B = byte[] {-1, 0}</code> and <code>A < B</code>.
* @param bytes array to increment bytes on
* @return a copy of the byte array incremented by 1
*/
static byte[] increment(byte[] bytes) {
boolean allAtMax = true;
for(int i = 0; i < bytes.length; i++) {
if((bytes[bytes.length - i - 1] & 0x0ff) != 255) {
allAtMax = false;
break;
}
}
if (allAtMax) {
return Arrays.copyOf(bytes, bytes.length + 1);
}
byte[] incremented = bytes.clone();
for(int i = bytes.length - 1; i >= 0; i--) {
boolean carry = false;
int val = bytes[i] & 0x0ff;
int total = val + 1;
if(total > 255) {
carry = true;
total %= 256;
} else if (total < 0) {
carry = true;
}
incremented[i] = (byte)total;
if (!carry) return incremented;
}
return incremented;
}
}