package edu.uci.ics.hivesterix.runtime.evaluator; | |
import java.io.DataOutput; | |
import java.io.IOException; | |
import java.util.List; | |
import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; | |
import org.apache.hadoop.hive.ql.metadata.HiveException; | |
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; | |
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCount; | |
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; | |
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; | |
import org.apache.hadoop.hive.serde2.SerDe; | |
import org.apache.hadoop.hive.serde2.SerDeException; | |
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; | |
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; | |
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; | |
import org.apache.hadoop.io.BytesWritable; | |
import edu.uci.ics.hivesterix.serde.lazy.LazyObject; | |
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; | |
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction; | |
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference; | |
public class AggregationFunctionEvaluator implements ICopyAggregateFunction { | |
/** | |
* the mode of aggregation function | |
*/ | |
private GenericUDAFEvaluator.Mode mode; | |
/** | |
* an array of evaluators | |
*/ | |
private ExprNodeEvaluator[] evaluators; | |
/** | |
* udaf evaluator partial | |
*/ | |
private GenericUDAFEvaluator udafPartial; | |
/** | |
* udaf evaluator complete | |
*/ | |
private GenericUDAFEvaluator udafComplete; | |
/** | |
* cached parameter objects | |
*/ | |
private Object[] cachedParameters; | |
/** | |
* cached row objects | |
*/ | |
private LazyObject<? extends ObjectInspector> cachedRowObject; | |
/** | |
* the output channel | |
*/ | |
private DataOutput out; | |
/** | |
* aggregation buffer | |
*/ | |
private AggregationBuffer aggBuffer; | |
/** | |
* we only use lazy serde to do serialization | |
*/ | |
private SerDe lazySer; | |
/** | |
* the output object inspector for this aggregation function | |
*/ | |
private ObjectInspector outputInspector; | |
/** | |
* the output object inspector for this aggregation function | |
*/ | |
private ObjectInspector outputInspectorPartial; | |
/** | |
* parameter inspectors | |
*/ | |
private ObjectInspector[] parameterInspectors; | |
/** | |
* output make sure the aggregation functio has least object creation | |
* | |
* @param desc | |
* @param oi | |
* @param output | |
*/ | |
public AggregationFunctionEvaluator(List<ExprNodeDesc> inputs, List<TypeInfo> inputTypes, String genericUDAFName, | |
GenericUDAFEvaluator.Mode aggMode, boolean distinct, ObjectInspector oi, DataOutput output, | |
ExprNodeEvaluator[] evals, ObjectInspector[] pInspectors, Object[] parameterCache, SerDe serde, | |
LazyObject<? extends ObjectInspector> row, GenericUDAFEvaluator udafunctionPartial, | |
GenericUDAFEvaluator udafunctionComplete, ObjectInspector outputOi, ObjectInspector outputOiPartial) { | |
// shared object across threads | |
this.out = output; | |
this.mode = aggMode; | |
this.parameterInspectors = pInspectors; | |
// thread local objects | |
this.evaluators = evals; | |
this.cachedParameters = parameterCache; | |
this.cachedRowObject = row; | |
this.lazySer = serde; | |
this.udafPartial = udafunctionPartial; | |
this.udafComplete = udafunctionComplete; | |
this.outputInspector = outputOi; | |
this.outputInspectorPartial = outputOiPartial; | |
} | |
@Override | |
public void init() throws AlgebricksException { | |
try { | |
aggBuffer = udafPartial.getNewAggregationBuffer(); | |
} catch (HiveException e) { | |
throw new AlgebricksException(e); | |
} | |
} | |
@Override | |
public void step(IFrameTupleReference tuple) throws AlgebricksException { | |
readIntoCache(tuple); | |
processRow(); | |
} | |
private void processRow() throws AlgebricksException { | |
try { | |
// get values by evaluating them | |
for (int i = 0; i < cachedParameters.length; i++) { | |
cachedParameters[i] = evaluators[i].evaluate(cachedRowObject); | |
} | |
processAggregate(); | |
} catch (HiveException e) { | |
throw new AlgebricksException(e); | |
} | |
} | |
private void processAggregate() throws HiveException { | |
/** | |
* accumulate the aggregation function | |
*/ | |
switch (mode) { | |
case PARTIAL1: | |
case COMPLETE: | |
udafPartial.iterate(aggBuffer, cachedParameters); | |
break; | |
case PARTIAL2: | |
case FINAL: | |
if (udafPartial instanceof GenericUDAFCount.GenericUDAFCountEvaluator) { | |
Object parameter = ((PrimitiveObjectInspector) parameterInspectors[0]) | |
.getPrimitiveWritableObject(cachedParameters[0]); | |
udafPartial.merge(aggBuffer, parameter); | |
} else | |
udafPartial.merge(aggBuffer, cachedParameters[0]); | |
break; | |
default: | |
break; | |
} | |
} | |
/** | |
* serialize the result | |
* | |
* @param result | |
* the evaluation result | |
* @throws IOException | |
* @throws AlgebricksException | |
*/ | |
private void serializeResult(Object result, ObjectInspector oi) throws IOException, AlgebricksException { | |
try { | |
BytesWritable outputWritable = (BytesWritable) lazySer.serialize(result, oi); | |
out.write(outputWritable.getBytes(), 0, outputWritable.getLength()); | |
} catch (SerDeException e) { | |
throw new AlgebricksException(e); | |
} | |
} | |
/** | |
* bind the tuple reference to the cached row object | |
* | |
* @param r | |
*/ | |
private void readIntoCache(IFrameTupleReference r) { | |
cachedRowObject.init(r); | |
} | |
@Override | |
public void finish() throws AlgebricksException { | |
// aggregator | |
try { | |
Object result = null; | |
result = udafPartial.terminatePartial(aggBuffer); | |
if (mode == GenericUDAFEvaluator.Mode.COMPLETE || mode == GenericUDAFEvaluator.Mode.FINAL) { | |
result = udafComplete.terminate(aggBuffer); | |
serializeResult(result, outputInspector); | |
} else { | |
serializeResult(result, outputInspectorPartial); | |
} | |
} catch (HiveException e) { | |
throw new AlgebricksException(e); | |
} catch (IOException e) { | |
throw new AlgebricksException(e); | |
} | |
} | |
@Override | |
public void finishPartial() throws AlgebricksException { | |
// aggregator. | |
try { | |
Object result = null; | |
// get aggregations | |
result = udafPartial.terminatePartial(aggBuffer); | |
serializeResult(result, outputInspectorPartial); | |
} catch (HiveException e) { | |
throw new AlgebricksException(e); | |
} catch (IOException e) { | |
throw new AlgebricksException(e); | |
} | |
} | |
} |