package edu.uci.ics.hivesterix.logical.plan; | |
import java.io.OutputStreamWriter; | |
import java.io.PrintWriter; | |
import java.util.ArrayList; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Map.Entry; | |
import java.util.Set; | |
import org.apache.commons.lang3.mutable.Mutable; | |
import org.apache.commons.lang3.mutable.MutableObject; | |
import org.apache.hadoop.hive.ql.exec.ColumnInfo; | |
import org.apache.hadoop.hive.ql.exec.ExtractOperator; | |
import org.apache.hadoop.hive.ql.exec.FileSinkOperator; | |
import org.apache.hadoop.hive.ql.exec.FilterOperator; | |
import org.apache.hadoop.hive.ql.exec.GroupByOperator; | |
import org.apache.hadoop.hive.ql.exec.JoinOperator; | |
import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator; | |
import org.apache.hadoop.hive.ql.exec.LimitOperator; | |
import org.apache.hadoop.hive.ql.exec.MapJoinOperator; | |
import org.apache.hadoop.hive.ql.exec.Operator; | |
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; | |
import org.apache.hadoop.hive.ql.exec.SelectOperator; | |
import org.apache.hadoop.hive.ql.exec.TableScanOperator; | |
import org.apache.hadoop.hive.ql.exec.UDF; | |
import org.apache.hadoop.hive.ql.exec.UDTFOperator; | |
import org.apache.hadoop.hive.ql.exec.UnionOperator; | |
import org.apache.hadoop.hive.ql.plan.AggregationDesc; | |
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; | |
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; | |
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; | |
import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc; | |
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; | |
import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc; | |
import org.apache.hadoop.hive.ql.plan.PartitionDesc; | |
import org.apache.hadoop.hive.ql.plan.UDTFDesc; | |
import org.apache.hadoop.hive.ql.plan.api.OperatorType; | |
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; | |
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge; | |
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; | |
import edu.uci.ics.hivesterix.logical.expression.ExpressionConstant; | |
import edu.uci.ics.hivesterix.logical.expression.HiveAlgebricksBuiltInFunctionMap; | |
import edu.uci.ics.hivesterix.logical.expression.HiveFunctionInfo; | |
import edu.uci.ics.hivesterix.logical.expression.HivesterixConstantValue; | |
import edu.uci.ics.hivesterix.logical.plan.visitor.ExtractVisitor; | |
import edu.uci.ics.hivesterix.logical.plan.visitor.FilterVisitor; | |
import edu.uci.ics.hivesterix.logical.plan.visitor.GroupByVisitor; | |
import edu.uci.ics.hivesterix.logical.plan.visitor.JoinVisitor; | |
import edu.uci.ics.hivesterix.logical.plan.visitor.LateralViewJoinVisitor; | |
import edu.uci.ics.hivesterix.logical.plan.visitor.LimitVisitor; | |
import edu.uci.ics.hivesterix.logical.plan.visitor.MapJoinVisitor; | |
import edu.uci.ics.hivesterix.logical.plan.visitor.ProjectVisitor; | |
import edu.uci.ics.hivesterix.logical.plan.visitor.SortVisitor; | |
import edu.uci.ics.hivesterix.logical.plan.visitor.TableScanWriteVisitor; | |
import edu.uci.ics.hivesterix.logical.plan.visitor.UnionVisitor; | |
import edu.uci.ics.hivesterix.logical.plan.visitor.base.Translator; | |
import edu.uci.ics.hivesterix.logical.plan.visitor.base.Visitor; | |
import edu.uci.ics.hivesterix.runtime.jobgen.Schema; | |
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter; | |
@SuppressWarnings("rawtypes") | |
public class HiveAlgebricksTranslator implements Translator { | |
private int currentVariable = 0; | |
private List<Mutable<ILogicalOperator>> logicalOp = new ArrayList<Mutable<ILogicalOperator>>(); | |
private boolean continueTraverse = true; | |
private IMetadataProvider<PartitionDesc, Object> metaData; | |
/** | |
* map variable name to the logical variable | |
*/ | |
private HashMap<String, LogicalVariable> nameToLogicalVariableMap = new HashMap<String, LogicalVariable>(); | |
/** | |
* map field name to LogicalVariable | |
*/ | |
private HashMap<String, LogicalVariable> fieldToLogicalVariableMap = new HashMap<String, LogicalVariable>(); | |
/** | |
* map logical variable to name | |
*/ | |
private HashMap<LogicalVariable, String> logicalVariableToFieldMap = new HashMap<LogicalVariable, String>(); | |
/** | |
* asterix root operators | |
*/ | |
private List<Mutable<ILogicalOperator>> rootOperators = new ArrayList<Mutable<ILogicalOperator>>(); | |
/** | |
* a list of visitors | |
*/ | |
private List<Visitor> visitors = new ArrayList<Visitor>(); | |
/** | |
* output writer to print things out | |
*/ | |
private static PrintWriter outputWriter = new PrintWriter(new OutputStreamWriter(System.out)); | |
/** | |
* map a logical variable to type info | |
*/ | |
private HashMap<LogicalVariable, TypeInfo> variableToType = new HashMap<LogicalVariable, TypeInfo>(); | |
@Override | |
public LogicalVariable getVariable(String fieldName, TypeInfo type) { | |
LogicalVariable var = fieldToLogicalVariableMap.get(fieldName); | |
if (var == null) { | |
currentVariable++; | |
var = new LogicalVariable(currentVariable); | |
fieldToLogicalVariableMap.put(fieldName, var); | |
nameToLogicalVariableMap.put(var.toString(), var); | |
variableToType.put(var, type); | |
logicalVariableToFieldMap.put(var, fieldName); | |
} | |
return var; | |
} | |
@Override | |
public LogicalVariable getNewVariable(String fieldName, TypeInfo type) { | |
currentVariable++; | |
LogicalVariable var = new LogicalVariable(currentVariable); | |
fieldToLogicalVariableMap.put(fieldName, var); | |
nameToLogicalVariableMap.put(var.toString(), var); | |
variableToType.put(var, type); | |
logicalVariableToFieldMap.put(var, fieldName); | |
return var; | |
} | |
@Override | |
public void replaceVariable(LogicalVariable oldVar, LogicalVariable newVar) { | |
String name = this.logicalVariableToFieldMap.get(oldVar); | |
if (name != null) { | |
fieldToLogicalVariableMap.put(name, newVar); | |
nameToLogicalVariableMap.put(newVar.toString(), newVar); | |
nameToLogicalVariableMap.put(oldVar.toString(), newVar); | |
logicalVariableToFieldMap.put(newVar, name); | |
} | |
} | |
@Override | |
public IMetadataProvider<PartitionDesc, Object> getMetadataProvider() { | |
return metaData; | |
} | |
/** | |
* only get an variable, without rewriting it | |
* | |
* @param fieldName | |
* @return | |
*/ | |
private LogicalVariable getVariableOnly(String fieldName) { | |
return fieldToLogicalVariableMap.get(fieldName); | |
} | |
private void updateVariable(String fieldName, LogicalVariable variable) { | |
LogicalVariable var = fieldToLogicalVariableMap.get(fieldName); | |
if (var == null) { | |
fieldToLogicalVariableMap.put(fieldName, variable); | |
nameToLogicalVariableMap.put(fieldName, variable); | |
} else if (!var.equals(variable)) { | |
fieldToLogicalVariableMap.put(fieldName, variable); | |
nameToLogicalVariableMap.put(fieldName, variable); | |
} | |
} | |
/** | |
* get a list of logical variables from the schema | |
* | |
* @param schema | |
* @return | |
*/ | |
@Override | |
public List<LogicalVariable> getVariablesFromSchema(Schema schema) { | |
List<LogicalVariable> variables = new ArrayList<LogicalVariable>(); | |
List<String> names = schema.getNames(); | |
for (String name : names) | |
variables.add(nameToLogicalVariableMap.get(name)); | |
return variables; | |
} | |
/** | |
* get variable to typeinfo map | |
* | |
* @return | |
*/ | |
public HashMap<LogicalVariable, TypeInfo> getVariableContext() { | |
return this.variableToType; | |
} | |
/** | |
* get the number of variables s | |
* | |
* @return | |
*/ | |
public int getVariableCounter() { | |
return currentVariable + 1; | |
} | |
/** | |
* translate from hive operator tree to asterix operator tree | |
* | |
* @param hive | |
* roots | |
* @return Algebricks roots | |
*/ | |
public void translate(List<Operator> hiveRoot, ILogicalOperator parentOperator, | |
HashMap<String, PartitionDesc> aliasToPathMap) throws AlgebricksException { | |
/** | |
* register visitors | |
*/ | |
visitors.add(new FilterVisitor()); | |
visitors.add(new GroupByVisitor()); | |
visitors.add(new JoinVisitor()); | |
visitors.add(new LateralViewJoinVisitor()); | |
visitors.add(new UnionVisitor()); | |
visitors.add(new LimitVisitor()); | |
visitors.add(new MapJoinVisitor()); | |
visitors.add(new ProjectVisitor()); | |
visitors.add(new SortVisitor()); | |
visitors.add(new ExtractVisitor()); | |
visitors.add(new TableScanWriteVisitor(aliasToPathMap)); | |
List<Mutable<ILogicalOperator>> refList = translate(hiveRoot, new MutableObject<ILogicalOperator>( | |
parentOperator)); | |
insertReplicateOperator(refList); | |
if (refList != null) | |
rootOperators.addAll(refList); | |
} | |
/** | |
* translate operator DAG | |
* | |
* @param hiveRoot | |
* @param AlgebricksParentOperator | |
* @return | |
*/ | |
private List<Mutable<ILogicalOperator>> translate(List<Operator> hiveRoot, | |
Mutable<ILogicalOperator> AlgebricksParentOperator) throws AlgebricksException { | |
for (Operator hiveOperator : hiveRoot) { | |
continueTraverse = true; | |
Mutable<ILogicalOperator> currentOperatorRef = null; | |
if (hiveOperator.getType() == OperatorType.FILTER) { | |
FilterOperator fop = (FilterOperator) hiveOperator; | |
for (Visitor visitor : visitors) { | |
currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this); | |
if (currentOperatorRef != null) | |
break; | |
} | |
} else if (hiveOperator.getType() == OperatorType.REDUCESINK) { | |
ReduceSinkOperator fop = (ReduceSinkOperator) hiveOperator; | |
for (Visitor visitor : visitors) { | |
currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this); | |
if (currentOperatorRef != null) | |
break; | |
} | |
} else if (hiveOperator.getType() == OperatorType.JOIN) { | |
JoinOperator fop = (JoinOperator) hiveOperator; | |
for (Visitor visitor : visitors) { | |
currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this); | |
if (currentOperatorRef != null) { | |
continueTraverse = true; | |
break; | |
} else | |
continueTraverse = false; | |
} | |
if (currentOperatorRef == null) | |
return null; | |
} else if (hiveOperator.getType() == OperatorType.LATERALVIEWJOIN) { | |
LateralViewJoinOperator fop = (LateralViewJoinOperator) hiveOperator; | |
for (Visitor visitor : visitors) { | |
currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this); | |
if (currentOperatorRef != null) | |
break; | |
} | |
if (currentOperatorRef == null) | |
return null; | |
} else if (hiveOperator.getType() == OperatorType.MAPJOIN) { | |
MapJoinOperator fop = (MapJoinOperator) hiveOperator; | |
for (Visitor visitor : visitors) { | |
currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this); | |
if (currentOperatorRef != null) { | |
continueTraverse = true; | |
break; | |
} else | |
continueTraverse = false; | |
} | |
if (currentOperatorRef == null) | |
return null; | |
} else if (hiveOperator.getType() == OperatorType.SELECT) { | |
SelectOperator fop = (SelectOperator) hiveOperator; | |
for (Visitor visitor : visitors) { | |
currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this); | |
if (currentOperatorRef != null) | |
break; | |
} | |
} else if (hiveOperator.getType() == OperatorType.EXTRACT) { | |
ExtractOperator fop = (ExtractOperator) hiveOperator; | |
for (Visitor visitor : visitors) { | |
currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this); | |
if (currentOperatorRef != null) | |
break; | |
} | |
} else if (hiveOperator.getType() == OperatorType.GROUPBY) { | |
GroupByOperator fop = (GroupByOperator) hiveOperator; | |
for (Visitor visitor : visitors) { | |
currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this); | |
if (currentOperatorRef != null) | |
break; | |
} | |
} else if (hiveOperator.getType() == OperatorType.TABLESCAN) { | |
TableScanOperator fop = (TableScanOperator) hiveOperator; | |
for (Visitor visitor : visitors) { | |
currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this); | |
if (currentOperatorRef != null) | |
break; | |
} | |
} else if (hiveOperator.getType() == OperatorType.FILESINK) { | |
FileSinkOperator fop = (FileSinkOperator) hiveOperator; | |
for (Visitor visitor : visitors) { | |
currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this); | |
if (currentOperatorRef != null) | |
break; | |
} | |
} else if (hiveOperator.getType() == OperatorType.LIMIT) { | |
LimitOperator lop = (LimitOperator) hiveOperator; | |
for (Visitor visitor : visitors) { | |
currentOperatorRef = visitor.visit(lop, AlgebricksParentOperator, this); | |
if (currentOperatorRef != null) | |
break; | |
} | |
} else if (hiveOperator.getType() == OperatorType.UDTF) { | |
UDTFOperator lop = (UDTFOperator) hiveOperator; | |
for (Visitor visitor : visitors) { | |
currentOperatorRef = visitor.visit(lop, AlgebricksParentOperator, this); | |
if (currentOperatorRef != null) | |
break; | |
} | |
} else if (hiveOperator.getType() == OperatorType.UNION) { | |
UnionOperator lop = (UnionOperator) hiveOperator; | |
for (Visitor visitor : visitors) { | |
currentOperatorRef = visitor.visit(lop, AlgebricksParentOperator, this); | |
if (currentOperatorRef != null) { | |
continueTraverse = true; | |
break; | |
} else | |
continueTraverse = false; | |
} | |
} else | |
; | |
if (hiveOperator.getChildOperators() != null && hiveOperator.getChildOperators().size() > 0 | |
&& continueTraverse) { | |
@SuppressWarnings("unchecked") | |
List<Operator> children = hiveOperator.getChildOperators(); | |
if (currentOperatorRef == null) | |
currentOperatorRef = AlgebricksParentOperator; | |
translate(children, currentOperatorRef); | |
} | |
if (hiveOperator.getChildOperators() == null || hiveOperator.getChildOperators().size() == 0) | |
logicalOp.add(currentOperatorRef); | |
} | |
return logicalOp; | |
} | |
/** | |
* used in select, group by to get no-column-expression columns | |
* | |
* @param cols | |
* @return | |
*/ | |
public ILogicalOperator getAssignOperator(Mutable<ILogicalOperator> parent, List<ExprNodeDesc> cols, | |
ArrayList<LogicalVariable> variables) { | |
ArrayList<Mutable<ILogicalExpression>> expressions = new ArrayList<Mutable<ILogicalExpression>>(); | |
/** | |
* variables to be appended in the assign operator | |
*/ | |
ArrayList<LogicalVariable> appendedVariables = new ArrayList<LogicalVariable>(); | |
// one variable can only be assigned once | |
for (ExprNodeDesc hiveExpr : cols) { | |
rewriteExpression(hiveExpr); | |
if (hiveExpr instanceof ExprNodeColumnDesc) { | |
ExprNodeColumnDesc desc2 = (ExprNodeColumnDesc) hiveExpr; | |
String fieldName = desc2.getTabAlias() + "." + desc2.getColumn(); | |
// System.out.println("project expr: " + fieldName); | |
if (fieldName.indexOf("$$") < 0) { | |
LogicalVariable var = getVariable(fieldName, hiveExpr.getTypeInfo()); | |
desc2.setColumn(var.toString()); | |
desc2.setTabAlias(""); | |
variables.add(var); | |
} else { | |
LogicalVariable var = nameToLogicalVariableMap.get(desc2.getColumn()); | |
String name = this.logicalVariableToFieldMap.get(var); | |
var = this.getVariableOnly(name); | |
variables.add(var); | |
} | |
} else { | |
Mutable<ILogicalExpression> asterixExpr = translateScalarFucntion(hiveExpr); | |
expressions.add(asterixExpr); | |
LogicalVariable var = getVariable(hiveExpr.getExprString() + asterixExpr.hashCode(), | |
hiveExpr.getTypeInfo()); | |
variables.add(var); | |
appendedVariables.add(var); | |
} | |
} | |
/** | |
* create an assign operator to deal with appending | |
*/ | |
ILogicalOperator assignOp = null; | |
if (appendedVariables.size() > 0) { | |
assignOp = new AssignOperator(appendedVariables, expressions); | |
assignOp.getInputs().add(parent); | |
} | |
return assignOp; | |
} | |
private ILogicalPlan plan; | |
public ILogicalPlan genLogicalPlan() { | |
plan = new ALogicalPlanImpl(rootOperators); | |
return plan; | |
} | |
public void printOperators() throws AlgebricksException { | |
LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor(); | |
StringBuilder buffer = new StringBuilder(); | |
PlanPrettyPrinter.printPlan(plan, buffer, pvisitor, 0); | |
outputWriter.println(buffer); | |
outputWriter.println("rewritten variables: "); | |
outputWriter.flush(); | |
printVariables(); | |
} | |
public static void setOutputPrinter(PrintWriter writer) { | |
outputWriter = writer; | |
} | |
private void printVariables() { | |
Set<Entry<String, LogicalVariable>> entries = fieldToLogicalVariableMap.entrySet(); | |
for (Entry<String, LogicalVariable> entry : entries) { | |
outputWriter.println(entry.getKey() + " -> " + entry.getValue()); | |
} | |
outputWriter.flush(); | |
} | |
/** | |
* generate the object inspector for the output of an operator | |
* | |
* @param operator | |
* The Hive operator | |
* @return an ObjectInspector object | |
*/ | |
public Schema generateInputSchema(Operator operator) { | |
List<String> variableNames = new ArrayList<String>(); | |
List<TypeInfo> typeList = new ArrayList<TypeInfo>(); | |
List<ColumnInfo> columns = operator.getSchema().getSignature(); | |
for (ColumnInfo col : columns) { | |
// typeList.add(); | |
TypeInfo type = col.getType(); | |
typeList.add(type); | |
String fieldName = col.getInternalName(); | |
variableNames.add(fieldName); | |
} | |
return new Schema(variableNames, typeList); | |
} | |
/** | |
* rewrite the names of output columns for feature expression evaluators to | |
* use | |
* | |
* @param operator | |
*/ | |
public void rewriteOperatorOutputSchema(Operator operator) { | |
List<ColumnInfo> columns = operator.getSchema().getSignature(); | |
for (ColumnInfo column : columns) { | |
String columnName = column.getTabAlias() + "." + column.getInternalName(); | |
if (columnName.indexOf("$$") < 0) { | |
LogicalVariable var = getVariable(columnName, column.getType()); | |
column.setInternalName(var.toString()); | |
} | |
} | |
} | |
@Override | |
public void rewriteOperatorOutputSchema(List<LogicalVariable> variables, Operator operator) { | |
//printOperatorSchema(operator); | |
List<ColumnInfo> columns = operator.getSchema().getSignature(); | |
if (variables.size() != columns.size()) { | |
throw new IllegalStateException("output cardinality error " + operator.getName() + " variable size: " | |
+ variables.size() + " expected " + columns.size()); | |
} | |
for (int i = 0; i < variables.size(); i++) { | |
LogicalVariable var = variables.get(i); | |
ColumnInfo column = columns.get(i); | |
String fieldName = column.getTabAlias() + "." + column.getInternalName(); | |
if (fieldName.indexOf("$$") < 0) { | |
updateVariable(fieldName, var); | |
column.setInternalName(var.toString()); | |
} | |
} | |
//printOperatorSchema(operator); | |
} | |
/** | |
* rewrite an expression and substitute variables | |
* | |
* @param expr | |
* hive expression | |
*/ | |
public void rewriteExpression(ExprNodeDesc expr) { | |
if (expr instanceof ExprNodeColumnDesc) { | |
ExprNodeColumnDesc desc = (ExprNodeColumnDesc) expr; | |
String fieldName = desc.getTabAlias() + "." + desc.getColumn(); | |
if (fieldName.indexOf("$$") < 0) { | |
LogicalVariable var = getVariableOnly(fieldName); | |
if (var == null) { | |
fieldName = "." + desc.getColumn(); | |
var = getVariableOnly(fieldName); | |
if (var == null) { | |
fieldName = "null." + desc.getColumn(); | |
var = getVariableOnly(fieldName); | |
if (var == null) { | |
throw new IllegalStateException(fieldName + " is wrong!!! "); | |
} | |
} | |
} | |
String name = this.logicalVariableToFieldMap.get(var); | |
var = getVariableOnly(name); | |
desc.setColumn(var.toString()); | |
} | |
} else { | |
if (expr.getChildren() != null && expr.getChildren().size() > 0) { | |
List<ExprNodeDesc> children = expr.getChildren(); | |
for (ExprNodeDesc desc : children) | |
rewriteExpression(desc); | |
} | |
} | |
} | |
/** | |
* rewrite an expression and substitute variables | |
* | |
* @param expr | |
* hive expression | |
*/ | |
public void rewriteExpressionPartial(ExprNodeDesc expr) { | |
if (expr instanceof ExprNodeColumnDesc) { | |
ExprNodeColumnDesc desc = (ExprNodeColumnDesc) expr; | |
String fieldName = desc.getTabAlias() + "." + desc.getColumn(); | |
if (fieldName.indexOf("$$") < 0) { | |
LogicalVariable var = getVariableOnly(fieldName); | |
desc.setColumn(var.toString()); | |
} | |
} else { | |
if (expr.getChildren() != null && expr.getChildren().size() > 0) { | |
List<ExprNodeDesc> children = expr.getChildren(); | |
for (ExprNodeDesc desc : children) | |
rewriteExpressionPartial(desc); | |
} | |
} | |
} | |
// private void printOperatorSchema(Operator operator) { | |
// // System.out.println(operator.getName()); | |
// // List<ColumnInfo> columns = operator.getSchema().getSignature(); | |
// // for (ColumnInfo column : columns) { | |
// // System.out.print(column.getTabAlias() + "." + | |
// // column.getInternalName() + " "); | |
// // } | |
// // System.out.println(); | |
// } | |
/** | |
* translate scalar function expression | |
* | |
* @param hiveExpr | |
* @return | |
*/ | |
public Mutable<ILogicalExpression> translateScalarFucntion(ExprNodeDesc hiveExpr) { | |
ILogicalExpression AlgebricksExpr; | |
if (hiveExpr instanceof ExprNodeGenericFuncDesc) { | |
List<Mutable<ILogicalExpression>> arguments = new ArrayList<Mutable<ILogicalExpression>>(); | |
List<ExprNodeDesc> children = hiveExpr.getChildren(); | |
for (ExprNodeDesc child : children) | |
arguments.add(translateScalarFucntion(child)); | |
ExprNodeGenericFuncDesc funcExpr = (ExprNodeGenericFuncDesc) hiveExpr; | |
GenericUDF genericUdf = funcExpr.getGenericUDF(); | |
UDF udf = null; | |
if (genericUdf instanceof GenericUDFBridge) { | |
GenericUDFBridge bridge = (GenericUDFBridge) genericUdf; | |
try { | |
udf = bridge.getUdfClass().newInstance(); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
/** | |
* set up the hive function | |
*/ | |
Object hiveFunction = genericUdf; | |
if (udf != null) | |
hiveFunction = udf; | |
FunctionIdentifier funcId = HiveAlgebricksBuiltInFunctionMap.INSTANCE.getAlgebricksFunctionId(hiveFunction | |
.getClass()); | |
if (funcId == null) { | |
funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, hiveFunction.getClass().getName()); | |
} | |
Object functionInfo = null; | |
if (genericUdf instanceof GenericUDFBridge) { | |
functionInfo = funcExpr; | |
} | |
/** | |
* generate the function call expression | |
*/ | |
ScalarFunctionCallExpression AlgebricksFuncExpr = new ScalarFunctionCallExpression(new HiveFunctionInfo( | |
funcId, functionInfo), arguments); | |
AlgebricksExpr = AlgebricksFuncExpr; | |
} else if (hiveExpr instanceof ExprNodeColumnDesc) { | |
ExprNodeColumnDesc column = (ExprNodeColumnDesc) hiveExpr; | |
LogicalVariable var = this.getVariable(column.getColumn()); | |
AlgebricksExpr = new VariableReferenceExpression(var); | |
} else if (hiveExpr instanceof ExprNodeFieldDesc) { | |
FunctionIdentifier funcId; | |
funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, ExpressionConstant.FIELDACCESS); | |
ScalarFunctionCallExpression AlgebricksFuncExpr = new ScalarFunctionCallExpression(new HiveFunctionInfo( | |
funcId, hiveExpr)); | |
AlgebricksExpr = AlgebricksFuncExpr; | |
} else if (hiveExpr instanceof ExprNodeConstantDesc) { | |
ExprNodeConstantDesc hiveConst = (ExprNodeConstantDesc) hiveExpr; | |
Object value = hiveConst.getValue(); | |
AlgebricksExpr = new ConstantExpression(new HivesterixConstantValue(value)); | |
} else if (hiveExpr instanceof ExprNodeNullDesc) { | |
FunctionIdentifier funcId; | |
funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, ExpressionConstant.NULL); | |
ScalarFunctionCallExpression AlgebricksFuncExpr = new ScalarFunctionCallExpression(new HiveFunctionInfo( | |
funcId, hiveExpr)); | |
AlgebricksExpr = AlgebricksFuncExpr; | |
} else { | |
throw new IllegalStateException("unknown hive expression"); | |
} | |
return new MutableObject<ILogicalExpression>(AlgebricksExpr); | |
} | |
/** | |
* translate aggregation function expression | |
* | |
* @param aggregateDesc | |
* @return | |
*/ | |
public Mutable<ILogicalExpression> translateAggregation(AggregationDesc aggregateDesc) { | |
String UDAFName = aggregateDesc.getGenericUDAFName(); | |
List<Mutable<ILogicalExpression>> arguments = new ArrayList<Mutable<ILogicalExpression>>(); | |
List<ExprNodeDesc> children = aggregateDesc.getParameters(); | |
for (ExprNodeDesc child : children) | |
arguments.add(translateScalarFucntion(child)); | |
FunctionIdentifier funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, UDAFName + "(" | |
+ aggregateDesc.getMode() + ")"); | |
HiveFunctionInfo funcInfo = new HiveFunctionInfo(funcId, aggregateDesc); | |
AggregateFunctionCallExpression aggregationExpression = new AggregateFunctionCallExpression(funcInfo, false, | |
arguments); | |
return new MutableObject<ILogicalExpression>(aggregationExpression); | |
} | |
/** | |
* translate aggregation function expression | |
* | |
* @param aggregator | |
* @return | |
*/ | |
public Mutable<ILogicalExpression> translateUnnestFunction(UDTFDesc udtfDesc, Mutable<ILogicalExpression> argument) { | |
String UDTFName = udtfDesc.getUDTFName(); | |
FunctionIdentifier funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, UDTFName); | |
UnnestingFunctionCallExpression unnestingExpression = new UnnestingFunctionCallExpression(new HiveFunctionInfo( | |
funcId, udtfDesc)); | |
unnestingExpression.getArguments().add(argument); | |
return new MutableObject<ILogicalExpression>(unnestingExpression); | |
} | |
/** | |
* get typeinfo | |
*/ | |
@Override | |
public TypeInfo getType(LogicalVariable var) { | |
return variableToType.get(var); | |
} | |
/** | |
* get variable from variable name | |
*/ | |
@Override | |
public LogicalVariable getVariable(String name) { | |
return nameToLogicalVariableMap.get(name); | |
} | |
@Override | |
public LogicalVariable getVariableFromFieldName(String fieldName) { | |
return this.getVariableOnly(fieldName); | |
} | |
/** | |
* set the metadata provider | |
*/ | |
@Override | |
public void setMetadataProvider(IMetadataProvider<PartitionDesc, Object> metadata) { | |
this.metaData = metadata; | |
} | |
/** | |
* insert ReplicateOperator when necessary | |
*/ | |
private void insertReplicateOperator(List<Mutable<ILogicalOperator>> roots) { | |
Map<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> childToParentsMap = new HashMap<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>>(); | |
buildChildToParentsMapping(roots, childToParentsMap); | |
for (Entry<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> entry : childToParentsMap.entrySet()) { | |
List<Mutable<ILogicalOperator>> pList = entry.getValue(); | |
if (pList.size() > 1) { | |
ILogicalOperator rop = new ReplicateOperator(pList.size()); | |
Mutable<ILogicalOperator> ropRef = new MutableObject<ILogicalOperator>(rop); | |
Mutable<ILogicalOperator> childRef = entry.getKey(); | |
rop.getInputs().add(childRef); | |
for (Mutable<ILogicalOperator> parentRef : pList) { | |
ILogicalOperator parentOp = parentRef.getValue(); | |
int index = parentOp.getInputs().indexOf(childRef); | |
parentOp.getInputs().set(index, ropRef); | |
} | |
} | |
} | |
} | |
/** | |
* build the mapping from child to Parents | |
* | |
* @param roots | |
* @param childToParentsMap | |
*/ | |
private void buildChildToParentsMapping(List<Mutable<ILogicalOperator>> roots, | |
Map<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> map) { | |
for (Mutable<ILogicalOperator> opRef : roots) { | |
List<Mutable<ILogicalOperator>> childRefs = opRef.getValue().getInputs(); | |
for (Mutable<ILogicalOperator> childRef : childRefs) { | |
List<Mutable<ILogicalOperator>> parentList = map.get(childRef); | |
if (parentList == null) { | |
parentList = new ArrayList<Mutable<ILogicalOperator>>(); | |
map.put(childRef, parentList); | |
} | |
if (!parentList.contains(opRef)) | |
parentList.add(opRef); | |
} | |
buildChildToParentsMapping(childRefs, map); | |
} | |
} | |
} |