/*
 * Copyright 2009-2013 by The Regents of the University of California
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * you may obtain a copy of the License from
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| package edu.uci.ics.hivesterix.logical.plan;
|
|
|
| import java.io.OutputStreamWriter;
|
| import java.io.PrintWriter;
|
| import java.util.ArrayList;
|
| import java.util.HashMap;
|
| import java.util.List;
|
| import java.util.Map;
|
| import java.util.Map.Entry;
|
| import java.util.Set;
|
|
|
| import org.apache.commons.lang3.mutable.Mutable;
|
| import org.apache.commons.lang3.mutable.MutableObject;
|
| import org.apache.hadoop.hive.ql.exec.ColumnInfo;
|
| import org.apache.hadoop.hive.ql.exec.ExtractOperator;
|
| import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
|
| import org.apache.hadoop.hive.ql.exec.FilterOperator;
|
| import org.apache.hadoop.hive.ql.exec.GroupByOperator;
|
| import org.apache.hadoop.hive.ql.exec.JoinOperator;
|
| import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
|
| import org.apache.hadoop.hive.ql.exec.LimitOperator;
|
| import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
|
| import org.apache.hadoop.hive.ql.exec.Operator;
|
| import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
|
| import org.apache.hadoop.hive.ql.exec.SelectOperator;
|
| import org.apache.hadoop.hive.ql.exec.TableScanOperator;
|
| import org.apache.hadoop.hive.ql.exec.UDF;
|
| import org.apache.hadoop.hive.ql.exec.UDTFOperator;
|
| import org.apache.hadoop.hive.ql.exec.UnionOperator;
|
| import org.apache.hadoop.hive.ql.plan.AggregationDesc;
|
| import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
|
| import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
|
| import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
|
| import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc;
|
| import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
|
| import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc;
|
| import org.apache.hadoop.hive.ql.plan.PartitionDesc;
|
| import org.apache.hadoop.hive.ql.plan.UDTFDesc;
|
| import org.apache.hadoop.hive.ql.plan.api.OperatorType;
|
| import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
|
| import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
|
| import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
|
|
|
| import edu.uci.ics.hivesterix.logical.expression.ExpressionConstant;
|
| import edu.uci.ics.hivesterix.logical.expression.HiveAlgebricksBuiltInFunctionMap;
|
| import edu.uci.ics.hivesterix.logical.expression.HiveFunctionInfo;
|
| import edu.uci.ics.hivesterix.logical.expression.HivesterixConstantValue;
|
| import edu.uci.ics.hivesterix.logical.plan.visitor.ExtractVisitor;
|
| import edu.uci.ics.hivesterix.logical.plan.visitor.FilterVisitor;
|
| import edu.uci.ics.hivesterix.logical.plan.visitor.GroupByVisitor;
|
| import edu.uci.ics.hivesterix.logical.plan.visitor.JoinVisitor;
|
| import edu.uci.ics.hivesterix.logical.plan.visitor.LateralViewJoinVisitor;
|
| import edu.uci.ics.hivesterix.logical.plan.visitor.LimitVisitor;
|
| import edu.uci.ics.hivesterix.logical.plan.visitor.MapJoinVisitor;
|
| import edu.uci.ics.hivesterix.logical.plan.visitor.ProjectVisitor;
|
| import edu.uci.ics.hivesterix.logical.plan.visitor.SortVisitor;
|
| import edu.uci.ics.hivesterix.logical.plan.visitor.TableScanWriteVisitor;
|
| import edu.uci.ics.hivesterix.logical.plan.visitor.UnionVisitor;
|
| import edu.uci.ics.hivesterix.logical.plan.visitor.base.Translator;
|
| import edu.uci.ics.hivesterix.logical.plan.visitor.base.Visitor;
|
| import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
|
| import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
|
| import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
|
| import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
|
| import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
|
| import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
|
| import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
|
| import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
|
| import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
|
| import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
|
| import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
|
| import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
|
| import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
|
| import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
|
| import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
|
| import edu.uci.ics.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
|
| import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
|
| import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter;
|
|
|
/**
 * Translates a Hive operator DAG into an Algebricks logical plan.
 * <p>
 * {@link #translate(List, ILogicalOperator, HashMap)} registers one {@link Visitor}
 * per supported Hive operator kind and then walks the Hive DAG; each Hive operator
 * is offered to every visitor in registration order and the first non-null result
 * becomes the corresponding Algebricks operator. The class also maintains the
 * bookkeeping between Hive field names (formatted {@code "tabAlias.column"}),
 * Algebricks {@link LogicalVariable}s (whose {@code toString()} form starts with
 * {@code "$$"}) and their Hive {@link TypeInfo}s, and provides helpers that
 * translate Hive expression trees into Algebricks expressions.
 */
