blob: eeb2c2a989c8eca7b617d7753f9b3fbc92d94b40 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.optimizer.rules.subplan;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
import org.apache.hyracks.algebricks.core.algebra.visitors.IQueryOperatorVisitor;
/*
This visitor inlines the input <code>nts</code> in the query plan rooted
at the operator being visited, with the query plan rooted at the input
<code>subplanInputOperator</code>.
The visitor ensures that:
1. live variables at <code>subplanInputOperator</code> are
propagated to the top-most join operator in the query plan rooted
at the operator being visited;
2. no available tuple at <code>subplanInputOperator</code> get lost along the
pipeline to the top-most join operator in the query plan rooted
at the operator being visited.
*/
class InlineLeftNtsInSubplanJoinFlatteningVisitor implements IQueryOperatorVisitor<ILogicalOperator, Void> {
// The optimization context.
private final IOptimizationContext context;
// The input operator to the subplan.
private final ILogicalOperator subplanInputOperator;
// The target Nts operator.
private final ILogicalOperator targetNts;
// The live variables in <code>subplanInputOperator</code> to enforce.
private final Set<LogicalVariable> liveVarsFromSubplanInput = new HashSet<>();
// The state indicate if the operator tree rooted at the current operator is rewritten.
private boolean rewritten = false;
// The state indicate if the operator tree rooted at the current operator contains a rewritten join.
private boolean hasJoinAncestor = false;
// A set of variables that are needed for not-null checks in the final group-by operator.
private Set<LogicalVariable> nullCheckVars = new HashSet<LogicalVariable>();
// The top join reference.
private Mutable<ILogicalOperator> topJoinRef;
/***
* @param context
* the optimization context
* @param subplanInputOperator
* the input operator to the target SubplanOperator
* @param nts
* the NestedTupleSourceOperator to be replaced by <code>subplanInputOperator</code>
* @throws AlgebricksException
*/
public InlineLeftNtsInSubplanJoinFlatteningVisitor(IOptimizationContext context,
ILogicalOperator subplanInputOperator, ILogicalOperator nts) throws AlgebricksException {
this.context = context;
this.subplanInputOperator = subplanInputOperator;
this.targetNts = nts;
VariableUtilities.getSubplanLocalLiveVariables(subplanInputOperator, liveVarsFromSubplanInput);
}
/**
* @return a set of variables indicating whether a tuple from the right
* branch of a left-outer join is a non-match.
*/
public Set<LogicalVariable> getNullCheckVariables() {
return nullCheckVars;
}
/**
* @return the top-most join operator after visiting the query plan rooted
* at the operator being visited.
*/
public Mutable<ILogicalOperator> getTopJoinReference() {
return topJoinRef;
}
@Override
public ILogicalOperator visitAggregateOperator(AggregateOperator op, Void arg) throws AlgebricksException {
return visitSingleInputOperator(op);
}
@Override
public ILogicalOperator visitRunningAggregateOperator(RunningAggregateOperator op, Void arg)
throws AlgebricksException {
return visitSingleInputOperator(op);
}
@Override
public ILogicalOperator visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Void arg)
throws AlgebricksException {
return visitSingleInputOperator(op);
}
@Override
public ILogicalOperator visitGroupByOperator(GroupByOperator op, Void arg) throws AlgebricksException {
return visitSingleInputOperator(op);
}
@Override
public ILogicalOperator visitLimitOperator(LimitOperator op, Void arg) throws AlgebricksException {
return visitSingleInputOperator(op);
}
@Override
public ILogicalOperator visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException {
hasJoinAncestor = true;
boolean needToSwitch = false;
for (int i = 0; i < op.getInputs().size(); ++i) {
// Deals with single input operators.
ILogicalOperator newChild = op.getInputs().get(i).getValue().accept(this, null);
op.getInputs().get(i).setValue(newChild);
if (i == 1) {
needToSwitch = true;
}
if (rewritten) {
break;
}
}
// Checks whether there is a need to switch two join branches.
if (rewritten && needToSwitch) {
Mutable<ILogicalOperator> leftBranch = op.getInputs().get(0);
Mutable<ILogicalOperator> rightBranch = op.getInputs().get(1);
op.getInputs().set(0, rightBranch);
op.getInputs().set(1, leftBranch);
}
AbstractBinaryJoinOperator returnOp = op;
// After rewriting, the original inner join should become an left outer join.
if (rewritten) {
returnOp = new LeftOuterJoinOperator(op.getCondition());
returnOp.getInputs().addAll(op.getInputs());
injectNullCheckVars(returnOp);
}
return returnOp;
}
@Override
public ILogicalOperator visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void arg) throws AlgebricksException {
hasJoinAncestor = true;
// Only rewrites the left child.
ILogicalOperator newChild = op.getInputs().get(0).getValue().accept(this, null);
op.getInputs().get(0).setValue(newChild);
return op;
}
@Override
public ILogicalOperator visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg)
throws AlgebricksException {
if (op == targetNts) {
// Inlines the actual <code>subplanInputOperator</code>.
rewritten = true;
return subplanInputOperator;
} else {
return op;
}
}
@Override
public ILogicalOperator visitOrderOperator(OrderOperator op, Void arg) throws AlgebricksException {
boolean underJoin = hasJoinAncestor;
visitSingleInputOperator(op);
if (!rewritten || !underJoin) {
return op;
}
// Adjust the ordering if its input operator pipeline has been rewritten.
List<Pair<IOrder, Mutable<ILogicalExpression>>> orderExprList = new ArrayList<>();
// Adds keyVars to the prefix of sorting columns.
for (LogicalVariable liveVar : liveVarsFromSubplanInput) {
orderExprList.add(new Pair<IOrder, Mutable<ILogicalExpression>>(OrderOperator.ASC_ORDER,
new MutableObject<ILogicalExpression>(new VariableReferenceExpression(liveVar))));
}
orderExprList.addAll(op.getOrderExpressions());
// Creates an order operator with the new expression list.
OrderOperator orderOp = new OrderOperator(orderExprList);
orderOp.getInputs().addAll(op.getInputs());
context.computeAndSetTypeEnvironmentForOperator(orderOp);
return orderOp;
}
@Override
public ILogicalOperator visitAssignOperator(AssignOperator op, Void arg) throws AlgebricksException {
visitSingleInputOperator(op);
return op;
}
@Override
public ILogicalOperator visitSelectOperator(SelectOperator op, Void arg) throws AlgebricksException {
return visitSingleInputOperator(op);
}
@Override
public ILogicalOperator visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
return visitSingleInputOperator(op);
}
@Override
public ILogicalOperator visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException {
boolean underJoin = hasJoinAncestor;
visitSingleInputOperator(op);
if (!rewritten || !underJoin) {
return op;
}
// Adds all missing variables that should propagates up.
for (LogicalVariable keyVar : liveVarsFromSubplanInput) {
if (!op.getVariables().contains(keyVar)) {
op.getVariables().add(keyVar);
}
}
return op;
}
@Override
public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
return visitSingleInputOperator(op);
}
@Override
public ILogicalOperator visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
return visitSingleInputOperator(op);
}
@Override
public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
return visitSingleInputOperator(op);
}
@Override
public ILogicalOperator visitScriptOperator(ScriptOperator op, Void arg) throws AlgebricksException {
throw new UnsupportedOperationException("Script operators in a subplan are not supported!");
}
@Override
public ILogicalOperator visitSubplanOperator(SubplanOperator op, Void arg) throws AlgebricksException {
return visitSingleInputOperator(op);
}
@Override
public ILogicalOperator visitUnionOperator(UnionAllOperator op, Void arg) throws AlgebricksException {
throw new UnsupportedOperationException(
"Nested subplans with a union operator should have been disqualified for this rewriting!");
}
@Override
public ILogicalOperator visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException {
throw new UnsupportedOperationException(
"Nested subplans with a intersect operator should have been disqualified for this rewriting!");
}
@Override
public ILogicalOperator visitUnnestOperator(UnnestOperator op, Void arg) throws AlgebricksException {
return visitSingleInputOperator(op);
}
@Override
public ILogicalOperator visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, Void arg)
throws AlgebricksException {
return visitSingleInputOperator(op);
}
@Override
public ILogicalOperator visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException {
visitSingleInputOperator(op);
if (!rewritten) {
return op;
}
Set<LogicalVariable> liveVars = new HashSet<>();
VariableUtilities.getLiveVariables(op, liveVars);
if (!liveVars.containsAll(liveVarsFromSubplanInput)) {
op.setPropagatesInput(true);
}
return op;
}
@Override
public ILogicalOperator visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, Void arg)
throws AlgebricksException {
throw new AlgebricksException(
"The subquery de-correlation rule should always be applied before index-access-method related rules.");
}
@Override
public ILogicalOperator visitDataScanOperator(DataSourceScanOperator op, Void arg) throws AlgebricksException {
return visitSingleInputOperator(op);
}
@Override
public ILogicalOperator visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException {
boolean underJoin = hasJoinAncestor;
visitSingleInputOperator(op);
if (!rewritten || !underJoin) {
return op;
}
List<LogicalVariable> distinctVarList = op.getDistinctByVarList();
for (LogicalVariable keyVar : liveVarsFromSubplanInput) {
if (!distinctVarList.contains(keyVar)) {
distinctVarList.add(keyVar);
}
}
context.computeAndSetTypeEnvironmentForOperator(op);
return op;
}
@Override
public ILogicalOperator visitExchangeOperator(ExchangeOperator op, Void arg) throws AlgebricksException {
return visitSingleInputOperator(op);
}
@Override
public ILogicalOperator visitTokenizeOperator(TokenizeOperator op, Void arg) throws AlgebricksException {
return visitSingleInputOperator(op);
}
private ILogicalOperator visitSingleInputOperator(ILogicalOperator op) throws AlgebricksException {
if (op.getInputs().size() == 1) {
// Deals with single input operators.
Mutable<ILogicalOperator> childRef = op.getInputs().get(0);
ILogicalOperator newChild = childRef.getValue().accept(this, null);
if (topJoinRef == null) {
LogicalOperatorTag childOpTag = newChild.getOperatorTag();
if (childOpTag == LogicalOperatorTag.INNERJOIN || childOpTag == LogicalOperatorTag.LEFTOUTERJOIN) {
topJoinRef = childRef;
}
}
op.getInputs().get(0).setValue(newChild);
}
return op;
}
/**
* Inject varaibles to indicate non-matches for the right branch of
* a left-outer join.
*
* @param joinOp
* the leftouter join operator.
*/
private void injectNullCheckVars(AbstractBinaryJoinOperator joinOp) {
LogicalVariable assignVar = context.newVar();
ILogicalOperator assignOp = new AssignOperator(assignVar,
new MutableObject<ILogicalExpression>(ConstantExpression.TRUE));
assignOp.getInputs().add(joinOp.getInputs().get(1));
joinOp.getInputs().set(1, new MutableObject<ILogicalOperator>(assignOp));
nullCheckVars.add(assignVar);
}
}