blob: e6e0737f3f40235578338297c9d12c61bf495678 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kylin.storage.gtrecord;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.common.FuzzyValueCombination;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.gridtable.CubeGridTable;
import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
import org.apache.kylin.cube.gridtable.RecordComparators;
import org.apache.kylin.cube.gridtable.ScanRangePlannerBase;
import org.apache.kylin.cube.kv.CubeDimEncMap;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRange;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.GTScanRequestBuilder;
import org.apache.kylin.gridtable.GTUtil;
import org.apache.kylin.gridtable.IGTComparator;
import org.apache.kylin.metadata.expression.TupleExpression;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.DynamicFunctionDesc;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.storage.StorageContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
import org.apache.kylin.shaded.com.google.common.collect.Maps;
import org.apache.kylin.shaded.com.google.common.collect.Sets;
/**
 * Plans the grid-table (GT) scan of one cuboid of a cube segment.
 * <p>
 * The planner translates the query's filter, dimensions, group-by columns, metrics and dynamic
 * (expression-based) columns into grid-table column sets, derives primary-key scan ranges (optionally
 * carrying fuzzy keys) from the filter, and packages everything into a {@link GTScanRequest}.
 */
public class CubeScanRangePlanner extends ScanRangePlannerBase {

    private static final Logger logger = LoggerFactory.getLogger(CubeScanRangePlanner.class);

    /** When more scan ranges than this are planned, they are all merged into one range. */
    protected int maxScanRanges;
    /** A range carrying more fuzzy keys than this has its fuzzy keys split across several ranges. */
    protected int maxFuzzyKeysPerSplit;
    /** {@code maxFuzzyKeysPerSplit} times the configured split maximum; more fuzzy keys than this are dropped. */
    protected int maxFuzzyKeys;

    //non-GT
    protected CubeSegment cubeSegment;
    protected CubeDesc cubeDesc;
    protected Cuboid cuboid;
    protected StorageContext context;

    /**
     * Builds a planner for the given segment and cuboid.
     *
     * @param cubeSegment   segment whose config supplies the scan-range / fuzzy-key limits
     * @param cuboid        cuboid to scan
     * @param filter        query filter; its columns and constants are converted to GT form
     * @param dimensions    dimension columns to read
     * @param groupByDims   static group-by columns
     * @param dynGroupsDims dynamic group-by columns, index-aligned with {@code dynGroupExprs}
     * @param dynGroupExprs expressions computing each dynamic group-by column
     * @param metrics       measures to aggregate
     * @param dynFuncs      dynamic measures (expressions over runtime aggregation functions)
     * @param havingFilter  having filter, passed through unchanged to the scan request
     * @param context       storage context supplying the cuboid-to-grid-table mapping and limits
     */
    public CubeScanRangePlanner(CubeSegment cubeSegment, Cuboid cuboid, TupleFilter filter, Set<TblColRef> dimensions, //
            Set<TblColRef> groupByDims, List<TblColRef> dynGroupsDims, List<TupleExpression> dynGroupExprs, //
            Collection<FunctionDesc> metrics, List<DynamicFunctionDesc> dynFuncs, //
            TupleFilter havingFilter, StorageContext context) {
        this.context = context;
        this.maxScanRanges = cubeSegment.getConfig().getQueryStorageVisitScanRangeMax();
        this.maxFuzzyKeysPerSplit = cubeSegment.getConfig().getQueryScanFuzzyKeyMax();
        this.maxFuzzyKeys = maxFuzzyKeysPerSplit * cubeSegment.getConfig().getQueryScanFuzzyKeySplitMax();
        this.cubeSegment = cubeSegment;
        this.cubeDesc = cubeSegment.getCubeDesc();
        this.cuboid = cuboid;

        final CuboidToGridTableMapping mapping = context.getMapping();
        this.gtInfo = CubeGridTable.newGTInfo(cuboid, new CubeDimEncMap(cubeSegment), mapping);
        initComparators();

        // all group-by columns, static plus dynamic
        Set<TblColRef> groupByPushDown = Sets.newHashSet(groupByDims);
        groupByPushDown.addAll(dynGroupsDims);

        //replace the constant values in filter to dictionary codes
        this.gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, gtInfo, mapping.getDim2gt(), groupByPushDown);
        this.havingFilter = havingFilter;

