blob: 80b3fef52ede47a93433ce730a5f399cdeefc3c9 [file] [log] [blame]
package edu.uci.ics.hivesterix.logical.plan;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.ExtractOperator;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
import org.apache.hadoop.hive.ql.exec.LimitOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.exec.UDTFOperator;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.plan.AggregationDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.UDTFDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import edu.uci.ics.hivesterix.logical.expression.ExpressionConstant;
import edu.uci.ics.hivesterix.logical.expression.HiveAlgebricksBuiltInFunctionMap;
import edu.uci.ics.hivesterix.logical.expression.HiveFunctionInfo;
import edu.uci.ics.hivesterix.logical.expression.HivesterixConstantValue;
import edu.uci.ics.hivesterix.logical.plan.visitor.ExtractVisitor;
import edu.uci.ics.hivesterix.logical.plan.visitor.FilterVisitor;
import edu.uci.ics.hivesterix.logical.plan.visitor.GroupByVisitor;
import edu.uci.ics.hivesterix.logical.plan.visitor.JoinVisitor;
import edu.uci.ics.hivesterix.logical.plan.visitor.LateralViewJoinVisitor;
import edu.uci.ics.hivesterix.logical.plan.visitor.LimitVisitor;
import edu.uci.ics.hivesterix.logical.plan.visitor.MapJoinVisitor;
import edu.uci.ics.hivesterix.logical.plan.visitor.ProjectVisitor;
import edu.uci.ics.hivesterix.logical.plan.visitor.SortVisitor;
import edu.uci.ics.hivesterix.logical.plan.visitor.TableScanWriteVisitor;
import edu.uci.ics.hivesterix.logical.plan.visitor.UnionVisitor;
import edu.uci.ics.hivesterix.logical.plan.visitor.base.Translator;
import edu.uci.ics.hivesterix.logical.plan.visitor.base.Visitor;
import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter;
/**
 * Translates a Hive operator DAG into an Algebricks logical plan.
 * <p>
 * Translation of individual Hive operators is delegated to a list of
 * {@link Visitor}s; this class drives the traversal and keeps the variable
 * bookkeeping shared by the visitors: Hive field names to Algebricks
 * {@link LogicalVariable}s, variables back to field names, and the
 * {@link TypeInfo} of every variable it allocates.
 */
@SuppressWarnings("rawtypes")
public class HiveAlgebricksTranslator implements Translator {

    /** id of the most recently allocated logical variable; the first allocated variable gets id 1 */
    private int currentVariable = 0;

    /** operators produced for the leaves (operators without children) of the Hive DAG */
    private final List<Mutable<ILogicalOperator>> logicalOp = new ArrayList<Mutable<ILogicalOperator>>();

    /** whether the traversal descends into the children of the operator that was just visited */
    private boolean continueTraverse = true;

    private IMetadataProvider<PartitionDesc, Object> metaData;

    /**
     * map variable name to the logical variable; keys are normally
     * {@code var.toString()}, but {@link #updateVariable} also stores field names here
     */
    private final HashMap<String, LogicalVariable> nameToLogicalVariableMap = new HashMap<String, LogicalVariable>();

    /**
     * map field name (typically "tabAlias.column") to the logical variable currently bound to it
     */
    private final HashMap<String, LogicalVariable> fieldToLogicalVariableMap = new HashMap<String, LogicalVariable>();

    /**
     * map logical variable back to the field name it was created for
     */
    private final HashMap<LogicalVariable, String> logicalVariableToFieldMap = new HashMap<LogicalVariable, String>();

    /**
     * Algebricks root operators of the generated plan
     */
    private final List<Mutable<ILogicalOperator>> rootOperators = new ArrayList<Mutable<ILogicalOperator>>();

    /**
     * visitors tried, in registration order, on every Hive operator; the first
     * non-null result wins
     */
    private final List<Visitor> visitors = new ArrayList<Visitor>();

    /**
     * output writer used by {@link #printOperators()}
     */
    private static PrintWriter outputWriter = new PrintWriter(new OutputStreamWriter(System.out));

    /**
     * map a logical variable to its Hive type info
     */
    private final HashMap<LogicalVariable, TypeInfo> variableToType = new HashMap<LogicalVariable, TypeInfo>();

