blob: 994aa376ddcaad93a28d6743bad77a073c268303 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.impala.analysis.FunctionName;
import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TFunction;
import org.apache.impala.thrift.TPartitionKeyValue;
import org.apache.impala.thrift.TTableName;
import org.apache.impala.util.PatternMatcher;
import org.apache.log4j.Logger;
/**
* Thread safe interface for reading and updating metadata stored in the Hive MetaStore.
* This class provides a storage API for caching CatalogObjects: databases, tables,
* and functions and the relevant metadata to go along with them. Although this class is
* thread safe, it does not guarantee consistency with the MetaStore. It is important
* to keep in mind that there may be external (potentially conflicting) concurrent
* metastore updates occurring at any time.
* The CatalogObject storage hierarchy is:
* Catalog -> Db -> Table
* -> Function
* Each level has its own synchronization, so the cache of Dbs is synchronized and each
* Db has a cache of tables which is synchronized independently.
*
* The catalog is populated with the impala builtins on startup. Builtins and user
* functions are treated identically by the catalog. The builtins go in a specific
* database that the user cannot modify.
* Builtins are populated on startup in initBuiltins().
*/
public abstract class Catalog {
  // Initial catalog version.
  public final static long INITIAL_CATALOG_VERSION = 0L;

  // Well-known database names.
  public static final String DEFAULT_DB = "default";
  public static final String BUILTINS_DB = "_impala_builtins";

  // Pool of HMS clients. Created empty here; the (numClients, timeout) constructor
  // populates it via initClients().
  protected final MetaStoreClientPool metaStoreClientPool_ =
      new MetaStoreClientPool(0, 0);

  // Cache of authorization policy metadata. Populated from data retrieved from the
  // Sentry Service, if configured.
  protected AuthorizationPolicy authPolicy_ = new AuthorizationPolicy();

  // Thread safe cache of database metadata, keyed by lower-cased db name. Uses an
  // AtomicReference so reset() operations can atomically swap dbCache_ references.
  // TODO: Update this to use a CatalogObjectCache?
  protected AtomicReference<ConcurrentHashMap<String, Db>> dbCache_ =
      new AtomicReference<ConcurrentHashMap<String, Db>>(
          new ConcurrentHashMap<String, Db>());

  // DB that contains all builtins.
  // NOTE(review): this field is static but reassigned in every Catalog constructor,
  // so constructing a second Catalog replaces the builtins Db shared through the
  // static getBuiltin(). Left as-is because getBuiltin()'s static interface depends
  // on it.
  private static Db builtinsDb_;

  // Cache of data sources, keyed by lower-cased name.
  protected final CatalogObjectCache<DataSource> dataSources_;

  // Cache of known HDFS cache pools. Allows for checking the existence
  // of pools without hitting HDFS.
  protected final CatalogObjectCache<HdfsCachePool> hdfsCachePools_ =
      new CatalogObjectCache<HdfsCachePool>(false);

  public Catalog() {
    dataSources_ = new CatalogObjectCache<DataSource>();
    builtinsDb_ = new BuiltinsDb(BUILTINS_DB, this);
    addDb(builtinsDb_);
  }

  /**
   * Creates a new instance of Catalog. It also adds 'numClients' clients to
   * 'metastoreClientPool_'.
   * 'initialCnxnTimeoutSec' specifies the time (in seconds) Catalog will wait to
   * establish an initial connection to the HMS. Using this setting allows catalogd and
   * HMS to be started simultaneously.
   */
  public Catalog(int numClients, int initialCnxnTimeoutSec) {
    this();
    metaStoreClientPool_.initClients(numClients, initialCnxnTimeoutSec);
  }

  /** Returns the database holding all builtin functions. */
  public Db getBuiltinsDb() { return builtinsDb_; }

  /**
   * Adds a new database to the catalog, replacing any existing database with the same
   * name. Returns the previous database with this name, or null if there was no
   * previous database.
   */
  public Db addDb(Db db) {
    return dbCache_.get().put(db.getName().toLowerCase(), db);
  }

