blob: 325b6325dd40d8176a38900752e3ae1643409f03 [file] [log] [blame]
package edu.uci.ics.hivesterix.logical.plan.visitor;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.hive.ql.exec.ExtractOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import edu.uci.ics.hivesterix.logical.plan.visitor.base.DefaultVisitor;
import edu.uci.ics.hivesterix.logical.plan.visitor.base.Translator;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
/**
 * Visitor that turns a Hive {@link ReduceSinkOperator} used for ordering into
 * an Algebricks {@link OrderOperator}.
 * <p>
 * Only reduce sinks that match the pattern checked in
 * {@link #visit(ReduceSinkOperator, Mutable, Translator)} are translated; for
 * every other reduce sink the visitor returns {@code null}.
 */
public class SortVisitor extends DefaultVisitor {

    /**
     * Translates a reduce sink into an order operator, optionally surrounded by
     * assign/project operators.
     * <p>
     * The reduce sink is handled only when its first child is an
     * {@link ExtractOperator}, its descriptor reports exactly one reducer and
     * it has at least one key column. The generated plan is, bottom-up:
     * <ol>
     * <li>an assign operator for the sort keys, if the translator produces one;</li>
     * <li>the {@link OrderOperator} over the translated key expressions;</li>
     * <li>a {@link ProjectOperator} restoring the input variables, emitted only
     * when step 1 produced an assign operator;</li>
     * <li>an assign operator for the value columns, if the translator produces
     * one.</li>
     * </ol>
     *
     * @param operator
     *            the Hive reduce sink operator being visited
     * @param AlgebricksParentOperatorRef
     *            reference to the Algebricks operator that feeds the generated
     *            operators
     * @param t
     *            the translator used to rewrite expressions and schemas
     * @return a new reference to the last operator generated, or {@code null}
     *         when the reduce sink does not match the handled pattern
     * @throws AlgebricksException
     *             propagated from the translator or from variable collection
     */
    @SuppressWarnings("rawtypes")
    @Override
    public Mutable<ILogicalOperator> visit(ReduceSinkOperator operator,
            Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) throws AlgebricksException {
        ReduceSinkDesc desc = (ReduceSinkDesc) operator.getConf();
        Operator downStream = (Operator) operator.getChildOperators().get(0);
        List<ExprNodeDesc> keys = desc.getKeyCols();

        // Only handle: reduce sink -> extract, a single reducer, at least one key.
        boolean handled = downStream instanceof ExtractOperator && desc.getNumReducers() == 1 && keys.size() > 0;
        if (!handled) {
            return null;
        }

        List<ExprNodeDesc> values = desc.getValueCols();
        List<ExprNodeDesc> partitionCols = desc.getPartitionCols();

        // Let the translator rewrite every expression of the descriptor before
        // any of them is translated.
        for (ExprNodeDesc key : keys) {
            t.rewriteExpression(key);
        }
        for (ExprNodeDesc value : values) {
            t.rewriteExpression(value);
        }
        for (ExprNodeDesc col : partitionCols) {
            t.rewriteExpression(col);
        }

        // Build one (direction, expression) pair per key. The i-th character of
        // the order string gives the direction of the i-th key: '+' is
        // ascending, anything else is descending.
        char[] orders = desc.getOrder().toCharArray();
        List<Pair<IOrder, Mutable<ILogicalExpression>>> pairs = new ArrayList<Pair<IOrder, Mutable<ILogicalExpression>>>(
                keys.size());
        for (int idx = 0; idx < keys.size(); idx++) {
            Mutable<ILogicalExpression> expr = t.translateScalarFucntion(keys.get(idx));
            IOrder order = orders[idx] == '+' ? OrderOperator.ASC_ORDER : OrderOperator.DESC_ORDER;
            pairs.add(new Pair<IOrder, Mutable<ILogicalExpression>>(order, expr));
        }

        // Remember the variables produced by the input operator so they can be
        // projected back after sorting.
        ArrayList<LogicalVariable> inputVariables = new ArrayList<LogicalVariable>();
        VariableUtilities.getProducedVariables(AlgebricksParentOperatorRef.getValue(), inputVariables);

        // keyVariables is an out-parameter required by getAssignOperator; its
        // contents are not needed here.
        ArrayList<LogicalVariable> keyVariables = new ArrayList<LogicalVariable>();
        ILogicalOperator assignOp = t.getAssignOperator(AlgebricksParentOperatorRef, keys, keyVariables);
        if (assignOp != null) {
            AlgebricksParentOperatorRef = new MutableObject<ILogicalOperator>(assignOp);
        }

        // The order operator itself.
        ILogicalOperator currentOperator = new OrderOperator(pairs);
        currentOperator.getInputs().add(AlgebricksParentOperatorRef);
        AlgebricksParentOperatorRef = new MutableObject<ILogicalOperator>(currentOperator);

        // Project back to the input variables, removing generated sort-key
        // columns if an assign operator introduced any.
        if (assignOp != null) {
            currentOperator = new ProjectOperator(inputVariables);
            currentOperator.getInputs().add(AlgebricksParentOperatorRef);
            AlgebricksParentOperatorRef = new MutableObject<ILogicalOperator>(currentOperator);
        }

        // Special rule for Hive's order by: the output schema of the reduce
        // sink operator contains only the value columns.
        List<ExprNodeDesc> schema = new ArrayList<ExprNodeDesc>(values);
        ArrayList<LogicalVariable> variables = new ArrayList<LogicalVariable>();
        ILogicalOperator assignOperator = t.getAssignOperator(AlgebricksParentOperatorRef, schema, variables);
        t.rewriteOperatorOutputSchema(variables, operator);
        if (assignOperator != null) {
            currentOperator = assignOperator;
        }

        return new MutableObject<ILogicalOperator>(currentOperator);
    }
}