    /** plan built by {@link #genLogicalPlan()}; null until that method is called */
    private ILogicalPlan plan;

    /**
     * Returns the logical variable bound to {@code fieldName}, allocating and
     * registering a new one with the given type if the field is not bound yet.
     */
    @Override
    public LogicalVariable getVariable(String fieldName, TypeInfo type) {
        LogicalVariable var = fieldToLogicalVariableMap.get(fieldName);
        if (var == null) {
            var = createVariable(fieldName, type);
        }
        return var;
    }

    /**
     * Always allocates a fresh logical variable for {@code fieldName}, rebinding
     * the field to it even if the field was already bound.
     */
    @Override
    public LogicalVariable getNewVariable(String fieldName, TypeInfo type) {
        return createVariable(fieldName, type);
    }

    /**
     * Allocates the next logical variable and records it in every lookup map.
     */
    private LogicalVariable createVariable(String fieldName, TypeInfo type) {
        currentVariable++;
        LogicalVariable var = new LogicalVariable(currentVariable);
        fieldToLogicalVariableMap.put(fieldName, var);
        nameToLogicalVariableMap.put(var.toString(), var);
        variableToType.put(var, type);
        logicalVariableToFieldMap.put(var, fieldName);
        return var;
    }

    /**
     * Rebinds the field that {@code oldVar} was created for to {@code newVar};
     * afterwards looking up either variable's name yields {@code newVar}. Does
     * nothing if {@code oldVar} is not associated with a field.
     */
    @Override
    public void replaceVariable(LogicalVariable oldVar, LogicalVariable newVar) {
        String name = logicalVariableToFieldMap.get(oldVar);
        if (name != null) {
            fieldToLogicalVariableMap.put(name, newVar);
            nameToLogicalVariableMap.put(newVar.toString(), newVar);
            nameToLogicalVariableMap.put(oldVar.toString(), newVar);
            logicalVariableToFieldMap.put(newVar, name);
        }
    }

    @Override
    public IMetadataProvider<PartitionDesc, Object> getMetadataProvider() {
        return metaData;
    }

    /**
     * only get a variable, without allocating or rewriting it
     *
     * @param fieldName
     *            field name to look up
     * @return the bound variable, or null if the field is not bound
     */
    private LogicalVariable getVariableOnly(String fieldName) {
        return fieldToLogicalVariableMap.get(fieldName);
    }

    /**
     * Follows the variable -> field -> variable indirection so that a variable
     * whose field has since been rebound (e.g. by {@link #replaceVariable})
     * resolves to the field's current variable. Falls back to {@code var}
     * itself when it has no associated field, or the field is no longer bound,
     * instead of yielding null.
     */
    private LogicalVariable resolveLatest(LogicalVariable var) {
        String name = logicalVariableToFieldMap.get(var);
        if (name == null) {
            return var;
        }
        LogicalVariable latest = getVariableOnly(name);
        return latest != null ? latest : var;
    }

    /**
     * Binds {@code fieldName} to {@code variable} unless it is already bound to it.
     * <p>
     * NOTE(review): the field name (not {@code variable.toString()}) is used as
     * the key in nameToLogicalVariableMap, and logicalVariableToFieldMap is not
     * updated; kept as-is, confirm this is intended.
     */
    private void updateVariable(String fieldName, LogicalVariable variable) {
        LogicalVariable current = fieldToLogicalVariableMap.get(fieldName);
        if (current == null || !current.equals(variable)) {
            fieldToLogicalVariableMap.put(fieldName, variable);
            nameToLogicalVariableMap.put(fieldName, variable);
        }
    }

    /**
     * get a list of logical variables from the schema, looked up by name; an
     * unknown name yields a null entry
     *
     * @param schema
     *            schema whose names are variable names
     * @return one entry per schema name, in order
     */
    @Override
    public List<LogicalVariable> getVariablesFromSchema(Schema schema) {
        List<LogicalVariable> variables = new ArrayList<LogicalVariable>();
        for (String name : schema.getNames()) {
            variables.add(nameToLogicalVariableMap.get(name));
        }
        return variables;
    }

    /**
     * get the variable to typeinfo map (the live map, not a copy)
     *
     * @return variable to type map
     */
    public HashMap<LogicalVariable, TypeInfo> getVariableContext() {
        return this.variableToType;
    }

