/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.hadoop.api;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.DataMapChooser;
import org.apache.carbondata.core.datamap.DataMapFilter;
import org.apache.carbondata.core.datamap.DataMapJob;
import org.apache.carbondata.core.datamap.DataMapLevel;
import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datamap.DataMapUtil;
import org.apache.carbondata.core.datamap.DistributableDataMapFormat;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.TableDataMap;
import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
import org.apache.carbondata.core.datamap.dev.expr.DataMapWrapperSimpleInfo;
import org.apache.carbondata.core.exception.InvalidConfigurationException;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.SchemaReader;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.profiler.ExplainCollector;
import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.FilterUtil;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.scan.model.QueryModelBuilder;
import org.apache.carbondata.core.stats.QueryStatistic;
import org.apache.carbondata.core.stats.QueryStatisticsConstants;
import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
import org.apache.carbondata.core.util.BlockletDataMapUtil;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeConverter;
import org.apache.carbondata.core.util.DataTypeConverterImpl;
import org.apache.carbondata.core.util.ObjectSerializationUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.hadoop.CarbonInputSplit;
import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
import org.apache.carbondata.hadoop.CarbonProjection;
import org.apache.carbondata.hadoop.CarbonRecordReader;
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;

import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.log4j.Logger;

/**
 * Base class for carbondata input format, there are two input format implementations:
 * 1. CarbonFileInputFormat: for reading carbondata files without table level metadata support.
 *
 * 2. CarbonTableInputFormat: for reading carbondata files with table level metadata support,
 * such as segment and explicit schema metadata.
 *
 * @param <T>
 */
