SQOOP-3267: Incremental import to HBase deletes only last version of column
(Daniel Voros by Szabolcs Vasas)
diff --git a/src/docs/man/hbase-args.txt b/src/docs/man/hbase-args.txt
index afd5c5b..af9c96a 100644
--- a/src/docs/man/hbase-args.txt
+++ b/src/docs/man/hbase-args.txt
@@ -36,4 +36,9 @@
--hbase-table (table-name)::
Specifies an HBase table to use as the target instead of HDFS
+--hbase-null-incremental-mode (mode)::
+ How to handle columns updated to null during incremental imports. +ignore+ is the default and
+ will result in retaining the previously imported value. +delete+ mode will delete all previous
+ versions of the column from HBase.
+
diff --git a/src/docs/user/hbase-args.txt b/src/docs/user/hbase-args.txt
index 53040f5..4076214 100644
--- a/src/docs/user/hbase-args.txt
+++ b/src/docs/user/hbase-args.txt
@@ -20,19 +20,22 @@
.HBase arguments:
[grid="all"]
-`-----------------------------`-------------------------------------------
-Argument Description
---------------------------------------------------------------------------
-+\--column-family <family>+ Sets the target column family for the import
-+\--hbase-create-table+ If specified, create missing HBase tables
-+\--hbase-row-key <col>+ Specifies which input column to use as the\
- row key
- In case, if input table contains composite
- key, then <col> must be in the form of a
- comma-separated list of composite key
- attributes
-+\--hbase-table <table-name>+ Specifies an HBase table to use as the \
- target instead of HDFS
-+\--hbase-bulkload+ Enables bulk loading
---------------------------------------------------------------------------
+`---------------------------------------`-------------------------------------------
+Argument Description
+------------------------------------------------------------------------------------
++\--column-family <family>+ Sets the target column family for the import
++\--hbase-create-table+ If specified, create missing HBase tables
++\--hbase-row-key <col>+ Specifies which input column to use as the\
+ row key
+ If the input table contains a composite
+ key, then <col> must be in the form of a
+ comma-separated list of composite key
+ attributes
++\--hbase-table <table-name>+ Specifies an HBase table to use as the \
+ target instead of HDFS
++\--hbase-bulkload+ Enables bulk loading
++\--hbase-null-incremental-mode <mode>+ How to handle columns updated to null. \
+ Legal values for <mode> are +ignore+ \
+ (default) and +delete+.
+------------------------------------------------------------------------------------
diff --git a/src/docs/user/hbase.txt b/src/docs/user/hbase.txt
index ab4aedc..817956d 100644
--- a/src/docs/user/hbase.txt
+++ b/src/docs/user/hbase.txt
@@ -58,5 +58,10 @@
cell. Sqoop will skip all rows containing null values in all columns
except the row key column.
+By default Sqoop will retain the previously imported value for columns
+updated to null during incremental imports. This can be changed to
+delete all previous versions of the column by using
++\--hbase-null-incremental-mode delete+.
+
To decrease the load on hbase, Sqoop can do bulk loading as opposed to
direct writes. To use bulk loading, enable it using +\--hbase-bulkload+.
diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java
index 73d0757..651cebd 100644
--- a/src/java/org/apache/sqoop/SqoopOptions.java
+++ b/src/java/org/apache/sqoop/SqoopOptions.java
@@ -105,6 +105,18 @@
}
/**
+ * How a column updated to null is handled during incremental import into HBase:
+ * <ul>
+ * <li>Ignore: skip the update and retain the previously imported value</li>
+ * <li>Delete: delete all previous versions of the column</li>
+ * </ul>
+ */
+ public enum HBaseNullIncrementalMode {
+ Ignore,
+ Delete,
+ }
+
+ /**
* Update mode option specifies how updates are performed when
* new rows are found with non-matching keys in database.
* It supports two modes:
@@ -322,6 +334,9 @@
@StoredAsProperty("incremental.last.value")
private String incrementalLastValue;
+ @StoredAsProperty("hbase.null.incremental.mode")
+ private HBaseNullIncrementalMode hbaseNullIncrementalMode;
+
// exclude these tables when importing all tables.
@StoredAsProperty("import.all_tables.exclude")
private String allTablesExclude;
@@ -1085,6 +1100,7 @@
this.dbOutColumns = null;
this.incrementalMode = IncrementalMode.None;
+ this.hbaseNullIncrementalMode = HBaseNullIncrementalMode.Ignore;
this.updateMode = UpdateMode.UpdateOnly;
@@ -2301,6 +2317,20 @@
}
/**
+ * Get HBase null incremental mode to use.
+ */
+ public HBaseNullIncrementalMode getHbaseNullIncrementalMode() {
+ return hbaseNullIncrementalMode;
+ }
+
+ /**
+ * Set HBase null incremental mode to use.
+ */
+ public void setHbaseNullIncrementalMode(HBaseNullIncrementalMode hbaseNullIncrementalMode) {
+ this.hbaseNullIncrementalMode = hbaseNullIncrementalMode;
+ }
+
+ /**
* Set the tables to be excluded when doing all table import.
*/
public void setAllTablesExclude(String exclude) {
diff --git a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
index 27d6006..df9836b 100644
--- a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
+++ b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
@@ -63,6 +63,8 @@
public static final String ROW_KEY_COLUMN_KEY =
"sqoop.hbase.insert.row.key.column";
+ public static final String NULL_INCREMENTAL_MODE = "hbase.null.incremental.mode";
+
/**
* Configuration key specifying the PutTransformer implementation to use.
*/
diff --git a/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
index 0bd6169..8600382 100644
--- a/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
+++ b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;
+import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.mapreduce.ImportJobBase;
import java.io.IOException;
@@ -57,6 +58,7 @@
protected boolean addRowKey;
private boolean isCompositeKey = false;
private List<String> compositeKeyAttributes;
+ private SqoopOptions.HBaseNullIncrementalMode nullMode;
/**
* Used as delimiter to combine composite-key column names when passed as.
@@ -170,6 +172,7 @@
String colFamily, String rowKey) {
byte[] colFamilyBytes = Bytes.toBytes(colFamily);
List<Mutation> mutationList = new ArrayList<Mutation>();
+ Put put = null;
for (Map.Entry<String, Object> fieldEntry : record.entrySet()) {
String colName = fieldEntry.getKey();
boolean rowKeyCol = false;
@@ -187,7 +190,10 @@
Object val = fieldEntry.getValue();
if (null != val) {
// Put row-key in HBase
- Put put = new Put(Bytes.toBytes(rowKey));
+ if (put == null) {
+ put = new Put(Bytes.toBytes(rowKey));
+ mutationList.add(put);
+ }
if ( val instanceof byte[]) {
put.addColumn(colFamilyBytes, getFieldNameBytes(colName),
(byte[])val);
@@ -197,9 +203,16 @@
}
- mutationList.add(put);
} else {
+ switch (nullMode) {
+ case Delete:
+ // addColumns() removes every version of the column; addColumn() removes only the latest one.
Delete delete = new Delete(Bytes.toBytes(rowKey));
- delete.addColumn(colFamilyBytes, getFieldNameBytes(colName));
+ delete.addColumns(colFamilyBytes, getFieldNameBytes(colName));
mutationList.add(delete);
+ break;
+ case Ignore:
+ // Do nothing
+ break;
+ }
}
}
}
@@ -218,6 +231,7 @@
@Override
public void init(Configuration conf) {
+ nullMode = conf.getEnum(HBasePutProcessor.NULL_INCREMENTAL_MODE, SqoopOptions.HBaseNullIncrementalMode.Ignore);
setColumnFamily(conf.get(COL_FAMILY_KEY, null));
setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null));
diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java b/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java
index 33da487..a09a45e 100644
--- a/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java
@@ -93,9 +93,10 @@
HBasePutProcessor.class,
FieldMapProcessor.class);
- // Set the HBase parameters (table, column family, row key):
+ // Set the HBase parameters (table, column family, row key, null mode):
conf.set(HBasePutProcessor.TABLE_NAME_KEY, options.getHBaseTable());
conf.set(HBasePutProcessor.COL_FAMILY_KEY, options.getHBaseColFamily());
+ conf.set(HBasePutProcessor.NULL_INCREMENTAL_MODE, options.getHbaseNullIncrementalMode().toString());
// What column of the input becomes the row key?
String rowKeyCol = options.getHBaseRowKeyColumn();
diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
index ce21918..b02e4fe 100644
--- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
+++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
@@ -199,6 +199,7 @@
public static final String HBASE_BULK_LOAD_ENABLED_ARG =
"hbase-bulkload";
public static final String HBASE_CREATE_TABLE_ARG = "hbase-create-table";
+ public static final String HBASE_NULL_INCREMENTAL_MODE_ARG = "hbase-null-incremental-mode";
//Accumulo arguments.
public static final String ACCUMULO_TABLE_ARG = "accumulo-table";
@@ -853,6 +854,11 @@
.withDescription("If specified, create missing HBase tables")
.withLongOpt(HBASE_CREATE_TABLE_ARG)
.create());
+ hbaseOpts.addOption(OptionBuilder.withArgName("nullmode")
+ .hasArg()
+ .withDescription("How to handle null values during incremental import into HBase.")
+ .withLongOpt(HBASE_NULL_INCREMENTAL_MODE_ARG)
+ .create());
return hbaseOpts;
}
@@ -1398,7 +1404,7 @@
}
}
- protected void applyHBaseOptions(CommandLine in, SqoopOptions out) {
+ protected void applyHBaseOptions(CommandLine in, SqoopOptions out) throws InvalidOptionsException {
if (in.hasOption(HBASE_TABLE_ARG)) {
out.setHBaseTable(in.getOptionValue(HBASE_TABLE_ARG));
}
@@ -1416,6 +1422,19 @@
if (in.hasOption(HBASE_CREATE_TABLE_ARG)) {
out.setCreateHBaseTable(true);
}
+
+ if (in.hasOption(HBASE_NULL_INCREMENTAL_MODE_ARG)) {
+ String nullMode = in.getOptionValue(HBASE_NULL_INCREMENTAL_MODE_ARG);
+ if ("ignore".equals(nullMode)) {
+ out.setHbaseNullIncrementalMode(SqoopOptions.HBaseNullIncrementalMode.Ignore);
+ } else if ("delete".equals(nullMode)) {
+ out.setHbaseNullIncrementalMode(SqoopOptions.HBaseNullIncrementalMode.Delete);
+ } else {
+ throw new InvalidOptionsException("Unknown HBase null incremental mode: "
+ + nullMode + ". Use 'ignore' or 'delete'."
+ + HELP_STR);
+ }
+ }
}
protected void applyValidationOptions(CommandLine in, SqoopOptions out)
diff --git a/src/test/org/apache/sqoop/TestSqoopOptions.java b/src/test/org/apache/sqoop/TestSqoopOptions.java
index 16901ca..bb7c20d 100644
--- a/src/test/org/apache/sqoop/TestSqoopOptions.java
+++ b/src/test/org/apache/sqoop/TestSqoopOptions.java
@@ -89,6 +89,7 @@
excludedFieldsFromClone.add("updateMode");
excludedFieldsFromClone.add("layout");
excludedFieldsFromClone.add("activeSqoopTool");
+ excludedFieldsFromClone.add("hbaseNullIncrementalMode");
}
@After
diff --git a/src/test/org/apache/sqoop/hbase/HBaseImportTest.java b/src/test/org/apache/sqoop/hbase/HBaseImportTest.java
index 2e73cf3..58c21ce 100644
--- a/src/test/org/apache/sqoop/hbase/HBaseImportTest.java
+++ b/src/test/org/apache/sqoop/hbase/HBaseImportTest.java
@@ -72,24 +72,6 @@
}
@Test
- public void testOverwriteNullColumnsSucceeds() throws IOException {
- // Test that we can create a table and then import immediately
- // back on top of it without problem and then update with null to validate
- String [] argv = getArgv(true, "OverwriteTable", "OverwriteColumnFamily", true, null);
- String [] types = { "INT", "INT", "INT", "DATETIME" };
- String [] vals = { "0", "1", "1", "'2017-03-20'" };
- createTableWithColTypes(types, vals);
- runImport(argv);
- verifyHBaseCell("OverwriteTable", "0", "OverwriteColumnFamily", getColName(2), "1");
- // Run a second time.
- argv = getIncrementalArgv(true, "OverwriteTable", "OverwriteColumnFamily", true, null, false, false, "DATA_COL3", "2017-03-24 01:01:01.0", null);
- vals = new String[] { "0", "1", null, "'2017-03-25'" };
- updateTable(types, vals);
- runImport(argv);
- verifyHBaseCell("OverwriteTable", "0", "OverwriteColumnFamily", getColName(2), null);
- }
-
- @Test
public void testAppendWithTimestampSucceeds() throws IOException {
// Test that we can create a table and then import multiple rows
// validate for append scenario with time stamp
@@ -100,7 +82,7 @@
runImport(argv);
verifyHBaseCell("AppendTable", "0", "AppendColumnFamily", getColName(2), "1");
// Run a second time.
- argv = getIncrementalArgv(true, "AppendTable", "AppendColumnFamily", true, null, true, false, "DATA_COL1", "2017-03-24 01:01:01.0", null);
+ argv = getIncrementalArgv(true, "AppendTable", "AppendColumnFamily", true, null, true, false, "DATA_COL1", "2017-03-24 01:01:01.0", null, "ignore");
vals = new String[] { "1", "2", "3", "'2017-06-15'" };
insertIntoTable(types, vals);
runImport(argv);
@@ -118,7 +100,7 @@
runImport(argv);
verifyHBaseCell("AppendTable", "0", "AppendColumnFamily", getColName(2), "1");
// Run a second time.
- argv = getIncrementalArgv(true, "AppendTable", "AppendColumnFamily", true, null, true, true, "DATA_COL1", null, "DATA_COL3");
+ argv = getIncrementalArgv(true, "AppendTable", "AppendColumnFamily", true, null, true, true, "DATA_COL1", null, "DATA_COL3", "ignore");
vals = new String[] { "1", "2", "3", "'2017-06-15'" };
insertIntoTable(types, vals);
runImport(argv);
@@ -126,6 +108,49 @@
}
@Test
+ public void testNullIncrementalModeIgnore() throws Exception {
+ // Latest value retained with 'ignore' mode
+ runInsertUpdateUpdateDeleteAndExpectValue("ignore", "2");
+ }
+
+ @Test
+ public void testNullIncrementalModeDelete() throws Exception {
+ // All previous values deleted with 'delete' mode
+ runInsertUpdateUpdateDeleteAndExpectValue("delete", null);
+ }
+
+ /**
+ * Does the following
+ * - create HBase table
+ * - insert value "1"
+ * - update value to "2"
+ * - update value to null
+ * - asserts its value equals expectedValue
+ *
+ * @param nullMode hbase-null-incremental-mode to use ('ignore' or 'delete')
+ * @param expectedValue expected value in the end
+ * @throws Exception
+ */
+ private void runInsertUpdateUpdateDeleteAndExpectValue(String nullMode, String expectedValue) throws Exception {
+ // Create table and import with initial values
+ String [] types = { "INT", "INT", "DATETIME" };
+ createTableWithColTypes(types, new String[] { "0", "1", "'2017-03-20'" });
+ runImport(getArgv(true, "OverwriteTable", "OverwriteColumnFamily", true, null));
+ verifyHBaseCell("OverwriteTable", "0", "OverwriteColumnFamily", getColName(1), "1");
+
+ // Run a second time after updating.
+ updateTable(types, new String[] { "0", "2", "'2017-03-25'" });
+ runImport(getIncrementalArgv(true, "OverwriteTable", "OverwriteColumnFamily", true, null, false, false, getColName(2), "2017-03-24 01:01:01.0", null, nullMode));
+ verifyHBaseCell("OverwriteTable", "0", "OverwriteColumnFamily", getColName(1), "2");
+
+ // Run third time after deleting (setting to null)
+ updateTable(types, new String[] { "0", null, "'2017-03-28'" });
+ runImport(getIncrementalArgv(true, "OverwriteTable", "OverwriteColumnFamily", true, null, false, false, getColName(2), "2017-03-26 01:01:01.0", null, nullMode));
+ verifyHBaseCell("OverwriteTable", "0", "OverwriteColumnFamily", getColName(1), expectedValue);
+ }
+
+
+ @Test
public void testExitFailure() throws IOException {
String [] types = { "INT", "INT", "INT" };
String [] vals = { "0", "42", "43" };
diff --git a/src/test/org/apache/sqoop/hbase/HBaseTestCase.java b/src/test/org/apache/sqoop/hbase/HBaseTestCase.java
index 98f8698..f96b658 100644
--- a/src/test/org/apache/sqoop/hbase/HBaseTestCase.java
+++ b/src/test/org/apache/sqoop/hbase/HBaseTestCase.java
@@ -138,7 +138,7 @@
*/
protected String [] getIncrementalArgv(boolean includeHadoopFlags,
String hbaseTable, String hbaseColFam, boolean hbaseCreate,
- String queryStr, boolean isAppend, boolean appendTimestamp, String checkColumn, String checkValue, String lastModifiedColumn) {
+ String queryStr, boolean isAppend, boolean appendTimestamp, String checkColumn, String checkValue, String lastModifiedColumn, String nullMode) {
String[] argsStrArray = getArgv(includeHadoopFlags, hbaseTable, hbaseColFam, hbaseCreate, queryStr);
List<String> args = new ArrayList<String>(Arrays.asList(argsStrArray));
@@ -161,6 +161,14 @@
args.add("--last-value");
args.add(checkValue);
}
+
+ // Set --hbase-null-incremental-mode (default is 'ignore')
+ if (nullMode == null) {
+ nullMode = "ignore";
+ }
+ args.add("--hbase-null-incremental-mode");
+ args.add(nullMode);
+
return args.toArray(new String[0]);
}