  /**
   * Gets the Db object from the Catalog using a case-insensitive lookup on the name.
   * Returns null if no matching database is found.
   */
  public Db getDb(String dbName) {
    Preconditions.checkState(dbName != null && !dbName.isEmpty(),
        "Null or empty database name given as argument to Catalog.getDb");
    return dbCache_.get().get(dbName.toLowerCase());
  }

  /**
   * Removes a database from the metadata cache. Returns the value removed or null
   * if no database was removed as part of this operation. Used by DROP DATABASE
   * statements.
   */
  public Db removeDb(String dbName) {
    return dbCache_.get().remove(dbName.toLowerCase());
  }

  /**
   * Returns all databases that match 'matcher'.
   */
  public List<Db> getDbs(PatternMatcher matcher) {
    return filterCatalogObjectsByPattern(dbCache_.get().values(), matcher);
  }

  /**
   * Returns the Table object for the given dbName/tableName. This will trigger a
   * metadata load if the table metadata is not yet cached.
   * @throws DatabaseNotFoundException - If the database does not exist.
   */
  public Table getTable(String dbName, String tableName) throws
      CatalogException {
    Db db = getDb(dbName);
    if (db == null) {
      throw new DatabaseNotFoundException("Database '" + dbName + "' not found");
    }
    return db.getTable(tableName);
  }

  /**
   * Removes a table from the catalog and returns the table that was removed, or null
   * if the table/database does not exist.
   */
  public Table removeTable(TTableName tableName) {
    // Remove the old table name from the cache and add the new table.
    Db db = getDb(tableName.getDb_name());
    if (db == null) return null;
    return db.removeTable(tableName.getTable_name());
  }

  /**
   * Returns all tables in 'dbName' that match 'matcher'.
   *
   * dbName must not be null.
   *
   * Table names are returned unqualified.
   */
  public List<String> getTableNames(String dbName, PatternMatcher matcher)
      throws DatabaseNotFoundException {
    Preconditions.checkNotNull(dbName);
    Db db = getDb(dbName);
    if (db == null) {
      throw new DatabaseNotFoundException("Database '" + dbName + "' not found");
    }
    return filterStringsByPattern(db.getAllTableNames(), matcher);
  }

  /**
   * Returns true if the table and the database exist in the Impala Catalog. Returns
   * false if either the table or the database do not exist.
   */
  public boolean containsTable(String dbName, String tableName) {
    Db db = getDb(dbName);
    return db != null && db.containsTable(tableName);
  }

  /**
   * Adds a data source to the in-memory map of data sources. It is not
   * persisted to the metastore.
   * @return true if this item was added or false if the existing value was preserved.
   */
  public boolean addDataSource(DataSource dataSource) {
    return dataSources_.add(dataSource);
  }

  /**
   * Removes a data source from the in-memory map of data sources.
   * @return the item that was removed if it existed in the cache, null otherwise.
   */
  public DataSource removeDataSource(String dataSourceName) {
    Preconditions.checkNotNull(dataSourceName);
    return dataSources_.remove(dataSourceName.toLowerCase());
  }

  /**
   * Gets the specified data source. Returns null if it does not exist.
   */
  public DataSource getDataSource(String dataSourceName) {
    Preconditions.checkNotNull(dataSourceName);
    return dataSources_.get(dataSourceName.toLowerCase());
  }

  /**
   * Gets a list of all data sources.
   */
  public List<DataSource> getDataSources() {
    return dataSources_.getValues();
  }

  /**
   * Returns a list of data sources names that match pattern.
   *
   * @see PatternMatcher#matches(String) for details of the pattern match semantics.
   *
   * pattern may be null (and thus matches everything).
   */
  public List<String> getDataSourceNames(String pattern) {
    return filterStringsByPattern(dataSources_.keySet(),
        PatternMatcher.createHivePatternMatcher(pattern));
  }

  /**
   * Returns all DataSources that match 'matcher'.
   */
  public List<DataSource> getDataSources(PatternMatcher matcher) {
    return filterCatalogObjectsByPattern(dataSources_.getValues(), matcher);
  }