    /**
     * get the id the next allocated variable will receive, i.e. one more than
     * the number of variables allocated so far
     *
     * @return next variable id
     */
    public int getVariableCounter() {
        return currentVariable + 1;
    }

    /**
     * translate from hive operator tree to Algebricks operator tree and record
     * the resulting leaf operators as plan roots (see {@link #genLogicalPlan()})
     *
     * @param hiveRoot
     *            roots of the Hive operator DAG
     * @param parentOperator
     *            passed (wrapped in a new reference) to the visitors as the
     *            parent of the roots; may be null
     * @param aliasToPathMap
     *            handed to the table-scan/write visitor
     * @throws AlgebricksException
     *             if a visitor fails
     */
    public void translate(List<Operator> hiveRoot, ILogicalOperator parentOperator,
            HashMap<String, PartitionDesc> aliasToPathMap) throws AlgebricksException {
        // NOTE(review): visitors and logicalOp are never reset, so calling this
        // twice on one instance keeps the first call's visitors (tried first) and
        // re-reports its leaves; confirm callers use a fresh translator per plan.
        visitors.add(new FilterVisitor());
        visitors.add(new GroupByVisitor());
        visitors.add(new JoinVisitor());
        visitors.add(new LateralViewJoinVisitor());
        visitors.add(new UnionVisitor());
        visitors.add(new LimitVisitor());
        visitors.add(new MapJoinVisitor());
        visitors.add(new ProjectVisitor());
        visitors.add(new SortVisitor());
        visitors.add(new ExtractVisitor());
        visitors.add(new TableScanWriteVisitor(aliasToPathMap));

        List<Mutable<ILogicalOperator>> refList = translate(hiveRoot, new MutableObject<ILogicalOperator>(
                parentOperator));
        // the recursive translate returns null when a join-like operator produced
        // no Algebricks operator; there is nothing to wire up in that case
        if (refList != null) {
            insertReplicateOperator(refList);
            rootOperators.addAll(refList);
        }
    }

