SQOOP-3267: Incremental import to HBase deletes only last version of column

(Daniel Voros via Szabolcs Vasas)
diff --git a/src/docs/man/hbase-args.txt b/src/docs/man/hbase-args.txt
index afd5c5b..af9c96a 100644
--- a/src/docs/man/hbase-args.txt
+++ b/src/docs/man/hbase-args.txt
@@ -36,4 +36,9 @@
 --hbase-table (table-name)::
   Specifies an HBase table to use as the target instead of HDFS
 
+--hbase-null-incremental-mode (mode)::
+  How to handle columns updated to null during incremental imports. +ignore+ is the default and
+  will result in retaining the previously imported value. +delete+ mode will delete all previous
+  versions of the column from HBase.
+
 
diff --git a/src/docs/user/hbase-args.txt b/src/docs/user/hbase-args.txt
index 53040f5..4076214 100644
--- a/src/docs/user/hbase-args.txt
+++ b/src/docs/user/hbase-args.txt
@@ -20,19 +20,22 @@
 
 .HBase arguments:
 [grid="all"]
-`-----------------------------`-------------------------------------------
-Argument                      Description
---------------------------------------------------------------------------
-+\--column-family <family>+   Sets the target column family for the import
-+\--hbase-create-table+       If specified, create missing HBase tables
-+\--hbase-row-key <col>+      Specifies which input column to use as the\
-                              row key
-                              In case, if input table contains composite
-                              key, then <col> must be in the form of a
-                              comma-separated list of composite key
-                              attributes
-+\--hbase-table <table-name>+ Specifies an HBase table to use as the \
-                              target instead of HDFS
-+\--hbase-bulkload+           Enables bulk loading
---------------------------------------------------------------------------
+`---------------------------------------`-------------------------------------------
+Argument                                Description
+------------------------------------------------------------------------------------
++\--column-family <family>+             Sets the target column family for the import
++\--hbase-create-table+                 If specified, create missing HBase tables
++\--hbase-row-key <col>+                Specifies which input column to use as the\
+                                        row key
+                                        In case, if input table contains composite
+                                        key, then <col> must be in the form of a
+                                        comma-separated list of composite key
+                                        attributes
++\--hbase-table <table-name>+           Specifies an HBase table to use as the \
+                                        target instead of HDFS
++\--hbase-bulkload+                     Enables bulk loading
++\--hbase-null-incremental-mode <mode>+ How to handle columns updated to null. \
+                                        Legal values for <mode> are +ignore+ \
+                                        (default) and +delete+.
+------------------------------------------------------------------------------------
 
diff --git a/src/docs/user/hbase.txt b/src/docs/user/hbase.txt
index ab4aedc..817956d 100644
--- a/src/docs/user/hbase.txt
+++ b/src/docs/user/hbase.txt
@@ -58,5 +58,10 @@
 cell. Sqoop will skip all rows containing null values in all columns
 except the row key column.
 
+By default Sqoop will retain the previously imported value for columns
+updated to null during incremental imports. This can be changed to
+delete all previous versions of the column by using
++\--hbase-null-incremental-mode delete+.
+
 To decrease the load on hbase, Sqoop can do bulk loading as opposed to
 direct writes. To use bulk loading, enable it using +\--hbase-bulkload+.
diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java
index 73d0757..651cebd 100644
--- a/src/java/org/apache/sqoop/SqoopOptions.java
+++ b/src/java/org/apache/sqoop/SqoopOptions.java
@@ -105,6 +105,18 @@
   }
 
   /**
+   * How to handle null values when doing incremental import into HBase table:
+   * <ul>
+   * <li>Ignore: ignore update, retain previous value</li>
+   * <li>Delete: delete all previous values of column</li>
+   * </ul>
+   */
+  public enum HBaseNullIncrementalMode {
+    Ignore,
+    Delete,
+  }
+
+  /**
    * Update mode option specifies how updates are performed when
    * new rows are found with non-matching keys in database.
    * It supports two modes:
@@ -322,6 +334,9 @@
   @StoredAsProperty("incremental.last.value")
   private String incrementalLastValue;
 
+  @StoredAsProperty("hbase.null.incremental.mode")
+  private HBaseNullIncrementalMode hbaseNullIncrementalMode;
+
   // exclude these tables when importing all tables.
   @StoredAsProperty("import.all_tables.exclude")
   private String allTablesExclude;
@@ -1085,6 +1100,7 @@
     this.dbOutColumns = null;
 
     this.incrementalMode = IncrementalMode.None;
+    this.hbaseNullIncrementalMode = HBaseNullIncrementalMode.Ignore;
 
     this.updateMode = UpdateMode.UpdateOnly;
 
@@ -2301,6 +2317,20 @@
   }
 
   /**
+   * Get HBase null incremental mode to use.
+   */
+  public HBaseNullIncrementalMode getHbaseNullIncrementalMode() {
+    return hbaseNullIncrementalMode;
+  }
+
+  /**
+   * Set HBase null incremental mode to use.
+   */
+  public void setHbaseNullIncrementalMode(HBaseNullIncrementalMode hbaseNullIncrementalMode) {
+    this.hbaseNullIncrementalMode = hbaseNullIncrementalMode;
+  }
+
+  /**
    * Set the tables to be excluded when doing all table import.
    */
   public void setAllTablesExclude(String exclude) {
diff --git a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
index 27d6006..df9836b 100644
--- a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
+++ b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
@@ -63,6 +63,8 @@
   public static final String ROW_KEY_COLUMN_KEY =
       "sqoop.hbase.insert.row.key.column";
 
+  public static final String NULL_INCREMENTAL_MODE = "hbase.null.incremental.mode";
+
   /**
    * Configuration key specifying the PutTransformer implementation to use.
    */
diff --git a/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
index 0bd6169..8600382 100644
--- a/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
+++ b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.sqoop.SqoopOptions;
 import org.apache.sqoop.mapreduce.ImportJobBase;
 
 import java.io.IOException;
@@ -57,6 +58,7 @@
   protected boolean addRowKey;
   private boolean isCompositeKey = false;
   private List<String> compositeKeyAttributes;
+  private SqoopOptions.HBaseNullIncrementalMode nullMode;
 
   /**
    * Used as delimiter to combine composite-key column names when passed as.
@@ -170,6 +172,7 @@
     String colFamily, String rowKey) {
     byte[] colFamilyBytes = Bytes.toBytes(colFamily);
     List<Mutation> mutationList = new ArrayList<Mutation>();
+    Put put = null;
     for (Map.Entry<String, Object> fieldEntry : record.entrySet()) {
       String colName = fieldEntry.getKey();
       boolean rowKeyCol = false;
@@ -187,7 +190,10 @@
         Object val = fieldEntry.getValue();
         if (null != val) {
           // Put row-key in HBase
-          Put put = new Put(Bytes.toBytes(rowKey));
+          if (put == null) {
+            put = new Put(Bytes.toBytes(rowKey));
+            mutationList.add(put);
+          }
           if ( val instanceof byte[]) {
             put.addColumn(colFamilyBytes, getFieldNameBytes(colName),
                 (byte[])val);
@@ -197,9 +203,15 @@
          }
-          mutationList.add(put);
         } else {
+          switch (nullMode) {
+          case Delete:
             Delete delete = new Delete(Bytes.toBytes(rowKey));
-            delete.addColumn(colFamilyBytes, getFieldNameBytes(colName));
+            delete.addColumns(colFamilyBytes, getFieldNameBytes(colName));
             mutationList.add(delete);
+            break;
+          case Ignore:
+            // Do nothing
+            break;
+          }
         }
       }
     }
@@ -218,6 +231,7 @@
 
   @Override
   public void init(Configuration conf) {
+    nullMode = conf.getEnum(HBasePutProcessor.NULL_INCREMENTAL_MODE, SqoopOptions.HBaseNullIncrementalMode.Ignore);
     setColumnFamily(conf.get(COL_FAMILY_KEY, null));
     setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null));
 
diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java b/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java
index 33da487..a09a45e 100644
--- a/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java
@@ -93,9 +93,10 @@
         HBasePutProcessor.class,
         FieldMapProcessor.class);
 
-    // Set the HBase parameters (table, column family, row key):
+    // Set the HBase parameters (table, column family, row key, null mode):
     conf.set(HBasePutProcessor.TABLE_NAME_KEY, options.getHBaseTable());
     conf.set(HBasePutProcessor.COL_FAMILY_KEY, options.getHBaseColFamily());
+    conf.set(HBasePutProcessor.NULL_INCREMENTAL_MODE, options.getHbaseNullIncrementalMode().toString());
 
     // What column of the input becomes the row key?
     String rowKeyCol = options.getHBaseRowKeyColumn();
diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
index ce21918..b02e4fe 100644
--- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
+++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
@@ -199,6 +199,7 @@
   public static final String HBASE_BULK_LOAD_ENABLED_ARG =
       "hbase-bulkload";
   public static final String HBASE_CREATE_TABLE_ARG = "hbase-create-table";
+  public static final String HBASE_NULL_INCREMENTAL_MODE_ARG = "hbase-null-incremental-mode";
 
   //Accumulo arguments.
   public static final String ACCUMULO_TABLE_ARG = "accumulo-table";
@@ -853,6 +854,11 @@
         .withDescription("If specified, create missing HBase tables")
         .withLongOpt(HBASE_CREATE_TABLE_ARG)
         .create());
+    hbaseOpts.addOption(OptionBuilder.withArgName("nullmode")
+        .hasArg()
+        .withDescription("How to handle null values during incremental import into HBase.")
+        .withLongOpt(HBASE_NULL_INCREMENTAL_MODE_ARG)
+        .create());
 
     return hbaseOpts;
   }
@@ -1398,7 +1404,7 @@
     }
   }
 
-  protected void applyHBaseOptions(CommandLine in, SqoopOptions out) {
+  protected void applyHBaseOptions(CommandLine in, SqoopOptions out) throws InvalidOptionsException {
     if (in.hasOption(HBASE_TABLE_ARG)) {
       out.setHBaseTable(in.getOptionValue(HBASE_TABLE_ARG));
     }
@@ -1416,6 +1422,19 @@
     if (in.hasOption(HBASE_CREATE_TABLE_ARG)) {
       out.setCreateHBaseTable(true);
     }
+
+    if (in.hasOption(HBASE_NULL_INCREMENTAL_MODE_ARG)) {
+      String nullMode = in.getOptionValue(HBASE_NULL_INCREMENTAL_MODE_ARG);
+      if ("ignore".equalsIgnoreCase(nullMode)) {
+        out.setHbaseNullIncrementalMode(SqoopOptions.HBaseNullIncrementalMode.Ignore);
+      } else if ("delete".equalsIgnoreCase(nullMode)) {
+        out.setHbaseNullIncrementalMode(SqoopOptions.HBaseNullIncrementalMode.Delete);
+      } else {
+        throw new InvalidOptionsException("Unknown HBase null incremental mode: "
+            + nullMode + ". Use 'ignore' or 'delete'."
+            + HELP_STR);
+      }
+    }
   }
 
   protected void applyValidationOptions(CommandLine in, SqoopOptions out)
diff --git a/src/test/org/apache/sqoop/TestSqoopOptions.java b/src/test/org/apache/sqoop/TestSqoopOptions.java
index 16901ca..bb7c20d 100644
--- a/src/test/org/apache/sqoop/TestSqoopOptions.java
+++ b/src/test/org/apache/sqoop/TestSqoopOptions.java
@@ -89,6 +89,7 @@
     excludedFieldsFromClone.add("updateMode");
     excludedFieldsFromClone.add("layout");
     excludedFieldsFromClone.add("activeSqoopTool");
+    excludedFieldsFromClone.add("hbaseNullIncrementalMode");
   }
 
   @After
diff --git a/src/test/org/apache/sqoop/hbase/HBaseImportTest.java b/src/test/org/apache/sqoop/hbase/HBaseImportTest.java
index 2e73cf3..58c21ce 100644
--- a/src/test/org/apache/sqoop/hbase/HBaseImportTest.java
+++ b/src/test/org/apache/sqoop/hbase/HBaseImportTest.java
@@ -72,24 +72,6 @@
   }
 
   @Test
-  public void testOverwriteNullColumnsSucceeds() throws IOException {
-    // Test that we can create a table and then import immediately
-    // back on top of it without problem and then update with null to validate
-    String [] argv = getArgv(true, "OverwriteTable", "OverwriteColumnFamily", true, null);
-    String [] types = { "INT", "INT", "INT", "DATETIME" };
-    String [] vals = { "0", "1", "1", "'2017-03-20'" };
-    createTableWithColTypes(types, vals);
-    runImport(argv);
-    verifyHBaseCell("OverwriteTable", "0", "OverwriteColumnFamily", getColName(2), "1");
-    // Run a second time.
-    argv = getIncrementalArgv(true, "OverwriteTable", "OverwriteColumnFamily", true, null, false, false, "DATA_COL3", "2017-03-24 01:01:01.0", null);
-    vals = new String[] { "0", "1", null, "'2017-03-25'" };
-    updateTable(types, vals);
-    runImport(argv);
-    verifyHBaseCell("OverwriteTable", "0", "OverwriteColumnFamily", getColName(2), null);
-  }
-
-  @Test
   public void testAppendWithTimestampSucceeds() throws IOException {
     // Test that we can create a table and then import multiple rows
     // validate for append scenario with time stamp
@@ -100,7 +82,7 @@
     runImport(argv);
     verifyHBaseCell("AppendTable", "0", "AppendColumnFamily", getColName(2), "1");
     // Run a second time.
-    argv = getIncrementalArgv(true, "AppendTable", "AppendColumnFamily", true, null, true, false, "DATA_COL1", "2017-03-24 01:01:01.0", null);
+    argv = getIncrementalArgv(true, "AppendTable", "AppendColumnFamily", true, null, true, false, "DATA_COL1", "2017-03-24 01:01:01.0", null, "ignore");
     vals = new String[] { "1", "2", "3", "'2017-06-15'" };
     insertIntoTable(types, vals);
     runImport(argv);
@@ -118,7 +100,7 @@
     runImport(argv);
     verifyHBaseCell("AppendTable", "0", "AppendColumnFamily", getColName(2), "1");
     // Run a second time.
-    argv = getIncrementalArgv(true, "AppendTable", "AppendColumnFamily", true, null, true, true, "DATA_COL1", null, "DATA_COL3");
+    argv = getIncrementalArgv(true, "AppendTable", "AppendColumnFamily", true, null, true, true, "DATA_COL1", null, "DATA_COL3", "ignore");
     vals = new String[] { "1", "2", "3", "'2017-06-15'" };
     insertIntoTable(types, vals);
     runImport(argv);
@@ -126,6 +108,49 @@
   }
 
   @Test
+  public void testNullIncrementalModeIgnore() throws Exception {
+    // Latest value retained with 'ignore' mode
+    runInsertUpdateUpdateDeleteAndExpectValue("ignore", "2");
+  }
+
+  @Test
+  public void testNullIncrementalModeDelete() throws Exception {
+    // All previous values deleted with 'delete' mode
+    runInsertUpdateUpdateDeleteAndExpectValue("delete", null);
+  }
+
+  /**
+   * Does the following
+   *  - create HBase table
+   *  - insert value "1"
+   *  - update value to "2"
+   *  - update value to null
+   *  - asserts its value equals expectedValue
+   *
+   * @param nullMode hbase-null-incremental-mode to use ('ignore' or 'delete')
+   * @param expectedValue expected value in the end
+   * @throws Exception
+   */
+  private void runInsertUpdateUpdateDeleteAndExpectValue(String nullMode, String expectedValue) throws Exception {
+    // Create table and import with initial values
+    String [] types = { "INT", "INT", "DATETIME" };
+    createTableWithColTypes(types, new String[] { "0", "1", "'2017-03-20'" });
+    runImport(getArgv(true, "OverwriteTable", "OverwriteColumnFamily", true, null));
+    verifyHBaseCell("OverwriteTable", "0", "OverwriteColumnFamily", getColName(1), "1");
+
+    // Run a second time after updating.
+    updateTable(types, new String[] { "0", "2", "'2017-03-25'" });
+    runImport(getIncrementalArgv(true, "OverwriteTable", "OverwriteColumnFamily", true, null, false, false, getColName(2), "2017-03-24 01:01:01.0", null, nullMode));
+    verifyHBaseCell("OverwriteTable", "0", "OverwriteColumnFamily", getColName(1), "2");
+
+    // Run third time after deleting (setting to null)
+    updateTable(types, new String[] { "0", null, "'2017-03-28'" });
+    runImport(getIncrementalArgv(true, "OverwriteTable", "OverwriteColumnFamily", true, null, false, false, getColName(2), "2017-03-26 01:01:01.0", null, nullMode));
+    verifyHBaseCell("OverwriteTable", "0", "OverwriteColumnFamily", getColName(1), expectedValue);
+  }
+
+
+  @Test
   public void testExitFailure() throws IOException {
     String [] types = { "INT", "INT", "INT" };
     String [] vals = { "0", "42", "43" };
diff --git a/src/test/org/apache/sqoop/hbase/HBaseTestCase.java b/src/test/org/apache/sqoop/hbase/HBaseTestCase.java
index 98f8698..f96b658 100644
--- a/src/test/org/apache/sqoop/hbase/HBaseTestCase.java
+++ b/src/test/org/apache/sqoop/hbase/HBaseTestCase.java
@@ -138,7 +138,7 @@
    */
   protected String [] getIncrementalArgv(boolean includeHadoopFlags,
       String hbaseTable, String hbaseColFam, boolean hbaseCreate,
-      String queryStr, boolean isAppend, boolean appendTimestamp, String checkColumn, String checkValue, String lastModifiedColumn) {
+      String queryStr, boolean isAppend, boolean appendTimestamp, String checkColumn, String checkValue, String lastModifiedColumn, String nullMode) {
 
     String[] argsStrArray = getArgv(includeHadoopFlags, hbaseTable, hbaseColFam, hbaseCreate, queryStr);
     List<String> args = new ArrayList<String>(Arrays.asList(argsStrArray));
@@ -161,6 +161,14 @@
       args.add("--last-value");
       args.add(checkValue);
     }
+
+    // Set --hbase-null-incremental-mode (default is 'ignore')
+    if (nullMode == null) {
+      nullMode = "ignore";
+    }
+    args.add("--hbase-null-incremental-mode");
+    args.add(nullMode);
+
     return args.toArray(new String[0]);
   }