/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
| |
| package org.apache.hcatalog.mapreduce; |
| |
| import java.io.IOException; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.util.ArrayList; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hive.metastore.api.FieldSchema; |
| import org.apache.hadoop.hive.metastore.api.Partition; |
| import org.apache.hadoop.hive.ql.parse.EximUtil; |
| import org.apache.hadoop.hive.ql.parse.SemanticException; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hcatalog.common.HCatConstants; |
| import org.apache.hcatalog.common.HCatUtil; |
| import org.apache.hcatalog.data.schema.HCatSchema; |
| import org.apache.hcatalog.data.schema.HCatSchemaUtils; |
| |
| /** The InputFormat to use to read data from HCat */ |
| public class HCatEximInputFormat extends HCatBaseInputFormat { |
| |
| /** |
| * Set the input to use for the Job. This queries the metadata file with |
| * the specified partition predicates, gets the matching partitions, puts |
| * the information in the conf object. The inputInfo object is updated with |
| * information needed in the client context |
| * |
| * @param job the job object |
| * @return two hcat schemas, for the table columns and the partition keys |
| * @throws IOException |
| * the exception in communicating with the metadata server |
| */ |
| public static List<HCatSchema> setInput(Job job, |
| String location, |
| Map<String, String> partitionFilter) throws IOException { |
| FileSystem fs; |
| try { |
| fs = FileSystem.get(new URI(location), job.getConfiguration()); |
| } catch (URISyntaxException e) { |
| throw new IOException(e); |
| } |
| Path fromPath = new Path(location); |
| Path metadataPath = new Path(fromPath, "_metadata"); |
| try { |
| Map.Entry<org.apache.hadoop.hive.metastore.api.Table, List<Partition>> tp = EximUtil |
| .readMetaData(fs, metadataPath); |
| org.apache.hadoop.hive.metastore.api.Table table = tp.getKey(); |
| InputJobInfo inputInfo = InputJobInfo.create(table.getDbName(), table.getTableName(),null,null,null); |
| List<FieldSchema> partCols = table.getPartitionKeys(); |
| List<PartInfo> partInfoList = null; |
| if (partCols.size() > 0) { |
| List<String> partColNames = new ArrayList<String>(partCols.size()); |
| for (FieldSchema fsc : partCols) { |
| partColNames.add(fsc.getName()); |
| } |
| List<Partition> partitions = tp.getValue(); |
| partInfoList = filterPartitions(partitionFilter, partitions, table.getPartitionKeys()); |
| } else { |
| partInfoList = new ArrayList<PartInfo>(1); |
| HCatSchema schema = new HCatSchema(HCatUtil.getHCatFieldSchemaList(table.getSd().getCols())); |
| Map<String,String> parameters = table.getParameters(); |
| String inputStorageDriverClass = null; |
| if (parameters.containsKey(HCatConstants.HCAT_ISD_CLASS)){ |
| inputStorageDriverClass = parameters.get(HCatConstants.HCAT_ISD_CLASS); |
| }else{ |
| throw new IOException("No input storage driver classname found, cannot read partition"); |
| } |
| Properties hcatProperties = new Properties(); |
| for (String key : parameters.keySet()){ |
| if (key.startsWith(InitializeInput.HCAT_KEY_PREFIX)){ |
| hcatProperties.put(key, parameters.get(key)); |
| } |
| } |
| PartInfo partInfo = new PartInfo(schema, inputStorageDriverClass, location + "/data", hcatProperties); |
| partInfoList.add(partInfo); |
| } |
| inputInfo.setPartitions(partInfoList); |
| inputInfo.setTableInfo(HCatTableInfo.valueOf(table)); |
| job.getConfiguration().set( |
| HCatConstants.HCAT_KEY_JOB_INFO, |
| HCatUtil.serialize(inputInfo)); |
| List<HCatSchema> rv = new ArrayList<HCatSchema>(2); |
| rv.add(HCatSchemaUtils.getHCatSchema(table.getSd().getCols())); |
| rv.add(HCatSchemaUtils.getHCatSchema(partCols)); |
| return rv; |
| } catch(SemanticException e) { |
| throw new IOException(e); |
| } |
| } |
| |
| private static List<PartInfo> filterPartitions(Map<String, String> partitionFilter, |
| List<Partition> partitions, List<FieldSchema> partCols) throws IOException { |
| List<PartInfo> partInfos = new LinkedList<PartInfo>(); |
| for (Partition partition : partitions) { |
| boolean matches = true; |
| List<String> partVals = partition.getValues(); |
| assert partCols.size() == partVals.size(); |
| Map<String, String> partSpec = EximUtil.makePartSpec(partCols, partVals); |
| if (partitionFilter != null) { |
| for (Map.Entry<String, String> constraint : partitionFilter.entrySet()) { |
| String partVal = partSpec.get(constraint.getKey()); |
| if ((partVal == null) || !partVal.equals(constraint.getValue())) { |
| matches = false; |
| break; |
| } |
| } |
| } |
| if (matches) { |
| PartInfo partInfo = InitializeInput.extractPartInfo(partition.getSd(), |
| partition.getParameters()); |
| partInfo.setPartitionValues(partSpec); |
| partInfos.add(partInfo); |
| } |
| } |
| return partInfos; |
| } |
| } |