blob: 4c959b21b9b1f32595e6c6679039bfd1d9bc21bd [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog;
import com.google.common.base.Preconditions;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TDataSource;
import org.apache.impala.thrift.TDatabase;
import org.apache.impala.thrift.TFunction;
import org.apache.impala.thrift.TPrivilege;
import org.apache.impala.thrift.TRole;
import org.apache.impala.thrift.TTable;
import org.apache.impala.thrift.TUniqueId;
import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
import org.apache.impala.thrift.TUpdateCatalogCacheResponse;
/**
* Thread safe Catalog for an Impalad. The Impalad catalog can be updated either via
* a StateStore heartbeat or by directly applying the result of a catalog operation to
* the CatalogCache. All updates are applied using the updateCatalog() function.
* Table metadata is loaded lazily. The CatalogServer initially broadcasts (via the
* statestore) the known table names (as IncompleteTables). These table names are added
* to the Impalad catalog cache and when one of the tables is accessed, the impalad will
* make an RPC to the CatalogServer to request loading the complete table metadata.
* In both cases, we need to ensure that work from one update is not "undone" by another
* update. To handle this the ImpaladCatalog does the following:
* - Tracks the overall catalog version last received in a state store heartbeat. This
* version is maintained by the catalog server, and it is guaranteed that this
* impalad's catalog will never contain any objects older than this version
* (any updates with a lower version number are ignored).
* - For updated/new objects, check if the object already exists in the
* catalog cache. If it does, only apply the update if the catalog version is > the
* existing object's catalog version. Also keep a log of all dropped catalog objects
* (and the version they were dropped in). Before updating any object, check if it was
* dropped in a later version. If so, ignore the update.
* - Before dropping any catalog object, see if the object already exists in the catalog
* cache. If it does, only drop the object if the version of the drop is > that
* object's catalog version.
* The CatalogServiceId is also tracked to detect if a different instance of the catalog
* service has been started, in which case a full topic update is required.
*/
public class ImpaladCatalog extends Catalog {
// Class-wide logger.
private static final Logger LOG = Logger.getLogger(ImpaladCatalog.class);
// Service ID held by catalogServiceId_ before any update has been applied.
// updateCatalog() compares against it to recognize its first run, in which case a
// service ID change is accepted silently instead of raising an exception.
private static final TUniqueId INITIAL_CATALOG_SERVICE_ID = new TUniqueId(0L, 0L);
// The last known Catalog Service ID. If the ID changes, it indicates the CatalogServer
// has restarted. Overwritten with the ID carried by the request in updateCatalog().
private TUniqueId catalogServiceId_ = INITIAL_CATALOG_SERVICE_ID;
// The catalog version received in the last StateStore heartbeat. It is guaranteed
// that all objects in the catalog have, at a minimum, this version. Because updates
// may be applied out of band of a StateStore heartbeat, it is possible the catalog
// contains some objects newer than this version.
private long lastSyncedCatalogVersion_ = Catalog.INITIAL_CATALOG_VERSION;
// Flag to determine if the Catalog is ready to accept user requests. See isReady().
// Set to true at the end of updateCatalog(); setIsReady() can override it.
private final AtomicBoolean isReady_ = new AtomicBoolean(false);
// Tracks modifications to this Impalad's catalog from direct updates to the cache.
// removeCatalogObject() records removals newer than lastSyncedCatalogVersion_ here,
// addCatalogObject() consults it to skip stale updates, and updateCatalog() garbage
// collects it once the synced version has been advanced.
private final CatalogDeltaLog catalogDeltaLog_ = new CatalogDeltaLog();
// Object that is used to synchronize on and signal when a catalog update is received.
// updateCatalog() calls notifyAll() on it; waitForCatalogUpdate() waits on it.
private final Object catalogUpdateEventNotifier_ = new Object();
// The addresses of the Kudu masters to use if no Kudu masters were explicitly provided.
// Used during table creation. Assigned once in the constructor.
private final String defaultKuduMasterHosts_;
/**
* C'tor used by tests that need to validate the ImpaladCatalog outside of the
* CatalogServer.
*/
public ImpaladCatalog(String defaultKuduMasterHosts) {
  // The no-argument superclass constructor runs implicitly before this assignment.
  this.defaultKuduMasterHosts_ = defaultKuduMasterHosts;
}
/**
* Returns true if the given object does not depend on any other object already
* existing in the catalog in order to be added.
*/
private boolean isTopLevelCatalogObject(TCatalogObject catalogObject) {
  // Plain reference comparisons (rather than a switch) keep an unset type returning
  // false instead of throwing.
  final TCatalogObjectType objectType = catalogObject.getType();
  if (objectType == TCatalogObjectType.DATABASE) return true;
  if (objectType == TCatalogObjectType.DATA_SOURCE) return true;
  if (objectType == TCatalogObjectType.HDFS_CACHE_POOL) return true;
  return objectType == TCatalogObjectType.ROLE;
}
/**
* Updates the internal Catalog based on the given TUpdateCatalogCacheRequest.
* This method:
* 1) Updates all top level objects (such as databases and roles).
* 2) Updates all objects that depend on top level objects (such as functions, tables,
* privileges).
* 3) Removes all dropped catalog objects.
*
* This method is called once per statestore heartbeat and is guaranteed the same
* object will not be in both the "updated" list and the "removed" list (it is
* a detail handled by the statestore).
* Catalog objects are ordered by version, which is not necessarily the same as ordering
* by dependency. This is handled by doing two passes and first updating the top level
* objects, followed by updating the dependent objects. This method is synchronized
* because updateCatalog() can be called either during a statestore update or during
* a direct-DDL operation, and catalogServiceId_ and lastSyncedCatalogVersion_ must
* be protected.
*/
public synchronized TUpdateCatalogCacheResponse updateCatalog(
    TUpdateCatalogCacheRequest req) throws CatalogException {
  // Always adopt the service ID carried by the request. A change is only an error
  // when a real (non-initial) ID had been seen before.
  final TUniqueId previousServiceId = catalogServiceId_;
  final TUniqueId incomingServiceId = req.getCatalog_service_id();
  if (!previousServiceId.equals(incomingServiceId)) {
    catalogServiceId_ = incomingServiceId;
    boolean isFirstUpdate = previousServiceId.equals(INITIAL_CATALOG_SERVICE_ID);
    if (!isFirstUpdate) {
      // Throw an exception which will trigger a full topic update request.
      throw new CatalogException("Detected catalog service ID change. Aborting " +
          "updateCatalog()");
    }
  }

  // Pass 1: top level objects. They have no prerequisites in the catalog, so they go
  // first regardless of where they appear in the (version ordered) list.
  for (TCatalogObject updated : req.getUpdated_objects()) {
    if (!isTopLevelCatalogObject(updated)) continue;
    Preconditions.checkState(updated.getType() != TCatalogObjectType.CATALOG);
    try {
      addCatalogObject(updated);
    } catch (Exception e) {
      // A single bad object is logged and skipped; the rest of the update proceeds.
      LOG.error("Error adding catalog object: " + e.getMessage(), e);
    }
  }

  // Pass 2: dependent objects, whose parents were handled by pass 1. The CATALOG
  // object, if present, only supplies the version this update synchronizes to.
  long syncedVersion = lastSyncedCatalogVersion_;
  for (TCatalogObject updated : req.getUpdated_objects()) {
    if (updated.getType() == TCatalogObjectType.CATALOG) {
      syncedVersion = updated.getCatalog_version();
      continue;
    }
    if (isTopLevelCatalogObject(updated)) continue;
    try {
      addCatalogObject(updated);
    } catch (Exception e) {
      LOG.error("Error adding catalog object: " + e.getMessage(), e);
    }
  }

  // Removals come last. Dropping a database ahead of its tables/functions is fine:
  // removing the children afterwards is simply a no-op.
  for (TCatalogObject removed : req.getRemoved_objects()) {
    removeCatalogObject(removed, syncedVersion);
  }

  // Only now advance the synced version; the removals above still compared against
  // the previous value.
  lastSyncedCatalogVersion_ = syncedVersion;
  // Cleanup old entries in the log.
  catalogDeltaLog_.garbageCollect(syncedVersion);
  isReady_.set(true);

  // Wake up every thread blocked in waitForCatalogUpdate().
  synchronized (catalogUpdateEventNotifier_) {
    catalogUpdateEventNotifier_.notifyAll();
  }
  return new TUpdateCatalogCacheResponse(catalogServiceId_);
}
/**
* Causes the calling thread to wait until a catalog update notification has been sent
* or the given timeout has been reached. A timeout value of 0 indicates an indefinite
* wait. Does not protect against spurious wakeups, so this should be called in a loop.
*
*/
public void waitForCatalogUpdate(long timeoutMs) {
  try {
    synchronized (catalogUpdateEventNotifier_) {
      catalogUpdateEventNotifier_.wait(timeoutMs);
    }
  } catch (InterruptedException ignored) {
    // Deliberately swallowed: an interrupt simply ends the wait early, like a
    // timeout or a spurious wakeup would.
    // NOTE(review): the interrupt status is not restored here; confirm that no
    // caller relies on observing it.
  }
}
/**
* Returns the Table object for the given dbName/tableName. Returns null
* if the table does not exist. Will throw a TableLoadingException if the table's
* metadata was not able to be loaded successfully and DatabaseNotFoundException
* if the parent database does not exist.
*/
@Override
public Table getTable(String dbName, String tableName)
    throws CatalogException {
  Table table = super.getTable(dbName, tableName);
  if (table == null) return null;

  // Anything other than a loaded IncompleteTable is handed back unchanged.
  boolean isFailedLoad = table.isLoaded() && table instanceof IncompleteTable;
  if (!isFailedLoad) return table;

  // The table's metadata could not be loaded: surface the recorded cause to the
  // caller, wrapping it unless it already is a TableLoadingException.
  ImpalaException loadFailure = ((IncompleteTable) table).getCause();
  if (loadFailure instanceof TableLoadingException) {
    throw (TableLoadingException) loadFailure;
  }
  throw new TableLoadingException("Missing metadata for table: " + tableName,
      loadFailure);
}
/**
* Returns the HDFS path where the metastore would create the given table. If the table
* has a "location" set, that will be returned. Otherwise the path will be resolved
* based on the location of the parent database. The metastore folder hierarchy is:
* <warehouse directory>/<db name>.db/<table name>
* Except for items in the default database which will be:
* <warehouse directory>/<table name>
* This method handles both of these cases.
*/
public Path getTablePath(org.apache.hadoop.hive.metastore.api.Table msTbl)
    throws NoSuchObjectException, MetaException, TException {
  try (MetaStoreClient msClient = getMetaStoreClient()) {
    // An explicit, non-empty table location always wins.
    String tableLocation = msTbl.getSd().getLocation();
    if (tableLocation != null && !tableLocation.isEmpty()) {
      return new Path(tableLocation);
    }
    // Otherwise derive the path from the location URI of the parent database,
    // using the lower-cased table name as the final path component.
    String dbLocation =
        msClient.getHiveClient().getDatabase(msTbl.getDbName()).getLocationUri();
    String tableDirName = msTbl.getTableName().toLowerCase();
    return new Path(dbLocation, tableDirName);
  }
}
/**
* Adds the given TCatalogObject to the catalog cache. The update may be ignored
* (considered out of date) if:
* 1) An item exists in the catalog cache with a version > than the given
* TCatalogObject's version.
* 2) The catalogDeltaLog_ contains an entry for this object with a version
* > than the given TCatalogObject's version.
*/
private void addCatalogObject(TCatalogObject catalogObject)
    throws TableLoadingException, DatabaseNotFoundException {
  // A removal recorded at a later version makes this update stale; drop it.
  if (catalogDeltaLog_.wasObjectRemovedAfter(catalogObject)) {
    if (LOG.isTraceEnabled()) {
      LOG.trace(String.format("Skipping update because a matching object was removed " +
          "in a later catalog version: %s", catalogObject));
    }
    return;
  }

  final long objectVersion = catalogObject.getCatalog_version();
  switch (catalogObject.getType()) {
    case DATABASE:
      addDb(catalogObject.getDb(), objectVersion);
      return;
    case TABLE:
    case VIEW:
      // Tables and views travel in the same thrift field.
      addTable(catalogObject.getTable(), objectVersion);
      return;
    case FUNCTION: {
      // Remove the function first, in case there is an existing function with the
      // same name and signature.
      TFunction thriftFn = catalogObject.getFn();
      removeFunction(thriftFn, objectVersion);
      addFunction(thriftFn, objectVersion);
      return;
    }
    case DATA_SOURCE:
      addDataSource(catalogObject.getData_source(), objectVersion);
      return;
    case ROLE: {
      Role newRole = Role.fromThrift(catalogObject.getRole());
      newRole.setCatalogVersion(objectVersion);
      authPolicy_.addRole(newRole);
      return;
    }
    case PRIVILEGE: {
      RolePrivilege newPrivilege =
          RolePrivilege.fromThrift(catalogObject.getPrivilege());
      newPrivilege.setCatalogVersion(objectVersion);
      try {
        authPolicy_.addPrivilege(newPrivilege);
      } catch (CatalogException e) {
        // Logged and skipped so that one bad privilege does not fail the caller.
        LOG.error("Error adding privilege: ", e);
      }
      return;
    }
    case HDFS_CACHE_POOL: {
      HdfsCachePool newPool = new HdfsCachePool(catalogObject.getCache_pool());
      newPool.setCatalogVersion(objectVersion);
      hdfsCachePools_.add(newPool);
      return;
    }
    default:
      throw new IllegalStateException(
          "Unexpected TCatalogObjectType: " + catalogObject.getType());
  }
}
/**
* Removes the matching TCatalogObject from the catalog, if one exists and its
* catalog version is < the catalog version of this drop operation.
* Note that drop operations that come from statestore heartbeats always have a
* version of 0. To determine the drop version for statestore updates,
* the catalog version from the current update is used. This is okay because there
* can never be a catalog update from the statestore that contains a drop
* and an addition of the same object. For more details on how drop
* versioning works, see CatalogServerCatalog.java
*/
private void removeCatalogObject(TCatalogObject catalogObject,
    long currentCatalogUpdateVersion) {
  // The TCatalogObject associated with a drop operation from a state store
  // heartbeat will always have a version of zero. Because no update from
  // the state store can contain both a drop and an addition of the same object,
  // we can assume the drop version is the current catalog version of this update.
  // If the TCatalogObject contains a version that != 0, it indicates the drop
  // came from a direct update.
  long dropCatalogVersion = catalogObject.getCatalog_version() == 0 ?
      currentCatalogUpdateVersion : catalogObject.getCatalog_version();
  switch (catalogObject.getType()) {
    case DATABASE:
      removeDb(catalogObject.getDb(), dropCatalogVersion);
      break;
    case TABLE:
    case VIEW:
      removeTable(catalogObject.getTable(), dropCatalogVersion);
      break;
    case FUNCTION:
      removeFunction(catalogObject.getFn(), dropCatalogVersion);
      break;
    case DATA_SOURCE:
      removeDataSource(catalogObject.getData_source(), dropCatalogVersion);
      break;
    case ROLE:
      removeRole(catalogObject.getRole(), dropCatalogVersion);
      break;
    case PRIVILEGE:
      removePrivilege(catalogObject.getPrivilege(), dropCatalogVersion);
      break;
    case HDFS_CACHE_POOL: {
      // Same rule as every other object type: the pool is dropped only if it is
      // present and older than the drop. A pool that is not cached is a no-op
      // rather than a NullPointerException, and the comparison uses the resolved
      // drop version so that version-0 statestore drops are handled correctly.
      String poolName = catalogObject.getCache_pool().getPool_name();
      HdfsCachePool existingPool = hdfsCachePools_.get(poolName);
      if (existingPool != null
          && existingPool.getCatalogVersion() < dropCatalogVersion) {
        hdfsCachePools_.remove(poolName);
      }
      break;
    }
    default:
      throw new IllegalStateException(
          "Unexpected TCatalogObjectType: " + catalogObject.getType());
  }

  // Only removals newer than the last synced version are remembered in the delta
  // log. The object's own version is compared here, so version-0 drops are not
  // logged.
  if (catalogObject.getCatalog_version() > lastSyncedCatalogVersion_) {
    catalogDeltaLog_.addRemovedObject(catalogObject);
  }
}
/**
 * Adds the database described by 'thriftDb' at the given catalog version, unless a
 * database with the same name and an equal or newer version is already cached.
 */
private void addDb(TDatabase thriftDb, long catalogVersion) {
  Db cached = getDb(thriftDb.getDb_name());
  boolean isStale = cached != null && cached.getCatalogVersion() >= catalogVersion;
  if (isStale) return;

  Db replacement = Db.fromTDatabase(thriftDb, this);
  replacement.setCatalogVersion(catalogVersion);
  addDb(replacement);
}
/**
 * Builds a Table from 'thriftTable', stamps it with the given catalog version and
 * adds it to its parent database. Does nothing (beyond a trace message) when the
 * parent database is not in the catalog.
 * NOTE(review): unlike addDb() and addFunction(), no comparison against the version
 * of an existing table is made here - confirm this is intended.
 */
private void addTable(TTable thriftTable, long catalogVersion)
    throws TableLoadingException {
  Db parentDb = getDb(thriftTable.db_name);
  if (parentDb != null) {
    Table incoming = Table.fromThrift(parentDb, thriftTable);
    incoming.setCatalogVersion(catalogVersion);
    parentDb.addTable(incoming);
    return;
  }
  if (LOG.isTraceEnabled()) {
    LOG.trace("Parent database of table does not exist: " +
        thriftTable.db_name + "." + thriftTable.tbl_name);
  }
}
/**
 * Adds the function described by 'fn' to its parent database, unless that database
 * is missing or already holds a function with the same signature and an equal or
 * newer catalog version.
 */
private void addFunction(TFunction fn, long catalogVersion) {
  Function incoming = Function.fromThrift(fn);
  incoming.setCatalogVersion(catalogVersion);

  Db parentDb = getDb(incoming.getFunctionName().getDb());
  if (parentDb == null) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Parent database of function does not exist: " + incoming.getName());
    }
    return;
  }

  Function cached = parentDb.getFunction(fn.getSignature());
  boolean isNewer = cached == null || cached.getCatalogVersion() < catalogVersion;
  if (isNewer) parentDb.addFunction(incoming);
}
/**
 * Converts 'thrift' into a DataSource, stamps it with the given catalog version and
 * adds it to the catalog. No comparison against an existing entry is made here.
 */
