/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.datastore.chunk.reader.dimension.v3;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.ReusableDataBuffer;
import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionColumnPage;
import org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionColumnPage;
import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractDimensionChunkReader;
import org.apache.carbondata.core.datastore.chunk.store.ColumnPageWrapper;
import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory;
import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
import org.apache.carbondata.core.datastore.compression.CompressorFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory;
import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.scan.executor.util.QueryUtil;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.DataChunk3;
import org.apache.carbondata.format.Encoding;
import org.apache.commons.lang.ArrayUtils;
/**
* Dimension column V3 Reader class which will be used to read and uncompress
* V3 format data.
* Data Format:
* <FileHeader>
* <Column1 Data ChunkV3><Column1<Page1><Page2><Page3><Page4>>
* <Column2 Data ChunkV3><Column2<Page1><Page2><Page3><Page4>>
* <Column3 Data ChunkV3><Column3<Page1><Page2><Page3><Page4>>
* <Column4 Data ChunkV3><Column4<Page1><Page2><Page3><Page4>>
* <File Footer>
*/
public class DimensionChunkReaderV3 extends AbstractDimensionChunkReader {
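/**
* encoding factory used to create the page decoders based on the encodings present in page metadata
*/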
private EncodingFactory encodingFactory = DefaultEncodingFactory.getInstance();
/**
* end position of last dimension in carbon data file
*/
private long lastDimensionOffsets;
public DimensionChunkReaderV3(BlockletInfo blockletInfo, String filePath) {
super(blockletInfo, filePath);
lastDimensionOffsets = blockletInfo.getDimensionOffset();
}
/**
* Below method will be used to read the dimension column data from the carbon data file.
* Steps for reading:
* 1. Get the length of the data to be read
* 2. Allocate the direct buffer
* 3. Read the data from the file
* 4. Get the data chunk object from the data read
* 5. Create the raw chunk object and fill the details
*
* @param fileReader reader for reading the column from carbon data file
* @param columnIndex blocklet index of the column in carbon data file
* @return dimension raw chunk
*/
public DimensionRawColumnChunk readRawDimensionChunk(FileReader fileReader,
int columnIndex) throws IOException {
// get the current dimension offset
long currentDimensionOffset = dimensionChunksOffset.get(columnIndex);
int length = 0;
// to calculate the length of the data to be read:
// for any column other than the last one, subtract the current column's offset
// from the next column's offset to get the total length.
// for the last column, use lastDimensionOffsets, which is the end position of the
// last dimension, and subtract the current dimension offset from it
if (dimensionChunksOffset.size() - 1 == columnIndex) {
length = (int) (lastDimensionOffsets - currentDimensionOffset);
} else {
length = (int) (dimensionChunksOffset.get(columnIndex + 1) - currentDimensionOffset);
}
ByteBuffer buffer = null;
// read the data from carbon data file
synchronized (fileReader) {
buffer = fileReader.readByteBuffer(filePath, currentDimensionOffset, length);
}
// get the data chunk which will have all the details about the data pages
DataChunk3 dataChunk = CarbonUtil.readDataChunk3(buffer, 0, length);
return getDimensionRawColumnChunk(fileReader, columnIndex, 0, length, buffer,
dataChunk);
}
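/**
* Below method will be used to fill a raw column chunk from the already read buffer:
* it reads the min/max values, min/max presence flags, row counts and page offsets of
* every page from the DataChunk3 metadata and sets them on the raw chunk
*
* @param fileReader reader used later for decoding the pages
* @param columnIndex blocklet index of the column in carbon data file
* @param offset offset of this column's data inside the buffer
* @param length length of this column's data
* @param buffer buffer which holds the raw column data
* @param dataChunk data chunk metadata of the column
* @return dimension raw chunk filled with page level metadata
*/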
protected DimensionRawColumnChunk getDimensionRawColumnChunk(FileReader fileReader,
int columnIndex, long offset, int length, ByteBuffer buffer, DataChunk3 dataChunk) {
// creating a raw chunk instance and filling all the details
DimensionRawColumnChunk rawColumnChunk =
new DimensionRawColumnChunk(columnIndex, buffer, offset, length, this);
int numberOfPages = dataChunk.getPage_length().size();
byte[][] maxValueOfEachPage = new byte[numberOfPages][];
byte[][] minValueOfEachPage = new byte[numberOfPages][];
boolean[] minMaxFlag = new boolean[minValueOfEachPage.length];
Arrays.fill(minMaxFlag, true);
int[] eachPageLength = new int[numberOfPages];
for (int i = 0; i < minValueOfEachPage.length; i++) {
maxValueOfEachPage[i] =
dataChunk.getData_chunk_list().get(i).getMin_max().getMax_values().get(0).array();
minValueOfEachPage[i] =
dataChunk.getData_chunk_list().get(i).getMin_max().getMin_values().get(0).array();
eachPageLength[i] = dataChunk.getData_chunk_list().get(i).getNumberOfRowsInpage();
boolean isMinMaxFlagSet =
dataChunk.getData_chunk_list().get(i).getMin_max().isSetMin_max_presence();
if (isMinMaxFlagSet) {
minMaxFlag[i] =
dataChunk.getData_chunk_list().get(i).getMin_max().getMin_max_presence().get(0);
}
}
rawColumnChunk.setDataChunkV3(dataChunk);
rawColumnChunk.setFileReader(fileReader);
rawColumnChunk.setPagesCount(dataChunk.getPage_length().size());
rawColumnChunk.setMaxValues(maxValueOfEachPage);
rawColumnChunk.setMinValues(minValueOfEachPage);
rawColumnChunk.setMinMaxFlagArray(minMaxFlag);
rawColumnChunk.setRowCount(eachPageLength);
rawColumnChunk.setOffsets(ArrayUtils
.toPrimitive(dataChunk.page_offset.toArray(new Integer[dataChunk.page_offset.size()])));
return rawColumnChunk;
}
/**
* Below method will be used to read multiple dimension columns' data in one group read
* and divide the data into dimension raw chunk objects.
* Steps for reading:
* 1. Get the length of the data to be read
* 2. Allocate the direct buffer
* 3. Read the data from the file
* 4. Get the data chunk object from the data read for each column
* 5. Create the raw chunk object and fill the details for each column
* 6. Increment the offset of the data
*
* @param fileReader
* reader which will be used to read the dimension columns data from file
* @param startBlockletColumnIndex
* blocklet index of the first dimension column
* @param endBlockletColumnIndex
* blocklet index of the last dimension column
* @return DimensionRawColumnChunk array
*/
protected DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileReader fileReader,
int startBlockletColumnIndex, int endBlockletColumnIndex) throws IOException {
// to calculate the length of the data to be read,
// subtract the offset of the start column from the offset of
// the (end column + 1) to get the total length
long currentDimensionOffset = dimensionChunksOffset.get(startBlockletColumnIndex);
ByteBuffer buffer = null;
// read the data from carbon data file
synchronized (fileReader) {
buffer = fileReader.readByteBuffer(filePath, currentDimensionOffset,
(int) (dimensionChunksOffset.get(endBlockletColumnIndex + 1) - currentDimensionOffset));
}
// create raw chunk for each dimension column
DimensionRawColumnChunk[] dimensionDataChunks =
new DimensionRawColumnChunk[endBlockletColumnIndex - startBlockletColumnIndex + 1];
int index = 0;
int runningLength = 0;
for (int i = startBlockletColumnIndex; i <= endBlockletColumnIndex; i++) {
int currentLength = (int) (dimensionChunksOffset.get(i + 1) - dimensionChunksOffset.get(i));
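// read the data chunk metadata of the current column; runningLength is the offset of
// this column's data inside the group buffer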
DataChunk3 dataChunk =
CarbonUtil.readDataChunk3(buffer, runningLength, dimensionChunksLength.get(i));
dimensionDataChunks[index] =
getDimensionRawColumnChunk(fileReader, i, runningLength, currentLength, buffer,
dataChunk);
runningLength += currentLength;
index++;
}
return dimensionDataChunks;
}
/**
* Below method will be used to convert the compressed dimension chunk raw data to actual data
*
* @param rawColumnPage dimension raw chunk
* @param pageNumber page number to be decoded
* @param reusableDataBuffer reusable buffer for holding the uncompressed page data
* @return DimensionColumnPage
*/
@Override
public DimensionColumnPage decodeColumnPage(DimensionRawColumnChunk rawColumnPage,
int pageNumber, ReusableDataBuffer reusableDataBuffer) throws IOException {
return decodeColumnPage(rawColumnPage, pageNumber, null, reusableDataBuffer);
}
private DimensionColumnPage decodeColumnPage(
DimensionRawColumnChunk rawColumnPage, int pageNumber, ColumnVectorInfo vectorInfo,
ReusableDataBuffer reusableDataBuffer) throws IOException {
// data chunk of blocklet column
DataChunk3 dataChunk3 = rawColumnPage.getDataChunkV3();
// get the data buffer
ByteBuffer rawData = rawColumnPage.getRawData();
DataChunk2 pageMetadata = dataChunk3.getData_chunk_list().get(pageNumber);
String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
pageMetadata.getChunk_meta());
this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
// calculating the start point of the data
// as the buffer can contain data of multiple columns, the start point will be
// data chunk offset + data chunk length + page offset
int offset = (int) rawColumnPage.getOffSet() + dimensionChunksLength
.get(rawColumnPage.getColumnIndex()) + dataChunk3.getPage_offset().get(pageNumber);
// read the data and uncompress it
return decodeDimension(rawColumnPage, rawData, pageMetadata, offset, vectorInfo,
reusableDataBuffer);
}
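/**
* Below method will be used to decode a dimension column page and fill the decoded data
* directly into the given column vector, after which the intermediate page memory is freed
*
* @param dimensionRawColumnChunk dimension raw chunk
* @param pageNumber page number to be decoded
* @param vectorInfo vector to be filled with the decoded values
* @param reusableDataBuffer reusable buffer for holding the uncompressed page data
*/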
@Override
public void decodeColumnPageAndFillVector(DimensionRawColumnChunk dimensionRawColumnChunk,
int pageNumber, ColumnVectorInfo vectorInfo, ReusableDataBuffer reusableDataBuffer)
throws IOException {
DimensionColumnPage columnPage =
decodeColumnPage(dimensionRawColumnChunk, pageNumber, vectorInfo, reusableDataBuffer);
columnPage.freeMemory();
}
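/**
* Below method will be used to decode a page which is encoded with meta information:
* it creates a decoder based on the encodings present in the page metadata and either
* fills the given vector directly (returning null) or returns the decoded column page
*/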
private ColumnPage decodeDimensionByMeta(DataChunk2 pageMetadata, ByteBuffer pageData, int offset,
boolean isLocalDictEncodedPage, ColumnVectorInfo vectorInfo, BitSet nullBitSet,
ReusableDataBuffer reusableDataBuffer) throws IOException {
List<Encoding> encodings = pageMetadata.getEncoders();
List<ByteBuffer> encoderMetas = pageMetadata.getEncoder_meta();
String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
pageMetadata.getChunk_meta());
ColumnPageDecoder decoder = encodingFactory.createDecoder(encodings, encoderMetas,
compressorName, vectorInfo != null);
if (vectorInfo != null) {
decoder
.decodeAndFillVector(pageData.array(), offset, pageMetadata.data_page_length, vectorInfo,
nullBitSet, isLocalDictEncodedPage, pageMetadata.numberOfRowsInpage,
reusableDataBuffer);
if (vectorInfo.vector.getType().isComplexType() && !vectorInfo.vectorStack.isEmpty()) {
// For complex types, the top of the vector stack is always processed in decodeAndFillVector,
// so pop() the top vector as its processing is finished.
vectorInfo.vectorStack.pop();
}
return null;
} else {
return decoder
.decode(pageData.array(), offset, pageMetadata.data_page_length, isLocalDictEncodedPage);
}
}
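/**
* Below method will be used to decode a dimension page: pages encoded with meta are
* decoded through the encoding factory (uncompressing the inverted index first if the
* page is explicitly sorted), other pages go through the legacy decoding path
*/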
protected DimensionColumnPage decodeDimension(DimensionRawColumnChunk rawColumnPage,
ByteBuffer pageData, DataChunk2 pageMetadata, int offset, ColumnVectorInfo vectorInfo,
ReusableDataBuffer reusableDataBuffer) throws IOException {
List<Encoding> encodings = pageMetadata.getEncoders();
org.apache.carbondata.core.metadata.encoder.Encoding.validateEncodingTypes(encodings);
if (CarbonUtil.isEncodedWithMeta(encodings)) {
int[] invertedIndexes = new int[0];
int[] invertedIndexesReverse = new int[0];
// in case of no-dictionary columns with measure data types, if the column is included
// in the sort columns then the inverted index needs to be uncompressed
boolean isExplicitSorted =
CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX);
int dataOffset = offset;
if (isExplicitSorted) {
offset += pageMetadata.data_page_length;
invertedIndexes = CarbonUtil
.getUnCompressColumnIndex(pageMetadata.rowid_page_length, pageData, offset);
if (vectorInfo == null) {
// get the reverse index
invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
} else {
vectorInfo.invertedIndex = invertedIndexes;
}
}
BitSet nullBitSet = QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor);
ColumnPage decodedPage = decodeDimensionByMeta(pageMetadata, pageData, dataOffset,
null != rawColumnPage.getLocalDictionary(), vectorInfo, nullBitSet, reusableDataBuffer);
if (decodedPage != null) {
decodedPage.setNullBits(nullBitSet);
}
return new ColumnPageWrapper(decodedPage, rawColumnPage.getLocalDictionary(), invertedIndexes,
invertedIndexesReverse, isEncodedWithAdaptiveMeta(pageMetadata), isExplicitSorted);
} else {
// following code is for backward compatibility
return decodeDimensionLegacy(rawColumnPage, pageData, pageMetadata, offset, vectorInfo,
reusableDataBuffer);
}
}
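/**
* Below method checks whether the first encoding of the page is one of the adaptive
* encodings (ADAPTIVE_INTEGRAL, ADAPTIVE_DELTA_INTEGRAL, ADAPTIVE_FLOATING,
* ADAPTIVE_DELTA_FLOATING)
*/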
public boolean isEncodedWithAdaptiveMeta(DataChunk2 pageMetadata) {
List<Encoding> encodings = pageMetadata.getEncoders();
if (encodings != null && !encodings.isEmpty()) {
Encoding encoding = encodings.get(0);
switch (encoding) {
case ADAPTIVE_INTEGRAL:
case ADAPTIVE_DELTA_INTEGRAL:
case ADAPTIVE_FLOATING:
case ADAPTIVE_DELTA_FLOATING:
return true;
}
}
return false;
}
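/**
* Below method returns the size of each column value in bytes: 4 for dictionary
* encoded columns, otherwise -1
*/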
private int getColumnValueSize(List<Encoding> encodings) {
return encodings.contains(Encoding.DICTIONARY) ? 4 : -1;
}
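/**
* Below method will be used to decode older format pages for backward compatibility:
* it uncompresses the data page, then the inverted index and RLE blocks if present,
* and wraps the result in a fixed or variable length dimension column page
*/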
private DimensionColumnPage decodeDimensionLegacy(DimensionRawColumnChunk rawColumnPage,
ByteBuffer pageData, DataChunk2 pageMetadata, int offset, ColumnVectorInfo vectorInfo,
ReusableDataBuffer reusableDataBuffer) {
byte[] dataPage;
int[] rlePage;
int[] invertedIndexes = new int[0];
int[] invertedIndexesReverse = new int[0];
int uncompressedSize = 0;
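// when a reusable buffer is available and the compressor supports uncompressing into
// an existing array, reuse it to avoid allocating a new byte array for every page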
if (null != reusableDataBuffer && compressor.supportReusableBuffer()) {
uncompressedSize =
compressor.unCompressedLength(pageData.array(), offset, pageMetadata.data_page_length);
dataPage = reusableDataBuffer.getDataBuffer(uncompressedSize);
compressor.rawUncompress(pageData.array(), offset, pageMetadata.data_page_length, dataPage);
} else {
dataPage = compressor.unCompressByte(pageData.array(), offset, pageMetadata.data_page_length);
uncompressedSize = dataPage.length;
}
offset += pageMetadata.data_page_length;
// if row id block is present then read the row id chunk and uncompress it
if (CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX)) {
invertedIndexes = CarbonUtil
.getUnCompressColumnIndex(pageMetadata.rowid_page_length, pageData, offset);
offset += pageMetadata.rowid_page_length;
if (vectorInfo == null) {
// get the reverse index
invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
}
}
// if RLE is applied then read the RLE block chunk and uncompress
// the actual data based on the RLE block
if (CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.RLE)) {
rlePage =
CarbonUtil.getIntArray(pageData, offset, pageMetadata.rle_page_length);
// uncompress the data with rle indexes
dataPage = UnBlockIndexer.uncompressData(dataPage, rlePage,
null == rawColumnPage.getLocalDictionary() ?
getColumnValueSize(pageMetadata.encoders) :
CarbonCommonConstants.LOCAL_DICT_ENCODED_BYTEARRAY_SIZE, uncompressedSize);
uncompressedSize = dataPage.length;
}
DimensionColumnPage columnDataChunk = null;
// if it is a no-dictionary column then create a no-dictionary column chunk
// and set it to the data chunk instance
if (!CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.DICTIONARY)) {
DimensionChunkStoreFactory.DimensionStoreType dimStoreType =
null != rawColumnPage.getLocalDictionary() ?
DimensionChunkStoreFactory.DimensionStoreType.LOCAL_DICT :
(CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.DIRECT_COMPRESS_VARCHAR) ?
DimensionChunkStoreFactory.DimensionStoreType.VARIABLE_INT_LENGTH :
DimensionChunkStoreFactory.DimensionStoreType.VARIABLE_SHORT_LENGTH);
columnDataChunk =
new VariableLengthDimensionColumnPage(dataPage, invertedIndexes, invertedIndexesReverse,
pageMetadata.getNumberOfRowsInpage(), dimStoreType,
rawColumnPage.getLocalDictionary(), vectorInfo, uncompressedSize);
} else {
// to store fixed length column chunk values
columnDataChunk =
new FixedLengthDimensionColumnPage(dataPage, invertedIndexes, invertedIndexesReverse,
pageMetadata.getNumberOfRowsInpage(),
getColumnValueSize(pageMetadata.encoders), vectorInfo, uncompressedSize);
}
return columnDataChunk;
}
}