/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.datastore.chunk.reader.measure.v3;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.List;
import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.ReusableDataBuffer;
import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReader;
import org.apache.carbondata.core.datastore.compression.CompressorFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.scan.executor.util.QueryUtil;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.DataChunk3;
import org.apache.carbondata.format.Encoding;
import org.apache.commons.lang.ArrayUtils;

/**
* Measure column chunk reader for the V3 format, used to read and uncompress
* measure column data stored in the following layout:
* <FileHeader>
* <Column1 Data ChunkV3><Column1<Page1><Page2><Page3><Page4>>
* <Column2 Data ChunkV3><Column2<Page1><Page2><Page3><Page4>>
* <Column3 Data ChunkV3><Column3<Page1><Page2><Page3><Page4>>
* <Column4 Data ChunkV3><Column4<Page1><Page2><Page3><Page4>>
* <File Footer>
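*
* A minimal usage sketch; the blockletInfo, filePath, fileReader and
* reusableDataBuffer names below are illustrative placeholders assumed to be
* provided by the surrounding query execution code:
* <pre>{@code
* MeasureChunkReaderV3 reader = new MeasureChunkReaderV3(blockletInfo, filePath);
* // read the raw (still compressed) chunk of measure column 0
* MeasureRawColumnChunk rawChunk = reader.readRawMeasureChunk(fileReader, 0);
* // decode page 0 of that chunk into an uncompressed column page
* ColumnPage page = reader.decodeColumnPage(rawChunk, 0, reusableDataBuffer);
* }</pre>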
*/
public class MeasureChunkReaderV3 extends AbstractMeasureChunkReader {

/**
* end offset of the last measure column chunk in the carbon data file
*/
private long measureOffsets;

public MeasureChunkReaderV3(BlockletInfo blockletInfo, String filePath) {
super(blockletInfo, filePath);
measureOffsets = blockletInfo.getMeasureOffsets();
}

/**
* Below method will be used to read the measure column data from the carbon data file
* 1. Get the length of the data to be read
* 2. Allocate the direct buffer
* 3. Read the data from the file
* 4. Get the data chunk object from the data read
* 5. Create the raw chunk object and fill the details
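*
* Worked example (hypothetical offsets, for illustration only): if
* measureColumnChunkOffsets is [1000, 2500, 4000] and measureOffsets is 5200,
* reading column 1 fetches bytes 2500..4000 (length 1500), while the last
* column (index 2) fetches bytes 4000..5200 (length 1200), with measureOffsets
* supplying the end position of the final chunk.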
*
* @param fileReader reader for reading the column from carbon data file
* @param columnIndex column to be read
* @return measure raw chunk
*/
@Override
public MeasureRawColumnChunk readRawMeasureChunk(FileReader fileReader,
int columnIndex) throws IOException {
int dataLength = 0;
// to calculate the length of the data to be read:
// for any column other than the last one, subtract the offset of the current
// column from the offset of the next column to get the total length.
// for the last column there is no next offset, so use measureOffsets, which is
// the end position of the last measure chunk, and subtract the current column
// offset from it
if (measureColumnChunkOffsets.size() - 1 == columnIndex) {
dataLength = (int) (measureOffsets - measureColumnChunkOffsets.get(columnIndex));
} else {
dataLength =
(int) (measureColumnChunkOffsets.get(columnIndex + 1) - measureColumnChunkOffsets
.get(columnIndex));
}
ByteBuffer buffer = null;
// read the data from carbon data file
synchronized (fileReader) {
buffer = fileReader
.readByteBuffer(filePath, measureColumnChunkOffsets.get(columnIndex), dataLength);
}
// get the data chunk which will have all the details about the data pages
DataChunk3 dataChunk =
CarbonUtil.readDataChunk3(buffer, 0, measureColumnChunkLength.get(columnIndex));
return getMeasureRawColumnChunk(fileReader, columnIndex, 0, dataLength, buffer,
dataChunk);
}
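
/**
* Creates the raw column chunk instance for one measure column and fills the
* page level metadata (min/max values, rows per page and page offsets) taken
* from the given DataChunk3.
*/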
MeasureRawColumnChunk getMeasureRawColumnChunk(FileReader fileReader, int columnIndex,
long offset, int dataLength, ByteBuffer buffer, DataChunk3 dataChunk) {
// creating a raw chunks instance and filling all the details
MeasureRawColumnChunk rawColumnChunk =
new MeasureRawColumnChunk(columnIndex, buffer, offset, dataLength, this);
int numberOfPages = dataChunk.getPage_length().size();
byte[][] maxValueOfEachPage = new byte[numberOfPages][];
byte[][] minValueOfEachPage = new byte[numberOfPages][];
int[] eachPageLength = new int[numberOfPages];
for (int i = 0; i < numberOfPages; i++) {
maxValueOfEachPage[i] =
dataChunk.getData_chunk_list().get(i).getMin_max().getMax_values().get(0).array();
minValueOfEachPage[i] =
dataChunk.getData_chunk_list().get(i).getMin_max().getMin_values().get(0).array();
eachPageLength[i] = dataChunk.getData_chunk_list().get(i).getNumberOfRowsInpage();
}
rawColumnChunk.setDataChunkV3(dataChunk);
rawColumnChunk.setFileReader(fileReader);
rawColumnChunk.setPagesCount(numberOfPages);
rawColumnChunk.setMaxValues(maxValueOfEachPage);
rawColumnChunk.setMinValues(minValueOfEachPage);
rawColumnChunk.setRowCount(eachPageLength);
rawColumnChunk.setOffsets(ArrayUtils
.toPrimitive(dataChunk.page_offset.toArray(new Integer[dataChunk.page_offset.size()])));
return rawColumnChunk;
}

/**
* Below method will be used to read the data of multiple measure columns in one
* IO call and divide it into measure raw chunk objects
* Steps for reading
* 1. Get the length of the data to be read
* 2. Allocate the direct buffer
* 3. Read the data from the file
* 4. Get the data chunk object from the data read, for each column
* 5. Create the raw chunk object and fill the details for each column
* 6. Increment the running offset within the buffer
*
* @param fileReader
* reader which will be used to read the measure columns data from file
* @param startColumnIndex
* column index of the first measure column
* @param endColumnIndex
* column index of the last measure column
* @return MeasureRawColumnChunk array
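*
* Worked example (hypothetical offsets, for illustration only): with
* measureColumnChunkOffsets = [1000, 2500, 4000, 5200], startColumnIndex = 0 and
* endColumnIndex = 2, a single read fetches bytes 1000..5200 in one IO call; the
* running length then splits the buffer into chunks of 1500, 1500 and 1200 bytes.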
*/
protected MeasureRawColumnChunk[] readRawMeasureChunksInGroup(FileReader fileReader,
int startColumnIndex, int endColumnIndex) throws IOException {
// to calculate the length of the data to be read:
// subtract the offset of the start column from the offset of the column
// after the end column to get the total length
long currentMeasureOffset = measureColumnChunkOffsets.get(startColumnIndex);
ByteBuffer buffer = null;
// read the data from carbon data file
synchronized (fileReader) {
buffer = fileReader.readByteBuffer(filePath, currentMeasureOffset,
(int) (measureColumnChunkOffsets.get(endColumnIndex + 1) - currentMeasureOffset));
}
// create raw chunk for each measure column
MeasureRawColumnChunk[] measureDataChunk =
new MeasureRawColumnChunk[endColumnIndex - startColumnIndex + 1];
int runningLength = 0;
int index = 0;
for (int i = startColumnIndex; i <= endColumnIndex; i++) {
int currentLength =
(int) (measureColumnChunkOffsets.get(i + 1) - measureColumnChunkOffsets.get(i));
DataChunk3 dataChunk =
CarbonUtil.readDataChunk3(buffer, runningLength, measureColumnChunkLength.get(i));
MeasureRawColumnChunk measureRawColumnChunk =
getMeasureRawColumnChunk(fileReader, i, runningLength, currentLength, buffer, dataChunk);
measureDataChunk[index] = measureRawColumnChunk;
runningLength += currentLength;
index++;
}
return measureDataChunk;
}

/**
* Below method will be used to convert the compressed measure chunk raw data to actual data
*
* @param rawColumnChunk measure raw chunk
* @param pageNumber number of the page to be decoded
* @return ColumnPage decoded page
*/
@Override
public ColumnPage decodeColumnPage(MeasureRawColumnChunk rawColumnChunk, int pageNumber,
ReusableDataBuffer reusableDataBuffer)
throws IOException {
return decodeColumnPage(rawColumnChunk, pageNumber, null, reusableDataBuffer);
}
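
/**
* Decodes the given page and fills the decoded values directly into the column
* vector carried by vectorInfo instead of materialising a separate ColumnPage.
*/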
@Override
public void decodeColumnPageAndFillVector(MeasureRawColumnChunk measureRawColumnChunk,
int pageNumber, ColumnVectorInfo vectorInfo, ReusableDataBuffer reusableDataBuffer)
throws IOException {
decodeColumnPage(measureRawColumnChunk, pageNumber, vectorInfo, reusableDataBuffer);
}

private ColumnPage decodeColumnPage(MeasureRawColumnChunk rawColumnChunk, int pageNumber,
ColumnVectorInfo vectorInfo, ReusableDataBuffer reusableDataBuffer)
throws IOException {
// data chunk of blocklet column
DataChunk3 dataChunk3 = rawColumnChunk.getDataChunkV3();
// data chunk of page
DataChunk2 pageMetadata = dataChunk3.getData_chunk_list().get(pageNumber);
String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
pageMetadata.getChunk_meta());
this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
// calculating the start point of data
// as buffer can contain multiple column data, start point will be data chunk offset +
// data chunk length + page offset
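// e.g. (hypothetical numbers) chunk offset 0, chunk metadata length 85 and
// page offset 1200 put the start of this page's data at byte 1285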
int offset = (int) rawColumnChunk.getOffSet() +
measureColumnChunkLength.get(rawColumnChunk.getColumnIndex()) +
dataChunk3.getPage_offset().get(pageNumber);
BitSet nullBitSet = QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor);
ColumnPage decodedPage =
decodeMeasure(pageMetadata, rawColumnChunk.getRawData(), offset, vectorInfo, nullBitSet,
reusableDataBuffer);
if (decodedPage == null) {
return null;
}
decodedPage.setNullBits(nullBitSet);
return decodedPage;
}

/**
* Decode measure column page with page header and raw data starting from offset.
* When vectorInfo is not null the decoded values are filled directly into the
* vector and null is returned; otherwise the decoded ColumnPage is returned.
*/
protected ColumnPage decodeMeasure(DataChunk2 pageMetadata, ByteBuffer pageData, int offset,
ColumnVectorInfo vectorInfo, BitSet nullBitSet, ReusableDataBuffer reusableDataBuffer)
throws IOException {
List<Encoding> encodings = pageMetadata.getEncoders();
org.apache.carbondata.core.metadata.encoder.Encoding.validateEncodingTypes(encodings);
List<ByteBuffer> encoderMetas = pageMetadata.getEncoder_meta();
String compressorName =
CarbonMetadataUtil.getCompressorNameFromChunkMeta(pageMetadata.getChunk_meta());
ColumnPageDecoder codec =
encodingFactory.createDecoder(encodings, encoderMetas, compressorName, vectorInfo != null);
if (vectorInfo != null) {
codec.decodeAndFillVector(pageData.array(), offset, pageMetadata.data_page_length, vectorInfo,
nullBitSet, false, pageMetadata.numberOfRowsInpage, reusableDataBuffer);
return null;
} else {
return codec
.decode(pageData.array(), offset, pageMetadata.data_page_length);
}
}
}