| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hive.ql.cache.results; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.function.Supplier; |
| import java.util.stream.Stream; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.ContentSummary; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.hive.common.metrics.common.Metrics; |
| import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; |
| import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; |
| import org.apache.hadoop.hive.common.metrics.common.MetricsVariable; |
| import org.apache.hadoop.hive.common.ValidTxnWriteIdList; |
| import org.apache.hadoop.hive.common.ValidWriteIdList; |
| import org.apache.hadoop.hive.conf.HiveConf; |
| import org.apache.hadoop.hive.metastore.api.FieldSchema; |
| import org.apache.hadoop.hive.metastore.api.NotificationEvent; |
| import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; |
| import org.apache.hadoop.hive.ql.hooks.Entity.Type; |
| import org.apache.hadoop.hive.ql.hooks.ReadEntity; |
| import org.apache.hadoop.hive.ql.io.AcidUtils; |
| import org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient; |
| import org.apache.hadoop.hive.ql.metadata.Table; |
| import org.apache.hadoop.hive.ql.metadata.events.EventConsumer; |
| import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo; |
| import org.apache.hadoop.hive.ql.parse.TableAccessInfo; |
| import org.apache.hadoop.hive.ql.plan.FetchWork; |
| import org.apache.hadoop.hive.ql.plan.HiveOperation; |
| import org.apache.hive.common.util.TxnIdUtils; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * A class to handle management and lookup of cached Hive query results. |
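| * <p> |
| * Entries are added in PENDING state via {@link #addToCache(QueryInfo, ValidTxnWriteIdList)}, |
| * become usable once {@link #setEntryValid(CacheEntry, FetchWork)} succeeds, and are looked up |
| * by query text via {@link #lookup(LookupInfo)}. Entries are invalidated on table changes, |
| * entry lifetime expiration, or LRU eviction when the cache needs space for new entries. |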
| */ |
| public final class QueryResultsCache { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(QueryResultsCache.class); |
| |
| public static class LookupInfo { |
| private final String queryText; |
| private final Supplier<ValidTxnWriteIdList> txnWriteIdListProvider; |
| private final Set<Long> tableIds; |
| |
| public LookupInfo(String queryText, Supplier<ValidTxnWriteIdList> txnWriteIdListProvider, Set<Long> tableIds) { |
| super(); |
| this.queryText = queryText; |
| this.txnWriteIdListProvider = txnWriteIdListProvider; |
| this.tableIds = tableIds; |
| } |
| |
| public String getQueryText() { |
| return queryText; |
| } |
| } |
| |
| public static class QueryInfo { |
| private long queryTime; |
| private LookupInfo lookupInfo; |
| private HiveOperation hiveOperation; |
| private List<FieldSchema> resultSchema; |
| private TableAccessInfo tableAccessInfo; |
| private ColumnAccessInfo columnAccessInfo; |
| private Set<ReadEntity> inputs; |
| |
| public QueryInfo( |
| long queryTime, |
| LookupInfo lookupInfo, |
| HiveOperation hiveOperation, |
| List<FieldSchema> resultSchema, |
| TableAccessInfo tableAccessInfo, |
| ColumnAccessInfo columnAccessInfo, |
| Set<ReadEntity> inputs) { |
| this.queryTime = queryTime; |
| this.lookupInfo = lookupInfo; |
| this.hiveOperation = hiveOperation; |
| this.resultSchema = resultSchema; |
| this.tableAccessInfo = tableAccessInfo; |
| this.columnAccessInfo = columnAccessInfo; |
| this.inputs = inputs; |
| } |
| |
| public LookupInfo getLookupInfo() { |
| return lookupInfo; |
| } |
| |
| public void setLookupInfo(LookupInfo lookupInfo) { |
| this.lookupInfo = lookupInfo; |
| } |
| |
| public HiveOperation getHiveOperation() { |
| return hiveOperation; |
| } |
| |
| public void setHiveOperation(HiveOperation hiveOperation) { |
| this.hiveOperation = hiveOperation; |
| } |
| |
| public List<FieldSchema> getResultSchema() { |
| return resultSchema; |
| } |
| |
| public void setResultSchema(List<FieldSchema> resultSchema) { |
| this.resultSchema = resultSchema; |
| } |
| |
| public TableAccessInfo getTableAccessInfo() { |
| return tableAccessInfo; |
| } |
| |
| public void setTableAccessInfo(TableAccessInfo tableAccessInfo) { |
| this.tableAccessInfo = tableAccessInfo; |
| } |
| |
| public ColumnAccessInfo getColumnAccessInfo() { |
| return columnAccessInfo; |
| } |
| |
| public void setColumnAccessInfo(ColumnAccessInfo columnAccessInfo) { |
| this.columnAccessInfo = columnAccessInfo; |
| } |
| |
| public Set<ReadEntity> getInputs() { |
| return inputs; |
| } |
| |
| public void setInputs(Set<ReadEntity> inputs) { |
| this.inputs = inputs; |
| } |
| |
| public long getQueryTime() { |
| return queryTime; |
| } |
| |
| public void setQueryTime(long queryTime) { |
| this.queryTime = queryTime; |
| } |
| } |
| |
| public enum CacheEntryStatus { |
| VALID, INVALID, PENDING |
| } |
| |
| public static class CacheEntry { |
| private QueryInfo queryInfo; |
| private FetchWork fetchWork; |
| private Path cachedResultsPath; |
| private Set<FileStatus> cachedResultPaths; |
| |
| // Cache administration |
| private long size; |
| private AtomicInteger readers = new AtomicInteger(0); |
| private ScheduledFuture<?> invalidationFuture = null; |
| private volatile CacheEntryStatus status = CacheEntryStatus.PENDING; |
| private ValidTxnWriteIdList txnWriteIdList; |
| |
| public void releaseReader() { |
| int readerCount = 0; |
| synchronized (this) { |
| readerCount = readers.decrementAndGet(); |
| } |
| LOG.debug("releaseReader: entry: {}, readerCount: {}", this, readerCount); |
| |
| cleanupIfNeeded(); |
| } |
| |
| @Override |
| public String toString() { |
| return String.format("CacheEntry#%s query: [ %s ], status: %s, location: %s, size: %d", |
| System.identityHashCode(this), getQueryInfo().getLookupInfo().getQueryText(), status, |
| cachedResultsPath, size); |
| } |
| |
| public boolean addReader() { |
| boolean added = false; |
| int readerCount = 0; |
| synchronized (this) { |
| if (status == CacheEntryStatus.VALID) { |
| readerCount = readers.incrementAndGet(); |
| added = true; |
| } |
| } |
| LOG.debug("addReader: entry: {}, readerCount: {}, added: {}", this, readerCount, added); |
| return added; |
| } |
| |
| private int numReaders() { |
| return readers.get(); |
| } |
| |
| private void invalidate() { |
| LOG.info("Invalidating cache entry: {}", this); |
| CacheEntryStatus prevStatus = setStatus(CacheEntryStatus.INVALID); |
| if (prevStatus == CacheEntryStatus.VALID) { |
| if (invalidationFuture != null) { |
| // The cache entry has just been invalidated, no need for the scheduled invalidation. |
| invalidationFuture.cancel(false); |
| } |
| cleanupIfNeeded(); |
| decrementMetric(MetricsConstant.QC_VALID_ENTRIES); |
| } else if (prevStatus == CacheEntryStatus.PENDING) { |
| // Need to notify any queries waiting on the change from pending status. |
| synchronized (this) { |
| this.notifyAll(); |
| } |
| decrementMetric(MetricsConstant.QC_PENDING_FAILS); |
| } |
| } |
| |
| public CacheEntryStatus getStatus() { |
| return status; |
| } |
| |
| private CacheEntryStatus setStatus(CacheEntryStatus newStatus) { |
| synchronized (this) { |
| CacheEntryStatus oldStatus = status; |
| status = newStatus; |
| return oldStatus; |
| } |
| } |
| |
| private void cleanupIfNeeded() { |
| if (status == CacheEntryStatus.INVALID && readers.get() <= 0) { |
| QueryResultsCache.cleanupEntry(this); |
| } |
| } |
| |
| private String getQueryText() { |
| return getQueryInfo().getLookupInfo().getQueryText(); |
| } |
| |
| public FetchWork getFetchWork() { |
| // FetchWork's sink is used to hold results, so each query needs a separate copy of FetchWork |
| FetchWork fetch = new FetchWork(fetchWork.getTblDir(), fetchWork.getTblDesc(), fetchWork.getLimit()); |
| fetch.setCachedResult(true); |
| fetch.setFilesToFetch(this.cachedResultPaths); |
| return fetch; |
| } |
| |
| public QueryInfo getQueryInfo() { |
| return queryInfo; |
| } |
| |
| public Path getCachedResultsPath() { |
| return cachedResultsPath; |
| } |
| |
| /** |
| * Wait for the cache entry to go from PENDING to VALID status. |
| * @return true if the cache entry successfully changed to VALID status, |
| * false if the status changed from PENDING to INVALID |
| */ |
| public boolean waitForValidStatus() { |
| LOG.info("Waiting on pending cacheEntry: {}", this); |
| long timeout = 1000; |
| |
| long startTime = System.nanoTime(); |
| long endTime; |
| |
| while (true) { |
| try { |
| switch (status) { |
| case VALID: |
| endTime = System.nanoTime(); |
| incrementMetric(MetricsConstant.QC_PENDING_SUCCESS_WAIT_TIME, |
| TimeUnit.MILLISECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS)); |
| return true; |
| case INVALID: |
| endTime = System.nanoTime(); |
| incrementMetric(MetricsConstant.QC_PENDING_FAILS_WAIT_TIME, |
| TimeUnit.MILLISECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS)); |
| return false; |
| case PENDING: |
| // Status has not changed, continue waiting. |
| break; |
| } |
| |
| synchronized (this) { |
| this.wait(timeout); |
| } |
| } catch (InterruptedException err) { |
| Thread.currentThread().interrupt(); |
| return false; |
| } |
| } |
| } |
| |
| public Stream<String> getTableNames() { |
| return queryInfo.getInputs().stream() |
| .filter(readEntity -> readEntity.getType() == Type.TABLE) |
| .map(readEntity -> readEntity.getTable().getFullyQualifiedName()); |
| } |
| } |
| |
| // Allow lookup by query string |
| private final Map<String, Set<CacheEntry>> queryMap = new HashMap<String, Set<CacheEntry>>(); |
| |
| // LRU. Could also implement LRU as a doubly linked list if CacheEntry keeps its node. |
| // Use synchronized map since even read actions cause the lru to get updated. |
| private final Map<CacheEntry, CacheEntry> lru = Collections.synchronizedMap( |
| new LinkedHashMap<CacheEntry, CacheEntry>(INITIAL_LRU_SIZE, LRU_LOAD_FACTOR, true)); |
| |
| // Lookup of cache entries by table used in the query, for cache invalidation. |
| private final Map<String, Set<CacheEntry>> tableToEntryMap = new HashMap<>(); |
| |
| private final HiveConf conf; |
| private Path cacheDirPath; |
| private Path zeroRowsPath; |
| private long cacheSize = 0; |
| private long maxCacheSize; |
| private long maxEntrySize; |
| private long maxEntryLifetime; |
| private ReadWriteLock rwLock = new ReentrantReadWriteLock(); |
| private ScheduledFuture<?> invalidationPollFuture; |
| |
| private QueryResultsCache(HiveConf configuration) throws IOException { |
| this.conf = configuration; |
| |
| // Set up cache directory |
| Path rootCacheDir = new Path(conf.getVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_DIRECTORY)); |
| LOG.info("Initializing query results cache at {}", rootCacheDir); |
| |
| String currentCacheDirName = "results-" + UUID.randomUUID().toString(); |
| cacheDirPath = new Path(rootCacheDir, currentCacheDirName); |
| FileSystem fs = cacheDirPath.getFileSystem(conf); |
| FsPermission fsPermission = new FsPermission("700"); |
| fs.mkdirs(cacheDirPath, fsPermission); |
| |
| // Create non-existent path for 0-row results |
| zeroRowsPath = new Path(cacheDirPath, "dummy_zero_rows"); |
| |
| // Results cache directory should be cleaned up at process termination. |
| fs.deleteOnExit(cacheDirPath); |
| |
| maxCacheSize = conf.getLongVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_SIZE); |
| maxEntrySize = conf.getLongVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_ENTRY_SIZE); |
| maxEntryLifetime = conf.getTimeVar( |
| HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_ENTRY_LIFETIME, |
| TimeUnit.MILLISECONDS); |
| |
| LOG.info("Query results cache: cacheDirectory {}, maxCacheSize {}, maxEntrySize {}, maxEntryLifetime {}", |
| cacheDirPath, maxCacheSize, maxEntrySize, maxEntryLifetime); |
| } |
| |
| private static final AtomicBoolean inited = new AtomicBoolean(false); |
| private static QueryResultsCache instance; |
| |
| public static void initialize(HiveConf conf) throws IOException { |
| if (!inited.getAndSet(true)) { |
| try { |
| instance = new QueryResultsCache(conf); |
| |
| Metrics metrics = MetricsFactory.getInstance(); |
| if (metrics != null) { |
| registerMetrics(metrics, instance); |
| } |
| } catch (Exception err) { |
| inited.set(false); |
| throw err; |
| } |
| } |
| } |
| |
| public static QueryResultsCache getInstance() { |
| return instance; |
| } |
| |
| public Path getCacheDirPath() { |
| return cacheDirPath; |
| } |
| |
| /** |
| * Check if the cache contains an entry for the requested LookupInfo. |
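| * <p> |
| * A rough consumer-side sketch (illustrative only; the {@code request} value is assumed to be |
| * built by the caller during query compilation): |
| * <pre>{@code |
| *   CacheEntry entry = QueryResultsCache.getInstance().lookup(request); |
| *   if (entry != null && entry.getStatus() == CacheEntryStatus.PENDING) { |
| *     // Another query is still populating this entry; wait for it to resolve. |
| *     if (!entry.waitForValidStatus()) { |
| *       entry = null; |
| *     } |
| *   } |
| *   if (entry != null && entry.addReader()) { |
| *     try { |
| *       FetchWork cachedFetch = entry.getFetchWork(); |
| *       // ... serve the results using cachedFetch ... |
| *     } finally { |
| *       entry.releaseReader(); |
| *     } |
| *   } |
| * }</pre> |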
| * @param request the lookup info describing the query being checked against the cache |
| * @return The cached result if there is a match in the cache, or null if no match is found. |
| */ |
| public CacheEntry lookup(LookupInfo request) { |
| CacheEntry result = null; |
| |
| LOG.debug("QueryResultsCache lookup for query: {}", request.queryText); |
| |
| boolean foundPending = false; |
| // Cannot remove entries while we currently hold the read lock, so keep track of them to delete later. |
| Set<CacheEntry> entriesToRemove = new HashSet<CacheEntry>(); |
| Lock readLock = rwLock.readLock(); |
| try { |
| // Note: ReentrantReadWriteLock does not allow upgrading a read lock to a write lock. |
| // Care must be taken while under read lock, to make sure we do not perform any actions |
| // which attempt to take a write lock. |
| readLock.lock(); |
| Set<CacheEntry> candidates = queryMap.get(request.queryText); |
| if (candidates != null) { |
| CacheEntry pendingResult = null; |
| for (CacheEntry candidate : candidates) { |
| if (entryMatches(request, candidate, entriesToRemove)) { |
| CacheEntryStatus entryStatus = candidate.status; |
| if (entryStatus == CacheEntryStatus.VALID) { |
| result = candidate; |
| break; |
| } else if (entryStatus == CacheEntryStatus.PENDING && pendingResult == null) { |
| pendingResult = candidate; |
| } |
| } |
| } |
| |
| // Try to find valid entry, but settle for pending entry if that is all we have. |
| if (result == null && pendingResult != null) { |
| result = pendingResult; |
| foundPending = true; |
| } |
| |
| if (result != null) { |
| lru.get(result); // Update LRU |
| } |
| } |
| } finally { |
| readLock.unlock(); |
| } |
| |
| // Now that we have exited read lock it is safe to remove any invalid entries. |
| for (CacheEntry invalidEntry : entriesToRemove) { |
| removeEntry(invalidEntry); |
| } |
| |
| LOG.debug("QueryResultsCache lookup result: {}", result); |
| incrementMetric(MetricsConstant.QC_LOOKUPS); |
| if (result != null) { |
| if (foundPending) { |
| incrementMetric(MetricsConstant.QC_PENDING_HITS); |
| } else { |
| incrementMetric(MetricsConstant.QC_VALID_HITS); |
| } |
| } |
| |
| return result; |
| } |
| |
| /** |
| * Add an entry to the cache. |
| * The new entry will be in PENDING state and is not usable until setEntryValid() is called on the entry. |
| * @param queryInfo query information for the query whose results are being cached |
| * @param txnWriteIdList the transactional write ID list captured for the query, used later to validate the entry |
| * @return the newly added cache entry, in PENDING state |
| */ |
| public CacheEntry addToCache(QueryInfo queryInfo, ValidTxnWriteIdList txnWriteIdList) { |
| // Create placeholder entry with PENDING state. |
| String queryText = queryInfo.getLookupInfo().getQueryText(); |
| CacheEntry addedEntry = new CacheEntry(); |
| addedEntry.queryInfo = queryInfo; |
| addedEntry.txnWriteIdList = txnWriteIdList; |
| |
| Lock writeLock = rwLock.writeLock(); |
| try { |
| writeLock.lock(); |
| |
| LOG.info("Adding placeholder cache entry for query '{}'", queryText); |
| |
| // Add the entry to the cache structures while under write lock. |
| addToEntryMap(queryMap, queryText, addedEntry); |
| lru.put(addedEntry, addedEntry); |
| // Index of entries by table usage. |
| addedEntry.getTableNames() |
| .forEach(tableName -> addToEntryMap(tableToEntryMap, tableName, addedEntry)); |
| } finally { |
| writeLock.unlock(); |
| } |
| |
| return addedEntry; |
| } |
| |
| /** |
| * Updates a pending cache entry with a FetchWork result from a finished query. |
| * If successful the cache entry will be set to valid status and be usable for cached queries. |
| * Important: if this call succeeds, the reader count for the cache entry is incremented. |
| * CacheEntry.releaseReader() should be called when the caller is done with the cache entry. |
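| * <p> |
| * A rough producer-side sketch (illustrative only; {@code queryInfo}, {@code txnWriteIds} and |
| * {@code fetchWork} are assumed to come from query compilation and execution): |
| * <pre>{@code |
| *   QueryResultsCache cache = QueryResultsCache.getInstance(); |
| *   CacheEntry pending = cache.addToCache(queryInfo, txnWriteIds); |
| *   // ... run the query and produce its result files ... |
| *   if (cache.setEntryValid(pending, fetchWork)) { |
| *     // setEntryValid() added a reader on success; release it once the results have been read. |
| *     pending.releaseReader(); |
| *   } |
| * }</pre> |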
| * @param cacheEntry the pending cache entry to populate |
| * @param fetchWork the FetchWork describing the finished query's result files |
| * @return true if the entry was successfully set to VALID status, false otherwise |
| */ |
| public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) { |
| Path queryResultsPath = null; |
| Path cachedResultsPath = null; |
| |
| try { |
| // If we are here, the file sink operator should have created the result files to fetch. |
| assert (fetchWork.getFilesToFetch() != null); |
| |
| boolean requiresCaching = true; |
| queryResultsPath = fetchWork.getTblDir(); |
| FileSystem resultsFs = queryResultsPath.getFileSystem(conf); |
| |
| long resultSize = 0; |
| for (FileStatus fs : fetchWork.getFilesToFetch()) { |
| if (resultsFs.exists(fs.getPath())) { |
| resultSize += fs.getLen(); |
| } else { |
| // A result file is missing, so there is nothing to cache. |
| requiresCaching = false; |
| break; |
| } |
| } |
| |
| if (!shouldEntryBeAdded(cacheEntry, resultSize)) { |
| return false; |
| } |
| |
| // Synchronize on the cache entry so that no one else can invalidate this entry |
| // while we are in the process of setting it to valid. |
| synchronized (cacheEntry) { |
| if (cacheEntry.getStatus() == CacheEntryStatus.INVALID) { |
| // Entry either expired, or was invalidated due to table updates |
| return false; |
| } |
| |
| if (requiresCaching) { |
| cacheEntry.cachedResultPaths = new HashSet<>(); |
| for (FileStatus fs : fetchWork.getFilesToFetch()) { |
| cacheEntry.cachedResultPaths.add(fs); |
| } |
| LOG.info("Cached query result paths located at {} (size {}) for query '{}'", |
| queryResultsPath, resultSize, cacheEntry.getQueryText()); |
| } |
| |
| // Create a new FetchWork to reference the new cache location. |
| FetchWork fetchWorkForCache = |
| new FetchWork(fetchWork.getTblDir(), fetchWork.getTblDesc(), fetchWork.getLimit()); |
| fetchWorkForCache.setCachedResult(true); |
| fetchWorkForCache.setFilesToFetch(fetchWork.getFilesToFetch()); |
| cacheEntry.fetchWork = fetchWorkForCache; |
| //cacheEntry.cachedResultsPath = cachedResultsPath; |
| cacheEntry.size = resultSize; |
| this.cacheSize += resultSize; |
| |
| cacheEntry.setStatus(CacheEntryStatus.VALID); |
| // Mark this entry as being in use. Caller will need to release later. |
| cacheEntry.addReader(); |
| |
| scheduleEntryInvalidation(cacheEntry); |
| |
| // Notify any queries waiting on this cacheEntry to become valid. |
| cacheEntry.notifyAll(); |
| } |
| |
| incrementMetric(MetricsConstant.QC_VALID_ENTRIES); |
| incrementMetric(MetricsConstant.QC_TOTAL_ENTRIES_ADDED); |
| } catch (Exception err) { |
| String queryText = cacheEntry.getQueryText(); |
| LOG.error("Failed to create cache entry for query results for query: " + queryText, err); |
| cacheEntry.size = 0; |
| cacheEntry.cachedResultsPath = null; |
| // Invalidate the entry. Rely on query cleanup to remove from lookup. |
| cacheEntry.invalidate(); |
| return false; |
| } |
| |
| return true; |
| } |
| |
| public void clear() { |
| Lock writeLock = rwLock.writeLock(); |
| try { |
| writeLock.lock(); |
| LOG.info("Clearing the results cache"); |
| CacheEntry[] allEntries = null; |
| synchronized (lru) { |
| allEntries = lru.keySet().toArray(EMPTY_CACHEENTRY_ARRAY); |
| } |
| for (CacheEntry entry : allEntries) { |
| try { |
| removeEntry(entry); |
| } catch (Exception err) { |
| LOG.error("Error removing cache entry " + entry, err); |
| } |
| } |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| public long getSize() { |
| Lock readLock = rwLock.readLock(); |
| try { |
| readLock.lock(); |
| return cacheSize; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public void notifyTableChanged(String dbName, String tableName, long updateTime) { |
| LOG.debug("Table changed: {}.{}, at {}", dbName, tableName, updateTime); |
| // Invalidate all cache entries using this table. |
| List<CacheEntry> entriesToInvalidate = null; |
| rwLock.writeLock().lock(); |
| try { |
| String key = (dbName.toLowerCase() + "." + tableName.toLowerCase()); |
| Set<CacheEntry> entriesForTable = tableToEntryMap.get(key); |
| if (entriesForTable != null) { |
| // Possible concurrent modification issues if we try to remove cache entries while |
| // traversing the cache structures. Save the entries to remove in a separate list. |
| entriesToInvalidate = new ArrayList<>(entriesForTable); |
| } |
| if (entriesToInvalidate != null) { |
| for (CacheEntry entry : entriesToInvalidate) { |
| // Ignore updates that occurred before this cached query was created. |
| if (entry.getQueryInfo().getQueryTime() <= updateTime) { |
| removeEntry(entry); |
| } |
| } |
| } |
| } finally { |
| rwLock.writeLock().unlock(); |
| } |
| } |
| |
| private static final int INITIAL_LRU_SIZE = 16; |
| private static final float LRU_LOAD_FACTOR = 0.75f; |
| private static final CacheEntry[] EMPTY_CACHEENTRY_ARRAY = {}; |
| |
| /** |
| * Check that the cache entry matches the lookupInfo. |
| * @param lookupInfo the lookup info for the current query |
| * @param entry the cache entry to check against the lookup info |
| * @param entriesToRemove Set of entries to be removed after exiting read lock section. |
| * If the entry is found to be invalid it will be added to this set. |
| * @return true if the cache entry can be used for the given lookup, false otherwise |
| */ |
| private boolean entryMatches(LookupInfo lookupInfo, CacheEntry entry, Set<CacheEntry> entriesToRemove) { |
| QueryInfo queryInfo = entry.getQueryInfo(); |
| |
| for (ReadEntity readEntity : queryInfo.getInputs()) { |
| if (readEntity.getType() == Type.TABLE) { |
| Table tableUsed = readEntity.getTable(); |
| // Every table id used by the cache entry must be present in lookupInfo.tableIds. |
| // Since the query text is the cache key, the lookup and the entry should reference the same |
| // set of tables, so it is enough to check that each cache entry table id is contained in the lookup. |
| long id = tableUsed.getTTable().getId(); |
| if (!lookupInfo.tableIds.contains(id)) { |
| LOG.debug("Cache entry contains a table (tableId={}) that is not present in the query", id); |
| return false; |
| } |
| |
| // Check that the tables used do not resolve to temp tables. |
| Map<String, Table> tempTables = |
| SessionHiveMetaStoreClient.getTempTablesForDatabase(tableUsed.getDbName(), tableUsed.getTableName()); |
| if (tempTables != null && tempTables.containsKey(tableUsed.getTableName())) { |
| LOG.info("{} resolves to a temporary table in the current session. This query cannot use the cache.", |
| tableUsed.getTableName()); |
| return false; |
| } |
| |
| // Has the table changed since the query was cached? |
| // For transactional tables, can compare the table writeIDs of the current/cached query. |
| if (AcidUtils.isTransactionalTable(tableUsed)) { |
| boolean writeIdCheckPassed = false; |
| String tableName = tableUsed.getFullyQualifiedName(); |
| ValidTxnWriteIdList currentTxnWriteIdList = lookupInfo.txnWriteIdListProvider.get(); |
| if (currentTxnWriteIdList == null) { |
| LOG.warn("Current query's txnWriteIdList is null!"); |
| return false; |
| } |
| if (entry.txnWriteIdList == null) { |
| LOG.warn("Cache entry's txnWriteIdList is null!"); |
| return false; |
| } |
| ValidWriteIdList currentWriteIdForTable = |
| currentTxnWriteIdList.getTableValidWriteIdList(tableName); |
| ValidWriteIdList cachedWriteIdForTable = entry.txnWriteIdList.getTableValidWriteIdList(tableName); |
| |
| LOG.debug("Checking writeIds for table {}: currentWriteIdForTable {}, cachedWriteIdForTable {}", |
| tableName, currentWriteIdForTable, cachedWriteIdForTable); |
| if (currentWriteIdForTable != null && cachedWriteIdForTable != null) { |
| if (TxnIdUtils.checkEquivalentWriteIds(currentWriteIdForTable, cachedWriteIdForTable)) { |
| writeIdCheckPassed = true; |
| } |
| } |
| |
| if (!writeIdCheckPassed) { |
| LOG.debug("Cached query no longer valid due to table {}", tableUsed.getFullyQualifiedName()); |
| // We can invalidate the entry now, but calling removeEntry() requires a write lock |
| // and we may already have read lock taken now. Add to entriesToRemove to delete later. |
| entriesToRemove.add(entry); |
| entry.invalidate(); |
| return false; |
| } |
| } |
| } |
| } |
| |
| return true; |
| } |
| |
| public void removeEntry(CacheEntry entry) { |
| entry.invalidate(); |
| rwLock.writeLock().lock(); |
| try { |
| removeFromLookup(entry); |
| lru.remove(entry); |
| // Should the cache size be updated here, or after the result data has actually been deleted? |
| cacheSize -= entry.size; |
| } finally { |
| rwLock.writeLock().unlock(); |
| } |
| } |
| |
| private void removeFromLookup(CacheEntry entry) { |
| String queryString = entry.getQueryText(); |
| if (!removeFromEntryMap(queryMap, queryString, entry)) { |
| LOG.warn("Attempted to remove entry but it was not in the cache: {}", entry); |
| } |
| |
| // Remove this entry from the table usage mappings. |
| entry.getTableNames() |
| .forEach(tableName -> removeFromEntryMap(tableToEntryMap, tableName, entry)); |
| } |
| |
| private void calculateEntrySize(CacheEntry entry, FetchWork fetchWork) throws IOException { |
| Path queryResultsPath = fetchWork.getTblDir(); |
| FileSystem resultsFs = queryResultsPath.getFileSystem(conf); |
| ContentSummary cs = resultsFs.getContentSummary(queryResultsPath); |
| entry.size = cs.getLength(); |
| } |
| |
| /** |
| * Determines if the cache entry should be added to the results cache. |
| */ |
| private boolean shouldEntryBeAdded(CacheEntry entry, long size) { |
| // Assumes the cache lock has already been taken. |
| if (maxEntrySize >= 0 && size > maxEntrySize) { |
| LOG.debug("Cache entry size {} larger than max entry size ({})", size, maxEntrySize); |
| incrementMetric(MetricsConstant.QC_REJECTED_TOO_LARGE); |
| return false; |
| } |
| |
| if (!clearSpaceForCacheEntry(entry, size)) { |
| return false; |
| } |
| |
| return true; |
| } |
| |
| private boolean hasSpaceForCacheEntry(CacheEntry entry, long size) { |
| if (maxCacheSize >= 0) { |
| return (cacheSize + size) <= maxCacheSize; |
| } |
| // Negative max cache size means unbounded. |
| return true; |
| } |
| |
| private CacheEntry findEntryToRemove() { |
| // Entries should be in LRU order in the keyset iterator. |
| Set<CacheEntry> entries = lru.keySet(); |
| synchronized (lru) { |
| for (CacheEntry removalCandidate : entries) { |
| if (removalCandidate.getStatus() != CacheEntryStatus.VALID) { |
| continue; |
| } |
| return removalCandidate; |
| } |
| } |
| return null; |
| } |
| |
| private boolean clearSpaceForCacheEntry(CacheEntry entry, long size) { |
| if (hasSpaceForCacheEntry(entry, size)) { |
| return true; |
| } |
| |
| LOG.info("Clearing space for cache entry for query: [{}] with size {}", |
| entry.getQueryText(), size); |
| |
| CacheEntry removalCandidate; |
| while ((removalCandidate = findEntryToRemove()) != null) { |
| LOG.info("Removing entry: {}", removalCandidate); |
| removeEntry(removalCandidate); |
| // TODO: Should we wait for the entry to actually be deleted from HDFS? Would have to |
| // poll the reader count, waiting for it to reach 0, at which point cleanup should occur. |
| if (hasSpaceForCacheEntry(entry, size)) { |
| return true; |
| } |
| } |
| |
| LOG.info("Could not free enough space for cache entry for query: [{}] withe size {}", |
| entry.getQueryText(), size); |
| return false; |
| } |
| |
| private static void addToEntryMap(Map<String, Set<CacheEntry>> entryMap, |
| String key, CacheEntry entry) { |
| Set<CacheEntry> entriesForKey = entryMap.get(key); |
| if (entriesForKey == null) { |
| entriesForKey = new HashSet<CacheEntry>(); |
| entryMap.put(key, entriesForKey); |
| } |
| entriesForKey.add(entry); |
| } |
| |
| private static boolean removeFromEntryMap(Map<String, Set<CacheEntry>> entryMap, |
| String key, CacheEntry entry) { |
| Set<CacheEntry> entries = entryMap.get(key); |
| if (entries == null) { |
| return false; |
| } |
| boolean deleted = entries.remove(entry); |
| if (!deleted) { |
| return false; |
| } |
| if (entries.isEmpty()) { |
| entryMap.remove(key); |
| } |
| return true; |
| } |
| |
| @VisibleForTesting |
| public static void cleanupInstance() { |
| // This should only ever be called in testing scenarios. |
| // There should not be any other users of the cache or its entries or this may mess up cleanup. |
| if (inited.get()) { |
| if (instance.invalidationPollFuture != null) { |
| instance.invalidationPollFuture.cancel(true); |
| instance.invalidationPollFuture = null; |
| } |
| instance.clear(); |
| instance = null; |
| inited.set(false); |
| } |
| } |
| |
| private static ScheduledExecutorService invalidationExecutor = null; |
| private static ExecutorService deletionExecutor = null; |
| |
| static { |
| ThreadFactory threadFactory = |
| new ThreadFactoryBuilder().setDaemon(true).setNameFormat("QueryResultsCache %d").build(); |
| invalidationExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory); |
| deletionExecutor = Executors.newSingleThreadExecutor(threadFactory); |
| } |
| |
| private void scheduleEntryInvalidation(final CacheEntry entry) { |
| if (maxEntryLifetime >= 0) { |
| // Schedule task to invalidate cache entry and remove from lookup. |
| ScheduledFuture<?> future = invalidationExecutor.schedule(new Runnable() { |
| @Override |
| public void run() { |
| removeEntry(entry); |
| } |
| }, maxEntryLifetime, TimeUnit.MILLISECONDS); |
| entry.invalidationFuture = future; |
| } |
| } |
| |
| private static void cleanupEntry(final CacheEntry entry) { |
| Preconditions.checkState(entry.getStatus() == CacheEntryStatus.INVALID); |
| final HiveConf conf = getInstance().conf; |
| |
| if (entry.cachedResultsPath != null && |
| !getInstance().zeroRowsPath.equals(entry.cachedResultsPath)) { |
| deletionExecutor.execute(new Runnable() { |
| @Override |
| public void run() { |
| Path path = entry.cachedResultsPath; |
| LOG.info("Cache directory cleanup: deleting {}", path); |
| try { |
| FileSystem fs = entry.cachedResultsPath.getFileSystem(getInstance().conf); |
| fs.delete(entry.cachedResultsPath, true); |
| } catch (Exception err) { |
| LOG.error("Error while trying to delete " + path, err); |
| } |
| } |
| }); |
| } |
| } |
| |
| public static void incrementMetric(String name, long count) { |
| Metrics metrics = MetricsFactory.getInstance(); |
| if (metrics != null) { |
| metrics.incrementCounter(name, count); |
| } |
| } |
| |
| public static void decrementMetric(String name, long count) { |
| Metrics metrics = MetricsFactory.getInstance(); |
| if (metrics != null) { |
| metrics.decrementCounter(name, count); |
| } |
| } |
| |
| public static void incrementMetric(String name) { |
| incrementMetric(name, 1); |
| } |
| |
| public static void decrementMetric(String name) { |
| decrementMetric(name, 1); |
| } |
| |
| private static void registerMetrics(Metrics metrics, final QueryResultsCache cache) { |
| MetricsVariable<Long> maxCacheSize = new MetricsVariable<Long>() { |
| @Override |
| public Long getValue() { |
| return cache.maxCacheSize; |
| } |
| }; |
| |
| MetricsVariable<Long> curCacheSize = new MetricsVariable<Long>() { |
| @Override |
| public Long getValue() { |
| return cache.cacheSize; |
| } |
| }; |
| |
| metrics.addGauge(MetricsConstant.QC_MAX_SIZE, maxCacheSize); |
| metrics.addGauge(MetricsConstant.QC_CURRENT_SIZE, curCacheSize); |
| } |
| |
| /** |
| * EventConsumer to invalidate cache entries based on metastore notification events |
| * (alter table, add partition, etc.). |
| */ |
| public static class InvalidationEventConsumer implements EventConsumer { |
| Configuration conf; |
| |
| @Override |
| public Configuration getConf() { |
| return conf; |
| } |
| |
| @Override |
| public void setConf(Configuration conf) { |
| this.conf = conf; |
| } |
| |
| @Override |
| public void accept(NotificationEvent event) { |
| String dbName; |
| String tableName; |
| |
| switch (event.getEventType()) { |
| case MessageBuilder.ADD_PARTITION_EVENT: |
| case MessageBuilder.ALTER_PARTITION_EVENT: |
| case MessageBuilder.ALTER_PARTITIONS_EVENT: |
| case MessageBuilder.DROP_PARTITION_EVENT: |
| case MessageBuilder.ALTER_TABLE_EVENT: |
| case MessageBuilder.DROP_TABLE_EVENT: |
| case MessageBuilder.INSERT_EVENT: |
| dbName = event.getDbName(); |
| tableName = event.getTableName(); |
| break; |
| default: |
| return; |
| } |
| |
| if (dbName == null || tableName == null) { |
| LOG.info("Possibly malformed notification event, missing db or table name: {}", event); |
| return; |
| } |
| |
| LOG.debug("Handling event {} on table {}.{}", event.getEventType(), dbName, tableName); |
| |
| QueryResultsCache cache = QueryResultsCache.getInstance(); |
| if (cache != null) { |
| long eventTime = event.getEventTime() * 1000L; |
| cache.notifyTableChanged(dbName, tableName, eventTime); |
| } else { |
| LOG.debug("Cache not instantiated, skipping event on {}.{}", dbName, tableName); |
| } |
| } |
| } |
| } |