blob: 96add9f2a1c99e58131c0cecacf9be05831c8b96 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <array>
#include <list>
#include <map>
#include <memory>
#include <boost/thread/mutex.hpp>
#include "common/hdfs.h"
#include "util/aligned-new.h"
#include "util/impalad-metrics.h"
#include "util/spinlock.h"
#include "util/thread.h"
namespace impala {
/// This class is a small wrapper around the hdfsFile handle and the file system
/// instance which is needed to close the file handle. The handle incorporates
/// the last modified time of the file when it was opened. This is used to distinguish
/// between file handles for files that can be updated or overwritten.
class HdfsFileHandle {
/// Constructor will open the file
HdfsFileHandle(const hdfsFS& fs, const char* fname, int64_t mtime);
/// Destructor will close the file handle
hdfsFile file() const { return hdfs_file_; }
int64_t mtime() const { return mtime_; }
bool ok() const { return hdfs_file_ != nullptr; }
hdfsFS fs_;
hdfsFile hdfs_file_;
int64_t mtime_;
/// The FileHandleCache is a data structure that owns HdfsFileHandles to share between
/// threads. The HdfsFileHandles are hash partitioned across NUM_PARTITIONS partitions.
/// Each partition operates independently with its own locks, reducing contention
/// between concurrent threads. The `capacity` is split between the partitions and is
/// enforced independently.
/// Threads check out a file handle for exclusive access and return it when finished.
/// If the file handle is not already present in the cache or all file handles for this
/// file are checked out, the file handle is constructed and added to the cache.
/// The cache can contain multiple file handles for the same file. If a file handle
/// is checked out, it cannot be evicted from the cache. In this case, a cache can
/// exceed the specified capacity.
/// The file handle cache is currently not suitable for remote files that maintain a
/// connection as part of the handle. Most remote systems have a limit on the number
/// of concurrent connections, and file handles in the cache would be counted towards
/// that limit.
/// If there is a file handle in the cache and the underlying file is deleted,
/// the file handle might keep the file from being deleted at the OS level. This can
/// take up disk space and impact correctness. To avoid this, the cache will evict any
/// file handle that has been unused for longer than threshold specified by
/// `unused_handle_timeout_secs`. Eviction is disabled when the threshold is 0.
/// TODO: The cache should also evict file handles more aggressively if the file handle's
/// mtime is older than the file's current mtime.
template <size_t NUM_PARTITIONS>
class FileHandleCache {
/// Instantiates the cache with `capacity` split evenly across NUM_PARTITIONS
/// partitions. If the capacity does not split evenly, then the capacity is rounded
/// up. The cache will age out any file handle that is unused for
/// `unused_handle_timeout_secs` seconds. Age out is disabled if this is set to zero.
FileHandleCache(size_t capacity, uint64_t unused_handle_timeout_secs);
/// Destructor is only called for backend tests
/// Starts up a thread that monitors the age of file handles and evicts any that
/// exceed the limit.
void Init();
/// Get a file handle from the cache for the specified filename (fname) and
/// last modification time (mtime). This will hash the filename to determine
/// which partition to use for this file handle.
/// If 'require_new_handle' is false and the partition contains an available handle,
/// the handle is returned and cache_hit is set to true. Otherwise, the partition will
/// try to construct a file handle and add it to the partition. On success, the new
/// file handle will be returned with cache_hit set to false. On failure, nullptr will
/// be returned. In either case, the partition may evict a file handle to make room
/// for the new file handle.
/// This obtains exclusive control over the returned file handle. It must be paired
/// with a call to ReleaseFileHandle to release exclusive control.
HdfsFileHandle* GetFileHandle(const hdfsFS& fs, std::string* fname, int64_t mtime,
bool require_new_handle, bool* cache_hit);
/// Release the exclusive hold on the specified file handle (which was obtained
/// by calling GetFileHandle). The cache may evict a file handle if the cache is
/// above capacity. If 'destroy_handle' is true, immediately remove this handle
/// from the cache.
void ReleaseFileHandle(std::string* fname, HdfsFileHandle* fh, bool destroy_handle);
struct FileHandleEntry;
typedef std::multimap<std::string, FileHandleEntry> MapType;
struct LruListEntry {
LruListEntry(typename MapType::iterator map_entry_in);
typename MapType::iterator map_entry;
uint64_t timestamp_seconds;
typedef std::list<LruListEntry> LruListType;
struct FileHandleEntry {
FileHandleEntry(HdfsFileHandle* fh_in, LruListType& lru_list)
: fh(fh_in), lru_entry(lru_list.end()) {}
std::unique_ptr<HdfsFileHandle> fh;
/// in_use is true for a file handle checked out via GetFileHandle() that has not
/// been returned via ReleaseFileHandle().
bool in_use = false;
/// Iterator to this element's location in the LRU list. This only points to a
/// valid location when in_use is true. For error-checking, this is set to
/// lru_list.end() when in_use is false.
typename LruListType::iterator lru_entry;
/// Each partition operates independently, and thus has its own cache, LRU list,
/// and corresponding lock. To avoid contention on the lock_ due to false sharing
/// the partitions are aligned to cache line boundaries.
struct FileHandleCachePartition : public CacheLineAligned {
/// Protects access to cache and lru_list.
SpinLock lock;
/// Multimap from the file name to the file handles for that file. The cache
/// can contain multiple file handles for the same file and some may have
/// different mtimes if the file is being modified. All file handles are always
/// owned by the cache.
MapType cache;
/// The LRU list only contains file handles that are not in use.
LruListType lru_list;
/// Maximum number of file handles in cache without evicting unused file handles.
/// It is not a strict limit, and can be exceeded if all file handles are in use.
size_t capacity;
/// Current number of file handles in the cache
size_t size;
/// Periodic check to evict unused file handles. Only executed by eviction_thread_.
void EvictHandlesLoop();
static const int64_t EVICT_HANDLES_PERIOD_MS = 1000;
/// If the partition is above its capacity, evict the oldest unused file handles to
/// enforce the capacity.
void EvictHandles(FileHandleCachePartition& p);
std::array<FileHandleCachePartition, NUM_PARTITIONS> cache_partitions_;
/// Maximum time before an unused file handle is aged out of the cache.
/// Aging out is disabled if this is set to 0.
uint64_t unused_handle_timeout_secs_;
/// Thread to check for unused file handles to evict. This thread will exit when
/// the shut_down_promise_ is set.
std::unique_ptr<Thread> eviction_thread_;
Promise<bool> shut_down_promise_;