/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.core.scan.executor.impl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;

import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
import org.apache.carbondata.core.datastore.ReusableDataBuffer;
import org.apache.carbondata.core.datastore.block.AbstractIndex;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.index.IndexFilter;
import org.apache.carbondata.core.index.Segment;
import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNode;
import org.apache.carbondata.core.indexstore.blockletindex.IndexWrapper;
import org.apache.carbondata.core.memory.UnsafeMemoryManager;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.scan.executor.QueryExecutor;
import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
import org.apache.carbondata.core.scan.executor.util.QueryUtil;
import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
import org.apache.carbondata.core.scan.filter.FilterUtil;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.scan.model.ProjectionDimension;
import org.apache.carbondata.core.scan.model.ProjectionMeasure;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.stats.QueryStatistic;
import org.apache.carbondata.core.stats.QueryStatisticsConstants;
import org.apache.carbondata.core.util.BlockletIndexUtil;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
import org.apache.carbondata.core.util.path.CarbonTablePath;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;

/**
 * This class provides a skeletal implementation of the {@link QueryExecutor}
 * interface to minimize the effort required to implement this interface. This
 * will be used to prepare all the properties required for query execution
 */
public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {

  private static final Logger LOGGER =
      LogServiceFactory.getLogService(AbstractQueryExecutor.class.getName());
  /**
   * holder for query properties which will be used to execute the query
   */
  protected QueryExecutorProperties queryProperties;

  // whether to clear/free unsafe memory or not
  private boolean freeUnsafeMemory;

  /**
   * query result iterator which will execute the query
   * and give the result
   */
  protected CarbonIterator queryIterator;

  // Size of the ReusableDataBuffer based on the number of dimension projection columns
  private int reusableDimensionBufferSize;

  public AbstractQueryExecutor(Configuration configuration) {
    ThreadLocalSessionInfo.setConfigurationToCurrentThread(configuration);
    queryProperties = new QueryExecutorProperties();
  }

  public void setExecutorService(ExecutorService executorService) {
    // add executor service for query execution
    queryProperties.executorService = executorService;
  }

  /**
   * Below method will be used to fill the executor properties based on query
   * model it will parse the query model and get the detail and fill it in
   * query properties
   *
   * @param queryModel
   */
  protected void initQuery(QueryModel queryModel) throws IOException {
    LOGGER.info("Query will be executed on table: " + queryModel.getAbsoluteTableIdentifier()
        .getCarbonTableIdentifier().getTableName());
    this.freeUnsafeMemory = queryModel.isFreeUnsafeMemory();
    // Initializing statistics list to record the query statistics
    // creating copy on write to handle concurrent scenario
    queryProperties.queryStatisticsRecorder = queryModel.getStatisticsRecorder();
    if (null == queryProperties.queryStatisticsRecorder) {
      queryProperties.queryStatisticsRecorder =
          CarbonTimeStatisticsFactory.createExecutorRecorder(queryModel.getQueryId());
      queryModel.setStatisticsRecorder(queryProperties.queryStatisticsRecorder);
    }
    QueryStatistic queryStatistic = new QueryStatistic();
    // sort the block info
    // so block will be loaded in sorted order this will be required for
    // query execution
    Collections.sort(queryModel.getTableBlockInfos());
    queryProperties.dataBlocks = getDataBlocks(queryModel);
    queryStatistic
        .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR, System.currentTimeMillis());
    queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic);

    // as aggregation will be executed in following order
    // 1.aggregate dimension expression
    // 2. expression
    // 3. query measure
    // so calculating the index of the expression start index
    // and measure column start index
    queryProperties.filterMeasures = new HashSet<>();
    queryProperties.complexFilterDimension = new HashSet<>();
    if (queryModel.getIndexFilter() != null) {
      QueryUtil.getAllFilterDimensionsAndMeasures(queryModel.getIndexFilter().getResolver(),
          queryProperties.complexFilterDimension, queryProperties.filterMeasures);
    }
    queryStatistic = new QueryStatistic();
    queryStatistic
        .addStatistics(QueryStatisticsConstants.LOAD_DICTIONARY, System.currentTimeMillis());
    queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic);
    for (int i = 0; i < queryModel.getProjectionDimensions().size(); i++) {
      reusableDimensionBufferSize++;
      findChildrenSize(queryModel.getProjectionDimensions().get(i).getDimension());
    }
  }

  // Recursively determine the size needed for ReusableDataBuffer[]
  private void findChildrenSize(CarbonDimension carbonDimension) {
    if (carbonDimension == null || carbonDimension.getListOfChildDimensions() == null) {
      return;
    }
    List<CarbonDimension> childDimension = carbonDimension.getListOfChildDimensions();
    reusableDimensionBufferSize += childDimension.size();
    for (CarbonDimension dimension : childDimension) {
      findChildrenSize(dimension);
    }
  }

  /**
   * Method returns the block(s) on which query will get executed
   *
   * @param queryModel
   * @return
   * @throws IOException
   */
  private List<AbstractIndex> getDataBlocks(QueryModel queryModel) throws IOException {
    Map<String, List<TableBlockInfo>> listMap = new LinkedHashMap<>();
    // this is introduced to handle the case when CACHE_LEVEL=BLOCK and there are few other indexes
    // like lucene, Bloom created on the table. In that case all the indexes will do blocklet
    // level pruning and blockInfo entries will be repeated with different blockletIds
    Map<String, DataFileFooter> filePathToFileFooterMapping = new HashMap<>();
    Map<String, SegmentProperties> filePathToSegmentPropertiesMap = new HashMap<>();
    for (TableBlockInfo blockInfo : queryModel.getTableBlockInfos()) {
      List<TableBlockInfo> tableBlockInfos = listMap.get(blockInfo.getFilePath());
      if (tableBlockInfos == null) {
        tableBlockInfos = new ArrayList<>();
        listMap.put(blockInfo.getFilePath(), tableBlockInfos);
      }
      SegmentProperties segmentProperties =
          filePathToSegmentPropertiesMap.get(blockInfo.getFilePath());
      BlockletDetailInfo blockletDetailInfo = blockInfo.getDetailInfo();
      // This case can come in 2 scenarios:
      // 1. old stores (1.1 or any prior version to 1.1) where blocklet information is not
      // available so read the blocklet information from block file
      // 2. CACHE_LEVEL is set to block
      // 3. CACHE_LEVEL is BLOCKLET but filter column min/max is not cached in driver
      if (null == blockletDetailInfo || blockletDetailInfo.getBlockletInfo() == null
          || blockletDetailInfo.isUseMinMaxForPruning()) {
        if (null != blockletDetailInfo) {
          blockInfo.setBlockOffset(blockletDetailInfo.getBlockFooterOffset());
        }
        DataFileFooter fileFooter = filePathToFileFooterMapping.get(blockInfo.getFilePath());
        if (null != blockInfo.getDataFileFooter()) {
          fileFooter = blockInfo.getDataFileFooter();
        }
        if (null == fileFooter) {
          blockInfo.setDetailInfo(null);
          fileFooter = CarbonUtil.readMetadataFile(blockInfo);
          // In case of non transactional table just set columnUniqueId as columnName to support
          // backward compatibility. non transactional tables column uniqueId is always equal to
          // columnName
          if (!queryModel.getTable().isTransactionalTable()) {
            QueryUtil.updateColumnUniqueIdForNonTransactionTable(fileFooter.getColumnInTable());
          }
          filePathToFileFooterMapping.put(blockInfo.getFilePath(), fileFooter);
          if (null == blockletDetailInfo) {
            blockletDetailInfo = QueryUtil.getBlockletDetailInfo(fileFooter, blockInfo);
          }
          blockInfo.setCarbonDataFileWrittenVersion(fileFooter.getCarbonDataFileWrittenVersion());
          blockInfo.setDetailInfo(blockletDetailInfo);
        }
        if (null == segmentProperties) {
          segmentProperties = new SegmentProperties(fileFooter.getColumnInTable());
          createFilterExpression(queryModel, segmentProperties);
          updateColumns(queryModel, fileFooter.getColumnInTable(), blockInfo.getFilePath());
          filePathToSegmentPropertiesMap.put(blockInfo.getFilePath(), segmentProperties);
        }
        readAndFillBlockletInfo(tableBlockInfos, blockInfo, blockletDetailInfo, fileFooter);
      } else {
        if (null == segmentProperties) {
          segmentProperties = new SegmentProperties(blockInfo.getDetailInfo().getColumnSchemas());
          createFilterExpression(queryModel, segmentProperties);
          updateColumns(queryModel, blockInfo.getDetailInfo().getColumnSchemas(),
              blockInfo.getFilePath());
          filePathToSegmentPropertiesMap.put(blockInfo.getFilePath(), segmentProperties);
        }
        tableBlockInfos.add(blockInfo);
      }
    }
    List<AbstractIndex> indexList = new ArrayList<>();
    for (List<TableBlockInfo> tableBlockInfos : listMap.values()) {
      indexList.add(new IndexWrapper(tableBlockInfos,
          filePathToSegmentPropertiesMap.get(tableBlockInfos.get(0).getFilePath())));
    }
    return indexList;
  }

  /**
   * It updates dimensions and measures of query model. In few scenarios like SDK user can configure
   * sort options per load, so if first load has c1 as integer column and configure as sort column
   * then carbon treat that as dimension.But in second load if user change the sort option then the
   * c1 become measure as by default integers are measures. So this method updates the measures to
   * dimensions and vice versa as per the index file schema.
   */
  private void updateColumns(QueryModel queryModel, List<ColumnSchema> columnsInTable,
      String filePath) throws IOException {
    if (queryModel.getTable().isTransactionalTable()) {
      return;
    }
    // First validate the schema of the carbondata file if the same column name have different
    // datatype
    boolean sameColumnSchemaList = BlockletIndexUtil
        .isSameColumnAndDifferentDatatypeInSchema(columnsInTable,
            queryModel.getTable().getTableInfo().getFactTable().getListOfColumns());
    if (!sameColumnSchemaList) {
      LOGGER.error("Datatype of the common columns present in " + filePath + " doesn't match with"
          + "the column's datatype in table schema");
      throw new IOException("All common columns present in the files doesn't have same datatype. "
          + "Unsupported operation on nonTransactional table. Check logs.");
    }
    List<ProjectionDimension> dimensions = queryModel.getProjectionDimensions();
    List<ProjectionMeasure> measures = queryModel.getProjectionMeasures();
    List<ProjectionDimension> updatedDims = new ArrayList<>();
    List<ProjectionMeasure> updatedMsrs = new ArrayList<>();

    // Check and update dimensions to measures if it is measure in index file schema
    for (ProjectionDimension dimension : dimensions) {
      int index = columnsInTable.indexOf(dimension.getDimension().getColumnSchema());
      if (index > -1) {
        if (!columnsInTable.get(index).isDimensionColumn()) {
          ProjectionMeasure measure = new ProjectionMeasure(
              new CarbonMeasure(columnsInTable.get(index), dimension.getDimension().getOrdinal(),
                  dimension.getDimension().getSchemaOrdinal()));
          measure.setOrdinal(dimension.getOrdinal());
          updatedMsrs.add(measure);
        } else {
          updatedDims.add(dimension);
        }
      } else {
        updatedDims.add(dimension);
      }
    }

    // Check and update measure to dimension if it is dimension in index file schema.
    for (ProjectionMeasure measure : measures) {
      int index = columnsInTable.indexOf(measure.getMeasure().getColumnSchema());
      if (index > -1) {
        if (columnsInTable.get(index).isDimensionColumn()) {
          ProjectionDimension dimension = new ProjectionDimension(
              new CarbonDimension(columnsInTable.get(index), measure.getMeasure().getOrdinal(), -1,
                  measure.getMeasure().getSchemaOrdinal()));
          dimension.setOrdinal(measure.getOrdinal());
          updatedDims.add(dimension);
        } else {
          updatedMsrs.add(measure);
        }
      } else {
        updatedMsrs.add(measure);
      }
    }
    // Clear and update the query model projections.
    dimensions.clear();
    dimensions.addAll(updatedDims);
    measures.clear();
    measures.addAll(updatedMsrs);
  }

  private void createFilterExpression(QueryModel queryModel, SegmentProperties properties) {
    if (queryModel.getIndexFilter() != null) {
      if (!queryModel.getIndexFilter().isResolvedOnSegment(properties)) {
        IndexFilter expression = new IndexFilter(properties, queryModel.getTable(),
            queryModel.getIndexFilter().getExpression());
        queryModel.setIndexFilter(expression);
      }
    }
  }

  /**
   * Read the file footer of block file and get the blocklets to query
   */
  private void readAndFillBlockletInfo(List<TableBlockInfo> tableBlockInfos,
      TableBlockInfo blockInfo, BlockletDetailInfo blockletDetailInfo, DataFileFooter fileFooter) {
    List<BlockletInfo> blockletList = fileFooter.getBlockletList();
    // cases when blockletID will be -1
    // 1. In case of legacy store
    // 2. In case CACHE_LEVEL is block and no other index apart from blocklet index is
    // created for a table
    // In all above cases entries will be according to the number of blocks and not according to
    // number of blocklets
    if (blockletDetailInfo.getBlockletId() != -1) {
      // fill the info only for given blockletId in detailInfo
      BlockletInfo blockletInfo = blockletList.get(blockletDetailInfo.getBlockletId());
      fillBlockletInfoToTableBlock(tableBlockInfos, blockInfo, fileFooter,
          blockletInfo, blockletDetailInfo.getBlockletId());
    } else {
      short count = 0;
      for (BlockletInfo blockletInfo : blockletList) {
        fillBlockletInfoToTableBlock(tableBlockInfos, blockInfo, fileFooter,
            blockletInfo, count);
        count++;
      }
    }
  }

  private void fillBlockletInfoToTableBlock(List<TableBlockInfo> tableBlockInfos,
      TableBlockInfo blockInfo, DataFileFooter fileFooter, BlockletInfo blockletInfo,
      short blockletId) {
    TableBlockInfo info = blockInfo.copy();
    BlockletDetailInfo detailInfo = info.getDetailInfo();
    // set column schema details
    detailInfo.setColumnSchemas(fileFooter.getColumnInTable());
    detailInfo.setRowCount(blockletInfo.getNumberOfRows());
    byte[][] maxValues = blockletInfo.getBlockletIndex().getMinMaxIndex().getMaxValues();
    byte[][] minValues = blockletInfo.getBlockletIndex().getMinMaxIndex().getMinValues();
    blockletInfo.getBlockletIndex().getMinMaxIndex().setMaxValues(maxValues);
    blockletInfo.getBlockletIndex().getMinMaxIndex().setMinValues(minValues);
    detailInfo.setBlockletInfo(blockletInfo);
    detailInfo.setBlockletId(blockletId);
    detailInfo.setPagesCount((short) blockletInfo.getNumberOfPages());
    tableBlockInfos.add(info);
  }

  protected List<BlockExecutionInfo> getBlockExecutionInfos(QueryModel queryModel)
      throws IOException {
    initQuery(queryModel);
    List<BlockExecutionInfo> blockExecutionInfoList = new ArrayList<BlockExecutionInfo>();
    // fill all the block execution infos for all the blocks selected in
    // query
    // and query will be executed based on that infos
    ReusableDataBuffer[] dimensionReusableDataBuffers = null;
    ReusableDataBuffer[] measureReusableDataBuffers = null;

    for (int i = 0; i < queryProperties.dataBlocks.size(); i++) {
      AbstractIndex abstractIndex = queryProperties.dataBlocks.get(i);
      BlockletDataRefNode dataRefNode =
          (BlockletDataRefNode) abstractIndex.getDataRefNode();
      final BlockExecutionInfo blockExecutionInfoForBlock =
          getBlockExecutionInfoForBlock(
              queryModel,
              abstractIndex,
              dataRefNode.numberOfNodes(),
              dataRefNode.getTableBlockInfo().getFilePath(),
              dataRefNode.getTableBlockInfo().getDeletedDeltaFilePath(),
              dataRefNode.getTableBlockInfo().getSegment());
      if (null == dimensionReusableDataBuffers || null == measureReusableDataBuffers) {
        dimensionReusableDataBuffers = blockExecutionInfoForBlock.getDimensionReusableDataBuffer();
        measureReusableDataBuffers = blockExecutionInfoForBlock.getMeasureReusableDataBuffer();
      } else {
        if (dimensionReusableDataBuffers.length == blockExecutionInfoForBlock
            .getDimensionReusableDataBuffer().length) {
          blockExecutionInfoForBlock.setDimensionReusableDataBuffer(dimensionReusableDataBuffers);
        }
        if (measureReusableDataBuffers.length == blockExecutionInfoForBlock
            .getMeasureReusableDataBuffer().length) {
          blockExecutionInfoForBlock.setMeasureReusableDataBuffer(measureReusableDataBuffers);
        }
      }
      blockExecutionInfoList.add(blockExecutionInfoForBlock);
    }
    if (null != queryModel.getStatisticsRecorder()) {
      QueryStatistic queryStatistic = new QueryStatistic();
      queryStatistic.addCountStatistic(QueryStatisticsConstants.SCAN_BLOCKS_NUM,
          blockExecutionInfoList.size());
      queryModel.getStatisticsRecorder().recordStatistics(queryStatistic);
    }
    return blockExecutionInfoList;
  }

  /**
   * Below method will be used to get the block execution info which is
   * required to execute any block  based on query model
   *
   * @param queryModel query model from user query
   * @param blockIndex block index
   * @return block execution info
   */
  private BlockExecutionInfo getBlockExecutionInfoForBlock(QueryModel queryModel,
      AbstractIndex blockIndex, int numberOfBlockletToScan, String filePath,
      String[] deleteDeltaFiles, Segment segment) {
    BlockExecutionInfo blockExecutionInfo = new BlockExecutionInfo();
    SegmentProperties segmentProperties = blockIndex.getSegmentProperties();
    // set actual query dimensions and measures. It may differ in case of restructure scenarios
    RestructureUtil.actualProjectionOfSegment(blockExecutionInfo, queryModel, segmentProperties);
    // below is to get only those dimension in query which is present in the
    // table block
    List<ProjectionDimension> projectDimensions = RestructureUtil
        .createDimensionInfoAndGetCurrentBlockQueryDimension(blockExecutionInfo,
            blockExecutionInfo.getActualQueryDimensions(), segmentProperties.getDimensions(),
            segmentProperties.getComplexDimensions(),
            blockExecutionInfo.getActualQueryMeasures().length,
            queryModel.getTable().getTableInfo().isTransactionalTable(), queryModel);
    boolean isStandardTable = CarbonUtil.isStandardCarbonTable(queryModel.getTable());
    String blockId = CarbonUtil
        .getBlockId(queryModel.getAbsoluteTableIdentifier(), filePath, segment.getSegmentNo(),
            queryModel.getTable().getTableInfo().isTransactionalTable(),
            isStandardTable, queryModel.getTable().isHivePartitionTable());
    blockExecutionInfo.setBlockId(CarbonTablePath.getShortBlockId(blockId));
    blockExecutionInfo.setDeleteDeltaFilePath(deleteDeltaFiles);
    blockExecutionInfo.setStartBlockletIndex(0);
    blockExecutionInfo.setNumberOfBlockletToScan(numberOfBlockletToScan);
    blockExecutionInfo.setProjectionDimensions(projectDimensions
        .toArray(new ProjectionDimension[0]));
    // get measures present in the current block
    List<ProjectionMeasure> projectionMeasures = RestructureUtil
        .createMeasureInfoAndGetCurrentBlockQueryMeasures(blockExecutionInfo,
            blockExecutionInfo.getActualQueryMeasures(), segmentProperties.getMeasures(),
            queryModel.getTable().getTableInfo().isTransactionalTable(), queryModel);
    blockExecutionInfo.setProjectionMeasures(
        projectionMeasures.toArray(new ProjectionMeasure[projectionMeasures.size()]));
    blockExecutionInfo.setDataBlock(blockIndex);
    // setting whether raw record query or not
    blockExecutionInfo.setRawRecordDetailQuery(queryModel.isForcedDetailRawQuery());
    // total number dimension
    blockExecutionInfo
        .setTotalNumberDimensionToRead(
            segmentProperties.getDimensionOrdinalToChunkMapping().size());
    blockExecutionInfo.setReadOnlyDelta(queryModel.isReadOnlyDelta());
    if (queryModel.isReadPageByPage()) {
      blockExecutionInfo.setPrefetchBlocklet(false);
      LOGGER.info("Query prefetch is: false, read page by page");
    } else {
      LOGGER.info("Query prefetch is: " + queryModel.isPreFetchData());
      blockExecutionInfo.setPrefetchBlocklet(queryModel.isPreFetchData());
    }
    // In case of fg index it should not go to direct fill.
    boolean fgIndexPathPresent = false;
    for (TableBlockInfo blockInfo : queryModel.getTableBlockInfos()) {
      fgIndexPathPresent = blockInfo.getIndexWriterPath() != null;
      if (fgIndexPathPresent) {
        queryModel.setDirectVectorFill(false);
        break;
      }
    }

    blockExecutionInfo.setDirectVectorFill(queryModel.isDirectVectorFill());
    blockExecutionInfo.setTotalNumberOfMeasureToRead(
        segmentProperties.getMeasuresOrdinalToChunkMapping().size());
    blockExecutionInfo.setComplexDimensionInfoMap(
        QueryUtil.getComplexDimensionsMap(
            projectDimensions,
            segmentProperties.getDimensionOrdinalToChunkMapping(),
            queryProperties.complexFilterDimension));
    if (null != queryModel.getIndexFilter()) {
      FilterResolverIntf filterResolverIntf;
      if (!filePath.startsWith(queryModel.getTable().getTablePath())) {
        filterResolverIntf = queryModel.getIndexFilter().getExternalSegmentResolver();
      } else {
        // loading the filter executor tree for filter evaluation
        filterResolverIntf = queryModel.getIndexFilter().getResolver();
      }
      blockExecutionInfo.setFilterExecutorTree(
          FilterUtil.getFilterExecutorTree(filterResolverIntf, segmentProperties,
              blockExecutionInfo.getComplexDimensionInfoMap(), false));
    }
    // expression measure
    List<CarbonMeasure> expressionMeasures =
        new ArrayList<CarbonMeasure>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
    // setting all the dimension chunk indexes to be read from file
    int numberOfElementToConsider = 0;
    // list of dimensions to be projected
    Set<Integer> allProjectionListDimensionIndexes = new LinkedHashSet<>();
    // create a list of filter dimensions present in the current block
    Set<CarbonDimension> currentBlockFilterDimensions =
        getCurrentBlockFilterDimensions(queryProperties.complexFilterDimension, segmentProperties);
    int[] dimensionChunkIndexes = QueryUtil.getDimensionChunkIndexes(projectDimensions,
        segmentProperties.getDimensionOrdinalToChunkMapping(),
        currentBlockFilterDimensions, allProjectionListDimensionIndexes);
    ReusableDataBuffer[] dimensionBuffer = new ReusableDataBuffer[reusableDimensionBufferSize];
    for (int i = 0; i < dimensionBuffer.length; i++) {
      dimensionBuffer[i] = new ReusableDataBuffer();
    }
    blockExecutionInfo.setDimensionReusableDataBuffer(dimensionBuffer);
    int numberOfColumnToBeReadInOneIO = Integer.parseInt(CarbonProperties.getInstance()
        .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO,
            CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULT_VALUE));

    if (dimensionChunkIndexes.length > 0) {
      numberOfElementToConsider = dimensionChunkIndexes[dimensionChunkIndexes.length - 1]
          == segmentProperties.getBlockToDimensionOrdinalMapping().size() - 1 ?
          dimensionChunkIndexes.length - 1 :
          dimensionChunkIndexes.length;
      blockExecutionInfo.setAllSelectedDimensionColumnIndexRange(
          CarbonUtil.getRangeIndex(dimensionChunkIndexes, numberOfElementToConsider,
              numberOfColumnToBeReadInOneIO));
    } else {
      blockExecutionInfo.setAllSelectedDimensionColumnIndexRange(new int[0][0]);
    }
    // get the list of updated filter measures present in the current block
    Set<CarbonMeasure> filterMeasures =
        getCurrentBlockFilterMeasures(queryProperties.filterMeasures, segmentProperties);
    // list of measures to be projected
    List<Integer> allProjectionListMeasureIndexes = new ArrayList<>();
    int[] measureChunkIndexes = QueryUtil.getMeasureChunkIndexes(
        projectionMeasures, expressionMeasures,
        segmentProperties.getMeasuresOrdinalToChunkMapping(), filterMeasures,
        allProjectionListMeasureIndexes);
    ReusableDataBuffer[] measureBuffer =
        new ReusableDataBuffer[allProjectionListMeasureIndexes.size()];
    for (int i = 0; i < measureBuffer.length; i++) {
      measureBuffer[i] = new ReusableDataBuffer();
    }
    blockExecutionInfo.setMeasureReusableDataBuffer(measureBuffer);
    if (measureChunkIndexes.length > 0) {

      numberOfElementToConsider = measureChunkIndexes[measureChunkIndexes.length - 1]
          == segmentProperties.getMeasures().size() - 1 ?
          measureChunkIndexes.length - 1 :
          measureChunkIndexes.length;
      // setting all the measure chunk indexes to be read from file
      blockExecutionInfo.setAllSelectedMeasureIndexRange(
          CarbonUtil.getRangeIndex(
              measureChunkIndexes, numberOfElementToConsider,
              numberOfColumnToBeReadInOneIO));
    } else {
      blockExecutionInfo.setAllSelectedMeasureIndexRange(new int[0][0]);
    }
    // setting the indexes of list of dimension in projection list
    blockExecutionInfo.setProjectionListDimensionIndexes(ArrayUtils.toPrimitive(
        allProjectionListDimensionIndexes
            .toArray(new Integer[allProjectionListDimensionIndexes.size()])));
    // setting the indexes of list of measures in projection list
    blockExecutionInfo.setProjectionListMeasureIndexes(ArrayUtils.toPrimitive(
        allProjectionListMeasureIndexes
            .toArray(new Integer[allProjectionListMeasureIndexes.size()])));
    // setting the size of fixed key column (dictionary column)
    blockExecutionInfo
        .setFixedLengthKeySize(getKeySize(projectDimensions, segmentProperties));
    List<Integer> dictionaryColumnChunkIndex = new ArrayList<Integer>();
    List<Integer> noDictionaryColumnChunkIndex = new ArrayList<Integer>();
    // get the block index to be read from file for query dimension
    // for both dictionary columns and no dictionary columns
    QueryUtil.fillQueryDimensionChunkIndexes(projectDimensions,
        segmentProperties.getDimensionOrdinalToChunkMapping(), dictionaryColumnChunkIndex,
        noDictionaryColumnChunkIndex);
    int[] queryDictionaryColumnChunkIndexes = ArrayUtils.toPrimitive(
        dictionaryColumnChunkIndex.toArray(new Integer[dictionaryColumnChunkIndex.size()]));
    // need to sort the dictionary column as for all dimension
    // column key will be filled based on key order
    if (!queryModel.isForcedDetailRawQuery()) {
      Arrays.sort(queryDictionaryColumnChunkIndexes);
    }
    blockExecutionInfo.setDictionaryColumnChunkIndex(queryDictionaryColumnChunkIndexes);
    // setting the no dictionary column block indexes
    blockExecutionInfo.setNoDictionaryColumnChunkIndexes(ArrayUtils.toPrimitive(
        noDictionaryColumnChunkIndex.toArray(new Integer[noDictionaryColumnChunkIndex.size()])));
    blockExecutionInfo.setComplexColumnParentBlockIndexes(
        getComplexDimensionParentBlockIndexes(projectDimensions));
    blockExecutionInfo.setVectorBatchCollector(queryModel.isVectorReader());
    DataTypeUtil.setDataTypeConverter(queryModel.getConverter());
    blockExecutionInfo.setRequiredRowId(queryModel.isRequiredRowId());
    return blockExecutionInfo;
  }

  /**
   * This method will be used to get fixed key length size this will be used
   * to create a row from column chunk
   *
   * @param queryDimension    query dimension
   * @param blockMetadataInfo block metadata info
   * @return key size
   */
  private int getKeySize(List<ProjectionDimension> queryDimension,
      SegmentProperties blockMetadataInfo) {
    // add the dimension block ordinal for each dictionary column
    // existing in the current block dimensions. Set is used because in case of column groups
    // ordinal of columns in a column group will be same
    Set<Integer> fixedLengthDimensionOrdinal =
        new HashSet<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
    int counter = 0;
    while (counter < queryDimension.size()) {
      if (queryDimension.get(counter).getDimension().getNumberOfChild() > 0) {
        counter += queryDimension.get(counter).getDimension().getNumberOfChild();
      } else if (queryDimension.get(counter).getDimension().getDataType() != DataTypes.DATE) {
        counter++;
      } else {
        fixedLengthDimensionOrdinal.add(blockMetadataInfo.getDimensionOrdinalToChunkMapping()
            .get(queryDimension.get(counter).getDimension().getOrdinal()));
        counter++;
      }
    }
    int[] dictionaryColumnOrdinal = ArrayUtils.toPrimitive(
        fixedLengthDimensionOrdinal.toArray(new Integer[fixedLengthDimensionOrdinal.size()]));
    // calculate the size of existing query dictionary columns in this block
    if (blockMetadataInfo.getNumberOfDictDimensions() > 0) {
      int[] eachColumnValueSize = blockMetadataInfo.createDimColumnValueLength();
      int keySize = 0;
      for (int i = 0; i < dictionaryColumnOrdinal.length; i++) {
        keySize += eachColumnValueSize[dictionaryColumnOrdinal[i]];
      }
      return keySize;
    }
    return 0;
  }

  private int[] getComplexDimensionParentBlockIndexes(List<ProjectionDimension> queryDimensions) {
    List<Integer> parentBlockIndexList = new ArrayList<Integer>();
    for (ProjectionDimension queryDimension : queryDimensions) {
      if (queryDimension.getDimension().getDataType().isComplexType()) {
        if (null != queryDimension.getDimension().getComplexParentDimension()) {
          if (queryDimension.getDimension().isComplex()) {
            parentBlockIndexList.add(queryDimension.getDimension().getOrdinal());
          } else {
            parentBlockIndexList.add(queryDimension.getParentDimension().getOrdinal());
          }
        } else {
          parentBlockIndexList.add(queryDimension.getDimension().getOrdinal());
        }
      }
    }
    return ArrayUtils
        .toPrimitive(parentBlockIndexList.toArray(new Integer[parentBlockIndexList.size()]));
  }

  /**
   * This method will create the updated list of filter measures present in the current block
   *
   * @param queryFilterMeasures
   * @param segmentProperties
   * @return
   */
  private Set<CarbonMeasure> getCurrentBlockFilterMeasures(Set<CarbonMeasure> queryFilterMeasures,
      SegmentProperties segmentProperties) {
    if (!queryFilterMeasures.isEmpty()) {
      Set<CarbonMeasure> updatedFilterMeasures = new HashSet<>(queryFilterMeasures.size());
      for (CarbonMeasure queryMeasure : queryFilterMeasures) {
        CarbonMeasure measureFromCurrentBlock =
            segmentProperties.getMeasureFromCurrentBlock(queryMeasure);
        if (null != measureFromCurrentBlock) {
          updatedFilterMeasures.add(measureFromCurrentBlock);
        }
      }
      return updatedFilterMeasures;
    } else {
      return queryFilterMeasures;
    }
  }

  /**
   * This method will create the updated list of filter dimensions present in the current block
   *
   * @param queryFilterDimensions
   * @param segmentProperties
   * @return
   */
  private Set<CarbonDimension> getCurrentBlockFilterDimensions(
      Set<CarbonDimension> queryFilterDimensions, SegmentProperties segmentProperties) {
    if (!queryFilterDimensions.isEmpty()) {
      Set<CarbonDimension> updatedFilterDimensions = new HashSet<>(queryFilterDimensions.size());
      for (CarbonDimension queryDimension : queryFilterDimensions) {
        CarbonDimension dimensionFromCurrentBlock =
            segmentProperties.getDimensionFromCurrentBlock(queryDimension);
        if (null != dimensionFromCurrentBlock) {
          updatedFilterDimensions.add(dimensionFromCurrentBlock);
        }
      }
      return updatedFilterDimensions;
    } else {
      return queryFilterDimensions;
    }
  }

  /**
   * Below method will be used to finish the execution
   *
   * @throws QueryExecutionException
   */
  @Override
  public void finish() throws QueryExecutionException {
    CarbonUtil.clearBlockCache(queryProperties.dataBlocks);
    Throwable exceptionOccurred = null;
    if (null != queryIterator) {
      // catch if there is any exception so that it can be rethrown after clearing all the resources
      // else if any exception is thrown from this point executor service will not be terminated
      try {
        queryIterator.close();
      } catch (Throwable e) {
        exceptionOccurred = e;
      }
    }
    // clear all the unsafe memory used for the given task ID only if it is necessary to be cleared
    if (freeUnsafeMemory) {
      UnsafeMemoryManager.INSTANCE
          .freeMemoryAll(ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId());
      ThreadLocalTaskInfo.clearCarbonTaskInfo();
    }
    if (null != queryProperties.executorService) {
      // In case of limit query when number of limit records is already found so executors
      // must stop all the running execution otherwise it will keep running and will hit
      // the query performance.
      queryProperties.executorService.shutdownNow();
    }
    // if there is any exception re throw the exception
    if (null != exceptionOccurred) {
      throw new QueryExecutionException(exceptionOccurred);
    }
    DataTypeUtil.clearFormatter();
  }

}