  /**
   * Adds a function to the catalog.
   * Returns true if the function was successfully added.
   * Returns false if the function already exists or its database is missing.
   * TODO: allow adding a function to a global scope. We probably want this to resolve
   * after the local scope.
   * e.g. if we had fn() and db.fn(). If the current database is 'db', fn() would
   * resolve first to db.fn().
   */
  public boolean addFunction(Function fn) {
    Db db = getDb(fn.dbName());
    if (db == null) return false;
    return db.addFunction(fn);
  }

  /**
   * Returns the function that best matches 'desc' that is registered with the
   * catalog using 'mode' to check for matching. If desc matches multiple functions
   * in the catalog, it will return the function with the strictest matching mode.
   * If multiple functions match at the same matching mode, ties are broken by comparing
   * argument types in lexical order. Argument types are ordered by argument precision
   * (e.g. double is preferred over float) and then by alphabetical order of argument
   * type name, to guarantee deterministic results.
   */
  public Function getFunction(Function desc, Function.CompareMode mode) {
    Db db = getDb(desc.dbName());
    if (db == null) return null;
    return db.getFunction(desc, mode);
  }

  /** Looks up a builtin function matching 'desc' under 'mode'; null if none. */
  public static Function getBuiltin(Function desc, Function.CompareMode mode) {
    return builtinsDb_.getFunction(desc, mode);
  }

  /**
   * Removes a function from the catalog. Increments the catalog version and returns
   * the Function object that was removed if the function existed, otherwise returns
   * null.
   */
  public Function removeFunction(Function desc) {
    Db db = getDb(desc.dbName());
    if (db == null) return null;
    return db.removeFunction(desc);
  }

  /**
   * Returns true if there is a function with this function name. Parameters
   * are ignored.
   */
  public boolean containsFunction(FunctionName name) {
    Db db = getDb(name.getDb());
    if (db == null) return false;
    return db.containsFunction(name.getFunction());
  }

  /**
   * Adds a new HdfsCachePool to the catalog.
   * @return true if the pool was added, false if an entry already existed.
   */
  public boolean addHdfsCachePool(HdfsCachePool cachePool) {
    return hdfsCachePools_.add(cachePool);
  }

  /**
   * Gets a HdfsCachePool given a cache pool name. Returns null if the cache
   * pool does not exist.
   */
  public HdfsCachePool getHdfsCachePool(String poolName) {
    return hdfsCachePools_.get(poolName);
  }

  /**
   * Release the Hive Meta Store Client resources. Can be called multiple times
   * (additional calls will be no-ops).
   */
  public void close() { metaStoreClientPool_.close(); }

  /**
   * Returns a managed meta store client from the client connection pool.
   */
  public MetaStoreClient getMetaStoreClient() { return metaStoreClientPool_.getClient(); }

  /**
   * Return all members of 'candidates' that match 'matcher'.
   * The results are sorted in String.CASE_INSENSITIVE_ORDER.
   * matcher must not be null.
   */
  private List<String> filterStringsByPattern(Iterable<String> candidates,
      PatternMatcher matcher) {
    Preconditions.checkNotNull(matcher);
    List<String> filtered = Lists.newArrayList();
    for (String candidate: candidates) {
      if (matcher.matches(candidate)) filtered.add(candidate);
    }
    Collections.sort(filtered, String.CASE_INSENSITIVE_ORDER);
    return filtered;
  }

  // Orders CatalogObjects case-insensitively by name.
  private static class CatalogObjectOrder implements Comparator<CatalogObject> {
    @Override
    public int compare(CatalogObject o1, CatalogObject o2) {
      return String.CASE_INSENSITIVE_ORDER.compare(o1.getName(), o2.getName());
    }
  }

  private static final CatalogObjectOrder CATALOG_OBJECT_ORDER = new CatalogObjectOrder();

  /**
   * Return all members of 'candidates' that match 'matcher'.
   * The results are sorted in CATALOG_OBJECT_ORDER.
   * matcher must not be null.
   */
  private <T extends CatalogObject> List<T> filterCatalogObjectsByPattern(
      Iterable<? extends T> candidates, PatternMatcher matcher) {
    Preconditions.checkNotNull(matcher);
    List<T> filtered = Lists.newArrayList();
    for (T candidate: candidates) {
      if (matcher.matches(candidate.getName())) filtered.add(candidate);
    }
    Collections.sort(filtered, CATALOG_OBJECT_ORDER);
    return filtered;
  }

