blob: 6bf47f00c359c070a3f357779f6fe32e814b4db5 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.exploration.mv;
import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Pair;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVRewriteUtil;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.jobs.executor.Rewriter;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.rules.exploration.ExplorationRuleFactory;
import org.apache.doris.nereids.rules.exploration.mv.Predicates.SplitPredicate;
import org.apache.doris.nereids.rules.exploration.mv.StructInfo.InvalidPartitionRemover;
import org.apache.doris.nereids.rules.exploration.mv.StructInfo.QueryScanPartitionsCollector;
import org.apache.doris.nereids.rules.exploration.mv.mapping.ExpressionMapping;
import org.apache.doris.nereids.rules.exploration.mv.mapping.RelationMapping;
import org.apache.doris.nereids.rules.exploration.mv.mapping.SlotMapping;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Not;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
import org.apache.doris.nereids.trees.expressions.functions.scalar.NonNullable;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Nullable;
import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.nereids.util.TypeUtils;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
/**
* The abstract class for all materialized view rules
*/
public abstract class AbstractMaterializedViewRule implements ExplorationRuleFactory {
/**
 * The join types listed as supported by materialized view rewriting. The set is immutable.
 * NOTE(review): this class does not read the set itself; presumably sub rules or struct info
 * checks consult it, confirm against the callers.
 */
public static final Set<JoinType> SUPPORTED_JOIN_TYPE_SET = ImmutableSet.of(
JoinType.INNER_JOIN,
JoinType.LEFT_OUTER_JOIN,
JoinType.RIGHT_OUTER_JOIN,
JoinType.FULL_OUTER_JOIN,
JoinType.LEFT_SEMI_JOIN,
JoinType.RIGHT_SEMI_JOIN,
JoinType.LEFT_ANTI_JOIN,
JoinType.RIGHT_ANTI_JOIN);
/**
 * Template entry point of query rewrite. Every materialization context registered in the cascades
 * context is tried against the query plan in turn. An exception thrown while rewriting with one
 * materialization is caught and recorded on that materialization context as a fail reason, it is
 * never propagated to the caller.
 *
 * @param queryPlan the query plan which should be rewritten
 * @param cascadesContext the context which offers the materialization contexts and session variables
 * @return the plans rewritten by materializations, empty if nothing can be rewritten
 */
public List<Plan> rewrite(Plan queryPlan, CascadesContext cascadesContext) {
    List<Plan> candidates = new ArrayList<>();
    // nothing to try when there is no available materialization
    if (cascadesContext.getMaterializationContexts().isEmpty()) {
        return candidates;
    }
    for (MaterializationContext mvContext : cascadesContext.getMaterializationContexts()) {
        // skip the materialization which has already rewritten the group this plan belongs to
        if (checkIfRewritten(queryPlan, mvContext)) {
            continue;
        }
        mvContext.tryReGenerateMvScanPlan(cascadesContext);
        // the view plan itself should match the pattern which this rule supports
        if (!checkPattern(mvContext.getStructInfo())) {
            mvContext.recordFailReason(mvContext.getStructInfo(),
                    "View struct info is invalid", () -> String.format(", view plan is %s",
                            mvContext.getStructInfo().getOriginalPlan().treeString()));
            continue;
        }
        // query struct infos are extracted according to the table bit set of the view,
        // when none of them is valid the loop below simply does not run
        List<StructInfo> validQueryStructInfos = getValidQueryStructInfos(queryPlan, cascadesContext,
                mvContext.getStructInfo().getTableBitSet());
        for (StructInfo candidateStructInfo : validQueryStructInfos) {
            try {
                // stop producing plans once the success candidate number of the session is reached
                if (candidates.size() >= cascadesContext.getConnectContext()
                        .getSessionVariable().getMaterializedViewRewriteSuccessCandidateNum()) {
                    continue;
                }
                candidates.addAll(doRewrite(candidateStructInfo, cascadesContext, mvContext));
            } catch (Exception exception) {
                mvContext.recordFailReason(candidateStructInfo,
                        "Materialized view rule exec fail", () -> exception.toString());
            }
        }
    }
    return candidates;
}
/**
 * Extract the struct infos of the query plan and keep the ones which pass {@link #checkPattern}.
 * For every struct info which does not pass, a fail reason is recorded on all the materialization
 * contexts of the cascades context.
 *
 * @param queryPlan the query plan to extract struct infos from
 * @param cascadesContext the context which offers the materialization contexts
 * @param materializedViewTableSet the table bit set of the materialized view, passed to the extraction
 * @return the struct infos which pass the pattern check, in extraction order
 */
protected List<StructInfo> getValidQueryStructInfos(Plan queryPlan, CascadesContext cascadesContext,
        BitSet materializedViewTableSet) {
    List<StructInfo> accepted = new ArrayList<>();
    for (StructInfo candidate : MaterializedViewUtils.extractStructInfo(queryPlan, cascadesContext,
            materializedViewTableSet)) {
        if (checkPattern(candidate)) {
            accepted.add(candidate);
            continue;
        }
        // invalid struct info, let every materialization context know why it is not used
        for (MaterializationContext ctx : cascadesContext.getMaterializationContexts()) {
            ctx.recordFailReason(candidate, "Query struct info is invalid",
                    () -> String.format("query table bitmap is %s, plan is %s",
                            candidate.getTableBitSet(), queryPlan.treeString()));
        }
    }
    return accepted;
}
/**
 * The abstract template method for query rewrite, it contains the main logic, try to rewrite query by
 * only one materialization every time. Different query pattern should override the sub logic.
 *
 * <p>For every relation mapping between query and view the steps are: generate the slot mapping,
 * compare the graph logic of query and view, compensate predicates, rewrite the query by the mv scan,
 * check the mv partitions used by the rewritten plan (union the mv with the base table when union
 * rewrite is enabled and possible), normalize the output and check the output is valid. Every step
 * which fails records a fail reason on the materialization context and moves on to the next mapping.
 *
 * @param queryStructInfo the struct info of the query
 * @param cascadesContext the context which offers session variables and is used to run rewrite rules
 * @param materializationContext the materialization which is used to rewrite the query
 * @return the rewritten plans, empty if the query can not be rewritten by the materialization
 */
protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext cascadesContext,
        MaterializationContext materializationContext) {
    List<Plan> rewriteResults = new ArrayList<>();
    StructInfo viewStructInfo = materializationContext.getStructInfo();
    MatchMode matchMode = decideMatchMode(queryStructInfo.getRelations(), viewStructInfo.getRelations());
    // only the query and view which use exactly the same tables can be rewritten now
    if (MatchMode.COMPLETE != matchMode) {
        materializationContext.recordFailReason(queryStructInfo, "Match mode is invalid",
                () -> String.format("matchMode is %s", matchMode));
        return rewriteResults;
    }
    List<RelationMapping> queryToViewTableMappings = RelationMapping.generate(queryStructInfo.getRelations(),
            viewStructInfo.getRelations());
    // if any relation in query and view can not map, bail out.
    if (queryToViewTableMappings == null) {
        materializationContext.recordFailReason(queryStructInfo,
                "Query to view table mapping is null", () -> "");
        return rewriteResults;
    }
    for (RelationMapping queryToViewTableMapping : queryToViewTableMappings) {
        SlotMapping queryToViewSlotMapping = SlotMapping.generate(queryToViewTableMapping);
        if (queryToViewSlotMapping == null) {
            materializationContext.recordFailReason(queryStructInfo,
                    "Query to view slot mapping is null", () -> "");
            continue;
        }
        SlotMapping viewToQuerySlotMapping = queryToViewSlotMapping.inverse();
        LogicalCompatibilityContext compatibilityContext = LogicalCompatibilityContext.from(
                queryToViewTableMapping, queryToViewSlotMapping, queryStructInfo, viewStructInfo);
        ComparisonResult comparisonResult = StructInfo.isGraphLogicalEquals(queryStructInfo, viewStructInfo,
                compatibilityContext);
        if (comparisonResult.isInvalid()) {
            materializationContext.recordFailReason(queryStructInfo,
                    "The graph logic between query and view is not consistent",
                    comparisonResult::getErrorMessage);
            continue;
        }
        SplitPredicate compensatePredicates = predicatesCompensate(queryStructInfo, viewStructInfo,
                viewToQuerySlotMapping, comparisonResult, cascadesContext);
        // Can not compensate, bail out
        if (compensatePredicates.isInvalid()) {
            // the fourth placeholder is the equivalence class of the view, so it is labeled as view
            materializationContext.recordFailReason(queryStructInfo,
                    "Predicate compensate fail",
                    () -> String.format("query predicates = %s,\n query equivalenceClass = %s, \n"
                                    + "view predicates = %s,\n view equivalenceClass = %s\n"
                                    + "comparisonResult = %s ", queryStructInfo.getPredicates(),
                            queryStructInfo.getEquivalenceClass(), viewStructInfo.getPredicates(),
                            viewStructInfo.getEquivalenceClass(), comparisonResult));
            continue;
        }
        Plan rewrittenPlan;
        Plan mvScan = materializationContext.getMvScanPlan();
        Plan queryPlan = queryStructInfo.getTopPlan();
        if (compensatePredicates.isAlwaysTrue()) {
            // nothing to compensate, the mv scan can be used directly
            rewrittenPlan = mvScan;
        } else {
            // Try to rewrite compensate predicates by using mv scan
            List<Expression> rewriteCompensatePredicates = rewriteExpression(compensatePredicates.toList(),
                    queryPlan, materializationContext.getMvExprToMvScanExprMapping(),
                    viewToQuerySlotMapping, true, queryStructInfo.getTableBitSet());
            // an empty result means some compensate predicate can not be represented by the mv output
            if (rewriteCompensatePredicates.isEmpty()) {
                materializationContext.recordFailReason(queryStructInfo,
                        "Rewrite compensate predicate by view fail",
                        () -> String.format("compensatePredicates = %s,\n mvExprToMvScanExprMapping = %s,\n"
                                        + "viewToQuerySlotMapping = %s",
                                compensatePredicates, materializationContext.getMvExprToMvScanExprMapping(),
                                viewToQuerySlotMapping));
                continue;
            }
            rewrittenPlan = new LogicalFilter<>(Sets.newLinkedHashSet(rewriteCompensatePredicates), mvScan);
        }
        // Rewrite query by view, a null result means the sub rule can not rewrite
        rewrittenPlan = rewriteQueryByView(matchMode, queryStructInfo, viewStructInfo, viewToQuerySlotMapping,
                rewrittenPlan, materializationContext);
        if (rewrittenPlan == null) {
            continue;
        }
        rewrittenPlan = MaterializedViewUtils.rewriteByRules(cascadesContext,
                childContext -> {
                    Rewriter.getWholeTreeRewriter(childContext).execute();
                    return childContext.getRewritePlan();
                }, rewrittenPlan, queryPlan);
        // check the partitions used by rewritten plan is valid or not
        Multimap<Pair<MTMVPartitionInfo, PartitionInfo>, Partition> invalidPartitionsQueryUsed =
                calcUsedInvalidMvPartitions(rewrittenPlan, materializationContext, cascadesContext);
        // Some partitions used by query are invalid in mv and union rewrite is disabled, bail out
        if (!invalidPartitionsQueryUsed.isEmpty() && !cascadesContext.getConnectContext().getSessionVariable()
                .isEnableMaterializedViewUnionRewrite()) {
            materializationContext.recordFailReason(queryStructInfo,
                    "Check partition query used validation fail",
                    () -> String.format("the partition used by query is invalid by materialized view,"
                                    + "invalid partition info query used is %s",
                            invalidPartitionsQueryUsed.values().stream()
                                    .map(Partition::getName)
                                    .collect(Collectors.toSet())));
            continue;
        }
        boolean partitionValid = invalidPartitionsQueryUsed.isEmpty();
        if (checkCanUnionRewrite(invalidPartitionsQueryUsed, queryPlan, cascadesContext)) {
            // construct filter on originalPlan
            Map<TableIf, Set<Expression>> filterOnOriginPlan;
            try {
                filterOnOriginPlan = Predicates.constructFilterByPartitions(invalidPartitionsQueryUsed,
                        queryToViewSlotMapping);
                if (filterOnOriginPlan.isEmpty()) {
                    materializationContext.recordFailReason(queryStructInfo,
                            "construct invalid partition filter on query fail",
                            () -> String.format("the invalid partitions used by query is %s, query plan is %s",
                                    invalidPartitionsQueryUsed.values().stream().map(Partition::getName)
                                            .collect(Collectors.toSet()),
                                    queryStructInfo.getOriginalPlan().treeString()));
                    continue;
                }
            } catch (org.apache.doris.common.AnalysisException e) {
                materializationContext.recordFailReason(queryStructInfo,
                        "construct invalid partition filter on query analysis fail",
                        () -> String.format("the invalid partitions used by query is %s, query plan is %s",
                                invalidPartitionsQueryUsed.values().stream().map(Partition::getName)
                                        .collect(Collectors.toSet()),
                                queryStructInfo.getOriginalPlan().treeString()));
                continue;
            }
            // For rewrittenPlan which contains materialized view should remove invalid partition ids
            List<Plan> children = Lists.newArrayList(
                    rewrittenPlan.accept(new InvalidPartitionRemover(), Pair.of(materializationContext.getMTMV(),
                            invalidPartitionsQueryUsed.values().stream()
                                    .map(Partition::getId).collect(Collectors.toSet()))),
                    StructInfo.addFilterOnTableScan(queryPlan, filterOnOriginPlan, cascadesContext));
            // Union query materialized view and source table
            rewrittenPlan = new LogicalUnion(Qualifier.ALL,
                    queryPlan.getOutput().stream().map(NamedExpression.class::cast).collect(Collectors.toList()),
                    children.stream()
                            .map(plan -> plan.getOutput().stream()
                                    .map(slot -> (SlotReference) slot.toSlot()).collect(Collectors.toList()))
                            .collect(Collectors.toList()),
                    ImmutableList.of(),
                    false,
                    children);
            partitionValid = true;
        }
        // invalid partitions are used and they can not be fixed by union rewrite
        if (!partitionValid) {
            materializationContext.recordFailReason(queryStructInfo,
                    "materialized view partition is invalid union fail",
                    () -> String.format("invalidPartitionsQueryUsed = %s,\n query plan = %s",
                            invalidPartitionsQueryUsed, queryPlan.treeString()));
            continue;
        }
        // keep the exprId and nullable of the output consistent with the query
        rewrittenPlan = normalizeExpressions(rewrittenPlan, queryPlan);
        if (!isOutputValid(queryPlan, rewrittenPlan)) {
            LogicalProperties logicalProperties = rewrittenPlan.getLogicalProperties();
            materializationContext.recordFailReason(queryStructInfo,
                    "RewrittenPlan output logical properties is different with target group",
                    () -> String.format("planOutput logical"
                                    + " properties = %s,\n groupOutput logical properties = %s",
                            logicalProperties, queryPlan.getLogicalProperties()));
            continue;
        }
        recordIfRewritten(queryStructInfo.getOriginalPlan(), materializationContext);
        rewriteResults.add(rewrittenPlan);
    }
    return rewriteResults;
}
/**
 * Check whether the rewritten plan can be unioned with the base table to make up the invalid
 * mv partitions. Union rewrite is given up when there is no invalid partition, when the session
 * disables union rewrite, or when the set of invalid mv partition key descs contains every
 * partition key desc collected from the query for the mv related tables.
 *
 * @param invalidPartitionsQueryUsed the invalid mv partitions which the rewritten plan uses
 * @param queryPlan the query plan, its scanned partitions of the mv related tables are collected
 * @param cascadesContext the context which offers the session variable
 * @return true if union rewrite should be tried
 */
