blob: dbeb8ef6804af4bec42dcb602ac42c3a73b3882f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/tuple-cache-mgr.h"
#include <boost/filesystem.hpp>
#include "common/constant-strings.h"
#include "common/logging.h"
#include "exec/tuple-file-reader.h"
#include "exec/tuple-text-file-reader.h"
#include "gutil/strings/split.h"
#include "gutil/strings/substitute.h"
#include "kudu/util/env.h"
#include "kudu/util/path_util.h"
#include "util/filesystem-util.h"
#include "util/histogram-metric.h"
#include "util/kudu-status-util.h"
#include "util/parse-util.h"
#include "util/pretty-printer.h"
#include "util/test-info.h"
#include "common/names.h"
namespace filesystem = boost::filesystem;
using kudu::JoinPathSegments;
using strings::SkipEmpty;
using strings::Split;
// We ensure each impalad process gets a unique directory because currently cache keys
// can be shared across instances but represent different scan ranges.
DEFINE_string(tuple_cache, "", "The configuration string for the tuple cache. "
    "The default is the empty string, which disables the tuple cache. The configuration "
    "string is expected to be a directory followed by a ':' and a capacity quota. "
    "For example, /data/0:1TB means the cache may use 1TB in /data/0. Please note that "
    "each Impala daemon on a host must have a unique cache directory.");
DEFINE_string(tuple_cache_eviction_policy, "LRU",
    "(Advanced) The cache eviction policy to use for the tuple cache. "
    "Either 'LRU' (default) or 'LIRS' (experimental)");
DEFINE_string(tuple_cache_debug_dump_dir, "",
    "Directory for dumping the intermediate query result tuples for debugging purpose.");
DEFINE_uint32(tuple_cache_sync_pool_size, 10,
    "(Advanced) Size of the thread pool syncing cache files to disk asynchronously. "
    "If set to 0, cache files are flushed synchronously.");
DEFINE_uint32(tuple_cache_sync_pool_queue_depth, 1000,
    "(Advanced) Maximum queue depth for the thread pool syncing cache files to disk");
// Built at static-init time because the help text embeds the shared memory-units
// wording from constant-strings.h.
static const string OUTSTANDING_WRITE_LIMIT_MSG =
    "(Advanced) Limit on the size of outstanding tuple cache writes. " +
    Substitute(MEM_UNITS_HELP_MSG, "the process memory limit");
DEFINE_string(tuple_cache_outstanding_write_limit, "1GB",
    OUTSTANDING_WRITE_LIMIT_MSG.c_str());
DEFINE_uint32(tuple_cache_outstanding_write_chunk_bytes, 128 * 1024,
    "(Advanced) Chunk size for incrementing the outstanding tuple cache write size");
