blob: 2d2619e86fe20c36f308c598f727bb1bd7e7fc83 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hyracks.algebricks.rewriter.rules;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.ListSet;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.IsomorphismUtilities;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
public abstract class AbstractIntroduceGroupByCombinerRule extends AbstractIntroduceCombinerRule {
/**
 * Introduces a combiner group-by underneath a partitioned group-by operator.
 * <p>
 * The rule only fires when the visited operator is a GROUP operator whose execution mode is
 * {@code PARTITIONED} and which is not in this rule's don't-apply set. When {@code opToPush} manages to build
 * a combiner, the combiner is set to execution mode {@code LOCAL}, installed as the first input of the
 * original group-by, and both operators are re-typed.
 *
 * @param opRef
 *            reference to the operator being visited.
 * @param context
 *            the optimization context (source of new variables, type environments and the don't-apply set).
 * @return {@code true} if a combiner group-by was introduced, {@code false} if the plan was left as it was.
 * @throws AlgebricksException
 *             if one of the invoked utilities fails.
 */
@Override
public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
        throws AlgebricksException {
    AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
    if (context.checkIfInDontApplySet(this, op)) {
        return false;
    }
    if (op.getOperatorTag() != LogicalOperatorTag.GROUP) {
        return false;
    }
    GroupByOperator gbyOp = (GroupByOperator) op;
    if (gbyOp.getExecutionMode() != ExecutionMode.PARTITIONED) {
        return false;
    }
    BookkeepingInfo bi = new BookkeepingInfo();
    GroupByOperator newGbyOp = opToPush(gbyOp, bi, context);
    if (newGbyOp == null) {
        return false;
    }

    Set<LogicalVariable> newGbyLiveVars = new ListSet<LogicalVariable>();
    VariableUtilities.getLiveVariables(newGbyOp, newGbyLiveVars);
    for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyOp.getDecorList()) {
        List<LogicalVariable> usedDecorVars = new ArrayList<LogicalVariable>();
        // p.second.getValue() should always return a VariableReferenceExpression, hence
        // usedDecorVars should always contain only one variable.
        p.second.getValue().getUsedVariables(usedDecorVars);
        LogicalVariable usedDecorVar = usedDecorVars.get(0);
        if (!newGbyLiveVars.contains(usedDecorVar)) {
            // Let the left-hand side of gbyOp's decoration expressions populated through the combiner
            // group-by without any intermediate assignment.
            newGbyOp.addDecorExpression(null, p.second.getValue());
            // Remember that the combiner now forwards this variable, so that a second decoration entry
            // referring to the same variable does not add a duplicate decoration expression.
            newGbyLiveVars.add(usedDecorVar);
        }
    }
    newGbyOp.setExecutionMode(ExecutionMode.LOCAL);

    // Mirror the group-by strategy annotations of the original operator onto the combiner.
    Object v = gbyOp.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY);
    newGbyOp.getAnnotations().put(OperatorAnnotations.USE_HASH_GROUP_BY, v);
    Object v2 = gbyOp.getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY);
    newGbyOp.getAnnotations().put(OperatorAnnotations.USE_EXTERNAL_GROUP_BY, v2);

    // Free variables of the original group-by's subplans that the combiner does not produce are routed
    // through the combiner as additional decoration variables.
    List<LogicalVariable> propagatedVars = new LinkedList<LogicalVariable>();
    VariableUtilities.getProducedVariables(newGbyOp, propagatedVars);
    Set<LogicalVariable> freeVars = new HashSet<LogicalVariable>();
    OperatorPropertiesUtil.getFreeVariablesInSubplans(gbyOp, freeVars);
    for (LogicalVariable var : freeVars) {
        if (!propagatedVars.contains(var)) {
            LogicalVariable newDecorVar = context.newVar();
            newGbyOp.addDecorExpression(newDecorVar, new VariableReferenceExpression(var));
            // NOTE(review): only the first root of the first nested plan is rewritten here, although the
            // free variables were collected over all subplans -- confirm this is sufficient.
            VariableUtilities.substituteVariables(gbyOp.getNestedPlans().get(0).getRoots().get(0).getValue(), var,
                    newDecorVar, context);
        }
    }

    // Splice the combiner in between the original group-by and its former input.
    Mutable<ILogicalOperator> opRef3 = gbyOp.getInputs().get(0);
    opRef3.setValue(newGbyOp);
    typeGby(newGbyOp, context);
    typeGby(gbyOp, context);
    context.addToDontApplySet(this, op);
    return true;
}
/**
 * Re-types an operator with nested plans: every nested plan is typed first, then the type environment
 * of the operator itself is computed and registered in the context.
 *
 * @param op
 *            the operator whose nested plans and type environment are to be (re)computed.
 * @param context
 *            the optimization context.
 * @throws AlgebricksException
 *             if typing fails.
 */