    /**
     * translate operator DAG
     * <p>
     * Each Hive operator is offered to the visitors in order until one returns
     * a non-null reference. That reference becomes the parent for the
     * operator's children (the incoming parent is reused when no visitor
     * produced one). Operators without children are appended to
     * {@code logicalOp}.
     *
     * @param hiveRoot
     *            operators to translate
     * @param parentOperatorRef
     *            Algebricks parent handed to the visitors
     * @return {@code logicalOp} (shared across calls), or null if a JOIN,
     *         MAPJOIN or LATERALVIEWJOIN operator yielded no Algebricks operator
     *         (presumably because not all of its inputs have been seen yet —
     *         TODO confirm); in that case the remaining siblings are skipped
     */
    private List<Mutable<ILogicalOperator>> translate(List<Operator> hiveRoot,
            Mutable<ILogicalOperator> parentOperatorRef) throws AlgebricksException {
        for (Operator hiveOperator : hiveRoot) {
            continueTraverse = true;
            Mutable<ILogicalOperator> currentOperatorRef = null;
            if (hiveOperator.getType() == OperatorType.FILTER) {
                FilterOperator fop = (FilterOperator) hiveOperator;
                for (Visitor visitor : visitors) {
                    currentOperatorRef = visitor.visit(fop, parentOperatorRef, this);
                    if (currentOperatorRef != null) {
                        break;
                    }
                }
            } else if (hiveOperator.getType() == OperatorType.REDUCESINK) {
                ReduceSinkOperator fop = (ReduceSinkOperator) hiveOperator;
                for (Visitor visitor : visitors) {
                    currentOperatorRef = visitor.visit(fop, parentOperatorRef, this);
                    if (currentOperatorRef != null) {
                        break;
                    }
                }
            } else if (hiveOperator.getType() == OperatorType.JOIN) {
                JoinOperator fop = (JoinOperator) hiveOperator;
                for (Visitor visitor : visitors) {
                    currentOperatorRef = visitor.visit(fop, parentOperatorRef, this);
                    if (currentOperatorRef != null) {
                        continueTraverse = true;
                        break;
                    }
                    continueTraverse = false;
                }
                if (currentOperatorRef == null) {
                    return null;
                }
            } else if (hiveOperator.getType() == OperatorType.LATERALVIEWJOIN) {
                LateralViewJoinOperator fop = (LateralViewJoinOperator) hiveOperator;
                for (Visitor visitor : visitors) {
                    currentOperatorRef = visitor.visit(fop, parentOperatorRef, this);
                    if (currentOperatorRef != null) {
                        break;
                    }
                }
                if (currentOperatorRef == null) {
                    return null;
                }
            } else if (hiveOperator.getType() == OperatorType.MAPJOIN) {
                MapJoinOperator fop = (MapJoinOperator) hiveOperator;
                for (Visitor visitor : visitors) {
                    currentOperatorRef = visitor.visit(fop, parentOperatorRef, this);
                    if (currentOperatorRef != null) {
                        continueTraverse = true;
                        break;
                    }
                    continueTraverse = false;
                }
                if (currentOperatorRef == null) {
                    return null;
                }
            } else if (hiveOperator.getType() == OperatorType.SELECT) {
                SelectOperator fop = (SelectOperator) hiveOperator;
                for (Visitor visitor : visitors) {
                    currentOperatorRef = visitor.visit(fop, parentOperatorRef, this);
                    if (currentOperatorRef != null) {
                        break;
                    }
                }
            } else if (hiveOperator.getType() == OperatorType.EXTRACT) {
                ExtractOperator fop = (ExtractOperator) hiveOperator;
                for (Visitor visitor : visitors) {
                    currentOperatorRef = visitor.visit(fop, parentOperatorRef, this);
                    if (currentOperatorRef != null) {
                        break;
                    }
                }
            } else if (hiveOperator.getType() == OperatorType.GROUPBY) {
                GroupByOperator fop = (GroupByOperator) hiveOperator;
                for (Visitor visitor : visitors) {
                    currentOperatorRef = visitor.visit(fop, parentOperatorRef, this);
                    if (currentOperatorRef != null) {
                        break;
                    }
                }
            } else if (hiveOperator.getType() == OperatorType.TABLESCAN) {
                TableScanOperator fop = (TableScanOperator) hiveOperator;
                for (Visitor visitor : visitors) {
                    currentOperatorRef = visitor.visit(fop, parentOperatorRef, this);
                    if (currentOperatorRef != null) {
                        break;
                    }
                }
            } else if (hiveOperator.getType() == OperatorType.FILESINK) {
                FileSinkOperator fop = (FileSinkOperator) hiveOperator;
                for (Visitor visitor : visitors) {
                    currentOperatorRef = visitor.visit(fop, parentOperatorRef, this);
                    if (currentOperatorRef != null) {
                        break;
                    }
                }
            } else if (hiveOperator.getType() == OperatorType.LIMIT) {
                LimitOperator lop = (LimitOperator) hiveOperator;
                for (Visitor visitor : visitors) {
                    currentOperatorRef = visitor.visit(lop, parentOperatorRef, this);
                    if (currentOperatorRef != null) {
                        break;
                    }
                }
            } else if (hiveOperator.getType() == OperatorType.UDTF) {
                UDTFOperator lop = (UDTFOperator) hiveOperator;
                for (Visitor visitor : visitors) {
                    currentOperatorRef = visitor.visit(lop, parentOperatorRef, this);
                    if (currentOperatorRef != null) {
                        break;
                    }
                }
            } else if (hiveOperator.getType() == OperatorType.UNION) {
                UnionOperator lop = (UnionOperator) hiveOperator;
                for (Visitor visitor : visitors) {
                    currentOperatorRef = visitor.visit(lop, parentOperatorRef, this);
                    if (currentOperatorRef != null) {
                        continueTraverse = true;
                        break;
                    }
                    continueTraverse = false;
                }
            }
            // any other operator type is not translated itself; its children
            // are attached to the incoming parent

            @SuppressWarnings("unchecked")
            List<Operator> children = hiveOperator.getChildOperators();
            if (children != null && children.size() > 0 && continueTraverse) {
                if (currentOperatorRef == null) {
                    currentOperatorRef = parentOperatorRef;
                }
                translate(children, currentOperatorRef);
            }
            // leaves of the Hive DAG become roots of the Algebricks plan
            if (hiveOperator.getChildOperators() == null || hiveOperator.getChildOperators().size() == 0) {
                logicalOp.add(currentOperatorRef);
            }
        }
        return logicalOp;
    }