// Global feature flag for tuple caching. If false, enable_tuple_cache cannot be true
// and the coordinator cannot produce plans with TupleCacheNodes. The tuple_cache
// parameter also cannot be specified.
DEFINE_bool(allow_tuple_caching, false, "If false, tuple caching cannot be used.");
namespace impala {
// Minimum tuple cache capacity is 64MB.
static const int64_t MIN_TUPLE_CACHE_CAPACITY_BYTES = 64L << 20;
static const char* MIN_TUPLE_CACHE_CAPACITY_STR = "64MB";
// Maximum tuple cache entry size for the purposes of sizing the histogram of
// entry sizes. This does not currently constrain the actual entries.
static constexpr int64_t STATS_MAX_TUPLE_CACHE_ENTRY_SIZE = 128L << 20;
static const char* CACHE_FILE_PREFIX = "tuple-cache-";
// Name of the subdirectory (under --tuple_cache_debug_dump_dir) holding debug dumps.
// The on-disk name is unchanged; only the misspelled identifier (DUBUG) was corrected.
static const char* DEBUG_DUMP_SUB_DIR_NAME = "tuple-cache-debug-dump";

// Constructs the path for debug dumping the tuple cache if configured via
// --tuple_cache_debug_dump_dir, removing and recreating the subdirectory so it
// starts empty. Returns the empty string if unconfigured or creation fails
// (a warning is logged in the failure case).
static string ConstructTupleCacheDebugDumpPath() {
  string& cache_debug_dump_dir = FLAGS_tuple_cache_debug_dump_dir;
  if (cache_debug_dump_dir != "") {
    filesystem::path path(cache_debug_dump_dir);
    path /= DEBUG_DUMP_SUB_DIR_NAME;
    // Remove and recreate the subdirectory if it exists.
    Status cr_status = FileSystemUtil::RemoveAndCreateDirectory(path.string());
    if (cr_status.ok()) {
      LOG(INFO) << "Created tuple cache debug dump path in " << path.string();
      return path.string();
    }
    LOG(WARNING) << "Unable to create directory for tuple cache dumping: "
                 << cr_status.GetDetail();
  }
  return string();
}
// Production constructor: delegates to the fully-parameterized constructor below,
// wiring in the process-wide gflags and passing debug_pos=0 (no fault injection).
TupleCacheMgr::TupleCacheMgr(MetricGroup* metrics)
  : TupleCacheMgr(FLAGS_tuple_cache, FLAGS_tuple_cache_eviction_policy, metrics,
        /* debug_pos */ 0, FLAGS_tuple_cache_sync_pool_size,
        FLAGS_tuple_cache_sync_pool_queue_depth,
        FLAGS_tuple_cache_outstanding_write_limit,
        FLAGS_tuple_cache_outstanding_write_chunk_bytes) {}
// Fully-parameterized constructor. 'debug_pos' is a bitmask of DebugPos values used
// to inject failures (see Lookup/SyncFileToDisk); production passes 0. Registers all
// tuple-cache counters/gauges/histograms against 'metrics'. Actual validation of the
// configuration strings is deferred to Init().
TupleCacheMgr::TupleCacheMgr(
    string cache_config, string eviction_policy_str, MetricGroup* metrics,
    uint8_t debug_pos, uint32_t sync_pool_size, uint32_t sync_pool_queue_depth,
    string outstanding_write_limit_str, uint32_t outstanding_write_chunk_bytes)
  : cache_config_(move(cache_config)),
    eviction_policy_str_(move(eviction_policy_str)),
    outstanding_write_limit_str_(move(outstanding_write_limit_str)),
    // Resolves (and recreates) the debug dump directory once at construction time.
    cache_debug_dump_dir_(ConstructTupleCacheDebugDumpPath()),
    debug_pos_(debug_pos),
    sync_pool_size_(sync_pool_size),
    sync_pool_queue_depth_(sync_pool_queue_depth),
    outstanding_write_chunk_bytes_(outstanding_write_chunk_bytes),
    tuple_cache_hits_(metrics->AddCounter("impala.tuple-cache.hits", 0)),
    tuple_cache_misses_(metrics->AddCounter("impala.tuple-cache.misses", 0)),
    tuple_cache_skipped_(metrics->AddCounter("impala.tuple-cache.skipped", 0)),
    tuple_cache_halted_(metrics->AddCounter("impala.tuple-cache.halted", 0)),
    tuple_cache_backpressure_halted_(
        metrics->AddCounter("impala.tuple-cache.backpressure-halted", 0)),
    tuple_cache_entries_evicted_(
        metrics->AddCounter("impala.tuple-cache.entries-evicted", 0)),
    tuple_cache_failed_sync_(metrics->AddCounter("impala.tuple-cache.failed-syncs", 0)),
    tuple_cache_dropped_sync_(metrics->AddCounter("impala.tuple-cache.dropped-syncs", 0)),
    tuple_cache_entries_in_use_(
        metrics->AddGauge("impala.tuple-cache.entries-in-use", 0)),
    tuple_cache_entries_in_use_bytes_(
        metrics->AddGauge("impala.tuple-cache.entries-in-use-bytes", 0)),
    tuple_cache_tombstones_in_use_(
        metrics->AddGauge("impala.tuple-cache.tombstones-in-use", 0)),
    tuple_cache_outstanding_writes_bytes_(
        metrics->AddGauge("impala.tuple-cache.outstanding-writes-bytes", 0)),
    // Histogram sized by the stats-only entry cap; 3 significant digits.
    tuple_cache_entry_size_stats_(metrics->RegisterMetric(
        new HistogramMetric(MetricDefs::Get("impala.tuple-cache.entry-sizes"),
            STATS_MAX_TUPLE_CACHE_ENTRY_SIZE, 3))) {}
// Validates the cache configuration, prepares the cache directory, and brings the
// cache online. 'process_bytes_limit' is used only to resolve a percentage-based
// outstanding-write limit. Returns OK and leaves the cache disabled if no
// configuration string was given; returns an error for any malformed or unusable
// configuration. Calls CLEAN_EXIT_WITH_ERROR for an invalid outstanding write limit.
Status TupleCacheMgr::Init(int64_t process_bytes_limit) {
  if (cache_config_.empty()) {
    LOG(INFO) << "Tuple Cache is disabled.";
    return Status::OK();
  }
  // The expected form of the configuration string is: dir1:capacity
  // Example: /tmp/data1:1TB
  vector<string> all_cache_configs = Split(cache_config_, ":", SkipEmpty());
  if (all_cache_configs.size() != 2) {
    // This message previously said "data cache" (copy/paste from the data cache).
    return Status(Substitute("Malformed tuple cache configuration $0", cache_config_));
  }
  // Parse the capacity string to make sure it's well-formed. ParseMemSpec returns a
  // negative value on a parse failure, so reject that explicitly rather than relying
  // on the minimum-capacity check below (which is skipped for backend tests).
  bool is_percent = false;
  int64_t capacity = ParseUtil::ParseMemSpec(all_cache_configs[1], &is_percent, 0);
  if (is_percent || capacity < 0) {
    return Status(Substitute("Malformed tuple cache capacity configuration $0",
        all_cache_configs[1]));
  }
  // For normal execution (not backend tests), impose a minimum size on the
  // cache of 64MB.
  if (!TestInfo::is_be_test() && capacity < MIN_TUPLE_CACHE_CAPACITY_BYTES) {
    return Status(Substitute(
        "Tuple cache capacity $0 is less than the minimum allowed capacity $1",
        all_cache_configs[1], MIN_TUPLE_CACHE_CAPACITY_STR));
  }
  vector<string> cache_dirs = Split(all_cache_configs[0], ",", SkipEmpty());
  if (cache_dirs.size() > 1) {
    return Status(Substitute("Malformed tuple cache directory $0. "
        "The tuple cache only supports a single directory.", all_cache_configs[0]));
  }
  cache_dir_ = cache_dirs[0];
  // Verify the validity of the path specified.
  if (!FileSystemUtil::IsCanonicalPath(cache_dir_)) {
    return Status(Substitute("$0 is not a canonical path", cache_dir_));
  }
  RETURN_IF_ERROR(FileSystemUtil::VerifyIsDirectory(cache_dir_));
  // Verify we can create a file in the cache directory
  filesystem::path path =
      filesystem::path(cache_dir_) / Substitute("$0test", CACHE_FILE_PREFIX);
  RETURN_IF_ERROR(FileSystemUtil::CreateFile(path.string()));
  // Remove any existing cache files from the cache directory. This ensures that the
  // usage starts at zero. This also removes the test file we just wrote.
  RETURN_IF_ERROR(DeleteExistingFiles());
  // Check that there is enough space available for the specified cache size.
  // capacity is known non-negative here, so the cast is safe.
  uint64_t available_bytes;
  RETURN_IF_ERROR(FileSystemUtil::GetSpaceAvailable(cache_dir_, &available_bytes));
  if (available_bytes < static_cast<uint64_t>(capacity)) {
    string err = Substitute("Insufficient space for $0. Required $1. Only $2 is "
        "available", cache_dir_, PrettyPrinter::PrintBytes(capacity),
        PrettyPrinter::PrintBytes(available_bytes));
    LOG(ERROR) << err;
    return Status(err);
  }
  // Verify the cache eviction policy
  Cache::EvictionPolicy policy = Cache::ParseEvictionPolicy(eviction_policy_str_);
  if (policy != Cache::EvictionPolicy::LRU && policy != Cache::EvictionPolicy::LIRS) {
    return Status(Substitute("Unsupported tuple cache eviction policy: $0",
        eviction_policy_str_));
  }
  // The outstanding write limit can either be a specific value, or it can be a
  // percentage of the process bytes limit. If the process bytes limit is zero,
  // a percentage is not allowed.
  outstanding_write_limit_ = ParseUtil::ParseMemSpec(outstanding_write_limit_str_,
      &is_percent, process_bytes_limit);
  if (outstanding_write_limit_ <= 0) {
    // Report the string that was actually parsed (the member), which may differ from
    // the gflag when constructed directly (e.g. in tests).
    CLEAN_EXIT_WITH_ERROR(
        Substitute("Invalid tuple cache outstanding write limit configuration: $0.",
            outstanding_write_limit_str_));
  }
  // Setting sync_pool_size == 0 results in synchronous flushing to disk. This is
  // mainly used for backend tests.
  if (sync_pool_size_ > 0) {
    sync_thread_pool_.reset(new ThreadPool<string>("tuple-cache-mgr", "sync-worker",
        sync_pool_size_, sync_pool_queue_depth_,
        [this] (int thread_id, const string& filename) {
          this->SyncFileToDisk(filename);
        }));
    RETURN_IF_ERROR(sync_thread_pool_->Init());
  }
  cache_.reset(NewCache(policy, capacity, "Tuple_Cache"));
  RETURN_IF_ERROR(cache_->Init());
  LOG(INFO) << "Tuple Cache initialized at " << cache_dir_
            << " with capacity " << PrettyPrinter::Print(capacity, TUnit::BYTES)
            << " and outstanding write limit: "
            << PrettyPrinter::Print(outstanding_write_limit_, TUnit::BYTES);
  enabled_ = true;
  return Status::OK();
}
// TupleCacheState tracks what operations can be performed on a cache entry,
// and is stored as part of a UniqueHandle. The diagram below outlines transitions
// between states and what operations are allowed. State transitions are atomic. State is
// represented by [TupleCacheState, IsAvailableForRead()]. Cache lookup, insert,
// erase all rely on internal locking of the private Cache object.
//
// Lookup(acquire_state=false): If an entry exists return it; else return an empty entry.
// IsAvailableForWrite() always returns false.
//
// Lookup(acquire_state=true): If an entry exists return it (IsAvailableForWrite()=false);
// else acquire creation_lock_ and check if an entry exists
// again, if found return it (IsAvailableForWrite()=false),
// else create a new entry (IsAvailableForWrite()=true).
//
// entry found
// Lookup(acquire_state=true) ---> [ ... ] returns any of the states below
// |
// | entry absent: create new entry
// v CompleteWrite SyncFileToDisk
// [ IN_PROGRESS, false ] ---> [ COMPLETE_UNSYNCED, true ] ---> [ COMPLETE, true ]
// |
// | AbortWrite
// v tombstone=true
// [ IN_PROGRESS, false ] ---> [ TOMBSTONE, false ]
// |
// | tombstone=false
// v
// [ IN_PROGRESS, false ] Scheduled for eviction, will be deleted once ref count=0.
//
// Lifecycle states of a cache entry; transitions and allowed operations are
// described in the state diagram above.
enum class TupleCacheState { IN_PROGRESS, TOMBSTONE, COMPLETE_UNSYNCED, COMPLETE };
// An entry consists of a TupleCacheEntry followed by a C-string for the path.
struct TupleCacheEntry {
  // Current lifecycle state. Atomic because it is transitioned via
  // compare-and-exchange (see TupleCacheMgr::UpdateState) while multiple handles
  // may reference the entry.
  std::atomic<TupleCacheState> state{TupleCacheState::IN_PROGRESS};
  // Charge in the cache when there is a file associated with this entry. This is zero for
  // TOMBSTONE and IN_PROGRESS before the first UpdateWriteSize call, but those states
  // still have a base charge in the cache. During IN_PROGRESS, this is a reservation that
  // exceeds the current size of the file.
  size_t charge = 0;
};
// Caller-facing wrapper around a pinned cache entry (see Lookup). While held, the
// underlying entry cannot be deleted.
struct TupleCacheMgr::Handle {
  // Pinning reference into the underlying Cache; null when the lookup found nothing.
  Cache::UniqueHandle cache_handle;
  // True only for the handle that created the entry and is allowed to write it.
  bool is_writer = false;
  // Minimum charge to use if this entry becomes a TOMBSTONE
  size_t base_charge = 0;
};
// Deleter used by UniqueHandle: frees the heap-allocated Handle wrapper
// (which in turn releases its pin on the cache entry).
void TupleCacheMgr::HandleDeleter::operator()(Handle* ptr) const {
  delete ptr;
}
// Returns a pointer to the value bytes stored for 'handle' in 'cache'. The value
// starts with a TupleCacheEntry and is followed by the entry's file path.
static uint8_t* getHandleData(const Cache* cache, TupleCacheMgr::Handle* handle) {
  DCHECK(handle->cache_handle != nullptr);
  auto value = cache->Value(handle->cache_handle);
  return value.mutable_data();
}
// Reads the current state of the entry referenced by 'handle'.
TupleCacheState TupleCacheMgr::GetState(TupleCacheMgr::Handle* handle) const {
  auto* entry =
      reinterpret_cast<TupleCacheEntry*>(getHandleData(cache_.get(), handle));
  return entry->state;
}
// Atomically transitions the entry from 'requiredState' to 'newState'.
// Returns true if state was updated; false if the entry was not in 'requiredState'.
bool TupleCacheMgr::UpdateState(TupleCacheMgr::Handle* handle,
    TupleCacheState requiredState, TupleCacheState newState) {
  auto* entry =
      reinterpret_cast<TupleCacheEntry*>(getHandleData(cache_.get(), handle));
  TupleCacheState expected = requiredState;
  return entry->state.compare_exchange_strong(expected, newState);
}
// Reads the current cache charge recorded in the entry referenced by 'handle'.
size_t TupleCacheMgr::GetCharge(TupleCacheMgr::Handle* handle) const {
  auto* entry =
      reinterpret_cast<TupleCacheEntry*>(getHandleData(cache_.get(), handle));
  return entry->charge;
}
// Sets the entry's cache charge to 'charge' and adjusts the outstanding-writes gauge
// by the (signed) delta. Only legal while the entry is IN_PROGRESS.
void TupleCacheMgr::UpdateWriteSize(TupleCacheMgr::Handle* handle,
    size_t charge) {
  uint8_t* data = getHandleData(cache_.get(), handle);
  // We can only adjust the cache charge while an entry is IN_PROGRESS
  DCHECK(TupleCacheState::IN_PROGRESS == GetState(handle));
  TupleCacheEntry* entry = reinterpret_cast<TupleCacheEntry*>(data);
  // Compute the signed delta in 64-bit arithmetic. The previous code subtracted in
  // size_t (unsigned wraparound when the charge shrinks, e.g. AbortWrite setting it
  // to 0) and relied on an implementation-defined unsigned->int64_t conversion.
  int64_t diff = static_cast<int64_t>(charge) - static_cast<int64_t>(entry->charge);
  entry->charge = charge;
  cache_->UpdateCharge(handle->cache_handle, charge);
  if (diff < 0) {
    // A shrink must not drive the gauge negative.
    DCHECK_LE(-diff, tuple_cache_outstanding_writes_bytes_->GetValue());
  }
  tuple_cache_outstanding_writes_bytes_->Increment(diff);
}
// Allocates a pending cache entry for 'key' whose value is a default-constructed
// TupleCacheEntry followed by 'path' as a NUL-terminated C-string. Returns a null
// pending handle if the cache cannot allocate the space.
static Cache::UniquePendingHandle CreateEntry(
    Cache* cache, const Slice& key, const string& path) {
  Cache::UniquePendingHandle pending =
      cache->Allocate(key, sizeof(TupleCacheEntry) + path.size() + 1);
  if (!pending) {
    return pending;
  }
  // buf is used to store a TupleCacheEntry, followed by the path.
  uint8_t* buf = cache->MutableValue(&pending);
  // Placement-new so the TupleCacheEntry (including its atomic state) is properly
  // constructed inside the cache-owned buffer.
  new (buf) TupleCacheEntry{};
  char* path_s = reinterpret_cast<char*>(buf + sizeof(TupleCacheEntry));
  memcpy(path_s, path.data(), path.size());
  // Null terminate because UniquePendingHandle doesn't provide access to size.
  path_s[path.size()] = '\0';
  return pending;
}
// If the entry exists, the Handle pins it so it doesn't go away, but the entry may be in
// any state (IN PROGRESS, TOMBSTONE, COMPLETE_UNSYNCED, COMPLETE). If the entry doesn't
// exist and acquire_write is true, it's created with the state IN_PROGRESS.
// Returns nullptr when the cache is disabled; otherwise returns a handle whose
// cache_handle may be null if the entry was absent and could not be created.
TupleCacheMgr::UniqueHandle TupleCacheMgr::Lookup(
    const Slice& key, bool acquire_write) {
  if (!enabled_) return nullptr;
  UniqueHandle handle{new Handle()};
  if (Cache::UniqueHandle pos = cache_->Lookup(key); pos) {
    handle->cache_handle = move(pos);
  } else if (acquire_write) {
    // creation_lock_ serializes entry creation so at most one caller becomes the
    // writer for a given key.
    lock_guard<mutex> guard(creation_lock_);
    // Retry lookup under the creation lock in case another thread added an entry.
    if (Cache::UniqueHandle pos = cache_->Lookup(key); pos) {
      handle->cache_handle = move(pos);
    } else {
      // Entry not found, create a new one. The backing file path is derived from the
      // cache key.
      filesystem::path path =
          filesystem::path(cache_dir_) / (CACHE_FILE_PREFIX + key.ToString());
      Cache::UniquePendingHandle pending = CreateEntry(cache_.get(), key, path.string());
      // DebugPos::FAIL_ALLOCATE injects an allocation failure for tests.
      if (UNLIKELY(!pending || debug_pos_ & DebugPos::FAIL_ALLOCATE)) {
        VLOG_FILE << "Tuple Cache: CreateEntry failed for " << path;
        return handle;
      }
      // Insert into the cache. If immediately evicted, evict_callback_ handles cleanup
      // and decrements the counter.
      tuple_cache_entries_in_use_->Increment(1);
      Cache::UniqueHandle chandle = cache_->Insert(move(pending), this);
      // DebugPos::FAIL_INSERT injects an insertion failure for tests.
      if (UNLIKELY(!chandle || debug_pos_ & DebugPos::FAIL_INSERT)) {
        // This shouldn't happen normally because the cache is recency-based and initial
        // handle is small.
        LOG(WARNING) << "Tuple Cache Entry was immediately evicted";
        return handle;
      }
      VLOG_FILE << "Tuple Cache Entry created for " << path;
      handle->cache_handle = move(chandle);
      handle->is_writer = true;
      // Size of the entry with no file data; reused as the TOMBSTONE charge.
      handle->base_charge = sizeof(TupleCacheEntry) + path.size() + 1;
    }
  }
  return handle;
}
// Returns true iff 'handle' references an entry whose data is fully written
// (COMPLETE_UNSYNCED or COMPLETE) and therefore safe to read.
bool TupleCacheMgr::IsAvailableForRead(UniqueHandle& handle) const {
  if (handle == nullptr || handle->cache_handle == nullptr) return false;
  switch (GetState(handle.get())) {
    case TupleCacheState::COMPLETE_UNSYNCED:
    case TupleCacheState::COMPLETE:
      return true;
    default:
      return false;
  }
}
// Returns true iff 'handle' is the writer handle for an entry that is still
// IN_PROGRESS (i.e. the caller may continue writing it).
bool TupleCacheMgr::IsAvailableForWrite(UniqueHandle& handle) const {
  if (handle == nullptr || handle->cache_handle == nullptr) return false;
  if (!handle->is_writer) return false;
  return GetState(handle.get()) == TupleCacheState::IN_PROGRESS;
}
// Publishes a finished write of 'size' bytes: sets the final charge, transitions the
// entry IN_PROGRESS -> COMPLETE_UNSYNCED (making it readable), and schedules (or, with
// sync_pool_size_ == 0, directly performs) the sync of the backing file to disk.
// If the sync queue is saturated, the write is aborted instead.
void TupleCacheMgr::CompleteWrite(UniqueHandle handle, size_t size) {
  DCHECK(enabled_);
  DCHECK(handle != nullptr && handle->cache_handle != nullptr);
  DCHECK(handle->is_writer);
  DCHECK_LE(size, MaxSize());
  DCHECK_GE(size, 0);
  if (sync_pool_size_ > 0 &&
      sync_thread_pool_->GetQueueSize() >= sync_pool_queue_depth_) {
    // The sync_thread_pool_ has reached its max queue size. This should almost never
    // happen, as the outstanding writes limit should kick in before this is overwhelmed.
    // If it does happen, bail out.
    AbortWrite(move(handle), false);
    tuple_cache_dropped_sync_->Increment(1);
    return;
  }
  VLOG_FILE << "Tuple Cache: Complete " << GetPath(handle) << " (" << size << ")";
  // Set the final charge first: UpdateWriteSize requires the entry to still be
  // IN_PROGRESS.
  UpdateWriteSize(handle.get(), size);
  CHECK(UpdateState(handle.get(),
      TupleCacheState::IN_PROGRESS, TupleCacheState::COMPLETE_UNSYNCED));
  tuple_cache_entries_in_use_bytes_->Increment(size);
  tuple_cache_entry_size_stats_->Update(size);
  // When the sync_pool_size_ is 0, there is no thread pool and this does the sync
  // directly. This is used for backend tests to avoid race conditions.
  if (sync_pool_size_ > 0) {
    // Offer the cache key to the thread pool.
    bool success = sync_thread_pool_->Offer(cache_->Key(handle->cache_handle).ToString());
    if (!success) {
      // The queue is full, so evict this entry
      VLOG_FILE << "Tuple Cache: Sync thread pool queue full. Evicting "
                << GetPath(handle);
      cache_->Erase(cache_->Key(handle->cache_handle));
      tuple_cache_dropped_sync_->Increment(1);
    }
  } else {
    SyncFileToDisk(cache_->Key(handle->cache_handle).ToString());
  }
}
// Ends a write without publishing readable data. With tombstone=true, the entry stays
// in the cache in the TOMBSTONE state carrying only its base charge (so later lookups
// find it and do not retry the write). With tombstone=false, the entry is erased; it
// remains IN_PROGRESS until the eviction callback deletes it (see state diagram).
void TupleCacheMgr::AbortWrite(UniqueHandle handle, bool tombstone) {
  DCHECK(enabled_);
  DCHECK(handle != nullptr && handle->cache_handle != nullptr);
  DCHECK(handle->is_writer);
  if (tombstone) {
    VLOG_FILE << "Tuple Cache: Tombstone " << GetPath(handle);
    tuple_cache_tombstones_in_use_->Increment(1);
    // We update the write size to 0 to remove the existing cache charge
    // (and decrement the outstanding writes counter). This must happen while the
    // entry is still IN_PROGRESS.
    UpdateWriteSize(handle.get(), 0);
    CHECK(UpdateState(handle.get(),
        TupleCacheState::IN_PROGRESS, TupleCacheState::TOMBSTONE));
    // We want the tombstone cache entry to have a base charge, so set that now
    // (without counting towards the outstanding writes).
    cache_->UpdateCharge(handle->cache_handle, handle->base_charge);
  } else {
    // Remove the cache entry. Leaves state IN_PROGRESS so entry won't be reused until
    // successfully evicted.
    DCHECK(TupleCacheState::IN_PROGRESS == GetState(handle.get()));
    cache_->Erase(cache_->Key(handle->cache_handle));
  }
}
// Reserves cache charge for growing the entry to 'new_size' bytes. Returns an error
// if the entry would exceed the per-entry MaxSize() or if granting the reservation
// would push total outstanding writes past outstanding_write_limit_. Requests that do
// not exceed the current charge are a no-op (the charge never shrinks here).
Status TupleCacheMgr::RequestWriteSize(UniqueHandle* handle, size_t new_size) {
  // The handle better be from a writer
  DCHECK((*handle)->is_writer);
  uint8_t* data = getHandleData(cache_.get(), handle->get());
  size_t cur_charge = reinterpret_cast<TupleCacheEntry*>(data)->charge;
  if (new_size > cur_charge) {
    // Need to increase the charge, which can fail
    // 1. There is a maximum size for any given entry
    // 2. There is a maximum amount of outstanding writes (i.e. dirty buffers)
    // The chunk size limits the frequency of incrementing the counter in the cache
    // itself. The chunk size is disabled for unit tests to have exact counter values.
    // The chunk size does not impact enforcement of the maximum entry size.
    // An individual entry cannot exceed the MaxSize()
    if (new_size > MaxSize()) {
      tuple_cache_halted_->Increment(1);
      return Status(TErrorCode::TUPLE_CACHE_ENTRY_SIZE_LIMIT_EXCEEDED, MaxSize());
    }
    size_t new_charge = new_size;
    if (outstanding_write_chunk_bytes_ != 0) {
      // Round up to the next chunk boundary so the charge changes infrequently as
      // the file grows.
      new_charge = ((new_size / outstanding_write_chunk_bytes_) + 1) *
          outstanding_write_chunk_bytes_;
      // The chunk size should not change the behavior of the MaxSize(), so limit the
      // new_charge to MaxSize() if it would otherwise exceed it.
      if (new_charge > MaxSize()) {
        new_charge = MaxSize();
      }
    }
    int64_t diff = new_charge - cur_charge;
    DCHECK_GT(new_charge, cur_charge);
    DCHECK_GE(new_charge, new_size);
    // Limit the total outstanding writes to avoid excessive dirty buffers for the OS
    if (tuple_cache_outstanding_writes_bytes_->GetValue() + diff >
        outstanding_write_limit_) {
      tuple_cache_backpressure_halted_->Increment(1);
      return Status(TErrorCode::TUPLE_CACHE_OUTSTANDING_WRITE_LIMIT_EXCEEDED,
          outstanding_write_limit_);
    }
    UpdateWriteSize(handle->get(), new_charge);
  }
  return Status::OK();
}
// Returns the NUL-terminated file path stored immediately after the TupleCacheEntry
// in the cache value. The pointer is valid only while 'handle' pins the entry.
const char* TupleCacheMgr::GetPath(UniqueHandle& handle) const {
  DCHECK(enabled_);
  DCHECK(handle != nullptr && handle->cache_handle != nullptr);
  const uint8_t* data = getHandleData(cache_.get(), handle.get());
  const uint8_t* path_start = data + sizeof(TupleCacheEntry);
  return reinterpret_cast<const char*>(path_start);
}
// Creates 'sub_dir' if it does not already exist. Serialized by
// debug_dump_subdir_lock_ so concurrent callers do not race on the check-then-create.
Status TupleCacheMgr::CreateDebugDumpSubdir(const string& sub_dir) {
  DCHECK(!sub_dir.empty());
  std::lock_guard<std::mutex> l(debug_dump_subdir_lock_);
  bool exists = false;
  RETURN_IF_ERROR(FileSystemUtil::PathExists(sub_dir, &exists));
  if (exists) return Status::OK();
  return FileSystemUtil::CreateDirectory(sub_dir);
}
// Builds "<cache_debug_dump_dir_>/<sub_dir>/<file_name>". If 'sub_dir_full_path' is
// non-null, it receives the path up to and including 'sub_dir'.
string TupleCacheMgr::GetDebugDumpPath(
    const string& sub_dir, const string& file_name, string* sub_dir_full_path) const {
  filesystem::path subdir_path = filesystem::path(cache_debug_dump_dir_) / sub_dir;
  if (sub_dir_full_path != nullptr) {
    *sub_dir_full_path = subdir_path.string();
  }
  return (subdir_path / file_name).string();
}
// Records the fragment id associated with 'cache_key' for debug dumping,
// creating the metadata entry if it does not exist yet.
void TupleCacheMgr::StoreMetadataForTupleCache(
    const string& cache_key, const string& fragment_id) {
  std::lock_guard<std::mutex> l(debug_dump_lock_);
  debug_dump_caches_metadata_[cache_key].fragment_id = fragment_id;
}
// Returns the fragment id recorded for 'cache_key', or an empty string if no
// metadata was stored for that key.
string TupleCacheMgr::GetFragmentIdForTupleCache(const string& cache_key) {
  std::lock_guard<std::mutex> l(debug_dump_lock_);
  auto it = debug_dump_caches_metadata_.find(cache_key);
  return it != debug_dump_caches_metadata_.end() ? it->second.fragment_id : string();
}
// Drops any debug-dump metadata recorded for 'cache_key'. No-op if absent.
void TupleCacheMgr::RemoveMetadataForTupleCache(const string& cache_key) {
  std::lock_guard<std::mutex> l(debug_dump_lock_);
  // map::erase(key) is already a no-op for a missing key, so the previous
  // find-then-erase double lookup is unnecessary.
  debug_dump_caches_metadata_.erase(cache_key);
}
// Callback invoked by the Cache when an entry is removed (erased or evicted) and is
// being destroyed. Updates metrics according to the entry's final state, deletes the
// backing file, and drops any debug-dump metadata for the key.
// NOTE(review): 'state' (an atomic) is read several times below; this assumes no
// concurrent state transition can happen once the eviction callback runs -- confirm
// against the Cache's eviction/ref-count semantics.
void TupleCacheMgr::EvictedEntry(Slice key, Slice value) {
  const TupleCacheEntry* entry = reinterpret_cast<const TupleCacheEntry*>(value.data());
  if (TupleCacheState::TOMBSTONE != entry->state) {
    DCHECK(tuple_cache_entries_evicted_ != nullptr);
    DCHECK(tuple_cache_entries_in_use_ != nullptr);
    DCHECK(tuple_cache_entries_in_use_bytes_ != nullptr);
    tuple_cache_entries_evicted_->Increment(1);
    tuple_cache_entries_in_use_->Increment(-1);
    DCHECK_GE(tuple_cache_entries_in_use_->GetValue(), 0);
    // entries_in_use_bytes is incremented only when the entry reaches the
    // COMPLETE_UNSYNCED state
    if (TupleCacheState::COMPLETE_UNSYNCED == entry->state ||
        TupleCacheState::COMPLETE == entry->state) {
      tuple_cache_entries_in_use_bytes_->Increment(-entry->charge);
      DCHECK_GE(tuple_cache_entries_in_use_bytes_->GetValue(), 0);
    }
    // Outstanding write bytes are accumulated during IN_PROGRESS, and remain set until
    // the transition from COMPLETE_UNSYNCED to COMPLETE.
    if (TupleCacheState::COMPLETE_UNSYNCED == entry->state ||
        TupleCacheState::IN_PROGRESS == entry->state) {
      DCHECK(tuple_cache_outstanding_writes_bytes_ != nullptr);
      DCHECK_GE(tuple_cache_outstanding_writes_bytes_->GetValue(), entry->charge);
      tuple_cache_outstanding_writes_bytes_->Increment(-entry->charge);
    }
  } else {
    DCHECK(tuple_cache_tombstones_in_use_ != nullptr);
    tuple_cache_tombstones_in_use_->Increment(-1);
    DCHECK_GE(tuple_cache_tombstones_in_use_->GetValue(), 0);
  }
  // Retrieve the path following TupleCacheEntry.
  value.remove_prefix(sizeof(TupleCacheEntry));
  // Drop the trailing NUL terminator added by CreateEntry.
  value.truncate(value.size() - 1);
  VLOG_FILE << "Tuple Cache: Evict " << key << " at " << value.ToString();
  // Delete file on disk.
  kudu::Status status = kudu::Env::Default()->DeleteFile(value.ToString());
  if (!status.ok()) {
    LOG(WARNING) <<
        Substitute("Failed to unlink $0: $1", value.ToString(), status.ToString());
  }
  // If correctness checking is enabled, remove the associated metadata as well.
  if (DebugDumpEnabled()) {
    string key_str(reinterpret_cast<const char*>(key.data()), key.size());
    RemoveMetadataForTupleCache(key_str);
  }
}
// Deletes every regular file in cache_dir_ whose name starts with CACHE_FILE_PREFIX.
// Files created by other programs are left alone. Used at startup so disk usage
// accounting starts at zero.
Status TupleCacheMgr::DeleteExistingFiles() const {
  vector<string> entries;
  RETURN_IF_ERROR(FileSystemUtil::Directory::GetEntryNames(cache_dir_, &entries, 0,
      FileSystemUtil::Directory::EntryType::DIR_ENTRY_REG));
  const string prefix(CACHE_FILE_PREFIX);
  for (const string& name : entries) {
    // Skip anything this cache did not create.
    if (name.compare(0, prefix.size(), prefix) != 0) continue;
    const string file_path = JoinPathSegments(cache_dir_, name);
    KUDU_RETURN_IF_ERROR(kudu::Env::Default()->DeleteFile(file_path),
        Substitute("Failed to delete old cache file $0", file_path));
    LOG(INFO) << Substitute("Deleted old cache file $0", file_path);
  }
  return Status::OK();
}
// Syncs the file backing 'cache_key' to disk and transitions the entry from
// COMPLETE_UNSYNCED to COMPLETE, releasing its outstanding-write reservation.
// Runs on the sync thread pool, or inline when sync_pool_size_ == 0. If the entry is
// gone or not COMPLETE_UNSYNCED, does nothing; on any I/O failure the entry is erased.
void TupleCacheMgr::SyncFileToDisk(const string& cache_key) {
  Cache::UniqueHandle pos = cache_->Lookup(cache_key);
  // The entry can be evicted while waiting to be synced to disk. If the entry no longer
  // exists, there is nothing to do.
  if (pos == nullptr) return;
  // Wrap in a Handle so the entry stays pinned for the rest of this function.
  UniqueHandle handle{new Handle()};
  handle->cache_handle = move(pos);
  // If the entry has a state other than COMPLETE_UNSYNCED, it could have been
  // evicted and recreated. There is nothing to do.
  if (TupleCacheState::COMPLETE_UNSYNCED != GetState(handle.get())) {
    return;
  }
  bool success = true;
  // Some unit tests don't create a real file when testing the TupleCacheMgr, so
  // only do the sync if there is a backing file (DebugPos::NO_FILES disables it).
  bool has_backing_file = !(debug_pos_ & DebugPos::NO_FILES);
  if (has_backing_file) {
    // Open the cache file associated with this key, then call Sync() on it, and
    // close it.
    std::string file_path = GetPath(handle);
    std::unique_ptr<kudu::RWFile> file_to_sync;
    kudu::RWFileOptions opts;
    // The writer already created the file; opening must not create it.
    opts.mode = kudu::Env::OpenMode::MUST_EXIST;
    kudu::Status s = kudu::Env::Default()->NewRWFile(opts, file_path, &file_to_sync);
    if (!s.ok()) {
      LOG(WARNING) << Substitute("SyncFileToDisk: Failed to open file $0: $1", file_path,
          s.ToString());
      success = false;
    } else {
      s = file_to_sync->Sync();
      if (!s.ok()) {
        LOG(WARNING) << Substitute("SyncFileToDisk: Failed to sync file $0: $1",
            file_path, s.ToString());
        success = false;
      }
      // Close the file even if Sync() fails
      s = file_to_sync->Close();
      if (!s.ok()) {
        LOG(WARNING) << Substitute("SyncFileToDisk: Failed to close file $0: $1",
            file_path, s.ToString());
        success = false;
      }
    }
  }
  if (success) {
    bool update_succeeded = UpdateState(handle.get(),
        TupleCacheState::COMPLETE_UNSYNCED, TupleCacheState::COMPLETE);
    if (update_succeeded) {
      // The data is durable now, so release the outstanding-write reservation.
      tuple_cache_outstanding_writes_bytes_->Increment(-GetCharge(handle.get()));
    }
    // Only crash for a failed state change on debug builds. The sync completed
    // and the state change doesn't really impact external behavior. It isn't
    // worth crashing on a release build.
    DCHECK(update_succeeded);
  } else {
    // In case of any error, erase this cache entry
    VLOG_FILE << "Tuple Cache: SyncFileToDisk failed. Evicting " << GetPath(handle);
    cache_->Erase(cache_->Key(handle->cache_handle));
    tuple_cache_failed_sync_->Increment(1);
  }
}
} // namespace impala