[CARBONDATA-3582] support table status file backup

When overwriting table status file, if process crashed, table status
file will be in corrupted state. This can happen in an unstable
environment, like in the cloud. To prevent the table corruption, user
can enable a newly added CarbonProperty to enable backup of the table
status before overwriting it.

New CarbonProperty: ENABLE_TABLE_STATUS_BACKUP (default is false)
When enabling this property, "tablestatus.backup" file will be created
in the same folder of "tablestatus" file

This closes #3459
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index bd60a16..2e3e7d3 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1296,6 +1296,17 @@
   public static final String ENABLE_VECTOR_READER_DEFAULT = "true";
 
   /**
+   * In cloud object store scenario, overwriting table status file is not an atomic
+   * operation since it uses rename API. Thus, it is possible that table status is corrupted
+   * if process crashed when overwriting the table status file.
+   * To protect from file corruption, user can enable this property.
+   */
+  @CarbonProperty(dynamicConfigurable = true)
+  public static final String ENABLE_TABLE_STATUS_BACKUP = "carbon.enable.tablestatus.backup";
+
+  public static final String ENABLE_TABLE_STATUS_BACKUP_DEFAULT = "false";
+
+  /**
    * property to set is IS_DRIVER_INSTANCE
    */
   @CarbonProperty
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index bc52082..2b76db1 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -35,6 +35,7 @@
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
@@ -254,18 +255,23 @@
     }
   }
 
-  public static LoadMetadataDetails[] readTableStatusFile(String tableStatusPath)
-      throws IOException {
-    Gson gsonObjectToRead = new Gson();
+  /**
+   * Read file and return its content as string
+   *
+   * @param tableStatusPath path of the table status to read
+   * @return file content, null if the file does not exist
+   * @throws IOException if IO errors
+   */
+  private static String readFileAsString(String tableStatusPath) throws IOException {
     DataInputStream dataInputStream = null;
     BufferedReader buffReader = null;
     InputStreamReader inStream = null;
-    LoadMetadataDetails[] loadFolderDetails = null;
+
     AtomicFileOperations fileOperation =
         AtomicFileOperationFactory.getAtomicFileOperations(tableStatusPath);
 
     if (!FileFactory.isFileExist(tableStatusPath)) {
-      return new LoadMetadataDetails[0];
+      return null;
     }
 
     // When storing table status file in object store, reading of table status file may
@@ -277,8 +283,7 @@
         dataInputStream = fileOperation.openForRead();
         inStream = new InputStreamReader(dataInputStream, Charset.forName(DEFAULT_CHARSET));
         buffReader = new BufferedReader(inStream);
-        loadFolderDetails = gsonObjectToRead.fromJson(buffReader, LoadMetadataDetails[].class);
-        retry = 0;
+        return buffReader.readLine();
       } catch (EOFException ex) {
         retry--;
         if (retry == 0) {
@@ -299,13 +304,23 @@
         closeStreams(buffReader, inStream, dataInputStream);
       }
     }
+    return null;
+  }
 