private boolean checkCanUnionRewrite(Multimap<Pair<MTMVPartitionInfo, PartitionInfo>, Partition>
        invalidPartitionsQueryUsed, Plan queryPlan, CascadesContext cascadesContext) {
    if (invalidPartitionsQueryUsed.isEmpty()) {
        return false;
    }
    if (!cascadesContext.getConnectContext().getSessionVariable().isEnableMaterializedViewUnionRewrite()) {
        return false;
    }
    // if mv can not offer valid partition data for query, bail out union rewrite
    Map<Long, Set<PartitionItem>> partitionsByRelatedTable = new LinkedHashMap<>();
    for (Pair<MTMVPartitionInfo, PartitionInfo> partitionInfoPair : invalidPartitionsQueryUsed.keySet()) {
        partitionsByRelatedTable.put(partitionInfoPair.key().getRelatedTableInfo().getTableId(),
                new HashSet<>());
    }
    // the collector fills the map which is keyed by the related table ids registered above
    queryPlan.accept(new QueryScanPartitionsCollector(), partitionsByRelatedTable);
    Set<PartitionKeyDesc> keyDescsQueryUsed = new HashSet<>();
    for (Set<PartitionItem> partitionItems : partitionsByRelatedTable.values()) {
        for (PartitionItem partitionItem : partitionItems) {
            keyDescsQueryUsed.add(partitionItem.toPartitionKeyDesc());
        }
    }
    Set<PartitionKeyDesc> invalidMvKeyDescs = new HashSet<>();
    for (Map.Entry<Pair<MTMVPartitionInfo, PartitionInfo>, Collection<Partition>> entry :
            invalidPartitionsQueryUsed.asMap().entrySet()) {
        PartitionInfo mvPartitionInfo = entry.getKey().value();
        for (Partition invalidPartition : entry.getValue()) {
            invalidMvKeyDescs.add(mvPartitionInfo.getItem(invalidPartition.getId()).toPartitionKeyDesc());
        }
    }
    return !invalidMvKeyDescs.containsAll(keyDescsQueryUsed);
}
/**
 * Normalize the output of the rewritten plan, such as nullable property and output slot id.
 * A project is put on top of the rewritten plan, the i-th project is the i-th output of the
 * rewritten plan normalized against the i-th output of the origin plan.
 *
 * @param rewrittenPlan the plan rewritten by materialized view
 * @param originPlan the origin query plan whose output is the target
 * @return a project on the rewritten plan with normalized outputs
 */
