blob: 29e9034cb2afc01008287f536305ed8f0758ba9a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.hbase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.mapreduce.ImportJobBase;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import static org.apache.sqoop.hbase.HBasePutProcessor.ADD_ROW_KEY;
import static org.apache.sqoop.hbase.HBasePutProcessor.ADD_ROW_KEY_DEFAULT;
import static org.apache.sqoop.hbase.HBasePutProcessor.COL_FAMILY_KEY;
import static org.apache.sqoop.hbase.HBasePutProcessor.ROW_KEY_COLUMN_KEY;
/**
 * PutTransformer that converts a record into HBase mutations by calling
 * toString on its non-null fields.
 *
 * <p>Handling of field values:</p>
 * <ul>
 *   <li>{@code byte[]} values are stored as-is.</li>
 *   <li>{@link BigDecimal} values are rendered with
 *       {@link BigDecimal#toPlainString()} when the
 *       {@link ImportJobBase#PROPERTY_BIGDECIMAL_FORMAT} option is enabled.</li>
 *   <li>Every other non-null value is stored as the bytes of its
 *       {@code toString()}.</li>
 *   <li>Null values become column deletes or are skipped, depending on the
 *       configured {@link SqoopOptions.HBaseNullIncrementalMode}.</li>
 * </ul>
 */
public class ToStringPutTransformer extends PutTransformer {

  public static final Log LOG = LogFactory.getLog(
      ToStringPutTransformer.class.getName());

  // A mapping from field name -> bytes for that field name.
  // Used to cache serialization work done for field names.
  private Map<String, byte[]> serializedFieldNames;

  protected boolean bigDecimalFormatString;
  protected boolean addRowKey;

  private boolean isCompositeKey = false;
  private List<String> compositeKeyAttributes = Collections.<String>emptyList();

  // Defaults to Ignore (the same default init() uses), so a transformer that
  // was never init()'ed does not throw a NullPointerException on null values.
  private SqoopOptions.HBaseNullIncrementalMode nullMode =
      SqoopOptions.HBaseNullIncrementalMode.Ignore;

  /**
   * Used as delimiter to combine composite-key column names when passed as
   * argument to --hbase-row-key.
   */
  public static final String DELIMITER_COMMAND_LINE = ",";

  /**
   * Used as connecting string when joining composite-key values to form the
   * composite row-key stored in HBase.
   */
  public static final String DELIMITER_HBASE = "_";

  public ToStringPutTransformer() {
    serializedFieldNames = new TreeMap<String, byte[]>();
  }

  /**
   * Returns the serialized bytes for a field name, using the cache if the
   * name has already been serialized.
   */
  private byte [] getFieldNameBytes(String fieldName) {
    byte [] cachedName = serializedFieldNames.get(fieldName);
    if (null != cachedName) {
      // Cache hit. We're done.
      return cachedName;
    }

    // Do the serialization and memoize the result.
    byte [] nameBytes = Bytes.toBytes(fieldName);
    serializedFieldNames.put(fieldName, nameBytes);
    return nameBytes;
  }

  /**
   * Checks whether the --hbase-row-key parameter is a comma-separated list
   * of attributes, i.e. a composite key, and records the attribute names.
   *
   * <p>Attribute names are trimmed, so "id, name" is treated the same as
   * "id,name". The composite-key state is recomputed on every call, so
   * calling this again after the row-key column changed does not leave
   * stale state behind.</p>
   */
  public void detectCompositeKey() {
    String rowKeyCol = getRowKeyColumn();
    if (null != rowKeyCol && rowKeyCol.contains(DELIMITER_COMMAND_LINE)) {
      String[] attributes = rowKeyCol.split(DELIMITER_COMMAND_LINE);
      for (int i = 0; i < attributes.length; i++) {
        // Tolerate whitespace around the delimiter on the command line.
        attributes[i] = attributes[i].trim();
      }
      compositeKeyAttributes = Arrays.asList(attributes);
      isCompositeKey = true;
    } else {
      compositeKeyAttributes = Collections.<String>emptyList();
      isCompositeKey = false;
    }
  }

  /**
   * {@inheritDoc}
   *
   * <p>Builds the row key from the configured row-key column(s) and converts
   * the record's fields into Put/Delete mutations. Composite-key values are
   * joined with {@link #DELIMITER_HBASE}.</p>
   *
   * @throws IOException if the row-key column or the column family is not
   *     configured, or if any row-key column value is null
   */
  @Override
  public List<Mutation> getMutationCommand(Map<String, Object> fields)
      throws IOException {

    String rowKeyCol = getRowKeyColumn();
    if (null == rowKeyCol) {
      throw new IOException("Row key column can't be NULL.");
    }

    String colFamily = getColumnFamily();
    if (null == colFamily) {
      throw new IOException("Column family can't be NULL.");
    }

    String rowKey;
    if (isCompositeKey) {
      // Composite key: combine the string form of every key attribute.
      List<String> rowKeyList =
          new ArrayList<String>(compositeKeyAttributes.size());
      for (String fieldName : compositeKeyAttributes) {
        rowKeyList.add(requireRowKeyValue(fields, fieldName));
      }
      rowKey = StringUtils.join(DELIMITER_HBASE, rowKeyList);
    } else {
      // Regular key: a single attribute.
      rowKey = requireRowKeyValue(fields, rowKeyCol);
    }
    return mutationRecordInHBase(fields, colFamily, rowKey);
  }