  /**
   * Returns the HdfsPartition corresponding to the given Hive Metastore Partition
   * object of table dbName.tableName. Builds a TPartitionKeyValue spec by pairing
   * the table's leading columns, positionally, with the metastore partition values,
   * then delegates to getHdfsPartition(dbName, tableName, partitionSpec).
   * NOTE(review): assumes the partitioning columns are the first getValues().size()
   * columns of the HdfsTable — TODO confirm against HdfsTable's column ordering.
   * @throws PartitionNotFoundException - If the table is not an HdfsTable or the
   *         partition does not exist.
   * @throws CatalogException - On other metadata load errors.
   */
  public HdfsPartition getHdfsPartition(String dbName, String tableName,
      org.apache.hadoop.hive.metastore.api.Partition msPart) throws CatalogException {
    List<TPartitionKeyValue> partitionSpec = Lists.newArrayList();
    Table table = getTable(dbName, tableName);
    if (!(table instanceof HdfsTable)) {
      throw new PartitionNotFoundException(
          "Not an HdfsTable: " + dbName + "." + tableName);
    }
    for (int i = 0; i < msPart.getValues().size(); ++i) {
      partitionSpec.add(new TPartitionKeyValue(
          ((HdfsTable)table).getColumns().get(i).getName(), msPart.getValues().get(i)));
    }
    return getHdfsPartition(table.getDb().getName(), table.getName(), partitionSpec);
  }

  /**
   * Returns the HdfsPartition object for the given dbName/tableName and partition spec.
   * This will trigger a metadata load if the table metadata is not yet cached.
   * @throws DatabaseNotFoundException - If the database does not exist.
   * @throws TableNotFoundException - If the table does not exist.
   * @throws PartitionNotFoundException - If the partition does not exist.
   * @throws TableLoadingException - If there is an error loading the table metadata.
   */
  public HdfsPartition getHdfsPartition(String dbName, String tableName,
      List<TPartitionKeyValue> partitionSpec) throws CatalogException {
    String partitionNotFoundMsg =
        "Partition not found: " + Joiner.on(", ").join(partitionSpec);
    Table table = getTable(dbName, tableName);
    // This is not an Hdfs table, throw an error.
    if (!(table instanceof HdfsTable)) {
      throw new PartitionNotFoundException(partitionNotFoundMsg);
    }
    // Get the HdfsPartition object for the given partition spec.
    HdfsPartition partition =
        ((HdfsTable) table).getPartitionFromThriftPartitionSpec(partitionSpec);
    if (partition == null) throw new PartitionNotFoundException(partitionNotFoundMsg);
    return partition;
  }

  /**
   * Returns true if the table contains the given partition spec, otherwise false.
   * This may trigger a metadata load if the table metadata is not yet cached.
   * @throws DatabaseNotFoundException - If the database does not exist.
   * @throws TableNotFoundException - If the table does not exist.
   * @throws TableLoadingException - If there is an error loading the table metadata.
   */
  public boolean containsHdfsPartition(String dbName, String tableName,
      List<TPartitionKeyValue> partitionSpec) throws CatalogException {
    try {
      return getHdfsPartition(dbName, tableName, partitionSpec) != null;
    } catch (PartitionNotFoundException e) {
      // "Not found" is the negative answer, not an error, for this predicate.
      return false;
    }
  }

