| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.impala.catalog.local; |
| |
| import java.lang.management.ManagementFactory; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; |
| import org.apache.hadoop.hive.metastore.api.Database; |
| import org.apache.hadoop.hive.metastore.api.MetaException; |
| import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; |
| import org.apache.hadoop.hive.metastore.api.Partition; |
| import org.apache.hadoop.hive.metastore.api.SQLForeignKey; |
| import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; |
| import org.apache.hadoop.hive.metastore.api.Table; |
| import org.apache.hadoop.hive.metastore.api.UnknownDBException; |
| import org.apache.impala.authorization.AuthorizationChecker; |
| import org.apache.impala.authorization.AuthorizationPolicy; |
| import org.apache.impala.catalog.AuthzCacheInvalidation; |
| import org.apache.impala.catalog.Catalog; |
| import org.apache.impala.catalog.CatalogDeltaLog; |
| import org.apache.impala.catalog.CatalogException; |
| import org.apache.impala.catalog.CatalogObjectCache; |
| import org.apache.impala.catalog.Function; |
| import org.apache.impala.catalog.HdfsPartition.FileDescriptor; |
| import org.apache.impala.catalog.ImpaladCatalog.ObjectUpdateSequencer; |
| import org.apache.impala.catalog.Principal; |
| import org.apache.impala.catalog.PrincipalPrivilege; |
| import org.apache.impala.common.InternalException; |
| import org.apache.impala.common.Pair; |
| import org.apache.impala.service.FeSupport; |
| import org.apache.impala.service.FrontendProfile; |
| import org.apache.impala.thrift.CatalogLookupStatus; |
| import org.apache.impala.thrift.TBackendGflags; |
| import org.apache.impala.thrift.TCatalogInfoSelector; |
| import org.apache.impala.thrift.TCatalogObject; |
| import org.apache.impala.thrift.TCatalogObjectType; |
| import org.apache.impala.thrift.TDatabase; |
| import org.apache.impala.thrift.TDbInfoSelector; |
| import org.apache.impala.thrift.TErrorCode; |
| import org.apache.impala.thrift.TFunction; |
| import org.apache.impala.thrift.TFunctionName; |
| import org.apache.impala.thrift.TGetPartialCatalogObjectRequest; |
| import org.apache.impala.thrift.TGetPartialCatalogObjectResponse; |
| import org.apache.impala.thrift.THdfsFileDesc; |
| import org.apache.impala.thrift.TNetworkAddress; |
| import org.apache.impala.thrift.TPartialPartitionInfo; |
| import org.apache.impala.thrift.TTable; |
| import org.apache.impala.thrift.TTableInfoSelector; |
| import org.apache.impala.thrift.TUniqueId; |
| import org.apache.impala.thrift.TUnit; |
| import org.apache.impala.thrift.TUpdateCatalogCacheRequest; |
| import org.apache.impala.thrift.TUpdateCatalogCacheResponse; |
| import org.apache.impala.util.ListMap; |
| import org.apache.impala.util.TByteBuffer; |
| import org.apache.thrift.TDeserializer; |
| import org.apache.thrift.TException; |
| import org.apache.thrift.TSerializer; |
| import org.apache.thrift.protocol.TBinaryProtocol; |
| import org.ehcache.sizeof.SizeOf; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Objects; |
| import com.google.common.base.Preconditions; |
| import com.google.common.base.Stopwatch; |
| import com.google.common.base.Throwables; |
| import com.google.common.cache.Cache; |
| import com.google.common.cache.CacheBuilder; |
| import com.google.common.cache.CacheStats; |
| import com.google.common.cache.Weigher; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.util.concurrent.UncheckedExecutionException; |
| import com.google.common.util.concurrent.Uninterruptibles; |
| import com.google.errorprone.annotations.Immutable; |
| import com.google.errorprone.annotations.concurrent.GuardedBy; |
| |
| /** |
| * MetaProvider which fetches metadata in a granular fashion from the catalogd. |
| * |
| * When pieces of metadata are requested, a local LRU cache is first queried. If the |
| * metadata is present in the cache, it will be returned. Otherwise, an RPC is made |
| * to the catalogd to fetch the metadata and the metadata is written back into the cache |
| * before being returned. |
| * |
| * The implementation subscribes to a "minimal" subset of the Catalog statestore topic |
| * in order to be notified about changes to objects stored within the catalogd. We |
| * use the notifications from this topic to perform cache invalidation. The invalidation |
| * strategy is slightly different depending on the specific piece of metadata: |
| * |
| * Strategy 1): small objects, cached coarse-grained |
| * --------------------------- |
| * For objects which we know to be small (eg database metadata, lists of table names, |
| * etc) we simply invalidate our local cache whenever we see any change that indicates |
| * our data might be stale. For example, when we see any change of a table, we invalidate |
| * the list of tables in the associated database. This is simpler than tracking whether |
| * the table is an addition or just an updated version of an already-existing entity, |
| * and in most cases we aim for simplicity even if it means occasional over-invalidation. |
| * |
| * As another example, when we see a new version of a database object, we invalidate |
| * the list of databases as well as the cached information for that specific database. |
| * |
| * Strategy 2): granular information about tables |
| * ----------------------------------------------- |
| * The metadata associated with tables is large enough that, instead of caching it as |
| * a single coarse-grained entry, we use separate cache entries for more fine-grained |
| * pieces of metadata (eg lists of partitions, individual partitions, etc). Thus, when |
| * we see a notification indicating that a table has changed versions in the catalogd, |
| * it would be difficult to evict all of the associated items from the cache. The cache |
| * implementation does not support "prefix search" or "tag-based invalidation" of any |
| * kind, so we would need to scan through all of the cached items in order to invalidate |
| * all of the data referring to a table. |
| * |
| * Instead, we use a different strategy: all of the granular metadata associated with a |
| * particular version of a table includes the _version number_ of the table as part of |
| * its cache key. When we are notified that the table has changed version numbers, we |
| * simply invalidate the top-level table entry, and allow other information to remain |
| * in the cache. When we next load this table, we will load a new version of the top-level |
| * table entry, including its new version number. Thus, the requests for granular |
| * information pertaining to the new version will not include the old version number |
| * in cache keys anymore. The old metadata is essentially invalidated by the fact that |
| * it is no longer "linked". Over time, the old entries will naturally age out of the |
| * cache. |
| * |
| * |
| * Metadata that is _not_ fetched on demand |
| * ================================================ |
| * This implementation does not fetch _all_ metadata on demand. In fact, some pieces of |
| * metadata are currently provided in the same manner as "legacy" coordinators: the |
| * full metadata objects are published by the catalog daemon into the statestore, and |
| * we keep a full "replica" of that information. In particular, we currently use this |
| * strategy for Sentry metadata (roles and privileges) since the caching of this data |
| * is relatively more complex. Given that this data is typically quite small relative |
| * to the table metadata, it's not too expensive to maintain the full replica. |
| * |
| * |
| * TODO(todd): expose statistics on a per-query and per-daemon level about cache |
| * hit rates, number of outbound RPCs, etc. |
| * TODO(todd): handle retry/backoff to ride over short catalog interruptions |
| */ |
| public class CatalogdMetaProvider implements MetaProvider { |
| |
  private final static Logger LOG = LoggerFactory.getLogger(CatalogdMetaProvider.class);

  /**
   * Sentinel value used as a negative cache entry for column statistics.
   * Some columns (e.g. partitioning columns) do not have statistics in the catalog
   * and won't be returned when we ask it for stats. It's important to cache negative
   * entries for those or else we would require a round-trip every time the table
   * is loaded.
   *
   * This special sentinel value is stored in the cache to indicate such a "negative
   * cache" entry. It is always compared by reference equality.
   */
  private static final ColumnStatisticsObj NEGATIVE_COLUMN_STATS_SENTINEL =
      new ColumnStatisticsObj();

  /**
   * Used as a cache key for caching the "null partition key value", which is a global
   * Hive configuration. Value is a String.
   */
  private static final Object NULL_PARTITION_KEY_VALUE_CACHE_KEY = new Object();

  /**
   * Used as a cache key for caching the global list of database names. Value is
   * an ImmutableList<String>.
   */
  private static final Object DB_LIST_CACHE_KEY = new Object();

  // Query-profile counter names are built as
  //   CATALOG_FETCH_PREFIX + "." + <category> + "." + <counter>
  // (see addStatsToProfile()). The *_CATEGORY constants below are the <category>
  // values, one per kind of cached metadata.
  private static final String CATALOG_FETCH_PREFIX = "CatalogFetch";
  private static final String DB_LIST_STATS_CATEGORY = "DatabaseList";
  private static final String DB_METADATA_STATS_CATEGORY = "Databases";
  private static final String TABLE_NAMES_STATS_CATEGORY = "TableNames";
  private static final String TABLE_METADATA_CACHE_CATEGORY = "Tables";
  private static final String PARTITION_LIST_STATS_CATEGORY = "PartitionLists";
  private static final String PARTITIONS_STATS_CATEGORY = "Partitions";
  private static final String COLUMN_STATS_STATS_CATEGORY = "ColumnStats";
  private static final String GLOBAL_CONFIGURATION_STATS_CATEGORY = "Config";
  private static final String FUNCTION_LIST_STATS_CATEGORY = "FunctionLists";
  private static final String FUNCTIONS_STATS_CATEGORY = "Functions";
  // Category for the raw RPCs sent to the catalogd.
  private static final String RPC_STATS_CATEGORY = "RPCs";
  // Category for the storage-metadata load time reported by the catalogd for fetched
  // tables (see addTableMetadatStorageLoadTimeToProfile()).
  private static final String STORAGE_METADATA_LOAD_CATEGORY = "StorageLoad";
  // Fully-qualified counters updated once per catalogd RPC (see sendRequest()).
  private static final String RPC_REQUESTS =
      CATALOG_FETCH_PREFIX + "." + RPC_STATS_CATEGORY + ".Requests";
  private static final String RPC_BYTES =
      CATALOG_FETCH_PREFIX + "." + RPC_STATS_CATEGORY + ".Bytes";
  private static final String RPC_TIME =
      CATALOG_FETCH_PREFIX + "." + RPC_STATS_CATEGORY + ".Time";
| |
  /**
   * File descriptors store replicas using a compressed format that references hosts
   * by index in a "host index" list rather than by their full addresses. Since we cache
   * partition metadata including file descriptors across many queries, we can't rely on
   * callers to provide a consistent host index. Instead, cached file descriptors are
   * always relative to this global host index.
   *
   * Note that we never evict entries from this host index. We rely on the fact that,
   * in a given storage cluster, the number of hosts is bounded, and "leaking" the unique
   * network addresses won't cause a problem over time.
   */
  private final ListMap<TNetworkAddress> cacheHostIndex_ =
      new ListMap<TNetworkAddress>();

  // TODO(todd): currently we haven't implemented catalogd thrift APIs for all pieces
  // of metadata. In order to incrementally build this out, we delegate various calls
  // to the "direct" provider for now and circumvent catalogd.
  private DirectMetaProvider directProvider_ = new DirectMetaProvider();

  /**
   * Number of requests which piggy-backed on a concurrent request for the same key,
   * and resulted in success. Used only for test assertions.
   */
  @VisibleForTesting
  final AtomicInteger piggybackSuccessCountForTests = new AtomicInteger();

  /**
   * Number of requests which piggy-backed on a concurrent request for the same key,
   * and resulted in an exception. Used only for test assertions.
   */
  @VisibleForTesting
  final AtomicInteger piggybackExceptionCountForTests = new AtomicInteger();

  /**
   * The underlying cache.
   *
   * The keys in this cache are various types of objects (strings, DbCacheKey, etc).
   * The values are also variant depending on the type of cache key. While any key
   * is being loaded, it is a Future<T>, which gets replaced with a non-wrapped object
   * once it is successfully loaded (see {@link #getIfPresent(Object)} for a convenient
   * wrapper).
   *
   * For details of the usage of Futures within the cache, see
   * {@link #loadWithCaching(String, String, Object, Callable)}.
   */
  final Cache<Object,Object> cache_;

  /**
   * The last catalog version seen in an update from the catalogd.
   *
   * This is used to implement SYNC_DDL: when a SYNC_DDL operation is done, the catalog
   * responds to the DDL with the version of the catalog at which the DDL has been
   * applied. The backend then waits until this 'lastSeenCatalogVersion' advances past
   * the version where the DDL was applied, and correlates that with the corresponding
   * statestore topic version. It then waits until the statestore reports that this topic
   * version has been distributed to all coordinators before proceeding.
   */
  private final AtomicLong lastSeenCatalogVersion_ = new AtomicLong(
      Catalog.INITIAL_CATALOG_VERSION);

  /**
   * The catalog version at last time when catalogd starts resetting the entire catalog.
   *
   * This is used to implement global INVALIDATE METADATA: Catalogd saves its catalog
   * version to this when it starts to reset the entire catalog. After the reset is done,
   * all valid catalog objects should have a catalog version larger than this. Thus, we
   * can safely advance the catalog version lower bound to this version + 1.
   * The coordinator first gets this value in the RPC response from Catalogd and starts
   * to wait until the catalog version lower bound becomes larger than it. Once we
   * receive the update from statestored and update our catalog cache accordingly, the
   * catalog version lower bound is set to lastResetCatalogVersion_ + 1 (returned by
   * updateCatalogCache()).
   */
  private final AtomicLong lastResetCatalogVersion_ = new AtomicLong(-1);

  /**
   * Tracks objects that have been deleted in response to a DDL issued from this
   * coordinator.
   */
  CatalogDeltaLog deletedObjectsLog_ = new CatalogDeltaLog();

  /**
   * The last known Catalog Service ID. If the ID changes, it indicates the CatalogServer
   * has restarted.
   */
  @GuardedBy("catalogServiceIdLock_")
  private TUniqueId catalogServiceId_ = Catalog.INITIAL_CATALOG_SERVICE_ID;
  // Monitor object guarding 'catalogServiceId_'.
  private final Object catalogServiceIdLock_ = new Object();


  /**
   * Cache of authorization policy metadata. Populated from data pushed from the
   * StateStore. Currently this is _not_ "fetch-on-demand".
   */
  private final AuthorizationPolicy authPolicy_ = new AuthorizationPolicy();
  // Cache of authorization refresh markers.
  private final CatalogObjectCache<AuthzCacheInvalidation> authzCacheInvalidation_ =
      new CatalogObjectCache<>();
  // Authorization checker reference, installed via setAuthzChecker(); null until then.
  private AtomicReference<? extends AuthorizationChecker> authzChecker_;
| |
| public CatalogdMetaProvider(TBackendGflags flags) { |
| Preconditions.checkArgument(flags.isSetLocal_catalog_cache_expiration_s()); |
| Preconditions.checkArgument(flags.isSetLocal_catalog_cache_mb()); |
| |
| long cacheSizeBytes; |
| if (flags.local_catalog_cache_mb < 0) { |
| long maxHeapBytes = ManagementFactory.getMemoryMXBean() |
| .getHeapMemoryUsage().getMax(); |
| cacheSizeBytes = (long)(maxHeapBytes * 0.6); |
| } else { |
| cacheSizeBytes = flags.local_catalog_cache_mb * 1024 * 1024; |
| } |
| int expirationSecs = flags.local_catalog_cache_expiration_s; |
| LOG.info("Metadata cache configuration: capacity={} MB, expiration={} sec", |
| cacheSizeBytes/1024/1024, expirationSecs); |
| |
| // TODO(todd) add end-to-end test cases which stress cache eviction (both time |
| // and size-triggered) and make sure results are still correct. |
| cache_ = CacheBuilder.newBuilder() |
| .maximumWeight(cacheSizeBytes) |
| .expireAfterAccess(expirationSecs, TimeUnit.SECONDS) |
| .weigher(new SizeOfWeigher()) |
| .recordStats() |
| .build(); |
| } |
| |
| public CacheStats getCacheStats() { |
| return cache_.stats(); |
| } |
| |
| @Override |
| public AuthorizationPolicy getAuthPolicy() { |
| return authPolicy_; |
| } |
| |
| @Override |
| public boolean isReady() { |
| return lastSeenCatalogVersion_.get() > Catalog.INITIAL_CATALOG_VERSION; |
| } |
| |
| public void setAuthzChecker( |
| AtomicReference<? extends AuthorizationChecker> authzChecker) { |
| authzChecker_ = authzChecker; |
| } |
| |
| /** |
| * Send a GetPartialCatalogObject request to catalogd. This handles converting |
| * non-OK status responses back to exceptions, performing various generic sanity |
| * checks, etc. |
| */ |
| private TGetPartialCatalogObjectResponse sendRequest( |
| TGetPartialCatalogObjectRequest req) |
| throws TException { |
| TGetPartialCatalogObjectResponse resp; |
| byte[] ret = null; |
| Stopwatch sw = new Stopwatch().start(); |
| try { |
| ret = FeSupport.GetPartialCatalogObject(new TSerializer().serialize(req)); |
| } catch (InternalException e) { |
| throw new TException(e); |
| } finally { |
| sw.stop(); |
| FrontendProfile profile = FrontendProfile.getCurrentOrNull(); |
| if (profile != null) { |
| profile.addToCounter(RPC_REQUESTS, TUnit.NONE, 1); |
| profile.addToCounter(RPC_BYTES, TUnit.BYTES, ret == null ? 0 : ret.length); |
| profile.addToCounter(RPC_TIME, TUnit.TIME_MS, sw.elapsed(TimeUnit.MILLISECONDS)); |
| } |
| } |
| resp = new TGetPartialCatalogObjectResponse(); |
| new TDeserializer().deserialize(resp, ret); |
| if (resp.status.status_code != TErrorCode.OK) { |
| // TODO(todd) do reasonable error handling |
| throw new TException(resp.toString()); |
| } |
| |
| // If we get a "not found" response, then we assume that this was a case of an |
| // inconsistent cache. For example, we might have cached the list of tables within |
| // a database, but the table we're trying to load was dropped just prior to us |
| // trying to load it. In these cases, we need to invalidate whatever cache items |
| // might have led us to the dropped object and throw an exception so that we |
| // can retry with a reloaded cache. |
| switch (resp.lookup_status) { |
| case DB_NOT_FOUND: |
| case FUNCTION_NOT_FOUND: |
| case TABLE_NOT_FOUND: |
| case TABLE_NOT_LOADED: |
| case PARTITION_NOT_FOUND: |
| invalidateCacheForObject(req.object_desc); |
| throw new InconsistentMetadataFetchException( |
| String.format("Fetching %s failed. Could not find %s", |
| req.object_desc.type.name(), req.object_desc.toString())); |
| default: break; |
| } |
| Preconditions.checkState(resp.lookup_status == CatalogLookupStatus.OK); |
| |
| // If we requested information about a particular version of an object, but |
| // got back a response for a different version, then we have a case of "read skew". |
| // For example, we may have fetched the partition list of a table, performed pruning, |
| // and then tried to fetch the specific partitions needed for a query, while some |
| // concurrent DDL modified the set of partitions. This could result in an unexpected |
| // result which violates the snapshot consistency guarantees expected by users. |
| if (req.object_desc.isSetCatalog_version() && |
| resp.isSetObject_version_number() && |
| req.object_desc.catalog_version != resp.object_version_number) { |
| invalidateCacheForObject(req.object_desc); |
| LOG.warn("Catalog object {} changed version from {} to {} while fetching metadata", |
| req.object_desc.toString(), req.object_desc.catalog_version, |
| resp.object_version_number); |
| throw new InconsistentMetadataFetchException( |
| String.format("Catalog object %s changed version between accesses.", |
| req.object_desc.toString())); |
| } |
| return resp; |
| } |
| |
| @SuppressWarnings("unchecked") |
| private <CacheKeyType, ValueType> ValueType loadWithCaching(String itemString, |
| String statsCategory, CacheKeyType key, |
| final Callable<ValueType> loadCallable) throws TException { |
| |
| // We cache Futures during loading to deal with a particularly troublesome race |
| // around invalidation (IMPALA-7534). Namely, we have the following interleaving to |
| // worry about: |
| // |
| // Thread 1: loadTableNames() misses and sends a request to fetch table names |
| // Catalogd: sends a response with table list ['foo'] |
| // Thread 2: creates a table 'bar' |
| // Catalogd: returns an invalidation for the table name list |
| // Thread 2: invalidates the table list |
| // Thread 1: response arrives with ['foo'], which is stored in the cache |
| // |
| // In this case, we've "missed" an invalidation because it arrived concurrently |
| // with the loading of a value in the cache. This is a well-known issue with |
| // Guava: |
| // |
| // https://softwaremill.com/race-condition-cache-guava-caffeine/ |
| // |
| // In order to avoid this issue, if we don't find an element in the cache, we insert |
| // a Future while we load the value. Essentially, an entry can be in one of the |
| // following states: |
| // |
| // Missing (no entry in the cache): |
| // invalidate would be ignored, but that's OK, because any future read would fetch |
| // new data from the catalogd, and see a version newer than the invalidate |
| // |
| // Loading (a Future<> in the cache): |
| // invalidate removes the future. When loading completes, its attempt to swap |
| // in the value will fail. Any request after the invalidate will cause a second |
| // load to be triggered, which sees the post-invalidated data in catalogd. |
| // |
| // Any concurrent *read* of the cache (with no invalidation or prior to an |
| // invalidation) will piggy-back on the e same Future and return its result when |
| // it completes. |
| // |
| // Cached (non-Future in the cache): |
| // no interesting race: an invalidation ensures that any future load will miss |
| // and fetch a new value |
| // |
| // NOTE: we don't need to perform this dance for cache keys which embed a version |
| // number, because invalidation is not handled by removing cache entries, but |
| // rather by bumping top-level version numbers. |
| Stopwatch sw = new Stopwatch().start(); |
| boolean hit = false; |
| boolean isPiggybacked = false; |
| try { |
| CompletableFuture<Object> f = new CompletableFuture<Object>(); |
| // NOTE: the Cache ensures that this is an atomic operation of either returning |
| // an existing value or inserting our own. Only one thread can think it is the |
| // "loader" at a time. |
| Object inCache = cache_.get(key, () -> f); |
| if (!(inCache instanceof Future)) { |
| hit = true; |
| return (ValueType)inCache; |
| } |
| |
| if (inCache != f) { |
| isPiggybacked = true; |
| Future<ValueType> existing = (Future<ValueType>)inCache; |
| ValueType ret = Uninterruptibles.getUninterruptibly(existing); |
| piggybackSuccessCountForTests.incrementAndGet(); |
| return ret; |
| } |
| |
| // No other thread was loading this value, so we need to fetch it ourselves. |
| try { |
| f.complete(loadCallable.call()); |
| // Assuming we were able to load the value, store it back into the map |
| // as a plain-old object. This is important to get the proper weight in the |
| // map. If someone invalidated this load concurrently, this 'replace' will |
| // fail because 'f' will not be the current value. |
| cache_.asMap().replace(key, f, f.get()); |
| } catch (Exception e) { |
| // If there was an exception, remove it from the map so that any later loads |
| // retry. |
| cache_.asMap().remove(key, f); |
| // Ensure any piggy-backed loaders get the exception. 'f.get()' below will |
| // throw to this caller. |
| f.completeExceptionally(e); |
| } |
| return (ValueType) Uninterruptibles.getUninterruptibly(f); |
| } catch (ExecutionException | UncheckedExecutionException e) { |
| if (isPiggybacked) { |
| piggybackExceptionCountForTests.incrementAndGet(); |
| } |
| |
| Throwables.propagateIfPossible(e.getCause(), TException.class); |
| // Since the loading code should only throw TException, we shouldn't get |
| // any other exceptions here. If for some reason we do, just rethrow as RTE. |
| throw new RuntimeException(e); |
| } finally { |
| sw.stop(); |
| addStatsToProfile(statsCategory, /*numHits=*/hit ? 1 : 0, |
| /*numMisses=*/hit ? 0 : 1, sw); |
| LOG.trace("Request for {}: {}{}", itemString, isPiggybacked ? "piggy-backed " : "", |
| hit ? "hit" : "miss"); |
| } |
| } |
| |
| /** |
| * Adds basic statistics to the query's profile when accessing cache entries. |
| * For each cache request, the number of hits, misses, and elapsed time is aggregated. |
| * Cache requests for different types of cache entries, such as function names vs. |
| * table names, are differentiated by a 'statsCategory'. |
| */ |
| private void addStatsToProfile(String statsCategory, int numHits, int numMisses, |
| Stopwatch stopwatch) { |
| FrontendProfile profile = FrontendProfile.getCurrentOrNull(); |
| if (profile == null) return; |
| final String prefix = CATALOG_FETCH_PREFIX + "." + |
| Preconditions.checkNotNull(statsCategory) + "."; |
| profile.addToCounter(prefix + "Requests", TUnit.NONE, numHits + numMisses); |
| profile.addToCounter(prefix + "Time", TUnit.TIME_MS, |
| stopwatch.elapsed(TimeUnit.MILLISECONDS)); |
| profile.addToCounter(prefix + "Hits", TUnit.NONE, numHits); |
| profile.addToCounter(prefix + "Misses", TUnit.NONE, numMisses); |
| } |
| |
| /** |
| * Adds tables metadata storage access time to query's profile. |
| * The access time is aggregated for the tables which need to be loaded. |
| */ |
| private void addTableMetadatStorageLoadTimeToProfile(long storageLoadTimeNano) { |
| FrontendProfile profile = FrontendProfile.getCurrentOrNull(); |
| if (profile == null) return; |
| // Storage-load-time for the table and its partitions |
| final String storageAccessTimeCounter = CATALOG_FETCH_PREFIX + "." + |
| STORAGE_METADATA_LOAD_CATEGORY + "." + "Time"; |
| profile.addToCounter(storageAccessTimeCounter, TUnit.TIME_MS, |
| TimeUnit.MILLISECONDS.convert(storageLoadTimeNano, TimeUnit.NANOSECONDS)); |
| } |
| |
| @Override |
| public ImmutableList<String> loadDbList() throws TException { |
| return loadWithCaching("database list", DB_LIST_STATS_CATEGORY, DB_LIST_CACHE_KEY, |
| new Callable<ImmutableList<String>>() { |
| @Override |
| public ImmutableList<String> call() throws Exception { |
| TGetPartialCatalogObjectRequest req = newReqForCatalog(); |
| req.catalog_info_selector.want_db_names = true; |
| TGetPartialCatalogObjectResponse resp = sendRequest(req); |
| checkResponse(resp.catalog_info != null && resp.catalog_info.db_names != null, |
| req, "missing table names"); |
| return ImmutableList.copyOf(resp.catalog_info.db_names); |
| } |
| }); |
| } |
| |
| private TGetPartialCatalogObjectRequest newReqForCatalog() { |
| TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest(); |
| req.object_desc = new TCatalogObject(); |
| req.object_desc.setType(TCatalogObjectType.CATALOG); |
| req.catalog_info_selector = new TCatalogInfoSelector(); |
| return req; |
| } |
| |
| private TGetPartialCatalogObjectRequest newReqForDb(String dbName) { |
| TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest(); |
| req.object_desc = new TCatalogObject(); |
| req.object_desc.setType(TCatalogObjectType.DATABASE); |
| req.object_desc.db = new TDatabase(dbName); |
| req.db_info_selector = new TDbInfoSelector(); |
| return req; |
| } |
| |
| private TGetPartialCatalogObjectRequest newReqForFunction(String dbName, |
| String funcName) { |
| TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest(); |
| req.object_desc = new TCatalogObject(); |
| req.object_desc.setType(TCatalogObjectType.FUNCTION); |
| req.object_desc.fn = new TFunction(); |
| req.object_desc.fn.name = new TFunctionName(); |
| req.object_desc.fn.name.db_name = dbName; |
| req.object_desc.fn.name.function_name = funcName; |
| return req; |
| } |
| |
| |
| @Override |
| public Database loadDb(final String dbName) throws TException { |
| return loadWithCaching("database metadata for " + dbName, |
| DB_METADATA_STATS_CATEGORY, |
| new DbCacheKey(dbName, DbCacheKey.DbInfoType.HMS_METADATA), |
| new Callable<Database>() { |
| @Override |
| public Database call() throws Exception { |
| TGetPartialCatalogObjectRequest req = newReqForDb(dbName); |
| req.db_info_selector.want_hms_database = true; |
| TGetPartialCatalogObjectResponse resp = sendRequest(req); |
| checkResponse(resp.db_info != null && resp.db_info.hms_database != null, |
| req, "missing expected HMS database"); |
| return resp.db_info.hms_database; |
| } |
| }); |
| } |
| |
| @Override |
| public ImmutableList<String> loadTableNames(final String dbName) |
| throws MetaException, UnknownDBException, TException { |
| return loadWithCaching("table names for database " + dbName, |
| TABLE_NAMES_STATS_CATEGORY, |
| new DbCacheKey(dbName, DbCacheKey.DbInfoType.TABLE_NAMES), |
| new Callable<ImmutableList<String>>() { |
| @Override |
| public ImmutableList<String> call() throws Exception { |
| TGetPartialCatalogObjectRequest req = newReqForDb(dbName); |
| req.db_info_selector.want_table_names = true; |
| TGetPartialCatalogObjectResponse resp = sendRequest(req); |
| checkResponse(resp.db_info != null && resp.db_info.table_names != null, |
| req, "missing expected table names"); |
| return ImmutableList.copyOf(resp.db_info.table_names); |
| } |
| }); |
| } |
| |
| private TGetPartialCatalogObjectRequest newReqForTable(String dbName, |
| String tableName) { |
| TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest(); |
| req.object_desc = new TCatalogObject(); |
| req.object_desc.setType(TCatalogObjectType.TABLE); |
| req.object_desc.table = new TTable(dbName, tableName); |
| req.table_info_selector = new TTableInfoSelector(); |
| return req; |
| } |
| |
| private TGetPartialCatalogObjectRequest newReqForTable(TableMetaRef table) { |
| Preconditions.checkArgument(table instanceof TableMetaRefImpl, |
| "table ref %s was not created by CatalogdMetaProvider", table); |
| TGetPartialCatalogObjectRequest req = newReqForTable( |
| ((TableMetaRefImpl)table).dbName_, |
| ((TableMetaRefImpl)table).tableName_); |
| req.object_desc.setCatalog_version(((TableMetaRefImpl)table).catalogVersion_); |
| return req; |
| } |
| |
| @Override |
| public Pair<Table, TableMetaRef> loadTable(final String dbName, final String tableName) |
| throws NoSuchObjectException, MetaException, TException { |
| // TODO(todd) need to lower case? |
| TableCacheKey cacheKey = new TableCacheKey(dbName, tableName); |
| TableMetaRefImpl ref = loadWithCaching( |
| "table metadata for " + dbName + "." + tableName, |
| TABLE_METADATA_CACHE_CATEGORY, |
| cacheKey, |
| new Callable<TableMetaRefImpl>() { |
| @Override |
| public TableMetaRefImpl call() throws Exception { |
| TGetPartialCatalogObjectRequest req = newReqForTable(dbName, tableName); |
| req.table_info_selector.want_hms_table = true; |
| // To be consistent with implementation in legacy catalog mode, we eagerly |
| // load constraint information whenever a table is loaded. |
| req.table_info_selector.want_table_constraints = true; |
| TGetPartialCatalogObjectResponse resp = sendRequest(req); |
| checkResponse(resp.table_info != null && resp.table_info.hms_table != null, |
| req, "missing expected HMS table"); |
| addTableMetadatStorageLoadTimeToProfile( |
| resp.table_info.storage_metadata_load_time_ns); |
| return new TableMetaRefImpl( |
| dbName, tableName, resp.table_info.hms_table, resp.object_version_number, |
| resp.table_info.primary_keys, resp.table_info.foreign_keys); |
| } |
| }); |
| return Pair.create(ref.msTable_, (TableMetaRef)ref); |
| } |
| |
| @Override |
| public List<ColumnStatisticsObj> loadTableColumnStatistics(final TableMetaRef table, |
| List<String> colNames) throws TException { |
| Stopwatch sw = new Stopwatch().start(); |
| List<ColumnStatisticsObj> ret = Lists.newArrayListWithCapacity(colNames.size()); |
| // Look up in cache first, keeping track of which ones are missing. |
| // We can't use 'loadWithCaching' since we need to fetch several entries batched |
| // in a single RPC to the catalog. |
| int negativeHitCount = 0; |
| List<String> missingCols = Lists.newArrayListWithCapacity(colNames.size()); |
| for (String colName: colNames) { |
| ColStatsCacheKey cacheKey = new ColStatsCacheKey((TableMetaRefImpl)table, colName); |
| ColumnStatisticsObj val = (ColumnStatisticsObj) getIfPresent(cacheKey); |
| if (val == null) { |
| missingCols.add(colName); |
| } else if (val == NEGATIVE_COLUMN_STATS_SENTINEL) { |
| negativeHitCount++; |
| } else { |
| ret.add(val); |
| } |
| } |
| int hitCount = ret.size(); |
| |
| // Fetch and re-add those missing ones. |
| if (!missingCols.isEmpty()) { |
| TGetPartialCatalogObjectRequest req = newReqForTable(table); |
| req.table_info_selector.want_stats_for_column_names = missingCols; |
| TGetPartialCatalogObjectResponse resp = sendRequest(req); |
| checkResponse(resp.table_info != null && resp.table_info.column_stats != null, |
| req, "missing column stats"); |
| |
| Set<String> colsWithoutStats = new HashSet<>(missingCols); |
| for (ColumnStatisticsObj stats: resp.table_info.column_stats) { |
| cache_.put(new ColStatsCacheKey((TableMetaRefImpl)table, stats.getColName()), |
| stats); |
| ret.add(stats); |
| colsWithoutStats.remove(stats.getColName()); |
| } |
| |
| // Cache negative entries for any that were not returned. |
| for (String missingColName: colsWithoutStats) { |
| cache_.put(new ColStatsCacheKey((TableMetaRefImpl)table, missingColName), |
| NEGATIVE_COLUMN_STATS_SENTINEL); |
| } |
| } |
| sw.stop(); |
| addStatsToProfile(COLUMN_STATS_STATS_CATEGORY, |
| hitCount + negativeHitCount, missingCols.size(), sw); |
| LOG.trace("Request for column stats of {}: hit {}/ neg hit {} / miss {}", |
| table, hitCount, negativeHitCount, missingCols.size()); |
| return ret; |
| } |
| |
| @SuppressWarnings("unchecked") |
| private Object getIfPresent(Object cacheKey) throws TException { |
| Object existing = cache_.getIfPresent(cacheKey); |
| if (existing == null) return null; |
| if (!(existing instanceof Future)) return existing; |
| try { |
| return ((Future<Object>)existing).get(); |
| } catch (InterruptedException | ExecutionException e) { |
| Throwables.propagateIfPossible(e, TException.class); |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| public List<PartitionRef> loadPartitionList(final TableMetaRef table) |
| throws TException { |
| PartitionListCacheKey key = new PartitionListCacheKey((TableMetaRefImpl) table); |
| return (List<PartitionRef>) loadWithCaching("partition list for " + table, |
| PARTITION_LIST_STATS_CATEGORY, key, new Callable<List<PartitionRef>>() { |
| /** Called to load cache for cache misses */ |
| @Override |
| public List<PartitionRef> call() throws Exception { |
| TGetPartialCatalogObjectRequest req = newReqForTable(table); |
| req.table_info_selector.want_partition_names = true; |
| TGetPartialCatalogObjectResponse resp = sendRequest(req); |
| checkResponse(resp.table_info != null && resp.table_info.partitions != null, |
| req, "missing partition list result"); |
| List<PartitionRef> partitionRefs = |
| Lists.newArrayListWithCapacity(resp.table_info.partitions.size()); |
| for (TPartialPartitionInfo p : resp.table_info.partitions) { |
| checkResponse( |
| p.isSetId(), req, "response missing partition IDs for partition %s", p); |
| partitionRefs.add(new PartitionRefImpl(p)); |
| } |
| return partitionRefs; |
| } |
| }); |
| } |
| |
| @Override |
| public Pair<List<SQLPrimaryKey>, List<SQLForeignKey>> loadConstraints( |
| final TableMetaRef table, Table msTbl) { |
| Pair<List<SQLPrimaryKey>, List<SQLForeignKey>> pair = |
| new Pair<>(((TableMetaRefImpl) table).primaryKeys_, |
| ((TableMetaRefImpl) table).foreignKeys_); |
| return pair; |
| } |
| |
| @Override |
| public Map<String, PartitionMetadata> loadPartitionsByRefs(TableMetaRef table, |
| List<String> partitionColumnNames, |
| ListMap<TNetworkAddress> hostIndex, |
| List<PartitionRef> partitionRefs) |
| throws MetaException, TException { |
| Preconditions.checkArgument(table instanceof TableMetaRefImpl); |
| TableMetaRefImpl refImpl = (TableMetaRefImpl)table; |
| Stopwatch sw = new Stopwatch().start(); |
| // Load what we can from the cache. |
| Map<PartitionRef, PartitionMetadata> refToMeta = loadPartitionsFromCache(refImpl, |
| hostIndex, partitionRefs); |
| |
| final int numHits = refToMeta.size(); |
| final int numMisses = partitionRefs.size() - numHits; |
| |
| // Load the remainder from the catalogd. |
| List<PartitionRef> missingRefs = new ArrayList<>(); |
| for (PartitionRef ref: partitionRefs) { |
| if (!refToMeta.containsKey(ref)) missingRefs.add(ref); |
| } |
| if (!missingRefs.isEmpty()) { |
| Map<PartitionRef, PartitionMetadata> fromCatalogd = loadPartitionsFromCatalogd( |
| refImpl, hostIndex, missingRefs); |
| refToMeta.putAll(fromCatalogd); |
| // Write back to the cache. |
| storePartitionsInCache(refImpl, hostIndex, fromCatalogd); |
| } |
| sw.stop(); |
| addStatsToProfile(PARTITIONS_STATS_CATEGORY, refToMeta.size(), numMisses, sw); |
| LOG.trace("Request for partitions of {}: hit {}/{}", table, refToMeta.size(), |
| partitionRefs.size()); |
| |
| // Convert the returned map to be by-name instead of by-ref. |
| Map<String, PartitionMetadata> nameToMeta = Maps.newHashMapWithExpectedSize( |
| refToMeta.size()); |
| for (Map.Entry<PartitionRef, PartitionMetadata> e: refToMeta.entrySet()) { |
| nameToMeta.put(e.getKey().getName(), e.getValue()); |
| } |
| return nameToMeta; |
| } |
| |
| /** |
| * Load the specified partitions 'prefs' from catalogd. The partitions are made |
| * relative to the given 'hostIndex' before being returned. |
| */ |
| private Map<PartitionRef, PartitionMetadata> loadPartitionsFromCatalogd( |
| TableMetaRefImpl table, ListMap<TNetworkAddress> hostIndex, |
| List<PartitionRef> partRefs) throws TException { |
| List<Long> ids = Lists.newArrayListWithCapacity(partRefs.size()); |
| for (PartitionRef partRef: partRefs) { |
| ids.add(((PartitionRefImpl)partRef).getId()); |
| } |
| |
| TGetPartialCatalogObjectRequest req = newReqForTable(table); |
| req.table_info_selector.partition_ids = ids; |
| req.table_info_selector.want_partition_metadata = true; |
| req.table_info_selector.want_partition_files = true; |
| // TODO(todd): fetch incremental stats on-demand for compute-incremental-stats. |
| req.table_info_selector.want_partition_stats = true; |
| TGetPartialCatalogObjectResponse resp = sendRequest(req); |
| checkResponse(resp.table_info != null && resp.table_info.partitions != null, |
| req, "missing partition list result"); |
| checkResponse(resp.table_info.network_addresses != null, |
| req, "missing network addresses"); |
| checkResponse(resp.table_info.partitions.size() == ids.size(), |
| req, "returned %d partitions instead of expected %d", |
| resp.table_info.partitions.size(), ids.size()); |
| addTableMetadatStorageLoadTimeToProfile( |
| resp.table_info.storage_metadata_load_time_ns); |
| Map<PartitionRef, PartitionMetadata> ret = new HashMap<>(); |
| for (int i = 0; i < ids.size(); i++) { |
| PartitionRef partRef = partRefs.get(i); |
| TPartialPartitionInfo part = resp.table_info.partitions.get(i); |
| Partition msPart = part.getHms_partition(); |
| if (msPart == null) { |
| checkResponse(table.msTable_.getPartitionKeysSize() == 0, req, |
| "Should not return a partition with missing HMS partition unless " + |
| "the table is unpartitioned"); |
| msPart = DirectMetaProvider.msTableToPartition(table.msTable_); |
| } |
| |
| // Transform the file descriptors to the caller's index. |
| checkResponse(part.file_descriptors != null, req, "missing file descriptors"); |
| List<FileDescriptor> fds = Lists.newArrayListWithCapacity( |
| part.file_descriptors.size()); |
| for (THdfsFileDesc thriftFd: part.file_descriptors) { |
| FileDescriptor fd = FileDescriptor.fromThrift(thriftFd); |
| // The file descriptors returned via the RPC use host indexes that reference |
| // the 'network_addresses' list in the RPC. However, the caller may have already |
| // loaded some addresses into 'hostIndex'. So, the returned FDs need to be |
| // remapped to point to the caller's 'hostIndex' instead of the list in the |
| // RPC response. |
| fds.add(fd.cloneWithNewHostIndex(resp.table_info.network_addresses, hostIndex)); |
| } |
| PartitionMetadataImpl metaImpl = new PartitionMetadataImpl(msPart, |
| ImmutableList.copyOf(fds), part.getPartition_stats(), |
| part.has_incremental_stats); |
| |
| checkResponse(partRef != null, req, "returned unexpected partition id %s", part.id); |
| |
| PartitionMetadata oldVal = ret.put(partRef, metaImpl); |
| if (oldVal != null) { |
| throw new RuntimeException("catalogd returned partition " + part.id + |
| " multiple times"); |
| } |
| } |
| return ret; |
| } |
| |
| /** |
| * Load all partitions from 'partitionRefs' that are currently present in the cache. |
| * Any partitions that miss the cache are left unset in the resulting map. |
| * |
| * The FileDescriptors of the resulting partitions are copied and made relative to |
| * the provided hostIndex. |
| */ |
| private Map<PartitionRef, PartitionMetadata> loadPartitionsFromCache( |
| TableMetaRefImpl table, ListMap<TNetworkAddress> hostIndex, |
| List<PartitionRef> partitionRefs) throws TException { |
| |
| Map<PartitionRef, PartitionMetadata> ret = Maps.newHashMapWithExpectedSize( |
| partitionRefs.size()); |
| for (PartitionRef ref: partitionRefs) { |
| PartitionRefImpl prefImpl = (PartitionRefImpl)ref; |
| PartitionCacheKey cacheKey = new PartitionCacheKey(table, prefImpl.getId()); |
| PartitionMetadataImpl val = (PartitionMetadataImpl)getIfPresent(cacheKey); |
| if (val == null) continue; |
| |
| // The entry in the cache has file descriptors that are relative to the cache's |
| // host index, rather than the caller's host index. So, we need to transform them. |
| ret.put(ref, val.cloneRelativeToHostIndex(cacheHostIndex_, hostIndex)); |
| } |
| return ret; |
| } |
| |
| |
| /** |
| * Write back the partitions in 'metas' into the cache. The file descriptors in these |
| * partitions must be relative to the 'hostIndex'. |
| */ |
| private void storePartitionsInCache(TableMetaRefImpl table, |
| ListMap<TNetworkAddress> hostIndex, Map<PartitionRef, PartitionMetadata> metas) { |
| for (Map.Entry<PartitionRef, PartitionMetadata> e: metas.entrySet()) { |
| PartitionRefImpl prefImpl = (PartitionRefImpl)e.getKey(); |
| PartitionMetadataImpl metaImpl = (PartitionMetadataImpl)e.getValue(); |
| PartitionCacheKey cacheKey = new PartitionCacheKey(table, prefImpl.getId()); |
| PartitionMetadataImpl cacheVal = metaImpl.cloneRelativeToHostIndex(hostIndex, |
| cacheHostIndex_); |
| cache_.put(cacheKey, cacheVal); |
| } |
| } |
| |
| private static void checkResponse(boolean condition, |
| TGetPartialCatalogObjectRequest req, String msg, Object... args) throws TException { |
| if (condition) return; |
| throw new TException(String.format("Invalid response from catalogd for request " + |
| req.toString() + ": " + msg, args)); |
| } |
| |
| @Override |
| public String loadNullPartitionKeyValue() throws MetaException, TException { |
| return (String) loadWithCaching("null partition key value", |
| GLOBAL_CONFIGURATION_STATS_CATEGORY, |
| NULL_PARTITION_KEY_VALUE_CACHE_KEY, |
| new Callable<String>() { |
| /** Called to load cache for cache misses */ |
| @Override |
| public String call() throws Exception { |
| return directProvider_.loadNullPartitionKeyValue(); |
| } |
| }); |
| } |
| |
| @Override |
| public List<String> loadFunctionNames(final String dbName) throws TException { |
| return loadWithCaching("function names for database " + dbName, |
| FUNCTION_LIST_STATS_CATEGORY, |
| new DbCacheKey(dbName, DbCacheKey.DbInfoType.FUNCTION_NAMES), |
| new Callable<ImmutableList<String>>() { |
| @Override |
| public ImmutableList<String> call() throws Exception { |
| TGetPartialCatalogObjectRequest req = newReqForDb(dbName); |
| req.db_info_selector.want_function_names = true; |
| TGetPartialCatalogObjectResponse resp = sendRequest(req); |
| checkResponse(resp.db_info != null && resp.db_info.function_names != null, |
| req, "missing expected function names"); |
| return ImmutableList.copyOf(resp.db_info.function_names); |
| } |
| }); |
| } |
| |
| @Override |
| public ImmutableList<Function> loadFunction(final String dbName, |
| final String functionName) throws TException { |
| ImmutableList<TFunction> thriftFuncs = loadWithCaching( |
| "function " + dbName + "." + functionName, |
| FUNCTIONS_STATS_CATEGORY, |
| new FunctionsCacheKey(dbName, functionName), |
| new Callable<ImmutableList<TFunction>>() { |
| @Override |
| public ImmutableList<TFunction> call() throws Exception { |
| TGetPartialCatalogObjectRequest req = newReqForFunction(dbName, functionName); |
| TGetPartialCatalogObjectResponse resp = sendRequest(req); |
| checkResponse(resp.functions != null, req, "missing expected function"); |
| return ImmutableList.copyOf(resp.functions); |
| } |
| }); |
| // It may seem wasteful to cache the thrift function objects and then always |
| // convert them back to non-Thrift 'Functions' on every load. However, loading |
| // functions is rare enough and this ensures we don't accidentally leak something |
| // mutable back to the catalog layer. If this turns out to be a problem we can |
| // consider wrapping Function with an immutable 'FeFunction' interface. |
| ImmutableList.Builder<Function> funcs = ImmutableList.builder(); |
| for (TFunction thriftFunc : thriftFuncs) { |
| funcs.add(Function.fromThrift(thriftFunc)); |
| } |
| return funcs.build(); |
| } |
| |
| /** |
| * Invalidate portions of the cache as indicated by the provided request. |
| * |
| * This is called in two scenarios: |
| * |
| * 1) after a user DDL, the catalog returns a list of updated objects. This ensures |
| * that the impalad that issued the DDL can immediately invalidate its cache for the |
| * modified objects and will see the effects immediately. |
| * |
| * 2) catalog topic updates are received via the statestore. These topic updates |
| * indicate that the catalogd representation of the object has changed and therefore |
| * needs to be invalidated in the impalad. |
| */ |
| public synchronized TUpdateCatalogCacheResponse updateCatalogCache( |
| TUpdateCatalogCacheRequest req) { |
| if (req.isSetCatalog_service_id()) { |
| witnessCatalogServiceId(req.catalog_service_id); |
| } |
| |
| // We might see a top-level catalog version number while visiting the objects. If so, |
| // we'll capture it here and process it down at the end after applying all other |
| // objects. |
| Long nextCatalogVersion = null; |
| |
| ObjectUpdateSequencer authObjectSequencer = new ObjectUpdateSequencer(); |
| |
| Pair<Boolean, ByteBuffer> update; |
| while ((update = FeSupport.NativeGetNextCatalogObjectUpdate(req.native_iterator_ptr)) |
| != null) { |
| boolean isDelete = update.first; |
| TCatalogObject obj = new TCatalogObject(); |
| try { |
| obj.read(new TBinaryProtocol(new TByteBuffer(update.second))); |
| } catch (TException e) { |
| // TODO(todd) include the bad key here! currently the JNI bridge doesn't expose |
| // the key in any way. |
| LOG.warn("Unable to deserialize updated catalog info. Skipping cache " + |
| "invalidation which may result in stale metadata being used at this " + |
| "coordinator.", e); |
| continue; |
| } |
| if (isDelete) { |
| deletedObjectsLog_.addRemovedObject(obj); |
| } else if (deletedObjectsLog_.wasObjectRemovedAfter(obj)) { |
| LOG.trace("Skipping update because a matching object was removed " + |
| "in a later catalog version: {}", obj); |
| continue; |
| } |
| |
| invalidateCacheForObject(obj); |
| |
| // The sequencing of updates to authorization objects is important since they |
| // may be cross-referential. So, just add them to the sequencer which ensures |
| // we handle them in the right order later. |
| if (obj.type == TCatalogObjectType.PRINCIPAL || |
| obj.type == TCatalogObjectType.PRIVILEGE || |
| obj.type == TCatalogObjectType.AUTHZ_CACHE_INVALIDATION) { |
| authObjectSequencer.add(obj, isDelete); |
| } |
| |
| // Handle CATALOG objects. These are sent only via the updates published via |
| // the statestore topic, and not via the synchronous updates returned from DDLs. |
| if (obj.type == TCatalogObjectType.CATALOG) { |
| // The top-level CATALOG object version is used to implement SYNC_DDL. We need |
| // to keep track of this and pass it back to the C++ code in the return value |
| // of this call. This is also used to know when the catalog is ready at |
| // startup. |
| nextCatalogVersion = obj.catalog_version; |
| witnessCatalogServiceId(obj.catalog.catalog_service_id); |
| long resetStartVersion = obj.catalog.last_reset_catalog_version; |
| if (lastResetCatalogVersion_.getAndSet(resetStartVersion) != resetStartVersion) { |
| // Detected a new reset() finishes in Catalogd, clear the cache in case some |
| // tables are skipped in this topic update. |
| cache_.invalidateAll(); |
| } |
| } |
| } |
| |
| for (TCatalogObject obj : authObjectSequencer.getUpdatedObjects()) { |
| updateAuthPolicy(obj, /*isDelete=*/false); |
| } |
| for (TCatalogObject obj : authObjectSequencer.getDeletedObjects()) { |
| updateAuthPolicy(obj, /*isDelete=*/true); |
| } |
| |
| deletedObjectsLog_.garbageCollect(lastSeenCatalogVersion_.get()); |
| |
| // NOTE: it's important to defer setting the new catalog version until the |
| // end of the loop, since the CATALOG object might be one of the first objects |
| // processed, and we don't want to prematurely indicate that we are done processing |
| // the update. |
| if (nextCatalogVersion != null) { |
| lastSeenCatalogVersion_.set(nextCatalogVersion); |
| } |
| |
| // NOTE: the return value is ignored when this function is called by a DDL |
| // operation. |
| synchronized (catalogServiceIdLock_) { |
| // Set catalog_object_version_lower_bound to lastResetCatalogVersion_ + 1. All |
| // catalog objects with catalog version <= lastResetCatalogVersion_ should have |
| // been invalidated. See more comments above the definition of |
| // lastResetCatalogVersion_. |
| return new TUpdateCatalogCacheResponse(catalogServiceId_, |
| lastResetCatalogVersion_.get() + 1, lastSeenCatalogVersion_.get()); |
| } |
| } |
| |
| private void updateAuthPolicy(TCatalogObject obj, boolean isDelete) { |
| LOG.trace("Updating authorization policy: {} isDelete={}", obj, isDelete); |
| switch (obj.type) { |
| case PRINCIPAL: |
| if (!isDelete) { |
| Principal principal = Principal.fromThrift(obj.getPrincipal()); |
| principal.setCatalogVersion(obj.getCatalog_version()); |
| authPolicy_.addPrincipal(principal); |
| } else { |
| authPolicy_.removePrincipalIfLowerVersion(obj.getPrincipal(), |
| obj.getCatalog_version()); |
| } |
| break; |
| case PRIVILEGE: |
| if (!isDelete) { |
| // TODO(todd): duplicate code from ImpaladCatalog. |
| PrincipalPrivilege privilege = |
| PrincipalPrivilege.fromThrift(obj.getPrivilege()); |
| privilege.setCatalogVersion(obj.getCatalog_version()); |
| try { |
| authPolicy_.addPrivilege(privilege); |
| } catch (CatalogException e) { |
| // TODO(todd) it's odd that we swallow this error, both here and in |
| // the original code in ImpaladCatalog. |
| LOG.error("Error adding privilege: ", e); |
| } |
| } else { |
| authPolicy_.removePrivilegeIfLowerVersion(obj.getPrivilege(), |
| obj.getCatalog_version()); |
| } |
| break; |
| case AUTHZ_CACHE_INVALIDATION: |
| if (!isDelete) { |
| AuthzCacheInvalidation authzCacheInvalidation = new AuthzCacheInvalidation( |
| obj.getAuthz_cache_invalidation()); |
| authzCacheInvalidation.setCatalogVersion(obj.getCatalog_version()); |
| authzCacheInvalidation_.add(authzCacheInvalidation); |
| Preconditions.checkState(authzChecker_ != null); |
| authzChecker_.get().invalidateAuthorizationCache(); |
| } else { |
| authzCacheInvalidation_.remove(obj.getAuthz_cache_invalidation() |
| .getMarker_name()); |
| } |
| break; |
| default: |
| throw new IllegalArgumentException("invalid type: " + obj.type); |
| } |
| } |
| |
| /** |
| * Witness a service ID received from the catalog. We can see the service IDs |
| * either from a DDL response (in which case the service ID is part of the RPC |
| * response object) or from a statestore topic update (in which case the service ID |
| * is part of the published CATALOG object). |
| * |
| * If we notice the service ID changed, we need to invalidate our cache. |
| */ |
| private void witnessCatalogServiceId(TUniqueId serviceId) { |
| synchronized (catalogServiceIdLock_) { |
| if (!catalogServiceId_.equals(serviceId)) { |
| if (!catalogServiceId_.equals(Catalog.INITIAL_CATALOG_SERVICE_ID)) { |
| LOG.warn("Detected catalog service restart: service ID changed from " + |
| "{} to {}. Invalidating all cached metadata on this coordinator.", |
| catalogServiceId_, serviceId); |
| } |
| catalogServiceId_ = serviceId; |
| cache_.invalidateAll(); |
| // TODO(todd): we probably need to invalidate the auth policy too. |
| // we are probably better off detecting this at a higher level and |
| // reinstantiating the metaprovider entirely, similar to how ImpaladCatalog |
| // handles this. |
| |
| // TODO(todd): slight race here: a concurrent request from the old catalog |
| // could theoretically be just about to write something back into the cache |
| // after we do the above invalidate. Maybe we would be better off replacing |
| // the whole cache object, or doing a soft barrier here to wait for any |
| // concurrent cache accessors to cycle out. Another option is to associate |
| // the catalog service ID as part of all of the cache keys. |
| // |
| // This is quite unlikely to be an issue in practice, so deferring it to later |
| // clean-up. |
| } |
| } |
| } |
| |
| /** |
| * Invalidate items from the cache in response to seeing an updated catalog object |
| * from the catalogd or getting an error response from another request that indicates |
| * that the object has been removed. |
| */ |
| @VisibleForTesting |
| void invalidateCacheForObject(TCatalogObject obj) { |
| List<String> invalidated = new ArrayList<>(); |
| switch (obj.type) { |
| case TABLE: |
| case VIEW: |
| invalidateCacheForTable(obj.table.db_name, obj.table.tbl_name, invalidated); |
| |
| // Currently adding or dropping a table doesn't send an invalidation for the |
| // DB, so we'll be coarse-grained here and invalidate the DB table list when |
| // any table change happens. It's relatively cheap to re-fetch this. |
| invalidateCacheForDb(obj.table.db_name, |
| ImmutableList.of(DbCacheKey.DbInfoType.TABLE_NAMES), |
| invalidated); |
| break; |
| case FUNCTION: |
| // Same as above: if we see a function, it might be new or deleted and we should |
| // refresh the list of functions in the DB to be safe. |
| invalidateCacheForDb(obj.fn.name.db_name, |
| ImmutableList.of(DbCacheKey.DbInfoType.FUNCTION_NAMES), |
| invalidated); |
| invalidateCacheForFunction(obj.fn.name.db_name, obj.fn.name.function_name, |
| invalidated); |
| if (obj.fn.hdfs_location != null) { |
| // After the coordinator creates a function, it will also receive an invalidation |
| // update for this function from the statestored's broadcast. We shouldn't remove |
| // the libcache entry for this case, just mark it as needs refresh. LibCache will |
| // refresh the cached file if its mtime changes in HDFS. |
| FeSupport.NativeLibCacheSetNeedsRefresh(obj.fn.hdfs_location); |
| } |
| break; |
| case DATABASE: |
| if (cache_.asMap().remove(DB_LIST_CACHE_KEY) != null) { |
| invalidated.add("list of database names"); |
| } |
| invalidateCacheForDb(obj.db.db_name, ImmutableList.of( |
| DbCacheKey.DbInfoType.TABLE_NAMES, |
| DbCacheKey.DbInfoType.HMS_METADATA, |
| DbCacheKey.DbInfoType.FUNCTION_NAMES), invalidated); |
| break; |
| |
| default: |
| break; |
| } |
| if (!invalidated.isEmpty()) { |
| LOG.debug("Invalidated objects in cache: {}", invalidated); |
| } |
| } |
| |
| /** |
| * Invalidate cached metadata of the given types for the given database. If anything |
| * was invalidated, adds a human-readable string to 'invalidated' indicating the |
| * invalidated metadata. |
| */ |
| private void invalidateCacheForDb(String dbName, Iterable<DbCacheKey.DbInfoType> types, |
| List<String> invalidated) { |
| // TODO(todd) check whether we need to lower-case/canonicalize dbName? |
| for (DbCacheKey.DbInfoType type: types) { |
| DbCacheKey key = new DbCacheKey(dbName, type); |
| if (cache_.asMap().remove(key) != null) { |
| invalidated.add(type + " for DB " + dbName); |
| } |
| } |
| } |
| |
| /** |
| * Invalidate cached metadata for the given table. If anything was invalidated, adds |
| * a human-readable string to 'invalidated' indicating the invalidated metadata. |
| */ |
| private void invalidateCacheForTable(String dbName, String tblName, |
| List<String> invalidated) { |
| // TODO(todd) check whether we need to lower-case/canonicalize dbName and tblName? |
| TableCacheKey key = new TableCacheKey(dbName, tblName); |
| if (cache_.asMap().remove(key) != null) { |
| invalidated.add("table " + dbName + "." + tblName); |
| } |
| } |
| |
| /** |
| * Invalidate cached metadata for the given function. If anything was invalidated, adds |
| * a human-readable string to 'invalidated' indicating the invalidated metadata. |
| */ |
| private void invalidateCacheForFunction(String dbName, String functionName, |
| List<String> invalidated) { |
| // TODO(todd) check whether we need to lower-case/canonicalize names? |
| FunctionsCacheKey key = new FunctionsCacheKey(dbName, functionName); |
| if (cache_.asMap().remove(key) != null) { |
| invalidated.add("function " + dbName + "." + functionName); |
| } |
| } |
| |
| /** |
| * Reference to a partition within a table. We remember the partition's ID and pass |
| * that back to the catalog in subsequent requests back to fetch the details of the |
| * partition, since the ID is smaller than the name and provides a unique (not-reused) |
| * identifier. |
| */ |
| @Immutable |
| private static class PartitionRefImpl implements PartitionRef { |
| @SuppressWarnings("Immutable") // Thrift objects are mutable, but we won't mutate it. |
| private final TPartialPartitionInfo info_; |
| |
| public PartitionRefImpl(TPartialPartitionInfo p) { |
| this.info_ = Preconditions.checkNotNull(p); |
| } |
| |
| @Override |
| public String getName() { |
| return info_.getName(); |
| } |
| |
| private long getId() { |
| return info_.id; |
| } |
| } |
| |
| public static class PartitionMetadataImpl implements PartitionMetadata { |
| private final Partition msPartition_; |
| private final ImmutableList<FileDescriptor> fds_; |
| private final byte[] partitionStats_; |
| private final boolean hasIncrementalStats_; |
| |
| public PartitionMetadataImpl(Partition msPartition, ImmutableList<FileDescriptor> fds, |
| byte[] partitionStats, boolean hasIncrementalStats) { |
| this.msPartition_ = Preconditions.checkNotNull(msPartition); |
| this.fds_ = fds; |
| this.partitionStats_ = partitionStats; |
| this.hasIncrementalStats_ = hasIncrementalStats; |
| } |
| |
| /** |
| * Clone this metadata object, but make it relative to 'dstIndex' instead of |
| * 'origIndex'. |
| */ |
| public PartitionMetadataImpl cloneRelativeToHostIndex( |
| ListMap<TNetworkAddress> origIndex, |
| ListMap<TNetworkAddress> dstIndex) { |
| List<FileDescriptor> fds = Lists.newArrayListWithCapacity(fds_.size()); |
| for (FileDescriptor fd: fds_) { |
| fds.add(fd.cloneWithNewHostIndex(origIndex.getList(), dstIndex)); |
| } |
| return new PartitionMetadataImpl(msPartition_, ImmutableList.copyOf(fds), |
| partitionStats_, hasIncrementalStats_); |
| } |
| |
| @Override |
| public Partition getHmsPartition() { |
| return msPartition_; |
| } |
| |
| @Override |
| public ImmutableList<FileDescriptor> getFileDescriptors() { |
| return fds_; |
| } |
| |
| @Override |
| public byte[] getPartitionStats() { return partitionStats_; } |
| |
| @Override |
| public boolean hasIncrementalStats() { return hasIncrementalStats_; } |
| } |
| |
| /** |
| * A reference to a table that has been looked up, allowing callers to fetch further |
| * detailed information. This is is more extensive than just the table name so that |
| * we can provide a consistency check that the catalog version doesn't change in |
| * between calls. |
| */ |
| private static class TableMetaRefImpl implements TableMetaRef { |
| private final String dbName_; |
| private final String tableName_; |
| // SQL constraints for the table, populated during loadTable(). |
| private final List<SQLPrimaryKey> primaryKeys_; |
| private final List<SQLForeignKey> foreignKeys_; |
| |
| /** |
| * Stash the HMS Table object since we need this in order to handle some strange |
| * behavior whereby the catalogd returns a Partition with no HMS partition object |
| * in the case of unpartitioned tables. |
| */ |
| private final Table msTable_; |
| |
| /** |
| * The version of the table when we first loaded it. Subsequent requests about |
| * the table are verified against this version. |
| */ |
| private final long catalogVersion_; |
| |
| public TableMetaRefImpl(String dbName, String tableName, |
| Table msTable, long catalogVersion, List<SQLPrimaryKey> primaryKeys, |
| List<SQLForeignKey> foreignKeys) { |
| this.dbName_ = dbName; |
| this.tableName_ = tableName; |
| this.msTable_ = msTable; |
| this.catalogVersion_ = catalogVersion; |
| this.primaryKeys_ = primaryKeys; |
| this.foreignKeys_ = foreignKeys; |
| } |
| |
| @Override |
| public String toString() { |
| return String.format("TableMetaRef %s.%s@%d", dbName_, tableName_, catalogVersion_); |
| } |
| } |
| |
| /** |
| * Base class for cache key for a named item within a database (eg table or function). |
| */ |
| private static class DbChildCacheKey { |
| final String dbName_; |
| final String childName_; |
| |
| protected DbChildCacheKey(String dbName, String childName) { |
| this.dbName_ = dbName; |
| this.childName_ = childName; |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hashCode(dbName_, childName_, getClass()); |
| } |
| @Override |
| public boolean equals(Object obj) { |
| if (obj == null || !obj.getClass().equals(getClass())) { |
| return false; |
| } |
| DbChildCacheKey other = (DbChildCacheKey)obj; |
| return childName_.equals(other.childName_) && |
| dbName_.equals(other.dbName_); |
| } |
| } |
| |
| /** |
| * Base class for cache keys related to a specific table. Such keys are explicitly |
| * invalidated by 'invalidateCacheForTable' above. |
| */ |
| private static class TableCacheKey extends DbChildCacheKey { |
| TableCacheKey(String dbName, String tableName) { |
| super(dbName, tableName); |
| } |
| } |
| |
| /** |
| * Cache key for the set of overloads of a function with a given name. |
| * Invalidated by 'invalidateCacheForFunction' above. |
| * Values are ImmutableList<TFunction>. |
| */ |
| private static class FunctionsCacheKey extends DbChildCacheKey { |
| FunctionsCacheKey(String dbName, String funcName) { |
| super(dbName, funcName); |
| } |
| } |
| |
| /** |
| * Base class for cache keys that are tied to a particular version of a table. |
| * These keys are never explicitly invalidated. Instead, we rely on the fact that, |
| * when the table is updated, it has a new version number. This results in making |
| * previous entries for earlier versions of the table essentially unreachable in the |
| * cache. They will age out over time as no queries access them. |
| */ |
| private static class VersionedTableCacheKey extends TableCacheKey { |
| /** |
| * The catalog version number of the Table object. Including the version number in |
| * the cache key ensures that, if the version number changes, any dependent entities |
| * are "automatically" invalidated. |
| */ |
| final long version_; |
| |
| VersionedTableCacheKey(TableMetaRefImpl table) { |
| super(table.dbName_, table.tableName_); |
| version_ = table.catalogVersion_; |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hashCode(super.hashCode(), version_); |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (obj == null || !(obj.getClass().equals(getClass()))) { |
| return false; |
| } |
| VersionedTableCacheKey other = (VersionedTableCacheKey)obj; |
| return super.equals(obj) && version_ == other.version_; |
| } |
| } |
| |
| /** |
| * Cache key for an entry storing column statistics. |
| * |
| * Values for these keys are 'ColumnStatisticsObj' objects. |
| */ |
| private static class ColStatsCacheKey extends VersionedTableCacheKey { |
| private final String colName_; |
| |
| public ColStatsCacheKey(TableMetaRefImpl table, String colName) { |
| super(table); |
| colName_ = colName; |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hashCode(super.hashCode(), colName_); |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (obj == null || !(obj instanceof ColStatsCacheKey)) { |
| return false; |
| } |
| ColStatsCacheKey other = (ColStatsCacheKey)obj; |
| return super.equals(obj) && colName_.equals(other.colName_); |
| } |
| } |
| |
| /** |
| * Cache key for the partition list of a table. |
| * |
| * Values for these keys are 'List<PartitionRefImpl>'. |
| */ |
| private static class PartitionListCacheKey extends VersionedTableCacheKey { |
| PartitionListCacheKey(TableMetaRefImpl table) { |
| super(table); |
| } |
| } |
| |
| /** |
| * Key for caching information about a single partition. |
| * |
| * TODO(todd): currently this inherits from VersionedTableCacheKey. This means that, if |
| * a table's version number changes, all of its partitions must be reloaded. However, |
| * since partition IDs are globally unique within a catalogd instance, we could |
| * optimize this to just key based on the partition ID. However, currently, there are |
| * some cases where partitions are mutated in place rather than replaced with a new ID. |
| * We need to eliminate those or add a partition sequence number before we can make |
| * this optimization. |
| */ |
| private static class PartitionCacheKey extends VersionedTableCacheKey { |
| private final long partId_; |
| |
| PartitionCacheKey(TableMetaRefImpl table, long partId) { |
| super(table); |
| partId_ = partId; |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hashCode(super.hashCode(), partId_); |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (obj == null || !(obj instanceof PartitionCacheKey)) { |
| return false; |
| } |
| PartitionCacheKey other = (PartitionCacheKey)obj; |
| return super.equals(obj) && partId_ == other.partId_; |
| } |
| } |
| |
| /** |
| * Cache key for metadata about databases. |
| */ |
| private static class DbCacheKey { |
| static enum DbInfoType { |
| /** Cache the HMS Database object */ |
| HMS_METADATA, |
| /** Cache an ImmutableList<String> for table names within the DB */ |
| TABLE_NAMES, |
| /** Cache an ImmutableList<String> for function names within the DB */ |
| FUNCTION_NAMES |
| } |
| private final String dbName_; |
| private final DbInfoType type_; |
| |
| DbCacheKey(String dbName, DbInfoType type) { |
| dbName_ = Preconditions.checkNotNull(dbName); |
| type_ = Preconditions.checkNotNull(type); |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hashCode(getClass(), dbName_, type_); |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (this == obj) return true; |
| if (obj == null) return false; |
| if (getClass() != obj.getClass()) return false; |
| DbCacheKey other = (DbCacheKey) obj; |
| return dbName_.equals(other.dbName_) && type_ == other.type_; |
| } |
| } |
| |
| @VisibleForTesting |
| static class SizeOfWeigher implements Weigher<Object, Object> { |
| // Bypass flyweight objects like small boxed integers, Boolean.TRUE, enums, etc. |
| private static final boolean BYPASS_FLYWEIGHT = true; |
| // Cache the reflected sizes of classes seen. |
| private static final boolean CACHE_SIZES = true; |
| |
| private static SizeOf SIZEOF = SizeOf.newInstance(BYPASS_FLYWEIGHT, CACHE_SIZES); |
| |
| private static final int BYTES_PER_WORD = 8; // Assume 64-bit VM. |
| // Guava cache overhead based on: |
| // http://code-o-matic.blogspot.com/2012/02/updated-memory-cost-per-javaguava.html |
| private static final int OVERHEAD_PER_ENTRY = |
| 12 * BYTES_PER_WORD + // base cost per entry |
| 4 * BYTES_PER_WORD; // for use of 'maximumSize()' |
| |
| @Override |
| public int weigh(Object key, Object value) { |
| long size = SIZEOF.deepSizeOf(key, value) + OVERHEAD_PER_ENTRY; |
| if (size > Integer.MAX_VALUE) { |
| return Integer.MAX_VALUE; |
| } |
| return (int)size; |
| } |
| } |
| } |