// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/io/data-cache.h"
#include <boost/algorithm/string.hpp>
#include <errno.h>
#include <fcntl.h>
#include <mutex>
#include <string.h>
#include <unistd.h>
#include <sstream>
#include <glog/logging.h>
#include "exec/kudu-util.h"
#include "kudu/util/async_logger.h"
#include "kudu/util/cache.h"
#include "kudu/util/env.h"
#include "kudu/util/jsonwriter.h"
#include "kudu/util/locks.h"
#include "kudu/util/path_util.h"
#include "gutil/hash/city.h"
#include "gutil/port.h"
#include "gutil/strings/escaping.h"
#include "gutil/strings/split.h"
#include "gutil/walltime.h"
#include "util/bit-util.h"
#include "util/error-util.h"
#include "util/filesystem-util.h"
#include "util/hash-util.h"
#include "util/impalad-metrics.h"
#include "util/metrics.h"
#include "util/parse-util.h"
#include "util/pretty-printer.h"
#include "util/scope-exit-trigger.h"
#include "util/uid-util.h"
#ifndef FALLOC_FL_PUNCH_HOLE
#include <linux/falloc.h>
#endif
#include "common/names.h"
using kudu::Env;
using kudu::faststring;
using kudu::JoinPathSegments;
using kudu::percpu_rwlock;
using kudu::RWFile;
using kudu::rw_spinlock;
using kudu::Slice;
using kudu::WritableFile;
using strings::SkipEmpty;
using strings::Split;
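// Checksumming of cached buffers is enabled by default in debug builds only.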
#ifdef NDEBUG
#define ENABLE_CHECKSUMMING (false)
#else
#define ENABLE_CHECKSUMMING (true)
#endif
DEFINE_int64(data_cache_file_max_size_bytes, 1L << 40 /* 1TB */,
"(Advanced) The maximum size which a cache file can grow to before data stops being "
"appended to it.");
DEFINE_int32(data_cache_max_opened_files, 1000,
"(Advanced) The maximum number of allowed opened files. This must be at least the "
"number of specified partitions.");
DEFINE_int32(data_cache_write_concurrency, 1,
"(Advanced) Number of concurrent threads allowed to insert into the cache per "
"partition.");
DEFINE_bool(data_cache_checksum, ENABLE_CHECKSUMMING,
"(Advanced) Enable checksumming for the cached buffer.");
DEFINE_bool(data_cache_enable_tracing, false,
"(Advanced) Collect a trace of all lookups in the data cache.");
DEFINE_bool(data_cache_anonymize_trace, false,
"(Advanced) Use hashes of filenames rather than file paths in the data "
"cache access trace.");
namespace impala {
namespace io {
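// All offsets and lengths in the backing files are kept as multiples of this page
// size (4KB) so that evicted entries can be reclaimed with hole punching, which
// works at filesystem block granularity.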
static const int64_t PAGE_SIZE = 1L << 12;
const char* DataCache::Partition::CACHE_FILE_PREFIX = "impala-cache-file-";
const char* DataCache::Partition::TRACE_FILE_NAME = "impala-cache-trace.txt";
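// Maximum number of pending file deletion requests (partition indices) allowed in
// the file deleter thread pool's queue.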
const int MAX_FILE_DELETER_QUEUE_SIZE = 500;
namespace {
class FileLogger;
// Simple implementation of a glog Logger that writes to a file, used for
// cache access tracing.
//
// This doesn't fully implement the Logger interface -- only the bare minimum
// to be usable with kudu::AsyncLogger.
class FileLogger : public google::base::Logger {
public:
explicit FileLogger(string path) : path_(std::move(path)) {}
virtual ~FileLogger() {
if (file_) Flush();
}
Status Open() {
KUDU_RETURN_IF_ERROR(Env::Default()->NewWritableFile({}, path_, &file_),
"Failed to create trace log file");
return Status::OK();
}
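// Appends 'message' to the in-memory buffer and flushes the buffer to the file once
// it exceeds kBufSize or if 'force_flush' is set. 'timestamp' is ignored.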
void Write(bool force_flush,
time_t timestamp,
const char* message,
int message_len) override {
buf_.append(message, message_len);
if (force_flush || buf_.size() > kBufSize) {
Flush();
}
}
// Flush any buffered messages.
// NOTE: declared 'final' to allow safe calls from the destructor.
void Flush() override final {
if (buf_.empty()) return;
KUDU_WARN_NOT_OK(file_->Append(buf_), "Could not append to trace log");
buf_.clear();
}
uint32 LogSize() override {
LOG(FATAL) << "Unimplemented";
return 0;
}
private:
const string path_;
string buf_;
unique_ptr<WritableFile> file_;
static constexpr int kBufSize = 64*1024;
};
} // anonymous namespace
class DataCache::Partition::Tracer {
public:
explicit Tracer(string path) : underlying_logger_(new FileLogger(std::move(path))) {}
~Tracer() {
if (logger_) logger_->Stop();
}
Status Start() {
RETURN_IF_ERROR(underlying_logger_->Open());
logger_.reset(new kudu::AsyncLogger(underlying_logger_.get(), 8 * 1024 * 1024));
logger_->Start();
return Status::OK();
}
enum CacheStatus {
HIT,
MISS,
STORE,
STORE_FAILED_BUSY
};
void Trace(CacheStatus status, const DataCache::CacheKey& key,
int64_t lookup_len, int64_t entry_len);
private:
// The underlying logger that we wrap with the AsyncLogger wrapper
// 'logger_'. NOTE: AsyncLogger consumes a raw pointer which must
// outlive the AsyncLogger instance, so it's important that these
// are declared in this order (logger_ must destruct before
// underlying_logger_).
unique_ptr<FileLogger> underlying_logger_;
// The async wrapper around underlying_logger_ (see above).
unique_ptr<kudu::AsyncLogger> logger_;
};
/// This class is an implementation of backing files in a cache partition.
///
/// A partition uses the interface Create() to create a backing file. A reader can read
/// from the backing file using the interface Read().
///
/// The backing file is append-only. To insert new data into the file, Allocate() is
/// called to reserve a contiguous area in the backing file. If the reservation succeeds,
/// the insertion offset is returned. Write() is called to add the data at the insertion
/// offset in the backing file. Allocations in the file may be evicted by punching holes
/// (via PunchHole()) in the backing file. The data in the hole area is reclaimed by the
/// underlying filesystem.
///
/// To avoid having too many backing files open, old files are deleted to keep the
/// number of open files within --data_cache_max_opened_files. Files are deleted
/// asynchronously by the file deleter thread pool. To synchronize between file deletion
/// and concurrent accesses of the file via Read()/Write()/PunchHole(), a reader lock
/// is held in those functions. Before a file is deleted, Close() must be called by
/// the deleter thread, which holds the writer lock to block off all readers and sets
/// 'file_' to NULL. Read()/Write()/PunchHole() will check whether 'file_' is NULL.
/// If the file is already closed, the function will fail. On a failure of Read(), the
/// caller is expected to delete the stale cache entry. On a failure of Write(), the
/// caller is not expected to insert the cache entry. In other words, any stale cache
/// entry which references a deleted file will either be lazily erased on Read() or
/// evicted due to inactivity.
///
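/// A minimal usage sketch (error handling and the partition lock that Allocate()
/// requires are elided; see Partition::Store() below for the real flow):
///
///   std::unique_ptr<CacheFile> file;
///   RETURN_IF_ERROR(CacheFile::Create(path, &file));
///   int64_t offset = file->Allocate(charge_len, partition_lock);
///   if (offset >= 0) file->Write(offset, buffer, buffer_len);
///   ...
///   file->PunchHole(offset, charge_len);  // reclaim the space on eviction
///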
class DataCache::CacheFile {
public:
~CacheFile() {
// Close file if it's not closed already.
DeleteFile();
}
static Status Create(std::string path, std::unique_ptr<CacheFile>* cache_file_ptr) {
unique_ptr<CacheFile> cache_file(new CacheFile(path));
KUDU_RETURN_IF_ERROR(kudu::Env::Default()->NewRWFile(path, &cache_file->file_),
"Failed to create cache file");
*cache_file_ptr = std::move(cache_file);
return Status::OK();
}
// Close the underlying file so it cannot be read or written to anymore.
void Close() {
// Explicitly hold the lock in write mode to block all readers. This ensures that
// setting 'file_' to NULL and 'allow_append_' to false below is atomic.
std::unique_lock<percpu_rwlock> lock(lock_);
// If the file is already closed, nothing to do.
if (!file_) return;
kudu::Status status = file_->Close();
if (!status.ok()) {
LOG(WARNING) << Substitute("Failed to close cache file $0: $1", path_,
status.ToString());
}
file_.reset();
allow_append_ = false;
}
// Close the underlying file and delete it from the filesystem.
void DeleteFile() {
Close();
DCHECK(!file_);
kudu::Status status = kudu::Env::Default()->DeleteFile(path_);
if (!status.ok()) {
LOG(WARNING) << Substitute("Failed to unlink $0: $1", path_, status.ToString());
}
}
// Allocates a chunk of 'len' bytes in this file. The cache partition's lock
// 'partition_lock' must be held when calling this function. Returns the byte offset
// into the file for insertion. 'len' is expected to be multiples of PAGE_SIZE.
// Returns -1 if the file doesn't have enough space for insertion.
int64_t Allocate(int64_t len, const std::unique_lock<SpinLock>& partition_lock) {
DCHECK(partition_lock.owns_lock());
DCHECK_EQ(len % PAGE_SIZE, 0);
DCHECK_EQ(current_offset_ % PAGE_SIZE, 0);
// Hold the lock in shared mode to check if 'file_' is not closed already.
kudu::shared_lock<rw_spinlock> lock(lock_.get_lock());
if (!allow_append_ || (current_offset_ + len > FLAGS_data_cache_file_max_size_bytes &&
current_offset_ > 0)) {
allow_append_ = false;
return -1;
}
DCHECK(file_);
int64_t insertion_offset = current_offset_;
current_offset_ += len;
return insertion_offset;
}
// Reads from byte offset 'offset' for 'bytes_to_read' bytes into 'buffer'.
// Returns true iff read succeeded. Returns false on error or if the file
// is already closed.
bool Read(int64_t offset, uint8_t* buffer, int64_t bytes_to_read) {
DCHECK_EQ(offset % PAGE_SIZE, 0);
// Hold the lock in shared mode to check if 'file_' is not closed already.
kudu::shared_lock<rw_spinlock> lock(lock_.get_lock());
if (UNLIKELY(!file_)) return false;
DCHECK_LE(offset + bytes_to_read, current_offset_);
kudu::Status status = file_->Read(offset, Slice(buffer, bytes_to_read));
if (UNLIKELY(!status.ok())) {
LOG(ERROR) << Substitute("Failed to read from $0 at offset $1 for $2 bytes: $3",
path_, offset, PrettyPrinter::PrintBytes(bytes_to_read), status.ToString());
return false;
}
return true;
}
// Writes 'buffer' of length 'buffer_len' into byte offset 'offset' in the file.
// Returns true iff write succeeded. Returns false on errors or if the file is
// already closed.
bool Write(int64_t offset, const uint8_t* buffer, int64_t buffer_len) {
DCHECK_EQ(offset % PAGE_SIZE, 0);
DCHECK_LE(offset, current_offset_);
// Hold the lock in shared mode to check if 'file_' is not closed already.
kudu::shared_lock<rw_spinlock> lock(lock_.get_lock());
if (UNLIKELY(!file_)) return false;
DCHECK_LE(offset + buffer_len, current_offset_);
kudu::Status status = file_->Write(offset, Slice(buffer, buffer_len));
if (UNLIKELY(!status.ok())) {
LOG(ERROR) << Substitute("Failed to write to $0 at offset $1 for $2 bytes: $3",
path_, offset, PrettyPrinter::PrintBytes(buffer_len), status.ToString());
return false;
}
return true;
}
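// Punches a hole in the file in the range ['offset', 'offset' + 'hole_size').
// The file's logical size is unchanged but the underlying filesystem reclaims the
// storage backing the hole. Does nothing if the file is already closed.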
void PunchHole(int64_t offset, int64_t hole_size) {
DCHECK_EQ(offset % PAGE_SIZE, 0);
DCHECK_EQ(hole_size % PAGE_SIZE, 0);
// Hold the lock in shared mode to check if 'file_' is not closed already.
kudu::shared_lock<rw_spinlock> lock(lock_.get_lock());
if (UNLIKELY(!file_)) return;
DCHECK_LE(offset + hole_size, current_offset_);
kudu::Status status = file_->PunchHole(offset, hole_size);
if (UNLIKELY(!status.ok())) {
LOG(DFATAL) << Substitute("Failed to punch hole in $0 at offset $1 for $2: $3",
path_, offset, PrettyPrinter::PrintBytes(hole_size), status.ToString());
}
}
const string& path() const { return path_; }
private:
/// Full path of the backing file in the local storage.
const string path_;
/// The underlying backing file. NULL if the file has been closed.
unique_ptr<RWFile> file_;
/// True iff it's okay to append to this backing file.
bool allow_append_ = true;
/// The current offset in the file to append to on next insert.
int64_t current_offset_ = 0;
/// This is a reader-writer lock used for synchronization with the deleter thread.
/// It is taken in write mode in Close() and shared mode everywhere else. It's expected
/// that all places except for Close() check that 'file_' is not NULL with the lock held
/// in shared mode while Close() ensures that no thread is holding the lock in shared
/// mode so it's safe to close the file. The file can no longer be read, written or hole
/// punched after it has been closed. The only operation allowed is deletion.
percpu_rwlock lock_;
/// C'tor of CacheFile to be called by Create() only.
explicit CacheFile(std::string path) : path_(std::move(path)) { }
DISALLOW_COPY_AND_ASSIGN(CacheFile);
};
/// An entry in the metadata cache in a partition.
/// Contains the whereabouts of the cached content.
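/// NOTE: an entry is stored by copying its raw bytes into the metadata cache's value
/// slice and unpacked the same way, so this class must remain trivially copyable:
/// no virtual functions and no owning pointers.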
class DataCache::CacheEntry {
public:
explicit CacheEntry(CacheFile* file, int64_t offset, int64_t len, uint64_t checksum)
: file_(file), offset_(offset), len_(len), checksum_(checksum) {
}
// Unpack a cache entry represented by 'value'. memcpy() is used instead of casting
// 'value.data()' to avoid any potential alignment issues.
explicit CacheEntry(const Slice& value) {
DCHECK_EQ(value.size(), sizeof(CacheEntry));
memcpy(this, value.data(), value.size());
}
CacheFile* file() const { return file_; }
int64_t offset() const { return offset_; }
int64_t len() const { return len_; }
uint64_t checksum() const { return checksum_; }
private:
/// The backing file holding the cached content.
CacheFile* const file_ = nullptr;
/// The starting byte offset in the backing file at which the content is stored.
const int64_t offset_ = 0;
/// The length in bytes of the cached content.
const int64_t len_ = 0;
/// Optional checksum of the content computed when inserting the cache entry.
const uint64_t checksum_ = 0;
};
/// The key used for look up in the cache.
struct DataCache::CacheKey {
public:
explicit CacheKey(const string& filename, int64_t mtime, int64_t offset)
: key_(filename.size() + sizeof(mtime) + sizeof(offset)) {
DCHECK_GE(mtime, 0);
DCHECK_GE(offset, 0);
key_.append(&mtime, sizeof(mtime));
key_.append(&offset, sizeof(offset));
key_.append(filename);
}
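// Hashes the encoded key. DataCache uses this hash to pick the partition
// (Hash() % number of partitions) in which this key resides.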
int64_t Hash() const {
return HashUtil::FastHash64(key_.data(), key_.size(), 0);
}
Slice filename() const {
return Slice(key_.data() + OFFSETOF_FILENAME, key_.size() - OFFSETOF_FILENAME);
}
int64_t mtime() const {
return UNALIGNED_LOAD64(key_.data() + OFFSETOF_MTIME);
}
int64_t offset() const {
return UNALIGNED_LOAD64(key_.data() + OFFSETOF_OFFSET);
}
Slice ToSlice() const {
return key_;
}
private:
// Key encoding stored in key_:
//
// int64_t mtime;
// int64_t offset;
// <variable length bytes> filename;
static constexpr int OFFSETOF_MTIME = 0;
static constexpr int OFFSETOF_OFFSET = OFFSETOF_MTIME + sizeof(int64_t);
static constexpr int OFFSETOF_FILENAME = OFFSETOF_OFFSET + sizeof(int64_t);
faststring key_;
};
DataCache::Partition::Partition(const string& path, int64_t capacity,
int max_opened_files)
: path_(path), capacity_(max<int64_t>(capacity, PAGE_SIZE)),
max_opened_files_(max_opened_files),
meta_cache_(NewLRUCache(kudu::DRAM_CACHE, capacity_, path_)) {
}
DataCache::Partition::~Partition() {
if (!closed_) ReleaseResources();
}
Status DataCache::Partition::CreateCacheFile() {
lock_.DCheckLocked();
const string& path =
JoinPathSegments(path_, CACHE_FILE_PREFIX + PrintId(GenerateUUID()));
unique_ptr<CacheFile> cache_file;
RETURN_IF_ERROR(CacheFile::Create(path, &cache_file));
cache_files_.emplace_back(std::move(cache_file));
LOG(INFO) << "Created cache file " << path;
return Status::OK();
}
Status DataCache::Partition::DeleteExistingFiles() const {
vector<string> entries;
RETURN_IF_ERROR(FileSystemUtil::Directory::GetEntryNames(path_, &entries, 0,
FileSystemUtil::Directory::EntryType::DIR_ENTRY_REG));
for (const string& entry : entries) {
if (entry.find(CACHE_FILE_PREFIX) == 0) {
const string file_path = JoinPathSegments(path_, entry);
KUDU_RETURN_IF_ERROR(kudu::Env::Default()->DeleteFile(file_path),
Substitute("Failed to delete old cache file $0", file_path));
LOG(INFO) << Substitute("Deleted old cache file $0", file_path);
}
}
return Status::OK();
}
Status DataCache::Partition::Init() {
std::unique_lock<SpinLock> partition_lock(lock_);
// Verify the validity of the path specified.
if (!FileSystemUtil::IsCanonicalPath(path_)) {
return Status(Substitute("$0 is not a canonical path", path_));
}
RETURN_IF_ERROR(FileSystemUtil::VerifyIsDirectory(path_));
// Delete all existing backing files left over from previous runs.
RETURN_IF_ERROR(DeleteExistingFiles());
// Check if there is enough space available at this point in time.
uint64_t available_bytes;
RETURN_IF_ERROR(FileSystemUtil::GetSpaceAvailable(path_, &available_bytes));
if (available_bytes < capacity_) {
const string& err = Substitute("Insufficient space for $0. Required $1. Only $2 is "
"available", path_, PrettyPrinter::PrintBytes(capacity_),
PrettyPrinter::PrintBytes(available_bytes));
LOG(ERROR) << err;
return Status(err);
}
// Make sure hole punching is supported for the caching directory.
RETURN_IF_ERROR(FileSystemUtil::CheckHolePunch(path_));
if (FLAGS_data_cache_enable_tracing) {
tracer_.reset(new Tracer(path_ + "/" + TRACE_FILE_NAME));
RETURN_IF_ERROR(tracer_->Start());
}
// Create a backing file for the partition.
RETURN_IF_ERROR(CreateCacheFile());
oldest_opened_file_ = 0;
return Status::OK();
}
Status DataCache::Partition::CloseFilesAndVerifySizes() {
int64_t total_size = 0;
for (auto& file : cache_files_) {
uint64_t sz_on_disk;
// Close the backing files before checking sizes as some filesystems (e.g. XFS)
// preallocate the file beyond EOF. Closing the file removes any preallocation.
file->Close();
kudu::Env* env = kudu::Env::Default();
KUDU_RETURN_IF_ERROR(env->GetFileSizeOnDisk(file->path(), &sz_on_disk),
"CloseFilesAndVerifySizes()");
total_size += sz_on_disk;
uint64_t logical_sz;
KUDU_RETURN_IF_ERROR(env->GetFileSize(file->path(), &logical_sz),
"CloseFilesAndVerifySizes()");
DCHECK_LE(logical_sz, FLAGS_data_cache_file_max_size_bytes);
}
if (total_size > capacity_) {
return Status(Substitute("Partition $0 consumed $1 bytes, exceeding capacity of $2 "
"bytes", path_, total_size, capacity_));
}
return Status::OK();
}
void DataCache::Partition::ReleaseResources() {
std::unique_lock<SpinLock> partition_lock(lock_);
if (closed_) return;
closed_ = true;
// Close and delete all backing files in this partition.
cache_files_.clear();
// Free all memory consumed by the metadata cache.
meta_cache_.reset();
}
int64_t DataCache::Partition::Lookup(const CacheKey& cache_key, int64_t bytes_to_read,
uint8_t* buffer) {
DCHECK(!closed_);
Slice key = cache_key.ToSlice();
kudu::Cache::Handle* handle =
meta_cache_->Lookup(key, kudu::Cache::EXPECT_IN_CACHE);
if (handle == nullptr) {
if (tracer_ != nullptr) {
tracer_->Trace(Tracer::MISS, cache_key, bytes_to_read, /*entry_len=*/-1);
}
return 0;
}
auto handle_release =
MakeScopeExitTrigger([this, &handle]() { meta_cache_->Release(handle); });
// Read from the backing file.
CacheEntry entry(meta_cache_->Value(handle));
if (tracer_ != nullptr) {
tracer_->Trace(Tracer::HIT, cache_key, bytes_to_read, entry.len());
}
CacheFile* cache_file = entry.file();
bytes_to_read = min(entry.len(), bytes_to_read);
VLOG(3) << Substitute("Reading file $0 offset $1 len $2 checksum $3 bytes_to_read $4",
cache_file->path(), entry.offset(), entry.len(), entry.checksum(), bytes_to_read);
if (UNLIKELY(!cache_file->Read(entry.offset(), buffer, bytes_to_read))) {
meta_cache_->Erase(key);
return 0;
}
// Verify checksum if enabled. Delete entry on checksum mismatch.
if (FLAGS_data_cache_checksum && bytes_to_read == entry.len() &&
!VerifyChecksum("read", entry, buffer, bytes_to_read)) {
meta_cache_->Erase(key);
return 0;
}
return bytes_to_read;
}
bool DataCache::Partition::HandleExistingEntry(const Slice& key,
kudu::Cache::Handle* handle, const uint8_t* buffer, int64_t buffer_len) {
// Unpack the cache entry.
CacheEntry entry(meta_cache_->Value(handle));
// Try verifying the checksum of the new buffer matches that of the existing entry.
// On checksum mismatch, delete the existing entry and don't install the new entry
// as it's unclear which one is right.
if (FLAGS_data_cache_checksum && buffer_len >= entry.len()) {
if (!VerifyChecksum("write", entry, buffer, buffer_len)) {
meta_cache_->Erase(key);
return true;
}
}
// If the new entry is not any longer than the existing entry, no work to do.
return entry.len() >= buffer_len;
}
bool DataCache::Partition::InsertIntoCache(const Slice& key, CacheFile* cache_file,
int64_t insertion_offset, const uint8_t* buffer, int64_t buffer_len) {
DCHECK_EQ(insertion_offset % PAGE_SIZE, 0);
const int64_t charge_len = BitUtil::RoundUp(buffer_len, PAGE_SIZE);
// Allocate a handle in the metadata cache, charging it for the page-rounded length.
kudu::Cache::PendingHandle* pending_handle =
meta_cache_->Allocate(key, sizeof(CacheEntry), charge_len);
if (UNLIKELY(pending_handle == nullptr)) return false;
auto release_pending_handle = MakeScopeExitTrigger([this, &pending_handle]() {
if (pending_handle != nullptr) meta_cache_->Free(pending_handle);
});
// Compute checksum if necessary.
int64_t checksum = FLAGS_data_cache_checksum ? Checksum(buffer, buffer_len) : 0;
// Write to backing file.
VLOG(3) << Substitute("Storing file $0 offset $1 len $2 checksum $3 ",
cache_file->path(), insertion_offset, buffer_len, checksum);
if (UNLIKELY(!cache_file->Write(insertion_offset, buffer, buffer_len))) {
return false;
}
// Insert the new entry into the cache.
CacheEntry entry(cache_file, insertion_offset, buffer_len, checksum);
memcpy(meta_cache_->MutableValue(pending_handle), &entry, sizeof(CacheEntry));
kudu::Cache::Handle* handle = meta_cache_->Insert(pending_handle, this);
meta_cache_->Release(handle);
pending_handle = nullptr;
ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_TOTAL_BYTES->Increment(charge_len);
return true;
}
bool DataCache::Partition::Store(const CacheKey& cache_key, const uint8_t* buffer,
int64_t buffer_len, bool* start_reclaim) {
DCHECK(!closed_);
*start_reclaim = false;
Slice key = cache_key.ToSlice();
const int64_t charge_len = BitUtil::RoundUp(buffer_len, PAGE_SIZE);
if (charge_len > capacity_) return false;
// Check for existing entry.
kudu::Cache::Handle* handle = meta_cache_->Lookup(key, kudu::Cache::EXPECT_IN_CACHE);
if (handle != nullptr) {
auto handle_release =
MakeScopeExitTrigger([this, &handle]() { meta_cache_->Release(handle); });
if (HandleExistingEntry(key, handle, buffer, buffer_len)) return false;
}
CacheFile* cache_file;
int64_t insertion_offset;
{
std::unique_lock<SpinLock> partition_lock(lock_);
// Limit the write concurrency to avoid blocking the caller (which could be on the
// critical path of an IO read) when the cache becomes IO bound, either because memory
// for the page cache is limited or because the cache is undersized, leading to eviction.
//
// TODO: defer the writes to another thread which writes asynchronously. Need to bound
// the extra memory consumption for holding the temporary buffer though.
const bool exceed_concurrency =
pending_insert_set_.size() >= FLAGS_data_cache_write_concurrency;
if (exceed_concurrency ||
pending_insert_set_.find(key.ToString()) != pending_insert_set_.end()) {
ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_DROPPED_BYTES->Increment(buffer_len);
if (tracer_ != nullptr) {
tracer_->Trace(Tracer::STORE_FAILED_BUSY, cache_key, /*lookup_len=*/-1,
buffer_len);
}
return false;
}
// Allocate from the backing file.
CHECK(!cache_files_.empty());
cache_file = cache_files_.back().get();
insertion_offset = cache_file->Allocate(charge_len, partition_lock);
// Create and append to a new file if necessary.
if (UNLIKELY(insertion_offset < 0)) {
if (!CreateCacheFile().ok()) return false;
cache_file = cache_files_.back().get();
insertion_offset = cache_file->Allocate(charge_len, partition_lock);
if (UNLIKELY(insertion_offset < 0)) return false;
}
// Start deleting old files if there are too many opened.
*start_reclaim = cache_files_.size() > max_opened_files_;
// Do this last. At this point, we are committed to inserting 'key' into the cache.
pending_insert_set_.emplace(key.ToString());
}
if (tracer_ != nullptr) {
tracer_->Trace(Tracer::STORE, cache_key, /*lookup_len=*/-1, buffer_len);
}
// Set up a scoped exit to always remove entry from the pending insertion set.
auto remove_from_pending_set = MakeScopeExitTrigger([this, &key]() {
std::unique_lock<SpinLock> partition_lock(lock_);
pending_insert_set_.erase(key.ToString());
});
// Try inserting into the cache.
return InsertIntoCache(key, cache_file, insertion_offset, buffer, buffer_len);
}
void DataCache::Partition::DeleteOldFiles() {
std::unique_lock<SpinLock> partition_lock(lock_);
DCHECK_GE(oldest_opened_file_, 0);
int target = cache_files_.size() - max_opened_files_;
while (oldest_opened_file_ < target) {
cache_files_[oldest_opened_file_++]->DeleteFile();
}
}
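// Callback invoked by the metadata cache when an entry is evicted. Reclaims the disk
// space occupied by the evicted entry by punching a hole in the backing file.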
void DataCache::Partition::EvictedEntry(Slice key, Slice value) {
if (closed_) return;
// Unpack the cache entry.
CacheEntry entry(value);
int64_t eviction_len = BitUtil::RoundUp(entry.len(), PAGE_SIZE);
DCHECK_EQ(entry.offset() % PAGE_SIZE, 0);
entry.file()->PunchHole(entry.offset(), eviction_len);
ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_TOTAL_BYTES->Increment(-eviction_len);
}
// TODO: Switch to using CRC32 once we fix the TODO in hash-util.h
uint64_t DataCache::Partition::Checksum(const uint8_t* buffer, int64_t buffer_len) {
return HashUtil::FastHash64(buffer, buffer_len, 0xcafebeef);
}
bool DataCache::Partition::VerifyChecksum(const string& ops_name, const CacheEntry& entry,
const uint8_t* buffer, int64_t buffer_len) {
DCHECK(FLAGS_data_cache_checksum);
DCHECK_GE(buffer_len, entry.len());
int64_t checksum = Checksum(buffer, entry.len());
if (UNLIKELY(checksum != entry.checksum())) {
LOG(DFATAL) << Substitute("Checksum mismatch during $0 for file $1 "
"offset: $2 len: $3 buffer len: $4. Expected $5, Got $6.", ops_name,
entry.file()->path(), entry.offset(), entry.len(), buffer_len, entry.checksum(),
checksum);
return false;
}
return true;
}
Status DataCache::Init() {
// Verifies all the configured flags are sane.
if (FLAGS_data_cache_file_max_size_bytes < PAGE_SIZE) {
return Status(Substitute("Misconfigured --data_cache_file_max_size_bytes: $0 bytes. "
"Must be at least $1 bytes", FLAGS_data_cache_file_max_size_bytes, PAGE_SIZE));
}
if (FLAGS_data_cache_write_concurrency < 1) {
return Status(Substitute("Misconfigured --data_cache_write_concurrency: $0. "
"Must be at least 1.", FLAGS_data_cache_write_concurrency));
}
// The expected form of the configuration string is: dir1,dir2,..,dirN:capacity
// Example: /tmp/data1,/tmp/data2:1TB
vector<string> all_cache_configs = Split(config_, ":", SkipEmpty());
if (all_cache_configs.size() != 2) {
return Status(Substitute("Malformed data cache configuration $0", config_));
}
// Parse the capacity string to make sure it's well-formed.
bool is_percent;
int64_t capacity = ParseUtil::ParseMemSpec(all_cache_configs[1], &is_percent, 0);
if (is_percent) {
return Status(Substitute("Malformed data cache capacity configuration $0",
all_cache_configs[1]));
}
if (capacity < PAGE_SIZE) {
return Status(Substitute("Configured data cache capacity $0 is too small",
all_cache_configs[1]));
}
set<string> cache_dirs;
SplitStringToSetUsing(all_cache_configs[0], ",", &cache_dirs);
int max_opened_files_per_partition =
FLAGS_data_cache_max_opened_files / cache_dirs.size();
if (max_opened_files_per_partition < 1) {
return Status(Substitute("Misconfigured --data_cache_max_opened_files: $0. Must be "
"at least $1.", FLAGS_data_cache_max_opened_files, cache_dirs.size()));
}
for (const string& dir_path : cache_dirs) {
LOG(INFO) << "Adding partition " << dir_path << " with capacity "
<< PrettyPrinter::PrintBytes(capacity);
std::unique_ptr<Partition> partition =
make_unique<Partition>(dir_path, capacity, max_opened_files_per_partition);
RETURN_IF_ERROR(partition->Init());
partitions_.emplace_back(move(partition));
}
CHECK_GT(partitions_.size(), 0);
// Starts a thread pool which deletes old files from partitions. DataCache::Store()
// will enqueue a request (i.e. a partition index) when it notices the number of files
// in a partition exceeds the per-partition limit. The files in a partition will be
// closed in the order they are created until it's within the per-partition limit.
file_deleter_pool_.reset(new ThreadPool<int>("impala-server",
"data-cache-file-deleter", 1, MAX_FILE_DELETER_QUEUE_SIZE,
bind<void>(&DataCache::DeleteOldFiles, this, _1, _2)));
RETURN_IF_ERROR(file_deleter_pool_->Init());
return Status::OK();
}
void DataCache::ReleaseResources() {
if (file_deleter_pool_) file_deleter_pool_->Shutdown();
for (auto& partition : partitions_) partition->ReleaseResources();
}
int64_t DataCache::Lookup(const string& filename, int64_t mtime, int64_t offset,
int64_t bytes_to_read, uint8_t* buffer) {
DCHECK(!partitions_.empty());
// Bail out early for uncacheable ranges or invalid requests.
if (mtime < 0 || offset < 0 || bytes_to_read < 0) {
VLOG(3) << Substitute("Skipping lookup of invalid entry $0 mtime: $1 offset: $2 "
"bytes_to_read: $3", filename, mtime, offset, bytes_to_read);
return 0;
}
// Construct a cache key. The cache key is also hashed to compute the partition index.
const CacheKey key(filename, mtime, offset);
int idx = key.Hash() % partitions_.size();
int64_t bytes_read = partitions_[idx]->Lookup(key, bytes_to_read, buffer);
if (VLOG_IS_ON(3)) {
stringstream ss;
ss << std::hex << reinterpret_cast<int64_t>(buffer);
LOG(INFO) << Substitute("Looking up $0 mtime: $1 offset: $2 bytes_to_read: $3 "
"buffer: 0x$4 bytes_read: $5", filename, mtime, offset, bytes_to_read,
ss.str(), bytes_read);
}
return bytes_read;
}
bool DataCache::Store(const string& filename, int64_t mtime, int64_t offset,
const uint8_t* buffer, int64_t buffer_len) {
DCHECK(!partitions_.empty());
// Bail out early for uncacheable ranges or invalid requests.
if (mtime < 0 || offset < 0 || buffer_len < 0) {
VLOG(3) << Substitute("Skipping insertion of invalid entry $0 mtime: $1 offset: $2 "
"buffer_len: $3", filename, mtime, offset, buffer_len);
return false;
}
// Construct a cache key. The cache key is also hashed to compute the partition index.
const CacheKey key(filename, mtime, offset);
int idx = key.Hash() % partitions_.size();
bool start_reclaim;
bool stored = partitions_[idx]->Store(key, buffer, buffer_len, &start_reclaim);
if (VLOG_IS_ON(3)) {
stringstream ss;
ss << std::hex << reinterpret_cast<int64_t>(buffer);
LOG(INFO) << Substitute("Storing $0 mtime: $1 offset: $2 bytes_to_read: $3 "
"buffer: 0x$4 stored: $5", filename, mtime, offset, buffer_len, ss.str(), stored);
}
if (start_reclaim) file_deleter_pool_->Offer(idx);
return stored;
}
Status DataCache::CloseFilesAndVerifySizes() {
for (auto& partition : partitions_) {
RETURN_IF_ERROR(partition->CloseFilesAndVerifySizes());
}
return Status::OK();
}
void DataCache::DeleteOldFiles(uint32_t thread_id, int partition_idx) {
DCHECK_LT(partition_idx, partitions_.size());
partitions_[partition_idx]->DeleteOldFiles();
}
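// Each trace event is written as one compact JSON object per line. An illustrative
// (not actual) example:
//   {"ts":1577836800.0,"s":"H","f":"/path/file.parq","m":1561234567,"o":4096,"lLen":4096,"eLen":4096}
// With --data_cache_anonymize_trace, "f" holds a base64-encoded CityHash128 of the
// filename instead of the path.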
void DataCache::Partition::Tracer::Trace(
CacheStatus status, const DataCache::CacheKey& key,
int64_t lookup_len, int64_t entry_len) {
ostringstream buf;
kudu::JsonWriter jw(&buf, kudu::JsonWriter::COMPACT);
jw.StartObject();
jw.String("ts");
jw.Double(WallTime_Now());
jw.String("s");
switch (status) {
case HIT: jw.String("H"); break;
case MISS: jw.String("M"); break;
case STORE: jw.String("S"); break;
case STORE_FAILED_BUSY: jw.String("F"); break;
}
jw.String("f");
if (FLAGS_data_cache_anonymize_trace) {
uint128 hash = util_hash::CityHash128(
reinterpret_cast<const char*>(key.filename().data()),
key.filename().size());
// A 128-bit (16-byte) hash results in a 24-byte base64-encoded string, including
// two characters of padding.
const int BUF_LEN = 24;
DCHECK_EQ(BUF_LEN, CalculateBase64EscapedLen(sizeof(hash)));
char b64_buf[BUF_LEN];
int out_len = Base64Escape(reinterpret_cast<const unsigned char*>(&hash),
sizeof(hash), b64_buf, BUF_LEN);
DCHECK_EQ(out_len, BUF_LEN);
// Chop off the two padding bytes.
DCHECK(b64_buf[23] == '=' && b64_buf[22] == '=');
out_len = 22;
jw.String(b64_buf, out_len);
} else {
jw.String(reinterpret_cast<const char*>(key.filename().data()),
key.filename().size());
}
jw.String("m");
jw.Int64(key.mtime());
jw.String("o");
jw.Int64(key.offset());
if (lookup_len != -1) {
jw.String("lLen");
jw.Int64(lookup_len);
}
if (entry_len != -1) {
jw.String("eLen");
jw.Int64(entry_len);
}
jw.EndObject();
buf << "\n";
string s = buf.str();
logger_->Write(/*force_flush=*/false, /*timestamp=*/0, s.data(), s.size());
}
} // namespace io
} // namespace impala