package edu.uci.ics.hivesterix.logical.plan.visitor; | |
import java.util.ArrayList; | |
import java.util.List; | |
import org.apache.commons.lang3.mutable.Mutable; | |
import org.apache.commons.lang3.mutable.MutableObject; | |
import org.apache.hadoop.hive.ql.exec.ExtractOperator; | |
import org.apache.hadoop.hive.ql.exec.Operator; | |
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; | |
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; | |
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; | |
import edu.uci.ics.hivesterix.logical.plan.visitor.base.DefaultVisitor; | |
import edu.uci.ics.hivesterix.logical.plan.visitor.base.Translator; | |
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; | |
import edu.uci.ics.hyracks.algebricks.common.utils.Pair; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn; | |
public class SortVisitor extends DefaultVisitor { | |
@SuppressWarnings("rawtypes") | |
@Override | |
public Mutable<ILogicalOperator> visit(ReduceSinkOperator operator, | |
Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) throws AlgebricksException { | |
ReduceSinkDesc desc = (ReduceSinkDesc) operator.getConf(); | |
Operator downStream = (Operator) operator.getChildOperators().get(0); | |
List<ExprNodeDesc> keys = desc.getKeyCols(); | |
if (!(downStream instanceof ExtractOperator && desc.getNumReducers() == 1 && keys.size() > 0)) { | |
return null; | |
} | |
List<ExprNodeDesc> schema = new ArrayList<ExprNodeDesc>(); | |
List<ExprNodeDesc> values = desc.getValueCols(); | |
List<ExprNodeDesc> partitionCols = desc.getPartitionCols(); | |
for (ExprNodeDesc key : keys) { | |
t.rewriteExpression(key); | |
} | |
for (ExprNodeDesc value : values) { | |
t.rewriteExpression(value); | |
} | |
for (ExprNodeDesc col : partitionCols) { | |
t.rewriteExpression(col); | |
} | |
// add a order-by operator and limit if any | |
List<Pair<IOrder, Mutable<ILogicalExpression>>> pairs = new ArrayList<Pair<IOrder, Mutable<ILogicalExpression>>>(); | |
char[] orders = desc.getOrder().toCharArray(); | |
int i = 0; | |
for (ExprNodeDesc key : keys) { | |
Mutable<ILogicalExpression> expr = t.translateScalarFucntion(key); | |
IOrder order = orders[i] == '+' ? OrderOperator.ASC_ORDER : OrderOperator.DESC_ORDER; | |
Pair<IOrder, Mutable<ILogicalExpression>> pair = new Pair<IOrder, Mutable<ILogicalExpression>>(order, expr); | |
pairs.add(pair); | |
i++; | |
} | |
// get input variables | |
ArrayList<LogicalVariable> inputVariables = new ArrayList<LogicalVariable>(); | |
VariableUtilities.getProducedVariables(AlgebricksParentOperatorRef.getValue(), inputVariables); | |
ArrayList<LogicalVariable> keyVariables = new ArrayList<LogicalVariable>(); | |
ILogicalOperator currentOperator; | |
ILogicalOperator assignOp = t.getAssignOperator(AlgebricksParentOperatorRef, keys, keyVariables); | |
if (assignOp != null) { | |
currentOperator = assignOp; | |
AlgebricksParentOperatorRef = new MutableObject<ILogicalOperator>(currentOperator); | |
} | |
OrderColumn[] keyColumns = new OrderColumn[keyVariables.size()]; | |
for (int j = 0; j < keyColumns.length; j++) | |
keyColumns[j] = new OrderColumn(keyVariables.get(j), pairs.get(j).first.getKind()); | |
// handle order operator | |
currentOperator = new OrderOperator(pairs); | |
currentOperator.getInputs().add(AlgebricksParentOperatorRef); | |
AlgebricksParentOperatorRef = new MutableObject<ILogicalOperator>(currentOperator); | |
// project back, remove generated sort-key columns if any | |
if (assignOp != null) { | |
currentOperator = new ProjectOperator(inputVariables); | |
currentOperator.getInputs().add(AlgebricksParentOperatorRef); | |
AlgebricksParentOperatorRef = new MutableObject<ILogicalOperator>(currentOperator); | |
} | |
/** | |
* a special rule for hive's order by output schema of reduce sink | |
* operator only contains the columns | |
*/ | |
for (ExprNodeDesc value : values) { | |
schema.add(value); | |
} | |
ArrayList<LogicalVariable> variables = new ArrayList<LogicalVariable>(); | |
ILogicalOperator assignOperator = t.getAssignOperator(AlgebricksParentOperatorRef, schema, variables); | |
t.rewriteOperatorOutputSchema(variables, operator); | |
if (assignOperator != null) { | |
currentOperator = assignOperator; | |
AlgebricksParentOperatorRef = new MutableObject<ILogicalOperator>(currentOperator); | |
} | |
return new MutableObject<ILogicalOperator>(currentOperator); | |
} | |
} |