blob: ce6562383e55ff80c75731da3ba780373e58c9a6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hcatalog.mapreduce;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.common.ErrorType;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatException;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Handles querying the metadata server using the MetaStoreClient. The list of
 * partitions matching the partition filter is fetched from the server and the information is
 * serialized and written into the job configuration. The InputJobInfo is also updated with
 * info required in the client process context.
 */
public class InitializeInput {

    private static final Logger LOG = LoggerFactory.getLogger(InitializeInput.class);

    /** Configuration key holding the maximum number of partitions a single job may read. */
    private static final String MAX_PARTITIONS_KEY = "hcat.metastore.maxpartitions";

    /** Partition limit applied when {@link #MAX_PARTITIONS_KEY} is not defined. */
    private static final int DEFAULT_MAX_PARTITIONS = 100000;

    /**
     * Set the input to use for the Job. This queries the metadata server with the specified
     * partition predicates, gets the matching partitions, and puts the information in the job
     * configuration object.
     *
     * <p>To ensure a known InputJobInfo state, only the database name, table name, filter, and
     * properties are preserved. All other modifications from the given InputJobInfo are
     * discarded.
     *
     * <p>After calling setInput, InputJobInfo can be retrieved from the job configuration as
     * follows:
     * <pre>{@code
     * InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(
     *     job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO));
     * }</pre>
     *
     * @param job the job object; its configuration receives the serialized InputJobInfo
     * @param theirInputJobInfo information on the Input to read
     * @throws Exception if querying the metadata service or serializing the job info fails
     */
    public static void setInput(Job job, InputJobInfo theirInputJobInfo) throws Exception {
        // Work on a fresh copy so that only the documented fields survive.
        InputJobInfo inputJobInfo = InputJobInfo.create(
            theirInputJobInfo.getDatabaseName(),
            theirInputJobInfo.getTableName(),
            theirInputJobInfo.getFilter());
        inputJobInfo.getProperties().putAll(theirInputJobInfo.getProperties());
        job.getConfiguration().set(
            HCatConstants.HCAT_KEY_JOB_INFO,
            HCatUtil.serialize(getInputJobInfo(job, inputJobInfo, null)));
    }

    /**
     * Returns the given InputJobInfo after populating it with data queried from the metadata
     * service: the table info and one PartInfo per matching partition (or a single PartInfo for
     * a non-partitioned table).
     *
     * @param job the job whose configuration is used; when {@code null}, a default HiveConf is
     *     created and used as the configuration instead
     * @param inputJobInfo the job info to populate; database name, table name and filter are read
     * @param locationFilter currently unused
     * @return the same {@code inputJobInfo} instance, populated
     * @throws HCatException if the number of matching partitions exceeds the configured maximum
     * @throws Exception if any metadata lookup fails
     */
    private static InputJobInfo getInputJobInfo(
        Job job, InputJobInfo inputJobInfo, String locationFilter) throws Exception {
        HiveMetaStoreClient client = null;
        try {
            // Resolve the configuration once so the null-job case is handled consistently
            // everywhere below instead of dereferencing job again.
            final HiveConf hiveConf;
            final Configuration conf;
            if (job != null) {
                conf = job.getConfiguration();
                hiveConf = HCatUtil.getHiveConf(conf);
            } else {
                hiveConf = new HiveConf(HCatInputFormat.class);
                conf = hiveConf;
            }

            client = HCatUtil.getHiveClient(hiveConf);
            Table table = HCatUtil.getTable(client, inputJobInfo.getDatabaseName(),
                inputJobInfo.getTableName());

            List<PartInfo> partInfoList = new ArrayList<PartInfo>();
            inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table.getTTable()));

