/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.core.indexstore.blockletindex;

import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.CacheProvider;
import org.apache.carbondata.core.cache.CacheType;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.features.TableOperation;
import org.apache.carbondata.core.index.IndexFilter;
import org.apache.carbondata.core.index.IndexInputSplit;
import org.apache.carbondata.core.index.IndexMeta;
import org.apache.carbondata.core.index.Segment;
import org.apache.carbondata.core.index.dev.CacheableIndex;
import org.apache.carbondata.core.index.dev.Index;
import org.apache.carbondata.core.index.dev.IndexBuilder;
import org.apache.carbondata.core.index.dev.IndexWriter;
import org.apache.carbondata.core.index.dev.cgindex.CoarseGrainIndex;
import org.apache.carbondata.core.index.dev.cgindex.CoarseGrainIndexFactory;
import org.apache.carbondata.core.index.dev.expr.IndexExprWrapper;
import org.apache.carbondata.core.index.dev.expr.IndexInputSplitWrapper;
import org.apache.carbondata.core.indexstore.Blocklet;
import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
import org.apache.carbondata.core.indexstore.BlockletIndexWrapper;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.SegmentBlockIndexInfo;
import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifierWrapper;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.IndexSchema;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.scan.filter.FilterUtil;
import org.apache.carbondata.core.scan.filter.executer.FilterExecutor;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.segmentmeta.SegmentColumnMetaDataInfo;
import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo;
import org.apache.carbondata.core.util.BlockletIndexUtil;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.events.Event;

import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.fs.Path;

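/**
 * Factory for the default block/blocklet index of a table. It tracks the index file
 * identifiers known for each segment and loads the corresponding indexes through the cache.
 */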
public class BlockletIndexFactory extends CoarseGrainIndexFactory
    implements BlockletDetailsFetcher, SegmentPropertiesFetcher, CacheableIndex {

  // Name used when building INDEX_SCHEMA for this default index.
  private static final String NAME = "clustered.btree.blocklet";
  /**
   * variable for cache level BLOCKLET
   */
  public static final String CACHE_LEVEL_BLOCKLET = "BLOCKLET";

  // Schema built from NAME and this factory's class name.
  public static final IndexSchema INDEX_SCHEMA =
      new IndexSchema(NAME, BlockletIndexFactory.class.getName());

  // Absolute identifier of the table, read from the CarbonTable in the constructor.
  private AbsoluteTableIdentifier identifier;

  // segmentNo -> index file identifiers (plus optional segment metadata) known for the segment
  private Map<String, SegmentBlockIndexInfo> segmentMap = new ConcurrentHashMap<>();

  // Cache of loaded index wrappers, keyed by the wrapped index file identifier.
  private Cache<TableBlockIndexUniqueIdentifierWrapper, BlockletIndexWrapper> cache;

  public BlockletIndexFactory(CarbonTable carbonTable, IndexSchema indexSchema) {
    super(carbonTable, indexSchema);
    this.identifier = carbonTable.getAbsoluteTableIdentifier();
    cache = CacheProvider.getInstance()
        .createCache(CacheType.DRIVER_BLOCKLET_INDEX);
  }

  /**
   * Creates a new, empty index instance whose type depends on the table's cache level.
   *
   * @param carbonTable table whose cache level decides the index type
   * @return a {@link BlockIndex} when the cache level is BLOCK, otherwise a
   *         {@link BlockletIndex}
   */
  public static Index createIndex(CarbonTable carbonTable) {
    // CACHE_LEVEL = BLOCK -> BlockIndex, CACHE_LEVEL = BLOCKLET -> BlockletIndex
    if (BlockletIndexUtil.isCacheLevelBlock(carbonTable)) {
      return new BlockIndex();
    }
    return new BlockletIndex();
  }

  /**
   * Not supported by this factory.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public IndexWriter createWriter(Segment segment, String shardName,
      SegmentProperties segmentProperties) {
    throw new UnsupportedOperationException("not implemented");
  }

  /**
   * Not supported by this factory.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public IndexBuilder createBuilder(Segment segment, String shardName,
      SegmentProperties segmentProperties) {
    throw new UnsupportedOperationException("not implemented");
  }

  /**
   * Gets the indexes of all given segments without restricting them to partition locations.
   *
   * @param segments segments whose indexes are requested
   * @param filter filter forwarded to the partition-aware overload; may be null
   * @return the indexes grouped by segment
   * @throws IOException if loading the indexes fails
   */
  public Map<Segment, List<CoarseGrainIndex>> getIndexes(List<Segment> segments,
      IndexFilter filter) throws IOException {
    Set<Path> noPartitionLocations = new HashSet<>();
    return getIndexes(segments, noPartitionLocations, filter);
  }

  /**
   * Gets the indexes of all given segments, grouped by segment.
   * <p>
   * For each segment the index file identifiers are collected first: restricted to the given
   * partition locations when any are supplied, otherwise either pruned with the segment level
   * min/max values or taken as a whole. All collected identifiers are then loaded through the
   * cache in one call.
   *
   * @param segments segments whose indexes are requested
   * @param partitionLocations partition paths to restrict to; empty means no restriction
   * @param filter filter used for segment min/max pruning; may be null
   * @return the loaded indexes grouped by segment
   * @throws IOException if reading identifiers or loading the indexes fails
   */
  public Map<Segment, List<CoarseGrainIndex>> getIndexes(List<Segment> segments,
      Set<Path> partitionLocations, IndexFilter filter) throws IOException {
    List<TableBlockIndexUniqueIdentifierWrapper> identifierWrappers = new ArrayList<>();
    // segmentNo -> segment, used to map loaded wrappers back to their segment
    Map<String, Segment> segmentsByNo = new HashMap<>();
    for (Segment segment : segments) {
      segmentsByNo.put(segment.getSegmentNo(), segment);
      Set<TableBlockIndexUniqueIdentifier> identifiers =
          getTableBlockIndexUniqueIdentifiers(segment);
      if (!partitionLocations.isEmpty()) {
        // keep only the identifiers that match the requested partitions
        getTableBlockUniqueIdentifierWrappers(partitionLocations, identifierWrappers,
            identifiers);
        continue;
      }
      SegmentMetaDataInfo segmentMetaDataInfo = segment.getSegmentMetaDataInfo();
      boolean isLoadAllIndex = Boolean.parseBoolean(CarbonProperties.getInstance()
          .getProperty(CarbonCommonConstants.CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE,
              CarbonCommonConstants.CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE_DEFAULT));
      // min/max pruning needs segment metadata and a non-empty filter without implicit expression
      boolean pruneWithSegmentMinMax = !isLoadAllIndex && null != segmentMetaDataInfo
          && null != filter && !filter.isEmpty() && null != filter.getExpression()
          && null == FilterUtil.getImplicitFilterExpression(filter.getExpression());
      if (pruneWithSegmentMinMax) {
        getTableBlockIndexUniqueIdentifierUsingSegmentMinMax(segment, segmentMetaDataInfo, filter,
            identifiers, identifierWrappers);
      } else {
        for (TableBlockIndexUniqueIdentifier indexIdentifier : identifiers) {
          identifierWrappers.add(
              new TableBlockIndexUniqueIdentifierWrapper(indexIdentifier, this.getCarbonTable()));
        }
      }
    }
    Map<Segment, List<CoarseGrainIndex>> indexMap = new HashMap<>();
    for (BlockletIndexWrapper wrapper : cache.getAll(identifierWrappers)) {
      Segment owner = segmentsByNo.get(wrapper.getSegmentId());
      indexMap.computeIfAbsent(owner, key -> new ArrayList<>()).addAll(wrapper.getIndexes());
    }
    return indexMap;
  }

  /**
   * Wraps the given identifiers and appends them to the output list. When partition locations
   * are given, only identifiers whose index file path is one of those locations are appended.
   *
   * @param partitionLocations partition paths to match; empty means accept every identifier
   * @param tableBlockIndexUniqueIdentifierWrappers output list the wrappers are appended to
   * @param identifiers identifiers to wrap
   */
  private void getTableBlockUniqueIdentifierWrappers(Set<Path> partitionLocations,
      List<TableBlockIndexUniqueIdentifierWrapper> tableBlockIndexUniqueIdentifierWrappers,
      Set<TableBlockIndexUniqueIdentifier> identifiers) {
    for (TableBlockIndexUniqueIdentifier indexIdentifier : identifiers) {
      // compare the index file path with the partition paths and skip the ones not requested
      if (!partitionLocations.isEmpty()
          && !partitionLocations.contains(new Path(indexIdentifier.getIndexFilePath()))) {
        continue;
      }
      tableBlockIndexUniqueIdentifierWrappers.add(
          new TableBlockIndexUniqueIdentifierWrapper(indexIdentifier, this.getCarbonTable()));
    }
  }

  /**
   * Evaluates the filter against the segment level min/max values and, only when a scan is
   * required, appends the wrapped identifiers of the segment to the output list.
   * <p>
   * Column schemas are cloned from the current table schema and adjusted so that their sort
   * column / column drift settings match what the segment metadata reports, before segment
   * properties and the filter executor are built from them.
   *
   * @param segment segment being checked
   * @param segmentMetaDataInfo per-column min/max and sort/drift info of the segment
   * @param filter filter whose expression is evaluated
   * @param identifiers index file identifiers of the segment
   * @param tableBlockIndexUniqueIdentifierWrappers output list the wrappers are appended to
   */
  private void getTableBlockIndexUniqueIdentifierUsingSegmentMinMax(Segment segment,
      SegmentMetaDataInfo segmentMetaDataInfo, IndexFilter filter,
      Set<TableBlockIndexUniqueIdentifier> identifiers,
      List<TableBlockIndexUniqueIdentifierWrapper> tableBlockIndexUniqueIdentifierWrappers) {
    Map<String, SegmentColumnMetaDataInfo> columnMetaByUniqueId =
        segmentMetaDataInfo.getSegmentColumnMetaDataInfoMap();
    int columnCount = columnMetaByUniqueId.size();
    // only columns that still exist in the table schema are added here
    List<ColumnSchema> columnSchemas = new ArrayList<>();
    byte[][] min = new byte[columnCount][];
    byte[][] max = new byte[columnCount][];
    boolean[] minMaxFlag = new boolean[columnCount];
    int filled = 0;

    // clones of the current table column schemas, keyed by column unique id
    Map<String, ColumnSchema> tableColumnSchemas =
        this.getCarbonTable().getTableInfo().getFactTable().getListOfColumns().stream()
            .collect(Collectors.toMap(ColumnSchema::getColumnUniqueId, ColumnSchema::clone));

    for (Map.Entry<String, SegmentColumnMetaDataInfo> entry : columnMetaByUniqueId.entrySet()) {
      ColumnSchema columnSchema = tableColumnSchemas.get(entry.getKey());
      if (null == columnSchema) {
        // column is not part of the current table schema
        continue;
      }
      SegmentColumnMetaDataInfo columnMeta = entry.getValue();
      boolean isSortColumnInSegment = columnMeta.isSortColumn();
      boolean isColumnDriftInSegment = columnMeta.isColumnDrift();
      if (null != columnSchema.getColumnProperties()) {
        String isSortColumn =
            columnSchema.getColumnProperties().get(CarbonCommonConstants.SORT_COLUMNS);
        String isColumnDrift =
            columnSchema.getColumnProperties().get(CarbonCommonConstants.COLUMN_DRIFT);
        if (null == isSortColumn) {
          modifyColumnSchemaForSortColumn(columnSchema, isColumnDriftInSegment, isColumnDrift,
              false);
        } else if (isSortColumn.equalsIgnoreCase("true") && !isSortColumnInSegment) {
          // sort column now, but not in the segment: unset it in the cloned schema
          modifyColumnSchemaForSortColumn(columnSchema, isColumnDriftInSegment, isColumnDrift,
              false);
        } else if (isSortColumn.equalsIgnoreCase("false") && isSortColumnInSegment) {
          // not a sort column now, but it was in the segment: set it in the cloned schema
          modifyColumnSchemaForSortColumn(columnSchema, isColumnDriftInSegment, isColumnDrift,
              true);
        }
      }
      columnSchemas.add(columnSchema);
      min[filled] = columnMeta.getColumnMinValue();
      max[filled] = columnMeta.getColumnMaxValue();
      // min/max are usable only when both values are non-empty
      minMaxFlag[filled] = min[filled].length != 0 && max[filled].length != 0;
      filled++;
    }

    // segment properties built from the adjusted column schemas
    SegmentProperties segmentProperties = SegmentPropertiesAndSchemaHolder.getInstance()
        .addSegmentProperties(this.getCarbonTable(), columnSchemas, segment.getSegmentNo())
        .getSegmentProperties();
    IndexFilter segmentFilter =
        new IndexFilter(segmentProperties, this.getCarbonTable(), filter.getExpression());
    FilterResolverIntf resolver = segmentFilter.getResolver();
    FilterExecutor filterExecutor =
        FilterUtil.getFilterExecutorTree(resolver, segmentProperties, null, null, false);

    BitSet scanRequired = filterExecutor.isScanRequired(max, min, minMaxFlag);
    if (scanRequired.isEmpty()) {
      // the segment cannot match the filter, so none of its indexes are added
      return;
    }
    for (TableBlockIndexUniqueIdentifier indexIdentifier : identifiers) {
      tableBlockIndexUniqueIdentifierWrappers.add(
          new TableBlockIndexUniqueIdentifierWrapper(indexIdentifier, this.getCarbonTable()));
    }
  }

  /**
   * Adjusts the given column schema so that its sort column state matches the requested one.
   *
   * @param columnSchema schema to modify in place
   * @param columnDrift whether the column is a column drift column in the segment
   * @param isColumnDrift COLUMN_DRIFT property value of the current schema; may be null
   * @param isSortColumnInSegment {@code true} to mark the column as sort column, {@code false}
   *                              to unset it and clear the column properties
   */
  private void modifyColumnSchemaForSortColumn(ColumnSchema columnSchema, boolean columnDrift,
      String isColumnDrift, boolean isSortColumnInSegment) {
    if (isSortColumnInSegment) {
      columnSchema.setSortColumn(true);
      if (!columnSchema.isDimensionColumn()) {
        // a non-dimension column that is a sort column is flagged as column drift
        columnSchema.setDimensionColumn(true);
        columnSchema.getColumnProperties().put(CarbonCommonConstants.COLUMN_DRIFT, "true");
      }
      columnSchema.getColumnProperties().put(CarbonCommonConstants.SORT_COLUMNS, "true");
      return;
    }
    boolean driftOnlyInCurrentSchema =
        null != isColumnDrift && isColumnDrift.equalsIgnoreCase("true") && !columnDrift;
    if (driftOnlyInCurrentSchema) {
      columnSchema.setDimensionColumn(false);
    }
    columnSchema.setSortColumn(false);
    columnSchema.getColumnProperties().clear();
  }

  /**
   * Gets the indexes of one segment without restricting them to partition locations.
   */
  @Override
  public List<CoarseGrainIndex> getIndexes(Segment segment) throws IOException {
    Set<Path> noPartitionLocations = new HashSet<>();
    return getIndexes(segment, noPartitionLocations);
  }

  /**
   * Gets the indexes of one segment, optionally restricted to the given partition locations.
   *
   * @param segment segment whose indexes are requested
   * @param partitionLocations partition paths to restrict to; empty means no restriction
   * @return all indexes of the matching index files, loaded through the cache
   * @throws IOException if reading identifiers or loading the indexes fails
   */
  @Override
  public List<CoarseGrainIndex> getIndexes(Segment segment,
      Set<Path> partitionLocations) throws IOException {
    Set<TableBlockIndexUniqueIdentifier> identifiers =
        getTableBlockIndexUniqueIdentifiers(segment);
    List<TableBlockIndexUniqueIdentifierWrapper> identifierWrappers =
        new ArrayList<>(identifiers.size());
    getTableBlockUniqueIdentifierWrappers(partitionLocations, identifierWrappers, identifiers);
    List<CoarseGrainIndex> loadedIndexes = new ArrayList<>();
    for (BlockletIndexWrapper wrapper : cache.getAll(identifierWrappers)) {
      loadedIndexes.addAll(wrapper.getIndexes());
    }
    return loadedIndexes;
  }

  /**
   * Returns the index file identifiers of the given segment.
   * <p>
   * When the segment is already tracked with a non-empty identifier set, that set is returned
   * and the tracked segment metadata (if any) is copied onto the given segment. Otherwise the
   * identifiers are read through {@link BlockletIndexUtil} and, when non-empty, remembered for
   * later calls.
   *
   * @param segment segment whose identifiers are requested
   * @return the identifiers of the segment
   * @throws IOException if reading the identifiers fails
   */
  public Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment)
      throws IOException {
    SegmentBlockIndexInfo segmentBlockIndexInfo = segmentMap.get(segment.getSegmentNo());
    if (null != segmentBlockIndexInfo && CollectionUtils.isNotEmpty(
        segmentBlockIndexInfo.getTableBlockIndexUniqueIdentifiers())) {
      // Use the entry fetched above instead of looking it up again: the map is concurrent and
      // a second lookup may return null if the segment was cleared in between.
      if (null != segmentBlockIndexInfo.getSegmentMetaDataInfo()) {
        segment.setSegmentMetaDataInfo(segmentBlockIndexInfo.getSegmentMetaDataInfo());
      }
      return segmentBlockIndexInfo.getTableBlockIndexUniqueIdentifiers();
    }
    Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
        BlockletIndexUtil.getTableBlockUniqueIdentifiers(segment);
    if (tableBlockIndexUniqueIdentifiers.size() > 0) {
      segmentMap.put(segment.getSegmentNo(),
          new SegmentBlockIndexInfo(tableBlockIndexUniqueIdentifiers,
              segment.getSegmentMetaDataInfo()));
    }
    return tableBlockIndexUniqueIdentifiers;
  }

  /**
   * Resolves the detailed blocklet information for each of the given blocklets of a segment.
   * When the first element is already an {@link ExtendedBlocklet}, all elements are cast and
   * returned without consulting the indexes.
   *
   * @param blocklets blocklets to resolve
   * @param segment segment the blocklets belong to
   * @return the detailed blocklets, in the order of the input
   * @throws IOException if a blocklet cannot be found or an index cannot be loaded
   */
  @Override
  public List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, Segment segment)
      throws IOException {
    List<ExtendedBlocklet> detailedBlocklets = new ArrayList<>(blocklets.size() + 1);
    if (blocklets.isEmpty()) {
      // nothing to resolve
      return detailedBlocklets;
    }
    if (blocklets.get(0) instanceof ExtendedBlocklet) {
      // already detailed: just cast
      for (Blocklet blocklet : blocklets) {
        detailedBlocklets.add((ExtendedBlocklet) blocklet);
      }
      return detailedBlocklets;
    }
    Set<TableBlockIndexUniqueIdentifier> identifiers =
        getTableBlockIndexUniqueIdentifiers(segment);
    Set<TableBlockIndexUniqueIdentifierWrapper> identifierWrappers =
        new HashSet<>(identifiers.size());
    for (TableBlockIndexUniqueIdentifier indexIdentifier : identifiers) {
      identifierWrappers.add(
          new TableBlockIndexUniqueIdentifierWrapper(indexIdentifier, this.getCarbonTable()));
    }
    // look up every blocklet in the indexes of the segment
    for (Blocklet blocklet : blocklets) {
      ExtendedBlocklet detailed = getExtendedBlocklet(identifierWrappers, blocklet);
      detailedBlocklets.add(detailed);
    }
    return detailedBlocklets;
  }

  /**
   * Resolves the detailed blocklet information for a single blocklet of a segment. A blocklet
   * that already is an {@link ExtendedBlocklet} is returned as is.
   *
   * @throws IOException if the blocklet cannot be found or an index cannot be loaded
   */
  @Override
  public ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, Segment segment)
      throws IOException {
    if (blocklet instanceof ExtendedBlocklet) {
      return (ExtendedBlocklet) blocklet;
    }
    Set<TableBlockIndexUniqueIdentifier> identifiers =
        getTableBlockIndexUniqueIdentifiers(segment);
    Set<TableBlockIndexUniqueIdentifierWrapper> identifierWrappers =
        new HashSet<>(identifiers.size());
    for (TableBlockIndexUniqueIdentifier indexIdentifier : identifiers) {
      TableBlockIndexUniqueIdentifierWrapper wrapped =
          new TableBlockIndexUniqueIdentifierWrapper(indexIdentifier, this.getCarbonTable());
      identifierWrappers.add(wrapped);
    }
    return getExtendedBlocklet(identifierWrappers, blocklet);
  }

  /**
   * Searches the indexes behind the given identifiers for the one whose summary index file
   * name starts with the blocklet's file path and returns the detailed blocklet from it.
   *
   * @param identifiersWrapper identifiers of the index files to search
   * @param blocklet blocklet to resolve
   * @return the detailed blocklet from the first matching index
   * @throws IOException if no index matches or an index cannot be loaded
   */
  private ExtendedBlocklet getExtendedBlocklet(
      Set<TableBlockIndexUniqueIdentifierWrapper> identifiersWrapper, Blocklet blocklet)
      throws IOException {
    for (TableBlockIndexUniqueIdentifierWrapper identifierWrapper : identifiersWrapper) {
      List<BlockIndex> blockIndexes = cache.get(identifierWrapper).getIndexes();
      for (BlockIndex blockIndex : blockIndexes) {
        String summaryIndexFileName =
            blockIndex.getTableTaskInfo(BlockletIndexRowIndexes.SUMMARY_INDEX_FILE_NAME);
        if (summaryIndexFileName.startsWith(blocklet.getFilePath())) {
          return blockIndex.getDetailedBlocklet(blocklet.getBlockletId());
        }
      }
    }
    throw new IOException("Blocklet not found: " + blocklet.toString());
  }

  /**
   * Builds a single-element list holding the input split that describes the given segment.
   * Any exception raised while building the split is rethrown as {@link RuntimeException}.
   */
  @Override
  public List<IndexInputSplit> toDistributable(Segment segment) {
    List<IndexInputSplit> distributableList = new ArrayList<>();
    try {
      BlockletIndexInputSplit split = new BlockletIndexInputSplit();
      split.setSegment(segment);
      split.setIndexSchema(INDEX_SCHEMA);
      split.setSegmentPath(
          CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo()));
      // wrap the split under a random unique id and add the wrapped split
      IndexInputSplitWrapper splitWrapper =
          new IndexInputSplitWrapper(UUID.randomUUID().toString(), split);
      distributableList.add(splitWrapper.getDistributable());
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    return distributableList;
  }

  /**
   * Intentionally empty: this factory takes no action for events.
   */
  @Override
  public void fireEvent(Event event) {

  }

  /**
   * Forgets the given segment and removes its loaded indexes from the cache.
   * <p>
   * For every index file identifier tracked for the segment, the cache entry (when present) is
   * invalidated once and each of its non-null indexes is cleared.
   *
   * @param segment segment number to clear; an untracked segment is a no-op
   */
  @Override
  public void clear(String segment) {
    SegmentBlockIndexInfo segmentBlockIndexInfo = segmentMap.remove(segment);
    if (null == segmentBlockIndexInfo) {
      return;
    }
    Set<TableBlockIndexUniqueIdentifier> blockIndexes =
        segmentBlockIndexInfo.getTableBlockIndexUniqueIdentifiers();
    if (null == blockIndexes) {
      return;
    }
    for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) {
      TableBlockIndexUniqueIdentifierWrapper blockIndexWrapper =
          new TableBlockIndexUniqueIdentifierWrapper(blockIndex, this.getCarbonTable());
      BlockletIndexWrapper wrapper = cache.getIfPresent(blockIndexWrapper);
      if (null == wrapper) {
        continue;
      }
      // Invalidate the cache entry exactly once, before its indexes are cleared. Doing it per
      // index repeated the same call and skipped entries whose index list was empty.
      cache.invalidate(blockIndexWrapper);
      List<BlockIndex> indexes = wrapper.getIndexes();
      for (Index index : indexes) {
        if (index != null) {
          index.clear();
        }
      }
    }
  }

  /**
   * Clears every segment currently tracked by this factory.
   */
  @Override
  public synchronized void clear() {
    // Snapshot the keys because clear(segmentId) removes entries from the map. A zero-length
    // array is passed so the returned array holds exactly the keys seen: a presized array can
    // keep null slots when the concurrent map shrinks meanwhile, and clear(null) would fail.
    for (String segmentId : segmentMap.keySet().toArray(new String[0])) {
      clear(segmentId);
    }
  }

  /**
   * Reports how many tracked index files are currently cached and their total memory size.
   *
   * @return a string of the form {@code <number of cached index files>:<total memory size>}
   */
  @Override
  public String getCacheSize() {
    long totalMemorySize = 0L;
    int cachedIndexFileCount = 0;
    for (SegmentBlockIndexInfo segmentInfo : segmentMap.values()) {
      for (TableBlockIndexUniqueIdentifier indexIdentifier :
          segmentInfo.getTableBlockIndexUniqueIdentifiers()) {
        TableBlockIndexUniqueIdentifierWrapper cacheKey =
            new TableBlockIndexUniqueIdentifierWrapper(indexIdentifier, getCarbonTable());
        BlockletIndexWrapper cachedWrapper = cache.getIfPresent(cacheKey);
        if (cachedWrapper == null) {
          // index file is tracked but not loaded
          continue;
        }
        totalMemorySize += cachedWrapper.getMemorySize();
        cachedIndexFileCount++;
      }
    }
    return cachedIndexFileCount + ":" + totalMemorySize;
  }

  /**
   * Loads the indexes described by the given input split.
   * <p>
   * The identifiers are derived from the split's segment when it carries a segment path,
   * otherwise from its index file path. An {@link IOException} raised while loading from the
   * cache is rethrown as {@link RuntimeException}.
   *
   * @param distributable split to load; must be a {@link BlockletIndexInputSplit}
   * @return the loaded indexes
   * @throws IOException if deriving the identifiers fails
   */
  @Override
  public List<CoarseGrainIndex> getIndexes(IndexInputSplit distributable)
      throws IOException {
    BlockletIndexInputSplit blockletSplit = (BlockletIndexInputSplit) distributable;
    String segmentNo = blockletSplit.getSegment().getSegmentNo();
    List<TableBlockIndexUniqueIdentifierWrapper> identifiersWrapper =
        blockletSplit.getSegmentPath() != null
            ? getTableBlockIndexUniqueIdentifier(distributable)
            : getTableBlockIndexUniqueIdentifier(blockletSplit.getFilePath(), segmentNo);
    List<CoarseGrainIndex> loadedIndexes = new ArrayList<>();
    try {
      for (BlockletIndexWrapper wrapper : cache.getAll(identifiersWrapper)) {
        loadedIndexes.addAll(wrapper.getIndexes());
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    return loadedIndexes;
  }

  /**
   * Returns the wrapped index file identifiers for the segment of the given split.
   * <p>
   * Identifiers already tracked for the segment are reused. Otherwise they are built from the
   * committed index files of the segment and remembered in the segment map.
   *
   * @param distributable split whose segment is inspected
   * @return wrapped identifiers of the segment's index files
   * @throws IOException declared for callers; NOTE(review): no visible call here throws it
   */
  private List<TableBlockIndexUniqueIdentifierWrapper> getTableBlockIndexUniqueIdentifier(
      IndexInputSplit distributable) throws IOException {
    List<TableBlockIndexUniqueIdentifierWrapper> identifiersWrapper = new ArrayList<>();
    String segmentNo = distributable.getSegment().getSegmentNo();
    SegmentBlockIndexInfo trackedInfo = segmentMap.get(segmentNo);
    Set<TableBlockIndexUniqueIdentifier> trackedIdentifiers =
        null == trackedInfo ? null : trackedInfo.getTableBlockIndexUniqueIdentifiers();
    if (null != trackedIdentifiers) {
      for (TableBlockIndexUniqueIdentifier trackedIdentifier : trackedIdentifiers) {
        identifiersWrapper.add(
            new TableBlockIndexUniqueIdentifierWrapper(trackedIdentifier, getCarbonTable()));
      }
      return identifiersWrapper;
    }
    Set<TableBlockIndexUniqueIdentifier> builtIdentifiers = new HashSet<>();
    for (String indexFile : distributable.getSegment().getCommittedIndexFile().keySet()) {
      CarbonFile carbonFile = FileFactory.getCarbonFile(indexFile);
      String indexFileName = carbonFile.getName();
      // a plain index file has no merge index name; any other file is its own merge index
      String mergeIndexName =
          indexFile.endsWith(CarbonTablePath.INDEX_FILE_EXT) ? null : indexFileName;
      String parentPath = carbonFile.getParentFile().getAbsolutePath();
      TableBlockIndexUniqueIdentifier builtIdentifier =
          new TableBlockIndexUniqueIdentifier(parentPath, indexFileName, mergeIndexName,
              segmentNo);
      identifiersWrapper.add(
          new TableBlockIndexUniqueIdentifierWrapper(builtIdentifier, this.getCarbonTable()));
      builtIdentifiers.add(builtIdentifier);
    }
    segmentMap.put(segmentNo, new SegmentBlockIndexInfo(builtIdentifiers,
        distributable.getSegment().getSegmentMetaDataInfo()));
    return identifiersWrapper;
  }

  /**
   * Builds the wrapped identifiers for one index file path.
   * <p>
   * A plain index file yields one identifier. A merge index file yields one identifier per
   * index file listed in it. Any other extension yields an empty list.
   *
   * @param indexFilePath path of the index or merge index file, using "/" as separator
   * @param segmentId segment the file belongs to
   * @return wrapped identifiers for the file
   * @throws IOException if the merge index file cannot be read
   */
  private List<TableBlockIndexUniqueIdentifierWrapper> getTableBlockIndexUniqueIdentifier(
      String indexFilePath, String segmentId) throws IOException {
    List<TableBlockIndexUniqueIdentifierWrapper> identifiersWrapper = new ArrayList<>();
    int lastSeparator = indexFilePath.lastIndexOf("/");
    String parent = indexFilePath.substring(0, lastSeparator);
    String name = indexFilePath.substring(lastSeparator + 1);
    if (indexFilePath.endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
      TableBlockIndexUniqueIdentifier single =
          new TableBlockIndexUniqueIdentifier(parent, name, null, segmentId);
      identifiersWrapper.add(
          new TableBlockIndexUniqueIdentifierWrapper(single, this.getCarbonTable()));
      return identifiersWrapper;
    }
    if (indexFilePath.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
      SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
      for (String indexFile : fileStore.getIndexFilesFromMergeFile(indexFilePath)) {
        TableBlockIndexUniqueIdentifier merged =
            new TableBlockIndexUniqueIdentifier(parent, indexFile, name, segmentId);
        identifiersWrapper.add(
            new TableBlockIndexUniqueIdentifierWrapper(merged, this.getCarbonTable()));
      }
    }
    return identifiersWrapper;
  }

  /**
   * Returns the index metadata; currently always {@code null}.
   */
  @Override
  public IndexMeta getMeta() {
    // TODO: pass SORT_COLUMNS into this class
    return null;
  }

  /**
   * Intentionally empty: this factory deletes nothing for a single segment.
   */
  @Override
  public void deleteIndexData(Segment segment) {

  }

  /**
   * Intentionally empty: this factory deletes nothing.
   */
  @Override
  public void deleteIndexData() {

  }

  /**
   * Gets the segment properties of a segment without restricting to partition locations.
   */
  @Override
  public SegmentProperties getSegmentProperties(Segment segment) throws IOException {
    Set<Path> noPartitionLocations = new HashSet<>();
    return getSegmentProperties(segment, noPartitionLocations);
  }

  /**
   * Gets the segment properties from the first index of the segment. At least one index of
   * type {@link BlockIndex} is expected; this is only checked with assertions.
   *
   * @param segment segment whose properties are requested
   * @param partitionLocations partition paths used when loading the indexes
   * @return the segment properties of the first index
   * @throws IOException if loading the indexes fails
   */
  @Override
  public SegmentProperties getSegmentProperties(Segment segment, Set<Path> partitionLocations)
      throws IOException {
    List<CoarseGrainIndex> segmentIndexes = getIndexes(segment, partitionLocations);
    assert (segmentIndexes.size() > 0);
    CoarseGrainIndex firstIndex = segmentIndexes.get(0);
    assert (firstIndex instanceof BlockIndex);
    return ((BlockIndex) firstIndex).getSegmentProperties();
  }

  /**
   * Gets the segment properties held by the given index, which is expected to be a
   * {@link BlockIndex} (checked with an assertion, then cast).
   */
  @Override
  public SegmentProperties getSegmentPropertiesFromIndex(Index coarseGrainIndex) {
    assert (coarseGrainIndex instanceof BlockIndex);
    return ((BlockIndex) coarseGrainIndex).getSegmentProperties();
  }

  /**
   * Returns all blocklets of a segment by pruning every index without a filter, using the
   * segment properties of the first index.
   *
   * @param segment segment whose blocklets are requested
   * @param partitionLocations partition paths used when loading the indexes
   * @return all blocklets, or an empty list when the segment has no indexes
   * @throws IOException if loading or pruning the indexes fails
   */
  @Override
  public List<Blocklet> getAllBlocklets(Segment segment, Set<Path> partitionLocations)
      throws IOException {
    List<Blocklet> allBlocklets = new ArrayList<>();
    List<CoarseGrainIndex> segmentIndexes = getIndexes(segment, partitionLocations);
    if (segmentIndexes.isEmpty()) {
      return allBlocklets;
    }
    SegmentProperties segmentProperties = getSegmentPropertiesFromIndex(segmentIndexes.get(0));
    for (CoarseGrainIndex segmentIndex : segmentIndexes) {
      List<Blocklet> pruned =
          segmentIndex.prune(null, segmentProperties, null, this.getCarbonTable());
      allBlocklets.addAll(pruned);
    }
    return allBlocklets;
  }

  /**
   * Always returns {@code false}, whatever the operation.
   */
  @Override
  public boolean willBecomeStale(TableOperation operation) {
    return false;
  }

  /**
   * Puts the given index wrapper into the cache under the given identifier.
   *
   * @throws IOException if the cache rejects the entry
   */
  @Override
  public void cache(TableBlockIndexUniqueIdentifierWrapper tableBlockIndexUniqueIdentifierWrapper,
      BlockletIndexWrapper blockletIndexWrapper) throws IOException {
    cache.put(tableBlockIndexUniqueIdentifierWrapper, blockletIndexWrapper);
  }

  /**
   * Returns the splits of the given list whose index is not present in the cache. Each
   * returned split is tagged with the identifier that still has to be loaded.
   *
   * @param distributableList splits to check; each must be a {@link BlockletIndexInputSplit}
   * @return the splits that are not cached yet
   * @throws IOException if reading the segment identifiers fails
   */
  @Override
  public List<IndexInputSplit> getAllUncached(
      List<IndexInputSplit> distributableList) throws IOException {
    List<IndexInputSplit> uncachedSplits = new ArrayList<>(distributableList.size());
    for (IndexInputSplit distributable : distributableList) {
      Set<TableBlockIndexUniqueIdentifier> segmentIdentifiers =
          getTableBlockIndexUniqueIdentifiers(distributable.getSegment());
      BlockletIndexInputSplit blockletSplit = (BlockletIndexInputSplit) distributable;
      // pick the identifier that belongs to this split
      TableBlockIndexUniqueIdentifier validIdentifier = BlockletIndexUtil
          .filterIdentifiersBasedOnDistributable(segmentIdentifiers, blockletSplit);
      TableBlockIndexUniqueIdentifierWrapper cacheKey =
          new TableBlockIndexUniqueIdentifierWrapper(validIdentifier, this.getCarbonTable());
      if (null != cache.getIfPresent(cacheKey)) {
        continue;
      }
      blockletSplit.setTableBlockIndexUniqueIdentifier(validIdentifier);
      uncachedSplits.add(distributable);
    }
    return uncachedSplits;
  }

  /**
   * Returns the identifiers tracked for the segment, or the segment level identifiers from
   * {@link BlockletIndexUtil} when the segment is not tracked yet.
   */
  private Set<TableBlockIndexUniqueIdentifier> getTableSegmentUniqueIdentifiers(Segment segment)
      throws IOException {
    SegmentBlockIndexInfo trackedInfo = segmentMap.get(segment.getSegmentNo());
    if (trackedInfo != null) {
      return trackedInfo.getTableBlockIndexUniqueIdentifiers();
    }
    return BlockletIndexUtil.getSegmentUniqueIdentifiers(segment);
  }

  /**
   * Replaces the tracked identifiers of each given segment; the segment metadata of the new
   * entries is left null.
   *
   * @param indexUniqueIdentifiers segment number -> identifiers to track for that segment
   */
  public void updateSegmentIndex(
      Map<String, Set<TableBlockIndexUniqueIdentifier>> indexUniqueIdentifiers) {
    indexUniqueIdentifiers.forEach((segmentNo, segmentIdentifiers) ->
        segmentMap.put(segmentNo, new SegmentBlockIndexInfo(segmentIdentifiers, null)));
  }

  /**
   * Returns the splits of the given segments that still have to be loaded: one entry per
   * identifier that has no index file path or is not present in the cache.
   *
   * @param validSegments segments to check
   * @param indexExprWrapper used to build the split of each segment
   * @return the splits to load
   * @throws IOException if reading the segment identifiers fails
   */
  @Override
  public List<IndexInputSplit> getAllUncached(List<Segment> validSegments,
      IndexExprWrapper indexExprWrapper) throws IOException {
    List<IndexInputSplit> uncachedSplits = new ArrayList<>();
    for (Segment segment : validSegments) {
      IndexInputSplitWrapper splitWrapper = indexExprWrapper.toDistributableSegment(segment);
      for (TableBlockIndexUniqueIdentifier indexIdentifier :
          getTableSegmentUniqueIdentifiers(segment)) {
        BlockletIndexWrapper cachedWrapper = cache.getIfPresent(
            new TableBlockIndexUniqueIdentifierWrapper(indexIdentifier, this.getCarbonTable()));
        boolean alreadyCached =
            indexIdentifier.getIndexFilePath() != null && cachedWrapper != null;
        if (alreadyCached) {
          continue;
        }
        // the segment's split is tagged with the identifier and added to the result
        ((BlockletIndexInputSplit) splitWrapper.getDistributable())
            .setTableBlockIndexUniqueIdentifier(indexIdentifier);
        uncachedSplits.add(splitWrapper.getDistributable());
      }
    }
    return uncachedSplits;
  }

  /**
   * Builds the wrapped input split describing the given segment of the given table. Any
   * exception raised while building the split is rethrown as {@link RuntimeException}.
   *
   * @param segment segment to describe
   * @param schema index schema set on the split
   * @param identifier table identifier providing the table path
   * @param uniqueId id of the returned wrapper
   * @return the wrapped split
   */
  @Override
  public IndexInputSplitWrapper toDistributableSegment(Segment segment,
      IndexSchema schema, AbsoluteTableIdentifier identifier, String uniqueId) {
    try {
      BlockletIndexInputSplit split = new BlockletIndexInputSplit();
      split.setIndexSchema(schema);
      split.setSegment(segment);
      String tablePath = identifier.getTablePath();
      split.setSegmentPath(CarbonTablePath.getSegmentPath(tablePath, segment.getSegmentNo()));
      split.setTablePath(identifier.getTablePath());
      return new IndexInputSplitWrapper(uniqueId, split);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

}
