/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.core.scan.collector.impl;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.scan.complextypes.StructQueryType;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
import org.apache.carbondata.core.scan.filter.GenericQueryType;
import org.apache.carbondata.core.scan.model.ProjectionDimension;
import org.apache.carbondata.core.scan.model.ProjectionMeasure;
import org.apache.carbondata.core.scan.result.BlockletScannedResult;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;

import org.apache.commons.lang3.ArrayUtils;

/**
 * It is not a collector it is just a scanned result holder.
 */
public class DictionaryBasedResultCollector extends AbstractScannedResultCollector {

  protected ProjectionDimension[] queryDimensions;

  protected ProjectionMeasure[] queryMeasures;

  private DirectDictionaryGenerator[] directDictionaryGenerators;

  /**
   * query order
   */
  protected int[] order;

  private int[] actualIndexInSurrogateKey;

  private boolean[] implicitColumnArray;

  private boolean[] complexDataTypeArray;

  int dictionaryColumnIndex;
  int noDictionaryColumnIndex;
  int complexTypeColumnIndex;


  boolean isDimensionExists;

  private int[] surrogateResult;
  private byte[][] noDictionaryKeys;
  private byte[][] complexTypeKeyArray;

  protected Map<Integer, GenericQueryType> complexDimensionInfoMap;

  /**
   * Field of this Map is the parent Column and associated child columns.
   * Final Projection should be a merged list consist of only parents.
   */
  private Map<Integer, List<Integer>> parentToChildColumnsMap = new HashMap<>();

  /**
   * Map to hold the complex parent ordinal of each query dimension
   */
  private List<Integer> queryDimensionToComplexParentOrdinal = new ArrayList<>();

  /**
   * Fields of this Map of Parent Ordinal with the List is the Child Column Dimension and
   * the corresponding data buffer of that column.
   */
  private Map<Integer, Map<CarbonDimension, ByteBuffer>> mergedComplexDimensionIndex =
      new HashMap<>();

  private boolean readOnlyDelta;

  public DictionaryBasedResultCollector(BlockExecutionInfo blockExecutionInfos) {
    super(blockExecutionInfos);
    queryDimensions = executionInfo.getProjectionDimensions();
    queryMeasures = executionInfo.getProjectionMeasures();
    initDimensionAndMeasureIndexesForFillingData();
    isDimensionExists = queryDimensions.length > 0;
    this.complexDimensionInfoMap = executionInfo.getComplexDimensionInfoMap();
    this.readOnlyDelta = executionInfo.isReadOnlyDelta();
  }