@SuppressWarnings("rawtypes")
public class HiveAlgebricksTranslator implements Translator {

    /** Monotonically increasing id used to mint new logical variables (first id is 1). */
    private int currentVariable = 0;

    /** Leaf operators (Hive operators with no children) collected during traversal. */
    private List<Mutable<ILogicalOperator>> logicalOp = new ArrayList<Mutable<ILogicalOperator>>();

    /**
     * Cleared by the JOIN/MAPJOIN/UNION branches when no visitor accepts the operator
     * (e.g. not all inputs have arrived yet); stops descent into that operator's children.
     */
    private boolean continueTraverse = true;

    /** Metadata provider handed in via {@link #setMetadataProvider}. */
    private IMetadataProvider<PartitionDesc, Object> metaData;

    /**
     * map variable name (the {@code LogicalVariable.toString()} form, e.g. "$$3")
     * to the logical variable
     */
    private HashMap<String, LogicalVariable> nameToLogicalVariableMap = new HashMap<String, LogicalVariable>();

    /**
     * map field name ("tabAlias.column") to LogicalVariable
     */
    private HashMap<String, LogicalVariable> fieldToLogicalVariableMap = new HashMap<String, LogicalVariable>();

    /**
     * map logical variable back to the field name it was created for
     */
    private HashMap<LogicalVariable, String> logicalVariableToFieldMap = new HashMap<LogicalVariable, String>();

    /**
     * asterix root operators of the generated logical plan
     */
    private List<Mutable<ILogicalOperator>> rootOperators = new ArrayList<Mutable<ILogicalOperator>>();

    /**
     * a list of visitors, tried in order for every Hive operator
     */
    private List<Visitor> visitors = new ArrayList<Visitor>();

    /**
     * output writer to print things out (defaults to stdout; replaceable via
     * {@link #setOutputPrinter})
     */
    private static PrintWriter outputWriter = new PrintWriter(new OutputStreamWriter(System.out));

    /**
     * map a logical variable to type info
     */
    private HashMap<LogicalVariable, TypeInfo> variableToType = new HashMap<LogicalVariable, TypeInfo>();

    /**
     * Returns the logical variable bound to {@code fieldName}, creating a fresh one
     * and registering it in all four bookkeeping maps if none exists yet.
     */
    @Override
    public LogicalVariable getVariable(String fieldName, TypeInfo type) {
        LogicalVariable var = fieldToLogicalVariableMap.get(fieldName);
        if (var == null) {
            currentVariable++;
            var = new LogicalVariable(currentVariable);
            fieldToLogicalVariableMap.put(fieldName, var);
            nameToLogicalVariableMap.put(var.toString(), var);
            variableToType.put(var, type);
            logicalVariableToFieldMap.put(var, fieldName);
        }
        return var;
    }

    /**
     * Always mints a fresh logical variable for {@code fieldName}, replacing any
     * previous binding for that field in the forward maps.
     */
    @Override
    public LogicalVariable getNewVariable(String fieldName, TypeInfo type) {
        currentVariable++;
        LogicalVariable var = new LogicalVariable(currentVariable);
        fieldToLogicalVariableMap.put(fieldName, var);
        nameToLogicalVariableMap.put(var.toString(), var);
        variableToType.put(var, type);
        logicalVariableToFieldMap.put(var, fieldName);
        return var;
    }

    /**
     * Rebinds the field that {@code oldVar} was created for to {@code newVar}; lookups
     * by the old variable's name are redirected to {@code newVar} as well. No-op when
     * {@code oldVar} has no associated field.
     */
    @Override
    public void replaceVariable(LogicalVariable oldVar, LogicalVariable newVar) {
        String name = this.logicalVariableToFieldMap.get(oldVar);
        if (name != null) {
            fieldToLogicalVariableMap.put(name, newVar);
            nameToLogicalVariableMap.put(newVar.toString(), newVar);
            // redirect name-based lookups that still use the old variable's name
            nameToLogicalVariableMap.put(oldVar.toString(), newVar);
            // NOTE(review): oldVar's entry in logicalVariableToFieldMap is left in
            // place, so reverse lookups on oldVar keep resolving to the field —
            // presumably intentional; confirm before changing.
            logicalVariableToFieldMap.put(newVar, name);
        }
    }

    @Override
    public IMetadataProvider<PartitionDesc, Object> getMetadataProvider() {
        return metaData;
    }