        this.gtDimensions = mapping.makeGridTableColumns(dimensions);
        this.gtAggrGroups = mapping.makeGridTableColumns(replaceDerivedColumns(groupByPushDown, cubeSegment.getCubeDesc()));
        this.gtAggrMetrics = mapping.makeGridTableColumns(metrics);
        this.gtAggrFuncs = mapping.makeAggrFuncs(metrics);

        // dynamic columns are appended columns of the GTInfo
        BitSet tmpGtDynCols = new BitSet();
        this.tupleExpressionMap = Maps.newHashMap();

        // dynamic dimensions
        for (int i = 0; i < dynGroupsDims.size(); i++) {
            int c = mapping.getIndexOf(dynGroupsDims.get(i));
            tmpGtDynCols.set(c);
            TupleExpression tupleExpr = GTUtil.convertFilterColumnsAndConstants(dynGroupExprs.get(i), gtInfo, mapping,
                    groupByPushDown);
            this.tupleExpressionMap.put(c, tupleExpr);
        }

        // dynamic measures; their runtime functions are collected as runtime aggregation metrics
        Set<FunctionDesc> tmpRtAggrMetrics = Sets.newHashSet();
        for (DynamicFunctionDesc dynFunc : dynFuncs) {
            tmpRtAggrMetrics.addAll(dynFunc.getRuntimeFuncs());
            int c = mapping.getIndexOf(dynFunc);
            tmpGtDynCols.set(c);
            this.tupleExpressionMap.put(c, GTUtil.convertFilterColumnsAndConstants(dynFunc.getTupleExpression(), gtInfo,
                    mapping, dynFunc.getRuntimeFuncMap(), groupByPushDown));
        }