private void typeGby(AbstractOperatorWithNestedPlans op, IOptimizationContext context) throws AlgebricksException {
    Iterator<ILogicalPlan> planIter = op.getNestedPlans().iterator();
    while (planIter.hasNext()) {
        OperatorPropertiesUtil.typePlan(planIter.next(), context);
    }
    context.computeAndSetTypeEnvironmentForOperator(op);
}
/**
 * Builds the combiner group-by for {@code gbyOp}.
 * <p>
 * The new operator reads from the current input of {@code gbyOp}, inherits its annotations and receives
 * every nested plan that {@code tryToPushSubplan} manages to push. Its group-by keys are derived from the
 * variable lists recorded in {@code bi.modifyGbyMap}: the lists must agree position by position on their
 * common prefix, and each key gets a fresh variable that also replaces the original one in {@code gbyOp}.
 *
 * @param gbyOp
 *            the original group-by operator.
 * @param bi
 *            bookkeeping shared with the push-down helpers.
 * @param context
 *            the optimization context.
 * @return the combiner group-by, or {@code null} if some nested plan cannot be pushed or the recorded
 *         variable lists cannot be linearized.
 * @throws AlgebricksException
 *             if one of the invoked utilities fails.
 */
private GroupByOperator opToPush(GroupByOperator gbyOp, BookkeepingInfo bi, IOptimizationContext context)
        throws AlgebricksException {
    // The combiner consumes whatever currently feeds the original group-by.
    ILogicalOperator inputOp = gbyOp.getInputs().get(0).getValue();
    GroupByOperator combinerGby = new GroupByOperator();
    combinerGby.getInputs().add(new MutableObject<ILogicalOperator>(inputOp));
    // Carry the annotations over.
    combinerGby.getAnnotations().putAll(gbyOp.getAnnotations());

    List<LogicalVariable> gbyVars = gbyOp.getGbyVarList();
    for (ILogicalPlan nestedPlan : gbyOp.getNestedPlans()) {
        Pair<Boolean, ILogicalPlan> pushResult =
                tryToPushSubplan(nestedPlan, gbyOp, combinerGby, bi, gbyVars, context);
        if (!pushResult.first) {
            // All-or-nothing for now: a single plan that cannot be pushed aborts the rewrite.
            return null;
        }
        if (pushResult.second != null) {
            combinerGby.getNestedPlans().add(pushResult.second);
        }
    }

    // Merge the recorded variable lists into one key sequence; each list has to match the sequence built
    // so far on the positions they share and may extend it at the end.
    List<LogicalVariable> keyVars = new ArrayList<LogicalVariable>();
    List<LogicalVariable> replacementVars = new ArrayList<LogicalVariable>();
    for (Map.Entry<GroupByOperator, List<LogicalVariable>> entry : bi.modifyGbyMap.entrySet()) {
        int knownPrefix = keyVars.size();
        int pos = 0;
        for (LogicalVariable candidate : entry.getValue()) {
            if (pos < knownPrefix) {
                if (candidate != keyVars.get(pos)) {
                    // cannot linearize
                    return null;
                }
            } else {
                keyVars.add(candidate);
                replacementVars.add(context.newVar());
            }
            pos++;
        }
    }

    // Install the keys in the combiner and make the original group-by refer to the fresh variables.
    for (int k = 0; k < keyVars.size(); k++) {
        LogicalVariable originalKey = keyVars.get(k);
        LogicalVariable freshKey = replacementVars.get(k);
        combinerGby.addGbyExpression(freshKey, new VariableReferenceExpression(originalKey));
        VariableUtilities.substituteVariables(gbyOp, originalKey, freshKey, false, context);
    }
    return combinerGby;
}
/**
 * Tries to push every root of {@code nestedPlan} into the combiner group-by.
 *
 * @param nestedPlan
 *            a nested plan of the original group-by.
 * @param oldGbyOp
 *            the original group-by operator.
 * @param newGbyOp
 *            the combiner group-by under construction.
 * @param bi
 *            bookkeeping shared with the push-down helpers.
 * @param gbyVars
 *            the group-by variables handed down to {@code tryToPushRoot}.
 * @param context
 *            the optimization context.
 * @return a pair whose first component tells whether all roots could be pushed; the second component is
 *         the plan to add to the combiner, or {@code null} when nothing has to be added (no root was
 *         pushed, or the combiner already holds an isomorphic plan).
 * @throws AlgebricksException
 *             if one of the invoked utilities fails.
 */