    /**
     * used in select, group by to get no-column-expression columns
     * <p>
     * Column references are rewritten to variable names and reported as-is;
     * every other expression is translated and assigned to a variable by an
     * {@link AssignOperator} placed on top of {@code parent}.
     *
     * @param parent
     *            input of the created assign operator
     * @param cols
     *            Hive expressions; rewritten in place
     * @param variables
     *            output: receives one variable per expression, in order
     * @return the assign operator, or null if no expression needed assigning
     */
    public ILogicalOperator getAssignOperator(Mutable<ILogicalOperator> parent, List<ExprNodeDesc> cols,
            ArrayList<LogicalVariable> variables) {
        ArrayList<Mutable<ILogicalExpression>> expressions = new ArrayList<Mutable<ILogicalExpression>>();
        // variables to be produced by the assign operator; one variable can only be assigned once
        ArrayList<LogicalVariable> appendedVariables = new ArrayList<LogicalVariable>();

        for (ExprNodeDesc hiveExpr : cols) {
            rewriteExpression(hiveExpr);
            if (hiveExpr instanceof ExprNodeColumnDesc) {
                ExprNodeColumnDesc columnDesc = (ExprNodeColumnDesc) hiveExpr;
                String fieldName = columnDesc.getTabAlias() + "." + columnDesc.getColumn();
                if (fieldName.indexOf("$$") < 0) {
                    LogicalVariable var = getVariable(fieldName, hiveExpr.getTypeInfo());
                    columnDesc.setColumn(var.toString());
                    columnDesc.setTabAlias("");
                    variables.add(var);
                } else {
                    // the column already names a variable; report the variable
                    // currently bound to that variable's field
                    LogicalVariable var = nameToLogicalVariableMap.get(columnDesc.getColumn());
                    variables.add(resolveLatest(var));
                }
            } else {
                Mutable<ILogicalExpression> algebricksExpr = translateScalarFucntion(hiveExpr);
                expressions.add(algebricksExpr);
                LogicalVariable var = getVariable(hiveExpr.getExprString() + algebricksExpr.hashCode(),
                        hiveExpr.getTypeInfo());
                variables.add(var);
                appendedVariables.add(var);
            }
        }

        // create an assign operator to deal with appending
        ILogicalOperator assignOp = null;
        if (appendedVariables.size() > 0) {
            assignOp = new AssignOperator(appendedVariables, expressions);
            assignOp.getInputs().add(parent);
        }
        return assignOp;
    }

    /**
     * Builds the logical plan from the root operators collected by
     * {@link #translate(List, ILogicalOperator, HashMap)}.
     */
    public ILogicalPlan genLogicalPlan() {
        plan = new ALogicalPlanImpl(rootOperators);
        return plan;
    }

    /**
     * Pretty-prints the plan followed by the field-to-variable bindings. The
     * plan is only set by {@link #genLogicalPlan()}, so call that first.
     */
    public void printOperators() throws AlgebricksException {
        LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();
        StringBuilder buffer = new StringBuilder();
        PlanPrettyPrinter.printPlan(plan, buffer, pvisitor, 0);
        outputWriter.println(buffer);
        outputWriter.println("rewritten variables: ");
        outputWriter.flush();
        printVariables();
    }

    /**
     * Replaces the writer used by {@link #printOperators()} for all instances.
     */
    public static void setOutputPrinter(PrintWriter writer) {
        outputWriter = writer;
    }

    /** prints every "field -> variable" binding */
    private void printVariables() {
        Set<Entry<String, LogicalVariable>> entries = fieldToLogicalVariableMap.entrySet();
        for (Entry<String, LogicalVariable> entry : entries) {
            outputWriter.println(entry.getKey() + " -> " + entry.getValue());
        }
        outputWriter.flush();
    }

    /**
     * generate the schema for the output of an operator
     *
     * @param operator
     *            The Hive operator
     * @return a Schema holding the internal name and type of every column in
     *         {@code operator.getSchema()}, in signature order
     */
    public Schema generateInputSchema(Operator operator) {
        List<String> variableNames = new ArrayList<String>();
        List<TypeInfo> typeList = new ArrayList<TypeInfo>();
        List<ColumnInfo> columns = operator.getSchema().getSignature();
        for (ColumnInfo col : columns) {
            typeList.add(col.getType());
            variableNames.add(col.getInternalName());
        }
        return new Schema(variableNames, typeList);
    }

