/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kylin.storage.hbase.cube.v1;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.ShardingHash;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
import org.apache.kylin.cube.model.HBaseColumnDesc;
import org.apache.kylin.cube.model.HBaseMappingDesc;
import org.apache.kylin.dict.lookup.LookupStringTable;
import org.apache.kylin.dimension.Dictionary;
import org.apache.kylin.measure.MeasureType;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
import org.apache.kylin.metadata.filter.LogicalTupleFilter;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.SQLDigest;
import org.apache.kylin.metadata.tuple.ITupleIterator;
import org.apache.kylin.metadata.tuple.TupleInfo;
import org.apache.kylin.storage.IStorageQuery;
import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
import org.apache.kylin.storage.translate.ColumnValueRange;
import org.apache.kylin.storage.translate.DerivedFilterTranslator;
import org.apache.kylin.storage.translate.HBaseKeyRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
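// HBase storage query for the v1 cube engine: answers a SQLDigest by scanning pre-aggregated cuboid rows from the cube's HBase segments.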
@SuppressWarnings("unused")
public class CubeStorageQuery implements IStorageQuery {
private static final Logger logger = LoggerFactory.getLogger(CubeStorageQuery.class);
private static final int MERGE_KEYRANGE_THRESHOLD = 100;
private final CubeInstance cubeInstance;
private final CubeDesc cubeDesc;
private final String uuid;
public CubeStorageQuery(CubeInstance cube) {
this.cubeInstance = cube;
this.cubeDesc = cube.getDescriptor();
this.uuid = cube.getUuid();
}
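// Entry point: derive the target cuboid, translate the filter into per-segment HBase key ranges, build value decoders for the required measures, and return an iterator over the scanned tuples.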
@Override
public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
// allow custom measures hack
notifyBeforeStorageQuery(sqlDigest);
Collection<TblColRef> groups = sqlDigest.groupbyColumns;
TupleFilter filter = sqlDigest.filter;
// build dimension & metrics
Collection<TblColRef> dimensions = new HashSet<TblColRef>();
Collection<FunctionDesc> metrics = new HashSet<FunctionDesc>();
buildDimensionsAndMetrics(dimensions, metrics, sqlDigest);
// all dimensions = groups + others
Set<TblColRef> others = Sets.newHashSet(dimensions);
others.removeAll(groups);
// expand derived
Set<TblColRef> derivedPostAggregation = Sets.newHashSet();
Set<TblColRef> groupsD = expandDerived(groups, derivedPostAggregation);
Set<TblColRef> othersD = expandDerived(others, derivedPostAggregation);
othersD.removeAll(groupsD);
// identify cuboid
Set<TblColRef> dimensionsD = Sets.newHashSet();
dimensionsD.addAll(groupsD);
dimensionsD.addAll(othersD);
Cuboid cuboid = identifyCuboid(dimensionsD, metrics);
context.setCuboid(cuboid);
// isExactAggregation? meaning: tuples returned from storage require no further aggregation in the query engine
Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
boolean isExactAggregation = isExactAggregation(cuboid, groupsD, othersD, singleValuesD, derivedPostAggregation);
context.setExactAggregation(isExactAggregation);
// translate filter for scan range and compose returning groups for coprocessor, note:
// - columns in non-evaluatable filters have to be returned
// - columns in filters loosened by derived translation have to be returned
Set<TblColRef> groupsCopD = Sets.newHashSet(groupsD);
collectNonEvaluable(filter, groupsCopD);
TupleFilter filterD = translateDerived(filter, groupsCopD);
// translate filter into segment scan ranges
List<HBaseKeyRange> scans = buildScanRanges(flattenToOrAndFilter(filterD), dimensionsD);
// check involved measures, build a value decoder for each family:column
List<RowValueDecoder> valueDecoders = translateAggregation(cubeDesc.getHbaseMapping(), metrics, context);
// memory hungry distinct counts are pushed down to the coprocessor, no need to set a threshold any more
setThreshold(dimensionsD, valueDecoders, context); // set cautious threshold to prevent out of memory
setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial
setLimit(filter, context);
Connection conn = HBaseConnection.get(context.getConnUrl());
// notice we're passing filterD down to storage instead of flatFilter
return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, valueDecoders, context, returnTupleInfo);
}
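// Split the SQL digest into metric functions (skipping dimension-as-metric) and dimension columns (all columns minus metric columns).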
private void buildDimensionsAndMetrics(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, SQLDigest sqlDigest) {
for (FunctionDesc func : sqlDigest.aggregations) {
if (!func.isDimensionAsMetric()) {
metrics.add(func);
}
}
for (TblColRef column : sqlDigest.allColumns) {
// skip measure columns
if (sqlDigest.metricColumns.contains(column)) {
continue;
}
dimensions.add(column);
}
}
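// Map the dimension set to a cuboid ID bitmask over row key columns and resolve the cuboid to scan; fall back to the base cuboid if any measure can only be aggregated there.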
private Cuboid identifyCuboid(Set<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
for (FunctionDesc metric : metrics) {
if (metric.getMeasureType().onlyAggrInBaseCuboid())
return Cuboid.getBaseCuboid(cubeDesc);
}
long cuboidID = 0;
for (TblColRef column : dimensions) {
int index = cubeDesc.getRowkey().getColumnBitIndex(column);
cuboidID |= 1L << index;
}
return Cuboid.findById(cubeDesc, cuboidID);
}
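// Aggregation is exact only if the cuboid needs no post-aggregation, all derived post-aggregation columns are in the group by, and every other filter column is pinned to a single value.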
private boolean isExactAggregation(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation) {
boolean exact = true;
if (cuboid.requirePostAggregation()) {
exact = false;
logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId());
}
// derived aggregation is bad, unless expanded columns are already in group by
if (groups.containsAll(derivedPostAggregation) == false) {
exact = false;
logger.info("exactAggregation is false because derived column require post aggregation: " + derivedPostAggregation);
}
// other columns (from filter) are bad, unless they are ensured to have a single value
if (singleValuesD.containsAll(othersD) == false) {
exact = false;
logger.info("exactAggregation is false because some column not on group by: " + othersD //
+ " (single value column: " + singleValuesD + ")");
}
if (exact) {
logger.info("exactAggregation is true");
}
return exact;
}
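// Replace derived columns with their host columns; hosts of non one-to-one derivations are recorded as requiring post-aggregation.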
private Set<TblColRef> expandDerived(Collection<TblColRef> cols, Set<TblColRef> derivedPostAggregation) {
Set<TblColRef> expanded = Sets.newHashSet();
for (TblColRef col : cols) {
if (cubeDesc.hasHostColumn(col)) {
DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
for (TblColRef hostCol : hostInfo.columns) {
expanded.add(hostCol);
if (hostInfo.isOneToOne == false)
derivedPostAggregation.add(hostCol);
}
} else {
expanded.add(col);
}
}
return expanded;
}
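// Find columns pinned to a single value by "column = constant" conditions in the filter (a single compare or a top-level AND); derived columns map to host columns only for one-to-one derivations.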
@SuppressWarnings("unchecked")
private Set<TblColRef> findSingleValueColumns(TupleFilter filter) {
Collection<? extends TupleFilter> toCheck;
if (filter instanceof CompareTupleFilter) {
toCheck = Collections.singleton(filter);
} else if (filter instanceof LogicalTupleFilter && filter.getOperator() == FilterOperatorEnum.AND) {
toCheck = filter.getChildren();
} else {
return (Set<TblColRef>) Collections.EMPTY_SET;
}
Set<TblColRef> result = Sets.newHashSet();
for (TupleFilter f : toCheck) {
if (f instanceof CompareTupleFilter) {
CompareTupleFilter compFilter = (CompareTupleFilter) f;
// is COL=const ?
if (compFilter.getOperator() == FilterOperatorEnum.EQ && compFilter.getValues().size() == 1 && compFilter.getColumn() != null) {
result.add(compFilter.getColumn());
}
}
}
// expand derived
Set<TblColRef> resultD = Sets.newHashSet();
for (TblColRef col : result) {
if (cubeDesc.isExtendedColumn(col)) {
throw new CubeDesc.CannotFilterExtendedColumnException(col);
}
if (cubeDesc.isDerived(col)) {
DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
if (hostInfo.isOneToOne) {
for (TblColRef hostCol : hostInfo.columns) {
resultD.add(hostCol);
}
}
//if not one2one, it will be pruned
} else {
resultD.add(col);
}
}
return resultD;
}
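// Collect columns referenced by filter nodes that storage cannot evaluate, so they are returned for later evaluation.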
private void collectNonEvaluable(TupleFilter filter, Set<TblColRef> collector) {
if (filter == null)
return;
if (filter.isEvaluable()) {
for (TupleFilter child : filter.getChildren())
collectNonEvaluable(child, collector);
} else {
collectColumnsRecursively(filter, collector);
}
}
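// Collect every column referenced anywhere under the filter, mapping derived columns to their host columns.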
private void collectColumnsRecursively(TupleFilter filter, Set<TblColRef> collector) {
if (filter == null)
return;
if (filter instanceof ColumnTupleFilter) {
collectColumns(((ColumnTupleFilter) filter).getColumn(), collector);
}
for (TupleFilter child : filter.getChildren()) {
collectColumnsRecursively(child, collector);
}
}
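// Add a column (or its host columns if derived) to the collector; extended columns cannot be filtered.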
private void collectColumns(TblColRef col, Set<TblColRef> collector) {
if (cubeDesc.isExtendedColumn(col)) {
throw new CubeDesc.CannotFilterExtendedColumnException(col);
}
if (cubeDesc.isDerived(col)) {
DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
for (TblColRef h : hostInfo.columns)
collector.add(h);
} else {
collector.add(col);
}
}
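// Recursively rewrite compare filters on derived columns into filters on their host columns; parent logical filters are rebuilt only when a child changed.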
@SuppressWarnings("unchecked")
private TupleFilter translateDerived(TupleFilter filter, Set<TblColRef> collector) {
if (filter == null)
return filter;
if (filter instanceof CompareTupleFilter) {
return translateDerivedInCompare((CompareTupleFilter) filter, collector);
}
List<TupleFilter> children = (List<TupleFilter>) filter.getChildren();
List<TupleFilter> newChildren = Lists.newArrayListWithCapacity(children.size());
boolean modified = false;
for (TupleFilter child : children) {
TupleFilter translated = translateDerived(child, collector);
newChildren.add(translated);
if (child != translated)
modified = true;
}
if (modified) {
filter = replaceChildren(filter, newChildren);
}
return filter;
}
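// Clone a logical filter with new children; other filter types cannot be rebuilt.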
private TupleFilter replaceChildren(TupleFilter filter, List<TupleFilter> newChildren) {
if (filter instanceof LogicalTupleFilter) {
LogicalTupleFilter r = new LogicalTupleFilter(filter.getOperator());
r.addChildren(newChildren);
return r;
} else
throw new IllegalStateException("Cannot replaceChildren on " + filter);
}
private TupleFilter translateDerivedInCompare(CompareTupleFilter compf, Set<TblColRef> collector) {
if (compf.getColumn() == null || compf.getValues().isEmpty())
return compf;
TblColRef derived = compf.getColumn();
if (cubeDesc.isExtendedColumn(derived)) {
throw new CubeDesc.CannotFilterExtendedColumnException(derived);
}
if (cubeDesc.isDerived(derived) == false) {
return compf;
}
DeriveInfo hostInfo = cubeDesc.getHostInfo(derived);
CubeManager cubeMgr = CubeManager.getInstance(this.cubeInstance.getConfig());
CubeSegment seg = cubeInstance.getLatestReadySegment();
LookupStringTable lookup = cubeMgr.getLookupTable(seg, hostInfo.dimension);
Pair<TupleFilter, Boolean> translated = DerivedFilterTranslator.translate(lookup, hostInfo, compf);
TupleFilter translatedFilter = translated.getFirst();
boolean loosened = translated.getSecond();
if (loosened) {
collectColumnsRecursively(translatedFilter, collector);
}
return translatedFilter;
}
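// Pick an HBase family:column holding each aggregation and build one RowValueDecoder per column, marking which measure indices to project.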
private List<RowValueDecoder> translateAggregation(HBaseMappingDesc hbaseMapping, Collection<FunctionDesc> metrics, //
StorageContext context) {
Map<HBaseColumnDesc, RowValueDecoder> codecMap = Maps.newHashMap();
for (FunctionDesc aggrFunc : metrics) {
Collection<HBaseColumnDesc> hbCols = hbaseMapping.findHBaseColumnByFunction(aggrFunc);
if (hbCols.isEmpty()) {
throw new IllegalStateException("can't find HBaseColumnDesc for function " + aggrFunc.getFullExpression());
}
HBaseColumnDesc bestHBCol = null;
int bestIndex = -1;
for (HBaseColumnDesc hbCol : hbCols) {
bestHBCol = hbCol;
bestIndex = hbCol.findMeasure(aggrFunc);
// we used to prefer specific measure over another (holistic distinct count), now it's gone
break;
}
RowValueDecoder codec = codecMap.get(bestHBCol);
if (codec == null) {
codec = new RowValueDecoder(bestHBCol);
codecMap.put(bestHBCol, codec);
}
codec.setProjectIndex(bestIndex);
}
return new ArrayList<RowValueDecoder>(codecMap.values());
}
//check TupleFilter.flatFilter's comment
private TupleFilter flattenToOrAndFilter(TupleFilter filter) {
if (filter == null)
return null;
// core
TupleFilter flatFilter = filter.flatFilter();
// normalize to OR-AND filter
if (flatFilter.getOperator() == FilterOperatorEnum.AND) {
LogicalTupleFilter f = new LogicalTupleFilter(FilterOperatorEnum.OR);
f.addChild(flatFilter);
flatFilter = f;
}
if (flatFilter.getOperator() != FilterOperatorEnum.OR)
throw new IllegalStateException();
return flatFilter;
}
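// For each READY segment, translate the flat OR-AND filter into row key ranges, merge overlapping and excessive ranges, drop ranges missing the segment's partition window, and duplicate per shard if sharding is enabled.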
private List<HBaseKeyRange> buildScanRanges(TupleFilter flatFilter, Collection<TblColRef> dimensionColumns) {
List<HBaseKeyRange> result = Lists.newArrayList();
logger.info("Current cubeInstance is " + cubeInstance + " with " + cubeInstance.getSegments().size() + " segs in all");
List<CubeSegment> segs = cubeInstance.getSegments(SegmentStatusEnum.READY);
logger.info("READY segs count: " + segs.size());
// build row key range for each cube segment
StringBuilder sb = new StringBuilder("hbasekeyrange trace: ");
for (CubeSegment cubeSeg : segs) {
// consider derived (lookup snapshot), filter on dimension may
// differ per segment
List<Collection<ColumnValueRange>> orAndDimRanges = translateToOrAndDimRanges(flatFilter, cubeSeg);
if (orAndDimRanges == null) { // has conflict
continue;
}
List<HBaseKeyRange> scanRanges = Lists.newArrayListWithCapacity(orAndDimRanges.size());
for (Collection<ColumnValueRange> andDimRanges : orAndDimRanges) {
HBaseKeyRange rowKeyRange = new HBaseKeyRange(dimensionColumns, andDimRanges, cubeSeg, cubeDesc);
scanRanges.add(rowKeyRange);
}
//log
sb.append(scanRanges.size() + "=(mergeoverlap)>");
List<HBaseKeyRange> mergedRanges = mergeOverlapRanges(scanRanges);
//log
sb.append(mergedRanges.size() + "=(mergetoomany)>");
mergedRanges = mergeTooManyRanges(mergedRanges);
//log
sb.append(mergedRanges.size() + ",");
result.addAll(mergedRanges);
}
logger.info(sb.toString());
logger.info("hbasekeyrange count: " + result.size());
dropUnhitSegments(result);
logger.info("hbasekeyrange count after dropping unhit :" + result.size());
//TODO: should use LazyRowKeyEncoder.getRowKeysDifferentShards like CubeHBaseRPC, not do so because v1 query engine is retiring. not worth changing it
if (cubeDesc.isEnableSharding()) {
result = duplicateRangeByShard(result);
}
logger.info("hbasekeyrange count after dropping duplicatebyshard :" + result.size());
return result;
}
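// Convert each AND branch of the flat filter into column value ranges against the segment; branches with conflicting conditions are dropped.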
private List<Collection<ColumnValueRange>> translateToOrAndDimRanges(TupleFilter flatFilter, CubeSegment cubeSegment) {
List<Collection<ColumnValueRange>> result = Lists.newArrayList();
if (flatFilter == null) {
result.add(Collections.<ColumnValueRange> emptyList());
return result;
}
for (TupleFilter andFilter : flatFilter.getChildren()) {
if (andFilter.getOperator() != FilterOperatorEnum.AND) {
throw new IllegalStateException("Filter should be AND instead of " + andFilter);
}
Collection<ColumnValueRange> andRanges = translateToAndDimRanges(andFilter.getChildren(), cubeSegment);
if (andRanges != null) {
result.add(andRanges);
}
}
return preprocessConstantConditions(result);
}
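// Drop always-true ranges, drop AND branches containing an always-false range, and collapse everything to a single always-true branch if any branch becomes empty.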
private List<Collection<ColumnValueRange>> preprocessConstantConditions(List<Collection<ColumnValueRange>> orAndRanges) {
boolean globalAlwaysTrue = false;
Iterator<Collection<ColumnValueRange>> iterator = orAndRanges.iterator();
while (iterator.hasNext()) {
Collection<ColumnValueRange> andRanges = iterator.next();
Iterator<ColumnValueRange> iterator2 = andRanges.iterator();
boolean hasAlwaysFalse = false;
while (iterator2.hasNext()) {
ColumnValueRange range = iterator2.next();
if (range.satisfyAll())
iterator2.remove();
else if (range.satisfyNone())
hasAlwaysFalse = true;
}
if (hasAlwaysFalse) {
iterator.remove();
} else if (andRanges.isEmpty()) {
globalAlwaysTrue = true;
break;
}
}
if (globalAlwaysTrue) {
orAndRanges.clear();
orAndRanges.add(Collections.<ColumnValueRange> emptyList());
}
return orAndRanges;
}
// return empty collection to mean true; return null to mean false
@SuppressWarnings("unchecked")
private Collection<ColumnValueRange> translateToAndDimRanges(List<? extends TupleFilter> andFilters, CubeSegment cubeSegment) {
Map<TblColRef, ColumnValueRange> rangeMap = new HashMap<TblColRef, ColumnValueRange>();
for (TupleFilter filter : andFilters) {
if ((filter instanceof CompareTupleFilter) == false) {
continue;
}
CompareTupleFilter comp = (CompareTupleFilter) filter;
if (comp.getColumn() == null) {
continue;
}
ColumnValueRange range = new ColumnValueRange(comp.getColumn(), (Collection<String>) comp.getValues(), comp.getOperator());
andMerge(range, rangeMap);
}
// a little pre-evaluation to remove invalid EQ/IN values and round start/end according to dictionary
Iterator<ColumnValueRange> it = rangeMap.values().iterator();
while (it.hasNext()) {
ColumnValueRange range = it.next();
range.preEvaluateWithDict((Dictionary<String>) cubeSegment.getDictionary(range.getColumn()));
if (range.satisfyAll())
it.remove();
else if (range.satisfyNone())
return null;
}
return rangeMap.values();
}
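// Intersect the new range with any existing range on the same column.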
private void andMerge(ColumnValueRange range, Map<TblColRef, ColumnValueRange> rangeMap) {
ColumnValueRange columnRange = rangeMap.get(range.getColumn());
if (columnRange == null) {
rangeMap.put(range.getColumn(), range);
} else {
columnRange.andMerge(range);
}
}
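// Sort ranges by start key and merge those whose key intervals overlap.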
private List<HBaseKeyRange> mergeOverlapRanges(List<HBaseKeyRange> keyRanges) {
if (keyRanges.size() <= 1) {
return keyRanges;
}
if (logger.isDebugEnabled()) {
logger.debug("Merging key range from " + keyRanges.size());
}
// sort ranges by start key
Collections.sort(keyRanges);
// merge the overlap range
List<HBaseKeyRange> mergedRanges = new LinkedList<HBaseKeyRange>();
int beginIndex = 0;
byte[] maxStopKey = keyRanges.get(0).getStopKey();
for (int index = 0; index < keyRanges.size(); index++) {
HBaseKeyRange keyRange = keyRanges.get(index);
if (Bytes.compareTo(maxStopKey, keyRange.getStartKey()) < 0) {
// merge the current key ranges
HBaseKeyRange mergedRange = mergeKeyRange(keyRanges, beginIndex, index - 1);
mergedRanges.add(mergedRange);
// start new merge
beginIndex = index;
}
if (Bytes.compareTo(maxStopKey, keyRange.getStopKey()) < 0) {
// update the stop key
maxStopKey = keyRange.getStopKey();
}
}
// merge last range
HBaseKeyRange mergedRange = mergeKeyRange(keyRanges, beginIndex, keyRanges.size() - 1);
mergedRanges.add(mergedRange);
if (logger.isDebugEnabled()) {
logger.debug("Merging key range to " + mergedRanges.size());
}
return mergedRanges;
}
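// Merge keyRanges[from..to] into one range spanning the minimum start key and maximum stop key, unioning fuzzy keys and filters and widening the partition date bounds; fuzzy keys are cleared if any merged range had none.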
private HBaseKeyRange mergeKeyRange(List<HBaseKeyRange> keyRanges, int from, int to) {
HBaseKeyRange keyRange = keyRanges.get(from);
int mergeSize = to - from + 1;
if (mergeSize > 1) {
// merge ranges from index 'from' to 'to'
CubeSegment cubeSegment = keyRange.getCubeSegment();
Cuboid cuboid = keyRange.getCuboid();
byte[] startKey = keyRange.getStartKey();
byte[] stopKey = keyRange.getStopKey();
long partitionColumnStartDate = Long.MAX_VALUE;
long partitionColumnEndDate = 0;
TreeSet<Pair<byte[], byte[]>> newFuzzyKeys = new TreeSet<>(new Comparator<Pair<byte[], byte[]>>() {
@Override
public int compare(Pair<byte[], byte[]> o1, Pair<byte[], byte[]> o2) {
int partialResult = Bytes.compareTo(o1.getFirst(), o2.getFirst());
if (partialResult != 0) {
return partialResult;
} else {
return Bytes.compareTo(o1.getSecond(), o2.getSecond());
}
}
});
List<Collection<ColumnValueRange>> newFlatOrAndFilter = Lists.newLinkedList();
boolean hasNonFuzzyRange = false;
for (int k = from; k <= to; k++) {
HBaseKeyRange nextRange = keyRanges.get(k);
hasNonFuzzyRange = hasNonFuzzyRange || nextRange.getFuzzyKeys().isEmpty();
newFuzzyKeys.addAll(nextRange.getFuzzyKeys());
newFlatOrAndFilter.addAll(nextRange.getFlatOrAndFilter());
if (Bytes.compareTo(stopKey, nextRange.getStopKey()) < 0) {
stopKey = nextRange.getStopKey();
}
if (nextRange.getPartitionColumnStartDate() > 0 && nextRange.getPartitionColumnStartDate() < partitionColumnStartDate) {
partitionColumnStartDate = nextRange.getPartitionColumnStartDate();
}
if (nextRange.getPartitionColumnEndDate() < Long.MAX_VALUE && nextRange.getPartitionColumnEndDate() > partitionColumnEndDate) {
partitionColumnEndDate = nextRange.getPartitionColumnEndDate();
}
}
// if any range is non-fuzzy, then all fuzzy keys must be cleared
if (hasNonFuzzyRange) {
newFuzzyKeys.clear();
}
partitionColumnStartDate = (partitionColumnStartDate == Long.MAX_VALUE) ? 0 : partitionColumnStartDate;
partitionColumnEndDate = (partitionColumnEndDate == 0) ? Long.MAX_VALUE : partitionColumnEndDate;
keyRange = new HBaseKeyRange(cubeSegment, cuboid, startKey, stopKey, Lists.newArrayList(newFuzzyKeys), newFlatOrAndFilter, partitionColumnStartDate, partitionColumnEndDate);
}
return keyRange;
}
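// If the range count reaches MERGE_KEYRANGE_THRESHOLD, collapse all ranges into a single one.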
private List<HBaseKeyRange> mergeTooManyRanges(List<HBaseKeyRange> keyRanges) {
if (keyRanges.size() < MERGE_KEYRANGE_THRESHOLD) {
return keyRanges;
}
// TODO: check the distance between range. and merge the large distance range
List<HBaseKeyRange> mergedRanges = new LinkedList<HBaseKeyRange>();
HBaseKeyRange mergedRange = mergeKeyRange(keyRanges, 0, keyRanges.size() - 1);
mergedRanges.add(mergedRange);
return mergedRanges;
}
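// For partitioned cubes, drop scan ranges whose partition date window does not hit the segment.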
private void dropUnhitSegments(List<HBaseKeyRange> scans) {
if (cubeDesc.getModel().getPartitionDesc().isPartitioned()) {
Iterator<HBaseKeyRange> iterator = scans.iterator();
while (iterator.hasNext()) {
HBaseKeyRange scan = iterator.next();
if (scan.hitSegment() == false) {
iterator.remove();
}
}
}
}
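// Replicate each scan range once per shard of its cuboid, rewriting the shard id prefix of the start/stop keys, then sort by start key.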
private List<HBaseKeyRange> duplicateRangeByShard(List<HBaseKeyRange> scans) {
List<HBaseKeyRange> ret = Lists.newArrayList();
for (HBaseKeyRange scan : scans) {
CubeSegment segment = scan.getCubeSegment();
byte[] startKey = scan.getStartKey();
byte[] stopKey = scan.getStopKey();
short cuboidShardNum = segment.getCuboidShardNum(scan.getCuboid().getId());
short cuboidShardBase = segment.getCuboidBaseShard(scan.getCuboid().getId());
for (short i = 0; i < cuboidShardNum; ++i) {
short newShard = ShardingHash.normalize(cuboidShardBase, i, segment.getTotalShards());
byte[] newStartKey = duplicateKeyAndChangeShard(newShard, startKey);
byte[] newStopKey = duplicateKeyAndChangeShard(newShard, stopKey);
HBaseKeyRange newRange = new HBaseKeyRange(segment, scan.getCuboid(), newStartKey, newStopKey, //
scan.getFuzzyKeys(), scan.getFlatOrAndFilter(), scan.getPartitionColumnStartDate(), scan.getPartitionColumnEndDate());
ret.add(newRange);
}
}
Collections.sort(ret, new Comparator<HBaseKeyRange>() {
@Override
public int compare(HBaseKeyRange o1, HBaseKeyRange o2) {
return Bytes.compareTo(o1.getStartKey(), o2.getStartKey());
}
});
return ret;
}
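// Copy the row key and overwrite its shard id prefix.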
private byte[] duplicateKeyAndChangeShard(short newShard, byte[] bytes) {
byte[] ret = Arrays.copyOf(bytes, bytes.length);
BytesUtil.writeShort(newShard, ret, 0, RowConstants.ROWKEY_SHARDID_LEN);
return ret;
}
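// Only for memory-hungry measures: estimate the bytes per returned row and cap the scan threshold so results fit the query memory budget.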
private void setThreshold(Collection<TblColRef> dimensions, List<RowValueDecoder> valueDecoders, StorageContext context) {
if (RowValueDecoder.hasMemHungryMeasures(valueDecoders) == false) {
return;
}
int rowSizeEst = dimensions.size() * 3;
for (RowValueDecoder decoder : valueDecoders) {
MeasureDesc[] measures = decoder.getMeasures();
BitSet projectionIndex = decoder.getProjectionIndex();
for (int i = projectionIndex.nextSetBit(0); i >= 0; i = projectionIndex.nextSetBit(i + 1)) {
FunctionDesc func = measures[i].getFunction();
// FIXME getStorageBytesEstimate() is not appropriate as here we want size in memory (not in storage)
rowSizeEst += func.getReturnDataType().getStorageBytesEstimate();
}
}
long rowEst = this.cubeInstance.getConfig().getQueryMemBudget() / rowSizeEst;
if (rowEst > 0) {
logger.info("Memory budget is set to: " + rowEst);
context.setThreshold((int) rowEst);
} else {
logger.info("Memory budget is not set.");
}
}
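// Push the limit down only when aggregation is exact, the filter is absent or fully evaluable with the coprocessor enabled, and no sorting is requested.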
private void setLimit(TupleFilter filter, StorageContext context) {
boolean goodAggr = context.isExactAggregation();
boolean goodFilter = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled());
boolean goodSort = !context.hasSort();
if (goodAggr && goodFilter && goodSort) {
logger.info("Enable limit " + context.getLimit());
context.enableLimit();
}
}
private void setCoprocessor(Set<TblColRef> groupsCopD, List<RowValueDecoder> valueDecoders, StorageContext context) {
ObserverEnabler.enableCoprocessorIfBeneficial(cubeInstance, groupsCopD, valueDecoders, context);
}
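// Give each measure type a chance to adjust the SQL digest (custom measure hook) before query planning.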
private void notifyBeforeStorageQuery(SQLDigest sqlDigest) {
for (MeasureDesc measure : cubeDesc.getMeasures()) {
MeasureType<?> measureType = measure.getFunction().getMeasureType();
measureType.adjustSqlDigest(measure, sqlDigest);
}
}
}