/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.core.statusmanager;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.exception.ConcurrentOperationException;
import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory;
import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
import org.apache.carbondata.core.fileoperations.FileWriteOperation;
import org.apache.carbondata.core.index.Segment;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.locks.CarbonLockFactory;
import org.apache.carbondata.core.locks.CarbonLockUtil;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.locks.LockUsage;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DeleteLoadFolders;
import org.apache.carbondata.core.util.path.CarbonTablePath;

import static org.apache.carbondata.core.constants.CarbonCommonConstants.DEFAULT_CHARSET;

import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;

/**
 * Manages Load/Segment status
 */
public class SegmentStatusManager {

  private static final Logger LOG =
      LogServiceFactory.getLogService(SegmentStatusManager.class.getName());

  private AbsoluteTableIdentifier identifier;

  private Configuration configuration;

  private static final int READ_TABLE_STATUS_RETRY_COUNT = CarbonLockUtil
          .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CARBON_LOCK,
                  CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CARBON_LOCK_DEFAULT);

  private static final int READ_TABLE_STATUS_RETRY_TIMEOUT = CarbonLockUtil
          .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CARBON_LOCK,
                  CarbonCommonConstants.MAX_TIMEOUT_FOR_CARBON_LOCK_DEFAULT);

  public SegmentStatusManager(AbsoluteTableIdentifier identifier) {
    this.identifier = identifier;
    configuration = FileFactory.getConfiguration();
  }

  public SegmentStatusManager(AbsoluteTableIdentifier identifier, Configuration configuration) {
    this.identifier = identifier;
    this.configuration = configuration;
  }

  /**
   * This will return the lock object used to lock the table status file before updation.
   *
   * @return
   */
  public ICarbonLock getTableStatusLock() {
    return CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.TABLE_STATUS_LOCK);
  }

  /**
   * This method will return last modified time of tablestatus file
   */
  public static long getTableStatusLastModifiedTime(AbsoluteTableIdentifier identifier)
      throws IOException {
    String tableStatusPath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
    if (!FileFactory.isFileExist(tableStatusPath)) {
      return 0L;
    } else {
      return FileFactory.getCarbonFile(tableStatusPath)
          .getLastModifiedTime();
    }
  }

  public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws IOException {
    return getValidAndInvalidSegments(false, null, null);
  }

  public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(Boolean isChildTable)
      throws IOException {
    return getValidAndInvalidSegments(isChildTable, null, null);
  }

  /**
   * get valid segment for given load status details.
   */
  public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(Boolean isChildTable,
      LoadMetadataDetails[] loadMetadataDetails, ReadCommittedScope readCommittedScope)
      throws IOException {

    // @TODO: move reading LoadStatus file to separate class
    List<Segment> listOfValidSegments = new ArrayList<>(10);
    List<Segment> listOfValidUpdatedSegments = new ArrayList<>(10);
    List<Segment> listOfInvalidSegments = new ArrayList<>(10);
    List<Segment> listOfStreamSegments = new ArrayList<>(10);
    List<Segment> listOfInProgressSegments = new ArrayList<>(10);

    try {
      if (loadMetadataDetails == null) {
        loadMetadataDetails = readTableStatusFile(
            CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()));
      }

      if (readCommittedScope == null) {
        readCommittedScope = new TableStatusReadCommittedScope(identifier, loadMetadataDetails,
            configuration);
      }
      //just directly iterate Array
      for (LoadMetadataDetails segment : loadMetadataDetails) {
        if (SegmentStatus.SUCCESS == segment.getSegmentStatus()
            || SegmentStatus.MARKED_FOR_UPDATE == segment.getSegmentStatus()
            || SegmentStatus.LOAD_PARTIAL_SUCCESS == segment.getSegmentStatus()
            || SegmentStatus.STREAMING == segment.getSegmentStatus()
            || SegmentStatus.STREAMING_FINISH == segment.getSegmentStatus()) {
          // check for merged loads.
          if (null != segment.getMergedLoadName()) {
            Segment seg = new Segment(segment.getMergedLoadName(), segment.getSegmentFile(),
                readCommittedScope, segment);
            if (!listOfValidSegments.contains(seg)) {
              listOfValidSegments.add(seg);
            }
            // if merged load is updated then put it in updated list
            if (SegmentStatus.MARKED_FOR_UPDATE == segment.getSegmentStatus()) {
              listOfValidUpdatedSegments.add(seg);
            }
            continue;
          }

          if (SegmentStatus.MARKED_FOR_UPDATE == segment.getSegmentStatus()) {

            listOfValidUpdatedSegments.add(
                new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope));
          }
          if (SegmentStatus.STREAMING == segment.getSegmentStatus()
              || SegmentStatus.STREAMING_FINISH == segment.getSegmentStatus()) {
            listOfStreamSegments.add(
                new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope));
            continue;
          }
          // In case of child table, during loading, if no record is loaded to the segment, then
          // segmentStatus will be marked as 'Success'. During query, don't need to add that segment
          // to validSegment list, as segment does not exists
          if (isChildTable) {
            if (!segment.getDataSize().equalsIgnoreCase("0") && !segment.getIndexSize()
                .equalsIgnoreCase("0")) {
              listOfValidSegments.add(
                  new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope,
                      segment));
            }
          } else {
            listOfValidSegments.add(
                new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope,
                    segment));
          }
        } else if ((SegmentStatus.LOAD_FAILURE == segment.getSegmentStatus()
            || SegmentStatus.COMPACTED == segment.getSegmentStatus()
            || SegmentStatus.MARKED_FOR_DELETE == segment.getSegmentStatus())) {
          listOfInvalidSegments.add(new Segment(segment.getLoadName(), segment.getSegmentFile()));
        } else if (SegmentStatus.INSERT_IN_PROGRESS == segment.getSegmentStatus() ||
            SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == segment.getSegmentStatus()) {
          listOfInProgressSegments.add(
              new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope));
        }
      }
    } catch (IOException e) {
      LOG.error(e.getMessage(), e);
      throw e;
    }
    return new ValidAndInvalidSegmentsInfo(listOfValidSegments, listOfValidUpdatedSegments,
        listOfInvalidSegments, listOfStreamSegments, listOfInProgressSegments);
  }

  /**
   * This method reads the load metadata file
   *
   * @param metadataFolderPath
   * @return
   */
  public static LoadMetadataDetails[] readLoadMetadata(String metadataFolderPath) {
    String metadataFileName = metadataFolderPath + CarbonCommonConstants.FILE_SEPARATOR
        + CarbonTablePath.TABLE_STATUS_FILE;
    try {
      return readTableStatusFile(metadataFileName);
    } catch (IOException e) {
      return new LoadMetadataDetails[0];
    }
  }

  /**
   * Returns valid segment list for a given RelationIdentifier
   *
   * @param relationIdentifier get list of segments for relation identifier
   * @return list of valid segment id's
   */
  public static List<String> getValidSegmentList(RelationIdentifier relationIdentifier)
      throws IOException {
    List<String> segmentList = new ArrayList<>();
    List<Segment> validSegments = new SegmentStatusManager(AbsoluteTableIdentifier
        .from(relationIdentifier.getTablePath(), relationIdentifier.getDatabaseName(),
            relationIdentifier.getTableName())).getValidAndInvalidSegments().getValidSegments();
    for (Segment segment : validSegments) {
      segmentList.add(segment.getSegmentNo());
    }
    return segmentList;
  }

  /**
   * Reads the table status file with the specified UUID if non empty.
   */
  public static LoadMetadataDetails[] readLoadMetadata(String metaDataFolderPath, String uuid)
      throws IOException {
    String tableStatusFileName;
    if (uuid.isEmpty()) {
      tableStatusFileName = metaDataFolderPath + CarbonCommonConstants.FILE_SEPARATOR
          + CarbonTablePath.TABLE_STATUS_FILE;
    } else {
      tableStatusFileName = metaDataFolderPath + CarbonCommonConstants.FILE_SEPARATOR
          + CarbonTablePath.TABLE_STATUS_FILE + CarbonCommonConstants.UNDERSCORE + uuid;
    }
    return readTableStatusFile(tableStatusFileName);
  }

  /**
   * This method reads the load history metadata file
   *
   * @param metadataFolderPath
   * @return
   */
  public static LoadMetadataDetails[] readLoadHistoryMetadata(String metadataFolderPath) {
    String metadataFileName = metadataFolderPath + CarbonCommonConstants.FILE_SEPARATOR
        + CarbonTablePath.TABLE_STATUS_HISTORY_FILE;
    try {
      return readTableStatusFile(metadataFileName);
    } catch (IOException e) {
      return new LoadMetadataDetails[0];
    }
  }

  /**
   * Read file and return its content as string
   *
   * @param tableStatusPath path of the table status to read
   * @return file content, null is file does not exist
   * @throws IOException if IO errors
   */
  private static String readFileAsString(String tableStatusPath) throws IOException {
    DataInputStream dataInputStream = null;
    BufferedReader buffReader = null;
    InputStreamReader inStream = null;

    AtomicFileOperations fileOperation =
        AtomicFileOperationFactory.getAtomicFileOperations(tableStatusPath);

    if (!FileFactory.isFileExist(tableStatusPath)) {
      return null;
    }

    try {
      dataInputStream = fileOperation.openForRead();
      inStream = new InputStreamReader(dataInputStream, Charset.forName(DEFAULT_CHARSET));
      buffReader = new BufferedReader(inStream);
      return buffReader.readLine();
    } catch (EOFException ex) {
      throw ex;
    } catch (IOException e) {
      LOG.error("Failed to read table status file", e);
      throw e;
    } finally {
      closeStreams(buffReader, inStream, dataInputStream);
    }
  }

  /**
   * Read table status file and decoded to segment meta arrays
   *
   * @param tableStatusPath table status file path
   * @return segment metadata
   * @throws IOException if IO errors
   */
  public static LoadMetadataDetails[] readTableStatusFile(String tableStatusPath)
      throws IOException {
    int retry = READ_TABLE_STATUS_RETRY_COUNT;

    // When storing table status file in object store, reading of table status file may
    // fail (receive IOException or JsonSyntaxException)
    // when table status file is being modifying
    // so here we retry multiple times before
    // throwing IOException or JsonSyntaxException
    while (retry > 0) {
      try {
        String content = readFileAsString(tableStatusPath);
        if (content == null) {
          return new LoadMetadataDetails[0];
        }
        return new Gson().fromJson(content, LoadMetadataDetails[].class);
      } catch (JsonSyntaxException | IOException ex) {
        retry--;
        if (retry == 0) {
          // we have retried several times, throw this exception to make the execution failed
          LOG.error("Failed to read table status file:" + tableStatusPath);
          throw ex;
        }
        try {
          LOG.warn("Failed to read table status file, retrycount:" + retry);
          // sleep for some time before retry
          TimeUnit.SECONDS.sleep(READ_TABLE_STATUS_RETRY_TIMEOUT);
        } catch (InterruptedException e) {
          // ignored
        }
      }
    }
    return null;
  }

  /**
   * This method will get the max segment id
   *
   * @param loadMetadataDetails
   * @return
   */
  private static int getMaxSegmentId(LoadMetadataDetails[] loadMetadataDetails) {
    int newSegmentId = -1;
    for (int i = 0; i < loadMetadataDetails.length; i++) {
      try {
        int loadCount = Integer.parseInt(loadMetadataDetails[i].getLoadName());
        if (newSegmentId < loadCount) {
          newSegmentId = loadCount;
        }
      } catch (NumberFormatException ne) {
        // this case is for compacted folders. For compacted folders Id will be like 0.1, 2.1
        // consider a case when 12 loads are completed and after that major compaction is triggered.
        // In this case new compacted folder will be created with name 12.1 and after query time
        // out all the compacted folders will be deleted and entry will also be removed from the
        // table status file. In that case also if a new load comes the new segment Id assigned
        // should be 13 and not 0
        String loadName = loadMetadataDetails[i].getLoadName();
        if (loadName.contains(".")) {
          int loadCount = Integer.parseInt(loadName.split("\\.")[0]);
          if (newSegmentId < loadCount) {
            newSegmentId = loadCount;
          }
        }
      }
    }
    return newSegmentId;
  }

  /**
   * This method will create new segment id
   *
   * @param loadMetadataDetails
   * @return
   */
  public static int createNewSegmentId(LoadMetadataDetails[] loadMetadataDetails) {
    int newSegmentId = getMaxSegmentId(loadMetadataDetails);
    newSegmentId++;
    return newSegmentId;
  }

  /**
   * compares two given date strings
   *
   * @param loadValue
   * @param userValue
   * @return -1 if first arg is less than second arg, 1 if first arg is greater than second arg,
   * 0 otherwise
   */
  private static Integer compareDateValues(Long loadValue, Long userValue) {
    return loadValue.compareTo(userValue);
  }

  /**
   * updates deletion status
   *
   * @param loadIds
   * @param tableFolderPath
   * @return
   */
  public static List<String> updateDeletionStatus(AbsoluteTableIdentifier identifier,
      List<String> loadIds, String tableFolderPath) throws Exception {
    CarbonTableIdentifier carbonTableIdentifier = identifier.getCarbonTableIdentifier();
    ICarbonLock carbonDeleteSegmentLock =
        CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.DELETE_SEGMENT_LOCK);
    ICarbonLock carbonTableStatusLock =
        CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.TABLE_STATUS_LOCK);
    String tableDetails =
        carbonTableIdentifier.getDatabaseName() + "." + carbonTableIdentifier.getTableName();
    List<String> invalidLoadIds = new ArrayList<String>(0);
    try {
      if (carbonDeleteSegmentLock.lockWithRetries()) {
        LOG.info("Delete segment lock has been successfully acquired");

        String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
        LoadMetadataDetails[] listOfLoadFolderDetailsArray = null;
        if (!FileFactory.isFileExist(dataLoadLocation)) {
          // log error.
          LOG.error("Load metadata file is not present.");
          return loadIds;
        }
        // read existing metadata details in load metadata.
        listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath);
        if (listOfLoadFolderDetailsArray.length != 0) {
          updateDeletionStatus(identifier, loadIds, listOfLoadFolderDetailsArray, invalidLoadIds);
          if (invalidLoadIds.isEmpty()) {
            // All or None , if anything fails then don't write
            if (carbonTableStatusLock.lockWithRetries()) {
              LOG.info("Table status lock has been successfully acquired");
              // To handle concurrency scenarios, always take latest metadata before writing
              // into status file.
              LoadMetadataDetails[] latestLoadMetadataDetails = readLoadMetadata(tableFolderPath);
              updateLatestTableStatusDetails(listOfLoadFolderDetailsArray,
                  latestLoadMetadataDetails);
              writeLoadDetailsIntoFile(dataLoadLocation, listOfLoadFolderDetailsArray);
            }
            else {
              String errorMsg = "Delete segment by id is failed for " + tableDetails
                  + ". Not able to acquire the table status lock due to other operation running "
                  + "in the background.";
              LOG.error(errorMsg);
              throw new Exception(errorMsg + " Please try after some time.");
            }

          } else {
            return invalidLoadIds;
          }

        } else {
          LOG.error("Delete segment by Id is failed. No matching segment id found.");
          return loadIds;
        }

      } else {
        String errorMsg = "Delete segment by id is failed for " + tableDetails
            + ". Not able to acquire the delete segment lock due to another delete "
            + "operation is running in the background.";
        LOG.error(errorMsg);
        throw new Exception(errorMsg + " Please try after some time.");
      }
    } catch (IOException e) {
      LOG.error("IOException" + e.getMessage(), e);
      throw e;
    } finally {
      CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK);
      CarbonLockUtil.fileUnlock(carbonDeleteSegmentLock, LockUsage.DELETE_SEGMENT_LOCK);
    }

    return invalidLoadIds;
  }

  /**
   * updates deletion status
   *
   * @param loadDate
   * @param tableFolderPath
   * @return
   */
  public static List<String> updateDeletionStatus(AbsoluteTableIdentifier identifier,
      String loadDate, String tableFolderPath, Long loadStartTime) throws Exception {
    CarbonTableIdentifier carbonTableIdentifier = identifier.getCarbonTableIdentifier();
    ICarbonLock carbonDeleteSegmentLock =
        CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.DELETE_SEGMENT_LOCK);
    ICarbonLock carbonTableStatusLock =
        CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.TABLE_STATUS_LOCK);
    String tableDetails =
        carbonTableIdentifier.getDatabaseName() + "." + carbonTableIdentifier.getTableName();
    List<String> invalidLoadTimestamps = new ArrayList<String>(0);
    try {
      if (carbonDeleteSegmentLock.lockWithRetries()) {
        LOG.info("Delete segment lock has been successfully acquired");

        String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
        LoadMetadataDetails[] listOfLoadFolderDetailsArray = null;

        if (!FileFactory.isFileExist(dataLoadLocation)) {
          // Table status file is not present, maybe table is empty, ignore this operation
          LOG.warn("Trying to update table metadata file which is not present.");
          return invalidLoadTimestamps;
        }
        // read existing metadata details in load metadata.
        listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath);
        if (listOfLoadFolderDetailsArray.length != 0) {
          updateDeletionStatus(identifier, loadDate, listOfLoadFolderDetailsArray,
              invalidLoadTimestamps, loadStartTime);
          if (invalidLoadTimestamps.isEmpty()) {
            if (carbonTableStatusLock.lockWithRetries()) {
              LOG.info("Table status lock has been successfully acquired.");
              // To handle concurrency scenarios, always take latest metadata before writing
              // into status file.
              LoadMetadataDetails[] latestLoadMetadataDetails = readLoadMetadata(tableFolderPath);
              updateLatestTableStatusDetails(listOfLoadFolderDetailsArray,
                  latestLoadMetadataDetails);
              writeLoadDetailsIntoFile(dataLoadLocation, listOfLoadFolderDetailsArray);
            }
            else {

              String errorMsg = "Delete segment by date is failed for " + tableDetails
                  + ". Not able to acquire the table status lock due to other operation running "
                  + "in the background.";
              LOG.error(errorMsg);
              throw new Exception(errorMsg + " Please try after some time.");

            }
          } else {
            return invalidLoadTimestamps;
          }

        } else {
          LOG.error("Delete segment by date is failed. No matching segment found.");
          invalidLoadTimestamps.add(loadDate);
          return invalidLoadTimestamps;
        }

      } else {
        String errorMsg = "Delete segment by date is failed for " + tableDetails
            + ". Not able to acquire the delete segment lock due to another delete "
            + "operation is running in the background.";
        LOG.error(errorMsg);
        throw new Exception(errorMsg + " Please try after some time.");
      }
    } catch (IOException e) {
      LOG.error("Error message: " + "IOException" + e.getMessage(), e);
      throw e;
    } finally {
      CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK);
      CarbonLockUtil.fileUnlock(carbonDeleteSegmentLock, LockUsage.DELETE_SEGMENT_LOCK);
    }

    return invalidLoadTimestamps;
  }

  /**
   * Backup the table status file as 'tablestatus.backup' in the same path
   *
   * @param tableStatusPath table status file path
   */
  private static void backupTableStatus(String tableStatusPath) throws IOException {
    CarbonFile file = FileFactory.getCarbonFile(tableStatusPath);
    if (file.exists()) {
      String backupPath = tableStatusPath + ".backup";
      String currentContent = readFileAsString(tableStatusPath);
      if (currentContent != null) {
        writeStringIntoFile(backupPath, currentContent);
      }
    }
  }

  /**
   * writes load details to specified path
   *
   * @param tableStatusPath path of the table status file
   * @param listOfLoadFolderDetailsArray segment metadata
   * @throws IOException if IO errors
   */
  public static void writeLoadDetailsIntoFile(
      String tableStatusPath,
      LoadMetadataDetails[] listOfLoadFolderDetailsArray) throws IOException {
    // When overwriting table status file, if process crashed, table status file
    // will be in corrupted state. This can happen in an unstable environment,
    // like in the cloud. To prevent the table corruption, user can enable following
    // property to enable backup of the table status before overwriting it.
    if (tableStatusPath.endsWith(CarbonTablePath.TABLE_STATUS_FILE) &&
        CarbonProperties.isEnableTableStatusBackup()) {
      backupTableStatus(tableStatusPath);
    }
    String content = new Gson().toJson(listOfLoadFolderDetailsArray);
    mockForTest();
    // make the table status file smaller by removing fields that are default value
    for (LoadMetadataDetails loadMetadataDetails : listOfLoadFolderDetailsArray) {
      loadMetadataDetails.removeUnnecessaryField();
    }
    // If process crashed during following write, table status file need to be
    // manually recovered.
    writeStringIntoFile(FileFactory.getUpdatedFilePath(tableStatusPath), content);
  }

  // a dummy func for mocking in testcase, which simulates IOException
  private static void mockForTest() {
  }

  /**
   * writes string content to specified path
   *
   * @param filePath path of the file to write
   * @param content content to write
   * @throws IOException if IO errors
   */
  private static void writeStringIntoFile(String filePath, String content) throws IOException {
    AtomicFileOperations fileWrite = AtomicFileOperationFactory.getAtomicFileOperations(filePath);
    BufferedWriter brWriter = null;
    DataOutputStream dataOutputStream = null;
    try {
      dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
      brWriter = new BufferedWriter(new OutputStreamWriter(
          dataOutputStream, Charset.forName(DEFAULT_CHARSET)));
      brWriter.write(content);
    } catch (IOException ioe) {
      LOG.error("Write file failed: " + ioe.getLocalizedMessage());
      fileWrite.setFailed();
      throw ioe;
    } finally {
      CarbonUtil.closeStreams(brWriter);
      fileWrite.close();
    }
  }

  /**
   * updates deletion status details for each load and returns invalidLoadIds
   *
   * @param loadIds
   * @param listOfLoadFolderDetailsArray
   * @param invalidLoadIds
   * @return invalidLoadIds
   */
  private static List<String> updateDeletionStatus(AbsoluteTableIdentifier absoluteTableIdentifier,
      List<String> loadIds, LoadMetadataDetails[] listOfLoadFolderDetailsArray,
      List<String> invalidLoadIds) {
    SegmentStatus segmentStatus = null;
    for (String loadId : loadIds) {
      boolean loadFound = false;
      // For each load id loop through data and if the
      // load id is found then mark
      // the metadata as deleted.
      for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {

        if (loadId.equalsIgnoreCase(loadMetadata.getLoadName())) {
          segmentStatus = loadMetadata.getSegmentStatus();
          if (SegmentStatus.COMPACTED == segmentStatus) {
            // if the segment is compacted then no need to delete that.
            LOG.error("Cannot delete the Segment which is compacted. Segment is " + loadId);
            invalidLoadIds.add(loadId);
            return invalidLoadIds;
          } else if (SegmentStatus.INSERT_IN_PROGRESS == segmentStatus
              && isLoadInProgress(absoluteTableIdentifier, loadId)) {
            // if the segment status is in progress then no need to delete that.
            LOG.error("Cannot delete the segment " + loadId + " which is load in progress");
            invalidLoadIds.add(loadId);
            return invalidLoadIds;
          } else if (SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == segmentStatus
              && isLoadInProgress(absoluteTableIdentifier, loadId)) {
            // if the segment status is overwrite in progress, then no need to delete that.
            LOG.error("Cannot delete the segment " + loadId + " which is load overwrite " +
                    "in progress");
            invalidLoadIds.add(loadId);
            return invalidLoadIds;
          } else if (SegmentStatus.STREAMING == segmentStatus) {
            // if the segment status is streaming, the segment can't be deleted directly.
            LOG.error("Cannot delete the segment " + loadId + " which is streaming in progress");
            invalidLoadIds.add(loadId);
            return invalidLoadIds;
          } else if (SegmentStatus.MARKED_FOR_DELETE != segmentStatus) {
            loadFound = true;
            loadMetadata.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
            loadMetadata.setModificationOrdeletionTimesStamp(CarbonUpdateUtil.readCurrentTime());
            LOG.info("Segment ID " + loadId + " Marked for Delete");
          }
          break;
        }
      }

      if (!loadFound) {
        LOG.error("Delete segment by ID is failed. No matching segment id found :" + loadId);
        invalidLoadIds.add(loadId);
        return invalidLoadIds;
      }
    }
    return invalidLoadIds;
  }

  /**
   * updates deletion status details for load and returns invalidLoadTimestamps
   *
   * @param loadDate
   * @param listOfLoadFolderDetailsArray
   * @param invalidLoadTimestamps
   * @return invalidLoadTimestamps
   */
  private static List<String> updateDeletionStatus(AbsoluteTableIdentifier absoluteTableIdentifier,
      String loadDate, LoadMetadataDetails[] listOfLoadFolderDetailsArray,
      List<String> invalidLoadTimestamps, Long loadStartTime) {
    // For each load timestamp loop through data and if the
    // required load timestamp is found then mark
    // the metadata as deleted.
    boolean loadFound = false;
    String loadStartTimeString = "Load Start Time: ";
    SegmentStatus segmentStatus = null;
    for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
      Integer result = compareDateValues(loadMetadata.getLoadStartTimeAsLong(), loadStartTime);
      if (result < 0) {
        segmentStatus = loadMetadata.getSegmentStatus();
        if (SegmentStatus.COMPACTED == segmentStatus) {
          LOG.info("Ignoring the segment : " + loadMetadata.getLoadName()
              + "as the segment has been compacted.");
        } else if (SegmentStatus.STREAMING == segmentStatus) {
          LOG.info("Ignoring the segment : " + loadMetadata.getLoadName()
              + "as the segment is streaming in progress.");
        } else if (SegmentStatus.INSERT_IN_PROGRESS == segmentStatus && isLoadInProgress(
            absoluteTableIdentifier, loadMetadata.getLoadName())) {
          LOG.info("Ignoring the segment : " + loadMetadata.getLoadName()
              + "as the segment is insert in progress.");
        } else if (SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == segmentStatus
            && isLoadInProgress(absoluteTableIdentifier, loadMetadata.getLoadName())) {
          LOG.info("Ignoring the segment : " + loadMetadata.getLoadName()
              + "as the segment is insert overwrite in progress.");
        } else if (SegmentStatus.MARKED_FOR_DELETE != segmentStatus) {
          loadFound = true;
          updateSegmentMetadataDetails(loadMetadata);
          LOG.info("Info: " + loadStartTimeString + loadMetadata.getLoadStartTime()
              + " Marked for Delete");
        }
      }
    }

    if (!loadFound) {
      invalidLoadTimestamps.add(loadDate);
      LOG.error("Delete segment by date is failed. No matching segment found.");
      return invalidLoadTimestamps;
    }
    return invalidLoadTimestamps;
  }

  /**
   * This method closes the streams
   *
   * @param streams - streams to close.
   */
  private static void closeStreams(Closeable... streams) {
    // Added if to avoid NullPointerException in case one stream is being passed as null
    if (null != streams) {
      for (Closeable stream : streams) {
        if (null != stream) {
          try {
            stream.close();
          } catch (IOException e) {
            LOG.error("Error while closing stream" + stream);
          }
        }
      }
    }
  }

  /**
   * updates table status details using latest metadata
   *
   * @param oldMetadata
   * @param newMetadata
   * @return
   */
  private static List<LoadMetadataDetails> updateLatestTableStatusDetails(
      LoadMetadataDetails[] oldMetadata, LoadMetadataDetails[] newMetadata) {

    List<LoadMetadataDetails> newListMetadata =
        new ArrayList<LoadMetadataDetails>(Arrays.asList(newMetadata));
    for (LoadMetadataDetails oldSegment : oldMetadata) {
      if (SegmentStatus.MARKED_FOR_DELETE == oldSegment.getSegmentStatus()) {
        updateSegmentMetadataDetails(newListMetadata.get(newListMetadata.indexOf(oldSegment)));
      }
    }
    return newListMetadata;
  }

  /**
   * updates segment status and modificaton time details
   *
   * @param loadMetadata
   */
  private static void updateSegmentMetadataDetails(LoadMetadataDetails loadMetadata) {
    // update status only if the segment is not marked for delete
    if (SegmentStatus.MARKED_FOR_DELETE != loadMetadata.getSegmentStatus()) {
      loadMetadata.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
      loadMetadata.setModificationOrdeletionTimesStamp(CarbonUpdateUtil.readCurrentTime());
    }
  }

  /**
   * This API will return the update status file name.
   * @param segmentList
   * @return
   */
  public String getUpdateStatusFileName(LoadMetadataDetails[] segmentList) {
    if (segmentList.length == 0) {
      return "";
    }
    else {
      for (LoadMetadataDetails eachSeg : segmentList) {
        // file name stored in 0th segment.
        if (eachSeg.getLoadName().equalsIgnoreCase("0")) {
          return eachSeg.getUpdateStatusFileName();
        }
      }
    }
    return "";
  }

  public static class ValidAndInvalidSegmentsInfo {
    private final List<Segment> listOfValidSegments;
    private final List<Segment> listOfValidUpdatedSegments;
    private final List<Segment> listOfInvalidSegments;
    private final List<Segment> listOfStreamSegments;
    private final List<Segment> listOfInProgressSegments;

    private ValidAndInvalidSegmentsInfo(List<Segment> listOfValidSegments,
        List<Segment> listOfValidUpdatedSegments, List<Segment> listOfInvalidUpdatedSegments,
        List<Segment> listOfStreamSegments, List<Segment> listOfInProgressSegments) {
      this.listOfValidSegments = listOfValidSegments;
      this.listOfValidUpdatedSegments = listOfValidUpdatedSegments;
      this.listOfInvalidSegments = listOfInvalidUpdatedSegments;
      this.listOfStreamSegments = listOfStreamSegments;
      this.listOfInProgressSegments = listOfInProgressSegments;
    }

    public List<Segment> getInvalidSegments() {
      return listOfInvalidSegments;
    }

    public List<Segment> getValidSegments() {
      return listOfValidSegments;
    }

    public List<Segment> getStreamSegments() {
      return listOfStreamSegments;
    }

    public List<Segment> getListOfInProgressSegments() {
      return listOfInProgressSegments;
    }
  }

  /**
   * Return true if any load or insert overwrite is in progress for specified table
   */
  public static Boolean isLoadInProgressInTable(CarbonTable carbonTable) {
    if (carbonTable == null) {
      return false;
    }
    boolean loadInProgress = false;
    String metaPath = carbonTable.getMetadataPath();
    LoadMetadataDetails[] listOfLoadFolderDetailsArray =
              SegmentStatusManager.readLoadMetadata(metaPath);
    if (listOfLoadFolderDetailsArray.length != 0) {
      for (LoadMetadataDetails loaddetail :listOfLoadFolderDetailsArray) {
        SegmentStatus segmentStatus = loaddetail.getSegmentStatus();
        if (segmentStatus == SegmentStatus.INSERT_IN_PROGRESS
            || segmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) {
          loadInProgress =
              isLoadInProgress(carbonTable.getAbsoluteTableIdentifier(),
                  loaddetail.getLoadName());
        }
      }
    }
    return loadInProgress;
  }

  /**
   * Return true if the compaction is in progress for the table
   * @param carbonTable
   * @return
   */
  public static Boolean isCompactionInProgress(CarbonTable carbonTable) {
    if (carbonTable == null) {
      return false;
    }
    boolean compactionInProgress;
    ICarbonLock lock = CarbonLockFactory
        .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.COMPACTION_LOCK);
    try {
      compactionInProgress = !lock.lockWithRetries(1, 0);
    } finally {
      lock.unlock();
    }
    return compactionInProgress;
  }

  /**
   * Return true if insert overwrite is in progress for specified table
   */
  public static Boolean isOverwriteInProgressInTable(CarbonTable carbonTable) {
    if (carbonTable == null) {
      return false;
    }
    boolean loadInProgress = false;
    String metaPath = carbonTable.getMetadataPath();
    LoadMetadataDetails[] listOfLoadFolderDetailsArray =
        SegmentStatusManager.readLoadMetadata(metaPath);
    if (listOfLoadFolderDetailsArray.length != 0) {
      for (LoadMetadataDetails loaddetail :listOfLoadFolderDetailsArray) {
        SegmentStatus segmentStatus = loaddetail.getSegmentStatus();
        if (segmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) {
          loadInProgress =
              isLoadInProgress(carbonTable.getAbsoluteTableIdentifier(),
                  loaddetail.getLoadName());
        }
      }
    }
    return loadInProgress;
  }

  /**
   * Return true if the specified `loadName` is in progress, by checking the load lock.
   */
  public static Boolean isLoadInProgress(AbsoluteTableIdentifier absoluteTableIdentifier,
      String loadName) {
    ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
        CarbonTablePath.addSegmentPrefix(loadName) + LockUsage.LOCK);
    try {
      return !segmentLock.lockWithRetries(1, 0);
    } finally {
      segmentLock.unlock();
    }
  }

  private static boolean isLoadDeletionRequired(LoadMetadataDetails[] details) {
    if (details != null && details.length > 0) {
      for (LoadMetadataDetails oneRow : details) {
        if ((SegmentStatus.MARKED_FOR_DELETE == oneRow.getSegmentStatus()
            || SegmentStatus.COMPACTED == oneRow.getSegmentStatus()
            || SegmentStatus.INSERT_IN_PROGRESS == oneRow.getSegmentStatus()
            || SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == oneRow.getSegmentStatus())
            && oneRow.getVisibility().equalsIgnoreCase("true")) {
          return true;
        }
      }
    }
    return false;
  }

  /**
   * This will update the old table status details before clean files to the latest table status.
   * @param oldList
   * @param newList
   * @return
   */
  private static List<LoadMetadataDetails> updateLoadMetadataFromOldToNew(
      LoadMetadataDetails[] oldList, LoadMetadataDetails[] newList) {

    List<LoadMetadataDetails> newListMetadata =
        new ArrayList<LoadMetadataDetails>(Arrays.asList(newList));
    for (LoadMetadataDetails oldSegment : oldList) {
      if ("false".equalsIgnoreCase(oldSegment.getVisibility())) {
        newListMetadata.get(newListMetadata.indexOf(oldSegment)).setVisibility("false");
      }
    }
    return newListMetadata;
  }

  private static void writeLoadMetadata(AbsoluteTableIdentifier identifier,
      List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
    String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());

    DataOutputStream dataOutputStream;
    Gson gsonObjectToWrite = new Gson();
    BufferedWriter brWriter = null;

    AtomicFileOperations writeOperation =
        AtomicFileOperationFactory.getAtomicFileOperations(dataLoadLocation);

    try {

      dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
          Charset.forName(DEFAULT_CHARSET)));

      // make the table status file smaller by removing fields that are default value
      listOfLoadFolderDetails.forEach(LoadMetadataDetails::removeUnnecessaryField);

      String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
      brWriter.write(metadataInstance);
    } catch (IOException ie) {
      LOG.error("Error message: " + ie.getLocalizedMessage());
      writeOperation.setFailed();
      throw ie;
    } finally {
      try {
        if (null != brWriter) {
          brWriter.flush();
        }
      } catch (Exception e) {
        LOG.error("error in  flushing ");

      }
      CarbonUtil.closeStreams(brWriter);
      writeOperation.close();
    }
  }

  private static class ReturnTuple {
    LoadMetadataDetails[] details;
    boolean isUpdateRequired;
    ReturnTuple(LoadMetadataDetails[] details, boolean isUpdateRequired) {
      this.details = details;
      this.isUpdateRequired = isUpdateRequired;
    }
  }

  private static ReturnTuple isUpdationRequired(boolean isForceDeletion, CarbonTable carbonTable,
      AbsoluteTableIdentifier absoluteTableIdentifier, LoadMetadataDetails[] details) {
    // Delete marked loads
    boolean isUpdationRequired = DeleteLoadFolders
        .deleteLoadFoldersFromFileSystem(absoluteTableIdentifier, isForceDeletion, details,
            carbonTable.getMetadataPath());
    return new ReturnTuple(details, isUpdationRequired);
  }

  public static void deleteLoadsAndUpdateMetadata(CarbonTable carbonTable, boolean isForceDeletion,
      List<PartitionSpec> partitionSpecs) throws IOException {
    LoadMetadataDetails[] metadataDetails =
        SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
    // delete the expired segment lock files
    CarbonLockUtil.deleteExpiredSegmentLockFiles(carbonTable);
    if (isLoadDeletionRequired(metadataDetails)) {
      AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
      boolean updationCompletionStatus = false;
      LoadMetadataDetails[] newAddedLoadHistoryList = null;
      ReturnTuple tuple =
          isUpdationRequired(isForceDeletion, carbonTable, identifier, metadataDetails);
      if (tuple.isUpdateRequired) {
        ICarbonLock carbonTableStatusLock =
            CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.TABLE_STATUS_LOCK);
        boolean locked = false;
        try {
          // Update load metadate file after cleaning deleted nodes
          locked = carbonTableStatusLock.lockWithRetries();
          if (locked) {
            LOG.info("Table status lock has been successfully acquired.");
            // Again read status and check to verify updation required or not.
            LoadMetadataDetails[] details =
                SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
            ReturnTuple tuple2 =
                isUpdationRequired(isForceDeletion, carbonTable, identifier, details);
            if (!tuple2.isUpdateRequired) {
              return;
            }
            // read latest table status again.
            LoadMetadataDetails[] latestMetadata =
                SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());

            int invisibleSegmentPreserveCnt =
                CarbonProperties.getInstance().getInvisibleSegmentPreserveCount();
            int maxSegmentId = SegmentStatusManager.getMaxSegmentId(tuple2.details);
            int invisibleSegmentCnt = SegmentStatusManager.countInvisibleSegments(
                tuple2.details, maxSegmentId);
            // if execute command 'clean files' or the number of invisible segment info
            // exceeds the value of 'carbon.invisible.segments.preserve.count',
            // it need to append the invisible segment list to 'tablestatus.history' file.
            if (isForceDeletion || (invisibleSegmentCnt > invisibleSegmentPreserveCnt)) {
              TableStatusReturnTuple tableStatusReturn = separateVisibleAndInvisibleSegments(
                  tuple2.details, latestMetadata, invisibleSegmentCnt, maxSegmentId);
              LoadMetadataDetails[] oldLoadHistoryList = readLoadHistoryMetadata(
                  carbonTable.getMetadataPath());
              LoadMetadataDetails[] newLoadHistoryList = appendLoadHistoryList(
                  oldLoadHistoryList, tableStatusReturn.arrayOfLoadHistoryDetails);
              writeLoadDetailsIntoFile(
                  CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()),
                  tableStatusReturn.arrayOfLoadDetails);
              writeLoadDetailsIntoFile(
                  CarbonTablePath.getTableStatusHistoryFilePath(carbonTable.getTablePath()),
                  newLoadHistoryList);
              // the segments which will be moved to history file need to be deleted
              newAddedLoadHistoryList = tableStatusReturn.arrayOfLoadHistoryDetails;
            } else {
              // update the metadata details from old to new status.
              List<LoadMetadataDetails> latestStatus =
                  updateLoadMetadataFromOldToNew(tuple2.details, latestMetadata);
              writeLoadDetailsIntoFile(
                  CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()),
                  latestStatus.toArray(new LoadMetadataDetails[0]));
            }
            updationCompletionStatus = true;
          } else {
            String dbName = identifier.getCarbonTableIdentifier().getDatabaseName();
            String tableName = identifier.getCarbonTableIdentifier().getTableName();
            String errorMsg = "Clean files request is failed for " +
                dbName + "." + tableName +
                ". Not able to acquire the table status lock due to other operation " +
                "running in the background.";
            LOG.error(errorMsg);
            throw new IOException(errorMsg + " Please try after some time.");
          }
        } finally {
          if (locked) {
            CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK);
          }
          if (updationCompletionStatus) {
            DeleteLoadFolders
                .physicalFactAndMeasureMetadataDeletion(carbonTable, newAddedLoadHistoryList,
                    isForceDeletion, partitionSpecs);
          }
        }
      }
    }
  }

  public static void truncateTable(CarbonTable carbonTable)
      throws ConcurrentOperationException, IOException {
    ICarbonLock carbonTableStatusLock = CarbonLockFactory.getCarbonLockObj(
        carbonTable.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK);
    boolean locked = false;
    try {
      // Update load metadate file after cleaning deleted nodes
      locked = carbonTableStatusLock.lockWithRetries();
      if (locked) {
        LOG.info("Table status lock has been successfully acquired.");
        LoadMetadataDetails[] listOfLoadFolderDetailsArray =
            SegmentStatusManager.readLoadMetadata(
                CarbonTablePath.getMetadataPath(carbonTable.getTablePath()));
        for (LoadMetadataDetails listOfLoadFolderDetails : listOfLoadFolderDetailsArray) {
          boolean writing;
          switch (listOfLoadFolderDetails.getSegmentStatus()) {
            case INSERT_IN_PROGRESS:
            case INSERT_OVERWRITE_IN_PROGRESS:
            case STREAMING:
              writing = true;
              break;
            default:
              writing = false;
          }
          if (writing) {
            throw new ConcurrentOperationException(carbonTable, "insert", "truncate");
          }
        }
        for (LoadMetadataDetails listOfLoadFolderDetails : listOfLoadFolderDetailsArray) {
          listOfLoadFolderDetails.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
        }
        SegmentStatusManager
            .writeLoadDetailsIntoFile(
                CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()),
                listOfLoadFolderDetailsArray);
      } else {
        String dbName = carbonTable.getDatabaseName();
        String tableName = carbonTable.getTableName();
        String errorMsg = "truncate table request is failed for " +
            dbName + "." + tableName +
            ". Not able to acquire the table status lock due to other operation " +
            "running in the background.";
        LOG.error(errorMsg);
        throw new IOException(errorMsg + " Please try after some time.");
      }
    } finally {
      if (locked) {
        CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK);
      }
    }
  }

  /**
   * Get the number of invisible segment info from segment info list.
   */
  public static int countInvisibleSegments(
      LoadMetadataDetails[] segmentList, int maxSegmentId) {
    int invisibleSegmentCnt = 0;
    if (segmentList.length != 0) {
      for (LoadMetadataDetails eachSeg : segmentList) {
        // can not remove segment 0, there are some info will be used later
        // for example: updateStatusFileName
        // also can not remove the max segment id,
        // otherwise will impact the generation of segment id
        if (!eachSeg.getLoadName().equalsIgnoreCase("0")
            && !eachSeg.getLoadName().equalsIgnoreCase(String.valueOf(maxSegmentId))
            && eachSeg.getVisibility().equalsIgnoreCase("false")) {
          invisibleSegmentCnt += 1;
        }
      }
    }
    return invisibleSegmentCnt;
  }

  private static class TableStatusReturnTuple {
    LoadMetadataDetails[] arrayOfLoadDetails;
    LoadMetadataDetails[] arrayOfLoadHistoryDetails;
    TableStatusReturnTuple(LoadMetadataDetails[] arrayOfLoadDetails,
        LoadMetadataDetails[] arrayOfLoadHistoryDetails) {
      this.arrayOfLoadDetails = arrayOfLoadDetails;
      this.arrayOfLoadHistoryDetails = arrayOfLoadHistoryDetails;
    }
  }

  /**
   * Separate visible and invisible segments into two array.
   */
  public static TableStatusReturnTuple separateVisibleAndInvisibleSegments(
      LoadMetadataDetails[] oldList,
      LoadMetadataDetails[] newList,
      int invisibleSegmentCnt,
      int maxSegmentId) {
    int newSegmentsLength = newList.length;
    int visibleSegmentCnt = newSegmentsLength - invisibleSegmentCnt;
    LoadMetadataDetails[] arrayOfVisibleSegments = new LoadMetadataDetails[visibleSegmentCnt];
    LoadMetadataDetails[] arrayOfInvisibleSegments = new LoadMetadataDetails[invisibleSegmentCnt];
    int oldSegmentsLength = oldList.length;
    int visibleIdx = 0;
    int invisibleIdx = 0;
    for (int i = 0; i < newSegmentsLength; i++) {
      LoadMetadataDetails newSegment = newList[i];
      if (i < oldSegmentsLength) {
        LoadMetadataDetails oldSegment = oldList[i];
        if (newSegment.getLoadName().equalsIgnoreCase("0")
            || newSegment.getLoadName().equalsIgnoreCase(String.valueOf(maxSegmentId))) {
          newSegment.setVisibility(oldSegment.getVisibility());
          arrayOfVisibleSegments[visibleIdx] = newSegment;
          visibleIdx++;
        } else if ("false".equalsIgnoreCase(oldSegment.getVisibility())) {
          newSegment.setVisibility("false");
          arrayOfInvisibleSegments[invisibleIdx] = newSegment;
          invisibleIdx++;
        } else {
          arrayOfVisibleSegments[visibleIdx] = newSegment;
          visibleIdx++;
        }
      } else {
        arrayOfVisibleSegments[visibleIdx] = newSegment;
        visibleIdx++;
      }
    }
    return new TableStatusReturnTuple(arrayOfVisibleSegments, arrayOfInvisibleSegments);
  }

  /**
   * Return an array containing all invisible segment entries in appendList and historyList.
   */
  public static LoadMetadataDetails[] appendLoadHistoryList(
      LoadMetadataDetails[] historyList,
      LoadMetadataDetails[] appendList) {
    int historyListLen = historyList.length;
    int appendListLen = appendList.length;
    int newListLen = historyListLen + appendListLen;
    LoadMetadataDetails[] newList = new LoadMetadataDetails[newListLen];
    int newListIdx = 0;
    for (int i = 0; i < historyListLen; i++) {
      newList[newListIdx] = historyList[i];
      newListIdx++;
    }
    for (int i = 0; i < appendListLen; i++) {
      newList[newListIdx] = appendList[i];
      newListIdx++;
    }
    return newList;
  }

  /*
   * This method reads the load metadata file and returns Carbon segments only
   */
  public static LoadMetadataDetails[] readCarbonMetaData(String metadataFolderPath) {
    String metadataFileName = metadataFolderPath + CarbonCommonConstants.FILE_SEPARATOR
        + CarbonTablePath.TABLE_STATUS_FILE;
    try {
      LoadMetadataDetails[] allSegments = readTableStatusFile(metadataFileName);
      List<LoadMetadataDetails> carbonSegments = new ArrayList<>();
      for (LoadMetadataDetails currSegment : allSegments) {
        if (currSegment.isCarbonFormat()) {
          carbonSegments.add(currSegment);
        }
      }
      return carbonSegments.toArray(new LoadMetadataDetails[carbonSegments.size()]);
    } catch (IOException e) {
      return new LoadMetadataDetails[0];
    }
  }
}
