blob: 27520c9affd67b55593deb711f8807e31c2299c2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.datastore.page.encoding.compress;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import java.util.Map;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.ReusableDataBuffer;
import org.apache.carbondata.core.datastore.TableSpec;
import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.datastore.compression.CompressorFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
import org.apache.carbondata.core.datastore.page.LazyColumnPage;
import org.apache.carbondata.core.datastore.page.VarLengthColumnPageBase;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
import org.apache.carbondata.core.metadata.datatype.DecimalType;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory;
import org.apache.carbondata.core.scan.result.vector.impl.directread.ConvertibleVector;
import org.apache.carbondata.core.scan.result.vector.impl.directread.SequentialFill;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.format.Encoding;
/**
* This codec directly apply compression on the input data
*/
public class DirectCompressCodec implements ColumnPageCodec {
private DataType dataType;
public DirectCompressCodec(DataType dataType) {
this.dataType = dataType;
}
boolean isComplexPrimitiveIntLengthEncoding = false;
public void setComplexPrimitiveIntLengthEncoding(boolean complexPrimitiveIntLengthEncoding) {
isComplexPrimitiveIntLengthEncoding = complexPrimitiveIntLengthEncoding;
}
@Override
public String getName() {
return "DirectCompressCodec";
}
@Override
public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
return new ColumnPageEncoder() {
@Override
protected ByteBuffer encodeData(ColumnPage input) throws IOException {
Compressor compressor = CompressorFactory.getInstance().getCompressor(
input.getColumnCompressorName());
return input.compress(compressor);
}
@Override
protected List<Encoding> getEncodingList() {
List<Encoding> encodings = new ArrayList<>();
encodings.add(dataType == DataTypes.VARCHAR ?
Encoding.DIRECT_COMPRESS_VARCHAR :
Encoding.DIRECT_COMPRESS);
return encodings;
}
@Override
protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), inputPage.getDataType(),
inputPage.getStatistics(), inputPage.getColumnCompressorName());
}
};
}
@Override
public ColumnPageDecoder createDecoder(final ColumnPageEncoderMeta meta) {
return new ColumnPageDecoder() {
@Override
public ColumnPage decode(byte[] input, int offset, int length) {
ColumnPage decodedPage;
if (DataTypes.isDecimal(dataType)) {
decodedPage = ColumnPage.decompressDecimalPage(meta, input, offset, length);
} else {
decodedPage = ColumnPage
.decompress(meta, input, offset, length, false, isComplexPrimitiveIntLengthEncoding);
}
return LazyColumnPage.newPage(decodedPage, converter);
}
@Override
public void decodeAndFillVector(byte[] input, int offset, int length,
ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded, int pageSize,
ReusableDataBuffer reusableDataBuffer) {
Compressor compressor =
CompressorFactory.getInstance().getCompressor(meta.getCompressorName());
int uncompressedLength;
byte[] unCompressData;
if (null != reusableDataBuffer && compressor.supportReusableBuffer()) {
uncompressedLength = compressor.unCompressedLength(input, offset, length);
unCompressData = reusableDataBuffer.getDataBuffer(uncompressedLength);
compressor.rawUncompress(input, offset, length, unCompressData);
} else {
unCompressData = compressor.unCompressByte(input, offset, length);
uncompressedLength = unCompressData.length;
}
if (DataTypes.isDecimal(dataType)) {
TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
DecimalConverterFactory.DecimalConverter decimalConverter =
DecimalConverterFactory.INSTANCE
.getDecimalConverter(columnSpec.getPrecision(), columnSpec.getScale());
vectorInfo.decimalConverter = decimalConverter;
if (DataTypes.isDecimal(meta.getStoreDataType())) {
ColumnPage decimalColumnPage = VarLengthColumnPageBase
.newDecimalColumnPage(meta, unCompressData, uncompressedLength);
decimalConverter.fillVector(decimalColumnPage.getByteArrayPage(), pageSize, vectorInfo,
nullBits, meta.getStoreDataType());
} else {
converter
.decodeAndFillVector(unCompressData, vectorInfo, nullBits, meta.getStoreDataType(),
pageSize);
}
} else {
converter
.decodeAndFillVector(unCompressData, vectorInfo, nullBits, meta.getStoreDataType(),
pageSize);
}
}
@Override
public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded) {
return LazyColumnPage.newPage(ColumnPage
.decompress(meta, input, offset, length, isLVEncoded,
isComplexPrimitiveIntLengthEncoding), converter);
}
};
}
private ColumnPageValueConverter converter = new ColumnPageValueConverter() {
@Override
public void encode(int rowId, byte value) {
throw new RuntimeException("internal error");
}
@Override
public void encode(int rowId, short value) {
throw new RuntimeException("internal error");
}
@Override
public void encode(int rowId, int value) {
throw new RuntimeException("internal error");
}
@Override
public void encode(int rowId, long value) {
throw new RuntimeException("internal error");
}
@Override
public void encode(int rowId, float value) {
throw new RuntimeException("internal error");
}
@Override
public void encode(int rowId, double value) {
throw new RuntimeException("internal error");
}
@Override
public long decodeLong(byte value) {
return value;
}
@Override
public long decodeLong(short value) {
return value;
}
@Override
public long decodeLong(int value) {
return value;
}
@Override
public double decodeDouble(byte value) {
return value;
}
@Override
public double decodeDouble(short value) {
return value;
}
@Override
public double decodeDouble(int value) {
return value;
}
@Override
public double decodeDouble(long value) {
return value;
}
@Override
public double decodeDouble(float value) {
return value;
}
@Override
public double decodeDouble(double value) {
return value;
}
@Override
public void decodeAndFillVector(byte[] pageData, ColumnVectorInfo vectorInfo, BitSet nullBits,
DataType pageDataType, int pageSize) {
CarbonColumnVector vector = vectorInfo.vector;
DataType vectorDataType = vector.getType();
BitSet deletedRows = vectorInfo.deletedRows;
if (vectorDataType.isComplexType() && vectorInfo.vectorStack.isEmpty()) {
// Only if vectorStack is empty, initialize with the parent vector
vectorInfo.vectorStack.push(vectorInfo.vector);
}
// If top of vector stack is a complex vector,
// then add their children into the stack and load them too.
if (!vectorInfo.vectorStack.isEmpty() && vectorInfo.vectorStack.peek().getType()
.isComplexType()) {
CarbonColumnVector parentVector = vectorInfo.vectorStack.peek();
CarbonColumnVectorImpl parentVectorImpl =
(CarbonColumnVectorImpl) (parentVector.getColumnVector());
// parse the parent page data,
// save the information about number of child in each row in parent vector
if (DataTypes.isStructType(parentVectorImpl.getType())) {
parentVectorImpl.setNumberOfChildElementsForStruct(pageData, pageSize);
} else {
parentVectorImpl.setNumberOfChildElementsForArray(pageData, pageSize);
}
for (CarbonColumnVector childVector : parentVector.getColumnVector().getChildrenVector()) {
// push each child
vectorInfo.vectorStack.push(childVector);
// load the child page, here child page loading flow updated vector from top of the stack
// and pop() the child vector once loading is finished.
((CarbonColumnVectorImpl) (vectorInfo.vectorStack.peek().getColumnVector())).loadPage();
}
vector = ColumnarVectorWrapperDirectFactory
.getDirectVectorWrapperFactory(vectorInfo, parentVector, vectorInfo.invertedIndex,
nullBits, vectorInfo.deletedRows, true, false);
fillVectorBasedOnType(pageData, vector, vectorDataType, pageDataType, pageSize,
vectorInfo, nullBits);
} else {
pageSize = ColumnVectorInfo.getUpdatedPageSizeForChildVector(vectorInfo, pageSize);
vector = ColumnarVectorWrapperDirectFactory
.getDirectVectorWrapperFactory(vectorInfo, vector, vectorInfo.invertedIndex, nullBits,
deletedRows, true, false);
fillVectorBasedOnType(pageData, vector, vector.getType(), pageDataType, pageSize,
vectorInfo, nullBits);
if ((deletedRows == null || deletedRows.isEmpty())
&& !(vectorInfo.vector instanceof SequentialFill)) {
for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
vector.putNull(i);
}
}
if (vector instanceof ConvertibleVector) {
((ConvertibleVector) vector).convert();
}
}
}
private void fillVectorBasedOnType(byte[] pageData, CarbonColumnVector vector,
DataType vectorDataType, DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo,
BitSet nullBits) {
if (vectorInfo.vector.getColumnVector() != null
&& vector.getColumnVector() == vectorInfo.vector.getColumnVector() && vectorInfo.vector
.getColumnVector().getType().isComplexType()) {
List<Integer> childElementsForEachRow =
((CarbonColumnVectorImpl) vector.getColumnVector())
.getNumberOfChildrenElementsInEachRow();
vector.getColumnVector().putComplexObject(childElementsForEachRow);
} else {
fillPrimitiveType(pageData, vector, vectorDataType, pageDataType, pageSize, vectorInfo,
nullBits);
}
}
private void fillPrimitiveType(byte[] pageData, CarbonColumnVector vector,
DataType vectorDataType, DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo,
BitSet nullBits) {
int rowId = 0;
if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
if (vectorDataType == DataTypes.SHORT) {
for (int i = 0; i < pageSize; i++) {
vector.putShort(i, pageData[i]);
}
} else if (vectorDataType == DataTypes.INT) {
for (int i = 0; i < pageSize; i++) {
vector.putInt(i, pageData[i]);
}
} else if (vectorDataType == DataTypes.LONG) {
for (int i = 0; i < pageSize; i++) {
vector.putLong(i, pageData[i]);
}
} else if (vectorDataType == DataTypes.TIMESTAMP) {
for (int i = 0; i < pageSize; i++) {
vector.putLong(i, (long) pageData[i] * 1000);
}
} else if (vectorDataType == DataTypes.BOOLEAN || vectorDataType == DataTypes.BYTE) {
vector.putBytes(0, pageSize, pageData, 0);
} else if (DataTypes.isDecimal(vectorDataType)) {
DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
decimalConverter.fillVector(pageData, pageSize, vectorInfo, nullBits, pageDataType);
} else {
for (int i = 0; i < pageSize; i++) {
vector.putDouble(i, pageData[i]);
}
}
} else if (pageDataType == DataTypes.SHORT) {
int shortSizeInBytes = DataTypes.SHORT.getSizeInBytes();
int size = pageSize * shortSizeInBytes;
if (vectorDataType == DataTypes.SHORT) {
for (int i = 0; i < size; i += shortSizeInBytes) {
vector.putShort(rowId++, (ByteUtil.toShortLittleEndian(pageData, i)));
}
} else if (vectorDataType == DataTypes.INT) {
for (int i = 0; i < size; i += shortSizeInBytes) {
vector.putInt(rowId++, ByteUtil.toShortLittleEndian(pageData, i));
}
} else if (vectorDataType == DataTypes.LONG) {
for (int i = 0; i < size; i += shortSizeInBytes) {
vector.putLong(rowId++, ByteUtil.toShortLittleEndian(pageData, i));
}
} else if (vectorDataType == DataTypes.TIMESTAMP) {
for (int i = 0; i < size; i += shortSizeInBytes) {
vector.putLong(rowId++, (long) ByteUtil.toShortLittleEndian(pageData, i) * 1000);
}
} else if (DataTypes.isDecimal(vectorDataType)) {
DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
decimalConverter.fillVector(pageData, pageSize, vectorInfo, nullBits, pageDataType);
} else {
for (int i = 0; i < size; i += shortSizeInBytes) {
vector.putDouble(rowId++, ByteUtil.toShortLittleEndian(pageData, i));
}
}
} else if (pageDataType == DataTypes.SHORT_INT) {
if (vectorDataType == DataTypes.INT) {
for (int i = 0; i < pageSize; i++) {
int shortInt = ByteUtil.valueOf3Bytes(pageData, i * 3);
vector.putInt(i, shortInt);
}
} else if (vectorDataType == DataTypes.LONG) {
for (int i = 0; i < pageSize; i++) {
int shortInt = ByteUtil.valueOf3Bytes(pageData, i * 3);
vector.putLong(i, shortInt);
}
} else if (vectorDataType == DataTypes.TIMESTAMP) {
for (int i = 0; i < pageSize; i++) {
int shortInt = ByteUtil.valueOf3Bytes(pageData, i * 3);
vector.putLong(i, (long) shortInt * 1000);
}
} else if (DataTypes.isDecimal(vectorDataType)) {
DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
decimalConverter.fillVector(pageData, pageSize, vectorInfo, nullBits, pageDataType);
} else {
for (int i = 0; i < pageSize; i++) {
int shortInt = ByteUtil.valueOf3Bytes(pageData, i * 3);
vector.putDouble(i, shortInt);
}
}
} else {
int intSizeInBytes = DataTypes.INT.getSizeInBytes();
if (pageDataType == DataTypes.INT) {
int size = pageSize * intSizeInBytes;
if (vectorDataType == DataTypes.INT) {
for (int i = 0; i < size; i += intSizeInBytes) {
vector.putInt(rowId++, ByteUtil.toIntLittleEndian(pageData, i));
}
} else if (vectorDataType == DataTypes.LONG) {
for (int i = 0; i < size; i += intSizeInBytes) {
vector.putLong(rowId++, ByteUtil.toIntLittleEndian(pageData, i));
}
} else if (vectorDataType == DataTypes.TIMESTAMP) {
for (int i = 0; i < size; i += intSizeInBytes) {
vector.putLong(rowId++, (long) ByteUtil.toIntLittleEndian(pageData, i) * 1000);
}
} else if (DataTypes.isDecimal(vectorDataType)) {
DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
decimalConverter.fillVector(pageData, pageSize, vectorInfo, nullBits, pageDataType);
} else {
for (int i = 0; i < size; i += intSizeInBytes) {
vector.putDouble(rowId++, ByteUtil.toIntLittleEndian(pageData, i));
}
}
} else if (pageDataType == DataTypes.LONG) {
int longSizeInBytes = DataTypes.LONG.getSizeInBytes();
int size = pageSize * longSizeInBytes;
if (vectorDataType == DataTypes.LONG) {
for (int i = 0; i < size; i += longSizeInBytes) {
vector.putLong(rowId++, ByteUtil.toLongLittleEndian(pageData, i));
}
} else if (vectorDataType == DataTypes.TIMESTAMP) {
for (int i = 0; i < size; i += longSizeInBytes) {
vector.putLong(rowId++, ByteUtil.toLongLittleEndian(pageData, i) * 1000);
}
} else if (DataTypes.isDecimal(vectorDataType)) {
DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
decimalConverter.fillVector(pageData, pageSize, vectorInfo, nullBits, pageDataType);
}
} else if (pageDataType == DataTypes.BYTE_ARRAY) {
// for complex primitive types
if (vectorDataType == DataTypes.STRING || vectorDataType == DataTypes.BINARY
|| vectorDataType == DataTypes.VARCHAR) {
// for complex primitive string, binary, varchar type
int offset = 0;
for (int i = 0; i < pageSize; i++) {
int len = ByteBuffer.wrap(pageData, offset, intSizeInBytes).getInt();
offset += intSizeInBytes;
if (vectorDataType == DataTypes.BINARY && len == 0) {
vector.putNull(i);
continue;
}
byte[] row = new byte[len];
System.arraycopy(pageData, offset, row, 0, len);
if (Arrays.equals(row, CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY)) {
vector.putNull(i);
} else {
vector.putObject(i, row);
}
offset += len;
}
} else if (vectorDataType == DataTypes.DATE) {
// for complex primitive date type
int offset = 0;
for (int i = 0; i < pageSize; i++) {
int len = ByteBuffer.wrap(pageData, offset, intSizeInBytes).getInt();
offset += intSizeInBytes;
int surrogateInternal =
ByteUtil.toXorInt(pageData, offset, intSizeInBytes);
if (len == 0) {
vector.putObject(0, null);
} else {
vector.putObject(0, surrogateInternal - DateDirectDictionaryGenerator.cutOffDate);
}
offset += len;
}
} else if (DataTypes.isDecimal(vectorDataType)) {
// for complex primitive decimal type
DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
if (decimalConverter == null) {
decimalConverter = DecimalConverterFactory.INSTANCE
.getDecimalConverter(((DecimalType) vectorDataType).getPrecision(),
((DecimalType) vectorDataType).getScale());
}
decimalConverter.fillVector(pageData, pageSize, vectorInfo, nullBits, pageDataType);
}
} else if (vectorDataType == DataTypes.FLOAT) {
int floatSizeInBytes = DataTypes.FLOAT.getSizeInBytes();
int size = pageSize * floatSizeInBytes;
for (int i = 0; i < size; i += floatSizeInBytes) {
vector.putFloat(rowId++, ByteUtil.toFloatLittleEndian(pageData, i));
}
} else {
int doubleSizeInBytes = DataTypes.DOUBLE.getSizeInBytes();
int size = pageSize * doubleSizeInBytes;
for (int i = 0; i < size; i += doubleSizeInBytes) {
vector.putDouble(rowId++, ByteUtil.toDoubleLittleEndian(pageData, i));
}
}
}
}
};
}