  /**
   * Returns the HBase string form of a row-key column's value.
   *
   * @param fields the record being imported
   * @param fieldName name of a row-key column
   * @return the value rendered by {@link #toHBaseString(Object)}
   * @throws IOException if the value is null, because such a row cannot be
   *     inserted
   */
  private String requireRowKeyValue(Map<String, Object> fields,
      String fieldName) throws IOException {
    Object value = fields.get(fieldName);
    if (null == value) {
      throw new IOException("Could not insert row with null "
          + "value for row-key column: " + fieldName);
    }
    return toHBaseString(value);
  }

  /**
   * Returns true if colName is the row-key column or one of the
   * composite-key attributes.
   */
  private boolean isRowKeyColumn(String colName) {
    return (isCompositeKey && compositeKeyAttributes.contains(colName))
        || colName.equals(getRowKeyColumn());
  }

  /**
   * Builds the Put/Delete operations for the specified record.
   *
   * <p>Row-key columns are only written as regular cells when
   * {@code addRowKey} is set. All non-null values go into a single Put,
   * which is only created if at least one such value exists. When the null
   * mode is Delete, each null value produces a Delete of that column;
   * otherwise null values are skipped.</p>
   *
   * @param record field name to value map of the record
   * @param colFamily column family to write into
   * @param rowKey row key for every mutation of this record
   * @return unmodifiable list of the put/delete commands (possibly empty)
   */
  private List<Mutation> mutationRecordInHBase(Map<String, Object> record,
      String colFamily, String rowKey) {
    byte[] colFamilyBytes = Bytes.toBytes(colFamily);
    // Serialize the row key once; every mutation of this record shares it.
    byte[] rowKeyBytes = Bytes.toBytes(rowKey);
    List<Mutation> mutationList = new ArrayList<Mutation>();
    Put put = null;

    for (Map.Entry<String, Object> fieldEntry : record.entrySet()) {
      String colName = fieldEntry.getKey();
      if (!addRowKey && isRowKeyColumn(colName)) {
        continue;
      }

      Object val = fieldEntry.getValue();
      if (null != val) {
        if (put == null) {
          put = new Put(rowKeyBytes);
          mutationList.add(put);
        }
        byte[] cellValue = (val instanceof byte[])
            ? (byte[]) val
            : Bytes.toBytes(toHBaseString(val));
        put.addColumn(colFamilyBytes, getFieldNameBytes(colName), cellValue);
      } else if (nullMode == SqoopOptions.HBaseNullIncrementalMode.Delete) {
        Delete delete = new Delete(rowKeyBytes);
        delete.addColumns(colFamilyBytes, getFieldNameBytes(colName));
        mutationList.add(delete);
      }
      // Any other null mode (Ignore): the null value is skipped.
    }
    return Collections.unmodifiableList(mutationList);
  }

  /**
   * Renders a non-null value as the string stored in HBase.
   *
   * <p>A BigDecimal uses its plain (non-scientific) form when
   * {@code bigDecimalFormatString} is enabled. Any other value uses
   * {@code toString()}.</p>
   */
  private String toHBaseString(Object val) {
    String valString;
    if (val instanceof BigDecimal && bigDecimalFormatString) {
      valString = ((BigDecimal) val).toPlainString();
    } else {
      valString = val.toString();
    }
    return valString;
  }

  /**
   * Configures this transformer from the job configuration.
   *
   * <p>Reads the null-handling mode (default Ignore), column family, row-key
   * column, BigDecimal formatting flag and add-row-key flag, then detects
   * whether the row key is composite.</p>
   */
  @Override
  public void init(Configuration conf) {
    nullMode = conf.getEnum(HBasePutProcessor.NULL_INCREMENTAL_MODE,
        SqoopOptions.HBaseNullIncrementalMode.Ignore);
    setColumnFamily(conf.get(COL_FAMILY_KEY, null));
    setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null));

    this.bigDecimalFormatString = conf.getBoolean(
        ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
        ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
    this.addRowKey = conf.getBoolean(ADD_ROW_KEY, ADD_ROW_KEY_DEFAULT);
    detectCompositeKey();
  }
}