/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.core.datastore.page.encoding;

import java.math.BigDecimal;

import org.apache.carbondata.core.datastore.ColumnType;
import org.apache.carbondata.core.datastore.TableSpec;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaFloatingCodec;
import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaIntegralCodec;
import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveFloatingCodec;
import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveIntegralCodec;
import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressCodec;
import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.ComplexDimensionIndexCodec;
import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.DirectDictDimensionIndexCodec;
import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.PlainDimensionIndexCodec;
import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
import org.apache.carbondata.core.util.DataTypeUtil;

/**
 * Default factory will select encoding base on column page data type and statistics
 */
public class DefaultEncodingFactory extends EncodingFactory {

  private static final int THREE_BYTES_MAX = (int) Math.pow(2, 23) - 1;
  private static final int THREE_BYTES_MIN = -THREE_BYTES_MAX - 1;

  private static final boolean newWay = false;

  private static EncodingFactory encodingFactory = new DefaultEncodingFactory();

  public static EncodingFactory getInstance() {
    // TODO: make it configurable after added new encodingFactory
    return encodingFactory;
  }

  @Override
  public ColumnPageEncoder createEncoder(TableSpec.ColumnSpec columnSpec, ColumnPage inputPage) {
    // TODO: add log
    // choose the encoding type for measure type and no dictionary primitive type columns
    if (columnSpec instanceof TableSpec.MeasureSpec || (
        DataTypeUtil.isPrimitiveColumn(columnSpec.getSchemaDataType())
            && columnSpec.getColumnType() == ColumnType.PLAIN_VALUE)) {
      return createEncoderForMeasureOrNoDictionaryPrimitive(inputPage, columnSpec);
    } else {
      if (newWay) {
        return createEncoderForDimension((TableSpec.DimensionSpec) columnSpec, inputPage);
      } else {
        assert columnSpec instanceof TableSpec.DimensionSpec;
        return createEncoderForDimensionLegacy((TableSpec.DimensionSpec) columnSpec);
      }
    }
  }

  private ColumnPageEncoder createEncoderForDimension(TableSpec.DimensionSpec columnSpec,
      ColumnPage inputPage) {
    switch (columnSpec.getColumnType()) {
      case DIRECT_DICTIONARY:
      case PLAIN_VALUE:
        return new DirectCompressCodec(inputPage.getDataType()).createEncoder(null);
      case COMPLEX:
        return new ComplexDimensionIndexCodec(false, false).createEncoder(null);
      default:
        throw new RuntimeException("unsupported dimension type: " + columnSpec.getColumnType());
    }
  }

  private ColumnPageEncoder createEncoderForDimensionLegacy(TableSpec.DimensionSpec dimensionSpec) {
    switch (dimensionSpec.getColumnType()) {
      case DIRECT_DICTIONARY:
        return new DirectDictDimensionIndexCodec(
            dimensionSpec.isInSortColumns(),
            dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex())
            .createEncoder(null);
      case PLAIN_VALUE:
        return new PlainDimensionIndexCodec(dimensionSpec.isInSortColumns(),
            dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(),
            dimensionSpec.getSchemaDataType() == DataTypes.VARCHAR
                || dimensionSpec.getSchemaDataType() == DataTypes.BINARY).createEncoder(null);
      default:
        throw new RuntimeException("unsupported dimension type: " +
            dimensionSpec.getColumnType());
    }
  }

  private ColumnPageEncoder createEncoderForMeasureOrNoDictionaryPrimitive(ColumnPage columnPage,
      TableSpec.ColumnSpec columnSpec) {

    SimpleStatsResult stats = columnPage.getStatistics();
    DataType dataType = stats.getDataType();
    if (dataType == DataTypes.BOOLEAN
        || dataType == DataTypes.BYTE_ARRAY
        || columnPage.getDataType() == DataTypes.BINARY) {
      return new DirectCompressCodec(columnPage.getDataType()).createEncoder(null);
    } else if (dataType == DataTypes.BYTE ||
        dataType == DataTypes.SHORT ||
        dataType == DataTypes.INT ||
        dataType == DataTypes.LONG ||
        dataType == DataTypes.TIMESTAMP) {
      return selectCodecByAlgorithmForIntegral(stats, false, columnSpec).createEncoder(null);
    } else if (DataTypes.isDecimal(dataType)) {
      return createEncoderForDecimalDataTypeMeasure(columnPage, columnSpec);
    } else if (dataType == DataTypes.FLOAT || dataType == DataTypes.DOUBLE) {
      return selectCodecByAlgorithmForFloating(stats, false, columnSpec).createEncoder(null);
    } else {
      throw new RuntimeException("unsupported data type: " + stats.getDataType());
    }
  }