private void addDataSource(TDataSource thrift, long catalogVersion) {
  DataSource converted = DataSource.fromThrift(thrift);
  converted.setCatalogVersion(catalogVersion);
  addDataSource(converted);
}
/**
 * Removes the data source named by 'thrift' from the catalog.
 * NOTE(review): 'dropCatalogVersion' is not consulted, so the removal is
 * unconditional, unlike the other remove*() helpers - confirm this is intended.
 */
private void removeDataSource(TDataSource thrift, long dropCatalogVersion) {
  String dataSourceName = thrift.getName();
  removeDataSource(dataSourceName);
}
/**
 * Removes the database named by 'thriftDb' if it is cached with a catalog version
 * lower than 'dropCatalogVersion'; otherwise does nothing.
 */
private void removeDb(TDatabase thriftDb, long dropCatalogVersion) {
  Db cached = getDb(thriftDb.getDb_name());
  if (cached == null) return;
  // An entry at or above the drop version is not older than the drop; keep it.
  if (cached.getCatalogVersion() >= dropCatalogVersion) return;
  removeDb(cached.getName());
}
/**
 * Removes the table named by 'thriftTable' from its parent database if the table is
 * cached with a catalog version lower than 'dropCatalogVersion'.
 */
private void removeTable(TTable thriftTable, long dropCatalogVersion) {
  Db parentDb = getDb(thriftTable.db_name);
  // Without the parent database there is nothing that could be removed.
  if (parentDb == null) return;

  Table cached = parentDb.getTable(thriftTable.getTbl_name());
  if (cached == null || cached.getCatalogVersion() >= dropCatalogVersion) return;
  parentDb.removeTable(thriftTable.tbl_name);
}
/**
 * Removes the function with the signature of 'thriftFn' from its parent database if
 * the function is cached with a catalog version lower than 'dropCatalogVersion'.
 */
