package edu.uci.ics.hivesterix.logical.plan.visitor; | |
import java.lang.reflect.Field; | |
import java.util.ArrayList; | |
import java.util.HashMap; | |
import java.util.List; | |
import org.apache.commons.lang3.mutable.Mutable; | |
import org.apache.commons.lang3.mutable.MutableObject; | |
import org.apache.hadoop.hive.conf.HiveConf; | |
import org.apache.hadoop.hive.ql.exec.ColumnInfo; | |
import org.apache.hadoop.hive.ql.exec.GroupByOperator; | |
import org.apache.hadoop.hive.ql.exec.Operator; | |
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; | |
import org.apache.hadoop.hive.ql.plan.AggregationDesc; | |
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; | |
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; | |
import org.apache.hadoop.hive.ql.plan.GroupByDesc; | |
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; | |
import org.apache.hadoop.hive.ql.plan.api.OperatorType; | |
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; | |
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; | |
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode; | |
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; | |
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; | |
import edu.uci.ics.hivesterix.common.config.ConfUtil; | |
import edu.uci.ics.hivesterix.logical.plan.HiveOperatorAnnotations; | |
import edu.uci.ics.hivesterix.logical.plan.visitor.base.DefaultVisitor; | |
import edu.uci.ics.hivesterix.logical.plan.visitor.base.Translator; | |
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; | |
import edu.uci.ics.hyracks.algebricks.common.utils.Pair; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.base.OperatorAnnotations; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl; | |
@SuppressWarnings({ "rawtypes", "unchecked" }) | |
public class GroupByVisitor extends DefaultVisitor { | |
private List<Mutable<ILogicalExpression>> AlgebricksAggs = new ArrayList<Mutable<ILogicalExpression>>(); | |
private List<IFunctionInfo> localAggs = new ArrayList<IFunctionInfo>(); | |
private boolean isDistinct = false; | |
private boolean gbyKeyNotRedKey = false; | |
@Override | |
public Mutable<ILogicalOperator> visit(GroupByOperator operator, | |
Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) throws AlgebricksException { | |
// get descriptors | |
GroupByDesc desc = (GroupByDesc) operator.getConf(); | |
GroupByDesc.Mode mode = desc.getMode(); | |
List<ExprNodeDesc> keys = desc.getKeys(); | |
List<AggregationDesc> aggregators = desc.getAggregators(); | |
Operator child = operator.getChildOperators().get(0); | |
if (child.getType() == OperatorType.REDUCESINK) { | |
List<ExprNodeDesc> partKeys = ((ReduceSinkDesc) child.getConf()).getPartitionCols(); | |
if (keys.size() != partKeys.size()) | |
gbyKeyNotRedKey = true; | |
} | |
if (mode == GroupByDesc.Mode.PARTIAL1 || mode == GroupByDesc.Mode.HASH || mode == GroupByDesc.Mode.COMPLETE | |
|| (aggregators.size() == 0 && isDistinct == false) || gbyKeyNotRedKey) { | |
AlgebricksAggs.clear(); | |
// add an assign operator if the key is not a column expression | |
ArrayList<LogicalVariable> keyVariables = new ArrayList<LogicalVariable>(); | |
ILogicalOperator currentOperator = null; | |
ILogicalOperator assignOperator = t.getAssignOperator(AlgebricksParentOperatorRef, keys, keyVariables); | |
if (assignOperator != null) { | |
currentOperator = assignOperator; | |
AlgebricksParentOperatorRef = new MutableObject<ILogicalOperator>(currentOperator); | |
} | |
// get key variable expression list | |
List<Mutable<ILogicalExpression>> keyExprs = new ArrayList<Mutable<ILogicalExpression>>(); | |
for (LogicalVariable var : keyVariables) { | |
keyExprs.add(t.translateScalarFucntion(new ExprNodeColumnDesc(TypeInfoFactory.intTypeInfo, var | |
.toString(), "", false))); | |
} | |
if (aggregators.size() == 0) { | |
List<Mutable<ILogicalExpression>> distinctExprs = new ArrayList<Mutable<ILogicalExpression>>(); | |
for (LogicalVariable var : keyVariables) { | |
Mutable<ILogicalExpression> varExpr = new MutableObject<ILogicalExpression>( | |
new VariableReferenceExpression(var)); | |
distinctExprs.add(varExpr); | |
} | |
t.rewriteOperatorOutputSchema(keyVariables, operator); | |
isDistinct = true; | |
ILogicalOperator lop = new DistinctOperator(distinctExprs); | |
lop.getInputs().add(AlgebricksParentOperatorRef); | |
return new MutableObject<ILogicalOperator>(lop); | |
} | |
// get the pair<LogicalVariable, ILogicalExpression> list | |
List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> keyParameters = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>(); | |
keyVariables.clear(); | |
for (Mutable<ILogicalExpression> expr : keyExprs) { | |
LogicalVariable keyVar = t.getVariable(expr.getValue().toString(), TypeInfoFactory.unknownTypeInfo); | |
keyParameters.add(new Pair(keyVar, expr)); | |
keyVariables.add(keyVar); | |
} | |
// get the parameters for the aggregator operator | |
ArrayList<LogicalVariable> aggVariables = new ArrayList<LogicalVariable>(); | |
ArrayList<Mutable<ILogicalExpression>> aggExprs = new ArrayList<Mutable<ILogicalExpression>>(); | |
// get the type of each aggregation function | |
HashMap<AggregationDesc, TypeInfo> aggToType = new HashMap<AggregationDesc, TypeInfo>(); | |
List<ColumnInfo> columns = operator.getSchema().getSignature(); | |
int offset = keys.size(); | |
for (int i = offset; i < columns.size(); i++) { | |
aggToType.put(aggregators.get(i - offset), columns.get(i).getType()); | |
} | |
localAggs.clear(); | |
// rewrite parameter expressions for all aggregators | |
for (AggregationDesc aggregator : aggregators) { | |
for (ExprNodeDesc parameter : aggregator.getParameters()) { | |
t.rewriteExpression(parameter); | |
} | |
Mutable<ILogicalExpression> aggExpr = t.translateAggregation(aggregator); | |
AbstractFunctionCallExpression localAggExpr = (AbstractFunctionCallExpression) aggExpr.getValue(); | |
localAggs.add(localAggExpr.getFunctionInfo()); | |
AggregationDesc logicalAgg = new AggregationDesc(aggregator.getGenericUDAFName(), | |
aggregator.getGenericUDAFEvaluator(), aggregator.getParameters(), aggregator.getDistinct(), | |
Mode.COMPLETE); | |
Mutable<ILogicalExpression> logicalAggExpr = t.translateAggregation(logicalAgg); | |
AlgebricksAggs.add(logicalAggExpr); | |
if (!gbyKeyNotRedKey) | |
aggExprs.add(logicalAggExpr); | |
else | |
aggExprs.add(aggExpr); | |
aggVariables.add(t.getVariable(aggregator.getExprString() + aggregator.getMode(), | |
aggToType.get(aggregator))); | |
} | |
if (child.getType() != OperatorType.REDUCESINK) | |
gbyKeyNotRedKey = false; | |
// get the sub plan list | |
AggregateOperator aggOperator = new AggregateOperator(aggVariables, aggExprs); | |
NestedTupleSourceOperator nestedTupleSource = new NestedTupleSourceOperator( | |
new MutableObject<ILogicalOperator>()); | |
aggOperator.getInputs().add(new MutableObject<ILogicalOperator>(nestedTupleSource)); | |
List<Mutable<ILogicalOperator>> subRoots = new ArrayList<Mutable<ILogicalOperator>>(); | |
subRoots.add(new MutableObject<ILogicalOperator>(aggOperator)); | |
ILogicalPlan subPlan = new ALogicalPlanImpl(subRoots); | |
List<ILogicalPlan> subPlans = new ArrayList<ILogicalPlan>(); | |
subPlans.add(subPlan); | |
// create the group by operator | |
currentOperator = new edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator( | |
keyParameters, new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>(), subPlans); | |
currentOperator.getInputs().add(AlgebricksParentOperatorRef); | |
nestedTupleSource.getDataSourceReference().setValue(currentOperator); | |
List<LogicalVariable> outputVariables = new ArrayList<LogicalVariable>(); | |
outputVariables.addAll(keyVariables); | |
outputVariables.addAll(aggVariables); | |
t.rewriteOperatorOutputSchema(outputVariables, operator); | |
if (gbyKeyNotRedKey) { | |
currentOperator.getAnnotations().put(HiveOperatorAnnotations.LOCAL_GROUP_BY, Boolean.TRUE); | |
} | |
HiveConf conf = ConfUtil.getHiveConf(); | |
Boolean extGby = conf.getBoolean("hive.algebricks.groupby.external", false); | |
if (extGby && isSerializable(aggregators)) { | |
currentOperator.getAnnotations().put(OperatorAnnotations.USE_EXTERNAL_GROUP_BY, Boolean.TRUE); | |
} | |
return new MutableObject<ILogicalOperator>(currentOperator); | |
} else { | |
isDistinct = false; | |
// rewrite parameter expressions for all aggregators | |
int i = 0; | |
for (AggregationDesc aggregator : aggregators) { | |
for (ExprNodeDesc parameter : aggregator.getParameters()) { | |
t.rewriteExpression(parameter); | |
} | |
Mutable<ILogicalExpression> agg = t.translateAggregation(aggregator); | |
AggregateFunctionCallExpression originalAgg = (AggregateFunctionCallExpression) AlgebricksAggs.get(i) | |
.getValue(); | |
originalAgg.setStepOneAggregate(localAggs.get(i)); | |
AggregateFunctionCallExpression currentAgg = (AggregateFunctionCallExpression) agg.getValue(); | |
if (currentAgg.getFunctionInfo() != null) { | |
originalAgg.setTwoStep(true); | |
originalAgg.setStepTwoAggregate(currentAgg.getFunctionInfo()); | |
} | |
i++; | |
} | |
return null; | |
} | |
} | |
@Override | |
public Mutable<ILogicalOperator> visit(ReduceSinkOperator operator, | |
Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) { | |
Operator downStream = (Operator) operator.getChildOperators().get(0); | |
if (!(downStream instanceof GroupByOperator)) { | |
return null; | |
} | |
ReduceSinkDesc desc = (ReduceSinkDesc) operator.getConf(); | |
List<ExprNodeDesc> keys = desc.getKeyCols(); | |
List<ExprNodeDesc> values = desc.getValueCols(); | |
// insert assign for keys | |
ArrayList<LogicalVariable> keyVariables = new ArrayList<LogicalVariable>(); | |
t.getAssignOperator(AlgebricksParentOperatorRef, keys, keyVariables); | |
// insert assign for values | |
ArrayList<LogicalVariable> valueVariables = new ArrayList<LogicalVariable>(); | |
t.getAssignOperator(AlgebricksParentOperatorRef, values, valueVariables); | |
ArrayList<LogicalVariable> columns = new ArrayList<LogicalVariable>(); | |
columns.addAll(keyVariables); | |
columns.addAll(valueVariables); | |
t.rewriteOperatorOutputSchema(columns, operator); | |
return null; | |
} | |
private boolean isSerializable(List<AggregationDesc> descs) throws AlgebricksException { | |
try { | |
for (AggregationDesc desc : descs) { | |
GenericUDAFEvaluator udaf = desc.getGenericUDAFEvaluator(); | |
AggregationBuffer buf = udaf.getNewAggregationBuffer(); | |
Class<?> bufferClass = buf.getClass(); | |
Field[] fields = bufferClass.getDeclaredFields(); | |
for (Field field : fields) { | |
field.setAccessible(true); | |
String type = field.getType().toString(); | |
if (!(type.equals("int") || type.equals("long") || type.equals("float") || type.equals("double") || type | |
.equals("boolean"))) { | |
return false; | |
} | |
} | |
} | |
return true; | |
} catch (Exception e) { | |
throw new AlgebricksException(e); | |
} | |
} | |
} |