private Pair<Boolean, ILogicalPlan> tryToPushSubplan(ILogicalPlan nestedPlan, GroupByOperator oldGbyOp,
        GroupByOperator newGbyOp, BookkeepingInfo bi, List<LogicalVariable> gbyVars, IOptimizationContext context)
        throws AlgebricksException {
    List<Mutable<ILogicalOperator>> pushedRoots = new ArrayList<Mutable<ILogicalOperator>>();
    Set<SimilarAggregatesInfo> toReplaceSet = new HashSet<SimilarAggregatesInfo>();
    for (Mutable<ILogicalOperator> rootRef : nestedPlan.getRoots()) {
        boolean pushed =
                tryToPushRoot(rootRef, oldGbyOp, newGbyOp, bi, gbyVars, context, pushedRoots, toReplaceSet);
        if (!pushed) {
            // All-or-nothing for now.
            return new Pair<Boolean, ILogicalPlan>(false, null);
        }
    }
    if (pushedRoots.isEmpty()) {
        return new Pair<Boolean, ILogicalPlan>(true, null);
    }

    // Look for an equivalent plan in the combiner first, then rewrite the aggregation expressions of the
    // original group-by.
    ILogicalPlan candidatePlan = new ALogicalPlanImpl(pushedRoots);
    ILogicalPlan existingPlan = fingIdenticalPlan(newGbyOp, candidatePlan);
    replaceOriginalAggFuncs(toReplaceSet);
    if (existingPlan == null) {
        return new Pair<Boolean, ILogicalPlan>(true, candidatePlan);
    }

    // An isomorphic plan is already there: reuse it rather than adding a second copy.
    Set<LogicalVariable> candidateVars = new ListSet<LogicalVariable>();
    for (Mutable<ILogicalOperator> rootRef : pushedRoots) {
        VariableUtilities.getProducedVariables(rootRef.getValue(), candidateVars);
    }
    Set<LogicalVariable> existingVars = new ListSet<LogicalVariable>();
    for (Mutable<ILogicalOperator> rootRef : existingPlan.getRoots()) {
        VariableUtilities.getProducedVariables(rootRef.getValue(), existingVars);
    }

    // Pair the variables of both plans up by position and redirect the collected aggregate expressions
    // from the discarded plan's variables to those of the plan that is kept.
    Iterator<LogicalVariable> fromIter = candidateVars.iterator();
    Iterator<LogicalVariable> toIter = existingVars.iterator();
    while (fromIter.hasNext()) {
        LogicalVariable fromVar = fromIter.next();
        LogicalVariable toVar = toIter.next();
        for (SimilarAggregatesInfo similar : toReplaceSet) {
            for (AggregateExprInfo exprInfo : similar.simAggs) {
                ILogicalExpression aggExpr = exprInfo.aggExprRef.getValue();
                aggExpr.substituteVar(fromVar, toVar);
            }
        }
    }
    return new Pair<Boolean, ILogicalPlan>(true, null);
}
/**
 * Searches the nested plans of {@code newGbyOp} for one that is isomorphic to {@code plan}.
 *
 * @param newGbyOp
 *            the group-by operator whose nested plans are inspected.
 * @param plan
 *            the plan to compare against.
 * @return the first nested plan reported as isomorphic to {@code plan}, or {@code null} if there is none.
 * @throws AlgebricksException
 *             if the isomorphism check fails.
 */
