/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.core.statusmanager;

import java.io.*;
import java.util.*;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory;
import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
import org.apache.carbondata.core.fileoperations.FileWriteOperation;
import org.apache.carbondata.core.index.Segment;
import org.apache.carbondata.core.locks.CarbonLockFactory;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.locks.LockUsage;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;

import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;

/**
 * Manages Segment & block status of carbon table for Delete operation
 */
public class SegmentUpdateStatusManager {

  /**
   * logger
   */
  private static final Logger LOG =
      LogServiceFactory.getLogService(SegmentUpdateStatusManager.class.getName());

  private final AbsoluteTableIdentifier identifier;
  private LoadMetadataDetails[] segmentDetails;
  private SegmentUpdateDetails[] updateDetails;
  private Map<String, SegmentUpdateDetails> blockAndDetailsMap;
  private boolean isPartitionTable;
  /**
   * It contains the mapping of segment path and corresponding delete delta file paths,
   * avoiding listing these files for every query
   */
  private Map<String, List<String>> segmentDeleteDeltaListMap = new HashMap<>();

  public SegmentUpdateStatusManager(CarbonTable table,
      LoadMetadataDetails[] segmentDetails) {
    this(table, segmentDetails, null);
  }

  /**
   * It takes the updateVersion as one of the parameter. Basically user can give on which
   * updateVersion user can retrieve the data.It is useful to get the history changed data
   * of a particular version.
   */
  public SegmentUpdateStatusManager(CarbonTable table,
      LoadMetadataDetails[] segmentDetails, String updateVersion) {
    this.identifier = table.getAbsoluteTableIdentifier();
    this.isPartitionTable = table.isHivePartitionTable();
    // current it is used only for read function scenarios, as file update always requires to work
    // on latest file status.
    this.segmentDetails = segmentDetails;
    updateDetails = readLoadMetadata();
    updateUpdateDetails(updateVersion);
    populateMap();
  }

  public SegmentUpdateStatusManager(CarbonTable table) {
    this(table, (String) null);
  }

  public SegmentUpdateStatusManager(CarbonTable table, String updateVersion) {
    this.identifier = table.getAbsoluteTableIdentifier();
    // current it is used only for read function scenarios, as file update always requires to work
    // on latest file status.
    if (!table.getTableInfo().isTransactionalTable()) {
      // fileExist is costly operation, so check based on table Type
      segmentDetails = new LoadMetadataDetails[0];
    } else {
      segmentDetails = SegmentStatusManager.readLoadMetadata(
          CarbonTablePath.getMetadataPath(identifier.getTablePath()));
    }
    this.isPartitionTable = table.isHivePartitionTable();
    if (segmentDetails.length != 0) {
      updateDetails = readLoadMetadata();
    } else {
      updateDetails = new SegmentUpdateDetails[0];
    }
    updateUpdateDetails(updateVersion);
    populateMap();
  }

  /**
   * It adds only the SegmentUpdateDetails of given updateVersion, it is used to get the history
   * data of updated/deleted data.
   */
  private void updateUpdateDetails(String updateVersion) {
    if (updateVersion != null) {
      List<SegmentUpdateDetails> newUpdateDetails = new ArrayList<>();
      for (SegmentUpdateDetails updateDetail : updateDetails) {
        if (updateDetail.getDeltaFileStamps() != null) {
          if (updateDetail.getDeltaFileStamps().contains(updateVersion)) {
            HashSet<String> set = new HashSet<>();
            set.add(updateVersion);
            updateDetail.setDeltaFileStamps(set);
            updateDetail.setSegmentStatus(SegmentStatus.SUCCESS);
            newUpdateDetails.add(updateDetail);
          }
        } else if (updateDetail.getDeleteDeltaStartTimestamp().equalsIgnoreCase(updateVersion)) {
          updateDetail.setSegmentStatus(SegmentStatus.SUCCESS);
          newUpdateDetails.add(updateDetail);
        }
      }
      updateDetails = newUpdateDetails.toArray(new SegmentUpdateDetails[0]);
    }
  }