    /**
     * rewrite the names of output columns for feature expression evaluators to
     * use: every column whose "tabAlias.internalName" does not contain "$$"
     * (presumably: not rewritten yet) is renamed to its variable, allocating
     * the variable on first use
     *
     * @param operator
     *            operator whose schema is rewritten in place
     */
    public void rewriteOperatorOutputSchema(Operator operator) {
        List<ColumnInfo> columns = operator.getSchema().getSignature();
        for (ColumnInfo column : columns) {
            String columnName = column.getTabAlias() + "." + column.getInternalName();
            if (columnName.indexOf("$$") < 0) {
                LogicalVariable var = getVariable(columnName, column.getType());
                column.setInternalName(var.toString());
            }
        }
    }

    /**
     * Binds the i-th output column of {@code operator} to {@code variables[i]}
     * and renames the column to that variable (columns containing "$$" are left
     * alone).
     *
     * @throws IllegalStateException
     *             if the number of variables differs from the number of columns
     */
    @Override
    public void rewriteOperatorOutputSchema(List<LogicalVariable> variables, Operator operator) {
        List<ColumnInfo> columns = operator.getSchema().getSignature();
        if (variables.size() != columns.size()) {
            throw new IllegalStateException("output cardinality error " + operator.getName() + " variable size: "
                    + variables.size() + " expected " + columns.size());
        }
        for (int i = 0; i < variables.size(); i++) {
            LogicalVariable var = variables.get(i);
            ColumnInfo column = columns.get(i);
            String fieldName = column.getTabAlias() + "." + column.getInternalName();
            if (fieldName.indexOf("$$") < 0) {
                updateVariable(fieldName, var);
                column.setInternalName(var.toString());
            }
        }
    }

    /**
     * rewrite an expression and substitute variables: every column reference
     * not yet containing "$$" is renamed to the variable bound to its field
     *
     * @param expr
     *            hive expression, rewritten in place
     * @throws IllegalStateException
     *             if a column is not bound under any of the tried field names
     */
    public void rewriteExpression(ExprNodeDesc expr) {
        if (expr instanceof ExprNodeColumnDesc) {
            ExprNodeColumnDesc desc = (ExprNodeColumnDesc) expr;
            String fieldName = desc.getTabAlias() + "." + desc.getColumn();
            if (fieldName.indexOf("$$") < 0) {
                LogicalVariable var = getVariableOnly(fieldName);
                if (var == null) {
                    // retry as if the table alias were empty ...
                    fieldName = "." + desc.getColumn();
                    var = getVariableOnly(fieldName);
                    if (var == null) {
                        // ... or null (string concatenation renders it as "null")
                        fieldName = "null." + desc.getColumn();
                        var = getVariableOnly(fieldName);
                        if (var == null) {
                            throw new IllegalStateException(fieldName + " is wrong!!! ");
                        }
                    }
                }
                desc.setColumn(resolveLatest(var).toString());
            }
        } else if (expr.getChildren() != null && expr.getChildren().size() > 0) {
            List<ExprNodeDesc> children = expr.getChildren();
            for (ExprNodeDesc child : children) {
                rewriteExpression(child);
            }
        }
    }

    /**
     * rewrite an expression and substitute variables, using only the exact
     * "tabAlias.column" lookup (no fallback names, no following of replaced
     * variables)
     *
     * @param expr
     *            hive expression, rewritten in place
     * @throws IllegalStateException
     *             if a column's field is not bound to any variable
     */
    public void rewriteExpressionPartial(ExprNodeDesc expr) {
        if (expr instanceof ExprNodeColumnDesc) {
            ExprNodeColumnDesc desc = (ExprNodeColumnDesc) expr;
            String fieldName = desc.getTabAlias() + "." + desc.getColumn();
            if (fieldName.indexOf("$$") < 0) {
                LogicalVariable var = getVariableOnly(fieldName);
                if (var == null) {
                    throw new IllegalStateException("no logical variable is bound to field " + fieldName);
                }
                desc.setColumn(var.toString());
            }
        } else if (expr.getChildren() != null && expr.getChildren().size() > 0) {
            List<ExprNodeDesc> children = expr.getChildren();
            for (ExprNodeDesc child : children) {
                rewriteExpressionPartial(child);
            }
        }
    }