private ILogicalPlan fingIdenticalPlan(GroupByOperator newGbyOp, ILogicalPlan plan) throws AlgebricksException {
    List<ILogicalPlan> existingPlans = newGbyOp.getNestedPlans();
    for (int idx = 0; idx < existingPlans.size(); idx++) {
        ILogicalPlan candidate = existingPlans.get(idx);
        if (IsomorphismUtilities.isOperatorIsomorphicPlan(plan, candidate)) {
            return candidate;
        }
    }
    return null;
}
/**
 * Tries to push the aggregation rooted at <code>root</code> into the combiner group-by.
 * <p>
 * The root must be an AGGREGATE operator. If no GROUP operator is found below it, the aggregate is handed
 * to <code>tryToPushAgg</code> directly. Otherwise the method recurses into the nested plans of the nested
 * group-by and then moves the lower part of the nested pipeline into the combiner.
 *
 * @param root
 *            reference to the root operator of a nested plan.
 * @param oldGbyOp
 *            the group-by operator owning the nested plan; used as the key under which
 *            <code>gbyVars</code> is recorded in <code>bi.modifyGbyMap</code>.
 * @param newGbyOp
 *            the combiner group-by under construction.
 * @param bi
 *            bookkeeping shared with the caller.
 * @param gbyVars
 *            the group-by variables collected so far.
 * @param context
 *            the optimization context.
 * @param toPushAccumulate
 *            output list receiving the operator references to be pushed into the combiner.
 * @param toReplaceSet
 *            output set passed through to <code>tryToPushAgg</code>.
 * @return true if the root could be handled, false if the caller has to give up.
 * @throws AlgebricksException
 */
