package edu.uci.ics.hivesterix.runtime.operator.filescan;

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.Properties;
import java.util.UUID;

import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.mapred.JobConf;

import edu.uci.ics.hivesterix.serde.lazy.LazySerDe;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
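
/**
 * An {@link ITupleParserFactory} that creates {@link HiveTupleParser}s for
 * scanning one Hive partition. {@link JobConf} is not Java-serializable, so
 * the configuration is snapshotted as XML text into {@code confContent} when
 * the factory is constructed and re-materialized into a fresh JobConf on the
 * task node before a parser is created.
 */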
@SuppressWarnings("deprecation")
public class HiveTupleParserFactory implements ITupleParserFactory {

    private static final long serialVersionUID = 1L;

    private int[] outputColumns;

    private String outputSerDeClass = LazySerDe.class.getName();

    private String inputSerDeClass;

    private transient JobConf conf;

    private Properties tbl;

    private String confContent;

    private String inputFormatClass;
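
    /**
     * @param desc
     *            the partition descriptor; its table properties supply the
     *            input format and SerDe class names
     * @param conf
     *            the Hadoop job configuration to snapshot
     * @param outputColumns
     *            indexes of the output columns projected by this scan
     */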
    public HiveTupleParserFactory(PartitionDesc desc, JobConf conf, int[] outputColumns) {
        this.conf = conf;
        tbl = desc.getProperties();
        // Properties.getProperty(String) already returns a String, so no cast is needed.
        inputFormatClass = tbl.getProperty("file.inputformat");
        inputSerDeClass = tbl.getProperty("serialization.lib");
        this.outputColumns = outputColumns;
        // Snapshot the non-serializable JobConf as XML so it survives shipping
        // this factory to remote task nodes.
        writeConfContent();
    }
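
    /**
     * Restores the transient {@link JobConf} from the XML snapshot, then
     * builds a {@link HiveTupleParser}. Returns {@code null} if parser
     * construction fails; the failure is only logged to stderr.
     */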
    @Override
    public ITupleParser createTupleParser(IHyracksTaskContext ctx) {
        readConfContent();
        try {
            return new HiveTupleParser(inputFormatClass, inputSerDeClass, outputSerDeClass, tbl, conf, ctx,
                    outputColumns);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
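
    /**
     * Serializes {@code conf} to a temporary XML file and caches the file's
     * text in {@code confContent}.
     */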
    private void writeConfContent() {
        File dir = new File("hadoop-conf-tmp");
        if (!dir.exists()) {
            dir.mkdir();
        }
        String fileName = "hadoop-conf-tmp/" + UUID.randomUUID() + System.currentTimeMillis() + ".xml";
        try {
            DataOutputStream out = new DataOutputStream(new FileOutputStream(new File(fileName)));
            try {
                conf.writeXml(out);
            } finally {
                out.close();
            }
            // DataInputStream.readLine() is deprecated and charset-unsafe; read the
            // XML back through a BufferedReader instead.
            BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(fileName)));
            try {
                StringBuilder buffer = new StringBuilder();
                String line;
                while ((line = in.readLine()) != null) {
                    buffer.append(line).append('\n');
                }
                confContent = buffer.toString();
            } finally {
                in.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
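
    /**
     * Writes the cached XML snapshot back to a temporary file and loads it
     * into a new {@link JobConf}, repopulating the transient {@code conf}
     * field on the node where the parser will run.
     */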
    private void readConfContent() {
        File dir = new File("hadoop-conf-tmp");
        if (!dir.exists()) {
            dir.mkdir();
        }
        String fileName = "hadoop-conf-tmp/" + UUID.randomUUID() + System.currentTimeMillis() + ".xml";
        try {
            PrintWriter out = new PrintWriter(new OutputStreamWriter(new FileOutputStream(new File(fileName))));
            try {
                out.write(confContent);
            } finally {
                out.close();
            }
            conf = new JobConf(fileName);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}