blob: 81b62f61ea5967ba09a80b03cf5ed7213c0fa371 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.util;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Collection;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.HdfsTable;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.compat.MetastoreShim;
import org.apache.thrift.TException;
import org.apache.impala.thrift.TColumn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Utility methods for interacting with the Hive Metastore.
 */
public class MetaStoreUtil {
private static final Logger LOG = LoggerFactory.getLogger(MetaStoreUtil.class);
// Maximum comment length, e.g., for columns, that can be stored in the HMS.
// This number is a lower bound of the constraint set in the HMS DB schema,
// because the constraint varies among different backing databases, e.g.,
// for Postgres it is 4000, but for most other databases it is 256.
public static final int CREATE_MAX_COMMENT_LENGTH = 256;
// The longest strings Hive accepts for [serde] property keys.
public static final int MAX_PROPERTY_KEY_LENGTH = 256;
// The longest strings Hive accepts for [serde] property values.
public static final int MAX_PROPERTY_VALUE_LENGTH = 4000;
// Maximum owner length. The owner can be a user or a role.
public static final int MAX_OWNER_LENGTH = 128;
// The default maximum number of partitions to fetch from the Hive metastore in one
// RPC.
private static final short DEFAULT_MAX_PARTITIONS_PER_RPC = 1000;
// The maximum number of partitions to fetch from the metastore in one RPC.
// Read from the 'hive.metastore.batch.retrieve.table.partition.max' Hive configuration
// and defaults to DEFAULT_MAX_PARTITIONS_PER_RPC if the value is not present in the
// Hive configuration. In this file it is only assigned by the static initializer.
private static short maxPartitionsPerRpc_ = DEFAULT_MAX_PARTITIONS_PER_RPC;
// The configuration key that Hive uses to set the null partition key value.
// NOTE(review): the initializer of this constant is missing from this copy of the
// file (the declaration ends at '='); restore the Hive config key name before
// compiling.
public static final String NULL_PARTITION_KEY_VALUE_CONF_KEY =
// The default value for the above configuration key.
// NOTE(review): the initializer of this constant is also missing from this copy —
// confirm and restore.
public static final String DEFAULT_NULL_PARTITION_KEY_VALUE =
// The configuration key that holds the Thrift URI of the remote Hive Metastore.
public static final String HIVE_METASTORE_URIS_KEY = "hive.metastore.uris";
// The default value for the above configuration key.
public static final String DEFAULT_HIVE_METASTORE_URIS = "";
// Value of HIVE_METASTORE_URIS_KEY read from the Hive configuration; assigned once in
// the static initializer and returned by getHiveMetastoreUris().
public static final String hiveMetastoreUris_;
static {
// Get the value from the Hive configuration, if present.
HiveConf hiveConf = new HiveConf(HdfsTable.class);
// NOTE(review): the right-hand side of this assignment is missing from this copy of
// the file. Per the comment on maxPartitionsPerRpc_, it should read the
// 'hive.metastore.batch.retrieve.table.partition.max' setting (null when absent,
// given the null check below) — confirm and restore.
String strValue =
if (strValue != null) {
try {
maxPartitionsPerRpc_ = Short.parseShort(strValue);
} catch (NumberFormatException e) {
// An unparseable value is only logged; maxPartitionsPerRpc_ keeps its previous
// value because the assignment above did not happen.
LOG.error("Error parsing max partition batch size from HiveConfig: ", e);
// Reject non-positive batch sizes: fetchPartitionsByName() advances its loop by
// maxPartitionsPerRpc_, so 0 or a negative value would break batching.
if (maxPartitionsPerRpc_ <= 0) {
LOG.error(String.format("Invalid value for max partition batch size: %d. Using " +
"default: %d", maxPartitionsPerRpc_, DEFAULT_MAX_PARTITIONS_PER_RPC));
// NOTE(review): the message above says the default is used, but this copy of the
// file never assigns DEFAULT_MAX_PARTITIONS_PER_RPC back to maxPartitionsPerRpc_
// here, and the closing braces of the blocks above are missing — confirm and
// restore the reset.
hiveMetastoreUris_ = hiveConf.get(HIVE_METASTORE_URIS_KEY,
// NOTE(review): the remaining argument of this call and its closing are missing
// from this copy; presumably DEFAULT_HIVE_METASTORE_URIS — confirm.
* Return the value that Hive is configured to use for NULL partition key values.
public static String getNullPartitionKeyValue(IMetaStoreClient client)
throws ConfigValSecurityException, TException {
return client.getConfigValue(
* Return the value of thrift URI for the remote Hive Metastore.
public static String getHiveMetastoreUris() {
return hiveMetastoreUris_;
* Return the value set for the given config in the metastore.
public static String getMetastoreConfigValue(
IMetaStoreClient client, String config, String defaultVal) throws TException {
return client.getConfigValue(config, defaultVal);
* Fetches all partitions for a table in batches, with each batch containing at most
* 'maxPartsPerRpc' partitions. Returns a List containing all fetched Partitions.
* Will throw a MetaException if existing partitions are dropped while a fetch is in
* progress. To help protect against this, the operation can be retried if there is
* a MetaException by setting the "numRetries" parameter.
* Failures due to thrift exceptions (TExceptions) are not retried because they
* generally mean the connection is broken or has timed out. The HiveClient supports
* configuring retires at the connection level so it can be enabled independently.
public static List<org.apache.hadoop.hive.metastore.api.Partition> fetchAllPartitions(
IMetaStoreClient client, String dbName, String tblName, int numRetries)
throws MetaException, TException {
Preconditions.checkArgument(numRetries >= 0);
int retryAttempt = 0;
while (true) {
try {
// First, get all partition names that currently exist.
List<String> partNames = client.listPartitionNames(dbName, tblName, (short) -1);
return MetaStoreUtil.fetchPartitionsByName(client, partNames, dbName, tblName);
} catch (MetaException e) {
// Only retry for MetaExceptions, since TExceptions could indicate a broken
// connection which we can't recover from by retrying.
if (retryAttempt < numRetries) {
LOG.error(String.format("Error fetching partitions for table: %s.%s. " +
"Retry attempt: %d/%d", dbName, tblName, retryAttempt, numRetries), e);
// TODO: Sleep for a bit?
} else {
throw e;
* Given a List of partition names, fetches the matching Partitions from the HMS
* in batches. Each batch will contain at most 'maxPartsPerRpc' partitions.
* Returns a List containing all fetched Partitions.
* Will throw a MetaException if any partitions in 'partNames' do not exist.
public static List<Partition> fetchPartitionsByName(
IMetaStoreClient client, List<String> partNames, String dbName, String tblName)
throws MetaException, TException {
if (LOG.isTraceEnabled()) {
LOG.trace(String.format("Fetching %d partitions for: %s.%s using partition " +
"batch size: %d", partNames.size(), dbName, tblName, maxPartitionsPerRpc_));
List<org.apache.hadoop.hive.metastore.api.Partition> fetchedPartitions =
// Fetch the partitions in batches.
for (int i = 0; i < partNames.size(); i += maxPartitionsPerRpc_) {
// Get a subset of partition names to fetch.
List<String> partsToFetch =
partNames.subList(i, Math.min(i + maxPartitionsPerRpc_, partNames.size()));
// Fetch these partitions from the metastore.
client.getPartitionsByNames(dbName, tblName, partsToFetch));
return fetchedPartitions;
* Checks that a given 'property' is short enough for HMS to handle. If not, throws an
* 'AnalysisException' with 'name' as its prefix.
public static void checkShortProperty(String name, String property, int length)
throws AnalysisException {
if (property.length() > length) {
throw new AnalysisException(
name + " length must be <= " + length + ": " + property.length());
* Checks that each key and value in a property map is short enough for HMS to handle.
* If not, An 'AnalysisException' is thrown with 'mapName' as its prefix.
public static void checkShortPropertyMap(
String mapName, Map<String, String> propertyMap) throws AnalysisException {
if (null != propertyMap) {
for (Map.Entry<String, String> property : propertyMap.entrySet()) {
checkShortProperty(mapName + " key", property.getKey(), MAX_PROPERTY_KEY_LENGTH);
mapName + " value", property.getValue(), MAX_PROPERTY_VALUE_LENGTH);
* Does a case-insensitive search for 'propertyKey' in 'propertyMap'. If a match is
* found, the matched key is returned, otherwise null is returned. 'propertyMap' and
* 'propertyKey' must not be null.
public static String findTblPropKeyCaseInsensitive(Map<String, String> propertyMap,
String propertyKey) {
for (String key : propertyMap.keySet()) {
if (key != null && key.equalsIgnoreCase(propertyKey)) return key;
return null;
* Returns a copy of the comma-separated list of values 'inputCsv', with all occurences
* of 'toReplace' replaced with value 'replaceWith'.
public static String replaceValueInCsvList(String input, String toReplace,
String replaceWith) {
Iterable<String> inputList =
List<String> outputList = Lists.newArrayList();
for (String elem : inputList) {
if (elem.equalsIgnoreCase(toReplace)) {
} else {
return Joiner.on(",").join(outputList);
* Returns a copy of the comma-separated list of values 'inputCsv', with all occurences
* of 'toRemove' removed.
public static String removeValueFromCsvList(String inputCsv, String toRemove) {
Iterable<String> inputList =
List<String> outputList = Lists.newArrayList();
for (String elem : inputList) {
if (!elem.equalsIgnoreCase(toRemove)) outputList.add(elem);
return Joiner.on(",").join(outputList);
* Returns the intersection of the comma-separated list of values 'leftCsv' with the
* names of the columns in 'rightCols'. The return value is a comma-separated list.
public static String intersectCsvListWithColumNames(String leftCsv,
List<TColumn> rightCols) {
Iterable<String> leftCols =
HashSet<String> rightColNames = Sets.newHashSet();
for (TColumn c : rightCols) rightColNames.add(c.getColumnName().toLowerCase());
List<String> outputList = Lists.newArrayList();
for (String leftCol : leftCols) {
if (rightColNames.contains(leftCol.toLowerCase())) outputList.add(leftCol);
return Joiner.on(",").join(outputList);
public static List<String> getPartValsFromName(Table msTbl, String partName)
throws MetaException, CatalogException {
LinkedHashMap<String, String> hm = Warehouse.makeSpecFromName(partName);
List<String> partVals = Lists.newArrayList();
for (FieldSchema field: msTbl.getPartitionKeys()) {
String key = field.getName();
String val = hm.get(key);
if (val == null) {
throw new CatalogException("Incomplete partition name - missing " + key);
return partVals;
* Check if the hms table is a bucketed table or not
public static boolean isBucketedTable(Table msTbl) {
return msTbl.getSd().getNumBuckets() > 0;
* A helper class that encapsulates all the information needed to fire and insert event
* with HMS.
public static class InsertEventInfo {
// List of partition values corresponding to the partition keys in
// a partitioned table. This is null for non-partitioned table.
private List<String> partVals;
// Set of all the 'new' files added by this insert. This is empty in
// case of insert overwrite.
private Collection<String> newFiles;
// If true, sets the 'replace' flag to true indicating that the
// operation was an insert overwrite in the notification log. Will set the same to
// false otherwise.
private boolean isOverwrite;
public InsertEventInfo(
List<String> partVals, Collection<String> newFiles, boolean isOverwrite) {
this.partVals = partVals;
this.newFiles = newFiles;
this.isOverwrite = isOverwrite;
public List<String> getPartVals() { return this.partVals; }
public Collection<String> getNewFiles() { return this.newFiles; }
public boolean isOverwrite() { return this.isOverwrite; }