blob: f8ee3b863fd36e89bea67feb2ceaf4f383e69330 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.stream;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.index.Segment;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.reader.CarbonIndexFileReader;
import org.apache.carbondata.core.scan.filter.FilterUtil;
import org.apache.carbondata.core.scan.filter.executer.FilterExecutor;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.format.BlockIndex;
/**
 * Prunes the stream files of streaming segments using the min/max index that is
 * recorded for each file in the segment's stream index file.
 */
@InterfaceAudience.Internal
public class StreamPruner {

  private final CarbonTable carbonTable;

  // null when no filter is set; prune() then returns every stream file
  private FilterExecutor filterExecutor;

  // number of stream files found by the most recent listing, before any pruning
  private int totalFileNums = 0;

  public StreamPruner(CarbonTable carbonTable) {
    this.carbonTable = carbonTable;
  }

  /**
   * Prepares the filter used by {@link #prune(List)}.
   *
   * @param filterExp resolved filter expression; {@code null} disables pruning so that
   *                  every stream file is returned by {@link #prune(List)}
   */
  public void init(FilterResolverIntf filterExp) {
    if (filterExp == null) {
      // drop any executor built by an earlier init call so no stale filter is applied
      filterExecutor = null;
      return;
    }
    // min/max is evaluated over every visible non-complex dimension and every visible measure
    List<CarbonColumn> minMaxCacheColumns = new ArrayList<>();
    for (CarbonDimension dimension : carbonTable.getVisibleDimensions()) {
      if (!dimension.isComplex()) {
        minMaxCacheColumns.add(dimension);
      }
    }
    minMaxCacheColumns.addAll(carbonTable.getVisibleMeasures());
    // segment properties are built from the fact table's column schemas
    List<ColumnSchema> listOfColumns =
        carbonTable.getTableInfo().getFactTable().getListOfColumns();
    SegmentProperties segmentProperties = new SegmentProperties(listOfColumns);
    filterExecutor = FilterUtil.getFilterExecutorTree(
        filterExp, segmentProperties, null, minMaxCacheColumns, false);
  }

  /**
   * Lists the stream files of the given segments and drops the ones that cannot match
   * the filter set by {@link #init(FilterResolverIntf)}.
   *
   * @param segments segments whose stream index files are read
   * @return all stream files when no filter is set; otherwise the files whose min/max
   *         index may satisfy the filter (files without a min/max index are always kept)
   * @throws IOException if a stream index file cannot be read
   */
  public List<StreamFile> prune(List<Segment> segments) throws IOException {
    if (filterExecutor == null) {
      // no filter: list all stream files without loading their min/max index
      return listAllStreamFiles(segments, false);
    }
    List<StreamFile> streamFileList = new ArrayList<>();
    for (StreamFile streamFile : listAllStreamFiles(segments, true)) {
      if (isScanRequire(streamFile)) {
        // the stream file is required to scan
        streamFileList.add(streamFile);
        // the min/max index is not used after selection; presumably cleared to free memory
        streamFile.setMinMaxIndex(null);
      }
    }
    return streamFileList;
  }

  /**
   * Returns whether the stream file may contain rows matching the current filter.
   * Stream files without a min/max index (written by older versions) are always scanned.
   */
  private boolean isScanRequire(StreamFile streamFile) {
    // backward compatibility, old stream file without min/max index
    if (streamFile.getMinMaxIndex() == null) {
      return true;
    }
    byte[][] maxValue = streamFile.getMinMaxIndex().getMaxValues();
    byte[][] minValue = streamFile.getMinMaxIndex().getMinValues();
    BitSet bitSet = filterExecutor
        .isScanRequired(maxValue, minValue, streamFile.getMinMaxIndex().getIsMinMaxSet());
    return !bitSet.isEmpty();
  }

  /**
   * Reads the stream index file of every segment and returns one {@link StreamFile}
   * per block entry. Segments without a stream index file contribute nothing.
   * Also records the number of files found in {@link #getTotalFileNums()}.
   *
   * @param segments   segments to list
   * @param withMinMax when true, attach the min/max index of each entry that has one
   * @return the stream files of all given segments
   * @throws IOException if a stream index file cannot be read
   */
  // TODO optimize and move the code to StreamSegment , but it's in the streaming module.
  private List<StreamFile> listAllStreamFiles(List<Segment> segments, boolean withMinMax)
      throws IOException {
    List<StreamFile> streamFileList = new ArrayList<>();
    for (Segment segment : segments) {
      String segmentDir = CarbonTablePath.getSegmentPath(
          carbonTable.getAbsoluteTableIdentifier().getTablePath(), segment.getSegmentNo());
      String indexFile = CarbonTablePath.getCarbonStreamIndexFilePath(segmentDir);
      if (FileFactory.isFileExist(indexFile)) {
        CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
        indexReader.openThriftReader(indexFile);
        try {
          while (indexReader.hasNext()) {
            BlockIndex blockIndex = indexReader.readBlockIndexInfo();
            String filePath = segmentDir + File.separator + blockIndex.getFile_name();
            long length = blockIndex.getFile_size();
            StreamFile streamFile = new StreamFile(segment.getSegmentNo(), filePath, length);
            streamFileList.add(streamFile);
            if (withMinMax) {
              if (blockIndex.getBlock_index() != null
                  && blockIndex.getBlock_index().getMin_max_index() != null) {
                streamFile.setMinMaxIndex(CarbonMetadataUtil
                    .convertExternalMinMaxIndex(blockIndex.getBlock_index().getMin_max_index()));
              }
            }
          }
        } finally {
          indexReader.closeThriftReader();
        }
      }
    }
    totalFileNums = streamFileList.size();
    return streamFileList;
  }

  /**
   * Returns the number of stream files found by the most recent listing, counted
   * before pruning.
   */
  public int getTotalFileNums() {
    return totalFileNums;
  }
}