  /**
   * populate the block and its details in a map.
   */
  private void populateMap() {
    blockAndDetailsMap = new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
    for (SegmentUpdateDetails blockDetails : updateDetails) {
      String blockIdentifier = CarbonUpdateUtil
          .getSegmentBlockNameKey(blockDetails.getSegmentName(), blockDetails.getActualBlockName(),
              isPartitionTable);
      blockAndDetailsMap.put(blockIdentifier, blockDetails);
    }
  }

  /**
   *
   * @param segID
   * @param actualBlockName
   * @return null if block is not present in segment update status.
   */
  private SegmentUpdateDetails getDetailsForABlock(String segID, String actualBlockName) {

    String blockIdentifier = CarbonUpdateUtil
        .getSegmentBlockNameKey(segID, actualBlockName, isPartitionTable);

    return blockAndDetailsMap.get(blockIdentifier);

  }

  /**
   *
   * @param key will be like (segmentId/blockName)  0/0-0-5464654654654
   * @return
   */
  public SegmentUpdateDetails getDetailsForABlock(String key) {

    return blockAndDetailsMap.get(key);

  }

  /**
   * Returns the LoadMetadata Details
   * @return
   */
  public LoadMetadataDetails[] getLoadMetadataDetails() {
    return segmentDetails;
  }

  /**
   * Returns the UpdateStatus Details.
   * @return
   */
  public SegmentUpdateDetails[] getUpdateStatusDetails() {
    return updateDetails;
  }

  /**
   *
   * @param segmentUpdateDetails
   */
  public void setUpdateStatusDetails(SegmentUpdateDetails[] segmentUpdateDetails) {
    this.updateDetails = segmentUpdateDetails;
  }

  /**
   * This will return the lock object used to lock the table update status file before updating.
   *
   * @return
   */
  public ICarbonLock getTableUpdateStatusLock() {
    return CarbonLockFactory.getCarbonLockObj(identifier,
        LockUsage.TABLE_UPDATE_STATUS_LOCK);
  }

  /**
   * Returns all update delta files of specified Segment.
   *
   * @param segmentId
   * @return
   * @throws Exception
   */
  public List<String> getUpdateDeltaFiles(final String segmentId) {
    List<String> updatedDeltaFilesList =
        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
    String endTimeStamp = "";
    String startTimeStamp = "";
    String segmentPath = CarbonTablePath.getSegmentPath(
        identifier.getTablePath(), segmentId);
    CarbonFile segDir =
        FileFactory.getCarbonFile(segmentPath);
    for (LoadMetadataDetails eachSeg : segmentDetails) {
      if (eachSeg.getLoadName().equalsIgnoreCase(segmentId)) {
        // if the segment is found then take the start and end time stamp.
        startTimeStamp = eachSeg.getUpdateDeltaStartTimestamp();
        endTimeStamp = eachSeg.getUpdateDeltaEndTimestamp();
      }
    }
    // if start timestamp is empty then no update delta is found. so return empty list.
    if (startTimeStamp.isEmpty()) {
      return updatedDeltaFilesList;
    }
    final Long endTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(endTimeStamp);
    final Long startTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(startTimeStamp);

    // else scan the segment for the delta files with the respective timestamp.
    CarbonFile[] files = segDir.listFiles(new CarbonFileFilter() {

      @Override
      public boolean accept(CarbonFile pathName) {
        String fileName = pathName.getName();
        if (fileName.endsWith(CarbonCommonConstants.UPDATE_DELTA_FILE_EXT)) {
          String firstPart = fileName.substring(0, fileName.indexOf('.'));

          long timestamp = Long.parseLong(firstPart
              .substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
                  firstPart.length()));
          if (Long.compare(timestamp, endTimeStampFinal) <= 0
              && Long.compare(timestamp, startTimeStampFinal) >= 0) {

            // if marked for delete then it is invalid.
            if (!isBlockValid(segmentId, fileName)) {
              return false;
            }

            return true;
          }
        }
        return false;
      }
    });

    for (CarbonFile file : files) {
      updatedDeltaFilesList.add(file.getCanonicalPath());
    }

