// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hive.metastore.api.FunctionType;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.ResourceType;
import org.apache.hadoop.hive.metastore.api.ResourceUri;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.ql.exec.FunctionUtils;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.authorization.SentryConfig;
import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.common.JniUtil;
import org.apache.impala.common.Pair;
import org.apache.impala.common.Reference;
import org.apache.impala.hive.executor.UdfExecutor;
import org.apache.impala.thrift.TCatalog;
import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TFunction;
import org.apache.impala.thrift.TGetAllCatalogObjectsResponse;
import org.apache.impala.thrift.TPartitionKeyValue;
import org.apache.impala.thrift.TPrivilege;
import org.apache.impala.thrift.TTable;
import org.apache.impala.thrift.TTableName;
import org.apache.impala.thrift.TUniqueId;
import org.apache.impala.util.PatternMatcher;
import org.apache.impala.util.SentryProxy;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
* Specialized Catalog that implements the CatalogService specific Catalog
* APIs. The CatalogServiceCatalog manages loading of all the catalog metadata
* and processing of DDL requests. For each DDL request, the CatalogServiceCatalog
* will return the catalog version that the update will show up in. The client
* can then wait until the statestore sends an update that contains that catalog
* version.
* The CatalogServiceCatalog also manages a global "catalog version". The version
* is incremented and assigned to a CatalogObject whenever it is
* added/modified/removed from the catalog. This means each CatalogObject will have a
* unique version and assigned versions are strictly increasing.
*
* Table metadata for IncompleteTables (not fully loaded tables) is loaded in the
* background by the TableLoadingMgr; tables can be prioritized for loading by calling
* prioritizeLoad(). Background loading can also be enabled for the catalog, in which
* case missing tables (tables that are not yet loaded) are submitted to the
* TableLoadingMgr on startup and whenever table metadata is invalidated. The metadata
* of fully loaded tables (e.g. HdfsTable, HBaseTable, etc.) is updated in-place and
* doesn't trigger a background metadata load through the TableLoadingMgr. Accessing a
* table that is not yet loaded (via getTable()) loads the table's metadata on demand,
* out-of-band of the table loading thread pool.
*
* See the class comments in CatalogOpExecutor for a description of the locking protocol
* that should be employed if both the catalog lock and table locks need to be held at
* the same time.
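*
* For example, a writer that needs both locks typically follows the pattern sketched
* below (a simplified illustration of tryLockTable()/reloadTable() in this class):
* <pre>{@code
*   if (!catalog.tryLockTable(tbl)) throw new CatalogException("lock contention");
*   try {
*     long newVersion = catalog.incrementAndGetCatalogVersion();
*     catalog.getLock().writeLock().unlock();  // release the global lock early
*     // ... perform the slow metadata work while holding only the table lock ...
*     tbl.setCatalogVersion(newVersion);
*   } finally {
*     tbl.getLock().unlock();
*   }
* }</pre>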
*
* TODO: Consider removing on-demand loading and have everything go through the table
* loading thread pool.
*/
public class CatalogServiceCatalog extends Catalog {
private static final Logger LOG = Logger.getLogger(CatalogServiceCatalog.class);
private static final int INITIAL_META_STORE_CLIENT_POOL_SIZE = 10;
private final TUniqueId catalogServiceId_;
// Fair lock used to synchronize reads/writes of catalogVersion_. Because this lock
// protects catalogVersion_, it can be used to perform atomic bulk catalog operations
// since catalogVersion_ cannot change externally while the lock is being held.
// In addition to protecting catalogVersion_, it is currently used for the
// following bulk operations:
// * Building a delta update to send to the statestore in getCatalogObjects(),
// so a snapshot of the catalog can be taken without any version changes.
// * During a catalog invalidation (call to reset()), which re-reads all dbs and tables
// from the metastore.
// * During renameTable(), because a table must be removed and added to the catalog
// atomically (potentially in a different database).
private final ReentrantReadWriteLock catalogLock_ = new ReentrantReadWriteLock(true);
// Last assigned catalog version. Starts at INITIAL_CATALOG_VERSION and is incremented
// with each update to the Catalog. Monotonically increasing across the lifetime of
// this object.
// Protected by catalogLock_.
// TODO: Handle overflow of catalogVersion_ and nextTableId_.
// TODO: The name of this variable is misleading and can be interpreted as a property
// of the catalog server. Rename into something that indicates its role as a global
// sequence number assigned to catalog objects.
private long catalogVersion_ = INITIAL_CATALOG_VERSION;
// Manages the scheduling of background table loading.
private final TableLoadingMgr tableLoadingMgr_;
private final boolean loadInBackground_;
// Periodically polls HDFS to get the latest set of known cache pools.
private final ScheduledExecutorService cachePoolReader_ =
Executors.newScheduledThreadPool(1);
// Proxy to access the Sentry Service and also periodically refreshes the
// policy metadata. Null if Sentry Service is not enabled.
private final SentryProxy sentryProxy_;
// Local temporary directory to copy UDF Jars.
private static String localLibraryPath_;
/**
* Initialize the CatalogServiceCatalog. If 'loadInBackground' is true, table metadata
* will be loaded in the background. 'initialHmsCnxnTimeoutSec' specifies the time (in
* seconds) CatalogServiceCatalog will wait to establish an initial connection to the
* HMS before giving up. Using this setting allows catalogd and HMS to be started
* simultaneously.
*/
public CatalogServiceCatalog(boolean loadInBackground, int numLoadingThreads,
int initialHmsCnxnTimeoutSec, SentryConfig sentryConfig, TUniqueId catalogServiceId,
String kerberosPrincipal, String localLibraryPath) {
super(INITIAL_META_STORE_CLIENT_POOL_SIZE, initialHmsCnxnTimeoutSec);
catalogServiceId_ = catalogServiceId;
tableLoadingMgr_ = new TableLoadingMgr(this, numLoadingThreads);
loadInBackground_ = loadInBackground;
try {
// We want only 'true' HDFS filesystems to poll the HDFS cache (i.e. not S3,
// local, etc.)
if (FileSystemUtil.getDefaultFileSystem() instanceof DistributedFileSystem) {
cachePoolReader_.scheduleAtFixedRate(
new CachePoolReader(), 0, 1, TimeUnit.MINUTES);
}
} catch (IOException e) {
LOG.error("Couldn't identify the default FS. Cache Pool reader will be disabled.");
}
if (sentryConfig != null) {
sentryProxy_ = new SentryProxy(sentryConfig, this, kerberosPrincipal);
} else {
sentryProxy_ = null;
}
localLibraryPath_ = "file://" + localLibraryPath;
}
// Timeout for acquiring a table lock (2 hours).
// TODO: Make this configurable
private static final long TBL_LOCK_TIMEOUT_MS = 7200000;
// Time to sleep before retrying to acquire a table lock
private static final int TBL_LOCK_RETRY_MS = 10;
/**
* Tries to acquire catalogLock_ and the lock of 'tbl' in that order. Returns true if
* both locks were acquired within TBL_LOCK_TIMEOUT_MS milliseconds, in which case both
* locks are held when the function returns. Returns false otherwise, in which case no
* lock is held.
*/
public boolean tryLockTable(Table tbl) {
long begin = System.currentTimeMillis();
long end;
do {
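// Always acquire the global catalog lock before the table lock so all writers
// follow the same lock order.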
catalogLock_.writeLock().lock();
if (tbl.getLock().tryLock()) {
if (LOG.isTraceEnabled()) {
end = System.currentTimeMillis();
LOG.trace(String.format("Lock for table %s was acquired in %d msec",
tbl.getFullName(), end - begin));
}
return true;
}
catalogLock_.writeLock().unlock();
try {
// Sleep to avoid spinning and allow other operations to make progress.
Thread.sleep(TBL_LOCK_RETRY_MS);
} catch (InterruptedException e) {
// ignore
}
end = System.currentTimeMillis();
} while (end - begin < TBL_LOCK_TIMEOUT_MS);
return false;
}
/**
* Reads the current set of cache pools from HDFS and updates the catalog.
* Called periodically by the cachePoolReader_.
*/
protected class CachePoolReader implements Runnable {
/**
* Explicit no-arg constructor so the reader can also be run directly, outside of the
* scheduled executor thread.
*/
public CachePoolReader() {
super();
}
public void run() {
if (LOG.isTraceEnabled()) LOG.trace("Reloading cache pool names from HDFS");
// Map of cache pool name to CachePoolInfo. Stored in a map to allow Set operations
// to be performed on the keys.
Map<String, CachePoolInfo> currentCachePools = Maps.newHashMap();
try {
DistributedFileSystem dfs = FileSystemUtil.getDistributedFileSystem();
RemoteIterator<CachePoolEntry> itr = dfs.listCachePools();
while (itr.hasNext()) {
CachePoolInfo cachePoolInfo = itr.next().getInfo();
currentCachePools.put(cachePoolInfo.getPoolName(), cachePoolInfo);
}
} catch (Exception e) {
LOG.error("Error loading cache pools: ", e);
return;
}
catalogLock_.writeLock().lock();
try {
// Determine what has changed relative to what we have cached.
Set<String> droppedCachePoolNames = Sets.difference(
hdfsCachePools_.keySet(), currentCachePools.keySet());
Set<String> createdCachePoolNames = Sets.difference(
currentCachePools.keySet(), hdfsCachePools_.keySet());
// Add all new cache pools.
for (String createdCachePool: createdCachePoolNames) {
HdfsCachePool cachePool = new HdfsCachePool(
currentCachePools.get(createdCachePool));
cachePool.setCatalogVersion(
CatalogServiceCatalog.this.incrementAndGetCatalogVersion());
hdfsCachePools_.add(cachePool);
}
// Remove dropped cache pools.
for (String cachePoolName: droppedCachePoolNames) {
hdfsCachePools_.remove(cachePoolName);
CatalogServiceCatalog.this.incrementAndGetCatalogVersion();
}
} finally {
catalogLock_.writeLock().unlock();
}
}
}
/**
* Adds a list of cache directive IDs for the given table name. Asynchronously
* refreshes the table metadata once all cache directives complete.
*/
public void watchCacheDirs(List<Long> dirIds, TTableName tblName) {
tableLoadingMgr_.watchCacheDirs(dirIds, tblName);
}
/**
* Prioritizes the loading of the given list of TCatalogObjects. Currently only
* supports loading Table/View metadata, since Db and Function metadata is not loaded
* lazily.
*/
public void prioritizeLoad(List<TCatalogObject> objectDescs) {
for (TCatalogObject catalogObject: objectDescs) {
Preconditions.checkState(catalogObject.isSetTable());
TTable table = catalogObject.getTable();
tableLoadingMgr_.prioritizeLoad(new TTableName(table.getDb_name().toLowerCase(),
table.getTbl_name().toLowerCase()));
}
}
/**
* Returns all known objects in the Catalog (Tables, Views, Databases, and
* Functions). Some metadata may be skipped for objects that have a catalog
* version < the specified "fromVersion". Takes a lock on the catalog to ensure this
* update contains a consistent snapshot of all items in the catalog. While holding the
* catalog lock, it locks each accessed table to protect against concurrent
* modifications.
*/
public TGetAllCatalogObjectsResponse getCatalogObjects(long fromVersion) {
TGetAllCatalogObjectsResponse resp = new TGetAllCatalogObjectsResponse();
resp.setObjects(new ArrayList<TCatalogObject>());
resp.setMax_catalog_version(Catalog.INITIAL_CATALOG_VERSION);
catalogLock_.readLock().lock();
try {
for (Db db: getDbs(PatternMatcher.MATCHER_MATCH_ALL)) {
TCatalogObject catalogDb = new TCatalogObject(TCatalogObjectType.DATABASE,
db.getCatalogVersion());
catalogDb.setDb(db.toThrift());
resp.addToObjects(catalogDb);
for (String tblName: db.getAllTableNames()) {
TCatalogObject catalogTbl = new TCatalogObject(TCatalogObjectType.TABLE,
Catalog.INITIAL_CATALOG_VERSION);
Table tbl = db.getTable(tblName);
if (tbl == null) {
LOG.error("Table: " + tblName + " was expected to be in the catalog " +
"cache. Skipping table for this update.");
continue;
}
// Protect the table from concurrent modifications.
tbl.getLock().lock();
try {
// Only add the extended metadata if this table's version is >=
// the fromVersion.
if (tbl.getCatalogVersion() >= fromVersion) {
try {
catalogTbl.setTable(tbl.toThrift());
} catch (Exception e) {
if (LOG.isTraceEnabled()) {
LOG.trace(String.format("Error calling toThrift() on table %s.%s: %s",
db.getName(), tblName, e.getMessage()), e);
}
continue;
}
catalogTbl.setCatalog_version(tbl.getCatalogVersion());
} else {
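// This table hasn't changed since 'fromVersion', so the caller already has its
// metadata; send a minimal, name-only TTable to keep the update small.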
catalogTbl.setTable(new TTable(db.getName(), tblName));
}
} finally {
tbl.getLock().unlock();
}
resp.addToObjects(catalogTbl);
}
for (Function fn: db.getFunctions(null, new PatternMatcher())) {
TCatalogObject function = new TCatalogObject(TCatalogObjectType.FUNCTION,
fn.getCatalogVersion());
function.setFn(fn.toThrift());
resp.addToObjects(function);
}
}
for (DataSource dataSource: getDataSources()) {
TCatalogObject catalogObj = new TCatalogObject(TCatalogObjectType.DATA_SOURCE,
dataSource.getCatalogVersion());
catalogObj.setData_source(dataSource.toThrift());
resp.addToObjects(catalogObj);
}
for (HdfsCachePool cachePool: hdfsCachePools_) {
TCatalogObject pool = new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL,
cachePool.getCatalogVersion());
pool.setCache_pool(cachePool.toThrift());
resp.addToObjects(pool);
}
// Get all roles
for (Role role: authPolicy_.getAllRoles()) {
TCatalogObject thriftRole = new TCatalogObject();
thriftRole.setRole(role.toThrift());
thriftRole.setCatalog_version(role.getCatalogVersion());
thriftRole.setType(role.getCatalogObjectType());
resp.addToObjects(thriftRole);
for (RolePrivilege p: role.getPrivileges()) {
TCatalogObject privilege = new TCatalogObject();
privilege.setPrivilege(p.toThrift());
privilege.setCatalog_version(p.getCatalogVersion());
privilege.setType(p.getCatalogObjectType());
resp.addToObjects(privilege);
}
}
// Each update should contain a single "TCatalog" object which is used to
// pass overall state on the catalog, such as the current version and the
// catalog service id.
TCatalogObject catalog = new TCatalogObject();
catalog.setType(TCatalogObjectType.CATALOG);
// By setting the catalog version to the latest catalog version at this point,
// it ensures that impalads will always bump their versions, even in the case where
// an object has been dropped.
catalog.setCatalog_version(getCatalogVersion());
catalog.setCatalog(new TCatalog(catalogServiceId_));
resp.addToObjects(catalog);
// The max version is the max catalog version of all items in the update.
resp.setMax_catalog_version(getCatalogVersion());
return resp;
} finally {
catalogLock_.readLock().unlock();
}
}
/**
* Returns all user defined functions (aggregate and scalar) in the specified database.
* Functions are not returned in a defined order.
*/
public List<Function> getFunctions(String dbName) throws DatabaseNotFoundException {
Db db = getDb(dbName);
if (db == null) {
throw new DatabaseNotFoundException("Database does not exist: " + dbName);
}
// Contains map of overloaded function names to all functions matching that name.
HashMap<String, List<Function>> dbFns = db.getAllFunctions();
List<Function> fns = new ArrayList<Function>(dbFns.size());
for (List<Function> fnOverloads: dbFns.values()) {
for (Function fn: fnOverloads) {
fns.add(fn);
}
}
return fns;
}
/**
* Checks if the Hive function 'fn' is Impala compatible. A function is Impala
* compatible iff
*
* 1. The function is JAVA based,
* 2. Has exactly one binary resource associated (We don't support loading
* dependencies yet) and
* 3. The binary is of type JAR.
*
* Returns true if compatible and false otherwise. In case of incompatible
* functions 'incompatMsg' has the reason for the incompatibility.
*/
public static boolean isFunctionCompatible(
org.apache.hadoop.hive.metastore.api.Function fn, StringBuilder incompatMsg) {
boolean isCompatible = true;
if (fn.getFunctionType() != FunctionType.JAVA) {
isCompatible = false;
incompatMsg.append("Function type: " + fn.getFunctionType().name()
+ " is not supported. Only " + FunctionType.JAVA.name() + " functions "
+ "are supported.");
} else if (fn.getResourceUrisSize() == 0) {
isCompatible = false;
incompatMsg.append("No executable binary resource (like a JAR file) is " +
"associated with this function. To fix this, recreate the function by " +
"specifying a 'location' in the function create statement.");
} else if (fn.getResourceUrisSize() != 1) {
isCompatible = false;
List<String> resourceUris = Lists.newArrayList();
for (ResourceUri resource: fn.getResourceUris()) {
resourceUris.add(resource.getUri());
}
incompatMsg.append("Impala does not support multiple Jars for dependencies."
+ "(" + Joiner.on(",").join(resourceUris) + ") ");
} else if (fn.getResourceUris().get(0).getResourceType() != ResourceType.JAR) {
isCompatible = false;
incompatMsg.append("Function binary type: " +
fn.getResourceUris().get(0).getResourceType().name()
+ " is not supported. Only " + ResourceType.JAR.name()
+ " type is supported.");
}
return isCompatible;
}
/**
* Returns a list of Impala Functions, one per compatible "evaluate" method in the UDF
* class referred to by the given Java function. This method copies the UDF Jar
* referenced by "function" to a temporary file in localLibraryPath_ and loads it
* into the JVM. It then scans all methods in the class using reflection and creates a
* corresponding Impala function for each compatible one. Currently Impala supports
* only "JAR" files for symbols, and requires a single Jar containing all dependent
* classes rather than a set of Jar files.
*/
public static List<Function> extractFunctions(String db,
org.apache.hadoop.hive.metastore.api.Function function)
throws ImpalaRuntimeException {
List<Function> result = Lists.newArrayList();
List<String> addedSignatures = Lists.newArrayList();
StringBuilder warnMessage = new StringBuilder();
if (!isFunctionCompatible(function, warnMessage)) {
LOG.warn("Skipping load of incompatible function: " +
function.getFunctionName() + ". " + warnMessage.toString());
return result;
}
String jarUri = function.getResourceUris().get(0).getUri();
Class<?> udfClass = null;
Path localJarPath = null;
try {
localJarPath = new Path(localLibraryPath_, UUID.randomUUID().toString() + ".jar");
try {
FileSystemUtil.copyToLocal(new Path(jarUri), localJarPath);
} catch (IOException e) {
String errorMsg = "Error loading Java function: " + db + "." +
function.getFunctionName() + ". Couldn't copy " + jarUri +
" to local path: " + localJarPath.toString();
LOG.error(errorMsg, e);
throw new ImpalaRuntimeException(errorMsg);
}
URL[] classLoaderUrls = new URL[] {new URL(localJarPath.toString())};
URLClassLoader urlClassLoader = new URLClassLoader(classLoaderUrls);
udfClass = urlClassLoader.loadClass(function.getClassName());
// Check if the class is of UDF type. Currently we don't support other functions
// TODO: Remove this once we support Java UDAF/UDTF
if (FunctionUtils.getUDFClassType(udfClass) != FunctionUtils.UDFClassType.UDF) {
LOG.warn("Ignoring load of incompatible Java function: " +
function.getFunctionName() + " as " + FunctionUtils.getUDFClassType(udfClass)
+ " is not a supported type. Only UDFs are supported");
return result;
}
// Load each method in the UDF class and create the corresponding Impala Function
// object.
for (Method m: udfClass.getMethods()) {
if (!m.getName().equals(UdfExecutor.UDF_FUNCTION_NAME)) continue;
Function fn = ScalarFunction.fromHiveFunction(db,
function.getFunctionName(), function.getClassName(),
m.getParameterTypes(), m.getReturnType(), jarUri);
if (fn == null) {
LOG.warn("Ignoring incompatible method: " + m.toString() + " during load of " +
"Hive UDF:" + function.getFunctionName() + " from " + udfClass);
continue;
}
if (!addedSignatures.contains(fn.signatureString())) {
result.add(fn);
addedSignatures.add(fn.signatureString());
}
}
} catch (ClassNotFoundException c) {
String errorMsg = "Error loading Java function: " + db + "." +
function.getFunctionName() + ". Symbol class " + udfClass +
"not found in Jar: " + jarUri;
LOG.error(errorMsg);
throw new ImpalaRuntimeException(errorMsg, c);
} catch (Exception e) {
LOG.error("Skipping function load: " + function.getFunctionName(), e);
throw new ImpalaRuntimeException("Error extracting functions", e);
} catch (LinkageError e) {
String errorMsg = "Error resolving dependencies for Java function: " + db + "." +
function.getFunctionName();
LOG.error(errorMsg);
throw new ImpalaRuntimeException(errorMsg, e);
} finally {
if (localJarPath != null) FileSystemUtil.deleteIfExists(localJarPath);
}
return result;
}
/**
* Extracts Impala functions stored in metastore db parameters and adds them to
* the catalog cache.
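* Each function is stored as a database parameter whose key starts with
* Db.FUNCTION_INDEX_PREFIX and whose value is a base64-encoded, compact-protocol
* serialized TFunction (see the deserialization below).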
*/
private void loadFunctionsFromDbParams(Db db,
org.apache.hadoop.hive.metastore.api.Database msDb) {
if (msDb == null || msDb.getParameters() == null) return;
LOG.info("Loading native functions for database: " + db.getName());
TCompactProtocol.Factory protocolFactory = new TCompactProtocol.Factory();
for (String key: msDb.getParameters().keySet()) {
if (!key.startsWith(Db.FUNCTION_INDEX_PREFIX)) continue;
try {
TFunction fn = new TFunction();
JniUtil.deserializeThrift(protocolFactory, fn,
Base64.decodeBase64(msDb.getParameters().get(key)));
Function addFn = Function.fromThrift(fn);
db.addFunction(addFn, false);
addFn.setCatalogVersion(incrementAndGetCatalogVersion());
} catch (ImpalaException e) {
LOG.error("Encountered an error during function load: key=" + key
+ ",continuing", e);
}
}
LOG.info("Loaded native functions for database: " + db.getName());
}
/**
* Loads Java functions into the catalog. For each function in "functions",
* we extract all Impala compatible evaluate() signatures and load them
* as separate functions in the catalog.
*/
private void loadJavaFunctions(Db db,
List<org.apache.hadoop.hive.metastore.api.Function> functions) {
Preconditions.checkNotNull(functions);
LOG.info("Loading Java functions for database: " + db.getName());
for (org.apache.hadoop.hive.metastore.api.Function function: functions) {
try {
for (Function fn: extractFunctions(db.getName(), function)) {
db.addFunction(fn);
fn.setCatalogVersion(incrementAndGetCatalogVersion());
}
} catch (Exception e) {
LOG.error("Skipping function load: " + function.getFunctionName(), e);
}
}
LOG.info("Loaded Java functions for database: " + db.getName());
}
/**
* Reloads function metadata for 'dbName' database. Populates the 'addedFuncs' list
* with functions that were added as a result of this operation. Populates the
* 'removedFuncs' list with functions that were removed.
*/
public void refreshFunctions(MetaStoreClient msClient, String dbName,
List<TCatalogObject> addedFuncs, List<TCatalogObject> removedFuncs)
throws CatalogException {
// Create a temporary database that will contain all the functions from the HMS.
Db tmpDb;
try {
List<org.apache.hadoop.hive.metastore.api.Function> javaFns =
Lists.newArrayList();
for (String javaFn : msClient.getHiveClient().getFunctions(dbName, "*")) {
javaFns.add(msClient.getHiveClient().getFunction(dbName, javaFn));
}
// Contains native functions in its params map.
org.apache.hadoop.hive.metastore.api.Database msDb =
msClient.getHiveClient().getDatabase(dbName);
tmpDb = new Db(dbName, this, null);
// Load native UDFs into the temporary db.
loadFunctionsFromDbParams(tmpDb, msDb);
// Load Java UDFs from HMS into the temporary db.
loadJavaFunctions(tmpDb, javaFns);
Db db = dbCache_.get().get(dbName);
if (db == null) {
throw new DatabaseNotFoundException("Database does not exist: " + dbName);
}
// Load transient functions into the temporary db.
for (Function fn: db.getTransientFunctions()) tmpDb.addFunction(fn);
// Compute the removed functions and remove them from the db.
for (Map.Entry<String, List<Function>> e: db.getAllFunctions().entrySet()) {
for (Function fn: e.getValue()) {
if (tmpDb.getFunction(
fn, Function.CompareMode.IS_INDISTINGUISHABLE) == null) {
fn.setCatalogVersion(incrementAndGetCatalogVersion());
removedFuncs.add(fn.toTCatalogObject());
}
}
}
// We will re-add all the functions to the db because it's possible that a
// function was dropped and a different function (for example, the binary is
// different) with the same name and signature was re-added in Hive.
db.removeAllFunctions();
for (Map.Entry<String, List<Function>> e: tmpDb.getAllFunctions().entrySet()) {
for (Function fn: e.getValue()) {
// We do not need to increment and acquire a new catalog version for this
// function here because this already happens when the functions are loaded
// into tmpDb.
db.addFunction(fn);
addedFuncs.add(fn.toTCatalogObject());
}
}
} catch (Exception e) {
throw new CatalogException("Error refreshing functions in " + dbName + ": ", e);
}
}
/**
* Invalidates the database 'db'. This method can race with external changes to the
* Hive Metastore, so conflicting changes may surface as exceptions from the HMS
* calls; those are handled appropriately. Returns the invalidated 'Db' object along
* with the list of tables to be loaded by the TableLoadingMgr. Returns null if the
* method encounters an exception during invalidation.
*/
private Pair<Db, List<TTableName>> invalidateDb(
MetaStoreClient msClient, String dbName, Db existingDb) {
try {
List<org.apache.hadoop.hive.metastore.api.Function> javaFns =
Lists.newArrayList();
for (String javaFn: msClient.getHiveClient().getFunctions(dbName, "*")) {
javaFns.add(msClient.getHiveClient().getFunction(dbName, javaFn));
}
org.apache.hadoop.hive.metastore.api.Database msDb =
msClient.getHiveClient().getDatabase(dbName);
Db newDb = new Db(dbName, this, msDb);
// existingDb is usually null when the Catalog loads for the first time.
// In that case we needn't restore any transient functions.
if (existingDb != null) {
// Restore UDFs that aren't persisted. They are only cleaned up on
// Catalog restart.
for (Function fn: existingDb.getTransientFunctions()) {
newDb.addFunction(fn);
fn.setCatalogVersion(incrementAndGetCatalogVersion());
}
}
// Reload native UDFs.
loadFunctionsFromDbParams(newDb, msDb);
// Reload Java UDFs from HMS.
loadJavaFunctions(newDb, javaFns);
newDb.setCatalogVersion(incrementAndGetCatalogVersion());
List<TTableName> tblsToBackgroundLoad = Lists.newArrayList();
for (String tableName: msClient.getHiveClient().getAllTables(dbName)) {
Table incompleteTbl = IncompleteTable.createUninitializedTable(newDb, tableName);
incompleteTbl.setCatalogVersion(incrementAndGetCatalogVersion());
newDb.addTable(incompleteTbl);
if (loadInBackground_) {
tblsToBackgroundLoad.add(new TTableName(dbName, tableName.toLowerCase()));
}
}
return Pair.create(newDb, tblsToBackgroundLoad);
} catch (Exception e) {
LOG.warn("Encountered an exception while invalidating database: " + dbName +
". Ignoring further load of this db.", e);
}
return null;
}
/**
* Resets this catalog instance by clearing all cached table and database metadata.
*/
public void reset() throws CatalogException {
LOG.info("Invalidating all metadata.");
// First update the policy metadata.
if (sentryProxy_ != null) {
// Sentry Service is enabled.
try {
// Update the authorization policy, waiting for the result to complete.
sentryProxy_.refresh();
} catch (Exception e) {
throw new CatalogException("Error updating authorization policy: ", e);
}
}
catalogLock_.writeLock().lock();
try {
// Not all Java UDFs are persisted to the metastore. Those that aren't (transient
// functions) must survive invalidation, so invalidateDb() restores them from the
// old Db objects in this cache.
Map<String, Db> oldDbCache = dbCache_.get();
// Build a new DB cache, populate it, and replace the existing cache in one
// step.
ConcurrentHashMap<String, Db> newDbCache = new ConcurrentHashMap<String, Db>();
List<TTableName> tblsToBackgroundLoad = Lists.newArrayList();
try (MetaStoreClient msClient = getMetaStoreClient()) {
for (String dbName: msClient.getHiveClient().getAllDatabases()) {
dbName = dbName.toLowerCase();
Db oldDb = oldDbCache.get(dbName);
Pair<Db, List<TTableName>> invalidatedDb = invalidateDb(msClient,
dbName, oldDb);
if (invalidatedDb == null) continue;
newDbCache.put(dbName, invalidatedDb.first);
tblsToBackgroundLoad.addAll(invalidatedDb.second);
}
}
dbCache_.set(newDbCache);
// Submit tables for background loading.
for (TTableName tblName: tblsToBackgroundLoad) {
tableLoadingMgr_.backgroundLoad(tblName);
}
} catch (Exception e) {
LOG.error(e);
throw new CatalogException("Error initializing Catalog. Catalog may be empty.", e);
} finally {
catalogLock_.writeLock().unlock();
}
LOG.info("Invalidated all metadata.");
}
/**
* Adds a database with the given name to the metadata cache and returns the new
* Db object. Used by CREATE DATABASE statements.
*/
public Db addDb(String dbName, org.apache.hadoop.hive.metastore.api.Database msDb)
throws ImpalaException {
Db newDb = new Db(dbName, this, msDb);
newDb.setCatalogVersion(incrementAndGetCatalogVersion());
addDb(newDb);
return newDb;
}
/**
* Removes a database from the metadata cache and returns the removed database,
* or null if the database did not exist in the cache.
* Used by DROP DATABASE statements.
*/
@Override
public Db removeDb(String dbName) {
Db removedDb = super.removeDb(dbName);
if (removedDb != null) {
removedDb.setCatalogVersion(incrementAndGetCatalogVersion());
}
return removedDb;
}
/**
* Adds a table with the given name to the catalog and returns the new table,
* loading the metadata if needed.
*/
public Table addTable(String dbName, String tblName) {
Db db = getDb(dbName);
if (db == null) return null;
Table incompleteTable = IncompleteTable.createUninitializedTable(db, tblName);
incompleteTable.setCatalogVersion(incrementAndGetCatalogVersion());
db.addTable(incompleteTable);
return db.getTable(tblName);
}
/**
* Gets the table with the given name, loading it if needed (if the existing catalog
* object is not yet loaded). Returns the matching Table or null if no table with this
* name exists in the catalog.
* If the existing table is dropped or modified (indicated by the catalog version
* changing) while the load is in progress, the loaded value will be discarded
* and the current cached value will be returned. This may mean that a missing table
* (not yet loaded table) will be returned.
*/
public Table getOrLoadTable(String dbName, String tblName)
throws CatalogException {
TTableName tableName = new TTableName(dbName.toLowerCase(), tblName.toLowerCase());
TableLoadingMgr.LoadRequest loadReq;
long previousCatalogVersion;
// Return the table if it is already loaded or submit a new load request.
catalogLock_.readLock().lock();
try {
Table tbl = getTable(dbName, tblName);
if (tbl == null || tbl.isLoaded()) return tbl;
previousCatalogVersion = tbl.getCatalogVersion();
loadReq = tableLoadingMgr_.loadAsync(tableName);
} finally {
catalogLock_.readLock().unlock();
}
Preconditions.checkNotNull(loadReq);
try {
// The table may have been dropped/modified while the load was in progress, so only
// apply the update if the existing table hasn't changed.
return replaceTableIfUnchanged(loadReq.get(), previousCatalogVersion);
} finally {
loadReq.close();
}
}
/**
* Replaces an existing Table with a new value if it exists and has not changed
* (has the same catalog version as 'expectedCatalogVersion').
*/
private Table replaceTableIfUnchanged(Table updatedTbl, long expectedCatalogVersion)
throws DatabaseNotFoundException {
catalogLock_.writeLock().lock();
try {
Db db = getDb(updatedTbl.getDb().getName());
if (db == null) {
throw new DatabaseNotFoundException(
"Database does not exist: " + updatedTbl.getDb().getName());
}
Table existingTbl = db.getTable(updatedTbl.getName());
// The existing table does not exist or has been modified. Instead of
// adding the loaded value, return the existing table.
if (existingTbl == null ||
existingTbl.getCatalogVersion() != expectedCatalogVersion) return existingTbl;
updatedTbl.setCatalogVersion(incrementAndGetCatalogVersion());
db.addTable(updatedTbl);
return updatedTbl;
} finally {
catalogLock_.writeLock().unlock();
}
}
/**
* Removes a table from the catalog and increments the catalog version.
* Returns the removed Table, or null if the table or db does not exist.
*/
public Table removeTable(String dbName, String tblName) {
Db parentDb = getDb(dbName);
if (parentDb == null) return null;
Table removedTable = parentDb.removeTable(tblName);
if (removedTable != null) {
removedTable.setCatalogVersion(incrementAndGetCatalogVersion());
}
return removedTable;
}
/**
* Removes a function from the catalog. Increments the catalog version and returns
* the Function object that was removed. If the function did not exist, null will
* be returned.
*/
@Override
public Function removeFunction(Function desc) {
Function removedFn = super.removeFunction(desc);
if (removedFn != null) {
removedFn.setCatalogVersion(incrementAndGetCatalogVersion());
}
return removedFn;
}
/**
* Adds a function to the catalog, incrementing the catalog version. Returns true if
* the add was successful, false otherwise.
*/
@Override
public boolean addFunction(Function fn) {
Db db = getDb(fn.getFunctionName().getDb());
if (db == null) return false;
if (db.addFunction(fn)) {
fn.setCatalogVersion(incrementAndGetCatalogVersion());
return true;
}
return false;
}
/**
* Adds a data source to the catalog, incrementing the catalog version. Returns true
* if the add was successful, false otherwise.
*/
@Override
public boolean addDataSource(DataSource dataSource) {
if (dataSources_.add(dataSource)) {
dataSource.setCatalogVersion(incrementAndGetCatalogVersion());
return true;
}
return false;
}
@Override
public DataSource removeDataSource(String dataSourceName) {
DataSource dataSource = dataSources_.remove(dataSourceName);
if (dataSource != null) {
dataSource.setCatalogVersion(incrementAndGetCatalogVersion());
}
return dataSource;
}
/**
* Returns the table parameter 'transient_lastDdlTime', or -1 if it's not set.
* TODO: move this to a metastore helper class.
*/
public static long getLastDdlTime(org.apache.hadoop.hive.metastore.api.Table msTbl) {
Preconditions.checkNotNull(msTbl);
Map<String, String> params = msTbl.getParameters();
String lastDdlTimeStr = params.get("transient_lastDdlTime");
if (lastDdlTimeStr != null) {
try {
return Long.parseLong(lastDdlTimeStr);
} catch (NumberFormatException e) {}
}
return -1;
}
/**
* Updates the cached lastDdlTime for the given table. The lastDdlTime is used during
* the metadata refresh() operations to determine if there have been any external
* (outside of Impala) modifications to the table.
*/
public void updateLastDdlTime(TTableName tblName, long ddlTime) {
Db db = getDb(tblName.getDb_name());
if (db == null) return;
Table tbl = db.getTable(tblName.getTable_name());
if (tbl == null) return;
tbl.updateLastDdlTime(ddlTime);
}
/**
* Renames a table. Equivalent to an atomic drop + add of the table. Returns
* the new Table object with an incremented catalog version or null if the
* drop or add were unsuccessful. If null is returned, then the catalog cache
* is in one of the following two states:
* 1. Old table was not removed, and new table was not added
* 2. Old table was removed, but new table was not added
*/
public Table renameTable(TTableName oldTableName, TTableName newTableName)
throws CatalogException {
// Remove the old table name from the cache and add the new table.
Db db = getDb(oldTableName.getDb_name());
if (db == null) return null;
Table oldTable = db.removeTable(oldTableName.getTable_name());
if (oldTable == null) return null;
return addTable(newTableName.getDb_name(), newTableName.getTable_name());
}
/**
* Reloads metadata for table 'tbl' which must not be an IncompleteTable. Updates the
* table metadata in-place by calling load() on the given table. Returns the
* TCatalogObject representing 'tbl'. Applies proper synchronization to protect the
* metadata load from concurrent table modifications and assigns a new catalog version.
* Throws a CatalogException if there is an error loading table metadata.
*/
public TCatalogObject reloadTable(Table tbl) throws CatalogException {
LOG.info(String.format("Refreshing table metadata: %s", tbl.getFullName()));
Preconditions.checkState(!(tbl instanceof IncompleteTable));
String dbName = tbl.getDb().getName();
String tblName = tbl.getName();
if (!tryLockTable(tbl)) {
throw new CatalogException(String.format("Error refreshing metadata for table " +
"%s due to lock contention", tbl.getFullName()));
}
try {
long newCatalogVersion = incrementAndGetCatalogVersion();
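// Release the global catalog lock (acquired in tryLockTable()) before the HMS call
// so the potentially slow metadata load below only blocks access to this table,
// not the whole catalog. The table lock remains held until the finally block.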
catalogLock_.writeLock().unlock();
try (MetaStoreClient msClient = getMetaStoreClient()) {
org.apache.hadoop.hive.metastore.api.Table msTbl = null;
try {
msTbl = msClient.getHiveClient().getTable(dbName, tblName);
} catch (Exception e) {
throw new TableLoadingException("Error loading metadata for table: " +
dbName + "." + tblName, e);
}
tbl.load(true, msClient.getHiveClient(), msTbl);
}
tbl.setCatalogVersion(newCatalogVersion);
LOG.info(String.format("Refreshed table metadata: %s", tbl.getFullName()));
return tbl.toTCatalogObject();
} finally {
Preconditions.checkState(!catalogLock_.isWriteLockedByCurrentThread());
tbl.getLock().unlock();
}
}
/**
* Drops the partitions specified in 'partitionSet' from 'tbl'. Throws a
* CatalogException if 'tbl' is not an HdfsTable. Returns the target table.
*/
public Table dropPartitions(Table tbl, List<List<TPartitionKeyValue>> partitionSet)
throws CatalogException {
Preconditions.checkNotNull(tbl);
Preconditions.checkNotNull(partitionSet);
Preconditions.checkArgument(tbl.getLock().isHeldByCurrentThread());
if (!(tbl instanceof HdfsTable)) {
throw new CatalogException("Table " + tbl.getFullName() + " is not an Hdfs table");
}
HdfsTable hdfsTable = (HdfsTable) tbl;
List<HdfsPartition> partitions =
hdfsTable.getPartitionsFromPartitionSet(partitionSet);
hdfsTable.dropPartitions(partitions);
return hdfsTable;
}
/**
* Adds a partition to its HdfsTable and returns the modified table.
*/
public Table addPartition(HdfsPartition partition) throws CatalogException {
Preconditions.checkNotNull(partition);
HdfsTable hdfsTable = partition.getTable();
hdfsTable.addPartition(partition);
return hdfsTable;
}
/**
* Invalidates the table in the catalog cache, potentially adding/removing the table
* from the cache based on whether it exists in the Hive Metastore.
* The invalidation logic is:
* - If the table exists in the Metastore, add it to the catalog as an uninitialized
* IncompleteTable (replacing any existing entry). The table metadata will be
* loaded lazily, on the next access. If the parent database for this table does not
* yet exist in Impala's cache it will also be added.
* - If the table does not exist in the Metastore, remove it from the catalog cache.
* - If we are unable to determine whether the table exists in the Metastore (there was
* an exception thrown making the RPC), invalidate any existing Table by replacing
* it with an uninitialized IncompleteTable.
* Returns the thrift representation of the added/updated/removed table, or null if
* the table was not present in the catalog cache or the Metastore.
* Sets tblWasRemoved to true if the table was absent from the Metastore and it was
* removed from the catalog cache.
* Sets dbWasAdded to true if both a new database and table were added to the catalog
* cache.
*/
public TCatalogObject invalidateTable(TTableName tableName,
Reference<Boolean> tblWasRemoved, Reference<Boolean> dbWasAdded) {
tblWasRemoved.setRef(false);
dbWasAdded.setRef(false);
String dbName = tableName.getDb_name();
String tblName = tableName.getTable_name();
LOG.info(String.format("Invalidating table metadata: %s.%s", dbName, tblName));
// Stores whether the table exists in the metastore. Can have three states:
// 1) true - Table exists in metastore.
// 2) false - Table does not exist in metastore.
// 3) unknown (null) - An exception was thrown by the metastore client.
Boolean tableExistsInMetaStore;
Db db = null;
try (MetaStoreClient msClient = getMetaStoreClient()) {
org.apache.hadoop.hive.metastore.api.Database msDb = null;
try {
tableExistsInMetaStore = msClient.getHiveClient().tableExists(dbName, tblName);
} catch (UnknownDBException e) {
// The parent database does not exist in the metastore. Treat this the same
// as if the table does not exist.
tableExistsInMetaStore = false;
} catch (TException e) {
LOG.error("Error executing tableExists() metastore call: " + tblName, e);
tableExistsInMetaStore = null;
}
if (tableExistsInMetaStore != null && !tableExistsInMetaStore) {
Table result = removeTable(dbName, tblName);
if (result == null) return null;
tblWasRemoved.setRef(true);
result.getLock().lock();
try {
return result.toTCatalogObject();
} finally {
result.getLock().unlock();
}
}
db = getDb(dbName);
if ((db == null || !db.containsTable(tblName)) && tableExistsInMetaStore == null) {
// The table does not exist in our cache AND it is unknown whether the
// table exists in the Metastore. Do nothing.
return null;
} else if (db == null && tableExistsInMetaStore) {
// The table exists in the Metastore, but our cache does not contain the parent
// database. A new db will be added to the cache along with the new table. msDb
// must be valid since tableExistsInMetaStore is true.
try {
msDb = msClient.getHiveClient().getDatabase(dbName);
Preconditions.checkNotNull(msDb);
db = new Db(dbName, this, msDb);
db.setCatalogVersion(incrementAndGetCatalogVersion());
addDb(db);
dbWasAdded.setRef(true);
} catch (TException e) {
// The Metastore database could not be fetched. Log the error and return.
LOG.error("Error executing getDatabase() metastore call: " + dbName, e);
return null;
}
}
}
// Add a new uninitialized table to the table cache, effectively invalidating
// any existing entry. The metadata for the table will be loaded lazily, on the
// next access to the table.
Table newTable = IncompleteTable.createUninitializedTable(db, tblName);
newTable.setCatalogVersion(incrementAndGetCatalogVersion());
db.addTable(newTable);
if (loadInBackground_) {
tableLoadingMgr_.backgroundLoad(new TTableName(dbName.toLowerCase(),
tblName.toLowerCase()));
}
if (dbWasAdded.getRef()) {
// The database should always have a lower catalog version than the table because
// it needs to be created before the table can be added.
Preconditions.checkState(db.getCatalogVersion() < newTable.getCatalogVersion());
}
return newTable.toTCatalogObject();
}
/**
* Adds a new role with the given name and grant groups to the AuthorizationPolicy.
* If a role with the same name already exists it will be overwritten.
*/
public Role addRole(String roleName, Set<String> grantGroups) {
catalogLock_.writeLock().lock();
try {
Role role = new Role(roleName, grantGroups);
role.setCatalogVersion(incrementAndGetCatalogVersion());
authPolicy_.addRole(role);
return role;
} finally {
catalogLock_.writeLock().unlock();
}
}
/**
* Removes the role with the given name from the AuthorizationPolicy. Returns the
* removed role with an incremented catalog version, or null if no role with this name
* exists.
*/
public Role removeRole(String roleName) {
catalogLock_.writeLock().lock();
try {
Role role = authPolicy_.removeRole(roleName);
if (role == null) return null;
role.setCatalogVersion(incrementAndGetCatalogVersion());
return role;
} finally {
catalogLock_.writeLock().unlock();
}
}
/**
* Adds a grant group to the given role name and returns the modified Role with
* an updated catalog version. If the role does not exist a CatalogException is thrown.
*/
public Role addRoleGrantGroup(String roleName, String groupName)
throws CatalogException {
catalogLock_.writeLock().lock();
try {
Role role = authPolicy_.addGrantGroup(roleName, groupName);
Preconditions.checkNotNull(role);
role.setCatalogVersion(incrementAndGetCatalogVersion());
return role;
} finally {
catalogLock_.writeLock().unlock();
}
}
/**
* Removes a grant group from the given role name and returns the modified Role with
* an updated catalog version. If the role does not exist a CatalogException is thrown.
*/
public Role removeRoleGrantGroup(String roleName, String groupName)
throws CatalogException {
catalogLock_.writeLock().lock();
try {
Role role = authPolicy_.removeGrantGroup(roleName, groupName);
Preconditions.checkNotNull(role);
role.setCatalogVersion(incrementAndGetCatalogVersion());
return role;
} finally {
catalogLock_.writeLock().unlock();
}
}
/**
* Adds a privilege to the given role name. Returns the new RolePrivilege and
* increments the catalog version. If the parent role does not exist a CatalogException
* is thrown.
*/
public RolePrivilege addRolePrivilege(String roleName, TPrivilege thriftPriv)
throws CatalogException {
catalogLock_.writeLock().lock();
try {
Role role = authPolicy_.getRole(roleName);
if (role == null) throw new CatalogException("Role does not exist: " + roleName);
RolePrivilege priv = RolePrivilege.fromThrift(thriftPriv);
priv.setCatalogVersion(incrementAndGetCatalogVersion());
authPolicy_.addPrivilege(priv);
return priv;
} finally {
catalogLock_.writeLock().unlock();
}
}
/**
* Removes a RolePrivilege from the given role name. Returns the removed
* RolePrivilege with an incremented catalog version or null if no matching privilege
* was found. Throws a CatalogException if no role exists with this name.
*/
public RolePrivilege removeRolePrivilege(String roleName, TPrivilege thriftPriv)
throws CatalogException {
catalogLock_.writeLock().lock();
try {
Role role = authPolicy_.getRole(roleName);
if (role == null) throw new CatalogException("Role does not exist: " + roleName);
RolePrivilege rolePrivilege =
role.removePrivilege(thriftPriv.getPrivilege_name());
if (rolePrivilege == null) return null;
rolePrivilege.setCatalogVersion(incrementAndGetCatalogVersion());
return rolePrivilege;
} finally {
catalogLock_.writeLock().unlock();
}
}
/**
* Gets a RolePrivilege from the given role name. Returns the privilege if it exists,
* or null if no privilege matching the privilege spec exists.
* Throws a CatalogException if the role does not exist.
*/
public RolePrivilege getRolePrivilege(String roleName, TPrivilege privSpec)
throws CatalogException {
catalogLock_.readLock().lock();
try {
Role role = authPolicy_.getRole(roleName);
if (role == null) throw new CatalogException("Role does not exist: " + roleName);
return role.getPrivilege(privSpec.getPrivilege_name());
} finally {
catalogLock_.readLock().unlock();
}
}
/**
* Increments the current Catalog version and returns the new value.
*/
public long incrementAndGetCatalogVersion() {
catalogLock_.writeLock().lock();
try {
return ++catalogVersion_;
} finally {
catalogLock_.writeLock().unlock();
}
}
/**
* Returns the current Catalog version.
*/
public long getCatalogVersion() {
catalogLock_.readLock().lock();
try {
return catalogVersion_;
} finally {
catalogLock_.readLock().unlock();
}
}
public ReentrantReadWriteLock getLock() { return catalogLock_; }
public SentryProxy getSentryProxy() { return sentryProxy_; }
public AuthorizationPolicy getAuthPolicy() { return authPolicy_; }
/**
* Reloads metadata for the partition defined by the partition spec
* 'partitionSpec' in table 'tbl'. Returns the resulting table's TCatalogObject after
* the partition metadata was reloaded.
*/
public TCatalogObject reloadPartition(Table tbl, List<TPartitionKeyValue> partitionSpec)
throws CatalogException {
if (!tryLockTable(tbl)) {
throw new CatalogException(String.format("Error reloading partition of table %s " +
"due to lock contention", tbl.getFullName()));
}
try {
long newCatalogVersion = incrementAndGetCatalogVersion();
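// As in reloadTable(), drop the global catalog lock before the HMS call so only the
// table lock is held while the partition metadata is reloaded.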
catalogLock_.writeLock().unlock();
HdfsTable hdfsTable = (HdfsTable) tbl;
HdfsPartition hdfsPartition = hdfsTable
.getPartitionFromThriftPartitionSpec(partitionSpec);
// Retrieve partition name from existing partition or construct it from
// the partition spec
String partitionName = hdfsPartition == null
? HdfsTable.constructPartitionName(partitionSpec)
: hdfsPartition.getPartitionName();
LOG.info(String.format("Refreshing partition metadata: %s %s",
hdfsTable.getFullName(), partitionName));
try (MetaStoreClient msClient = getMetaStoreClient()) {
org.apache.hadoop.hive.metastore.api.Partition hmsPartition = null;
try {
hmsPartition = msClient.getHiveClient().getPartition(
hdfsTable.getDb().getName(), hdfsTable.getName(), partitionName);
} catch (NoSuchObjectException e) {
// If partition does not exist in Hive Metastore, remove it from the
// catalog
if (hdfsPartition != null) {
hdfsTable.dropPartition(partitionSpec);
hdfsTable.setCatalogVersion(newCatalogVersion);
}
return hdfsTable.toTCatalogObject();
} catch (Exception e) {
throw new CatalogException("Error loading metadata for partition: "
+ hdfsTable.getFullName() + " " + partitionName, e);
}
hdfsTable.reloadPartition(hdfsPartition, hmsPartition);
}
hdfsTable.setCatalogVersion(newCatalogVersion);
LOG.info(String.format("Refreshed partition metadata: %s %s",
hdfsTable.getFullName(), partitionName));
return hdfsTable.toTCatalogObject();
} finally {
Preconditions.checkState(!catalogLock_.isWriteLockedByCurrentThread());
tbl.getLock().unlock();
}
}
}