protected Plan normalizeExpressions(Plan rewrittenPlan, Plan originPlan) {
    List<? extends NamedExpression> originOutput = originPlan.getOutput();
    List<? extends NamedExpression> rewrittenOutput = rewrittenPlan.getOutput();
    List<NamedExpression> projects = new ArrayList<>();
    int index = 0;
    for (NamedExpression originSlot : originOutput) {
        projects.add(normalizeExpression(originSlot, rewrittenOutput.get(index)));
        index++;
    }
    return new LogicalProject<>(projects, rewrittenPlan);
}
/**
 * Check the logical properties of the plan rewritten by mv are the same with the source plan.
 * When the source plan belongs to a group, the logical properties of the owner group should be
 * equal to the ones of the rewritten plan too.
 *
 * @param sourcePlan the source query plan
 * @param rewrittenPlan the plan rewritten by materialized view
 * @return true if same, false if different
 */
protected boolean isOutputValid(Plan sourcePlan, Plan rewrittenPlan) {
    if (sourcePlan.getGroupExpression().isPresent()) {
        boolean sameWithGroup = rewrittenPlan.getLogicalProperties()
                .equals(sourcePlan.getGroupExpression().get().getOwnerGroup().getLogicalProperties());
        if (!sameWithGroup) {
            return false;
        }
    }
    return sourcePlan.getLogicalProperties().equals(rewrittenPlan.getLogicalProperties());
}
/**
 * Partition will be pruned in query then add the pruned partitions to select partitions field of
 * catalog relation.
 * Maybe only just some partitions is valid in materialized view, so we should check if the mv can
 * offer the partitions which query used or not.
 *
 * @param rewrittenPlan the plan rewritten by materialized view, the selected partitions of the
 *         olap scans on the mv are collected from it
 * @param materializationContext the context which offers the materialized view
 * @param cascadesContext the context which offers the connect context
 * @return the invalid mv partitions used by the rewritten plan, keyed by the pair of mv custom
 *         partition info and mv partition info. Empty if the mv is not partitioned, has no related
 *         partition table, or all the partitions used are valid
 */
