/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.cache.dictionary;
import java.io.IOException;
import java.util.List;
import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.CacheType;
import org.apache.carbondata.core.cache.CarbonLRUCache;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.reader.CarbonDictionaryColumnMetaChunk;
import org.apache.carbondata.core.reader.CarbonDictionaryMetadataReader;
import org.apache.carbondata.core.service.CarbonCommonFactory;
import org.apache.carbondata.core.service.DictionaryService;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.ObjectSizeCalculator;
/**
* Abstract class which implements methods common to reverse and forward dictionary cache
*/
public abstract class AbstractDictionaryCache<K extends DictionaryColumnUniqueIdentifier,
V extends Dictionary>
implements Cache<DictionaryColumnUniqueIdentifier, Dictionary> {
/**
* thread pool size to be used for dictionary data reading
*/
protected int threadPoolSize;
/**
* LRU cache variable
*/
protected CarbonLRUCache carbonLRUCache;
/**
* @param carbonLRUCache
*/
public AbstractDictionaryCache(CarbonLRUCache carbonLRUCache) {
this.carbonLRUCache = carbonLRUCache;
initThreadPoolSize();
}
@Override
public void put(DictionaryColumnUniqueIdentifier key, Dictionary value) {
throw new UnsupportedOperationException("Operation not supported");
}
/**
* This method will initialize the thread pool size, which bounds the maximum number of
* threads used for dictionary data reading in a job
*/
private void initThreadPoolSize() {
threadPoolSize = CarbonProperties.getInstance().getNumberOfLoadingCores();
}
/**
* This method will read the dictionary metadata file and return its last dictionary meta chunk
*
* @param dictionaryColumnUniqueIdentifier
* @return last dictionary metadata chunk
* @throws IOException read and close method throws IO exception
*/
protected CarbonDictionaryColumnMetaChunk readLastChunkFromDictionaryMetadataFile(
DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier) throws IOException {
DictionaryService dictService = CarbonCommonFactory.getDictionaryService();
CarbonDictionaryMetadataReader columnMetadataReaderImpl = dictService
.getDictionaryMetadataReader(dictionaryColumnUniqueIdentifier);
CarbonDictionaryColumnMetaChunk carbonDictionaryColumnMetaChunk = null;
// read metadata file
try {
carbonDictionaryColumnMetaChunk =
columnMetadataReaderImpl.readLastEntryOfDictionaryMetaChunk();
} finally {
// close the metadata reader
columnMetadataReaderImpl.close();
}
return carbonDictionaryColumnMetaChunk;
}
/**
* This method will return the number of records in the dictionary column meta chunk read at
* the given offset, for a dictionary already read and stored in the LRU cache
*
* @param dictionaryColumnUniqueIdentifier
* @param offsetRead offset in the dictionary metadata file from which the chunk has to be read
* @return number of records covered by the meta chunk (its max surrogate key)
* @throws IOException read and close method throws IO exception
*/
protected long getNumRecordsInCarbonDictionaryColumnMetaChunk(
DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier, long offsetRead)
throws IOException {
DictionaryService dictService = CarbonCommonFactory.getDictionaryService();
CarbonDictionaryMetadataReader columnMetadataReaderImpl = dictService
.getDictionaryMetadataReader(dictionaryColumnUniqueIdentifier);
CarbonDictionaryColumnMetaChunk carbonDictionaryColumnMetaChunk = null;
// read metadata file
try {
carbonDictionaryColumnMetaChunk =
columnMetadataReaderImpl.readEntryOfDictionaryMetaChunk(offsetRead);
} finally {
// close the metadata reader
columnMetadataReaderImpl.close();
}
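// the chunk's max surrogate key is used as the record count, since surrogate keys are
// assigned sequentially (this equivalence is assumed from the method name and contract)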
return carbonDictionaryColumnMetaChunk.getMax_surrogate_key();
}
/**
* This method will validate the dictionary metadata file for any modification
*
* @param carbonFile
* @param fileTimeStamp last recorded modification time of the file
* @param endOffset last recorded length of the file
* @return true if the file timestamp or length differs from the given values
*/
private boolean isDictionaryMetaFileModified(CarbonFile carbonFile, long fileTimeStamp,
long endOffset) {
return carbonFile.isFileModified(fileTimeStamp, endOffset);
}
/**
* This method will return the carbon file object based on its type (local, HDFS)
*
* @param dictionaryColumnUniqueIdentifier
* @return carbon file object for the dictionary metadata file
* @throws IOException if the dictionary file does not exist
*/
private CarbonFile getDictionaryMetaCarbonFile(
DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier) throws IOException {
String dictionaryFilePath = dictionaryColumnUniqueIdentifier.getDictionaryFilePath();
CarbonFile dictFile = FileFactory.getCarbonFile(dictionaryFilePath);
// When a rename table operation runs in parallel with a select query, the dictionary files may not exist
if (!dictFile.exists()) {
throw new IOException("Dictionary file does not exist: " + dictionaryFilePath);
}
return dictFile;
}
protected long getSortIndexSize(long numOfRecords) {
// the sort index holds a sort index array and a reverse sort index array, each entry a
// 4 byte integer; 32 bytes accounts for the array headers of the two integer arrays
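// illustrative arithmetic (assuming ObjectSizeCalculator.estimate(0, 16) resolves to roughly
// 16 bytes per entry): 1,000,000 records estimate to about 1,000,000 * 16 * 2 + 32 bytes,
// i.e. roughly 32 MB of heap for the two arrays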
return numOfRecords * ObjectSizeCalculator.estimate(0, 16) * 2 + 32;
}
/**
* This method will get the value for the given key. If value does not exist
* for the given key, it will check and load the value.
*
* @param dictionaryColumnUniqueIdentifier unique identifier which contains dbName,
* tableName and columnIdentifier
* @param dictionaryInfo
* @param lruCacheKey
* @param loadSortIndex whether the sort index has to be loaded along with the dictionary data
* @throws IOException in case the dictionary files cannot be read or memory is not
* sufficient to load the dictionary into memory
*/
protected void checkAndLoadDictionaryData(
DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier,
DictionaryInfo dictionaryInfo, String lruCacheKey, boolean loadSortIndex)
throws IOException {
// read the last segment dictionary meta chunk entry to get the end offset of the file
CarbonFile carbonFile = getDictionaryMetaCarbonFile(dictionaryColumnUniqueIdentifier);
boolean dictionaryMetaFileModified =
isDictionaryMetaFileModified(carbonFile, dictionaryInfo.getFileTimeStamp(),
dictionaryInfo.getDictionaryMetaFileLength());
// read the last entry from the dictionary meta file only if the dictionary metadata file
// has been modified
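// the first check happens outside the synchronized block so the common case
// (metadata file unchanged) avoids taking the lock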
if (dictionaryMetaFileModified) {
synchronized (dictionaryInfo) {
carbonFile = getDictionaryMetaCarbonFile(dictionaryColumnUniqueIdentifier);
dictionaryMetaFileModified =
isDictionaryMetaFileModified(carbonFile, dictionaryInfo.getFileTimeStamp(),
dictionaryInfo.getDictionaryMetaFileLength());
// double check: re-validate inside the synchronized block so that only one thread reads
// the last entry and loads the incremental dictionary data when the metadata file has
// actually been modified
if (dictionaryMetaFileModified) {
CarbonDictionaryColumnMetaChunk carbonDictionaryColumnMetaChunk =
readLastChunkFromDictionaryMetadataFile(dictionaryColumnUniqueIdentifier);
long requiredSize = getEstimatedDictionarySize(dictionaryInfo,
carbonDictionaryColumnMetaChunk,
dictionaryColumnUniqueIdentifier, loadSortIndex);
if (requiredSize > 0) {
dictionaryInfo.setMemorySize(requiredSize);
boolean colCanBeAddedToLRUCache =
carbonLRUCache.tryPut(lruCacheKey, requiredSize);
// load the dictionary data only if the column can be added to the lru cache
if (colCanBeAddedToLRUCache) {
// load dictionary data
loadDictionaryData(dictionaryInfo, dictionaryColumnUniqueIdentifier,
dictionaryInfo.getOffsetTillFileIsRead(),
carbonDictionaryColumnMetaChunk.getEnd_offset(),
loadSortIndex);
// set the end offset till where file is read
dictionaryInfo
.setOffsetTillFileIsRead(carbonDictionaryColumnMetaChunk.getEnd_offset());
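// re-estimate the size from the fully loaded object so the lru cache accounts for the
// actual heap footprint instead of the pre-load estimate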
long updateRequiredSize = ObjectSizeCalculator.estimate(dictionaryInfo, requiredSize);
dictionaryInfo.setMemorySize(updateRequiredSize);
if (!carbonLRUCache.put(lruCacheKey, dictionaryInfo, updateRequiredSize)) {
throw new DictionaryBuilderException(
"Cannot load dictionary into memory. Not enough memory available");
}
dictionaryInfo.setFileTimeStamp(carbonFile.getLastModifiedTime());
dictionaryInfo.setDictionaryMetaFileLength(carbonFile.getSize());
} else {
throw new DictionaryBuilderException(
"Cannot load dictionary into memory. Not enough memory available");
}
}
}
}
}
// increment the column access count
incrementDictionaryAccessCount(dictionaryInfo);
}
/**
* This method will prepare the lru cache key and return the same
*
* @param columnIdentifier
* @param cacheType forward or reverse dictionary cache type
* @return lru cache key formed from the column identifier and the cache name
*/
protected String getLruCacheKey(String columnIdentifier, CacheType cacheType) {
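// illustrative (assumed) example: a column identifier "col1" combined with the forward
// dictionary cache type would yield a key like "col1_forward_dictionary"; the actual
// suffix comes from CacheType.getCacheName()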
return columnIdentifier + CarbonCommonConstants.UNDERSCORE + cacheType.getCacheName();
}
/**
* This method will check and load the dictionary file in memory for a given column
*
* @param dictionaryInfo holds dictionary information and data
* @param dictionaryColumnUniqueIdentifier unique identifier which contains dbName,
* tableName and columnIdentifier
* @param dictionaryChunkStartOffset start offset from where dictionary file has to
* be read
* @param dictionaryChunkEndOffset end offset till where dictionary file has to
* be read
* @param loadSortIndex whether the sort index has to be loaded along with the dictionary data
* @throws IOException if the dictionary data or sort index cannot be read
*/
private void loadDictionaryData(DictionaryInfo dictionaryInfo,
DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier,
long dictionaryChunkStartOffset, long dictionaryChunkEndOffset, boolean loadSortIndex)
throws IOException {
DictionaryCacheLoader dictionaryCacheLoader =
new DictionaryCacheLoaderImpl(dictionaryColumnUniqueIdentifier);
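// the loader reads the dictionary chunks between the given start and end offsets (and the
// sort index when requested) and populates them into dictionaryInfo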
dictionaryCacheLoader
.load(dictionaryInfo, dictionaryChunkStartOffset, dictionaryChunkEndOffset, loadSortIndex);
}
/**
* This method will increment the access count for a given dictionary column
*
* @param dictionaryInfo
*/
protected void incrementDictionaryAccessCount(DictionaryInfo dictionaryInfo) {
dictionaryInfo.incrementAccessCount();
}
/**
* This method will clear the given dictionaries and update their access count, which is
* required for their removal from the column LRU cache
*
* @param dictionaryList
*/
protected void clearDictionary(List<Dictionary> dictionaryList) {
for (Dictionary dictionary : dictionaryList) {
dictionary.clear();
}
}
/**
* This method will calculate the probable size of the dictionary in the java heap.
* The value is used to check whether the dictionary can be added to the lru cache, which
* avoids unnecessarily loading dictionary files whose estimated size would not fit into
* the lru cache. Due to jvm optimizations the estimate can be smaller or larger than the
* actual size.
*
* @param dictionaryInfo
* @param carbonDictionaryColumnMetaChunk
* @param dictionaryColumnUniqueIdentifier
* @param readSortIndexSize whether the sort index size has to be included in the estimate
* @return estimated size of the dictionary in bytes
* @throws IOException
*/
protected long getEstimatedDictionarySize(DictionaryInfo dictionaryInfo,
CarbonDictionaryColumnMetaChunk carbonDictionaryColumnMetaChunk,
DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier, boolean
readSortIndexSize) throws IOException {
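// the base implementation returns 0 (no estimate); the concrete forward and reverse
// dictionary caches are expected to override this with a real calculation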
return 0;
}
}