  /**
   * Gets the thrift representation of a catalog object, given the "object
   * description". The object description is just a TCatalogObject with only the
   * catalog object type and object name set.
   * If the object is not found, a CatalogException is thrown.
   */
  public TCatalogObject getTCatalogObject(TCatalogObject objectDesc)
      throws CatalogException {
    TCatalogObject result = new TCatalogObject();
    switch (objectDesc.getType()) {
      case DATABASE: {
        Db db = getDb(objectDesc.getDb().getDb_name());
        if (db == null) {
          throw new CatalogException(
              "Database not found: " + objectDesc.getDb().getDb_name());
        }
        result.setType(db.getCatalogObjectType());
        result.setCatalog_version(db.getCatalogVersion());
        result.setDb(db.toThrift());
        break;
      }
      case TABLE:
      case VIEW: {
        Table table = getTable(objectDesc.getTable().getDb_name(),
            objectDesc.getTable().getTbl_name());
        if (table == null) {
          throw new CatalogException("Table not found: " +
              objectDesc.getTable().getTbl_name());
        }
        // Serialize under the table lock so version and contents are consistent.
        table.getLock().lock();
        try {
          result.setType(table.getCatalogObjectType());
          result.setCatalog_version(table.getCatalogVersion());
          result.setTable(table.toThrift());
        } finally {
          table.getLock().unlock();
        }
        break;
      }
      case FUNCTION: {
        TFunction tfn = objectDesc.getFn();
        Function desc = Function.fromThrift(tfn);
        Function fn = getFunction(desc, Function.CompareMode.IS_INDISTINGUISHABLE);
        if (fn == null) {
          throw new CatalogException("Function not found: " + tfn);
        }
        result.setType(fn.getCatalogObjectType());
        result.setCatalog_version(fn.getCatalogVersion());
        result.setFn(fn.toThrift());
        break;
      }
      case DATA_SOURCE: {
        String dataSrcName = objectDesc.getData_source().getName();
        DataSource dataSrc = getDataSource(dataSrcName);
        if (dataSrc == null) {
          throw new CatalogException("Data source not found: " + dataSrcName);
        }
        result.setType(dataSrc.getCatalogObjectType());
        result.setCatalog_version(dataSrc.getCatalogVersion());
        result.setData_source(dataSrc.toThrift());
        break;
      }
      case HDFS_CACHE_POOL: {
        HdfsCachePool pool = getHdfsCachePool(objectDesc.getCache_pool().getPool_name());
        if (pool == null) {
          throw new CatalogException(
              "Hdfs cache pool not found: " + objectDesc.getCache_pool().getPool_name());
        }
        result.setType(pool.getCatalogObjectType());
        result.setCatalog_version(pool.getCatalogVersion());
        result.setCache_pool(pool.toThrift());
        break;
      }
      case ROLE:
        Role role = authPolicy_.getRole(objectDesc.getRole().getRole_name());
        if (role == null) {
          throw new CatalogException("Role not found: " +
              objectDesc.getRole().getRole_name());
        }
        result.setType(role.getCatalogObjectType());
        result.setCatalog_version(role.getCatalogVersion());
        result.setRole(role.toThrift());
        break;
      case PRIVILEGE:
        Role tmpRole = authPolicy_.getRole(objectDesc.getPrivilege().getRole_id());
        if (tmpRole == null) {
          throw new CatalogException("No role associated with ID: " +
              objectDesc.getPrivilege().getRole_id());
        }
        // Privileges are looked up by name within the owning role.
        for (RolePrivilege p: tmpRole.getPrivileges()) {
          if (p.getName().equalsIgnoreCase(
              objectDesc.getPrivilege().getPrivilege_name())) {
            result.setType(p.getCatalogObjectType());
            result.setCatalog_version(p.getCatalogVersion());
            result.setPrivilege(p.toThrift());
            return result;
          }
        }
        throw new CatalogException(String.format("Role '%s' does not contain " +
            "privilege: '%s'", tmpRole.getName(),
            objectDesc.getPrivilege().getPrivilege_name()));
      default: throw new IllegalStateException(
          "Unexpected TCatalogObject type: " + objectDesc.getType());
    }
    return result;
  }

  /**
   * Returns true if 'dbName' is the default database, compared case-insensitively.
   * Uses equalsIgnoreCase rather than toLowerCase() so the comparison is
   * locale-independent (avoids e.g. the Turkish dotted/dotless-i problem) and
   * null-safe (null returns false instead of throwing NPE).
   */
  public static boolean isDefaultDb(String dbName) {
    return DEFAULT_DB.equalsIgnoreCase(dbName);
  }
}