[HUDI-1184] Fix HBase index support for partition path changes (#1978)

When the HBase index is used and a record's partition column value changes, the record keeps being written to the old partition path recorded in the index instead of the new partition derived from the partition column.

Co-authored-by: huangjing <huangjing@clinbrain.com>
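
The new behavior is gated by the config key `hoodie.hbase.index.update.partition.path` (default `false`), exposed through the `hbaseIndexUpdatePartitionPath(...)` builder method added in this patch. Below is a minimal, illustrative sketch of enabling it; the base path, ZooKeeper quorum and table name are placeholders, and the `withIndexConfig`/`withHBaseIndexConfig` wiring is assumed to follow the usual Hudi builder pattern (see the test changes further down):

```java
import org.apache.hudi.config.HoodieHBaseIndexConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;

public class HBaseIndexPartitionPathExample {
  public static HoodieWriteConfig buildConfig() {
    // Sketch only: hbaseIndexUpdatePartitionPath(true) is the switch added by this PR;
    // everything else is placeholder wiring for an HBase-indexed write config.
    return HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie_table")
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            .withIndexType(HoodieIndex.IndexType.HBASE)
            .withHBaseIndexConfig(HoodieHBaseIndexConfig.newBuilder()
                .hbaseZkQuorum("localhost")
                .hbaseTableName("hudi_index")
                .hbaseIndexUpdatePartitionPath(true)
                .build())
            .build())
        .build();
  }
}
```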
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java
index 709f08c..5d79776 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java
@@ -102,6 +102,17 @@
   public static final String HBASE_ZK_PATH_QPS_ROOT = "hoodie.index.hbase.zkpath.qps_root";
   public static final String DEFAULT_HBASE_ZK_PATH_QPS_ROOT = "/QPS_ROOT";
 
+  /**
+   * Only applies if index type is HBase.
+   * <p>
+   * When set to true, an update to a record with a different partition from its existing one
+   * will insert the record to the new partition and delete it from the old partition.
+   * <p>
+   * When set to false, the record will be updated in its old partition, ignoring the new partition value.
+   */
+  public static final String HBASE_INDEX_UPDATE_PARTITION_PATH = "hoodie.hbase.index.update.partition.path";
+  public static final Boolean DEFAULT_HBASE_INDEX_UPDATE_PARTITION_PATH = false;
+
   public HoodieHBaseIndexConfig(final Properties props) {
     super(props);
   }
@@ -196,6 +207,11 @@
       return this;
     }
 
+    public Builder hbaseIndexUpdatePartitionPath(boolean updatePartitionPath) {
+      props.setProperty(HBASE_INDEX_UPDATE_PARTITION_PATH, String.valueOf(updatePartitionPath));
+      return this;
+    }
+
     public Builder withQPSResourceAllocatorType(String qpsResourceAllocatorClass) {
       props.setProperty(HBASE_INDEX_QPS_ALLOCATOR_CLASS, qpsResourceAllocatorClass);
       return this;
@@ -259,6 +275,8 @@
           HOODIE_INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS, String.valueOf(DEFAULT_ZK_CONNECTION_TIMEOUT_MS));
       setDefaultOnCondition(props, !props.containsKey(HBASE_INDEX_QPS_ALLOCATOR_CLASS), HBASE_INDEX_QPS_ALLOCATOR_CLASS,
           String.valueOf(DEFAULT_HBASE_INDEX_QPS_ALLOCATOR_CLASS));
+      setDefaultOnCondition(props, !props.containsKey(HBASE_INDEX_UPDATE_PARTITION_PATH), HBASE_INDEX_UPDATE_PARTITION_PATH,
+          String.valueOf(DEFAULT_HBASE_INDEX_UPDATE_PARTITION_PATH));
       return config;
     }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index bb65600..42d3e2b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -485,6 +485,10 @@
     return Integer.parseInt(props.getProperty(HoodieHBaseIndexConfig.HBASE_MAX_QPS_PER_REGION_SERVER_PROP));
   }
 
+  public boolean getHbaseIndexUpdatePartitionPath() {
+    return Boolean.parseBoolean(props.getProperty(HoodieHBaseIndexConfig.HBASE_INDEX_UPDATE_PARTITION_PATH));
+  }
+
   public int getBloomIndexParallelism() {
     return Integer.parseInt(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_PARALLELISM_PROP));
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
index 21efd9b..072c71c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
@@ -22,6 +22,7 @@
 import org.apache.hudi.client.common.HoodieEngineContext;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.utils.SparkMemoryUtils;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -188,6 +189,7 @@
         hoodieRecordIterator) -> {
 
       int multiGetBatchSize = config.getHbaseIndexGetBatchSize();
+      boolean updatePartitionPath = config.getHbaseIndexUpdatePartitionPath();
 
       // Grab the global HBase connection
       synchronized (SparkHoodieHBaseIndex.class) {
@@ -205,35 +207,51 @@
           statements.add(generateStatement(rec.getRecordKey()));
           currentBatchOfRecords.add(rec);
           // iterator till we reach batch size
-          if (statements.size() >= multiGetBatchSize || !hoodieRecordIterator.hasNext()) {
-            // get results for batch from Hbase
-            Result[] results = doGet(hTable, statements);
-            // clear statements to be GC'd
-            statements.clear();
-            for (Result result : results) {
-              // first, attempt to grab location from HBase
-              HoodieRecord currentRecord = currentBatchOfRecords.remove(0);
-              if (result.getRow() != null) {
-                String keyFromResult = Bytes.toString(result.getRow());
-                String commitTs = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN));
-                String fileId = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN));
-                String partitionPath = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
-
-                if (checkIfValidCommit(metaClient, commitTs)) {
-                  currentRecord = new HoodieRecord(new HoodieKey(currentRecord.getRecordKey(), partitionPath),
-                      currentRecord.getData());
-                  currentRecord.unseal();
-                  currentRecord.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId));
-                  currentRecord.seal();
-                  taggedRecords.add(currentRecord);
-                  // the key from Result and the key being processed should be same
-                  assert (currentRecord.getRecordKey().contentEquals(keyFromResult));
-                } else { // if commit is invalid, treat this as a new taggedRecord
-                  taggedRecords.add(currentRecord);
-                }
-              } else {
-                taggedRecords.add(currentRecord);
-              }
+          if (hoodieRecordIterator.hasNext() && statements.size() < multiGetBatchSize) {
+            continue;
+          }
+          // get results for batch from Hbase
+          Result[] results = doGet(hTable, statements);
+          // clear statements to be GC'd
+          statements.clear();
+          for (Result result : results) {
+            // first, attempt to grab location from HBase
+            HoodieRecord currentRecord = currentBatchOfRecords.remove(0);
+            if (result.getRow() == null) {
+              taggedRecords.add(currentRecord);
+              continue;
+            }
+            String keyFromResult = Bytes.toString(result.getRow());
+            String commitTs = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN));
+            String fileId = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN));
+            String partitionPath = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
+            if (!checkIfValidCommit(metaClient, commitTs)) {
+              // if commit is invalid, treat this as a new taggedRecord
+              taggedRecords.add(currentRecord);
+              continue;
+            }
+            // if enabled, handle a record whose partition path has changed
+            if (updatePartitionPath && !partitionPath.equals(currentRecord.getPartitionPath())) {
+              // delete the record from its old partition
+              HoodieRecord emptyRecord = new HoodieRecord(new HoodieKey(currentRecord.getRecordKey(), partitionPath),
+                  new EmptyHoodieRecordPayload());
+              emptyRecord.unseal();
+              emptyRecord.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId));
+              emptyRecord.seal();
+              // insert the record into its new partition
+              currentRecord = new HoodieRecord(new HoodieKey(currentRecord.getRecordKey(), currentRecord.getPartitionPath()),
+                  currentRecord.getData());
+              taggedRecords.add(emptyRecord);
+              taggedRecords.add(currentRecord);
+            } else {
+              currentRecord = new HoodieRecord(new HoodieKey(currentRecord.getRecordKey(), partitionPath),
+                  currentRecord.getData());
+              currentRecord.unseal();
+              currentRecord.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId));
+              currentRecord.seal();
+              taggedRecords.add(currentRecord);
+              // the key from Result and the key being processed should be same
+              assert (currentRecord.getRecordKey().contentEquals(keyFromResult));
             }
           }
         }
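
When `hoodie.hbase.index.update.partition.path` is enabled and the partition stored in HBase differs from the incoming record's partition, tagging now emits two records instead of one: a delete with an `EmptyHoodieRecordPayload` pinned to the old location, plus an untagged insert for the new partition. A small standalone sketch of that pairing, mirroring the branch added in the hunk above (the helper name is hypothetical):

```java
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;

public class PartitionChangeTagging {
  // Hypothetical helper mirroring the "partition changed" branch added above.
  static HoodieRecord[] tagPartitionChange(HoodieRecord incoming, String storedPartition,
                                           String commitTs, String fileId) {
    // 1) A delete for the old partition: empty payload, located at the existing file slice,
    //    so the stale copy is removed on the next upsert.
    HoodieRecord delete =
        new HoodieRecord(new HoodieKey(incoming.getRecordKey(), storedPartition),
            new EmptyHoodieRecordPayload());
    delete.unseal();
    delete.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId));
    delete.seal();
    // 2) An insert for the new partition: keeps the incoming payload and carries no location,
    //    so it is routed as a fresh insert under the new partition path.
    HoodieRecord insert =
        new HoodieRecord(new HoodieKey(incoming.getRecordKey(), incoming.getPartitionPath()),
            incoming.getData());
    return new HoodieRecord[] {delete, insert};
  }
}
```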
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
index b858516..b74daad 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
@@ -20,6 +20,8 @@
 
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -58,6 +60,7 @@
 import org.junit.jupiter.api.TestMethodOrder;
 
 import java.util.Arrays;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -169,6 +172,59 @@
   }
 
   @Test
+  public void testTagLocationAndPartitionPathUpdate() throws Exception {
+    final String newCommitTime = "001";
+    final int numRecords = 10;
+    final String oldPartitionPath = "1970/01/01";
+    final String emptyHoodieRecordPayloadClassName = EmptyHoodieRecordPayload.class.getName();
+
+    List<HoodieRecord> newRecords = dataGen.generateInserts(newCommitTime, numRecords);
+    List<HoodieRecord> oldRecords = new LinkedList<>();
+    for (HoodieRecord newRecord : newRecords) {
+      HoodieKey key = new HoodieKey(newRecord.getRecordKey(), oldPartitionPath);
+      HoodieRecord hoodieRecord = new HoodieRecord(key, newRecord.getData());
+      oldRecords.add(hoodieRecord);
+    }
+
+    JavaRDD<HoodieRecord> newWriteRecords = jsc().parallelize(newRecords, 1);
+    JavaRDD<HoodieRecord> oldWriteRecords = jsc().parallelize(oldRecords, 1);
+
+    HoodieWriteConfig config = getConfig(true);
+    SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex<>(config);
+
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
+      // test with partition path updates enabled
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
+
+      JavaRDD<HoodieRecord> oldHoodieRecord = index.tagLocation(oldWriteRecords, context, hoodieTable);
+      assertEquals(0, oldHoodieRecord.filter(record -> record.isCurrentLocationKnown()).count());
+      writeClient.startCommitWithTime(newCommitTime);
+      JavaRDD<WriteStatus> writeStatuses = writeClient.upsert(oldWriteRecords, newCommitTime);
+      writeClient.commit(newCommitTime, writeStatuses);
+      assertNoWriteErrors(writeStatuses.collect());
+      index.updateLocation(writeStatuses, context, hoodieTable);
+
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      hoodieTable = HoodieSparkTable.create(config, context, metaClient);
+      List<HoodieRecord> taggedRecords = index.tagLocation(newWriteRecords, context, hoodieTable).collect();
+      assertEquals(numRecords * 2L, taggedRecords.stream().count());
+      // Verify the number of deleted records
+      assertEquals(numRecords, taggedRecords.stream().filter(record -> record.getKey().getPartitionPath().equals(oldPartitionPath)
+          && record.getData().getClass().getName().equals(emptyHoodieRecordPayloadClassName)).count());
+      // Verify the number of inserted records
+      assertEquals(numRecords, taggedRecords.stream().filter(record -> !record.getKey().getPartitionPath().equals(oldPartitionPath)).count());
+
+      // test with partition path updates disabled
+      index = new SparkHoodieHBaseIndex<>(getConfig(false));
+      List<HoodieRecord> notAllowPathChangeRecords = index.tagLocation(newWriteRecords, context, hoodieTable).collect();
+      assertEquals(numRecords, notAllowPathChangeRecords.stream().count());
+      assertEquals(numRecords, notAllowPathChangeRecords.stream().filter(hoodieRecord -> hoodieRecord.isCurrentLocationKnown()
+          && hoodieRecord.getKey().getPartitionPath().equals(oldPartitionPath)).count());
+    }
+  }
+
+  @Test
   public void testTagLocationAndDuplicateUpdate() throws Exception {
     final String newCommitTime = "001";
     final int numRecords = 10;
@@ -454,14 +510,18 @@
   }
 
   private HoodieWriteConfig getConfig() {
-    return getConfigBuilder(100).build();
+    return getConfigBuilder(100, false).build();
   }
 
   private HoodieWriteConfig getConfig(int hbaseIndexBatchSize) {
-    return getConfigBuilder(hbaseIndexBatchSize).build();
+    return getConfigBuilder(hbaseIndexBatchSize, false).build();
   }
 
-  private HoodieWriteConfig.Builder getConfigBuilder(int hbaseIndexBatchSize) {
+  private HoodieWriteConfig getConfig(boolean updatePartitionPath) {
+    return getConfigBuilder(100, updatePartitionPath).build();
+  }
+
+  private HoodieWriteConfig.Builder getConfigBuilder(int hbaseIndexBatchSize, boolean updatePartitionPath) {
     return HoodieWriteConfig.newBuilder().withPath(basePath()).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
         .withParallelism(1, 1).withDeleteParallelism(1)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
@@ -475,6 +535,7 @@
                 .hbaseIndexPutBatchSizeAutoCompute(true)
                 .hbaseZkZnodeParent(hbaseConfig.get("zookeeper.znode.parent", ""))
                 .hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(TABLE_NAME)
+                .hbaseIndexUpdatePartitionPath(updatePartitionPath)
                 .hbaseIndexGetBatchSize(hbaseIndexBatchSize).build())
             .build());
   }