    /**
     * translate scalar function expression
     *
     * @param hiveExpr
     *            Hive expression; column references are expected to already
     *            carry variable names (e.g. after {@link #rewriteExpression})
     * @return the equivalent Algebricks expression
     * @throws IllegalStateException
     *             for an unsupported expression kind
     */
    public Mutable<ILogicalExpression> translateScalarFucntion(ExprNodeDesc hiveExpr) {
        ILogicalExpression algebricksExpr;

        if (hiveExpr instanceof ExprNodeGenericFuncDesc) {
            List<Mutable<ILogicalExpression>> arguments = new ArrayList<Mutable<ILogicalExpression>>();
            List<ExprNodeDesc> children = hiveExpr.getChildren();
            for (ExprNodeDesc child : children) {
                arguments.add(translateScalarFucntion(child));
            }

            ExprNodeGenericFuncDesc funcExpr = (ExprNodeGenericFuncDesc) hiveExpr;
            GenericUDF genericUdf = funcExpr.getGenericUDF();
            UDF udf = null;
            if (genericUdf instanceof GenericUDFBridge) {
                GenericUDFBridge bridge = (GenericUDFBridge) genericUdf;
                try {
                    udf = bridge.getUdfClass().newInstance();
                } catch (Exception e) {
                    // best effort: fall back to identifying the function by the bridge itself
                    e.printStackTrace();
                }
            }

            // identify the function by the wrapped UDF when there is one
            Object hiveFunction = genericUdf;
            if (udf != null) {
                hiveFunction = udf;
            }
            FunctionIdentifier funcId = HiveAlgebricksBuiltInFunctionMap.INSTANCE.getAlgebricksFunctionId(hiveFunction
                    .getClass());
            if (funcId == null) {
                funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, hiveFunction.getClass().getName());
            }

            Object functionInfo = null;
            if (genericUdf instanceof GenericUDFBridge) {
                functionInfo = funcExpr;
            }

            algebricksExpr = new ScalarFunctionCallExpression(new HiveFunctionInfo(funcId, functionInfo), arguments);
        } else if (hiveExpr instanceof ExprNodeColumnDesc) {
            ExprNodeColumnDesc column = (ExprNodeColumnDesc) hiveExpr;
            LogicalVariable var = this.getVariable(column.getColumn());
            algebricksExpr = new VariableReferenceExpression(var);
        } else if (hiveExpr instanceof ExprNodeFieldDesc) {
            FunctionIdentifier funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE,
                    ExpressionConstant.FIELDACCESS);
            algebricksExpr = new ScalarFunctionCallExpression(new HiveFunctionInfo(funcId, hiveExpr));
        } else if (hiveExpr instanceof ExprNodeConstantDesc) {
            ExprNodeConstantDesc hiveConst = (ExprNodeConstantDesc) hiveExpr;
            Object value = hiveConst.getValue();
            algebricksExpr = new ConstantExpression(new HivesterixConstantValue(value));
        } else if (hiveExpr instanceof ExprNodeNullDesc) {
            FunctionIdentifier funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, ExpressionConstant.NULL);
            algebricksExpr = new ScalarFunctionCallExpression(new HiveFunctionInfo(funcId, hiveExpr));
        } else {
            throw new IllegalStateException("unknown hive expression");
        }
        return new MutableObject<ILogicalExpression>(algebricksExpr);
    }

    /**
     * translate aggregation function expression; the function id is
     * "udafName(mode)" in the Hivesterix namespace
     *
     * @param aggregateDesc
     *            Hive aggregation descriptor
     * @return the Algebricks aggregate call over the translated parameters
     */
    public Mutable<ILogicalExpression> translateAggregation(AggregationDesc aggregateDesc) {
        String udafName = aggregateDesc.getGenericUDAFName();

        List<Mutable<ILogicalExpression>> arguments = new ArrayList<Mutable<ILogicalExpression>>();
        List<ExprNodeDesc> children = aggregateDesc.getParameters();
        for (ExprNodeDesc child : children) {
            arguments.add(translateScalarFucntion(child));
        }

        FunctionIdentifier funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, udafName + "("
                + aggregateDesc.getMode() + ")");
        HiveFunctionInfo funcInfo = new HiveFunctionInfo(funcId, aggregateDesc);
        AggregateFunctionCallExpression aggregationExpression = new AggregateFunctionCallExpression(funcInfo, false,
                arguments);
        return new MutableObject<ILogicalExpression>(aggregationExpression);
    }

    /**
     * translate a table-generating (UDTF) function into an unnesting function call
     *
     * @param udtfDesc
     *            Hive UDTF descriptor
     * @param argument
     *            the single argument of the call
     * @return the Algebricks unnesting function call
     */
    public Mutable<ILogicalExpression> translateUnnestFunction(UDTFDesc udtfDesc, Mutable<ILogicalExpression> argument) {
        String udtfName = udtfDesc.getUDTFName();
        FunctionIdentifier funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, udtfName);
        UnnestingFunctionCallExpression unnestingExpression = new UnnestingFunctionCallExpression(new HiveFunctionInfo(
                funcId, udtfDesc));
        unnestingExpression.getArguments().add(argument);
        return new MutableObject<ILogicalExpression>(unnestingExpression);
    }

    /**
     * get typeinfo of a variable, or null if unknown
     */
    @Override
    public TypeInfo getType(LogicalVariable var) {
        return variableToType.get(var);
    }

    /**
     * get variable from variable name, or null if unknown
     */
    @Override
    public LogicalVariable getVariable(String name) {
        return nameToLogicalVariableMap.get(name);
    }

    /**
     * get the variable currently bound to a field name, or null if unbound
     */
    @Override
    public LogicalVariable getVariableFromFieldName(String fieldName) {
        return this.getVariableOnly(fieldName);
    }

    /**
     * set the metadata provider
     */
    @Override
    public void setMetadataProvider(IMetadataProvider<PartitionDesc, Object> metadata) {
        this.metaData = metadata;
    }

    /**
     * insert ReplicateOperator when necessary: every operator reachable from
     * {@code roots} that is an input of more than one operator gets a
     * ReplicateOperator between it and its consumers
     */
    private void insertReplicateOperator(List<Mutable<ILogicalOperator>> roots) {
        Map<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> childToParentsMap = new HashMap<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>>();
        buildChildToParentsMapping(roots, childToParentsMap);
        for (Entry<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> entry : childToParentsMap.entrySet()) {
            List<Mutable<ILogicalOperator>> pList = entry.getValue();
            if (pList.size() > 1) {
                ILogicalOperator rop = new ReplicateOperator(pList.size());
                Mutable<ILogicalOperator> ropRef = new MutableObject<ILogicalOperator>(rop);
                Mutable<ILogicalOperator> childRef = entry.getKey();
                rop.getInputs().add(childRef);
                // redirect every consumer's input slot from the shared child to the replicate
                for (Mutable<ILogicalOperator> parentRef : pList) {
                    ILogicalOperator parentOp = parentRef.getValue();
                    int index = parentOp.getInputs().indexOf(childRef);
                    parentOp.getInputs().set(index, ropRef);
                }
            }
        }
    }

    /**
     * build the mapping from child to Parents by walking the inputs of every
     * operator reachable from {@code roots}; each parent is listed at most once
     * per child
     *
     * @param roots
     *            operators to start from
     * @param map
     *            output: child reference to the list of its parent references
     */
    private void buildChildToParentsMapping(List<Mutable<ILogicalOperator>> roots,
            Map<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> map) {
        for (Mutable<ILogicalOperator> opRef : roots) {
            List<Mutable<ILogicalOperator>> childRefs = opRef.getValue().getInputs();
            for (Mutable<ILogicalOperator> childRef : childRefs) {
                List<Mutable<ILogicalOperator>> parentList = map.get(childRef);
                if (parentList == null) {
                    parentList = new ArrayList<Mutable<ILogicalOperator>>();
                    map.put(childRef, parentList);
                }
                if (!parentList.contains(opRef)) {
                    parentList.add(opRef);
                }
            }
            buildChildToParentsMapping(childRefs, map);
        }
    }
}