protected Multimap<Pair<MTMVPartitionInfo, PartitionInfo>, Partition> calcUsedInvalidMvPartitions(
        Plan rewrittenPlan,
        MaterializationContext materializationContext,
        CascadesContext cascadesContext) {
    MTMV mtmv = materializationContext.getMTMV();
    PartitionInfo mvPartitionInfo = mtmv.getPartitionInfo();
    // if not partition, if rewrite success, it means mv is available
    if (PartitionType.UNPARTITIONED.equals(mvPartitionInfo.getType())) {
        return ImmutableMultimap.of();
    }
    // without a related partition table there is nothing to validate against
    MTMVPartitionInfo mvCustomPartitionInfo = mtmv.getMvPartitionInfo();
    BaseTableInfo relatedPartitionTable = mvCustomPartitionInfo.getRelatedTableInfo();
    if (relatedPartitionTable == null) {
        return ImmutableMultimap.of();
    }
    // ids of the mv partitions which can be used to rewrite
    Set<Long> validPartitionIds = MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv,
                    cascadesContext.getConnectContext(), System.currentTimeMillis()).stream()
            .map(Partition::getId)
            .collect(Collectors.toSet());
    // ids of the mv partitions selected by the olap scans on the mv, the scan is matched by table name
    Set<Long> usedPartitionIds = rewrittenPlan.collectToList(node -> node instanceof LogicalOlapScan
                    && Objects.equals(((CatalogRelation) node).getTable().getName(), mtmv.getName()))
            .stream()
            .flatMap(node -> ((LogicalOlapScan) node).getSelectedPartitionIds().stream())
            .collect(Collectors.toSet());
    // used but not valid
    Set<Long> invalidPartitionIds = new HashSet<>(usedPartitionIds);
    invalidPartitionIds.removeAll(validPartitionIds);
    Pair<MTMVPartitionInfo, PartitionInfo> partitionInfoKey = Pair.of(mvCustomPartitionInfo, mvPartitionInfo);
    ImmutableMultimap.Builder<Pair<MTMVPartitionInfo, PartitionInfo>, Partition> invalidPartitions =
            ImmutableMultimap.builder();
    for (Long invalidPartitionId : invalidPartitionIds) {
        invalidPartitions.put(partitionInfoKey, mtmv.getPartition(invalidPartitionId));
    }
    return invalidPartitions.build();
}
/**
 * Rewrite query by view, for aggregate or join rewriting should be different inherit class implementation.
 * This base implementation does nothing and returns {@code tempRewritedPlan} as it is.
 * The caller in this class treats a {@code null} return value as rewrite fail and skips the mapping.
 *
 * @param matchMode the match mode between the tables of query and view
 * @param queryStructInfo the struct info of the query
 * @param viewStructInfo the struct info of the view
 * @param viewToQuerySlotMapping the slot mapping from view to query
 * @param tempRewritedPlan the plan built so far, the mv scan with optional compensate filter on it
 * @param materializationContext the materialization which is used to rewrite
 * @return the plan rewritten by view
 */