-    // if listOfLoadFolderDetailsArray is null, return empty array
-    if (null == loadFolderDetails) {
+  /**
+   * Read table status file and decode it into an array of segment metadata
+   *
+   * @param tableStatusPath table status file path
+   * @return segment metadata
+   * @throws IOException if IO errors
+   */
+  public static LoadMetadataDetails[] readTableStatusFile(String tableStatusPath)
+      throws IOException {
+    String content = readFileAsString(tableStatusPath);
+    if (content == null) {
       return new LoadMetadataDetails[0];
     }
-
-    return loadFolderDetails;
+    return new Gson().fromJson(content, LoadMetadataDetails[].class);
   }
 
   /**
@@ -314,7 +329,7 @@
    * @param loadMetadataDetails
    * @return
    */
-  public static int getMaxSegmentId(LoadMetadataDetails[] loadMetadataDetails) {
+  private static int getMaxSegmentId(LoadMetadataDetails[] loadMetadataDetails) {
     int newSegmentId = -1;
     for (int i = 0; i < loadMetadataDetails.length; i++) {
       try {
@@ -525,45 +540,78 @@
   }
 
   /**
-   * writes load details into a given file at @param dataLoadLocation
+   * Back up the table status file as 'tablestatus.backup' in the same folder
    *
-   * @param dataLoadLocation
-   * @param listOfLoadFolderDetailsArray
-   * @throws IOException
+   * @param tableStatusPath table status file path
    */
-  public static void writeLoadDetailsIntoFile(String dataLoadLocation,
+  private static void backupTableStatus(String tableStatusPath) throws IOException {
+    CarbonFile file = FileFactory.getCarbonFile(tableStatusPath);
+    if (file.exists()) {
+      String backupPath = tableStatusPath + ".backup";
+      String currentContent = readFileAsString(tableStatusPath);
+      if (currentContent != null) {
+        writeStringIntoFile(backupPath, currentContent);
+      }
+    }
+  }
+
+  /**
+   * writes load details to specified path
+   *
+   * @param tableStatusPath path of the table status file
+   * @param listOfLoadFolderDetailsArray segment metadata
+   * @throws IOException if IO errors
+   */
+  public static void writeLoadDetailsIntoFile(
+      String tableStatusPath,
       LoadMetadataDetails[] listOfLoadFolderDetailsArray) throws IOException {
-    AtomicFileOperations fileWrite =
-        AtomicFileOperationFactory.getAtomicFileOperations(dataLoadLocation);
+    // When overwriting table status file, if process crashed, table status file
+    // will be in corrupted state. This can happen in an unstable environment,
+    // like in the cloud. To prevent the table corruption, user can enable following
+    // property to enable backup of the table status before overwriting it.
+    if (tableStatusPath.endsWith(CarbonTablePath.TABLE_STATUS_FILE) &&
+        CarbonProperties.isEnableTableStatusBackup()) {
+      backupTableStatus(tableStatusPath);
+    }
+    String content = new Gson().toJson(listOfLoadFolderDetailsArray);
+    mockForTest();
+    // make the table status file smaller by removing fields that are default value
+    for (LoadMetadataDetails loadMetadataDetails : listOfLoadFolderDetailsArray) {
+      loadMetadataDetails.removeUnnecessaryField();
+    }
+    // If process crashed during following write, table status file need to be
+    // manually recovered.
+    writeStringIntoFile(tableStatusPath, content);
+  }
+
+  // a dummy func for mocking in testcase, which simulates IOException
+  private static void mockForTest() throws IOException {
+  }
+
+  /**
+   * writes string content to specified path
+   *
+   * @param filePath path of the file to write
+   * @param content content to write
+   * @throws IOException if IO errors
+   */
+  private static void writeStringIntoFile(String filePath, String content) throws IOException {
+    AtomicFileOperations fileWrite = AtomicFileOperationFactory.getAtomicFileOperations(filePath);
     BufferedWriter brWriter = null;
     DataOutputStream dataOutputStream = null;
-    Gson gsonObjectToWrite = new Gson();
-    // write the updated data into the metadata file.
-
     try {
       dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
-      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
-              Charset.forName(DEFAULT_CHARSET)));
-
-      // make the table status file smaller by removing fields that are default value
-      for (LoadMetadataDetails loadMetadataDetails : listOfLoadFolderDetailsArray) {
-        loadMetadataDetails.removeUnnecessaryField();
-      }
-
-      String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetailsArray);
-      brWriter.write(metadataInstance);
+      brWriter = new BufferedWriter(new OutputStreamWriter(
+          dataOutputStream, Charset.forName(DEFAULT_CHARSET)));
+      brWriter.write(content);
     } catch (IOException ioe) {
-      LOG.error("Error message: " + ioe.getLocalizedMessage());
+      LOG.error("Write file failed: " + ioe.getLocalizedMessage());
       fileWrite.setFailed();
       throw ioe;
     } finally {
-      if (null != brWriter) {
-        brWriter.flush();
-      }
       CarbonUtil.closeStreams(brWriter);
       fileWrite.close();
     }
-
   }
 
   /**
@@ -637,7 +685,7 @@
    * @param invalidLoadTimestamps
    * @return invalidLoadTimestamps
    */
-  public static List<String> updateDeletionStatus(AbsoluteTableIdentifier absoluteTableIdentifier,
+  private static List<String> updateDeletionStatus(AbsoluteTableIdentifier absoluteTableIdentifier,
       String loadDate, LoadMetadataDetails[] listOfLoadFolderDetailsArray,
       List<String> invalidLoadTimestamps, Long loadStartTime) {
     // For each load timestamp loop through data and if the
@@ -708,8 +756,7 @@
    * @param newMetadata
    * @return
    */
-
-  public static List<LoadMetadataDetails> updateLatestTableStatusDetails(
+  private static List<LoadMetadataDetails> updateLatestTableStatusDetails(
       LoadMetadataDetails[] oldMetadata, LoadMetadataDetails[] newMetadata) {
 
     List<LoadMetadataDetails> newListMetadata =
@@ -727,7 +774,7 @@
    *
    * @param loadMetadata
    */
-  public static void updateSegmentMetadataDetails(LoadMetadataDetails loadMetadata) {
+  private static void updateSegmentMetadataDetails(LoadMetadataDetails loadMetadata) {
     // update status only if the segment is not marked for delete
     if (SegmentStatus.MARKED_FOR_DELETE != loadMetadata.getSegmentStatus()) {
       loadMetadata.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
@@ -893,7 +940,7 @@
    * @param newList
    * @return
    */
-  public static List<LoadMetadataDetails> updateLoadMetadataFromOldToNew(
+  private static List<LoadMetadataDetails> updateLoadMetadataFromOldToNew(
       LoadMetadataDetails[] oldList, LoadMetadataDetails[] newList) {
 
     List<LoadMetadataDetails> newListMetadata =
@@ -1024,7 +1071,9 @@
               // update the metadata details from old to new status.
               List<LoadMetadataDetails> latestStatus =
                   updateLoadMetadataFromOldToNew(tuple2.details, latestMetadata);
-              writeLoadMetadata(identifier, latestStatus);
+              writeLoadDetailsIntoFile(
+                  CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()),
+                  latestStatus.toArray(new LoadMetadataDetails[0]));
             }
             updationCompletionStatus = true;
           } else {
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index c5e1dcc..35f11f3 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1132,6 +1132,11 @@
         CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT).equalsIgnoreCase("true");
   }
 
+  public static boolean isEnableTableStatusBackup() {
+    return getInstance().getProperty(CarbonCommonConstants.ENABLE_TABLE_STATUS_BACKUP,
+        CarbonCommonConstants.ENABLE_TABLE_STATUS_BACKUP_DEFAULT).equalsIgnoreCase("true");
+  }
+
   /**
    * Validate the restrictions
    *
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 736670e..5df3c09 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -50,6 +50,7 @@
 | carbon.lock.retry.timeout.sec | 5 | Specifies the interval between the retries to obtain the lock for any operation other than load. **NOTE:** Refer to ***carbon.lock.retries*** for understanding why CarbonData uses locks for operations. |
 | carbon.fs.custom.file.provider | None | To support FileTypeInterface for configuring custom CarbonFile implementation to work with custom FileSystem. |
 | carbon.timeseries.first.day.of.week | SUNDAY | This parameter configures which day of the week to be considered as first day of the week. Because first day of the week will be different in different parts of the world. |
+| carbon.enable.tablestatus.backup | false | In cloud object store scenario, overwriting table status file is not an atomic operation since it uses rename API. Thus, it is possible that table status is corrupted if process crashed when overwriting the table status file. To protect from file corruption, user can enable this property. |
 
 ## Data Loading Configuration
 
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index 9fe8498..7b65e0b 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -100,6 +100,11 @@
       </exclusions>
     </dependency>
     <dependency>
+      <groupId>org.jmockit</groupId>
+      <artifactId>jmockit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TableStatusBackupTest.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TableStatusBackupTest.scala
new file mode 100644
index 0000000..c1649b0
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TableStatusBackupTest.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.carbondata
+
+import java.io.IOException
+
+import mockit.{Mock, MockUp}
+import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class TableStatusBackupTest extends QueryTest with BeforeAndAfterAll {
+  override protected def beforeAll(): Unit = {
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.ENABLE_TABLE_STATUS_BACKUP, "true")
+    sql("drop table if exists source")
+    sql("create table source(a string) stored as carbondata")
+  }
+
+  override protected def afterAll(): Unit = {
+    sql("drop table if exists source")
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.ENABLE_TABLE_STATUS_BACKUP, "false")
+  }
+
+  test("backup table status file") {
+    sql("insert into source values ('A'), ('B')")
+    val tablePath = CarbonEnv.getCarbonTable(None, "source")(sqlContext.sparkSession).getTablePath
+    val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(tablePath)
+    val oldTableStatus = SegmentStatusManager.readTableStatusFile(tableStatusFilePath)
+
+    var mock = new MockUp[SegmentStatusManager]() {
+      @Mock
+      @throws[IOException]
+      def mockForTest(): Unit = {
+        throw new IOException("thrown in mock")
+      }
+    }
+
+    val exception = intercept[IOException] {
+      sql("insert into source values ('A'), ('B')")
+    }
+    assert(exception.getMessage.contains("thrown in mock"))
+    val backupPath = tableStatusFilePath + ".backup"
+    assert(FileFactory.isFileExist(backupPath))
+    val backupTableStatus = SegmentStatusManager.readTableStatusFile(backupPath)
+    assertResult(oldTableStatus)(backupTableStatus)
+
+    mock = new MockUp[SegmentStatusManager]() {
+      @Mock
+      def mockForTest(): Unit = {
+      }
+    }
+  }
+}
\ No newline at end of file