blob: ad02239279e450ac299ed5ace12131cdfc58f202 [file] [log] [blame]
package edu.uci.ics.hivesterix.runtime.evaluator;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.BytesWritable;
import edu.uci.ics.hivesterix.serde.lazy.LazyFactory;
import edu.uci.ics.hivesterix.serde.lazy.LazyObject;
import edu.uci.ics.hivesterix.serde.lazy.LazySerDe;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public abstract class AbstractExpressionEvaluator implements ICopyEvaluator {
private List<ICopyEvaluator> children;
private ExprNodeEvaluator evaluator;
private IDataOutputProvider out;
private ObjectInspector inspector;
/**
* output object inspector
*/
private ObjectInspector outputInspector;
/**
* cached row object
*/
private LazyObject<? extends ObjectInspector> cachedRowObject;
/**
* serializer/derialzer for lazy object
*/
private SerDe lazySer;
/**
* data output
*/
DataOutput dataOutput;
public AbstractExpressionEvaluator(ExprNodeEvaluator hiveEvaluator, ObjectInspector oi, IDataOutputProvider output)
throws AlgebricksException {
evaluator = hiveEvaluator;
out = output;
inspector = oi;
dataOutput = out.getDataOutput();
}
protected ObjectInspector getRowInspector() {
return null;
}
protected IDataOutputProvider getIDataOutputProvider() {
return out;
}
protected ExprNodeEvaluator getHiveEvaluator() {
return evaluator;
}
public ObjectInspector getObjectInspector() {
return inspector;
}
@Override
public void evaluate(IFrameTupleReference r) throws AlgebricksException {
// initialize hive evaluator
try {
if (outputInspector == null)
outputInspector = evaluator.initialize(inspector);
} catch (Exception e) {
e.printStackTrace();
throw new AlgebricksException(e.getMessage());
}
readIntoCache(r);
try {
Object result = evaluator.evaluate(cachedRowObject);
// if (result == null) {
// result = evaluator.evaluate(cachedRowObject);
//
// // check if result is null
//
// String errorMsg = "serialize null object in \n output " +
// outputInspector.toString() + " \n input "
// + inspector.toString() + "\n ";
// errorMsg += "";
// List<Object> columns = ((StructObjectInspector)
// inspector).getStructFieldsDataAsList(cachedRowObject);
// for (Object column : columns) {
// errorMsg += column.toString() + " ";
// }
// errorMsg += "\n";
// Log.info(errorMsg);
// System.out.println(errorMsg);
// // result = new BooleanWritable(true);
// throw new IllegalStateException(errorMsg);
// }
serializeResult(result);
} catch (HiveException e) {
e.printStackTrace();
throw new AlgebricksException(e.getMessage());
} catch (IOException e) {
e.printStackTrace();
throw new AlgebricksException(e.getMessage());
}
}
/**
* serialize the result
*
* @param result
* the evaluation result
* @throws IOException
* @throws AlgebricksException
*/
private void serializeResult(Object result) throws IOException, AlgebricksException {
if (lazySer == null)
lazySer = new LazySerDe();
try {
BytesWritable outputWritable = (BytesWritable) lazySer.serialize(result, outputInspector);
dataOutput.write(outputWritable.getBytes(), 0, outputWritable.getLength());
} catch (SerDeException e) {
throw new AlgebricksException(e);
}
}
/**
* bind the tuple reference to the cached row object
*
* @param r
*/
private void readIntoCache(IFrameTupleReference r) {
if (cachedRowObject == null)
cachedRowObject = (LazyObject<? extends ObjectInspector>) LazyFactory.createLazyObject(inspector);
cachedRowObject.init(r);
}
/**
* set a list of children of this evaluator
*
* @param children
*/
public void setChildren(List<ICopyEvaluator> children) {
this.children = children;
}
public void addChild(ICopyEvaluator child) {
if (children == null)
children = new ArrayList<ICopyEvaluator>();
children.add(child);
}
}