/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.core.reader;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.carbondata.core.cache.dictionary.ColumnDictionaryChunkIterator;
import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
import org.apache.carbondata.format.ColumnDictionaryChunk;

import org.apache.thrift.TBase;
/**
* This class performs the functionality of reading a carbon dictionary file.
* It provides overloaded read methods for reading the complete dictionary or a part of it.
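* <p>
* A minimal usage sketch (the {@code identifier} variable below is illustrative and
* must be built by the caller for the target column; it is not created here):
* <pre>{@code
* CarbonDictionaryReader reader = new CarbonDictionaryReaderImpl(identifier);
* try {
*   List<byte[]> dictionaryValues = reader.read();
* } finally {
*   reader.close();
* }
* }</pre>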
*/
public class CarbonDictionaryReaderImpl implements CarbonDictionaryReader {
/**
* dictionary column unique identifier
*/
protected DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier;
/**
* dictionary file path
*/
protected String columnDictionaryFilePath;
/**
* dictionary thrift file reader
*/
private ThriftReader dictionaryFileReader;
/**
* Constructor
*
* @param dictionaryColumnUniqueIdentifier column unique identifier
*/
public CarbonDictionaryReaderImpl(
DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier) {
this.dictionaryColumnUniqueIdentifier = dictionaryColumnUniqueIdentifier;
initFileLocation();
}
/**
* This method should be used when the complete dictionary data needs to be read.
* Applicable scenarios:
* 1. Global dictionary generation in case of incremental load
* 2. Reading the dictionary file when a column is queried for the first time
* 3. Loading a dictionary column into memory based on query requirement.
* This is the case where the carbon column cache feature is enabled and a
* column dictionary is read if it is present in the query.
*
* @return list of byte arrays. Each byte array is a unique dictionary value
* @throws IOException if an I/O error occurs
*/
@Override
public List<byte[]> read() throws IOException {
return read(0L);
}
/**
* This method should be used when data has to be read from a given offset.
* Applicable scenarios:
* 1. Incremental data load. If the column dictionary is already loaded in memory
* and an incremental load is done, then for a new query only the newly added
* dictionary data has to be read into memory.
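* For example (illustrative; {@code previousEndOffset} stands for the offset up to which
* dictionary data was already read and is supplied by the caller):
* <pre>{@code
* List<byte[]> newValues = reader.read(previousEndOffset);
* }</pre>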
*
* @param startOffset start offset of dictionary file
* @return list of byte arrays. Each byte array is a unique dictionary value
* @throws IOException if an I/O error occurs
*/
@Override
public List<byte[]> read(long startOffset) throws IOException {
List<CarbonDictionaryColumnMetaChunk> carbonDictionaryColumnMetaChunks =
readDictionaryMetadataFile();
// get the last entry for carbon dictionary meta chunk
CarbonDictionaryColumnMetaChunk carbonDictionaryColumnMetaChunk =
carbonDictionaryColumnMetaChunks.get(carbonDictionaryColumnMetaChunks.size() - 1);
// end offset up to which the dictionary file has to be read
long endOffset = carbonDictionaryColumnMetaChunk.getEnd_offset();
List<ColumnDictionaryChunk> columnDictionaryChunks =
read(carbonDictionaryColumnMetaChunks, startOffset, endOffset);
return getDictionaryList(columnDictionaryChunks);
}
/**
* This method will be used to read data between given start and end offset.
* Applicable scenarios:
* 1. Truncate operation. If there is any inconsistency while writing the dictionary file,
* then the start and end offsets up to which the data has to be retained can be given.
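* For example (illustrative; the offsets are taken from the dictionary metadata file and
* must match the recorded chunk boundaries exactly):
* <pre>{@code
* Iterator<byte[]> values = reader.read(chunkStartOffset, chunkEndOffset);
* while (values.hasNext()) {
*   byte[] value = values.next();
* }
* }</pre>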
*
* @param startOffset start offset of dictionary file
* @param endOffset end offset of dictionary file
* @return iterator over byte arrays. Each byte array is a unique dictionary value
* @throws IOException if an I/O error occurs
*/
@Override
public Iterator<byte[]> read(long startOffset, long endOffset) throws IOException {
List<CarbonDictionaryColumnMetaChunk> carbonDictionaryColumnMetaChunks =
readDictionaryMetadataFile();
List<ColumnDictionaryChunk> columnDictionaryChunks =
read(carbonDictionaryColumnMetaChunks, startOffset, endOffset);
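// wrap the thrift chunks in an iterator that returns each dictionary value as a byte array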
return (Iterator<byte[]>) new ColumnDictionaryChunkIterator(columnDictionaryChunks);
}
/**
* Closes this stream and releases any system resources associated
* with it. If the stream is already closed then invoking this
* method has no effect.
*
* @throws IOException if an I/O error occurs
*/
@Override
public void close() throws IOException {
if (null != dictionaryFileReader) {
dictionaryFileReader.close();
dictionaryFileReader = null;
}
}
/**
* @param carbonDictionaryColumnMetaChunks dictionary meta chunk list
* @param startOffset start offset for dictionary data file
* @param endOffset end offset up to which data has
* to be read from the dictionary data file
* @return list of column dictionary chunks read between the given offsets
* @throws IOException if an I/O error occurs while reading the dictionary file
*/
private List<ColumnDictionaryChunk> read(
List<CarbonDictionaryColumnMetaChunk> carbonDictionaryColumnMetaChunks, long startOffset,
long endOffset) throws IOException {
// calculate the number of chunks to be read from dictionary file from start offset
int dictionaryChunkCountsToBeRead =
calculateTotalDictionaryChunkCountsToBeRead(carbonDictionaryColumnMetaChunks, startOffset,
endOffset);
// open dictionary file thrift reader
openThriftReader();
// read the required number of chunks from dictionary file
return readDictionaryFile(startOffset, dictionaryChunkCountsToBeRead);
}
/**
* This method will put all the dictionary chunks into one list and return that list
*
* @param columnDictionaryChunks list of dictionary chunks read from the dictionary file
* @return list of dictionary values, each value as a byte array
*/
private List<byte[]> getDictionaryList(List<ColumnDictionaryChunk> columnDictionaryChunks) {
int dictionaryListSize = 0;
for (ColumnDictionaryChunk dictionaryChunk : columnDictionaryChunks) {
dictionaryListSize = dictionaryListSize + dictionaryChunk.getValues().size();
}
// convert byte buffer list to byte array list of dictionary values
List<byte[]> dictionaryValues = new ArrayList<byte[]>(dictionaryListSize);
for (ColumnDictionaryChunk dictionaryChunk : columnDictionaryChunks) {
convertAndFillByteBufferListToByteArrayList(dictionaryValues, dictionaryChunk.getValues());
}
return dictionaryValues;
}
/**
* This method will convert each byte buffer in the given list to a byte array
* and add it to the list of dictionary values
*
* @param dictionaryValues list of byte arrays. Each byte array is a
* unique dictionary value
* @param dictionaryValueBufferList dictionary thrift object which is a list of byte buffers.
* Each dictionary value is wrapped in a byte buffer before
* being written to file
*/
private void convertAndFillByteBufferListToByteArrayList(List<byte[]> dictionaryValues,
List<ByteBuffer> dictionaryValueBufferList) {
for (ByteBuffer buffer : dictionaryValueBufferList) {
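// the buffer is expected to be at position 0, so limit() gives the length of the value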
int length = buffer.limit();
byte[] value = new byte[length];
buffer.get(value, 0, value.length);
dictionaryValues.add(value);
}
}
/**
* This method will form the path of the dictionary file for the given column
*/
protected void initFileLocation() {
this.columnDictionaryFilePath = dictionaryColumnUniqueIdentifier.getDictionaryFilePath();
}
/**
* This method will read the dictionary file and return the list of dictionary thrift objects
*
* @param dictionaryStartOffset start offset for dictionary file
* @param dictionaryChunkCountToBeRead number of dictionary chunks to be read
* @return list of dictionary chunks
* @throws IOException if an I/O error occurs while setting the read offset or reading a chunk
*/
private List<ColumnDictionaryChunk> readDictionaryFile(long dictionaryStartOffset,
int dictionaryChunkCountToBeRead) throws IOException {
List<ColumnDictionaryChunk> dictionaryChunks =
new ArrayList<ColumnDictionaryChunk>(dictionaryChunkCountToBeRead);
// skip the number of bytes if a start offset is given
dictionaryFileReader.setReadOffset(dictionaryStartOffset);
// read till dictionary chunk count
while (dictionaryFileReader.hasNext()
&& dictionaryChunks.size() != dictionaryChunkCountToBeRead) {
dictionaryChunks.add((ColumnDictionaryChunk) dictionaryFileReader.read());
}
return dictionaryChunks;
}
/**
* This method will read the dictionary metadata file for a given column
* and calculate the number of chunks to be read from the dictionary file.
* It does a strict validation of the start and end offsets: because the data is
* written in thrift format, the thrift objects cannot be retrieved properly
* if the offsets do not match exactly.
*
* @param dictionaryChunkMetaList list of dictionary chunk metadata
* @param dictionaryChunkStartOffset start offset for a dictionary chunk
* @param dictionaryChunkEndOffset end offset for a dictionary chunk
* @return number of dictionary chunks to be read between the given offsets
*/
private int calculateTotalDictionaryChunkCountsToBeRead(
List<CarbonDictionaryColumnMetaChunk> dictionaryChunkMetaList,
long dictionaryChunkStartOffset, long dictionaryChunkEndOffset) {
boolean chunkWithStartOffsetFound = false;
int dictionaryChunkCount = 0;
for (CarbonDictionaryColumnMetaChunk metaChunk : dictionaryChunkMetaList) {
// find the column meta chunk whose start offset value matches
// with the given dictionary start offset
if (!chunkWithStartOffsetFound && dictionaryChunkStartOffset == metaChunk.getStart_offset()) {
chunkWithStartOffsetFound = true;
}
// once the start offset is found, keep adding the chunk counts to be read
if (chunkWithStartOffsetFound) {
dictionaryChunkCount = dictionaryChunkCount + metaChunk.getChunk_count();
}
// when end offset is reached then break the loop
if (dictionaryChunkEndOffset == metaChunk.getEnd_offset()) {
break;
}
}
return dictionaryChunkCount;
}
/**
* This method will read dictionary metadata file and return the dictionary meta chunks
*
* @return list of dictionary metadata chunks
* @throws IOException if an I/O error occurs while reading or closing the metadata file
*/
private List<CarbonDictionaryColumnMetaChunk> readDictionaryMetadataFile() throws IOException {
CarbonDictionaryMetadataReader columnMetadataReaderImpl = getDictionaryMetadataReader();
List<CarbonDictionaryColumnMetaChunk> dictionaryMetaChunkList = null;
// read metadata file
try {
dictionaryMetaChunkList = columnMetadataReaderImpl.read();
} finally {
// close the metadata reader
columnMetadataReaderImpl.close();
}
return dictionaryMetaChunkList;
}
/**
* @return dictionary metadata reader for this column
*/
protected CarbonDictionaryMetadataReader getDictionaryMetadataReader() {
return new CarbonDictionaryMetadataReaderImpl(this.dictionaryColumnUniqueIdentifier);
}
/**
* This method will open the dictionary file stream for reading
*
* @throws IOException if an I/O error occurs while opening the thrift reader
*/
private void openThriftReader() throws IOException {
if (null == dictionaryFileReader) {
// initialise dictionary file reader which will return dictionary thrift object
// the dictionary thrift object contains a list of byte buffers
dictionaryFileReader =
new ThriftReader(this.columnDictionaryFilePath, new ThriftReader.TBaseCreator() {
@Override
public TBase create() {
return new ColumnDictionaryChunk();
}
});
// Open dictionary file reader
dictionaryFileReader.open();
}
}
}