blob: f26f654801c3dbd15cf34fc26abf811bf0d9b039 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.indexstore.blockletindex;
import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.CacheProvider;
import org.apache.carbondata.core.cache.CacheType;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.features.TableOperation;
import org.apache.carbondata.core.index.IndexFilter;
import org.apache.carbondata.core.index.IndexInputSplit;
import org.apache.carbondata.core.index.IndexMeta;
import org.apache.carbondata.core.index.Segment;
import org.apache.carbondata.core.index.dev.CacheableIndex;
import org.apache.carbondata.core.index.dev.Index;
import org.apache.carbondata.core.index.dev.IndexBuilder;
import org.apache.carbondata.core.index.dev.IndexWriter;
import org.apache.carbondata.core.index.dev.cgindex.CoarseGrainIndex;
import org.apache.carbondata.core.index.dev.cgindex.CoarseGrainIndexFactory;
import org.apache.carbondata.core.index.dev.expr.IndexExprWrapper;
import org.apache.carbondata.core.index.dev.expr.IndexInputSplitWrapper;
import org.apache.carbondata.core.indexstore.Blocklet;
import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
import org.apache.carbondata.core.indexstore.BlockletIndexWrapper;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.SegmentBlockIndexInfo;
import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifierWrapper;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.IndexSchema;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.scan.filter.FilterUtil;
import org.apache.carbondata.core.scan.filter.executer.FilterExecutor;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.segmentmeta.SegmentColumnMetaDataInfo;
import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo;
import org.apache.carbondata.core.util.BlockletIndexUtil;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.events.Event;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.fs.Path;
/**
* Index for blocklet
*/
public class BlockletIndexFactory extends CoarseGrainIndexFactory
implements BlockletDetailsFetcher, SegmentPropertiesFetcher, CacheableIndex {
private static final String NAME = "clustered.btree.blocklet";
/**
* variable for cache level BLOCKLET
*/
public static final String CACHE_LEVEL_BLOCKLET = "BLOCKLET";
public static final IndexSchema INDEX_SCHEMA =
new IndexSchema(NAME, BlockletIndexFactory.class.getName());
private AbsoluteTableIdentifier identifier;
// segmentId -> list of index file
private Map<String, SegmentBlockIndexInfo> segmentMap = new ConcurrentHashMap<>();
private Cache<TableBlockIndexUniqueIdentifierWrapper, BlockletIndexWrapper> cache;
public BlockletIndexFactory(CarbonTable carbonTable, IndexSchema indexSchema) {
super(carbonTable, indexSchema);
this.identifier = carbonTable.getAbsoluteTableIdentifier();
cache = CacheProvider.getInstance()
.createCache(CacheType.DRIVER_BLOCKLET_INDEX);
}
/**
* create index based on cache level
*
* @param carbonTable
* @return
*/
public static Index createIndex(CarbonTable carbonTable) {
boolean cacheLevelBlock = BlockletIndexUtil.isCacheLevelBlock(carbonTable);
if (cacheLevelBlock) {
// case1: when CACHE_LEVEL = BLOCK
return new BlockIndex();
} else {
// case2: when CACHE_LEVEL = BLOCKLET
return new BlockletIndex();
}
}
@Override
public IndexWriter createWriter(Segment segment, String shardName,
SegmentProperties segmentProperties) {
throw new UnsupportedOperationException("not implemented");
}
@Override
public IndexBuilder createBuilder(Segment segment, String shardName,
SegmentProperties segmentProperties) {
throw new UnsupportedOperationException("not implemented");
}
/**
* Get the indexes for all segments
*/
public Map<Segment, List<CoarseGrainIndex>> getIndexes(List<Segment> segments,
IndexFilter filter) throws IOException {
return getIndexes(segments, new HashSet<>(), filter);
}
/**
* Get the index for all segments
*/
public Map<Segment, List<CoarseGrainIndex>> getIndexes(List<Segment> segments,
Set<Path> partitionLocations, IndexFilter filter) throws IOException {
List<TableBlockIndexUniqueIdentifierWrapper> tableBlockIndexUniqueIdentifierWrappers =
new ArrayList<>();
Map<Segment, List<CoarseGrainIndex>> indexMap = new HashMap<>();
Map<String, Segment> segmentMap = new HashMap<>();
for (Segment segment : segments) {
segmentMap.put(segment.getSegmentNo(), segment);
Set<TableBlockIndexUniqueIdentifier> identifiers =
getTableBlockIndexUniqueIdentifiers(segment);
if (!partitionLocations.isEmpty()) {
// get tableBlockIndexUniqueIdentifierWrappers from segment file info
getTableBlockUniqueIdentifierWrappers(partitionLocations,
tableBlockIndexUniqueIdentifierWrappers, identifiers);
} else {
SegmentMetaDataInfo segmentMetaDataInfo = segment.getSegmentMetaDataInfo();
boolean isLoadAllIndex = Boolean.parseBoolean(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE,
CarbonCommonConstants.CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE_DEFAULT));
if (!isLoadAllIndex && null != segmentMetaDataInfo && null != filter && !filter.isEmpty()
&& null != filter.getExpression() && null == FilterUtil
.getImplicitFilterExpression(filter.getExpression())) {
getTableBlockIndexUniqueIdentifierUsingSegmentMinMax(segment, segmentMetaDataInfo, filter,
identifiers, tableBlockIndexUniqueIdentifierWrappers);
} else {
for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : identifiers) {
tableBlockIndexUniqueIdentifierWrappers.add(
new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
this.getCarbonTable()));
}
}
}
}
List<BlockletIndexWrapper> blockletIndexWrappers =
cache.getAll(tableBlockIndexUniqueIdentifierWrappers);
for (BlockletIndexWrapper wrapper : blockletIndexWrappers) {
Segment segment = segmentMap.get(wrapper.getSegmentId());
List<CoarseGrainIndex> indexes = indexMap.get(segment);
if (null == indexes) {
indexes = new ArrayList<CoarseGrainIndex>();
}
indexes.addAll(wrapper.getIndexes());
indexMap.put(segment, indexes);
}
return indexMap;
}
/**
* get tableBlockUniqueIdentifierWrappers from segment info. If partitionLocations is defined,
* then get tableBlockUniqueIdentifierWrappers for the matched partitions.
*/
private void getTableBlockUniqueIdentifierWrappers(Set<Path> partitionLocations,
List<TableBlockIndexUniqueIdentifierWrapper> tableBlockIndexUniqueIdentifierWrappers,
Set<TableBlockIndexUniqueIdentifier> identifiers) {
for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : identifiers) {
if (!partitionLocations.isEmpty()) {
// add only tableBlockUniqueIdentifier that matches the partition
// get the indexFile Parent path and compare with the PartitionPath, if matches, then add
// the corresponding tableBlockIndexUniqueIdentifier for pruning
if (partitionLocations
.contains(new Path(tableBlockIndexUniqueIdentifier.getIndexFilePath()))) {
tableBlockIndexUniqueIdentifierWrappers.add(
new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
this.getCarbonTable()));
}
} else {
tableBlockIndexUniqueIdentifierWrappers.add(
new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
this.getCarbonTable()));
}
}
}
/**
* Using blockLevel minmax values, identify if segment has to be added for further pruning and to
* load segment index info to cache
* @param segment to be identified if needed for loading block indexes
* @param segmentMetaDataInfo list of block level min max values
* @param filter filter expression
* @param identifiers tableBlockIndexUniqueIdentifiers
* @param tableBlockIndexUniqueIdentifierWrappers to add tableBlockIndexUniqueIdentifiers
*/
private void getTableBlockIndexUniqueIdentifierUsingSegmentMinMax(Segment segment,
SegmentMetaDataInfo segmentMetaDataInfo, IndexFilter filter,
Set<TableBlockIndexUniqueIdentifier> identifiers,
List<TableBlockIndexUniqueIdentifierWrapper> tableBlockIndexUniqueIdentifierWrappers) {
boolean isScanRequired = false;
Map<String, SegmentColumnMetaDataInfo> segmentColumnMetaDataInfoMap =
segmentMetaDataInfo.getSegmentColumnMetaDataInfoMap();
int length = segmentColumnMetaDataInfoMap.size();
// Add columnSchemas based on the columns present in segment
List<ColumnSchema> columnSchemas = new ArrayList<>();
byte[][] min = new byte[length][];
byte[][] max = new byte[length][];
boolean[] minMaxFlag = new boolean[length];
int i = 0;
// get current columnSchema list for the table
Map<String, ColumnSchema> tableColumnSchemas =
this.getCarbonTable().getTableInfo().getFactTable().getListOfColumns().stream()
.collect(Collectors.toMap(ColumnSchema::getColumnUniqueId, ColumnSchema::clone));
// fill min,max and columnSchema values
for (Map.Entry<String, SegmentColumnMetaDataInfo> columnMetaData :
segmentColumnMetaDataInfoMap.entrySet()) {
ColumnSchema columnSchema = tableColumnSchemas.get(columnMetaData.getKey());
if (null != columnSchema) {
// get segment sort column and column drift info
boolean isSortColumnInSegment = columnMetaData.getValue().isSortColumn();
boolean isColumnDriftInSegment = columnMetaData.getValue().isColumnDrift();
if (null != columnSchema.getColumnProperties()) {
// get current sort column and column drift info from current columnSchema
String isSortColumn =
columnSchema.getColumnProperties().get(CarbonCommonConstants.SORT_COLUMNS);
String isColumnDrift =
columnSchema.getColumnProperties().get(CarbonCommonConstants.COLUMN_DRIFT);
if (null != isSortColumn) {
if (isSortColumn.equalsIgnoreCase("true") && !isSortColumnInSegment) {
// Unset current column schema column properties
modifyColumnSchemaForSortColumn(columnSchema, isColumnDriftInSegment, isColumnDrift,
false);
} else if (isSortColumn.equalsIgnoreCase("false") && isSortColumnInSegment) {
// set sort column to true in current column schema column properties
modifyColumnSchemaForSortColumn(columnSchema, isColumnDriftInSegment, isColumnDrift,
true);
}
} else {
modifyColumnSchemaForSortColumn(columnSchema, isColumnDriftInSegment, isColumnDrift,
false);
}
}
columnSchemas.add(columnSchema);
min[i] = columnMetaData.getValue().getColumnMinValue();
max[i] = columnMetaData.getValue().getColumnMaxValue();
minMaxFlag[i] = min[i].length != 0 && max[i].length != 0;
i++;
}
}
// get segmentProperties using created columnSchemas list
SegmentProperties segmentProperties = SegmentPropertiesAndSchemaHolder.getInstance()
.addSegmentProperties(this.getCarbonTable(), columnSchemas, segment.getSegmentNo())
.getSegmentProperties();
FilterResolverIntf resolver =
new IndexFilter(segmentProperties, this.getCarbonTable(), filter.getExpression())
.getResolver();
// prepare filter executor using IndexFilter resolver
FilterExecutor filterExecutor =
FilterUtil.getFilterExecutorTree(resolver, segmentProperties, null, null, false);
// check if block has to be pruned based on segment minmax
BitSet scanRequired = filterExecutor.isScanRequired(max, min, minMaxFlag);
if (!scanRequired.isEmpty()) {
isScanRequired = true;
}
if (isScanRequired) {
for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : identifiers) {
tableBlockIndexUniqueIdentifierWrappers.add(
new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
this.getCarbonTable()));
}
}
}
private void modifyColumnSchemaForSortColumn(ColumnSchema columnSchema, boolean columnDrift,
String isColumnDrift, boolean isSortColumnInSegment) {
if (!isSortColumnInSegment) {
if (null != isColumnDrift && isColumnDrift.equalsIgnoreCase("true") && !columnDrift) {
columnSchema.setDimensionColumn(false);
}
columnSchema.setSortColumn(false);
columnSchema.getColumnProperties().clear();
} else {
// modify column schema, if current columnSchema is changed
columnSchema.setSortColumn(true);
if (!columnSchema.isDimensionColumn()) {
columnSchema.setDimensionColumn(true);
columnSchema.getColumnProperties().put(CarbonCommonConstants.COLUMN_DRIFT, "true");
}
columnSchema.getColumnProperties().put(CarbonCommonConstants.SORT_COLUMNS, "true");
}
}
@Override
public List<CoarseGrainIndex> getIndexes(Segment segment) throws IOException {
return getIndexes(segment, new HashSet<>());
}
@Override
public List<CoarseGrainIndex> getIndexes(Segment segment,
Set<Path> partitionLocations) throws IOException {
List<CoarseGrainIndex> indexes = new ArrayList<>();
Set<TableBlockIndexUniqueIdentifier> identifiers =
getTableBlockIndexUniqueIdentifiers(segment);
List<TableBlockIndexUniqueIdentifierWrapper> tableBlockIndexUniqueIdentifierWrappers =
new ArrayList<>(identifiers.size());
getTableBlockUniqueIdentifierWrappers(partitionLocations,
tableBlockIndexUniqueIdentifierWrappers, identifiers);
List<BlockletIndexWrapper> blockletIndexWrappers =
cache.getAll(tableBlockIndexUniqueIdentifierWrappers);
for (BlockletIndexWrapper wrapper : blockletIndexWrappers) {
indexes.addAll(wrapper.getIndexes());
}
return indexes;
}
public Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment)
throws IOException {
SegmentBlockIndexInfo segmentBlockIndexInfo = segmentMap.get(segment.getSegmentNo());
Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = null;
if (null != segmentBlockIndexInfo &&
CollectionUtils.isNotEmpty(
segmentBlockIndexInfo.getTableBlockIndexUniqueIdentifiers())) {
if (null != segmentBlockIndexInfo.getSegmentMetaDataInfo()) {
segment.setSegmentMetaDataInfo(
segmentMap.get(segment.getSegmentNo()).getSegmentMetaDataInfo());
}
return segmentBlockIndexInfo.getTableBlockIndexUniqueIdentifiers();
} else {
tableBlockIndexUniqueIdentifiers =
BlockletIndexUtil.getTableBlockUniqueIdentifiers(segment);
if (tableBlockIndexUniqueIdentifiers.size() > 0) {
segmentMap.put(segment.getSegmentNo(),
new SegmentBlockIndexInfo(tableBlockIndexUniqueIdentifiers,
segment.getSegmentMetaDataInfo()));
}
}
return tableBlockIndexUniqueIdentifiers;
}
/**
* Get the blocklet detail information based on blockletId, blockId and segmentId. This method is
* exclusively for BlockletIndexFactory as detail information is only available in this
* default index.
*/
@Override
public List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, Segment segment)
throws IOException {
List<ExtendedBlocklet> detailedBlocklets = new ArrayList<>(blocklets.size() + 1);
// if the blocklets is empty, return the empty detailed blocklets list directly.
if (blocklets.size() == 0) {
return detailedBlocklets;
}
// If it is already detailed blocklet then type cast and return same
if (blocklets.size() > 0 && blocklets.get(0) instanceof ExtendedBlocklet) {
for (Blocklet blocklet : blocklets) {
detailedBlocklets.add((ExtendedBlocklet) blocklet);
}
return detailedBlocklets;
}
Set<TableBlockIndexUniqueIdentifier> identifiers =
getTableBlockIndexUniqueIdentifiers(segment);
Set<TableBlockIndexUniqueIdentifierWrapper> tableBlockIndexUniqueIdentifierWrappers =
new HashSet<>(identifiers.size());
for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : identifiers) {
tableBlockIndexUniqueIdentifierWrappers.add(
new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
this.getCarbonTable()));
}
// Retrieve each blocklets detail information from blocklet index
for (Blocklet blocklet : blocklets) {
detailedBlocklets.add(getExtendedBlocklet(tableBlockIndexUniqueIdentifierWrappers, blocklet));
}
return detailedBlocklets;
}
@Override
public ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, Segment segment)
throws IOException {
if (blocklet instanceof ExtendedBlocklet) {
return (ExtendedBlocklet) blocklet;
}
Set<TableBlockIndexUniqueIdentifier> identifiers =
getTableBlockIndexUniqueIdentifiers(segment);
Set<TableBlockIndexUniqueIdentifierWrapper> tableBlockIndexUniqueIdentifierWrappers =
new HashSet<>(identifiers.size());
for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : identifiers) {
tableBlockIndexUniqueIdentifierWrappers.add(
new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
this.getCarbonTable()));
}
return getExtendedBlocklet(tableBlockIndexUniqueIdentifierWrappers, blocklet);
}
private ExtendedBlocklet getExtendedBlocklet(
Set<TableBlockIndexUniqueIdentifierWrapper> identifiersWrapper, Blocklet blocklet)
throws IOException {
for (TableBlockIndexUniqueIdentifierWrapper identifierWrapper : identifiersWrapper) {
BlockletIndexWrapper wrapper = cache.get(identifierWrapper);
List<BlockIndex> indexes = wrapper.getIndexes();
for (Index index : indexes) {
if (((BlockIndex) index)
.getTableTaskInfo(BlockletIndexRowIndexes.SUMMARY_INDEX_FILE_NAME)
.startsWith(blocklet.getFilePath())) {
return ((BlockIndex) index).getDetailedBlocklet(blocklet.getBlockletId());
}
}
}
throw new IOException("Blocklet not found: " + blocklet.toString());
}
@Override
public List<IndexInputSplit> toDistributable(Segment segment) {
List<IndexInputSplit> distributableList = new ArrayList<>();
try {
BlockletIndexInputSplit distributable = new BlockletIndexInputSplit();
distributable.setSegment(segment);
distributable.setIndexSchema(INDEX_SCHEMA);
distributable.setSegmentPath(CarbonTablePath.getSegmentPath(identifier.getTablePath(),
segment.getSegmentNo()));
distributableList.add(new IndexInputSplitWrapper(UUID.randomUUID().toString(),
distributable).getDistributable());
} catch (Exception e) {
throw new RuntimeException(e);
}
return distributableList;
}
@Override
public void fireEvent(Event event) {
}
@Override
public void clear(String segment) {
SegmentBlockIndexInfo segmentBlockIndexInfo = segmentMap.remove(segment);
Set<TableBlockIndexUniqueIdentifier> blockIndexes = null;
if (null != segmentBlockIndexInfo) {
blockIndexes = segmentBlockIndexInfo.getTableBlockIndexUniqueIdentifiers();
}
if (blockIndexes != null) {
for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) {
TableBlockIndexUniqueIdentifierWrapper blockIndexWrapper =
new TableBlockIndexUniqueIdentifierWrapper(blockIndex, this.getCarbonTable());
BlockletIndexWrapper wrapper = cache.getIfPresent(blockIndexWrapper);
if (null != wrapper) {
List<BlockIndex> indexes = wrapper.getIndexes();
for (Index index : indexes) {
if (index != null) {
cache.invalidate(blockIndexWrapper);
index.clear();
}
}
}
}
}
}
@Override
public synchronized void clear() {
if (segmentMap.size() > 0) {
for (String segmentId : segmentMap.keySet().toArray(new String[segmentMap.size()])) {
clear(segmentId);
}
}
}
@Override
public String getCacheSize() {
long sum = 0L;
int numOfIndexFiles = 0;
for (Map.Entry<String, SegmentBlockIndexInfo> entry : segmentMap.entrySet()) {
for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : entry.getValue()
.getTableBlockIndexUniqueIdentifiers()) {
BlockletIndexWrapper blockletIndexWrapper = cache.getIfPresent(
new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
getCarbonTable()));
if (blockletIndexWrapper != null) {
sum += blockletIndexWrapper.getMemorySize();
numOfIndexFiles++;
}
}
}
return numOfIndexFiles + ":" + sum;
}
@Override
public List<CoarseGrainIndex> getIndexes(IndexInputSplit distributable)
throws IOException {
BlockletIndexInputSplit mapDistributable = (BlockletIndexInputSplit) distributable;
List<TableBlockIndexUniqueIdentifierWrapper> identifiersWrapper;
String segmentNo = mapDistributable.getSegment().getSegmentNo();
if (mapDistributable.getSegmentPath() != null) {
identifiersWrapper = getTableBlockIndexUniqueIdentifier(distributable);
} else {
identifiersWrapper =
getTableBlockIndexUniqueIdentifier(mapDistributable.getFilePath(), segmentNo);
}
List<CoarseGrainIndex> indexes = new ArrayList<>();
try {
List<BlockletIndexWrapper> wrappers = cache.getAll(identifiersWrapper);
for (BlockletIndexWrapper wrapper : wrappers) {
indexes.addAll(wrapper.getIndexes());
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return indexes;
}
private List<TableBlockIndexUniqueIdentifierWrapper> getTableBlockIndexUniqueIdentifier(
IndexInputSplit distributable) throws IOException {
List<TableBlockIndexUniqueIdentifierWrapper> identifiersWrapper = new ArrayList<>();
SegmentBlockIndexInfo segmentBlockIndexInfo =
segmentMap.get(distributable.getSegment().getSegmentNo());
Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = null;
if (null != segmentBlockIndexInfo) {
tableBlockIndexUniqueIdentifiers =
segmentBlockIndexInfo.getTableBlockIndexUniqueIdentifiers();
}
if (tableBlockIndexUniqueIdentifiers == null) {
tableBlockIndexUniqueIdentifiers = new HashSet<>();
Set<String> indexFiles = distributable.getSegment().getCommittedIndexFile().keySet();
for (String indexFile : indexFiles) {
CarbonFile carbonFile = FileFactory.getCarbonFile(indexFile);
String indexFileName;
String mergeIndexName;
if (indexFile.endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
indexFileName = carbonFile.getName();
mergeIndexName = null;
} else {
indexFileName = carbonFile.getName();
mergeIndexName = carbonFile.getName();
}
String parentPath = carbonFile.getParentFile().getAbsolutePath();
TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier =
new TableBlockIndexUniqueIdentifier(parentPath, indexFileName, mergeIndexName,
distributable.getSegment().getSegmentNo());
identifiersWrapper.add(
new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
this.getCarbonTable()));
tableBlockIndexUniqueIdentifiers.add(tableBlockIndexUniqueIdentifier);
}
segmentMap.put(distributable.getSegment().getSegmentNo(),
new SegmentBlockIndexInfo(tableBlockIndexUniqueIdentifiers,
distributable.getSegment().getSegmentMetaDataInfo()));
} else {
for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier :
tableBlockIndexUniqueIdentifiers) {
identifiersWrapper.add(
new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
getCarbonTable()));
}
}
return identifiersWrapper;
}
private List<TableBlockIndexUniqueIdentifierWrapper> getTableBlockIndexUniqueIdentifier(
String indexFilePath, String segmentId) throws IOException {
List<TableBlockIndexUniqueIdentifierWrapper> identifiersWrapper = new ArrayList<>();
String parent = indexFilePath.substring(0, indexFilePath.lastIndexOf("/"));
String name =
indexFilePath.substring(indexFilePath.lastIndexOf("/") + 1, indexFilePath.length());
if (indexFilePath.endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
identifiersWrapper.add(new TableBlockIndexUniqueIdentifierWrapper(
new TableBlockIndexUniqueIdentifier(parent, name, null, segmentId),
this.getCarbonTable()));
} else if (indexFilePath.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
List<String> indexFiles = fileStore.getIndexFilesFromMergeFile(indexFilePath);
for (String indexFile : indexFiles) {
identifiersWrapper.add(new TableBlockIndexUniqueIdentifierWrapper(
new TableBlockIndexUniqueIdentifier(parent, indexFile, name,
segmentId), this.getCarbonTable()));
}
}
return identifiersWrapper;
}
@Override
public IndexMeta getMeta() {
// TODO: pass SORT_COLUMNS into this class
return null;
}
@Override
public void deleteIndexData(Segment segment) {
}
@Override
public void deleteIndexData() {
}
@Override
public SegmentProperties getSegmentProperties(Segment segment) throws IOException {
return getSegmentProperties(segment, new HashSet<>());
}
@Override
public SegmentProperties getSegmentProperties(Segment segment, Set<Path> partitionLocations)
throws IOException {
List<CoarseGrainIndex> indexes = getIndexes(segment, partitionLocations);
assert (indexes.size() > 0);
CoarseGrainIndex coarseGrainIndex = indexes.get(0);
assert (coarseGrainIndex instanceof BlockIndex);
BlockIndex index = (BlockIndex) coarseGrainIndex;
return index.getSegmentProperties();
}
@Override
public SegmentProperties getSegmentPropertiesFromIndex(Index coarseGrainIndex) {
assert (coarseGrainIndex instanceof BlockIndex);
BlockIndex index = (BlockIndex) coarseGrainIndex;
return index.getSegmentProperties();
}
@Override
public List<Blocklet> getAllBlocklets(Segment segment, Set<Path> partitionLocations)
throws IOException {
List<Blocklet> blocklets = new ArrayList<>();
List<CoarseGrainIndex> indexes = getIndexes(segment, partitionLocations);
if (indexes.size() == 0) {
return blocklets;
}
SegmentProperties segmentProperties = getSegmentPropertiesFromIndex(indexes.get(0));
for (CoarseGrainIndex index : indexes) {
blocklets.addAll(index
.prune(null, segmentProperties, null, this.getCarbonTable()));
}
return blocklets;
}
@Override
public boolean willBecomeStale(TableOperation operation) {
return false;
}
@Override
public void cache(TableBlockIndexUniqueIdentifierWrapper tableBlockIndexUniqueIdentifierWrapper,
BlockletIndexWrapper blockletIndexWrapper) throws IOException {
cache.put(tableBlockIndexUniqueIdentifierWrapper, blockletIndexWrapper);
}
@Override
public List<IndexInputSplit> getAllUncached(
List<IndexInputSplit> distributableList) throws IOException {
List<IndexInputSplit> distributableToBeLoaded = new ArrayList<>(distributableList.size());
for (IndexInputSplit distributable : distributableList) {
Segment segment = distributable.getSegment();
Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
getTableBlockIndexUniqueIdentifiers(segment);
// filter out the tableBlockIndexUniqueIdentifiers based on distributable
TableBlockIndexUniqueIdentifier validIdentifier = BlockletIndexUtil
.filterIdentifiersBasedOnDistributable(tableBlockIndexUniqueIdentifiers,
(BlockletIndexInputSplit) distributable);
if (null == cache.getIfPresent(
new TableBlockIndexUniqueIdentifierWrapper(validIdentifier, this.getCarbonTable()))) {
((BlockletIndexInputSplit) distributable)
.setTableBlockIndexUniqueIdentifier(validIdentifier);
distributableToBeLoaded.add(distributable);
}
}
return distributableToBeLoaded;
}
private Set<TableBlockIndexUniqueIdentifier> getTableSegmentUniqueIdentifiers(Segment segment)
throws IOException {
SegmentBlockIndexInfo segmentBlockIndexInfo = segmentMap.get(segment.getSegmentNo());
if (segmentBlockIndexInfo == null) {
return BlockletIndexUtil.getSegmentUniqueIdentifiers(segment);
}
return segmentBlockIndexInfo.getTableBlockIndexUniqueIdentifiers();
}
public void updateSegmentIndex(
Map<String, Set<TableBlockIndexUniqueIdentifier>> indexUniqueIdentifiers) {
for (Map.Entry<String, Set<TableBlockIndexUniqueIdentifier>> identifier : indexUniqueIdentifiers
.entrySet()) {
segmentMap.put(identifier.getKey(), new SegmentBlockIndexInfo(identifier.getValue(), null));
}
}
@Override
public List<IndexInputSplit> getAllUncached(List<Segment> validSegments,
IndexExprWrapper indexExprWrapper) throws IOException {
List<IndexInputSplit> distributableToBeLoaded = new ArrayList<>();
for (Segment segment : validSegments) {
IndexInputSplitWrapper indexInputSplitWrappers =
indexExprWrapper.toDistributableSegment(segment);
Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
getTableSegmentUniqueIdentifiers(segment);
for (TableBlockIndexUniqueIdentifier identifier : tableBlockIndexUniqueIdentifiers) {
BlockletIndexWrapper blockletIndexWrapper = cache.getIfPresent(
new TableBlockIndexUniqueIdentifierWrapper(identifier, this.getCarbonTable()));
if (identifier.getIndexFilePath() == null || blockletIndexWrapper == null) {
((BlockletIndexInputSplit) indexInputSplitWrappers.getDistributable())
.setTableBlockIndexUniqueIdentifier(identifier);
distributableToBeLoaded.add(indexInputSplitWrappers.getDistributable());
}
}
}
return distributableToBeLoaded;
}
@Override
public IndexInputSplitWrapper toDistributableSegment(Segment segment,
IndexSchema schema, AbsoluteTableIdentifier identifier, String uniqueId) {
try {
BlockletIndexInputSplit distributable = new BlockletIndexInputSplit();
distributable.setIndexSchema(schema);
distributable.setSegment(segment);
distributable.setSegmentPath(
CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo()));
distributable.setTablePath(identifier.getTablePath());
return new IndexInputSplitWrapper(uniqueId, distributable);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}