blob: 074ff92b410184410b5b0dbc023cc66c90feacf0 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.codec.binary.Base64;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.impala.catalog.Function;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.common.JniUtil;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TDatabase;
import org.apache.impala.thrift.TFunction;
import org.apache.impala.thrift.TFunctionBinaryType;
import org.apache.impala.thrift.TFunctionCategory;
import org.apache.impala.util.PatternMatcher;
/**
 * Internal representation of db-related metadata. Owned by Catalog instance.
 * Not thread safe.
 *
 * Tables are stored in a map from the table name to the table object. They may
 * be loaded 'eagerly' at construction or 'lazily' on first reference.
 * Tables are accessed via getTable which may trigger a metadata read in two cases:
 *  * if the table has never been loaded
 *  * if the table loading failed on the previous attempt
 *
 * Native user added functions are persisted to the parameters map of the hive metastore
 * db object corresponding to this instance. This map's key is the function signature and
 * value is the base64 representation of the thrift serialized function object.
 */
public class Db implements CatalogObject {
private static final Logger LOG = LoggerFactory.getLogger(Db.class);
// The catalog that owns this database; assigned once in the constructor.
private final Catalog parentCatalog_;
// Thrift representation of this database. Holds the (lower-cased) database name and
// the Hive metastore database object.
private final TDatabase thriftDb_;
// Catalog version of this object; read and written through getCatalogVersion() and
// setCatalogVersion().
private long catalogVersion_ = Catalog.INITIAL_CATALOG_VERSION;
// Prefix of the metastore DB parameter keys under which functions are persisted. The
// full key is this prefix followed by the function's signature string.
public static final String FUNCTION_INDEX_PREFIX = "impala_registered_function_";
// Hive metastore imposes a limit of 4000 bytes on the key and value strings
// in DB parameters map. We need to ensure that this limit isn't crossed
// while serializing functions to the metastore.
private static final int HIVE_METASTORE_DB_PARAM_LIMIT_BYTES = 4000;
// Table metadata cache.
private final CatalogObjectCache<Table> tableCache_;
// All of the registered user functions. The key is the user facing name (e.g. "myUdf"),
// and the values are all the overloaded variants (e.g. myUdf(double), myUdf(string))
// This includes both UDFs and UDAs. Updates are made thread safe by synchronizing
// on this map. When a new Db object is initialized, this map is updated with the
// UDF/UDAs already persisted, if any, in the metastore DB. Functions are sorted in a
// canonical order defined by FunctionResolutionOrder.
private final HashMap<String, List<Function>> functions_;
// If true, this database is an Impala system database.
// (e.g. can't drop it, can't add tables to it, etc).
private boolean isSystemDb_ = false;
/**
 * Creates a database with an empty table cache and no registered functions.
 *
 * @param name database name; stored lower-cased in the backing TDatabase
 * @param catalog the catalog that owns this database
 * @param msDb the Hive metastore database object backing this database. It is what
 *     getMetaStoreDb() returns and where the HMS parameter helpers persist functions.
 */
public Db(String name, Catalog catalog,
    org.apache.hadoop.hive.metastore.api.Database msDb) {
  thriftDb_ = new TDatabase(name.toLowerCase());
  // Keep the metastore object on the thrift struct: getMetaStoreDb() and the HMS
  // parameter helpers read it back from thriftDb_, so dropping it here would leave
  // them with nothing to work on.
  thriftDb_.setMetastore_db(msDb);
  parentCatalog_ = catalog;
  tableCache_ = new CatalogObjectCache<Table>();
  functions_ = new HashMap<String, List<Function>>();
}
/**
 * Marks or unmarks this database as an Impala system database.
 */
public void setIsSystemDb(boolean b) {
  isSystemDb_ = b;
}
* Creates a Db object with no tables based on the given TDatabase thrift struct.
public static Db fromTDatabase(TDatabase db, Catalog parentCatalog) {
return new Db(db.getDb_name(), parentCatalog, db.getMetastore_db());
* Updates the hms parameters map by adding the input <k,v> pair.
private void putToHmsParameters(String k, String v) {
org.apache.hadoop.hive.metastore.api.Database msDb = thriftDb_.metastore_db;
Map<String, String> hmsParams = msDb.getParameters();
if (hmsParams == null) hmsParams = Maps.newHashMap();
* Updates the hms parameters map by removing the <k,v> pair corresponding to
* input key <k>. Returns true if the parameters map contains a pair <k,v>
* corresponding to input k and it is removed, false otherwise.
private boolean removeFromHmsParameters(String k) {
org.apache.hadoop.hive.metastore.api.Database msDb = thriftDb_.metastore_db;
if (msDb.getParameters() == null) return false;
return msDb.getParameters().remove(k) != null;
/**
 * Returns true if this database has been flagged as an Impala system database.
 */
public boolean isSystemDb() {
  return isSystemDb_;
}
/**
 * Returns the thrift struct backing this database. This is the internal object itself,
 * not a copy.
 */
public TDatabase toThrift() {
  return thriftDb_;
}
/**
 * Returns the database name as stored in the backing thrift struct.
 */
public String getName() {
  return thriftDb_.getDb_name();
}
public TCatalogObjectType getCatalogObjectType() {
return TCatalogObjectType.DATABASE;
* Adds a table to the table cache.
public void addTable(Table table) {
* Gets all table names in the table cache.
public List<String> getAllTableNames() {
return Lists.newArrayList(tableCache_.keySet());
* Returns the tables in the cache.
public List<Table> getTables() { return tableCache_.getValues(); }
public boolean containsTable(String tableName) {
return tableCache_.contains(tableName.toLowerCase());
* Returns the Table with the given name if present in the table cache or null if the
* table does not exist in the cache.
public Table getTable(String tblName) {
return tableCache_.get(tblName);
* Removes the table name and any cached metadata from the Table cache.
public Table removeTable(String tableName) {
return tableCache_.remove(tableName.toLowerCase());
* Comparator that sorts function overloads. We want overloads to be always considered
* in a canonical order so that overload resolution in the case of multiple valid
* overloads does not depend on the order in which functions are added to the Db. The
* order is based on the PrimitiveType enum because this was the order used implicitly
* for builtin operators and functions in earlier versions of Impala.
private static class FunctionResolutionOrder implements Comparator<Function> {
public int compare(Function f1, Function f2) {
int numSharedArgs = Math.min(f1.getNumArgs(), f2.getNumArgs());
for (int i = 0; i < numSharedArgs; ++i) {
int cmp = typeCompare(f1.getArgs()[i], f2.getArgs()[i]);
if (cmp < 0) {
return -1;
} else if (cmp > 0) {
return 1;
// Put alternative with fewer args first.
if (f1.getNumArgs() < f2.getNumArgs()) {
return -1;
} else if (f1.getNumArgs() > f2.getNumArgs()) {
return 1;
return 0;
private int typeCompare(Type t1, Type t2) {
// Shared comparator instance; addFunction() sorts each overload list with it.
private static final FunctionResolutionOrder FUNCTION_RESOLUTION_ORDER =
new FunctionResolutionOrder();
* Returns the metastore.api.Database object this Database was created from.
* Returns null if it is not related to a hive database such as builtins_db.
public org.apache.hadoop.hive.metastore.api.Database getMetaStoreDb() {
return thriftDb_.getMetastore_db();
* Returns the number of functions in this database.
public int numFunctions() {
synchronized (functions_) {
return functions_.size();
* See comment in Catalog.
public boolean containsFunction(String name) {
synchronized (functions_) {
return functions_.get(name) != null;
* See comment in Catalog.
public Function getFunction(Function desc, Function.CompareMode mode) {
synchronized (functions_) {
List<Function> fns = functions_.get(desc.functionName());
if (fns == null) return null;
// First check for identical
for (Function f: fns) {
if (, Function.CompareMode.IS_IDENTICAL)) return f;
if (mode == Function.CompareMode.IS_IDENTICAL) return null;
// Next check for indistinguishable
for (Function f: fns) {
if (, Function.CompareMode.IS_INDISTINGUISHABLE)) return f;
if (mode == Function.CompareMode.IS_INDISTINGUISHABLE) return null;
// Next check for strict supertypes
for (Function f: fns) {
if (, Function.CompareMode.IS_SUPERTYPE_OF)) return f;
if (mode == Function.CompareMode.IS_SUPERTYPE_OF) return null;
// Finally check for non-strict supertypes
for (Function f: fns) {
if (, Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF)) return f;
return null;
public Function getFunction(String signatureString) {
synchronized (functions_) {
for (List<Function> fns: functions_.values()) {
for (Function f: fns) {
if (f.signatureString().equals(signatureString)) return f;
return null;
* Adds the user defined function fn to metastore DB params. fn is
* serialized to thrift using TBinaryProtocol and then base64-encoded
* to be compatible with the HMS' representation of params.
private boolean addFunctionToDbParams(Function fn) {
fn.getBinaryType() != TFunctionBinaryType.BUILTIN &&
fn.getBinaryType() != TFunctionBinaryType.JAVA);
try {
TSerializer serializer =
new TSerializer(new TCompactProtocol.Factory());
byte[] serializedFn = serializer.serialize(fn.toThrift());
String base64Fn = Base64.encodeBase64String(serializedFn);
String fnKey = FUNCTION_INDEX_PREFIX + fn.signatureString();
if (base64Fn.length() > HIVE_METASTORE_DB_PARAM_LIMIT_BYTES) {
throw new ImpalaRuntimeException(
"Serialized function size exceeded HMS 4K byte limit");
putToHmsParameters(fnKey, base64Fn);
} catch (ImpalaException | TException e) {
LOG.error("Error adding function " + fn.getName() + " to DB params", e);
return false;
return true;
public boolean addFunction(Function fn) {
// We use the db parameters map to persist native and IR functions.
boolean addToDbParams =
(fn.getBinaryType() == TFunctionBinaryType.NATIVE ||
fn.getBinaryType() == TFunctionBinaryType.IR);
return addFunction(fn, addToDbParams);
* Registers the function fn to this database. If addToDbParams is true,
* fn is added to the metastore DB params. Returns false if the function
* fn already exists or when a failure is encountered while adding it to
* the metastore DB params and true otherwise.
public boolean addFunction(Function fn, boolean addToDbParams) {
synchronized (functions_) {
if (getFunction(fn, Function.CompareMode.IS_INDISTINGUISHABLE) != null) {
return false;
List<Function> fns = functions_.get(fn.functionName());
if (fns == null) {
fns = Lists.newArrayList();
functions_.put(fn.functionName(), fns);
if (addToDbParams && !addFunctionToDbParams(fn)) return false;
Collections.sort(fns, FUNCTION_RESOLUTION_ORDER);
return true;
* See comment in Catalog.
public Function removeFunction(Function desc) {
synchronized (functions_) {
Function fn = getFunction(desc, Function.CompareMode.IS_INDISTINGUISHABLE);
if (fn == null) return null;
List<Function> fns = functions_.get(desc.functionName());
if (fns.isEmpty()) functions_.remove(desc.functionName());
if (fn.getBinaryType() == TFunctionBinaryType.JAVA) return fn;
// Remove the function from the metastore database parameters
String fnKey = FUNCTION_INDEX_PREFIX + fn.signatureString();
boolean removeFn = removeFromHmsParameters(fnKey);
return fn;
public void removeAllFunctions() {
synchronized (functions_) {
* Removes a Function with the matching signature string. Returns the removed Function
* if a Function was removed as a result of this call, null otherwise.
* TODO: Move away from using signature strings and instead use Function IDs.
public Function removeFunction(String signatureStr) {
synchronized (functions_) {
Function targetFn = getFunction(signatureStr);
if (targetFn != null) return removeFunction(targetFn);
return null;
* Add a builtin with the specified name and signatures to this db.
* This defaults to not using a Prepare/Close function.
public void addScalarBuiltin(String fnName, String symbol, boolean userVisible,
boolean varArgs, Type retType, Type ... args) {
addScalarBuiltin(fnName, symbol, userVisible, null, null, varArgs, retType, args);
* Add a builtin with the specified name and signatures to this db.
public void addScalarBuiltin(String fnName, String symbol, boolean userVisible,
String prepareFnSymbol, String closeFnSymbol, boolean varArgs, Type retType,
Type ... args) {
fnName, Lists.newArrayList(args), varArgs, retType,
symbol, prepareFnSymbol, closeFnSymbol, userVisible));
* Adds a builtin to this database. The function must not already exist.
public void addBuiltin(Function fn) {
Preconditions.checkState(fn != null);
Preconditions.checkState(getFunction(fn, Function.CompareMode.IS_IDENTICAL) == null);
addFunction(fn, false);
* Returns a map of functionNames to list of (overloaded) functions with that name.
* This is not thread safe so a higher level lock must be taken while iterating
* over the returned functions.
protected HashMap<String, List<Function>> getAllFunctions() {
return functions_;
* Returns a list of transient functions in this Db.
protected List<Function> getTransientFunctions() {
List<Function> result = Lists.newArrayList();
synchronized (functions_) {
for (String fnKey: functions_.keySet()) {
for (Function fn: functions_.get(fnKey)) {
if (fn.userVisible() && !fn.isPersistent()) {
return result;
* Returns all functions that match the pattern of 'matcher'.
public List<Function> getFunctions(TFunctionCategory category,
PatternMatcher matcher) {
List<Function> result = Lists.newArrayList();
synchronized (functions_) {
for (Map.Entry<String, List<Function>> fns: functions_.entrySet()) {
if (!matcher.matches(fns.getKey())) continue;
for (Function fn: fns.getValue()) {
if ((category == null || Function.categoryMatch(fn, category))
&& fn.userVisible()) {
return result;
* Returns all functions with the given name
public List<Function> getFunctions(String name) {
List<Function> result = Lists.newArrayList();
synchronized (functions_) {
if (!functions_.containsKey(name)) return result;
for (Function fn: functions_.get(name)) {
if (fn.userVisible()) result.add(fn);
return result;
* Returns all functions with the given name and category.
public List<Function> getFunctions(TFunctionCategory category, String name) {
List<Function> result = Lists.newArrayList();
synchronized (functions_) {
if (!functions_.containsKey(name)) return result;
for (Function fn: functions_.get(name)) {
if (fn.userVisible() && Function.categoryMatch(fn, category)) {
return result;
/**
 * Returns the catalog version currently recorded for this database.
 */
public long getCatalogVersion() {
  return catalogVersion_;
}
/**
 * Records 'newVersion' as the catalog version of this database.
 */
public void setCatalogVersion(long newVersion) {
  catalogVersion_ = newVersion;
}
/**
 * Always returns true for a database.
 */
public boolean isLoaded() {
  return true;
}