[HOTFIX] Avoid cleaning segments when reading the tablestatus file fails

Why is this PR needed?
1. If reading the tablestatus file fails, method TableProcessingOperations.
deletePartialLoadDataIfExist deletes all related segments.
2. If the tablestatus file was removed, the system deletes all original
segments before loading.

What changes were proposed in this PR?
1. Check the result of reading the tablestatus file; if it is empty, skip the
cleanup entirely (see the sketch after this list).
2. If the tablestatus file was removed, the system should not delete any segment.
3. Refactor the code to avoid invoking the getAbsolutePath method repeatedly.
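
A minimal sketch of the new control flow, using plain java.io.File in place of
CarbonData's CarbonFile/FileFactory APIs; the class name and the loadNames
parameter are illustrative stand-ins (loadNames models the load names parsed
from tablestatus by SegmentStatusManager.readLoadMetadata()), not the actual patch:

```java
import java.io.File;
import java.util.Set;

// Illustrative sketch only: java.io.File stands in for CarbonData's CarbonFile.
public final class PartialLoadCleanupSketch {

  static void deletePartialLoadDataIfExist(File partitionDir, Set<String> loadNames,
      boolean isCompactionFlow) {
    // list segment folders first, so nothing is touched if the listing is empty
    File[] allSegments = partitionDir.listFiles();
    if (allSegments == null || allSegments.length == 0) {
      return;
    }
    // guard: an empty result can mean the tablestatus file is missing or unreadable;
    // deleting anything in that state would wipe valid segments, so stop here
    if (loadNames == null || loadNames.isEmpty()) {
      return;
    }
    // classify each folder by its name instead of re-resolving absolute paths
    for (File segment : allSegments) {
      String name = segment.getName(); // e.g. "Segment_0" or "Segment_0.1"
      if (!name.startsWith("Segment_")) {
        continue;
      }
      String[] parts = name.split("_");
      if (parts.length != 2) {
        continue;
      }
      boolean isOriginal = !parts[1].contains("."); // "0.1" marks a merged segment
      boolean unknownToMetadata = !loadNames.contains(parts[1]);
      // compaction flow cleans partial merged segments; loading flow cleans
      // partial original segments; everything else is kept
      if (unknownToMetadata && (isCompactionFlow ? !isOriginal : isOriginal)) {
        System.out.println("stale segment, would delete: " + segment);
      }
    }
  }
}
```

Listing the segment directories before reading tablestatus, and returning early
on an empty read, is what keeps a missing or unreadable tablestatus file from
being misinterpreted as "every segment is stale".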

This closes #3899
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index da7403b..7482a1c 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -41,7 +41,7 @@
   private static final String DICTIONARY_EXT = ".dict";
   public static final String SCHEMA_FILE = "schema";
   private static final String FACT_DIR = "Fact";
-  private static final String SEGMENT_PREFIX = "Segment_";
+  public static final String SEGMENT_PREFIX = "Segment_";
   private static final String PARTITION_PREFIX = "Part";
   private static final String DATA_PART_PREFIX = "part-";
   public static final String BATCH_PREFIX = "_batchno";
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index 6ae9e6b..aaaa88d 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -19,25 +19,25 @@
 
 import java.math.BigDecimal
 
+import org.apache.commons.io.FileUtils
+import org.apache.commons.lang3.RandomStringUtils
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterEach
 
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.index.Segment
+import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.spark.util.BadRecordUtil
-import org.apache.commons.io.FileUtils
-import org.apache.commons.lang3.RandomStringUtils
 
 class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
 
   val badRecordAction = CarbonProperties.getInstance()
     .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION);
-  val testdata =s"$resourcesPath/MoreThan32KChar.csv"
+  val testdata = s"$resourcesPath/MoreThan32KChar.csv"
   val longChar: String = RandomStringUtils.randomAlphabetic(33000)
 
   override def beforeEach {
@@ -310,7 +310,8 @@
     val tableStatusFile = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
     FileFactory.getCarbonFile(tableStatusFile).delete()
     sql("insert into stale values('k')")
-    checkAnswer(sql("select * from stale"), Row("k"))
+    // if the table loses its tablestatus file, the system should keep all data.
+    checkAnswer(sql("select * from stale"), Seq(Row("k"), Row("k")))
   }
 
   test("test data loading with directly writing fact data to hdfs") {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
index f0f14d6..4aba751 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
@@ -19,13 +19,17 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
@@ -46,45 +50,58 @@
       LogServiceFactory.getLogService(CarbonLoaderUtil.class.getName());
 
   /**
-   *
-   * @param carbonTable
-   * @param isCompactionFlow
-   * @throws IOException
+   * Delete segment folders whose metadata does not exist in the tablestatus file.
+   * Note: this method does not check the tablestatus history file.
    */
   public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
       final boolean isCompactionFlow) throws IOException {
     String metaDataLocation = carbonTable.getMetadataPath();
-    //delete folder which metadata no exist in tablestatus
     String partitionPath = CarbonTablePath.getPartitionDir(carbonTable.getTablePath());
     if (FileFactory.isFileExist(partitionPath)) {
-      final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
-      CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath);
-      CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
-        @Override
-        public boolean accept(CarbonFile path) {
-          String segmentId =
-              CarbonTablePath.DataFileUtil.getSegmentIdFromPath(path.getAbsolutePath() + "/dummy");
-          boolean found = false;
-          for (int j = 0; j < details.length; j++) {
-            if (details[j].getLoadName().equals(segmentId)) {
-              found = true;
-              break;
+      // list all segments before reading the tablestatus file.
+      CarbonFile[] allSegments = FileFactory.getCarbonFile(partitionPath).listFiles();
+      // there are no segments to clean up
+      if (allSegments == null || allSegments.length == 0) {
+        return;
+      }
+      LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+      // the tablestatus file is empty or could not be read,
+      // so stop immediately instead of treating every segment as stale.
+      if (details == null || details.length == 0) {
+        return;
+      }
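+      // collect the load names (segment ids) recorded in the tablestatus file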
+      Set<String> metadataSet = new HashSet<>(details.length);
+      for (LoadMetadataDetails detail : details) {
+        metadataSet.add(detail.getLoadName());
+      }
+      List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length);
+      for (CarbonFile segment : allSegments) {
+        String segmentName = segment.getName();
+        // check segment folder pattern
+        if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) {
+          String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE);
+          if (parts.length == 2) {
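+            // compaction produces merged segments with load names like 0.1,
+            // so a dot in the load name marks a non-original segment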
+            boolean isOriginal = !parts[1].contains(".");
+            if (isCompactionFlow) {
+              // in the compaction flow, delete only merged segments whose metadata is missing
+              if (!isOriginal && !metadataSet.contains(parts[1])) {
+                staleSegments.add(segment);
+              }
+            } else {
+              // in the loading flow, delete only original segments whose metadata is missing
+              if (isOriginal && !metadataSet.contains(parts[1])) {
+                staleSegments.add(segment);
+              }
             }
           }
-          return !found;
         }
-      });
-      for (int k = 0; k < listFiles.length; k++) {
-        String segmentId = CarbonTablePath.DataFileUtil
-            .getSegmentIdFromPath(listFiles[k].getAbsolutePath() + "/dummy");
-        if (isCompactionFlow) {
-          if (segmentId.contains(".")) {
-            CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
-          }
-        } else {
-          if (!segmentId.contains(".")) {
-            CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
-          }
+      }
+      // delete the stale segments one by one
+      for (CarbonFile staleSegment : staleSegments) {
+        try {
+          CarbonUtil.deleteFoldersAndFiles(staleSegment);
+        } catch (IOException | InterruptedException e) {
+          LOGGER.error("Unable to delete the given path :: " + e.getMessage(), e);
         }
       }
     }