protected Plan rewriteQueryByView(MatchMode matchMode, StructInfo queryStructInfo, StructInfo viewStructInfo,
SlotMapping viewToQuerySlotMapping, Plan tempRewritedPlan, MaterializationContext materializationContext) {
return tempRewritedPlan;
}
/**
 * Use target expression to represent the source expression. Visit the source expression,
 * try to replace the source expression with target expression in targetExpressionMapping, if found then
 * replace the source expression by target expression mapping value.
 *
 * @param sourceExpressionsToWrite the source expression to write by target expression
 * @param sourcePlan the source plan which the source expression belong to
 * @param targetExpressionMapping target expression mapping, if finding the expression in key set of the mapping
 *         then use the corresponding value of mapping to replace it
 * @param targetToSourceMapping the slot mapping used to permute the key of targetExpressionMapping
 *         when targetExpressionNeedSourceBased is true
 * @param targetExpressionNeedSourceBased if targetExpressionNeedSourceBased is true,
 *         we should make the target expression map key to source based,
 *         Note: the key expression in targetExpressionMapping should be shuttled. with the method
 *         ExpressionUtils.shuttleExpressionWithLineage.
 *         example as following:
 *         source                           target
 *         project(slot 1, 2)              project(slot 3, 2, 1)
 *         scan(table)                        scan(table)
 *         then
 *         transform source to:
 *         project(slot 2, 1)
 *         target
 * @param sourcePlanBitSet the table bit set of source plan, passed to the expression shuttle
 * @return the rewritten expressions in the order of source expressions, or an empty list if any of
 *         the source expressions can not be represented by target expressions
 */
