blob: b43f5662ba5349595272799e7943bbcfe69f036b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.datastore.page.encoding;
import java.math.BigDecimal;
import org.apache.carbondata.core.datastore.ColumnType;
import org.apache.carbondata.core.datastore.TableSpec;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaFloatingCodec;
import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaIntegralCodec;
import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveFloatingCodec;
import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveIntegralCodec;
import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressCodec;
import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.ComplexDimensionIndexCodec;
import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.DirectDictDimensionIndexCodec;
import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.PlainDimensionIndexCodec;
import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
import org.apache.carbondata.core.util.DataTypeUtil;
/**
* Default factory that selects the encoding based on the column page data type and statistics
*/
public class DefaultEncodingFactory extends EncodingFactory {
// Upper and lower bounds of a signed 24-bit (three byte) integer.
private static final int THREE_BYTES_MAX = (1 << 23) - 1;
private static final int THREE_BYTES_MIN = -(1 << 23);
// While false, dimension columns are routed to the legacy index codecs by createEncoder.
private static final boolean newWay = false;
// Shared instance returned by getInstance().
private static EncodingFactory encodingFactory = new DefaultEncodingFactory();
/**
 * Returns the shared {@link EncodingFactory} instance held by this class.
 *
 * @return the factory stored in the static {@code encodingFactory} field
 */
public static EncodingFactory getInstance() {
  // TODO: make this configurable once another EncodingFactory implementation is added
  return DefaultEncodingFactory.encodingFactory;
}
/**
 * Picks the encoder for one column page.
 *
 * Measure columns, and primitive columns stored as {@code PLAIN_VALUE}, are encoded through the
 * statistics driven selection; every other column is handled as a dimension.
 *
 * @param columnSpec specification of the column being encoded
 * @param inputPage  page whose data type and statistics drive the selection
 * @return encoder to use for the page
 */
@Override
public ColumnPageEncoder createEncoder(TableSpec.ColumnSpec columnSpec, ColumnPage inputPage) {
  // TODO: add log
  boolean measureColumn = columnSpec instanceof TableSpec.MeasureSpec;
  boolean statsDriven = measureColumn
      || (DataTypeUtil.isPrimitiveColumn(columnSpec.getSchemaDataType())
          && columnSpec.getColumnType() == ColumnType.PLAIN_VALUE);
  if (statsDriven) {
    // measure type and no dictionary primitive type columns
    return createEncoderForMeasureOrNoDictionaryPrimitive(inputPage, columnSpec);
  }
  if (newWay) {
    return createEncoderForDimension((TableSpec.DimensionSpec) columnSpec, inputPage);
  }
  assert columnSpec instanceof TableSpec.DimensionSpec;
  return createEncoderForDimensionLegacy((TableSpec.DimensionSpec) columnSpec);
}
/**
 * Creates the encoder for a dimension column on the non-legacy path.
 *
 * @param columnSpec dimension specification whose column type selects the codec
 * @param inputPage  page supplying the data type for direct compression
 * @return encoder for the dimension page
 * @throws RuntimeException if the column type is not DIRECT_DICTIONARY, PLAIN_VALUE or COMPLEX
 */
private ColumnPageEncoder createEncoderForDimension(TableSpec.DimensionSpec columnSpec,
    ColumnPage inputPage) {
  ColumnType dimensionType = columnSpec.getColumnType();
  switch (dimensionType) {
    case COMPLEX:
      return new ComplexDimensionIndexCodec(false, false).createEncoder(null);
    case DIRECT_DICTIONARY:
    case PLAIN_VALUE:
      // both kinds are only compressed, using the page's own data type
      return new DirectCompressCodec(inputPage.getDataType()).createEncoder(null);
    default:
      throw new RuntimeException("unsupported dimension type: " + dimensionType);
  }
}
/**
 * Creates the encoder for a dimension column using the legacy index codecs.
 *
 * The inverted index flag is only passed as true when the column is also a sort column.
 *
 * @param dimensionSpec dimension specification of the column
 * @return encoder for the dimension page
 * @throws RuntimeException if the column type is neither DIRECT_DICTIONARY nor PLAIN_VALUE
 */
private ColumnPageEncoder createEncoderForDimensionLegacy(TableSpec.DimensionSpec dimensionSpec) {
  switch (dimensionSpec.getColumnType()) {
    case DIRECT_DICTIONARY: {
      boolean sortColumn = dimensionSpec.isInSortColumns();
      boolean invertedIndex = sortColumn && dimensionSpec.isDoInvertedIndex();
      return new DirectDictDimensionIndexCodec(sortColumn, invertedIndex).createEncoder(null);
    }
    case PLAIN_VALUE: {
      boolean sortColumn = dimensionSpec.isInSortColumns();
      boolean invertedIndex = sortColumn && dimensionSpec.isDoInvertedIndex();
      // VARCHAR and BINARY columns are flagged separately to the plain codec
      boolean varcharOrBinary = dimensionSpec.getSchemaDataType() == DataTypes.VARCHAR
          || dimensionSpec.getSchemaDataType() == DataTypes.BINARY;
      return new PlainDimensionIndexCodec(sortColumn, invertedIndex, varcharOrBinary)
          .createEncoder(null);
    }
    default:
      throw new RuntimeException("unsupported dimension type: " +
          dimensionSpec.getColumnType());
  }
}
/**
 * Chooses the encoder for a measure or no-dictionary primitive page from the data type
 * reported by the page statistics.
 *
 * @param columnPage page to encode
 * @param columnSpec specification of the column the page belongs to
 * @return encoder for the page
 * @throws RuntimeException if the statistics data type is not handled by any branch
 */
private ColumnPageEncoder createEncoderForMeasureOrNoDictionaryPrimitive(ColumnPage columnPage,
    TableSpec.ColumnSpec columnSpec) {
  SimpleStatsResult pageStats = columnPage.getStatistics();
  DataType statsType = pageStats.getDataType();

  // boolean, byte array and binary pages are only compressed
  boolean compressOnly = statsType == DataTypes.BOOLEAN
      || statsType == DataTypes.BYTE_ARRAY
      || columnPage.getDataType() == DataTypes.BINARY;
  if (compressOnly) {
    return new DirectCompressCodec(columnPage.getDataType()).createEncoder(null);
  }

  boolean integral = statsType == DataTypes.BYTE
      || statsType == DataTypes.SHORT
      || statsType == DataTypes.INT
      || statsType == DataTypes.LONG
      || statsType == DataTypes.TIMESTAMP;
  if (integral) {
    return selectCodecByAlgorithmForIntegral(pageStats, false, columnSpec).createEncoder(null);
  }

  if (DataTypes.isDecimal(statsType)) {
    return createEncoderForDecimalDataTypeMeasure(columnPage, columnSpec);
  }

  if (statsType == DataTypes.FLOAT || statsType == DataTypes.DOUBLE) {
    return selectCodecByAlgorithmForFloating(pageStats, false, columnSpec).createEncoder(null);
  }

  throw new RuntimeException("unsupported data type: " + pageStats.getDataType());
}
/**
 * Chooses the encoder for a decimal page based on the page's decimal converter type.
 *
 * Pages converted as DECIMAL_INT or DECIMAL_LONG go through the adaptive/delta selection;
 * any other converter type is only compressed.
 *
 * @param columnPage decimal page; it is cast to {@link DecimalColumnPage}
 * @param columnSpec specification of the column the page belongs to
 * @return encoder for the page
 */
private ColumnPageEncoder createEncoderForDecimalDataTypeMeasure(ColumnPage columnPage,
    TableSpec.ColumnSpec columnSpec) {
  DecimalColumnPage decimalPage = (DecimalColumnPage) columnPage;
  DecimalConverterFactory.DecimalConverterType converterType =
      decimalPage.getDecimalConverter().getDecimalConverterType();
  switch (converterType) {
    case DECIMAL_INT:
    case DECIMAL_LONG: {
      SimpleStatsResult pageStats = columnPage.getStatistics();
      return selectCodecByAlgorithmForDecimal(pageStats, converterType, columnSpec)
          .createEncoder(null);
    }
    default:
      return new DirectCompressCodec(columnPage.getDataType()).createEncoder(null);
  }
}
/**
 * Returns the narrowest of BYTE, SHORT, SHORT_INT, INT and LONG whose value range contains
 * both bounds.
 *
 * @param max upper bound of the values
 * @param min lower bound of the values
 * @return smallest integral data type able to hold {@code min} and {@code max}
 */
private static DataType fitLongMinMax(long max, long min) {
  if (min >= Byte.MIN_VALUE && max <= Byte.MAX_VALUE) {
    return DataTypes.BYTE;
  }
  if (min >= Short.MIN_VALUE && max <= Short.MAX_VALUE) {
    return DataTypes.SHORT;
  }
  // three byte range sits between short and int
  if (min >= THREE_BYTES_MIN && max <= THREE_BYTES_MAX) {
    return DataTypes.SHORT_INT;
  }
  if (min >= Integer.MIN_VALUE && max <= Integer.MAX_VALUE) {
    return DataTypes.INT;
  }
  return DataTypes.LONG;
}
/**
 * Unboxes the min/max statistics according to {@code dataType} and returns the narrowest
 * integral data type that holds both of them.
 *
 * @param dataType data type the statistics were collected for
 * @param max      boxed maximum value
 * @param min      boxed minimum value
 * @return smallest integral data type covering the range
 * @throws RuntimeException if {@code dataType} is not handled by any branch
 */
private static DataType fitMinMax(DataType dataType, Object max, Object min) {
  long upper;
  long lower;
  if (dataType == DataTypes.BYTE || dataType == DataTypes.BOOLEAN) {
    upper = (byte) max;
    lower = (byte) min;
  } else if (dataType == DataTypes.SHORT) {
    upper = (short) max;
    lower = (short) min;
  } else if (dataType == DataTypes.INT) {
    upper = (int) max;
    lower = (int) min;
  } else if ((dataType == DataTypes.LONG) || (dataType == DataTypes.TIMESTAMP)) {
    upper = (long) max;
    lower = (long) min;
  } else if (dataType == DataTypes.DOUBLE) {
    // doubles are narrowed to long before fitting
    upper = (long) (double) max;
    lower = (long) (double) min;
  } else {
    throw new RuntimeException("internal error: " + dataType);
  }
  return fitLongMinMax(upper, lower);
}
/**
 * Returns the narrowest integral data type that holds the unscaled min/max of a decimal page.
 *
 * @param dataType             data type of the page, used only in the error message
 * @param max                  maximum value, expected to be a {@link BigDecimal}
 * @param min                  minimum value, expected to be a {@link BigDecimal}
 * @param decimalConverterType converter type of the page; for DECIMAL_INT the unscaled values
 *                             are narrowed to int before fitting
 * @return smallest integral data type covering the unscaled range
 * @throws RuntimeException if the converter type is neither DECIMAL_INT nor DECIMAL_LONG
 */
private static DataType fitMinMaxForDecimalType(DataType dataType, Object max, Object min,
    DecimalConverterFactory.DecimalConverterType decimalConverterType) {
  long unscaledMax = ((BigDecimal) max).unscaledValue().longValue();
  long unscaledMin = ((BigDecimal) min).unscaledValue().longValue();
  switch (decimalConverterType) {
    case DECIMAL_LONG:
      return fitLongMinMax(unscaledMax, unscaledMin);
    case DECIMAL_INT:
      return fitLongMinMax((int) unscaledMax, (int) unscaledMin);
    default:
      throw new RuntimeException("internal error: " + dataType);
  }
}
/**
 * Returns the data type needed to store the difference between the unscaled max and min of a
 * decimal page.
 *
 * @param dataType             data type of the page, used only in the error message
 * @param max                  maximum value, expected to be a {@link BigDecimal}
 * @param min                  minimum value, expected to be a {@link BigDecimal}
 * @param decimalConverterType converter type of the page; DECIMAL_LONG always yields LONG
 * @return data type able to hold the delta
 * @throws RuntimeException if the converter type is neither DECIMAL_INT nor DECIMAL_LONG
 */
private static DataType fitDeltaForDecimalType(DataType dataType, Object max, Object min,
    DecimalConverterFactory.DecimalConverterType decimalConverterType) {
  long unscaledMax = ((BigDecimal) max).unscaledValue().longValue();
  long unscaledMin = ((BigDecimal) min).unscaledValue().longValue();
  switch (decimalConverterType) {
    case DECIMAL_LONG:
      return DataTypes.LONG;
    case DECIMAL_INT:
      return compareMinMaxAndSelectDataType(unscaledMax - unscaledMin);
    default:
      throw new RuntimeException("internal error: " + dataType);
  }
}
/**
 * Returns the narrowest integral data type that can store {@code max - min}.
 *
 * DOUBLE statistics always yield LONG, as does a 64-bit subtraction that overflows.
 *
 * @param dataType data type the statistics were collected for
 * @param max      boxed maximum value
 * @param min      boxed minimum value
 * @return data type able to hold the delta
 * @throws RuntimeException if {@code dataType} is not handled by any branch
 */
private static DataType fitDelta(DataType dataType, Object max, Object min) {
  if (dataType == DataTypes.DOUBLE) {
    return DataTypes.LONG;
  }
  // widen both bounds to long so that narrower types cannot overflow when subtracted
  long upper;
  long lower;
  if (dataType == DataTypes.BYTE || dataType == DataTypes.BOOLEAN) {
    upper = (byte) max;
    lower = (byte) min;
  } else if (dataType == DataTypes.SHORT) {
    upper = (short) max;
    lower = (short) min;
  } else if (dataType == DataTypes.INT) {
    upper = (int) max;
    lower = (int) min;
  } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
    upper = (long) max;
    lower = (long) min;
  } else {
    throw new RuntimeException("internal error: " + dataType);
  }
  long delta = upper - lower;
  // The subtraction overflowed iff the operands have opposing signs and the result's sign
  // differs from the minuend. Only 64-bit operands can trigger this; for the narrower types
  // the test is always false.
  if (((upper ^ lower) & (upper ^ delta)) < 0) {
    return DataTypes.LONG;
  }
  return compareMinMaxAndSelectDataType(delta);
}
/**
 * Returns the narrowest integral data type that can hold a single value.
 *
 * @param value value to fit
 * @return result of {@code fitLongMinMax} with the value as both bounds
 */
