[HUDI-988] Fix More Unit Test Flakiness
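Unit tests were leaking Spark contexts, MiniDFS clusters, write/read clients
and file-system views because every test class rolled its own tearDown()
logic. This change centralizes resource lifecycle management in
HoodieClientTestHarness: tests now call a single cleanupResources() in @After,
the harness caches the HoodieWriteClient, HoodieReadClient,
HoodieTableMetaClient and HoodieTableFileSystemView instances it hands out so
cleanupClients() can close them deterministically, and
HoodieTableFileSystemView gains an init(metaClient, timeline, fileStatuses)
overload so one view instance can be re-initialized instead of constructing a
new one per assertion. With the leaks gone, the surefire fork heap drops from
-Xmx4g to -Xmx2g.

The patch shows only fragments of the consolidated teardown; judging from the
hunks below, cleanupResources() plausibly reads as follows (a sketch, not
verbatim from this patch):

    // Sketch only -- reconstructed from the hunks in this diff.
    // Consolidated teardown in HoodieClientTestHarness; per-test tearDown()
    // methods now delegate here instead of invoking individual cleanups.
    public void cleanupResources() throws IOException {
      cleanupClients();           // closes writeClient, releases tableView
      cleanupSparkContexts();
      cleanupTestDataGenerator();
      cleanupFileSystem();
      cleanupDFS();               // added: shuts down the MiniDFSCluster if one was started
      cleanupExecutorService();   // added: stops any leftover executor service
      System.gc();                // added: encourage collection between test classes
    }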
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java b/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
index 8e94857..41fb16c 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
@@ -33,9 +33,9 @@
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.compact.OperationResult;
+
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -67,13 +67,6 @@
client = new CompactionAdminClient(jsc, basePath);
}
- @After
- public void tearDown() {
- client.close();
- metaClient = null;
- cleanupSparkContexts();
- }
-
@Test
public void testUnscheduleCompactionPlan() throws Exception {
int numEntriesPerInstant = 10;
@@ -273,10 +266,10 @@
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
// Expect all file-slice whose base-commit is same as compaction commit to contain no new Log files
newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant, true)
- .filter(fs -> fs.getBaseInstantTime().equals(compactionInstant)).forEach(fs -> {
- Assert.assertFalse("No Data file must be present", fs.getBaseFile().isPresent());
- Assert.assertEquals("No Log Files", 0, fs.getLogFiles().count());
- });
+ .filter(fs -> fs.getBaseInstantTime().equals(compactionInstant)).forEach(fs -> {
+ Assert.assertFalse("No Data file must be present", fs.getBaseFile().isPresent());
+ Assert.assertEquals("No Log Files", 0, fs.getLogFiles().count());
+ });
// Ensure same number of log-files before and after renaming per fileId
Map<String, Long> fileIdToCountsAfterRenaming =
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java b/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
index 8d3fa13..24ecc8e 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
@@ -63,9 +63,7 @@
@After
public void tearDown() throws Exception {
- cleanupSparkContexts();
- cleanupDFS();
- cleanupTestDataGenerator();
+ cleanupResources();
}
protected HoodieWriteConfig getHoodieWriteConfig(String basePath) {
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
index ab6e940..de853f5 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
@@ -60,8 +60,7 @@
@After
public void tearDown() throws IOException {
- cleanupSparkContexts();
- cleanupFileSystem();
+ cleanupResources();
}
//@Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
index 4c7b890..54e6582 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
@@ -23,16 +23,21 @@
import org.apache.hudi.common.minicluster.HdfsTestService;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.table.HoodieTable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.slf4j.Logger;
@@ -58,7 +63,10 @@
protected transient ExecutorService executorService;
protected transient HoodieTableMetaClient metaClient;
private static AtomicInteger instantGen = new AtomicInteger(1);
- protected transient HoodieWriteClient client;
+ protected transient HoodieWriteClient writeClient;
+ protected transient HoodieReadClient readClient;
+ protected transient HoodieTableFileSystemView tableView;
+ protected transient HoodieTable hoodieTable;
public String getNextInstant() {
return String.format("%09d", instantGen.getAndIncrement());
@@ -89,6 +97,9 @@
cleanupSparkContexts();
cleanupTestDataGenerator();
cleanupFileSystem();
+ cleanupDFS();
+ cleanupExecutorService();
+ System.gc();
}
/**
@@ -156,6 +167,7 @@
if (fs != null) {
LOG.warn("Closing file-system instance used in previous test-run");
fs.close();
+ fs = null;
}
}
@@ -175,13 +187,22 @@
}
/**
- * Cleanups table type.
+ * Cleans up hoodie clients.
*/
- protected void cleanupClients() {
- metaClient = null;
- if (null != client) {
- client.close();
- client = null;
+ protected void cleanupClients() throws IOException {
+ if (metaClient != null) {
+ metaClient = null;
+ }
+ if (readClient != null) {
+ readClient = null;
+ }
+ if (writeClient != null) {
+ writeClient.close();
+ writeClient = null;
+ }
+ if (tableView != null) {
+ tableView.close();
+ tableView = null;
}
}
@@ -196,7 +217,9 @@
* Cleanups test data generator.
*/
protected void cleanupTestDataGenerator() {
- dataGen = null;
+ if (dataGen != null) {
+ dataGen = null;
+ }
}
/**
@@ -272,16 +295,32 @@
}
public HoodieReadClient getHoodieReadClient(String basePath) {
- return new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc()));
+ readClient = new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc()));
+ return readClient;
}
public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit,
HoodieIndex index) {
- if (null != client) {
- client.close();
- client = null;
+ if (null != writeClient) {
+ writeClient.close();
+ writeClient = null;
}
- client = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index);
- return client;
+ writeClient = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index);
+ return writeClient;
+ }
+
+ public HoodieTableMetaClient getHoodieMetaClient(Configuration conf, String basePath) {
+ metaClient = new HoodieTableMetaClient(conf, basePath);
+ return metaClient;
+ }
+
+ public HoodieTableFileSystemView getHoodieTableFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline,
+ FileStatus[] fileStatuses) {
+ if (tableView == null) {
+ tableView = new HoodieTableFileSystemView(metaClient, visibleActiveTimeline, fileStatuses);
+ } else {
+ tableView.init(metaClient, visibleActiveTimeline, fileStatuses);
+ }
+ return tableView;
}
}
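
With the harness now tracking every client and view it hands out, tests are
expected to acquire resources through the accessors above rather than
constructing them inline, so cleanupResources() can close everything in
@After. A hypothetical usage sketch (getConfigBuilder() is assumed from the
existing tests, not introduced by this patch):

    // Hypothetical test body -- every resource comes from the harness.
    HoodieWriteConfig cfg = getConfigBuilder().build();    // assumed helper
    HoodieWriteClient client = getHoodieWriteClient(cfg);  // cached as this.writeClient
    HoodieTableMetaClient meta = getHoodieMetaClient(jsc.hadoopConfiguration(), basePath);
    FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(meta.getFs(), basePath);
    // One shared view; later calls re-initialize it instead of leaking a new instance.
    HoodieTableFileSystemView view = getHoodieTableFileSystemView(meta,
        meta.getCommitsTimeline().filterCompletedInstants(), allFiles);
    assertTrue(view.getLatestBaseFiles().findAny().isPresent());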
diff --git a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java
index 8fd418a..5a626c7 100644
--- a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java
@@ -52,7 +52,7 @@
@After
public void tearDown() throws Exception {
- cleanupTestDataGenerator();
+ cleanupResources();
}
@Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
index acd2ec1..25f5a59 100644
--- a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
+++ b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
@@ -69,8 +69,7 @@
@After
public void tearDown() throws Exception {
- cleanupTestDataGenerator();
- cleanupExecutorService();
+ cleanupResources();
}
// Test to ensure that we are reading all records from queue iterator in the same order
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
index b97fefc..913ab7b 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
@@ -31,21 +31,20 @@
import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
+
import static org.junit.Assert.assertTrue;
public class TestHoodieIndex extends HoodieClientTestHarness {
@Before
public void setUp() throws Exception {
- initSparkContexts("TestHoodieIndex");
- initPath();
- initMetaClient();
+ initResources();
}
@After
- public void tearDown() {
- cleanupSparkContexts();
- cleanupClients();
+ public void tearDown() throws IOException {
+ cleanupResources();
}
@Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index 105b0e8..09b5782 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -105,9 +105,7 @@
@After
public void tearDown() throws Exception {
- cleanupSparkContexts();
- cleanupFileSystem();
- cleanupClients();
+ cleanupResources();
}
private HoodieWriteConfig makeConfig() {
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index 55d4526..1065c23 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -78,9 +78,8 @@
}
@After
- public void tearDown() {
- cleanupSparkContexts();
- cleanupClients();
+ public void tearDown() throws IOException {
+ cleanupResources();
}
@Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
index 0972385c..3409eea 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
@@ -67,8 +67,7 @@
@After
public void clean() throws IOException {
- cleanupDFS();
- cleanupSparkContexts();
+ cleanupResources();
}
@Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
index 7fd02bc..fe99816 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
@@ -65,11 +65,7 @@
@After
public void tearDown() throws Exception {
- cleanupFileSystem();
- cleanupTestDataGenerator();
- cleanupSparkContexts();
- cleanupClients();
- cleanupFileSystem();
+ cleanupResources();
}
@Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java b/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java
index cc78a64..f5baf37 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java
@@ -42,7 +42,7 @@
@After
public void tearDown() throws Exception {
- cleanupFileSystem();
+ cleanupResources();
}
@Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
index 99d9b2b..7f1e538 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
@@ -18,8 +18,8 @@
package org.apache.hudi.table;
-import org.apache.hudi.common.HoodieClientTestHarness;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.HoodieClientTestHarness;
import org.apache.hudi.common.HoodieClientTestUtils;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.TestRawTripPayload;
@@ -219,7 +219,7 @@
if (file.getName().endsWith(".parquet")) {
if (FSUtils.getFileId(file.getName()).equals(FSUtils.getFileId(parquetFile.getName()))
&& HoodieTimeline.compareTimestamps(FSUtils.getCommitTime(file.getName()),
- FSUtils.getCommitTime(parquetFile.getName()), HoodieTimeline.GREATER)) {
+ FSUtils.getCommitTime(parquetFile.getName()), HoodieTimeline.GREATER)) {
updatedParquetFile = file;
break;
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index fdc8b27..3cbc48e 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -18,17 +18,17 @@
package org.apache.hudi.table;
-import org.apache.hudi.common.HoodieClientTestHarness;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.HoodieClientTestHarness;
import org.apache.hudi.common.HoodieClientTestUtils;
import org.apache.hudi.common.HoodieMergeOnReadTestUtils;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.TestRawTripPayload.MetadataMergeWriteStatus;
import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -94,10 +94,7 @@
@After
public void clean() throws IOException {
- cleanupDFS();
- cleanupSparkContexts();
- cleanupTestDataGenerator();
- cleanupClients();
+ cleanupResources();
}
@Test
@@ -128,13 +125,14 @@
assertFalse(commit.isPresent());
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+
BaseFileOnlyView roView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
assertTrue(!dataFilesToRead.findAny().isPresent());
- roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
- dataFilesToRead = roView.getLatestBaseFiles();
+ tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+ dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue("should list the parquet files we wrote in the delta commit",
dataFilesToRead.findAny().isPresent());
@@ -167,8 +165,9 @@
client.compact(compactionCommitTime);
allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
- roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
- dataFilesToRead = roView.getLatestBaseFiles();
+ hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
+ tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+ dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent());
// verify that there is a commit
@@ -225,7 +224,7 @@
List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
assertNoWriteErrors(statuses);
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -236,13 +235,12 @@
assertFalse(commit.isPresent());
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
- BaseFileOnlyView roView =
- new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
- Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
+ tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+ Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
assertFalse(dataFilesToRead.findAny().isPresent());
- roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
- dataFilesToRead = roView.getLatestBaseFiles();
+ tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+ dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue("should list the parquet files we wrote in the delta commit",
dataFilesToRead.findAny().isPresent());
@@ -278,11 +276,11 @@
assertFalse(commit.isPresent());
allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
- roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
- dataFilesToRead = roView.getLatestBaseFiles();
+ tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+ dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent());
- List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+ List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
// Wrote 20 records and deleted 20 records, so remaining 20-20 = 0
assertEquals("Must contain 0 records", 0, recordsRead.size());
@@ -311,7 +309,7 @@
// verify there are no errors
assertNoWriteErrors(statuses);
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertTrue(commit.isPresent());
assertEquals("commit should be 001", "001", commit.get().getTimestamp());
@@ -337,11 +335,10 @@
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
- HoodieTableFileSystemView roView =
- new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+ tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
final String absentCommit = newCommitTime;
- assertFalse(roView.getLatestBaseFiles().anyMatch(file -> absentCommit.equals(file.getCommitTime())));
+ assertFalse(tableView.getLatestBaseFiles().anyMatch(file -> absentCommit.equals(file.getCommitTime())));
}
}
@@ -366,7 +363,7 @@
List<WriteStatus> statuses = writeStatusJavaRDD.collect();
assertNoWriteErrors(statuses);
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -377,13 +374,13 @@
assertFalse(commit.isPresent());
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
- BaseFileOnlyView roView =
- new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
- Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
+ tableView =
+ getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+ Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue(!dataFilesToRead.findAny().isPresent());
- roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
- dataFilesToRead = roView.getLatestBaseFiles();
+ tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+ dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue("should list the parquet files we wrote in the delta commit",
dataFilesToRead.findAny().isPresent());
@@ -399,7 +396,7 @@
copyOfRecords = dataGen.generateUpdates(commitTime1, copyOfRecords);
copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200));
- List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+ List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
assertEquals(recordsRead.size(), 200);
@@ -413,7 +410,7 @@
// After rollback, there should be no parquet file with the failed commit time
Assert.assertEquals(Arrays.stream(allFiles)
.filter(file -> file.getPath().getName().contains(commitTime1)).count(), 0);
- dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+ dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
assertEquals(recordsRead.size(), 200);
}
@@ -429,7 +426,7 @@
copyOfRecords = dataGen.generateUpdates(commitTime2, copyOfRecords);
copyOfRecords.addAll(dataGen.generateInserts(commitTime2, 200));
- List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+ List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
assertEquals(recordsRead.size(), 200);
@@ -449,8 +446,8 @@
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
- roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
- dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+ tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+ dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
// check that the number of records read is still correct after rollback operation
assertEquals(recordsRead.size(), 200);
@@ -476,20 +473,20 @@
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient);
- roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
+ tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
final String compactedCommitTime =
metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get().getTimestamp();
- assertTrue(roView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
+ assertTrue(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
thirdClient.rollback(compactedCommitTime);
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient);
- roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
+ tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
- assertFalse(roView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
+ assertFalse(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
}
}
}
@@ -513,7 +510,7 @@
List<WriteStatus> statuses = writeStatusJavaRDD.collect();
assertNoWriteErrors(statuses);
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -524,13 +521,12 @@
assertFalse(commit.isPresent());
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
- BaseFileOnlyView roView =
- new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
- Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
+ tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+ Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
assertFalse(dataFilesToRead.findAny().isPresent());
- roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
- dataFilesToRead = roView.getLatestBaseFiles();
+ tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+ dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue("Should list the parquet files we wrote in the delta commit",
dataFilesToRead.findAny().isPresent());
@@ -546,7 +542,7 @@
copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords);
copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200));
- List<String> dataFiles = roView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
+ List<String> dataFiles = tableView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
assertEquals(recordsRead.size(), 200);
@@ -604,12 +600,12 @@
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient);
- roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
+ tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
final String compactedCommitTime =
metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get().getTimestamp();
- assertTrue(roView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
+ assertTrue(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
/**
* Write 5 (updates)
@@ -631,12 +627,10 @@
metaClient = HoodieTableMetaClient.reload(metaClient);
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
- roView =
- new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
- dataFilesToRead = roView.getLatestBaseFiles();
+ tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+ dataFilesToRead = tableView.getLatestBaseFiles();
assertFalse(dataFilesToRead.findAny().isPresent());
- SliceView rtView =
- new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+ SliceView rtView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
List<HoodieFileGroup> fileGroups =
((HoodieTableFileSystemView) rtView).getAllFileGroups().collect(Collectors.toList());
assertTrue(fileGroups.isEmpty());
@@ -678,7 +672,7 @@
List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
assertNoWriteErrors(statuses);
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -689,13 +683,13 @@
assertFalse(commit.isPresent());
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
- BaseFileOnlyView roView = new HoodieTableFileSystemView(metaClient,
+ BaseFileOnlyView roView = getHoodieTableFileSystemView(metaClient,
metaClient.getCommitsTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
Map<String, Long> parquetFileIdToSize =
dataFilesToRead.collect(Collectors.toMap(HoodieBaseFile::getFileId, HoodieBaseFile::getFileSize));
- roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+ roView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFilesToRead = roView.getLatestBaseFiles();
List<HoodieBaseFile> dataFilesList = dataFilesToRead.collect(Collectors.toList());
assertTrue("Should list the parquet files we wrote in the delta commit",
@@ -723,7 +717,7 @@
assertFalse(commit.isPresent());
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
- roView = new HoodieTableFileSystemView(metaClient,
+ roView = getHoodieTableFileSystemView(metaClient,
hoodieTable.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(), allFiles);
dataFilesToRead = roView.getLatestBaseFiles();
List<HoodieBaseFile> newDataFilesList = dataFilesToRead.collect(Collectors.toList());
@@ -752,7 +746,7 @@
writeClient.insert(recordsRDD, newCommitTime).collect();
// Update all the 100 records
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), basePath);
newCommitTime = "101";
writeClient.startCommitWithTime(newCommitTime);
@@ -887,7 +881,7 @@
// We will test HUDI-204 here. We will simulate rollback happening twice by copying the commit file to local fs
// and calling rollback twice
final String lastCommitTime = newCommitTime;
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieInstant last = metaClient.getCommitsTimeline().getInstants()
.filter(instant -> instant.getTimestamp().equals(lastCommitTime)).findFirst().get();
String fileName = last.getFileName();
@@ -980,7 +974,7 @@
HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
// Create a commit without rolling stats in metadata to test backwards compatibility
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java
index 1a366a4..2a19d2c 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java
@@ -542,7 +542,7 @@
private List<HoodieBaseFile> getCurrentLatestDataFiles(HoodieTable table, HoodieWriteConfig cfg) throws IOException {
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath());
HoodieTableFileSystemView view =
- new HoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles);
+ getHoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles);
return view.getLatestBaseFiles().collect(Collectors.toList());
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
index 86a2e1f..3e36d43 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
@@ -75,10 +75,7 @@
@After
public void tearDown() throws Exception {
- cleanupFileSystem();
- cleanupTestDataGenerator();
- cleanupSparkContexts();
- cleanupClients();
+ cleanupResources();
}
private HoodieWriteConfig getConfig() {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
index 1f7165b..3ba021b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
@@ -90,6 +90,12 @@
super.init(metaClient, visibleActiveTimeline);
}
+ public void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline,
+ FileStatus[] fileStatuses) {
+ init(metaClient, visibleActiveTimeline);
+ addFilesToView(fileStatuses);
+ }
+
@Override
protected void resetViewState() {
this.fgIdToPendingCompaction = null;
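
The new overload lets callers re-point an existing view at a fresh timeline
and file listing instead of allocating a new view each time; the harness
getter above follows exactly this construct-once, re-init-later pattern. A
minimal sketch of the intended use, with identifiers as in the tests above:

    // First use: build the view over the current file listing.
    HoodieTableFileSystemView view =
        new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
    // After further commits: refresh in place rather than leak a second view.
    metaClient = HoodieTableMetaClient.reload(metaClient);
    allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
    view.init(metaClient, metaClient.getCommitsTimeline(), allFiles);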
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java
index 5538d66..4410193 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java
@@ -117,7 +117,7 @@
synchronized (view) {
if (isLocalViewBehind(ctx)) {
HoodieTimeline localTimeline = viewManager.getFileSystemView(basePath).getTimeline();
- LOG.warn("Syncing view as client passed last known instant " + lastKnownInstantFromClient
+ LOG.info("Syncing view as client passed last known instant " + lastKnownInstantFromClient
+ " as last known instant but server has the folling timeline :"
+ localTimeline.getInstants().collect(Collectors.toList()));
view.sync();
diff --git a/pom.xml b/pom.xml
index 6370c37..f1081b8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -242,7 +242,7 @@
<version>${maven-surefire-plugin.version}</version>
<configuration>
<skip>${skipUTs}</skip>
- <argLine>-Xmx4g</argLine>
+ <argLine>-Xmx2g</argLine>
<systemPropertyVariables>
<log4j.configuration>
${surefire-log4j.file}