protected List<Expression> rewriteExpression(List<? extends Expression> sourceExpressionsToWrite, Plan sourcePlan,
        ExpressionMapping targetExpressionMapping, SlotMapping targetToSourceMapping,
        boolean targetExpressionNeedSourceBased, BitSet sourcePlanBitSet) {
    // Firstly, shuttle the source expressions with lineage, then change the key of the target
    // mapping to source based if required, finally try to use the target expressions to represent
    // the source expressions. If any of them can not be represented, return an empty list.
    List<? extends Expression> shuttledSources = ExpressionUtils.shuttleExpressionWithLineage(
            sourceExpressionsToWrite, sourcePlan, sourcePlanBitSet);
    ExpressionMapping sourceBasedMapping = targetExpressionMapping;
    if (targetExpressionNeedSourceBased) {
        sourceBasedMapping = targetExpressionMapping.keyPermute(targetToSourceMapping);
    }
    // target to target replacement expression mapping, because mv is 1:1 so get first element
    List<Map<Expression, Expression>> flattenedMappings = sourceBasedMapping.flattenMap();
    Map<? extends Expression, ? extends Expression> replacements = flattenedMappings.get(0);
    List<Expression> rewritten = new ArrayList<>();
    for (Expression shuttled : shuttledSources) {
        // literal needs no rewrite
        if (shuttled instanceof Literal) {
            rewritten.add(shuttled);
            continue;
        }
        final Set<Object> slotsBeforeReplace = shuttled.collectToSet(node -> node instanceof Slot);
        Expression replaced = ExpressionUtils.replace(shuttled, replacements);
        // if contains any slot to rewrite, which means can not be rewritten by target, bail out
        if (replaced.anyMatch(slotsBeforeReplace::contains)) {
            return ImmutableList.of();
        }
        rewritten.add(replaced);
    }
    return rewritten;
}
/**
 * Normalize expression with query, keep the consistency of exprId and nullable props with
 * query.
 *
 * @param sourceExpression the output expression of query, it offers the exprId, name and nullable
 * @param replacedExpression the output expression of the rewritten plan
 * @return an alias which carries the exprId and name of the source expression
 */
private NamedExpression normalizeExpression(
        NamedExpression sourceExpression, NamedExpression replacedExpression) {
    Expression aliased;
    if (replacedExpression.nullable() == sourceExpression.nullable()) {
        aliased = replacedExpression;
    } else if (sourceExpression.nullable()) {
        // if enable join eliminate, query maybe inner join and mv maybe outer join.
        // If the slot is at null generate side, the nullable maybe different between query and view
        // So need to force to consistent.
        aliased = new Nullable(replacedExpression);
    } else {
        aliased = new NonNullable(replacedExpression);
    }
    return new Alias(sourceExpression.getExprId(), aliased, sourceExpression.getName());
}
/**
 * Compensate mv predicates by query predicates, compensate predicate result is query based.
 * Such as a > 5 in mv, and a > 10 in query, the compensatory predicate is a > 10.
 * For another example as following:
 * predicate a = b in mv, and a = b and c = d in query, the compensatory predicate is c = d
 *
 * @param queryStructInfo the struct info of the query
 * @param viewStructInfo the struct info of the view
 * @param viewToQuerySlotMapping the slot mapping from view to query
 * @param comparisonResult the graph comparison result, it offers the pulled up expressions and
 *         the view slots which are required to be not nullable
 * @param cascadesContext the context passed to not null inference and range compensation
 * @return the compensate predicates, or {@code SplitPredicate.INVALID_INSTANCE} if can not compensate
 */