        this.gtDynColumns = new ImmutableBitSet(tmpGtDynCols);
        this.gtRtAggrMetrics = mapping.makeGridTableColumns(tmpRtAggrMetrics);
        // gtAggrGroups / gtAggrMetrics / gtAggrFuncs were already computed above; they are not recomputed here
    }

    /**
     * Construct GTScanRangePlanner with incomplete information. For UT only.
     * <p>
     * Limits come from {@link KylinConfig#getInstanceFromEnv()}. {@code context} and {@code cubeSegment}
     * stay null, so {@link #planScanRequest()} cannot be used on an instance built this way;
     * {@link #planScanRanges()} can. {@code gtPartitionCol} is currently ignored.
     */
    public CubeScanRangePlanner(GTInfo info, TblColRef gtPartitionCol, TupleFilter gtFilter) {
        this.maxScanRanges = KylinConfig.getInstanceFromEnv().getQueryStorageVisitScanRangeMax();
        this.maxFuzzyKeysPerSplit = KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeyMax();
        this.maxFuzzyKeys = maxFuzzyKeysPerSplit * KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeySplitMax();
        this.gtInfo = info;
        initComparators();
        this.gtFilter = gtFilter;
    }

    /** Initializes the record comparators from the code system of {@code gtInfo}. */
    private void initComparators() {
        IGTComparator comp = gtInfo.getCodeSystem().getComparator();
        //start key GTRecord compare to start key GTRecord
        this.rangeStartComparator = RecordComparators.getRangeStartComparator(comp);
        //stop key GTRecord compare to stop key GTRecord
        this.rangeEndComparator = RecordComparators.getRangeEndComparator(comp);
        //start key GTRecord compare to stop key GTRecord
        this.rangeStartEndComparator = RecordComparators.getRangeStartEndComparator(comp);
    }

    /**
     * Builds the scan request from the planned ranges and the GT column sets computed in the constructor.
     *
     * @return the scan request, or {@code null} when {@link #planScanRanges()} yields null or no range
     */
    public GTScanRequest planScanRequest() {
        List<GTScanRange> scanRanges = this.planScanRanges();
        if (scanRanges == null || scanRanges.isEmpty()) {
            return null;
        }
        return new GTScanRequestBuilder().setInfo(gtInfo).setRanges(scanRanges).setDimensions(gtDimensions)
                .setAggrGroupBy(gtAggrGroups).setAggrMetrics(gtAggrMetrics).setAggrMetricsFuncs(gtAggrFuncs)
                .setFilterPushDown(gtFilter)//
                .setRtAggrMetrics(gtRtAggrMetrics).setDynamicColumns(gtDynColumns)
                .setExprsPushDown(tupleExpressionMap)//
                .setAllowStorageAggregation(context.isNeedStorageAggregation())
                .setAggCacheMemThreshold(cubeSegment.getConfig().getQueryCoprocessorMemGB())//
                .setStoragePushDownLimit(context.getFinalPushDownLimit())
                .setStorageLimitLevel(context.getStorageLimitLevel()).setHavingFilterPushDown(havingFilter)
                .createGTScanRequest();
    }

    /**
     * Overwrite this method to provide smarter storage visit plans.
     * <p>
     * The pipeline runs in this order:
     * <ol>
     * <li>flatten the GT filter into OR-of-AND form, with one scan range per AND group;</li>
     * <li>merge overlapping ranges;</li>
     * <li>split ranges that carry many fuzzy keys;</li>
     * <li>collapse everything into one range when more than {@code maxScanRanges} remain.</li>
     * </ol>
     *
     * @return the planned scan ranges (never null here; possibly empty)
     */
    public List<GTScanRange> planScanRanges() {
        TupleFilter flatFilter = flattenToOrAndFilter(gtFilter);

        List<Collection<ColumnRange>> orAndDimRanges = translateToOrAndDimRanges(flatFilter);

        List<GTScanRange> scanRanges = Lists.newArrayListWithCapacity(orAndDimRanges.size());
        for (Collection<ColumnRange> andDimRanges : orAndDimRanges) {
            GTScanRange scanRange = newScanRange(andDimRanges);
            if (scanRange != null)
                scanRanges.add(scanRange);
        }

        List<GTScanRange> mergedRanges = mergeOverlapRanges(scanRanges);
        mergedRanges = splitFuzzyKeys(mergedRanges);
        mergedRanges = mergeTooManyRanges(mergedRanges, maxScanRanges);

        return mergedRanges;
    }

    /**
     * Replaces every column that has host columns in {@code cubeDesc} (a derived column) with those host
     * columns; other columns are kept as is.
     */
    private Set<TblColRef> replaceDerivedColumns(Set<TblColRef> input, CubeDesc cubeDesc) {
        Set<TblColRef> ret = Sets.newHashSet();
        for (TblColRef col : input) {
            if (cubeDesc.hasHostColumn(col)) {
                for (TblColRef host : cubeDesc.getHostInfo(col).columns) {
                    ret.add(host);
                }
            } else {
                ret.add(col);
            }
        }
        return ret;
    }

    /**
     * Builds one scan range from an AND group of column ranges.
     * <p>
     * Only primary-key columns of {@code gtInfo} contribute to the start/end keys. Columns whose range
     * carries a non-empty value set additionally contribute fuzzy-key values.
     */
    protected GTScanRange newScanRange(Collection<ColumnRange> andDimRanges) {
        GTRecord pkStart = new GTRecord(gtInfo);
        GTRecord pkEnd = new GTRecord(gtInfo);
        Map<Integer, Set<ByteArray>> fuzzyValues = Maps.newHashMap();

        for (ColumnRange range : andDimRanges) {
            int col = range.column.getColumnDesc().getZeroBasedIndex();
            if (!gtInfo.getPrimaryKey().get(col))
                continue;

            pkStart.set(col, range.begin);
            pkEnd.set(col, range.end);

            if (range.valueSet != null && !range.valueSet.isEmpty()) {
                fuzzyValues.put(col, range.valueSet);
            }
        }

        List<GTRecord> fuzzyKeys = buildFuzzyKeys(fuzzyValues);

        return new GTScanRange(pkStart, pkEnd, fuzzyKeys);
    }

    /**
     * Turns per-column candidate values into fuzzy-key records, one record per value combination
     * returned by {@link FuzzyValueCombination#calculate} (called with {@code maxFuzzyKeys} as its cap).
     *
     * @return a mutable list; empty when there are no fuzzy values or fuzzy keys are disabled by backdoor toggle
     */
    private List<GTRecord> buildFuzzyKeys(Map<Integer, Set<ByteArray>> fuzzyValueSet) {
        List<GTRecord> result = Lists.newArrayList();

        if (fuzzyValueSet.isEmpty())
            return result;

        // debug/profiling purpose
        if (BackdoorToggles.getDisableFuzzyKey()) {
            logger.info("The execution of this query will not use fuzzy key");
            return result;
        }

        List<Map<Integer, ByteArray>> fuzzyValueCombinations = FuzzyValueCombination.calculate(fuzzyValueSet, maxFuzzyKeys);

        for (Map<Integer, ByteArray> fuzzyValue : fuzzyValueCombinations) {
            GTRecord fuzzy = new GTRecord(gtInfo);
            for (Map.Entry<Integer, ByteArray> entry : fuzzyValue.entrySet()) {
                fuzzy.set(entry.getKey(), entry.getValue());
            }
            result.add(fuzzy);
        }
        return result;
    }

    /**
     * Merges ranges whose key spans overlap.
     * <p>
     * NOTE: sorts {@code ranges} in place by start key. A range is swallowed into the current group when
     * its start key compares {@code <=} the group's running end key.
     *
     * @return {@code ranges} itself when it has at most one element, otherwise a new list of merged ranges
     */
    protected List<GTScanRange> mergeOverlapRanges(List<GTScanRange> ranges) {
        if (ranges.size() <= 1) {
            return ranges;
        }

        // sort ranges by start key
        Collections.sort(ranges, new Comparator<GTScanRange>() {
            @Override
            public int compare(GTScanRange a, GTScanRange b) {
                return rangeStartComparator.compare(a.pkStart, b.pkStart);
            }
        });

        // merge the overlap range
        List<GTScanRange> mergedRanges = new ArrayList<GTScanRange>();
        int mergeBeginIndex = 0;
        GTRecord mergeEnd = ranges.get(0).pkEnd;
        for (int index = 1; index < ranges.size(); index++) {
            GTScanRange range = ranges.get(index);

            // if overlap, swallow it
            if (rangeStartEndComparator.compare(range.pkStart, mergeEnd) <= 0) {
                mergeEnd = rangeEndComparator.max(mergeEnd, range.pkEnd);
                continue;
            }

            // not overlap, split here
            GTScanRange mergedRange = mergeKeyRange(ranges.subList(mergeBeginIndex, index));
            mergedRanges.add(mergedRange);

            // start new split
            mergeBeginIndex = index;
            mergeEnd = range.pkEnd;
        }

        // don't miss the last range
        GTScanRange mergedRange = mergeKeyRange(ranges.subList(mergeBeginIndex, ranges.size()));
        mergedRanges.add(mergedRange);

        return mergedRanges;
    }

    /**
     * Merges a non-empty list of ranges into one.
     * <p>
     * The start key is taken from the first range, so callers are expected to pass ranges ordered by start key.
     * The end key is the maximum end key. Fuzzy keys are unioned, but they are all dropped when any input range
     * has none, or when the union exceeds {@code maxFuzzyKeys}.
     */
    private GTScanRange mergeKeyRange(List<GTScanRange> ranges) {
        GTScanRange first = ranges.get(0);
        if (ranges.size() == 1)
            return first;

        GTRecord start = first.pkStart;
        GTRecord end = first.pkEnd;
        Set<GTRecord> newFuzzyKeys = Sets.newLinkedHashSet();

        boolean hasNonFuzzyRange = false;
        for (GTScanRange range : ranges) {
            hasNonFuzzyRange = hasNonFuzzyRange || range.fuzzyKeys.isEmpty();
            newFuzzyKeys.addAll(range.fuzzyKeys);
            end = rangeEndComparator.max(end, range.pkEnd);
        }

        // if any range is non-fuzzy, then all fuzzy keys must be cleared
        // too many fuzzy keys will slow down HBase scan
        if (hasNonFuzzyRange || newFuzzyKeys.size() > maxFuzzyKeys) {
            if (newFuzzyKeys.size() > maxFuzzyKeys) {
                logger.debug("too many FuzzyKeys, clean it!");
            }
            newFuzzyKeys.clear();
        }

        return new GTScanRange(start, end, Lists.newArrayList(newFuzzyKeys));
    }

    /**
     * When there are more than {@code maxRanges} ranges, merges them all into a single range. That range is
     * then passed through {@link #splitFuzzyKeys}, so the result may contain several ranges that share the same
     * start/end keys.
     */
    protected List<GTScanRange> mergeTooManyRanges(List<GTScanRange> ranges, int maxRanges) {
        if (ranges.size() <= maxRanges) {
            return ranges;
        }

        // TODO: check the distance between range and merge the large distance range
        List<GTScanRange> result = new ArrayList<GTScanRange>(1);
        GTScanRange mergedRange = mergeKeyRange(ranges);
        result.add(mergedRange);
        result = splitFuzzyKeys(result);
        return result;
    }

    /**
     * Splits ranges whose fuzzy-key count lies in {@code (maxFuzzyKeysPerSplit, maxFuzzyKeys]} into several
     * ranges with the same start/end keys. Each resulting range carries a contiguous slice of the sorted fuzzy
     * keys. Slices are balanced, so none exceeds {@code maxFuzzyKeysPerSplit}. Other ranges pass through as is.
     */
    private List<GTScanRange> splitFuzzyKeys(List<GTScanRange> mergedRanges) {
        List<GTScanRange> result = Lists.newArrayList();
        for (GTScanRange range : mergedRanges) {
            int nFuzzyKeys = range.fuzzyKeys.size();
            // only split when the fuzzy keys are too many for one range but still within the overall limit
            if (nFuzzyKeys <= maxFuzzyKeysPerSplit || nFuzzyKeys > maxFuzzyKeys) {
                result.add(range);
                continue;
            }

            // sort a private copy so the input range's fuzzy-key list is neither reordered nor aliased
            List<GTRecord> fuzzyKeys = new ArrayList<GTRecord>(range.fuzzyKeys);
            Collections.sort(fuzzyKeys);

            // ceil(nFuzzyKeys / maxFuzzyKeysPerSplit) splits
            int nSplit = (nFuzzyKeys - 1) / maxFuzzyKeysPerSplit + 1;
            int startIndex = 0;
            for (int i = 1; i <= nSplit; i++) {
                // balanced boundaries: each slice gets floor or ceil of nFuzzyKeys / nSplit keys, which never
                // exceeds maxFuzzyKeysPerSplit (putting the whole remainder into the last slice could)
                int endIndex = (int) ((long) i * nFuzzyKeys / nSplit);
                List<GTRecord> subFuzzyKeys = fuzzyKeys.subList(startIndex, endIndex);
                result.add(new GTScanRange(range.pkStart, range.pkEnd, subFuzzyKeys));
                startIndex = endIndex;
            }
            logger.debug("large FuzzyKeys split size : {}", result.size());
        }
        return result;
    }

    public int getMaxScanRanges() {
        return maxScanRanges;
    }

    public void setMaxScanRanges(int maxScanRanges) {
        this.maxScanRanges = maxScanRanges;
    }
}