blob: b7d5779798768787bf9e13aab25a029daf29260b [file] [log] [blame]
package edu.uci.ics.hivesterix.logical.plan.visitor;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.plan.AggregationDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import edu.uci.ics.hivesterix.common.config.ConfUtil;
import edu.uci.ics.hivesterix.logical.plan.HiveOperatorAnnotations;
import edu.uci.ics.hivesterix.logical.plan.visitor.base.DefaultVisitor;
import edu.uci.ics.hivesterix.logical.plan.visitor.base.Translator;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
@SuppressWarnings({ "rawtypes", "unchecked" })
public class GroupByVisitor extends DefaultVisitor {
private List<Mutable<ILogicalExpression>> AlgebricksAggs = new ArrayList<Mutable<ILogicalExpression>>();
private List<IFunctionInfo> localAggs = new ArrayList<IFunctionInfo>();
private boolean isDistinct = false;
private boolean gbyKeyNotRedKey = false;
@Override
public Mutable<ILogicalOperator> visit(GroupByOperator operator,
Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) throws AlgebricksException {
// get descriptors
GroupByDesc desc = (GroupByDesc) operator.getConf();
GroupByDesc.Mode mode = desc.getMode();
List<ExprNodeDesc> keys = desc.getKeys();
List<AggregationDesc> aggregators = desc.getAggregators();
Operator child = operator.getChildOperators().get(0);
if (child.getType() == OperatorType.REDUCESINK) {
List<ExprNodeDesc> partKeys = ((ReduceSinkDesc) child.getConf()).getPartitionCols();
if (keys.size() != partKeys.size())
gbyKeyNotRedKey = true;
}
if (mode == GroupByDesc.Mode.PARTIAL1 || mode == GroupByDesc.Mode.HASH || mode == GroupByDesc.Mode.COMPLETE
|| (aggregators.size() == 0 && isDistinct == false) || gbyKeyNotRedKey) {
AlgebricksAggs.clear();
// add an assign operator if the key is not a column expression
ArrayList<LogicalVariable> keyVariables = new ArrayList<LogicalVariable>();
ILogicalOperator currentOperator = null;
ILogicalOperator assignOperator = t.getAssignOperator(AlgebricksParentOperatorRef, keys, keyVariables);
if (assignOperator != null) {
currentOperator = assignOperator;
AlgebricksParentOperatorRef = new MutableObject<ILogicalOperator>(currentOperator);
}
// get key variable expression list
List<Mutable<ILogicalExpression>> keyExprs = new ArrayList<Mutable<ILogicalExpression>>();
for (LogicalVariable var : keyVariables) {
keyExprs.add(t.translateScalarFucntion(new ExprNodeColumnDesc(TypeInfoFactory.intTypeInfo, var
.toString(), "", false)));
}
if (aggregators.size() == 0) {
List<Mutable<ILogicalExpression>> distinctExprs = new ArrayList<Mutable<ILogicalExpression>>();
for (LogicalVariable var : keyVariables) {
Mutable<ILogicalExpression> varExpr = new MutableObject<ILogicalExpression>(
new VariableReferenceExpression(var));
distinctExprs.add(varExpr);
}
t.rewriteOperatorOutputSchema(keyVariables, operator);
isDistinct = true;
ILogicalOperator lop = new DistinctOperator(distinctExprs);
lop.getInputs().add(AlgebricksParentOperatorRef);
return new MutableObject<ILogicalOperator>(lop);
}
// get the pair<LogicalVariable, ILogicalExpression> list
List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> keyParameters = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
keyVariables.clear();
for (Mutable<ILogicalExpression> expr : keyExprs) {
LogicalVariable keyVar = t.getVariable(expr.getValue().toString(), TypeInfoFactory.unknownTypeInfo);
keyParameters.add(new Pair(keyVar, expr));
keyVariables.add(keyVar);
}
// get the parameters for the aggregator operator
ArrayList<LogicalVariable> aggVariables = new ArrayList<LogicalVariable>();
ArrayList<Mutable<ILogicalExpression>> aggExprs = new ArrayList<Mutable<ILogicalExpression>>();
// get the type of each aggregation function
HashMap<AggregationDesc, TypeInfo> aggToType = new HashMap<AggregationDesc, TypeInfo>();
List<ColumnInfo> columns = operator.getSchema().getSignature();
int offset = keys.size();
for (int i = offset; i < columns.size(); i++) {
aggToType.put(aggregators.get(i - offset), columns.get(i).getType());
}
localAggs.clear();
// rewrite parameter expressions for all aggregators
for (AggregationDesc aggregator : aggregators) {
for (ExprNodeDesc parameter : aggregator.getParameters()) {
t.rewriteExpression(parameter);
}
Mutable<ILogicalExpression> aggExpr = t.translateAggregation(aggregator);
AbstractFunctionCallExpression localAggExpr = (AbstractFunctionCallExpression) aggExpr.getValue();
localAggs.add(localAggExpr.getFunctionInfo());
AggregationDesc logicalAgg = new AggregationDesc(aggregator.getGenericUDAFName(),
aggregator.getGenericUDAFEvaluator(), aggregator.getParameters(), aggregator.getDistinct(),
Mode.COMPLETE);
Mutable<ILogicalExpression> logicalAggExpr = t.translateAggregation(logicalAgg);
AlgebricksAggs.add(logicalAggExpr);
if (!gbyKeyNotRedKey)
aggExprs.add(logicalAggExpr);
else
aggExprs.add(aggExpr);
aggVariables.add(t.getVariable(aggregator.getExprString() + aggregator.getMode(),
aggToType.get(aggregator)));
}
if (child.getType() != OperatorType.REDUCESINK)
gbyKeyNotRedKey = false;
// get the sub plan list
AggregateOperator aggOperator = new AggregateOperator(aggVariables, aggExprs);
NestedTupleSourceOperator nestedTupleSource = new NestedTupleSourceOperator(
new MutableObject<ILogicalOperator>());
aggOperator.getInputs().add(new MutableObject<ILogicalOperator>(nestedTupleSource));
List<Mutable<ILogicalOperator>> subRoots = new ArrayList<Mutable<ILogicalOperator>>();
subRoots.add(new MutableObject<ILogicalOperator>(aggOperator));
ILogicalPlan subPlan = new ALogicalPlanImpl(subRoots);
List<ILogicalPlan> subPlans = new ArrayList<ILogicalPlan>();
subPlans.add(subPlan);
// create the group by operator
currentOperator = new edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator(
keyParameters, new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>(), subPlans);
currentOperator.getInputs().add(AlgebricksParentOperatorRef);
nestedTupleSource.getDataSourceReference().setValue(currentOperator);
List<LogicalVariable> outputVariables = new ArrayList<LogicalVariable>();
outputVariables.addAll(keyVariables);
outputVariables.addAll(aggVariables);
t.rewriteOperatorOutputSchema(outputVariables, operator);
if (gbyKeyNotRedKey) {
currentOperator.getAnnotations().put(HiveOperatorAnnotations.LOCAL_GROUP_BY, Boolean.TRUE);
}
HiveConf conf = ConfUtil.getHiveConf();
Boolean extGby = conf.getBoolean("hive.algebricks.groupby.external", false);
if (extGby && isSerializable(aggregators)) {
currentOperator.getAnnotations().put(OperatorAnnotations.USE_EXTERNAL_GROUP_BY, Boolean.TRUE);
}
return new MutableObject<ILogicalOperator>(currentOperator);
} else {
isDistinct = false;
// rewrite parameter expressions for all aggregators
int i = 0;
for (AggregationDesc aggregator : aggregators) {
for (ExprNodeDesc parameter : aggregator.getParameters()) {
t.rewriteExpression(parameter);
}
Mutable<ILogicalExpression> agg = t.translateAggregation(aggregator);
AggregateFunctionCallExpression originalAgg = (AggregateFunctionCallExpression) AlgebricksAggs.get(i)
.getValue();
originalAgg.setStepOneAggregate(localAggs.get(i));
AggregateFunctionCallExpression currentAgg = (AggregateFunctionCallExpression) agg.getValue();
if (currentAgg.getFunctionInfo() != null) {
originalAgg.setTwoStep(true);
originalAgg.setStepTwoAggregate(currentAgg.getFunctionInfo());
}
i++;
}
return null;
}
}
@Override
public Mutable<ILogicalOperator> visit(ReduceSinkOperator operator,
Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) {
Operator downStream = (Operator) operator.getChildOperators().get(0);
if (!(downStream instanceof GroupByOperator)) {
return null;
}
ReduceSinkDesc desc = (ReduceSinkDesc) operator.getConf();
List<ExprNodeDesc> keys = desc.getKeyCols();
List<ExprNodeDesc> values = desc.getValueCols();
// insert assign for keys
ArrayList<LogicalVariable> keyVariables = new ArrayList<LogicalVariable>();
t.getAssignOperator(AlgebricksParentOperatorRef, keys, keyVariables);
// insert assign for values
ArrayList<LogicalVariable> valueVariables = new ArrayList<LogicalVariable>();
t.getAssignOperator(AlgebricksParentOperatorRef, values, valueVariables);
ArrayList<LogicalVariable> columns = new ArrayList<LogicalVariable>();
columns.addAll(keyVariables);
columns.addAll(valueVariables);
t.rewriteOperatorOutputSchema(columns, operator);
return null;
}
private boolean isSerializable(List<AggregationDesc> descs) throws AlgebricksException {
try {
for (AggregationDesc desc : descs) {
GenericUDAFEvaluator udaf = desc.getGenericUDAFEvaluator();
AggregationBuffer buf = udaf.getNewAggregationBuffer();
Class<?> bufferClass = buf.getClass();
Field[] fields = bufferClass.getDeclaredFields();
for (Field field : fields) {
field.setAccessible(true);
String type = field.getType().toString();
if (!(type.equals("int") || type.equals("long") || type.equals("float") || type.equals("double") || type
.equals("boolean"))) {
return false;
}
}
}
return true;
} catch (Exception e) {
throw new AlgebricksException(e);
}
}
}