            if (!table.getPartitionKeys().isEmpty()) {
                // Partitioned table: (short) -1 requests all matching partitions.
                List<Partition> parts = client.listPartitionsByFilter(
                    inputJobInfo.getDatabaseName(),
                    inputJobInfo.getTableName(),
                    inputJobInfo.getFilter(),
                    (short) -1);

                // Default to 100,000 partitions if hcat.metastore.maxpartitions is not defined
                int maxPart = hiveConf.getInt(MAX_PARTITIONS_KEY, DEFAULT_MAX_PARTITIONS);

                // A null result is treated like an empty one: no partitions to add.
                if (parts != null) {
                    if (parts.size() > maxPart) {
                        throw new HCatException(ErrorType.ERROR_EXCEED_MAXPART,
                            "total number of partitions is " + parts.size());
                    }
                    // populate partition info
                    for (Partition ptn : parts) {
                        HCatSchema schema = HCatUtil.extractSchema(
                            new org.apache.hadoop.hive.ql.metadata.Partition(table, ptn));
                        PartInfo partInfo = extractPartInfo(schema, ptn.getSd(),
                            ptn.getParameters(), conf, inputJobInfo);
                        partInfo.setPartitionValues(createPtnKeyValueMap(table, ptn));
                        partInfoList.add(partInfo);
                    }
                }
            } else {
                // Non partitioned table: a single PartInfo with no partition values.
                HCatSchema schema = HCatUtil.extractSchema(table);
                PartInfo partInfo = extractPartInfo(schema, table.getTTable().getSd(),
                    table.getParameters(), conf, inputJobInfo);
                partInfo.setPartitionValues(new HashMap<String, String>());
                partInfoList.add(partInfo);
            }

            inputJobInfo.setPartitions(partInfoList);
            return inputJobInfo;
        } finally {
            HCatUtil.closeHiveClientQuietly(client);
        }
    }

    /**
     * Builds a map from lower-cased partition key name to the partition's value for that key.
     *
     * @param table the table supplying the ordered partition keys
     * @param ptn the partition supplying the ordered partition values
     * @return partition key name (lower case) to partition value
     * @throws IOException if the partition has a different number of values than the table has
     *     partition keys
     */
    private static Map<String, String> createPtnKeyValueMap(Table table, Partition ptn)
        throws IOException {
        List<String> values = ptn.getValues();
        List<FieldSchema> partitionKeys = table.getPartitionKeys();
        if (values.size() != partitionKeys.size()) {
            throw new IOException(
                "Partition values in partition inconsistent with table definition, table "
                    + table.getTableName() + " has "
                    + partitionKeys.size()
                    + " partition keys, partition has " + values.size() + " partition values");
        }

        Map<String, String> ptnKeyValues = new HashMap<String, String>();
        int i = 0;
        for (FieldSchema schema : partitionKeys) {
            // CONCERN : the way this mapping goes, the order *needs* to be preserved for
            // table.getPartitionKeys() and ptn.getValues()
            // Locale.ROOT keeps the lower-casing independent of the JVM default locale.
            ptnKeyValues.put(schema.getName().toLowerCase(Locale.ROOT), values.get(i));
            i++;
        }
        return ptnKeyValues;
    }

    /**
     * Creates the PartInfo for one partition (or for a whole non-partitioned table).
     *
     * @param schema the schema of the partition or table
     * @param sd the storage descriptor; supplies the storer info and the data location
     * @param parameters the partition or table parameters; copied into the PartInfo properties
     * @param conf the configuration used to obtain the storage handler
     * @param inputJobInfo the job info; supplies the table info and the input job properties
     * @return the populated PartInfo
     * @throws IOException if the storer info or storage handler cannot be obtained
     */
    private static PartInfo extractPartInfo(HCatSchema schema, StorageDescriptor sd,
                                            Map<String, String> parameters, Configuration conf,
                                            InputJobInfo inputJobInfo) throws IOException {
        StorerInfo storerInfo = InternalUtil.extractStorerInfo(sd, parameters);
        HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(conf, storerInfo);

        // copy the properties from storageHandler to jobProperties
        Map<String, String> jobProperties =
            HCatUtil.getInputJobProperties(storageHandler, inputJobInfo);

        // Carry the partition/table parameters along as the HCat properties.
        Properties hcatProperties = new Properties();
        hcatProperties.putAll(parameters);

        // FIXME
        // Bloating partinfo with inputJobInfo is not good
        return new PartInfo(schema, storageHandler, sd.getLocation(),
            hcatProperties, jobProperties, inputJobInfo.getTableInfo());
    }
}