blob: aa1837c244261f9c2037ed8da7139afd51d63619 [file] [log] [blame]
/*
* Copyright 2009-2013 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hivesterix.logical.plan.visitor;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.UDTFOperator;
import org.apache.hadoop.hive.ql.plan.LateralViewJoinDesc;
import org.apache.hadoop.hive.ql.plan.UDTFDesc;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import edu.uci.ics.hivesterix.logical.plan.visitor.base.DefaultVisitor;
import edu.uci.ics.hivesterix.logical.plan.visitor.base.Translator;
import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
/**
* The lateral view join operator is used for FROM src LATERAL VIEW udtf()...
* This operator was implemented with the following operator DAG in mind.
* For a query such as
* SELECT pageid, adid.* FROM example_table LATERAL VIEW explode(adid_list) AS
* adid
* The top of the operator DAG will look similar to
* [Table Scan] | [Lateral View Forward] / \ [Select](*) [Select](adid_list) | |
* | [UDTF] (explode) \ / [Lateral View Join] | | [Select] (pageid, adid.*) |
* ....
* Rows from the table scan operator are first to a lateral view forward
* operator that just forwards the row and marks the start of a LV. The select
* operator on the left picks all the columns while the select operator on the
* right picks only the columns needed by the UDTF.
* The output of select in the left branch and output of the UDTF in the right
* branch are then sent to the lateral view join (LVJ). In most cases, the UDTF
* will generate > 1 row for every row received from the TS, while the left
* select operator will generate only one. For each row output from the TS, the
* LVJ outputs all possible rows that can be created by joining the row from the
* left select and one of the rows output from the UDTF.
* Additional lateral views can be supported by adding a similar DAG after the
* previous LVJ operator.
*/
@SuppressWarnings("rawtypes")
public class LateralViewJoinVisitor extends DefaultVisitor {
private UDTFDesc udtf;
private List<Mutable<ILogicalOperator>> parents = new ArrayList<Mutable<ILogicalOperator>>();
@Override
public Mutable<ILogicalOperator> visit(LateralViewJoinOperator operator,
Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) throws AlgebricksException {
parents.add(AlgebricksParentOperatorRef);
if (operator.getParentOperators().size() > parents.size()) {
return null;
}
ILogicalOperator parentOperator = null;
ILogicalExpression unnestArg = null;
List<LogicalVariable> projectVariables = new ArrayList<LogicalVariable>();
for (Mutable<ILogicalOperator> parentLOpRef : parents) {
VariableUtilities.getLiveVariables(parentLOpRef.getValue(), projectVariables);
}
for (Operator parentOp : operator.getParentOperators()) {
if (parentOp instanceof UDTFOperator) {
int index = operator.getParentOperators().indexOf(parentOp);
List<LogicalVariable> unnestVars = new ArrayList<LogicalVariable>();
VariableUtilities.getLiveVariables(parents.get(index).getValue(), unnestVars);
unnestArg = new VariableReferenceExpression(unnestVars.get(0));
parentOperator = parents.get(index).getValue();
}
}
LogicalVariable var = t.getVariable(udtf.toString(), TypeInfoFactory.unknownTypeInfo);
Mutable<ILogicalExpression> unnestExpr = t.translateUnnestFunction(udtf, new MutableObject<ILogicalExpression>(
unnestArg));
ILogicalOperator currentOperator = new UnnestOperator(var, unnestExpr);
List<LogicalVariable> outputVars = new ArrayList<LogicalVariable>();
VariableUtilities.getLiveVariables(parents.get(0).getValue(), outputVars);
outputVars.add(var);
ILogicalOperator inputProjectOperator = new ProjectOperator(projectVariables);
currentOperator.getInputs().add(new MutableObject<ILogicalOperator>(inputProjectOperator));
inputProjectOperator.getInputs().addAll(parentOperator.getInputs());
parents.clear();
udtf = null;
List<ColumnInfo> inputSchema = operator.getSchema().getSignature();
rewriteOperatorDesc(outputVars, operator.getConf(), inputSchema, t);
//t.rewriteOperatorOutputSchema(outputVars, operator);
return new MutableObject<ILogicalOperator>(currentOperator);
}
@Override
public Mutable<ILogicalOperator> visit(UDTFOperator operator,
Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) {
Schema currentSchema = t.generateInputSchema(operator.getParentOperators().get(0));
udtf = (UDTFDesc) operator.getConf();
// populate the schema from upstream operator
operator.setSchema(operator.getParentOperators().get(0).getSchema());
List<LogicalVariable> latestOutputSchema = t.getVariablesFromSchema(currentSchema);
t.rewriteOperatorOutputSchema(latestOutputSchema, operator);
return null;
}
private void rewriteOperatorDesc(List<LogicalVariable> variables, LateralViewJoinDesc desc,
List<ColumnInfo> schema, Translator t) {
List<String> outputFieldNames = desc.getOutputInternalColNames();
for (int i = 0; i < variables.size(); i++) {
LogicalVariable var = variables.get(i);
String fieldName = outputFieldNames.get(i);
String tabAlias = findTabAlias(fieldName, schema);
fieldName = tabAlias + "." + fieldName;
if (fieldName.indexOf("$$") < 0) {
//outputFieldNames.set(i, var.toString());
t.updateVariable(fieldName, var);
}
}
}
private String findTabAlias(String fieldName, List<ColumnInfo> schema) {
for (int i = 0; i < schema.size(); i++) {
ColumnInfo column = schema.get(i);
if (column.getInternalName().equals(fieldName)) {
return column.getTabAlias();
}
}
return "null";
}
}