  private ColumnPageEncoder createEncoderForDecimalDataTypeMeasure(ColumnPage columnPage,
      TableSpec.ColumnSpec columnSpec) {
    DecimalConverterFactory.DecimalConverterType decimalConverterType =
        ((DecimalColumnPage) columnPage).getDecimalConverter().getDecimalConverterType();
    switch (decimalConverterType) {
      case DECIMAL_INT:
      case DECIMAL_LONG:
        return selectCodecByAlgorithmForDecimal(columnPage.getStatistics(), decimalConverterType,
            columnSpec)
            .createEncoder(null);
      default:
        return new DirectCompressCodec(columnPage.getDataType()).createEncoder(null);
    }
  }

  private static DataType fitLongMinMax(long max, long min) {
    if (max <= Byte.MAX_VALUE && min >= Byte.MIN_VALUE) {
      return DataTypes.BYTE;
    } else if (max <= Short.MAX_VALUE && min >= Short.MIN_VALUE) {
      return DataTypes.SHORT;
    } else if (max <= THREE_BYTES_MAX && min >= THREE_BYTES_MIN) {
      return DataTypes.SHORT_INT;
    } else if (max <= Integer.MAX_VALUE && min >= Integer.MIN_VALUE) {
      return DataTypes.INT;
    } else {
      return DataTypes.LONG;
    }
  }

  private static DataType fitMinMax(DataType dataType, Object max, Object min) {
    if (dataType == DataTypes.BYTE || dataType == DataTypes.BOOLEAN) {
      return fitLongMinMax((byte) max, (byte) min);
    } else if (dataType == DataTypes.SHORT) {
      return fitLongMinMax((short) max, (short) min);
    } else if (dataType == DataTypes.INT) {
      return fitLongMinMax((int) max, (int) min);
    } else if ((dataType == DataTypes.LONG) || (dataType == DataTypes.TIMESTAMP)) {
      return fitLongMinMax((long) max, (long) min);
    } else if (dataType == DataTypes.DOUBLE) {
      return fitLongMinMax((long) (double) max, (long) (double) min);
    } else {
      throw new RuntimeException("internal error: " + dataType);
    }
  }

  private static DataType fitMinMaxForDecimalType(DataType dataType, Object max, Object min,
      DecimalConverterFactory.DecimalConverterType decimalConverterType) {
    long maxValue = ((BigDecimal) max).unscaledValue().longValue();
    long minValue = ((BigDecimal) min).unscaledValue().longValue();
    switch (decimalConverterType) {
      case DECIMAL_INT:
        return fitLongMinMax((int) maxValue, (int) minValue);
      case DECIMAL_LONG:
        return fitLongMinMax(maxValue, minValue);
      default:
        throw new RuntimeException("internal error: " + dataType);
    }
  }

  private static DataType fitDeltaForDecimalType(DataType dataType, Object max, Object min,
      DecimalConverterFactory.DecimalConverterType decimalConverterType) {
    long maxValue = ((BigDecimal) max).unscaledValue().longValue();
    long minValue = ((BigDecimal) min).unscaledValue().longValue();
    switch (decimalConverterType) {
      case DECIMAL_INT:
        long value = maxValue - minValue;
        return compareMinMaxAndSelectDataType(value);
      case DECIMAL_LONG:
        return DataTypes.LONG;
      default:
        throw new RuntimeException("internal error: " + dataType);
    }
  }

