blob: e50037628af4f0c01ed4b4fa1f4db6e9597e8ae4 [file] [log] [blame]
package edu.uci.ics.hivesterix.runtime.evaluator;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCount;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.BytesWritable;
import edu.uci.ics.hivesterix.serde.lazy.LazyObject;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class AggregationFunctionEvaluator implements ICopyAggregateFunction {
    /**
     * the Hive aggregation mode; selects iterate vs. merge in {@link #step} and
     * terminate vs. terminatePartial in {@link #finish}
     */
    private final GenericUDAFEvaluator.Mode mode;
    /**
     * evaluators producing the aggregation parameters from the cached row
     */
    private final ExprNodeEvaluator[] evaluators;
    /**
     * udaf evaluator used for iterate/merge/terminatePartial
     */
    private final GenericUDAFEvaluator udafPartial;
    /**
     * udaf evaluator used for terminate in COMPLETE/FINAL mode
     */
    private final GenericUDAFEvaluator udafComplete;
    /**
     * reusable array holding the evaluated parameter values of the current row
     */
    private final Object[] cachedParameters;
    /**
     * reusable lazy row object that is re-bound to each incoming tuple
     */
    private final LazyObject<? extends ObjectInspector> cachedRowObject;
    /**
     * the output channel the serialized result is written to
     */
    private final DataOutput out;
    /**
     * aggregation buffer; (re)created by {@link #init()}
     */
    private AggregationBuffer aggBuffer;
    /**
     * we only use lazy serde to do serialization
     */
    private final SerDe lazySer;
    /**
     * the output object inspector for the final (terminate) result
     */
    private final ObjectInspector outputInspector;
    /**
     * the output object inspector for the partial (terminatePartial) result
     */
    private final ObjectInspector outputInspectorPartial;
    /**
     * parameter inspectors; index 0 is used when merging count partials
     */
    private final ObjectInspector[] parameterInspectors;

    /**
     * Creates an evaluator that works on caller-supplied, reusable objects so
     * that per-tuple aggregation creates as few objects as possible.
     *
     * @param inputs
     *            currently unused by this class
     * @param inputTypes
     *            currently unused by this class
     * @param genericUDAFName
     *            currently unused by this class
     * @param aggMode
     *            the aggregation mode
     * @param distinct
     *            currently unused by this class
     * @param oi
     *            currently unused by this class
     * @param output
     *            channel the serialized result is written to
     * @param evals
     *            parameter evaluators; must have at least
     *            {@code parameterCache.length} entries
     * @param pInspectors
     *            inspectors for the parameters
     * @param parameterCache
     *            reusable array for evaluated parameter values
     * @param serde
     *            serializer for results; its output is cast to BytesWritable
     * @param row
     *            reusable lazy row bound to each input tuple
     * @param udafunctionPartial
     *            evaluator used for iterate/merge/terminatePartial
     * @param udafunctionComplete
     *            evaluator used for terminate in COMPLETE/FINAL mode
     * @param outputOi
     *            inspector for the final result
     * @param outputOiPartial
     *            inspector for the partial result
     */
    public AggregationFunctionEvaluator(List<ExprNodeDesc> inputs, List<TypeInfo> inputTypes, String genericUDAFName,
            GenericUDAFEvaluator.Mode aggMode, boolean distinct, ObjectInspector oi, DataOutput output,
            ExprNodeEvaluator[] evals, ObjectInspector[] pInspectors, Object[] parameterCache, SerDe serde,
            LazyObject<? extends ObjectInspector> row, GenericUDAFEvaluator udafunctionPartial,
            GenericUDAFEvaluator udafunctionComplete, ObjectInspector outputOi, ObjectInspector outputOiPartial) {
        // NOTE(review): the original author marked output/mode/inspectors as
        // shared across threads and the remaining objects as thread local;
        // this class itself performs no synchronization.
        this.out = output;
        this.mode = aggMode;
        this.parameterInspectors = pInspectors;
        this.evaluators = evals;
        this.cachedParameters = parameterCache;
        this.cachedRowObject = row;
        this.lazySer = serde;
        this.udafPartial = udafunctionPartial;
        this.udafComplete = udafunctionComplete;
        this.outputInspector = outputOi;
        this.outputInspectorPartial = outputOiPartial;
    }

    /**
     * Starts a new group by allocating a fresh aggregation buffer.
     */
    @Override
    public void init() throws AlgebricksException {
        try {
            aggBuffer = udafPartial.getNewAggregationBuffer();
        } catch (HiveException e) {
            throw new AlgebricksException(e);
        }
    }

    /**
     * Feeds one input tuple into the aggregation.
     */
    @Override
    public void step(IFrameTupleReference tuple) throws AlgebricksException {
        readIntoCache(tuple);
        processRow();
    }

    /**
     * Evaluates all parameters against the cached row and accumulates them.
     */
    private void processRow() throws AlgebricksException {
        try {
            for (int i = 0; i < cachedParameters.length; i++) {
                cachedParameters[i] = evaluators[i].evaluate(cachedRowObject);
            }
            processAggregate();
        } catch (HiveException e) {
            throw new AlgebricksException(e);
        }
    }

    /**
     * Accumulates the cached parameters into the aggregation buffer: raw rows
     * are iterated (PARTIAL1/COMPLETE), partial results are merged
     * (PARTIAL2/FINAL).
     */
    private void processAggregate() throws HiveException {
        switch (mode) {
            case PARTIAL1:
            case COMPLETE:
                udafPartial.iterate(aggBuffer, cachedParameters);
                break;
            case PARTIAL2:
            case FINAL:
                if (udafPartial instanceof GenericUDAFCount.GenericUDAFCountEvaluator) {
                    // count's merge is given the primitive writable form of the
                    // partial rather than the lazy object
                    Object parameter = ((PrimitiveObjectInspector) parameterInspectors[0])
                            .getPrimitiveWritableObject(cachedParameters[0]);
                    udafPartial.merge(aggBuffer, parameter);
                } else {
                    udafPartial.merge(aggBuffer, cachedParameters[0]);
                }
                break;
            default:
                break;
        }
    }

    /**
     * Serializes the result with the lazy serde and writes it to the output.
     *
     * @param result
     *            the evaluation result
     * @param oi
     *            the object inspector describing {@code result}
     * @throws IOException
     *             if writing to the output fails
     * @throws AlgebricksException
     *             wrapping a {@link SerDeException}
     */
    private void serializeResult(Object result, ObjectInspector oi) throws IOException, AlgebricksException {
        try {
            BytesWritable outputWritable = (BytesWritable) lazySer.serialize(result, oi);
            out.write(outputWritable.getBytes(), 0, outputWritable.getLength());
        } catch (SerDeException e) {
            throw new AlgebricksException(e);
        }
    }

    /**
     * Binds the tuple reference to the cached row object.
     *
     * @param r
     *            the incoming tuple
     */
    private void readIntoCache(IFrameTupleReference r) {
        cachedRowObject.init(r);
    }

    /**
     * Writes the group's result: the final value in COMPLETE/FINAL mode,
     * otherwise the partial aggregation state.
     */
    @Override
    public void finish() throws AlgebricksException {
        try {
            if (mode == GenericUDAFEvaluator.Mode.COMPLETE || mode == GenericUDAFEvaluator.Mode.FINAL) {
                // only compute the value that is actually emitted; the partial
                // result was previously computed here and then discarded
                Object result = udafComplete.terminate(aggBuffer);
                serializeResult(result, outputInspector);
            } else {
                Object result = udafPartial.terminatePartial(aggBuffer);
                serializeResult(result, outputInspectorPartial);
            }
        } catch (HiveException e) {
            throw new AlgebricksException(e);
        } catch (IOException e) {
            throw new AlgebricksException(e);
        }
    }

    /**
     * Writes the group's partial aggregation state regardless of mode.
     */
    @Override
    public void finishPartial() throws AlgebricksException {
        try {
            Object result = udafPartial.terminatePartial(aggBuffer);
            serializeResult(result, outputInspectorPartial);
        } catch (HiveException e) {
            throw new AlgebricksException(e);
        } catch (IOException e) {
            throw new AlgebricksException(e);
        }
    }
}