/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hcatalog.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.mapreduce.Job;

import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.data.schema.HCatSchemaUtils;

/**
 * The InputFormat to use to read data from an HCatalog table export. Rather
 * than contacting the metastore, this format reads the table and partition
 * definitions from the {@code _metadata} file written alongside the exported
 * data (for example, by {@code HCatEximOutputFormat} or Hive's EXPORT
 * command).
 */
public class HCatEximInputFormat extends HCatBaseInputFormat {
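
  /*
   * A minimal usage sketch (illustrative only: the export path, job name, and
   * output handling are assumptions, not part of this class):
   *
   *   Job job = new Job(new Configuration(), "read-hcat-export");
   *   job.setInputFormatClass(HCatEximInputFormat.class);
   *   List<HCatSchema> schemas = HCatEximInputFormat.setInput(job,
   *       "hdfs:///user/hive/exports/mytable", null);
   *   // schemas.get(0): table columns; schemas.get(1): partition keys
   */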

  /**
   * Set the input to use for the Job. This reads the metadata file at the
   * specified location, selects the partitions matching the given partition
   * filter, and serializes the resulting job information into the job
   * configuration for use at split-generation and read time.
   *
   * @param job the job object
   * @param location the root directory of the table export, containing the
   *          {@code _metadata} file and the exported data
   * @param partitionFilter an equality filter on partition-key values; null
   *          selects all partitions
   * @return two hcat schemas, for the table columns and the partition keys
   * @throws IOException
   *           the exception in reading or parsing the metadata file
   */
  public static List<HCatSchema> setInput(Job job,
      String location,
      Map<String, String> partitionFilter) throws IOException {
    FileSystem fs;
    try {
      fs = FileSystem.get(new URI(location), job.getConfiguration());
    } catch (URISyntaxException e) {
      throw new IOException(e);
    }
    Path fromPath = new Path(location);
    Path metadataPath = new Path(fromPath, "_metadata");
    try {
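      // Read the table definition and the full partition list from the
      // _metadata file that the export wrote next to the data directories.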
      Map.Entry<org.apache.hadoop.hive.metastore.api.Table, List<Partition>> tp =
          EximUtil.readMetaData(fs, metadataPath);
      org.apache.hadoop.hive.metastore.api.Table table = tp.getKey();
      InputJobInfo inputInfo = InputJobInfo.create(table.getDbName(),
          table.getTableName(), null, null, null);
      List<FieldSchema> partCols = table.getPartitionKeys();
      List<PartInfo> partInfoList = null;
      if (partCols.size() > 0) {
        // Partitioned table: keep only the partitions that match the filter.
        List<Partition> partitions = tp.getValue();
        partInfoList = filterPartitions(partitionFilter, partitions, partCols);
      } else {
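        // Unpartitioned table: build a single PartInfo from the table's own
        // storage descriptor, pointing at the export's data directory.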
        partInfoList = new ArrayList<PartInfo>(1);
        HCatSchema schema = new HCatSchema(
            HCatUtil.getHCatFieldSchemaList(table.getSd().getCols()));
        Map<String, String> parameters = table.getParameters();
        String inputStorageDriverClass = null;
        if (parameters.containsKey(HCatConstants.HCAT_ISD_CLASS)) {
          inputStorageDriverClass = parameters.get(HCatConstants.HCAT_ISD_CLASS);
        } else {
          throw new IOException(
              "No input storage driver classname found, cannot read partition");
        }
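        // Carry over any hcat-specific table properties that the input
        // storage driver may need at read time.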
        Properties hcatProperties = new Properties();
        for (String key : parameters.keySet()) {
          if (key.startsWith(InitializeInput.HCAT_KEY_PREFIX)) {
            hcatProperties.put(key, parameters.get(key));
          }
        }
        PartInfo partInfo = new PartInfo(schema, inputStorageDriverClass,
            location + "/data", hcatProperties);
        partInfoList.add(partInfo);
      }
      inputInfo.setPartitions(partInfoList);
      inputInfo.setTableInfo(HCatTableInfo.valueOf(table));
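      // Serialize the assembled job information into the configuration so
      // that getSplits() and the record reader can retrieve it later.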
      job.getConfiguration().set(
          HCatConstants.HCAT_KEY_JOB_INFO,
          HCatUtil.serialize(inputInfo));
      List<HCatSchema> rv = new ArrayList<HCatSchema>(2);
      rv.add(HCatSchemaUtils.getHCatSchema(table.getSd().getCols()));
      rv.add(HCatSchemaUtils.getHCatSchema(partCols));
      return rv;
    } catch (SemanticException e) {
      throw new IOException(e);
    }
  }
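
  /*
   * Partition filtering is an exact match on key values. For example, for an
   * export of a table partitioned by (ds, region) (names assumed for
   * illustration):
   *
   *   Map<String, String> filter = new HashMap<String, String>();
   *   filter.put("ds", "2011-10-01");
   *   HCatEximInputFormat.setInput(job, exportRoot, filter);
   *
   * selects every region for that date; keys absent from the filter are
   * unconstrained.
   */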
  private static List<PartInfo> filterPartitions(Map<String, String> partitionFilter,
      List<Partition> partitions, List<FieldSchema> partCols) throws IOException {
    List<PartInfo> partInfos = new LinkedList<PartInfo>();
    for (Partition partition : partitions) {
      boolean matches = true;
      List<String> partVals = partition.getValues();
      assert partCols.size() == partVals.size();
      Map<String, String> partSpec = EximUtil.makePartSpec(partCols, partVals);
      if (partitionFilter != null) {
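        // The partition matches only if every filter entry equals the
        // corresponding partition-key value.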
        for (Map.Entry<String, String> constraint : partitionFilter.entrySet()) {
          String partVal = partSpec.get(constraint.getKey());
          if ((partVal == null) || !partVal.equals(constraint.getValue())) {
            matches = false;
            break;
          }
        }
      }
      if (matches) {
        PartInfo partInfo = InitializeInput.extractPartInfo(partition.getSd(),
            partition.getParameters());
        partInfo.setPartitionValues(partSpec);
        partInfos.add(partInfo);
      }
    }
    return partInfos;
  }
}