private static DataType compareMinMaxAndSelectDataType(long value) {
  // a single value is both the upper and the lower bound of its own range
  final long bound = value;
  return fitLongMinMax(bound, bound);
}
/**
 * Chooses between the adaptive and the delta adaptive codec, whichever has the smaller target
 * data type; ties go to the adaptive codec.
 *
 * Unless the column is a complex primitive, direct compression is used when neither target
 * type is narrower than the source type.
 *
 * @param stats              statistics of the page (data type, min, max)
 * @param isComplexPrimitive whether the page belongs to a complex primitive column
 * @param columnSpec         specification of the column, used for the inverted index flag
 * @return selected codec
 */
static ColumnPageCodec selectCodecByAlgorithmForIntegral(SimpleStatsResult stats,
    boolean isComplexPrimitive, TableSpec.ColumnSpec columnSpec) {
  DataType sourceType = stats.getDataType();
  DataType adaptiveType = fitMinMax(sourceType, stats.getMax(), stats.getMin());
  DataType deltaType = fitDelta(sourceType, stats.getMax(), stats.getMin());

  // for complex primitive, if source and destination data type is same, use adaptive encoding.
  if (!isComplexPrimitive
      && Math.min(adaptiveType.getSizeInBytes(), deltaType.getSizeInBytes())
          == sourceType.getSizeInBytes()) {
    // adaptive and delta bring no size reduction, so only compress
    return new DirectCompressCodec(sourceType);
  }

  boolean invertedIndex = isInvertedIndex(isComplexPrimitive, columnSpec);
  if (adaptiveType.getSizeInBytes() > deltaType.getSizeInBytes()) {
    // delta needs fewer bytes than the plain value range
    return new AdaptiveDeltaIntegralCodec(sourceType, deltaType, stats, invertedIndex);
  }
  return new AdaptiveIntegralCodec(sourceType, adaptiveType, stats, invertedIndex);
}
/**
 * Checks whether the column is a sort column that also requests an inverted index.
 *
 * @param isComplexPrimitive whether the column is a complex primitive; such columns never
 *                           report an inverted index
 * @param columnSpec         specification of the column; only dimension specs can qualify
 * @return true only for a non complex-primitive dimension that is in the sort columns and has
 *         inverted index enabled
 */