protected SplitPredicate predicatesCompensate(
        StructInfo queryStructInfo,
        StructInfo viewStructInfo,
        SlotMapping viewToQuerySlotMapping,
        ComparisonResult comparisonResult,
        CascadesContext cascadesContext
) {
    // TODO: Use set of list? And consider view expr
    // merge the expressions pulled up by comparison into the query predicates
    List<Expression> pulledUpFromQuery = ImmutableList.copyOf(comparisonResult.getQueryExpressions());
    if (!pulledUpFromQuery.isEmpty()) {
        queryStructInfo = queryStructInfo.withPredicates(
                queryStructInfo.getPredicates().merge(pulledUpFromQuery));
    }
    // merge the expressions pulled up by comparison into the view predicates
    List<Expression> pulledUpFromView = ImmutableList.copyOf(comparisonResult.getViewExpressions());
    if (!pulledUpFromView.isEmpty()) {
        viewStructInfo = viewStructInfo.withPredicates(
                viewStructInfo.getPredicates().merge(pulledUpFromView));
    }
    // if the join type in query and mv plan is different, we should check query is have the
    // filters which rejects null
    Set<Set<Slot>> requireNoNullableViewSlot = comparisonResult.getViewNoNullableSlot();
    if (!requireNoNullableViewSlot.isEmpty()) {
        SlotMapping queryToViewMapping = viewToQuerySlotMapping.inverse();
        // first try with the predicates merged so far
        if (!containsNullRejectSlot(requireNoNullableViewSlot,
                queryStructInfo.getPredicates().getPulledUpPredicates(), queryToViewMapping, cascadesContext)) {
            // then retry after merging all the pulled up expressions of the query
            queryStructInfo = queryStructInfo.withPredicates(
                    queryStructInfo.getPredicates().merge(comparisonResult.getQueryAllPulledUpExpressions()));
            if (!containsNullRejectSlot(requireNoNullableViewSlot,
                    queryStructInfo.getPredicates().getPulledUpPredicates(), queryToViewMapping,
                    cascadesContext)) {
                return SplitPredicate.INVALID_INSTANCE;
            }
        }
    }
    // equal predicate compensate
    final Set<Expression> equalCompensation = Predicates.compensateEquivalence(
            queryStructInfo, viewStructInfo, viewToQuerySlotMapping, comparisonResult);
    // range compensate
    final Set<Expression> rangeCompensation = Predicates.compensateRangePredicate(
            queryStructInfo, viewStructInfo, viewToQuerySlotMapping, comparisonResult, cascadesContext);
    // residual compensate
    final Set<Expression> residualCompensation = Predicates.compensateResidualPredicate(
            queryStructInfo, viewStructInfo, viewToQuerySlotMapping, comparisonResult);
    // null means the corresponding kind of predicate can not be compensated
    if (equalCompensation == null || rangeCompensation == null || residualCompensation == null) {
        return SplitPredicate.INVALID_INSTANCE;
    }
    // a compensate predicate which contains aggregate function is not supported
    for (Set<Expression> compensation
            : ImmutableList.of(equalCompensation, rangeCompensation, residualCompensation)) {
        for (Expression conjunct : compensation) {
            if (conjunct.containsType(AggregateFunction.class)) {
                return SplitPredicate.INVALID_INSTANCE;
            }
        }
    }
    // an empty compensation is represented by TRUE
    Expression equalPredicate = equalCompensation.isEmpty()
            ? BooleanLiteral.TRUE : ExpressionUtils.and(equalCompensation);
    Expression rangePredicate = rangeCompensation.isEmpty()
            ? BooleanLiteral.TRUE : ExpressionUtils.and(rangeCompensation);
    Expression residualPredicate = residualCompensation.isEmpty()
            ? BooleanLiteral.TRUE : ExpressionUtils.and(residualCompensation);
    return SplitPredicate.of(equalPredicate, rangePredicate, residualPredicate);
}
/**
 * Check the queryPredicates contains the required nullable slot.
 *
 * @param requireNoNullableViewSlot the view based slot sets, every set should be covered by some
 *         not null slot inferred from the query predicates
 * @param queryPredicates the pulled up predicates of query
 * @param queryToViewMapping the slot mapping used to change the inferred slots to view based
 * @param cascadesContext the context passed to the not null inference
 * @return true if the not null inference produces predicates beyond the query conjunctions and
 *         every required slot set intersects with the inferred not null slots
 */
