[HUDI-1184] Fix HBase index support for partition path changes (#1978)
When the HBase index is used and a record's partition value changes, the record is still written to its original partition path instead of the path derived from the new value of the partition column.
Co-authored-by: huangjing <huangjing@clinbrain.com>
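
The fix is gated behind a new, opt-in config. A minimal sketch of enabling it in writer properties (hoodie.index.type is Hudi's existing index selector, shown for context; the second key is the one added by this patch):

    hoodie.index.type=HBASE
    hoodie.hbase.index.update.partition.path=true

When the flag is true, an update whose partition changed is tagged as a delete in the old partition plus an insert into the new one; when false (the default), the record continues to be written to its old partition.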
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java
index 709f08c..5d79776 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java
@@ -102,6 +102,17 @@
public static final String HBASE_ZK_PATH_QPS_ROOT = "hoodie.index.hbase.zkpath.qps_root";
public static final String DEFAULT_HBASE_ZK_PATH_QPS_ROOT = "/QPS_ROOT";
+ /**
+ * Only applies if index type is HBASE.
+ * <p>
+ * When set to true, an update to a record whose partition differs from its existing one
+ * will insert the record into the new partition and delete it from the old partition.
+ * <p>
+ * When set to false, the incoming record is written to its existing (old) partition instead.
+ */
+ public static final String HBASE_INDEX_UPDATE_PARTITION_PATH = "hoodie.hbase.index.update.partition.path";
+ public static final Boolean DEFAULT_HBASE_INDEX_UPDATE_PARTITION_PATH = false;
+
public HoodieHBaseIndexConfig(final Properties props) {
super(props);
}
@@ -196,6 +207,11 @@
return this;
}
+ public Builder hbaseIndexUpdatePartitionPath(boolean updatePartitionPath) {
+ props.setProperty(HBASE_INDEX_UPDATE_PARTITION_PATH, String.valueOf(updatePartitionPath));
+ return this;
+ }
+
public Builder withQPSResourceAllocatorType(String qpsResourceAllocatorClass) {
props.setProperty(HBASE_INDEX_QPS_ALLOCATOR_CLASS, qpsResourceAllocatorClass);
return this;
@@ -259,6 +275,8 @@
HOODIE_INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS, String.valueOf(DEFAULT_ZK_CONNECTION_TIMEOUT_MS));
setDefaultOnCondition(props, !props.containsKey(HBASE_INDEX_QPS_ALLOCATOR_CLASS), HBASE_INDEX_QPS_ALLOCATOR_CLASS,
String.valueOf(DEFAULT_HBASE_INDEX_QPS_ALLOCATOR_CLASS));
+ setDefaultOnCondition(props, !props.containsKey(HBASE_INDEX_UPDATE_PARTITION_PATH), HBASE_INDEX_UPDATE_PARTITION_PATH,
+ String.valueOf(DEFAULT_HBASE_INDEX_UPDATE_PARTITION_PATH));
return config;
}
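
For reference, a minimal sketch of wiring the new flag through the existing builders; the base path, ZK quorum/port, and table name below are placeholder values, not part of this patch:

    import org.apache.hudi.config.HoodieHBaseIndexConfig;
    import org.apache.hudi.config.HoodieIndexConfig;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.index.HoodieIndex;

    public class HBaseIndexConfigExample {
      public static HoodieWriteConfig buildConfig() {
        return HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hudi_table")                    // placeholder base path
            .withIndexConfig(HoodieIndexConfig.newBuilder()
                .withIndexType(HoodieIndex.IndexType.HBASE)
                .withHBaseIndexConfig(HoodieHBaseIndexConfig.newBuilder()
                    .hbaseZkQuorum("localhost")             // placeholder ZK quorum
                    .hbaseZkPort(2181)                      // placeholder ZK port
                    .hbaseTableName("hudi_record_index")    // placeholder index table
                    .hbaseIndexUpdatePartitionPath(true)    // new flag from this patch
                    .build())
                .build())
            .build();
      }
    }

Because setDefaultOnCondition fills in false when the key is absent, existing jobs keep the old tagging behavior unless they opt in.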
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index bb65600..42d3e2b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -485,6 +485,10 @@
return Integer.parseInt(props.getProperty(HoodieHBaseIndexConfig.HBASE_MAX_QPS_PER_REGION_SERVER_PROP));
}
+ public boolean getHbaseIndexUpdatePartitionPath() {
+ return Boolean.parseBoolean(props.getProperty(HoodieHBaseIndexConfig.HBASE_INDEX_UPDATE_PARTITION_PATH));
+ }
+
public int getBloomIndexParallelism() {
return Integer.parseInt(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_PARALLELISM_PROP));
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
index 21efd9b..072c71c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
@@ -22,6 +22,7 @@
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.SparkMemoryUtils;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -188,6 +189,7 @@
hoodieRecordIterator) -> {
int multiGetBatchSize = config.getHbaseIndexGetBatchSize();
+ boolean updatePartitionPath = config.getHbaseIndexUpdatePartitionPath();
// Grab the global HBase connection
synchronized (SparkHoodieHBaseIndex.class) {
@@ -205,35 +207,51 @@
statements.add(generateStatement(rec.getRecordKey()));
currentBatchOfRecords.add(rec);
// iterate till we reach the batch size
- if (statements.size() >= multiGetBatchSize || !hoodieRecordIterator.hasNext()) {
- // get results for batch from Hbase
- Result[] results = doGet(hTable, statements);
- // clear statements to be GC'd
- statements.clear();
- for (Result result : results) {
- // first, attempt to grab location from HBase
- HoodieRecord currentRecord = currentBatchOfRecords.remove(0);
- if (result.getRow() != null) {
- String keyFromResult = Bytes.toString(result.getRow());
- String commitTs = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN));
- String fileId = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN));
- String partitionPath = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
-
- if (checkIfValidCommit(metaClient, commitTs)) {
- currentRecord = new HoodieRecord(new HoodieKey(currentRecord.getRecordKey(), partitionPath),
- currentRecord.getData());
- currentRecord.unseal();
- currentRecord.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId));
- currentRecord.seal();
- taggedRecords.add(currentRecord);
- // the key from Result and the key being processed should be same
- assert (currentRecord.getRecordKey().contentEquals(keyFromResult));
- } else { // if commit is invalid, treat this as a new taggedRecord
- taggedRecords.add(currentRecord);
- }
- } else {
- taggedRecords.add(currentRecord);
- }
+ if (hoodieRecordIterator.hasNext() && statements.size() < multiGetBatchSize) {
+ continue;
+ }
+ // get results for batch from Hbase
+ Result[] results = doGet(hTable, statements);
+ // clear statements to be GC'd
+ statements.clear();
+ for (Result result : results) {
+ // first, attempt to grab location from HBase
+ HoodieRecord currentRecord = currentBatchOfRecords.remove(0);
+ if (result.getRow() == null) {
+ taggedRecords.add(currentRecord);
+ continue;
+ }
+ String keyFromResult = Bytes.toString(result.getRow());
+ String commitTs = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN));
+ String fileId = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN));
+ String partitionPath = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
+ if (!checkIfValidCommit(metaClient, commitTs)) {
+ // if commit is invalid, treat this as a new taggedRecord
+ taggedRecords.add(currentRecord);
+ continue;
+ }
+ // check whether the record's partition path has changed and needs handling
+ if (updatePartitionPath && !partitionPath.equals(currentRecord.getPartitionPath())) {
+ // emit a delete (empty payload) for the record in its old partition
+ HoodieRecord emptyRecord = new HoodieRecord(new HoodieKey(currentRecord.getRecordKey(), partitionPath),
+ new EmptyHoodieRecordPayload());
+ emptyRecord.unseal();
+ emptyRecord.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId));
+ emptyRecord.seal();
+ // leave the incoming record untagged so it is inserted into its new partition
+ currentRecord = new HoodieRecord(new HoodieKey(currentRecord.getRecordKey(), currentRecord.getPartitionPath()),
+ currentRecord.getData());
+ taggedRecords.add(emptyRecord);
+ taggedRecords.add(currentRecord);
+ } else {
+ currentRecord = new HoodieRecord(new HoodieKey(currentRecord.getRecordKey(), partitionPath),
+ currentRecord.getData());
+ currentRecord.unseal();
+ currentRecord.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId));
+ currentRecord.seal();
+ taggedRecords.add(currentRecord);
+ // the key from Result and the key being processed should be the same
+ assert (currentRecord.getRecordKey().contentEquals(keyFromResult));
}
}
}
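
To make the new tagging branch above concrete, here is a hedged, self-contained restatement of what it emits for a partition move; the helper name and signature are illustrative, not part of the patch:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
    import org.apache.hudi.common.model.HoodieKey;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordLocation;

    class PartitionMoveSketch {
      // Illustrative helper mirroring the updatePartitionPath branch of tagLocation:
      // the key is already indexed under oldPartitionPath at (commitTs, fileId),
      // but the incoming record carries a different partition path.
      static List<HoodieRecord> tagForPartitionMove(HoodieRecord incoming, String oldPartitionPath,
          String commitTs, String fileId) {
        // A delete record for the old partition: empty payload, tagged with the
        // existing location so the upsert routes the delete to the old file.
        HoodieRecord deleteRecord = new HoodieRecord(
            new HoodieKey(incoming.getRecordKey(), oldPartitionPath), new EmptyHoodieRecordPayload());
        deleteRecord.unseal();
        deleteRecord.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId));
        deleteRecord.seal();
        // The incoming record itself stays untagged (no current location), so it
        // is treated as a fresh insert into its new partition.
        HoodieRecord insertRecord = new HoodieRecord(
            new HoodieKey(incoming.getRecordKey(), incoming.getPartitionPath()), incoming.getData());
        return Arrays.asList(deleteRecord, insertRecord);
      }
    }

The delete record carries the old location so the EmptyHoodieRecordPayload lands in the existing file slice, while the untagged copy of the incoming record is written to the new partition.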
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
index b858516..b74daad 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
@@ -20,6 +20,8 @@
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
@@ -58,6 +60,7 @@
import org.junit.jupiter.api.TestMethodOrder;
import java.util.Arrays;
+import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
@@ -169,6 +172,59 @@
}
@Test
+ public void testTagLocationAndPartitionPathUpdate() throws Exception {
+ final String newCommitTime = "001";
+ final int numRecords = 10;
+ final String oldPartitionPath = "1970/01/01";
+ final String emptyHoodieRecordPayloadClassName = EmptyHoodieRecordPayload.class.getName();
+
+ List<HoodieRecord> newRecords = dataGen.generateInserts(newCommitTime, numRecords);
+ List<HoodieRecord> oldRecords = new LinkedList<>();
+ for (HoodieRecord newRecord: newRecords) {
+ HoodieKey key = new HoodieKey(newRecord.getRecordKey(), oldPartitionPath);
+ HoodieRecord hoodieRecord = new HoodieRecord(key, newRecord.getData());
+ oldRecords.add(hoodieRecord);
+ }
+
+ JavaRDD<HoodieRecord> newWriteRecords = jsc().parallelize(newRecords, 1);
+ JavaRDD<HoodieRecord> oldWriteRecords = jsc().parallelize(oldRecords, 1);
+
+ HoodieWriteConfig config = getConfig(true);
+ SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex<>(config);
+
+ try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
+ // case 1: partition path updates allowed
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
+
+ JavaRDD<HoodieRecord> oldHoodieRecord = index.tagLocation(oldWriteRecords, context, hoodieTable);
+ assertEquals(0, oldHoodieRecord.filter(record -> record.isCurrentLocationKnown()).count());
+ writeClient.startCommitWithTime(newCommitTime);
+ JavaRDD<WriteStatus> writeStatuses = writeClient.upsert(oldWriteRecords, newCommitTime);
+ writeClient.commit(newCommitTime, writeStatuses);
+ assertNoWriteErrors(writeStatuses.collect());
+ index.updateLocation(writeStatuses, context, hoodieTable);
+
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ hoodieTable = HoodieSparkTable.create(config, context, metaClient);
+ List<HoodieRecord> taggedRecords = index.tagLocation(newWriteRecords, context, hoodieTable).collect();
+ assertEquals(numRecords * 2L, taggedRecords.stream().count());
+ // Verify the number of deleted records
+ assertEquals(numRecords, taggedRecords.stream().filter(record -> record.getKey().getPartitionPath().equals(oldPartitionPath)
+ && record.getData().getClass().getName().equals(emptyHoodieRecordPayloadClassName)).count());
+ // Verify the number of inserted records
+ assertEquals(numRecords, taggedRecords.stream().filter(record -> !record.getKey().getPartitionPath().equals(oldPartitionPath)).count());
+
+ // case 2: partition path updates not allowed
+ index = new SparkHoodieHBaseIndex<>(getConfig(false));
+ List<HoodieRecord> notAllowPathChangeRecords = index.tagLocation(newWriteRecords, context, hoodieTable).collect();
+ assertEquals(numRecords, notAllowPathChangeRecords.stream().count());
+ assertEquals(numRecords, notAllowPathChangeRecords.stream().filter(hoodieRecord -> hoodieRecord.isCurrentLocationKnown()
+ && hoodieRecord.getKey().getPartitionPath().equals(oldPartitionPath)).count());
+ }
+ }
+
+ @Test
public void testTagLocationAndDuplicateUpdate() throws Exception {
final String newCommitTime = "001";
final int numRecords = 10;
@@ -454,14 +510,18 @@
}
private HoodieWriteConfig getConfig() {
- return getConfigBuilder(100).build();
+ return getConfigBuilder(100, false).build();
}
private HoodieWriteConfig getConfig(int hbaseIndexBatchSize) {
- return getConfigBuilder(hbaseIndexBatchSize).build();
+ return getConfigBuilder(hbaseIndexBatchSize, false).build();
}
- private HoodieWriteConfig.Builder getConfigBuilder(int hbaseIndexBatchSize) {
+ private HoodieWriteConfig getConfig(boolean updatePartitionPath) {
+ return getConfigBuilder(100, updatePartitionPath).build();
+ }
+
+ private HoodieWriteConfig.Builder getConfigBuilder(int hbaseIndexBatchSize, boolean updatePartitionPath) {
return HoodieWriteConfig.newBuilder().withPath(basePath()).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(1, 1).withDeleteParallelism(1)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
@@ -475,6 +535,7 @@
.hbaseIndexPutBatchSizeAutoCompute(true)
.hbaseZkZnodeParent(hbaseConfig.get("zookeeper.znode.parent", ""))
.hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(TABLE_NAME)
+ .hbaseIndexUpdatePartitionPath(updatePartitionPath)
.hbaseIndexGetBatchSize(hbaseIndexBatchSize).build())
.build());
}