private static boolean isInvertedIndex(boolean isComplexPrimitive,
    TableSpec.ColumnSpec columnSpec) {
  if (isComplexPrimitive || !(columnSpec instanceof TableSpec.DimensionSpec)) {
    return false;
  }
  TableSpec.DimensionSpec dimension = (TableSpec.DimensionSpec) columnSpec;
  return dimension.isInSortColumns() && dimension.isDoInvertedIndex();
}
/**
 * Chooses between the upscale adaptive codec and the upscale delta adaptive codec for FLOAT
 * and DOUBLE pages, based on whose target data type is smaller.
 *
 * @param stats              statistics of the page (data type, min, max, decimal count)
 * @param isComplexPrimitive whether the page belongs to a complex primitive column
 * @param columnSpec         specification of the column
 * @return selected codec
 */
static ColumnPageCodec selectCodecByAlgorithmForFloating(SimpleStatsResult stats,
    boolean isComplexPrimitive, TableSpec.ColumnSpec columnSpec) {
  DataType sourceType = stats.getDataType();
  boolean floatSource = sourceType == DataTypes.FLOAT;
  // FLOAT statistics are unboxed as float, everything else as double
  double upper = floatSource ? (float) stats.getMax() : (double) stats.getMax();
  double lower = floatSource ? (float) stats.getMin() : (double) stats.getMin();

  int fractionDigits = stats.getDecimalCount();
  // For Complex Type primitive we should always choose adaptive path
  // as LV format will be reduced to only V format. Therefore in order
  // to do that decimal count should be actual count instead of -1.
  if (isComplexPrimitive && fractionDigits == -1
      && stats instanceof PrimitivePageStatsCollector) {
    fractionDigits = ((PrimitivePageStatsCollector) stats).getDecimalForComplexPrimitive();
  }

  // The largest magnitude decides the target type: with -1 and -10000000 the max is -1,
  // but -10000000 is the value that has to fit.
  double largestMagnitude = Math.max(Math.abs(upper), Math.abs(lower));

  if (fractionDigits == 0 && !floatSource) {
    // whole numbers stored as double: treat as short, int, long
    return selectCodecByAlgorithmForIntegral(stats, false, columnSpec);
  }
  if (fractionDigits < 0 && !isComplexPrimitive) {
    return new DirectCompressCodec(DataTypes.DOUBLE);
  }
  return getColumnPageCodec(stats, isComplexPrimitive, columnSpec, sourceType, upper,
      lower, fractionDigits, largestMagnitude);
}
/**
 * Chooses the codec for a floating point page after scaling its values by
 * {@code 10^decimalCount}.
 *
 * Direct compression is used when the scaled magnitude is NaN or exceeds
 * {@code Long.MAX_VALUE}, or when neither adaptive variant applies. Otherwise the delta
 * adaptive codec is chosen when its target type is smaller than the adaptive one, and the
 * adaptive codec when its target type is smaller than DOUBLE (or equal to it for a complex
 * primitive).
 *
 * @param stats              statistics of the page, passed on to the selected codec
 * @param isComplexPrimitive whether the page belongs to a complex primitive column
 * @param columnSpec         specification of the column, used for the inverted index flag
 * @param srcDataType        source data type of the page
 * @param maxValue           maximum value of the page
 * @param minValue           minimum value of the page
 * @param decimalCount       number of fraction digits used as the power-of-ten scale
 * @param absMaxValue        largest absolute value of the page
 * @return selected codec
 */
