Fix HBaseIndex not treating existing index entries as valid on MERGE_ON_READ tables
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
index aab12be..4540f2f 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
@@ -25,7 +25,6 @@
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
@@ -177,13 +176,11 @@
}
private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String commitTs) {
- HoodieTimeline commitTimeline = metaClient.getActiveTimeline().filterCompletedInstants();
+ HoodieTimeline commitTimeline = metaClient.getCommitsTimeline().filterCompletedInstants();
// Check if the last commit ts for this row is 1) present in the timeline or
// 2) is less than the first commit ts in the timeline
return !commitTimeline.empty()
- && (commitTimeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTs))
- || HoodieTimeline.compareTimestamps(commitTimeline.firstInstant().get().getTimestamp(), HoodieTimeline.GREATER_THAN, commitTs
- ));
+ && commitTimeline.containsOrBeforeTimelineStarts(commitTs);
}
/**
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
index 7339855..20406cd 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
@@ -21,9 +21,11 @@
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieHBaseIndexConfig;
@@ -116,7 +118,17 @@
}
@Test
- public void testSimpleTagLocationAndUpdate() throws Exception {
+ public void testSimpleTagLocationAndUpdateCOW() throws Exception {
+ testSimpleTagLocationAndUpdate(HoodieTableType.COPY_ON_WRITE);
+ }
+
+ /** Runs the shared tag-location-and-update scenario against a MERGE_ON_READ table. */
+ @Test
+ public void testSimpleTagLocationAndUpdateMOR() throws Exception {
+ testSimpleTagLocationAndUpdate(HoodieTableType.MERGE_ON_READ);
+ }
+
+ public void testSimpleTagLocationAndUpdate(HoodieTableType tableType) throws Exception {
+ metaClient = HoodieTestUtils.init(hadoopConf, basePath(), tableType);
+
final String newCommitTime = "001";
final int numRecords = 10;
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
@@ -138,8 +150,7 @@
JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
assertNoWriteErrors(writeStatues.collect());
- // Now tagLocation for these records, hbaseIndex should not tag them since it was a failed
- // commit
+ // Now tagLocation for these records, hbaseIndex should not tag them since commit never occurred
JavaRDD<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc(), hoodieTable);
assertEquals(0, records2.filter(record -> record.isCurrentLocationKnown()).count());