  // fit the long input value into minimum data type
  private static DataType fitDelta(DataType dataType, Object max, Object min) {
    // use long data type to calculate delta to avoid overflow
    long value;
    if (dataType == DataTypes.BYTE || dataType == DataTypes.BOOLEAN) {
      value = (long) (byte) max - (long) (byte) min;
    } else if (dataType == DataTypes.SHORT) {
      value = (long) (short) max - (long) (short) min;
    } else if (dataType == DataTypes.INT) {
      value = (long) (int) max - (long) (int) min;
    } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
      value = (long) max - (long) min;
      // The subtraction overflowed iff the operands have opposing signs
      // and the result's sign differs from the minuend.
      boolean overflow = (((long) max ^ (long) min) & ((long) max ^ value)) < 0;
      if (overflow) {
        return DataTypes.LONG;
      }
    } else if (dataType == DataTypes.DOUBLE) {
      return DataTypes.LONG;
    } else {
      throw new RuntimeException("internal error: " + dataType);
    }
    return compareMinMaxAndSelectDataType(value);
  }

  private static DataType compareMinMaxAndSelectDataType(long value) {
    return fitLongMinMax(value, value);
  }

  /**
   * choose between adaptive encoder or delta adaptive encoder, based on whose target data type
   * size is smaller
   */
  static ColumnPageCodec selectCodecByAlgorithmForIntegral(SimpleStatsResult stats,
      boolean isComplexPrimitive, TableSpec.ColumnSpec columnSpec) {
    DataType srcDataType = stats.getDataType();
    DataType adaptiveDataType = fitMinMax(stats.getDataType(), stats.getMax(), stats.getMin());

    DataType deltaDataType = fitDelta(stats.getDataType(), stats.getMax(), stats.getMin());
    // for complex primitive, if source and destination data type is same, use adaptive encoding.
    if (!isComplexPrimitive) {
      // in case of decimal datatype, check if the decimal converter type is Int or Long and based
      // on that get size in bytes
      if (Math.min(adaptiveDataType.getSizeInBytes(), deltaDataType.getSizeInBytes()) == srcDataType
          .getSizeInBytes()) {
        // no effect to use adaptive or delta, use compression only
        return new DirectCompressCodec(stats.getDataType());
      }
    }
    boolean isInvertedIndex = isInvertedIndex(isComplexPrimitive, columnSpec);
    if (adaptiveDataType.getSizeInBytes() <= deltaDataType.getSizeInBytes()) {
      // choose adaptive encoding
      return new AdaptiveIntegralCodec(stats.getDataType(), adaptiveDataType, stats,
          isInvertedIndex);
    } else {
      // choose delta adaptive encoding
      return new AdaptiveDeltaIntegralCodec(stats.getDataType(), deltaDataType, stats,
          isInvertedIndex);
    }
  }

  /**
   * Check whether the column is sort column and inverted index column
   *
   * @param isComplexPrimitive
   * @param columnSpec
   * @return
   */
  private static boolean isInvertedIndex(boolean isComplexPrimitive,
      TableSpec.ColumnSpec columnSpec) {
    boolean isSort;
    boolean isInvertedIndex = false;
    if (columnSpec instanceof TableSpec.DimensionSpec && !isComplexPrimitive) {
      isSort = ((TableSpec.DimensionSpec) columnSpec).isInSortColumns();
      isInvertedIndex = isSort && ((TableSpec.DimensionSpec) columnSpec).isDoInvertedIndex();
    }
    return isInvertedIndex;
  }

  // choose between upscale adaptive encoder or upscale delta adaptive encoder,
  // based on whose target data type size is smaller
  static ColumnPageCodec selectCodecByAlgorithmForFloating(SimpleStatsResult stats,
      boolean isComplexPrimitive, TableSpec.ColumnSpec columnSpec) {
    DataType srcDataType = stats.getDataType();
    double maxValue;
    double minValue;
    if (srcDataType == DataTypes.FLOAT) {
      maxValue = (float) stats.getMax();
      minValue = (float) stats.getMin();
    } else {
      maxValue = (double) stats.getMax();
      minValue = (double) stats.getMin();
    }
    int decimalCount = stats.getDecimalCount();

    // For Complex Type primitive we should always choose adaptive path
    // as LV format will be reduced to only V format. Therefore inorder
    // to do that decimal count should be actual count instead of -1.
    if (isComplexPrimitive && decimalCount == -1 && stats instanceof PrimitivePageStatsCollector) {
      decimalCount = ((PrimitivePageStatsCollector)stats).getDecimalForComplexPrimitive();
    }

    //Here we should use the Max abs as max to getDatatype, let's say -1 and -10000000, -1 is max,
    //but we can't use -1 to getDatatype, we should use -10000000.
    double absMaxValue = Math.max(Math.abs(maxValue), Math.abs(minValue));
    if (srcDataType == DataTypes.FLOAT && decimalCount == 0) {
      return getColumnPageCodec(stats, isComplexPrimitive, columnSpec, srcDataType, maxValue,
          minValue, decimalCount, absMaxValue);
    } else if (decimalCount == 0) {
      // short, int, long
      return selectCodecByAlgorithmForIntegral(stats, false, columnSpec);
    } else if (decimalCount < 0 && !isComplexPrimitive) {
      return new DirectCompressCodec(DataTypes.DOUBLE);
    } else {
      return getColumnPageCodec(stats, isComplexPrimitive, columnSpec, srcDataType, maxValue,
          minValue, decimalCount, absMaxValue);
    }
  }

  private static ColumnPageCodec getColumnPageCodec(SimpleStatsResult stats,
      boolean isComplexPrimitive, TableSpec.ColumnSpec columnSpec, DataType srcDataType,
      double maxValue, double minValue, int decimalCount, double absMaxValue) {
    // double
    // If absMaxValue exceeds LONG.MAX_VALUE, then go for direct compression
    if ((Math.pow(10, decimalCount) * absMaxValue) > Long.MAX_VALUE) {
      return new DirectCompressCodec(DataTypes.DOUBLE);
    } else {
      long max = (long) (Math.pow(10, decimalCount) * absMaxValue);
      DataType adaptiveDataType = fitLongMinMax(max, 0);
      DataType deltaDataType = compareMinMaxAndSelectDataType(
          (long) (Math.pow(10, decimalCount) * (maxValue - minValue)));
      if (adaptiveDataType.getSizeInBytes() > deltaDataType.getSizeInBytes()) {
        return new AdaptiveDeltaFloatingCodec(srcDataType, deltaDataType, stats,
            isInvertedIndex(isComplexPrimitive, columnSpec));
      } else if (adaptiveDataType.getSizeInBytes() < DataTypes.DOUBLE.getSizeInBytes() || (
          (isComplexPrimitive) && (adaptiveDataType.getSizeInBytes() == DataTypes.DOUBLE
              .getSizeInBytes()))) {
        return new AdaptiveFloatingCodec(srcDataType, adaptiveDataType, stats,
            isInvertedIndex(isComplexPrimitive, columnSpec));
      } else {
        return new DirectCompressCodec(DataTypes.DOUBLE);
      }
    }
  }

  /**
   * choose between adaptive encoder or delta adaptive encoder, based on whose target data type
   * size is smaller for decimal data type
   */
  static ColumnPageCodec selectCodecByAlgorithmForDecimal(SimpleStatsResult stats,
      DecimalConverterFactory.DecimalConverterType decimalConverterType,
      TableSpec.ColumnSpec columnSpec) {
    DataType srcDataType = stats.getDataType();
    DataType adaptiveDataType =
        fitMinMaxForDecimalType(stats.getDataType(), stats.getMax(), stats.getMin(),
            decimalConverterType);
    DataType deltaDataType;

    if (adaptiveDataType == DataTypes.LONG) {
      deltaDataType = DataTypes.LONG;
    } else {
      deltaDataType = fitDeltaForDecimalType(stats.getDataType(), stats.getMax(), stats.getMin(),
          decimalConverterType);
    }
    // in case of decimal data type check if the decimal converter type is Int or Long and based on
    // that get size in bytes
    if (Math.min(adaptiveDataType.getSizeInBytes(), deltaDataType.getSizeInBytes()) == srcDataType
        .getSizeInBytes()) {
      // no effect to use adaptive or delta, use compression only
      return new DirectCompressCodec(stats.getDataType());
    }
    if (adaptiveDataType.getSizeInBytes() <= deltaDataType.getSizeInBytes()) {
      // choose adaptive encoding
      return new AdaptiveIntegralCodec(stats.getDataType(), adaptiveDataType, stats,
          isInvertedIndex(columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE, columnSpec));
    } else {
      // choose delta adaptive encoding
      return new AdaptiveDeltaIntegralCodec(stats.getDataType(), deltaDataType, stats,
          isInvertedIndex(columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE, columnSpec));
    }
  }

}
