blob: eadb698c061924a7fd073e219427580baa11860f [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kylin.query.relnode;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.calcite.rel.AbstractRelNode;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.commons.collections.CollectionUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
import org.apache.kylin.guava30.shaded.common.base.Preconditions;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.guava30.shaded.common.collect.Sets;
import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate;
import org.apache.kylin.metadata.cube.model.DimensionRangeInfo;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.JoinDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.NDataModel;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TableRef;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.model.graph.JoinsGraph;
import org.apache.kylin.metadata.query.NativeQueryRealization;
import org.apache.kylin.metadata.query.QueryMetrics;
import org.apache.kylin.metadata.realization.HybridRealization;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.metadata.realization.SQLDigest;
import org.apache.kylin.metadata.tuple.Tuple;
import org.apache.kylin.metadata.tuple.TupleInfo;
import org.apache.kylin.query.routing.RealizationCheck;
import org.apache.kylin.query.schema.OLAPSchema;
import org.apache.logging.log4j.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.kyligence.kap.secondstorage.SecondStorageUtil;
import lombok.Getter;
import lombok.Setter;
import lombok.val;
/**
 * State gathered for one OLAP context of a query: the table scans and joins it covers, the columns
 * and aggregations it uses, sort/limit information, and the realization chosen to answer it.
 * Contexts are kept in a per-thread registry through registerContext(OLAPContext).
 */
public class OLAPContext {
// NOTE(review): no reader of this parameter key is visible in this view of the file
public static final String PRM_ACCEPT_PARTIAL_RESULT = "AcceptPartialResult";
// per-thread parameter map; the misspelled name is kept because the field is package-visible
static final ThreadLocal<Map<String, String>> _localPrarameters = new ThreadLocal<>();
// per-thread registry of contexts, looked up by id in getThreadLocalContextById(int)
static final ThreadLocal<Map<Integer, OLAPContext>> _localContexts = new ThreadLocal<>();
private static final Logger logger = LoggerFactory.getLogger(OLAPContext.class);
// context identifier (also printed by toHumanReadString)
public final int id;
public final StorageContext storageContext;
// query info
public OLAPSchema olapSchema = null;
public OLAPTableScan firstTableScan = null; // to be fact table scan except "select * from lookupTable"
public Set<OLAPTableScan> allTableScans = new LinkedHashSet<>();
public Set<OLAPJoinRel> allOlapJoins = new HashSet<>();
public Set<MeasureDesc> involvedMeasure = new HashSet<>();
public TupleInfo returnTupleInfo = null;
public boolean afterAggregate = false;
public boolean afterHavingClauseFilter = false;
public boolean afterLimit = false;
public boolean limitPrecedesAggr = false;
public boolean hasWindow = false;
// cube metadata
public IRealization realization;
public RealizationCheck realizationCheck = new RealizationCheck();
public Set<TblColRef> allColumns = new HashSet<>();
public Set<TblColRef> metricsColumns = new HashSet<>();
public List<FunctionDesc> aggregations = new ArrayList<>(); // storage level measure type, on top of which various sql aggr function may apply
public Set<TblColRef> filterColumns = new LinkedHashSet<>();
public List<JoinDesc> joins = new LinkedList<>();
// rewrite info
public Map<String, RelDataType> rewriteFields = new HashMap<>();
// hive query
public String sql = "";
protected boolean isExactlyAggregate = false;
protected boolean hasBitmapMeasure = false;
protected boolean isExactlyFastBitmap = false;
boolean afterTopJoin = false;
// checked by fixModel()/unfixModel() so each is applied at most once in a row
boolean fixedModel;
// initialized in the constructor; copied, together with sortColumns, into the SQLDigest
List<SQLDigest.OrderEnum> sortOrders;
// built lazily by getSQLDigest(); reset to null by resetSQLDigest() and clearCtxInfo()
SQLDigest sqlDigest;
private OLAPRel topNode = null; // the context's topmost node
private RelNode parentOfTopNode = null; // record the JoinRel that cuts off its children into new context(s), in other case it should be null
private int limit = Integer.MAX_VALUE;
private boolean hasJoin = false;
private boolean hasPreCalcJoin = false;
private boolean hasAgg = false;
private boolean hasSelected = false;
private Set<TblColRef> groupByColumns = Sets.newLinkedHashSet();
// collect inner columns in group keys
// this field is used by CC proposer only
private Set<TableColRefWithRel> innerGroupByColumns = Sets.newLinkedHashSet();
// collect inner columns in filter
// this field is used by CC proposer only
private Set<TblColRef> innerFilterColumns = Sets.newLinkedHashSet();
private Set<TblColRef> subqueryJoinParticipants = new HashSet<>();//subqueryJoinParticipants will be added to groupByColumns(only when other group by co-exists) and allColumns
private Set<TblColRef> outerJoinParticipants = new HashSet<>();// join keys in the direct outer join (without agg, union etc in between)
private List<FunctionDesc> constantAggregations = new ArrayList<>(); // agg like min(2),max(2),avg(2), not including count(1)
private List<RexNode> expandedFilterConditions = new LinkedList<>();
private Set<TableRef> notNullTables = new HashSet<>(); // tables which have not null filter(s), can be used in join-match-optimization
private JoinsGraph joinsGraph;
private List<TblColRef> sortColumns;
private Set<String> containedNotSupportedFunc = Sets.newHashSet();
private Map<TblColRef, TblColRef> groupCCColRewriteMapping = new HashMap<>();
private boolean hasAdminPermission = false;
private boolean needToManyDerived;
private String modelAlias;
/**
 * Creates a context numbered {@code seq}.
 *
 * @param seq the sequence number, stored as this context's {@code id} and also passed to the new
 *     {@link StorageContext}
 */
public OLAPContext(int seq) {
    // `id` is a blank final field, so it must be assigned exactly once here
    this.id = seq;
    this.storageContext = new StorageContext(seq);
    this.sortColumns = Lists.newArrayList();
    this.sortOrders = Lists.newArrayList();
}
/**
 * Sets the query parameters for the current thread.
 * NOTE(review): the method body is not visible in this view of the source; presumably it stores
 * {@code parameters} into {@code _localPrarameters} — confirm against the repository.
 */
public static void setParameters(Map<String, String> parameters) {
/**
 * Presumably clears the current thread's query parameters.
 * NOTE(review): the method body is not visible in this view of the source — confirm.
 */
public static void clearParameter() {
public static void registerContext(OLAPContext ctx) {
if (_localContexts.get() == null) {
Map<Integer, OLAPContext> contextMap = new HashMap<>();
_localContexts.get().put(, ctx);
public static Collection<OLAPContext> getThreadLocalContexts() {
Map<Integer, OLAPContext> map = _localContexts.get();
return map == null ? null : map.values();
public static OLAPContext getThreadLocalContextById(int id) {
Map<Integer, OLAPContext> map = _localContexts.get();
return map.get(id);
/**
 * Presumably clears every context registered on the current thread.
 * NOTE(review): the method body is not visible in this view of the source — confirm.
 */
public static void clearThreadLocalContexts() {
/**
 * Presumably removes the context registered under {@code id} from the current thread's registry.
 * NOTE(review): only the lookup of the registry map is visible in this view. If the removal is
 * done on {@code map} without a null check, it throws when nothing was registered on this thread
 * (getThreadLocalContexts() explicitly handles that null case) — confirm.
 */
public static void clearThreadLocalContextById(int id) {
Map<Integer, OLAPContext> map = _localContexts.get();
/**
 * Builds NativeQueryRealization entries for the contexts registered on the current thread.
 *
 * The realization type is derived from each context's storage context:
 * - FILTER_CONFLICT when the layout is empty and the filter is always false;
 * - null for any other empty layout;
 * - TABLE_SNAPSHOT when a snapshot is used;
 * - TABLE_INDEX when the candidate layout belongs to a table index;
 * - AGG_INDEX otherwise.
 * Snapshot tables are collected only in the TABLE_INDEX and AGG_INDEX cases.
 *
 * A streaming entry is added when the storage context has a streaming layout id other than -1.
 * For a HybridRealization, the batch entry uses the batch realization's uuid as the model id.
 * The fusion model alias is used as the model alias.
 *
 * @return the realizations; an empty list (not null) when no contexts are registered, e.g. for
 *     'explain plan for'
 */
public static List<NativeQueryRealization> getNativeRealizations() {
List<NativeQueryRealization> realizations = Lists.newArrayList();
// contexts can be null in case of 'explain plan for'
if (getThreadLocalContexts() == null) {
return realizations;
for (OLAPContext ctx : getThreadLocalContexts()) {
if (ctx.realization == null) {
// NOTE(review): no statement is visible in this branch in this view (presumably the context is
// skipped, since ctx.realization is dereferenced below) — confirm against the repository
final String realizationType;
Set<String> tableSets = Sets.newHashSet();
if (ctx.storageContext.isEmptyLayout() && ctx.storageContext.isFilterCondAlwaysFalse()) {
realizationType = QueryMetrics.FILTER_CONFLICT;
} else if (ctx.storageContext.isEmptyLayout()) {
realizationType = null;
} else if (ctx.storageContext.isUseSnapshot()) {
realizationType = QueryMetrics.TABLE_SNAPSHOT;
} else if (!ctx.storageContext.getCandidate().isEmptyCandidate()
&& ctx.storageContext.getCandidate().getLayoutEntity().getIndex().isTableIndex()) {
realizationType = QueryMetrics.TABLE_INDEX;
addTableSnapshots(tableSets, ctx);
} else {
realizationType = QueryMetrics.AGG_INDEX;
addTableSnapshots(tableSets, ctx);
val ctxRealizationModel = ctx.realization.getModel();
String modelId = ctxRealizationModel.getUuid();
//use fusion model alias
String modelAlias = ctxRealizationModel.getFusionModelAlias();
List<String> snapshots = Lists.newArrayList(tableSets);
if (ctx.storageContext.getStreamingLayoutId() != -1L) {
realizations.add(getStreamingNativeRealization(ctx, realizationType, modelId, modelAlias, snapshots));
if (ctx.realization instanceof HybridRealization) {
String batchModelId = ((HybridRealization) ctx.realization).getBatchRealization().getUuid();
// NOTE(review): the receiver of the following .add(...) is missing in this view (presumably realizations)
.add(getBatchNativeRealization(ctx, realizationType, batchModelId, modelAlias, snapshots));
} else {
realizations.add(getBatchNativeRealization(ctx, realizationType, modelId, modelAlias, snapshots));
return realizations;
/**
 * Builds the realization entry for the streaming part of a context. It uses the storage
 * context's streaming layout id and partial-match flag, and looks up that layout id in a
 * second-storage usage map (default false).
 * NOTE(review): the remaining constructor arguments and the statement receiving the
 * getOrDefault(...) result are not visible in this view — confirm against the repository.
 */
private static NativeQueryRealization getStreamingNativeRealization(OLAPContext ctx, String realizationType,
String modelId, String modelAlias, List<String> snapshots) {
val streamingRealization = new NativeQueryRealization(modelId, modelAlias,
ctx.storageContext.getStreamingLayoutId(), realizationType, ctx.storageContext.isPartialMatchModel(),
.getOrDefault(streamingRealization.getLayoutId(), false));
return streamingRealization;
/**
 * Builds the realization entry for the batch part of a context, using the storage context's
 * layout id, partial-match flag and the given snapshots. It then consults
 * QueryContext.current()'s second-storage usage map for the layout (default false) and
 * recommendSecondStorage(...) for the project and model.
 * NOTE(review): the calls that receive those two values (presumably setters on the new entry)
 * are not visible in this view — confirm.
 */
private static NativeQueryRealization getBatchNativeRealization(OLAPContext ctx, String realizationType,
String modelId, String modelAlias, List<String> snapshots) {
val realization = new NativeQueryRealization(modelId, modelAlias, ctx.storageContext.getLayoutId(),
realizationType, ctx.storageContext.isPartialMatchModel(), snapshots);
QueryContext.current().getSecondStorageUsageMap().getOrDefault(realization.getLayoutId(), false));
recommendSecondStorage(ctx.realization.getProject(), modelId, realizationType));
return realization;
/**
 * Adds the context's snapshot table names to {@code tableSets} (called for TABLE_INDEX and
 * AGG_INDEX realizations).
 * NOTE(review): the method body is not visible in this view — confirm what it collects.
 */
private static void addTableSnapshots(Set<String> tableSets, OLAPContext ctx) {
private static boolean recommendSecondStorage(String project, String modelId, String realizationType) {
return QueryMetrics.TABLE_INDEX.equals(realizationType) && SecondStorageUtil.isProjectEnable(project)
&& !SecondStorageUtil.isModelEnable(project, modelId);
/**
 * Builds a RexInputRef for column {@code columnIdx} of {@code table}. The index is offset by the
 * field counts of the table scans that come before {@code table} in {@code sorted}, so it is
 * unique across all of {@code tables}. The ref is named "tableName.fieldName" and typed with that
 * field's type.
 *
 * @return the input ref, or null when {@code table} is not among {@code tables}
 * NOTE(review): in this view {@code sorted} is a plain copy of {@code tables}, and Comparator and
 * AbstractRelNode are imported but unused. A sorting step appears to be missing here; confirm the
 * order used to compute offsets. The closing brace of the if-block is also not visible, so
 * confirm that the offset update runs for non-matching scans.
 */
public static RexInputRef createUniqueInputRefAmongTables(OLAPTableScan table, int columnIdx,
Collection<OLAPTableScan> tables) {
List<TableScan> sorted = new ArrayList<>(tables);
int offset = 0;
for (TableScan tableScan : sorted) {
// identity comparison: only the very same scan instance matches
if (tableScan == table) {
return new RexInputRef(
table.getTableName() + "." + table.getRowType().getFieldList().get(columnIdx).getName(),
offset + columnIdx, table.getRowType().getFieldList().get(columnIdx).getType());
offset += tableScan.getRowType().getFieldCount();
return null;
public boolean isExactlyAggregate() {
return isExactlyAggregate;
public void setExactlyAggregate(boolean exactlyAggregate) {
isExactlyAggregate = exactlyAggregate;
public boolean isExactlyFastBitmap() {
return isExactlyFastBitmap;
public void setExactlyFastBitmap(boolean isExactlyFastBitmap) {
this.isExactlyFastBitmap = isExactlyFastBitmap;
public void setHasBitmapMeasure(boolean bitmapMeasure) {
hasBitmapMeasure = bitmapMeasure;
public boolean isConstantQuery() {
return allColumns.isEmpty() && aggregations.isEmpty();
public boolean isConstantQueryWithAggregations() {
// deal with probing query like select min(2+2), max(2) from Table
return allColumns.isEmpty() && aggregations.isEmpty() && !constantAggregations.isEmpty();
/**
 * Returns this context's SQLDigest, building and caching it on first use (resetSQLDigest()
 * discards the cached value). The digest receives copies of the context's collections, so later
 * changes to the context do not leak into an already-built digest.
 * NOTE(review): the SQLDigest constructor call is truncated in this view (its remaining arguments
 * and closing parenthesis are not visible) — confirm against the repository. firstTableScan is
 * dereferenced without a null check.
 */
public SQLDigest getSQLDigest() {
if (sqlDigest == null) {
sqlDigest = new SQLDigest(firstTableScan.getTableName(), Sets.newHashSet(allColumns),
Lists.newLinkedList(joins), // model
Lists.newArrayList(groupByColumns), Sets.newHashSet(subqueryJoinParticipants), // group by
Sets.newHashSet(metricsColumns), Lists.newArrayList(aggregations), // aggregation
Sets.newLinkedHashSet(filterColumns), // filter
Lists.newArrayList(sortColumns), Lists.newArrayList(sortOrders), limit, limitPrecedesAggr, // sort & limit
return sqlDigest;
public String getFirstTableIdentity() {
return firstTableScan.getTableRef().getTableIdentity();
public boolean isFirstTableLookupTableInModel(NDataModel model) {
return joins.isEmpty() && model.isLookupTable(getFirstTableIdentity());
public boolean hasPrecalculatedFields() {
NLayoutCandidate candidate = storageContext.getCandidate();
if (candidate.isEmptyCandidate()) {
return false;
boolean isTableIndex = candidate.getLayoutEntity().getIndex().isTableIndex();
boolean isLookupTable = isFirstTableLookupTableInModel(realization.getModel());
return !isTableIndex && !isLookupTable;
public void resetSQLDigest() {
this.sqlDigest = null;
public boolean belongToContextTables(TblColRef tblColRef) {
for (OLAPTableScan olapTableScan : this.allTableScans) {
if (olapTableScan.getColumnRowType().getAllColumns().contains(tblColRef)) {
return true;
return false;
public boolean isOriginAndBelongToCtxTables(TblColRef tblColRef) {
return belongToContextTables(tblColRef) && !tblColRef.getName().startsWith("_KY_");
public void setReturnTupleInfo(RelDataType rowType, ColumnRowType columnRowType) {
TupleInfo info = new TupleInfo();
List<RelDataTypeField> fieldList = rowType.getFieldList();
for (int i = 0; i < fieldList.size(); i++) {
RelDataTypeField field = fieldList.get(i);
TblColRef col = columnRowType == null ? null : columnRowType.getColumnByIndex(i);
info.setField(field.getName(), col, i);
this.returnTupleInfo = info;
/**
 * Records a sort key; null columns are ignored.
 * NOTE(review): the body of the null guard is not visible in this view (presumably it appends to
 * sortColumns and sortOrders) — confirm.
 */
public void addSort(TblColRef col, SQLDigest.OrderEnum order) {
if (col != null) {
/**
 * Re-types every table scan's columns against {@code model} using {@code aliasMap}, then marks
 * this context as fixed.
 * NOTE(review): in this view, {@code if (fixedModel)} has no visible statement, so the for-loop
 * would become its body. Presumably an early {@code return;} is missing here — confirm.
 */
public void fixModel(NDataModel model, Map<String, String> aliasMap) {
if (fixedModel)
for (OLAPTableScan tableScan : this.allTableScans) {
tableScan.fixColumnRowTypeWithModel(model, aliasMap);
fixedModel = true;
/**
 * Reverts fixModel(...) and clears the fixed flag.
 * NOTE(review): the early-exit statement after {@code if (!fixedModel)} and the loop body are not
 * visible in this view — confirm against the repository.
 */
public void unfixModel() {
if (!fixedModel)
for (OLAPTableScan tableScan : this.allTableScans) {
fixedModel = false;
public void clearCtxInfo() {
//query info
this.afterAggregate = false;
this.afterHavingClauseFilter = false;
this.afterLimit = false;
this.limitPrecedesAggr = false;
this.afterTopJoin = false;
this.hasJoin = false;
this.hasPreCalcJoin = false;
this.hasAgg = false;
this.hasWindow = false;
this.joinsGraph = null;
this.sqlDigest = null;
public void addInnerGroupColumns(KapRel rel, Collection<TblColRef> innerGroupColumns) {
Set<TblColRef> innerGroupColumnsSet = new HashSet<>(innerGroupColumns);
for (TblColRef tblColRef : innerGroupColumnsSet) {
this.innerGroupByColumns.add(new TableColRefWithRel(rel, tblColRef));
// For streaming dataflow and fusion model, use streaming layout candidate of storage context
public boolean isAnsweredByTableIndex() {
NLayoutCandidate candidate;
if (this.realization.isStreaming()) {
candidate = this.storageContext.getStreamingCandidate();
} else {
candidate = this.storageContext.getCandidate();
return candidate != null && !candidate.isEmptyCandidate()
&& candidate.getLayoutEntity().getIndex().isTableIndex();
* Only used for recommendation or modeling.
public void simplify() {
if (firstTableScan != null) {
firstTableScan = firstTableScan.cleanRelOptCluster();
Set<OLAPTableScan> simplifiedTableScans = Sets.newHashSet();
allTableScans.forEach(olapTableScan -> olapTableScan.getCluster().getPlanner().clear());
allTableScans.forEach(olapTableScan -> simplifiedTableScans.add(olapTableScan.cleanRelOptCluster()));
this.allTableScans = simplifiedTableScans;
* It's very dangerous, only used for recommendation or modeling.
public void clean() {
topNode = null;
parentOfTopNode = null;
public String toString() {
return "OLAPContext{" + "firstTableScan=" + firstTableScan + ", allTableScans=" + allTableScans
+ ", allOlapJoins=" + allOlapJoins + ", groupByColumns=" + groupByColumns + ", innerGroupByColumns="
+ innerGroupByColumns + ", innerFilterColumns=" + innerFilterColumns + ", aggregations=" + aggregations
+ ", filterColumns=" + filterColumns + '}';
/** Line separator (the "line.separator" system property) used to lay out {@link #olapContextFormat}. */
public static final String SEP = System.getProperty("line.separator");
/** Indentation prefix for each entry of {@link #olapContextFormat}. */
public static final String INDENT = " ";
/**
 * {@link String#format} template used by tipsForUser(). Its five {@code %s} placeholders are, in
 * order: the fact table, the dimension tables, the group-by dimensions, the filter dimensions and
 * the measures.
 */
public static final String olapContextFormat = SEP + "{"
        + SEP + INDENT + "\"Fact Table\" = \"%s\","
        + SEP + INDENT + "\"Dimension Tables\" = [%s],"
        + SEP + INDENT + "\"Recommend Dimension(Group by)\" = [%s],"
        + SEP + INDENT + "\"Recommend Dimension(Filter cond)\" = [%s],"
        + SEP + INDENT + "\"Measures\" = [%s],"
        + SEP + "}" + SEP;
/**
 * Renders a user-facing summary of this context with olapContextFormat, or "empty" when there
 * are no tables or no first table scan.
 * NOTE(review): in this view the initializer of {@code allTables}, the stream sources of the
 * Strings.join(...) arguments, and one of the five format arguments are missing (olapContextFormat
 * has five %s placeholders; four joins are visible) — confirm against the repository.
 */
public String tipsForUser() {
Set<String> allTables =;
if (!allTables.isEmpty() && firstTableScan != null) {
return String.format(olapContextFormat,
Strings.join( -> "\"" + c + "\"").iterator(), ','),
Strings.join( -> "\"" + c.getColumnWithTableAndSchema() + "\"").iterator(), ','),
Strings.join( -> "\"" + c.getColumnWithTableAndSchema() + "\"").iterator(), ','),
Strings.join( -> "\"" + c.getFullExpression() + "\"").iterator(), ',')
} else {
return "empty";
public String toHumanReadString() {
String r = realization == null ? "not matched" : realization.getCanonicalName();
return "{id = " + id
+ ", model = " + r
+ ", fact table = " + (firstTableScan != null ? firstTableScan.getTableName() : "?")
+ "}";
/**
 * Marks, in the joins graph, every join whose PK side is one of the tables with not-null filters
 * as LEFT_OR_INNER.
 * NOTE(review): in this view the statement inside the isEmpty guard (presumably {@code return;})
 * is missing. The logger call that owns the "Current join: ..." message is also fused onto the
 * preceding statement — confirm against the repository.
 */
public void matchJoinWithFilterTransformation() {
Set<TableRef> leftOrInnerTables = getNotNullTables();
if (CollectionUtils.isEmpty(leftOrInnerTables)) {
for (JoinDesc join : joins) {
if (leftOrInnerTables.contains(join.getPKSide())) {
joinsGraph.setJoinToLeftOrInner(join);"Current join: {} is set to LEFT_OR_INNER", join);
/**
 * NOTE(review): the method body is not visible in this view of the source — confirm what this
 * transformation does.
 */
public void matchJoinWithEnhancementTransformation() {
public RexInputRef createUniqueInputRefContextTables(OLAPTableScan table, int columnIdx) {
return createUniqueInputRefAmongTables(table, columnIdx, allTableScans);
public String genExecFunc(OLAPRel rel, String tableName) {
setReturnTupleInfo(rel.getRowType(), rel.getColumnRowType());
if (canMinMaxDimAnsweredByMetadata(rel)) {
return "executeMetadataQuery";
if (isConstantQueryWithAggregations()) {
return "executeSimpleAggregationQuery";
// If the table being scanned is not a fact table, then it is a lookup table.
if (realization.getModel().isLookupTable(tableName)) {
return "executeLookupTableQuery";
return "executeOLAPQuery";
/**
 * Decides whether this context's min/max aggregation can be answered from segment metadata
 * instead of scanning data. Each check below returns false on failure:
 * - routing to metadata must be enabled in KylinConfig;
 * - the realization must be an NDataflow, and {@code rel} an OLAPJoinRel or OLAPTableScan;
 * - walking down single-input OLAPRel nodes from topNode must reach {@code rel}, with an
 *   OLAPProjectRel directly above it and an OLAPAggregateRel above that;
 * - the aggregate must group by at most one key, of the LITERAL inner data type;
 * - aggregations must be non-empty and all min/max;
 * - no aggregation may be over an expression (an inner data type column);
 * - every queried column must be a dimension of the realization.
 * On success, the aggregate's rewriteAggCalls receive its aggCallList so that measures are not
 * used.
 *
 * NOTE(review): several statements are damaged in this view:
 * - log messages appear fused onto the preceding line with their logger call missing;
 * - relStack is never appended to inside the while loop;
 * - the aggregations stream source is missing;
 * - the comment describing the target pattern lost its delimiters.
 * Confirm all of these against the repository.
 */
private boolean canMinMaxDimAnsweredByMetadata(OLAPRel rel) {
if (!KylinConfig.getInstanceFromEnv().isRouteToMetadataEnabled()) {
return false;
if (!(realization instanceof NDataflow) || !(rel instanceof OLAPJoinRel || rel instanceof OLAPTableScan)) {"Can't route to metadata, the realization is {} and this OLAPRel is {}", realization, rel);
return false;
// NOTE(review): the following pattern description lost its comment delimiters in this view
* Find the target pattern as shown below.
* (other rel)
* |
* Agg
* |
* Project
* |
* (TableScan or JoinRel)
List<OLAPRel> relStack = new ArrayList<>();
OLAPRel current = this.topNode;
while (current != rel && current.getInputs().size() == 1 && current.getInput(0) instanceof OLAPRel) {
current = (OLAPRel) current.getInput(0);
if (current != rel || relStack.size() < 2 || !(relStack.get(relStack.size() - 1) instanceof OLAPProjectRel)
|| !(relStack.get(relStack.size() - 2) instanceof OLAPAggregateRel)) {"Can't route to query metadata, the rel stack is not matched");
return false;
OLAPAggregateRel aggregateRel = (OLAPAggregateRel) relStack.get(relStack.size() - 2);
if (aggregateRel.groups.size() > 1 || aggregateRel.groups.size() == 1 && !TblColRef.InnerDataTypeEnum.LITERAL
.getDataType().equals(aggregateRel.groups.get(0).getDatatype())) {"Cannot route to query metadata, only group by constants are supported.");
return false;
if (aggregations.isEmpty() || ! -> agg.isMin() || agg.isMax())) {"Cannot route to query metadata, only min/max aggregate functions are supported.");
return false;
// reject min/max over expressions: the first column ref of an aggregation must not be an inner data type
if (
.anyMatch(agg -> TblColRef.InnerDataTypeEnum.contains(agg.getColRefs().get(0).getDatatype()))) {"Cannot route to query metadata, not support min(expression), such as min(id+1)");
return false;
if (!Sets.newHashSet(realization.getAllDimensions()).containsAll(allColumns)) {"Cannot route to query metadata, not all columns queried are treated as dimensions of index.");
return false;
// reset rewriteAggCalls to aggCall, to avoid using measures.
aggregateRel.rewriteAggCalls.addAll(aggregateRel.getAggCallList());"Use kylin metadata to answer query with realization : {}", realization);
return true;
/**
 * Collects per-segment min/max values for the columns aggregated by this context's single-column
 * aggregations.
 *
 * For each segment whose status is READY, two arrays sized to all table-scan columns are built.
 * For each aggregated column they hold the min and max from the segment's dimension range info
 * (looked up by the column's model column id), converted with the column's upgraded type name,
 * or null when the segment has no range info for it.
 *
 * @throws IllegalStateException when the realization is not an NDataflow
 * NOTE(review): in this view the stream source of {@code cols}, the body that fills
 * {@code allFields}, the statement that skips non-READY segments, and the statements adding
 * minList/maxList to {@code result} are missing. Also, {@code colId} is -1 if a column is not
 * among allFields, which would fail on array access. Confirm against the repository.
 */
public List<Object[]> getColValuesRange() {
Preconditions.checkState(realization instanceof NDataflow, "Only support dataflow");
// As it is a min/max aggregate function, it only has one parameter.
List<TblColRef> cols = //
.map(FunctionDesc::getColRefs) //
.filter(tblColRefs -> tblColRefs.size() == 1) //
.map(tblColRefs -> tblColRefs.get(0)) //
List<TblColRef> allFields = new ArrayList<>();
allTableScans.forEach(tableScan -> {
List<TblColRef> colRefs = tableScan.getColumnRowType().getAllColumns();
List<Object[]> result = new ArrayList<>();
for (NDataSegment segment : ((NDataflow) realization).getSegments()) {
if (segment.getStatus() != SegmentStatusEnum.READY) {
Map<String, DimensionRangeInfo> infoMap = segment.getDimensionRangeInfoMap();
Object[] minList = new Object[allFields.size()];
Object[] maxList = new Object[allFields.size()];
for (TblColRef col : cols) {
String dataType = col.getColumnDesc().getUpgradedType().getName();
int colId = allFields.indexOf(col);
String tblColRefIndex = getTblColRefIndex(col, realization);
DimensionRangeInfo dimensionRangeInfo = infoMap.get(tblColRefIndex);
if (dimensionRangeInfo == null) {
minList[colId] = null;
maxList[colId] = null;
} else {
minList[colId] = Tuple.convertOptiqCellValue(dimensionRangeInfo.getMin(), dataType);
maxList[colId] = Tuple.convertOptiqCellValue(dimensionRangeInfo.getMax(), dataType);
return result;
private String getTblColRefIndex(TblColRef colRef, IRealization df) {
NDataModel model = df.getModel();
return String.valueOf(model.getColumnIdByColumnName(colRef.getAliasDotName()));
/**
 * Check hook invoked with all OLAP contexts of a query, the root OLAPRel of the plan, and the
 * Kylin config.
 * NOTE(review): presumably an access-control check (by its name); implementations and the failure
 * contract are not visible here — confirm.
 */
public interface IAccessController {
void check(List<OLAPContext> contexts, OLAPRel tree, KylinConfig config);