/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.datamap.bloom;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.CacheProvider;
import org.apache.carbondata.core.cache.CacheType;
import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
import org.apache.carbondata.core.datamap.DataMapDistributable;
import org.apache.carbondata.core.datamap.DataMapLevel;
import org.apache.carbondata.core.datamap.DataMapMeta;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.dev.DataMapBuilder;
import org.apache.carbondata.core.datamap.dev.DataMapFactory;
import org.apache.carbondata.core.datamap.dev.DataMapWriter;
import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.features.TableOperation;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.events.Event;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
/**
* Factory that creates blocklet-level BloomFilter coarse-grain datamaps.
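*
* An illustrative way to create this datamap via SQL DDL (the property names map to the
* BLOOM_SIZE, BLOOM_FPP and COMPRESS_BLOOM options validated below; table, datamap and
* column names are examples only, and the exact syntax may differ between CarbonData
* versions):
*
*   CREATE DATAMAP dm ON TABLE main_table
*   USING 'bloomfilter'
*   DMPROPERTIES('INDEX_COLUMNS'='city', 'BLOOM_SIZE'='640000',
*                'BLOOM_FPP'='0.00001', 'BLOOM_COMPRESS'='true')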
*/
@InterfaceAudience.Internal
public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDataMap> {
private static final Logger LOGGER = LogServiceFactory.getLogService(
BloomCoarseGrainDataMapFactory.class.getName());
/**
* property for size of bloom filter
*/
private static final String BLOOM_SIZE = "bloom_size";
/**
* default size for the bloom filter, i.e. the expected cardinality of the indexed column
* (32000 * 20 = 640000 rows by default)
*/
private static final int DEFAULT_BLOOM_FILTER_SIZE =
CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT * 20;
/**
* property for fpp(false-positive-probability) of bloom filter
*/
private static final String BLOOM_FPP = "bloom_fpp";
/**
* default value for fpp of bloom filter is 0.001%
*/
private static final double DEFAULT_BLOOM_FILTER_FPP = 0.00001d;
/**
* property for compressing bloom while saving to disk.
*/
private static final String COMPRESS_BLOOM = "bloom_compress";
/**
* Default value for compressing the bloom filter while saving to disk.
*/
private static final boolean DEFAULT_BLOOM_COMPRESS = true;
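// Rough sizing with the defaults above (standard bloom filter math, for intuition only):
// bits m ~= -n * ln(p) / (ln 2)^2; with n = 640000 and p = 0.00001 this is roughly
// 15.3 million bits (about 1.9 MB) per bloom filter before compression.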
private DataMapMeta dataMapMeta;
private String dataMapName;
private int bloomFilterSize;
private double bloomFilterFpp;
private boolean bloomCompress;
private Cache<BloomCacheKeyValue.CacheKey, BloomCacheKeyValue.CacheValue> cache;
// segmentId -> set of shard paths of the bloom index files
private Map<String, Set<String>> segmentMap = new ConcurrentHashMap<>();
public BloomCoarseGrainDataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema)
throws MalformedDataMapCommandException {
super(carbonTable, dataMapSchema);
Objects.requireNonNull(carbonTable);
Objects.requireNonNull(dataMapSchema);
this.dataMapName = dataMapSchema.getDataMapName();
List<CarbonColumn> indexedColumns = carbonTable.getIndexedColumns(dataMapSchema);
this.bloomFilterSize = validateAndGetBloomFilterSize(dataMapSchema);
this.bloomFilterFpp = validateAndGetBloomFilterFpp(dataMapSchema);
this.bloomCompress = validateAndGetBloomCompress(dataMapSchema);
List<ExpressionType> optimizedOperations = new ArrayList<ExpressionType>();
// todo: support more optimize operations
optimizedOperations.add(ExpressionType.EQUALS);
optimizedOperations.add(ExpressionType.IN);
this.dataMapMeta = new DataMapMeta(this.dataMapName, indexedColumns, optimizedOperations);
LOGGER.info(String.format("DataMap %s works for %s with bloom size %d",
this.dataMapName, this.dataMapMeta, this.bloomFilterSize));
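// obtain the bloom cache from CacheProvider; cached bloom filters are keyed by
// (shard path, indexed column name), see BloomCacheKeyValue.CacheKey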
try {
this.cache = CacheProvider.getInstance()
.createCache(new CacheType("bloom_cache"), BloomDataMapCache.class.getName());
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
throw new MalformedDataMapCommandException(e.getMessage());
}
}
/**
* validate bloom DataMap BLOOM_SIZE
* 1. BLOOM_SIZE property is optional, 32000 * 20 will be the default size.
* 2. BLOOM_SIZE should be an integer greater than 0
*/
private int validateAndGetBloomFilterSize(DataMapSchema dmSchema)
throws MalformedDataMapCommandException {
String bloomFilterSizeStr = dmSchema.getProperties().get(BLOOM_SIZE);
if (StringUtils.isBlank(bloomFilterSizeStr)) {
LOGGER.warn(
String.format("Bloom filter size is not configured for datamap %s, use default value %d",
dataMapName, DEFAULT_BLOOM_FILTER_SIZE));
return DEFAULT_BLOOM_FILTER_SIZE;
}
int bloomFilterSize;
try {
bloomFilterSize = Integer.parseInt(bloomFilterSizeStr);
} catch (NumberFormatException e) {
throw new MalformedDataMapCommandException(
String.format("Invalid value of bloom filter size '%s', it should be an integer",
bloomFilterSizeStr));
}
// todo: reconsider the boundaries of bloom filter size
if (bloomFilterSize <= 0) {
throw new MalformedDataMapCommandException(
String.format("Invalid value of bloom filter size '%s', it should be greater than 0",
bloomFilterSizeStr));
}
return bloomFilterSize;
}
/**
* validate bloom DataMap BLOOM_FPP
* 1. BLOOM_FPP property is optional, 0.00001 will be the default value.
* 2. BLOOM_FPP should be in the range (0, 1)
*/
private double validateAndGetBloomFilterFpp(DataMapSchema dmSchema)
throws MalformedDataMapCommandException {
String bloomFilterFppStr = dmSchema.getProperties().get(BLOOM_FPP);
if (StringUtils.isBlank(bloomFilterFppStr)) {
LOGGER.warn(
String.format("Bloom filter FPP is not configured for datamap %s, use default value %f",
dataMapName, DEFAULT_BLOOM_FILTER_FPP));
return DEFAULT_BLOOM_FILTER_FPP;
}
double bloomFilterFpp;
try {
bloomFilterFpp = Double.parseDouble(bloomFilterFppStr);
} catch (NumberFormatException e) {
throw new MalformedDataMapCommandException(
String.format("Invalid value of bloom filter fpp '%s', it should be a numeric value",
bloomFilterFppStr));
}
if (bloomFilterFpp <= 0 || bloomFilterFpp >= 1) {
throw new MalformedDataMapCommandException(
String.format("Invalid value of bloom filter fpp '%s', it should be in the range (0, 1)",
bloomFilterFppStr));
}
return bloomFilterFpp;
}
/**
* validate bloom DataMap COMPRESS_BLOOM
* Default value is true
*/
private boolean validateAndGetBloomCompress(DataMapSchema dmSchema) {
String bloomCompress = dmSchema.getProperties().get(COMPRESS_BLOOM);
if (StringUtils.isBlank(bloomCompress)) {
LOGGER.warn(
String.format("Bloom compress is not configured for datamap %s, use default value %b",
dataMapName, DEFAULT_BLOOM_COMPRESS));
return DEFAULT_BLOOM_COMPRESS;
}
return Boolean.parseBoolean(bloomCompress);
}
@Override
public DataMapWriter createWriter(Segment segment, String shardName,
SegmentProperties segmentProperties) throws IOException {
LOGGER.info(
String.format("Data of BloomCoarseGrainDataMap %s for table %s will be written to %s",
this.dataMapName, getCarbonTable().getTableName(), shardName));
return new BloomDataMapWriter(getCarbonTable().getTablePath(), this.dataMapName,
this.dataMapMeta.getIndexedColumns(), segment, shardName, segmentProperties,
this.bloomFilterSize, this.bloomFilterFpp, bloomCompress);
}
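// Note: the writer above is invoked while new data is being loaded, whereas the builder
// below is typically used to (re)build the bloom index for data that already exists.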
@Override
public DataMapBuilder createBuilder(Segment segment, String shardName,
SegmentProperties segmentProperties) throws IOException {
return new BloomDataMapBuilder(getCarbonTable().getTablePath(), this.dataMapName,
this.dataMapMeta.getIndexedColumns(), segment, shardName, segmentProperties,
this.bloomFilterSize, this.bloomFilterFpp, bloomCompress);
}
/**
* Returns all shard directories of bloom index files for query.
* If the bloom index files have been merged, only the merged shard path is returned.
*/
public static Set<String> getAllShardPaths(String tablePath, String segmentId,
String dataMapName) {
String dataMapStorePath = CarbonTablePath.getDataMapStorePath(
tablePath, segmentId, dataMapName);
CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dataMapStorePath).listFiles();
Set<String> shardPaths = new HashSet<>();
boolean mergeShardInprogress = false;
CarbonFile mergeShardFile = null;
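// an unfinished merge leaves both the merge shard folder and the in-progress marker file;
// in that case the original shard folders are still returned for querying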
for (CarbonFile carbonFile : carbonFiles) {
if (carbonFile.getName().equals(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME)) {
mergeShardFile = carbonFile;
} else if (carbonFile.getName().equals(BloomIndexFileStore.MERGE_INPROGRESS_FILE)) {
mergeShardInprogress = true;
} else if (carbonFile.isDirectory()) {
shardPaths.add(FileFactory.getPath(carbonFile.getAbsolutePath()).toString());
}
}
if (mergeShardFile != null && !mergeShardInprogress) {
// should only get one shard path if mergeShard is generated successfully
shardPaths.clear();
shardPaths.add(FileFactory.getPath(mergeShardFile.getAbsolutePath()).toString());
}
return shardPaths;
}
@Override
public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException {
List<CoarseGrainDataMap> dataMaps = new ArrayList<>();
try {
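// shard paths for a segment are resolved once and cached in segmentMap, so repeated
// pruning calls do not have to list the datamap store directory again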
Set<String> shardPaths = segmentMap.get(segment.getSegmentNo());
if (shardPaths == null) {
shardPaths =
getAllShardPaths(getCarbonTable().getTablePath(), segment.getSegmentNo(), dataMapName);
segmentMap.put(segment.getSegmentNo(), shardPaths);
}
Set<String> filteredShards = segment.getFilteredIndexShardNames();
for (String shard : shardPaths) {
if (shard.endsWith(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME) ||
filteredShards.contains(new File(shard).getName())) {
// Only load the shards that survived pruning by the main datamap.
// For the merge shard, shard-level pruning is deferred until blocklet pruning.
BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap();
bloomDM.init(new BloomDataMapModel(shard, cache, segment.getConfiguration()));
bloomDM.initIndexColumnConverters(getCarbonTable(), dataMapMeta.getIndexedColumns());
bloomDM.setFilteredShard(filteredShards);
dataMaps.add(bloomDM);
}
}
} catch (Exception e) {
throw new IOException("Error occurred while initializing Bloom DataMap", e);
}
return dataMaps;
}
@Override
public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
throws IOException {
List<CoarseGrainDataMap> dataMaps = new ArrayList<>();
String indexPath = ((BloomDataMapDistributable) distributable).getIndexPath();
Set<String> filteredShards = ((BloomDataMapDistributable) distributable).getFilteredShards();
BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap();
bloomDM.init(new BloomDataMapModel(indexPath, cache, FileFactory.getConfiguration()));
bloomDM.initIndexColumnConverters(getCarbonTable(), dataMapMeta.getIndexedColumns());
bloomDM.setFilteredShard(filteredShards);
dataMaps.add(bloomDM);
return dataMaps;
}
@Override
public List<DataMapDistributable> toDistributable(Segment segment) {
List<DataMapDistributable> dataMapDistributableList = new ArrayList<>();
Set<String> shardPaths = segmentMap.get(segment.getSegmentNo());
if (shardPaths == null) {
shardPaths =
getAllShardPaths(getCarbonTable().getTablePath(), segment.getSegmentNo(), dataMapName);
segmentMap.put(segment.getSegmentNo(), shardPaths);
}
Set<String> filteredShards = segment.getFilteredIndexShardNames();
for (String shardPath : shardPaths) {
// Only include the shards that survived pruning by the main datamap.
// For the merge shard, shard-level pruning is deferred until blocklet pruning.
if (shardPath.endsWith(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME) ||
filteredShards.contains(new File(shardPath).getName())) {
DataMapDistributable bloomDataMapDistributable =
new BloomDataMapDistributable(shardPath, filteredShards);
bloomDataMapDistributable.setSegment(segment);
bloomDataMapDistributable.setDataMapSchema(getDataMapSchema());
dataMapDistributableList.add(bloomDataMapDistributable);
}
}
return dataMapDistributableList;
}
@Override
public void fireEvent(Event event) {
}
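/**
* Clear the bloom cache for one segment: invalidate the cached bloom filter of every
* indexed column in every shard of the segment and forget its shard paths.
*/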
@Override
public void clear(String segment) {
Set<String> shards = segmentMap.remove(segment);
if (shards != null) {
for (String shard : shards) {
for (CarbonColumn carbonColumn : dataMapMeta.getIndexedColumns()) {
cache.invalidate(new BloomCacheKeyValue.CacheKey(shard, carbonColumn.getColName()));
}
}
}
}
@Override
public synchronized void clear() {
if (segmentMap.size() > 0) {
List<String> segments = new ArrayList<>(segmentMap.keySet());
for (String segmentId : segments) {
clear(segmentId);
}
}
}
@Override
public void deleteDatamapData(Segment segment) throws IOException {
deleteSegmentDatamapData(segment.getSegmentNo());
}
@Override
public void deleteSegmentDatamapData(String segmentId) throws IOException {
try {
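// delete the on-disk bloom index folder of this segment, then drop its in-memory cache entries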
String datamapPath = CarbonTablePath
.getDataMapStorePath(getCarbonTable().getTablePath(), segmentId, dataMapName);
if (FileFactory.isFileExist(datamapPath)) {
CarbonFile file = FileFactory.getCarbonFile(datamapPath);
CarbonUtil.deleteFoldersAndFilesSilent(file);
}
clear(segmentId);
} catch (InterruptedException ex) {
throw new IOException("Failed to delete datamap for segment_" + segmentId);
}
}
@Override
public void deleteDatamapData() {
SegmentStatusManager ssm =
new SegmentStatusManager(getCarbonTable().getAbsoluteTableIdentifier());
try {
List<Segment> validSegments =
ssm.getValidAndInvalidSegments(getCarbonTable().isChildTableForMV()).getValidSegments();
for (Segment segment : validSegments) {
deleteDatamapData(segment);
}
} catch (IOException e) {
LOGGER.error("Drop datamap failed: unable to delete the datamap directory", e);
}
}
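/**
* Operations that can change or remove indexed data (delete, update, partition, drop column,
* datatype change, column rename) make the bloom index stale; metadata-only or purely
* additive operations (table rename, add column, streaming) do not.
*/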
@Override
public boolean willBecomeStale(TableOperation operation) {
switch (operation) {
case ALTER_RENAME:
return false;
case ALTER_DROP:
return true;
case ALTER_ADD_COLUMN:
return false;
case ALTER_CHANGE_DATATYPE:
return true;
case ALTER_COLUMN_RENAME:
return true;
case STREAMING:
return false;
case DELETE:
return true;
case UPDATE:
return true;
case PARTITION:
return true;
default:
return false;
}
}
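/**
* Block DDL operations that would invalidate the bloom index: dropping, renaming or changing
* the datatype of an indexed column is not allowed while this datamap exists.
*/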
@Override
public boolean isOperationBlocked(TableOperation operation, Object... targets) {
switch (operation) {
case ALTER_DROP: {
// 'alter table drop columns' is blocked if any of the dropped columns
// is indexed by this bloomfilter datamap
List<String> columnsToDrop = (List<String>) targets[0];
List<String> indexedColumnNames = dataMapMeta.getIndexedColumnNames();
for (String indexedcolumn : indexedColumnNames) {
for (String column : columnsToDrop) {
if (column.equalsIgnoreCase(indexedcolumn)) {
return true;
}
}
}
return false;
}
case ALTER_CHANGE_DATATYPE:
case ALTER_COLUMN_RENAME: {
// changing a column's datatype or renaming it is blocked
// if that column is indexed by this bloomfilter datamap
String columnToChangeDatatype = (String) targets[0];
List<String> indexedColumnNames = dataMapMeta.getIndexedColumnNames();
for (String indexedcolumn : indexedColumnNames) {
if (indexedcolumn.equalsIgnoreCase(columnToChangeDatatype)) {
return true;
}
}
return false;
}
default:
return false;
}
}
@Override
public DataMapMeta getMeta() {
return this.dataMapMeta;
}
@Override
public DataMapLevel getDataMapLevel() {
return DataMapLevel.CG;
}
@Override
public String getCacheSize() {
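// sum the in-memory size of every cached bloom filter entry belonging to this datamap's
// shards and indexed columns; the result is reported in the form "0:<sizeInBytes>"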
long sum = 0L;
for (Map.Entry<String, Set<String>> entry : segmentMap.entrySet()) {
for (String shardName : entry.getValue()) {
for (CarbonColumn carbonColumn : dataMapMeta.getIndexedColumns()) {
BloomCacheKeyValue.CacheValue cacheValue = cache
.getIfPresent(new BloomCacheKeyValue.CacheKey(shardName, carbonColumn.getColName()));
if (cacheValue != null) {
sum += cacheValue.getMemorySize();
}
}
}
}
return 0 + ":" + sum;
}
}