[HUDI-544] Archived commits command code cleanup (#1242)

* Archived commits command code cleanup
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
index c531eea..1dc925b 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
@@ -25,6 +25,7 @@
 import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
@@ -63,7 +64,7 @@
       throws IOException {
     System.out.println("===============> Showing only " + limit + " archived commits <===============");
     String basePath = HoodieCLI.getTableMetaClient().getBasePath();
-    Path archivePath = new Path(basePath + "/.hoodie/.commits_.archive*");
+    Path archivePath = new Path(HoodieCLI.getTableMetaClient().getArchivePath() + "/.commits_.archive*");
     if (folder != null && !folder.isEmpty()) {
       archivePath = new Path(basePath + "/.hoodie/" + folder);
     }
@@ -138,9 +139,11 @@
       throws IOException {
 
     System.out.println("===============> Showing only " + limit + " archived commits <===============");
-    String basePath = HoodieCLI.getTableMetaClient().getBasePath();
+    HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
+    String basePath = metaClient.getBasePath();
+    Path archivePath = new Path(metaClient.getArchivePath() + "/.commits_.archive*");
     FileStatus[] fsStatuses =
-        FSUtils.getFs(basePath, HoodieCLI.conf).globStatus(new Path(basePath + "/.hoodie/.commits_.archive*"));
+        FSUtils.getFs(basePath, HoodieCLI.conf).globStatus(archivePath);
     List<Comparable[]> allCommits = new ArrayList<>();
     for (FileStatus fs : fsStatuses) {
       // read the archived file
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
index aba1d54..e432f9d 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
@@ -132,6 +132,13 @@
     String hdfsPath = "/" + hiveTableName;
     String hdfsUrl = HDFS_BASE_URL + hdfsPath;
 
+    // Delete hdfs path if it exists
+    try {
+      executeCommandStringInDocker(ADHOC_1_CONTAINER, "hdfs dfs -rm -r " + hdfsUrl, true);
+    } catch (AssertionError ex) {
+      // Path not exists, pass
+    }
+
     // Drop Table if it exists
     try {
       dropHiveTables(hiveTableName, tableType);
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 450bd73..8cab7c1 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -29,8 +29,11 @@
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.client.{HoodieWriteClient, WriteStatus}
 import org.apache.hudi.common.config.TypedProperties
-import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType}
-import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.model.HoodieRecordPayload
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.model.WriteOperationType
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.util.ReflectionUtils
 import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, BOOTSTRAP_INDEX_CLASS_PROP, DEFAULT_BOOTSTRAP_INDEX_CLASS}
@@ -107,8 +110,10 @@
       handleSaveModes(mode, basePath, tableConfig, tblName, operation, fs)
       // Create the table if not present
       if (!tableExists) {
+        val archiveLogFolder = parameters.getOrElse(
+          HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived")
         val tableMetaClient = HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get,
-          HoodieTableType.valueOf(tableType), tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY),
+          HoodieTableType.valueOf(tableType), tblName, archiveLogFolder, parameters(PAYLOAD_CLASS_OPT_KEY),
           null.asInstanceOf[String])
         tableConfig = tableMetaClient.getTableConfig
       }
@@ -244,8 +249,10 @@
     }
 
     if (!tableExists) {
+      val archiveLogFolder = parameters.getOrElse(
+        HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived")
       HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration, path,
-        HoodieTableType.valueOf(tableType), tableName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY),
+        HoodieTableType.valueOf(tableType), tableName, archiveLogFolder, parameters(PAYLOAD_CLASS_OPT_KEY),
         null, bootstrapIndexClass, bootstrapBasePath)
     }