/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hcatalog.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchema;
public abstract class HCatBaseInputFormat
extends InputFormat<WritableComparable, HCatRecord> {

  private Class<? extends InputFormat> inputFileFormatClass;

  // TODO needs to go in InitializeInput? as part of InputJobInfo

  /**
   * Gets the schema for the HCatRecord data returned by HCatInputFormat. If an
   * output schema has been set via {@link #setOutputSchema(Job, HCatSchema)},
   * that schema is returned; otherwise the full table schema is used.
   *
   * @param context the jobContext
   * @return the schema used when reading HCatRecords
   * @throws IOException if the job information cannot be read from the context
   */
public static HCatSchema getOutputSchema(JobContext context)
throws IOException {
String os = context.getConfiguration().get(
HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
if (os == null) {
return getTableSchema(context);
} else {
return (HCatSchema) HCatUtil.deserialize(os);
}
}
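
  // Illustrative usage sketch (not part of this class): a map task can call
  // getOutputSchema() to look up HCatRecord fields by name. The mapper class
  // "MyMapper" and the column name "userId" below are hypothetical.
  //
  //   public static class MyMapper
  //       extends Mapper<WritableComparable, HCatRecord, NullWritable, Text> {
  //     private HCatSchema schema;
  //
  //     @Override
  //     protected void setup(Context context) throws IOException {
  //       // the task attempt context is also a JobContext, so it can be passed directly
  //       schema = HCatBaseInputFormat.getOutputSchema(context);
  //     }
  //
  //     @Override
  //     protected void map(WritableComparable key, HCatRecord value, Context context)
  //         throws IOException, InterruptedException {
  //       Object userId = value.get("userId", schema);
  //       // ... emit output based on userId ...
  //     }
  //   }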

  /**
   * Sets the schema for the HCatRecord data returned by HCatInputFormat.
   * @param job the job object
   * @param hcatSchema the schema to use as the consolidated schema
   * @throws IOException if the schema cannot be serialized into the job configuration
   */
  public static void setOutputSchema(Job job, HCatSchema hcatSchema)
throws IOException {
job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA,
HCatUtil.serialize(hcatSchema));
}
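
  // Illustrative usage sketch (assumes the HCatInputFormat.setInput(Job,
  // InputJobInfo) API of this HCatalog version; the database "default", table
  // "weblogs" and the columns "userId"/"url" are hypothetical):
  //
  //   Job job = new Job(conf, "hcat-projection");
  //   HCatInputFormat.setInput(job, InputJobInfo.create("default", "weblogs", null));
  //   HCatSchema tableSchema = HCatBaseInputFormat.getTableSchema(job);
  //   List<HCatFieldSchema> projection = new ArrayList<HCatFieldSchema>();
  //   projection.add(tableSchema.get("userId"));
  //   projection.add(tableSchema.get("url"));
  //   HCatBaseInputFormat.setOutputSchema(job, new HCatSchema(projection));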
protected static
org.apache.hadoop.mapred.InputFormat<WritableComparable, Writable>
  getMapRedInputFormat(JobConf job, Class inputFormatClass) throws IOException {
return (
org.apache.hadoop.mapred.InputFormat<WritableComparable, Writable>)
ReflectionUtils.newInstance(inputFormatClass, job);
}
  /**
   * Logically splits the set of input files for the job. Returns the
   * underlying InputFormat's splits, each wrapped in an HCatSplit.
   * @param jobContext the job context object
   * @return the splits, a list of HCatSplit wrappers over the storage
   *         handler's InputSplits
   * @throws IOException on errors from the underlying InputFormat
   * @throws InterruptedException if the split calculation is interrupted
   */
@Override
public List<InputSplit> getSplits(JobContext jobContext)
throws IOException, InterruptedException {
//Get the job info from the configuration,
//throws exception if not initialized
InputJobInfo inputJobInfo;
try {
inputJobInfo = getJobInfo(jobContext);
} catch (Exception e) {
throw new IOException(e);
}
List<InputSplit> splits = new ArrayList<InputSplit>();
List<PartInfo> partitionInfoList = inputJobInfo.getPartitions();
    if (partitionInfoList == null) {
//No partitions match the specified partition filter
return splits;
}
HCatStorageHandler storageHandler;
JobConf jobConf;
Configuration conf = jobContext.getConfiguration();
//For each matching partition, call getSplits on the underlying InputFormat
    for (PartInfo partitionInfo : partitionInfoList) {
jobConf = HCatUtil.getJobConfFromContext(jobContext);
setInputPath(jobConf, partitionInfo.getLocation());
      Map<String, String> jobProperties = partitionInfo.getJobProperties();

      // Build the full column list for this partition: data columns followed
      // by partition columns.
      HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
      for (HCatFieldSchema field :
          inputJobInfo.getTableInfo().getDataColumns().getFields()) {
        allCols.append(field);
      }
      for (HCatFieldSchema field :
          inputJobInfo.getTableInfo().getPartitionColumns().getFields()) {
        allCols.append(field);
      }
HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
storageHandler = HCatUtil.getStorageHandler(
jobConf, partitionInfo);
//Get the input format
Class inputFormatClass = storageHandler.getInputFormatClass();
org.apache.hadoop.mapred.InputFormat inputFormat =
getMapRedInputFormat(jobConf, inputFormatClass);
      //Call getSplits on the underlying InputFormat and wrap each returned
      //split in an HCatSplit. numSplits is passed as 0 so the underlying
      //InputFormat decides how many splits to create.
org.apache.hadoop.mapred.InputSplit[] baseSplits =
inputFormat.getSplits(jobConf, 0);
      for (org.apache.hadoop.mapred.InputSplit split : baseSplits) {
        splits.add(new HCatSplit(partitionInfo, split, allCols));
      }
}
return splits;
}
  /**
   * Creates the RecordReader for the given InputSplit. The underlying storage
   * handler's RecordReader is wrapped in an HCatRecordReader, which converts
   * the storage-level records into HCatRecords matching the requested output
   * schema and supplies values for partition columns not present in the data.
   * @param split the split
   * @param taskContext the task attempt context
   * @return the record reader instance, an HCatRecordReader wrapping the
   *         underlying storage handler's RecordReader
   * @throws IOException on errors creating the underlying reader
   * @throws InterruptedException if the operation is interrupted
   */
@Override
public RecordReader<WritableComparable, HCatRecord>
createRecordReader(InputSplit split,
TaskAttemptContext taskContext) throws IOException, InterruptedException {
HCatSplit hcatSplit = InternalUtil.castToHCatSplit(split);
PartInfo partitionInfo = hcatSplit.getPartitionInfo();
JobContext jobContext = taskContext;
HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(
jobContext.getConfiguration(), partitionInfo);
JobConf jobConf = HCatUtil.getJobConfFromContext(jobContext);
Map<String, String> jobProperties = partitionInfo.getJobProperties();
HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
    Map<String, String> valuesNotInDataCols = getColValsNotInDataColumns(
        getOutputSchema(jobContext), partitionInfo);
return new HCatRecordReader(storageHandler, valuesNotInDataCols);
}
  /**
   * Gets the values for those fields requested by the output schema that will
   * not be present in the data itself, i.e. partition columns. Fields that are
   * in neither the data nor the partition values are mapped to null.
   */
  private static Map<String, String> getColValsNotInDataColumns(
      HCatSchema outputSchema, PartInfo partInfo) {
HCatSchema dataSchema = partInfo.getPartitionSchema();
Map<String,String> vals = new HashMap<String,String>();
for (String fieldName : outputSchema.getFieldNames()){
      if (dataSchema.getPosition(fieldName) == null) {
        // this field of the output schema is not present in the data columns,
        // so check the partition values to see if it is a partition column
if (partInfo.getPartitionValues().containsKey(fieldName)){
vals.put(fieldName, partInfo.getPartitionValues().get(fieldName));
} else {
vals.put(fieldName, null);
}
}
}
return vals;
}
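
  // Example for getColValsNotInDataColumns (hypothetical values): with an
  // output schema of (userId, url, ds), where "ds" is a partition column whose
  // value for this partition is "2011-04-01" and "url" is missing from the
  // partition's data schema, the returned map is {ds=2011-04-01, url=null}.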
/**
* Gets the HCatTable schema for the table specified in the HCatInputFormat.setInput call
* on the specified job context. This information is available only after HCatInputFormat.setInput
* has been called for a JobContext.
* @param context the context
* @return the table schema
* @throws IOException if HCatInputFormat.setInput has not been called
* for the current context
*/
public static HCatSchema getTableSchema(JobContext context)
throws IOException {
InputJobInfo inputJobInfo = getJobInfo(context);
HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
    for (HCatFieldSchema field :
        inputJobInfo.getTableInfo().getDataColumns().getFields()) {
      allCols.append(field);
    }
    for (HCatFieldSchema field :
        inputJobInfo.getTableInfo().getPartitionColumns().getFields()) {
      allCols.append(field);
    }
return allCols;
}
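
  // Illustrative sketch (the column name "ds" is hypothetical): since
  // getTableSchema includes partition columns, it can be used at job setup
  // time to validate user-supplied column names before calling setOutputSchema.
  //
  //   HCatSchema tableSchema = HCatBaseInputFormat.getTableSchema(job);
  //   if (tableSchema.getPosition("ds") == null) {
  //     throw new IllegalArgumentException("column ds not found in table");
  //   }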
/**
* Gets the InputJobInfo object by reading the Configuration and deserializing
* the string. If InputJobInfo is not present in the configuration, throws an
* exception since that means HCatInputFormat.setInput has not been called.
* @param jobContext the job context
* @return the InputJobInfo object
   * @throws IOException if the InputJobInfo is not present in the configuration
*/
private static InputJobInfo getJobInfo(JobContext jobContext)
throws IOException {
String jobString = jobContext.getConfiguration().get(
HCatConstants.HCAT_KEY_JOB_INFO);
    if (jobString == null) {
throw new IOException("job information not found in JobContext."
+ " HCatInputFormat.setInput() not called?");
}
return (InputJobInfo) HCatUtil.deserialize(jobString);
}
private void setInputPath(JobConf jobConf, String location)
      throws IOException {
    // ideally we should just call FileInputFormat.setInputPaths() here - but
    // that won't work since the new-API FileInputFormat.setInputPaths() needs
    // a Job object rather than the JobConf we are handed here
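    // e.g. a location of "hdfs://nn/warehouse/t/ds=1,hdfs://nn/warehouse/t/ds={2,3}"
    // (hypothetical paths) is split into two input paths; the comma inside the
    // {...} glob is not treated as a path separator by the loop below.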
int length = location.length();
int curlyOpen = 0;
int pathStart = 0;
boolean globPattern = false;
List<String> pathStrings = new ArrayList<String>();
for (int i=0; i<length; i++) {
char ch = location.charAt(i);
switch(ch) {
case '{' : {
curlyOpen++;
if (!globPattern) {
globPattern = true;
}
break;
}
case '}' : {
curlyOpen--;
if (curlyOpen == 0 && globPattern) {
globPattern = false;
}
break;
}
case ',' : {
if (!globPattern) {
pathStrings.add(location.substring(pathStart, i));
pathStart = i + 1 ;
}
break;
}
}
}
pathStrings.add(location.substring(pathStart, length));
Path[] paths = StringUtils.stringToPath(pathStrings.toArray(new String[0]));
String separator = "";
StringBuilder str = new StringBuilder();
for (Path path : paths) {
FileSystem fs = path.getFileSystem(jobConf);
final String qualifiedPath = path.makeQualified(fs).toString();
str.append(separator)
.append(StringUtils.escapeString(qualifiedPath));
separator = StringUtils.COMMA_STR;
}
jobConf.set("mapred.input.dir", str.toString());
}
}