    /**
     * only get an existing variable for the field, without creating or rewriting one
     *
     * @param fieldName
     *            field name ("tabAlias.column")
     * @return the bound variable, or null if the field is unknown
     */
    private LogicalVariable getVariableOnly(String fieldName) {
        return fieldToLogicalVariableMap.get(fieldName);
    }

    /**
     * Binds {@code fieldName} to {@code variable} in both forward maps.
     * (Both branches below perform the same puts; the net effect is an update
     * whenever the field is unbound or bound to a different variable.)
     */
    private void updateVariable(String fieldName, LogicalVariable variable) {
        LogicalVariable var = fieldToLogicalVariableMap.get(fieldName);
        if (var == null) {
            fieldToLogicalVariableMap.put(fieldName, variable);
            nameToLogicalVariableMap.put(fieldName, variable);
        } else if (!var.equals(variable)) {
            fieldToLogicalVariableMap.put(fieldName, variable);
            nameToLogicalVariableMap.put(fieldName, variable);
        }
    }

    /**
     * get a list of logical variables from the schema
     *
     * @param schema
     *            schema whose names are looked up in nameToLogicalVariableMap
     * @return one variable per schema name (null entries for unknown names)
     */
    @Override
    public List<LogicalVariable> getVariablesFromSchema(Schema schema) {
        List<LogicalVariable> variables = new ArrayList<LogicalVariable>();
        List<String> names = schema.getNames();

        for (String name : names)
            variables.add(nameToLogicalVariableMap.get(name));
        return variables;
    }

    /**
     * get the variable-to-typeinfo map
     *
     * @return the live internal map (not a copy)
     */
    public HashMap<LogicalVariable, TypeInfo> getVariableContext() {
        return this.variableToType;
    }

    /**
     * get the number of variables created so far
     *
     * @return currentVariable + 1 (ids start at 1, so this is also the next free id)
     */
    public int getVariableCounter() {
        return currentVariable + 1;
    }

    /**
     * translate from hive operator tree to asterix operator tree
     *
     * @param hiveRoot
     *            hive roots
     * @param parentOperator
     *            Algebricks operator the translated subtree hangs under
     * @param aliasToPathMap
     *            table alias to partition descriptor mapping, used by the table scan visitor
     * @throws AlgebricksException
     *             if a visitor fails
     */
    public void translate(List<Operator> hiveRoot, ILogicalOperator parentOperator,
            HashMap<String, PartitionDesc> aliasToPathMap) throws AlgebricksException {
        /**
         * register visitors; order matters — the first visitor returning non-null wins.
         * NOTE(review): visitors are appended on every call, so presumably this method
         * is invoked at most once per translator instance — confirm before reuse.
         */
        visitors.add(new FilterVisitor());
        visitors.add(new GroupByVisitor());
        visitors.add(new JoinVisitor());
        visitors.add(new LateralViewJoinVisitor());
        visitors.add(new UnionVisitor());
        visitors.add(new LimitVisitor());
        visitors.add(new MapJoinVisitor());
        visitors.add(new ProjectVisitor());
        visitors.add(new SortVisitor());
        visitors.add(new ExtractVisitor());
        visitors.add(new TableScanWriteVisitor(aliasToPathMap));

        List<Mutable<ILogicalOperator>> refList = translate(hiveRoot, new MutableObject<ILogicalOperator>(
                parentOperator));
        insertReplicateOperator(refList);
        if (refList != null)
            rootOperators.addAll(refList);
    }