private boolean tryToPushRoot(Mutable<ILogicalOperator> root, GroupByOperator oldGbyOp, GroupByOperator newGbyOp,
BookkeepingInfo bi, List<LogicalVariable> gbyVars, IOptimizationContext context,
List<Mutable<ILogicalOperator>> toPushAccumulate, Set<SimilarAggregatesInfo> toReplaceSet)
throws AlgebricksException {
AbstractLogicalOperator op1 = (AbstractLogicalOperator) root.getValue();
// Only plans rooted at an aggregate operator can be pushed.
if (op1.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
return false;
}
AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
// Finds nested group-by if any.
// The walk follows the single input of each operator and stops at the first GROUP operator or at the
// first operator that does not have exactly one input.
AbstractLogicalOperator op3 = op2;
while (op3.getOperatorTag() != LogicalOperatorTag.GROUP && op3.getInputs().size() == 1) {
op3 = (AbstractLogicalOperator) op3.getInputs().get(0).getValue();
}
if (op3.getOperatorTag() != LogicalOperatorTag.GROUP) {
// No nested group-by: push the aggregate itself.
AggregateOperator initAgg = (AggregateOperator) op1;
Pair<Boolean, Mutable<ILogicalOperator>> pOpRef = tryToPushAgg(initAgg, newGbyOp, toReplaceSet, context);
if (!pOpRef.first) {
return false;
}
Mutable<ILogicalOperator> opRef = pOpRef.second;
// A null reference means there is nothing new to add to the combiner for this aggregate.
if (opRef != null) {
toPushAccumulate.add(opRef);
}
// The list instance itself is stored, so elements added to it later by a caller are visible here.
bi.modifyGbyMap.put(oldGbyOp, gbyVars);
return true;
} else {
GroupByOperator nestedGby = (GroupByOperator) op3;
List<LogicalVariable> gbyVars2 = nestedGby.getGbyVarList();
Set<LogicalVariable> freeVars = new HashSet<>();
// Removes non-free variables defined in the nested plan.
OperatorPropertiesUtil.getFreeVariablesInSelfOrDesc(nestedGby, freeVars);
gbyVars2.retainAll(freeVars);
// Keys handed down to the recursion: the outer keys followed by the remaining nested keys.
List<LogicalVariable> concatGbyVars = new ArrayList<LogicalVariable>(gbyVars);
concatGbyVars.addAll(gbyVars2);
for (ILogicalPlan p : nestedGby.getNestedPlans()) {
for (Mutable<ILogicalOperator> r2 : p.getRoots()) {
if (!tryToPushRoot(r2, nestedGby, newGbyOp, bi, concatGbyVars, context, toPushAccumulate,
toReplaceSet)) {
return false;
}
}
}
/*
 * Push the nested pipeline which provides the input to the nested group operator into newGbyOp (the combined gby op).
 * The change is to fix asterixdb issue 782.
 */
// Finds the reference of the bottom-most operator in the pipeline that
// should not be pushed to the combiner group-by.
// Note that currentOpRef is a fresh reference wrapping nestedGby, not the reference held by the plan.
Mutable<ILogicalOperator> currentOpRef = new MutableObject<ILogicalOperator>(nestedGby);
Mutable<ILogicalOperator> bottomOpRef = findBottomOpRefStayInOldGby(currentOpRef);
// Adds the used variables in the pipeline from <code>currentOpRef</code> to <code>bottomOpRef</code>
// into the group-by keys for the introduced combiner group-by operator.
// concatGbyVars is the same list instance that was passed to the recursive calls above, so these
// additions are also seen wherever that list was recorded.
Set<LogicalVariable> usedVars = collectUsedFreeVariables(currentOpRef, bottomOpRef);
for (LogicalVariable usedVar : usedVars) {
if (!concatGbyVars.contains(usedVar)) {
concatGbyVars.add(usedVar);
}
}
// Retains the nested pipeline above the identified operator in the old group-by operator.
// Pushes the nested pipeline under the select operator into the new group-by operator.
Mutable<ILogicalOperator> oldNtsRef = findNtsRef(currentOpRef);
ILogicalOperator opToCombiner = bottomOpRef.getValue().getInputs().get(0).getValue();
if (opToCombiner.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
// No pipeline other than the aggregate operator needs to push to combiner.
return true;
}
// Cut the pipeline below bottomOpRef: the retained part now reads directly from the old leaf operator
// through a new reference.
bottomOpRef.getValue().getInputs().set(0, new MutableObject<ILogicalOperator>(oldNtsRef.getValue()));
// NOTE(review): get(0) assumes at least one reference was accumulated; tryToPushAgg may return a null
// reference, in which case nothing is added -- confirm the list cannot be empty here.
Mutable<ILogicalOperator> newGbyNestedOpRef = findNtsRef(toPushAccumulate.get(0));
NestedTupleSourceOperator newNts = (NestedTupleSourceOperator) newGbyNestedOpRef.getValue();
// Hang the detached pipeline under the first pushed root, in place of its leaf ...
newGbyNestedOpRef.setValue(opToCombiner);
// ... and let the detached pipeline end in the leaf operator that was just replaced.
oldNtsRef.setValue(newNts);
return true;
}
}
/**
 * Collects the variables used by the operators on the first-input chain below <code>topOpRef</code>
 * (exclusive) down to <code>bottomOpRef</code> (inclusive), keeping only those that are reported as free
 * for the operator at <code>topOpRef</code>.
 *
 * @param topOpRef
 *            the top of the pipeline; the operator it refers to is not inspected for used variables.
 * @param bottomOpRef
 *            the bottom of the pipeline; must be <code>topOpRef</code> itself or a reference reachable
 *            from it by following first inputs.
 * @return the used variables that are also free variables.
 * @throws AlgebricksException
 */