  /**
   * This method will add a record both key and value to list object
   * it will keep track of how many record is processed, to handle limit scenario
   */
  @Override
  public List<Object[]> collectResultInRow(BlockletScannedResult scannedResult, int batchSize) {
    // scan the record and add to list
    List<Object[]> listBasedResult = new ArrayList<>(batchSize);
    int rowCounter = 0;
    boolean isStructQueryType = false;
    for (Object obj : scannedResult.complexParentIndexToQueryMap.values()) {
      if (obj instanceof StructQueryType) {
        //if any one of the map elements contains struct,need to shift rows if contains null.
        isStructQueryType = true;
        break;
      }
    }
    boolean[] isComplexChildColumn = null;
    if (isStructQueryType) {
      // need to identify complex child columns for shifting rows if contains null
      isComplexChildColumn = new boolean[queryDimensions.length + queryMeasures.length];
      for (ProjectionDimension dimension : queryDimensions) {
        if (null != dimension.getDimension().getComplexParentDimension()) {
          isComplexChildColumn[dimension.getOrdinal()] = true;
        }
      }
    }
    while (scannedResult.hasNext() && rowCounter < batchSize) {
      scannedResult.incrementCounter();
      if (readOnlyDelta) {
        if (!scannedResult.containsDeletedRow(scannedResult.getCurrentRowId()) &&
                scannedResult.getCurrentDeleteDeltaVo() != null) {
          continue;
        }
      } else {
        if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) {
          continue;
        }
      }
      Object[] row = new Object[queryDimensions.length + queryMeasures.length];
      if (isDimensionExists) {
        surrogateResult = scannedResult.getDictionaryKeyIntegerArray();
        noDictionaryKeys = scannedResult.getNoDictionaryKeyArray();
        complexTypeKeyArray = scannedResult.getComplexTypeKeyArray();
        dictionaryColumnIndex = 0;
        noDictionaryColumnIndex = 0;
        complexTypeColumnIndex = 0;

        // get the complex columns data of this row
        fillComplexColumnDataBufferForThisRow();
        for (int i = 0; i < queryDimensions.length; i++) {
          fillDimensionData(scannedResult, surrogateResult, noDictionaryKeys, complexTypeKeyArray,
              complexDimensionInfoMap, row, i, queryDimensions[i].getDimension().getOrdinal());
        }
      }
      fillMeasureData(scannedResult, row);
      if (isStructQueryType) {
        shiftNullForStruct(row, isComplexChildColumn);
      }
      listBasedResult.add(row);
      rowCounter++;
    }
    return listBasedResult;
  }

  /**
   * shift the complex column null to the end
   *
   * @param row
   * @param isComplexChildColumn
   */
  private void shiftNullForStruct(Object[] row, boolean[] isComplexChildColumn) {
    int count = 0;
    // If a : <b,c> and d : <e,f> are two struct and if a.b,a.c,d.e is given in the
    // projection list,then object array will contain a,null,d as result, because for a.b,
    // a will be filled and for a.c null will be placed.
    // Instead place null in the end of object array and send a,d,null as result.
    for (int j = 0; j < row.length; j++) {
      if (null == row[j] && !isComplexChildColumn[j]) {
        // if it is a primitive column, don't shift the null to the end.
        row[count++] = null;
      } else if (null != row[j]) {
        row[count++] = row[j];
      }
    }
    // fill the skipped content
    while (count < row.length) row[count++] = null;
  }

  private void fillComplexColumnDataBufferForThisRow() {
    mergedComplexDimensionIndex.clear();
    int noDictionaryComplexColumnIndex = 0;
    int complexTypeComplexColumnIndex = 0;
    for (int i = 0; i < queryDimensions.length; i++) {
      int complexParentOrdinal = queryDimensionToComplexParentOrdinal.get(i);
      if (complexParentOrdinal != -1) {
        Map<CarbonDimension, ByteBuffer> childColumnByteBuffer;
        // Add the parent and the child ordinal to the parentToChildColumnsMap
        if (mergedComplexDimensionIndex.get(complexParentOrdinal) == null) {
          childColumnByteBuffer = new HashMap<>();
        } else {
          childColumnByteBuffer = mergedComplexDimensionIndex.get(complexParentOrdinal);
        }

        // send the byte buffer for the complex columns. Currently expected columns for
        // complex types are
        // a) Complex Columns
        // b) No Dictionary columns.
        // TODO have to fill out for dictionary columns. Once the support for push down in
        // complex dictionary columns comes.
        ByteBuffer buffer;
        if (queryDimensions[i].getDimension().getDataType() != DataTypes.DATE) {
          if (implicitColumnArray[i]) {
            throw new RuntimeException("Not Supported Column Type");
          } else if (complexDataTypeArray[i]) {
            buffer = ByteBuffer.wrap(complexTypeKeyArray[complexTypeComplexColumnIndex++]);
          } else {
            buffer = ByteBuffer.wrap(noDictionaryKeys[noDictionaryComplexColumnIndex++]);
          }
        } else if (queryDimensions[i].getDimension().getDataType() == DataTypes.DATE) {
          throw new RuntimeException("Direct Dictionary Column Type Not Supported Yet.");
        } else if (complexDataTypeArray[i]) {
          buffer = ByteBuffer.wrap(complexTypeKeyArray[complexTypeComplexColumnIndex++]);
        } else {
          throw new RuntimeException("Not Supported Column Type");
        }

        childColumnByteBuffer
            .put(queryDimensions[i].getDimension(), buffer);
        mergedComplexDimensionIndex.put(complexParentOrdinal, childColumnByteBuffer);
      } else if (!queryDimensions[i].getDimension().isComplex()) {
        // If Dimension is not a Complex Column, then increment index for noDictionaryComplexColumn
        noDictionaryComplexColumnIndex++;
      }
    }
  }

  /**
   * fill the data of dimension columns into row
   *
   * @param scannedResult
   * @param surrogateResult
   * @param noDictionaryKeys
   * @param complexTypeKeyArray
   * @param complexDimensionInfoMap
   * @param row: row data
   * @param i: dimension columns index
   * @param actualOrdinal: the actual ordinal of dimension columns in segment
   *
   */
  void fillDimensionData(BlockletScannedResult scannedResult, int[] surrogateResult,
      byte[][] noDictionaryKeys, byte[][] complexTypeKeyArray,
      Map<Integer, GenericQueryType> complexDimensionInfoMap, Object[] row, int i,
      int actualOrdinal) {
    if (queryDimensions[i].getDimension().getDataType() != DataTypes.DATE) {
      if (implicitColumnArray[i]) {
        if (CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID
            .equals(queryDimensions[i].getColumnName())) {
          row[order[i]] = DataTypeUtil.getDataBasedOnDataType(
              scannedResult.getBlockletId() + CarbonCommonConstants.FILE_SEPARATOR + scannedResult
                  .getCurrentPageCounter() + CarbonCommonConstants.FILE_SEPARATOR + scannedResult
                  .getCurrentRowId(), DataTypes.STRING);
        } else {
          row[order[i]] =
              DataTypeUtil.getDataBasedOnDataType(scannedResult.getBlockletId(), DataTypes.STRING);
        }
      } else if (complexDataTypeArray[i]) {
        // Complex Type With No Dictionary Encoding.
        if (queryDimensionToComplexParentOrdinal.get(i) != -1) {
          fillRow(complexDimensionInfoMap, row, i,
              ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
        } else {
          row[order[i]] =
              complexDimensionInfoMap.get(actualOrdinal).getDataBasedOnDataType(
                  ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
        }
      } else {
        if (queryDimensionToComplexParentOrdinal.get(i) != -1) {
          // When the parent Ordinal is not -1 then this is a predicate is being pushed down
          // for complex column.
          fillRow(complexDimensionInfoMap, row, i,
              ByteBuffer.wrap(noDictionaryKeys[noDictionaryColumnIndex++]));
        } else {
          row[order[i]] = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(
              noDictionaryKeys[noDictionaryColumnIndex++],
              queryDimensions[i].getDimension().getDataType());
        }
      }
    } else if (queryDimensions[i].getDimension().getDataType() == DataTypes.DATE) {
      if (directDictionaryGenerators[i] != null) {
        row[order[i]] = directDictionaryGenerators[i].getValueFromSurrogate(
            surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]]);
      }
    } else if (complexDataTypeArray[i]) {
      row[order[i]] = complexDimensionInfoMap.get(actualOrdinal)
          .getDataBasedOnDataType(ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
      dictionaryColumnIndex++;
    } else {
      row[order[i]] = surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]];
    }
  }

  private void fillRow(Map<Integer, GenericQueryType> complexDimensionInfoMap, Object[] row, int i,
      ByteBuffer wrap) {
    if (parentToChildColumnsMap.get(queryDimensionToComplexParentOrdinal.get(i)).size() > 1) {
      fillRowForComplexColumn(complexDimensionInfoMap, row, i);
    } else {
      row[order[i]] = complexDimensionInfoMap.get(queryDimensionToComplexParentOrdinal.get(i))
          .getDataBasedOnColumn(wrap, queryDimensions[i].getDimension().getComplexParentDimension(),
              queryDimensions[i].getDimension());
    }
  }

  private void fillRowForComplexColumn(Map<Integer, GenericQueryType> complexDimensionInfoMap,
      Object[] row, int i) {
    // When multiple columns are then the first child elements is only going to make
    // parent Object Array. For all other cases it should be null.
    // For e.g. a : <b,c,d>. here as a is the parent column and b, c, d are child columns
    // during traversal when we encounter the first element in list i.e. column 'b'
    // a will be completely filled. In case when column 'c' and 'd' encountered then
    // only place null in the output.
    int complexParentOrdinal = queryDimensionToComplexParentOrdinal.get(i);
    List<Integer> childColumns = parentToChildColumnsMap.get(complexParentOrdinal);
    if (childColumns.get(0).equals(queryDimensions[i].getDimension().getOrdinal())) {
      // Fill out Parent Column.
      row[order[i]] = complexDimensionInfoMap.get(complexParentOrdinal).getDataBasedOnColumnList(
          mergedComplexDimensionIndex.get(queryDimensions[i].getParentDimension().getOrdinal()),
          queryDimensions[i].getParentDimension());
    } else {
      row[order[i]] = null;
    }
  }

  void fillMeasureData(BlockletScannedResult scannedResult, Object[] row) {
    if (measureInfo.getMeasureDataTypes().length > 0) {
      Object[] msrValues = new Object[measureInfo.getMeasureDataTypes().length];
      fillMeasureData(msrValues, 0, scannedResult);
      for (int i = 0; i < msrValues.length; i++) {
        row[order[i + queryDimensions.length]] = msrValues[i];
      }
    }
  }

  void initDimensionAndMeasureIndexesForFillingData() {
    List<Integer> dictionaryIndexes = new ArrayList<Integer>();
    for (int i = 0; i < queryDimensions.length; i++) {
      if (queryDimensions[i].getDimension().getDataType() == DataTypes.DATE) {
        dictionaryIndexes.add(queryDimensions[i].getDimension().getOrdinal());
      }
    }
    int[] primitive =
        ArrayUtils.toPrimitive(dictionaryIndexes.toArray(new Integer[dictionaryIndexes.size()]));
    Arrays.sort(primitive);
    actualIndexInSurrogateKey = new int[dictionaryIndexes.size()];
    int index = 0;

    implicitColumnArray = CarbonUtil.getImplicitColumnArray(queryDimensions);
    complexDataTypeArray = CarbonUtil.getComplexDataTypeArray(queryDimensions);

    parentToChildColumnsMap.clear();
    queryDimensionToComplexParentOrdinal.clear();
    for (int i = 0; i < queryDimensions.length; i++) {
      if (queryDimensions[i].getDimension().getDataType() == DataTypes.DATE) {
        actualIndexInSurrogateKey[index++] =
            Arrays.binarySearch(primitive, queryDimensions[i].getDimension().getOrdinal());
      }
      if (null != queryDimensions[i].getDimension().getComplexParentDimension()) {
        // Add the parent and the child ordinal to the parentToChildColumnsMap
        int complexParentOrdinal =
            queryDimensions[i].getDimension().getComplexParentDimension().getOrdinal();
        queryDimensionToComplexParentOrdinal.add(complexParentOrdinal);
        if (parentToChildColumnsMap.get(complexParentOrdinal) == null) {
          // Add the parent and child ordinal in the map
          List<Integer> childOrdinals = new ArrayList<>();
          childOrdinals.add(queryDimensions[i].getDimension().getOrdinal());
          parentToChildColumnsMap.put(complexParentOrdinal, childOrdinals);

        } else {
          List<Integer> childOrdinals = parentToChildColumnsMap.get(complexParentOrdinal);
          childOrdinals.add(queryDimensions[i].getDimension().getOrdinal());
          parentToChildColumnsMap.put(complexParentOrdinal, childOrdinals);
        }
      } else {
        queryDimensionToComplexParentOrdinal.add(-1);
      }
    }

    order = new int[queryDimensions.length + queryMeasures.length];
    for (int i = 0; i < queryDimensions.length; i++) {
      order[i] = queryDimensions[i].getOrdinal();
    }
    for (int i = 0; i < queryMeasures.length; i++) {
      order[i + queryDimensions.length] = queryMeasures[i].getOrdinal();
    }
    directDictionaryGenerators = new DirectDictionaryGenerator[queryDimensions.length];
    for (int i = 0; i < queryDimensions.length; i++) {
      directDictionaryGenerators[i] = DirectDictionaryKeyGeneratorFactory
          .getDirectDictionaryGenerator(queryDimensions[i].getDimension().getDataType());
    }
  }
}