private void removeFunction(TFunction thriftFn, long dropCatalogVersion) {
  Db parentDb = getDb(thriftFn.name.getDb_name());
  // Without the parent database there is nothing that could be removed.
  if (parentDb == null) return;

  Function cached = parentDb.getFunction(thriftFn.getSignature());
  if (cached == null) return;
  // Only a function older than the drop is removed.
  if (cached.getCatalogVersion() >= dropCatalogVersion) return;
  parentDb.removeFunction(thriftFn.getSignature());
}
/**
 * Removes the role named by 'thriftRole' from the authorization policy if the role
 * is present with a catalog version lower than 'dropCatalogVersion'.
 */
private void removeRole(TRole thriftRole, long dropCatalogVersion) {
  Role cached = authPolicy_.getRole(thriftRole.getRole_name());
  if (cached == null) return;
  // Only a role older than the drop is removed.
  if (cached.getCatalogVersion() >= dropCatalogVersion) return;
  authPolicy_.removeRole(thriftRole.getRole_name());
}
/**
 * Removes the privilege named by 'thriftPrivilege' from its owning role if the
 * privilege is present with a catalog version lower than 'dropCatalogVersion'.
 * Does nothing when the owning role (looked up by role id) is unknown.
 */
private void removePrivilege(TPrivilege thriftPrivilege, long dropCatalogVersion) {
  Role owningRole = authPolicy_.getRole(thriftPrivilege.getRole_id());
  if (owningRole == null) return;

  RolePrivilege cached = owningRole.getPrivilege(thriftPrivilege.getPrivilege_name());
  if (cached == null) return;
  // Only a privilege older than the drop is removed.
  if (cached.getCatalogVersion() >= dropCatalogVersion) return;
  owningRole.removePrivilege(thriftPrivilege.getPrivilege_name());
}
/**
* Returns true if the ImpaladCatalog is ready to accept requests (has
* received and processed a valid catalog topic update from the StateStore),
* false otherwise.
*/
public boolean isReady() {
  // Reads the flag that updateCatalog() sets once an update has been applied.
  return isReady_.get();
}
// Only used for testing.
public void setIsReady(boolean isReady) {
  // Overrides the readiness flag directly, bypassing updateCatalog().
  isReady_.set(isReady);
}
/** Returns the authorization policy that role and privilege updates are applied to. */
public AuthorizationPolicy getAuthPolicy() {
  return authPolicy_;
}
/** Returns the default Kudu master hosts value that was passed to the constructor. */
public String getDefaultKuduMasterHosts() {
  return defaultKuduMasterHosts_;
}
}