blob: c2f76410edcdc593947773a123233979fd5f72a5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kylin.stream.core.storage.columnar;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.kv.CubeDimEncMap;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.dict.BuiltInFunctionTransformer;
import org.apache.kylin.dimension.DimensionEncoding;
import org.apache.kylin.dimension.IDimensionEncodingMap;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
import org.apache.kylin.metadata.filter.ConstantTupleFilter;
import org.apache.kylin.metadata.filter.FilterOptimizeTransformer;
import org.apache.kylin.metadata.filter.ITupleFilterTransformer;
import org.apache.kylin.metadata.filter.LogicalTupleFilter;
import org.apache.kylin.metadata.filter.StringCodeSystem;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
import org.apache.kylin.metadata.filter.TupleFilterSerializer;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.stream.core.query.IStreamingGTSearcher;
import org.apache.kylin.stream.core.query.IStreamingSearchResult;
import org.apache.kylin.stream.core.query.ResponseResultSchema;
import org.apache.kylin.stream.core.query.ResultCollector;
import org.apache.kylin.stream.core.query.StreamingSearchContext;
import org.apache.kylin.stream.core.storage.columnar.protocol.CuboidMetaInfo;
import org.apache.kylin.stream.core.storage.columnar.protocol.FragmentMetaInfo;
import org.apache.kylin.stream.core.util.CompareFilterTimeRangeChecker;
import org.apache.kylin.stream.core.util.CompareFilterTimeRangeChecker.CheckResult;
import org.apache.kylin.dimension.TimeDerivedColumnType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
 * FragmentFileSearcher is responsible for scanning the columnar storage of a DataSegmentFragment on disk. It deals
 * with all the byte-level details of each fragment's data and returns the result as GTRecords.
 */
