// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.util;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.HdfsTable;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.thrift.TColumn;
import org.apache.thrift.TException;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Utility methods for interacting with the Hive Metastore.
*/
public class MetaStoreUtil {
private static final Logger LOG = LoggerFactory.getLogger(MetaStoreUtil.class);
// Maximum comment length, e.g., for columns, that can be stored in the HMS.
// This number is a lower bound on the constraint set in the HMS DB schema: the
// constraint varies among backing databases, e.g., it is 4000 for Postgres but
// 256 for most other databases.
public static final int CREATE_MAX_COMMENT_LENGTH = 256;
// The longest strings Hive accepts for [serde] property keys.
public static final int MAX_PROPERTY_KEY_LENGTH = 256;
// The longest strings Hive accepts for [serde] property values.
public static final int MAX_PROPERTY_VALUE_LENGTH = 4000;
// Maximum owner length. The owner can be user or role.
// https://github.com/apache/hive/blob/13fbae57321f3525cabb326df702430d61c242f9/standalone-metastore/src/main/resources/package.jdo#L63
public static final int MAX_OWNER_LENGTH = 128;
// The default maximum number of partitions to fetch from the Hive metastore in one
// RPC.
private static final short DEFAULT_MAX_PARTITIONS_PER_RPC = 1000;
// The maximum number of partitions to fetch from the metastore in one RPC.
// Read from the 'hive.metastore.batch.retrieve.table.partition.max' Hive
// configuration and defaults to DEFAULT_MAX_PARTITIONS_PER_RPC if the value is
// not present in the Hive configuration.
private static short maxPartitionsPerRpc_ = DEFAULT_MAX_PARTITIONS_PER_RPC;
// The configuration key that Hive uses to set the null partition key value.
public static final String NULL_PARTITION_KEY_VALUE_CONF_KEY =
"hive.exec.default.partition.name";
// The default value for the above configuration key.
public static final String DEFAULT_NULL_PARTITION_KEY_VALUE =
"__HIVE_DEFAULT_PARTITION__";
// The configuration key that holds the Thrift URI(s) of the remote Hive Metastore.
public static final String HIVE_METASTORE_URIS_KEY = "hive.metastore.uris";
// The default value for the above configuration key.
public static final String DEFAULT_HIVE_METASTORE_URIS = "";
public static final String hiveMetastoreUris_;
static {
// Get the value from the Hive configuration, if present.
HiveConf hiveConf = new HiveConf(HdfsTable.class);
String strValue =
hiveConf.get(MetastoreShim.metastoreBatchRetrieveObjectsMaxConfigKey());
if (strValue != null) {
try {
maxPartitionsPerRpc_ = Short.parseShort(strValue);
} catch (NumberFormatException e) {
LOG.error("Error parsing max partition batch size from HiveConfig: ", e);
}
}
if (maxPartitionsPerRpc_ <= 0) {
LOG.error(String.format("Invalid value for max partition batch size: %d. Using " +
"default: %d", maxPartitionsPerRpc_, DEFAULT_MAX_PARTITIONS_PER_RPC));
maxPartitionsPerRpc_ = DEFAULT_MAX_PARTITIONS_PER_RPC;
}
hiveMetastoreUris_ = hiveConf.get(HIVE_METASTORE_URIS_KEY,
DEFAULT_HIVE_METASTORE_URIS);
}
/**
* Returns the value that Hive is configured to use for NULL partition key values.
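*
* A minimal usage sketch; assumes an IMetaStoreClient 'client' that is already
* connected to the HMS:
* <pre>{@code
*   String nullVal = MetaStoreUtil.getNullPartitionKeyValue(client);
*   // Typically "__HIVE_DEFAULT_PARTITION__" unless overridden in the Hive config.
* }</pre>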
*/
public static String getNullPartitionKeyValue(IMetaStoreClient client)
throws ConfigValSecurityException, TException {
return client.getConfigValue(
NULL_PARTITION_KEY_VALUE_CONF_KEY, DEFAULT_NULL_PARTITION_KEY_VALUE);
}
/**
* Returns the Thrift URI(s) configured for the remote Hive Metastore.
*/
public static String getHiveMetastoreUris() {
return hiveMetastoreUris_;
}
/**
* Returns the value set for the given config in the metastore.
*/
public static String getMetastoreConfigValue(
IMetaStoreClient client, String config, String defaultVal) throws TException {
return client.getConfigValue(config, defaultVal);
}
/**
* Fetches all partitions for a table in batches, with each batch containing at most
* 'maxPartitionsPerRpc_' partitions. Returns a List containing all fetched Partitions.
* Throws a MetaException if existing partitions are dropped while a fetch is in
* progress. To help protect against this, the operation can be retried on
* MetaException by setting the 'numRetries' parameter.
* Failures due to Thrift exceptions (TExceptions) are not retried because they
* generally mean the connection is broken or has timed out. The HiveClient supports
* configuring retries at the connection level, so it can be enabled independently.
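*
* A minimal usage sketch ('client' and 'msTbl' are illustrative names for any
* connected IMetaStoreClient and HMS Table):
* <pre>{@code
*   // Retry up to 2 times on MetaException, e.g. if partitions are dropped
*   // concurrently while the fetch is in progress.
*   List<Partition> parts = MetaStoreUtil.fetchAllPartitions(client, msTbl, 2);
* }</pre>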
*/
public static List<Partition> fetchAllPartitions(IMetaStoreClient client, Table msTbl,
int numRetries) throws TException {
String dbName = msTbl.getDbName();
String tblName = msTbl.getTableName();
Preconditions.checkArgument(numRetries >= 0);
int retryAttempt = 0;
while (true) {
try {
// First, get all partition names that currently exist.
List<String> partNames = client.listPartitionNames(dbName, tblName, (short) -1);
return MetaStoreUtil.fetchPartitionsByName(client, partNames, msTbl);
} catch (MetaException e) {
// Only retry for MetaExceptions, since TExceptions could indicate a broken
// connection which we can't recover from by retrying.
if (retryAttempt < numRetries) {
LOG.error(String.format("Error fetching partitions for table: %s.%s. " +
"Retry attempt: %d/%d", dbName, tblName, retryAttempt, numRetries), e);
++retryAttempt;
// TODO: Sleep for a bit?
} else {
throw e;
}
}
}
}
/**
* Given a List of partition names, fetches the matching Partitions from the HMS
* in batches. Each batch will contain at most 'maxPartitionsPerRpc_' partitions.
* The partition-level schema will be replaced with the table's to reduce memory
* footprint (IMPALA-11812).
* @return a List containing all fetched Partitions.
* @throws MetaException if any partitions in 'partNames' do not exist.
* @throws TException for RPC failures.
*/
public static List<Partition> fetchPartitionsByName(IMetaStoreClient client,
List<String> partNames, Table msTbl) throws TException {
LOG.info("Fetching {} partitions for: {}.{} using partition batch size: {}",
partNames.size(), msTbl.getDbName(), msTbl.getTableName(),
maxPartitionsPerRpc_);
List<Partition> fetchedPartitions = Lists.newArrayList();
// Fetch the partitions in batches.
int numDone = 0;
for (int i = 0; i < partNames.size(); i += maxPartitionsPerRpc_) {
// Get a subset of partition names to fetch.
List<String> partsToFetch =
partNames.subList(i, Math.min(i + maxPartitionsPerRpc_, partNames.size()));
// Fetch these partitions from the metastore.
List<Partition> partitions = client.getPartitionsByNames(
msTbl.getDbName(), msTbl.getTableName(), partsToFetch);
replaceSchemaFromTable(partitions, msTbl);
fetchedPartitions.addAll(partitions);
numDone += partitions.size();
LOG.info("Fetched {}/{} partitions for table {}.{}", numDone, partNames.size(),
msTbl.getDbName(), msTbl.getTableName());
}
return fetchedPartitions;
}
/**
* A wrapper for the HMS add_partitions API that replaces the partition-level schema
* with the table's to save memory.
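*
* A minimal usage sketch ('client', 'msTbl', and 'newParts' are illustrative):
* <pre>{@code
*   // Add the partitions, ignoring ones that already exist (ifNotExists=true),
*   // and request the created Partition objects back (needResults=true).
*   List<Partition> added =
*       MetaStoreUtil.addPartitions(client, msTbl, newParts, true, true);
* }</pre>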
* @return a List containing all added Partitions.
*/
public static List<Partition> addPartitions(IMetaStoreClient client, Table tbl,
List<Partition> partitions, boolean ifNotExists, boolean needResults)
throws TException {
List<Partition> addedPartitions = client.add_partitions(partitions,
ifNotExists, needResults);
replaceSchemaFromTable(addedPartitions, tbl);
return addedPartitions;
}
/**
* Replaces the column list in the given partitions with the table-level schema to
* save memory for wide tables (IMPALA-11812). Note that Impala never uses the
* partition-level schema.
*/
public static void replaceSchemaFromTable(List<Partition> partitions, Table msTbl) {
for (Partition p : partitions) {
p.getSd().setCols(msTbl.getSd().getCols());
}
}
/**
* Returns a shallow copy of the given StorageDescriptor. The contained objects
* (columns, serde info, parameters, etc.) are shared with 'other', not cloned.
*/
public static StorageDescriptor shallowCopyStorageDescriptor(StorageDescriptor other) {
return new StorageDescriptor(
other.getCols(),
other.getLocation(),
other.getInputFormat(),
other.getOutputFormat(),
other.isCompressed(),
other.getNumBuckets(),
other.getSerdeInfo(),
other.getBucketCols(),
other.getSortCols(),
other.getParameters());
}
/**
* Checks that a given 'property' is short enough for HMS to handle. If not, throws an
* 'AnalysisException' with 'name' as its prefix.
*/
public static void checkShortProperty(String name, String property, int length)
throws AnalysisException {
if (property.length() > length) {
throw new AnalysisException(
name + " length must be <= " + length + ": " + property.length());
}
}
/**
* Checks that each key and value in a property map is short enough for HMS to handle.
* If not, an 'AnalysisException' is thrown with 'mapName' as its prefix.
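*
* For example, with 'mapName' "TBLPROPERTIES" and a 5000-character value, the
* MAX_PROPERTY_VALUE_LENGTH (4000) check fails with the message
* "TBLPROPERTIES value length must be <= 4000: 5000".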
*/
public static void checkShortPropertyMap(
String mapName, Map<String, String> propertyMap) throws AnalysisException {
if (null != propertyMap) {
for (Map.Entry<String, String> property : propertyMap.entrySet()) {
checkShortProperty(mapName + " key", property.getKey(), MAX_PROPERTY_KEY_LENGTH);
checkShortProperty(
mapName + " value", property.getValue(), MAX_PROPERTY_VALUE_LENGTH);
}
}
}
/**
* Does a case-insensitive search for 'propertyKey' in 'propertyMap'. If a match is
* found, the matched key is returned, otherwise null is returned. 'propertyMap' and
* 'propertyKey' must not be null.
*/
public static String findTblPropKeyCaseInsensitive(Map<String, String> propertyMap,
String propertyKey) {
Preconditions.checkNotNull(propertyMap);
Preconditions.checkNotNull(propertyKey);
for (String key : propertyMap.keySet()) {
if (key != null && key.equalsIgnoreCase(propertyKey)) return key;
}
return null;
}
/**
* Returns a copy of the comma-separated list of values 'inputCsv', with all
* occurrences of 'toReplace' replaced with value 'replaceWith'.
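*
* For example, replaceValueInCsvList("a, B ,c", "b", "x") returns "a,x,c":
* matching is case-insensitive and whitespace around each element is trimmed.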
*/
public static String replaceValueInCsvList(String inputCsv, String toReplace,
String replaceWith) {
Iterable<String> inputList =
Splitter.on(",").trimResults().omitEmptyStrings().split(inputCsv);
List<String> outputList = Lists.newArrayList();
for (String elem : inputList) {
if (elem.equalsIgnoreCase(toReplace)) {
outputList.add(replaceWith);
} else {
outputList.add(elem);
}
}
return Joiner.on(",").join(outputList);
}
/**
* Returns a copy of the comma-separated list of values 'inputCsv', with all
* occurrences of 'toRemove' removed.
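*
* For example, removeValueFromCsvList("a,b,c", "B") returns "a,c"; the
* comparison is case-insensitive.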
*/
public static String removeValueFromCsvList(String inputCsv, String toRemove) {
Iterable<String> inputList =
Splitter.on(",").trimResults().omitEmptyStrings().split(inputCsv);
List<String> outputList = Lists.newArrayList();
for (String elem : inputList) {
if (!elem.equalsIgnoreCase(toRemove)) outputList.add(elem);
}
return Joiner.on(",").join(outputList);
}
/**
* Returns the intersection of the comma-separated list of values 'leftCsv' with the
* names of the columns in 'rightCols'. The return value is a comma-separated list.
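*
* For example, with 'leftCsv' "id, Name ,extra" and columns named [id, name],
* the result is "id,Name": matching is case-insensitive and the left-hand
* casing is preserved.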
*/
public static String intersectCsvListWithColumNames(String leftCsv,
List<TColumn> rightCols) {
Iterable<String> leftCols =
Splitter.on(",").trimResults().omitEmptyStrings().split(leftCsv);
HashSet<String> rightColNames = Sets.newHashSet();
for (TColumn c : rightCols) rightColNames.add(c.getColumnName().toLowerCase());
List<String> outputList = Lists.newArrayList();
for (String leftCol : leftCols) {
if (rightColNames.contains(leftCol.toLowerCase())) outputList.add(leftCol);
}
return Joiner.on(",").join(outputList);
}
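/**
* Converts an HMS partition name, e.g. "year=2024/month=5", into the list of
* partition values ordered by the partition keys of 'msTbl', e.g. ["2024", "5"].
* Throws a CatalogException if 'partName' is missing a value for any partition
* key of the table.
*/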
public static List<String> getPartValsFromName(Table msTbl, String partName)
throws MetaException, CatalogException {
Preconditions.checkNotNull(msTbl);
LinkedHashMap<String, String> hm = Warehouse.makeSpecFromName(partName);
List<String> partVals = Lists.newArrayList();
for (FieldSchema field: msTbl.getPartitionKeys()) {
String key = field.getName();
String val = hm.get(key);
if (val == null) {
throw new CatalogException("Incomplete partition name - missing " + key);
}
partVals.add(val);
}
return partVals;
}
/**
* Returns true if the given HMS table is bucketed.
*/
public static boolean isBucketedTable(Table msTbl) {
Preconditions.checkNotNull(msTbl);
return msTbl.getSd().getNumBuckets() > 0;
}
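/**
* Groups the information needed to fire HMS insert events for a table:
* per-partition InsertEventRequestData, the corresponding partition values,
* and, for transactional tables, the transaction and write ids.
*/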
public static class TableInsertEventInfo {
// List of partition-level insert event info.
private final List<InsertEventRequestData> insertEventRequestData_;
// The partition list corresponding to insertEventRequestData_, used by Apache Hive 3.
private final List<List<String>> insertEventPartVals_;
// Transaction id in case this represents a transactional table.
private final long txnId_;
// Write id in case this is for a transactional table.
private final long writeId_;
// True in case this is for a transactional table.
private final boolean isTransactional_;
public TableInsertEventInfo(List<InsertEventRequestData> insertEventInfos,
List<List<String>> insertEventPartVals, boolean isTransactional, long txnId,
long writeId) {
Preconditions.checkState(insertEventInfos.size() == insertEventPartVals.size());
this.insertEventRequestData_ = insertEventInfos;
this.insertEventPartVals_ = insertEventPartVals;
this.txnId_ = txnId;
this.writeId_ = writeId;
this.isTransactional_ = isTransactional;
}
public boolean isTransactional() {
return isTransactional_;
}
public List<InsertEventRequestData> getInsertEventReqData() {
return insertEventRequestData_;
}
public List<List<String>> getInsertEventPartVals() {
return insertEventPartVals_;
}
public long getTxnId() {
return txnId_;
}
public long getWriteId() {
return writeId_;
}
}
/**
* Returns true if the min/max stats of 'type' can be stored in HMS (Hive metastore).
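* For example, INT, DOUBLE, DECIMAL, and DATE columns qualify; STRING and
* TIMESTAMP columns do not.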
*/
public static boolean canStoreMinmaxInHMS(Type type) {
return (type.isIntegerType() || type.isFloatingPointType() || type.isDecimal()
|| type.isDate());
}
}