    /**
     * translate an operator DAG: dispatch each Hive operator to the visitors by its
     * OperatorType, then recurse into its children
     *
     * @param hiveRoot
     *            hive operators at the current level
     * @param AlgebricksParentOperator
     *            Algebricks parent the translated operators attach to
     * @return the accumulated leaf-operator list, or null if a join/lateral-view/map-join
     *         could not be translated yet (visitor returned null)
     */
    private List<Mutable<ILogicalOperator>> translate(List<Operator> hiveRoot,
            Mutable<ILogicalOperator> AlgebricksParentOperator) throws AlgebricksException {

        for (Operator hiveOperator : hiveRoot) {
            continueTraverse = true;
            Mutable<ILogicalOperator> currentOperatorRef = null;
            if (hiveOperator.getType() == OperatorType.FILTER) {
                FilterOperator fop = (FilterOperator) hiveOperator;
                for (Visitor visitor : visitors) {
                    currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);
                    if (currentOperatorRef != null)
                        break;
                }
            } else if (hiveOperator.getType() == OperatorType.REDUCESINK) {
                ReduceSinkOperator fop = (ReduceSinkOperator) hiveOperator;
                for (Visitor visitor : visitors) {
                    currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);
                    if (currentOperatorRef != null)
                        break;
                }
            } else if (hiveOperator.getType() == OperatorType.JOIN) {
                // multi-input operator: if no visitor accepts it (e.g. not all join
                // branches have been translated yet), stop descending on this path
                JoinOperator fop = (JoinOperator) hiveOperator;
                for (Visitor visitor : visitors) {
                    currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);
                    if (currentOperatorRef != null) {
                        continueTraverse = true;
                        break;
                    } else
                        continueTraverse = false;
                }
                if (currentOperatorRef == null)
                    return null;
            } else if (hiveOperator.getType() == OperatorType.LATERALVIEWJOIN) {
                LateralViewJoinOperator fop = (LateralViewJoinOperator) hiveOperator;
                for (Visitor visitor : visitors) {
                    currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);
                    if (currentOperatorRef != null)
                        break;
                }
                if (currentOperatorRef == null)
                    return null;
            } else if (hiveOperator.getType() == OperatorType.MAPJOIN) {
                // multi-input operator, same deferral behavior as JOIN above
                MapJoinOperator fop = (MapJoinOperator) hiveOperator;
                for (Visitor visitor : visitors) {
                    currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);
                    if (currentOperatorRef != null) {
                        continueTraverse = true;
                        break;
                    } else
                        continueTraverse = false;
                }
                if (currentOperatorRef == null)
                    return null;
            } else if (hiveOperator.getType() == OperatorType.SELECT) {
                SelectOperator fop = (SelectOperator) hiveOperator;
                for (Visitor visitor : visitors) {
                    currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);
                    if (currentOperatorRef != null)
                        break;
                }
            } else if (hiveOperator.getType() == OperatorType.EXTRACT) {
                ExtractOperator fop = (ExtractOperator) hiveOperator;
                for (Visitor visitor : visitors) {
                    currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);
                    if (currentOperatorRef != null)
                        break;
                }
            } else if (hiveOperator.getType() == OperatorType.GROUPBY) {
                GroupByOperator fop = (GroupByOperator) hiveOperator;
                for (Visitor visitor : visitors) {
                    currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);
                    if (currentOperatorRef != null)
                        break;
                }
            } else if (hiveOperator.getType() == OperatorType.TABLESCAN) {
                TableScanOperator fop = (TableScanOperator) hiveOperator;
                for (Visitor visitor : visitors) {
                    currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);
                    if (currentOperatorRef != null)
                        break;
                }
            } else if (hiveOperator.getType() == OperatorType.FILESINK) {
                FileSinkOperator fop = (FileSinkOperator) hiveOperator;
                for (Visitor visitor : visitors) {
                    currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);
                    if (currentOperatorRef != null)
                        break;
                }
            } else if (hiveOperator.getType() == OperatorType.LIMIT) {
                LimitOperator lop = (LimitOperator) hiveOperator;
                for (Visitor visitor : visitors) {
                    currentOperatorRef = visitor.visit(lop, AlgebricksParentOperator, this);
                    if (currentOperatorRef != null)
                        break;
                }
            } else if (hiveOperator.getType() == OperatorType.UDTF) {
                UDTFOperator lop = (UDTFOperator) hiveOperator;
                for (Visitor visitor : visitors) {
                    currentOperatorRef = visitor.visit(lop, AlgebricksParentOperator, this);
                    if (currentOperatorRef != null)
                        break;
                }
            } else if (hiveOperator.getType() == OperatorType.UNION) {
                // multi-input operator; defers like JOIN but does NOT return null
                // when unhandled — the child traversal below is simply skipped
                UnionOperator lop = (UnionOperator) hiveOperator;
                for (Visitor visitor : visitors) {
                    currentOperatorRef = visitor.visit(lop, AlgebricksParentOperator, this);
                    if (currentOperatorRef != null) {
                        continueTraverse = true;
                        break;
                    } else
                        continueTraverse = false;
                }
            } else
                // unhandled operator types fall through with currentOperatorRef == null;
                // the parent ref is then reused for the children below
                ;
            if (hiveOperator.getChildOperators() != null && hiveOperator.getChildOperators().size() > 0
                    && continueTraverse) {
                @SuppressWarnings("unchecked")
                List<Operator> children = hiveOperator.getChildOperators();
                if (currentOperatorRef == null)
                    currentOperatorRef = AlgebricksParentOperator;
                translate(children, currentOperatorRef);
            }
            // a Hive operator with no children is a sink: record its Algebricks ref
            if (hiveOperator.getChildOperators() == null || hiveOperator.getChildOperators().size() == 0)
                logicalOp.add(currentOperatorRef);
        }
        return logicalOp;
    }

    /**
     * used in select, group by to get no-column-expression columns
     *
     * @param parent
     *            Algebricks operator the assign hangs under
     * @param cols
     *            Hive expressions to evaluate
     * @param variables
     *            out-parameter: one variable per expression (column refs resolve to
     *            existing variables; other expressions get newly assigned ones)
     * @return an AssignOperator for the non-column expressions, or null if every
     *         expression was a plain column reference
     */
    public ILogicalOperator getAssignOperator(Mutable<ILogicalOperator> parent, List<ExprNodeDesc> cols,
            ArrayList<LogicalVariable> variables) {

        ArrayList<Mutable<ILogicalExpression>> expressions = new ArrayList<Mutable<ILogicalExpression>>();

        /**
         * variables to be appended in the assign operator
         */
        ArrayList<LogicalVariable> appendedVariables = new ArrayList<LogicalVariable>();

        // one variable can only be assigned once
        for (ExprNodeDesc hiveExpr : cols) {
            rewriteExpression(hiveExpr);

            if (hiveExpr instanceof ExprNodeColumnDesc) {
                ExprNodeColumnDesc desc2 = (ExprNodeColumnDesc) hiveExpr;
                String fieldName = desc2.getTabAlias() + "." + desc2.getColumn();

                if (fieldName.indexOf("$$") < 0) {
                    // not yet rewritten: bind the field to a variable and rewrite in place
                    LogicalVariable var = getVariable(fieldName, hiveExpr.getTypeInfo());
                    desc2.setColumn(var.toString());
                    desc2.setTabAlias("");
                    variables.add(var);
                } else {
                    // already a "$$n" variable name: resolve to the current binding
                    LogicalVariable var = nameToLogicalVariableMap.get(desc2.getColumn());
                    String name = this.logicalVariableToFieldMap.get(var);
                    var = this.getVariableOnly(name);
                    variables.add(var);
                }
            } else {
                // non-column expression: translate it and assign to a fresh variable
                // keyed by expression text + hashCode (to keep distinct instances apart)
                Mutable<ILogicalExpression> asterixExpr = translateScalarFucntion(hiveExpr);
                expressions.add(asterixExpr);
                LogicalVariable var = getVariable(hiveExpr.getExprString() + asterixExpr.hashCode(),
                        hiveExpr.getTypeInfo());
                variables.add(var);
                appendedVariables.add(var);
            }
        }

        /**
         * create an assign operator to deal with appending
         */
        ILogicalOperator assignOp = null;
        if (appendedVariables.size() > 0) {
            assignOp = new AssignOperator(appendedVariables, expressions);
            assignOp.getInputs().add(parent);
        }
        return assignOp;
    }

    /** The generated plan, cached by genLogicalPlan() for printOperators(). */
    private ILogicalPlan plan;

    /** Builds (and caches) the logical plan from the collected root operators. */
    public ILogicalPlan genLogicalPlan() {
        plan = new ALogicalPlanImpl(rootOperators);
        return plan;
    }

    /**
     * Pretty-prints the cached plan and the field-to-variable mapping to the
     * output writer. Call genLogicalPlan() first.
     */
    public void printOperators() throws AlgebricksException {
        LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();
        StringBuilder buffer = new StringBuilder();
        PlanPrettyPrinter.printPlan(plan, buffer, pvisitor, 0);
        outputWriter.println(buffer);
        outputWriter.println("rewritten variables: ");
        outputWriter.flush();
        printVariables();

    }

    /** Redirects all diagnostic output of this class (static — affects every instance). */
    public static void setOutputPrinter(PrintWriter writer) {
        outputWriter = writer;
    }

    /** Dumps the field-name -> logical-variable map to the output writer. */
    private void printVariables() {
        Set<Entry<String, LogicalVariable>> entries = fieldToLogicalVariableMap.entrySet();

        for (Entry<String, LogicalVariable> entry : entries) {
            outputWriter.println(entry.getKey() + " -> " + entry.getValue());
        }
        outputWriter.flush();
    }

    /**
     * generate the schema (internal column names and types) for the output of an operator
     *
     * @param operator
     *            The Hive operator
     * @return a Schema built from the operator's signature
     */
    public Schema generateInputSchema(Operator operator) {
        List<String> variableNames = new ArrayList<String>();
        List<TypeInfo> typeList = new ArrayList<TypeInfo>();
        List<ColumnInfo> columns = operator.getSchema().getSignature();

        for (ColumnInfo col : columns) {
            TypeInfo type = col.getType();
            typeList.add(type);

            String fieldName = col.getInternalName();
            variableNames.add(fieldName);
        }

        return new Schema(variableNames, typeList);
    }

    /**
     * rewrite the names of output columns for feature expression evaluators to
     * use: each not-yet-rewritten column ("tabAlias.name" without "$$") is bound
     * to a logical variable and renamed to that variable's "$$n" form in place
     *
     * @param operator
     *            Hive operator whose output schema is rewritten
     */
    public void rewriteOperatorOutputSchema(Operator operator) {
        List<ColumnInfo> columns = operator.getSchema().getSignature();

        for (ColumnInfo column : columns) {
            String columnName = column.getTabAlias() + "." + column.getInternalName();
            if (columnName.indexOf("$$") < 0) {
                LogicalVariable var = getVariable(columnName, column.getType());
                column.setInternalName(var.toString());
            }
        }
    }

    /**
     * Rewrites the operator's output schema using a caller-supplied variable per
     * column (positional); the schema and variable list must have equal size.
     *
     * @throws IllegalStateException
     *             when the cardinalities differ
     */
    @Override
    public void rewriteOperatorOutputSchema(List<LogicalVariable> variables, Operator operator) {

        List<ColumnInfo> columns = operator.getSchema().getSignature();
        if (variables.size() != columns.size()) {
            throw new IllegalStateException("output cardinality error " + operator.getName() + " variable size: "
                    + variables.size() + " expected " + columns.size());
        }

        for (int i = 0; i < variables.size(); i++) {
            LogicalVariable var = variables.get(i);
            ColumnInfo column = columns.get(i);
            String fieldName = column.getTabAlias() + "." + column.getInternalName();
            if (fieldName.indexOf("$$") < 0) {
                updateVariable(fieldName, var);
                column.setInternalName(var.toString());
            }
        }
    }

    /**
     * rewrite an expression and substitute variables: every column reference is
     * replaced in place by its logical variable's "$$n" name, recursing into
     * children for composite expressions
     *
     * @param expr
     *            hive expression (mutated in place)
     * @throws IllegalStateException
     *             if a column cannot be resolved under any of the fallback aliases
     */
    public void rewriteExpression(ExprNodeDesc expr) {
        if (expr instanceof ExprNodeColumnDesc) {
            ExprNodeColumnDesc desc = (ExprNodeColumnDesc) expr;
            String fieldName = desc.getTabAlias() + "." + desc.getColumn();
            if (fieldName.indexOf("$$") < 0) {
                LogicalVariable var = getVariableOnly(fieldName);
                if (var == null) {
                    // fall back to the empty alias, then the literal "null" alias,
                    // which Hive produces for alias-less columns
                    fieldName = "." + desc.getColumn();
                    var = getVariableOnly(fieldName);
                    if (var == null) {
                        fieldName = "null." + desc.getColumn();
                        var = getVariableOnly(fieldName);
                        if (var == null) {
                            throw new IllegalStateException(fieldName + " is wrong!!! ");
                        }
                    }
                }
                // re-resolve through the field name to pick up any rebinding
                String name = this.logicalVariableToFieldMap.get(var);
                var = getVariableOnly(name);
                desc.setColumn(var.toString());
            }
        } else {
            if (expr.getChildren() != null && expr.getChildren().size() > 0) {
                List<ExprNodeDesc> children = expr.getChildren();
                for (ExprNodeDesc desc : children)
                    rewriteExpression(desc);
            }
        }
    }

    /**
     * rewrite an expression and substitute variables, without the alias fallbacks
     * of {@link #rewriteExpression}: only the exact "tabAlias.column" binding is
     * consulted
     *
     * @param expr
     *            hive expression (mutated in place)
     */
    public void rewriteExpressionPartial(ExprNodeDesc expr) {
        if (expr instanceof ExprNodeColumnDesc) {
            ExprNodeColumnDesc desc = (ExprNodeColumnDesc) expr;
            String fieldName = desc.getTabAlias() + "." + desc.getColumn();
            if (fieldName.indexOf("$$") < 0) {
                LogicalVariable var = getVariableOnly(fieldName);
                desc.setColumn(var.toString());
            }
        } else {
            if (expr.getChildren() != null && expr.getChildren().size() > 0) {
                List<ExprNodeDesc> children = expr.getChildren();
                for (ExprNodeDesc desc : children)
                    rewriteExpressionPartial(desc);
            }
        }
    }

    /**
     * translate a scalar function expression into an Algebricks expression
     * (spelling of the method name is kept for compatibility with existing callers)
     *
     * @param hiveExpr
     *            one of: generic function, column ref, field access, constant, null
     * @return the translated expression wrapped in a Mutable
     * @throws IllegalStateException
     *             for unsupported expression kinds
     */
    public Mutable<ILogicalExpression> translateScalarFucntion(ExprNodeDesc hiveExpr) {
        ILogicalExpression AlgebricksExpr;

        if (hiveExpr instanceof ExprNodeGenericFuncDesc) {
            List<Mutable<ILogicalExpression>> arguments = new ArrayList<Mutable<ILogicalExpression>>();
            List<ExprNodeDesc> children = hiveExpr.getChildren();

            // translate arguments first (recursively)
            for (ExprNodeDesc child : children)
                arguments.add(translateScalarFucntion(child));

            ExprNodeGenericFuncDesc funcExpr = (ExprNodeGenericFuncDesc) hiveExpr;
            GenericUDF genericUdf = funcExpr.getGenericUDF();
            UDF udf = null;
            if (genericUdf instanceof GenericUDFBridge) {
                // a bridge wraps a plain UDF; instantiate it to resolve the real class
                GenericUDFBridge bridge = (GenericUDFBridge) genericUdf;
                try {
                    udf = bridge.getUdfClass().newInstance();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            /**
             * set up the hive function: the unwrapped UDF when bridged, else the generic UDF
             */
            Object hiveFunction = genericUdf;
            if (udf != null)
                hiveFunction = udf;

            // map to a built-in Algebricks function id, or synthesize one from the class name
            FunctionIdentifier funcId = HiveAlgebricksBuiltInFunctionMap.INSTANCE.getAlgebricksFunctionId(hiveFunction
                    .getClass());
            if (funcId == null) {
                funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, hiveFunction.getClass().getName());
            }

            // for bridged UDFs, keep the original func desc as payload for the runtime
            Object functionInfo = null;
            if (genericUdf instanceof GenericUDFBridge) {
                functionInfo = funcExpr;
            }

            /**
             * generate the function call expression
             */
            ScalarFunctionCallExpression AlgebricksFuncExpr = new ScalarFunctionCallExpression(new HiveFunctionInfo(
                    funcId, functionInfo), arguments);
            AlgebricksExpr = AlgebricksFuncExpr;

        } else if (hiveExpr instanceof ExprNodeColumnDesc) {
            // column reference: expects the column to already hold a "$$n" variable name
            ExprNodeColumnDesc column = (ExprNodeColumnDesc) hiveExpr;
            LogicalVariable var = this.getVariable(column.getColumn());
            AlgebricksExpr = new VariableReferenceExpression(var);

        } else if (hiveExpr instanceof ExprNodeFieldDesc) {
            // struct field access, modeled as a FIELDACCESS function call
            FunctionIdentifier funcId;
            funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, ExpressionConstant.FIELDACCESS);

            ScalarFunctionCallExpression AlgebricksFuncExpr = new ScalarFunctionCallExpression(new HiveFunctionInfo(
                    funcId, hiveExpr));
            AlgebricksExpr = AlgebricksFuncExpr;
        } else if (hiveExpr instanceof ExprNodeConstantDesc) {
            ExprNodeConstantDesc hiveConst = (ExprNodeConstantDesc) hiveExpr;
            Object value = hiveConst.getValue();
            AlgebricksExpr = new ConstantExpression(new HivesterixConstantValue(value));
        } else if (hiveExpr instanceof ExprNodeNullDesc) {
            // null literal, modeled as a NULL function call carrying the desc
            FunctionIdentifier funcId;
            funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, ExpressionConstant.NULL);

            ScalarFunctionCallExpression AlgebricksFuncExpr = new ScalarFunctionCallExpression(new HiveFunctionInfo(
                    funcId, hiveExpr));

            AlgebricksExpr = AlgebricksFuncExpr;
        } else {
            throw new IllegalStateException("unknown hive expression");
        }
        return new MutableObject<ILogicalExpression>(AlgebricksExpr);
    }

    /**
     * translate an aggregation function expression; the function id encodes both
     * the UDAF name and its aggregation mode, e.g. "count(PARTIAL1)"
     *
     * @param aggregateDesc
     *            Hive aggregation descriptor
     * @return an AggregateFunctionCallExpression (two-step flag false) wrapped in a Mutable
     */
    public Mutable<ILogicalExpression> translateAggregation(AggregationDesc aggregateDesc) {

        String UDAFName = aggregateDesc.getGenericUDAFName();

        List<Mutable<ILogicalExpression>> arguments = new ArrayList<Mutable<ILogicalExpression>>();
        List<ExprNodeDesc> children = aggregateDesc.getParameters();

        for (ExprNodeDesc child : children)
            arguments.add(translateScalarFucntion(child));

        FunctionIdentifier funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, UDAFName + "("
                + aggregateDesc.getMode() + ")");
        HiveFunctionInfo funcInfo = new HiveFunctionInfo(funcId, aggregateDesc);
        AggregateFunctionCallExpression aggregationExpression = new AggregateFunctionCallExpression(funcInfo, false,
                arguments);
        return new MutableObject<ILogicalExpression>(aggregationExpression);
    }

    /**
     * translate a table-generating (UDTF) function into an unnesting expression
     *
     * @param udtfDesc
     *            Hive UDTF descriptor
     * @param argument
     *            single argument to the unnesting call
     * @return the unnesting function call wrapped in a Mutable
     */
    public Mutable<ILogicalExpression> translateUnnestFunction(UDTFDesc udtfDesc, Mutable<ILogicalExpression> argument) {

        String UDTFName = udtfDesc.getUDTFName();

        FunctionIdentifier funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, UDTFName);
        UnnestingFunctionCallExpression unnestingExpression = new UnnestingFunctionCallExpression(new HiveFunctionInfo(
                funcId, udtfDesc));
        unnestingExpression.getArguments().add(argument);
        return new MutableObject<ILogicalExpression>(unnestingExpression);
    }

    /**
     * get typeinfo recorded for a variable (null if unknown)
     */
    @Override
    public TypeInfo getType(LogicalVariable var) {
        return variableToType.get(var);
    }

    /**
     * get variable from variable name (the "$$n" form)
     */
    @Override
    public LogicalVariable getVariable(String name) {
        return nameToLogicalVariableMap.get(name);
    }

    /** Resolve a field name ("tabAlias.column") without creating a new binding. */
    @Override
    public LogicalVariable getVariableFromFieldName(String fieldName) {
        return this.getVariableOnly(fieldName);
    }

    /**
     * set the metadata provider
     */
    @Override
    public void setMetadataProvider(IMetadataProvider<PartitionDesc, Object> metadata) {
        this.metaData = metadata;
    }

    /**
     * insert a ReplicateOperator above every operator that is referenced by more
     * than one parent, so each parent reads from the replicate instead
     */
    private void insertReplicateOperator(List<Mutable<ILogicalOperator>> roots) {
        Map<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> childToParentsMap = new HashMap<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>>();
        buildChildToParentsMapping(roots, childToParentsMap);
        for (Entry<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> entry : childToParentsMap.entrySet()) {
            List<Mutable<ILogicalOperator>> pList = entry.getValue();
            if (pList.size() > 1) {
                ILogicalOperator rop = new ReplicateOperator(pList.size());
                Mutable<ILogicalOperator> ropRef = new MutableObject<ILogicalOperator>(rop);
                Mutable<ILogicalOperator> childRef = entry.getKey();
                rop.getInputs().add(childRef);
                // splice the replicate between the shared child and each parent
                for (Mutable<ILogicalOperator> parentRef : pList) {
                    ILogicalOperator parentOp = parentRef.getValue();
                    int index = parentOp.getInputs().indexOf(childRef);
                    parentOp.getInputs().set(index, ropRef);
                }
            }
        }
    }

    /**
     * build the mapping from child to parents by walking the operator inputs
     * top-down; duplicate parent entries are guarded by contains(). Shared
     * subtrees may be re-visited (no visited set), which is tolerable because
     * the parent lists are deduplicated.
     *
     * @param roots
     *            current level of operators
     * @param map
     *            accumulator: child ref -> list of parent refs
     */
    private void buildChildToParentsMapping(List<Mutable<ILogicalOperator>> roots,
            Map<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> map) {
        for (Mutable<ILogicalOperator> opRef : roots) {
            List<Mutable<ILogicalOperator>> childRefs = opRef.getValue().getInputs();
            for (Mutable<ILogicalOperator> childRef : childRefs) {
                List<Mutable<ILogicalOperator>> parentList = map.get(childRef);
                if (parentList == null) {
                    parentList = new ArrayList<Mutable<ILogicalOperator>>();
                    map.put(childRef, parentList);
                }
                if (!parentList.contains(opRef))
                    parentList.add(opRef);
            }
            buildChildToParentsMapping(childRefs, map);
        }
    }
}
|