blob: 2e7866341482448e72e37b2a27fc8c8f4d6f8ee7 [file] [log] [blame]
package edu.uci.ics.hivesterix.runtime.evaluator;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.UDTFDesc;
import org.apache.hadoop.hive.ql.udf.generic.Collector;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.BytesWritable;
import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
import edu.uci.ics.hivesterix.serde.lazy.LazyColumnar;
import edu.uci.ics.hivesterix.serde.lazy.LazyFactory;
import edu.uci.ics.hivesterix.serde.lazy.LazyObject;
import edu.uci.ics.hivesterix.serde.lazy.LazySerDe;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyUnnestingFunction;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class UDTFFunctionEvaluator implements ICopyUnnestingFunction, Collector {
/**
* udtf function
*/
private UDTFDesc func;
/**
* input object inspector
*/
private ObjectInspector inputInspector;
/**
* output object inspector
*/
private ObjectInspector outputInspector;
/**
* object inspector for udtf
*/
private ObjectInspector[] udtfInputOIs;
/**
* generic udtf
*/
private GenericUDTF udtf;
/**
* data output
*/
private DataOutput out;
/**
* the input row object
*/
private LazyColumnar cachedRowObject;
/**
* cached row object (input)
*/
private Object[] cachedInputObjects;
/**
* serialization/deserialization
*/
private SerDe lazySerDe;
/**
* columns feed into UDTF
*/
private int[] columns;
public UDTFFunctionEvaluator(UDTFDesc desc, Schema schema, int[] cols, DataOutput output) {
this.func = desc;
this.inputInspector = schema.toObjectInspector();
udtf = func.getGenericUDTF();
out = output;
columns = cols;
}
@Override
public void init(IFrameTupleReference tuple) throws AlgebricksException {
cachedInputObjects = new LazyObject[columns.length];
try {
cachedRowObject = (LazyColumnar) LazyFactory.createLazyObject(inputInspector);
outputInspector = udtf.initialize(udtfInputOIs);
} catch (HiveException e) {
throw new AlgebricksException(e);
}
udtf.setCollector(this);
lazySerDe = new LazySerDe();
readIntoCache(tuple);
}
@Override
public boolean step() throws AlgebricksException {
try {
udtf.process(cachedInputObjects);
return true;
} catch (HiveException e) {
throw new AlgebricksException(e);
}
}
/**
* bind the tuple reference to the cached row object
*
* @param r
*/
private void readIntoCache(IFrameTupleReference r) {
cachedRowObject.init(r);
for (int i = 0; i < cachedInputObjects.length; i++) {
cachedInputObjects[i] = cachedRowObject.getField(columns[i]);
}
}
/**
* serialize the result
*
* @param result
* the evaluation result
* @throws IOException
* @throws AlgebricksException
*/
private void serializeResult(Object result) throws SerDeException, IOException {
BytesWritable outputWritable = (BytesWritable) lazySerDe.serialize(result, outputInspector);
out.write(outputWritable.getBytes(), 0, outputWritable.getLength());
}
@Override
public void collect(Object input) throws HiveException {
try {
serializeResult(input);
} catch (IOException e) {
throw new HiveException(e);
} catch (SerDeException e) {
throw new HiveException(e);
}
}
}