blob: ef346bc3ae568fb08ea98f238dc70387f11fa49f [file] [log] [blame]
package edu.uci.ics.hivesterix.logical.plan.visitor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
import org.apache.hadoop.hive.ql.plan.JoinDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import edu.uci.ics.hivesterix.logical.plan.visitor.base.DefaultVisitor;
import edu.uci.ics.hivesterix.logical.plan.visitor.base.Translator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
@SuppressWarnings("rawtypes")
public class JoinVisitor extends DefaultVisitor {
/**
* reduce sink operator to variables
*/
private HashMap<Operator, List<LogicalVariable>> reduceSinkToKeyVariables = new HashMap<Operator, List<LogicalVariable>>();
/**
* reduce sink operator to variables
*/
private HashMap<Operator, List<String>> reduceSinkToFieldNames = new HashMap<Operator, List<String>>();
/**
* reduce sink operator to variables
*/
private HashMap<Operator, List<TypeInfo>> reduceSinkToTypes = new HashMap<Operator, List<TypeInfo>>();
/**
* map a join operator (in hive) to its parent operators (in hive)
*/
private HashMap<Operator, List<Operator>> operatorToHiveParents = new HashMap<Operator, List<Operator>>();
/**
* map a join operator (in hive) to its parent operators (in asterix)
*/
private HashMap<Operator, List<ILogicalOperator>> operatorToAsterixParents = new HashMap<Operator, List<ILogicalOperator>>();
/**
* the latest traversed reduce sink operator
*/
private Operator latestReduceSink = null;
/**
* the latest generated parent for join
*/
private ILogicalOperator latestAlgebricksOperator = null;
/**
* process a join operator
*/
@Override
public Mutable<ILogicalOperator> visit(JoinOperator operator, Mutable<ILogicalOperator> AlgebricksParentOperator,
Translator t) {
latestAlgebricksOperator = AlgebricksParentOperator.getValue();
translateJoinOperatorPreprocess(operator, t);
List<Operator> parents = operatorToHiveParents.get(operator);
if (parents.size() < operator.getParentOperators().size()) {
return null;
} else {
ILogicalOperator joinOp = translateJoinOperator(operator, AlgebricksParentOperator, t);
// clearStatus();
return new MutableObject<ILogicalOperator>(joinOp);
}
}
private void reorder(Byte[] order, List<ILogicalOperator> parents, List<Operator> hiveParents) {
ILogicalOperator[] lops = new ILogicalOperator[parents.size()];
Operator[] ops = new Operator[hiveParents.size()];
for (Operator op : hiveParents) {
ReduceSinkOperator rop = (ReduceSinkOperator) op;
ReduceSinkDesc rdesc = rop.getConf();
int tag = rdesc.getTag();
int index = -1;
for (int i = 0; i < order.length; i++)
if (order[i] == tag) {
index = i;
break;
}
lops[index] = parents.get(hiveParents.indexOf(op));
ops[index] = op;
}
parents.clear();
hiveParents.clear();
for (int i = 0; i < lops.length; i++) {
parents.add(lops[i]);
hiveParents.add(ops[i]);
}
}
/**
* translate a hive join operator to asterix join operator->assign
* operator->project operator
*
* @param parentOperator
* @param operator
* @return
*/
private ILogicalOperator translateJoinOperator(Operator operator, Mutable<ILogicalOperator> parentOperator,
Translator t) {
JoinDesc joinDesc = (JoinDesc) operator.getConf();
// get the projection expression (already re-written) from each source
// table
Map<Byte, List<ExprNodeDesc>> exprMap = joinDesc.getExprs();
reorder(joinDesc.getTagOrder(), operatorToAsterixParents.get(operator), operatorToHiveParents.get(operator));
// make an reduce join operator
ILogicalOperator currentOperator = generateJoinTree(joinDesc.getCondsList(),
operatorToAsterixParents.get(operator), operatorToHiveParents.get(operator), 0, t);
parentOperator = new MutableObject<ILogicalOperator>(currentOperator);
// add assign and project operator on top of a join
// output variables
ArrayList<LogicalVariable> variables = new ArrayList<LogicalVariable>();
Set<Entry<Byte, List<ExprNodeDesc>>> entries = exprMap.entrySet();
Iterator<Entry<Byte, List<ExprNodeDesc>>> iterator = entries.iterator();
while (iterator.hasNext()) {
List<ExprNodeDesc> outputExprs = iterator.next().getValue();
ILogicalOperator assignOperator = t.getAssignOperator(parentOperator, outputExprs, variables);
if (assignOperator != null) {
currentOperator = assignOperator;
parentOperator = new MutableObject<ILogicalOperator>(currentOperator);
}
}
ILogicalOperator po = new ProjectOperator(variables);
po.getInputs().add(parentOperator);
t.rewriteOperatorOutputSchema(variables, operator);
return po;
}
/**
* deal with reduce sink operator for the case of join
*/
@Override
public Mutable<ILogicalOperator> visit(ReduceSinkOperator operator, Mutable<ILogicalOperator> parentOperator,
Translator t) {
Operator downStream = (Operator) operator.getChildOperators().get(0);
if (!(downStream instanceof JoinOperator))
return null;
ReduceSinkDesc desc = (ReduceSinkDesc) operator.getConf();
List<ExprNodeDesc> keys = desc.getKeyCols();
List<ExprNodeDesc> values = desc.getValueCols();
List<ExprNodeDesc> partitionCols = desc.getPartitionCols();
/**
* rewrite key, value, paritioncol expressions
*/
for (ExprNodeDesc key : keys)
t.rewriteExpression(key);
for (ExprNodeDesc value : values)
t.rewriteExpression(value);
for (ExprNodeDesc col : partitionCols)
t.rewriteExpression(col);
ILogicalOperator currentOperator = null;
// add assign operator for keys if necessary
ArrayList<LogicalVariable> keyVariables = new ArrayList<LogicalVariable>();
ILogicalOperator assignOperator = t.getAssignOperator(parentOperator, keys, keyVariables);
if (assignOperator != null) {
currentOperator = assignOperator;
parentOperator = new MutableObject<ILogicalOperator>(currentOperator);
}
// add assign operator for values if necessary
ArrayList<LogicalVariable> variables = new ArrayList<LogicalVariable>();
assignOperator = t.getAssignOperator(parentOperator, values, variables);
if (assignOperator != null) {
currentOperator = assignOperator;
parentOperator = new MutableObject<ILogicalOperator>(currentOperator);
}
// unified schema: key, value
ArrayList<LogicalVariable> unifiedKeyValues = new ArrayList<LogicalVariable>();
unifiedKeyValues.addAll(keyVariables);
for (LogicalVariable value : variables)
if (keyVariables.indexOf(value) < 0)
unifiedKeyValues.add(value);
// insert projection operator, it is a *must*,
// in hive, reduce sink sometimes also do the projection operator's
// task
currentOperator = new ProjectOperator(unifiedKeyValues);
currentOperator.getInputs().add(parentOperator);
parentOperator = new MutableObject<ILogicalOperator>(currentOperator);
reduceSinkToKeyVariables.put(operator, keyVariables);
List<String> fieldNames = new ArrayList<String>();
List<TypeInfo> types = new ArrayList<TypeInfo>();
for (LogicalVariable var : unifiedKeyValues) {
fieldNames.add(var.toString());
types.add(t.getType(var));
}
reduceSinkToFieldNames.put(operator, fieldNames);
reduceSinkToTypes.put(operator, types);
t.rewriteOperatorOutputSchema(variables, operator);
latestAlgebricksOperator = currentOperator;
latestReduceSink = operator;
return new MutableObject<ILogicalOperator>(currentOperator);
}
/**
* partial rewrite a join operator
*
* @param operator
* @param t
*/
private void translateJoinOperatorPreprocess(Operator operator, Translator t) {
JoinDesc desc = (JoinDesc) operator.getConf();
ReduceSinkDesc reduceSinkDesc = (ReduceSinkDesc) latestReduceSink.getConf();
int tag = reduceSinkDesc.getTag();
Map<Byte, List<ExprNodeDesc>> exprMap = desc.getExprs();
List<ExprNodeDesc> exprs = exprMap.get(Byte.valueOf((byte) tag));
for (ExprNodeDesc expr : exprs)
t.rewriteExpression(expr);
List<Operator> parents = operatorToHiveParents.get(operator);
if (parents == null) {
parents = new ArrayList<Operator>();
operatorToHiveParents.put(operator, parents);
}
parents.add(latestReduceSink);
List<ILogicalOperator> asterixParents = operatorToAsterixParents.get(operator);
if (asterixParents == null) {
asterixParents = new ArrayList<ILogicalOperator>();
operatorToAsterixParents.put(operator, asterixParents);
}
asterixParents.add(latestAlgebricksOperator);
}
// generate a join tree from a list of exchange/reducesink operator
// both exchanges and reduce sinks have the same order
private ILogicalOperator generateJoinTree(List<JoinCondDesc> conds, List<ILogicalOperator> exchanges,
List<Operator> reduceSinks, int offset, Translator t) {
// get a list of reduce sink descs (input descs)
int inputSize = reduceSinks.size() - offset;
if (inputSize == 2) {
ILogicalOperator currentRoot;
List<ReduceSinkDesc> reduceSinkDescs = new ArrayList<ReduceSinkDesc>();
for (int i = reduceSinks.size() - 1; i >= offset; i--)
reduceSinkDescs.add((ReduceSinkDesc) reduceSinks.get(i).getConf());
// get the object inspector for the join
List<String> fieldNames = new ArrayList<String>();
List<TypeInfo> types = new ArrayList<TypeInfo>();
for (int i = reduceSinks.size() - 1; i >= offset; i--) {
fieldNames.addAll(reduceSinkToFieldNames.get(reduceSinks.get(i)));
types.addAll(reduceSinkToTypes.get(reduceSinks.get(i)));
}
// get number of equality conjunctions in the final join condition
int size = reduceSinkDescs.get(0).getKeyCols().size();
// make up the join conditon expression
List<ExprNodeDesc> joinConditionChildren = new ArrayList<ExprNodeDesc>();
for (int i = 0; i < size; i++) {
// create a join key pair
List<ExprNodeDesc> keyPair = new ArrayList<ExprNodeDesc>();
for (ReduceSinkDesc sink : reduceSinkDescs) {
keyPair.add(sink.getKeyCols().get(i));
}
// create a hive equal condition
ExprNodeDesc equality = new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo,
new GenericUDFOPEqual(), keyPair);
// add the equal condition to the conjunction list
joinConditionChildren.add(equality);
}
// get final conjunction expression
ExprNodeDesc conjunct = null;
if (joinConditionChildren.size() > 1)
conjunct = new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, new GenericUDFOPAnd(),
joinConditionChildren);
else if (joinConditionChildren.size() == 1)
conjunct = joinConditionChildren.get(0);
else {
// there is no join equality condition, equal-join
conjunct = new ExprNodeConstantDesc(TypeInfoFactory.booleanTypeInfo, new Boolean(true));
}
// get an ILogicalExpression from hive's expression
Mutable<ILogicalExpression> expression = t.translateScalarFucntion(conjunct);
Mutable<ILogicalOperator> leftBranch = new MutableObject<ILogicalOperator>(
exchanges.get(exchanges.size() - 1));
Mutable<ILogicalOperator> rightBranch = new MutableObject<ILogicalOperator>(
exchanges.get(exchanges.size() - 2));
// get the join operator
if (conds.get(offset).getType() == JoinDesc.LEFT_OUTER_JOIN) {
currentRoot = new LeftOuterJoinOperator(expression);
Mutable<ILogicalOperator> temp = leftBranch;
leftBranch = rightBranch;
rightBranch = temp;
} else if (conds.get(offset).getType() == JoinDesc.RIGHT_OUTER_JOIN) {
currentRoot = new LeftOuterJoinOperator(expression);
} else
currentRoot = new InnerJoinOperator(expression);
currentRoot.getInputs().add(leftBranch);
currentRoot.getInputs().add(rightBranch);
// rewriteOperatorOutputSchema(variables, operator);
return currentRoot;
} else {
// get the child join operator and insert and one-to-one exchange
ILogicalOperator joinSrcOne = generateJoinTree(conds, exchanges, reduceSinks, offset + 1, t);
// joinSrcOne.addInput(childJoin);
ILogicalOperator currentRoot;
List<ReduceSinkDesc> reduceSinkDescs = new ArrayList<ReduceSinkDesc>();
for (int i = offset; i < offset + 2; i++)
reduceSinkDescs.add((ReduceSinkDesc) reduceSinks.get(i).getConf());
// get the object inspector for the join
List<String> fieldNames = new ArrayList<String>();
List<TypeInfo> types = new ArrayList<TypeInfo>();
for (int i = offset; i < reduceSinks.size(); i++) {
fieldNames.addAll(reduceSinkToFieldNames.get(reduceSinks.get(i)));
types.addAll(reduceSinkToTypes.get(reduceSinks.get(i)));
}
// get number of equality conjunctions in the final join condition
int size = reduceSinkDescs.get(0).getKeyCols().size();
// make up the join condition expression
List<ExprNodeDesc> joinConditionChildren = new ArrayList<ExprNodeDesc>();
for (int i = 0; i < size; i++) {
// create a join key pair
List<ExprNodeDesc> keyPair = new ArrayList<ExprNodeDesc>();
for (ReduceSinkDesc sink : reduceSinkDescs) {
keyPair.add(sink.getKeyCols().get(i));
}
// create a hive equal condition
ExprNodeDesc equality = new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo,
new GenericUDFOPEqual(), keyPair);
// add the equal condition to the conjunction list
joinConditionChildren.add(equality);
}
// get final conjunction expression
ExprNodeDesc conjunct = null;
if (joinConditionChildren.size() > 1)
conjunct = new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, new GenericUDFOPAnd(),
joinConditionChildren);
else if (joinConditionChildren.size() == 1)
conjunct = joinConditionChildren.get(0);
else {
// there is no join equality condition, full outer join
conjunct = new ExprNodeConstantDesc(TypeInfoFactory.booleanTypeInfo, new Boolean(true));
}
// get an ILogicalExpression from hive's expression
Mutable<ILogicalExpression> expression = t.translateScalarFucntion(conjunct);
Mutable<ILogicalOperator> leftBranch = new MutableObject<ILogicalOperator>(joinSrcOne);
Mutable<ILogicalOperator> rightBranch = new MutableObject<ILogicalOperator>(exchanges.get(offset));
// get the join operator
if (conds.get(offset).getType() == JoinDesc.LEFT_OUTER_JOIN) {
currentRoot = new LeftOuterJoinOperator(expression);
Mutable<ILogicalOperator> temp = leftBranch;
leftBranch = rightBranch;
rightBranch = temp;
} else if (conds.get(offset).getType() == JoinDesc.RIGHT_OUTER_JOIN) {
currentRoot = new LeftOuterJoinOperator(expression);
} else
currentRoot = new InnerJoinOperator(expression);
// set the inputs from Algebricks join operator
// add the current table
currentRoot.getInputs().add(leftBranch);
currentRoot.getInputs().add(rightBranch);
return currentRoot;
}
}
}