| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <mutex> |
| #include <string> |
| #include <unistd.h> |
| #include <unordered_map> |
| #include <unordered_set> |
| #include <gtest/gtest_prod.h> |
| |
| #include "common/status.h" |
| #include "util/spinlock.h" |
| #include "util/thread-pool.h" |
| #include "kudu/util/cache.h" |
| #include "kudu/util/faststring.h" |
| #include "kudu/util/slice.h" |
| |
| namespace kudu { |
| class Cache; |
| } // kudu |
| |
| /// This class is an implementation of an IO data cache which is backed by local storage. |
| /// It implicitly relies on the OS page cache management to shuffle data between memory |
| /// and the storage device. This is useful for caching data read from remote filesystems |
| /// (e.g. remote HDFS data node, S3, ABFS, ADLS). |
| /// |
| /// A data cache is divided into one or more partitions based on the configuration |
| /// string which specifies a list of directories and their corresponding storage quotas. |
| /// |
| /// Each partition has a meta-data cache which tracks the mappings of cache keys to |
| /// the locations of the cached data. A cache key is a tuple of (file's name, file's |
| /// modification time, file offset) and a cache entry is a tuple of (backing file, |
| /// offset in the backing file, length of the cached data, optional checksum). The |
| /// file's modification time is used for distinguishing between different versions of |
| /// a file with a given name. Each partition stores its set of cached data in backing |
| /// files created on local storage. When inserting new data into the cache, the data is |
| /// appended to the current backing file in use. The storage consumption of each cache |
| /// entry counts towards the quota of that partition. When a partition reaches its |
| /// capacity, the least recently used data in that partition is evicted. Evicted data is |
| /// removed from the underlying storage by punching holes in the backing file it's stored |
| /// in. As a backing file reaches a certain size (e.g. 4TB), new data will stop being |
| /// appended to it and a new file will be created instead. Note that due to hole punching, |
| /// the backing file is actually sparse. For instance, a backing file may look like the |
| /// following after some insertion and eviction. All the holes in file consume no storage |
| /// space at all. |
| /// |
| /// 0 1GB |
| /// +----------+----------+----------+-----------------+---------+---------+-------+ |
| /// | | | | | | | | |
| /// | Data | Hole | Data | Hole | Data | Hole | Data | |
| /// | | | | | | | | |
| /// +----------+----------+----------+-----------------+---------+---------+-------+ |
| /// ^ |
| /// insertion offset |
| /// |
| /// Optionally, checksumming can be enabled to verify that data read from the cache is |
| /// consistent with what was inserted and to verify that multiple attempted insertions |
| /// with the same cache key have the same cache content. |
| /// |
| /// Note that the cache currently doesn't support sub-range lookups or handle |
| /// overlapping ranges. In other words, if the cache has an entry for a file at range |
| /// [0,4095], a look up for range [4000,4095] will result in a miss even though it's a |
| /// sub-range of [0,4095]. Also inserting the range [4000,4095] will not consolidate |
| /// with any overlapping ranges. In other words, inserting entries for ranges [0,4095] |
| /// and [4000,4095] will result in caching the data for range [4000,4095] twice. This |
| /// hasn't been a major concern in practice when testing with TPC-DS + parquet but this |
| /// requires more investigation for other file formats and workloads. |
| /// |
| /// To probe for cached data in the cache, the interface Lookup() is used; To insert |
| /// data into the cache, the interface Store() is used. Write to the backing file and |
| /// eviction from it happen synchronously. Currently, Store() is limited to the |
| /// concurrency of one thread per partition to prevent slowing down the caller in case |
| /// the cache is thrashing and it becomes IO bound. The write concurrency can be tuned |
| /// via the knob --data_cache_write_concurrency. Also, Store() has a minimum granularity |
| /// of 4KB so any data inserted will be rounded up to the nearest multiple of 4KB. |
| /// |
| /// The number of backing files in all partitions is bound by |
| /// --data_cache_max_opened_files. Once the number of files exceeds that set limit, files |
| /// are closed and deleted asynchronously by a thread in 'file_deleter_pool_'. Stale cache |
| /// entries which reference deleted files are erased lazily upon the next access or |
| /// indirectly via eviction. |
| /// |
| /// Future work: |
| /// - investigate the overlapping ranges support |
| /// - be more selective on what to cache |
| /// - asynchronous eviction |
| /// - better data placement: put hot data on faster media and lukewarm data in not |
| /// so fast storage media |
| /// - evaluate the option of exposing the cache via mmap() and pinning similar to HDFS |
| /// caching. This has the advantage of not needing to copy out the data but pinning |
| /// may complicate the code. |
| /// |
| |
| namespace impala { |
| namespace io { |
| |
| class DataCache { |
| public: |
| |
| /// 'config' is the configuration string of the form <dir1>,...,<dirN>:<quota> |
| /// in which <dir1>, <dirN> are part of a list of directories for storing cached data |
| /// and each directory corresponds to a cache partition. <quota> is the storage quota |
| /// for each directory. Impala daemons running on the same host will not share any |
| /// caching directories. |
| explicit DataCache(const std::string config) : config_(std::move(config)) { } |
| |
| ~DataCache() { ReleaseResources(); } |
| |
| /// Parses the configuration string, initializes all partitions in the cache by |
| /// checking for storage space available and creates a backing file for caching. |
| /// Return error if any of the partitions failed to be initialized. |
| Status Init(); |
| |
| /// Releases any resources (e.g. backing files) consumed by all partitions. |
| void ReleaseResources(); |
| |
| /// Looks up a cached entry and copies any cached content from the cache into 'buffer'. |
| /// (filename, mtime, offset) forms a cache key. Please note that sub-range lookup is |
| /// currently not supported. See header comments for details. |
| /// |
| /// 'filename' : name of the requested file |
| /// 'mtime' : the modification time of the requested file |
| /// 'offset' : starting offset of the requested region in the file |
| /// 'bytes_to_read' : number of bytes to be read from the cache |
| /// 'buffer' : output buffer to be written into on cache hit |
| /// |
| /// Returns the number of bytes read from the cache on cache hit; Returns 0 otherwise. |
| /// |
| int64_t Lookup(const std::string& filename, int64_t mtime, int64_t offset, |
| int64_t bytes_to_read, uint8_t* buffer); |
| |
| /// Inserts a new cache entry by copying the content in 'buffer' into the cache. |
| /// (filename, mtime, offset) together forms a cache key. Insertion involves writing |
| /// to the backing file and potentially evicting entries synchronously so callers |
| /// may want to avoid holding locks while calling this function. |
| /// |
| /// 'filename' : name of the file being inserted |
| /// 'mtime' : the modification time of the file being inserted. |
| /// 'offset' : the starting offset of the region in the file being inserted |
| /// 'buffer' : buffer holding the data to be inserted |
| /// 'buffer_len' : size of 'buffer' |
| /// |
| /// The cache key is hashed and the resulting hash determines the partition to use. |
| /// |
| /// Please note that 'buffer_len' is rounded up to the nearest multiple of 4KB when |
| /// it's being written to the backing file. This ensures that every cache entry starts |
| /// at a 4KB offset in the backing file, making hole punching easier as the entire page |
| /// can be reclaimed. |
| /// |
| /// An entry may not be installed for various reasons: |
| /// - an entry with the given cache key already exists unless 'buffer_len' is larger |
| /// than the existing entry, in which case, the entry will be replaced with the |
| /// new data. |
| /// - a pending entry with the same key is already being installed. |
| /// - the maximum write concurrency (via --data_cache_write_concurrency) is reached. |
| /// - IO error when writing to the backing file. |
| /// |
| /// Returns true iff the entry is installed successfully. |
| /// |
| bool Store(const std::string& filename, int64_t mtime, int64_t offset, |
| const uint8_t* buffer, int64_t buffer_len); |
| |
| /// Utility function to verify that all partitions' consumption don't exceed their |
| /// quotas. Return error status if checking files' sizes failed or if the total space |
| /// consumed by a partition exceeded its capacity. Will close the backing files of |
| /// partitions before verifying their sizes. Used by test only. |
| Status CloseFilesAndVerifySizes(); |
| |
| private: |
| friend class DataCacheTest; |
| FRIEND_TEST(DataCacheTest, TestAccessTrace); |
| |
| class CacheFile; |
| struct CacheKey; |
| class CacheEntry; |
| |
| /// An implementation of a cache partition. Each partition maintains its own set of |
| /// cache keys in a LRU cache. |
| class Partition : public kudu::Cache::EvictionCallback { |
| public: |
| /// Creates a partition at the given directory 'path' with quota 'capacity' in bytes. |
| /// 'max_opened_files' is the maximum number of opened files allowed per partition. |
| Partition(const std::string& path, int64_t capacity, int max_opened_files); |
| |
| ~Partition(); |
| |
| /// Initializes the current partition: |
| /// - verifies if the specified directory is valid |
| /// - removes any stale backing file in this partition |
| /// - checks if there is enough storage space |
| /// - checks if the filesystem supports hole punching |
| /// - creates an empty backing file. |
| /// |
| /// Returns error if there is any of the above steps fails. Returns OK otherwise. |
| Status Init(); |
| |
| /// Close and delete all backing files created for this partition. Also releases |
| /// the memory held by the metadata cache. |
| void ReleaseResources(); |
| |
| /// Looks up in the meta-data cache with key 'cache_key'. If found, try copying |
| /// 'bytes_to_read' bytes from the backing file into 'buffer'. Returns number |
| /// of bytes read from the cache. Returns 0 if there is a cache miss. |
| int64_t Lookup(const CacheKey& cache_key, int64_t bytes_to_read, uint8_t* buffer); |
| |
| /// Inserts a entry with key 'cache_key' and data in 'buffer' into the cache. |
| /// 'buffer_len' is the length of buffer. 'start_reclaim' is set to true if |
| /// the number of backing files exceeds the per partition limit. Returns true if |
| /// the entry is inserted. Returns false otherwise. |
| bool Store(const CacheKey& cache_key, const uint8_t* buffer, int64_t buffer_len, |
| bool* start_reclaim); |
| |
| /// Callback invoked when evicting an entry from the cache. 'key' is the cache key |
| /// of the entry being evicted and 'value' contains the cache entry which is the |
| /// meta-data of where the cached data is stored. |
| virtual void EvictedEntry(kudu::Slice key, kudu::Slice value) override; |
| |
| /// Utility function to verify that the backing files don't exceed the capacity |
| /// of the partition. Used by test only. |
| /// |
| /// Return error status if: |
| /// - getting files' sizes failed |
| /// - total space consumed exceeded the partition's capacity |
| /// - file's size exceeded the specified limit in --data_cache_file_max_size |
| /// |
| /// Will close the backing files of partitions before verifying their sizes. |
| /// |
| /// Returns OK otherwise. |
| Status CloseFilesAndVerifySizes(); |
| |
| /// Deletes old backing files until number of backing files is no larger than |
| /// --data_cache_max_opened_files. |
| void DeleteOldFiles(); |
| |
| private: |
| friend class DataCacheTest; |
| FRIEND_TEST(DataCacheTest, TestAccessTrace); |
| class Tracer; |
| |
| /// The directory path which this partition stores cached data in. |
| const std::string path_; |
| |
| /// The capacity in bytes of this partition. |
| const int64_t capacity_; |
| |
| /// Maximum number of opened files allowed in a partition. |
| const int max_opened_files_; |
| |
| /// True if this partition has been closed. Expected to be set after all IO |
| /// threads have been joined. |
| bool closed_ = false; |
| |
| /// The prefix of the names of the cache backing files. |
| static const char* CACHE_FILE_PREFIX; |
| |
| /// The file name used for the access trace. |
| static const char* TRACE_FILE_NAME; |
| |
| /// Protects the following fields. |
| SpinLock lock_; |
| |
| /// Index into 'cache_files_' of the oldest opened file. |
| int oldest_opened_file_ = -1; |
| |
| /// The set of backing files used by this partition. By default, cache_files_.back() |
| /// is the latest backing file to which new data is appended. Must be accessed with |
| /// 'lock_' held. |
| std::vector<std::unique_ptr<CacheFile>> cache_files_; |
| |
| /// This set tracks cache keys of entries in progress of being inserted into the |
| /// cache. As we don't hold locks while writing to the backing file, this set is |
| /// used to prevent multiple insertion into the cache with the same cache key. |
| /// The insertion path will check against this set and if the entry doesn't already |
| /// exist, it will insert one into this set. Upon completion of cache insertion, |
| /// the entry will be removed from this set. Must be accessed with 'lock_' held. |
| std::unordered_set<std::string> pending_insert_set_; |
| |
| /// The LRU cache for tracking the cache key to cache entries mappings. |
| /// |
| /// A cache key is created by calling the constructor of CacheKey, which is a tuple |
| /// of (fname, mtime, offset). |
| /// |
| /// A cache entry has type CacheEntry and it contains the metadata of the cached |
| /// content. Please see comments at CachedEntry for details. |
| std::unique_ptr<kudu::Cache> meta_cache_; |
| |
| std::unique_ptr<Tracer> tracer_; |
| |
| /// Utility function for creating a new backing file in 'path_'. The cache |
| /// partition's lock needs to be held when calling this function. Returns |
| /// error on failure. |
| Status CreateCacheFile(); |
| |
| /// Utility function to delete cache files left over from previous runs of Impala. |
| /// Returns error on failure. |
| Status DeleteExistingFiles() const; |
| |
| /// Utility function for computing the checksum of 'buffer' with length 'buffer_len'. |
| static uint64_t Checksum(const uint8_t* buffer, int64_t buffer_len); |
| |
| /// Helper function which handles the case in which the key to be inserted already |
| /// exists in the cache. With checksumming enabled, it also verifies that the content |
| /// in 'buffer' matches the expected checksum in the cache's metadata. Please note |
| /// that an existing entry may be overwritten if 'buffer_len' is larger than the |
| /// length of the existing entry. 'handle' is the handle into the metadata cache. |
| /// Needed for referencing the cache entry. |
| /// |
| /// Returns true iff the existing entry already covers the range of 'buffer' so no |
| /// work needs to be done. Returns false otherwise. In which case, the existing entry |
| /// will be overwritten. |
| bool HandleExistingEntry(const kudu::Slice& key, kudu::Cache::Handle* handle, |
| const uint8_t* buffer, int64_t buffer_len); |
| |
| /// Helper function to insert a new entry with key 'key' into the LRU cache. |
| /// The content in 'buffer' of length 'buffer_len' in bytes will be written to |
| /// the backing file 'cache_file' at offset 'insertion_offset'. |
| /// |
| /// Returns true iff the insertion into the cache and the write to the backing file |
| /// succeeded. Returns false otherwise. |
| bool InsertIntoCache(const kudu::Slice& key, CacheFile* cache_file, |
| int64_t insertion_offset, const uint8_t* buffer, int64_t buffer_len); |
| |
| /// Utility function for verifying that the checksum of 'buffer' with length |
| /// 'buffer_len' matches the checksum recorded in the meta-data 'entry->checksum'. |
| /// |
| /// 'ops_name" is the name of the operation which triggers the checksum verification. |
| /// Currently, it's either "read" or "write" but future changes may add more names. |
| /// |
| /// Returns false if the checksum of 'buffer' doesn't match 'entry->checksum'. |
| static bool VerifyChecksum(const std::string& ops_name, const CacheEntry& entry, |
| const uint8_t* buffer, int64_t buffer_len); |
| }; |
| |
| /// The configuration string for the data cache. |
| const std::string config_; |
| |
| /// The set of all cache partitions. |
| std::vector<std::unique_ptr<Partition>> partitions_; |
| |
| /// Thread pool for deleting old files from partitions to keep the number of opened |
| /// files within --date_cache_max_opened_files. This allows deletion requests |
| /// to be queued for deferred processing. There is only one thread in this pool. |
| std::unique_ptr<ThreadPool<int>> file_deleter_pool_; |
| |
| /// Thread function called by threads in 'file_deleter_pool_' for deleting old files |
| /// in partitions_[partition_idx]. |
| void DeleteOldFiles(uint32_t thread_id, int partition_idx); |
| |
| }; |
| |
| } // namespace io |
| } // namespace impala |
| |