private boolean containsNullRejectSlot(Set<Set<Slot>> requireNoNullableViewSlot,
        Set<Expression> queryPredicates,
        SlotMapping queryToViewMapping,
        CascadesContext cascadesContext) {
    Set<Expression> queryConjunctions = new HashSet<>();
    for (Expression predicate : queryPredicates) {
        for (Expression conjunct : ExpressionUtils.extractConjunction(predicate)) {
            // NOTICE inferNotNull generate Not with isGeneratedIsNotNull = false,
            // so, we need set this flag to false before comparison.
            if (conjunct instanceof Not) {
                queryConjunctions.add(((Not) conjunct).withGeneratedIsNotNull(false));
            } else {
                queryConjunctions.add(conjunct);
            }
        }
    }
    Set<Expression> nullRejectPredicates = ExpressionUtils.inferNotNull(queryConjunctions, cascadesContext);
    // the slots which are inferred to be not null, changed to view based
    Set<Expression> notNullSlotsViewBased = nullRejectPredicates.stream()
            .map(inferred -> TypeUtils.isNotNull(inferred).orElse(null))
            .filter(Objects::nonNull)
            .map(slot -> ExpressionUtils.replace((Expression) slot, queryToViewMapping.toSlotReferenceMap()))
            .collect(Collectors.toSet());
    // query pulledUp predicates should have null reject predicates and contains any require noNullable slot
    if (queryConjunctions.containsAll(nullRejectPredicates)) {
        return false;
    }
    return requireNoNullableViewSlot.stream().allMatch(requiredSlots ->
            !Sets.intersection(requiredSlots, notNullSlotsViewBased).isEmpty());
}
/**
 * Decide the match mode by comparing the tables of query relations and view relations.
 *
 * @param queryRelations the relations in query
 * @param viewRelations the relations in view
 * @return the match mode between query and view
 * @see MatchMode
 */
private MatchMode decideMatchMode(List<CatalogRelation> queryRelations, List<CatalogRelation> viewRelations) {
    List<TableIf> queryTables = new ArrayList<>();
    for (CatalogRelation relation : queryRelations) {
        queryTables.add(relation.getTable());
    }
    List<TableIf> viewTables = new ArrayList<>();
    for (CatalogRelation relation : viewRelations) {
        viewTables.add(relation.getTable());
    }
    boolean sameSize = viewTables.size() == queryTables.size();
    // every table of query is in view
    boolean queryCovered = viewTables.containsAll(queryTables);
    if (!sameSize && queryCovered) {
        return MatchMode.QUERY_PARTIAL;
    }
    // every table of view is in query
    boolean viewCovered = queryTables.containsAll(viewTables);
    if (sameSize) {
        return queryCovered && viewCovered ? MatchMode.COMPLETE : MatchMode.NOT_MATCH;
    }
    return viewCovered ? MatchMode.VIEW_PARTIAL : MatchMode.NOT_MATCH;
}
/**
 * Check the pattern of query or materializedView is supported or not.
 * This base implementation only requires that the struct info has at least one relation.
 *
 * @param structInfo the struct info of query or materialized view
 * @return true if supported
 */
protected boolean checkPattern(StructInfo structInfo) {
    return !structInfo.getRelations().isEmpty();
}
/**
 * Mark the materialization context as success, and when the plan belongs to a group,
 * record the id of the owner group as matched on the context.
 *
 * @param plan the plan which is rewritten successfully
 * @param context the materialization context used in the rewrite
 */
protected void recordIfRewritten(Plan plan, MaterializationContext context) {
    context.setSuccess(true);
    if (!plan.getGroupExpression().isPresent()) {
        return;
    }
    context.addMatchedGroup(plan.getGroupExpression().get().getOwnerGroup().getGroupId(), true);
}
/**
 * Check whether the owner group of the plan is already rewritten by the materialization context.
 *
 * @param plan the plan to check
 * @param context the materialization context to ask
 * @return false if the plan belongs to no group, otherwise what the context answers for the group id
 */
protected boolean checkIfRewritten(Plan plan, MaterializationContext context) {
    if (!plan.getGroupExpression().isPresent()) {
        return false;
    }
    return context.alreadyRewrite(plan.getGroupExpression().get().getOwnerGroup().getGroupId());
}
/**
 * Query and mv match mode, decided by comparing the tables used by query and view.
 */
protected enum MatchMode {
/**
 * The tables in query are same to the tables in view
 */
COMPLETE,
/**
 * The tables in query contains all the tables in view, and the table number is different
 */
VIEW_PARTIAL,
/**
 * The tables in view contains all the tables in query, and the table number is different
 */
QUERY_PARTIAL,
/**
 * Except for COMPLETE and VIEW_PARTIAL and QUERY_PARTIAL
 */
NOT_MATCH
}
}