blob: 4bfd070a014035c1c04ab51b9b5dbe769ab562db [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.impala.analysis.ColumnDef;
import org.apache.impala.analysis.KuduPartitionParam;
import org.apache.impala.catalog.events.InFlightEvents;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TDatabase;
import org.apache.impala.thrift.TDbInfoSelector;
import org.apache.impala.thrift.TFunctionBinaryType;
import org.apache.impala.thrift.TFunctionCategory;
import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
import org.apache.impala.thrift.TPartialDbInfo;
import org.apache.impala.util.FunctionUtils;
import org.apache.impala.util.PatternMatcher;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TCompactProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
/**
* Internal representation of db-related metadata. Owned by Catalog instance.
* Not thread safe.
*
* Tables are stored in a map from the table name to the table object. They may
* be loaded 'eagerly' at construction or 'lazily' on first reference.
* Tables are accessed via getTable which may trigger a metadata read in two cases:
* * if the table has never been loaded
* * if the table loading failed on the previous attempt
*
* Native user added functions are persisted to the parameters map of the hive metastore
* db object corresponding to this instance. This map's key is the function signature and
* value is the base64 representation of the thrift serialized function object.
*
*/
public class Db extends CatalogObjectImpl implements FeDb {
private static final Logger LOG = LoggerFactory.getLogger(Db.class);
// TODO: We should have a consistent synchronization model for Db and Table
// Right now, we synchronize functions and thriftDb_ object in-place and do
// not take read lock on catalogVersion. See IMPALA-8366 for details
private final AtomicReference<TDatabase> thriftDb_ = new AtomicReference<>();
public static final String FUNCTION_INDEX_PREFIX = "impala_registered_function_";
// Hive metastore imposes a limit of 4000 bytes on the key and value strings
// in DB parameters map. We need ensure that this limit isn't crossed
// while serializing functions to the metastore.
private static final int HIVE_METASTORE_DB_PARAM_LIMIT_BYTES = 4000;
// Table metadata cache.
private final CatalogObjectCache<Table> tableCache_;
// All of the registered user functions. The key is the user facing name (e.g. "myUdf"),
// and the values are all the overloaded variants (e.g. myUdf(double), myUdf(string))
// This includes both UDFs and UDAs. Updates are made thread safe by synchronizing
// on this map. When a new Db object is initialized, this list is updated with the
// UDF/UDAs already persisted, if any, in the metastore DB. Functions are sorted in a
// canonical order defined by FunctionResolutionOrder.
private final Map<String, List<Function>> functions_;
// If true, this database is an Impala system database.
// (e.g. can't drop it, can't add tables to it, etc).
private boolean isSystemDb_ = false;
// tracks the in-flight metastore events for this db
private final InFlightEvents inFlightEvents_ = new InFlightEvents();
public Db(String name, org.apache.hadoop.hive.metastore.api.Database msDb) {
setMetastoreDb(name, msDb);
tableCache_ = new CatalogObjectCache<>();
functions_ = new HashMap<>();
}
public void setIsSystemDb(boolean b) { isSystemDb_ = b; }
/**
* Creates a Db object with no tables based on the given TDatabase thrift struct.
*/
public static Db fromTDatabase(TDatabase db) {
return new Db(db.getDb_name(), db.getMetastore_db());
}
/**
* Updates the hms parameters map by adding the input <k,v> pair.
*/
private void putToHmsParameters(String k, String v) {
org.apache.hadoop.hive.metastore.api.Database msDb = thriftDb_.get().metastore_db;
Preconditions.checkNotNull(msDb);
Map<String, String> hmsParams = msDb.getParameters();
if (hmsParams == null) hmsParams = new HashMap<>();
hmsParams.put(k,v);
msDb.setParameters(hmsParams);
}
/**
* Updates the hms parameters map by removing the <k,v> pair corresponding to
* input key <k>. Returns true if the parameters map contains a pair <k,v>
* corresponding to input k and it is removed, false otherwise.
*/
private boolean removeFromHmsParameters(String k) {
org.apache.hadoop.hive.metastore.api.Database msDb = thriftDb_.get().metastore_db;
Preconditions.checkNotNull(msDb);
if (msDb.getParameters() == null) return false;
return msDb.getParameters().remove(k) != null;
}
@Override // FeDb
public boolean isSystemDb() { return isSystemDb_; }
@Override // FeDb
public TDatabase toThrift() { return thriftDb_.get(); }
@Override // FeDb
public String getName() { return thriftDb_.get().getDb_name(); }
@Override
public TCatalogObjectType getCatalogObjectType() { return TCatalogObjectType.DATABASE; }
/**
* Adds a table to the table cache.
*/
public void addTable(Table table) { tableCache_.add(table); }
/**
* Gets all table names in the table cache.
*/
@Override
public List<String> getAllTableNames() {
return Lists.newArrayList(tableCache_.keySet());
}
/**
* Returns the tables in the cache.
*/
public List<Table> getTables() { return tableCache_.getValues(); }
@Override
public boolean containsTable(String tableName) {
return tableCache_.contains(tableName.toLowerCase());
}
/**
* Returns the Table with the given name if present in the table cache or null if the
* table does not exist in the cache.
*/
@Override // FeTable
public Table getTable(String tblName) { return tableCache_.get(tblName); }
/**
* Returns the Table with the given name if present in the table cache or null if the
* table does not exist in the cache. If the table is unloaded, the result is an
* IncompleteTable.
*/
@Override
public Table getTableIfCached(String tblName) { return getTable(tblName); }
/**
* Removes the table name and any cached metadata from the Table cache.
*/
public Table removeTable(String tableName) {
return tableCache_.remove(tableName.toLowerCase());
}
@Override
public FeKuduTable createKuduCtasTarget(
org.apache.hadoop.hive.metastore.api.Table msTbl,
List<ColumnDef> columnDefs, List<ColumnDef> primaryKeyColumnDefs,
List<KuduPartitionParam> kuduPartitionParams) {
return KuduTable.createCtasTarget(this, msTbl, columnDefs, primaryKeyColumnDefs,
kuduPartitionParams);
}
@Override
public FeFsTable createFsCtasTarget(org.apache.hadoop.hive.metastore.api.Table msTbl)
throws CatalogException {
return HdfsTable.createCtasTarget(this, msTbl);
}
@Override // FeDb
public org.apache.hadoop.hive.metastore.api.Database getMetaStoreDb() {
return thriftDb_.get().getMetastore_db();
}
@Override // FeDb
public int numFunctions() {
synchronized (functions_) {
return functions_.size();
}
}
@Override // FeDb
public boolean containsFunction(String name) {
synchronized (functions_) {
return functions_.get(name) != null;
}
}
/*
* See comment in Catalog.
*/
@Override // FeDb
public Function getFunction(Function desc, Function.CompareMode mode) {
synchronized (functions_) {
List<Function> fns = functions_.get(desc.functionName());
if (fns == null) return null;
return FunctionUtils.resolveFunction(fns, desc, mode);
}
}
public Function getFunction(String signatureString) {
synchronized (functions_) {
for (List<Function> fns: functions_.values()) {
for (Function f: fns) {
if (f.signatureString().equals(signatureString)) return f;
}
}
}
return null;
}
/**
* Adds the user defined function fn to metastore DB params. fn is
* serialized to thrift using TBinaryProtocol and then base64-encoded
* to be compatible with the HMS' representation of params.
*/
private boolean addFunctionToDbParams(Function fn) {
Preconditions.checkState(
fn.getBinaryType() != TFunctionBinaryType.BUILTIN &&
fn.getBinaryType() != TFunctionBinaryType.JAVA);
try {
TSerializer serializer =
new TSerializer(new TCompactProtocol.Factory());
byte[] serializedFn = serializer.serialize(fn.toThrift());
String base64Fn = Base64.getEncoder().encodeToString(serializedFn);
String fnKey = FUNCTION_INDEX_PREFIX + fn.signatureString();
if (base64Fn.length() > HIVE_METASTORE_DB_PARAM_LIMIT_BYTES) {
throw new ImpalaRuntimeException(
"Serialized function size exceeded HMS 4K byte limit");
}
putToHmsParameters(fnKey, base64Fn);
} catch (ImpalaException | TException e) {
LOG.error("Error adding function " + fn.getName() + " to DB params", e);
return false;
}
return true;
}
public boolean addFunction(Function fn) {
// We use the db parameters map to persist native and IR functions.
boolean addToDbParams =
(fn.getBinaryType() == TFunctionBinaryType.NATIVE ||
fn.getBinaryType() == TFunctionBinaryType.IR);
return addFunction(fn, addToDbParams);
}
/**
* Registers the function fn to this database. If addToDbParams is true,
* fn is added to the metastore DB params. Returns false if the function
* fn already exists or when a failure is encountered while adding it to
* the metastore DB params and true otherwise.
*/
public boolean addFunction(Function fn, boolean addToDbParams) {
Preconditions.checkState(fn.dbName().equals(getName()));
synchronized (functions_) {
if (getFunction(fn, Function.CompareMode.IS_INDISTINGUISHABLE) != null) {
return false;
}
List<Function> fns = functions_.get(fn.functionName());
if (fns == null) {
fns = new ArrayList<>();
functions_.put(fn.functionName(), fns);
}
if (addToDbParams && !addFunctionToDbParams(fn)) return false;
fns.add(fn);
Collections.sort(fns, FunctionUtils.FUNCTION_RESOLUTION_ORDER);
return true;
}
}
/**
* See comment in Catalog.
*/
public Function removeFunction(Function desc) {
synchronized (functions_) {
Function fn = getFunction(desc, Function.CompareMode.IS_INDISTINGUISHABLE);
if (fn == null) return null;
List<Function> fns = functions_.get(desc.functionName());
Preconditions.checkNotNull(fns);
fns.remove(fn);
if (fns.isEmpty()) functions_.remove(desc.functionName());
if (fn.getBinaryType() == TFunctionBinaryType.JAVA) return fn;
// Remove the function from the metastore database parameters
String fnKey = FUNCTION_INDEX_PREFIX + fn.signatureString();
boolean removeFn = removeFromHmsParameters(fnKey);
Preconditions.checkState(removeFn);
return fn;
}
}
public void removeAllFunctions() {
synchronized (functions_) {
functions_.clear();
}
}
/**
* Removes a Function with the matching signature string. Returns the removed Function
* if a Function was removed as a result of this call, null otherwise.
* TODO: Move away from using signature strings and instead use Function IDs.
*/
public Function removeFunction(String signatureStr) {
synchronized (functions_) {
Function targetFn = getFunction(signatureStr);
if (targetFn != null) return removeFunction(targetFn);
}
return null;
}
/**
* Add a builtin with the specified name and signatures to this db.
* This defaults to not using a Prepare/Close function.
*/
public void addScalarBuiltin(String fnName, String symbol, boolean userVisible,
boolean varArgs, Type retType, Type ... args) {
addScalarBuiltin(fnName, symbol, userVisible, null, null, varArgs, retType, args);
}
/**
* Add a builtin with the specified name and signatures to this db.
*/
public void addScalarBuiltin(String fnName, String symbol, boolean userVisible,
String prepareFnSymbol, String closeFnSymbol, boolean varArgs, Type retType,
Type ... args) {
Preconditions.checkState(isSystemDb());
addBuiltin(ScalarFunction.createBuiltin(
fnName, Lists.newArrayList(args), varArgs, retType,
symbol, prepareFnSymbol, closeFnSymbol, userVisible));
}
/**
* Adds a builtin to this database. The function must not already exist.
*/
public void addBuiltin(Function fn) {
Preconditions.checkState(isSystemDb());
Preconditions.checkState(fn != null);
Preconditions.checkState(getFunction(fn, Function.CompareMode.IS_IDENTICAL) == null);
addFunction(fn, false);
}
/**
* Returns a map of functionNames to list of (overloaded) functions with that name.
* This is not thread safe so a higher level lock must be taken while iterating
* over the returned functions.
*/
public Map<String, List<Function>> getAllFunctions() {
return functions_;
}
/**
* Returns a list of transient functions in this Db.
*/
protected List<Function> getTransientFunctions() {
List<Function> result = new ArrayList<>();
synchronized (functions_) {
for (String fnKey: functions_.keySet()) {
for (Function fn: functions_.get(fnKey)) {
if (fn.userVisible() && !fn.isPersistent()) {
result.add(fn);
}
}
}
}
return result;
}
/**
* Returns all functions that match the pattern of 'matcher'.
*/
@Override
public List<Function> getFunctions(TFunctionCategory category,
PatternMatcher matcher) {
Preconditions.checkNotNull(matcher);
List<Function> result = new ArrayList<>();
synchronized (functions_) {
for (Map.Entry<String, List<Function>> fns: functions_.entrySet()) {
if (!matcher.matches(fns.getKey())) continue;
for (Function fn: fns.getValue()) {
if ((category == null || Function.categoryMatch(fn, category))
&& fn.userVisible()) {
result.add(fn);
}
}
}
}
return result;
}
/**
* Returns all functions with the given name
*/
@Override // FeDb
public List<Function> getFunctions(String name) {
Preconditions.checkNotNull(name);
synchronized (functions_) {
List<Function> candidates = functions_.get(name);
if (candidates == null) return new ArrayList<>();
return FunctionUtils.getVisibleFunctions(candidates);
}
}
@Override
public List<Function> getFunctions(TFunctionCategory category, String name) {
Preconditions.checkNotNull(category);
Preconditions.checkNotNull(name);
synchronized (functions_) {
List<Function> candidates = functions_.get(name);
if (candidates == null) return new ArrayList<>();
return FunctionUtils.getVisibleFunctionsInCategory(candidates, category);
}
}
@Override
protected void setTCatalogObject(TCatalogObject catalogObject) {
catalogObject.setDb(toThrift());
}
/**
* Get partial information about this DB in order to service CatalogdMetaProvider
* running in a remote impalad.
*/
public TGetPartialCatalogObjectResponse getPartialInfo(
TGetPartialCatalogObjectRequest req) {
TDbInfoSelector selector = Preconditions.checkNotNull(req.db_info_selector,
"no db_info_selector");
TGetPartialCatalogObjectResponse resp = new TGetPartialCatalogObjectResponse();
resp.setObject_version_number(getCatalogVersion());
resp.db_info = new TPartialDbInfo();
if (selector.want_hms_database) {
// TODO(todd): we need to deep-copy here because 'addFunction' other DDLs
// modify the parameter map in place. We need to change those to copy-on-write
// instead to avoid this copy.
resp.db_info.hms_database = getMetaStoreDb().deepCopy();
}
if (selector.want_table_names) {
resp.db_info.table_names = getAllTableNames();
}
if (selector.want_function_names) {
resp.db_info.function_names = ImmutableList.copyOf(functions_.keySet());
}
return resp;
}
/**
* Replaces the metastore db object of this Db with the given Metastore Database object
* @param msDb
*/
public void setMetastoreDb(String name, Database msDb) {
Preconditions.checkNotNull(name);
Preconditions.checkNotNull(msDb);
// create the TDatabase first before atomically replacing setting it in the thriftDb_
TDatabase tDatabase = new TDatabase(name.toLowerCase());
tDatabase.setMetastore_db(msDb);
thriftDb_.set(tDatabase);
}
/**
* Gets the current list of versions for in-flight events for this database
*/
public List<Long> getVersionsForInflightEvents() {
return inFlightEvents_.getAll();
}
/**
* Removes a given version from the collection of version numbers for in-flight events
* @param versionNumber version number to remove from the collection
* @return true if version was successfully removed, false if didn't exist
*/
public boolean removeFromVersionsForInflightEvents(long versionNumber) {
return inFlightEvents_.remove(versionNumber);
}
/**
* Adds a version number to the collection of versions for in-flight events. If the
* collection is already at the max size defined by
* <code>InflightEvents.MAX_NUMBER_OF_INFLIGHT_EVENTS</code>, then it ignores the
* given version and does not add it
* @param versionNumber version number to add
*/
public void addToVersionsForInflightEvents(long versionNumber) {
if (!inFlightEvents_.add(versionNumber)) {
LOG.warn(String.format("Could not add version %s to the list of in-flight "
+ "events. This could cause unnecessary database %s invalidation when the "
+ "event is processed", versionNumber, getName()));
}
}
@Override // FeDb
public String getOwnerUser() {
org.apache.hadoop.hive.metastore.api.Database db = getMetaStoreDb();
return db == null ? null : db.getOwnerName();
}
}