| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <memory> |
| #include <mutex> |
| |
| #include "common/status.h" |
| #include "runtime/bufferpool/buffer-pool.h" |
| #include "util/cache/cache.h" |
| #include "util/metrics.h" |
| #include "util/thread-pool.h" |
| |
| namespace impala { |
| |
| class HistogramMetric; |
| class TupleReader; |
| |
| // Declaration of the debug tuple cache bad postfix constant. |
| extern const char* DEBUG_TUPLE_CACHE_BAD_POSTFIX; |
| |
| enum class TupleCacheState; |
| |
| /// The TupleCacheMgr maintains per-daemon settings and metadata for the tuple cache. |
| /// This it used by the various TupleCacheNodes from queries to lookup the cache |
| /// entries or write cache entries. The TupleCacheMgr maintains the capacity constraint |
| /// by evicting entries as needed. Unlike the data cache, the tuple cache maintains |
| /// individual entries in separate files and the files have the cache key incorporated |
| /// into the file name. |
| /// |
| /// There are a couple unique features for the TupleCacheMgr that makes it distinct from |
| /// other caches: |
| /// 1. It inserts an entry into the cache immediately, even before it knows the total |
| /// size of the entry. It updates the size of the cache entry later when the entry |
| /// is completed. This allows the cache to avoid multiple writers trying to create the |
| /// same entry. |
| /// 2. When a cache entry is aborted due to its size, the cache keeps a tombstone record |
| /// to prevent future writers from trying to write that entry. This reduces the |
| /// overhead of the tuple cache by avoiding writing entries that won't be useful |
| /// to the cache. |
| /// For more information about the exact state transitions, see the diagram of the states |
| /// in tuple-cache-mgr.cc. |
| class TupleCacheMgr : public Cache::EvictionCallback { |
| public: |
| TupleCacheMgr(MetricGroup* metrics); |
| ~TupleCacheMgr() = default; |
| |
| // Initialize the TupleCacheMgr. Must be called before any of the other APIs. |
| // The process_bytes_limit is used to scale a percentage value for the outstanding |
| // writes limit. If it is set to 0, a percentage value is not allowed. |
| Status Init(int64_t process_bytes_limit = 0) WARN_UNUSED_RESULT; |
| |
| /// Enum for metric type. |
| enum class MetricType { |
| HIT, |
| MISS, |
| SKIPPED, |
| }; |
| |
| struct DebugDumpCacheMetaData { |
| /// The fragment id of the tuple cache for debug purpose. |
| std::string fragment_id; |
| }; |
| |
| struct Handle; |
| class HandleDeleter { |
| public: |
| void operator()(Handle*) const; |
| }; |
| |
| // UniqueHandle -- a wrapper around opaque Handle structure to facilitate deletion. |
| typedef std::unique_ptr<Handle, HandleDeleter> UniqueHandle; |
| |
| // Following methods need to be thread-safe. They primarily rely on Cache locking. |
| |
| // UniqueHandle lifecycle: |
| // 1. Lookup(acquire_write=true). Abandon caching if null. |
| // 2a. If IsAvailableForRead, start reading. END |
| // 2b. Else if IsAvailableForWrite, start writing. |
| // 3. If successful, CompleteWrite. Else AbortWrite. If repeating is also expected to |
| // result in failure, set tombstone. END |
| |
| /// Get a handle for the key. If acquire_write and no entry exists, takes a creation |
| /// lock and creates a new entry. Null if cache is unavailable. |
| UniqueHandle Lookup(const Slice& key, bool acquire_write = false); |
| |
| /// Check if entry is complete and ready to read. |
| bool IsAvailableForRead(UniqueHandle&) const; |
| |
| /// Check if entry is ready to write. Always false if acquire_write=false. |
| bool IsAvailableForWrite(UniqueHandle&) const; |
| |
| // Register results are complete and available. |
| void CompleteWrite(UniqueHandle handle, size_t size); |
| |
| // Abort writing. If tombstone is true, IsAvailableForWrite will be false for future |
| // queries. |
| void AbortWrite(UniqueHandle handle, bool tombstone); |
| |
| // Request an increase to the outstanding write size. This should be called before |
| // writing more data to a tuple cache file. If the new_size exceeds the maximum size |
| // for a cache entry, this returns TUPLE_CACHE_ENTRY_SIZE_LIMIT_EXCEEDED. This returns |
| // TUPLE_CACHE_OUTSTANDING_WRITE_LIMIT_EXCEEDED if it hits the outstanding writes limit. |
| Status RequestWriteSize(UniqueHandle* handle, size_t new_size); |
| |
| /// Get path to read/write. |
| const char* GetPath(UniqueHandle&) const; |
| |
| /// Max size for a single cache entry. |
| int MaxSize() const { return cache_->MaxCharge(); } |
| |
| /// For metrics increment. |
| void IncrementMetric(MetricType type) { |
| switch (type) { |
| case MetricType::HIT: |
| tuple_cache_hits_->Increment(1); |
| break; |
| case MetricType::MISS: |
| tuple_cache_misses_->Increment(1); |
| break; |
| case MetricType::SKIPPED: |
| tuple_cache_skipped_->Increment(1); |
| break; |
| } |
| } |
| |
| /// Callback invoked when evicting an entry from the cache. 'key' is the cache key |
| /// of the entry being evicted and 'value' contains the cache entry which is the |
| /// meta-data of where the cached data is stored. |
| virtual void EvictedEntry(kudu::Slice key, kudu::Slice value) override; |
| |
| /// Returns the full file path for debug dumping of the tuple cache. |
| /// If sub_dir_full_path is not nullptr, returns the full path of the subdirectory of |
| /// the debug tuple cache. |
| string GetDebugDumpPath(const string& sub_dir, const string& file_name, |
| string* sub_dir_full_path = nullptr) const; |
| |
| /// Return whether debug dumping is enabled. |
| bool DebugDumpEnabled() const { return cache_debug_dump_dir_ != ""; } |
| |
| /// Store, retrieve or remove the metadata of the stored tuple cache for correctness |
| /// verification. |
| void StoreMetadataForTupleCache(const string& cache_key, const string& fragment_id); |
| string GetFragmentIdForTupleCache(const string& cache_key); |
| void RemoveMetadataForTupleCache(const string& cache_key); |
| |
| /// Create the subdirectory for debug dumping of the tuple cache if needed. |
| Status CreateDebugDumpSubdir(const string& sub_dir); |
| |
| private: |
| // Disallow copy and assign |
| TupleCacheMgr(const TupleCacheMgr&) = delete; |
| TupleCacheMgr& operator=(const TupleCacheMgr&) = delete; |
| |
| friend class TupleCacheMgrTest; |
| FRIEND_TEST(TupleCacheMgrTest, TestRequestWriteSize); |
| FRIEND_TEST(TupleCacheMgrTest, TestOutstandingWriteLimit); |
| FRIEND_TEST(TupleCacheMgrTest, TestOutstandingWriteLimitConcurrent); |
| FRIEND_TEST(TupleCacheMgrTest, TestOutstandingWriteChunkSize); |
| FRIEND_TEST(TupleCacheMgrTest, TestDroppedSyncs); |
| |
| // Constructor for tests |
| enum DebugPos { |
| FAIL_ALLOCATE = 1 << 0, |
| FAIL_INSERT = 1 << 1, |
| NO_FILES = 1 << 2, |
| }; |
| TupleCacheMgr(string cache_config, string eviction_policy_str, MetricGroup* metrics, |
| uint8_t debug_pos, uint32_t sync_pool_size, uint32_t sync_pool_queue_depth, |
| string outstanding_write_limit_str, uint32_t outstanding_write_chunk_bytes); |
| |
| // Delete any existing files in the cache directory to start fresh |
| Status DeleteExistingFiles() const; |
| |
| // Sync file for cache key to disk |
| void SyncFileToDisk(const std::string& cache_key); |
| |
| // Get the current state for a cache handle |
| TupleCacheState GetState(Handle* handle) const; |
| |
| // Update a handle's state to newState, verifying that it matches the requredState. |
| // If the update fails, return false. |
| bool UpdateState(Handle* handle, TupleCacheState requiredState, |
| TupleCacheState newState); |
| |
| // Get the current charge for this handle. |
| size_t GetCharge(Handle* handle) const; |
| |
| // Update the current charge for this handle and adjust the outstanding writes |
| // counter accordingly. |
| void UpdateWriteSize(Handle* handle, size_t charge); |
| |
| const std::string cache_config_; |
| const std::string eviction_policy_str_; |
| const std::string outstanding_write_limit_str_; |
| |
| std::string cache_dir_; |
| std::string cache_debug_dump_dir_; |
| bool enabled_ = false; |
| uint8_t debug_pos_; |
| uint32_t sync_pool_size_; |
| uint32_t sync_pool_queue_depth_; |
| uint32_t outstanding_write_chunk_bytes_; |
| int64_t outstanding_write_limit_ = 0; |
| |
| /// Metrics for the tuple cache in the daemon level. |
| IntCounter* tuple_cache_hits_; |
| IntCounter* tuple_cache_misses_; |
| IntCounter* tuple_cache_skipped_; |
| IntCounter* tuple_cache_halted_; |
| IntCounter* tuple_cache_backpressure_halted_; |
| IntCounter* tuple_cache_entries_evicted_; |
| IntCounter* tuple_cache_failed_sync_; |
| IntCounter* tuple_cache_dropped_sync_; |
| IntGauge* tuple_cache_entries_in_use_; |
| IntGauge* tuple_cache_entries_in_use_bytes_; |
| IntGauge* tuple_cache_tombstones_in_use_; |
| IntGauge* tuple_cache_outstanding_writes_bytes_; |
| |
| /// Statistics for the tuple cache sizes allocated. |
| HistogramMetric* tuple_cache_entry_size_stats_; |
| |
| /// The instance of the cache. |
| mutable std::mutex creation_lock_; |
| std::unique_ptr<Cache> cache_; |
| |
| /// Used by CreateDebugDumpSubdir() to ensure no two threads are creating the same |
| /// sub directory. |
| std::mutex debug_dump_subdir_lock_; |
| /// Protects the debug_dump_caches_metadata_. |
| std::mutex debug_dump_lock_; |
| /// An in-memory presentation for metadata of tuple caches for debug verification. |
| /// The key is the key of the tuple cache. |
| std::unordered_map<std::string, DebugDumpCacheMetaData> debug_dump_caches_metadata_; |
| |
| /// Thread pool for syncing files to disk |
| std::unique_ptr<ThreadPool<std::string>> sync_thread_pool_; |
| }; |
| |
| } |