public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
  // comma separated list of input segment numbers
  public static final String INPUT_SEGMENT_NUMBERS =
      "mapreduce.input.carboninputformat.segmentnumbers";
  // comma separated list of input files
  private static final String ALTER_PARTITION_ID = "mapreduce.input.carboninputformat.partitionid";
  private static final Logger LOG =
      LogServiceFactory.getLogService(CarbonInputFormat.class.getName());
  private static final String FILTER_PREDICATE =
      "mapreduce.input.carboninputformat.filter.predicate";
  private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
  private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo";
  private static final String CARBON_TRANSACTIONAL_TABLE =
      "mapreduce.input.carboninputformat.transactional";
  private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
  private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter";
  public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName";
  public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName";
  private static final String PARTITIONS_TO_PRUNE =
      "mapreduce.input.carboninputformat.partitions.to.prune";
  private static final String FGDATAMAP_PRUNING = "mapreduce.input.carboninputformat.fgdatamap";
  private static final String READ_COMMITTED_SCOPE =
      "mapreduce.input.carboninputformat.read.committed.scope";

  // record segment number and hit blocks
  protected int numSegments = 0;
  protected int numStreamSegments = 0;
  protected int numStreamFiles = 0;
  protected int hitedStreamFiles = 0;
  protected int numBlocks = 0;
  protected List fileLists = null;

  private CarbonTable carbonTable;

  public int getNumSegments() {
    return numSegments;
  }

  public int getNumStreamSegments() {
    return numStreamSegments;
  }

  public int getNumStreamFiles() {
    return numStreamFiles;
  }

  public int getHitedStreamFiles() {
    return hitedStreamFiles;
  }

  public int getNumBlocks() {
    return numBlocks;
  }

  public void setFileLists(List fileLists) {
    this.fileLists = fileLists;
  }

  /**
   * Set the `tableInfo` in `configuration`
   */
  public static void setTableInfo(Configuration configuration, TableInfo tableInfo)
      throws IOException {
    if (null != tableInfo) {
      configuration.set(TABLE_INFO, CarbonUtil.encodeToString(tableInfo.serialize()));
    }
  }

  /**
   * Get TableInfo object from `configuration`
   */
  protected static TableInfo getTableInfo(Configuration configuration) throws IOException {
    String tableInfoStr = configuration.get(TABLE_INFO);
    if (tableInfoStr == null) {
      return null;
    } else {
      TableInfo output = new TableInfo();
      output.readFields(new DataInputStream(
          new ByteArrayInputStream(CarbonUtil.decodeStringToBytes(tableInfoStr))));
      return output;
    }
  }

  /**
   * Get the cached CarbonTable or create it by TableInfo in `configuration`
   */
  public CarbonTable getOrCreateCarbonTable(Configuration configuration)
      throws IOException {
    if (carbonTable == null) {
      // carbon table should be created either from deserialized table info (schema saved in
      // hive metastore) or by reading schema in HDFS (schema saved in HDFS)
      TableInfo tableInfo = getTableInfo(configuration);
      CarbonTable carbonTable;
      if (tableInfo != null) {
        carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
      } else {
        carbonTable = SchemaReader.readCarbonTableFromStore(
            getAbsoluteTableIdentifier(configuration));
      }
      this.carbonTable = carbonTable;
      return carbonTable;
    } else {
      return this.carbonTable;
    }
  }

  public static void setTablePath(Configuration configuration, String tablePath) {
    configuration.set(FileInputFormat.INPUT_DIR, tablePath);
  }

  public static void setTransactionalTable(Configuration configuration,
      boolean isTransactionalTable) {
    configuration.set(CARBON_TRANSACTIONAL_TABLE, String.valueOf(isTransactionalTable));
  }

  public static void setPartitionIdList(Configuration configuration, List<String> partitionIds) {
    configuration.set(ALTER_PARTITION_ID, partitionIds.toString());
  }

  /**
   * It sets unresolved filter expression.
   *
   * @param configuration
   * @para    DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
m filterExpression
   */
  public static void setFilterPredicates(Configuration configuration,
      DataMapFilter filterExpression) {
    if (filterExpression == null || filterExpression.getExpression() == null) {
      return;
    }
    try {
      String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression);
      configuration.set(FILTER_PREDICATE, filterString);
    } catch (Exception e) {
      throw new RuntimeException("Error while setting filter expression to Job", e);
    }
  }

  /**
   * Set the column projection column names
   *
   * @param configuration     Configuration info
   * @param projectionColumns projection columns name
   */
  public static void setColumnProjection(Configuration configuration, String[] projectionColumns) {
    Objects.requireNonNull(projectionColumns);
    if (projectionColumns.length < 1) {
      throw new RuntimeException("Projection can't be empty");
    }
    StringBuilder builder = new StringBuilder();
    for (String column : projectionColumns) {
      builder.append(column).append(",");
    }
    String columnString = builder.toString();
    columnString = columnString.substring(0, columnString.length() - 1);
    configuration.set(COLUMN_PROJECTION, columnString);
  }

  /**
   * Set the column projection column names from CarbonProjection
   *
   * @param configuration Configuration info
   * @param projection    CarbonProjection object that includes unique projection column name
   */
  public static void setColumnProjection(Configuration configuration, CarbonProjection projection) {
    if (projection == null || projection.isEmpty()) {
      return;
    }
    String[] allColumns = projection.getAllColumns();
    StringBuilder builder = new StringBuilder();
    for (String column : allColumns) {
      builder.append(column).append(",");
    }
    String columnString = builder.toString();
    columnString = columnString.substring(0, columnString.length() - 1);
    configuration.set(COLUMN_PROJECTION, columnString);
  }

  public static String getColumnProjection(Configuration configuration) {
    return configuration.get(COLUMN_PROJECTION);
  }

  public static void setFgDataMapPruning(Configuration configuration, boolean enable) {
    configuration.set(FGDATAMAP_PRUNING, String.valueOf(enable));
  }

  public static boolean isFgDataMapPruningEnable(Configuration configuration) {
    String enable = configuration.get(FGDATAMAP_PRUNING);

    // if FDDATAMAP_PRUNING is not set, by default we will use FGDataMap
    return (enable == null) || enable.equalsIgnoreCase("true");
  }

  /**
   * Set list of segments to access
   */
  public static void setSegmentsToAccess(Configuration configuration, List<Segment> validSegments) {
    configuration.set(INPUT_SEGMENT_NUMBERS, CarbonUtil.convertToString(validSegments));
  }

  /**
   * Set `CARBON_INPUT_SEGMENTS` from property to configuration
   */
  public static void setQuerySegment(Configuration conf, AbsoluteTableIdentifier identifier) {
    String dbName = identifier.getCarbonTableIdentifier().getDatabaseName().toLowerCase();
    String tbName = identifier.getCarbonTableIdentifier().getTableName().toLowerCase();
    setQuerySegmentToAccess(conf, dbName, tbName);
  }

  /**
   * Set `CARBON_INPUT_SEGMENTS` from property to configuration
   */
  public static void setQuerySegment(Configuration conf, String segmentList) {
    if (!segmentList.trim().equals("*")) {
      CarbonInputFormat
          .setSegmentsToAccess(conf, Segment.toSegmentList(segmentList.split(","), null));
    }
  }

  /**
   * set list of partitions to prune
   */
  public static void setPartitionsToPrune(Configuration configuration,
      List<PartitionSpec> partitions) {
    if (partitions == null) {
      return;
    }
    try {
      String partitionString =
          ObjectSerializationUtil.convertObjectToString(new ArrayList<>(partitions));
      configuration.set(PARTITIONS_TO_PRUNE, partitionString);
    } catch (Exception e) {
      throw new RuntimeException(
          "Error while setting partition information to Job" + partitions, e);
    }
  }

  /**
   * get list of partitions to prune
   */
  public static List<PartitionSpec> getPartitionsToPrune(Configuration configuration)
      throws IOException {
    String partitionString = configuration.get(PARTITIONS_TO_PRUNE);
    if (partitionString != null) {
      return (List<PartitionSpec>) ObjectSerializationUtil.convertStringToObject(partitionString);
    }
    return null;
  }

  public AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
      throws IOException {
    String tablePath = configuration.get(INPUT_DIR, "");
    try {
      return AbsoluteTableIdentifier
          .from(tablePath, getDatabaseName(configuration), getTableName(configuration));
    } catch (InvalidConfigurationException e) {
      throw new IOException(e);
    }
  }

  public static void setReadCommittedScope(Configuration configuration,
      ReadCommittedScope committedScope) {
    if (committedScope == null) {
      return;
    }
    try {
      String subFoldersString = ObjectSerializationUtil.convertObjectToString(committedScope);
      configuration.set(READ_COMMITTED_SCOPE, subFoldersString);
    } catch (Exception e) {
      throw new RuntimeException(
          "Error while setting committedScope information to Job" + committedScope, e);
    }
  }

  public static ReadCommittedScope getReadCommittedScope(Configuration configuration)
      throws IOException {
    String subFoldersString = configuration.get(READ_COMMITTED_SCOPE);
    if (subFoldersString != null) {
      return (ReadCommittedScope) ObjectSerializationUtil.convertStringToObject(subFoldersString);
    }
    return null;
  }

  /**
   * {@inheritDoc}
   * Configurations FileInputFormat.INPUT_DIR
   * are used to get table path to read.
   *
   * @param job
   * @return List<InputSplit> list of CarbonInputSplit
   * @throws IOException
   */
  @Override
  public abstract List<InputSplit> getSplits(JobContext job) throws IOException;

  /**
   * This method will execute a distributed job(DistributedDataMapJob) to get the count for the
   * table. If the DistributedDataMapJob fails for some reason then an embedded job is fired to
   * get the count.
   */
  Long getDistributedCount(CarbonTable table,
      List<PartitionSpec> partitionNames, List<Segment> validSegments) throws IOException {
    DistributableDataMapFormat dataMapFormat =
        new DistributableDataMapFormat(table, null, validSegments, new ArrayList<String>(),
            partitionNames, false, null, false, false);
    dataMapFormat.setIsWriteToFile(false);
    try {
      DataMapJob dataMapJob =
          (DataMapJob) DataMapUtil.createDataMapJob(DataMapUtil.DISTRIBUTED_JOB_NAME);
      if (dataMapJob == null) {
        throw new ExceptionInInitializerError("Unable to create DistributedDataMapJob");
      }
      return dataMapJob.executeCountJob(dataMapFormat);
    } catch (Exception e) {
      LOG.error("Failed to get count from index server. Initializing fallback", e);
      DataMapJob dataMapJob = DataMapUtil.getEmbeddedJob();
      return dataMapJob.executeCountJob(dataMapFormat);
    }
  }

  List<ExtendedBlocklet> getDistributedBlockRowCount(CarbonTable table,
      List<PartitionSpec> partitionNames, List<Segment> validSegments,
      List<Segment> invalidSegments, List<String> segmentsToBeRefreshed) throws IOException {
    return getDistributedSplit(table, null, partitionNames, validSegments, invalidSegments,
        segmentsToBeRefreshed, true);
  }

  private List<ExtendedBlocklet> getDistributedSplit(CarbonTable table,
      FilterResolverIntf filterResolverIntf, List<PartitionSpec> partitionNames,
      List<Segment> validSegments, List<Segment> invalidSegments,
      List<String> segmentsToBeRefreshed, boolean isCountJob) throws IOException {
    try {
      DataMapJob dataMapJob =
          (DataMapJob) DataMapUtil.createDataMapJob(DataMapUtil.DISTRIBUTED_JOB_NAME);
      if (dataMapJob == null) {
        throw new ExceptionInInitializerError("Unable to create DistributedDataMapJob");
      }
      return DataMapUtil
          .executeDataMapJob(table, filterResolverIntf, dataMapJob, partitionNames, validSegments,
              invalidSegments, null, false, segmentsToBeRefreshed, isCountJob);
    } catch (Exception e) {
      // Check if fallback is disabled for testing purposes then directly throw exception.
      if (CarbonProperties.getInstance().isFallBackDisabled()) {
        throw e;
      }
      LOG.error("Exception occurred while getting splits using index server. Initiating Fall "
          + "back to embedded mode", e);
      return DataMapUtil.executeDataMapJob(table, filterResolverIntf,
          DataMapUtil.getEmbeddedJob(), partitionNames, validSegments,
          invalidSegments, null, true, segmentsToBeRefreshed, isCountJob);
    }
  }

  protected DataMapFilter getFilterPredicates(Configuration configuration) {
    try {
      String filterExprString = configuration.get(FILTER_PREDICATE);
      if (filterExprString == null) {
        return null;
      }
      DataMapFilter filter =
          (DataMapFilter) ObjectSerializationUtil.convertStringToObject(filterExprString);
      if (filter != null) {
        CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
        filter.setTable(carbonTable);
      }
      return filter;
    } catch (IOException e) {
      throw new RuntimeException("Error while reading filter expression", e);
    }
  }

  /**
   * get data blocks of given segment
   */
  protected List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job, CarbonTable carbonTable,
      DataMapFilter expression, List<Segment> segmentIds,
      List<Segment> invalidSegments, List<String> segmentsToBeRefreshed)
      throws IOException {

    QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
    QueryStatistic statistic = new QueryStatistic();

    // get tokens for all the required FileSystem for table path
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { new Path(carbonTable.getTablePath()) }, job.getConfiguration());
    List<ExtendedBlocklet> prunedBlocklets =
        getPrunedBlocklets(job, carbonTable, expression, segmentIds, invalidSegments,
            segmentsToBeRefreshed);
    List<CarbonInputSplit> resultFilteredBlocks = new ArrayList<>();
    for (ExtendedBlocklet blocklet : prunedBlocklets) {
      // matchedPartitions variable will be null in two cases as follows
      // 1. the table is not a partition table
      // 2. the table is a partition table, and all partitions are matched by query
      // for partition table, the task id of carbaondata file name is the partition id.
      // if this partition is not required, here will skip it.
      resultFilteredBlocks.add(blocklet.getInputSplit());
    }
    statistic
        .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
    recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id"));
    return resultFilteredBlocks;
  }

  /**
   * for explain command
   * get number of block by counting distinct file path of blocklets
   */
  private int getBlockCount(List<ExtendedBlocklet> blocklets) {
    Set<String> filepaths = new HashSet<>();
    for (ExtendedBlocklet blocklet: blocklets) {
      filepaths.add(blocklet.getPath());
    }
    return filepaths.size();
  }

  /**
   * Prune the blocklets using the filter expression with available datamaps.
   * First pruned with default blocklet datamap, then pruned with CG and FG datamaps
   */
  private List<ExtendedBlocklet> getPrunedBlocklets(JobContext job, CarbonTable carbonTable,
      DataMapFilter filter, List<Segment> segmentIds, List<Segment> invalidSegments,
      List<String> segmentsToBeRefreshed) throws IOException {
    ExplainCollector.addPruningInfo(carbonTable.getTableName());
    filter = filter == null ? new DataMapFilter(carbonTable, null) : filter;
    ExplainCollector.setFilterStatement(
        filter.getExpression() == null ? "none" : filter.getExpression().getStatement());
    boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance()
        .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
            CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
    DataMapJob dataMapJob = DataMapUtil.getDataMapJob(job.getConfiguration());
    List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
    // First prune using default datamap on driver side.
    TableDataMap defaultDataMap = DataMapStoreManager.getInstance().getDefaultDataMap(carbonTable);
    List<ExtendedBlocklet> prunedBlocklets = null;
    // This is to log the event, so user will know what is happening by seeing logs.
    LOG.info("Started block pruning ...");
    boolean isDistributedPruningEnabled = CarbonProperties.getInstance()
        .isDistributedPruningEnabled(carbonTable.getDatabaseName(), carbonTable.getTableName());
    if (isDistributedPruningEnabled) {
      try {
        prunedBlocklets =
            getDistributedSplit(carbonTable, filter.getResolver(), partitionsToPrune, segmentIds,
                invalidSegments, segmentsToBeRefreshed, false);
      } catch (Exception e) {
        // Check if fallback is disabled then directly throw exception otherwise try driver
        // pruning.
        if (CarbonProperties.getInstance().isFallBackDisabled()) {
          throw e;
        }
        prunedBlocklets = defaultDataMap.prune(segmentIds, filter, partitionsToPrune);
      }
    } else {
      prunedBlocklets = defaultDataMap.prune(segmentIds, filter, partitionsToPrune);

      if (ExplainCollector.enabled()) {
        ExplainCollector.setDefaultDataMapPruningBlockHit(getBlockCount(prunedBlocklets));
      }

      if (prunedBlocklets.size() == 0) {
        return prunedBlocklets;
      }

      DataMapChooser chooser = new DataMapChooser(getOrCreateCarbonTable(job.getConfiguration()));

      // Get the available CG datamaps and prune further.
      DataMapExprWrapper cgDataMapExprWrapper = chooser.chooseCGDataMap(filter.getResolver());

      if (cgDataMapExprWrapper != null) {
        // Prune segments from already pruned blocklets
        DataMapUtil.pruneSegments(segmentIds, prunedBlocklets);
        List<ExtendedBlocklet> cgPrunedBlocklets = new ArrayList<>();
        boolean isCGPruneFallback = false;
        // Again prune with CG datamap.
        try {
          if (distributedCG && dataMapJob != null) {
            cgPrunedBlocklets = DataMapUtil
                .executeDataMapJob(carbonTable, filter.getResolver(), dataMapJob, partitionsToPrune,
                    segmentIds, invalidSegments, DataMapLevel.CG, new ArrayList<String>());
          } else {
            cgPrunedBlocklets = cgDataMapExprWrapper.prune(segmentIds, partitionsToPrune);
          }
        } catch (Exception e) {
          isCGPruneFallback = true;
          LOG.error("CG datamap pruning failed.", e);
        }
        // If isCGPruneFallback = true, it means that CG datamap pruning failed,
        // hence no need to do intersect and simply pass the prunedBlocklets from default datamap
        if (!isCGPruneFallback) {
          // since index datamap prune in segment scope,
          // the result need to intersect with previous pruned result
          prunedBlocklets =
              intersectFilteredBlocklets(carbonTable, prunedBlocklets, cgPrunedBlocklets);
        }
        if (ExplainCollector.enabled()) {
          ExplainCollector.recordCGDataMapPruning(
              DataMapWrapperSimpleInfo.fromDataMapWrapper(cgDataMapExprWrapper),
              prunedBlocklets.size(), getBlockCount(prunedBlocklets));
        }
      }

      if (prunedBlocklets.size() == 0) {
        return prunedBlocklets;
      }
      // Now try to prune with FG DataMap.
      if (isFgDataMapPruningEnable(job.getConfiguration()) && dataMapJob != null) {
        DataMapExprWrapper fgDataMapExprWrapper = chooser.chooseFGDataMap(filter.getResolver());
        List<ExtendedBlocklet> fgPrunedBlocklets;
        if (fgDataMapExprWrapper != null) {
          // Prune segments from already pruned blocklets
          DataMapUtil.pruneSegments(segmentIds, prunedBlocklets);
          // Prune segments from already pruned blocklets
          fgPrunedBlocklets = DataMapUtil
              .executeDataMapJob(carbonTable, filter.getResolver(), dataMapJob, partitionsToPrune,
                  segmentIds, invalidSegments, fgDataMapExprWrapper.getDataMapLevel(),
                  new ArrayList<String>());
          // note that the 'fgPrunedBlocklets' has extra datamap related info compared with
          // 'prunedBlocklets', so the intersection should keep the elements in 'fgPrunedBlocklets'
          prunedBlocklets =
              intersectFilteredBlocklets(carbonTable, prunedBlocklets, fgPrunedBlocklets);
          ExplainCollector.recordFGDataMapPruning(
              DataMapWrapperSimpleInfo.fromDataMapWrapper(fgDataMapExprWrapper),
              prunedBlocklets.size(), getBlockCount(prunedBlocklets));
        }
      }
    }
    LOG.info("Finished block pruning ...");
    return prunedBlocklets;
  }

  private List<ExtendedBlocklet> intersectFilteredBlocklets(CarbonTable carbonTable,
      List<ExtendedBlocklet> previousDataMapPrunedBlocklets,
      List<ExtendedBlocklet> otherDataMapPrunedBlocklets) {
    List<ExtendedBlocklet> prunedBlocklets = null;
    if (BlockletDataMapUtil.isCacheLevelBlock(carbonTable)) {
      prunedBlocklets = new ArrayList<>();
      for (ExtendedBlocklet otherBlocklet : otherDataMapPrunedBlocklets) {
        if (previousDataMapPrunedBlocklets.contains(otherBlocklet)) {
          prunedBlocklets.add(otherBlocklet);
        }
      }
    } else {
      prunedBlocklets = (List) CollectionUtils
          .intersection(otherDataMapPrunedBlocklets, previousDataMapPrunedBlocklets);
    }
    return prunedBlocklets;
  }

  static List<InputSplit> convertToCarbonInputSplit(List<ExtendedBlocklet> extendedBlocklets) {
    List<InputSplit> resultFilteredBlocks = new ArrayList<>();
    for (ExtendedBlocklet blocklet : extendedBlocklets) {
      if (blocklet != null) {
        resultFilteredBlocks.add(blocklet.getInputSplit());
      }
    }
    return resultFilteredBlocks;
  }

  @Override
  public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
      TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
    Configuration configuration = taskAttemptContext.getConfiguration();
    QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext,
        getFilterPredicates(taskAttemptContext.getConfiguration()));
    CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
    return new CarbonRecordReader<T>(queryModel, readSupport,
        taskAttemptContext.getConfiguration());
  }

  public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
      throws IOException {
    return createQueryModel(inputSplit, taskAttemptContext,
        getFilterPredicates(taskAttemptContext.getConfiguration()));
  }

  public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext,
      DataMapFilter dataMapFilter) throws IOException {
    Configuration configuration = taskAttemptContext.getConfiguration();
    CarbonTable carbonTable = getOrCreateCarbonTable(configuration);

    // set projection column in the query model
    String projectionString = getColumnProjection(configuration);
    String[] projectColumns;
    if (projectionString != null) {
      projectColumns = projectionString.split(",");
    } else {
      projectColumns = new String[]{};
    }
    if (dataMapFilter != null) {
      checkAndAddImplicitExpression(dataMapFilter.getExpression(), inputSplit);
    }
    return new QueryModelBuilder(carbonTable)
        .projectColumns(projectColumns)
        .filterExpression(dataMapFilter)
        .dataConverter(getDataTypeConverter(configuration))
        .build();
  }

  /**
   * This method will create an Implict Expression and set it as right child in the given
   * expression
   *
   * @param expression
   * @param inputSplit
   */
  private void checkAndAddImplicitExpression(Expression expression, InputSplit inputSplit) {
    if (inputSplit instanceof CarbonMultiBlockSplit) {
      CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit;
      List<CarbonInputSplit> splits = split.getAllSplits();
      // iterate over all the splits and create block to bblocklet mapping
      Map<String, Set<Integer>> blockIdToBlockletIdMapping = new HashMap<>();
      for (CarbonInputSplit carbonInputSplit : splits) {
        Set<Integer> validBlockletIds = carbonInputSplit.getValidBlockletIds();
        if (null != validBlockletIds && !validBlockletIds.isEmpty()) {
          String uniqueBlockPath = carbonInputSplit.getFilePath();
          String shortBlockPath = CarbonTablePath
              .getShortBlockId(uniqueBlockPath.substring(uniqueBlockPath.lastIndexOf("/Part") + 1));
          blockIdToBlockletIdMapping.put(shortBlockPath, validBlockletIds);
        }
      }
      if (!blockIdToBlockletIdMapping.isEmpty()) {
        // create implicit expression and set as right child
        FilterUtil
            .createImplicitExpressionAndSetAsRightChild(expression, blockIdToBlockletIdMapping);
      }
    }
  }

  public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) {
    String readSupportClass = configuration.get(CARBON_READ_SUPPORT);
    //By default it uses dictionary decoder read class
    CarbonReadSupport<T> readSupport = null;
    if (readSupportClass != null) {
      try {
        Class<?> myClass = Class.forName(readSupportClass);
        Constructor<?> constructor = myClass.getConstructors()[0];
        Object object = constructor.newInstance();
        if (object instanceof CarbonReadSupport) {
          readSupport = (CarbonReadSupport) object;
        }
      } catch (ClassNotFoundException ex) {
        LOG.error("Class " + readSupportClass + "not found", ex);
      } catch (Exception ex) {
        LOG.error("Error while creating " + readSupportClass, ex);
      }
    } else {
      readSupport = new DictionaryDecodeReadSupport<>();
    }
    return readSupport;
  }

  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
    try {
      // Don't split the file if it is local file system
      FileSystem fileSystem = filename.getFileSystem(context.getConfiguration());
      if (fileSystem instanceof LocalFileSystem) {
        return false;
      }
    } catch (Exception e) {
      return true;
    }
    return true;
  }

  public static void setCarbonReadSupport(Configuration configuration,
      Class<? extends CarbonReadSupport> readSupportClass) {
    if (readSupportClass != null) {
      configuration.set(CARBON_READ_SUPPORT, readSupportClass.getName());
    }
  }

  /**
   * It is optional, if user does not set then it reads from store
   *
   * @param configuration
   * @param converterClass is the Data type converter for different computing engine
   */
  public static void setDataTypeConverter(
      Configuration configuration, Class<? extends DataTypeConverter> converterClass) {
    if (null != converterClass) {
      configuration.set(CARBON_CONVERTER, converterClass.getCanonicalName());
    }
  }

  public static DataTypeConverter getDataTypeConverter(Configuration configuration)
      throws IOException {
    String converterClass = configuration.get(CARBON_CONVERTER);
    if (converterClass == null) {
      return new DataTypeConverterImpl();
    }

    try {
      return (DataTypeConverter) Class.forName(converterClass).newInstance();
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  public static void setDatabaseName(Configuration configuration, String databaseName) {
    if (null != databaseName) {
      configuration.set(DATABASE_NAME, databaseName);
    }
  }

  public static String getDatabaseName(Configuration configuration)
      throws InvalidConfigurationException {
    String databseName = configuration.get(DATABASE_NAME);
    if (null == databseName) {
      throw new InvalidConfigurationException("Database name is not set.");
    }
    return databseName;
  }

  public static void setTableName(Configuration configuration, String tableName) {
    if (null != tableName) {
      configuration.set(TABLE_NAME, tableName);
    }
  }

  public static String getTableName(Configuration configuration)
      throws InvalidConfigurationException {
    String tableName = configuration.get(TABLE_NAME);
    if (tableName == null) {
      throw new InvalidConfigurationException("Table name is not set");
    }
    return tableName;
  }

  /**
   * Project all Columns for carbon reader
   *
   * @return String araay of columnNames
   * @param carbonTable
   */
  public String[] projectAllColumns(CarbonTable carbonTable) {
    List<ColumnSchema> colList = carbonTable.getTableInfo().getFactTable().getListOfColumns();
    List<String> projectColumns = new ArrayList<>();
    // complex type and add just the parent column name while skipping the child columns.
    for (ColumnSchema col : colList) {
      if (!col.getColumnName().contains(".")) {
        projectColumns.add(col.getColumnName());
      }
    }
    return projectColumns.toArray(new String[projectColumns.size()]);
  }

  private static void setQuerySegmentToAccess(Configuration conf, String dbName, String tableName) {
    String segmentNumbersFromProperty = CarbonProperties.getInstance()
        .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbName + "." + tableName, "*");
    if (!segmentNumbersFromProperty.trim().equals("*")) {
      CarbonInputFormat.setSegmentsToAccess(conf,
          Segment.toSegmentList(segmentNumbersFromProperty.split(","), null));
    }
  }

  /**
   * Set `CARBON_INPUT_SEGMENTS` from property to configuration
   */
  public static void setQuerySegment(Configuration conf, CarbonTable carbonTable) {
    String tableName = carbonTable.getTableName();
    setQuerySegmentToAccess(conf, carbonTable.getDatabaseName(), tableName);
  }

}
