/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.core.indexstore;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.CarbonLRUCache;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
import org.apache.carbondata.core.index.dev.Index;
import org.apache.carbondata.core.indexstore.blockletindex.BlockIndex;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexFactory;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexModel;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.util.BlockletIndexUtil;
import org.apache.carbondata.core.util.CarbonUtil;

import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;

/**
 * Class to handle loading, unloading, clearing and storing of the table
 * block indexes
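 *
 * <p>A minimal usage sketch; {@code lruCache} and {@code identifierWrapper} are
 * illustrative placeholders assumed to be created elsewhere:
 * <pre>{@code
 * BlockletIndexStore store = new BlockletIndexStore(lruCache);
 * // served from the LRU cache when present, loaded from the index file(s) on a miss
 * BlockletIndexWrapper wrapper = store.get(identifierWrapper);
 * }</pre>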
*/
public class BlockletIndexStore
implements Cache<TableBlockIndexUniqueIdentifierWrapper, BlockletIndexWrapper> {
private static final Logger LOGGER =
      LogServiceFactory.getLogService(BlockletIndexStore.class.getName());

/**
* CarbonLRU cache
*/
  protected CarbonLRUCache lruCache;

/**
   * map of unique segment identifier to lock object. An entry is added while an
   * index is being loaded, so that only a segment-level lock is applied and the
   * indexes of other segments can be loaded concurrently
*/
  private Map<String, Object> segmentLockMap;

/**
   * constructor to initialize the BlockletIndexStore
   *
   * @param lruCache the CarbonLRUCache instance in which loaded indexes are kept
*/
public BlockletIndexStore(CarbonLRUCache lruCache) {
this.lruCache = lruCache;
segmentLockMap = new ConcurrentHashMap<String, Object>();
  }

@Override
public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifierWrapper) {
return get(identifierWrapper, null);
}
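
  /**
   * Returns the blocklet index wrapper for the given identifier, either from the
   * LRU cache or by loading it from the index file(s) on a cache miss.
   *
   * @param segInfoCache optional map from segment file path to the carbondata file
   *                     to BlockMetaInfo mapping; shared across calls (for example
   *                     from getAll) so that the same segment's file metadata is
   *                     read only once, may be null
   */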
public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifierWrapper,
Map<String, Map<String, BlockMetaInfo>> segInfoCache) {
TableBlockIndexUniqueIdentifier identifier =
identifierWrapper.getTableBlockIndexUniqueIdentifier();
String lruCacheKey = identifier.getUniqueTableSegmentIdentifier();
BlockletIndexWrapper blockletIndexWrapper =
(BlockletIndexWrapper) lruCache.get(lruCacheKey);
List<BlockIndex> indexes = new ArrayList<>();
if (blockletIndexWrapper == null) {
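      // cache miss: build the index wrapper from the index file(s) below and,
      // if enabled, add it to the LRU cache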
try {
SegmentIndexFileStore indexFileStore =
new SegmentIndexFileStore(identifierWrapper.getConfiguration());
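        // tracks the index files whose content has already been read, so that each
        // physical index file is read only once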
Set<String> filesRead = new HashSet<>();
String segmentFilePath = identifier.getIndexFilePath();
if (segInfoCache == null) {
segInfoCache = new HashMap<>();
}
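        // reuse the carbondata file to BlockMetaInfo mapping if it was already
        // built for this segment path, otherwise build and cache it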
Map<String, BlockMetaInfo> carbonDataFileBlockMetaInfoMapping =
segInfoCache.get(segmentFilePath);
if (carbonDataFileBlockMetaInfoMapping == null) {
carbonDataFileBlockMetaInfoMapping =
BlockletIndexUtil.createCarbonDataFileBlockMetaInfoMapping(segmentFilePath,
identifierWrapper.getConfiguration());
segInfoCache.put(segmentFilePath, carbonDataFileBlockMetaInfoMapping);
}
// if the identifier is not a merge file we can directly load the indexes
if (identifier.getMergeIndexFileName() == null) {
List<DataFileFooter> indexInfos = new ArrayList<>();
Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletIndexUtil
.getBlockMetaInfoMap(identifierWrapper, indexFileStore, filesRead,
carbonDataFileBlockMetaInfoMapping, indexInfos);
BlockIndex blockIndex =
loadAndGetIndex(identifier, indexFileStore, blockMetaInfoMap,
identifierWrapper.getCarbonTable(),
identifierWrapper.isAddToUnsafe(),
identifierWrapper.getConfiguration(),
identifierWrapper.isSerializeDmStore(),
indexInfos);
indexes.add(blockIndex);
blockletIndexWrapper =
new BlockletIndexWrapper(identifier.getSegmentId(), indexes);
} else {
// if the identifier is a merge file then collect the index files and load the indexes
List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
BlockletIndexUtil.getIndexFileIdentifiersFromMergeFile(identifier, indexFileStore);
for (TableBlockIndexUniqueIdentifier blockIndexUniqueIdentifier :
tableBlockIndexUniqueIdentifiers) {
List<DataFileFooter> indexInfos = new ArrayList<>();
Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletIndexUtil.getBlockMetaInfoMap(
new TableBlockIndexUniqueIdentifierWrapper(blockIndexUniqueIdentifier,
identifierWrapper.getCarbonTable()), indexFileStore, filesRead,
carbonDataFileBlockMetaInfoMapping, indexInfos);
if (!blockMetaInfoMap.isEmpty()) {
BlockIndex blockIndex =
loadAndGetIndex(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap,
identifierWrapper.getCarbonTable(),
identifierWrapper.isAddToUnsafe(),
identifierWrapper.getConfiguration(),
identifierWrapper.isSerializeDmStore(),
indexInfos);
indexes.add(blockIndex);
}
}
blockletIndexWrapper =
new BlockletIndexWrapper(identifier.getSegmentId(), indexes);
}
if (identifierWrapper.isAddTableBlockToUnsafeAndLRUCache()) {
          long expirationTime =
              CarbonUtil.getExpiration_time(identifierWrapper.getCarbonTable());
          lruCache.put(identifier.getUniqueTableSegmentIdentifier(), blockletIndexWrapper,
              blockletIndexWrapper.getMemorySize(), expirationTime);
}
} catch (Throwable e) {
// clear all the memory used by indexes loaded
for (Index index : indexes) {
index.clear();
}
LOGGER.error("memory exception when loading index: " + e.getMessage(), e);
throw new RuntimeException(e);
}
}
return blockletIndexWrapper;
}
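
  /**
   * Returns the blocklet index wrappers for all the given identifiers. Wrappers
   * already present in the LRU cache are returned directly; the missed identifiers
   * are loaded with a shared segInfoCache so that each segment's file metadata is
   * read only once.
   */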
@Override
public List<BlockletIndexWrapper> getAll(
List<TableBlockIndexUniqueIdentifierWrapper> tableSegmentUniqueIdentifiers)
throws IOException {
Map<String, Map<String, BlockMetaInfo>> segInfoCache =
new HashMap<String, Map<String, BlockMetaInfo>>();
List<BlockletIndexWrapper> blockletIndexWrappers =
new ArrayList<>(tableSegmentUniqueIdentifiers.size());
List<TableBlockIndexUniqueIdentifierWrapper> missedIdentifiersWrapper = new ArrayList<>();
BlockletIndexWrapper blockletIndexWrapper = null;
// Get the indexes for each index file from cache.
try {
for (TableBlockIndexUniqueIdentifierWrapper
identifierWrapper : tableSegmentUniqueIdentifiers) {
BlockletIndexWrapper indexWrapper = getIfPresent(identifierWrapper);
if (indexWrapper != null) {
blockletIndexWrappers.add(indexWrapper);
} else {
missedIdentifiersWrapper.add(identifierWrapper);
}
}
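      // load the identifiers that were not found in the cache, sharing segInfoCache
      // across them to avoid re-reading the same segment's file metadata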
if (missedIdentifiersWrapper.size() > 0) {
for (TableBlockIndexUniqueIdentifierWrapper identifierWrapper : missedIdentifiersWrapper) {
blockletIndexWrapper = get(identifierWrapper, segInfoCache);
blockletIndexWrappers.add(blockletIndexWrapper);
}
}
} catch (Throwable e) {
if (null != blockletIndexWrapper) {
List<BlockIndex> indexes = blockletIndexWrapper.getIndexes();
for (Index index : indexes) {
index.clear();
}
}
throw new IOException("Problem in loading segment blocks: " + e.getMessage(), e);
}
return blockletIndexWrappers;
  }

  /**
   * returns the BlockletIndexWrapper from the LRU cache if it is present,
   * without triggering a load
   *
   * @param tableSegmentUniqueIdentifierWrapper identifier of the index file
   * @return the cached BlockletIndexWrapper, or null if it is not in the cache
   */
@Override
public BlockletIndexWrapper getIfPresent(
TableBlockIndexUniqueIdentifierWrapper tableSegmentUniqueIdentifierWrapper) {
return (BlockletIndexWrapper) lruCache.get(
tableSegmentUniqueIdentifierWrapper.getTableBlockIndexUniqueIdentifier()
.getUniqueTableSegmentIdentifier());
  }

  /**
   * method to invalidate the segment cache entry for a segment
   *
   * @param tableSegmentUniqueIdentifierWrapper identifier of the index file whose
   *        cache entry is to be removed
   */
@Override
public void invalidate(
TableBlockIndexUniqueIdentifierWrapper tableSegmentUniqueIdentifierWrapper) {
BlockletIndexWrapper blockletIndexWrapper =
getIfPresent(tableSegmentUniqueIdentifierWrapper);
if (null != blockletIndexWrapper) {
// clear the segmentProperties cache
List<BlockIndex> indexes = blockletIndexWrapper.getIndexes();
if (null != indexes && !indexes.isEmpty()) {
String segmentId =
tableSegmentUniqueIdentifierWrapper.getTableBlockIndexUniqueIdentifier().getSegmentId();
        // as segmentId will be the same for all the indexes and the segmentProperties
        // cache is maintained at segment level, it needs to be invalidated only once
SegmentPropertiesAndSchemaHolder.getInstance()
.invalidate(segmentId, indexes.get(0).getSegmentPropertiesWrapper(),
tableSegmentUniqueIdentifierWrapper.isAddTableBlockToUnsafeAndLRUCache());
}
}
lruCache.remove(tableSegmentUniqueIdentifierWrapper.getTableBlockIndexUniqueIdentifier()
.getUniqueTableSegmentIdentifier());
  }

@Override
public void put(TableBlockIndexUniqueIdentifierWrapper tableBlockIndexUniqueIdentifierWrapper,
BlockletIndexWrapper wrapper) throws IOException {
    // As the index uses unsafe memory, overwriting an existing entry is not
    // recommended, because the unsafe memory of the old entry would then need to be
    // cleared as well. If an index entry in the cache must be overwritten, call the
    // invalidate interface first and then the put interface
if (null == getIfPresent(tableBlockIndexUniqueIdentifierWrapper)) {
List<BlockIndex> indexes = wrapper.getIndexes();
try {
for (BlockIndex blockIndex : indexes) {
blockIndex.convertToUnsafeDMStore();
}
// get cacheExpirationTime for table from tableProperties
long expirationTime =
CarbonUtil.getExpiration_time(tableBlockIndexUniqueIdentifierWrapper.getCarbonTable());
        // Locking is not required here because the LRU cache map's add method is
        // synchronized, so only one entry is added at a time and an existing key is
        // never overwritten
lruCache.put(tableBlockIndexUniqueIdentifierWrapper.getTableBlockIndexUniqueIdentifier()
.getUniqueTableSegmentIdentifier(), wrapper, wrapper.getMemorySize(), expirationTime);
} catch (Throwable e) {
// clear all the memory acquired by index in case of any failure
for (Index blockletIndex : indexes) {
blockletIndex.clear();
}
throw new IOException("Problem in adding index to cache.", e);
}
}
  }

  /**
   * Below method will be used to load the index for the given index file and
   * return it
   *
   * @return the loaded BlockIndex for the index file
   * @throws IOException
   */
private BlockIndex loadAndGetIndex(TableBlockIndexUniqueIdentifier identifier,
SegmentIndexFileStore indexFileStore, Map<String, BlockMetaInfo> blockMetaInfoMap,
CarbonTable carbonTable, boolean addTableBlockToUnsafe, Configuration configuration,
boolean serializeDmStore, List<DataFileFooter> indexInfos) throws IOException {
String uniqueTableSegmentIdentifier =
identifier.getUniqueTableSegmentIdentifier();
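    // fetch the segment-level lock object, creating it in a synchronized method
    // when it is not present yet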
Object lock = segmentLockMap.get(uniqueTableSegmentIdentifier);
if (lock == null) {
lock = addAndGetSegmentLock(uniqueTableSegmentIdentifier);
}
BlockIndex blockIndex;
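    // build and initialize the index under the segment-level lock so that the same
    // segment is not loaded twice concurrently, while other segments load in parallel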
synchronized (lock) {
blockIndex = (BlockIndex) BlockletIndexFactory.createIndex(carbonTable);
final BlockletIndexModel blockletIndexModel = new BlockletIndexModel(carbonTable,
identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
.getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()),
blockMetaInfoMap, identifier.getSegmentId(), addTableBlockToUnsafe, configuration,
serializeDmStore);
blockletIndexModel.setIndexInfos(indexInfos);
blockIndex.init(blockletIndexModel);
}
return blockIndex;
  }

/**
* Below method will be used to get the segment level lock object
*
* @param uniqueIdentifier
* @return lock object
*/
private synchronized Object addAndGetSegmentLock(String uniqueIdentifier) {
// get the segment lock object if it is present then return
// otherwise add the new lock and return
Object segmentLockObject = segmentLockMap.get(uniqueIdentifier);
if (null == segmentLockObject) {
segmentLockObject = new Object();
segmentLockMap.put(uniqueIdentifier, segmentLockObject);
}
return segmentLockObject;
  }

/**
* The method clears the access count of table segments
*
* @param tableSegmentUniqueIdentifiersWrapper
*/
@Override
public void clearAccessCount(
List<TableBlockIndexUniqueIdentifierWrapper> tableSegmentUniqueIdentifiersWrapper) {
for (TableBlockIndexUniqueIdentifierWrapper
identifierWrapper : tableSegmentUniqueIdentifiersWrapper) {
      BlockIndex cacheable = (BlockIndex) lruCache.get(
          identifierWrapper.getTableBlockIndexUniqueIdentifier().getUniqueTableSegmentIdentifier());
      // guard against entries that have already been evicted from the LRU cache
      if (null != cacheable) {
        cacheable.clear();
      }
}
}
}