blob: 7798c3e78461f38f5ca8d88ff49b1067ec6d8b97 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.impala.analysis.TableName;
import org.apache.impala.authorization.AuthorizationDelta;
import org.apache.impala.authorization.AuthorizationManager;
import org.apache.impala.authorization.AuthorizationPolicy;
import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.Pair;
import org.apache.impala.common.Reference;
import org.apache.impala.common.RuntimeEnv;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.service.FeSupport;
import org.apache.impala.thrift.CatalogLookupStatus;
import org.apache.impala.thrift.CatalogServiceConstants;
import org.apache.impala.thrift.TCatalog;
import org.apache.impala.thrift.TCatalogInfoSelector;
import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TCatalogUpdateResult;
import org.apache.impala.thrift.TDatabase;
import org.apache.impala.thrift.TEventProcessorMetrics;
import org.apache.impala.thrift.TEventProcessorMetricsSummaryResponse;
import org.apache.impala.thrift.TFunction;
import org.apache.impala.thrift.TGetCatalogUsageResponse;
import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
import org.apache.impala.thrift.TGetPartitionStatsRequest;
import org.apache.impala.thrift.TPartialCatalogInfo;
import org.apache.impala.thrift.TPartitionKeyValue;
import org.apache.impala.thrift.TPartitionStats;
import org.apache.impala.thrift.TPrincipalType;
import org.apache.impala.thrift.TPrivilege;
import org.apache.impala.thrift.TTable;
import org.apache.impala.thrift.TTableName;
import org.apache.impala.thrift.TTableUsage;
import org.apache.impala.thrift.TTableUsageMetrics;
import org.apache.impala.thrift.TUniqueId;
import org.apache.impala.thrift.TUpdateTableUsageRequest;
import org.apache.impala.util.CatalogBlacklistUtils;
import org.apache.impala.util.FunctionUtils;
import org.apache.impala.util.PatternMatcher;
import org.apache.impala.util.TUniqueIdUtil;
import org.apache.impala.util.ThreadNameAnnotator;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.Timer;
* Specialized Catalog that implements the CatalogService specific Catalog
* APIs. The CatalogServiceCatalog manages loading of all the catalog metadata
* and processing of DDL requests. The CatalogServiceCatalog maintains a global
* "catalog version". The version is incremented and assigned to a CatalogObject whenever
* it is added/modified/removed from the catalog. This means each CatalogObject will have
* a unique version and assigned versions are strictly increasing.
* Periodically, the CatalogServiceCatalog collects a delta of catalog updates (based on a
* specified catalog version) and constructs a topic update to be sent to the statestore.
* Each catalog topic update is defined by a range of catalog versions (from, to] and the
* CatalogServiceCatalog guarantees that every catalog object that has a version in the
* specified range is included in the catalog topic update. Concurrent DDL requests are
* allowed while a topic update is in progress. Hence, there is a non-zero probability
* that frequently modified catalog objects may keep skipping topic updates. That can
* happen when by the time a topic update thread tries to collect an object update, that
* object is being modified by another metadata operation, causing its version to surpass
* the 'to' version of the topic update. To ensure that all catalog updates
* are eventually included in a catalog topic update, we keep track of the number of times
* each catalog object has skipped a topic update and if that number exceeds a specified
* threshold, we add the catalog object to the next topic update even if its version is
* higher than the 'to' version of the topic update. As a result, the same version of an
* object might be sent in two subsequent topic updates.
* The CatalogServiceCatalog maintains two logs:
* - Delete log. Since deleted objects are removed from the cache, the cache itself is
* not useful for tracking deletions. This log is used for populating the list of
* deleted objects during a topic update by recording the catalog objects that
* have been removed from the catalog. An entry with a new version is added to this log
* every time an object is removed (e.g. dropTable). Incrementing an object's version
* and adding it to the delete log should be performed atomically. An entry is removed
* from this log by the topic update thread when the associated deletion entry is
* added to a topic update.
* - Topic update log. This log records information about the catalog objects that have
* been included in a catalog topic update. Only the thread that is processing the
* topic update is responsible for adding, updating, and removing entries from the log.
* All other operations (e.g. addTable) only read topic update log entries but never
* modify them. Each entry includes the number of times a catalog object has
* skipped a topic update, which version of the object was last sent in a topic update
* and what was the version of that topic update. Entries of the topic update log are
* garbage-collected every TOPIC_UPDATE_LOG_GC_FREQUENCY topic updates by the topic
* update processing thread to prevent the log from growing indefinitely. Metadata
* operations using SYNC_DDL are inspecting this log to identify the catalog topic
* version that the issuing impalad must wait for in order to ensure that the effects
* of this operation have been broadcast to all the coordinators.
* Known anomalies with SYNC_DDL:
* The time-based cleanup process of the topic update log entries may cause metadata
* operations that use SYNC_DDL to hang while waiting for specific topic update log
* entries. That could happen if the thread processing the metadata operation stalls
* for a long period of time (longer than the time to process
* TOPIC_UPDATE_LOG_GC_FREQUENCY topic updates) between the time the operation was
* applied in the catalog cache and the time the SYNC_DDL version was checked. To reduce
* the probability of such an event, we set the value of the
* TOPIC_UPDATE_LOG_GC_FREQUENCY to a large value. Also, to prevent metadata operations
* from hanging in that path due to unknown issues (e.g. bugs), operations using
* SYNC_DDL are not allowed to wait indefinitely for specific topic log entries and an
* exception is thrown if the specified max wait time is exceeded. See
* waitForSyncDdlVersion() for more details.
* Table metadata for IncompleteTables (not fully loaded tables) are loaded in the
* background by the TableLoadingMgr; tables can be prioritized for loading by calling
* prioritizeLoad(). Background loading can also be enabled for the catalog, in which
* case missing tables (tables that are not yet loaded) are submitted to the
* TableLoadingMgr whenever any table metadata is invalidated and on startup. The
* metadata of
* fully loaded tables (e.g. HdfsTable, HBaseTable, etc) are updated in-place and don't
* trigger a background metadata load through the TableLoadingMgr. Accessing a table
* that is not yet loaded (via getTable()), will load the table's metadata on-demand,
* out-of-band of the table loading thread pool.
* See the class comments in CatalogOpExecutor for a description of the locking protocol
* that should be employed if both the version lock and table locks need to be held at
* the same time.
* TODO: Consider removing on-demand loading and have everything go through the table
* loading thread pool.
public class CatalogServiceCatalog extends Catalog {
// Class-wide logger; exposed as a public static field.
public static final Logger LOG = LoggerFactory.getLogger(CatalogServiceCatalog.class);
// NOTE(review): presumably the size of the default HMS client pool created by the
// convenience constructor; the use site is not visible in this part of the file --
// confirm.
private static final int INITIAL_META_STORE_CLIENT_POOL_SIZE = 10;
// NOTE(review): presumably the threshold described in the class comment, i.e. how
// many topic updates an object may skip before it is forced into the next one; the
// use site is not visible in this part of the file -- confirm.
private static final int MAX_NUM_SKIPPED_TOPIC_UPDATES = 2;
// Timeout for acquiring a table lock, in milliseconds (7200000 ms = 2 hours).
// Used as the overall deadline of the retry loop in tryLockTable().
// TODO: Make this configurable
private static final long TBL_LOCK_TIMEOUT_MS = 7200000;
// Time to sleep before retrying to acquire a table lock, in milliseconds.
private static final int TBL_LOCK_RETRY_MS = 10;
// Id of this catalog service instance. Assigned once in the constructor and attached
// (wrapped in a TCatalog) to the CATALOG object that getCatalogDelta() adds to each
// delta.
private final TUniqueId catalogServiceId_;
// Fair lock used to synchronize reads/writes of catalogVersion_. Because this lock
// protects catalogVersion_, it can be used to perform atomic bulk catalog operations
// since catalogVersion_ cannot change externally while the lock is being held.
// In addition to protecting catalogVersion_, it is currently used for the
// following bulk operations:
// * Building a delta update to send to the statestore in getCatalogObjects(),
// so a snapshot of the catalog can be taken without any version changes.
// * During a catalog invalidation (call to reset()), which re-reads all dbs and tables
// from the metastore.
// * During renameTable(), because a table must be removed and added to the catalog
// atomically (potentially in a different database).
private final ReentrantReadWriteLock versionLock_ = new ReentrantReadWriteLock(true);
// Last assigned catalog version. Starts at INITIAL_CATALOG_VERSION and is incremented
// with each update to the Catalog. Continued across the lifetime of the object.
// Protected by versionLock_.
// TODO: Handle overflow of catalogVersion_ and nextTableId_.
// TODO: The name of this variable is misleading and can be interpreted as a property
// of the catalog server. Rename into something that indicates its role as a global
// sequence number assigned to catalog objects.
private long catalogVersion_ = INITIAL_CATALOG_VERSION;
// Manages the scheduling of background table loading.
private final TableLoadingMgr tableLoadingMgr_;
private final boolean loadInBackground_;
// Periodically polls HDFS to get the latest set of known cache pools.
private final ScheduledExecutorService cachePoolReader_ =
// Log of deleted catalog objects.
private final CatalogDeltaLog deleteLog_;
// Version of the last topic update returned to the statestore.
// The version of a topic update is the catalog version of the CATALOG object
// that is added to it.
private final AtomicLong lastSentTopicUpdate_ = new AtomicLong(-1);
// Wait time for a topic update, in milliseconds (10000 ms = 10 seconds).
private static final long TOPIC_UPDATE_WAIT_TIMEOUT_MS = 10000;
// The "topic update log" described in the class comment. getCatalogDelta() also
// synchronizes on this object after a delta has been built.
private final TopicUpdateLog topicUpdateLog_ = new TopicUpdateLog();
// Copied from the constructor argument of the same name.
// NOTE(review): presumably the local directory used for function/library files --
// no use is visible in this part of the file; confirm.
private final String localLibraryPath_;
// Assigned in the constructor from CatalogdTableInvalidator.create(this, ...).
private CatalogdTableInvalidator catalogdTableInvalidator_;
// Manages the event processing from metastore for issuing invalidates on tables.
// Assigned in the constructor from getEventsProcessor(); it is a NoOpEventProcessor
// when the configured polling interval is not positive.
private ExternalEventsProcessor metastoreEventProcessor_;
* See the gflag definition in be/.../ for details on these modes.
private static enum TopicMode {
final TopicMode topicMode_;
private final long PARTIAL_FETCH_RPC_QUEUE_TIMEOUT_S = BackendConfig.INSTANCE
// Controls concurrent access to doGetPartialCatalogObject() call. Limits the number
// of parallel requests to --catalog_max_parallel_partial_fetch_rpc.
private final Semaphore partialObjectFetchAccess_ =
new Semaphore(MAX_PARALLEL_PARTIAL_FETCH_RPC_COUNT, /*fair =*/ true);
// Installed after construction through setAuthzManager(); not assigned by the
// constructor code visible in this file section.
private AuthorizationManager authzManager_;
// Databases that will be skipped in loading. Parsed once in the constructor from
// BackendConfig.INSTANCE.getBlacklistedDbs().
private final Set<String> blacklistedDbs_;
// Tables that will be skipped in loading. Parsed once in the constructor from
// BackendConfig.INSTANCE.getBlacklistedTables().
private final Set<TableName> blacklistedTables_;
* Initialize the CatalogServiceCatalog using a given MetastoreClientPool impl.
* @param loadInBackground If true, table metadata will be loaded in the background.
* @param numLoadingThreads Number of threads used to load table metadata.
* @param metaStoreClientPool A pool of HMS clients backing this Catalog.
* @throws ImpalaException
// Primary constructor: wires up the blacklists, the table loading manager, the HDFS
// cache pool reader (HDFS only), the delete log, the topic mode, the table
// invalidator and the metastore event processor.
// NOTE(review): several statements of this constructor are not visible in this view
// (e.g. the call that schedules the CachePoolReader and the arguments of
// TopicMode.valueOf / CatalogdTableInvalidator.create) -- confirm against the full
// file before relying on the comments below.
public CatalogServiceCatalog(boolean loadInBackground, int numLoadingThreads,
TUniqueId catalogServiceId, String localLibraryPath,
MetaStoreClientPool metaStoreClientPool)
throws ImpalaException {
// Parse the configured blacklists once; they are consulted by isBlacklistedDb() and
// isBlacklistedTable().
blacklistedDbs_ = CatalogBlacklistUtils.parseBlacklistedDbs(
BackendConfig.INSTANCE.getBlacklistedDbs(), LOG);
blacklistedTables_ = CatalogBlacklistUtils.parseBlacklistedTables(
BackendConfig.INSTANCE.getBlacklistedTables(), LOG);
catalogServiceId_ = catalogServiceId;
tableLoadingMgr_ = new TableLoadingMgr(this, numLoadingThreads);
loadInBackground_ = loadInBackground;
try {
// We want only 'true' HDFS filesystems to poll the HDFS cache (i.e. not S3,
// local, etc.)
if (FileSystemUtil.getDefaultFileSystem() instanceof DistributedFileSystem) {
// The reader is created with incrementVersions=false. The remaining arguments
// (0, 1, TimeUnit.MINUTES) are presumably the initial delay and period of a
// scheduling call on cachePoolReader_ -- confirm.
new CachePoolReader(false), 0, 1, TimeUnit.MINUTES);
} catch (IOException e) {
// NOTE(review): the caught exception is not passed to the logger, so the cause of
// the failure is not recorded.
LOG.error("Couldn't identify the default FS. Cache Pool reader will be disabled.");
localLibraryPath_ = localLibraryPath;
deleteLog_ = new CatalogDeltaLog();
topicMode_ = TopicMode.valueOf(
catalogdTableInvalidator_ = CatalogdTableInvalidator.create(this,
// May throw CatalogException if the current notification event id cannot be fetched
// from the metastore (see getEventsProcessor()).
metastoreEventProcessor_ = getEventsProcessor();
// Fail fast when the configured partial-fetch queue timeout is not positive.
Preconditions.checkState(PARTIAL_FETCH_RPC_QUEUE_TIMEOUT_S > 0);
// start polling for metastore events
* Initializes the Catalog using the default MetastoreClientPool impl.
* @param initialHmsCnxnTimeoutSec Time (in seconds) CatalogServiceCatalog will wait
* to establish an initial connection to the HMS before giving up.
// Convenience constructor: delegates to the five-argument constructor.
// NOTE(review): the last argument of the this(...) call is not visible in this view;
// presumably it is a default MetaStoreClientPool built from
// INITIAL_META_STORE_CLIENT_POOL_SIZE and 'initialHmsCnxnTimeoutSec' -- confirm.
public CatalogServiceCatalog(boolean loadInBackground, int numLoadingThreads,
int initialHmsCnxnTimeoutSec, TUniqueId catalogServiceId, String localLibraryPath)
throws ImpalaException {
this(loadInBackground, numLoadingThreads, catalogServiceId, localLibraryPath,
* Check whether the database is in blacklist
// Membership test against the set that the constructor builds from
// BackendConfig.INSTANCE.getBlacklistedDbs().
// NOTE(review): matching uses whatever equality that set implements; presumably
// callers pass an already-normalized database name -- confirm.
public boolean isBlacklistedDb(String dbName) {
return blacklistedDbs_.contains(dbName);
* Check whether the table is in blacklist
// Membership test against the set that the constructor builds from
// BackendConfig.INSTANCE.getBlacklistedTables().
// NOTE(review): the result depends on TableName's equals()/hashCode() (or on the
// set's comparator), which are not visible here -- confirm case handling.
public boolean isBlacklistedTable(TableName table) {
return blacklistedTables_.contains(table);
* Check whether the table is in blacklist
// Convenience overload: wraps the two names in a TableName and delegates to
// isBlacklistedTable(TableName).
public boolean isBlacklistedTable(String db, String table) {
return isBlacklistedTable(new TableName(db, table));
// Installs the authorization manager used by this catalog. The argument is passed
// through Preconditions.checkNotNull, so a null manager is rejected rather than
// stored (presumably Guava's Preconditions, which throws NullPointerException --
// confirm).
public void setAuthzManager(AuthorizationManager authzManager) {
authzManager_ = Preconditions.checkNotNull(authzManager);
/**
* Returns a Metastore event processor object if
* <code>BackendConfig#getHMSPollingIntervalInSeconds</code> returns a positive
* value of polling interval. Otherwise, returns a no-op events processor. It is
* important to fetch the current notification event id at the Catalog service
* initialization time so that event processor starts to sync at the event id
* corresponding to the catalog start time.
*/
// Chooses the events processor for this catalog based on the configured HMS polling
// interval: a no-op processor when the interval is <= 0, otherwise a
// MetastoreEventsProcessor that starts from the metastore's current notification
// event id.
// Throws CatalogException (wrapping the TException) when the current notification
// event id cannot be fetched.
private ExternalEventsProcessor getEventsProcessor() throws ImpalaException {
long eventPollingInterval = BackendConfig.INSTANCE.getHMSPollingIntervalInSeconds();
// Zero and negative intervals both disable event processing.
if (eventPollingInterval <= 0) {
.format("Metastore event processing is disabled. Event polling interval is %d",
return NoOpEventProcessor.getInstance();
// The client is opened in try-with-resources, so it is closed on both the normal
// and the exceptional path.
try (MetaStoreClient metaStoreClient = getMetaStoreClient()) {
CurrentNotificationEventId currentNotificationId =
return MetastoreEventsProcessor.getInstance(
this, currentNotificationId.getEventId(), eventPollingInterval);
} catch (TException e) {
// NOTE(review): the two literals below are concatenated without a separating
// space, so the logged text reads "...from metastore.Metastore event...". The
// message also says processing "will be disabled", but the method throws instead
// of falling back to the no-op processor.
LOG.error("Unable to fetch the current notification event id from metastore."
+ "Metastore event processing will be disabled.", e);
throw new CatalogException(
"Fatal error while initializing metastore event processor", e);
// Returns the events processor chosen in the constructor; this is a
// NoOpEventProcessor when event polling is disabled.
public ExternalEventsProcessor getMetastoreEventProcessor() {
return metastoreEventProcessor_;
// External event processing counts as enabled whenever the installed processor is
// anything other than a NoOpEventProcessor (type check only; the processor's own
// state is not consulted).
public boolean isExternalEventProcessingEnabled() {
return !(metastoreEventProcessor_ instanceof NoOpEventProcessor);
* Tries to acquire versionLock_ and the lock of 'tbl' in that order. Returns true if it
* successfully acquires both within TBL_LOCK_TIMEOUT_MS millisecs; both locks are held
* when the function returns. Returns false otherwise and no lock is held in this case.
// Retry loop that attempts to take the lock of 'tbl' until it succeeds or
// TBL_LOCK_TIMEOUT_MS has elapsed (measured with System.currentTimeMillis()).
// Returns true on success and false on timeout.
// NOTE(review): the statements that acquire/release versionLock_ and the sleep
// between attempts are not visible in this view; the method comment says
// versionLock_ is taken first and that TBL_LOCK_RETRY_MS-style sleeping avoids
// spinning -- confirm against the full file.
public boolean tryLockTable(Table tbl) {
// The annotator is a try-with-resources resource, so it is closed on every exit
// path (presumably it tags the thread name for diagnostics -- confirm).
try (ThreadNameAnnotator tna = new ThreadNameAnnotator(
"Attempting to lock table " + tbl.getFullName())) {
long begin = System.currentTimeMillis();
long end;
do {
// Non-blocking attempt on the table's own lock.
if (tbl.getLock().tryLock()) {
if (LOG.isTraceEnabled()) {
end = System.currentTimeMillis();
LOG.trace(String.format("Lock for table %s was acquired in %d msec",
tbl.getFullName(), end - begin));
return true;
try {
// Sleep to avoid spinning and allow other operations to make progress.
} catch (InterruptedException e) {
// ignore
// NOTE(review): the interrupt is swallowed and the loop keeps retrying until the
// timeout; the thread's interrupt status is not restored.
end = System.currentTimeMillis();
} while (end - begin < TBL_LOCK_TIMEOUT_MS);
return false;
* Reads the current set of cache pools from HDFS and updates the catalog.
* Called periodically by the cachePoolReader_.
// Runnable that lists the HDFS cache pools and reconciles the result with the
// cached hdfsCachePools_ collection: pools that are new are added, pools that
// disappeared are removed, and (optionally) surviving pools get their version bumped.
// NOTE(review): many statements of this class are not visible in this view (the
// bodies of the add/remove/bump loops, the locking around them and the finally
// clause) -- confirm details against the full file.
protected class CachePoolReader implements Runnable {
// If true, existing cache pools will get a new catalog version and, consequently,
// they will be added to the next topic update, triggering an update in each
// coordinator's local catalog cache. This is needed for the case of INVALIDATE
// METADATA where a new catalog version needs to be assigned to every catalog object.
private final boolean incrementVersions_;
* This constructor is needed to create a non-threaded execution of the class.
public CachePoolReader(boolean incrementVersions) {
incrementVersions_ = incrementVersions;
public void run() {
if (LOG.isTraceEnabled()) LOG.trace("Reloading cache pool names from HDFS");
// Map of cache pool name to CachePoolInfo. Stored in a map to allow Set operations
// to be performed on the keys.
Map<String, CachePoolInfo> currentCachePools = new HashMap<>();
try {
DistributedFileSystem dfs = FileSystemUtil.getDistributedFileSystem();
RemoteIterator<CachePoolEntry> itr = dfs.listCachePools();
while (itr.hasNext()) {
CachePoolInfo cachePoolInfo =;
currentCachePools.put(cachePoolInfo.getPoolName(), cachePoolInfo);
} catch (Exception e) {
// Any failure while listing pools is logged with its stack trace.
// NOTE(review): no return is visible after this log call. If execution continues,
// 'currentCachePools' may be empty or partial and every cached pool would be
// treated as dropped below -- confirm against the full file.
LOG.error("Error loading cache pools: ", e);
try {
// Determine what has changed relative to what we have cached.
// dropped   = cached names that HDFS no longer reports
// created   = names reported by HDFS that are not cached yet
// surviving = cached names that were not dropped
Set<String> droppedCachePoolNames = Sets.difference(
hdfsCachePools_.keySet(), currentCachePools.keySet());
Set<String> createdCachePoolNames = Sets.difference(
currentCachePools.keySet(), hdfsCachePools_.keySet());
Set<String> survivingCachePoolNames = Sets.difference(
hdfsCachePools_.keySet(), droppedCachePoolNames);
// Add all new cache pools.
for (String createdCachePool: createdCachePoolNames) {
HdfsCachePool cachePool = new HdfsCachePool(
// Remove dropped cache pools.
for (String cachePoolName: droppedCachePoolNames) {
HdfsCachePool cachePool = hdfsCachePools_.remove(cachePoolName);
// remove() may return null if the pool was removed in the meantime; only pools
// that were actually removed produce a removal object.
if (cachePool != null) {
TCatalogObject removedObject =
new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL,
if (incrementVersions_) {
// Increment the version of existing pools in order to be added to the next
// topic update.
for (String survivingCachePoolName: survivingCachePoolNames) {
HdfsCachePool cachePool = hdfsCachePools_.get(survivingCachePoolName);
} finally {
// Returns Semaphore.getQueueLength() for the semaphore that limits concurrent
// partial-object fetches. Per the JDK this is only an estimate of the number of
// threads waiting to acquire a permit, so the value is suitable for monitoring but
// not for synchronization decisions.
public int getPartialFetchRpcQueueLength() {
return partialObjectFetchAccess_.getQueueLength();
* Adds a list of cache directive IDs for the given table name. Asynchronously
* refreshes the table metadata once all cache directives complete.
// Pure delegation: all three arguments are forwarded unchanged to
// TableLoadingMgr.watchCacheDirs().
public void watchCacheDirs(List<Long> dirIds, TTableName tblName, String reason) {
tableLoadingMgr_.watchCacheDirs(dirIds, tblName, reason);
/**
* Prioritizes the loading of the given list of TCatalogObjects. Currently only supports
* loading Table/View metadata since Db and Function metadata is not loaded lazily.
*/
// For every descriptor, reads its table payload and asks the TableLoadingMgr to
// prioritize that table.
// NOTE(review): no null check on catalogObject.getTable() is visible, so a
// descriptor without a table payload would presumably fail here -- confirm.
public void prioritizeLoad(List<TCatalogObject> objectDescs) {
for (TCatalogObject catalogObject: objectDescs) {
TTable table = catalogObject.getTable();
// The db name is lower-cased before being handed to the loading manager.
// NOTE(review): toLowerCase() without a Locale uses the JVM default locale.
tableLoadingMgr_.prioritizeLoad(new TTableName(table.getDb_name().toLowerCase(),
* Retrieves TPartitionStats as a map that associates partitions with their
* statistics. The table partitions are specified in
* TGetPartitionStatsRequest. If statistics are not available for a partition,
* a default TPartitionStats is used. Partitions are identified by their partitioning
* column string values.
// Builds a map from partition name (FeCatalogUtils.getPartitionName) to that
// partition's stats as a ByteBuffer, for the table named in 'request'.
// Throws CatalogException when the table does not exist or is an IncompleteTable
// (the latter carries the table's load failure as its cause).
// NOTE(review): some statements are not visible in this view (the logging call
// prefixes, the code that fills 'compressedStats', and the lock handling around the
// try/finally) -- confirm against the full file.
public Map<String, ByteBuffer> getPartitionStats(TGetPartitionStatsRequest request)
throws CatalogException {
TTableName tableName = request.table_name;"Fetching partition statistics for: " + tableName.getDb_name() + "."
+ tableName.getTable_name());
// Loads the table on demand if needed; the string is the load reason.
Table table = getOrLoadTable(tableName.db_name, tableName.table_name,
"needed to fetch partition stats");
// Table could be null if it does not exist anymore.
if (table == null) {
throw new CatalogException(
"Requested partition statistics for table that does not exist: "
+ request.table_name);
// Table could be incomplete, in which case an exception should be thrown.
if (table instanceof IncompleteTable) {
throw new CatalogException("No statistics available for incompletely"
+ " loaded table: " + request.table_name, ((IncompleteTable) table).getCause());
// The check below requires the concrete HdfsTable type (not merely an FeFsTable),
// because the table is cast to HdfsTable further down.
Preconditions.checkArgument(table instanceof HdfsTable,
"Partition statistics can only be requested for FS tables, type is: %s",
// Table must be loaded.
Map<String, ByteBuffer> stats = new HashMap<>();
HdfsTable hdfsTable = (HdfsTable) table;
try {
Collection<? extends PrunablePartition> partitions = hdfsTable.getPartitions();
for (PrunablePartition partition : partitions) {
Preconditions.checkState(partition instanceof FeFsPartition);
FeFsPartition fsPartition = (FeFsPartition) partition;
TPartitionStats partStats = fsPartition.getPartitionStats();
// Partitions without stats are guarded by this check.
// NOTE(review): the method comment says a default TPartitionStats is used when
// stats are unavailable; whether such partitions get an entry cannot be
// determined from the visible lines -- confirm.
if (partStats != null) {
ByteBuffer compressedStats =
stats.put(FeCatalogUtils.getPartitionName(fsPartition), compressedStats);
} finally {
}"Fetched partition statistics for " + stats.size()
+ " partitions on: " + hdfsTable.getFullName());
return stats;
* The context for add*ToCatalogDelta(), called by getCatalogDelta. It contains
* callback information, version range and collected topics.
// Mutable per-call state used while one catalog delta is being collected: the native
// callback pointer, the (from, to] version range, the keys of objects already added
// as updates, and a serializer.
class GetCatalogDeltaContext {
// The CatalogServer pointer for NativeAddPendingTopicItem() callback.
long nativeCatalogServerPtr;
// The from and to version of this delta.
long fromVersion;
long toVersion;
// The keys of the updated topics. Only non-delete additions are recorded here (see
// addCatalogObject); getCatalogDelta() uses the set to avoid emitting a deletion for
// an object that was re-added.
Set<String> updatedCatalogObjects;
// Thrift serializer using the binary protocol; used by addCatalogObject() to turn
// each object into the bytes handed to the backend.
TSerializer serializer;
GetCatalogDeltaContext(long nativeCatalogServerPtr, long fromVersion, long toVersion)
this.nativeCatalogServerPtr = nativeCatalogServerPtr;
this.fromVersion = fromVersion;
this.toVersion = toVersion;
updatedCatalogObjects = new HashSet<>();
serializer = new TSerializer(new TBinaryProtocol.Factory());
// Adds 'obj' to the pending topic update, as an update or (delete == true) as a
// deletion. Depending on topicMode_, the object is published under the v1 key (full
// object), the v2 key (minimal object), or both (MIXED).
// Throws TException if serialization fails. A failed native add is only logged.
void addCatalogObject(TCatalogObject obj, boolean delete) throws TException {
String key = Catalog.toCatalogObjectKey(obj);
// Every object except the top-level CATALOG object gets a topic update log entry
// with a skip count of 0, the object's version, and this delta's 'toVersion'.
// NOTE(review): the call that receives the Entry is not visible in this view;
// presumably it is an add/update on topicUpdateLog_ -- confirm.
if (obj.type != TCatalogObjectType.CATALOG) {
new TopicUpdateLog.Entry(0, obj.getCatalog_version(), toVersion));
// Deletions are deliberately not recorded as "updated".
if (!delete) updatedCatalogObjects.add(key);
// TODO: TSerializer.serialize() returns a copy of the internal byte array, which
// could be elided.
if (topicMode_ == TopicMode.FULL || topicMode_ == TopicMode.MIXED) {
String v1Key = CatalogServiceConstants.CATALOG_TOPIC_V1_PREFIX + key;
byte[] data = serializer.serialize(obj);
if (!FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr, v1Key,
obj.catalog_version, data, delete)) {
LOG.error("NativeAddPendingTopicItem failed in BE. key=" + v1Key + ", delete="
+ delete + ", data_size=" + data.length);
if (topicMode_ == TopicMode.MINIMAL || topicMode_ == TopicMode.MIXED) {
// Serialize a minimal version of the object that can be used by impalads
// that are running in 'local-catalog' mode. This is used by those impalads
// to invalidate their local cache.
TCatalogObject minimalObject = getMinimalObjectForV2(obj);
// A null minimal object means this object type is not published on the v2 topic.
if (minimalObject != null) {
byte[] data = serializer.serialize(minimalObject);
String v2Key = CatalogServiceConstants.CATALOG_TOPIC_V2_PREFIX + key;
// Note that the version passed to the backend is the original object's version.
if (!FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr, v2Key,
obj.catalog_version, data, delete)) {
LOG.error("NativeAddPendingTopicItem failed in BE. key=" + v2Key + ", delete="
+ delete + ", data_size=" + data.length);
// Produces the object that is published on the v2 (minimal) topic for 'obj'.
// Depending on the object's type the result is a stripped-down copy carrying only
// identifying fields, the original object itself, or null when the type is not
// published on the v2 topic. Must only be called in MINIMAL or MIXED topic mode.
// NOTE(review): most 'case' labels of the switch are not visible in this view, so the
// mapping from type to branch below is only partially shown -- confirm against the
// full file.
private TCatalogObject getMinimalObjectForV2(TCatalogObject obj) {
Preconditions.checkState(topicMode_ == TopicMode.MINIMAL ||
topicMode_ == TopicMode.MIXED);
// Start from an object that carries only the type and the catalog version.
TCatalogObject min = new TCatalogObject(obj.type, obj.catalog_version);
switch (obj.type) {
// Database: only the db name is kept.
min.setDb(new TDatabase(obj.db.db_name));
case TABLE:
case VIEW:
// Tables and views: only the db name and table name are kept.
min.setTable(new TTable(obj.table.db_name, obj.table.tbl_name));
// Sending the top-level catalog version is important for implementing SYNC_DDL.
// This also allows impalads to detect a catalogd restart and invalidate the
// whole catalog.
// TODO(todd) ensure that the impalad does this invalidation as required.
return obj;
// The caching of this data on the impalad side is somewhat complex
// and this code is also under some churn at the moment. So, we'll just publish
// the full information rather than doing fetch-on-demand.
return obj;
// Functions: only the name and, when present, the HDFS location are kept.
TFunction fnObject = new TFunction(obj.fn.getName());
// IMPALA-8486: add the hdfs location so coordinators can mark their libCache
// entry for this function to be stale.
if (obj.fn.hdfs_location != null) fnObject.setHdfs_location(obj.fn.hdfs_location);
// These are currently not cached by v2 impalad.
// TODO(todd): handle these items.
return null;
// Any type without a branch is treated as a programming error.
throw new AssertionError("Unexpected catalog object type: " + obj.type);
return min;
/**
* Identifies the catalog objects that were added/modified/deleted in the catalog with
* versions > 'fromVersion'. It operates on a snapshot of the catalog without holding
* the catalog lock which means that other concurrent metadata operations can still make
* progress while the catalog delta is computed. An entry in the topic update log is
* added for every catalog object that is included in the catalog delta. The log is
* examined by operations using SYNC_DDL to determine which topic update covers the
* result set of metadata operation. Once the catalog delta is computed, the entries in
* the delete log with versions less than 'fromVersion' are garbage collected.
* The catalog delta is passed to the backend by calling NativeAddPendingTopicItem().
*/
// Collects one catalog delta covering versions ('fromVersion', current catalog
// version] and pushes every collected object to the backend through the context's
// addCatalogObject(). Returns the 'to' version of the delta.
// Order of collection: databases, data sources, HDFS cache pools, roles, users,
// authz cache invalidations, then deletions, and finally the single CATALOG object.
// NOTE(review): the garbage-collection calls and the body of the synchronized block
// are not visible in this view -- confirm against the full file.
public long getCatalogDelta(long nativeCatalogServerPtr, long fromVersion) throws
TException {
// 'toVersion' is fixed here, at the start of collection.
GetCatalogDeltaContext ctx = new GetCatalogDeltaContext(nativeCatalogServerPtr,
fromVersion, getCatalogVersion());
for (Db db: getAllDbs()) {
addDatabaseToCatalogDelta(db, ctx);
for (DataSource dataSource: getAllDataSources()) {
addDataSourceToCatalogDelta(dataSource, ctx);
for (HdfsCachePool cachePool: getAllHdfsCachePools()) {
addHdfsCachePoolToCatalogDelta(cachePool, ctx);
for (Role role: getAllRoles()) {
addPrincipalToCatalogDelta(role, ctx);
for (User user: getAllUsers()) {
addPrincipalToCatalogDelta(user, ctx);
for (AuthzCacheInvalidation authzCacheInvalidation: getAllAuthzCacheInvalidation()) {
addAuthzCacheInvalidationToCatalogDelta(authzCacheInvalidation, ctx);
// Identify the catalog objects that were removed from the catalog for which their
// versions are in range ('ctx.fromVersion', 'ctx.toVersion']. We need to make sure
// that we don't include "deleted" objects that were re-added to the catalog.
for (TCatalogObject removedObject:
getDeletedObjects(ctx.fromVersion, ctx.toVersion)) {
// Skip the deletion when the same key was already added as an update above.
if (!ctx.updatedCatalogObjects.contains(
Catalog.toCatalogObjectKey(removedObject))) {
ctx.addCatalogObject(removedObject, true);
// Each topic update should contain a single "TCatalog" object which is used to
// pass overall state on the catalog, such as the current version and the
// catalog service id. By setting the catalog version to the latest catalog
// version at this point, it ensures impalads will always bump their versions,
// even in the case where an object has been dropped.
TCatalogObject catalog =
new TCatalogObject(TCatalogObjectType.CATALOG, ctx.toVersion);
catalog.setCatalog(new TCatalog(catalogServiceId_));
ctx.addCatalogObject(catalog, false);
// Garbage collect the delete and topic update log.
// Notify any operation that is waiting on the next topic update.
synchronized (topicUpdateLog_) {
return ctx.toVersion;
/**
* Gets the list of versions for in-flight events for the given table. Applicable
* only when external event processing is enabled.
* @param dbName database name
* @param tblName table name
* @return List of previous version numbers for in-flight events on this table.
* If the table is not loaded, returns an empty list. If event processing is disabled,
* returns an empty list.
*/
// Returns the in-flight event versions of a database (tblName == null) or of a table.
// Throws DatabaseNotFoundException / TableNotFoundException when the named object is
// missing; returns an empty list for a table that is still an IncompleteTable.
// NOTE(review): the start of the precondition check and the body of the finally
// clause are not visible in this view; presumably the check asserts that event
// processing is enabled and the finally releases a lock -- confirm.
public List<Long> getInFlightVersionsForEvents(String dbName, String tblName)
throws DatabaseNotFoundException, TableNotFoundException {
"Event processing should be enabled before calling this method");
// NOTE(review): Collections.EMPTY_LIST is a raw-typed constant, so this assignment is
// unchecked; Collections.emptyList() is the type-safe equivalent.
List<Long> result = Collections.EMPTY_LIST;
try {
Db db = getDb(dbName);
if (db == null) {
throw new DatabaseNotFoundException(
String.format("Database %s not found", dbName));
// A null table name selects the database-level version list.
if (tblName == null) {
return db.getVersionsForInflightEvents();
Table tbl = getTable(dbName, tblName);
if (tbl == null) {
throw new TableNotFoundException(
String.format("Table %s not found", new TableName(dbName, tblName)));
// Tables that are not loaded yet have no in-flight versions to report.
if (tbl instanceof IncompleteTable) return result;
return tbl.getVersionsForInflightEvents();
} finally {
* Removes a given version number from the catalog database/table's list of versions
* for in-flight events.
* If tblName is null, removes version number from database.
* If tblName not null and table is not incomplete, removes version number from table
* Applicable only when external event processing is enabled.
* @param dbName database name
* @param tblName table name
// Removes 'versionNumber' from the in-flight event versions of a database
// (tblName == null) or of a loaded table.
// A missing database is ignored silently, whereas a missing table raises
// TableNotFoundException; an IncompleteTable is left untouched.
// NOTE(review): the statements that actually remove the version, the start of the
// precondition check and the finally body are not visible in this view. Also, no
// visible statement throws the declared DatabaseNotFoundException -- confirm against
// the full file.
public void removeFromInFlightVersionsForEvents(String dbName, String tblName,
long versionNumber) throws DatabaseNotFoundException, TableNotFoundException {
"Event processing should be enabled when calling this method");
try {
Db db = getDb(dbName);
// Unknown database: nothing to remove.
if (db == null) return;
if (tblName == null) {
Table tbl = getTable(dbName, tblName);
if (tbl == null) {
throw new TableNotFoundException(
String.format("Table %s not found", new TableName(dbName, tblName)));
// Tables that are not loaded yet are skipped.
if (tbl instanceof IncompleteTable) return;
} finally {
/**
* Adds a given version number to the catalog table's list of versions for in-flight
* events. Applicable only when external event processing is enabled.
* @param tbl Catalog table
* @param versionNumber version number to be added
*/
// No-op when external event processing is disabled or when 'tbl' is an
// IncompleteTable.
// NOTE(review): the statement that records 'versionNumber' on the table and the
// finally body are not visible in this view -- confirm against the full file.
public void addVersionsForInflightEvents(Table tbl, long versionNumber) {
if (!isExternalEventProcessingEnabled()) return;
try {
if (tbl instanceof IncompleteTable) return;
} finally {
/**
* Adds a given version number to the catalog database's list of versions for
* in-flight events. Applicable only when external event processing is enabled.
* @param db Catalog database
* @param versionNumber version number to be added
*/
// Db overload of addVersionsForInflightEvents.
// NOTE(review): the body of the try block (the statement that records 'versionNumber'
// on 'db') is not visible in this copy of the file -- confirm against the complete source.
public void addVersionsForInflightEvents(Db db, long versionNumber) {
// Nothing to do when external event processing is disabled.
if (!isExternalEventProcessingEnabled()) return;
try {
} finally {
* Get a snapshot view of all the catalog objects that were deleted between versions
* ('fromVersion', 'toVersion'].
// Thin wrapper: forwards both version bounds unchanged to deleteLog_ and returns
// whatever it yields.
private List<TCatalogObject> getDeletedObjects(long fromVersion, long toVersion) {
try {
return deleteLog_.retrieveObjects(fromVersion, toVersion);
} finally {
* Get a snapshot view of all the databases in the catalog.
// Package-private. Returns an immutable copy of the databases currently held in
// dbCache_, so the result does not change when the cache does.
List<Db> getAllDbs() {
try {
// dbCache_.get() yields the current map; only its values are copied.
return ImmutableList.copyOf(dbCache_.get().values());
} finally {
* Get a snapshot view of all the data sources in the catalog.
// Returns an immutable copy of whatever getDataSources() currently reports.
private List<DataSource> getAllDataSources() {
try {
return ImmutableList.copyOf(getDataSources());
} finally {
* Get a snapshot view of all the Hdfs cache pools in the catalog.
// Returns an immutable copy of the entries currently in hdfsCachePools_.
private List<HdfsCachePool> getAllHdfsCachePools() {
try {
return ImmutableList.copyOf(hdfsCachePools_);
} finally {
* Get a snapshot view of all the roles in the catalog.
// Returns an immutable copy of the roles reported by authPolicy_.
private List<Role> getAllRoles() {
try {
return ImmutableList.copyOf(authPolicy_.getAllRoles());
} finally {
* Get a snapshot view of all the users in the catalog.
// Returns an immutable copy of the users reported by authPolicy_.
private List<User> getAllUsers() {
try {
return ImmutableList.copyOf(authPolicy_.getAllUsers());
} finally {
* Get a snapshot view of all authz cache invalidation markers in the catalog.
// Returns an immutable copy of the entries currently in authzCacheInvalidation_.
private List<AuthzCacheInvalidation> getAllAuthzCacheInvalidation() {
try {
return ImmutableList.copyOf(authzCacheInvalidation_);
} finally {
* Adds a database in the topic update if its version is in the range
* ('ctx.fromVersion', 'ctx.toVersion']. It iterates through all the tables and
* functions of this database to determine if they can be included in the topic update.
// Offers the database object, then each of its tables and functions, to the delta
// being assembled in 'ctx'.
private void addDatabaseToCatalogDelta(Db db, GetCatalogDeltaContext ctx)
throws TException {
long dbVersion = db.getCatalogVersion();
// Lower bound is exclusive, upper bound is inclusive.
if (dbVersion > ctx.fromVersion && dbVersion <= ctx.toVersion) {
TCatalogObject catalogDb =
new TCatalogObject(TCatalogObjectType.DATABASE, dbVersion);
// NOTE(review): the 'false' argument presumably distinguishes an update from a
// deletion -- confirm against GetCatalogDeltaContext.addCatalogObject.
ctx.addCatalogObject(catalogDb, false);
// getAllTables/getAllFunctions return copies, so each loop walks a fixed list.
for (Table tbl: getAllTables(db)) {
addTableToCatalogDelta(tbl, ctx);
for (Function fn: getAllFunctions(db)) {
addFunctionToCatalogDelta(fn, ctx);
* Get a snapshot view of all the tables in a database.
// Package-private. Returns an immutable copy of the tables 'db' currently holds.
List<Table> getAllTables(Db db) {
try {
return ImmutableList.copyOf(db.getTables());
} finally {
* Get a snapshot view of all the functions in a database.
// Returns an immutable copy of the functions 'db' reports for the given filter.
private List<Function> getAllFunctions(Db db) {
try {
// NOTE(review): a null first argument plus a default-constructed PatternMatcher
// presumably selects every function -- confirm against Db.getFunctions.
return ImmutableList.copyOf(db.getFunctions(null, new PatternMatcher()));
} finally {
* Given a database name and a property key returns the value of the key from the
* parameters map of the HMS db object
* @param dbName name of the database
* @param propertyKey property key
* @return value of key from the db parameter. returns null if Db is not found or key
* does not exist in the parameters
// Looks up a single entry of the HMS database object's parameters; every "not
// available" case collapses to null.
public String getDbProperty(String dbName, String propertyKey) {
try {
Db db = getDb(dbName);
// Unknown database.
if (db == null) return null;
// The HMS database object has no parameters set.
if (!db.getMetaStoreDb().isSetParameters()) return null;
// The result of the parameters lookup is returned as-is.
return db.getMetaStoreDb().getParameters().get(propertyKey);
} finally {
* Given a db name, table name and a list of property keys, returns the values of those
* keys from the cached Table object's parameters
* @return Values of the parameters which map to the property keys, null if the table
* doesn't exist, if it is an incomplete table or if its parameters are not set
// Collects table parameter values for 'propertyKeys' into a new list.
// NOTE(review): the body of the for loop is not visible in this copy of the file, so
// how a missing key is represented in the result cannot be confirmed here.
public List<String> getTableProperties(
String dbName, String tblName, List<String> propertyKeys) {
try {
Db db = getDb(dbName);
// Unknown database.
if (db == null) return null;
Table tbl = db.getTable(tblName);
// Unknown table, or an IncompleteTable.
if (tbl == null || tbl instanceof IncompleteTable) return null;
// The HMS table object has no parameters set.
if (!tbl.getMetaStoreTable().isSetParameters()) return null;
// Result list is pre-sized to the number of requested keys.
List<String> propertyValues = new ArrayList<>(propertyKeys.size());
for (String propertyKey : propertyKeys) {
return propertyValues;
} finally {
* Updates the Db with the given metastore database object. Useful for doing in-place
* updates to the HMS db like in case of changing owner, adding comment or setting
* certain properties
* @param msDb The HMS database object to be used to update
* @return The updated Db object
* @throws DatabaseNotFoundException if Db with the name provided by given Database
* is not found in Catalog
// Swaps the HMS object of an already-cached Db; the Db instance itself is reused and
// returned.
public Db updateDb(Database msDb) throws DatabaseNotFoundException {
try {
// The cached Db is located by the name carried in 'msDb'.
Db db = getDb(msDb.getName());
if (db == null) {
throw new DatabaseNotFoundException("Database " + msDb.getName() + " not found");
db.setMetastoreDb(msDb.getName(), msDb);
return db;
} finally {
* Adds a table in the topic update if its version is in the range
* ('ctx.fromVersion', 'ctx.toVersion']. If the table's version is larger than
* 'ctx.toVersion' and the table has skipped a topic update
* 'MAX_NUM_SKIPPED_TOPIC_UPDATES' times, it is included in the topic update. This
* prevents tables that are updated frequently from skipping topic updates indefinitely,
* which would also violate the semantics of SYNC_DDL.
// Decides whether 'tbl' is handed to addTableToCatalogDeltaHelper or counted as
// having skipped this topic update.
// NOTE(review): the initializer of 'topicUpdateEntry' and the call that receives the
// new TopicUpdateLog.Entry are not visible in this copy of the file.
private void addTableToCatalogDelta(Table tbl, GetCatalogDeltaContext ctx)
throws TException {
// Version does not exceed the upper bound: the helper does the remaining checks.
if (tbl.getCatalogVersion() <= ctx.toVersion) {
addTableToCatalogDeltaHelper(tbl, ctx);
} else {
// Version is above the upper bound: consult the table's skip counter.
TopicUpdateLog.Entry topicUpdateEntry =
// Counter has reached the limit: hand the table to the helper anyway.
if (topicUpdateEntry.getNumSkippedTopicUpdates() == MAX_NUM_SKIPPED_TOPIC_UPDATES) {
addTableToCatalogDeltaHelper(tbl, ctx);
} else {"Table " + tbl.getFullName() + " is skipping topic update " +
// Replacement entry with the skip counter advanced by one.
new TopicUpdateLog.Entry(
topicUpdateEntry.getNumSkippedTopicUpdates() + 1,
* Helper function that tries to add a table in a topic update. It acquires table's
* lock and checks if its version is in the ('ctx.fromVersion', 'ctx.toVersion'] range
* and how many consecutive times (if any) has the table skipped a topic update.
// Builds the TCatalogObject for 'tbl' and adds it to 'ctx', unless the table is out
// of range or is allowed to skip this update.
// NOTE(review): the table lock acquire/release, the initializer of
// 'topicUpdateEntry' and the body of the inner try are not visible in this copy.
private void addTableToCatalogDeltaHelper(Table tbl, GetCatalogDeltaContext ctx)
throws TException {
// Created with INITIAL_CATALOG_VERSION rather than the table's current version.
TCatalogObject catalogTbl =
new TCatalogObject(TCatalogObjectType.TABLE, Catalog.INITIAL_CATALOG_VERSION);
try {
long tblVersion = tbl.getCatalogVersion();
// At or below the exclusive lower bound: nothing to send.
if (tblVersion <= ctx.fromVersion) return;
String tableUniqueName = tbl.getUniqueName();
TopicUpdateLog.Entry topicUpdateEntry =
// Above the upper bound and still under the skip limit: count one more skip.
if (tblVersion > ctx.toVersion &&
topicUpdateEntry.getNumSkippedTopicUpdates() < MAX_NUM_SKIPPED_TOPIC_UPDATES) {"Table " + tbl.getFullName() + " is skipping topic update " +
new TopicUpdateLog.Entry(
topicUpdateEntry.getNumSkippedTopicUpdates() + 1,
try {
} catch (Exception e) {
// Failures are logged with the table name and the exception attached as the cause.
LOG.error(String.format("Error calling toThrift() on table %s: %s",
tbl.getFullName(), e.getMessage()), e);
ctx.addCatalogObject(catalogTbl, false);
} finally {
* Adds a function to the topic update if its version is in the range
* ('ctx.fromVersion', 'ctx.toVersion'].
// Adds a FUNCTION catalog object for 'fn' to 'ctx' when the function's version is in
// range.
private void addFunctionToCatalogDelta(Function fn, GetCatalogDeltaContext ctx)
throws TException {
long fnVersion = fn.getCatalogVersion();
// Outside (fromVersion, toVersion]: nothing to add.
if (fnVersion <= ctx.fromVersion || fnVersion > ctx.toVersion) return;
// The catalog object carries the function's own version.
TCatalogObject function =
new TCatalogObject(TCatalogObjectType.FUNCTION, fnVersion);
ctx.addCatalogObject(function, false);
* Adds a data source to the topic update if its version is in the range
* ('ctx.fromVersion', 'ctx.toVersion'].
// Adds a DATA_SOURCE catalog object for 'dataSource' to 'ctx' when its version is in
// range.
private void addDataSourceToCatalogDelta(DataSource dataSource,
GetCatalogDeltaContext ctx) throws TException {
long dsVersion = dataSource.getCatalogVersion();
// Outside (fromVersion, toVersion]: nothing to add.
if (dsVersion <= ctx.fromVersion || dsVersion > ctx.toVersion) return;
// The catalog object carries the data source's own version.
TCatalogObject catalogObj =
new TCatalogObject(TCatalogObjectType.DATA_SOURCE, dsVersion);
ctx.addCatalogObject(catalogObj, false);
* Adds a HDFS cache pool to the topic update if its version is in the range
* ('ctx.fromVersion', 'ctx.toVersion'].
// Builds an HDFS_CACHE_POOL catalog object for 'cachePool' and adds it to 'ctx'.
// NOTE(review): the condition below is the out-of-range test; the body of this 'if'
// is not visible in this copy of the file, so confirm that it exits before the pool
// is added.
private void addHdfsCachePoolToCatalogDelta(HdfsCachePool cachePool,
GetCatalogDeltaContext ctx) throws TException {
long cpVersion = cachePool.getCatalogVersion();
if (cpVersion <= ctx.fromVersion || cpVersion > ctx.toVersion) {
// The catalog object carries the cache pool's own version.
TCatalogObject pool =
new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL, cpVersion);
ctx.addCatalogObject(pool, false);
* Adds a principal to the topic update if its version is in the range
* ('ctx.fromVersion', 'ctx.toVersion']. It iterates through all the privileges of
* this principal to determine if they can be inserted in the topic update.
// Offers the principal object, then each of its privileges, to the delta being
// assembled in 'ctx'.
private void addPrincipalToCatalogDelta(Principal principal, GetCatalogDeltaContext ctx)
throws TException {
long principalVersion = principal.getCatalogVersion();
// Lower bound is exclusive, upper bound is inclusive.
if (principalVersion > ctx.fromVersion && principalVersion <= ctx.toVersion) {
TCatalogObject thriftPrincipal =
new TCatalogObject(TCatalogObjectType.PRINCIPAL, principalVersion);
ctx.addCatalogObject(thriftPrincipal, false);
// getAllPrivileges returns a copy, so the loop walks a fixed list.
for (PrincipalPrivilege p: getAllPrivileges(principal)) {
addPrincipalPrivilegeToCatalogDelta(p, ctx);
* Get a snapshot view of all the privileges in a principal.
// Returns an immutable copy of the privileges 'principal' currently reports.
private List<PrincipalPrivilege> getAllPrivileges(Principal principal) {
try {
return ImmutableList.copyOf(principal.getPrivileges());
} finally {
* Adds a principal privilege to the topic update if its version is in the range
* ('ctx.fromVersion', 'ctx.toVersion'].
// Adds a PRIVILEGE catalog object for 'priv' to 'ctx' when its version is in range.
private void addPrincipalPrivilegeToCatalogDelta(PrincipalPrivilege priv,
GetCatalogDeltaContext ctx) throws TException {
long privVersion = priv.getCatalogVersion();
// Outside (fromVersion, toVersion]: nothing to add.
if (privVersion <= ctx.fromVersion || privVersion > ctx.toVersion) return;
// The catalog object carries the privilege's own version.
TCatalogObject privilege =
new TCatalogObject(TCatalogObjectType.PRIVILEGE, privVersion);
ctx.addCatalogObject(privilege, false);
* Adds an authz cache invalidation to the topic update if its version is in the range
* ('ctx.fromVersion', 'ctx.toVersion'].
// Adds an AUTHZ_CACHE_INVALIDATION catalog object to 'ctx' when the marker's version
// is in range.
private void addAuthzCacheInvalidationToCatalogDelta(
AuthzCacheInvalidation authzCacheInvalidation, GetCatalogDeltaContext ctx)
throws TException {
long authzCacheInvalidationVersion = authzCacheInvalidation.getCatalogVersion();
// Outside (fromVersion, toVersion]: nothing to add.
if (authzCacheInvalidationVersion <= ctx.fromVersion ||
authzCacheInvalidationVersion > ctx.toVersion) return;
// The catalog object carries the marker's own version.
TCatalogObject catalogObj = new TCatalogObject(
TCatalogObjectType.AUTHZ_CACHE_INVALIDATION, authzCacheInvalidationVersion);
ctx.addCatalogObject(catalogObj, false);
* Returns all user defined functions (aggregate and scalar) in the specified database.
* Functions are not returned in a defined order.
// Flattens the per-name overload lists of a database into a single list.
// NOTE(review): the body of the inner loop is not visible in this copy of the file.
public List<Function> getFunctions(String dbName) throws DatabaseNotFoundException {
Db db = getDb(dbName);
if (db == null) {
throw new DatabaseNotFoundException("Database does not exist: " + dbName);
// Contains map of overloaded function names to all functions matching that name.
Map<String, List<Function>> dbFns = db.getAllFunctions();
// Initial capacity is the number of distinct names, not the number of overloads.
List<Function> fns = new ArrayList<>(dbFns.size());
for (List<Function> fnOverloads: dbFns.values()) {
for (Function fn: fnOverloads) {
return fns;
* Extracts Impala functions stored in metastore db parameters and adds them to
* the catalog cache.
// Deserializes native functions from the HMS database object and adds them to 'db'.
// NOTE(review): the bare string expressions below look like arguments of logging
// calls whose prefix is missing in this copy of the file, and the argument list of
// deserializeNativeFunctionsFromDbParams is cut off -- confirm against the full source.
private void loadFunctionsFromDbParams(Db db,
org.apache.hadoop.hive.metastore.api.Database msDb) {
// No HMS object or no parameters: nothing to load.
if (msDb == null || msDb.getParameters() == null) return;"Loading native functions for database: " + db.getName());
List<Function> funcs = FunctionUtils.deserializeNativeFunctionsFromDbParams(
for (Function f : funcs) {
// NOTE(review): meaning of the 'false' flag is defined by Db.addFunction -- confirm.
db.addFunction(f, false);
}"Loaded native functions for database: " + db.getName());
* Loads Java functions into the catalog. For each function in "functions",
* we extract all Impala compatible evaluate() signatures and load them
* as separate functions in the catalog.
// Converts each HMS function in 'functions' into catalog Function objects via
// FunctionUtils.extractFunctions.
// NOTE(review): the bare string expressions below look like arguments of logging
// calls whose prefix is missing in this copy of the file; the remaining arguments of
// extractFunctions and the body of the inner loop are not visible either.
private void loadJavaFunctions(Db db,
List<org.apache.hadoop.hive.metastore.api.Function> functions) {
// Debug-only backend switch that disables loading.
if (BackendConfig.INSTANCE.disableCatalogDataOpsDebugOnly()) {"Skip loading Java functions: catalog data ops disabled.");
}"Loading Java functions for database: " + db.getName());
for (org.apache.hadoop.hive.metastore.api.Function function: functions) {
try {
List<Function> fns = FunctionUtils.extractFunctions(db.getName(), function,
for (Function fn: fns) {
} catch (Exception e) {
// A failure for one function is logged and does not abort the remaining ones.
LOG.error("Skipping function load: " + function.getFunctionName(), e);
}"Loaded Java functions for database: " + db.getName());
* Reloads function metadata for 'dbName' database. Populates the 'addedFuncs' list
* with functions that were added as a result of this operation. Populates the
* 'removedFuncs' list with functions that were removed.
// Rebuilds the function set of 'dbName' in a scratch Db populated from the HMS, then
// reconciles the cached Db against it.
// NOTE(review): the initializer of 'msDb' and the bodies of the removal/re-add loops
// (including where 'addedFuncs' and 'removedFuncs' are filled) are not visible in
// this copy of the file -- confirm against the complete source.
public void refreshFunctions(MetaStoreClient msClient, String dbName,
List<TCatalogObject> addedFuncs, List<TCatalogObject> removedFuncs)
throws CatalogException {
// Create a temporary database that will contain all the functions from the HMS.
Db tmpDb;
try {
List<org.apache.hadoop.hive.metastore.api.Function> javaFns =
new ArrayList<>();
// One HMS call lists the function names ("*" pattern), then one call per name
// fetches the function object.
for (String javaFn : msClient.getHiveClient().getFunctions(dbName, "*")) {
javaFns.add(msClient.getHiveClient().getFunction(dbName, javaFn));
// Contains native functions in its params map.
org.apache.hadoop.hive.metastore.api.Database msDb =
tmpDb = new Db(dbName, msDb);
// Load native UDFs into the temporary db.
loadFunctionsFromDbParams(tmpDb, msDb);
// Load Java UDFs from HMS into the temporary db.
loadJavaFunctions(tmpDb, javaFns);
Db db = getDb(dbName);
if (db == null) {
throw new DatabaseNotFoundException("Database does not exist: " + dbName);
// Load transient functions into the temporary db.
for (Function fn: db.getTransientFunctions()) tmpDb.addFunction(fn);
// Compute the removed functions and remove them from the db.
for (Map.Entry<String, List<Function>> e: db.getAllFunctions().entrySet()) {
for (Function fn: e.getValue()) {
// A cached function with no indistinguishable counterpart in tmpDb.
if (tmpDb.getFunction(
fn, Function.CompareMode.IS_INDISTINGUISHABLE) == null) {
// We will re-add all the functions to the db because it's possible that a
// function was dropped and a different function (for example, the binary is
// different) with the same name and signature was re-added in Hive.
for (Map.Entry<String, List<Function>> e: tmpDb.getAllFunctions().entrySet()) {
for (Function fn: e.getValue()) {
// We do not need to increment and acquire a new catalog version for this
// function here because this already happens when the functions are loaded
// into tmpDb.
} catch (Exception e) {
// Every failure, including the DatabaseNotFoundException thrown above, is wrapped in
// a CatalogException with the original exception as its cause.
throw new CatalogException("Error refreshing functions in " + dbName + ": ", e);
* Invalidates the database 'db'. This method can have potential race
* conditions with external changes to the Hive metastore and hence any
* conflicting changes to the objects can manifest in the form of exceptions
* from the HMS calls which are appropriately handled. Returns the invalidated
* 'Db' object along with list of tables to be loaded by the TableLoadingMgr.
* Returns null if the method encounters an exception during invalidation.
// Builds a fresh Db for 'dbName' from the HMS and pairs it with the tables that should
// be loaded in the background.
// NOTE(review): the initializer of 'msDb', the bodies of several loops and
// conditionals (re-adding transient functions, registering tables, delta-log
// updates) and the bare string expression in the blacklist branch are incomplete in
// this copy of the file -- confirm against the complete source.
private Pair<Db, List<TTableName>> invalidateDb(
MetaStoreClient msClient, String dbName, Db existingDb) {
try {
List<org.apache.hadoop.hive.metastore.api.Function> javaFns =
new ArrayList<>();
// One HMS call lists the function names ("*" pattern), then one call per name
// fetches the function object.
for (String javaFn: msClient.getHiveClient().getFunctions(dbName, "*")) {
javaFns.add(msClient.getHiveClient().getFunction(dbName, javaFn));
org.apache.hadoop.hive.metastore.api.Database msDb =
Db newDb = new Db(dbName, msDb);
// existingDb is usually null when the Catalog loads for the first time.
// In that case we needn't restore any transient functions.
if (existingDb != null) {
// Restore UDFs that aren't persisted. They are only cleaned up on
// Catalog restart.
for (Function fn: existingDb.getTransientFunctions()) {
// Reload native UDFs.
loadFunctionsFromDbParams(newDb, msDb);
// Reload Java UDFs from HMS.
loadJavaFunctions(newDb, javaFns);
List<TTableName> tblsToBackgroundLoad = new ArrayList<>();
for (String tableName: msClient.getHiveClient().getAllTables(dbName)) {
// The blacklist check uses the lower-cased table name.
if (isBlacklistedTable(dbName, tableName.toLowerCase())) {"skip blacklisted table: " + dbName + "." + tableName);
// Every table starts out as an uninitialized IncompleteTable of the new Db.
Table incompleteTbl = IncompleteTable.createUninitializedTable(newDb, tableName);
// Tables are queued for background loading only when loadInBackground_ is set.
if (loadInBackground_) {
tblsToBackgroundLoad.add(new TTableName(dbName, tableName.toLowerCase()));
if (existingDb != null) {
// Identify any removed functions and add them to the delta log.
for (Map.Entry<String, List<Function>> e:
existingDb.getAllFunctions().entrySet()) {
for (Function fn: e.getValue()) {
// An old function with no indistinguishable counterpart in the new Db.
if (newDb.getFunction(fn,
Function.CompareMode.IS_INDISTINGUISHABLE) == null) {
// Identify any deleted tables and add them to the delta log
Set<String> oldTableNames = Sets.newHashSet(existingDb.getAllTableNames());
Set<String> newTableNames = Sets.newHashSet(newDb.getAllTableNames());
for (String removedTableName: oldTableNames) {
Table removedTable = IncompleteTable.createUninitializedTable(existingDb,
return Pair.create(newDb, tblsToBackgroundLoad);
} catch (Exception e) {
// Any failure is logged as a warning and reported to the caller as null.
LOG.warn("Encountered an exception while invalidating database: " + dbName +
". Ignoring further load of this db.", e);
return null;
* Refreshes authorization metadata. When authorization is not enabled, this
* method is a no-op.
// Delegates the refresh to authzManager_ and returns its AuthorizationDelta.
// NOTE(review): the Javadoc calls this a no-op when authorization is disabled, but
// the visible code requires a non-null authzManager_ -- confirm which is intended.
public AuthorizationDelta refreshAuthorization(boolean resetVersions)
throws CatalogException {
// Precondition: an authorization manager must have been set.
Preconditions.checkState(authzManager_ != null);
try {
return authzManager_.refreshAuthorization(resetVersions);
} catch (Exception e) {
// Any failure is wrapped in a CatalogException with the original as its cause.
throw new CatalogException("Error refreshing authorization policy: ", e);
* Resets this catalog instance by clearing all cached table and database metadata.
* Returns the current catalog version before reset has taken any effect. The
* requesting impalad will use that version to determine when the
* effects of reset have been applied to its local catalog cache.
// Rebuilds the whole db cache from the HMS and returns the catalog version that was
// current when the call started.
// NOTE(review): many statements are missing from this copy of the file (the policy
// refresh, pausing/restarting event processing, the version bump, the data source
// loop body, swapping in the new cache, delete-log updates, background-load
// submission, and the logging calls that own the bare string expressions) --
// confirm against the complete source before relying on these comments.
public long reset() throws CatalogException {
// Captured up front; this is the value returned at the end.
long currentCatalogVersion = getCatalogVersion();"Invalidating all metadata. Version: " + currentCatalogVersion);
// First update the policy metadata.
// Even though we get the current notification event id before stopping the event
// processing here there is a small window of time where we could re-process some of
// the event ids, if there is external DDL activity on metastore during reset.
// Unfortunately, there is no good way to avoid this since HMS does not provide
// APIs which can fetch all the tables/databases at a given id. It is OKAY to
// re-process some of these events since event processor relies on creationTime of
// the objects to uniquely identify tables from create and drop events. In case of
// alter events, however it is likely that some tables would be unnecessarily
// invalidated. That would happen when during reset, there were external alter events
// and by the time we processed them, Catalog had already loaded them.
long currentEventId = metastoreEventProcessor_.getCurrentEventId();
// pause the event processing since the cache is anyways being cleared
// Update the HDFS cache pools
CachePoolReader reader = new CachePoolReader(true);;
// In case of an empty new catalog, the version should still change to reflect the
// reset operation itself and to unblock impalads by making the catalog version >
// INITIAL_CATALOG_VERSION. See Frontend.waitForCatalog()
// Assign new versions to all the loaded data sources.
for (DataSource dataSource: getDataSources()) {
// Update db and table metadata
try {
// Not all Java UDFs are persisted to the metastore. The ones which aren't
// should be restored once the catalog has been invalidated.
Map<String, Db> oldDbCache = dbCache_.get();
// Build a new DB cache, populate it, and replace the existing cache in one
// step.
Map<String, Db> newDbCache = new ConcurrentHashMap<String, Db>();
List<TTableName> tblsToBackgroundLoad = new ArrayList<>();
// The metastore client is closed automatically at the end of this block.
try (MetaStoreClient msClient = getMetaStoreClient()) {
List<String> allDbs = msClient.getHiveClient().getAllDatabases();
int numComplete = 0;
for (String dbName: allDbs) {
if (isBlacklistedDb(dbName)) {"skip blacklisted db: " + dbName);
// Progress text for the thread name; numComplete is post-incremented, so the
// first database is reported as 0 complete.
String annotation = String.format("invalidating metadata - %s/%s dbs complete",
numComplete++, allDbs.size());
try (ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation)) {
// Cache keys are lower-cased database names.
dbName = dbName.toLowerCase();
Db oldDb = oldDbCache.get(dbName);
Pair<Db, List<TTableName>> invalidatedDb = invalidateDb(msClient,
dbName, oldDb);
// invalidateDb reports failure with null; such databases are left out of the new
// cache.
if (invalidatedDb == null) continue;
newDbCache.put(dbName, invalidatedDb.first);
// Identify any deleted databases and add them to the delta log.
Set<String> oldDbNames = oldDbCache.keySet();
Set<String> newDbNames = newDbCache.keySet();
for (String dbName: oldDbNames) {
Db removedDb = oldDbCache.get(dbName);
// Submit tables for background loading.
for (TTableName tblName: tblsToBackgroundLoad) {
} catch (Exception e) {
// Logged and rethrown as a CatalogException with the original as its cause.
LOG.error("Error initializing Catalog", e);
throw new CatalogException("Error initializing Catalog. Catalog may be empty.", e);
} finally {
// restart the event processing for id just before the reset
}"Invalidated all metadata.");
return currentCatalogVersion;
* Adds a database name to the metadata cache and returns the database's
* new Db object. Used by CREATE DATABASE statements.
// Creates a new Db from the given name and HMS object and returns it.
// NOTE(review): the statements inside the try block (where the new Db would be
// versioned and put into the cache) are not visible in this copy of the file.
public Db addDb(String dbName, org.apache.hadoop.hive.metastore.api.Database msDb) {
// The Db object is constructed before the try block is entered.
Db newDb = new Db(dbName, msDb);
try {
return newDb;
} finally {
* Adds a database name to the metadata cache if it does not exist and returns
* true if a new Db object was added. Used by MetastoreEventProcessor.
// Adds the database through addDb only when getDb finds no cached entry for 'dbName'.
public boolean addDbIfNotExists(
String dbName, org.apache.hadoop.hive.metastore.api.Database msDb) {
try {
Db db = getDb(dbName);
if (db == null) {
// The result is true when addDb hands back a non-null Db.
return addDb(dbName, msDb) != null;
// The database is already cached; nothing was added.
return false;
} finally {
* Removes a database from the metadata cache and returns the removed database,
* or null if the database did not exist in the cache.
* Used by DROP DATABASE statements.
// Removes the database via the superclass and, when something was actually removed,
// records the removal through updateDeleteLog.
public Db removeDb(String dbName) {
try {
Db removedDb = super.removeDb(dbName);
// A null result means there was nothing to remove, so nothing is logged.
if (removedDb != null) updateDeleteLog(removedDb);
return removedDb;
} finally {
* @param msDb Metastore Database used to remove Db from Catalog
* @param dbFound Set to true if Database is found in Catalog
* @param dbMatched Set to true if Database is found in Catalog and it's CREATION_TIME
* is equal to the metastore DB
* @return the DB object removed. Return null if DB does not exist or was not removed
* because CREATION_TIME does not match.
// Removes the cached database named by 'msDb' only when its create time equals the
// create time of 'msDb'.
// NOTE(review): no assignment to 'dbFound' or 'dbMatched' is visible in this copy of
// the file -- confirm where they are set in the complete source.
public Db removeDbIfExists(org.apache.hadoop.hive.metastore.api.Database msDb,
Reference<Boolean> dbFound, Reference<Boolean> dbMatched) {
try {
String dbName = msDb.getName();
Db catalogDb = getDb(dbName);
// Not cached: nothing to remove.
if (catalogDb == null) return null;
// Remove the DB only if the CREATION_TIME matches with the metastore DB from event.
if (msDb.getCreateTime() == catalogDb.getMetaStoreDb().getCreateTime()) {
Db removedDb = removeDb(dbName);
if (removedDb != null) {
return removedDb;
// Create time mismatch, or removeDb removed nothing.
return null;
} finally {
* Helper function to clean up the state associated with a removed database. It creates
* the entries in the delete log for 'db' as well as for its tables and functions
* (if any).
// Walks 'db' and its tables and functions after the database has been removed.
// NOTE(review): the statements that write to the delete log (the bodies of the 'if'
// and of both loops) are not visible in this copy of the file.
private void updateDeleteLog(Db db) {
// System databases are treated differently from regular ones.
if (!db.isSystemDb()) {
for (Table tbl: db.getTables()) {
for (Function fn: db.getFunctions(null, new PatternMatcher())) {
* Adds a table with the given db and table name to the catalog if it does not exist.
* @return true if the table was successfully added and false if the table already
* exists
* @throws CatalogException if the db is not found
// Creates an uninitialized IncompleteTable for 'tblName' when the database has no
// table of that name yet.
// NOTE(review): the statements that version the new table and register it with the
// database are not visible in this copy of the file.
public boolean addTableIfNotExists(String dbName, String tblName)
throws CatalogException {
try {
Db db = getDb(dbName);
if (db == null) {
throw new CatalogException(String.format("Db %s does not exist", dbName));
Table existingTable = db.getTable(tblName);
// A table with this name is already present; it is left untouched.
if (existingTable != null) return false;
Table incompleteTable = IncompleteTable.createUninitializedTable(db, tblName);
return true;
} finally {
* Adds a table with the given name to the catalog and returns the new table,
* loading the metadata if needed.
// Name-based overload: creates an uninitialized IncompleteTable in the named database
// and returns whatever the database then reports for 'tblName'.
// NOTE(review): the body of the try block (where the table would be versioned and
// added to the database) is not visible in this copy of the file.
public Table addTable(String dbName, String tblName) {
Db db = getDb(dbName);
// Unknown database: nothing is added.
if (db == null) return null;
Table incompleteTable = IncompleteTable.createUninitializedTable(db, tblName);
try {
} finally {
// The result is read back from the database rather than returning the local object.
return db.getTable(tblName);
* Adds a table 'table' to the database 'db' and returns the table that was added.
// Object-based overload: returns the same 'table' instance that was passed in.
// NOTE(review): the body of the try block (where the table would be versioned and
// added to 'db') is not visible in this copy of the file.
public Table addTable(Db db, Table table) {
try {
} finally {
return table;
* Gets the table with the given name, loading it if needed (if the existing catalog
* object is not yet loaded). Returns the matching Table or null if no table with this
* name exists in the catalog.
* If the existing table is dropped or modified (indicated by the catalog version
* changing) while the load is in progress, the loaded value will be discarded
* and the current cached value will be returned. This may mean that a missing table
* (not yet loaded table) will be returned.
// Two phases: first decide whether a load is needed and submit it, then wait for the
// result and install it only if the cached table is unchanged.
// NOTE(review): lock handling and the cleanup inside both finally blocks are not
// visible in this copy of the file.
public Table getOrLoadTable(String dbName, String tblName, String reason)
throws CatalogException {
// The load request is keyed by the lower-cased db and table names.
TTableName tableName = new TTableName(dbName.toLowerCase(), tblName.toLowerCase());
TableLoadingMgr.LoadRequest loadReq;
long previousCatalogVersion;
// Return the table if it is already loaded or submit a new load request.
try {
Table tbl = getTable(dbName, tblName);
// Covers both "no such table" (null) and "already loaded".
if (tbl == null || tbl.isLoaded()) return tbl;
// Remembered so the later replace can detect a concurrent change.
previousCatalogVersion = tbl.getCatalogVersion();
loadReq = tableLoadingMgr_.loadAsync(tableName, reason);
} finally {
try {
// The table may have been dropped/modified while the load was in progress, so only
// apply the update if the existing table hasn't changed.
return replaceTableIfUnchanged(loadReq.get(), previousCatalogVersion);
} finally {
* Replaces an existing Table with a new value if it exists and has not changed
* (has the same catalog version as 'expectedCatalogVersion').
// Compare-and-replace on the catalog version of the cached table.
// NOTE(review): the statements that actually install 'updatedTbl' in the database are
// not visible in this copy of the file.
private Table replaceTableIfUnchanged(Table updatedTbl, long expectedCatalogVersion)
throws DatabaseNotFoundException {
try {
// The database is looked up again by name instead of trusting updatedTbl.getDb().
Db db = getDb(updatedTbl.getDb().getName());
if (db == null) {
throw new DatabaseNotFoundException(
"Database does not exist: " + updatedTbl.getDb().getName());
Table existingTbl = db.getTable(updatedTbl.getName());
// The existing table does not exist or has been modified. Instead of
// adding the loaded value, return the existing table.
if (existingTbl == null ||
existingTbl.getCatalogVersion() != expectedCatalogVersion) return existingTbl;
return updatedTbl;
} finally {
* Remove a catalog table based on the given metastore table if it exists and its
* createTime matches with the metastore table
* @param msTable Metastore table to be used to remove Table
* @param tblWasfound is set to true if the table was found in the catalog
* @param tblMatched is set to true if the table is found and it matched with the
* createTime of the cached metastore table in catalog or if the existing table is an
* incomplete table
* @return Removed table object. Return null if the table was not removed
// Removes the cached table named by 'msTable' when it is an IncompleteTable or its
// create time equals the create time of 'msTable'.
// NOTE(review): no assignment to 'tblWasfound' or 'tblMatched' is visible in this
// copy of the file -- confirm where they are set in the complete source.
public Table removeTableIfExists(org.apache.hadoop.hive.metastore.api.Table msTable,
Reference<Boolean> tblWasfound, Reference<Boolean> tblMatched) {
// make sure that the createTime of the input table is valid
Preconditions.checkState(msTable.getCreateTime() > 0);
try {
Db db = getDb(msTable.getDbName());
// Unknown database.
if (db == null) return null;
Table tblToBeRemoved = db.getTable(msTable.getTableName());
// Unknown table.
if (tblToBeRemoved == null) return null;
// make sure that you are removing the same instance of the table object which
// is given by comparing the metastore createTime. In case the found table is a
// Incomplete table remove it
if (tblToBeRemoved instanceof IncompleteTable
|| (msTable.getCreateTime()
== tblToBeRemoved.getMetaStoreTable().getCreateTime())) {
Table removedTbl = db.removeTable(tblToBeRemoved.getName());
return removedTbl;
// Create time mismatch: the cached table is kept.
return null;
} finally {
* Removes a table from the catalog and increments the catalog version.
* Returns the removed Table, or null if the table or db does not exist.
// Removes 'tblName' from its parent database and returns the removed Table.
// NOTE(review): the body of the 'removedTable != null' branch is not visible in this
// copy of the file.
public Table removeTable(String dbName, String tblName) {
Db parentDb = getDb(dbName);
// Unknown database.
if (parentDb == null) return null;
try {
Table removedTable = parentDb.removeTable(tblName);
if (removedTable != null) {
return removedTable;
} finally {
* Removes a function from the catalog. Increments the catalog version and returns
* the Function object that was removed. If the function did not exist, null will
* be returned.
// Removes the function via the superclass and returns what the superclass removed.
// NOTE(review): the body of the 'removedFn != null' branch is not visible in this
// copy of the file.
public Function removeFunction(Function desc) {
try {
Function removedFn = super.removeFunction(desc);
if (removedFn != null) {
return removedFn;
} finally {
* Adds a function to the catalog, incrementing the catalog version. Returns true if
* the add was successful, false otherwise.
// Adds 'fn' to the database named in the function's own name.
// NOTE(review): any statements between the successful add and 'return true' are not
// visible in this copy of the file.
public boolean addFunction(Function fn) {
// The target database comes from the function name, not from a separate argument.
Db db = getDb(fn.getFunctionName().getDb());
// Unknown database.
if (db == null) return false;
try {
// Db.addFunction reports whether the function was actually added.
if (db.addFunction(fn)) {
return true;
} finally {
return false;
* Adds a data source to the catalog, incrementing the catalog version. Returns true
* if the add was successful, false otherwise.
// Adds 'dataSource' to dataSources_ and reports whether the add succeeded.
// NOTE(review): any statements between the successful add and 'return true' are not
// visible in this copy of the file.
public boolean addDataSource(DataSource dataSource) {
try {
// dataSources_.add reports whether the data source was actually added.
if (dataSources_.add(dataSource)) {
return true;
} finally {
return false;
// Removes the data source registered under 'dataSourceName' from dataSources_ and
// returns it.
// NOTE(review): the body of the 'dataSource != null' branch is not visible in this
// copy of the file.
public DataSource removeDataSource(String dataSourceName) {
try {
DataSource dataSource = dataSources_.remove(dataSourceName);
if (dataSource != null) {
return dataSource;
} finally {
* Renames a table. Equivalent to an atomic drop + add of the table. Returns
* a pair of tables containing the removed table (or null if the table drop was not
* successful) and the new table (or null if either the drop of the old one or the
* add of the new table was not successful). Depending on the return value, the catalog
* cache is in one of the following states:
* 1. null, null: Old table was not removed and new table was not added.
* 2. null, T_new: Invalid configuration
* 3. T_old, null: Old table was removed but new table was not added.
* 4. T_old, T_new: Old table was removed and new table was added.
// Implements the rename as removeTable on the old name followed by addTable on the
// new name.
// NOTE(review): when the old table's database is unknown this returns a bare null
// rather than a (null, null) pair; callers must handle both.
public Pair<Table, Table> renameTable(
TTableName oldTableName, TTableName newTableName) {
// Remove the old table name from the cache and add the new table.
Db db = getDb(oldTableName.getDb_name());
if (db == null) return null;
try {
Table oldTable =
removeTable(oldTableName.getDb_name(), oldTableName.getTable_name());
// Nothing was removed, so nothing is added either.
if (oldTable == null) return Pair.create(null, null);
// The second element is whatever addTable returns for the new name.
return Pair.create(oldTable,
addTable(newTableName.getDb_name(), newTableName.getTable_name()));
} finally {
* Renames the table by atomically removing oldTable and adding the newTable.
* @return true if oldTable was removed and newTable was added, false if oldTable or
* its db are not in catalog.
// Removes the old table and, only if it existed, creates an uninitialized
// IncompleteTable for the new one.
// NOTE(review): the remaining arguments of createUninitializedTable and the
// statements that register the new table are not visible in this copy of the file.
public boolean renameTableIfExists(TTableName oldTableName,
TTableName newTableName) {
boolean tableRenamed = false;
try {
Db db = getDb(oldTableName.db_name);
// An unknown database leaves tableRenamed at false.
if (db != null) {
Table existingTable = removeTable(oldTableName.db_name, oldTableName.table_name);
// Add the newTable only if oldTable existed.
if (existingTable != null) {
Table incompleteTable = IncompleteTable.createUninitializedTable(db,
tableRenamed = true;
return tableRenamed;
} finally {
* Reloads metadata for table 'tbl' which must not be an IncompleteTable. Updates the
* table metadata in-place by calling load() on the given table. Returns the
* TCatalogObject representing 'tbl'. Applies proper synchronization to protect the
* metadata load from concurrent table modifications and assigns a new catalog version.
* Throws a CatalogException if there is an error loading table metadata.
// Re-fetches the HMS table object, reloads 'tbl' in place, assigns it a new catalog
// version and returns its thrift form.
// NOTE(review): the bare string expressions look like arguments of logging calls
// whose prefix is missing in this copy of the file; the initializer of 'context' and
// the cleanup in the finally block are not visible either.
public TCatalogObject reloadTable(Table tbl, String reason) throws CatalogException {"Refreshing table metadata: %s", tbl.getFullName()));
// Precondition: callers must not pass an IncompleteTable.
Preconditions.checkState(!(tbl instanceof IncompleteTable));
String dbName = tbl.getDb().getName();
String tblName = tbl.getName();
// Failing to take the table lock is reported as an error instead of blocking.
if (!tryLockTable(tbl)) {
throw new CatalogException(String.format("Error refreshing metadata for table " +
"%s due to lock contention", tbl.getFullName()));
final Timer.Context context =
try {
// The new version is obtained before the load and applied only after load()
// returns.
long newCatalogVersion = incrementAndGetCatalogVersion();
// The metastore client is closed automatically at the end of this block.
try (MetaStoreClient msClient = getMetaStoreClient()) {
org.apache.hadoop.hive.metastore.api.Table msTbl = null;
try {
msTbl = msClient.getHiveClient().getTable(dbName, tblName);
} catch (Exception e) {
// HMS lookup failures are wrapped, keeping the original exception as the cause.
throw new TableLoadingException("Error loading metadata for table: " +
dbName + "." + tblName, e);
tbl.load(true, msClient.getHiveClient(), msTbl, reason);
tbl.setCatalogVersion(newCatalogVersion);"Refreshed table metadata: %s", tbl.getFullName()));
return tbl.toTCatalogObject();
} finally {
* Drops the partitions specified in 'partitionSet' from 'tbl'. Throws a
* CatalogException if 'tbl' is not an HdfsTable. Returns the target table.
// Validates that 'tbl' is an HdfsTable (throwing a CatalogException otherwise) and
// returns it after processing 'partitionSet'.
// NOTE(review): the initializer of 'partitions', the statement that actually drops
// them and the closing braces are not visible in this view -- confirm against the
// full file.
public Table dropPartitions(Table tbl, List<List<TPartitionKeyValue>> partitionSet)
throws CatalogException {
if (!(tbl instanceof HdfsTable)) {
throw new CatalogException("Table " + tbl.getFullName() + " is not an Hdfs table");
HdfsTable hdfsTable = (HdfsTable) tbl;
List<HdfsPartition> partitions =
return hdfsTable;
* Adds a partition to its HdfsTable and returns the modified table.
public Table addPartition(HdfsPartition partition) throws CatalogException {
HdfsTable hdfsTable = partition.getTable();
return hdfsTable;
/**
 * Invalidates the table in the catalog cache, potentially adding/removing the table
 * from the cache based on whether it exists in the Hive Metastore.
 * The invalidation logic is:
 * - If the table exists in the Metastore, add it to the catalog as an uninitialized
 *   IncompleteTable (replacing any existing entry). The table metadata will be
 *   loaded lazily, on the next access. If the parent database for this table does not
 *   yet exist in Impala's cache it will also be added.
 * - If the table does not exist in the Metastore, remove it from the catalog cache.
 * - If we are unable to determine whether the table exists in the Metastore (there was
 *   an exception thrown making the RPC), invalidate any existing Table by replacing
 *   it with an uninitialized IncompleteTable.
 * Returns the thrift representation of the added/updated/removed table, or null if
 * the table was not present in the catalog cache or the Metastore.
 * Sets tblWasRemoved to true if the table was absent from the Metastore and it was
 * removed from the catalog cache.
 * Sets dbWasAdded to true if both a new database and table were added to the catalog
 * cache.
 */
// Invalidates the cache entry of 'tableName' depending on whether the table exists in
// the Hive Metastore: removes it when it is known to be absent, otherwise replaces it
// with a freshly added (uninitialized) table. Returns the thrift form of the affected
// table, or null when there is nothing to do or an error prevented the update.
// NOTE(review): this view is incomplete -- log-call prefixes, the statements that set
// 'tblWasRemoved' / 'dbWasAdded', lock handling, several continuation lines and all
// closing braces are not visible; confirm against the full file.
public TCatalogObject invalidateTable(TTableName tableName,
Reference<Boolean> tblWasRemoved, Reference<Boolean> dbWasAdded) {
String dbName = tableName.getDb_name();
String tblName = tableName.getTable_name();
// Blacklisted tables are never invalidated.
if (isBlacklistedTable(dbName, tblName)) {"Skip invalidating blacklisted table: " + tableName);
return null;
}"Invalidating table metadata: %s.%s", dbName, tblName));
// Stores whether the table exists in the metastore. Can have three states:
// 1) true - Table exists in metastore.
// 2) false - Table does not exist in metastore.
// 3) unknown (null) - There was exception thrown by the metastore client.
Boolean tableExistsInMetaStore;
Db db = null;
try (MetaStoreClient msClient = getMetaStoreClient()) {
org.apache.hadoop.hive.metastore.api.Database msDb = null;
try {
tableExistsInMetaStore = msClient.getHiveClient().tableExists(dbName, tblName);
} catch (UnknownDBException e) {
// The parent database does not exist in the metastore. Treat this the same
// as if the table does not exist.
tableExistsInMetaStore = false;
} catch (TException e) {
LOG.error("Error executing tableExists() metastore call: " + tblName, e);
tableExistsInMetaStore = null;
// Known to be absent from the Metastore: drop the cached entry, if any.
if (tableExistsInMetaStore != null && !tableExistsInMetaStore) {
Table result = removeTable(dbName, tblName);
if (result == null) return null;
try {
return result.toTCatalogObject();
} finally {
db = getDb(dbName);
if ((db == null || !db.containsTable(tblName)) && tableExistsInMetaStore == null) {
// The table does not exist in our cache AND it is unknown whether the
// table exists in the Metastore. Do nothing.
return null;
} else if (db == null && tableExistsInMetaStore) {
// The table exists in the Metastore, but our cache does not contain the parent
// database. A new db will be added to the cache along with the new table. msDb
// must be valid since tableExistsInMetaStore is true.
try {
msDb = msClient.getHiveClient().getDatabase(dbName);
addDb(dbName, msDb);
} catch (TException e) {
// The Metastore database could not be fetched. Log the error and return.
LOG.error("Error executing getDatabase() metastore call: " + dbName, e);
return null;
// Add a new uninitialized table to the table cache, effectively invalidating
// any existing entry. The metadata for the table will be loaded lazily, on the
// next access to the table.
Table newTable = addTable(dbName, tblName);
if (loadInBackground_) {
tableLoadingMgr_.backgroundLoad(new TTableName(dbName.toLowerCase(),
if (dbWasAdded.getRef()) {
// The database should always have a lower catalog version than the table because
// it needs to be created before the table can be added.
Db addedDb = newTable.getDb();
addedDb.getCatalogVersion() < newTable.getCatalogVersion());
return newTable.toTCatalogObject();
/**
 * Invalidates the table if it exists by overwriting the existing entry with an
 * IncompleteTable.
 * @return null if the table does not exist, else the invalidated table
 */
// Creates an uninitialized IncompleteTable for 'dbName.tblName' when both the database
// and the table are present in the cache, and returns it; returns null otherwise.
// NOTE(review): the statements that install the new table in the cache, the call that
// receives the TTableName below, lock handling and the closing braces are not visible
// in this view -- confirm against the full file.
public Table invalidateTableIfExists(String dbName, String tblName) {
Table incompleteTable;
try {
Db db = getDb(dbName);
if (db == null) return null;
if (!db.containsTable(tblName)) return null;
incompleteTable = IncompleteTable.createUninitializedTable(db, tblName);
} finally {
// When background loading is enabled, the lower-cased table name is handed on
// (the receiving call is cut off here).
if (loadInBackground_) {
new TTableName(dbName.toLowerCase(), tblName.toLowerCase()));
return incompleteTable;
* Refresh partition if it exists. Returns true if reload of the partition succeeds,
* false otherwise.
* @throws CatalogException if partition reload is unsuccessful.
* @throws DatabaseNotFoundException if Db doesn't exist.
public boolean reloadPartitionIfExists(String dbName, String tblName,
List<TPartitionKeyValue> tPartSpec, String reason) throws CatalogException {
Table table = getTable(dbName, tblName);
if (table == null || table instanceof IncompleteTable) return false;
reloadPartition(table, tPartSpec, reason);
return true;
* Refresh table if exists. Returns true if reloadTable() succeeds, false
* otherwise. Throws CatalogException if reloadTable() is unsuccessful. Throws
* DatabaseNotFoundException if Db doesn't exist.
public boolean reloadTableIfExists(String dbName, String tblName, String reason)
throws CatalogException {
Table table = getTable(dbName, tblName);
if (table == null || table instanceof IncompleteTable) return false;
reloadTable(table, reason);
return true;
* Update DB if it exists in catalog. Returns true if updateDb() succeeds, false
* otherwise.
public boolean updateDbIfExists(Database msdb) {
try {
} catch (DatabaseNotFoundException e) {
return false;
return true;
* Adds a new role with the given name and grant groups to the AuthorizationPolicy.
* If a role with the same name already exists it will be overwritten.
public Role addRole(String roleName, Set<String> grantGroups) {
Principal role = addPrincipal(roleName, grantGroups, TPrincipalType.ROLE);
Preconditions.checkState(role instanceof Role);
return (Role) role;
* Adds a new user with the given name to the AuthorizationPolicy.
* If a user with the same name already exists it will be overwritten.
public User addUser(String userName) {
Principal user = addPrincipal(userName, new HashSet<>(),
Preconditions.checkState(user instanceof User);
return (User) user;
/**
 * Adds a user to the catalog if it doesn't exist. This is necessary so privileges
 * can be added for a user, for example owner privileges.
 */
// Looks up the user 'owner' in the authorization policy and creates it through
// addUser() when absent; returns the existing or newly created user.
// NOTE(review): nothing visible in this view writes 'existingUser', and the lock
// handling, closing braces and finally body are not shown -- confirm against the
// full file.
public User addUserIfNotExists(String owner, Reference<Boolean> existingUser) {
try {
User user = getAuthPolicy().getUser(owner);
if (user == null) {
user = addUser(owner);
return user;
} finally {
// Builds a new Principal of the given type from 'principalName' and 'grantGroups' and
// returns it.
// NOTE(review): only the construction and the return are visible here; whatever
// versions the principal and stores it in the authorization policy, as well as the
// lock handling around the try/finally, is not shown -- confirm against the full file.
private Principal addPrincipal(String principalName, Set<String> grantGroups,
TPrincipalType type) {
try {
Principal principal = Principal.newInstance(principalName, type, grantGroups);
return principal;
} finally {
* Removes the role with the given name from the AuthorizationPolicy. Returns the
* removed role with an incremented catalog version, or null if no role with this name
* exists.
public Role removeRole(String roleName) {
Principal role = removePrincipal(roleName, TPrincipalType.ROLE);
if (role == null) return null;
Preconditions.checkState(role instanceof Role);
return (Role) role;
* Removes the user with the given name from the AuthorizationPolicy. Returns the
* removed user with an incremented catalog version, or null if no user with this name
* exists.
public User removeUser(String userName) {
Principal user = removePrincipal(userName, TPrincipalType.USER);
if (user == null) return null;
Preconditions.checkState(user instanceof User);
return (User) user;
// Removes the principal with the given name and type from the authorization policy
// and returns it, or returns null when no such principal exists.
// NOTE(review): the body of the loop over the principal's privileges, any version
// bookkeeping, the lock handling and the closing braces are not visible in this
// view -- confirm against the full file.
private Principal removePrincipal(String principalName, TPrincipalType type) {
try {
Principal principal = authPolicy_.removePrincipal(principalName, type);
// TODO(todd): does this end up leaking the privileges associated
// with this principal into the CatalogObjectVersionSet on the catalogd?
if (principal == null) return null;
for (PrincipalPrivilege priv: principal.getPrivileges()) {
return principal;
} finally {
/**
 * Adds a grant group to the given role name and returns the modified Role with
 * an updated catalog version. If the role does not exist a CatalogException is thrown.
 */
// Delegates to the authorization policy to add 'groupName' to role 'roleName' and
// returns the role handed back by the policy.
// NOTE(review): no catalog-version update or lock handling is visible in this view,
// and the finally body plus closing braces are missing -- confirm against the full
// file.
public Role addRoleGrantGroup(String roleName, String groupName)
throws CatalogException {
try {
Role role = authPolicy_.addRoleGrantGroup(roleName, groupName);
return role;
} finally {
/**
 * Removes a grant group from the given role name and returns the modified Role with
 * an updated catalog version. If the role does not exist a CatalogException is thrown.
 */
// Delegates to the authorization policy to remove 'groupName' from role 'roleName'
// and returns the role handed back by the policy.
// NOTE(review): no catalog-version update or lock handling is visible in this view,
// and the finally body plus closing braces are missing -- confirm against the full
// file.
public Role removeRoleGrantGroup(String roleName, String groupName)
throws CatalogException {
try {
Role role = authPolicy_.removeRoleGrantGroup(roleName, groupName);
return role;
} finally {
* Adds a privilege to the given role name. Returns the new PrincipalPrivilege and
* increments the catalog version. If the parent role does not exist a CatalogException
* is thrown.
public PrincipalPrivilege addRolePrivilege(String roleName, TPrivilege thriftPriv)
throws CatalogException {
Preconditions.checkArgument(thriftPriv.getPrincipal_type() == TPrincipalType.ROLE);
return addPrincipalPrivilege(roleName, thriftPriv, TPrincipalType.ROLE);
* Adds a privilege to the given user name. Returns the new PrincipalPrivilege and
* increments the catalog version. If the user does not exist a CatalogException is
* thrown.
public PrincipalPrivilege addUserPrivilege(String userName, TPrivilege thriftPriv)
throws CatalogException {
Preconditions.checkArgument(thriftPriv.getPrincipal_type() == TPrincipalType.USER);
return addPrincipalPrivilege(userName, thriftPriv, TPrincipalType.USER);
// Shared implementation behind addRolePrivilege() / addUserPrivilege(): resolves the
// principal, fails with a CatalogException when it does not exist, and converts
// 'thriftPriv' into a PrincipalPrivilege that is returned.
// NOTE(review): the statements that version the privilege and attach it to the
// principal, the lock handling and the closing braces are not visible in this view --
// confirm against the full file.
private PrincipalPrivilege addPrincipalPrivilege(String principalName,
TPrivilege thriftPriv, TPrincipalType type) throws CatalogException {
try {
Principal principal = authPolicy_.getPrincipal(principalName, type);
if (principal == null) {
throw new CatalogException(String.format("%s does not exist: %s",
Principal.toString(type), principalName));
PrincipalPrivilege priv = PrincipalPrivilege.fromThrift(thriftPriv);
return priv;
} finally {
* Removes a PrincipalPrivilege from the given role name and privilege name. Returns
* the removed PrincipalPrivilege with an incremented catalog version or null if no
* matching privilege was found. Throws a CatalogException if no role exists with this
* name.
public PrincipalPrivilege removeRolePrivilege(String roleName, String privilegeName)
throws CatalogException {
return removePrincipalPrivilege(roleName, privilegeName, TPrincipalType.ROLE);
* Removes a PrincipalPrivilege from the given user name and privilege name. Returns
* the removed PrincipalPrivilege with an incremented catalog version or null if no
* matching privilege was found. Throws a CatalogException if no user exists with this
* name.
public PrincipalPrivilege removeUserPrivilege(String userName, String privilegeName)
throws CatalogException {
return removePrincipalPrivilege(userName, privilegeName, TPrincipalType.USER);
// Shared implementation behind removeRolePrivilege() / removeUserPrivilege(): resolves
// the principal (CatalogException when missing), removes the named privilege from it
// and returns the removed privilege, or null when the principal had no such privilege.
// NOTE(review): any catalog-version update of the removed privilege, the lock
// handling and the closing braces are not visible in this view -- confirm against the
// full file.
private PrincipalPrivilege removePrincipalPrivilege(String principalName,
String privilegeName, TPrincipalType type) throws CatalogException {
try {
Principal principal = authPolicy_.getPrincipal(principalName, type);
if (principal == null) {
throw new CatalogException(String.format("%s does not exist: %s",
Principal.toString(type), principalName));
PrincipalPrivilege principalPrivilege = principal.removePrivilege(privilegeName);
if (principalPrivilege == null) return null;
return principalPrivilege;
} finally {
* Gets a PrincipalPrivilege from the given principal name. Returns the privilege
* if it exists, or null if no privilege matching the privilege spec exist.
* Throws a CatalogException if the principal does not exist.
public PrincipalPrivilege getPrincipalPrivilege(String principalName,
TPrivilege privSpec) throws CatalogException {
String privilegeName = PrincipalPrivilege.buildPrivilegeName(privSpec);
try {
Principal principal = authPolicy_.getPrincipal(principalName,
if (principal == null) {
throw new CatalogException(Principal.toString(privSpec.getPrincipal_type()) +
" does not exist: " + principalName);
return principal.getPrivilege(privilegeName);
} finally {
public AuthzCacheInvalidation getAuthzCacheInvalidation(String markerName) {
try {
return authzCacheInvalidation_.get(markerName);
} finally {
/**
 * Gets the {@link AuthzCacheInvalidation} for a given marker name or creates a new
 * {@link AuthzCacheInvalidation} if it does not exist, and increments the catalog
 * version of the {@link AuthzCacheInvalidation}. A catalog version update indicates
 * an authorization cache invalidation notification.
 * @param markerName the authorization cache invalidation marker name
 * @return the updated {@link AuthzCacheInvalidation} instance
 */
// Fetches the AuthzCacheInvalidation for 'markerName', creating a new instance when
// none is found, and returns it.
// NOTE(review): the argument of the getAuthzCacheInvalidation( call, the statements
// that register the new instance and bump its catalog version, the lock handling and
// the closing braces are not visible in this view -- confirm against the full file.
public AuthzCacheInvalidation incrementAuthzCacheInvalidationVersion(
String markerName) {
try {
AuthzCacheInvalidation authzCacheInvalidation = getAuthzCacheInvalidation(
if (authzCacheInvalidation == null) {
authzCacheInvalidation = new AuthzCacheInvalidation(markerName);
return authzCacheInvalidation;
} finally {
* Increments the current Catalog version and returns the new value.
public long incrementAndGetCatalogVersion() {
try {
return ++catalogVersion_;
} finally {
* Returns the current Catalog version.
public long getCatalogVersion() {
try {
return catalogVersion_;
} finally {
/** Returns the catalog's {@code versionLock_} read-write lock. */
public ReentrantReadWriteLock getLock() {
  return versionLock_;
}
/** Returns the AuthorizationPolicy instance held by this catalog. */
public AuthorizationPolicy getAuthPolicy() {
  return authPolicy_;
}
/**
 * Reloads metadata for the partition defined by the partition spec
 * 'partitionSpec' in table 'tbl'. Returns the resulting table's TCatalogObject after
 * the partition metadata was reloaded.
 */
// Reloads a single partition of 'tbl' (cast to HdfsTable) from the Hive Metastore
// under a newly allocated catalog version and returns the table's thrift form. A
// partition that no longer exists in the Metastore is handled in the
// NoSuchObjectException branch, which also returns the table's thrift form.
// NOTE(review): this view is incomplete -- the expression that looks up
// 'hdfsPartition', log-call prefixes, the statement inside 'if (hdfsPartition != null)',
// closing braces and the finally body are not visible; confirm against the full file.
public TCatalogObject reloadPartition(Table tbl,
List<TPartitionKeyValue> partitionSpec, String reason) throws CatalogException {
// Give up with an error when the table lock cannot be acquired.
if (!tryLockTable(tbl)) {
throw new CatalogException(String.format("Error reloading partition of table %s " +
"due to lock contention", tbl.getFullName()));
try {
long newCatalogVersion = incrementAndGetCatalogVersion();
HdfsTable hdfsTable = (HdfsTable) tbl;
HdfsPartition hdfsPartition = hdfsTable
// Retrieve partition name from existing partition or construct it from
// the partition spec
String partitionName = hdfsPartition == null
? HdfsTable.constructPartitionName(partitionSpec)
: hdfsPartition.getPartitionName();"Refreshing partition metadata: %s %s (%s)",
hdfsTable.getFullName(), partitionName, reason));
try (MetaStoreClient msClient = getMetaStoreClient()) {
org.apache.hadoop.hive.metastore.api.Partition hmsPartition = null;
try {
hmsPartition = msClient.getHiveClient().getPartition(
hdfsTable.getDb().getName(), hdfsTable.getName(), partitionName);
} catch (NoSuchObjectException e) {
// If partition does not exist in Hive Metastore, remove it from the
// catalog
if (hdfsPartition != null) {
return hdfsTable.toTCatalogObject();
} catch (Exception e) {
// Every other failure is wrapped in a CatalogException that keeps the cause.
throw new CatalogException("Error loading metadata for partition: "
+ hdfsTable.getFullName() + " " + partitionName, e);
hdfsTable.reloadPartition(hdfsPartition, hmsPartition);
hdfsTable.setCatalogVersion(newCatalogVersion);"Refreshed partition metadata: %s %s",
hdfsTable.getFullName(), partitionName));
return hdfsTable.toTCatalogObject();
} finally {
/** Returns the CatalogDeltaLog stored in {@code deleteLog_}. */
public CatalogDeltaLog getDeleteLog() {
  return deleteLog_;
}
/**
 * Returns the version of the topic update that an operation using SYNC_DDL must wait
 * for in order to ensure that its result set ('result') has been broadcast to all the
 * coordinators. For operations that don't produce a result set, e.g. INVALIDATE
 * METADATA, return the version specified in 'result.version'.
 */
// Determines the topic update version a SYNC_DDL operation has to wait for. Results
// without updated or removed objects short-circuit to result.getVersion(); otherwise
// the method loops until both the "updates" and the "deletes" covering versions are
// known (!= -1) and returns the larger of the two. A CatalogException is thrown once
// more than 'maxNumAttempts' topic updates have passed without success.
// NOTE(review): this view is incomplete -- the second operand of the maxNumAttempts
// product, the initializers of topicVersionForUpdates / topicVersionForDeletes, the
// body of the inner try (presumably a wait on topicUpdateLog_), the increment of
// numAttempts, the LOG.error arguments, a log-call prefix and all closing braces are
// not visible; confirm against the full file.
public long waitForSyncDdlVersion(TCatalogUpdateResult result) throws CatalogException {
if (!result.isSetUpdated_catalog_objects() &&
!result.isSetRemoved_catalog_objects()) {
return result.getVersion();
long lastSentTopicUpdate = lastSentTopicUpdate_.get();
// Maximum number of attempts (topic updates) to find the catalog topic version that
// an operation using SYNC_DDL must wait for.
long maxNumAttempts = 5;
if (result.isSetUpdated_catalog_objects()) {
maxNumAttempts = Math.max(maxNumAttempts,
result.getUpdated_catalog_objects().size() *
long numAttempts = 0;
long begin = System.currentTimeMillis();
long versionToWaitFor = -1;
while (versionToWaitFor == -1) {
if (LOG.isTraceEnabled()) {
LOG.trace("waitForSyncDdlVersion() attempt: " + numAttempts);
// Examine the topic update log to determine the latest topic update that
// covers the added/modified/deleted objects in 'result'.
long topicVersionForUpdates =
long topicVersionForDeletes =
if (topicVersionForUpdates == -1 || topicVersionForDeletes == -1) {
// Wait for the next topic update.
synchronized(topicUpdateLog_) {
try {
} catch (InterruptedException e) {
// Ignore
// NOTE(review): the interrupt is swallowed without restoring the thread's
// interrupt flag; consider Thread.currentThread().interrupt() -- confirm intent.
long currentTopicUpdate = lastSentTopicUpdate_.get();
// Don't count time-based exits from the wait() toward the maxNumAttempts
// threshold.
if (lastSentTopicUpdate != currentTopicUpdate) {
if (numAttempts > maxNumAttempts) {
LOG.error(String.format("Couldn't retrieve the covering topic version for "
+ "catalog objects. Updated objects: %s, deleted objects: %s",
// NOTE(review): the concatenation below yields "attempts.The operation" -- a space
// after the period appears to be missing in the message.
throw new CatalogException("Couldn't retrieve the catalog topic version " +
"for the SYNC_DDL operation after " + maxNumAttempts + " attempts." +
"The operation has been successfully executed but its effects may have " +
"not been broadcast to all the coordinators.");
lastSentTopicUpdate = currentTopicUpdate;
} else {
// Both versions are known: wait for whichever was sent later.
versionToWaitFor = Math.max(topicVersionForDeletes, topicVersionForUpdates);
Preconditions.checkState(versionToWaitFor >= 0);"Operation using SYNC_DDL is waiting for catalog topic version: " +
versionToWaitFor + ". Time to identify topic version (msec): " +
(System.currentTimeMillis() - begin));
return versionToWaitFor;
/**
 * Returns the version of the topic update that covers a set of TCatalogObjects.
 * A topic update U covers a TCatalogObject T, corresponding to a catalog object O,
 * if last_sent_version(O) >= catalog_version(T) && catalog_version(U) >=
 * last_topic_update(O). The first condition indicates that a version of O that is
 * larger or equal to the version in T has been added to a topic update. The second
 * condition indicates that U is either the update to include O or an update following
 * the one to include O. Returns -1 if there is a catalog object in 'tCatalogObjects'
 * which doesn't satisfy the above conditions.
 */
// Computes the highest "last sent catalog update" over the topic update log entries of
// 'tCatalogObjects'. A null or empty list yields the last sent topic update; -1 is
// returned as soon as one object has no log entry or its entry's last sent version is
// older than the object's catalog version.
// NOTE(review): the initializer of 'topicUpdateEntry' and the closing braces are not
// visible in this view -- confirm against the full file.
private long getCoveringTopicUpdateVersion(List<TCatalogObject> tCatalogObjects) {
if (tCatalogObjects == null || tCatalogObjects.isEmpty()) {
return lastSentTopicUpdate_.get();
long versionToWaitFor = -1;
for (TCatalogObject tCatalogObject: tCatalogObjects) {
TopicUpdateLog.Entry topicUpdateEntry =
// There are two reasons for which a topic update log entry cannot be found:
// a) It corresponds to a new catalog object that hasn't been processed by a catalog
// update yet.
// b) It corresponds to a catalog object that hasn't been modified for at least
// TOPIC_UPDATE_LOG_GC_FREQUENCY updates and hence its entry was garbage
// collected.
// In both cases, -1 is returned to indicate that we're waiting for the
// entry to show up in the topic update log.
if (topicUpdateEntry == null ||
topicUpdateEntry.getLastSentVersion() < tCatalogObject.getCatalog_version()) {
return -1;
versionToWaitFor =
Math.max(versionToWaitFor, topicUpdateEntry.getLastSentCatalogUpdate());
return versionToWaitFor;
/**
 * Retrieves information about the current catalog usage including the most frequently
 * accessed tables as well as the tables with the highest memory requirements.
 */
// Builds a TGetCatalogUsageResponse with three initially empty lists (large tables,
// frequently accessed tables, high file count tables) and iterates over the
// corresponding table sets reported by CatalogUsageMonitor.INSTANCE, creating a
// TTableUsageMetrics per table.
// NOTE(review): the statements that fill each TTableUsageMetrics and append it to the
// response, plus the closing braces of the loops, are not visible in this view --
// confirm against the full file.
public TGetCatalogUsageResponse getCatalogUsage() {
TGetCatalogUsageResponse usage = new TGetCatalogUsageResponse();
usage.setLarge_tables(new ArrayList<>());
usage.setFrequently_accessed_tables(new ArrayList<>());
usage.setHigh_file_count_tables(new ArrayList<>());
for (Table largeTable: CatalogUsageMonitor.INSTANCE.getLargestTables()) {
TTableUsageMetrics tableUsageMetrics =
new TTableUsageMetrics(largeTable.getTableName().toThrift());
for (Table frequentTable:
CatalogUsageMonitor.INSTANCE.getFrequentlyAccessedTables()) {
TTableUsageMetrics tableUsageMetrics =
new TTableUsageMetrics(frequentTable.getTableName().toThrift());
for (Table mostFilesTable:
CatalogUsageMonitor.INSTANCE.getHighFileCountTables()) {
TTableUsageMetrics tableUsageMetrics =
new TTableUsageMetrics(mostFilesTable.getTableName().toThrift());
return usage;
* Gets the events processor metrics. Used for publishing metrics on the webUI
public TEventProcessorMetrics getEventProcessorMetrics() {
return metastoreEventProcessor_.getEventProcessorMetrics();
* Gets the events processor summary. Used for populating the contents of the events
* processor detailed view page
public TEventProcessorMetricsSummaryResponse getEventProcessorSummary() {
return metastoreEventProcessor_.getEventProcessorSummary();
* Retrieves the stored metrics of the specified table and returns a pretty-printed
* string representation. Throws an exception if table metrics were not available
* because the table was not loaded or because another concurrent operation was holding
* the table lock.
public String getTableMetrics(TTableName tTableName) throws CatalogException {
String dbName = tTableName.db_name;
String tblName = tTableName.table_name;
Table tbl = getTable(dbName, tblName);
if (tbl == null) {
throw new CatalogException("Table " + dbName + "." + tblName + " was not found.");
String result;
if (tbl instanceof IncompleteTable) {
result = "No metrics available for table " + dbName + "." + tblName +
". Table not yet loaded.";
return result;
if (!tbl.getLock().tryLock()) {
result = "Metrics for table " + dbName + "." + tblName + "are not available " +
"because the table is currently modified by another operation.";
return result;
try {
return tbl.getMetrics().toString();
} finally {
/**
 * A wrapper around doGetPartialCatalogObject() that controls the number of concurrent
 * invocations.
 */
// Gate in front of doGetPartialCatalogObject(): a permit of partialObjectFetchAccess_
// must be acquired first; failing to get one results in a CatalogException describing
// the timeout, and an InterruptedException is rethrown as a CatalogException.
// NOTE(review): the remaining tryAcquire() arguments, the tail of the timeout message,
// the finally body (presumably the permit release) and the closing braces are not
// visible in this view -- confirm against the full file.
public TGetPartialCatalogObjectResponse getPartialCatalogObject(
TGetPartialCatalogObjectRequest req) throws CatalogException {
try {
if (!partialObjectFetchAccess_.tryAcquire(1,
// Timed out trying to acquire the semaphore permit.
throw new CatalogException("Timed out while fetching partial object metadata. " +
"Please check the metric 'catalog.partial-fetch-rpc.queue-len' for the " +
"current queue length and consider increasing " +
"'catalog_partial_fetch_rpc_queue_timeout_s' and/or " +
// Acquired the permit at this point, should be released before we exit out of
// this method.
// Is there a chance that this thread can get interrupted at this point before it
// enters the try block, eventually leading to the semaphore permit not
// getting released? It can probably happen if the JVM is already in a bad shape.
// In the worst case, every permit is blocked and the subsequent requests throw
// the timeout exception and the user can monitor the queue metric to see that it
// is full, so the issue should be easy to diagnose.
// TODO: Figure out if such a race is possible.
// The thread is renamed for the duration of the fetch so the request's object key
// shows up in thread listings.
try (ThreadNameAnnotator tna = new ThreadNameAnnotator(
"Get Partial Catalog Object - " +
Catalog.toCatalogObjectKey(req.object_desc))) {
return doGetPartialCatalogObject(req);
} finally {
} catch (InterruptedException e) {
throw new CatalogException("Error running getPartialCatalogObject(): ", e);
* Gets the id for this catalog service
public String getCatalogServiceId() {
return TUniqueIdUtil.PrintId(catalogServiceId_).intern();
* Returns the number of currently running partial RPCs.
public int getConcurrentPartialRpcReqCount() {
// Calculated based on number of currently available semaphore permits.
return MAX_PARALLEL_PARTIAL_FETCH_RPC_COUNT - partialObjectFetchAccess_
/**
 * Returns a partial view of information about a given catalog object. This services
 * the CatalogdMetaProvider running on impalads when they are configured in
 * "local-catalog" mode. If required objects are not present, for example, the database
 * from which a table is requested, the types of the missing objects will be set in the
 * response's lookup_status.
 */
// Dispatches on the type of req.object_desc and returns the matching partial catalog
// information: global catalog info, a database, a table/view (loaded on demand via
// getOrLoadTable()) or the functions of a database. Missing databases or tables and
// tables that are still unloaded are reported through
// createGetPartialCatalogObjectError(); unsupported types end in a CatalogException.
// NOTE(review): this view is incomplete -- several 'case' labels, the arguments of the
// getDb( / getFunctions( calls in the FUNCTION branch, the lookup status passed for an
// empty function list, the statements that fill 'resp', lock handling, the tail of the
// final exception message and all closing braces are not visible; confirm against
// the full file.
private TGetPartialCatalogObjectResponse doGetPartialCatalogObject(
TGetPartialCatalogObjectRequest req) throws CatalogException {
TCatalogObject objectDesc = Preconditions.checkNotNull(req.object_desc,
"missing object_desc");
switch (objectDesc.type) {
return getPartialCatalogInfo(req);
TDatabase dbDesc = Preconditions.checkNotNull(req.object_desc.db);
try {
Db db = getDb(dbDesc.getDb_name());
if (db == null) {
return createGetPartialCatalogObjectError(CatalogLookupStatus.DB_NOT_FOUND);
return db.getPartialInfo(req);
} finally {
case TABLE:
case VIEW: {
Table table;
try {
table = getOrLoadTable(
objectDesc.getTable().getDb_name(), objectDesc.getTable().getTbl_name(),
"needed by coordinator");
} catch (DatabaseNotFoundException e) {
return createGetPartialCatalogObjectError(CatalogLookupStatus.DB_NOT_FOUND);
if (table == null) {
return createGetPartialCatalogObjectError(CatalogLookupStatus.TABLE_NOT_FOUND);
} else if (!table.isLoaded()) {
// Table can still remain in an incomplete state if there was a concurrent
// invalidate request.
return createGetPartialCatalogObjectError(CatalogLookupStatus.TABLE_NOT_LOADED);
// TODO(todd): consider a read-write lock here.
try {
return table.getPartialInfo(req);
} finally {
case FUNCTION: {
try {
Db db = getDb(;
if (db == null) {
return createGetPartialCatalogObjectError(CatalogLookupStatus.DB_NOT_FOUND);
List<Function> funcs = db.getFunctions(;
if (funcs.isEmpty()) {
return createGetPartialCatalogObjectError(
TGetPartialCatalogObjectResponse resp = new TGetPartialCatalogObjectResponse();
// Convert every matching function to its thrift form.
List<TFunction> thriftFuncs = Lists.newArrayListWithCapacity(funcs.size());
for (Function f : funcs) thriftFuncs.add(f.toThrift());
return resp;
} finally {
throw new CatalogException("Unable to fetch partial info for type: " +
private static TGetPartialCatalogObjectResponse createGetPartialCatalogObjectError(
CatalogLookupStatus status) {
TGetPartialCatalogObjectResponse resp = new TGetPartialCatalogObjectResponse();
return resp;
* Return a partial view of information about global parts of the catalog (eg
* the list of tables, etc).
private TGetPartialCatalogObjectResponse getPartialCatalogInfo(
TGetPartialCatalogObjectRequest req) {
TGetPartialCatalogObjectResponse resp = new TGetPartialCatalogObjectResponse();
resp.catalog_info = new TPartialCatalogInfo();
TCatalogInfoSelector sel = Preconditions.checkNotNull(req.catalog_info_selector,
"no catalog_info_selector in request");
if (sel.want_db_names) {
resp.catalog_info.db_names = ImmutableList.copyOf(dbCache_.get().keySet());
// TODO(todd) implement data sources and other global information.
return resp;
* Set the last used time of specified tables to now.
* TODO: Make use of TTableUsage.num_usages.
public void updateTableUsage(TUpdateTableUsageRequest req) {
for (TTableUsage usage : req.usages) {
Table table = null;
try {
table = getTable(usage.table_name.db_name, usage.table_name.table_name);
} catch (DatabaseNotFoundException e) {
// do nothing
if (table != null) table.refreshLastUsedTime();
CatalogdTableInvalidator getCatalogdTableInvalidator() {
return catalogdTableInvalidator_;
void setCatalogdTableInvalidator(CatalogdTableInvalidator cleaner) {
catalogdTableInvalidator_ = cleaner;
public void setMetastoreEventProcessor(
ExternalEventsProcessor metastoreEventProcessor) {
this.metastoreEventProcessor_ = metastoreEventProcessor;