private static ColumnPageCodec getColumnPageCodec(SimpleStatsResult stats,
    boolean isComplexPrimitive, TableSpec.ColumnSpec columnSpec, DataType srcDataType,
    double maxValue, double minValue, int decimalCount, double absMaxValue) {
  // computed once; the same factor scales both the magnitude and the delta
  double scale = Math.pow(10, decimalCount);
  double scaledAbsMax = scale * absMaxValue;
  // NaN compares false against everything and would be cast to 0L below, which would select
  // a BYTE target for data that cannot be represented; treat it like an overflow instead.
  if (Double.isNaN(scaledAbsMax) || scaledAbsMax > Long.MAX_VALUE) {
    return new DirectCompressCodec(DataTypes.DOUBLE);
  }

  DataType adaptiveDataType = fitLongMinMax((long) scaledAbsMax, 0);
  DataType deltaDataType =
      compareMinMaxAndSelectDataType((long) (scale * (maxValue - minValue)));

  if (adaptiveDataType.getSizeInBytes() > deltaDataType.getSizeInBytes()) {
    return new AdaptiveDeltaFloatingCodec(srcDataType, deltaDataType, stats,
        isInvertedIndex(isComplexPrimitive, columnSpec));
  }
  boolean adaptiveIsSmaller =
      adaptiveDataType.getSizeInBytes() < DataTypes.DOUBLE.getSizeInBytes();
  boolean complexWithSameSize = isComplexPrimitive
      && adaptiveDataType.getSizeInBytes() == DataTypes.DOUBLE.getSizeInBytes();
  if (adaptiveIsSmaller || complexWithSameSize) {
    return new AdaptiveFloatingCodec(srcDataType, adaptiveDataType, stats,
        isInvertedIndex(isComplexPrimitive, columnSpec));
  }
  return new DirectCompressCodec(DataTypes.DOUBLE);
}
/**
 * Chooses between the adaptive and the delta adaptive codec for a decimal page, whichever has
 * the smaller target data type; ties go to the adaptive codec.
 *
 * Direct compression is used when neither target type is narrower than the source type.
 *
 * @param stats                statistics of the page (data type, min, max)
 * @param decimalConverterType converter type of the decimal page
 * @param columnSpec           specification of the column, used for the inverted index flag
 * @return selected codec
 */