public class FragmentFileSearcher implements IStreamingGTSearcher {
private static Logger logger = LoggerFactory.getLogger(FragmentFileSearcher.class);
private FragmentData fragmentData;
private DataSegmentFragment fragment;
public FragmentFileSearcher(DataSegmentFragment fragment, FragmentData fragmentData) {
this.fragment = fragment;
this.fragmentData = fragmentData;
}
@Override
public void search(StreamingSearchContext searchContext, ResultCollector collector) throws IOException {
String timezone = searchContext.getCubeDesc().getConfig().getStreamingDerivedTimeTimezone();
long timezoneOffset = 0;
if (timezone != null && timezone.length() > 0) {
timezoneOffset = TimeZone.getTimeZone(timezone).getRawOffset();
}
FragmentMetaInfo fragmentMetaInfo = fragmentData.getFragmentMetaInfo();
CuboidMetaInfo cuboidMetaInfo;
if (searchContext.hitBasicCuboid()) {
cuboidMetaInfo = fragmentMetaInfo.getBasicCuboidMetaInfo();
} else {
cuboidMetaInfo = fragmentMetaInfo.getCuboidMetaInfo(searchContext.getHitCuboid());
if (cuboidMetaInfo == null) {
logger.warn("the cuboid:{} is not exist in the fragment:{}, use basic cuboid instead",
searchContext.getHitCuboid(), fragment.getFragmentId());
cuboidMetaInfo = fragmentMetaInfo.getBasicCuboidMetaInfo();
}
}
ResponseResultSchema responseSchema = searchContext.getRespResultSchema();
TblColRef[] dimensions = responseSchema.getDimensions();
FunctionDesc[] metrics = responseSchema.getMetrics();
Map<TblColRef, Dictionary<String>> dictMap = fragmentData.getDimensionDictionaries(dimensions);
CubeDesc cubeDesc = responseSchema.getCubeDesc();
List<MeasureDesc> allMeasures = cubeDesc.getMeasures();
Map<FunctionDesc, MeasureDesc> funcMeasureMap = Maps.newHashMap();
for (MeasureDesc measure : allMeasures) {
funcMeasureMap.put(measure.getFunction(), measure);
}
MeasureDesc[] measures = new MeasureDesc[metrics.length];
for (int i = 0; i < measures.length; i++) {
measures[i] = funcMeasureMap.get(metrics[i]);
}
DimensionEncoding[] dimensionEncodings = ParsedStreamingCubeInfo.getDimensionEncodings(cubeDesc, dimensions,
dictMap);
ColumnarMetricsEncoding[] metricsEncodings = ParsedStreamingCubeInfo.getMetricsEncodings(measures);
ColumnarRecordCodec recordCodec = new ColumnarRecordCodec(dimensionEncodings, metricsEncodings);
// change the unEvaluable dimensions to groupBy
Set<TblColRef> unEvaluateDims = Sets.newHashSet();
TupleFilter fragmentFilter = null;
if (searchContext.getFilter() != null) {
fragmentFilter = convertFilter(fragmentMetaInfo, searchContext.getFilter(), recordCodec, dimensions,
new CubeDimEncMap(cubeDesc, dictMap), unEvaluateDims, timezoneOffset);
}
if (ConstantTupleFilter.TRUE == fragmentFilter) {
fragmentFilter = null;
} else if (ConstantTupleFilter.FALSE == fragmentFilter) {
collector.collectSearchResult(IStreamingSearchResult.EMPTY_RESULT);
}
Set<TblColRef> groups = searchContext.getGroups();
if (!unEvaluateDims.isEmpty()) {
searchContext.addNewGroups(unEvaluateDims);
groups = Sets.union(groups, unEvaluateDims);
}
collector.collectSearchResult(new FragmentSearchResult(fragment, fragmentData, cuboidMetaInfo, responseSchema, fragmentFilter, groups, searchContext.getHavingFilter(),
recordCodec));
}
private TupleFilter convertFilter(FragmentMetaInfo fragmentMetaInfo, TupleFilter rootFilter,
ColumnarRecordCodec recordCodec, final TblColRef[] dimensions, final IDimensionEncodingMap dimEncodingMap, //
final Set<TblColRef> unEvaluableColumnCollector, long timezoneOffset) {
Map<TblColRef, Integer> colMapping = Maps.newHashMap();
for (int i = 0; i < dimensions.length; i++) {
colMapping.put(dimensions[i], i);
}
byte[] bytes = TupleFilterSerializer.serialize(rootFilter, null, StringCodeSystem.INSTANCE);
TupleFilter filter = TupleFilterSerializer.deserialize(bytes, StringCodeSystem.INSTANCE);
BuiltInFunctionTransformer builtInFunctionTransformer = new BuiltInFunctionTransformer(dimEncodingMap);
filter = builtInFunctionTransformer.transform(filter);
FragmentFilterConverter fragmentFilterConverter = new FragmentFilterConverter(fragmentMetaInfo, unEvaluableColumnCollector,
colMapping, recordCodec);
fragmentFilterConverter.setTimezoneOffset(timezoneOffset);
filter = fragmentFilterConverter.transform(filter);
filter = new FilterOptimizeTransformer().transform(filter);
return filter;
}
protected static class FragmentFilterConverter implements ITupleFilterTransformer {
protected final Set<TblColRef> unEvaluableColumnCollector;
protected final Map<TblColRef, Integer> colMapping;
private CompareFilterTimeRangeChecker filterTimeRangeChecker;
private ColumnarRecordCodec recordCodec;
transient ByteBuffer buf;
private long timezoneOffset = 0;
public FragmentFilterConverter(FragmentMetaInfo fragmentMetaInfo, Set<TblColRef> unEvaluableColumnCollector,
Map<TblColRef, Integer> colMapping, ColumnarRecordCodec recordCodec) {
this.unEvaluableColumnCollector = unEvaluableColumnCollector;
this.recordCodec = recordCodec;
this.colMapping = colMapping;
if (fragmentMetaInfo.hasValidEventTimeRange()) {
this.filterTimeRangeChecker = new CompareFilterTimeRangeChecker(fragmentMetaInfo.getMinEventTime(),
fragmentMetaInfo.getMaxEventTime(), true);
}
buf = ByteBuffer.allocate(recordCodec.getMaxDimLength());
}
protected int mapCol(TblColRef col) {
Integer i = colMapping.get(col);
return i == null ? -1 : i;
}
@Override
public TupleFilter transform(TupleFilter filter) {
if (filter.getOperator() == TupleFilter.FilterOperatorEnum.NOT
&& !TupleFilter.isEvaluableRecursively(filter)) {
TupleFilter.collectColumns(filter, unEvaluableColumnCollector);
return ConstantTupleFilter.TRUE;
}
// shortcut for unEvaluatable filter
if (!filter.isEvaluable()) {
TupleFilter.collectColumns(filter, unEvaluableColumnCollector);
return ConstantTupleFilter.TRUE;
}
if (filter instanceof CompareTupleFilter) {
return translateCompareFilter((CompareTupleFilter) filter);
} else if (filter instanceof LogicalTupleFilter) {
@SuppressWarnings("unchecked")
ListIterator<TupleFilter> childIterator = (ListIterator<TupleFilter>) filter.getChildren().listIterator();
while (childIterator.hasNext()) {
TupleFilter transformed = transform(childIterator.next());
if (transformed != null) {
childIterator.set(transformed);
} else {
throw new IllegalStateException("Should not be null");
}
}
}
return filter;
}
@SuppressWarnings({ "rawtypes", "unchecked" })
protected TupleFilter translateCompareFilter(CompareTupleFilter oldCompareFilter) {
// extract ColumnFilter & ConstantFilter
TblColRef externalCol = oldCompareFilter.getColumn();
if (externalCol == null) {
return oldCompareFilter;
}
Collection constValues = oldCompareFilter.getValues();
if (constValues == null || constValues.isEmpty()) {
return oldCompareFilter;
}
if (TimeDerivedColumnType.isTimeDerivedColumn(externalCol.getName()) && filterTimeRangeChecker != null) {
CheckResult checkResult = filterTimeRangeChecker.check(oldCompareFilter,
TimeDerivedColumnType.getTimeDerivedColumnType(externalCol.getName()), timezoneOffset);
if (checkResult == CheckResult.INCLUDED) {
return ConstantTupleFilter.TRUE;
} else if (checkResult == CheckResult.EXCLUDED) {
return ConstantTupleFilter.FALSE;
}
}
//CompareTupleFilter containing BuiltInFunctionTupleFilter will not reach here caz it will be transformed by BuiltInFunctionTransformer
CompareTupleFilter newCompareFilter = new CompareTupleFilter(oldCompareFilter.getOperator());
newCompareFilter.addChild(new ColumnTupleFilter(externalCol));
//for CompareTupleFilter containing dynamicVariables, the below codes will actually replace dynamicVariables
//with normal ConstantTupleFilter
Object firstValue = constValues.iterator().next();
int col = mapCol(externalCol);
TupleFilter result;
ByteArray code;
// translate constant into code
switch (newCompareFilter.getOperator()) {
case EQ:
case IN:
Set newValues = Sets.newHashSet();
for (Object value : constValues) {
code = translate(col, value, 0);
if (code != null)
newValues.add(code);
}
if (newValues.isEmpty()) {
result = ConstantTupleFilter.FALSE;
} else {
newCompareFilter.addChild(new ConstantTupleFilter(newValues));
result = newCompareFilter;
}
break;
case NOTIN:
Set notInValues = Sets.newHashSet();
for (Object value : constValues) {
code = translate(col, value, 0);
if (code != null)
notInValues.add(code);
}
if (notInValues.isEmpty()) {
result = ConstantTupleFilter.TRUE;
} else {
newCompareFilter.addChild(new ConstantTupleFilter(notInValues));
result = newCompareFilter;
}
break;
case NEQ:
code = translate(col, firstValue, 0);
if (code == null) {
result = ConstantTupleFilter.TRUE;
} else {
newCompareFilter.addChild(new ConstantTupleFilter(code));
result = newCompareFilter;
}
break;
case LT:
code = translate(col, firstValue, 0);
if (code == null) {
code = translate(col, firstValue, -1);
if (code == null)
result = ConstantTupleFilter.FALSE;
else
result = newCompareFilter(FilterOperatorEnum.LTE, externalCol, code);
} else {
newCompareFilter.addChild(new ConstantTupleFilter(code));
result = newCompareFilter;
}
break;
case LTE:
code = translate(col, firstValue, -1);
if (code == null) {
result = ConstantTupleFilter.FALSE;
} else {
newCompareFilter.addChild(new ConstantTupleFilter(code));
result = newCompareFilter;
}
break;
case GT:
code = translate(col, firstValue, 0);
if (code == null) {
code = translate(col, firstValue, 1);
if (code == null)
result = ConstantTupleFilter.FALSE;
else
result = newCompareFilter(FilterOperatorEnum.GTE, externalCol, code);
} else {
newCompareFilter.addChild(new ConstantTupleFilter(code));
result = newCompareFilter;
}
break;
case GTE:
code = translate(col, firstValue, 1);
if (code == null) {
result = ConstantTupleFilter.FALSE;
} else {
newCompareFilter.addChild(new ConstantTupleFilter(code));
result = newCompareFilter;
}
break;
default:
throw new IllegalStateException("Cannot handle operator " + newCompareFilter.getOperator());
}
return result;
}
private TupleFilter newCompareFilter(FilterOperatorEnum op, TblColRef col, ByteArray code) {
CompareTupleFilter r = new CompareTupleFilter(op);
r.addChild(new ColumnTupleFilter(col));
r.addChild(new ConstantTupleFilter(code));
return r;
}
protected ByteArray translate(int col, Object value, int roundingFlag) {
try {
buf.clear();
recordCodec.encodeDimension(col, value, roundingFlag, buf);
int length = buf.position();
return ByteArray.copyOf(buf.array(), 0, length);
} catch (IllegalArgumentException ex) {
return null;
}
}
public void setTimezoneOffset(long timezoneOffset) {
this.timezoneOffset = timezoneOffset;
}
}
}