/*
* Copyright 2009-2013 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hivesterix.runtime.operator.filewrite;

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.UUID;

import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.mapred.JobConf;

import edu.uci.ics.hivesterix.runtime.jobgen.Schema;

import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;

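/**
 * Runtime factory for Hive's file-sink (write) operator. Because the Hadoop
 * {@link JobConf} is not Java-serializable, its XML content is captured as a
 * string when the factory is constructed, and the JobConf is rebuilt from
 * that string on whichever node the push runtime is eventually created.
 */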
@SuppressWarnings("deprecation")
public class HivePushRuntimeFactory implements IPushRuntimeFactory {

    private static final long serialVersionUID = 1L;

    private final RecordDescriptor inputRecordDesc;
    private transient JobConf conf;
    private final FileSinkDesc fileSink;
    private final RowSchema outSchema;
    private final Schema schema;

    /**
     * The XML content of the Hadoop configuration, captured so that the
     * transient JobConf can be rebuilt after deserialization.
     */
    private String confContent;

    public HivePushRuntimeFactory(RecordDescriptor inputRecordDesc, JobConf conf, FileSinkOperator fsp, Schema sch) {
        this.inputRecordDesc = inputRecordDesc;
        this.conf = conf;
        this.fileSink = fsp.getConf();
        this.outSchema = fsp.getSchema();
        this.schema = sch;
        writeConfContent();
    }

    @Override
    public String toString() {
        return "file write";
    }

    @Override
    public IPushRuntime createPushRuntime(IHyracksTaskContext context) throws AlgebricksException {
        if (conf == null) {
            // conf is transient: rebuild it from the serialized XML content
            // after this factory has been shipped to a worker node.
            readConfContent();
        }
        return new HiveFileWritePushRuntime(context, inputRecordDesc, conf, fileSink, outSchema, schema);
    }

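    /**
     * Rebuilds the transient JobConf from the captured XML by writing it to a
     * temporary file and loading that file as a configuration resource.
     */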
    private void readConfContent() throws AlgebricksException {
        File dir = new File("hadoop-conf-tmp");
        if (!dir.exists()) {
            dir.mkdir();
        }
        String fileName = "hadoop-conf-tmp/" + UUID.randomUUID() + System.currentTimeMillis() + ".xml";
        try {
            PrintWriter out = new PrintWriter(new OutputStreamWriter(new FileOutputStream(new File(fileName))));
            try {
                out.write(confContent);
            } finally {
                out.close();
            }
            conf = new JobConf(fileName);
        } catch (Exception e) {
            // Fail fast: silently swallowing the exception would leave conf
            // null and cause an opaque NullPointerException downstream.
            throw new AlgebricksException(e);
        }
    }

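    /**
     * Serializes the JobConf to XML and stores it in {@link #confContent},
     * round-tripping through a temporary file under hadoop-conf-tmp/.
     */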
    private void writeConfContent() {
        File dir = new File("hadoop-conf-tmp");
        if (!dir.exists()) {
            dir.mkdir();
        }
        String fileName = "hadoop-conf-tmp/" + UUID.randomUUID() + System.currentTimeMillis() + ".xml";
        try {
            DataOutputStream out = new DataOutputStream(new FileOutputStream(new File(fileName)));
            try {
                conf.writeXml(out);
            } finally {
                out.close();
            }
            // DataInputStream.readLine() is deprecated and charset-unsafe;
            // read the XML back with a BufferedReader instead.
            BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(fileName)));
            try {
                StringBuilder buffer = new StringBuilder();
                String line;
                while ((line = in.readLine()) != null) {
                    buffer.append(line).append('\n');
                }
                confContent = buffer.toString();
            } finally {
                in.close();
            }
            // The temporary file is only needed for the round trip; clean it up.
            new File(fileName).delete();
        } catch (Exception e) {
            // Fail fast: without confContent the factory cannot be rebuilt
            // after deserialization.
            throw new IllegalStateException("could not serialize the Hadoop configuration", e);
        }
    }
}