    return updatedDeltaFilesList;
  }

  /**
   * Below method will be used to get all the delete delta files based on block name
   *
   * @param blockFilePath actual block filePath
   * @return all delete delta files
   */
  public String[] getDeleteDeltaFilePath(String blockFilePath, String segmentId) {
    return getDeltaFiles(blockFilePath, segmentId, CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
        .toArray(new String[0]);
  }

  /**
   * Returns all delta file paths of specified block
   */
  private List<String> getDeltaFiles(String blockPath, String segment, String extension) {
    Path path = new Path(blockPath);
    String completeBlockName = path.getName();
    String blockNameWithoutExtension =
        completeBlockName.substring(0, completeBlockName.lastIndexOf('.'));
    //blockName without timestamp
    final String blockNameFromTuple =
        blockNameWithoutExtension.substring(0, blockNameWithoutExtension.lastIndexOf("-"));
    return getDeltaFiles(path.getParent().toString(), blockNameFromTuple, extension, segment);
  }

  /**
   * This method returns the list of Blocks associated with the segment
   * from the SegmentUpdateDetails List.
   * @param segmentName
   * @return
   */
  public List<String> getBlockNameFromSegment(String segmentName) {
    List<String> blockNames = new ArrayList<String>();
    for (SegmentUpdateDetails block : updateDetails) {
      if (block.getSegmentName().equalsIgnoreCase(segmentName) && !CarbonUpdateUtil
          .isBlockInvalid(block.getSegmentStatus())) {
        blockNames.add(block.getBlockName());
      }
    }
    return blockNames;
  }

  /**
   * check the block whether is valid
   *
   * @param segName segment name
   * @param blockName  block name
   * @return the status of block whether is valid
   */
  public boolean isBlockValid(String segName, String blockName) {

    SegmentUpdateDetails details = getDetailsForABlock(segName, blockName);

    return details == null || !CarbonUpdateUtil.isBlockInvalid(details.getSegmentStatus());

  }

  /**
   * Returns all delta file paths of specified block
   *
   * @param blockDir block directory with CarbonFile format
   * @param blockNameFromTuple block name from tuple
   * @param extension the file extension name
   * @param segment the segment name
   * @return the list of delete file
   */
  private List<String> getDeltaFiles(String blockDir, final String blockNameFromTuple,
      final String extension, String segment) {
    List<String> deleteFileList = new ArrayList<>();
    for (SegmentUpdateDetails block : updateDetails) {
      if (block.getBlockName().equalsIgnoreCase(blockNameFromTuple) && block.getSegmentName()
          .equalsIgnoreCase(segment) && !CarbonUpdateUtil
          .isBlockInvalid(block.getSegmentStatus())) {
        final long deltaStartTimestamp = getStartTimeOfDeltaFile(extension, block);
        // If there is no delete delete file , then return null
        if (deltaStartTimestamp == 0) {
          return deleteFileList;
        }
        final long deltaEndTimeStamp = getEndTimeOfDeltaFile(extension, block);
        // If start and end time is same then it has only one delta file so construct the file
        // directly with available information with out listing
        if (block.getDeleteDeltaStartTimestamp().equals(block.getDeleteDeltaEndTimestamp())) {
          deleteFileList.add(
              new StringBuilder(blockDir).append(CarbonCommonConstants.FILE_SEPARATOR)
                  .append(block.getBlockName()).append("-")
                  .append(block.getDeleteDeltaStartTimestamp()).append(extension).toString());
          // If delta timestamp list has data then it has multiple delta file so construct the file
          // directly with list of deltas with out listing
        } else if (block.getDeltaFileStamps() != null && block.getDeltaFileStamps().size() > 0) {
          for (String delta : block.getDeltaFileStamps()) {
            deleteFileList.add(
                new StringBuilder(blockDir).append(CarbonCommonConstants.FILE_SEPARATOR)
                    .append(block.getBlockName()).append("-").append(delta).append(extension)
                    .toString());
          }
        } else {
          // It is for backward compatibility.It lists the files.
          return getFilePaths(blockDir, blockNameFromTuple, extension, deleteFileList,
              deltaStartTimestamp, deltaEndTimeStamp);
        }
      }
    }
    return deleteFileList;
  }

  private List<String> getFilePaths(String blockDir, final String blockNameFromTuple,
      final String extension, List<String> deleteFileList, final long deltaStartTimestamp,
      final long deltaEndTimeStamp) {
    List<String> deltaList = segmentDeleteDeltaListMap.get(blockDir);
    if (deltaList == null) {
      CarbonFile[] files = FileFactory.getCarbonFile(blockDir).listFiles(new CarbonFileFilter() {
        @Override
        public boolean accept(CarbonFile pathName) {
          String fileName = pathName.getName();
          if (fileName.endsWith(extension) && pathName.getSize() > 0) {
            return true;
          }
          return false;
        }
      });
      deltaList = new ArrayList<>(files.length);
      for (CarbonFile file : files) {
        deltaList.add(file.getCanonicalPath());
      }
      segmentDeleteDeltaListMap.put(blockDir, deltaList);
    }
    for (String deltaFile : deltaList) {
      String deltaFilePathName = new Path(deltaFile).getName();
      String firstPart = deltaFilePathName.substring(0, deltaFilePathName.lastIndexOf('.'));
      String blockName =
          firstPart.substring(0, firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN));
      long timestamp =
          Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(deltaFilePathName));
      // It compares whether this delta file belongs to this block or not. And also checks that
      // corresponding delta file is valid or not by considering its load start and end time with
      // the file timestamp.
      if (blockNameFromTuple.equals(blockName) && ((Long.compare(timestamp, deltaEndTimeStamp) <= 0)
          && (Long.compare(timestamp, deltaStartTimestamp) >= 0))) {
        if (null == deleteFileList) {
          deleteFileList = new ArrayList<String>();
        }
        deleteFileList.add(deltaFile);
      }
    }
    return deleteFileList;
  }

  /**
   * Return all delta file for a block.
   * @param segmentId
   * @param blockName
   * @return
   */
  public CarbonFile[] getDeleteDeltaFilesList(final Segment segmentId, final String blockName) {
    String segmentPath = CarbonTablePath.getSegmentPath(
        identifier.getTablePath(), segmentId.getSegmentNo());
    CarbonFile segDir =
        FileFactory.getCarbonFile(segmentPath);
    for (SegmentUpdateDetails block : updateDetails) {
      if ((block.getBlockName().equalsIgnoreCase(blockName)) &&
          (block.getSegmentName().equalsIgnoreCase(segmentId.getSegmentNo()))
          && !CarbonUpdateUtil.isBlockInvalid((block.getSegmentStatus()))) {
        final long deltaStartTimestamp =
            getStartTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block);
        final long deltaEndTimeStamp =
            getEndTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block);

        return segDir.listFiles(new CarbonFileFilter() {

          @Override
          public boolean accept(CarbonFile pathName) {
            String fileName = pathName.getName();
            if (pathName.getSize() > 0
                && fileName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)) {
              String blkName = fileName.substring(0, fileName.lastIndexOf("-"));
              long timestamp =
                  Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(fileName));
              if (blockName.equals(blkName) && (Long.compare(timestamp, deltaEndTimeStamp) <= 0)
                  && (Long.compare(timestamp, deltaStartTimestamp) >= 0)) {
                return true;
              }
            }
            return false;
          }
        });
      }
    }
    return null;
  }

  /**
   * Returns all update delta files of specified Segment.
   *
   * @param loadMetadataDetail metadata details of segment
   * @param validUpdateFiles if true then only the valid range files will be returned.
   * @return
   */
  public CarbonFile[] getUpdateDeltaFilesList(LoadMetadataDetails loadMetadataDetail,
      final boolean validUpdateFiles, final String fileExtension, final boolean excludeOriginalFact,
      CarbonFile[] allFilesOfSegment, boolean isAbortedFile) {

    String endTimeStamp = "";
    String startTimeStamp = "";
    long factTimeStamp = 0;

    // if the segment is found then take the start and end time stamp.
    startTimeStamp = loadMetadataDetail.getUpdateDeltaStartTimestamp();
    endTimeStamp = loadMetadataDetail.getUpdateDeltaEndTimestamp();
    factTimeStamp = loadMetadataDetail.getLoadStartTime();

    // if start timestamp is empty then no update delta is found. so return empty list.
    if (startTimeStamp.isEmpty()) {
      return new CarbonFile[0];
    }

    final Long endTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(endTimeStamp);
    final Long startTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(startTimeStamp);
    final long factTimeStampFinal = factTimeStamp;

    List<CarbonFile> listOfCarbonFiles =
        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);

    // else scan the segment for the delta files with the respective timestamp.

    for (CarbonFile eachFile : allFilesOfSegment) {

      String fileName = eachFile.getName();
      if (fileName.endsWith(fileExtension)) {
        long timestamp =
            Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(fileName));

        if (excludeOriginalFact) {
          if (Long.compare(factTimeStampFinal, timestamp) == 0) {
            continue;
          }
        }

        if (validUpdateFiles) {
          if (Long.compare(timestamp, endTimeStampFinal) <= 0
              && Long.compare(timestamp, startTimeStampFinal) >= 0) {
            listOfCarbonFiles.add(eachFile);
          }
        } else {
          // invalid cases.
          if (isAbortedFile) {
            if (Long.compare(timestamp, endTimeStampFinal) > 0) {
              listOfCarbonFiles.add(eachFile);
            }
          } else if (Long.compare(timestamp, startTimeStampFinal) < 0
              || Long.compare(timestamp, endTimeStampFinal) > 0) {
            listOfCarbonFiles.add(eachFile);
          }
        }
      }
    }

    return listOfCarbonFiles.toArray(new CarbonFile[listOfCarbonFiles.size()]);
  }

  /**
   * Returns all update delta files of specified Segment.
   *
   * @param segmentId
   * @param validUpdateFiles
   * @param fileExtension
   * @param excludeOriginalFact
   * @param allFilesOfSegment
   * @return
   */
  public CarbonFile[] getUpdateDeltaFilesForSegment(String segmentId,
      final boolean validUpdateFiles, final String fileExtension, final boolean excludeOriginalFact,
      CarbonFile[] allFilesOfSegment) {

    String endTimeStamp = "";
    String startTimeStamp = "";
    long factTimeStamp = 0;

    for (LoadMetadataDetails eachSeg : segmentDetails) {
      if (eachSeg.getLoadName().equalsIgnoreCase(segmentId)) {
        // if the segment is found then take the start and end time stamp.
        startTimeStamp = eachSeg.getUpdateDeltaStartTimestamp();
        endTimeStamp = eachSeg.getUpdateDeltaEndTimestamp();
        factTimeStamp = eachSeg.getLoadStartTime();
      }
    }

    // if start timestamp is empty then no update delta is found. so return empty list.
    if (startTimeStamp.isEmpty()) {
      return new CarbonFile[0];
    }

    final Long endTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(endTimeStamp);
    final Long startTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(startTimeStamp);
    final long factTimeStampFinal = factTimeStamp;

    List<CarbonFile> listOfCarbonFiles =
        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);

    // else scan the segment for the delta files with the respective timestamp.

    for (CarbonFile eachFile : allFilesOfSegment) {

      String fileName = eachFile.getName();
      if (fileName.endsWith(fileExtension)) {
        String firstPart = fileName.substring(0, fileName.indexOf('.'));

        long timestamp = Long.parseLong(firstPart
            .substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
                firstPart.length()));

        if (excludeOriginalFact) {
          if (Long.compare(factTimeStampFinal, timestamp) == 0) {
            continue;
          }
        }

        if (validUpdateFiles) {
          if (Long.compare(timestamp, endTimeStampFinal) <= 0
              && Long.compare(timestamp, startTimeStampFinal) >= 0) {

            boolean validBlock = true;

            for (SegmentUpdateDetails blockDetails : getUpdateStatusDetails()) {
              if (blockDetails.getActualBlockName().equalsIgnoreCase(eachFile.getName())
                  && CarbonUpdateUtil.isBlockInvalid(blockDetails.getSegmentStatus())) {
                validBlock = false;
              }
            }

            if (validBlock) {
              listOfCarbonFiles.add(eachFile);
            }

          }
        } else {
          // invalid cases.
          if (Long.compare(timestamp, startTimeStampFinal) < 0) {
            listOfCarbonFiles.add(eachFile);
          }
        }
      }
    }

    return listOfCarbonFiles.toArray(new CarbonFile[listOfCarbonFiles.size()]);
  }

  /**
   *
   * @param extension
   * @param block
   * @return
   */
  private long getStartTimeOfDeltaFile(String extension, SegmentUpdateDetails block) {
    long startTimestamp;
    switch (extension) {
      case CarbonCommonConstants.DELETE_DELTA_FILE_EXT:
        startTimestamp = block.getDeleteDeltaStartTimeAsLong();
        break;
      default:
        startTimestamp = 0;
    }
    return startTimestamp;
  }

  /**
   *
   * @param extension
   * @param block
   * @return
   */
  private long getEndTimeOfDeltaFile(String extension, SegmentUpdateDetails block) {
    long endTimestamp;
    switch (extension) {
      case CarbonCommonConstants.DELETE_DELTA_FILE_EXT:
        endTimestamp = block.getDeleteDeltaEndTimeAsLong();
        break;
      default: endTimestamp = 0;
    }
    return endTimestamp;
  }

  /**
   * This method loads segment update details
   *
   * @return
   */
  public SegmentUpdateDetails[] readLoadMetadata() {
    // get the updated status file identifier from the table status.
    String tableUpdateStatusIdentifier = getUpdatedStatusIdentifier();
    return readLoadMetadata(tableUpdateStatusIdentifier, identifier.getTablePath());
  }

  /**
   * This method loads segment update details
   *
   * @return
   */
  public static SegmentUpdateDetails[] readLoadMetadata(String tableUpdateStatusIdentifier,
      String tablePath) {
    Gson gsonObjectToRead = new Gson();
    DataInputStream dataInputStream = null;
    BufferedReader buffReader = null;
    InputStreamReader inStream = null;
    SegmentUpdateDetails[] listOfSegmentUpdateDetailsArray;

    if (StringUtils.isEmpty(tableUpdateStatusIdentifier)) {
      return new SegmentUpdateDetails[0];
    }

    String tableUpdateStatusPath =
        CarbonTablePath.getMetadataPath(tablePath) +
            CarbonCommonConstants.FILE_SEPARATOR + tableUpdateStatusIdentifier;
    AtomicFileOperations fileOperation =
        AtomicFileOperationFactory.getAtomicFileOperations(tableUpdateStatusPath);

    try {
      if (!FileFactory
          .isFileExist(tableUpdateStatusPath)) {
        return new SegmentUpdateDetails[0];
      }
      dataInputStream = fileOperation.openForRead();
      inStream = new InputStreamReader(dataInputStream,
          CarbonCommonConstants.DEFAULT_CHARSET);
      buffReader = new BufferedReader(inStream);
      listOfSegmentUpdateDetailsArray =
          gsonObjectToRead.fromJson(buffReader, SegmentUpdateDetails[].class);
    } catch (IOException e) {
      return new SegmentUpdateDetails[0];
    } finally {
      closeStreams(buffReader, inStream, dataInputStream);
    }

    return listOfSegmentUpdateDetailsArray;
  }

  /**
   * @return updateStatusFileName
   */
  private String getUpdatedStatusIdentifier() {
    if (segmentDetails.length == 0) {
      return null;
    }
    return segmentDetails[0].getUpdateStatusFileName();
  }

  /**
   * writes segment update details into a given file at @param dataLoadLocation
   *
   * @param listOfSegmentUpdateDetailsArray
   * @throws IOException
   */
  public void writeLoadDetailsIntoFile(List<SegmentUpdateDetails> listOfSegmentUpdateDetailsArray,
      String updateStatusFileIdentifier) throws IOException {
    String fileLocation =
        CarbonTablePath.getMetadataPath(identifier.getTablePath())
            + CarbonCommonConstants.FILE_SEPARATOR
            + CarbonUpdateUtil.getUpdateStatusFileName(updateStatusFileIdentifier);

    AtomicFileOperations fileWrite =
        AtomicFileOperationFactory.getAtomicFileOperations(fileLocation);
    BufferedWriter brWriter = null;
    DataOutputStream dataOutputStream = null;
    Gson gsonObjectToWrite = new Gson();
    // write the updated data into the metadata file.

    try {
      dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
          CarbonCommonConstants.DEFAULT_CHARSET));

      String metadataInstance = gsonObjectToWrite.toJson(listOfSegmentUpdateDetailsArray);
      brWriter.write(metadataInstance);
    } catch (IOException ioe) {
      LOG.error("Error message: " + ioe.getLocalizedMessage());
      fileWrite.setFailed();
      throw ioe;
    } finally {
      if (null != brWriter) {
        brWriter.flush();
      }
      CarbonUtil.closeStreams(brWriter);
      fileWrite.close();
    }

  }

  /**
   * This method closes the streams
   *
   * @param streams - streams to close.
   */
  private static void closeStreams(Closeable... streams) {
    // Added if to avoid NullPointerException in case one stream is being passed as null
    if (null != streams) {
      for (Closeable stream : streams) {
        if (null != stream) {
          try {
            stream.close();
          } catch (IOException e) {
            LOG.error("Error while closing stream" + stream);
          }
        }
      }
    }
  }

  /**
   * Returns the invalid timestamp range of a segment.
   */
  public static UpdateVO getInvalidTimestampRange(LoadMetadataDetails loadMetadataDetails) {
    UpdateVO range = new UpdateVO();
    if (loadMetadataDetails != null) {
      range.setSegmentId(loadMetadataDetails.getLoadName());
      range.setFactTimestamp(loadMetadataDetails.getLoadStartTime());
      if (!loadMetadataDetails.getUpdateDeltaStartTimestamp().isEmpty() && !loadMetadataDetails
          .getUpdateDeltaEndTimestamp().isEmpty()) {
        range.setUpdateDeltaStartTimestamp(CarbonUpdateUtil
            .getTimeStampAsLong(loadMetadataDetails.getUpdateDeltaStartTimestamp()));
        range.setLatestUpdateTimestamp(
            CarbonUpdateUtil.getTimeStampAsLong(loadMetadataDetails.getUpdateDeltaEndTimestamp()));
      }
    }
    return range;
  }

  /**
   *
   * @param block
   * @param needCompleteList
   * @return
   */
  public CarbonFile[] getDeleteDeltaInvalidFilesList(
      final SegmentUpdateDetails block, final boolean needCompleteList,
      CarbonFile[] allSegmentFiles, boolean isAbortedFile) {

    final long deltaStartTimestamp =
        getStartTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block);

    final long deltaEndTimestamp =
        getEndTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block);

    Set<CarbonFile> files =
        new HashSet<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);

    for (CarbonFile eachFile : allSegmentFiles) {
      String fileName = eachFile.getName();
      if (fileName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)) {
        String blkName = CarbonTablePath.DataFileUtil.getBlockNameFromDeleteDeltaFile(fileName);

        // complete list of delta files of that block is returned.
        if (needCompleteList && block.getBlockName().equalsIgnoreCase(blkName)) {
          files.add(eachFile);
        }

        // invalid delete delta files only will be returned.
        long timestamp = CarbonUpdateUtil.getTimeStampAsLong(
            CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(fileName));

        if (block.getBlockName().equalsIgnoreCase(blkName)) {

          if (isAbortedFile) {
            if (Long.compare(timestamp, deltaEndTimestamp) > 0) {
              files.add(eachFile);
            }
          } else if (Long.compare(timestamp, deltaStartTimestamp) < 0
              || Long.compare(timestamp, deltaEndTimestamp) > 0) {
            files.add(eachFile);
          }
        }
      }
    }

    return files.toArray(new CarbonFile[files.size()]);
  }
}
