| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.index.bloom; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| |
| import org.apache.carbondata.common.annotations.InterfaceAudience; |
| import org.apache.carbondata.common.exceptions.sql.MalformedIndexCommandException; |
| import org.apache.carbondata.common.logging.LogServiceFactory; |
| import org.apache.carbondata.core.cache.Cache; |
| import org.apache.carbondata.core.cache.CacheProvider; |
| import org.apache.carbondata.core.cache.CacheType; |
| import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; |
| import org.apache.carbondata.core.datastore.block.SegmentProperties; |
| import org.apache.carbondata.core.datastore.filesystem.CarbonFile; |
| import org.apache.carbondata.core.datastore.impl.FileFactory; |
| import org.apache.carbondata.core.features.TableOperation; |
| import org.apache.carbondata.core.index.IndexInputSplit; |
| import org.apache.carbondata.core.index.IndexLevel; |
| import org.apache.carbondata.core.index.IndexMeta; |
| import org.apache.carbondata.core.index.Segment; |
| import org.apache.carbondata.core.index.dev.IndexBuilder; |
| import org.apache.carbondata.core.index.dev.IndexFactory; |
| import org.apache.carbondata.core.index.dev.IndexWriter; |
| import org.apache.carbondata.core.index.dev.cgindex.CoarseGrainIndex; |
| import org.apache.carbondata.core.metadata.schema.table.CarbonTable; |
| import org.apache.carbondata.core.metadata.schema.table.IndexSchema; |
| import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; |
| import org.apache.carbondata.core.scan.filter.intf.ExpressionType; |
| import org.apache.carbondata.core.statusmanager.SegmentStatusManager; |
| import org.apache.carbondata.core.util.CarbonUtil; |
| import org.apache.carbondata.core.util.path.CarbonTablePath; |
| import org.apache.carbondata.events.Event; |
| |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.log4j.Logger; |
| |
| /** |
| * This class is for Bloom Filter for blocklet level |
| */ |
| @InterfaceAudience.Internal |
| public class BloomCoarseGrainIndexFactory extends IndexFactory<CoarseGrainIndex> { |
| private static final Logger LOGGER = LogServiceFactory.getLogService( |
| BloomCoarseGrainIndexFactory.class.getName()); |
| /** |
| * property for size of bloom filter |
| */ |
| private static final String BLOOM_SIZE = "bloom_size"; |
| /** |
| * default size for bloom filter, cardinality of the column. |
| */ |
| private static final int DEFAULT_BLOOM_FILTER_SIZE = |
| CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT * 20; |
| /** |
| * property for fpp(false-positive-probability) of bloom filter |
| */ |
| private static final String BLOOM_FPP = "bloom_fpp"; |
| /** |
| * default value for fpp of bloom filter is 0.001% |
| */ |
| private static final double DEFAULT_BLOOM_FILTER_FPP = 0.00001d; |
| |
| /** |
| * property for compressing bloom while saving to disk. |
| */ |
| private static final String COMPRESS_BLOOM = "bloom_compress"; |
| /** |
| * Default value of compressing bloom while save to disk. |
| */ |
| private static final boolean DEFAULT_BLOOM_COMPRESS = true; |
| |
| private IndexMeta indexMeta; |
| private String indexName; |
| private int bloomFilterSize; |
| private double bloomFilterFpp; |
| private boolean bloomCompress; |
| private Cache<BloomCacheKeyValue.CacheKey, BloomCacheKeyValue.CacheValue> cache; |
| // segmentId -> list of index file |
| private Map<String, Set<String>> segmentMap = new ConcurrentHashMap<>(); |
| |
| public BloomCoarseGrainIndexFactory(CarbonTable carbonTable, IndexSchema indexSchema) |
| throws MalformedIndexCommandException { |
| super(carbonTable, indexSchema); |
| Objects.requireNonNull(carbonTable); |
| Objects.requireNonNull(indexSchema); |
| |
| this.indexName = indexSchema.getIndexName(); |
| |
| List<CarbonColumn> indexedColumns = |
| carbonTable.getIndexedColumns(indexSchema.getIndexColumns()); |
| this.bloomFilterSize = validateAndGetBloomFilterSize(indexSchema); |
| this.bloomFilterFpp = validateAndGetBloomFilterFpp(indexSchema); |
| this.bloomCompress = validateAndGetBloomCompress(indexSchema); |
| List<ExpressionType> optimizedOperations = new ArrayList<ExpressionType>(); |
| // todo: support more optimize operations |
| optimizedOperations.add(ExpressionType.EQUALS); |
| optimizedOperations.add(ExpressionType.IN); |
| this.indexMeta = new IndexMeta(this.indexName, indexedColumns, optimizedOperations); |
| LOGGER.info(String.format("Index %s works for %s with bloom size %d", |
| this.indexName, this.indexMeta, this.bloomFilterSize)); |
| try { |
| this.cache = CacheProvider.getInstance() |
| .createCache(new CacheType("bloom_cache"), BloomIndexCache.class.getName()); |
| } catch (Exception e) { |
| LOGGER.error(e.getMessage(), e); |
| throw new MalformedIndexCommandException(e.getMessage()); |
| } |
| } |
| |
| /** |
| * validate Lucene Index BLOOM_SIZE |
| * 1. BLOOM_SIZE property is optional, 32000 * 20 will be the default size. |
| * 2. BLOOM_SIZE should be an integer that greater than 0 |
| */ |
| private int validateAndGetBloomFilterSize(IndexSchema dmSchema) |
| throws MalformedIndexCommandException { |
| String bloomFilterSizeStr = dmSchema.getProperties().get(BLOOM_SIZE); |
| if (StringUtils.isBlank(bloomFilterSizeStr)) { |
| LOGGER.warn( |
| String.format("Bloom filter size is not configured for index %s, use default value %d", |
| indexName, DEFAULT_BLOOM_FILTER_SIZE)); |
| return DEFAULT_BLOOM_FILTER_SIZE; |
| } |
| int bloomFilterSize; |
| try { |
| bloomFilterSize = Integer.parseInt(bloomFilterSizeStr); |
| } catch (NumberFormatException e) { |
| throw new MalformedIndexCommandException( |
| String.format("Invalid value of bloom filter size '%s', it should be an integer", |
| bloomFilterSizeStr)); |
| } |
| // todo: reconsider the boundaries of bloom filter size |
| if (bloomFilterSize <= 0) { |
| throw new MalformedIndexCommandException( |
| String.format("Invalid value of bloom filter size '%s', it should be greater than 0", |
| bloomFilterSizeStr)); |
| } |
| return bloomFilterSize; |
| } |
| |
| /** |
| * validate bloom Index BLOOM_FPP |
| * 1. BLOOM_FPP property is optional, 0.00001 will be the default value. |
| * 2. BLOOM_FPP should be (0, 1) |
| */ |
| private double validateAndGetBloomFilterFpp(IndexSchema dmSchema) |
| throws MalformedIndexCommandException { |
| String bloomFilterFppStr = dmSchema.getProperties().get(BLOOM_FPP); |
| if (StringUtils.isBlank(bloomFilterFppStr)) { |
| LOGGER.warn( |
| String.format("Bloom filter FPP is not configured for index %s, use default value %f", |
| indexName, DEFAULT_BLOOM_FILTER_FPP)); |
| return DEFAULT_BLOOM_FILTER_FPP; |
| } |
| double bloomFilterFpp; |
| try { |
| bloomFilterFpp = Double.parseDouble(bloomFilterFppStr); |
| } catch (NumberFormatException e) { |
| throw new MalformedIndexCommandException( |
| String.format("Invalid value of bloom filter fpp '%s', it should be an numeric", |
| bloomFilterFppStr)); |
| } |
| if (bloomFilterFpp < 0 || bloomFilterFpp - 1 >= 0) { |
| throw new MalformedIndexCommandException( |
| String.format("Invalid value of bloom filter fpp '%s', it should be in range 0~1", |
| bloomFilterFppStr)); |
| } |
| return bloomFilterFpp; |
| } |
| |
| /** |
| * validate bloom Index COMPRESS_BLOOM |
| * Default value is true |
| */ |
| private boolean validateAndGetBloomCompress(IndexSchema dmSchema) { |
| String bloomCompress = dmSchema.getProperties().get(COMPRESS_BLOOM); |
| if (StringUtils.isBlank(bloomCompress)) { |
| LOGGER.warn( |
| String.format("Bloom compress is not configured for index %s, use default value %b", |
| indexName, DEFAULT_BLOOM_COMPRESS)); |
| return DEFAULT_BLOOM_COMPRESS; |
| } |
| return Boolean.parseBoolean(bloomCompress); |
| } |
| |
| @Override |
| public IndexWriter createWriter(Segment segment, String shardName, |
| SegmentProperties segmentProperties) throws IOException { |
| LOGGER.info( |
| String.format("Data of BloomCoarseGrainIndex %s for table %s will be written to %s", |
| this.indexName, getCarbonTable().getTableName() , shardName)); |
| return new BloomIndexWriter(getCarbonTable().getTablePath(), this.indexName, |
| this.indexMeta.getIndexedColumns(), segment, shardName, this.bloomFilterSize, |
| this.bloomFilterFpp, bloomCompress); |
| } |
| |
| @Override |
| public IndexBuilder createBuilder(Segment segment, String shardName, |
| SegmentProperties segmentProperties) throws IOException { |
| return new BloomIndexBuilder(getCarbonTable().getTablePath(), this.indexName, |
| this.indexMeta.getIndexedColumns(), segment, shardName, this.bloomFilterSize, |
| this.bloomFilterFpp, bloomCompress); |
| } |
| |
| /** |
| * returns all shard directories of bloom index files for query |
| * if bloom index files are merged we should get only one shard path |
| */ |
| public static Set<String> getAllShardPaths(String tablePath, String segmentId, String indexName) { |
| String indexStorePath = CarbonTablePath.getIndexesStorePath( |
| tablePath, segmentId, indexName); |
| CarbonFile[] carbonFiles = FileFactory.getCarbonFile(indexStorePath).listFiles(); |
| Set<String> shardPaths = new HashSet<>(); |
| boolean mergeShardInProgress = false; |
| CarbonFile mergeShardFile = null; |
| for (CarbonFile carbonFile : carbonFiles) { |
| if (carbonFile.getName().equals(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME)) { |
| mergeShardFile = carbonFile; |
| } else if (carbonFile.getName().equals(BloomIndexFileStore.MERGE_INPROGRESS_FILE)) { |
| mergeShardInProgress = true; |
| } else if (carbonFile.isDirectory()) { |
| shardPaths.add(FileFactory.getPath(carbonFile.getAbsolutePath()).toString()); |
| } |
| } |
| if (mergeShardFile != null && !mergeShardInProgress) { |
| // should only get one shard path if mergeShard is generated successfully |
| shardPaths.clear(); |
| shardPaths.add(FileFactory.getPath(mergeShardFile.getAbsolutePath()).toString()); |
| } |
| return shardPaths; |
| } |
| |
| @Override |
| public List<CoarseGrainIndex> getIndexes(Segment segment) throws IOException { |
| List<CoarseGrainIndex> indexes = new ArrayList<>(); |
| try { |
| Set<String> shardPaths = segmentMap.get(segment.getSegmentNo()); |
| if (shardPaths == null) { |
| shardPaths = |
| getAllShardPaths(getCarbonTable().getTablePath(), segment.getSegmentNo(), indexName); |
| segmentMap.put(segment.getSegmentNo(), shardPaths); |
| } |
| Set<String> filteredShards = segment.getFilteredIndexShardNames(); |
| for (String shard : shardPaths) { |
| if (shard.endsWith(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME) || |
| filteredShards.contains(new File(shard).getName())) { |
| // Filter out the tasks which are filtered through Main index. |
| // for merge shard, shard pruning delay to be done before pruning blocklet |
| BloomCoarseGrainIndex bloomDM = new BloomCoarseGrainIndex(); |
| bloomDM.init(new BloomIndexModel(shard, cache, segment.getConfiguration())); |
| bloomDM.initIndexColumnConverters(getCarbonTable(), indexMeta.getIndexedColumns()); |
| bloomDM.setFilteredShard(filteredShards); |
| indexes.add(bloomDM); |
| } |
| } |
| } catch (Exception e) { |
| throw new IOException("Error occurs while init Bloom Index", e); |
| } |
| return indexes; |
| } |
| |
| @Override |
| public List<CoarseGrainIndex> getIndexes(Segment segment, Set<Path> partitionLocations) |
| throws IOException { |
| return getIndexes(segment); |
| } |
| |
| @Override |
| public List<CoarseGrainIndex> getIndexes(IndexInputSplit distributable) { |
| List<CoarseGrainIndex> indexes = new ArrayList<>(); |
| String indexPath = ((BloomIndexInputSplit) distributable).getIndexPath(); |
| Set<String> filteredShards = ((BloomIndexInputSplit) distributable).getFilteredShards(); |
| BloomCoarseGrainIndex bloomDM = new BloomCoarseGrainIndex(); |
| bloomDM.init(new BloomIndexModel(indexPath, cache, FileFactory.getConfiguration())); |
| bloomDM.initIndexColumnConverters(getCarbonTable(), indexMeta.getIndexedColumns()); |
| bloomDM.setFilteredShard(filteredShards); |
| indexes.add(bloomDM); |
| return indexes; |
| } |
| |
| @Override |
| public List<IndexInputSplit> toDistributable(Segment segment) { |
| List<IndexInputSplit> indexInputSplitList = new ArrayList<>(); |
| Set<String> shardPaths = segmentMap.get(segment.getSegmentNo()); |
| if (shardPaths == null) { |
| shardPaths = |
| getAllShardPaths(getCarbonTable().getTablePath(), segment.getSegmentNo(), indexName); |
| segmentMap.put(segment.getSegmentNo(), shardPaths); |
| } |
| Set<String> filteredShards = segment.getFilteredIndexShardNames(); |
| for (String shardPath : shardPaths) { |
| // Filter out the tasks which are filtered through Main index. |
| // for merge shard, shard pruning delay to be done before pruning blocklet |
| if (shardPath.endsWith(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME) || |
| filteredShards.contains(new File(shardPath).getName())) { |
| IndexInputSplit bloomIndexInputSplit = |
| new BloomIndexInputSplit(shardPath, filteredShards); |
| bloomIndexInputSplit.setSegment(segment); |
| bloomIndexInputSplit.setIndexSchema(getIndexSchema()); |
| indexInputSplitList.add(bloomIndexInputSplit); |
| } |
| } |
| return indexInputSplitList; |
| } |
| |
| @Override |
| public void fireEvent(Event event) { |
| |
| } |
| |
| @Override |
| public void clear(String segment) { |
| Set<String> shards = segmentMap.remove(segment); |
| if (shards != null) { |
| for (String shard : shards) { |
| for (CarbonColumn carbonColumn : indexMeta.getIndexedColumns()) { |
| cache.invalidate(new BloomCacheKeyValue.CacheKey(shard, carbonColumn.getColName())); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public synchronized void clear() { |
| if (segmentMap.size() > 0) { |
| List<String> segments = new ArrayList<>(segmentMap.keySet()); |
| for (String segmentId : segments) { |
| clear(segmentId); |
| } |
| } |
| } |
| |
| @Override |
| public void deleteIndexData(Segment segment) throws IOException { |
| deleteSegmentIndexData(segment.getSegmentNo()); |
| } |
| |
| @Override |
| public void deleteSegmentIndexData(String segmentId) throws IOException { |
| try { |
| String indexPath = CarbonTablePath |
| .getIndexesStorePath(getCarbonTable().getTablePath(), segmentId, indexName); |
| if (FileFactory.isFileExist(indexPath)) { |
| CarbonFile file = FileFactory.getCarbonFile(indexPath); |
| CarbonUtil.deleteFoldersAndFilesSilent(file); |
| } |
| clear(segmentId); |
| } catch (InterruptedException ex) { |
| throw new IOException("Failed to delete index for segment_" + segmentId); |
| } |
| } |
| |
| @Override |
| public void deleteIndexData() { |
| SegmentStatusManager ssm = |
| new SegmentStatusManager(getCarbonTable().getAbsoluteTableIdentifier()); |
| try { |
| List<Segment> validSegments = |
| ssm.getValidAndInvalidSegments(getCarbonTable().isMV()).getValidSegments(); |
| for (Segment segment : validSegments) { |
| deleteIndexData(segment); |
| } |
| } catch (IOException e) { |
| LOGGER.error("drop index failed, failed to delete index directory"); |
| } |
| } |
| |
| @Override |
| public boolean willBecomeStale(TableOperation operation) { |
| switch (operation) { |
| case ALTER_RENAME: |
| return false; |
| case ALTER_DROP: |
| return true; |
| case ALTER_ADD_COLUMN: |
| return false; |
| case ALTER_CHANGE_DATATYPE: |
| return true; |
| case ALTER_COLUMN_RENAME: |
| return true; |
| case STREAMING: |
| return false; |
| case DELETE: |
| return true; |
| case UPDATE: |
| return true; |
| case PARTITION: |
| return true; |
| default: |
| return false; |
| } |
| } |
| |
| @Override |
| public boolean isOperationBlocked(TableOperation operation, Object... targets) { |
| switch (operation) { |
| case ALTER_DROP: { |
| // alter table drop columns |
| // will be blocked if the columns in bloomfilter index |
| List<String> columnsToDrop = (List<String>) targets[0]; |
| List<String> indexedColumnNames = indexMeta.getIndexedColumnNames(); |
| for (String indexedcolumn : indexedColumnNames) { |
| for (String column : columnsToDrop) { |
| if (column.equalsIgnoreCase(indexedcolumn)) { |
| return true; |
| } |
| } |
| } |
| return false; |
| } |
| case ALTER_CHANGE_DATATYPE: |
| case ALTER_COLUMN_RENAME: { |
| // alter table change one column datatype, or rename |
| // will be blocked if the column in bloomfilter index |
| String columnToChangeDatatype = (String) targets[0]; |
| List<String> indexedColumnNames = indexMeta.getIndexedColumnNames(); |
| for (String indexedcolumn : indexedColumnNames) { |
| if (indexedcolumn.equalsIgnoreCase(columnToChangeDatatype)) { |
| return true; |
| } |
| } |
| return false; |
| } |
| default: |
| return false; |
| } |
| } |
| |
| @Override |
| public IndexMeta getMeta() { |
| return this.indexMeta; |
| } |
| |
| @Override |
| public IndexLevel getIndexLevel() { |
| return IndexLevel.CG; |
| } |
| |
| @Override |
| public String getCacheSize() { |
| long sum = 0L; |
| for (Map.Entry<String, Set<String>> entry : segmentMap.entrySet()) { |
| for (String shardName : entry.getValue()) { |
| for (CarbonColumn carbonColumn : indexMeta.getIndexedColumns()) { |
| BloomCacheKeyValue.CacheValue cacheValue = cache |
| .getIfPresent(new BloomCacheKeyValue.CacheKey(shardName, carbonColumn.getColName())); |
| if (cacheValue != null) { |
| sum += cacheValue.getMemorySize(); |
| } |
| } |
| } |
| } return 0 + ":" + sum; |
| } |
| } |