blob: b5a985fa1f16cf53dc367601de94b6ace67886c3 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileCache.cpp
// and modified by Doris
#include "io/cache/block_file_cache.h"
#include "common/status.h"
#include "cpp/sync_point.h"
#if defined(__APPLE__)
#include <sys/mount.h>
#else
#include <sys/statfs.h>
#endif
#include <chrono> // IWYU pragma: keep
#include <mutex>
#include <ranges>
#include "common/config.h"
#include "common/logging.h"
#include "cpp/sync_point.h"
#include "io/cache/file_block.h"
#include "io/cache/file_cache_common.h"
#include "io/cache/fs_file_cache_storage.h"
#include "io/cache/mem_file_cache_storage.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
#include "util/time.h"
#include "vec/common/sip_hash.h"
#include "vec/common/uint128.h"
namespace doris::io {
BlockFileCache::BlockFileCache(const std::string& cache_base_path,
const FileCacheSettings& cache_settings)
: _cache_base_path(cache_base_path),
_capacity(cache_settings.capacity),
_max_file_block_size(cache_settings.max_file_block_size),
_max_query_cache_size(cache_settings.max_query_cache_size) {
_cur_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(_cache_base_path.c_str(),
"file_cache_cache_size", 0);
_cache_capacity_metrics = std::make_shared<bvar::Status<size_t>>(
_cache_base_path.c_str(), "file_cache_capacity", _capacity);
_cur_ttl_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
_cache_base_path.c_str(), "file_cache_ttl_cache_size", 0);
_cur_normal_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
_cache_base_path.c_str(), "file_cache_normal_queue_element_count", 0);
_cur_ttl_cache_lru_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
_cache_base_path.c_str(), "file_cache_ttl_cache_lru_queue_size", 0);
_cur_ttl_cache_lru_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
_cache_base_path.c_str(), "file_cache_ttl_cache_lru_queue_element_count", 0);
_cur_normal_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
_cache_base_path.c_str(), "file_cache_normal_queue_cache_size", 0);
_cur_index_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
_cache_base_path.c_str(), "file_cache_index_queue_element_count", 0);
_cur_index_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
_cache_base_path.c_str(), "file_cache_index_queue_cache_size", 0);
_cur_disposable_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
_cache_base_path.c_str(), "file_cache_disposable_queue_element_count", 0);
_cur_disposable_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
_cache_base_path.c_str(), "file_cache_disposable_queue_cache_size", 0);
_queue_evict_size_metrics[0] = std::make_shared<bvar::Adder<size_t>>(
_cache_base_path.c_str(), "file_cache_index_queue_evict_size");
_queue_evict_size_metrics[1] = std::make_shared<bvar::Adder<size_t>>(
_cache_base_path.c_str(), "file_cache_normal_queue_evict_size");
_queue_evict_size_metrics[2] = std::make_shared<bvar::Adder<size_t>>(
_cache_base_path.c_str(), "file_cache_disposable_queue_evict_size");
_queue_evict_size_metrics[3] = std::make_shared<bvar::Adder<size_t>>(
_cache_base_path.c_str(), "file_cache_ttl_cache_evict_size");
_total_evict_size_metrics = std::make_shared<bvar::Adder<size_t>>(
_cache_base_path.c_str(), "file_cache_total_evict_size");
_evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] =
std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
"file_cache_evict_by_time_disposable_to_normal");
_evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
"file_cache_evict_by_time_disposable_to_index");
_evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
"file_cache_evict_by_time_disposable_to_ttl");
_evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] =
std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
"file_cache_evict_by_time_normal_to_disposable");
_evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] =
std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
"file_cache_evict_by_time_normal_to_index");
_evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] =
std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
"file_cache_evict_by_time_normal_to_ttl");
_evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
"file_cache_evict_by_time_index_to_disposable");
_evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] =
std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
"file_cache_evict_by_time_index_to_normal");
_evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] =
std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
"file_cache_evict_by_time_index_to_ttl");
_evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
"file_cache_evict_by_time_ttl_to_disposable");
_evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] =
std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
"file_cache_evict_by_time_ttl_to_normal");
_evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] =
std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
"file_cache_evict_by_time_ttl_to_index");
_evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE] =
std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
"file_cache_evict_by_self_lru_disposable");
_evict_by_self_lru_metrics_matrix[FileCacheType::NORMAL] =
std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
"file_cache_evict_by_self_lru_normal");
_evict_by_self_lru_metrics_matrix[FileCacheType::INDEX] = std::make_shared<bvar::Adder<size_t>>(
_cache_base_path.c_str(), "file_cache_evict_by_self_lru_index");
_evict_by_self_lru_metrics_matrix[FileCacheType::TTL] = std::make_shared<bvar::Adder<size_t>>(
_cache_base_path.c_str(), "file_cache_evict_by_self_lru_ttl");
_evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] =
std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
"file_cache_evict_by_size_disposable_to_normal");
_evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
"file_cache_evict_by_size_disposable_to_index");
_evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
"file_cache_evict_by_size_disposable_to_ttl");
_evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] =
std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
"file_cache_evict_by_size_normal_to_disposable");
_evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] =
std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
"file_cache_evict_by_size_normal_to_index");
_evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] =
std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
"file_cache_evict_by_size_normal_to_ttl");
_evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
"file_cache_evict_by_size_index_to_disposable");
_evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] =
std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
"file_cache_evict_by_size_index_to_normal");
_evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] =
std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
"file_cache_evict_by_size_index_to_ttl");
_evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
"file_cache_evict_by_size_ttl_to_disposable");
_evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] =
std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
"file_cache_evict_by_size_ttl_to_normal");
_evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] =
std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
"file_cache_evict_by_size_ttl_to_index");
_evict_by_try_release = std::make_shared<bvar::Adder<size_t>>(
_cache_base_path.c_str(), "file_cache_evict_by_try_release");
_num_read_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
"file_cache_num_read_blocks");
_num_hit_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
"file_cache_num_hit_blocks");
_num_removed_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
"file_cache_num_removed_blocks");
_num_hit_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
_cache_base_path.c_str(), "file_cache_num_hit_blocks_5m", _num_hit_blocks.get(), 300);
_num_read_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
_cache_base_path.c_str(), "file_cache_num_read_blocks_5m", _num_read_blocks.get(), 300);
_num_hit_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
_cache_base_path.c_str(), "file_cache_num_hit_blocks_1h", _num_hit_blocks.get(), 3600);
_num_read_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
_cache_base_path.c_str(), "file_cache_num_read_blocks_1h", _num_read_blocks.get(),
3600);
_hit_ratio = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
"file_cache_hit_ratio", 0.0);
_hit_ratio_5m = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
"file_cache_hit_ratio_5m", 0.0);
_hit_ratio_1h = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
"file_cache_hit_ratio_1h", 0.0);
_disk_limit_mode_metrics = std::make_shared<bvar::Status<size_t>>(
_cache_base_path.c_str(), "file_cache_disk_limit_mode", 0);
_need_evict_cache_in_advance_metrics = std::make_shared<bvar::Status<size_t>>(
_cache_base_path.c_str(), "file_cache_need_evict_cache_in_advance", 0);
_cache_lock_wait_time_us = std::make_shared<bvar::LatencyRecorder>(
_cache_base_path.c_str(), "file_cache_cache_lock_wait_time_us");
_get_or_set_latency_us = std::make_shared<bvar::LatencyRecorder>(
_cache_base_path.c_str(), "file_cache_get_or_set_latency_us");
_storage_sync_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
_cache_base_path.c_str(), "file_cache_storage_sync_remove_latency_us");
_storage_retry_sync_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
_cache_base_path.c_str(), "file_cache_storage_retry_sync_remove_latency_us");
_storage_async_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
_cache_base_path.c_str(), "file_cache_storage_async_remove_latency_us");
_evict_in_advance_latency_us = std::make_shared<bvar::LatencyRecorder>(
_cache_base_path.c_str(), "file_cache_evict_in_advance_latency_us");
_recycle_keys_length_recorder = std::make_shared<bvar::LatencyRecorder>(
_cache_base_path.c_str(), "file_cache_recycle_keys_length");
_disposable_queue = LRUQueue(cache_settings.disposable_queue_size,
cache_settings.disposable_queue_elements, 60 * 60);
_index_queue = LRUQueue(cache_settings.index_queue_size, cache_settings.index_queue_elements,
7 * 24 * 60 * 60);
_normal_queue = LRUQueue(cache_settings.query_queue_size, cache_settings.query_queue_elements,
24 * 60 * 60);
_ttl_queue = LRUQueue(cache_settings.ttl_queue_size, cache_settings.ttl_queue_elements,
std::numeric_limits<int>::max());
if (cache_settings.storage == "memory") {
_storage = std::make_unique<MemFileCacheStorage>();
_cache_base_path = "memory";
} else {
_storage = std::make_unique<FSFileCacheStorage>();
}
LOG(INFO) << "file cache path= " << _cache_base_path << " " << cache_settings.to_string();
}
UInt128Wrapper BlockFileCache::hash(const std::string& path) {
uint128_t value;
sip_hash128(path.data(), path.size(), reinterpret_cast<char*>(&value));
return UInt128Wrapper(value);
}
std::string BlockFileCache::cache_type_to_string(FileCacheType type) {
switch (type) {
case FileCacheType::INDEX:
return "_idx";
case FileCacheType::DISPOSABLE:
return "_disposable";
case FileCacheType::NORMAL:
return "";
case FileCacheType::TTL:
return "_ttl";
}
return "";
}
FileCacheType BlockFileCache::string_to_cache_type(const std::string& str) {
if (str == "idx") {
return FileCacheType::INDEX;
} else if (str == "disposable") {
return FileCacheType::DISPOSABLE;
} else if (str == "ttl") {
return FileCacheType::TTL;
}
DCHECK(false) << "The string is " << str;
return FileCacheType::DISPOSABLE;
}
BlockFileCache::QueryFileCacheContextHolderPtr BlockFileCache::get_query_context_holder(
const TUniqueId& query_id) {
SCOPED_CACHE_LOCK(_mutex, this);
if (!config::enable_file_cache_query_limit) {
return {};
}
/// if enable_filesystem_query_cache_limit is true,
/// we create context query for current query.
auto context = get_or_set_query_context(query_id, cache_lock);
return std::make_unique<QueryFileCacheContextHolder>(query_id, this, context);
}
BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_query_context(
const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock) {
auto query_iter = _query_map.find(query_id);
return (query_iter == _query_map.end()) ? nullptr : query_iter->second;
}
void BlockFileCache::remove_query_context(const TUniqueId& query_id) {
SCOPED_CACHE_LOCK(_mutex, this);
const auto& query_iter = _query_map.find(query_id);
if (query_iter != _query_map.end() && query_iter->second.use_count() <= 1) {
_query_map.erase(query_iter);
}
}
BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_or_set_query_context(
const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock) {
if (query_id.lo == 0 && query_id.hi == 0) {
return nullptr;
}
auto context = get_query_context(query_id, cache_lock);
if (context) {
return context;
}
auto query_context = std::make_shared<QueryFileCacheContext>(_max_query_cache_size);
auto query_iter = _query_map.emplace(query_id, query_context).first;
return query_iter->second;
}
void BlockFileCache::QueryFileCacheContext::remove(const UInt128Wrapper& hash, size_t offset,
std::lock_guard<std::mutex>& cache_lock) {
auto pair = std::make_pair(hash, offset);
auto record = records.find(pair);
DCHECK(record != records.end());
auto iter = record->second;
records.erase(pair);
lru_queue.remove(iter, cache_lock);
}
void BlockFileCache::QueryFileCacheContext::reserve(const UInt128Wrapper& hash, size_t offset,
size_t size,
std::lock_guard<std::mutex>& cache_lock) {
auto pair = std::make_pair(hash, offset);
if (records.find(pair) == records.end()) {
auto queue_iter = lru_queue.add(hash, offset, size, cache_lock);
records.insert({pair, queue_iter});
}
}
Status BlockFileCache::initialize() {
SCOPED_CACHE_LOCK(_mutex, this);
return initialize_unlocked(cache_lock);
}
Status BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lock) {
DCHECK(!_is_initialized);
_is_initialized = true;
RETURN_IF_ERROR(_storage->init(this));
_cache_background_monitor_thread = std::thread(&BlockFileCache::run_background_monitor, this);
_cache_background_ttl_gc_thread = std::thread(&BlockFileCache::run_background_ttl_gc, this);
_cache_background_gc_thread = std::thread(&BlockFileCache::run_background_gc, this);
_cache_background_evict_in_advance_thread =
std::thread(&BlockFileCache::run_background_evict_in_advance, this);
return Status::OK();
}
void BlockFileCache::use_cell(const FileBlockCell& cell, FileBlocks* result, bool move_iter_flag,
std::lock_guard<std::mutex>& cache_lock) {
if (result) {
result->push_back(cell.file_block);
}
auto& queue = get_queue(cell.file_block->cache_type());
/// Move to the end of the queue. The iterator remains valid.
if (cell.queue_iterator && move_iter_flag) {
queue.move_to_end(*cell.queue_iterator, cache_lock);
}
cell.update_atime();
}
template <class T>
requires IsXLock<T>
BlockFileCache::FileBlockCell* BlockFileCache::get_cell(const UInt128Wrapper& hash, size_t offset,
T& /* cache_lock */) {
auto it = _files.find(hash);
if (it == _files.end()) {
return nullptr;
}
auto& offsets = it->second;
auto cell_it = offsets.find(offset);
if (cell_it == offsets.end()) {
return nullptr;
}
return &cell_it->second;
}
bool BlockFileCache::need_to_move(FileCacheType cell_type, FileCacheType query_type) const {
return query_type != FileCacheType::DISPOSABLE && cell_type != FileCacheType::DISPOSABLE;
}
FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheContext& context,
const FileBlock::Range& range,
std::lock_guard<std::mutex>& cache_lock) {
/// Given range = [left, right] and non-overlapping ordered set of file blocks,
/// find list [block1, ..., blockN] of blocks which intersect with given range.
auto it = _files.find(hash);
if (it == _files.end()) {
if (_async_open_done) {
return {};
}
FileCacheKey key;
key.hash = hash;
key.meta.type = context.cache_type;
key.meta.expiration_time = context.expiration_time;
_storage->load_blocks_directly_unlocked(this, key, cache_lock);
it = _files.find(hash);
if (it == _files.end()) [[unlikely]] {
return {};
}
}
auto& file_blocks = it->second;
DCHECK(!file_blocks.empty());
if (file_blocks.empty()) {
LOG(WARNING) << "file_blocks is empty for hash=" << hash.to_string()
<< " cache type=" << context.cache_type
<< " cache expiration time=" << context.expiration_time
<< " cache range=" << range.left << " " << range.right
<< " query id=" << context.query_id;
_files.erase(hash);
return {};
}
// change to ttl if the blocks aren't ttl
if (context.cache_type == FileCacheType::TTL && _key_to_time.find(hash) == _key_to_time.end()) {
for (auto& [_, cell] : file_blocks) {
Status st = cell.file_block->update_expiration_time(context.expiration_time);
if (!st.ok()) {
LOG_WARNING("Failed to change key meta").error(st);
}
FileCacheType origin_type = cell.file_block->cache_type();
if (origin_type == FileCacheType::TTL) continue;
st = cell.file_block->change_cache_type_between_ttl_and_others(FileCacheType::TTL);
if (st.ok()) {
auto& queue = get_queue(origin_type);
queue.remove(cell.queue_iterator.value(), cache_lock);
auto& ttl_queue = get_queue(FileCacheType::TTL);
cell.queue_iterator =
ttl_queue.add(cell.file_block->get_hash_value(), cell.file_block->offset(),
cell.file_block->range().size(), cache_lock);
} else {
LOG_WARNING("Failed to change key meta").error(st);
}
}
_key_to_time[hash] = context.expiration_time;
_time_to_key.insert(std::make_pair(context.expiration_time, hash));
}
if (auto iter = _key_to_time.find(hash);
// TODO(zhengyu): Why the hell the type is NORMAL while context set expiration_time?
(context.cache_type == FileCacheType::NORMAL || context.cache_type == FileCacheType::TTL) &&
iter != _key_to_time.end() && iter->second != context.expiration_time) {
// remove from _time_to_key
auto _time_to_key_iter = _time_to_key.equal_range(iter->second);
while (_time_to_key_iter.first != _time_to_key_iter.second) {
if (_time_to_key_iter.first->second == hash) {
_time_to_key_iter.first = _time_to_key.erase(_time_to_key_iter.first);
break;
}
_time_to_key_iter.first++;
}
for (auto& [_, cell] : file_blocks) {
Status st = cell.file_block->update_expiration_time(context.expiration_time);
if (!st.ok()) {
LOG_WARNING("Failed to change key meta").error(st);
}
}
if (context.expiration_time == 0) {
for (auto& [_, cell] : file_blocks) {
auto cache_type = cell.file_block->cache_type();
if (cache_type != FileCacheType::TTL) continue;
auto st = cell.file_block->change_cache_type_between_ttl_and_others(
FileCacheType::NORMAL);
if (st.ok()) {
if (cell.queue_iterator) {
auto& ttl_queue = get_queue(FileCacheType::TTL);
ttl_queue.remove(cell.queue_iterator.value(), cache_lock);
}
auto& queue = get_queue(FileCacheType::NORMAL);
cell.queue_iterator =
queue.add(cell.file_block->get_hash_value(), cell.file_block->offset(),
cell.file_block->range().size(), cache_lock);
} else {
LOG_WARNING("Failed to change key meta").error(st);
}
}
_key_to_time.erase(iter);
} else {
_time_to_key.insert(std::make_pair(context.expiration_time, hash));
iter->second = context.expiration_time;
}
}
FileBlocks result;
auto block_it = file_blocks.lower_bound(range.left);
if (block_it == file_blocks.end()) {
/// N - last cached block for given file hash, block{N}.offset < range.left:
/// block{N} block{N}
/// [________ [_______]
/// [__________] OR [________]
/// ^ ^
/// range.left range.left
const auto& cell = file_blocks.rbegin()->second;
if (cell.file_block->range().right < range.left) {
return {};
}
use_cell(cell, &result, need_to_move(cell.file_block->cache_type(), context.cache_type),
cache_lock);
} else { /// block_it <-- segmment{k}
if (block_it != file_blocks.begin()) {
const auto& prev_cell = std::prev(block_it)->second;
const auto& prev_cell_range = prev_cell.file_block->range();
if (range.left <= prev_cell_range.right) {
/// block{k-1} block{k}
/// [________] [_____
/// [___________
/// ^
/// range.left
use_cell(prev_cell, &result,
need_to_move(prev_cell.file_block->cache_type(), context.cache_type),
cache_lock);
}
}
/// block{k} ... block{k-1} block{k} block{k}
/// [______ [______] [____ [________
/// [_________ OR [________ OR [______] ^
/// ^ ^ ^ block{k}.offset
/// range.left range.left range.right
while (block_it != file_blocks.end()) {
const auto& cell = block_it->second;
if (range.right < cell.file_block->range().left) {
break;
}
use_cell(cell, &result, need_to_move(cell.file_block->cache_type(), context.cache_type),
cache_lock);
++block_it;
}
}
return result;
}
std::string BlockFileCache::clear_file_cache_async() {
LOG(INFO) << "start clear_file_cache_async, path=" << _cache_base_path;
int64_t num_cells_all = 0;
int64_t num_cells_to_delete = 0;
int64_t num_cells_wait_recycle = 0;
int64_t num_files_all = 0;
TEST_SYNC_POINT_CALLBACK("BlockFileCache::clear_file_cache_async");
{
SCOPED_CACHE_LOCK(_mutex, this);
std::vector<FileBlockCell*> deleting_cells;
for (auto& [_, offset_to_cell] : _files) {
++num_files_all;
for (auto& [_, cell] : offset_to_cell) {
++num_cells_all;
deleting_cells.push_back(&cell);
}
}
// we cannot delete the element in the loop above, because it will break the iterator
for (auto& cell : deleting_cells) {
if (!cell->releasable()) {
LOG(INFO) << "cell is not releasable, hash="
<< " offset=" << cell->file_block->offset();
cell->file_block->set_deleting();
++num_cells_wait_recycle;
continue;
}
FileBlockSPtr file_block = cell->file_block;
if (file_block) {
std::lock_guard block_lock(file_block->_mutex);
remove(file_block, cache_lock, block_lock, false);
++num_cells_to_delete;
}
}
}
std::stringstream ss;
ss << "finish clear_file_cache_async, path=" << _cache_base_path
<< " num_files_all=" << num_files_all << " num_cells_all=" << num_cells_all
<< " num_cells_to_delete=" << num_cells_to_delete
<< " num_cells_wait_recycle=" << num_cells_wait_recycle;
auto msg = ss.str();
LOG(INFO) << msg;
return msg;
}
FileBlocks BlockFileCache::split_range_into_cells(const UInt128Wrapper& hash,
const CacheContext& context, size_t offset,
size_t size, FileBlock::State state,
std::lock_guard<std::mutex>& cache_lock) {
DCHECK(size > 0);
auto current_pos = offset;
auto end_pos_non_included = offset + size;
size_t current_size = 0;
size_t remaining_size = size;
FileBlocks file_blocks;
while (current_pos < end_pos_non_included) {
current_size = std::min(remaining_size, _max_file_block_size);
remaining_size -= current_size;
state = try_reserve(hash, context, current_pos, current_size, cache_lock)
? state
: FileBlock::State::SKIP_CACHE;
if (state == FileBlock::State::SKIP_CACHE) [[unlikely]] {
FileCacheKey key;
key.hash = hash;
key.offset = current_pos;
key.meta.type = context.cache_type;
key.meta.expiration_time = context.expiration_time;
auto file_block = std::make_shared<FileBlock>(key, current_size, this,
FileBlock::State::SKIP_CACHE);
file_blocks.push_back(std::move(file_block));
} else {
auto* cell = add_cell(hash, context, current_pos, current_size, state, cache_lock);
if (cell) {
file_blocks.push_back(cell->file_block);
if (!context.is_cold_data) {
cell->update_atime();
}
}
}
current_pos += current_size;
}
DCHECK(file_blocks.empty() || offset + size - 1 == file_blocks.back()->range().right);
return file_blocks;
}
void BlockFileCache::fill_holes_with_empty_file_blocks(FileBlocks& file_blocks,
const UInt128Wrapper& hash,
const CacheContext& context,
const FileBlock::Range& range,
std::lock_guard<std::mutex>& cache_lock) {
/// There are blocks [block1, ..., blockN]
/// (non-overlapping, non-empty, ascending-ordered) which (maybe partially)
/// intersect with given range.
/// It can have holes:
/// [____________________] -- requested range
/// [____] [_] [_________] -- intersecting cache [block1, ..., blockN]
///
/// For each such hole create a cell with file block state EMPTY.
auto it = file_blocks.begin();
auto block_range = (*it)->range();
size_t current_pos = 0;
if (block_range.left < range.left) {
/// [_______ -- requested range
/// [_______
/// ^
/// block1
current_pos = block_range.right + 1;
++it;
} else {
current_pos = range.left;
}
while (current_pos <= range.right && it != file_blocks.end()) {
block_range = (*it)->range();
if (current_pos == block_range.left) {
current_pos = block_range.right + 1;
++it;
continue;
}
DCHECK(current_pos < block_range.left);
auto hole_size = block_range.left - current_pos;
file_blocks.splice(it, split_range_into_cells(hash, context, current_pos, hole_size,
FileBlock::State::EMPTY, cache_lock));
current_pos = block_range.right + 1;
++it;
}
if (current_pos <= range.right) {
/// ________] -- requested range
/// _____]
/// ^
/// blockN
auto hole_size = range.right - current_pos + 1;
file_blocks.splice(file_blocks.end(),
split_range_into_cells(hash, context, current_pos, hole_size,
FileBlock::State::EMPTY, cache_lock));
}
}
FileBlocksHolder BlockFileCache::get_or_set(const UInt128Wrapper& hash, size_t offset, size_t size,
CacheContext& context) {
FileBlock::Range range(offset, offset + size - 1);
ReadStatistics* stats = context.stats;
DCHECK(stats != nullptr);
MonotonicStopWatch sw;
sw.start();
std::lock_guard cache_lock(_mutex);
stats->lock_wait_timer += sw.elapsed_time();
FileBlocks file_blocks;
int64_t duration = 0;
{
SCOPED_RAW_TIMER(&duration);
if (auto iter = _key_to_time.find(hash);
context.cache_type == FileCacheType::INDEX && iter != _key_to_time.end()) {
context.cache_type = FileCacheType::TTL;
context.expiration_time = iter->second;
}
/// Get all blocks which intersect with the given range.
{
SCOPED_RAW_TIMER(&stats->get_timer);
file_blocks = get_impl(hash, context, range, cache_lock);
}
if (file_blocks.empty()) {
SCOPED_RAW_TIMER(&stats->set_timer);
file_blocks = split_range_into_cells(hash, context, offset, size,
FileBlock::State::EMPTY, cache_lock);
} else {
SCOPED_RAW_TIMER(&stats->set_timer);
fill_holes_with_empty_file_blocks(file_blocks, hash, context, range, cache_lock);
}
DCHECK(!file_blocks.empty());
*_num_read_blocks << file_blocks.size();
for (auto& block : file_blocks) {
if (block->state_unsafe() == FileBlock::State::DOWNLOADED) {
*_num_hit_blocks << 1;
}
}
}
*_get_or_set_latency_us << (duration / 1000);
return FileBlocksHolder(std::move(file_blocks));
}
BlockFileCache::FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& hash,
const CacheContext& context, size_t offset,
size_t size, FileBlock::State state,
std::lock_guard<std::mutex>& cache_lock) {
/// Create a file block cell and put it in `files` map by [hash][offset].
if (size == 0) {
return nullptr; /// Empty files are not cached.
}
DCHECK_EQ(_files[hash].count(offset), 0)
<< "Cache already exists for hash: " << hash.to_string() << ", offset: " << offset
<< ", size: " << size
<< ".\nCurrent cache structure: " << dump_structure_unlocked(hash, cache_lock);
auto& offsets = _files[hash];
FileCacheKey key;
key.hash = hash;
key.offset = offset;
key.meta.type = context.cache_type;
key.meta.expiration_time = context.expiration_time;
FileBlockCell cell(std::make_shared<FileBlock>(key, size, this, state), cache_lock);
Status st;
if (context.expiration_time == 0 && context.cache_type == FileCacheType::TTL) {
st = cell.file_block->change_cache_type_between_ttl_and_others(FileCacheType::NORMAL);
} else if (context.cache_type != FileCacheType::TTL && context.expiration_time != 0) {
st = cell.file_block->change_cache_type_between_ttl_and_others(FileCacheType::TTL);
}
if (!st.ok()) {
LOG(WARNING) << "Cannot change cache type. expiration_time=" << context.expiration_time
<< " cache_type=" << cache_type_to_string(context.cache_type)
<< " error=" << st.msg();
}
auto& queue = get_queue(cell.file_block->cache_type());
cell.queue_iterator = queue.add(hash, offset, size, cache_lock);
if (cell.file_block->cache_type() == FileCacheType::TTL) {
if (_key_to_time.find(hash) == _key_to_time.end()) {
_key_to_time[hash] = context.expiration_time;
_time_to_key.insert(std::make_pair(context.expiration_time, hash));
}
_cur_ttl_size += cell.size();
}
auto [it, _] = offsets.insert(std::make_pair(offset, std::move(cell)));
_cur_cache_size += size;
return &(it->second);
}
size_t BlockFileCache::try_release() {
SCOPED_CACHE_LOCK(_mutex, this);
std::vector<FileBlockCell*> trash;
for (auto& [hash, blocks] : _files) {
for (auto& [offset, cell] : blocks) {
if (cell.releasable()) {
trash.emplace_back(&cell);
} else {
cell.file_block->set_deleting();
}
}
}
size_t remove_size = 0;
for (auto& cell : trash) {
FileBlockSPtr file_block = cell->file_block;
std::lock_guard lc(cell->file_block->_mutex);
remove_size += file_block->range().size();
remove(file_block, cache_lock, lc);
}
*_evict_by_try_release << remove_size;
LOG(INFO) << "Released " << trash.size() << " blocks in file cache " << _cache_base_path;
return trash.size();
}
BlockFileCache::LRUQueue& BlockFileCache::get_queue(FileCacheType type) {
switch (type) {
case FileCacheType::INDEX:
return _index_queue;
case FileCacheType::DISPOSABLE:
return _disposable_queue;
case FileCacheType::NORMAL:
return _normal_queue;
case FileCacheType::TTL:
return _ttl_queue;
default:
DCHECK(false);
}
return _normal_queue;
}
const BlockFileCache::LRUQueue& BlockFileCache::get_queue(FileCacheType type) const {
switch (type) {
case FileCacheType::INDEX:
return _index_queue;
case FileCacheType::DISPOSABLE:
return _disposable_queue;
case FileCacheType::NORMAL:
return _normal_queue;
case FileCacheType::TTL:
return _ttl_queue;
default:
DCHECK(false);
}
return _normal_queue;
}
void BlockFileCache::remove_file_blocks(std::vector<FileBlockCell*>& to_evict,
std::lock_guard<std::mutex>& cache_lock, bool sync) {
auto remove_file_block_if = [&](FileBlockCell* cell) {
FileBlockSPtr file_block = cell->file_block;
if (file_block) {
std::lock_guard block_lock(file_block->_mutex);
remove(file_block, cache_lock, block_lock, sync);
}
};
std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
}
void BlockFileCache::remove_file_blocks_and_clean_time_maps(
std::vector<FileBlockCell*>& to_evict, std::lock_guard<std::mutex>& cache_lock) {
auto remove_file_block_and_clean_time_maps_if = [&](FileBlockCell* cell) {
FileBlockSPtr file_block = cell->file_block;
if (file_block) {
std::lock_guard block_lock(file_block->_mutex);
auto hash = cell->file_block->get_hash_value();
remove(file_block, cache_lock, block_lock);
if (_files.find(hash) == _files.end()) {
if (auto iter = _key_to_time.find(hash);
_key_to_time.find(hash) != _key_to_time.end()) {
auto _time_to_key_iter = _time_to_key.equal_range(iter->second);
while (_time_to_key_iter.first != _time_to_key_iter.second) {
if (_time_to_key_iter.first->second == hash) {
_time_to_key_iter.first = _time_to_key.erase(_time_to_key_iter.first);
break;
}
_time_to_key_iter.first++;
}
_key_to_time.erase(hash);
}
}
}
};
std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_and_clean_time_maps_if);
}
void BlockFileCache::find_evict_candidates(LRUQueue& queue, size_t size, size_t cur_cache_size,
size_t& removed_size,
std::vector<FileBlockCell*>& to_evict,
std::lock_guard<std::mutex>& cache_lock,
size_t& cur_removed_size, bool evict_in_advance) {
for (const auto& [entry_key, entry_offset, entry_size] : queue) {
if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
break;
}
auto* cell = get_cell(entry_key, entry_offset, cache_lock);
DCHECK(cell) << "Cache became inconsistent. key: " << entry_key.to_string()
<< ", offset: " << entry_offset;
size_t cell_size = cell->size();
DCHECK(entry_size == cell_size);
if (cell->releasable()) {
auto& file_block = cell->file_block;
std::lock_guard block_lock(file_block->_mutex);
DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
to_evict.push_back(cell);
removed_size += cell_size;
cur_removed_size += cell_size;
}
}
}
// 1. if async load file cache not finish
// a. evict from lru queue
// 2. if ttl cache
// a. evict from disposable/normal/index queue one by one
// 3. if dont reach query limit or dont have query limit
// a. evict from other queue
// b. evict from current queue
// a.1 if the data belong write, then just evict cold data
// 4. if reach query limit
// a. evict from query queue
// b. evict from other queue
bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const CacheContext& context,
size_t offset, size_t size,
std::lock_guard<std::mutex>& cache_lock) {
if (!_async_open_done) {
return try_reserve_during_async_load(size, cache_lock);
}
// use this strategy in scenarios where there is insufficient disk capacity or insufficient number of inodes remaining
// directly eliminate 5 times the size of the space
if (_disk_resource_limit_mode) {
size = 5 * size;
}
auto query_context = config::enable_file_cache_query_limit &&
(context.query_id.hi != 0 || context.query_id.lo != 0)
? get_query_context(context.query_id, cache_lock)
: nullptr;
if (!query_context) {
return try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock);
} else if (query_context->get_cache_size(cache_lock) + size <=
query_context->get_max_cache_size()) {
return try_reserve_for_lru(hash, query_context, context, offset, size, cache_lock);
}
int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
auto& queue = get_queue(context.cache_type);
size_t removed_size = 0;
size_t ghost_remove_size = 0;
size_t queue_size = queue.get_capacity(cache_lock);
size_t cur_cache_size = _cur_cache_size;
size_t query_context_cache_size = query_context->get_cache_size(cache_lock);
std::vector<BlockFileCache::LRUQueue::Iterator> ghost;
std::vector<FileBlockCell*> to_evict;
size_t max_size = queue.get_max_size();
auto is_overflow = [&] {
return _disk_resource_limit_mode ? removed_size < size
: cur_cache_size + size - removed_size > _capacity ||
(queue_size + size - removed_size > max_size) ||
(query_context_cache_size + size -
(removed_size + ghost_remove_size) >
query_context->get_max_cache_size());
};
/// Select the cache from the LRU queue held by query for expulsion.
for (auto iter = query_context->queue().begin(); iter != query_context->queue().end(); iter++) {
if (!is_overflow()) {
break;
}
auto* cell = get_cell(iter->hash, iter->offset, cache_lock);
if (!cell) {
/// The cache corresponding to this record may be swapped out by
/// other queries, so it has become invalid.
ghost.push_back(iter);
ghost_remove_size += iter->size;
} else {
size_t cell_size = cell->size();
DCHECK(iter->size == cell_size);
if (cell->releasable()) {
auto& file_block = cell->file_block;
std::lock_guard block_lock(file_block->_mutex);
DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
to_evict.push_back(cell);
removed_size += cell_size;
}
}
}
auto remove_file_block_if = [&](FileBlockCell* cell) {
FileBlockSPtr file_block = cell->file_block;
if (file_block) {
query_context->remove(file_block->get_hash_value(), file_block->offset(), cache_lock);
std::lock_guard block_lock(file_block->_mutex);
remove(file_block, cache_lock, block_lock);
}
};
for (auto& iter : ghost) {
query_context->remove(iter->hash, iter->offset, cache_lock);
}
std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
if (is_overflow() &&
!try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock)) {
return false;
}
query_context->reserve(hash, offset, size, cache_lock);
return true;
}
void BlockFileCache::try_evict_in_advance(size_t size, std::lock_guard<std::mutex>& cache_lock) {
UInt128Wrapper hash = UInt128Wrapper();
size_t offset = 0;
CacheContext context;
/* we pick NORMAL and TTL cache to evict in advance
* we reserve for them but won't acutually give space to them
* on the contrary, NORMAL and TTL may sacrifice by LRU evicting themselves
* other cache types cannot be exempted because we will evict what they have stolen before LRU evicting
* in summary: all cache types will shrink somewhat, and NORMAL and TTL shrink the most, to make sure the cache is not full
*/
context.cache_type = FileCacheType::NORMAL;
try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
context.cache_type = FileCacheType::TTL;
try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
}
bool BlockFileCache::remove_if_ttl_file_blocks(const UInt128Wrapper& file_key, bool remove_directly,
std::lock_guard<std::mutex>& cache_lock, bool sync) {
auto& ttl_queue = get_queue(FileCacheType::TTL);
if (auto iter = _key_to_time.find(file_key);
_key_to_time.find(file_key) != _key_to_time.end()) {
if (!remove_directly) {
for (auto& [_, cell] : _files[file_key]) {
if (cell.file_block->cache_type() != FileCacheType::TTL) {
continue;
}
Status st = cell.file_block->update_expiration_time(0);
if (!st.ok()) {
LOG_WARNING("Failed to update expiration time to 0").error(st);
}
if (cell.file_block->cache_type() == FileCacheType::NORMAL) continue;
st = cell.file_block->change_cache_type_between_ttl_and_others(
FileCacheType::NORMAL);
if (st.ok()) {
if (cell.queue_iterator) {
ttl_queue.remove(cell.queue_iterator.value(), cache_lock);
}
auto& queue = get_queue(FileCacheType::NORMAL);
cell.queue_iterator =
queue.add(cell.file_block->get_hash_value(), cell.file_block->offset(),
cell.file_block->range().size(), cache_lock);
} else {
LOG_WARNING("Failed to change cache type to normal").error(st);
}
}
} else {
std::vector<FileBlockCell*> to_remove;
for (auto& [_, cell] : _files[file_key]) {
if (cell.releasable()) {
to_remove.push_back(&cell);
} else {
cell.file_block->set_deleting();
}
}
std::for_each(to_remove.begin(), to_remove.end(), [&](FileBlockCell* cell) {
FileBlockSPtr file_block = cell->file_block;
std::lock_guard block_lock(file_block->_mutex);
remove(file_block, cache_lock, block_lock, sync);
});
}
// remove from _time_to_key
// the param hash maybe be passed by _time_to_key, if removed it, cannot use it anymore
auto _time_to_key_iter = _time_to_key.equal_range(iter->second);
while (_time_to_key_iter.first != _time_to_key_iter.second) {
if (_time_to_key_iter.first->second == file_key) {
_time_to_key_iter.first = _time_to_key.erase(_time_to_key_iter.first);
break;
}
_time_to_key_iter.first++;
}
_key_to_time.erase(iter);
return true;
}
return false;
}
// remove specific cache synchronously, for critical operations
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) {
SCOPED_CACHE_LOCK(_mutex, this);
bool is_ttl_file = remove_if_ttl_file_blocks(file_key, true, cache_lock, true);
if (!is_ttl_file) {
auto iter = _files.find(file_key);
std::vector<FileBlockCell*> to_remove;
if (iter != _files.end()) {
for (auto& [_, cell] : iter->second) {
if (cell.releasable()) {
to_remove.push_back(&cell);
} else {
cell.file_block->set_deleting();
}
}
}
remove_file_blocks(to_remove, cache_lock, true);
}
}
// the async version of remove_if_cached, for background operations
// cache meta is deleted synchronously if not in use, and the block file is deleted asynchronously
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) {
SCOPED_CACHE_LOCK(_mutex, this);
bool is_ttl_file = remove_if_ttl_file_blocks(file_key, true, cache_lock, /*sync*/ false);
if (!is_ttl_file) {
auto iter = _files.find(file_key);
std::vector<FileBlockCell*> to_remove;
if (iter != _files.end()) {
for (auto& [_, cell] : iter->second) {
if (cell.releasable()) {
to_remove.push_back(&cell);
} else {
cell.file_block->set_deleting();
}
}
}
remove_file_blocks(to_remove, cache_lock, false);
}
}
std::vector<FileCacheType> BlockFileCache::get_other_cache_type_without_ttl(
FileCacheType cur_cache_type) {
switch (cur_cache_type) {
case FileCacheType::TTL:
return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX};
case FileCacheType::INDEX:
return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL};
case FileCacheType::NORMAL:
return {FileCacheType::DISPOSABLE, FileCacheType::INDEX};
case FileCacheType::DISPOSABLE:
return {FileCacheType::NORMAL, FileCacheType::INDEX};
default:
return {};
}
return {};
}
std::vector<FileCacheType> BlockFileCache::get_other_cache_type(FileCacheType cur_cache_type) {
switch (cur_cache_type) {
case FileCacheType::TTL:
return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX};
case FileCacheType::INDEX:
return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::TTL};
case FileCacheType::NORMAL:
return {FileCacheType::DISPOSABLE, FileCacheType::INDEX, FileCacheType::TTL};
case FileCacheType::DISPOSABLE:
return {FileCacheType::NORMAL, FileCacheType::INDEX, FileCacheType::TTL};
default:
return {};
}
return {};
}
void BlockFileCache::reset_range(const UInt128Wrapper& hash, size_t offset, size_t old_size,
size_t new_size, std::lock_guard<std::mutex>& cache_lock) {
DCHECK(_files.find(hash) != _files.end() &&
_files.find(hash)->second.find(offset) != _files.find(hash)->second.end());
FileBlockCell* cell = get_cell(hash, offset, cache_lock);
DCHECK(cell != nullptr);
if (cell->queue_iterator) {
auto& queue = get_queue(cell->file_block->cache_type());
DCHECK(queue.contains(hash, offset, cache_lock));
auto iter = queue.get(hash, offset, cache_lock);
iter->size = new_size;
queue.cache_size -= old_size;
queue.cache_size += new_size;
}
_cur_cache_size -= old_size;
_cur_cache_size += new_size;
}
bool BlockFileCache::try_reserve_from_other_queue_by_time_interval(
FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
int64_t cur_time, std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
size_t removed_size = 0;
size_t cur_cache_size = _cur_cache_size;
std::vector<FileBlockCell*> to_evict;
for (FileCacheType cache_type : other_cache_types) {
auto& queue = get_queue(cache_type);
size_t remove_size_per_type = 0;
for (const auto& [entry_key, entry_offset, entry_size] : queue) {
if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
break;
}
auto* cell = get_cell(entry_key, entry_offset, cache_lock);
DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
<< ", offset: " << entry_offset;
size_t cell_size = cell->size();
DCHECK(entry_size == cell_size);
if (cell->atime == 0 ? true : cell->atime + queue.get_hot_data_interval() > cur_time) {
break;
}
if (cell->releasable()) {
auto& file_block = cell->file_block;
std::lock_guard block_lock(file_block->_mutex);
DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
to_evict.push_back(cell);
removed_size += cell_size;
remove_size_per_type += cell_size;
}
}
*(_evict_by_time_metrics_matrix[cache_type][cur_type]) << remove_size_per_type;
}
bool is_sync_removal = !evict_in_advance;
remove_file_blocks(to_evict, cache_lock, is_sync_removal);
return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
}
bool BlockFileCache::is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size,
bool evict_in_advance) const {
bool ret = false;
if (evict_in_advance) { // we don't need to check _need_evict_cache_in_advance
ret = (removed_size < need_size);
return ret;
}
if (_disk_resource_limit_mode) {
ret = (removed_size < need_size);
} else {
ret = (cur_cache_size + need_size - removed_size > _capacity);
}
return ret;
}
bool BlockFileCache::try_reserve_from_other_queue_by_size(
FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
size_t removed_size = 0;
size_t cur_cache_size = _cur_cache_size;
std::vector<FileBlockCell*> to_evict;
// we follow the privilege defined in get_other_cache_types to evict
for (FileCacheType cache_type : other_cache_types) {
auto& queue = get_queue(cache_type);
// we will not drain each of them to the bottom -- i.e., we only
// evict what they have stolen.
size_t cur_queue_size = queue.get_capacity(cache_lock);
size_t cur_queue_max_size = queue.get_max_size();
if (cur_queue_size <= cur_queue_max_size) {
continue;
}
size_t cur_removed_size = 0;
find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
cur_removed_size, evict_in_advance);
*(_evict_by_size_metrics_matrix[cache_type][cur_type]) << cur_removed_size;
}
bool is_sync_removal = !evict_in_advance;
remove_file_blocks(to_evict, cache_lock, is_sync_removal);
return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
}
bool BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type, size_t size,
int64_t cur_time,
std::lock_guard<std::mutex>& cache_lock,
bool evict_in_advance) {
// currently, TTL cache is not considered as a candidate
auto other_cache_types = get_other_cache_type_without_ttl(cur_cache_type);
bool reserve_success = try_reserve_from_other_queue_by_time_interval(
cur_cache_type, other_cache_types, size, cur_time, cache_lock, evict_in_advance);
if (reserve_success || !config::file_cache_enable_evict_from_other_queue_by_size) {
return reserve_success;
}
other_cache_types = get_other_cache_type(cur_cache_type);
auto& cur_queue = get_queue(cur_cache_type);
size_t cur_queue_size = cur_queue.get_capacity(cache_lock);
size_t cur_queue_max_size = cur_queue.get_max_size();
// Hit the soft limit by self, cannot remove from other queues
if (_cur_cache_size + size > _capacity && cur_queue_size + size > cur_queue_max_size) {
return false;
}
return try_reserve_from_other_queue_by_size(cur_cache_type, other_cache_types, size, cache_lock,
evict_in_advance);
}
bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,
QueryFileCacheContextPtr query_context,
const CacheContext& context, size_t offset, size_t size,
std::lock_guard<std::mutex>& cache_lock,
bool evict_in_advance) {
int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
if (!try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock,
evict_in_advance)) {
auto& queue = get_queue(context.cache_type);
size_t removed_size = 0;
size_t cur_cache_size = _cur_cache_size;
std::vector<FileBlockCell*> to_evict;
size_t cur_removed_size = 0;
find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
cur_removed_size, evict_in_advance);
bool is_sync_removal = !evict_in_advance;
remove_file_blocks(to_evict, cache_lock, is_sync_removal);
*(_evict_by_self_lru_metrics_matrix[context.cache_type]) << cur_removed_size;
if (is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
return false;
}
}
if (query_context) {
query_context->reserve(hash, offset, size, cache_lock);
}
return true;
}
template <class T, class U>
requires IsXLock<T> && IsXLock<U>
void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lock, bool sync) {
auto hash = file_block->get_hash_value();
auto offset = file_block->offset();
auto type = file_block->cache_type();
auto expiration_time = file_block->expiration_time();
auto* cell = get_cell(hash, offset, cache_lock);
DCHECK(cell);
DCHECK(cell->queue_iterator);
if (cell->queue_iterator) {
auto& queue = get_queue(file_block->cache_type());
queue.remove(*cell->queue_iterator, cache_lock);
}
*_queue_evict_size_metrics[static_cast<int>(file_block->cache_type())]
<< file_block->range().size();
*_total_evict_size_metrics << file_block->range().size();
if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADED) {
FileCacheKey key;
key.hash = hash;
key.offset = offset;
key.meta.type = type;
key.meta.expiration_time = expiration_time;
if (sync) {
int64_t duration_ns = 0;
Status st;
{
SCOPED_RAW_TIMER(&duration_ns);
st = _storage->remove(key);
}
*_storage_sync_remove_latency_us << (duration_ns / 1000);
if (!st.ok()) {
LOG_WARNING("").error(st);
}
} else {
// the file will be deleted in the bottom half
// so there will be a window that the file is not in the cache but still in the storage
// but it's ok, because the rowset is stale already
bool ret = _recycle_keys.enqueue(key);
if (ret) [[likely]] {
*_recycle_keys_length_recorder << _recycle_keys.size_approx();
} else {
LOG_WARNING("Failed to push recycle key to queue, do it synchronously");
int64_t duration_ns = 0;
Status st;
{
SCOPED_RAW_TIMER(&duration_ns);
st = _storage->remove(key);
}
*_storage_retry_sync_remove_latency_us << (duration_ns / 1000);
if (!st.ok()) {
LOG_WARNING("").error(st);
}
}
}
} else if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADING) {
file_block->set_deleting();
return;
}
_cur_cache_size -= file_block->range().size();
if (FileCacheType::TTL == type) {
_cur_ttl_size -= file_block->range().size();
}
auto& offsets = _files[hash];
offsets.erase(file_block->offset());
if (offsets.empty()) {
_files.erase(hash);
}
*_num_removed_blocks << 1;
}
size_t BlockFileCache::get_used_cache_size(FileCacheType cache_type) const {
SCOPED_CACHE_LOCK(_mutex, this);
return get_used_cache_size_unlocked(cache_type, cache_lock);
}
size_t BlockFileCache::get_used_cache_size_unlocked(FileCacheType cache_type,
std::lock_guard<std::mutex>& cache_lock) const {
return get_queue(cache_type).get_capacity(cache_lock);
}
size_t BlockFileCache::get_available_cache_size(FileCacheType cache_type) const {
SCOPED_CACHE_LOCK(_mutex, this);
return get_available_cache_size_unlocked(cache_type, cache_lock);
}
size_t BlockFileCache::get_available_cache_size_unlocked(
FileCacheType cache_type, std::lock_guard<std::mutex>& cache_lock) const {
return get_queue(cache_type).get_max_element_size() -
get_used_cache_size_unlocked(cache_type, cache_lock);
}
size_t BlockFileCache::get_file_blocks_num(FileCacheType cache_type) const {
SCOPED_CACHE_LOCK(_mutex, this);
return get_file_blocks_num_unlocked(cache_type, cache_lock);
}
size_t BlockFileCache::get_file_blocks_num_unlocked(FileCacheType cache_type,
std::lock_guard<std::mutex>& cache_lock) const {
return get_queue(cache_type).get_elements_num(cache_lock);
}
BlockFileCache::FileBlockCell::FileBlockCell(FileBlockSPtr file_block,
std::lock_guard<std::mutex>& cache_lock)
: file_block(file_block) {
/**
* Cell can be created with either DOWNLOADED or EMPTY file block's state.
* File block acquires DOWNLOADING state and creates LRUQueue iterator on first
* successful getOrSetDownaloder call.
*/
switch (file_block->_download_state) {
case FileBlock::State::DOWNLOADED:
case FileBlock::State::EMPTY:
case FileBlock::State::SKIP_CACHE: {
break;
}
default:
DCHECK(false) << "Can create cell with either EMPTY, DOWNLOADED, SKIP_CACHE state, got: "
<< FileBlock::state_to_string(file_block->_download_state);
}
if (file_block->cache_type() == FileCacheType::TTL) {
update_atime();
}
}
BlockFileCache::LRUQueue::Iterator BlockFileCache::LRUQueue::add(
const UInt128Wrapper& hash, size_t offset, size_t size,
std::lock_guard<std::mutex>& /* cache_lock */) {
cache_size += size;
auto iter = queue.insert(queue.end(), FileKeyAndOffset(hash, offset, size));
map.insert(std::make_pair(std::make_pair(hash, offset), iter));
return iter;
}
template <class T>
requires IsXLock<T>
void BlockFileCache::LRUQueue::remove(Iterator queue_it, T& /* cache_lock */) {
cache_size -= queue_it->size;
map.erase(std::make_pair(queue_it->hash, queue_it->offset));
queue.erase(queue_it);
}
void BlockFileCache::LRUQueue::remove_all(std::lock_guard<std::mutex>& /* cache_lock */) {
queue.clear();
map.clear();
cache_size = 0;
}
void BlockFileCache::LRUQueue::move_to_end(Iterator queue_it,
std::lock_guard<std::mutex>& /* cache_lock */) {
queue.splice(queue.end(), queue, queue_it);
}
bool BlockFileCache::LRUQueue::contains(const UInt128Wrapper& hash, size_t offset,
std::lock_guard<std::mutex>& /* cache_lock */) const {
return map.find(std::make_pair(hash, offset)) != map.end();
}
BlockFileCache::LRUQueue::Iterator BlockFileCache::LRUQueue::get(
const UInt128Wrapper& hash, size_t offset,
std::lock_guard<std::mutex>& /* cache_lock */) const {
return map.find(std::make_pair(hash, offset))->second;
}
std::string BlockFileCache::LRUQueue::to_string(
std::lock_guard<std::mutex>& /* cache_lock */) const {
std::string result;
for (const auto& [hash, offset, size] : queue) {
if (!result.empty()) {
result += ", ";
}
result += fmt::format("{}: [{}, {}]", hash.to_string(), offset, offset + size - 1);
}
return result;
}
std::string BlockFileCache::dump_structure(const UInt128Wrapper& hash) {
SCOPED_CACHE_LOCK(_mutex, this);
return dump_structure_unlocked(hash, cache_lock);
}
std::string BlockFileCache::dump_structure_unlocked(const UInt128Wrapper& hash,
std::lock_guard<std::mutex>&) {
std::stringstream result;
const auto& cells_by_offset = _files[hash];
for (const auto& [_, cell] : cells_by_offset) {
result << cell.file_block->get_info_for_log() << " "
<< cache_type_to_string(cell.file_block->cache_type()) << "\n";
}
return result.str();
}
std::string BlockFileCache::dump_single_cache_type(const UInt128Wrapper& hash, size_t offset) {
SCOPED_CACHE_LOCK(_mutex, this);
return dump_single_cache_type_unlocked(hash, offset, cache_lock);
}
std::string BlockFileCache::dump_single_cache_type_unlocked(const UInt128Wrapper& hash,
size_t offset,
std::lock_guard<std::mutex>&) {
std::stringstream result;
const auto& cells_by_offset = _files[hash];
const auto& cell = cells_by_offset.find(offset);
return cache_type_to_string(cell->second.file_block->cache_type());
}
void BlockFileCache::change_cache_type(const UInt128Wrapper& hash, size_t offset,
FileCacheType new_type,
std::lock_guard<std::mutex>& cache_lock) {
if (auto iter = _files.find(hash); iter != _files.end()) {
auto& file_blocks = iter->second;
if (auto cell_it = file_blocks.find(offset); cell_it != file_blocks.end()) {
FileBlockCell& cell = cell_it->second;
auto& cur_queue = get_queue(cell.file_block->cache_type());
DCHECK(cell.queue_iterator.has_value());
cur_queue.remove(*cell.queue_iterator, cache_lock);
auto& new_queue = get_queue(new_type);
cell.queue_iterator =
new_queue.add(hash, offset, cell.file_block->range().size(), cache_lock);
}
}
}
// @brief: get a path's disk capacity used percent, inode used percent
// @param: path
// @param: percent.first disk used percent, percent.second inode used percent
int disk_used_percentage(const std::string& path, std::pair<int, int>* percent) {
struct statfs stat;
int ret = statfs(path.c_str(), &stat);
if (ret != 0) {
return ret;
}
// https://github.com/coreutils/coreutils/blob/master/src/df.c#L1195
// v->used = stat.f_blocks - stat.f_bfree
// nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail
uintmax_t u100 = (stat.f_blocks - stat.f_bfree) * 100;
uintmax_t nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail;
int capacity_percentage = u100 / nonroot_total + (u100 % nonroot_total != 0);
unsigned long long inode_free = stat.f_ffree;
unsigned long long inode_total = stat.f_files;
int inode_percentage = int(inode_free * 1.0 / inode_total * 100);
percent->first = capacity_percentage;
percent->second = 100 - inode_percentage;
// Add sync point for testing
TEST_SYNC_POINT_CALLBACK("BlockFileCache::disk_used_percentage:1", percent);
return 0;
}
std::string BlockFileCache::reset_capacity(size_t new_capacity) {
using namespace std::chrono;
int64_t space_released = 0;
size_t old_capacity = 0;
std::stringstream ss;
ss << "finish reset_capacity, path=" << _cache_base_path;
auto start_time = steady_clock::time_point();
{
SCOPED_CACHE_LOCK(_mutex, this);
if (new_capacity < _capacity && new_capacity < _cur_cache_size) {
int64_t need_remove_size = _cur_cache_size - new_capacity;
auto remove_blocks = [&](LRUQueue& queue) -> int64_t {
int64_t queue_released = 0;
std::vector<FileBlockCell*> to_evict;
for (const auto& [entry_key, entry_offset, entry_size] : queue) {
if (need_remove_size <= 0) {
break;
}
need_remove_size -= entry_size;
space_released += entry_size;
queue_released += entry_size;
auto* cell = get_cell(entry_key, entry_offset, cache_lock);
if (!cell->releasable()) {
cell->file_block->set_deleting();
continue;
}
to_evict.push_back(cell);
}
for (auto& cell : to_evict) {
FileBlockSPtr file_block = cell->file_block;
std::lock_guard block_lock(file_block->_mutex);
remove(file_block, cache_lock, block_lock);
}
return queue_released;
};
int64_t queue_released = remove_blocks(_disposable_queue);
ss << " disposable_queue released " << queue_released;
queue_released = remove_blocks(_normal_queue);
ss << " normal_queue released " << queue_released;
queue_released = remove_blocks(_index_queue);
ss << " index_queue released " << queue_released;
queue_released = remove_blocks(_ttl_queue);
ss << " ttl_queue released " << queue_released;
_disk_resource_limit_mode = true;
_disk_limit_mode_metrics->set_value(1);
ss << " total_space_released=" << space_released;
}
old_capacity = _capacity;
_capacity = new_capacity;
_cache_capacity_metrics->set_value(_capacity);
}
auto use_time = duration_cast<milliseconds>(steady_clock::time_point() - start_time);
LOG(INFO) << "Finish tag deleted block. path=" << _cache_base_path
<< " use_time=" << static_cast<int64_t>(use_time.count());
ss << " old_capacity=" << old_capacity << " new_capacity=" << new_capacity;
LOG(INFO) << ss.str();
return ss.str();
}
void BlockFileCache::check_disk_resource_limit() {
if (_storage->get_type() != FileCacheStorageType::DISK) {
return;
}
if (_capacity > _cur_cache_size) {
_disk_resource_limit_mode = false;
_disk_limit_mode_metrics->set_value(0);
}
std::pair<int, int> percent;
int ret = disk_used_percentage(_cache_base_path, &percent);
if (ret != 0) {
LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
return;
}
auto [space_percentage, inode_percentage] = percent;
auto is_insufficient = [](const int& percentage) {
return percentage >= config::file_cache_enter_disk_resource_limit_mode_percent;
};
DCHECK_GE(space_percentage, 0);
DCHECK_LE(space_percentage, 100);
DCHECK_GE(inode_percentage, 0);
DCHECK_LE(inode_percentage, 100);
// ATTN: due to that can be changed dynamically, set it to default value if it's invalid
// FIXME: reject with config validator
if (config::file_cache_enter_disk_resource_limit_mode_percent <=
config::file_cache_exit_disk_resource_limit_mode_percent) {
LOG_WARNING("config error, set to default value")
.tag("enter", config::file_cache_enter_disk_resource_limit_mode_percent)
.tag("exit", config::file_cache_exit_disk_resource_limit_mode_percent);
config::file_cache_enter_disk_resource_limit_mode_percent = 88;
config::file_cache_exit_disk_resource_limit_mode_percent = 80;
}
if (is_insufficient(space_percentage) || is_insufficient(inode_percentage)) {
_disk_resource_limit_mode = true;
_disk_limit_mode_metrics->set_value(1);
} else if (_disk_resource_limit_mode &&
(space_percentage < config::file_cache_exit_disk_resource_limit_mode_percent) &&
(inode_percentage < config::file_cache_exit_disk_resource_limit_mode_percent)) {
_disk_resource_limit_mode = false;
_disk_limit_mode_metrics->set_value(0);
}
if (_disk_resource_limit_mode) {
LOG(WARNING) << "file_cache=" << get_base_path() << " space_percent=" << space_percentage
<< " inode_percent=" << inode_percentage
<< " is_space_insufficient=" << is_insufficient(space_percentage)
<< " is_inode_insufficient=" << is_insufficient(inode_percentage)
<< " mode run in resource limit";
}
}
void BlockFileCache::check_need_evict_cache_in_advance() {
if (_storage->get_type() != FileCacheStorageType::DISK) {
return;
}
std::pair<int, int> percent;
int ret = disk_used_percentage(_cache_base_path, &percent);
if (ret != 0) {
LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
return;
}
auto [space_percentage, inode_percentage] = percent;
size_t size_percentage = static_cast<size_t>(
(static_cast<double>(_cur_cache_size) / static_cast<double>(_capacity)) * 100);
auto is_insufficient = [](const int& percentage) {
return percentage >= config::file_cache_enter_need_evict_cache_in_advance_percent;
};
DCHECK_GE(space_percentage, 0);
DCHECK_LE(space_percentage, 100);
DCHECK_GE(inode_percentage, 0);
DCHECK_LE(inode_percentage, 100);
// ATTN: due to that can be changed dynamically, set it to default value if it's invalid
// FIXME: reject with config validator
if (config::file_cache_enter_need_evict_cache_in_advance_percent <=
config::file_cache_exit_need_evict_cache_in_advance_percent) {
LOG_WARNING("config error, set to default value")
.tag("enter", config::file_cache_enter_need_evict_cache_in_advance_percent)
.tag("exit", config::file_cache_exit_need_evict_cache_in_advance_percent);
config::file_cache_enter_need_evict_cache_in_advance_percent = 78;
config::file_cache_exit_need_evict_cache_in_advance_percent = 75;
}
if (is_insufficient(space_percentage) || is_insufficient(inode_percentage) ||
is_insufficient(size_percentage)) {
_need_evict_cache_in_advance = true;
_need_evict_cache_in_advance_metrics->set_value(1);
} else if (_need_evict_cache_in_advance &&
(space_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
(inode_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
(size_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent)) {
_need_evict_cache_in_advance = false;
_need_evict_cache_in_advance_metrics->set_value(0);
}
if (_need_evict_cache_in_advance) {
LOG(WARNING) << "file_cache=" << get_base_path() << " space_percent=" << space_percentage
<< " inode_percent=" << inode_percentage << " size_percent=" << size_percentage
<< " is_space_insufficient=" << is_insufficient(space_percentage)
<< " is_inode_insufficient=" << is_insufficient(inode_percentage)
<< " is_size_insufficient=" << is_insufficient(size_percentage)
<< " need evict cache in advance";
}
}
void BlockFileCache::run_background_monitor() {
int64_t interval_time_seconds = 20;
while (!_close) {
TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_time_seconds);
check_disk_resource_limit();
if (config::enable_evict_file_cache_in_advance) {
check_need_evict_cache_in_advance();
} else {
_need_evict_cache_in_advance = false;
}
{
std::unique_lock close_lock(_close_mtx);
_close_cv.wait_for(close_lock, std::chrono::seconds(interval_time_seconds));
if (_close) {
break;
}
}
// report
{
SCOPED_CACHE_LOCK(_mutex, this);
_cur_cache_size_metrics->set_value(_cur_cache_size);
_cur_ttl_cache_size_metrics->set_value(_cur_cache_size -
_index_queue.get_capacity(cache_lock) -
_normal_queue.get_capacity(cache_lock) -
_disposable_queue.get_capacity(cache_lock));
_cur_ttl_cache_lru_queue_cache_size_metrics->set_value(
_ttl_queue.get_capacity(cache_lock));
_cur_ttl_cache_lru_queue_element_count_metrics->set_value(
_ttl_queue.get_elements_num(cache_lock));
_cur_normal_queue_cache_size_metrics->set_value(_normal_queue.get_capacity(cache_lock));
_cur_normal_queue_element_count_metrics->set_value(
_normal_queue.get_elements_num(cache_lock));
_cur_index_queue_cache_size_metrics->set_value(_index_queue.get_capacity(cache_lock));
_cur_index_queue_element_count_metrics->set_value(
_index_queue.get_elements_num(cache_lock));
_cur_disposable_queue_cache_size_metrics->set_value(
_disposable_queue.get_capacity(cache_lock));
_cur_disposable_queue_element_count_metrics->set_value(
_disposable_queue.get_elements_num(cache_lock));
if (_num_read_blocks->get_value() > 0) {
_hit_ratio->set_value((double)_num_hit_blocks->get_value() /
_num_read_blocks->get_value());
}
if (_num_read_blocks_5m->get_value() > 0) {
_hit_ratio_5m->set_value((double)_num_hit_blocks_5m->get_value() /
_num_read_blocks_5m->get_value());
}
if (_num_read_blocks_1h->get_value() > 0) {
_hit_ratio_1h->set_value((double)_num_hit_blocks_1h->get_value() /
_num_read_blocks_1h->get_value());
}
}
}
}
void BlockFileCache::run_background_ttl_gc() { // TODO(zhengyu): fix!
int64_t interval_time_seconds = 20;
while (!_close) {
TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_time_seconds);
{
std::unique_lock close_lock(_close_mtx);
_close_cv.wait_for(close_lock, std::chrono::seconds(interval_time_seconds));
if (_close) {
break;
}
}
{
int64_t cur_time = UnixSeconds();
SCOPED_CACHE_LOCK(_mutex, this);
while (!_time_to_key.empty()) {
auto begin = _time_to_key.begin();
if (cur_time < begin->first) {
break;
}
remove_if_ttl_file_blocks(begin->second, false, cache_lock, false);
}
}
}
}
void BlockFileCache::run_background_gc() {
FileCacheKey key;
static const size_t interval_ms = 100;
const size_t batch_limit = config::file_cache_remove_block_qps_limit * interval_ms / 1000;
size_t batch_count = 0;
while (!_close) {
{
std::unique_lock close_lock(_close_mtx);
_close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
if (_close) {
break;
}
}
while (batch_count < batch_limit && _recycle_keys.try_dequeue(key)) {
int64_t duration_ns = 0;
Status st;
{
SCOPED_RAW_TIMER(&duration_ns);
st = _storage->remove(key);
}
*_storage_async_remove_latency_us << (duration_ns / 1000);
if (!st.ok()) {
LOG_WARNING("").error(st);
}
batch_count++;
}
*_recycle_keys_length_recorder << _recycle_keys.size_approx();
batch_count = 0;
}
}
void BlockFileCache::run_background_evict_in_advance() {
LOG(INFO) << "Starting background evict in advance thread";
int64_t batch = 0;
while (!_close) {
{
std::unique_lock close_lock(_close_mtx);
_close_cv.wait_for(
close_lock,
std::chrono::milliseconds(config::file_cache_evict_in_advance_interval_ms));
if (_close) {
LOG(INFO) << "Background evict in advance thread exiting due to cache closing";
break;
}
}
batch = config::file_cache_evict_in_advance_batch_bytes;
// Skip if eviction not needed or too many pending recycles
if (!_need_evict_cache_in_advance || _recycle_keys.size_approx() >= (batch * 10)) {
continue;
}
int64_t duration_ns = 0;
{
SCOPED_CACHE_LOCK(_mutex, this);
SCOPED_RAW_TIMER(&duration_ns);
try_evict_in_advance(batch, cache_lock);
}
*_evict_in_advance_latency_us << (duration_ns / 1000);
}
}
void BlockFileCache::modify_expiration_time(const UInt128Wrapper& hash,
uint64_t new_expiration_time) {
SCOPED_CACHE_LOCK(_mutex, this);
// 1. If new_expiration_time is equal to zero
if (new_expiration_time == 0) {
remove_if_ttl_file_blocks(hash, false, cache_lock, false);
return;
}
// 2. If the hash in ttl cache, modify its expiration time.
if (auto iter = _key_to_time.find(hash); iter != _key_to_time.end()) {
// remove from _time_to_key
auto _time_to_key_iter = _time_to_key.equal_range(iter->second);
while (_time_to_key_iter.first != _time_to_key_iter.second) {
if (_time_to_key_iter.first->second == hash) {
_time_to_key_iter.first = _time_to_key.erase(_time_to_key_iter.first);
break;
}
_time_to_key_iter.first++;
}
_time_to_key.insert(std::make_pair(new_expiration_time, hash));
iter->second = new_expiration_time;
for (auto& [_, cell] : _files[hash]) {
Status st = cell.file_block->update_expiration_time(new_expiration_time);
if (!st.ok()) {
LOG_WARNING("Failed to modify expiration time").error(st);
}
}
return;
}
// 3. change to ttl if the blocks aren't ttl
if (auto iter = _files.find(hash); iter != _files.end()) {
for (auto& [_, cell] : iter->second) {
Status st = cell.file_block->update_expiration_time(new_expiration_time);
if (!st.ok()) {
LOG_WARNING("").error(st);
}
FileCacheType origin_type = cell.file_block->cache_type();
if (origin_type == FileCacheType::TTL) continue;
st = cell.file_block->change_cache_type_between_ttl_and_others(FileCacheType::TTL);
if (st.ok()) {
auto& queue = get_queue(origin_type);
queue.remove(cell.queue_iterator.value(), cache_lock);
auto& ttl_queue = get_queue(FileCacheType::TTL);
cell.queue_iterator = ttl_queue.add(hash, cell.file_block->offset(),
cell.file_block->range().size(), cache_lock);
}
if (!st.ok()) {
LOG_WARNING("").error(st);
}
}
_key_to_time[hash] = new_expiration_time;
_time_to_key.insert(std::make_pair(new_expiration_time, hash));
}
}
std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>>
BlockFileCache::get_hot_blocks_meta(const UInt128Wrapper& hash) const {
int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
SCOPED_CACHE_LOCK(_mutex, this);
std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>> blocks_meta;
if (auto iter = _files.find(hash); iter != _files.end()) {
for (auto& pair : _files.find(hash)->second) {
const FileBlockCell* cell = &pair.second;
if (cell->file_block->cache_type() != FileCacheType::DISPOSABLE) {
if (cell->file_block->cache_type() == FileCacheType::TTL ||
(cell->atime != 0 &&
cur_time - cell->atime <
get_queue(cell->file_block->cache_type()).get_hot_data_interval())) {
blocks_meta.emplace_back(pair.first, cell->size(),
cell->file_block->cache_type(),
cell->file_block->expiration_time());
}
}
}
}
return blocks_meta;
}
bool BlockFileCache::try_reserve_during_async_load(size_t size,
std::lock_guard<std::mutex>& cache_lock) {
size_t removed_size = 0;
size_t normal_queue_size = _normal_queue.get_capacity(cache_lock);
size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock);
size_t index_queue_size = _index_queue.get_capacity(cache_lock);
std::vector<FileBlockCell*> to_evict;
auto collect_eliminate_fragments = [&](LRUQueue& queue) {
for (const auto& [entry_key, entry_offset, entry_size] : queue) {
if (!_disk_resource_limit_mode || removed_size >= size) {
break;
}
auto* cell = get_cell(entry_key, entry_offset, cache_lock);
DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
<< ", offset: " << entry_offset;
size_t cell_size = cell->size();
DCHECK(entry_size == cell_size);
if (cell->releasable()) {
auto& file_block = cell->file_block;
std::lock_guard block_lock(file_block->_mutex);
DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
to_evict.push_back(cell);
removed_size += cell_size;
}
}
};
if (disposable_queue_size != 0) {
collect_eliminate_fragments(get_queue(FileCacheType::DISPOSABLE));
}
if (normal_queue_size != 0) {
collect_eliminate_fragments(get_queue(FileCacheType::NORMAL));
}
if (index_queue_size != 0) {
collect_eliminate_fragments(get_queue(FileCacheType::INDEX));
}
remove_file_blocks(to_evict, cache_lock, true);
return !_disk_resource_limit_mode || removed_size >= size;
}
std::string BlockFileCache::clear_file_cache_directly() {
using namespace std::chrono;
std::stringstream ss;
auto start = steady_clock::now();
SCOPED_CACHE_LOCK(_mutex, this);
LOG_INFO("start clear_file_cache_directly").tag("path", _cache_base_path);
std::string clear_msg;
auto s = _storage->clear(clear_msg);
if (!s.ok()) {
return clear_msg;
}
int64_t num_files = _files.size();
int64_t cache_size = _cur_cache_size;
int64_t index_queue_size = _index_queue.get_elements_num(cache_lock);
int64_t normal_queue_size = _normal_queue.get_elements_num(cache_lock);
int64_t disposible_queue_size = _disposable_queue.get_elements_num(cache_lock);
int64_t ttl_queue_size = _ttl_queue.get_elements_num(cache_lock);
_files.clear();
_cur_cache_size = 0;
_cur_ttl_size = 0;
_time_to_key.clear();
_key_to_time.clear();
_index_queue.clear(cache_lock);
_normal_queue.clear(cache_lock);
_disposable_queue.clear(cache_lock);
_ttl_queue.clear(cache_lock);
ss << "finish clear_file_cache_directly"
<< " path=" << _cache_base_path
<< " time_elapsed=" << duration_cast<milliseconds>(steady_clock::now() - start).count()
<< " num_files=" << num_files << " cache_size=" << cache_size
<< " index_queue_size=" << index_queue_size << " normal_queue_size=" << normal_queue_size
<< " disposible_queue_size=" << disposible_queue_size << "ttl_queue_size=" << ttl_queue_size;
auto msg = ss.str();
LOG(INFO) << msg;
return msg;
}
std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const UInt128Wrapper& hash) {
std::map<size_t, FileBlockSPtr> offset_to_block;
SCOPED_CACHE_LOCK(_mutex, this);
if (_files.contains(hash)) {
for (auto& [offset, cell] : _files[hash]) {
if (cell.file_block->state() == FileBlock::State::DOWNLOADED) {
offset_to_block.emplace(offset, cell.file_block);
use_cell(cell, nullptr,
need_to_move(cell.file_block->cache_type(), FileCacheType::DISPOSABLE),
cache_lock);
}
}
}
return offset_to_block;
}
void BlockFileCache::update_ttl_atime(const UInt128Wrapper& hash) {
SCOPED_CACHE_LOCK(_mutex, this);
if (auto iter = _files.find(hash); iter != _files.end()) {
for (auto& [_, cell] : iter->second) {
cell.update_atime();
}
};
}
std::map<std::string, double> BlockFileCache::get_stats() {
std::map<std::string, double> stats;
stats["hits_ratio"] = (double)_hit_ratio->get_value();
stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
stats["index_queue_curr_size"] = (double)_cur_index_queue_element_count_metrics->get_value();
stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
stats["index_queue_curr_elements"] =
(double)_cur_index_queue_element_count_metrics->get_value();
stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
stats["ttl_queue_curr_size"] = (double)_cur_ttl_cache_lru_queue_cache_size_metrics->get_value();
stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
stats["ttl_queue_curr_elements"] =
(double)_cur_ttl_cache_lru_queue_element_count_metrics->get_value();
stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
stats["normal_queue_curr_size"] = (double)_cur_normal_queue_element_count_metrics->get_value();
stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
stats["normal_queue_curr_elements"] =
(double)_cur_normal_queue_element_count_metrics->get_value();
stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
stats["disposable_queue_curr_size"] =
(double)_cur_disposable_queue_element_count_metrics->get_value();
stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
stats["disposable_queue_curr_elements"] =
(double)_cur_disposable_queue_element_count_metrics->get_value();
return stats;
}
// for be UTs
std::map<std::string, double> BlockFileCache::get_stats_unsafe() {
std::map<std::string, double> stats;
stats["hits_ratio"] = (double)_hit_ratio->get_value();
stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
stats["index_queue_curr_size"] = (double)_index_queue.get_capacity_unsafe();
stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
stats["index_queue_curr_elements"] = (double)_index_queue.get_elements_num_unsafe();
stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
stats["ttl_queue_curr_size"] = (double)_ttl_queue.get_capacity_unsafe();
stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
stats["ttl_queue_curr_elements"] = (double)_ttl_queue.get_elements_num_unsafe();
stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
stats["normal_queue_curr_size"] = (double)_normal_queue.get_capacity_unsafe();
stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
stats["normal_queue_curr_elements"] = (double)_normal_queue.get_elements_num_unsafe();
stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
stats["disposable_queue_curr_size"] = (double)_disposable_queue.get_capacity_unsafe();
stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
stats["disposable_queue_curr_elements"] = (double)_disposable_queue.get_elements_num_unsafe();
return stats;
}
template void BlockFileCache::remove(FileBlockSPtr file_block,
std::lock_guard<std::mutex>& cache_lock,
std::lock_guard<std::mutex>& block_lock, bool sync);
} // namespace doris::io