blob: dd109ff5d78525b1d3154cf9c90288c3cadd7b76 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hyracks.algebricks.core.algebra.util;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.OperatorDeepCopyVisitor;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
import org.apache.hyracks.api.exceptions.SourceLocation;
public class OperatorManipulationUtil {
// Transforms all NestedTupleSource operators to EmptyTupleSource operators
public static void ntsToEts(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
throws AlgebricksException {
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
if (op.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
EmptyTupleSourceOperator ets = new EmptyTupleSourceOperator();
context.computeAndSetTypeEnvironmentForOperator(ets);
opRef.setValue(ets);
} else {
for (Mutable<ILogicalOperator> i : opRef.getValue().getInputs()) {
ntsToEts(i, context);
}
}
}
public static ILogicalOperator eliminateSingleSubplanOverEts(SubplanOperator subplan) {
if (subplan.getNestedPlans().size() > 1) {
// not a single subplan
List<Mutable<ILogicalOperator>> subInpList = subplan.getInputs();
subInpList.clear();
subInpList.add(new MutableObject<ILogicalOperator>(new EmptyTupleSourceOperator()));
return subplan;
}
ILogicalPlan plan = subplan.getNestedPlans().get(0);
if (plan.getRoots().size() > 1) {
// not a single subplan
List<Mutable<ILogicalOperator>> subInpList = subplan.getInputs();
subInpList.clear();
subInpList.add(new MutableObject<ILogicalOperator>(new EmptyTupleSourceOperator()));
return subplan;
}
return plan.getRoots().get(0).getValue();
}
public static boolean setOperatorMode(AbstractLogicalOperator op) {
AbstractLogicalOperator.ExecutionMode oldMode = op.getExecutionMode();
switch (op.getOperatorTag()) {
case DATASOURCESCAN: {
op.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
AbstractLogicalOperator currentOp = op;
while (currentOp.getInputs().size() == 1) {
AbstractLogicalOperator child = (AbstractLogicalOperator) currentOp.getInputs().get(0).getValue();
// Empty tuple source is a special case that can be partitioned in the same way as the data scan.
if (child.getOperatorTag() != LogicalOperatorTag.EMPTYTUPLESOURCE) {
break;
}
child.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
currentOp = child;
}
break;
}
case NESTEDTUPLESOURCE: {
NestedTupleSourceOperator nts = (NestedTupleSourceOperator) op;
AbstractLogicalOperator prevOp =
(AbstractLogicalOperator) nts.getDataSourceReference().getValue().getInputs().get(0).getValue();
if (prevOp.getExecutionMode() != AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {
nts.setExecutionMode(AbstractLogicalOperator.ExecutionMode.LOCAL);
}
break;
}
default: {
boolean forceUnpartitioned = false;
if (op.getOperatorTag() == LogicalOperatorTag.LIMIT) {
LimitOperator opLim = (LimitOperator) op;
if (opLim.isTopmostLimitOp()) {
opLim.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
forceUnpartitioned = true;
}
}
if (op.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
AggregateOperator aggOp = (AggregateOperator) op;
if (aggOp.isGlobal()) {
op.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
forceUnpartitioned = true;
}
}
if (op.getOperatorTag() == LogicalOperatorTag.GROUP) {
GroupByOperator gbyOp = (GroupByOperator) op;
if (gbyOp.isGroupAll() && gbyOp.isGlobal()) {
op.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
forceUnpartitioned = true;
}
}
if (op.getOperatorTag() == LogicalOperatorTag.WINDOW) {
WindowOperator winOp = (WindowOperator) op;
if (winOp.getPartitionExpressions().isEmpty()) {
op.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
forceUnpartitioned = true;
}
}
for (Mutable<ILogicalOperator> i : op.getInputs()) {
boolean exit = false;
AbstractLogicalOperator inputOp = (AbstractLogicalOperator) i.getValue();
switch (inputOp.getExecutionMode()) {
case PARTITIONED: {
if (forceUnpartitioned) {
break;
}
op.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
exit = true;
break;
}
case LOCAL: {
op.setExecutionMode(AbstractLogicalOperator.ExecutionMode.LOCAL);
break;
}
}
if (exit) {
break;
}
}
break;
}
}
return oldMode != op.getExecutionMode();
}
public static void substituteVarRec(AbstractLogicalOperator op, LogicalVariable v1, LogicalVariable v2,
boolean goThroughNts, ITypingContext ctx) throws AlgebricksException {
VariableUtilities.substituteVariables(op, v1, v2, goThroughNts, ctx);
for (Mutable<ILogicalOperator> opRef2 : op.getInputs()) {
substituteVarRec((AbstractLogicalOperator) opRef2.getValue(), v1, v2, goThroughNts, ctx);
}
if (op.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE && goThroughNts) {
NestedTupleSourceOperator nts = (NestedTupleSourceOperator) op;
if (nts.getDataSourceReference() != null) {
AbstractLogicalOperator op2 =
(AbstractLogicalOperator) nts.getDataSourceReference().getValue().getInputs().get(0).getValue();
substituteVarRec(op2, v1, v2, goThroughNts, ctx);
}
}
if (op.hasNestedPlans()) {
AbstractOperatorWithNestedPlans aonp = (AbstractOperatorWithNestedPlans) op;
for (ILogicalPlan p : aonp.getNestedPlans()) {
for (Mutable<ILogicalOperator> ref : p.getRoots()) {
AbstractLogicalOperator aop = (AbstractLogicalOperator) ref.getValue();
substituteVarRec(aop, v1, v2, goThroughNts, ctx);
}
}
}
}
public static ILogicalPlan deepCopy(ILogicalPlan plan, ILogicalOperator dataSource) throws AlgebricksException {
List<Mutable<ILogicalOperator>> roots = plan.getRoots();
List<Mutable<ILogicalOperator>> newRoots = clonePipeline(roots);
ILogicalPlan newPlan = new ALogicalPlanImpl(newRoots);
setDataSource(newPlan, dataSource);
return newPlan;
}
public static ILogicalPlan deepCopy(ILogicalPlan plan, IOptimizationContext ctx) throws AlgebricksException {
List<Mutable<ILogicalOperator>> roots = plan.getRoots();
List<Mutable<ILogicalOperator>> newRoots = clonePipeline(roots);
cloneTypeEnvironments(ctx, roots, newRoots);
return new ALogicalPlanImpl(newRoots);
}
public static Pair<ILogicalOperator, Map<LogicalVariable, LogicalVariable>> deepCopyWithNewVars(
ILogicalOperator root, IOptimizationContext ctx) throws AlgebricksException {
return deepCopyWithNewVars(root, ctx, true);
}
public static Pair<ILogicalOperator, Map<LogicalVariable, LogicalVariable>> deepCopyWithNewVars(
ILogicalOperator root, IOptimizationContext ctx, boolean computeTypeEnvironment)
throws AlgebricksException {
LogicalOperatorDeepCopyWithNewVariablesVisitor deepCopyVisitor =
new LogicalOperatorDeepCopyWithNewVariablesVisitor(ctx, computeTypeEnvironment ? ctx : null, true);
ILogicalOperator newRoot = deepCopyVisitor.deepCopy(root);
return new Pair<>(newRoot, deepCopyVisitor.getInputToOutputVariableMapping());
}
private static void setDataSource(ILogicalPlan plan, ILogicalOperator dataSource) {
for (Mutable<ILogicalOperator> rootRef : plan.getRoots()) {
setDataSource(rootRef, dataSource);
}
}
private static void setDataSource(Mutable<ILogicalOperator> opRef, ILogicalOperator dataSource) {
ILogicalOperator op = opRef.getValue();
if (op.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
NestedTupleSourceOperator nts = (NestedTupleSourceOperator) op;
nts.setDataSourceReference(new MutableObject<ILogicalOperator>(dataSource));
}
for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
setDataSource(childRef, dataSource);
}
}
private static List<Mutable<ILogicalOperator>> clonePipeline(List<Mutable<ILogicalOperator>> roots)
throws AlgebricksException {
List<Mutable<ILogicalOperator>> newRoots = new ArrayList<Mutable<ILogicalOperator>>();
for (Mutable<ILogicalOperator> opRef : roots) {
newRoots.add(new MutableObject<ILogicalOperator>(bottomUpCopyOperators(opRef.getValue())));
}
return newRoots;
}
private static void cloneTypeEnvironments(IOptimizationContext ctx, List<Mutable<ILogicalOperator>> roots,
List<Mutable<ILogicalOperator>> newRoots) {
for (int i = 0; i < newRoots.size(); i++) {
Mutable<ILogicalOperator> opRef = newRoots.get(i);
Mutable<ILogicalOperator> oldOpRef = roots.get(i);
while (opRef.getValue().getInputs().size() > 0) {
ctx.setOutputTypeEnvironment(opRef.getValue(), ctx.getOutputTypeEnvironment(oldOpRef.getValue()));
opRef = opRef.getValue().getInputs().get(0);
oldOpRef = oldOpRef.getValue().getInputs().get(0);
}
ctx.setOutputTypeEnvironment(opRef.getValue(), ctx.getOutputTypeEnvironment(oldOpRef.getValue()));
}
}
public static ILogicalOperator bottomUpCopyOperators(ILogicalOperator op) throws AlgebricksException {
ILogicalOperator newOp = deepCopy(op);
newOp.getInputs().clear();
for (Mutable<ILogicalOperator> child : op.getInputs()) {
newOp.getInputs().add(new MutableObject<ILogicalOperator>(bottomUpCopyOperators(child.getValue())));
}
return newOp;
}
public static ILogicalOperator deepCopy(ILogicalOperator op) throws AlgebricksException {
OperatorDeepCopyVisitor visitor = new OperatorDeepCopyVisitor();
AbstractLogicalOperator copiedOperator = (AbstractLogicalOperator) op.accept(visitor, null);
copiedOperator.setSourceLocation(op.getSourceLocation());
copiedOperator.setExecutionMode(op.getExecutionMode());
copiedOperator.getAnnotations().putAll(op.getAnnotations());
copiedOperator.setSchema(op.getSchema());
AbstractLogicalOperator sourceOp = (AbstractLogicalOperator) op;
copiedOperator.setPhysicalOperator(sourceOp.getPhysicalOperator());
return copiedOperator;
}
/**
* Compute type environment of a newly generated operator {@code op} and its input.
*
* @param op,
* the logical operator.
* @param context,the
* optimization context.
* @throws AlgebricksException
*/
public static void computeTypeEnvironmentBottomUp(ILogicalOperator op, ITypingContext context)
throws AlgebricksException {
for (Mutable<ILogicalOperator> children : op.getInputs()) {
computeTypeEnvironmentBottomUp(children.getValue(), context);
}
AbstractLogicalOperator abstractOp = (AbstractLogicalOperator) op;
if (abstractOp.hasNestedPlans()) {
for (ILogicalPlan p : ((AbstractOperatorWithNestedPlans) op).getNestedPlans()) {
for (Mutable<ILogicalOperator> rootRef : p.getRoots()) {
computeTypeEnvironmentBottomUp(rootRef.getValue(), context);
}
}
}
context.computeAndSetTypeEnvironmentForOperator(op);
}
/**
* Computes the type environment for a logical query plan.
*
* @param plan,
* the logical plan to consider.
* @param context
* the typing context.
* @throws AlgebricksException
*/
public static void computeTypeEnvironment(ILogicalPlan plan, ITypingContext context) throws AlgebricksException {
for (Mutable<ILogicalOperator> rootRef : plan.getRoots()) {
computeTypeEnvironmentBottomUp(rootRef.getValue(), context);
}
}
/***
* Is the operator <code>>op</code> an ancestor of any operators with tags in the set <code>tags</code>?
*
* @param op
* @param tags
* @return True if yes; false other wise.
*/
public static boolean ancestorOfOperators(ILogicalOperator op, Set<LogicalOperatorTag> tags) {
LogicalOperatorTag opTag = op.getOperatorTag();
if (tags.contains(opTag)) {
return true;
}
for (Mutable<ILogicalOperator> children : op.getInputs()) {
if (ancestorOfOperators(children.getValue(), tags)) {
return true;
}
}
return false;
}
/**
* Returns all descendants of an operator that are leaf operators
*
* @param opRef given operator
* @return list containing all leaf descendants
*/
public static List<Mutable<ILogicalOperator>> findLeafDescendantsOrSelf(Mutable<ILogicalOperator> opRef) {
List<Mutable<ILogicalOperator>> result = Collections.emptyList();
Deque<Mutable<ILogicalOperator>> queue = new ArrayDeque<>();
queue.add(opRef);
Mutable<ILogicalOperator> currentOpRef;
while ((currentOpRef = queue.pollLast()) != null) {
List<Mutable<ILogicalOperator>> inputs = currentOpRef.getValue().getInputs();
if (inputs.isEmpty()) {
if (result.isEmpty()) {
result = new ArrayList<>();
}
result.add(currentOpRef);
} else {
queue.addAll(inputs);
}
}
return result;
}
/**
* Find operator in a given list of operator references
*
* @param list list to search in
* @param op operator to find
* @return operator position in the given list or {@code -1} if not found
*/
public static int indexOf(List<Mutable<ILogicalOperator>> list, ILogicalOperator op) {
for (int i = 0, ln = list.size(); i < ln; i++) {
if (list.get(i).getValue() == op) {
return i;
}
}
return -1;
}
public static List<Mutable<ILogicalExpression>> cloneExpressions(List<Mutable<ILogicalExpression>> exprList) {
if (exprList == null) {
return null;
}
List<Mutable<ILogicalExpression>> clonedExprList = new ArrayList<>(exprList.size());
for (Mutable<ILogicalExpression> expr : exprList) {
clonedExprList.add(new MutableObject<>(expr.getValue().cloneExpression()));
}
return clonedExprList;
}
public static List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> cloneOrderExpressions(
List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExprList) {
if (orderExprList == null) {
return null;
}
List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> clonedExprList =
new ArrayList<>(orderExprList.size());
for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> orderExpr : orderExprList) {
clonedExprList.add(
new Pair<>(orderExpr.first, new MutableObject<>(orderExpr.second.getValue().cloneExpression())));
}
return clonedExprList;
}
/**
* Finds a variable assigned to a given expression and returns a new {@link VariableReferenceExpression}
* referring to this variable.
* @param assignVarList list of variables
* @param assignExprList list of expressions assigned to those variables
* @param searchExpr expression to search for
* @return said value, {@code null} if a variable is not found
*/
public static VariableReferenceExpression findAssignedVariable(List<LogicalVariable> assignVarList,
List<Mutable<ILogicalExpression>> assignExprList, ILogicalExpression searchExpr) {
for (int i = 0, n = assignExprList.size(); i < n; i++) {
ILogicalExpression expr = assignExprList.get(i).getValue();
if (expr.equals(searchExpr)) {
VariableReferenceExpression result = new VariableReferenceExpression(assignVarList.get(i));
result.setSourceLocation(expr.getSourceLocation());
return result;
}
}
return null;
}
/**
* Retains variables and expressions in provided lists as specified by given bitset
*/
public static void retainAssignVariablesAndExpressions(List<LogicalVariable> assignVarList,
List<Mutable<ILogicalExpression>> assignExprList, BitSet retainIndexes) {
int targetIdx = 0;
for (int sourceIdx = retainIndexes.nextSetBit(0); sourceIdx >= 0; sourceIdx =
retainIndexes.nextSetBit(sourceIdx + 1)) {
if (targetIdx != sourceIdx) {
assignVarList.set(targetIdx, assignVarList.get(sourceIdx));
assignExprList.set(targetIdx, assignExprList.get(sourceIdx));
}
targetIdx++;
}
for (int i = assignVarList.size() - 1; i >= targetIdx; i--) {
assignVarList.remove(i);
assignExprList.remove(i);
}
}
public static int findChild(ILogicalOperator op, LogicalOperatorTag childOpTag) {
List<Mutable<ILogicalOperator>> inputs = op.getInputs();
for (int i = 0, ln = inputs.size(); i < ln; i++) {
if (inputs.get(i).getValue().getOperatorTag() == childOpTag) {
return i;
}
}
return -1;
}
public static List<Mutable<ILogicalExpression>> createVariableReferences(Collection<LogicalVariable> varList,
SourceLocation sourceLoc) {
List<Mutable<ILogicalExpression>> varRefs = new ArrayList<>(varList.size());
for (LogicalVariable var : varList) {
VariableReferenceExpression varRef = new VariableReferenceExpression(var);
varRef.setSourceLocation(sourceLoc);
varRefs.add(new MutableObject<>(varRef));
}
return varRefs;
}
}