private Set<LogicalVariable> collectUsedFreeVariables(Mutable<ILogicalOperator> topOpRef,
        Mutable<ILogicalOperator> bottomOpRef) throws AlgebricksException {
    Set<LogicalVariable> collected = new HashSet<>();
    Mutable<ILogicalOperator> cursor = topOpRef;
    // References are compared by identity: the walk ends on the very reference object passed as bottom.
    while (cursor != bottomOpRef) {
        Mutable<ILogicalOperator> childRef = cursor.getValue().getInputs().get(0);
        VariableUtilities.getUsedVariables(childRef.getValue(), collected);
        cursor = childRef;
    }
    Set<LogicalVariable> freeInPipeline = new HashSet<>();
    AbstractLogicalOperator topOp = (AbstractLogicalOperator) topOpRef.getValue();
    OperatorPropertiesUtil.getFreeVariablesInSelfOrDesc(topOp, freeInPipeline);
    collected.retainAll(freeInPipeline);
    return collected;
}
/**
 * Follows the first input of each operator, starting at <code>currentOpRef</code>, until an operator
 * without inputs is reached. Callers treat that leaf as the nested tuple source of the pipeline.
 *
 * @param currentOpRef
 *            the reference to start from.
 * @return the reference of the leaf operator (<code>currentOpRef</code> itself if it has no inputs).
 */
private Mutable<ILogicalOperator> findNtsRef(Mutable<ILogicalOperator> currentOpRef) {
    Mutable<ILogicalOperator> leafRef = currentOpRef;
    List<Mutable<ILogicalOperator>> children = leafRef.getValue().getInputs();
    while (!children.isEmpty()) {
        leafRef = children.get(0);
        children = leafRef.getValue().getInputs();
    }
    return leafRef;
}
/**
 * Walks the first-input chain starting at <code>currentOpRef</code> and returns the lowest reference
 * whose operator cannot be pushed into the combiner group-by, i.e. a SELECT operator or an operator that
 * produces variables. The leaf of the chain (the operator without inputs) is never considered.
 *
 * @param currentOpRef
 *            the reference to start from.
 * @return the bottom-most such reference, or <code>currentOpRef</code> itself if no operator on the
 *         chain qualifies.
 * @throws AlgebricksException
 */
private Mutable<ILogicalOperator> findBottomOpRefStayInOldGby(Mutable<ILogicalOperator> currentOpRef)
        throws AlgebricksException {
    Mutable<ILogicalOperator> lowestKept = currentOpRef;
    Mutable<ILogicalOperator> cursor = currentOpRef;
    while (!cursor.getValue().getInputs().isEmpty()) {
        ILogicalOperator candidate = cursor.getValue();
        Set<LogicalVariable> produced = new HashSet<>();
        VariableUtilities.getProducedVariables(candidate, produced);
        boolean isSelect = candidate.getOperatorTag() == LogicalOperatorTag.SELECT;
        if (isSelect || !produced.isEmpty()) {
            lowestKept = cursor;
        }
        cursor = candidate.getInputs().get(0);
    }
    return lowestKept;
}
/**
 * Deal with the case where the nested plan in the combiner gby operator has a null-test before invoking aggregation functions.
 * Subclasses provide the implementation; this method is not invoked from within this class.
 *
 * @param context
 * The optimization context.
 * @param nestedGby
 * The nested gby operator in the global gby operator's subplan.
 * @param aggregateVarsProducedByCombiner
 * The aggregation variables produced by the combiner gby (presumably the ones the null-test should be
 * applied to -- confirm against the implementing subclasses).
 */
protected abstract void processNullTest(IOptimizationContext context, GroupByOperator nestedGby,
List<LogicalVariable> aggregateVarsProducedByCombiner);
}