static ColumnPageCodec selectCodecByAlgorithmForDecimal(SimpleStatsResult stats,
    DecimalConverterFactory.DecimalConverterType decimalConverterType,
    TableSpec.ColumnSpec columnSpec) {
  DataType sourceType = stats.getDataType();
  DataType adaptiveType = fitMinMaxForDecimalType(sourceType, stats.getMax(), stats.getMin(),
      decimalConverterType);
  // when the values themselves need LONG the delta is not computed and is taken as LONG too
  DataType deltaType = adaptiveType == DataTypes.LONG
      ? DataTypes.LONG
      : fitDeltaForDecimalType(sourceType, stats.getMax(), stats.getMin(),
          decimalConverterType);

  if (Math.min(adaptiveType.getSizeInBytes(), deltaType.getSizeInBytes())
      == sourceType.getSizeInBytes()) {
    // adaptive and delta bring no size reduction, so only compress
    return new DirectCompressCodec(sourceType);
  }

  boolean invertedIndex =
      isInvertedIndex(columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE, columnSpec);
  if (adaptiveType.getSizeInBytes() > deltaType.getSizeInBytes()) {
    // delta needs fewer bytes than the plain value range
    return new AdaptiveDeltaIntegralCodec(sourceType, deltaType, stats, invertedIndex);
  }
  return new AdaptiveIntegralCodec(sourceType, adaptiveType, stats, invertedIndex);
}
}