// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.base.Stopwatch;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.impala.analysis.TableName;
import org.apache.impala.authorization.AuthorizationDelta;
import org.apache.impala.authorization.AuthorizationManager;
import org.apache.impala.authorization.AuthorizationPolicy;
import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import org.apache.impala.catalog.events.ExternalEventsProcessor;
import org.apache.impala.catalog.events.MetastoreEventsProcessor;
import org.apache.impala.catalog.events.MetastoreEventsProcessor.EventProcessorStatus;
import org.apache.impala.catalog.events.NoOpEventProcessor;
import org.apache.impala.catalog.events.SelfEventContext;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.Pair;
import org.apache.impala.common.Reference;
import org.apache.impala.common.RuntimeEnv;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.service.FeSupport;
import org.apache.impala.thrift.CatalogLookupStatus;
import org.apache.impala.thrift.CatalogServiceConstants;
import org.apache.impala.thrift.TCatalog;
import org.apache.impala.thrift.TCatalogInfoSelector;
import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TCatalogUpdateResult;
import org.apache.impala.thrift.TDatabase;
import org.apache.impala.thrift.TEventProcessorMetrics;
import org.apache.impala.thrift.TEventProcessorMetricsSummaryResponse;
import org.apache.impala.thrift.TFunction;
import org.apache.impala.thrift.TGetCatalogUsageResponse;
import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
import org.apache.impala.thrift.TGetPartitionStatsRequest;
import org.apache.impala.thrift.TPartialCatalogInfo;
import org.apache.impala.thrift.TPartitionKeyValue;
import org.apache.impala.thrift.TPartitionStats;
import org.apache.impala.thrift.TPrincipalType;
import org.apache.impala.thrift.TPrivilege;
import org.apache.impala.thrift.TTable;
import org.apache.impala.thrift.TTableName;
import org.apache.impala.thrift.TTableUsage;
import org.apache.impala.thrift.TTableUsageMetrics;
import org.apache.impala.thrift.TUniqueId;
import org.apache.impala.thrift.TUpdateTableUsageRequest;
import org.apache.impala.util.CatalogBlacklistUtils;
import org.apache.impala.util.FunctionUtils;
import org.apache.impala.util.PatternMatcher;
import org.apache.impala.util.TUniqueIdUtil;
import org.apache.impala.util.ThreadNameAnnotator;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
/**
* Specialized Catalog that implements the CatalogService specific Catalog
* APIs. The CatalogServiceCatalog manages loading of all the catalog metadata
* and processing of DDL requests. The CatalogServiceCatalog maintains a global
* "catalog version". The version is incremented and assigned to a CatalogObject whenever
* it is added/modified/removed from the catalog. This means each CatalogObject will have
* a unique version and assigned versions are strictly increasing.
*
* Periodically, the CatalogServiceCatalog collects a delta of catalog updates (based on a
* specified catalog version) and constructs a topic update to be sent to the statestore.
* Each catalog topic update is defined by a range of catalog versions (from, to] and the
* CatalogServiceCatalog guarantees that every catalog object that has a version in the
* specified range is included in the catalog topic update. Concurrent DDL requests are
* allowed while a topic update is in progress. Hence, there is a non-zero probability
* that frequently modified catalog objects may keep skipping topic updates. That can
* happen when, by the time the topic update thread tries to collect an object update,
* that object has already been modified again by another metadata operation, causing
* its version to surpass
* the 'to' version of the topic update. To ensure that all catalog updates
* are eventually included in a catalog topic update, we keep track of the number of times
* each catalog object has skipped a topic update and if that number exceeds a specified
* threshold, we add the catalog object to the next topic update even if its version is
* higher than the 'to' version of the topic update. As a result, the same version of an
* object might be sent in two subsequent topic updates.
*
* The CatalogServiceCatalog maintains two logs:
* - Delete log. Since deleted objects are removed from the cache, the cache itself is
* not useful for tracking deletions. This log is used for populating the list of
* deleted objects during a topic update by recording the catalog objects that
* have been removed from the catalog. An entry with a new version is added to this log
* every time an object is removed (e.g. dropTable). Incrementing an object's version
* and adding it to the delete log should be performed atomically. An entry is removed
* from this log by the topic update thread when the associated deletion entry is
* added to a topic update.
* - Topic update log. This log records information about the catalog objects that have
* been included in a catalog topic update. Only the thread that is processing the
* topic update is responsible for adding, updating, and removing entries from the log.
* All other operations (e.g. addTable) only read topic update log entries but never
* modify them. Each entry includes the number of times a catalog object has
* skipped a topic update, which version of the object was last sent in a topic update
* and what was the version of that topic update. Entries of the topic update log are
* garbage-collected every TOPIC_UPDATE_LOG_GC_FREQUENCY topic updates by the topic
* update processing thread to prevent the log from growing indefinitely. Metadata
* operations using SYNC_DDL inspect this log to identify the catalog topic
* version that the issuing impalad must wait for in order to ensure that the effects
* of this operation have been broadcast to all the coordinators.
*
* Known anomalies with SYNC_DDL:
* The time-based cleanup process of the topic update log entries may cause metadata
* operations that use SYNC_DDL to hang while waiting for specific topic update log
* entries. That could happen if the thread processing the metadata operation stalls
* for a long period of time (longer than the time to process
* TOPIC_UPDATE_LOG_GC_FREQUENCY topic updates) between the time the operation was
* applied in the catalog cache and the time the SYNC_DDL version was checked. To reduce
* the probability of such an event, we set the value of the
* TOPIC_UPDATE_LOG_GC_FREQUENCY to a large value. Also, to prevent metadata operations
* from hanging in that path due to unknown issues (e.g. bugs), operations using
* SYNC_DDL are not allowed to wait indefinitely for specific topic log entries and an
* exception is thrown if the specified max wait time is exceeded. See
* waitForSyncDdlVersion() for more details.
*
* Table metadata for IncompleteTables (not fully loaded tables) is loaded in the
* background by the TableLoadingMgr; tables can be prioritized for loading by calling
* prioritizeLoad(). Background loading can also be enabled for the catalog, in which
* case missing tables (tables that are not yet loaded) are submitted to the
* TableLoadingMgr whenever any table metadata is invalidated and on startup. The
* metadata of fully loaded tables (e.g. HdfsTable, HBaseTable, etc.) is updated
* in-place and doesn't trigger a background metadata load through the TableLoadingMgr.
* Accessing a table that is not yet loaded (via getTable()) will load the table's
* metadata on-demand, out-of-band of the table loading thread pool.
*
* See the class comments in CatalogOpExecutor for a description of the locking protocol
* that should be employed if both the version lock and table locks need to be held at
* the same time.
*
* TODO: Consider removing on-demand loading and have everything go through the table
* loading thread pool.
*/
public class CatalogServiceCatalog extends Catalog {
public static final Logger LOG = LoggerFactory.getLogger(CatalogServiceCatalog.class);
private static final int INITIAL_META_STORE_CLIENT_POOL_SIZE = 10;
private static final int MAX_NUM_SKIPPED_TOPIC_UPDATES = 2;
// Timeout for acquiring a table lock (2 hours).
// TODO: Make this configurable
private static final long TBL_LOCK_TIMEOUT_MS = 7200000;
// Time to sleep before retrying to acquire a table lock
private static final int TBL_LOCK_RETRY_MS = 10;
private final TUniqueId catalogServiceId_;
// Fair lock used to synchronize reads/writes of catalogVersion_. Because this lock
// protects catalogVersion_, it can be used to perform atomic bulk catalog operations
// since catalogVersion_ cannot change externally while the lock is being held.
// In addition to protecting catalogVersion_, it is currently used for the
// following bulk operations:
// * Building a delta update to send to the statestore in getCatalogObjects(),
// so a snapshot of the catalog can be taken without any version changes.
// * During a catalog invalidation (call to reset()), which re-reads all dbs and tables
// from the metastore.
// * During renameTable(), because a table must be removed and added to the catalog
// atomically (potentially in a different database).
private final ReentrantReadWriteLock versionLock_ = new ReentrantReadWriteLock(true);
// Last assigned catalog version. Starts at INITIAL_CATALOG_VERSION and is incremented
// with each update to the Catalog; monotonically increasing over this object's lifetime.
// Protected by versionLock_.
// TODO: Handle overflow of catalogVersion_ and nextTableId_.
// TODO: The name of this variable is misleading and can be interpreted as a property
// of the catalog server. Rename into something that indicates its role as a global
// sequence number assigned to catalog objects.
private long catalogVersion_ = INITIAL_CATALOG_VERSION;
// The catalog version when we ran reset() last time. Protected by versionLock_.
private long lastResetStartVersion_ = INITIAL_CATALOG_VERSION;
// Manages the scheduling of background table loading.
private final TableLoadingMgr tableLoadingMgr_;
private final boolean loadInBackground_;
// Periodically polls HDFS to get the latest set of known cache pools.
private final ScheduledExecutorService cachePoolReader_ =
Executors.newScheduledThreadPool(1);
// Log of deleted catalog objects.
private final CatalogDeltaLog deleteLog_;
// Version of the last topic update returned to the statestore.
// The version of a topic update is the catalog version of the CATALOG object
// that is added to it.
private final AtomicLong lastSentTopicUpdate_ = new AtomicLong(-1);
// Wait time for a topic update.
private static final long TOPIC_UPDATE_WAIT_TIMEOUT_MS = 10000;
private final TopicUpdateLog topicUpdateLog_ = new TopicUpdateLog();
private final String localLibraryPath_;
private CatalogdTableInvalidator catalogdTableInvalidator_;
// Manages the event processing from metastore for issuing invalidates on tables
private ExternalEventsProcessor metastoreEventProcessor_;
/**
* See the gflag definition in be/.../catalog-server.cc for details on these modes.
*/
private static enum TopicMode {
FULL,
MIXED,
MINIMAL
};
final TopicMode topicMode_;
private final long PARTIAL_FETCH_RPC_QUEUE_TIMEOUT_S = BackendConfig.INSTANCE
.getCatalogPartialFetchRpcQueueTimeoutS();
private final int MAX_PARALLEL_PARTIAL_FETCH_RPC_COUNT = BackendConfig.INSTANCE
.getCatalogMaxParallelPartialFetchRpc();
// Controls concurrent access to doGetPartialCatalogObject() call. Limits the number
// of parallel requests to --catalog_max_parallel_partial_fetch_rpc.
private final Semaphore partialObjectFetchAccess_ =
new Semaphore(MAX_PARALLEL_PARTIAL_FETCH_RPC_COUNT, /*fair =*/ true);
private AuthorizationManager authzManager_;
// Databases that will be skipped in loading.
private final Set<String> blacklistedDbs_;
// Tables that will be skipped in loading.
private final Set<TableName> blacklistedTables_;
/**
* Initialize the CatalogServiceCatalog using a given MetastoreClientPool impl.
*
* @param loadInBackground If true, table metadata will be loaded in the background.
* @param numLoadingThreads Number of threads used to load table metadata.
* @param metaStoreClientPool A pool of HMS clients backing this Catalog.
* @throws ImpalaException
*/
public CatalogServiceCatalog(boolean loadInBackground, int numLoadingThreads,
TUniqueId catalogServiceId, String localLibraryPath,
MetaStoreClientPool metaStoreClientPool)
throws ImpalaException {
super(metaStoreClientPool);
blacklistedDbs_ = CatalogBlacklistUtils.parseBlacklistedDbs(
BackendConfig.INSTANCE.getBlacklistedDbs(), LOG);
blacklistedTables_ = CatalogBlacklistUtils.parseBlacklistedTables(
BackendConfig.INSTANCE.getBlacklistedTables(), LOG);
catalogServiceId_ = catalogServiceId;
tableLoadingMgr_ = new TableLoadingMgr(this, numLoadingThreads);
loadInBackground_ = loadInBackground;
try {
// We want only 'true' HDFS filesystems to poll the HDFS cache (i.e not S3,
// local, etc.)
if (FileSystemUtil.getDefaultFileSystem() instanceof DistributedFileSystem) {
cachePoolReader_.scheduleAtFixedRate(
new CachePoolReader(false), 0, 1, TimeUnit.MINUTES);
}
} catch (IOException e) {
LOG.error("Couldn't identify the default FS. Cache Pool reader will be disabled.");
}
localLibraryPath_ = localLibraryPath;
deleteLog_ = new CatalogDeltaLog();
topicMode_ = TopicMode.valueOf(
BackendConfig.INSTANCE.getBackendCfg().catalog_topic_mode.toUpperCase());
catalogdTableInvalidator_ = CatalogdTableInvalidator.create(this,
BackendConfig.INSTANCE);
metastoreEventProcessor_ = getEventsProcessor();
Preconditions.checkState(PARTIAL_FETCH_RPC_QUEUE_TIMEOUT_S > 0);
// start polling for metastore events
metastoreEventProcessor_.start();
}
/**
* Initializes the Catalog using the default MetastoreClientPool impl.
* @param initialHmsCnxnTimeoutSec Time (in seconds) CatalogServiceCatalog will wait
* to establish an initial connection to the HMS before giving up.
*/
public CatalogServiceCatalog(boolean loadInBackground, int numLoadingThreads,
int initialHmsCnxnTimeoutSec, TUniqueId catalogServiceId, String localLibraryPath)
throws ImpalaException {
this(loadInBackground, numLoadingThreads, catalogServiceId, localLibraryPath,
new MetaStoreClientPool(INITIAL_META_STORE_CLIENT_POOL_SIZE,
initialHmsCnxnTimeoutSec));
}
/**
* Checks whether the database is in the blacklist.
*/
public boolean isBlacklistedDb(String dbName) {
return blacklistedDbs_.contains(dbName);
}
/**
* Checks whether the table is in the blacklist.
*/
public boolean isBlacklistedTable(TableName table) {
return blacklistedTables_.contains(table);
}
/**
* Checks whether the table is in the blacklist.
*/
public boolean isBlacklistedTable(String db, String table) {
return isBlacklistedTable(new TableName(db, table));
}
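// Usage sketch (illustrative only; 'catalog' and the object names below are made-up
// examples). The blacklists are parsed from BackendConfig in the constructor:
//
//   if (catalog.isBlacklistedDb("sys") ||
//       catalog.isBlacklistedTable("sys", "metrics")) {
//     // Skip loading this object.
//   }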
public void setAuthzManager(AuthorizationManager authzManager) {
authzManager_ = Preconditions.checkNotNull(authzManager);
}
/**
* Returns a Metastore event processor object if
* <code>BackendConfig#getHMSPollingIntervalInSeconds</code> returns a non-zero
* value for the polling interval. Otherwise, returns a no-op events processor. It is
* important to fetch the current notification event id at the Catalog service
* initialization time so that event processor starts to sync at the event id
* corresponding to the catalog start time.
*/
private ExternalEventsProcessor getEventsProcessor() throws ImpalaException {
long eventPollingInterval = BackendConfig.INSTANCE.getHMSPollingIntervalInSeconds();
if (eventPollingInterval <= 0) {
LOG.info(String
.format("Metastore event processing is disabled. Event polling interval is %d",
eventPollingInterval));
return NoOpEventProcessor.getInstance();
}
try (MetaStoreClient metaStoreClient = getMetaStoreClient()) {
CurrentNotificationEventId currentNotificationId =
metaStoreClient.getHiveClient().getCurrentNotificationEventId();
return MetastoreEventsProcessor.getInstance(
this, currentNotificationId.getEventId(), eventPollingInterval);
} catch (TException e) {
LOG.error("Unable to fetch the current notification event id from metastore."
+ "Metastore event processing will be disabled.", e);
throw new CatalogException(
"Fatal error while initializing metastore event processor", e);
}
}
@VisibleForTesting
public ExternalEventsProcessor getMetastoreEventProcessor() {
return metastoreEventProcessor_;
}
public boolean isEventProcessingActive() {
return metastoreEventProcessor_ instanceof MetastoreEventsProcessor
&& EventProcessorStatus.ACTIVE
.equals(((MetastoreEventsProcessor) metastoreEventProcessor_).getStatus());
}
/**
* Tries to acquire versionLock_ and the lock of 'tbl' in that order. Returns true if it
* successfully acquires both within TBL_LOCK_TIMEOUT_MS millisecs; both locks are held
* when the function returns. Returns false otherwise and no lock is held in this case.
*/
public boolean tryLockTable(Table tbl) {
try (ThreadNameAnnotator tna = new ThreadNameAnnotator(
"Attempting to lock table " + tbl.getFullName())) {
long begin = System.currentTimeMillis();
long end;
do {
versionLock_.writeLock().lock();
if (tbl.getLock().tryLock()) {
if (LOG.isTraceEnabled()) {
end = System.currentTimeMillis();
LOG.trace(String.format("Lock for table %s was acquired in %d msec",
tbl.getFullName(), end - begin));
}
return true;
}
versionLock_.writeLock().unlock();
try {
// Sleep to avoid spinning and allow other operations to make progress.
Thread.sleep(TBL_LOCK_RETRY_MS);
} catch (InterruptedException e) {
// ignore
}
end = System.currentTimeMillis();
} while (end - begin < TBL_LOCK_TIMEOUT_MS);
return false;
}
}
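// Typical in-class usage (this mirrors evaluateSelfEvent() below): on success both
// versionLock_ and the table lock are held, so release versionLock_ promptly and
// hold the table lock only for the duration of the mutation:
//
//   if (!tryLockTable(tbl)) throw new CatalogException("lock contention");
//   versionLock_.writeLock().unlock();
//   try {
//     // ... mutate 'tbl' ...
//   } finally {
//     tbl.getLock().unlock();
//   }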
/**
* Reads the current set of cache pools from HDFS and updates the catalog.
* Called periodically by the cachePoolReader_.
*/
protected class CachePoolReader implements Runnable {
// If true, existing cache pools will get a new catalog version and, consequently,
// they will be added to the next topic update, triggering an update in each
// coordinator's local catalog cache. This is needed for the case of INVALIDATE
// METADATA where a new catalog version needs to be assigned to every catalog object.
private final boolean incrementVersions_;
/**
* This constructor is needed to support a synchronous, one-shot execution of the
* reader outside the scheduled executor (see incrementVersions_ above).
*/
public CachePoolReader(boolean incrementVersions) {
super();
incrementVersions_ = incrementVersions;
}
@Override
public void run() {
if (LOG.isTraceEnabled()) LOG.trace("Reloading cache pool names from HDFS");
// Map of cache pool name to CachePoolInfo. Stored in a map to allow Set operations
// to be performed on the keys.
Map<String, CachePoolInfo> currentCachePools = new HashMap<>();
try {
DistributedFileSystem dfs = FileSystemUtil.getDistributedFileSystem();
RemoteIterator<CachePoolEntry> itr = dfs.listCachePools();
while (itr.hasNext()) {
CachePoolInfo cachePoolInfo = itr.next().getInfo();
currentCachePools.put(cachePoolInfo.getPoolName(), cachePoolInfo);
}
} catch (Exception e) {
LOG.error("Error loading cache pools: ", e);
return;
}
versionLock_.writeLock().lock();
try {
// Determine what has changed relative to what we have cached.
Set<String> droppedCachePoolNames = Sets.difference(
hdfsCachePools_.keySet(), currentCachePools.keySet());
Set<String> createdCachePoolNames = Sets.difference(
currentCachePools.keySet(), hdfsCachePools_.keySet());
Set<String> survivingCachePoolNames = Sets.difference(
hdfsCachePools_.keySet(), droppedCachePoolNames);
// Add all new cache pools.
for (String createdCachePool: createdCachePoolNames) {
HdfsCachePool cachePool = new HdfsCachePool(
currentCachePools.get(createdCachePool));
cachePool.setCatalogVersion(incrementAndGetCatalogVersion());
hdfsCachePools_.add(cachePool);
}
// Remove dropped cache pools.
for (String cachePoolName: droppedCachePoolNames) {
HdfsCachePool cachePool = hdfsCachePools_.remove(cachePoolName);
if (cachePool != null) {
cachePool.setCatalogVersion(incrementAndGetCatalogVersion());
TCatalogObject removedObject =
new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL,
cachePool.getCatalogVersion());
removedObject.setCache_pool(cachePool.toThrift());
deleteLog_.addRemovedObject(removedObject);
}
}
if (incrementVersions_) {
// Increment the version of existing pools in order to be added to the next
// topic update.
for (String survivingCachePoolName: survivingCachePoolNames) {
HdfsCachePool cachePool = hdfsCachePools_.get(survivingCachePoolName);
Preconditions.checkNotNull(cachePool);
cachePool.setCatalogVersion(incrementAndGetCatalogVersion());
}
}
} finally {
versionLock_.writeLock().unlock();
}
}
}
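// A synchronous, one-shot run of the reader (the incrementVersions=true mode that
// the constructor comment above describes for INVALIDATE METADATA) would look like:
//
//   new CachePoolReader(/*incrementVersions=*/ true).run();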
public int getPartialFetchRpcQueueLength() {
return partialObjectFetchAccess_.getQueueLength();
}
/**
* Adds a list of cache directive IDs for the given table name. Asynchronously
* refreshes the table metadata once all cache directives complete.
*/
public void watchCacheDirs(List<Long> dirIds, TTableName tblName, String reason) {
tableLoadingMgr_.watchCacheDirs(dirIds, tblName, reason);
}
/**
* Prioritizes the loading of the given list of TCatalogObjects. Currently only
* supports loading Table/View metadata since Db and Function metadata is not loaded
* lazily.
*/
public void prioritizeLoad(List<TCatalogObject> objectDescs) {
for (TCatalogObject catalogObject: objectDescs) {
Preconditions.checkState(catalogObject.isSetTable());
TTable table = catalogObject.getTable();
tableLoadingMgr_.prioritizeLoad(new TTableName(table.getDb_name().toLowerCase(),
table.getTbl_name().toLowerCase()));
}
}
/**
* Retrieves TPartitionStats as a map that associates partitions with their
* statistics. The table partitions are specified in
* TGetPartitionStatsRequest. If statistics are not available for a partition,
* a default TPartitionStats is used. Partitions are identified by their partitioning
* column string values.
*/
public Map<String, ByteBuffer> getPartitionStats(TGetPartitionStatsRequest request)
throws CatalogException {
Preconditions.checkState(!RuntimeEnv.INSTANCE.isTestEnv());
TTableName tableName = request.table_name;
LOG.info("Fetching partition statistics for: " + tableName.getDb_name() + "."
+ tableName.getTable_name());
Table table = getOrLoadTable(tableName.db_name, tableName.table_name,
"needed to fetch partition stats");
// Table could be null if it does not exist anymore.
if (table == null) {
throw new CatalogException(
"Requested partition statistics for table that does not exist: "
+ request.table_name);
}
// Table could be incomplete, in which case an exception should be thrown.
if (table instanceof IncompleteTable) {
throw new CatalogException("No statistics available for incompletely"
+ " loaded table: " + request.table_name, ((IncompleteTable) table).getCause());
}
// Table must be a FeFsTable type at this point.
Preconditions.checkArgument(table instanceof HdfsTable,
"Partition statistics can only be requested for FS tables, type is: %s",
table.getClass().getCanonicalName());
// Table must be loaded.
Preconditions.checkState(table.isLoaded());
Map<String, ByteBuffer> stats = new HashMap<>();
HdfsTable hdfsTable = (HdfsTable) table;
hdfsTable.getLock().lock();
try {
Collection<? extends PrunablePartition> partitions = hdfsTable.getPartitions();
for (PrunablePartition partition : partitions) {
Preconditions.checkState(partition instanceof FeFsPartition);
FeFsPartition fsPartition = (FeFsPartition) partition;
TPartitionStats partStats = fsPartition.getPartitionStats();
if (partStats != null) {
ByteBuffer compressedStats =
ByteBuffer.wrap(fsPartition.getPartitionStatsCompressed());
stats.put(FeCatalogUtils.getPartitionName(fsPartition), compressedStats);
}
}
} finally {
hdfsTable.getLock().unlock();
}
LOG.info("Fetched partition statistics for " + stats.size()
+ " partitions on: " + hdfsTable.getFullName());
return stats;
}
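// Usage sketch (illustrative; assumes the thrift-generated setter for the
// 'table_name' field accessed above, and made-up db/table names):
//
//   TGetPartitionStatsRequest req = new TGetPartitionStatsRequest();
//   req.setTable_name(new TTableName("functional", "alltypes"));
//   Map<String, ByteBuffer> stats = catalog.getPartitionStats(req);
//   // Keys are partition names; values are compressed TPartitionStats blobs.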
/**
* The context for add*ToCatalogDelta(), called by getCatalogDelta(). It contains the
* native callback pointer, the version range of the delta, and the keys of the
* catalog objects collected so far.
*/
class GetCatalogDeltaContext {
// The CatalogServer pointer for NativeAddPendingTopicItem() callback.
long nativeCatalogServerPtr;
// The from and to version of this delta.
long fromVersion;
long toVersion;
long lastResetStartVersion;
// The keys of the catalog objects added (not deleted) in this delta.
Set<String> updatedCatalogObjects;
TSerializer serializer;
GetCatalogDeltaContext(long nativeCatalogServerPtr, long fromVersion, long toVersion,
long lastResetStartVersion)
{
this.nativeCatalogServerPtr = nativeCatalogServerPtr;
this.fromVersion = fromVersion;
this.toVersion = toVersion;
this.lastResetStartVersion = lastResetStartVersion;
updatedCatalogObjects = new HashSet<>();
serializer = new TSerializer(new TBinaryProtocol.Factory());
}
void addCatalogObject(TCatalogObject obj, boolean delete) throws TException {
String key = Catalog.toCatalogObjectKey(obj);
if (obj.type != TCatalogObjectType.CATALOG) {
topicUpdateLog_.add(key,
new TopicUpdateLog.Entry(0, obj.getCatalog_version(), toVersion));
if (!delete) updatedCatalogObjects.add(key);
}
// TODO: TSerializer.serialize() returns a copy of the internal byte array, which
// could be elided.
if (topicMode_ == TopicMode.FULL || topicMode_ == TopicMode.MIXED) {
String v1Key = CatalogServiceConstants.CATALOG_TOPIC_V1_PREFIX + key;
byte[] data = serializer.serialize(obj);
if (!FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr, v1Key,
obj.catalog_version, data, delete)) {
LOG.error("NativeAddPendingTopicItem failed in BE. key=" + v1Key + ", delete="
+ delete + ", data_size=" + data.length);
}
}
if (topicMode_ == TopicMode.MINIMAL || topicMode_ == TopicMode.MIXED) {
// Serialize a minimal version of the object that can be used by impalads
// that are running in 'local-catalog' mode. This is used by those impalads
// to invalidate their local cache.
TCatalogObject minimalObject = getMinimalObjectForV2(obj);
if (minimalObject != null) {
byte[] data = serializer.serialize(minimalObject);
String v2Key = CatalogServiceConstants.CATALOG_TOPIC_V2_PREFIX + key;
if (!FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr, v2Key,
obj.catalog_version, data, delete)) {
LOG.error("NativeAddPendingTopicItem failed in BE. key=" + v2Key + ", delete="
+ delete + ", data_size=" + data.length);
}
}
}
}
private TCatalogObject getMinimalObjectForV2(TCatalogObject obj) {
Preconditions.checkState(topicMode_ == TopicMode.MINIMAL ||
topicMode_ == TopicMode.MIXED);
TCatalogObject min = new TCatalogObject(obj.type, obj.catalog_version);
switch (obj.type) {
case DATABASE:
min.setDb(new TDatabase(obj.db.db_name));
break;
case TABLE:
case VIEW:
min.setTable(new TTable(obj.table.db_name, obj.table.tbl_name));
break;
case CATALOG:
// Sending the top-level catalog version is important for implementing SYNC_DDL.
// This also allows impalads to detect a catalogd restart and invalidate the
// whole catalog.
// TODO(todd) ensure that the impalad does this invalidation as required.
return obj;
case PRIVILEGE:
case PRINCIPAL:
case AUTHZ_CACHE_INVALIDATION:
// The caching of this data on the impalad side is somewhat complex
// and this code is also under some churn at the moment. So, we'll just publish
// the full information rather than doing fetch-on-demand.
return obj;
case FUNCTION:
TFunction fnObject = new TFunction(obj.fn.getName());
// IMPALA-8486: add the hdfs location so coordinators can mark their libCache
// entry for this function to be stale.
if (obj.fn.hdfs_location != null) fnObject.setHdfs_location(obj.fn.hdfs_location);
min.setFn(fnObject);
break;
case DATA_SOURCE:
case HDFS_CACHE_POOL:
// These are currently not cached by v2 impalad.
// TODO(todd): handle these items.
return null;
default:
throw new AssertionError("Unexpected catalog object type: " + obj.type);
}
return min;
}
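// Returns true if 'version' falls outside the delta range (fromVersion, toVersion].
// E.g. with fromVersion=10 and toVersion=20, versions 11..20 are in range; version
// 10 was covered by a previous delta and version 21 belongs to a later one.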
public boolean versionNotInRange(long version) {
return version <= fromVersion || version > toVersion;
}
}
/**
* Identifies the catalog objects that were added/modified/deleted in the catalog with
* versions > 'fromVersion'. It operates on a snapshot of the catalog without holding
* the catalog lock, which means that other concurrent metadata operations can still make
* progress while the catalog delta is computed. An entry in the topic update log is
* added for every catalog object that is included in the catalog delta. The log is
* examined by operations using SYNC_DDL to determine which topic update covers the
* result set of metadata operation. Once the catalog delta is computed, the entries in
* the delete log with versions less than 'fromVersion' are garbage collected.
* The catalog delta is passed to the backend by calling NativeAddPendingTopicItem().
*/
public long getCatalogDelta(long nativeCatalogServerPtr, long fromVersion) throws
TException {
GetCatalogDeltaContext ctx;
// Get lock to read catalogVersion_ and lastResetStartVersion_
versionLock_.readLock().lock();
try {
ctx = new GetCatalogDeltaContext(nativeCatalogServerPtr, fromVersion,
catalogVersion_, lastResetStartVersion_);
} finally {
versionLock_.readLock().unlock();
}
for (Db db: getAllDbs()) {
addDatabaseToCatalogDelta(db, ctx);
}
for (DataSource dataSource: getAllDataSources()) {
addDataSourceToCatalogDelta(dataSource, ctx);
}
for (HdfsCachePool cachePool: getAllHdfsCachePools()) {
addHdfsCachePoolToCatalogDelta(cachePool, ctx);
}
for (Role role: getAllRoles()) {
addPrincipalToCatalogDelta(role, ctx);
}
for (User user: getAllUsers()) {
addPrincipalToCatalogDelta(user, ctx);
}
for (AuthzCacheInvalidation authzCacheInvalidation: getAllAuthzCacheInvalidation()) {
addAuthzCacheInvalidationToCatalogDelta(authzCacheInvalidation, ctx);
}
// Identify the catalog objects that were removed from the catalog for which their
// versions are in range ('ctx.fromVersion', 'ctx.toVersion']. We need to make sure
// that we don't include "deleted" objects that were re-added to the catalog.
for (TCatalogObject removedObject:
getDeletedObjects(ctx.fromVersion, ctx.toVersion)) {
if (!ctx.updatedCatalogObjects.contains(
Catalog.toCatalogObjectKey(removedObject))) {
ctx.addCatalogObject(removedObject, true);
}
}
// Each topic update should contain a single "TCatalog" object which is used to
// pass overall state on the catalog, such as the current version and the
// catalog service id. By setting the catalog version to the latest catalog
// version at this point, it ensures impalads will always bump their versions,
// even in the case where an object has been dropped. We also pass the catalog
// version of the last full catalog reset, so that coordinators in local catalog
// mode can safely advance their min catalog version.
TCatalogObject catalog =
new TCatalogObject(TCatalogObjectType.CATALOG, ctx.toVersion);
catalog.setCatalog(new TCatalog(catalogServiceId_, ctx.lastResetStartVersion));
ctx.addCatalogObject(catalog, false);
// Garbage collect the delete and topic update log.
deleteLog_.garbageCollect(ctx.toVersion);
topicUpdateLog_.garbageCollectUpdateLogEntries(ctx.toVersion);
lastSentTopicUpdate_.set(ctx.toVersion);
// Notify any operation that is waiting on the next topic update.
synchronized (topicUpdateLog_) {
topicUpdateLog_.notifyAll();
}
return ctx.toVersion;
}
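// Worked example of the delta protocol: if the previous topic update covered
// versions up to 100 and catalogVersion_ is now 120, the backend calls
// getCatalogDelta(ptr, 100). Objects with versions in (100, 120] are collected via
// NativeAddPendingTopicItem(), a TCatalog object with version 120 is appended, and
// 120 is returned to serve as 'fromVersion' for the next delta.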
/**
* Gets the list of versions for in-flight events for the given table. Applicable
* only when external event processing is enabled.
* @param dbName database name
* @param tblName table name, or null to return the database's in-flight event
* versions
* @return List of previous version numbers for in-flight events on this table.
* If the table is not loaded, returns an empty list. Event processing must be
* active when this method is called.
*/
public List<Long> getInFlightVersionsForEvents(String dbName, String tblName)
throws DatabaseNotFoundException, TableNotFoundException {
Preconditions.checkState(isEventProcessingActive(),
"Event processing should be enabled before calling this method");
List<Long> result = Collections.emptyList();
versionLock_.readLock().lock();
try {
Db db = getDb(dbName);
if (db == null) {
throw new DatabaseNotFoundException(
String.format("Database %s not found", dbName));
}
if (tblName == null) {
return db.getVersionsForInflightEvents();
}
Table tbl = getTable(dbName, tblName);
if (tbl == null) {
throw new TableNotFoundException(
String.format("Table %s not found", new TableName(dbName, tblName)));
}
if (tbl instanceof IncompleteTable) return result;
return tbl.getVersionsForInflightEvents();
} finally {
versionLock_.readLock().unlock();
}
}
/**
* Evaluates if the information from an event (serviceId and versionNumber) matches
* the catalog object. If there is a match, the in-flight version for that object is
* removed and the method returns true. If it does not match, returns false.
* @param ctx self context which provides all the information needed to
* evaluate if this is a self-event or not
* @return true if given event information evaluates to a self-event, false otherwise
*/
public boolean evaluateSelfEvent(SelfEventContext ctx)
throws CatalogException {
Preconditions.checkState(isEventProcessingActive(),
"Event processing should be enabled when calling this method");
long versionNumber = ctx.getVersionNumberFromEvent();
String serviceIdFromEvent = ctx.getServiceIdFromEvent();
// no version info or service id in the event
if (versionNumber == -1 || serviceIdFromEvent.isEmpty()) return false;
// if the service id from event doesn't match with our service id this is not a
// self-event
if (!getCatalogServiceId().equals(serviceIdFromEvent)) return false;
Db db = getDb(ctx.getDbName());
if (db == null) {
throw new DatabaseNotFoundException("Database " + ctx.getDbName() + " not found");
}
// if the given tblName is null we look at the db's in-flight events
if (ctx.getTblName() == null) {
return db.removeFromVersionsForInflightEvents(versionNumber);
}
Table tbl = getTable(ctx.getDbName(), ctx.getTblName());
if (tbl == null) {
throw new TableNotFoundException(
String.format("Table %s.%s not found", ctx.getDbName(), ctx.getTblName()));
}
// we should acquire the table lock so that we wait for any other updates
// happening to this table at the same time
if (!tryLockTable(tbl)) {
throw new CatalogException(String.format("Error during self-event evaluation "
+ "for table %s due to lock contention", tbl.getFullName()));
}
versionLock_.writeLock().unlock();
try {
List<List<TPartitionKeyValue>> partitionKeyValues = ctx.getPartitionKeyValues();
// if partitionKeyValues is null, we look at the table's in-flight events
if (partitionKeyValues == null) {
return tbl.removeFromVersionsForInflightEvents(versionNumber);
}
if (tbl instanceof HdfsTable) {
List<String> failingPartitions = new ArrayList<>();
for (List<TPartitionKeyValue> partitionKeyValue : partitionKeyValues) {
HdfsPartition hdfsPartition =
((HdfsTable) tbl).getPartitionFromThriftPartitionSpec(partitionKeyValue);
if (hdfsPartition == null || !hdfsPartition
.removeFromVersionsForInflightEvents(versionNumber)) {
// even if this is an error condition we should not bail out early since we
// should clean up the self-event state on the rest of the partitions
String partName = HdfsTable.constructPartitionName(partitionKeyValue);
if (hdfsPartition == null) {
LOG.warn(String.format("Partition %s not found during self-event "
+ "evaluation for the table %s", partName, tbl.getFullName()));
}
failingPartitions.add(partName);
}
}
return failingPartitions.isEmpty();
}
} finally {
tbl.getLock().unlock();
}
return false;
}
/**
* Adds a given version number from the catalog table's list of versions for in-flight
* events. Applicable only when external event processing is enabled.
*
* @param tbl Catalog table
* @param versionNumber version number to be added
*/
public void addVersionsForInflightEvents(Table tbl, long versionNumber) {
if (!isEventProcessingActive()) return;
// we generally don't take locks on Incomplete tables since they are atomically
// replaced during load
Preconditions.checkState(
tbl instanceof IncompleteTable || tbl.getLock().isHeldByCurrentThread());
tbl.addToVersionsForInflightEvents(versionNumber);
}
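// Rough sketch of the self-event round trip (illustrative; the DDL-side calls live
// outside this class, e.g. in CatalogOpExecutor):
//
//   // DDL path, while holding tbl's lock:
//   long version = incrementAndGetCatalogVersion();
//   addVersionsForInflightEvents(tbl, version);
//   // ... later, when the corresponding HMS event arrives, the events processor
//   // calls evaluateSelfEvent(ctx); the version is removed from the in-flight list
//   // and the event is recognized as a self-event and skipped.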
/**
* Adds a given version number to the catalog database's list of versions for
* in-flight events. Applicable only when external event processing is enabled.
*
* @param db Catalog database
* @param versionNumber version number to be added
*/
public void addVersionsForInflightEvents(Db db, long versionNumber) {
if (!isEventProcessingActive()) return;
db.addToVersionsForInflightEvents(versionNumber);
}
/**
* Get a snapshot view of all the catalog objects that were deleted between versions
* ('fromVersion', 'toVersion'].
*/
private List<TCatalogObject> getDeletedObjects(long fromVersion, long toVersion) {
versionLock_.readLock().lock();
try {
return deleteLog_.retrieveObjects(fromVersion, toVersion);
} finally {
versionLock_.readLock().unlock();
}
}
/**
* Get a snapshot view of all the databases in the catalog.
*/
List<Db> getAllDbs() {
versionLock_.readLock().lock();
try {
return ImmutableList.copyOf(dbCache_.get().values());
} finally {
versionLock_.readLock().unlock();
}
}
/**
* Get a snapshot view of all the data sources in the catalog.
*/
private List<DataSource> getAllDataSources() {
versionLock_.readLock().lock();
try {
return ImmutableList.copyOf(getDataSources());
} finally {
versionLock_.readLock().unlock();
}
}
/**
* Get a snapshot view of all the Hdfs cache pools in the catalog.
*/
private List<HdfsCachePool> getAllHdfsCachePools() {
versionLock_.readLock().lock();
try {
return ImmutableList.copyOf(hdfsCachePools_);
} finally {
versionLock_.readLock().unlock();
}
}
/**
* Get a snapshot view of all the roles in the catalog.
*/
private List<Role> getAllRoles() {
versionLock_.readLock().lock();
try {
return ImmutableList.copyOf(authPolicy_.getAllRoles());
} finally {
versionLock_.readLock().unlock();
}
}
/**
* Get a snapshot view of all the users in the catalog.
*/
private List<User> getAllUsers() {
versionLock_.readLock().lock();
try {
return ImmutableList.copyOf(authPolicy_.getAllUsers());
} finally {
versionLock_.readLock().unlock();
}
}
/**
* Get a snapshot view of all authz cache invalidation markers in the catalog.
*/
private List<AuthzCacheInvalidation> getAllAuthzCacheInvalidation() {
versionLock_.readLock().lock();
try {
return ImmutableList.copyOf(authzCacheInvalidation_);
} finally {
versionLock_.readLock().unlock();
}
}
/**
* Adds a database to the topic update if its version is in the range
* ('ctx.fromVersion', 'ctx.toVersion']. It iterates through all the tables and
* functions of this database to determine if they can be included in the topic update.
*/
private void addDatabaseToCatalogDelta(Db db, GetCatalogDeltaContext ctx)
throws TException {
long dbVersion = db.getCatalogVersion();
if (dbVersion > ctx.fromVersion && dbVersion <= ctx.toVersion) {
TCatalogObject catalogDb =
new TCatalogObject(TCatalogObjectType.DATABASE, dbVersion);
catalogDb.setDb(db.toThrift());
ctx.addCatalogObject(catalogDb, false);
}
for (Table tbl: getAllTables(db)) {
addTableToCatalogDelta(tbl, ctx);
}
for (Function fn: getAllFunctions(db)) {
addFunctionToCatalogDelta(fn, ctx);
}
}
/**
* Get a snapshot view of all the tables in a database.
*/
List<Table> getAllTables(Db db) {
Preconditions.checkNotNull(db);
versionLock_.readLock().lock();
try {
return ImmutableList.copyOf(db.getTables());
} finally {
versionLock_.readLock().unlock();
}
}
/**
* Get a snapshot view of all the functions in a database.
*/
private List<Function> getAllFunctions(Db db) {
Preconditions.checkNotNull(db);
versionLock_.readLock().lock();
try {
return ImmutableList.copyOf(db.getFunctions(null, new PatternMatcher()));
} finally {
versionLock_.readLock().unlock();
}
}
/**
* Given a database name and a property key, returns the value of the key from the
* parameters map of the HMS db object.
* @param dbName name of the database
* @param propertyKey property key
* @return value of the key from the db parameters. Returns null if the Db is not
* found or the key does not exist in the parameters
*/
public String getDbProperty(String dbName, String propertyKey) {
Preconditions.checkNotNull(dbName);
Preconditions.checkNotNull(propertyKey);
versionLock_.readLock().lock();
try {
Db db = getDb(dbName);
if (db == null) return null;
if (!db.getMetaStoreDb().isSetParameters()) return null;
return db.getMetaStoreDb().getParameters().get(propertyKey);
} finally {
versionLock_.readLock().unlock();
}
}
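// Usage sketch (illustrative; the db name and property key are made-up examples):
//
//   String value = catalog.getDbProperty("functional", "some.key");
//   // 'value' is null if the db is missing or the key is absent from the HMS
//   // parameters map.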
/**
* Given a db name, table name and a list of property keys, returns the values of
* the keys from the cached Table object's parameters.
* @return List of values corresponding to 'propertyKeys', with null entries for
* keys that are not found. Returns null if the table doesn't exist, is an
* incomplete table, or has no parameters set
*/
public List<String> getTableProperties(
String dbName, String tblName, List<String> propertyKeys) {
Preconditions.checkNotNull(dbName);
Preconditions.checkNotNull(tblName);
Preconditions.checkNotNull(propertyKeys);
versionLock_.readLock().lock();
try {
Db db = getDb(dbName);
if (db == null) return null;
Table tbl = db.getTable(tblName);
if (tbl == null || tbl instanceof IncompleteTable) return null;
if (!tbl.getMetaStoreTable().isSetParameters()) return null;
List<String> propertyValues = new ArrayList<>(propertyKeys.size());
for (String propertyKey : propertyKeys) {
propertyValues.add(tbl.getMetaStoreTable().getParameters().get(propertyKey));
}
return propertyValues;
} finally {
versionLock_.readLock().unlock();
}
}
/**
* Updates the Db with the given metastore database object. Useful for doing in-place
* updates to the HMS db, e.g. changing the owner, adding a comment or setting
* certain properties.
* @param msDb The HMS database object to be used for the update
* @return The updated Db object
* @throws DatabaseNotFoundException if a Db with the name of the given Database
* is not found in the Catalog
*/
public Db updateDb(Database msDb) throws DatabaseNotFoundException {
Preconditions.checkNotNull(msDb);
Preconditions.checkNotNull(msDb.getName());
versionLock_.writeLock().lock();
try {
Db db = getDb(msDb.getName());
if (db == null) {
throw new DatabaseNotFoundException("Database " + msDb.getName() + " not found");
}
db.setMetastoreDb(msDb.getName(), msDb);
db.setCatalogVersion(incrementAndGetCatalogVersion());
return db;
} finally {
versionLock_.writeLock().unlock();
}
}
/**
* Adds a table to the topic update if its version is in the range
* ('ctx.fromVersion', 'ctx.toVersion']. If the table's version is larger than
* 'ctx.toVersion' and the table has skipped a topic update
* 'MAX_NUM_SKIPPED_TOPIC_UPDATES' times, it is included in the topic update. This
* prevents tables that are updated frequently from skipping topic updates indefinitely,
* which would also violate the semantics of SYNC_DDL.
*/
private void addTableToCatalogDelta(Table tbl, GetCatalogDeltaContext ctx)
throws TException {
if (tbl.getCatalogVersion() <= ctx.toVersion) {
addTableToCatalogDeltaHelper(tbl, ctx);
} else {
TopicUpdateLog.Entry topicUpdateEntry =
topicUpdateLog_.getOrCreateLogEntry(tbl.getUniqueName());
Preconditions.checkNotNull(topicUpdateEntry);
if (topicUpdateEntry.getNumSkippedTopicUpdates() == MAX_NUM_SKIPPED_TOPIC_UPDATES) {
addTableToCatalogDeltaHelper(tbl, ctx);
} else {
LOG.info("Table {} (version={}) is skipping topic update ({}, {}]",
tbl.getFullName(), tbl.getCatalogVersion(), ctx.fromVersion, ctx.toVersion);
topicUpdateLog_.add(tbl.getUniqueName(),
new TopicUpdateLog.Entry(
topicUpdateEntry.getNumSkippedTopicUpdates() + 1,
topicUpdateEntry.getLastSentVersion(),
topicUpdateEntry.getLastSentCatalogUpdate()));
}
}
}
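// Worked example with MAX_NUM_SKIPPED_TOPIC_UPDATES = 2: a hot table whose version
// keeps landing above 'ctx.toVersion' is skipped by at most two consecutive deltas,
// its skip counter incrementing each time. On the third delta it is handed to
// addTableToCatalogDeltaHelper() and included even though its version exceeds
// 'ctx.toVersion', which is why the same version of an object may appear in two
// subsequent topic updates (see the class comment).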
/**
* Helper function that tries to add a table to a topic update. It acquires the
* table's lock and checks if its version is in the ('ctx.fromVersion',
* 'ctx.toVersion'] range and how many consecutive times (if any) the table has
* skipped a topic update.
*/
private void addTableToCatalogDeltaHelper(Table tbl, GetCatalogDeltaContext ctx)
throws TException {
TCatalogObject catalogTbl =
new TCatalogObject(TCatalogObjectType.TABLE, Catalog.INITIAL_CATALOG_VERSION);
tbl.getLock().lock();
try {
long tblVersion = tbl.getCatalogVersion();
if (tblVersion <= ctx.fromVersion) return;
String tableUniqueName = tbl.getUniqueName();
TopicUpdateLog.Entry topicUpdateEntry =
topicUpdateLog_.getOrCreateLogEntry(tableUniqueName);
if (tblVersion > ctx.toVersion &&
topicUpdateEntry.getNumSkippedTopicUpdates() < MAX_NUM_SKIPPED_TOPIC_UPDATES) {
LOG.info("Table " + tbl.getFullName() + " is skipping topic update " +
ctx.toVersion);
topicUpdateLog_.add(tableUniqueName,
new TopicUpdateLog.Entry(
topicUpdateEntry.getNumSkippedTopicUpdates() + 1,
topicUpdateEntry.getLastSentVersion(),
topicUpdateEntry.getLastSentCatalogUpdate()));
return;
}
try {
catalogTbl.setTable(tbl.toThrift());
} catch (Exception e) {
LOG.error(String.format("Error calling toThrift() on table %s: %s",
tbl.getFullName(), e.getMessage()), e);
return;
}
catalogTbl.setCatalog_version(tbl.getCatalogVersion());
ctx.addCatalogObject(catalogTbl, false);
} finally {
tbl.getLock().unlock();
}
}
/**
* Adds a function to the topic update if its version is in the range
* ('ctx.fromVersion', 'ctx.toVersion'].
*/
private void addFunctionToCatalogDelta(Function fn, GetCatalogDeltaContext ctx)
throws TException {
long fnVersion = fn.getCatalogVersion();
if (ctx.versionNotInRange(fnVersion)) return;
TCatalogObject function =
new TCatalogObject(TCatalogObjectType.FUNCTION, fnVersion);
function.setFn(fn.toThrift());
ctx.addCatalogObject(function, false);
}
/**
* Adds a data source to the topic update if its version is in the range
* ('ctx.fromVersion', 'ctx.toVersion'].
*/
private void addDataSourceToCatalogDelta(DataSource dataSource,
GetCatalogDeltaContext ctx) throws TException {
long dsVersion = dataSource.getCatalogVersion();
if (ctx.versionNotInRange(dsVersion)) return;
TCatalogObject catalogObj =
new TCatalogObject(TCatalogObjectType.DATA_SOURCE, dsVersion);
catalogObj.setData_source(dataSource.toThrift());
ctx.addCatalogObject(catalogObj, false);
}
/**
* Adds a HDFS cache pool to the topic update if its version is in the range
* ('ctx.fromVersion', 'ctx.toVersion'].
*/
private void addHdfsCachePoolToCatalogDelta(HdfsCachePool cachePool,
GetCatalogDeltaContext ctx) throws TException {
long cpVersion = cachePool.getCatalogVersion();
if (ctx.versionNotInRange(cpVersion)) return;
TCatalogObject pool =
new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL, cpVersion);
pool.setCache_pool(cachePool.toThrift());
ctx.addCatalogObject(pool, false);
}
/**
* Adds a principal to the topic update if its version is in the range
* ('ctx.fromVersion', 'ctx.toVersion']. It iterates through all the privileges of
* this principal to determine if they can be inserted in the topic update.
*/
private void addPrincipalToCatalogDelta(Principal principal, GetCatalogDeltaContext ctx)
throws TException {
long principalVersion = principal.getCatalogVersion();
if (!ctx.versionNotInRange(principalVersion)) {
TCatalogObject thriftPrincipal =
new TCatalogObject(TCatalogObjectType.PRINCIPAL, principalVersion);
thriftPrincipal.setPrincipal(principal.toThrift());
ctx.addCatalogObject(thriftPrincipal, false);
}
for (PrincipalPrivilege p: getAllPrivileges(principal)) {
addPrincipalPrivilegeToCatalogDelta(p, ctx);
}
}
/**
* Get a snapshot view of all the privileges in a principal.
*/
private List<PrincipalPrivilege> getAllPrivileges(Principal principal) {
Preconditions.checkNotNull(principal);
versionLock_.readLock().lock();
try {
return ImmutableList.copyOf(principal.getPrivileges());
} finally {
versionLock_.readLock().unlock();
}
}
/**
* Adds a principal privilege to the topic update if its version is in the range
* ('ctx.fromVersion', 'ctx.toVersion'].
*/
private void addPrincipalPrivilegeToCatalogDelta(PrincipalPrivilege priv,
GetCatalogDeltaContext ctx) throws TException {
long privVersion = priv.getCatalogVersion();
if (ctx.versionNotInRange(privVersion)) return;
TCatalogObject privilege =
new TCatalogObject(TCatalogObjectType.PRIVILEGE, privVersion);
privilege.setPrivilege(priv.toThrift());
ctx.addCatalogObject(privilege, false);
}
/**
* Adds an authz cache invalidation to the topic update if its version is in the range
* ('ctx.fromVersion', 'ctx.toVersion'].
*/
private void addAuthzCacheInvalidationToCatalogDelta(
AuthzCacheInvalidation authzCacheInvalidation, GetCatalogDeltaContext ctx)
throws TException {
long authzCacheInvalidationVersion = authzCacheInvalidation.getCatalogVersion();
if (ctx.versionNotInRange(authzCacheInvalidationVersion)) return;
TCatalogObject catalogObj = new TCatalogObject(
TCatalogObjectType.AUTHZ_CACHE_INVALIDATION, authzCacheInvalidationVersion);
catalogObj.setAuthz_cache_invalidation(authzCacheInvalidation.toThrift());
ctx.addCatalogObject(catalogObj, false);
}
/**
* Returns all user defined functions (aggregate and scalar) in the specified database.
* Functions are not returned in a defined order.
*/
public List<Function> getFunctions(String dbName) throws DatabaseNotFoundException {
Db db = getDb(dbName);
if (db == null) {
throw new DatabaseNotFoundException("Database does not exist: " + dbName);
}
// Contains map of overloaded function names to all functions matching that name.
Map<String, List<Function>> dbFns = db.getAllFunctions();
List<Function> fns = new ArrayList<>(dbFns.size());
for (List<Function> fnOverloads: dbFns.values()) {
for (Function fn: fnOverloads) {
fns.add(fn);
}
}
return fns;
}
/**
* Extracts Impala functions stored in metastore db parameters and adds them to
* the catalog cache.
*/
private void loadFunctionsFromDbParams(Db db,
org.apache.hadoop.hive.metastore.api.Database msDb) {
if (msDb == null || msDb.getParameters() == null) return;
LOG.info("Loading native functions for database: " + db.getName());
List<Function> funcs = FunctionUtils.deserializeNativeFunctionsFromDbParams(
msDb.getParameters());
for (Function f : funcs) {
db.addFunction(f, false);
f.setCatalogVersion(incrementAndGetCatalogVersion());
}
LOG.info("Loaded native functions for database: " + db.getName());
}
/**
* Loads Java functions into the catalog. For each function in "functions",
* we extract all Impala compatible evaluate() signatures and load them
* as separate functions in the catalog.
*/
private void loadJavaFunctions(Db db,
List<org.apache.hadoop.hive.metastore.api.Function> functions) {
Preconditions.checkNotNull(functions);
if (BackendConfig.INSTANCE.disableCatalogDataOpsDebugOnly()) {
LOG.info("Skip loading Java functions: catalog data ops disabled.");
return;
}
LOG.info("Loading Java functions for database: " + db.getName());
for (org.apache.hadoop.hive.metastore.api.Function function: functions) {
try {
List<Function> fns = FunctionUtils.extractFunctions(db.getName(), function,
localLibraryPath_);
for (Function fn: fns) {
db.addFunction(fn);
fn.setCatalogVersion(incrementAndGetCatalogVersion());
}
} catch (Exception e) {
LOG.error("Skipping function load: " + function.getFunctionName(), e);
}
}
LOG.info("Loaded Java functions for database: " + db.getName());
}
/**
* Reloads function metadata for 'dbName' database. Populates the 'addedFuncs' list
* with functions that were added as a result of this operation. Populates the
* 'removedFuncs' list with functions that were removed.
*/
public void refreshFunctions(MetaStoreClient msClient, String dbName,
List<TCatalogObject> addedFuncs, List<TCatalogObject> removedFuncs)
throws CatalogException {
// Create a temporary database that will contain all the functions from the HMS.
Db tmpDb;
try {
List<org.apache.hadoop.hive.metastore.api.Function> javaFns =
new ArrayList<>();
for (String javaFn : msClient.getHiveClient().getFunctions(dbName, "*")) {
javaFns.add(msClient.getHiveClient().getFunction(dbName, javaFn));
}
// Contains native functions in its params map.
org.apache.hadoop.hive.metastore.api.Database msDb =
msClient.getHiveClient().getDatabase(dbName);
tmpDb = new Db(dbName, msDb);
// Load native UDFs into the temporary db.
loadFunctionsFromDbParams(tmpDb, msDb);
// Load Java UDFs from HMS into the temporary db.
loadJavaFunctions(tmpDb, javaFns);
Db db = getDb(dbName);
if (db == null) {
throw new DatabaseNotFoundException("Database does not exist: " + dbName);
}
// Load transient functions into the temporary db.
for (Function fn: db.getTransientFunctions()) tmpDb.addFunction(fn);
// Compute the removed functions and remove them from the db.
for (Map.Entry<String, List<Function>> e: db.getAllFunctions().entrySet()) {
for (Function fn: e.getValue()) {
if (tmpDb.getFunction(
fn, Function.CompareMode.IS_INDISTINGUISHABLE) == null) {
fn.setCatalogVersion(incrementAndGetCatalogVersion());
removedFuncs.add(fn.toTCatalogObject());
}
}
}
// We will re-add all the functions to the db because it's possible that a
// function was dropped and a different function (for example, the binary is
// different) with the same name and signature was re-added in Hive.
db.removeAllFunctions();
for (Map.Entry<String, List<Function>> e: tmpDb.getAllFunctions().entrySet()) {
for (Function fn: e.getValue()) {
// We do not need to increment and acquire a new catalog version for this
// function here because this already happens when the functions are loaded
// into tmpDb.
db.addFunction(fn);
addedFuncs.add(fn.toTCatalogObject());
}
}
} catch (Exception e) {
throw new CatalogException("Error refreshing functions in " + dbName + ": ", e);
}
}
/**
* Invalidates the database 'db'. This method can race with external changes to
* the Hive metastore, and any conflicting changes to the objects can manifest as
* exceptions from the HMS calls, which are appropriately handled. Returns the
* invalidated 'Db' object along with the list of tables to be loaded by the
* TableLoadingMgr.
* Returns null if the method encounters an exception during invalidation.
*/
private Pair<Db, List<TTableName>> invalidateDb(
MetaStoreClient msClient, String dbName, Db existingDb) {
try {
List<org.apache.hadoop.hive.metastore.api.Function> javaFns =
new ArrayList<>();
for (String javaFn: msClient.getHiveClient().getFunctions(dbName, "*")) {
javaFns.add(msClient.getHiveClient().getFunction(dbName, javaFn));
}
org.apache.hadoop.hive.metastore.api.Database msDb =
msClient.getHiveClient().getDatabase(dbName);
Db newDb = new Db(dbName, msDb);
// existingDb is usually null when the Catalog loads for the first time.
// In that case we needn't restore any transient functions.
if (existingDb != null) {
// Restore UDFs that aren't persisted. They are only cleaned up on
// Catalog restart.
for (Function fn: existingDb.getTransientFunctions()) {
newDb.addFunction(fn);
fn.setCatalogVersion(incrementAndGetCatalogVersion());
}
}
// Reload native UDFs.
loadFunctionsFromDbParams(newDb, msDb);
// Reload Java UDFs from HMS.
loadJavaFunctions(newDb, javaFns);
newDb.setCatalogVersion(incrementAndGetCatalogVersion());
List<TTableName> tblsToBackgroundLoad = new ArrayList<>();
for (String tableName: msClient.getHiveClient().getAllTables(dbName)) {
if (isBlacklistedTable(dbName, tableName.toLowerCase())) {
LOG.info("skip blacklisted table: " + dbName + "." + tableName);
continue;
}
Table incompleteTbl = IncompleteTable.createUninitializedTable(newDb, tableName);
incompleteTbl.setCatalogVersion(incrementAndGetCatalogVersion());
newDb.addTable(incompleteTbl);
if (loadInBackground_) {
tblsToBackgroundLoad.add(new TTableName(dbName, tableName.toLowerCase()));
}
}
if (existingDb != null) {
// Identify any removed functions and add them to the delta log.
for (Map.Entry<String, List<Function>> e:
existingDb.getAllFunctions().entrySet()) {
for (Function fn: e.getValue()) {
if (newDb.getFunction(fn,
Function.CompareMode.IS_INDISTINGUISHABLE) == null) {
fn.setCatalogVersion(incrementAndGetCatalogVersion());
deleteLog_.addRemovedObject(fn.toTCatalogObject());
}
}
}
// Identify any deleted tables and add them to the delta log
Set<String> oldTableNames = Sets.newHashSet(existingDb.getAllTableNames());
Set<String> newTableNames = Sets.newHashSet(newDb.getAllTableNames());
oldTableNames.removeAll(newTableNames);
for (String removedTableName: oldTableNames) {
Table removedTable = IncompleteTable.createUninitializedTable(existingDb,
removedTableName);
removedTable.setCatalogVersion(incrementAndGetCatalogVersion());
deleteLog_.addRemovedObject(removedTable.toTCatalogObject());
}
}
return Pair.create(newDb, tblsToBackgroundLoad);
} catch (Exception e) {
LOG.warn("Encountered an exception while invalidating database: " + dbName +
". Ignoring further load of this db.", e);
}
return null;
}
/**
* Refreshes authorization metadata. When authorization is not enabled, this
* method is a no-op.
*/
public AuthorizationDelta refreshAuthorization(boolean resetVersions)
throws CatalogException {
Preconditions.checkState(authzManager_ != null);
try {
return authzManager_.refreshAuthorization(resetVersions);
} catch (Exception e) {
throw new CatalogException("Error refreshing authorization policy: ", e);
}
}
/**
* Resets this catalog instance by clearing all cached table and database metadata.
* Returns the current catalog version before reset has taken any effect. The
* requesting impalad will use that version to determine when the
* effects of reset have been applied to its local catalog cache.
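* <p>Illustrative usage (a sketch; 'catalog_' is an assumed reference to this
* instance):
* <pre>{@code
* long resetStartVersion = catalog_.reset();
* // An impalad can consider the reset applied once its local catalog
* // version has advanced past 'resetStartVersion'.
* }</pre>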
*/
public long reset() throws CatalogException {
long startVersion = getCatalogVersion();
LOG.info("Invalidating all metadata. Version: " + startVersion);
// First update the policy metadata.
refreshAuthorization(true);
// Even though we get the current notification event id before stopping event
// processing, there is a small window in which some event ids could be
// re-processed if there is external DDL activity on the metastore during reset.
// Unfortunately, there is no good way to avoid this since HMS does not provide
// APIs to fetch all the tables/databases as of a given event id. It is OKAY to
// re-process some of these events since the event processor relies on the
// creationTime of the objects to uniquely identify tables across create and drop
// events. In the case of alter events, however, some tables may be unnecessarily
// invalidated: that happens when external alter events arrive during reset and,
// by the time we process them, the catalog has already reloaded those tables.
long currentEventId = metastoreEventProcessor_.getCurrentEventId();
// Pause the event processing since the cache is being cleared anyway.
metastoreEventProcessor_.pause();
// Update the HDFS cache pools
CachePoolReader reader = new CachePoolReader(true);
reader.run();
versionLock_.writeLock().lock();
// In case of an empty new catalog, the version should still change to reflect the
// reset operation itself and to unblock impalads by making the catalog version >
// INITIAL_CATALOG_VERSION. See Frontend.waitForCatalog()
++catalogVersion_;
// Assign new versions to all the loaded data sources.
for (DataSource dataSource: getDataSources()) {
dataSource.setCatalogVersion(incrementAndGetCatalogVersion());
}
// Update db and table metadata
try {
// Not all Java UDFs are persisted to the metastore. The ones which aren't
// should be restored once the catalog has been invalidated.
Map<String, Db> oldDbCache = dbCache_.get();
// Build a new DB cache, populate it, and replace the existing cache in one
// step.
Map<String, Db> newDbCache = new ConcurrentHashMap<String, Db>();
List<TTableName> tblsToBackgroundLoad = new ArrayList<>();
try (MetaStoreClient msClient = getMetaStoreClient()) {
List<String> allDbs = msClient.getHiveClient().getAllDatabases();
int numComplete = 0;
for (String dbName: allDbs) {
if (isBlacklistedDb(dbName)) {
LOG.info("skip blacklisted db: " + dbName);
continue;
}
String annotation = String.format("invalidating metadata - %s/%s dbs complete",
numComplete++, allDbs.size());
try (ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation)) {
dbName = dbName.toLowerCase();
Db oldDb = oldDbCache.get(dbName);
// invalidateDb() returns an empty table list if loadInBackground_ is false.
Pair<Db, List<TTableName>> invalidatedDb = invalidateDb(msClient,
dbName, oldDb);
if (invalidatedDb == null) continue;
newDbCache.put(dbName, invalidatedDb.first);
tblsToBackgroundLoad.addAll(invalidatedDb.second);
}
}
}
dbCache_.set(newDbCache);
// Identify any deleted databases and add them to the delta log.
Set<String> oldDbNames = oldDbCache.keySet();
Set<String> newDbNames = newDbCache.keySet();
oldDbNames.removeAll(newDbNames);
for (String dbName: oldDbNames) {
Db removedDb = oldDbCache.get(dbName);
updateDeleteLog(removedDb);
}
// Submit tables for background loading.
for (TTableName tblName: tblsToBackgroundLoad) {
tableLoadingMgr_.backgroundLoad(tblName);
}
} catch (Exception e) {
LOG.error("Error initializing Catalog", e);
throw new CatalogException("Error initializing Catalog. Catalog may be empty.", e);
} finally {
// It's possible that a concurrent reset() got a startVersion later than ours but
// acquired the version lock before us, in which case lastResetStartVersion_ has
// already been bumped and we don't need to update it.
if (lastResetStartVersion_ < startVersion) lastResetStartVersion_ = startVersion;
versionLock_.writeLock().unlock();
// Restart event processing from the event id fetched just before the reset.
metastoreEventProcessor_.start(currentEventId);
}
LOG.info("Invalidated all metadata.");
return startVersion;
}
/**
* Adds a database name to the metadata cache and returns the database's
* new Db object. Used by CREATE DATABASE statements.
*/
public Db addDb(String dbName, org.apache.hadoop.hive.metastore.api.Database msDb) {
Db newDb = new Db(dbName, msDb);
versionLock_.writeLock().lock();
try {
newDb.setCatalogVersion(incrementAndGetCatalogVersion());
addDb(newDb);
return newDb;
} finally {
versionLock_.writeLock().unlock();
}
}
/**
* Adds a database to the metadata cache if it does not already exist and returns
* true if a new Db object was added. Used by MetastoreEventsProcessor to handle
* CREATE_DATABASE events.
*/
public boolean addDbIfNotExists(
String dbName, org.apache.hadoop.hive.metastore.api.Database msDb) {
versionLock_.writeLock().lock();
try {
Db db = getDb(dbName);
if (db == null) {
return addDb(dbName, msDb) != null;
}
return false;
} finally {
versionLock_.writeLock().unlock();
}
}
/**
* Removes a database from the metadata cache and returns the removed database,
* or null if the database did not exist in the cache.
* Used by DROP DATABASE statements.
*/
@Override
public Db removeDb(String dbName) {
versionLock_.writeLock().lock();
try {
Db removedDb = super.removeDb(dbName);
if (removedDb != null) updateDeleteLog(removedDb);
return removedDb;
} finally {
versionLock_.writeLock().unlock();
}
}
/**
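* Removes the Db from the catalog only when the CREATION_TIME of the given
* metastore Database matches that of the cached Db. Illustrative usage (a
* sketch; the local variable names are assumptions):
* <pre>{@code
* Reference<Boolean> dbFound = new Reference<>(false);
* Reference<Boolean> dbMatched = new Reference<>(false);
* Db removed = removeDbIfExists(msDb, dbFound, dbMatched);
* if (removed == null && dbFound.getRef() && !dbMatched.getRef()) {
*   // A Db with this name is cached but was created at a different time,
*   // so it was left in place.
* }
* }</pre>
*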
* @param msDb Metastore Database used to remove the Db from the Catalog
* @param dbFound Set to true if the Database is found in the Catalog
* @param dbMatched Set to true if the Database is found in the Catalog and its
* CREATION_TIME equals that of the metastore DB
* @return the removed Db object, or null if the DB does not exist or was not
* removed because the CREATION_TIME does not match.
*/
public Db removeDbIfExists(org.apache.hadoop.hive.metastore.api.Database msDb,
Reference<Boolean> dbFound, Reference<Boolean> dbMatched) {
dbFound.setRef(false);
dbMatched.setRef(false);
versionLock_.writeLock().lock();
try {
String dbName = msDb.getName();
Db catalogDb = getDb(dbName);
if (catalogDb == null) return null;
dbFound.setRef(true);
// Remove the DB only if the CREATION_TIME matches with the metastore DB from event.
if (msDb.getCreateTime() == catalogDb.getMetaStoreDb().getCreateTime()) {
Db removedDb = removeDb(dbName);
if (removedDb != null) {
dbMatched.setRef(true);
return removedDb;
}
}
return null;
} finally {
versionLock_.writeLock().unlock();
}
}
/**
* Helper function to clean up the state associated with a removed database. It creates
* the entries in the delete log for 'db' as well as for its tables and functions
* (if any).
*/
private void updateDeleteLog(Db db) {
Preconditions.checkNotNull(db);
Preconditions.checkState(versionLock_.isWriteLockedByCurrentThread());
if (!db.isSystemDb()) {
for (Table tbl: db.getTables()) {
tbl.setCatalogVersion(incrementAndGetCatalogVersion());
deleteLog_.addRemovedObject(tbl.toMinimalTCatalogObject());
}
for (Function fn: db.getFunctions(null, new PatternMatcher())) {
fn.setCatalogVersion(incrementAndGetCatalogVersion());
deleteLog_.addRemovedObject(fn.toTCatalogObject());
}
}
db.setCatalogVersion(incrementAndGetCatalogVersion());
deleteLog_.addRemovedObject(db.toTCatalogObject());
}
/**
* Adds a table with the given db and table name to the catalog if it does not
* already exist.
* @return true if the table was successfully added, false if the table already
* exists
* @throws CatalogException if the db is not found
*/
public boolean addTableIfNotExists(String dbName, String tblName)
throws CatalogException {
versionLock_.writeLock().lock();
try {
Db db = getDb(dbName);
if (db == null) {
throw new CatalogException(String.format("Db %s does not exist", dbName));
}
Table existingTable = db.getTable(tblName);
if (existingTable != null) return false;
Table incompleteTable = IncompleteTable.createUninitializedTable(db, tblName);
incompleteTable.setCatalogVersion(incrementAndGetCatalogVersion());
db.addTable(incompleteTable);
return true;
} finally {
versionLock_.writeLock().unlock();
}
}
/**
* Adds a table with the given name to the catalog and returns the new table.
*/
public Table addIncompleteTable(String dbName, String tblName) {
versionLock_.writeLock().lock();
try {
// IMPALA-9211: get db object after holding the writeLock in case of getting stale
// db object due to concurrent INVALIDATE METADATA
Db db = getDb(dbName);
if (db == null) return null;
Table incompleteTable = IncompleteTable.createUninitializedTable(db, tblName);
incompleteTable.setCatalogVersion(incrementAndGetCatalogVersion());
db.addTable(incompleteTable);
return db.getTable(tblName);
} finally {
versionLock_.writeLock().unlock();
}
}
/**
* Adds a table 'table' to the database 'db' and returns the table that was added.
*/
public Table addTable(Db db, Table table) {
versionLock_.writeLock().lock();
try {
Preconditions.checkNotNull(db).addTable(Preconditions.checkNotNull(table));
} finally {
versionLock_.writeLock().unlock();
}
return table;
}
/**
* Gets the table with the given name, loading it if needed (if the existing catalog
* object is not yet loaded). Returns the matching Table or null if no table with this
* name exists in the catalog.
* If the existing table is dropped or modified (indicated by the catalog version
* changing) while the load is in progress, the loaded value will be discarded
* and the current cached value will be returned. This may mean that a
* not-yet-loaded table is returned.
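* <p>Illustrative usage (a sketch; the argument values are assumptions):
* <pre>{@code
* Table tbl = getOrLoadTable("functional", "alltypes", "needed by coordinator");
* if (tbl != null && !tbl.isLoaded()) {
*   // A concurrent drop/invalidate raced with the load, so the cached
*   // not-yet-loaded entry was returned instead of the loaded value.
* }
* }</pre>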
*/
public Table getOrLoadTable(String dbName, String tblName, String reason)
throws CatalogException {
TTableName tableName = new TTableName(dbName.toLowerCase(), tblName.toLowerCase());
TableLoadingMgr.LoadRequest loadReq;
long previousCatalogVersion;
// Return the table if it is already loaded or submit a new load request.
versionLock_.readLock().lock();
try {
Table tbl = getTable(dbName, tblName);
if (tbl == null || tbl.isLoaded()) return tbl;
previousCatalogVersion = tbl.getCatalogVersion();
loadReq = tableLoadingMgr_.loadAsync(tableName, reason);
} finally {
versionLock_.readLock().unlock();
}
Preconditions.checkNotNull(loadReq);
try {
// The table may have been dropped/modified while the load was in progress, so only
// apply the update if the existing table hasn't changed.
return replaceTableIfUnchanged(loadReq.get(), previousCatalogVersion);
} finally {
loadReq.close();
}
}
/**
* Replaces an existing Table with a new value if it exists and has not changed
* (has the same catalog version as 'expectedCatalogVersion').
*/
private Table replaceTableIfUnchanged(Table updatedTbl, long expectedCatalogVersion)
throws DatabaseNotFoundException {
versionLock_.writeLock().lock();
try {
Db db = getDb(updatedTbl.getDb().getName());
if (db == null) {
throw new DatabaseNotFoundException(
"Database does not exist: " + updatedTbl.getDb().getName());
}
Table existingTbl = db.getTable(updatedTbl.getName());
// The existing table does not exist or has been modified. Instead of
// adding the loaded value, return the existing table.
if (existingTbl == null ||
existingTbl.getCatalogVersion() != expectedCatalogVersion) return existingTbl;
updatedTbl.setCatalogVersion(incrementAndGetCatalogVersion());
db.addTable(updatedTbl);
return updatedTbl;
} finally {
versionLock_.writeLock().unlock();
}
}
/**
* Removes a catalog table based on the given metastore table if it exists and its
* createTime matches that of the metastore table.
*
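* <p>Illustrative usage (a sketch; the local variable names are assumptions):
* <pre>{@code
* Reference<Boolean> tblWasFound = new Reference<>(false);
* Reference<Boolean> tblMatched = new Reference<>(false);
* Table removed = removeTableIfExists(msTable, tblWasFound, tblMatched);
* if (removed == null && tblWasFound.getRef() && !tblMatched.getRef()) {
*   // A table with this name is cached but was created at a different time,
*   // so it is likely a different incarnation and was left in place.
* }
* }</pre>
*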
* @param msTable Metastore table to be used to remove Table
* @param tblWasfound is set to true if the table was found in the catalog
* @param tblMatched is set to true if the table is found and its createTime
* matches that of the cached metastore table in the catalog, or if the existing
* table is an IncompleteTable
* @return the removed Table object, or null if the table was not removed
*/
public Table removeTableIfExists(org.apache.hadoop.hive.metastore.api.Table msTable,
Reference<Boolean> tblWasfound, Reference<Boolean> tblMatched) {
tblWasfound.setRef(false);
tblMatched.setRef(false);
// make sure that the createTime of the input table is valid
Preconditions.checkState(msTable.getCreateTime() > 0);
versionLock_.writeLock().lock();
try {
Db db = getDb(msTable.getDbName());
if (db == null) return null;
Table tblToBeRemoved = db.getTable(msTable.getTableName());
if (tblToBeRemoved == null) return null;
tblWasfound.setRef(true);
// Make sure we are removing the same incarnation of the table by comparing the
// metastore createTime. If the found table is an IncompleteTable, remove it
// unconditionally.
if (tblToBeRemoved instanceof IncompleteTable
|| (msTable.getCreateTime()
== tblToBeRemoved.getMetaStoreTable().getCreateTime())) {
tblMatched.setRef(true);
Table removedTbl = db.removeTable(tblToBeRemoved.getName());
removedTbl.setCatalogVersion(incrementAndGetCatalogVersion());
deleteLog_.addRemovedObject(removedTbl.toMinimalTCatalogObject());
return removedTbl;
}
return null;
} finally {
versionLock_.writeLock().unlock();
}
}
/**
* Removes a table from the catalog and increments the catalog version.
* Returns the removed Table, or null if the table or db does not exist.
*/
public Table removeTable(String dbName, String tblName) {
Db parentDb = getDb(dbName);
if (parentDb == null) return null;
versionLock_.writeLock().lock();
try {
Table removedTable = parentDb.removeTable(tblName);
if (removedTable != null) {
removedTable.setCatalogVersion(incrementAndGetCatalogVersion());
deleteLog_.addRemovedObject(removedTable.toMinimalTCatalogObject());
}
return removedTable;
} finally {
versionLock_.writeLock().unlock();
}
}
/**
* Removes a function from the catalog. Increments the catalog version and returns
* the Function object that was removed. If the function did not exist, null will
* be returned.
*/
@Override
public Function removeFunction(Function desc) {
versionLock_.writeLock().lock();
try {
Function removedFn = super.removeFunction(desc);
if (removedFn != null) {
removedFn.setCatalogVersion(incrementAndGetCatalogVersion());
deleteLog_.addRemovedObject(removedFn.toTCatalogObject());
}
return removedFn;
} finally {
versionLock_.writeLock().unlock();
}
}
/**
* Adds a function to the catalog, incrementing the catalog version. Returns true if
* the add was successful, false otherwise.
*/
@Override
public boolean addFunction(Function fn) {
Db db = getDb(fn.getFunctionName().getDb());
if (db == null) return false;
versionLock_.writeLock().lock();
try {
if (db.addFunction(fn)) {
fn.setCatalogVersion(incrementAndGetCatalogVersion());
return true;
}
} finally {
versionLock_.writeLock().unlock();
}
return false;
}
/**
* Adds a data source to the catalog, incrementing the catalog version. Returns true
* if the add was successful, false otherwise.
*/
@Override
public boolean addDataSource(DataSource dataSource) {
versionLock_.writeLock().lock();
try {
if (dataSources_.add(dataSource)) {
dataSource.setCatalogVersion(incrementAndGetCatalogVersion());
return true;
}
} finally {
versionLock_.writeLock().unlock();
}
return false;
}
@Override
public DataSource removeDataSource(String dataSourceName) {
versionLock_.writeLock().lock();
try {
DataSource dataSource = dataSources_.remove(dataSourceName);
if (dataSource != null) {
dataSource.setCatalogVersion(incrementAndGetCatalogVersion());
deleteLog_.addRemovedObject(dataSource.toTCatalogObject());
}
return dataSource;
} finally {
versionLock_.writeLock().unlock();
}
}
/**
* Renames a table. Equivalent to an atomic drop + add of the table. Returns
* a pair of tables containing the removed table (or null if the table drop was not
* successful) and the new table (or null if either the drop of the old one or the
* add of the new table was not successful). Depending on the return value, the catalog
* cache is in one of the following states:
* 1. null, null: Old table was not removed and new table was not added.
* 2. null, T_new: Invalid configuration
* 3. T_old, null: Old table was removed but new table was not added.
* 4. T_old, T_new: Old table was removed and new table was added.
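* <p>Illustrative handling of the outcomes (a sketch; the variable names are
* assumptions):
* <pre>{@code
* Pair<Table, Table> result = renameTable(oldTableName, newTableName);
* if (result == null || result.first == null) {
*   // The old table was not removed; nothing changed.
* } else if (result.second == null) {
*   // The old table was removed but the new table was not added.
* }
* }</pre>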
*/
public Pair<Table, Table> renameTable(
TTableName oldTableName, TTableName newTableName) {
// Remove the old table name from the cache and add the new table.
Db db = getDb(oldTableName.getDb_name());
if (db == null) return null;
versionLock_.writeLock().lock();
try {
Table oldTable =
removeTable(oldTableName.getDb_name(), oldTableName.getTable_name());
if (oldTable == null) return Pair.create(null, null);
return Pair.create(oldTable,
addIncompleteTable(newTableName.getDb_name(), newTableName.getTable_name()));
} finally {
versionLock_.writeLock().unlock();
}
}
/**
* Renames the table by atomically removing oldTable and adding the newTable.
* @return true if oldTable was removed and newTable was added; false if oldTable,
* oldDb, or newDb is not in the catalog.
*/
public boolean renameTableIfExists(TTableName oldTableName,
TTableName newTableName) {
boolean tableRenamed = false;
versionLock_.writeLock().lock();
try {
Db oldDb = getDb(oldTableName.db_name);
Db newDb = getDb(newTableName.db_name);
if (oldDb != null && newDb != null) {
Table existingTable = removeTable(oldTableName.db_name, oldTableName.table_name);
// Add the newTable only if oldTable existed.
if (existingTable != null) {
Table incompleteTable = IncompleteTable.createUninitializedTable(newDb,
newTableName.getTable_name());
incompleteTable.setCatalogVersion(incrementAndGetCatalogVersion());
newDb.addTable(incompleteTable);
tableRenamed = true;
}
}
return tableRenamed;
} finally {
versionLock_.writeLock().unlock();
}
}
/**
* Reloads metadata for table 'tbl' which must not be an IncompleteTable. Updates the
* table metadata in-place by calling load() on the given table. Returns the
* TCatalogObject representing 'tbl'. Applies proper synchronization to protect the
* metadata load from concurrent table modifications and assigns a new catalog version.
* Throws a CatalogException if there is an error loading table metadata.
*/
public TCatalogObject reloadTable(Table tbl, String reason) throws CatalogException {
LOG.info(String.format("Refreshing table metadata: %s", tbl.getFullName()));
Preconditions.checkState(!(tbl instanceof IncompleteTable));
String dbName = tbl.getDb().getName();
String tblName = tbl.getName();
if (!tryLockTable(tbl)) {
throw new CatalogException(String.format("Error refreshing metadata for table " +
"%s due to lock contention", tbl.getFullName()));
}
final Timer.Context context =
tbl.getMetrics().getTimer(Table.REFRESH_DURATION_METRIC).time();
try {
long newCatalogVersion = incrementAndGetCatalogVersion();
versionLock_.writeLock().unlock();
try (MetaStoreClient msClient = getMetaStoreClient()) {
org.apache.hadoop.hive.metastore.api.Table msTbl = null;
Stopwatch hmsLoadSW = new Stopwatch().start();
long hmsLoadTime;
try {
msTbl = msClient.getHiveClient().getTable(dbName, tblName);
} catch (Exception e) {
throw new TableLoadingException("Error loading metadata for table: " +
dbName + "." + tblName, e);
} finally {
hmsLoadTime = hmsLoadSW.elapsed(TimeUnit.NANOSECONDS);
}
tbl.updateHMSLoadTableSchemaTime(hmsLoadTime);
tbl.load(true, msClient.getHiveClient(), msTbl, reason);
}
tbl.setCatalogVersion(newCatalogVersion);
LOG.info(String.format("Refreshed table metadata: %s", tbl.getFullName()));
return tbl.toTCatalogObject();
} finally {
context.stop();
Preconditions.checkState(!versionLock_.isWriteLockedByCurrentThread());
tbl.getLock().unlock();
}
}
/**
* Drops the partitions specified in 'partitionSet' from 'tbl'. Throws a
* CatalogException if 'tbl' is not an HdfsTable. Returns the target table.
*/
public Table dropPartitions(Table tbl, List<List<TPartitionKeyValue>> partitionSet)
throws CatalogException {
Preconditions.checkNotNull(tbl);
Preconditions.checkNotNull(partitionSet);
Preconditions.checkArgument(tbl.getLock().isHeldByCurrentThread());
if (!(tbl instanceof HdfsTable)) {
throw new CatalogException("Table " + tbl.getFullName() + " is not an Hdfs table");
}
HdfsTable hdfsTable = (HdfsTable) tbl;
List<HdfsPartition> partitions =
hdfsTable.getPartitionsFromPartitionSet(partitionSet);
hdfsTable.dropPartitions(partitions);
return hdfsTable;
}
/**
* Adds a partition to its HdfsTable and returns the modified table.
*/
public Table addPartition(HdfsPartition partition) throws CatalogException {
Preconditions.checkNotNull(partition);
HdfsTable hdfsTable = partition.getTable();
hdfsTable.addPartition(partition);
return hdfsTable;
}
/**
* Invalidates the table in the catalog cache, potentially adding/removing the table
* from the cache based on whether it exists in the Hive Metastore.
* The invalidation logic is:
* - If the table exists in the Metastore, add it to the catalog as an uninitialized
* IncompleteTable (replacing any existing entry). The table metadata will be
* loaded lazily, on the next access. If the parent database for this table does not
* yet exist in Impala's cache it will also be added.
* - If the table does not exist in the Metastore, remove it from the catalog cache.
* - If we are unable to determine whether the table exists in the Metastore (there was
* an exception thrown making the RPC), invalidate any existing Table by replacing
* it with an uninitialized IncompleteTable.
* Returns the thrift representation of the added/updated/removed table, or null if
* the table was not present in the catalog cache or the Metastore.
* Sets tblWasRemoved to true if the table was absent from the Metastore and it was
* removed from the catalog cache.
* Sets dbWasAdded to true if both a new database and table were added to the catalog
* cache.
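* <p>Illustrative usage (a sketch; the local variable names and the table name
* are assumptions):
* <pre>{@code
* Reference<Boolean> tblWasRemoved = new Reference<>(false);
* Reference<Boolean> dbWasAdded = new Reference<>(false);
* TCatalogObject obj = invalidateTable(
*     new TTableName("db", "tbl"), tblWasRemoved, dbWasAdded);
* if (obj == null) {
*   // The table is neither in the catalog cache nor (verifiably) in the
*   // Metastore.
* }
* }</pre>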
*/
public TCatalogObject invalidateTable(TTableName tableName,
Reference<Boolean> tblWasRemoved, Reference<Boolean> dbWasAdded) {
tblWasRemoved.setRef(false);
dbWasAdded.setRef(false);
String dbName = tableName.getDb_name();
String tblName = tableName.getTable_name();
if (isBlacklistedTable(dbName, tblName)) {
LOG.info("Skip invalidating blacklisted table: " + tableName);
return null;
}
LOG.info(String.format("Invalidating table metadata: %s.%s", dbName, tblName));
// Stores whether the table exists in the metastore. Can have three states:
// 1) true - Table exists in metastore.
// 2) false - Table does not exist in metastore.
// 3) unknown (null) - An exception was thrown by the metastore client.
Boolean tableExistsInMetaStore;
Db db = null;
try (MetaStoreClient msClient = getMetaStoreClient()) {
org.apache.hadoop.hive.metastore.api.Database msDb = null;
try {
tableExistsInMetaStore = msClient.getHiveClient().tableExists(dbName, tblName);
} catch (UnknownDBException e) {
// The parent database does not exist in the metastore. Treat this the same
// as if the table does not exist.
tableExistsInMetaStore = false;
} catch (TException e) {
LOG.error("Error executing tableExists() metastore call: " + tblName, e);
tableExistsInMetaStore = null;
}
if (tableExistsInMetaStore != null && !tableExistsInMetaStore) {
Table result = removeTable(dbName, tblName);
if (result == null) return null;
tblWasRemoved.setRef(true);
result.getLock().lock();
try {
return result.toTCatalogObject();
} finally {
result.getLock().unlock();
}
}
db = getDb(dbName);
if ((db == null || !db.containsTable(tblName)) && tableExistsInMetaStore == null) {
// The table does not exist in our cache AND it is unknown whether the
// table exists in the Metastore. Do nothing.
return null;
} else if (db == null && tableExistsInMetaStore) {
// The table exists in the Metastore, but our cache does not contain the parent
// database. A new db will be added to the cache along with the new table. msDb
// must be valid since tableExistsInMetaStore is true.
try {
msDb = msClient.getHiveClient().getDatabase(dbName);
Preconditions.checkNotNull(msDb);
addDb(dbName, msDb);
dbWasAdded.setRef(true);
} catch (TException e) {
// The Metastore database could not be fetched. Log the error and return.
LOG.error("Error executing getDatabase() metastore call: " + dbName, e);
return null;
}
}
}
// Add a new uninitialized table to the table cache, effectively invalidating
// any existing entry. The metadata for the table will be loaded lazily, on
// the next access to the table.
Table newTable = addIncompleteTable(dbName, tblName);
Preconditions.checkNotNull(newTable);
if (loadInBackground_) {
tableLoadingMgr_.backgroundLoad(new TTableName(dbName.toLowerCase(),
tblName.toLowerCase()));
}
if (dbWasAdded.getRef()) {
// The database should always have a lower catalog version than the table because
// it needs to be created before the table can be added.
Db addedDb = newTable.getDb();
Preconditions.checkNotNull(addedDb);
Preconditions.checkState(
addedDb.getCatalogVersion() < newTable.getCatalogVersion());
}
return newTable.toTCatalogObject();
}
/**
* Invalidates the table if it exists, by overwriting the existing entry with an
* IncompleteTable.
* @return the invalidated table, or null if the table does not exist
*/
public Table invalidateTableIfExists(String dbName, String tblName) {
Table incompleteTable;
versionLock_.writeLock().lock();
try {
Db db = getDb(dbName);
if (db == null) return null;
if (!db.containsTable(tblName)) return null;
incompleteTable = IncompleteTable.createUninitializedTable(db, tblName);
incompleteTable.setCatalogVersion(incrementAndGetCatalogVersion());
db.addTable(incompleteTable);
} finally {
versionLock_.writeLock().unlock();
}
if (loadInBackground_) {
tableLoadingMgr_.backgroundLoad(
new TTableName(dbName.toLowerCase(), tblName.toLowerCase()));
}
return incompleteTable;
}
/**
* Refreshes the partition if it exists.
*
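* <p>Illustrative usage (a sketch; the partition spec values, and the assumption
* that TPartitionKeyValue exposes a (name, value) constructor, are illustrative):
* <pre>{@code
* List<TPartitionKeyValue> partSpec = new ArrayList<>();
* partSpec.add(new TPartitionKeyValue("year", "2020"));
* boolean reloaded = reloadPartitionIfExists(
*     "db", "tbl", partSpec, "ALTER_PARTITION event");
* }</pre>
*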
* @return true if partition was reloaded, else false.
* @throws CatalogException if partition reload threw an error.
* @throws DatabaseNotFoundException if Db doesn't exist.
* @throws TableNotFoundException if table doesn't exist.
* @throws TableNotLoadedException if table is not loaded in Catalog.
*/
public boolean reloadPartitionIfExists(String dbName, String tblName,
List<TPartitionKeyValue> tPartSpec, String reason) throws CatalogException {
Table table = getTable(dbName, tblName);
if (table == null) {
throw new TableNotFoundException(dbName + "." + tblName + " not found");
}
if (table instanceof IncompleteTable) {
throw new TableNotLoadedException(dbName + "." + tblName + " is not loaded");
}
Reference<Boolean> wasPartitionRefreshed = new Reference<>(false);
reloadPartition(table, tPartSpec, wasPartitionRefreshed, reason);
return wasPartitionRefreshed.getRef();
}
/**
* Refreshes the table if it exists. Returns true if reloadTable() succeeds, false
* otherwise. Throws CatalogException if reloadTable() is unsuccessful. Throws
* DatabaseNotFoundException if Db doesn't exist.
*/
public boolean reloadTableIfExists(String dbName, String tblName, String reason)
throws CatalogException {
Table table = getTable(dbName, tblName);
if (table == null || table instanceof IncompleteTable) return false;
reloadTable(table, reason);
return true;
}
/**
* Updates the DB if it exists in the catalog. Returns true if updateDb() succeeds, false
* otherwise.
*/
public boolean updateDbIfExists(Database msdb) {
try {
updateDb(msdb);
} catch (DatabaseNotFoundException e) {
return false;
}
return true;
}
/**
* Adds a new role with the given name and grant groups to the AuthorizationPolicy.
* If a role with the same name already exists it will be overwritten.
*/
public Role addRole(String roleName, Set<String> grantGroups) {
Principal role = addPrincipal(roleName, grantGroups, TPrincipalType.ROLE);
Preconditions.checkState(role instanceof Role);
return (Role) role;
}
/**
* Adds a new user with the given name to the AuthorizationPolicy.
* If a user with the same name already exists it will be overwritten.
*/
public User addUser(String userName) {
Principal user = addPrincipal(userName, new HashSet<>(),
TPrincipalType.USER);
Preconditions.checkState(user instanceof User);
return (User) user;
}
/**
* Adds a user to the catalog if it doesn't exist. This is necessary so privileges
* can be added for a user, for example owner privileges.
*/
public User addUserIfNotExists(String owner, Reference<Boolean> existingUser) {
versionLock_.writeLock().lock();
try {
User user = getAuthPolicy().getUser(owner);
existingUser.setRef(Boolean.TRUE);
if (user == null) {
user = addUser(owner);
existingUser.setRef(Boolean.FALSE);
}
return user;
} finally {
versionLock_.writeLock().unlock();
}
}
private Principal addPrincipal(String principalName, Set<String> grantGroups,
TPrincipalType type) {
versionLock_.writeLock().lock();
try {
Principal principal = Principal.newInstance(principalName, type, grantGroups);
principal.setCatalogVersion(incrementAndGetCatalogVersion());
authPolicy_.addPrincipal(principal);
return principal;
} finally {
versionLock_.writeLock().unlock();
}
}
/**
* Removes the role with the given name from the AuthorizationPolicy. Returns the
* removed role with an incremented catalog version, or null if no role with this name
* exists.
*/
public Role removeRole(String roleName) {
Principal role = removePrincipal(roleName, TPrincipalType.ROLE);
if (role == null) return null;
Preconditions.checkState(role instanceof Role);
return (Role) role;
}
/**
* Removes the user with the given name from the AuthorizationPolicy. Returns the
* removed user with an incremented catalog version, or null if no user with this name
* exists.
*/
public User removeUser(String userName) {
Principal user = removePrincipal(userName, TPrincipalType.USER);
if (user == null) return null;
Preconditions.checkState(user instanceof User);
return (User) user;
}
private Principal removePrincipal(String principalName, TPrincipalType type) {
versionLock_.writeLock().lock();
try {
Principal principal = authPolicy_.removePrincipal(principalName, type);
// TODO(todd): does this end up leaking the privileges associated
// with this principal into the CatalogObjectVersionSet on the catalogd?
if (principal == null) return null;
for (PrincipalPrivilege priv: principal.getPrivileges()) {
priv.setCatalogVersion(incrementAndGetCatalogVersion());
deleteLog_.addRemovedObject(priv.toTCatalogObject());
}
principal.setCatalogVersion(incrementAndGetCatalogVersion());
deleteLog_.addRemovedObject(principal.toTCatalogObject());
return principal;
} finally {
versionLock_.writeLock().unlock();
}
}
/**
* Adds a grant group to the given role name and returns the modified Role with
* an updated catalog version. If the role does not exist a CatalogException is thrown.
*/
public Role addRoleGrantGroup(String roleName, String groupName)
throws CatalogException {
versionLock_.writeLock().lock();
try {
Role role = authPolicy_.addRoleGrantGroup(roleName, groupName);
Preconditions.checkNotNull(role);
role.setCatalogVersion(incrementAndGetCatalogVersion());
return role;
} finally {
versionLock_.writeLock().unlock();
}
}
/**
* Removes a grant group from the given role name and returns the modified Role with
* an updated catalog version. If the role does not exist a CatalogException is thrown.
*/
public Role removeRoleGrantGroup(String roleName, String groupName)
throws CatalogException {
versionLock_.writeLock().lock();
try {
Role role = authPolicy_.removeRoleGrantGroup(roleName, groupName);
Preconditions.checkNotNull(role);
role.setCatalogVersion(incrementAndGetCatalogVersion());
return role;
} finally {
versionLock_.writeLock().unlock();
}
}
/**
* Adds a privilege to the given role name. Returns the new PrincipalPrivilege and
* increments the catalog version. If the parent role does not exist a CatalogException
* is thrown.
*/
public PrincipalPrivilege addRolePrivilege(String roleName, TPrivilege thriftPriv)
throws CatalogException {
Preconditions.checkArgument(thriftPriv.getPrincipal_type() == TPrincipalType.ROLE);
return addPrincipalPrivilege(roleName, thriftPriv, TPrincipalType.ROLE);
}
/**
* Adds a privilege to the given user name. Returns the new PrincipalPrivilege and
* increments the catalog version. If the user does not exist a CatalogException is
* thrown.
*/
public PrincipalPrivilege addUserPrivilege(String userName, TPrivilege thriftPriv)
throws CatalogException {
Preconditions.checkArgument(thriftPriv.getPrincipal_type() == TPrincipalType.USER);
return addPrincipalPrivilege(userName, thriftPriv, TPrincipalType.USER);
}
private PrincipalPrivilege addPrincipalPrivilege(String principalName,
TPrivilege thriftPriv, TPrincipalType type) throws CatalogException {
versionLock_.writeLock().lock();
try {
Principal principal = authPolicy_.getPrincipal(principalName, type);
if (principal == null) {
throw new CatalogException(String.format("%s does not exist: %s",
Principal.toString(type), principalName));
}
PrincipalPrivilege priv = PrincipalPrivilege.fromThrift(thriftPriv);
priv.setCatalogVersion(incrementAndGetCatalogVersion());
authPolicy_.addPrivilege(priv);
return priv;
} finally {
versionLock_.writeLock().unlock();
}
}
/**
* Removes a PrincipalPrivilege from the given role name and privilege name. Returns
* the removed PrincipalPrivilege with an incremented catalog version or null if no
* matching privilege was found. Throws a CatalogException if no role exists with this
* name.
*/
public PrincipalPrivilege removeRolePrivilege(String roleName, String privilegeName)
throws CatalogException {
return removePrincipalPrivilege(roleName, privilegeName, TPrincipalType.ROLE);
}
/**
* Removes a PrincipalPrivilege from the given user name and privilege name. Returns
* the removed PrincipalPrivilege with an incremented catalog version or null if no
* matching privilege was found. Throws a CatalogException if no user exists with this
* name.
*/
public PrincipalPrivilege removeUserPrivilege(String userName, String privilegeName)
throws CatalogException {
return removePrincipalPrivilege(userName, privilegeName, TPrincipalType.USER);
}
private PrincipalPrivilege removePrincipalPrivilege(String principalName,
String privilegeName, TPrincipalType type) throws CatalogException {
versionLock_.writeLock().lock();
try {
Principal principal = authPolicy_.getPrincipal(principalName, type);
if (principal == null) {
throw new CatalogException(String.format("%s does not exist: %s",
Principal.toString(type), principalName));
}
PrincipalPrivilege principalPrivilege = principal.removePrivilege(privilegeName);
if (principalPrivilege == null) return null;
principalPrivilege.setCatalogVersion(incrementAndGetCatalogVersion());
deleteLog_.addRemovedObject(principalPrivilege.toTCatalogObject());
return principalPrivilege;
} finally {
versionLock_.writeLock().unlock();
}
}
/**
* Gets a PrincipalPrivilege from the given principal name. Returns the privilege
* if it exists, or null if no privilege matching the privilege spec exists.
* Throws a CatalogException if the principal does not exist.
*/
public PrincipalPrivilege getPrincipalPrivilege(String principalName,
TPrivilege privSpec) throws CatalogException {
String privilegeName = PrincipalPrivilege.buildPrivilegeName(privSpec);
versionLock_.readLock().lock();
try {
Principal principal = authPolicy_.getPrincipal(principalName,
privSpec.getPrincipal_type());
if (principal == null) {
throw new CatalogException(Principal.toString(privSpec.getPrincipal_type()) +
" does not exist: " + principalName);
}
return principal.getPrivilege(privilegeName);
} finally {
versionLock_.readLock().unlock();
}
}
@Override
public AuthzCacheInvalidation getAuthzCacheInvalidation(String markerName) {
versionLock_.readLock().lock();
try {
return authzCacheInvalidation_.get(markerName);
} finally {
versionLock_.readLock().unlock();
}
}
/**
* Gets the {@link AuthzCacheInvalidation} for a given marker name or creates a new
* {@link AuthzCacheInvalidation} if it does not exist, and increments the catalog
* version of the {@link AuthzCacheInvalidation}. A catalog version update indicates
* an authorization cache invalidation notification.
*
* @param markerName the authorization cache invalidation marker name
* @return the updated {@link AuthzCacheInvalidation} instance
*/
public AuthzCacheInvalidation incrementAuthzCacheInvalidationVersion(
String markerName) {
versionLock_.writeLock().lock();
try {
AuthzCacheInvalidation authzCacheInvalidation = getAuthzCacheInvalidation(
markerName);
if (authzCacheInvalidation == null) {
authzCacheInvalidation = new AuthzCacheInvalidation(markerName);
authzCacheInvalidation_.add(authzCacheInvalidation);
}
authzCacheInvalidation.setCatalogVersion(incrementAndGetCatalogVersion());
return authzCacheInvalidation;
} finally {
versionLock_.writeLock().unlock();
}
}
/**
* Increments the current Catalog version and returns the new value.
*/
public long incrementAndGetCatalogVersion() {
versionLock_.writeLock().lock();
try {
return ++catalogVersion_;
} finally {
versionLock_.writeLock().unlock();
}
}
/**
* Returns the current Catalog version.
*/
public long getCatalogVersion() {
versionLock_.readLock().lock();
try {
return catalogVersion_;
} finally {
versionLock_.readLock().unlock();
}
}
public ReentrantReadWriteLock getLock() { return versionLock_; }
public AuthorizationPolicy getAuthPolicy() { return authPolicy_; }
/**
* Reloads metadata for the partition defined by the partition spec
* 'partitionSpec' in table 'tbl'. Returns the resulting table's TCatalogObject after
* the partition metadata was reloaded.
*/
public TCatalogObject reloadPartition(Table tbl,
List<TPartitionKeyValue> partitionSpec,
Reference<Boolean> wasPartitionReloaded, String reason) throws CatalogException {
if (!tryLockTable(tbl)) {
throw new CatalogException(String.format("Error reloading partition of table %s " +
"due to lock contention", tbl.getFullName()));
}
try {
long newCatalogVersion = incrementAndGetCatalogVersion();
versionLock_.writeLock().unlock();
HdfsTable hdfsTable = (HdfsTable) tbl;
wasPartitionReloaded.setRef(false);
HdfsPartition hdfsPartition = hdfsTable
.getPartitionFromThriftPartitionSpec(partitionSpec);
// Retrieve partition name from existing partition or construct it from
// the partition spec
String partitionName = hdfsPartition == null
? HdfsTable.constructPartitionName(partitionSpec)
: hdfsPartition.getPartitionName();
LOG.info(String.format("Refreshing partition metadata: %s %s (%s)",
hdfsTable.getFullName(), partitionName, reason));
try (MetaStoreClient msClient = getMetaStoreClient()) {
org.apache.hadoop.hive.metastore.api.Partition hmsPartition = null;
try {
hmsPartition = msClient.getHiveClient().getPartition(
hdfsTable.getDb().getName(), hdfsTable.getName(), partitionName);
} catch (NoSuchObjectException e) {
// If partition does not exist in Hive Metastore, remove it from the
// catalog
if (hdfsPartition != null) {
hdfsTable.dropPartition(partitionSpec);
hdfsTable.setCatalogVersion(newCatalogVersion);
// non-existing partition was dropped from catalog, so we mark it as refreshed
wasPartitionReloaded.setRef(true);
} else {
LOG.info(String.format("Partition metadata for %s was not refreshed since "
+ "it does not exist in metastore anymore",
hdfsTable.getFullName() + " " + partitionName));
}
return hdfsTable.toTCatalogObject();
} catch (Exception e) {
throw new CatalogException("Error loading metadata for partition: "
+ hdfsTable.getFullName() + " " + partitionName, e);
}
hdfsTable.reloadPartition(msClient.getHiveClient(), hdfsPartition, hmsPartition);
}
hdfsTable.setCatalogVersion(newCatalogVersion);
wasPartitionReloaded.setRef(true);
LOG.info(String.format("Refreshed partition metadata: %s %s",
hdfsTable.getFullName(), partitionName));
return hdfsTable.toTCatalogObject();
} finally {
Preconditions.checkState(!versionLock_.isWriteLockedByCurrentThread());
tbl.getLock().unlock();
}
}
public CatalogDeltaLog getDeleteLog() { return deleteLog_; }
/**
* Returns the version of the topic update that an operation using SYNC_DDL must wait
* for in order to ensure that its result set ('result') has been broadcast to all the
* coordinators. For operations that don't produce a result set, e.g. INVALIDATE
* METADATA, returns the version specified in 'result.version'.
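* <p>Illustrative usage (a sketch; the caller-side names are assumptions):
* <pre>{@code
* // 'result' was populated by a DDL executed with SYNC_DDL enabled.
* long topicVersion = waitForSyncDdlVersion(result);
* // Hold the DDL response until the statestore has broadcast the topic
* // update with version 'topicVersion' to all coordinators.
* }</pre>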
*/
public long waitForSyncDdlVersion(TCatalogUpdateResult result) throws CatalogException {
if (!result.isSetUpdated_catalog_objects() &&
!result.isSetRemoved_catalog_objects()) {
return result.getVersion();
}
long lastSentTopicUpdate = lastSentTopicUpdate_.get();
// Maximum number of attempts (topic updates) to find the catalog topic version that
// an operation using SYNC_DDL must wait for.
long maxNumAttempts = 5;
if (result.isSetUpdated_catalog_objects()) {
maxNumAttempts = Math.max(maxNumAttempts,
result.getUpdated_catalog_objects().size() *
(MAX_NUM_SKIPPED_TOPIC_UPDATES + 1));
}
long numAttempts = 0;
long begin = System.currentTimeMillis();
long versionToWaitFor = -1;
while (versionToWaitFor == -1) {
if (LOG.isTraceEnabled()) {
LOG.trace("waitForSyncDdlVersion() attempt: " + numAttempts);
}
// Examine the topic update log to determine the latest topic update that
// covers the added/modified/deleted objects in 'result'.
long topicVersionForUpdates =
getCoveringTopicUpdateVersion(result.getUpdated_catalog_objects());
long topicVersionForDeletes =
getCoveringTopicUpdateVersion(result.getRemoved_catalog_objects());
if (topicVersionForUpdates == -1 || topicVersionForDeletes == -1) {
LOG.info("Topic version for {} not found yet. Last sent topic version: {}. " +
"Updated objects: {}, deleted objects: {}",
topicVersionForUpdates == -1 ? "updates" : "deletes",
lastSentTopicUpdate_.get(),
FeCatalogUtils.debugString(result.updated_catalog_objects),
FeCatalogUtils.debugString(result.removed_catalog_objects));
// Wait for the next topic update.
synchronized(topicUpdateLog_) {
try {
topicUpdateLog_.wait(TOPIC_UPDATE_WAIT_TIMEOUT_MS);
} catch (InterruptedException e) {
// Ignore
}
}
long currentTopicUpdate = lastSentTopicUpdate_.get();
// Don't count time-based exits from the wait() toward the maxNumAttempts
// threshold.
if (lastSentTopicUpdate != currentTopicUpdate) {
++numAttempts;
if (numAttempts > maxNumAttempts) {
LOG.error(String.format("Couldn't retrieve the covering topic version for "
+ "catalog objects. Updated objects: %s, deleted objects: %s",
FeCatalogUtils.debugString(result.updated_catalog_objects),
FeCatalogUtils.debugString(result.removed_catalog_objects)));
throw new CatalogException("Couldn't retrieve the catalog topic version " +
"for the SYNC_DDL operation after " + maxNumAttempts + " attempts." +
"The operation has been successfully executed but its effects may have " +
"not been broadcast to all the coordinators.");
}
lastSentTopicUpdate = currentTopicUpdate;
}
} else {
versionToWaitFor = Math.max(topicVersionForDeletes, topicVersionForUpdates);
}
}
Preconditions.checkState(versionToWaitFor >= 0);
LOG.info("Operation using SYNC_DDL is waiting for catalog topic version: " +
versionToWaitFor + ". Time to identify topic version (msec): " +
(System.currentTimeMillis() - begin));
return versionToWaitFor;
}
/**
* Returns the version of the topic update that covers a set of TCatalogObjects.
* A topic update U covers a TCatalogObject T, corresponding to a catalog object O,
* if last_sent_version(O) >= catalog_version(T) && catalog_version(U) >=
* last_topic_update(O). The first condition indicates that a version of O that is
* larger or equal to the version in T has been added to a topic update. The second
* condition indicates that U is either the update to include O or an update following
* the one to include O. Returns -1 if there is a catalog object in 'tCatalogObjects'
* which doesn't satisfy the above conditions.
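* <p>Worked example (illustrative numbers): suppose O was last sent with version
* 17 as part of topic update 42. A T with catalog_version(T) = 15 is covered, and
* O contributes 42 (the update that sent it) to the version to wait for. A T with
* catalog_version(T) = 20 has not been sent yet, so the method returns -1.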
*/
private long getCoveringTopicUpdateVersion(List<TCatalogObject> tCatalogObjects) {
if (tCatalogObjects == null || tCatalogObjects.isEmpty()) {
return lastSentTopicUpdate_.get();
}
long versionToWaitFor = -1;
for (TCatalogObject tCatalogObject: tCatalogObjects) {
String key = Catalog.toCatalogObjectKey(tCatalogObject);
TopicUpdateLog.Entry topicUpdateEntry = topicUpdateLog_.get(key);
// There are two reasons for which a topic update log entry cannot be found:
// a) It corresponds to a new catalog object that hasn't been processed by a catalog
// update yet.
// b) It corresponds to a catalog object that hasn't been modified for at least
// TOPIC_UPDATE_LOG_GC_FREQUENCY updates and hence its entry was garbage
// collected.
// In both cases, -1 is returned to indicate that we're waiting for the
// entry to show up in the topic update log.
if (topicUpdateEntry == null) return -1;
if (topicUpdateEntry.getLastSentVersion() < tCatalogObject.getCatalog_version()) {
// TODO: This may be too verbose. Remove this after IMPALA-9135 is fixed.
LOG.info("Should wait for next update for {}: older version {} is sent. " +
"Expects a version >= {}.", key,
topicUpdateEntry.getLastSentVersion(), tCatalogObject.getCatalog_version());
return -1;
}
versionToWaitFor =
Math.max(versionToWaitFor, topicUpdateEntry.getLastSentCatalogUpdate());
}
return versionToWaitFor;
}
/**
* Retrieves information about the current catalog usage including:
* 1. the most frequently accessed tables.
* 2. the tables with the highest memory requirements.
* 3. the tables with the highest file counts.
* 4. the tables with the longest table loading time.
*/
public TGetCatalogUsageResponse getCatalogUsage() {
TGetCatalogUsageResponse usage = new TGetCatalogUsageResponse();
usage.setLarge_tables(new ArrayList<>());
usage.setFrequently_accessed_tables(new ArrayList<>());
usage.setHigh_file_count_tables(new ArrayList<>());
usage.setLong_metadata_loading_tables(new ArrayList<>());
for (Table largeTable: CatalogUsageMonitor.INSTANCE.getLargestTables()) {
TTableUsageMetrics tableUsageMetrics =
new TTableUsageMetrics(largeTable.getTableName().toThrift());
tableUsageMetrics.setMemory_estimate_bytes(largeTable.getEstimatedMetadataSize());
usage.addToLarge_tables(tableUsageMetrics);
}
for (Table frequentTable:
CatalogUsageMonitor.INSTANCE.getFrequentlyAccessedTables()) {
TTableUsageMetrics tableUsageMetrics =
new TTableUsageMetrics(frequentTable.getTableName().toThrift());
tableUsageMetrics.setNum_metadata_operations(frequentTable.getMetadataOpsCount());
usage.addToFrequently_accessed_tables(tableUsageMetrics);
}
for (Table mostFilesTable:
CatalogUsageMonitor.INSTANCE.getHighFileCountTables()) {
TTableUsageMetrics tableUsageMetrics =
new TTableUsageMetrics(mostFilesTable.getTableName().toThrift());
tableUsageMetrics.setNum_files(mostFilesTable.getNumFiles());
usage.addToHigh_file_count_tables(tableUsageMetrics);
}
for (Table longestLoadingTable:
CatalogUsageMonitor.INSTANCE.getLongMetadataLoadingTables()) {
TTableUsageMetrics tableUsageMetrics =
new TTableUsageMetrics(longestLoadingTable.getTableName().toThrift());
tableUsageMetrics.setMedian_table_loading_ns(
longestLoadingTable.getMedianTableLoadingTime());
tableUsageMetrics.setMax_table_loading_ns(
longestLoadingTable.getMaxTableLoadingTime());
tableUsageMetrics.setP75_loading_time_ns(
longestLoadingTable.get75TableLoadingTime());
tableUsageMetrics.setP95_loading_time_ns(
longestLoadingTable.get95TableLoadingTime());
tableUsageMetrics.setP99_loading_time_ns(
longestLoadingTable.get99TableLoadingTime());
tableUsageMetrics.setNum_table_loading(
longestLoadingTable.getTableLoadingCounts());
usage.addToLong_metadata_loading_tables(tableUsageMetrics);
}
return usage;
}
/**
* Gets the events processor metrics. Used for publishing metrics on the webUI.
*/
public TEventProcessorMetrics getEventProcessorMetrics() {
return metastoreEventProcessor_.getEventProcessorMetrics();
}
/**
* Gets the events processor summary. Used for populating the contents of the events
* processor detailed view page.
*/
public TEventProcessorMetricsSummaryResponse getEventProcessorSummary() {
return metastoreEventProcessor_.getEventProcessorSummary();
}
/**
* Retrieves the stored metrics of the specified table and returns a pretty-printed
* string representation. Throws an exception if table metrics were not available
* because the table was not loaded or because another concurrent operation was holding
* the table lock.
*/
public String getTableMetrics(TTableName tTableName) throws CatalogException {
String dbName = tTableName.db_name;
String tblName = tTableName.table_name;
Table tbl = getTable(dbName, tblName);
if (tbl == null) {
throw new CatalogException("Table " + dbName + "." + tblName + " was not found.");
}
String result;
if (tbl instanceof IncompleteTable) {
result = "No metrics available for table " + dbName + "." + tblName +
". Table not yet loaded.";
return result;
}
if (!tbl.getLock().tryLock()) {
result = "Metrics for table " + dbName + "." + tblName + "are not available " +
"because the table is currently modified by another operation.";
return result;
}
try {
return tbl.getMetrics().toString();
} finally {
tbl.getLock().unlock();
}
}
/**
* A wrapper around doGetPartialCatalogObject() that controls the number of concurrent
* invocations.
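* <p>Illustrative request for a table (a sketch; the thrift constructor and
* field shapes are assumptions based on typical generated code, not confirmed
* by this file):
* <pre>{@code
* TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
* req.object_desc = new TCatalogObject();
* req.object_desc.setType(TCatalogObjectType.TABLE);
* req.object_desc.setTable(new TTable("db", "tbl"));
* TGetPartialCatalogObjectResponse resp = getPartialCatalogObject(req);
* }</pre>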
*/
public TGetPartialCatalogObjectResponse getPartialCatalogObject(
TGetPartialCatalogObjectRequest req) throws CatalogException {
try {
if (!partialObjectFetchAccess_.tryAcquire(1,
PARTIAL_FETCH_RPC_QUEUE_TIMEOUT_S, TimeUnit.SECONDS)) {
// Timed out trying to acquire the semaphore permit.
throw new CatalogException("Timed out while fetching partial object metadata. " +
"Please check the metric 'catalog.partial-fetch-rpc.queue-len' for the " +
"current queue length and consider increasing " +
"'catalog_partial_fetch_rpc_queue_timeout_s' and/or " +
"'catalog_max_parallel_partial_fetch_rpc'");
}
// The permit has been acquired at this point and must be released before we
// exit this method.
//
// Is there a chance that this thread can get interrupted at this point before it
// enters the try block, eventually leading to the semaphore permit not
// getting released? It can probably happen if the JVM is already in a bad shape.
// In the worst case, every permit is blocked and the subsequent requests throw
// the timeout exception and the user can monitor the queue metric to see that it
// is full, so the issue should be easy to diagnose.
// TODO: Figure out if such a race is possible.
try (ThreadNameAnnotator tna = new ThreadNameAnnotator(
"Get Partial Catalog Object - " +
Catalog.toCatalogObjectKey(req.object_desc))) {
return doGetPartialCatalogObject(req);
} finally {
partialObjectFetchAccess_.release();
}
} catch (InterruptedException e) {
throw new CatalogException("Error running getPartialCatalogObject(): ", e);
}
}
/**
* Gets the id for this catalog service
*/
public String getCatalogServiceId() {
return TUniqueIdUtil.PrintId(catalogServiceId_).intern();
}
/**
* Returns the number of currently running partial RPCs.
*/
@VisibleForTesting
public int getConcurrentPartialRpcReqCount() {
// Calculated based on number of currently available semaphore permits.
return MAX_PARALLEL_PARTIAL_FETCH_RPC_COUNT - partialObjectFetchAccess_
.availablePermits();
}
/**
* Return a partial view of information about a given catalog object. This services
* the CatalogdMetaProvider running on impalads when they are configured in
* "local-catalog" mode. If required objects are not present, for example, the database
* from which a table is requested, the types of the missing objects will be set in the
* response's lookup_status.
*/
private TGetPartialCatalogObjectResponse doGetPartialCatalogObject(
TGetPartialCatalogObjectRequest req) throws CatalogException {
TCatalogObject objectDesc = Preconditions.checkNotNull(req.object_desc,
"missing object_desc");
switch (objectDesc.type) {
case CATALOG:
return getPartialCatalogInfo(req);
case DATABASE:
TDatabase dbDesc = Preconditions.checkNotNull(req.object_desc.db);
versionLock_.readLock().lock();
try {
Db db = getDb(dbDesc.getDb_name());
if (db == null) {
return createGetPartialCatalogObjectError(CatalogLookupStatus.DB_NOT_FOUND);
}
return db.getPartialInfo(req);
} finally {
versionLock_.readLock().unlock();
}
case TABLE:
case VIEW: {
Table table;
try {
table = getOrLoadTable(
objectDesc.getTable().getDb_name(), objectDesc.getTable().getTbl_name(),
"needed by coordinator");
} catch (DatabaseNotFoundException e) {
return createGetPartialCatalogObjectError(CatalogLookupStatus.DB_NOT_FOUND);
}
if (table == null) {
return createGetPartialCatalogObjectError(CatalogLookupStatus.TABLE_NOT_FOUND);
} else if (!table.isLoaded()) {
// Table can still remain in an incomplete state if there was a concurrent
// invalidate request.
return createGetPartialCatalogObjectError(CatalogLookupStatus.TABLE_NOT_LOADED);
}
// TODO(todd): consider a read-write lock here.
table.getLock().lock();
try {
return table.getPartialInfo(req);
} finally {
table.getLock().unlock();
}
}
case FUNCTION: {
versionLock_.readLock().lock();
try {
Db db = getDb(objectDesc.fn.name.db_name);
if (db == null) {
return createGetPartialCatalogObjectError(CatalogLookupStatus.DB_NOT_FOUND);
}
List<Function> funcs = db.getFunctions(objectDesc.fn.name.function_name);
if (funcs.isEmpty()) {
return createGetPartialCatalogObjectError(
CatalogLookupStatus.FUNCTION_NOT_FOUND);
}
TGetPartialCatalogObjectResponse resp = new TGetPartialCatalogObjectResponse();
List<TFunction> thriftFuncs = Lists.newArrayListWithCapacity(funcs.size());
for (Function f : funcs) thriftFuncs.add(f.toThrift());
resp.setFunctions(thriftFuncs);
return resp;
} finally {
versionLock_.readLock().unlock();
}
}
default:
throw new CatalogException("Unable to fetch partial info for type: " +
req.object_desc.type);
}
}
private static TGetPartialCatalogObjectResponse createGetPartialCatalogObjectError(
CatalogLookupStatus status) {
TGetPartialCatalogObjectResponse resp = new TGetPartialCatalogObjectResponse();
resp.setLookup_status(status);
return resp;
}
/**
* Return a partial view of information about global parts of the catalog (e.g.
* the list of database names).
*/
private TGetPartialCatalogObjectResponse getPartialCatalogInfo(
TGetPartialCatalogObjectRequest req) {
TGetPartialCatalogObjectResponse resp = new TGetPartialCatalogObjectResponse();
resp.catalog_info = new TPartialCatalogInfo();
TCatalogInfoSelector sel = Preconditions.checkNotNull(req.catalog_info_selector,
"no catalog_info_selector in request");
if (sel.want_db_names) {
resp.catalog_info.db_names = ImmutableList.copyOf(dbCache_.get().keySet());
}
// TODO(todd) implement data sources and other global information.
return resp;
}
/**
* Set the last used time of specified tables to now.
* TODO: Make use of TTableUsage.num_usages.
*/
public void updateTableUsage(TUpdateTableUsageRequest req) {
for (TTableUsage usage : req.usages) {
Table table = null;
try {
table = getTable(usage.table_name.db_name, usage.table_name.table_name);
} catch (DatabaseNotFoundException e) {
// do nothing
}
if (table != null) table.refreshLastUsedTime();
}
}
CatalogdTableInvalidator getCatalogdTableInvalidator() {
return catalogdTableInvalidator_;
}
@VisibleForTesting
void setCatalogdTableInvalidator(CatalogdTableInvalidator cleaner) {
catalogdTableInvalidator_ = cleaner;
}
@VisibleForTesting
public void setMetastoreEventProcessor(
ExternalEventsProcessor metastoreEventProcessor) {
this.metastoreEventProcessor_ = metastoreEventProcessor;
}
}