package edu.uci.ics.hivesterix.runtime.operator.filescan;

import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;
import edu.uci.ics.hivesterix.serde.parser.IHiveParser;
import edu.uci.ics.hivesterix.serde.parser.TextToBinaryTupleParser;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;

@SuppressWarnings({ "rawtypes", "deprecation", "unchecked" })
public class HiveTupleParser extends AbstractHiveTupleParser {
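/**
 * maps each input column to its position in the output tuple; a negative
 * value means the column is not projected
 */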
private int[] outputColumnsOffset;
/**
* class of input format
*/
private InputFormat inputFormat;
/**
* serialization/deserialization object
*/
private SerDe serDe;
/**
* the input row object inspector
*/
private ObjectInspector objectInspector;
/**
* the Hadoop job configuration
*/
private JobConf job;
/**
* Hyracks task context used to control resource allocation
*/
private final IHyracksTaskContext ctx;
/**
* lazy SerDe used to format the data flowing between operators
*/
private final SerDe outputSerDe;
/**
* the parser that converts Hive text data into Hyracks binary data
*/
private IHiveParser parser = null;
/**
* Constructs a tuple parser for an arbitrary Hive input format.
*
* @param inputFormatClass the Hadoop InputFormat class name
* @param serDeClass the input SerDe class name
* @param outputSerDeClass the output (lazy) SerDe class name
* @param tbl the Hive table properties
* @param conf the Hadoop job configuration
* @param ctx the Hyracks task context
* @param outputColumnsOffset offsets of the projected output columns
* @throws AlgebricksException if reflection or SerDe initialization fails
*/
public HiveTupleParser(String inputFormatClass, String serDeClass,
String outputSerDeClass, Properties tbl, JobConf conf,
final IHyracksTaskContext ctx, int[] outputColumnsOffset)
throws AlgebricksException {
try {
conf.setClassLoader(this.getClass().getClassLoader());
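// instantiate the input format via reflection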
inputFormat = (InputFormat) ReflectionUtils.newInstance(
Class.forName(inputFormatClass), conf);
job = conf;
// initialize the input serde
serDe = (SerDe) ReflectionUtils.newInstance(
Class.forName(serDeClass), job);
serDe.initialize(job, tbl);
// initialize the output serde
outputSerDe = (SerDe) ReflectionUtils.newInstance(
Class.forName(outputSerDeClass), job);
outputSerDe.initialize(job, tbl);
// object inspector of the row
objectInspector = serDe.getObjectInspector();
// hyracks context
this.ctx = ctx;
this.outputColumnsOffset = outputColumnsOffset;
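// if every field of the row is a primitive, a light-weight text-to-binary
// parser can be used instead of the full SerDe on the read path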
if (objectInspector instanceof LazySimpleStructObjectInspector) {
LazySimpleStructObjectInspector rowInspector = (LazySimpleStructObjectInspector) objectInspector;
List<? extends StructField> fieldRefs = rowInspector
.getAllStructFieldRefs();
boolean lightWeightParsable = true;
for (StructField fieldRef : fieldRefs) {
Category category = fieldRef.getFieldObjectInspector()
.getCategory();
if (category != Category.PRIMITIVE) {
lightWeightParsable = false;
break;
}
}
if (lightWeightParsable)
parser = new TextToBinaryTupleParser(
this.outputColumnsOffset, this.objectInspector);
}
} catch (Exception e) {
throw new AlgebricksException(e);
}
}
/**
* Parse an input HDFS file split; the result is sent to the writer
* one frame at a time.
*
* @param split
* the HDFS file split
* @param writer
* the writer
* @throws HyracksDataException
* if something goes wrong during serialization/deserialization
*/
@Override
public void parse(FileSplit split, IFrameWriter writer)
throws HyracksDataException {
try {
StructObjectInspector structInspector = (StructObjectInspector) objectInspector;
// create the reader, key, and value
RecordReader reader = inputFormat.getRecordReader(split, job,
Reporter.NULL);
Object key = reader.createKey();
Object value = reader.createValue();
// allocate a new frame
ByteBuffer frame = ctx.allocateFrame();
FrameTupleAppender appender = new FrameTupleAppender(
ctx.getFrameSize());
appender.reset(frame, true);
List<? extends StructField> fieldRefs = structInspector
.getAllStructFieldRefs();
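// count the projected output columns (a non-negative offset marks a projected column)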
int size = 0;
for (int i = 0; i < outputColumnsOffset.length; i++)
if (outputColumnsOffset[i] >= 0)
size++;
ArrayTupleBuilder tb = new ArrayTupleBuilder(size);
DataOutput dos = tb.getDataOutput();
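// map each projected column position to its struct field reference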
StructField[] outputFieldRefs = new StructField[size];
Object[] outputFields = new Object[size];
for (int i = 0; i < outputColumnsOffset.length; i++)
if (outputColumnsOffset[i] >= 0)
outputFieldRefs[outputColumnsOffset[i]] = fieldRefs.get(i);
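// scan the split record by record; use the fast text parser when available,
// otherwise deserialize the row and re-serialize only the projected fields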
while (reader.next(key, value)) {
// reuse the tuple builder
tb.reset();
if (parser != null) {
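// fast path: parse the raw text line directly into binary fields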
Text text = (Text) value;
parser.parse(text.getBytes(), 0, text.getLength(), tb);
} else {
Object row = serDe.deserialize((Writable) value);
// write fields to the tuple builder one by one
int i = 0;
for (StructField fieldRef : fieldRefs) {
if (outputColumnsOffset[i] >= 0)
outputFields[outputColumnsOffset[i]] = structInspector
.getStructFieldData(row, fieldRef);
i++;
}
i = 0;
for (Object field : outputFields) {
BytesWritable fieldWritable = (BytesWritable) outputSerDe
.serialize(field, outputFieldRefs[i]
.getFieldObjectInspector());
dos.write(fieldWritable.getBytes(), 0,
fieldWritable.getSize());
tb.addFieldEndOffset();
i++;
}
}
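// append the tuple to the current frame; if it does not fit, flush the
// frame to the writer and retry with an empty frame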
if (!appender.append(tb.getFieldEndOffsets(),
tb.getByteArray(), 0, tb.getSize())) {
if (appender.getTupleCount() <= 0)
throw new IllegalStateException(
"zero tuples in a frame!");
FrameUtils.flushFrame(frame, writer);
appender.reset(frame, true);
if (!appender.append(tb.getFieldEndOffsets(),
tb.getByteArray(), 0, tb.getSize())) {
throw new IllegalStateException("tuple is too large to fit into an empty frame");
}
}
}
reader.close();
System.gc();
// flush the last frame
if (appender.getTupleCount() > 0) {
FrameUtils.flushFrame(frame, writer);
}
} catch (IOException e) {
throw new HyracksDataException(e);
} catch (SerDeException e) {
throw new HyracksDataException(e);
}
}
}