[HUDI-988] Fix More Unit Test Flakiness
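
Centralizes test resource management to cut down on cross-test leakage. The scattered per-suite teardown calls (cleanupSparkContexts, cleanupDFS, cleanupFileSystem, cleanupTestDataGenerator, cleanupClients, cleanupExecutorService) collapse into a single HoodieClientTestHarness.cleanupResources(); the harness renames its client field to writeClient and additionally tracks readClient, tableView and hoodieTable so each is closed deterministically after every test. A new HoodieTableFileSystemView.init(metaClient, timeline, fileStatuses) overload lets the harness reuse one view per test instead of allocating a fresh one per call. Import ordering and continuation indents are brought in line with checkstyle, a chatty timeline-service sync message drops from WARN to INFO, and the surefire heap is halved from 4g to 2g.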
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java b/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
index 8e94857..41fb16c 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
@@ -33,9 +33,9 @@
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.compact.OperationResult;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -67,13 +67,6 @@
     client = new CompactionAdminClient(jsc, basePath);
   }
 
-  @After
-  public void tearDown() {
-    client.close();
-    metaClient = null;
-    cleanupSparkContexts();
-  }
-
   @Test
   public void testUnscheduleCompactionPlan() throws Exception {
     int numEntriesPerInstant = 10;
@@ -273,10 +266,10 @@
         new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
     // Expect all file-slices whose base-commit equals the compaction commit to contain no new log files
     newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant, true)
-        .filter(fs -> fs.getBaseInstantTime().equals(compactionInstant)).forEach(fs -> {
-          Assert.assertFalse("No Data file must be present", fs.getBaseFile().isPresent());
-          Assert.assertEquals("No Log Files", 0, fs.getLogFiles().count());
-        });
+            .filter(fs -> fs.getBaseInstantTime().equals(compactionInstant)).forEach(fs -> {
+              Assert.assertFalse("No Data file must be present", fs.getBaseFile().isPresent());
+              Assert.assertEquals("No Log Files", 0, fs.getLogFiles().count());
+            });
 
     // Ensure same number of log-files before and after renaming per fileId
     Map<String, Long> fileIdToCountsAfterRenaming =
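
Note: the class-level tearDown above is removed outright; closing the admin client and cleaning up the Spark contexts is presumably left to the shared harness teardown introduced below. The remaining hunk only re-indents the lambda to the checkstyle continuation indent.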
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java b/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
index 8d3fa13..24ecc8e 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
@@ -63,9 +63,7 @@
 
   @After
   public void tearDown() throws Exception {
-    cleanupSparkContexts();
-    cleanupDFS();
-    cleanupTestDataGenerator();
+    cleanupResources();
   }
 
   protected HoodieWriteConfig getHoodieWriteConfig(String basePath) {
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
index ab6e940..de853f5 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
@@ -60,8 +60,7 @@
 
   @After
   public void tearDown() throws IOException {
-    cleanupSparkContexts();
-    cleanupFileSystem();
+    cleanupResources();
   }
 
   //@Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
index 4c7b890..54e6582 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
@@ -23,16 +23,21 @@
 import org.apache.hudi.common.minicluster.HdfsTestService;
 import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.FSUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.HoodieTable;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
 import org.slf4j.Logger;
@@ -58,7 +63,10 @@
   protected transient ExecutorService executorService;
   protected transient HoodieTableMetaClient metaClient;
   private static AtomicInteger instantGen = new AtomicInteger(1);
-  protected transient HoodieWriteClient client;
+  protected transient HoodieWriteClient writeClient;
+  protected transient HoodieReadClient readClient;
+  protected transient HoodieTableFileSystemView tableView;
+  protected transient HoodieTable hoodieTable;
 
   public String getNextInstant() {
     return String.format("%09d", instantGen.getAndIncrement());
@@ -89,6 +97,9 @@
     cleanupSparkContexts();
     cleanupTestDataGenerator();
     cleanupFileSystem();
+    cleanupDFS();
+    cleanupExecutorService();
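+    // Hint the JVM to reclaim the resources freed above between tests (best-effort flakiness mitigation).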
+    System.gc();
   }
 
   /**
@@ -156,6 +167,7 @@
     if (fs != null) {
       LOG.warn("Closing file-system instance used in previous test-run");
       fs.close();
+      fs = null;
     }
   }
 
@@ -175,13 +187,22 @@
   }
 
   /**
-   * Cleanups table type.
+   * Cleans up hoodie clients and the cached table view.
    */
-  protected void cleanupClients() {
-    metaClient = null;
-    if (null != client) {
-      client.close();
-      client = null;
+  protected void cleanupClients() throws IOException {
+    metaClient = null;
+    readClient = null;
+    if (writeClient != null) {
+      writeClient.close();
+      writeClient = null;
+    }
+    if (tableView != null) {
+      tableView.close();
+      tableView = null;
     }
   }
 
@@ -196,7 +217,9 @@
    * Cleanups test data generator.
    */
   protected void cleanupTestDataGenerator() {
-    dataGen = null;
+    if (dataGen != null) {
+      dataGen = null;
+    }
   }
 
   /**
@@ -272,16 +295,32 @@
   }
 
   public HoodieReadClient getHoodieReadClient(String basePath) {
-    return new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc()));
+    readClient = new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc()));
+    return readClient;
   }
 
   public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit,
       HoodieIndex index) {
-    if (null != client) {
-      client.close();
-      client = null;
+    if (null != writeClient) {
+      writeClient.close();
+      writeClient = null;
     }
-    client = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index);
-    return client;
+    writeClient = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index);
+    return writeClient;
+  }
+
+  public HoodieTableMetaClient getHoodieMetaClient(Configuration conf, String basePath) {
+    metaClient = new HoodieTableMetaClient(conf, basePath);
+    return metaClient;
+  }
+
+  public HoodieTableFileSystemView getHoodieTableFileSystemView(HoodieTableMetaClient metaClient,
+      HoodieTimeline visibleActiveTimeline, FileStatus[] fileStatuses) {
+    if (tableView == null) {
+      tableView = new HoodieTableFileSystemView(metaClient, visibleActiveTimeline, fileStatuses);
+    } else {
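+      // Re-initialize the cached view in place rather than allocating a new one for every call.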
+      tableView.init(metaClient, visibleActiveTimeline, fileStatuses);
+    }
+    return tableView;
   }
 }
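
With the harness owning the full client lifecycle, a migrated suite shrinks its setup/teardown to one call each. A minimal sketch of the resulting pattern (the test class name and test body are hypothetical; the exact contents of initResources()/cleanupResources() are inferred from the call sites in this patch):

    import java.io.IOException;

    import org.apache.hudi.client.HoodieWriteClient;
    import org.apache.hudi.common.HoodieClientTestHarness;
    import org.apache.hudi.config.HoodieWriteConfig;

    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;

    public class TestSomeHoodieFeature extends HoodieClientTestHarness {

      @Before
      public void setUp() throws Exception {
        initResources(); // Spark contexts, base path, meta client, test data generator, ...
      }

      @After
      public void tearDown() throws IOException {
        cleanupResources(); // closes writeClient/tableView, stops Spark and DFS, nulls references
      }

      @Test
      public void testSomething() throws Exception {
        HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath).build();
        // Obtained through the harness, so it is tracked in the writeClient field
        // and closed in cleanupResources().
        HoodieWriteClient client = getHoodieWriteClient(cfg);
      }
    }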
diff --git a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java
index 8fd418a..5a626c7 100644
--- a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java
@@ -52,7 +52,7 @@
 
   @After
   public void tearDown() throws Exception {
-    cleanupTestDataGenerator();
+    cleanupResources();
   }
 
   @Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
index acd2ec1..25f5a59 100644
--- a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
+++ b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
@@ -69,8 +69,7 @@
 
   @After
   public void tearDown() throws Exception {
-    cleanupTestDataGenerator();
-    cleanupExecutorService();
+    cleanupResources();
   }
 
   // Test to ensure that we are reading all records from queue iterator in the same order
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
index b97fefc..913ab7b 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
@@ -31,21 +31,20 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
+
 import static org.junit.Assert.assertTrue;
 
 public class TestHoodieIndex extends HoodieClientTestHarness {
 
   @Before
   public void setUp() throws Exception {
-    initSparkContexts("TestHoodieIndex");
-    initPath();
-    initMetaClient();
+    initResources();
   }
 
   @After
-  public void tearDown() {
-    cleanupSparkContexts();
-    cleanupClients();
+  public void tearDown() throws IOException {
+    cleanupResources();
   }
 
   @Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index 105b0e8..09b5782 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -105,9 +105,7 @@
 
   @After
   public void tearDown() throws Exception {
-    cleanupSparkContexts();
-    cleanupFileSystem();
-    cleanupClients();
+    cleanupResources();
   }
 
   private HoodieWriteConfig makeConfig() {
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index 55d4526..1065c23 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -78,9 +78,8 @@
   }
 
   @After
-  public void tearDown() {
-    cleanupSparkContexts();
-    cleanupClients();
+  public void tearDown() throws IOException {
+    cleanupResources();
   }
 
   @Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
index 0972385c..3409eea 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
@@ -67,8 +67,7 @@
 
   @After
   public void clean() throws IOException {
-    cleanupDFS();
-    cleanupSparkContexts();
+    cleanupResources();
   }
 
   @Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
index 7fd02bc..fe99816 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
@@ -65,11 +65,7 @@
 
   @After
   public void tearDown() throws Exception {
-    cleanupFileSystem();
-    cleanupTestDataGenerator();
-    cleanupSparkContexts();
-    cleanupClients();
-    cleanupFileSystem();
+    cleanupResources();
   }
 
   @Test
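
(The old tearDown here invoked cleanupFileSystem() twice; exactly the kind of per-suite drift the shared cleanupResources() eliminates.)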
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java b/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java
index cc78a64..f5baf37 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java
@@ -42,7 +42,7 @@
 
   @After
   public void tearDown() throws Exception {
-    cleanupFileSystem();
+    cleanupResources();
   }
 
   @Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
index 99d9b2b..7f1e538 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
@@ -18,8 +18,8 @@
 
 package org.apache.hudi.table;
 
-import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.common.HoodieClientTestUtils;
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.TestRawTripPayload;
@@ -219,7 +219,7 @@
       if (file.getName().endsWith(".parquet")) {
         if (FSUtils.getFileId(file.getName()).equals(FSUtils.getFileId(parquetFile.getName()))
             && HoodieTimeline.compareTimestamps(FSUtils.getCommitTime(file.getName()),
-                FSUtils.getCommitTime(parquetFile.getName()), HoodieTimeline.GREATER)) {
+            FSUtils.getCommitTime(parquetFile.getName()), HoodieTimeline.GREATER)) {
           updatedParquetFile = file;
           break;
         }
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index fdc8b27..3cbc48e 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -18,17 +18,17 @@
 
 package org.apache.hudi.table;
 
-import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.common.HoodieClientTestUtils;
 import org.apache.hudi.common.HoodieMergeOnReadTestUtils;
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.TestRawTripPayload.MetadataMergeWriteStatus;
 import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -94,10 +94,7 @@
 
   @After
   public void clean() throws IOException {
-    cleanupDFS();
-    cleanupSparkContexts();
-    cleanupTestDataGenerator();
-    cleanupClients();
+    cleanupResources();
   }
 
   @Test
@@ -128,13 +125,14 @@
       assertFalse(commit.isPresent());
 
       FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+
       BaseFileOnlyView roView =
           new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
       Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
       assertTrue(!dataFilesToRead.findAny().isPresent());
 
-      roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
-      dataFilesToRead = roView.getLatestBaseFiles();
+      tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      dataFilesToRead = tableView.getLatestBaseFiles();
       assertTrue("should list the parquet files we wrote in the delta commit",
           dataFilesToRead.findAny().isPresent());
 
@@ -167,8 +165,9 @@
       client.compact(compactionCommitTime);
 
       allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
-      roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
-      dataFilesToRead = roView.getLatestBaseFiles();
+      hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
+      tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      dataFilesToRead = tableView.getLatestBaseFiles();
       assertTrue(dataFilesToRead.findAny().isPresent());
 
       // verify that there is a commit
@@ -225,7 +224,7 @@
       List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
       assertNoWriteErrors(statuses);
 
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+      HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
       HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
 
       Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -236,13 +235,12 @@
       assertFalse(commit.isPresent());
 
       FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
-      BaseFileOnlyView roView =
-          new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
-      Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
+      tableView =
+          getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+      Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
       assertFalse(dataFilesToRead.findAny().isPresent());
 
-      roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
-      dataFilesToRead = roView.getLatestBaseFiles();
+      tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      dataFilesToRead = tableView.getLatestBaseFiles();
       assertTrue("should list the parquet files we wrote in the delta commit",
           dataFilesToRead.findAny().isPresent());
 
@@ -278,11 +276,11 @@
       assertFalse(commit.isPresent());
 
       allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
-      roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
-      dataFilesToRead = roView.getLatestBaseFiles();
+      tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      dataFilesToRead = tableView.getLatestBaseFiles();
       assertTrue(dataFilesToRead.findAny().isPresent());
 
-      List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+      List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
       List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
       // Wrote 20 records and deleted 20 records, so remaining 20-20 = 0
       assertEquals("Must contain 0 records", 0, recordsRead.size());
@@ -311,7 +309,7 @@
       // verify there are no errors
       assertNoWriteErrors(statuses);
 
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+      HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
       Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
       assertTrue(commit.isPresent());
       assertEquals("commit should be 001", "001", commit.get().getTimestamp());
@@ -337,11 +335,10 @@
       metaClient = HoodieTableMetaClient.reload(metaClient);
       HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
       FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
-      HoodieTableFileSystemView roView =
-          new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
 
       final String absentCommit = newCommitTime;
-      assertFalse(roView.getLatestBaseFiles().anyMatch(file -> absentCommit.equals(file.getCommitTime())));
+      assertFalse(tableView.getLatestBaseFiles().anyMatch(file -> absentCommit.equals(file.getCommitTime())));
     }
   }
 
@@ -366,7 +363,7 @@
       List<WriteStatus> statuses = writeStatusJavaRDD.collect();
       assertNoWriteErrors(statuses);
 
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+      HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
       HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
 
       Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -377,13 +374,13 @@
       assertFalse(commit.isPresent());
 
       FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
-      BaseFileOnlyView roView =
-          new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
-      Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
+      tableView =
+          getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+      Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
       assertTrue(!dataFilesToRead.findAny().isPresent());
 
-      roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
-      dataFilesToRead = roView.getLatestBaseFiles();
+      tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      dataFilesToRead = tableView.getLatestBaseFiles();
       assertTrue("should list the parquet files we wrote in the delta commit",
           dataFilesToRead.findAny().isPresent());
 
@@ -399,7 +396,7 @@
         copyOfRecords = dataGen.generateUpdates(commitTime1, copyOfRecords);
         copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200));
 
-        List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+        List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
         List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
         assertEquals(recordsRead.size(), 200);
 
@@ -413,7 +410,7 @@
         // After rollback, there should be no parquet file with the failed commit time
         Assert.assertEquals(Arrays.stream(allFiles)
             .filter(file -> file.getPath().getName().contains(commitTime1)).count(), 0);
-        dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+        dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
         recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
         assertEquals(recordsRead.size(), 200);
       }
@@ -429,7 +426,7 @@
         copyOfRecords = dataGen.generateUpdates(commitTime2, copyOfRecords);
         copyOfRecords.addAll(dataGen.generateInserts(commitTime2, 200));
 
-        List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+        List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
         List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
         assertEquals(recordsRead.size(), 200);
 
@@ -449,8 +446,8 @@
 
         metaClient = HoodieTableMetaClient.reload(metaClient);
         hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
-        roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
-        dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+        tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+        dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
         recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
         // check that the number of records read is still correct after rollback operation
         assertEquals(recordsRead.size(), 200);
@@ -476,20 +473,20 @@
 
         allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
         metaClient = HoodieTableMetaClient.reload(metaClient);
-        roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
+        tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
 
         final String compactedCommitTime =
             metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get().getTimestamp();
 
-        assertTrue(roView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
+        assertTrue(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
 
         thirdClient.rollback(compactedCommitTime);
 
         allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
         metaClient = HoodieTableMetaClient.reload(metaClient);
-        roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
+        tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
 
-        assertFalse(roView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
+        assertFalse(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
       }
     }
   }
@@ -513,7 +510,7 @@
       List<WriteStatus> statuses = writeStatusJavaRDD.collect();
       assertNoWriteErrors(statuses);
 
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+      HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
       HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
 
       Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -524,13 +521,12 @@
       assertFalse(commit.isPresent());
 
       FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
-      BaseFileOnlyView roView =
-          new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
-      Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
+      tableView =
+          getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+      Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
       assertFalse(dataFilesToRead.findAny().isPresent());
 
-      roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
-      dataFilesToRead = roView.getLatestBaseFiles();
+      tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      dataFilesToRead = tableView.getLatestBaseFiles();
       assertTrue("Should list the parquet files we wrote in the delta commit",
           dataFilesToRead.findAny().isPresent());
 
@@ -546,7 +542,7 @@
       copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords);
       copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200));
 
-      List<String> dataFiles = roView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
+      List<String> dataFiles = tableView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
       List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
       assertEquals(recordsRead.size(), 200);
 
@@ -604,12 +600,12 @@
 
       allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
+      tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
 
       final String compactedCommitTime =
           metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get().getTimestamp();
 
-      assertTrue(roView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
+      assertTrue(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
 
       /**
        * Write 5 (updates)
@@ -631,12 +627,10 @@
 
       metaClient = HoodieTableMetaClient.reload(metaClient);
       allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
-      roView =
-          new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
-      dataFilesToRead = roView.getLatestBaseFiles();
+      tableView =
+          getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+      dataFilesToRead = tableView.getLatestBaseFiles();
       assertFalse(dataFilesToRead.findAny().isPresent());
-      SliceView rtView =
-          new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+      SliceView rtView =
+          getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
       List<HoodieFileGroup> fileGroups =
           ((HoodieTableFileSystemView) rtView).getAllFileGroups().collect(Collectors.toList());
       assertTrue(fileGroups.isEmpty());
@@ -678,7 +672,7 @@
       List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
       assertNoWriteErrors(statuses);
 
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+      HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
       HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
 
       Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -689,13 +683,13 @@
       assertFalse(commit.isPresent());
 
       FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
-      BaseFileOnlyView roView = new HoodieTableFileSystemView(metaClient,
+      BaseFileOnlyView roView = getHoodieTableFileSystemView(metaClient,
           metaClient.getCommitsTimeline().filterCompletedInstants(), allFiles);
       Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
       Map<String, Long> parquetFileIdToSize =
           dataFilesToRead.collect(Collectors.toMap(HoodieBaseFile::getFileId, HoodieBaseFile::getFileSize));
 
-      roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      roView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
       dataFilesToRead = roView.getLatestBaseFiles();
       List<HoodieBaseFile> dataFilesList = dataFilesToRead.collect(Collectors.toList());
       assertTrue("Should list the parquet files we wrote in the delta commit",
@@ -723,7 +717,7 @@
       assertFalse(commit.isPresent());
 
       allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
-      roView = new HoodieTableFileSystemView(metaClient,
+      roView = getHoodieTableFileSystemView(metaClient,
           hoodieTable.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(), allFiles);
       dataFilesToRead = roView.getLatestBaseFiles();
       List<HoodieBaseFile> newDataFilesList = dataFilesToRead.collect(Collectors.toList());
@@ -752,7 +746,7 @@
       writeClient.insert(recordsRDD, newCommitTime).collect();
 
       // Update all the 100 records
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+      HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), basePath);
 
       newCommitTime = "101";
       writeClient.startCommitWithTime(newCommitTime);
@@ -887,7 +881,7 @@
       // We will test HUDI-204 here. We will simulate rollback happening twice by copying the commit file to local fs
       // and calling rollback twice
       final String lastCommitTime = newCommitTime;
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+      HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), basePath);
       HoodieInstant last = metaClient.getCommitsTimeline().getInstants()
           .filter(instant -> instant.getTimestamp().equals(lastCommitTime)).findFirst().get();
       String fileName = last.getFileName();
@@ -980,7 +974,7 @@
 
     HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+      HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), basePath);
       HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
       // Create a commit without rolling stats in metadata to test backwards compatibility
       HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java
index 1a366a4..2a19d2c 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java
@@ -542,7 +542,7 @@
   private List<HoodieBaseFile> getCurrentLatestDataFiles(HoodieTable table, HoodieWriteConfig cfg) throws IOException {
     FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath());
     HoodieTableFileSystemView view =
-        new HoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles);
+        getHoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles);
     return view.getLatestBaseFiles().collect(Collectors.toList());
   }
 
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
index 86a2e1f..3e36d43 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
@@ -75,10 +75,7 @@
 
   @After
   public void tearDown() throws Exception {
-    cleanupFileSystem();
-    cleanupTestDataGenerator();
-    cleanupSparkContexts();
-    cleanupClients();
+    cleanupResources();
   }
 
   private HoodieWriteConfig getConfig() {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
index 1f7165b..3ba021b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
@@ -90,6 +90,12 @@
     super.init(metaClient, visibleActiveTimeline);
   }
 
+  public void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline,
+      FileStatus[] fileStatuses) {
+    init(metaClient, visibleActiveTimeline);
+    addFilesToView(fileStatuses);
+  }
+
   @Override
   protected void resetViewState() {
     this.fgIdToPendingCompaction = null;
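
This overload is what backs the harness's getHoodieTableFileSystemView(...) helper above: the first call constructs the view, later calls re-initialize the same instance against a fresh timeline and file listing, so a long test method no longer accumulates a new view's worth of file-group state on every assertion block.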
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java
index 5538d66..4410193 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java
@@ -117,7 +117,7 @@
       synchronized (view) {
         if (isLocalViewBehind(ctx)) {
           HoodieTimeline localTimeline = viewManager.getFileSystemView(basePath).getTimeline();
-          LOG.warn("Syncing view as client passed last known instant " + lastKnownInstantFromClient
+          LOG.info("Syncing view as client passed last known instant " + lastKnownInstantFromClient
               + " as last known instant but server has the folling timeline :"
               + localTimeline.getInstants().collect(Collectors.toList()));
           view.sync();
diff --git a/pom.xml b/pom.xml
index 6370c37..f1081b8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -242,7 +242,7 @@
         <version>${maven-surefire-plugin.version}</version>
         <configuration>
           <skip>${skipUTs}</skip>
-          <argLine>-Xmx4g</argLine>
+          <argLine>-Xmx2g</argLine>
           <systemPropertyVariables>
             <log4j.configuration>
               ${surefire-log4j.file}
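
The heap reduction rides on the cleanup work above: with clients, views, executors and the mini-DFS cluster released after every test (plus the explicit GC hint), surefire forks should fit comfortably in 2g, and a reintroduced leak now surfaces quickly as an OOM instead of silently slowing the build.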