package edu.uci.ics.hivesterix.runtime.jobgen;

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;

import edu.uci.ics.hivesterix.common.config.ConfUtil;
import edu.uci.ics.hivesterix.runtime.operator.filescan.HiveKeyValueParserFactory;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.topology.ClusterTopology;
import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
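
/**
 * Generates the Hyracks scan runtime for a Hive table partition: an
 * {@link HDFSReadOperatorDescriptor} over the partition's input splits, paired with the
 * partition constraint that places the scan on the cluster's node controllers.
 *
 * A minimal usage sketch (assuming {@code partitionDesc}, {@code dataSource},
 * {@code scanVars}, {@code projectVars}, {@code context} and {@code jobSpec} are supplied
 * by the surrounding Hive plan translation):
 *
 * <pre>{@code
 * HiveScanRuntimeGenerator gen = new HiveScanRuntimeGenerator(partitionDesc);
 * Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> scan =
 *         gen.getRuntimeOperatorAndConstraint(dataSource, scanVars, projectVars, true, context, jobSpec);
 * }</pre>
 */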
@SuppressWarnings({ "rawtypes", "deprecation" })
public class HiveScanRuntimeGenerator {
    private PartitionDesc fileDesc;
    private transient Path filePath;
    private String filePathName;
    private Properties properties;
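
    /**
     * @param path
     *            the Hive partition descriptor; its "location" property determines the input
     *            {@link Path} of the partition (a leading "file:" scheme, if present, is stripped)
     */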
    public HiveScanRuntimeGenerator(PartitionDesc path) {
        fileDesc = path;
        properties = fileDesc.getProperties();
        String inputPath = properties.getProperty("location");
        if (inputPath.startsWith("file:")) {
            // Windows
            String[] strs = inputPath.split(":");
            filePathName = strs[strs.length - 1];
        } else {
            // Linux
            filePathName = inputPath;
        }
        filePath = new Path(filePathName);
    }
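
    /**
     * Builds the scan operator and its partition constraint for the given Hive data source.
     *
     * @param scanVariables
     *            all variables produced by the scan
     * @param projectVariables
     *            the variables to propagate when projection has been pushed into the scan
     * @param projectPushed
     *            whether projection has been pushed into the scan
     * @return the HDFS read operator together with an absolute partition constraint over all
     *         node controllers
     */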
    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRuntimeOperatorAndConstraint(
            IDataSource dataSource, List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables,
            boolean projectPushed, JobGenContext context, JobSpecification jobSpec) throws AlgebricksException {
        try {
            // build the propagated schema from the output variables (the projected variables
            // when projection is pushed into the scan, otherwise the scan variables)
            IOperatorSchema propagatedSchema = new HiveOperatorSchema();
            List<LogicalVariable> outputVariables = projectPushed ? projectVariables : scanVariables;
            for (LogicalVariable var : outputVariables) {
                propagatedSchema.addVariable(var);
            }
            // for each scan variable, record its offset in the output, or -1 if it is projected out
            int[] outputColumnsOffset = new int[scanVariables.size()];
            int i = 0;
            for (LogicalVariable var : scanVariables) {
                if (outputVariables.contains(var)) {
                    int offset = outputVariables.indexOf(var);
                    outputColumnsOffset[i++] = offset;
                } else {
                    outputColumnsOffset[i++] = -1;
                }
            }
            Object[] schemaTypes = dataSource.getSchemaTypes();
            // get the record descriptor for the propagated schema
            RecordDescriptor recDescriptor = mkRecordDescriptor(propagatedSchema, schemaTypes, context);
            // set up the runtime operator and its constraints
            JobConf conf = ConfUtil.getJobConf(fileDesc.getInputFileFormatClass(), filePath);
            String[] locConstraints = ConfUtil.getNCs();
            Map<String, NodeControllerInfo> ncNameToNcInfos = ConfUtil.getNodeControllerInfo();
            ClusterTopology topology = ConfUtil.getClusterTopology();
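            // the Hyracks HDFS scheduler assigns each input split to a node controller,
            // preferring controllers that are local to the split's data blocks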
            Scheduler scheduler = new Scheduler(ncNameToNcInfos, topology);
            InputSplit[] splits = conf.getInputFormat().getSplits(conf, locConstraints.length);
            String[] schedule = scheduler.getLocationConstraints(splits);
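            // the HDFS read operator scans the splits according to the computed schedule;
            // HiveKeyValueParserFactory turns the InputFormat's key/value pairs into Hive tuples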
            IOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(jobSpec, recDescriptor, conf, splits,
                    schedule, new HiveKeyValueParserFactory(fileDesc, conf, outputColumnsOffset));
            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(scanner,
                    new AlgebricksAbsolutePartitionConstraint(locConstraints));
        } catch (Exception e) {
            throw new AlgebricksException(e);
        }
    }
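
    /**
     * Builds a {@link RecordDescriptor} for the propagated schema by looking up a
     * serializer/deserializer for each field type of the data source.
     */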
    private static RecordDescriptor mkRecordDescriptor(IOperatorSchema opSchema, Object[] types, JobGenContext context)
            throws AlgebricksException {
        int size = opSchema.getSize();
        ISerializerDeserializer[] fields = new ISerializerDeserializer[size];
        ISerializerDeserializerProvider sdp = context.getSerializerDeserializerProvider();
        for (int i = 0; i < size; i++) {
            // one serializer/deserializer per field of the propagated schema
            fields[i] = sdp.getSerializerDeserializer(types[i]);
        }
        return new RecordDescriptor(fields);
    }
}