blob: afbbe69afaacbe98c601f2db932445a80fb03626 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileCacheFactory.cpp
// and modified by Doris
#include "io/cache/block_file_cache_factory.h"
#include <glog/logging.h>
#include <string>
#include <vector>
#if defined(__APPLE__)
#include <sys/mount.h>
#else
#include <sys/statfs.h>
#endif
#include <algorithm>
#include <execution>
#include <ostream>
#include <utility>
#include "common/config.h"
#include "exec/schema_scanner/schema_scanner_helper.h"
#include "io/cache/file_cache_common.h"
#include "io/fs/local_file_system.h"
#include "runtime/exec_env.h"
#include "service/backend_options.h"
#include "util/slice.h"
#include "vec/core/block.h"
namespace doris {
class TUniqueId;
namespace io {
FileCacheFactory* FileCacheFactory::instance() {
return ExecEnv::GetInstance()->file_cache_factory();
}
size_t FileCacheFactory::try_release() {
int elements = 0;
for (auto& cache : _caches) {
elements += cache->try_release();
}
return elements;
}
size_t FileCacheFactory::try_release(const std::string& base_path) {
auto iter = _path_to_cache.find(base_path);
if (iter != _path_to_cache.end()) {
return iter->second->try_release();
}
return 0;
}
Status FileCacheFactory::create_file_cache(const std::string& cache_base_path,
FileCacheSettings file_cache_settings) {
if (file_cache_settings.storage == "memory") {
if (cache_base_path != "memory") {
LOG(WARNING) << "memory storage must use memory path";
return Status::InvalidArgument("memory storage must use memory path");
}
} else {
const auto& fs = global_local_filesystem();
bool exists = false;
RETURN_IF_ERROR(fs->exists(cache_base_path, &exists));
if (!exists) {
auto st = fs->create_directory(cache_base_path);
LOG(INFO) << "path " << cache_base_path << " does not exist, create " << st.msg();
RETURN_IF_ERROR(st);
} else if (config::clear_file_cache) {
RETURN_IF_ERROR(fs->delete_directory(cache_base_path));
RETURN_IF_ERROR(fs->create_directory(cache_base_path));
}
struct statfs stat;
if (statfs(cache_base_path.c_str(), &stat) < 0) {
LOG_ERROR("").tag("file cache path", cache_base_path).tag("error", strerror(errno));
return Status::IOError("{} statfs error {}", cache_base_path, strerror(errno));
}
size_t disk_capacity = static_cast<size_t>(static_cast<size_t>(stat.f_blocks) *
static_cast<size_t>(stat.f_bsize));
if (file_cache_settings.capacity == 0 || disk_capacity < file_cache_settings.capacity) {
LOG_INFO(
"The cache {} config size {} is larger than disk size {} or zero, recalc "
"it.",
cache_base_path, file_cache_settings.capacity, disk_capacity);
file_cache_settings = get_file_cache_settings(disk_capacity,
file_cache_settings.max_query_cache_size);
}
LOG(INFO) << "[FileCache] path: " << cache_base_path
<< " total_size: " << file_cache_settings.capacity
<< " disk_total_size: " << disk_capacity;
}
auto cache = std::make_unique<BlockFileCache>(cache_base_path, file_cache_settings);
RETURN_IF_ERROR(cache->initialize());
{
std::lock_guard lock(_mtx);
_path_to_cache[cache_base_path] = cache.get();
_caches.push_back(std::move(cache));
_capacity += file_cache_settings.capacity;
}
return Status::OK();
}
Status FileCacheFactory::reload_file_cache(const std::vector<CachePath>& cache_base_paths) {
{
std::unique_lock lock(_mtx);
for (const auto& cache_path : cache_base_paths) {
if (_path_to_cache.find(cache_path.path) == _path_to_cache.end()) {
return Status::InternalError(
"Current file cache not support file cache num changes");
}
}
for (const auto& cache_path : cache_base_paths) {
auto cache_map_iter = _path_to_cache.find(cache_path.path);
auto cache_iter = std::find_if(_caches.begin(), _caches.end(),
[cache_map_iter](const auto& cache_uptr) {
return cache_uptr.get() == cache_map_iter->second;
});
if (cache_iter == _caches.end()) {
return Status::InternalError("Target relaod cache in path {} may has been released",
cache_path.path);
}
// deconstruct target reload first
*cache_iter = std::unique_ptr<BlockFileCache>();
// after deconstruct the BlockFileCache, construct the BlockFileCache again
*cache_iter =
std::make_unique<BlockFileCache>(cache_path.path, cache_path.init_settings());
cache_map_iter->second = cache_iter->get();
RETURN_IF_ERROR(cache_iter->get()->initialize());
}
}
return Status::OK();
}
std::vector<doris::CacheBlockPB> FileCacheFactory::get_cache_data_by_path(const std::string& path) {
auto cache_hash = BlockFileCache::hash(path);
return get_cache_data_by_path(cache_hash);
}
std::vector<doris::CacheBlockPB> FileCacheFactory::get_cache_data_by_path(
const UInt128Wrapper& hash) {
std::vector<doris::CacheBlockPB> ret;
BlockFileCache* cache = FileCacheFactory::instance()->get_by_path(hash);
if (cache == nullptr) {
return ret;
}
auto blocks = cache->get_blocks_by_key(hash);
for (auto& [offset, fb] : blocks) {
doris::CacheBlockPB cb;
cb.set_block_offset(static_cast<int64_t>(offset));
cb.set_block_size(static_cast<int64_t>(fb->range().size()));
// try to read data into bytes
std::string data;
data.resize(fb->range().size());
Slice slice(data.data(), data.size());
// read from beginning of this block
Status st = fb->read(slice, /*read_offset=*/0);
if (st.ok()) {
cb.set_data(data);
} else {
// On read failure, skip setting data but still report meta
VLOG_DEBUG << "read cache block failed: " << st;
}
ret.emplace_back(std::move(cb));
}
return ret;
}
std::vector<std::string> FileCacheFactory::get_cache_file_by_path(const UInt128Wrapper& hash) {
io::BlockFileCache* cache = io::FileCacheFactory::instance()->get_by_path(hash);
auto blocks = cache->get_blocks_by_key(hash);
std::vector<std::string> ret;
if (blocks.empty()) {
return ret;
} else {
for (auto& [_, fb] : blocks) {
ret.emplace_back(fb->get_cache_file());
}
}
return ret;
}
int64_t FileCacheFactory::get_cache_file_size_by_path(const UInt128Wrapper& hash) {
io::BlockFileCache* cache = io::FileCacheFactory::instance()->get_by_path(hash);
auto blocks = cache->get_blocks_by_key(hash);
if (blocks.empty()) {
return 0;
}
int64_t cache_size = 0;
for (auto& [_, fb] : blocks) {
cache_size += fb->range().size();
}
return cache_size;
}
BlockFileCache* FileCacheFactory::get_by_path(const UInt128Wrapper& key) {
// dont need lock mutex because _caches is immutable after create_file_cache
return _caches[KeyHash()(key) % _caches.size()].get();
}
BlockFileCache* FileCacheFactory::get_by_path(const std::string& cache_base_path) {
auto iter = _path_to_cache.find(cache_base_path);
if (iter == _path_to_cache.end()) {
return nullptr;
} else {
return iter->second;
}
}
std::vector<BlockFileCache::QueryFileCacheContextHolderPtr>
FileCacheFactory::get_query_context_holders(const TUniqueId& query_id) {
std::vector<BlockFileCache::QueryFileCacheContextHolderPtr> holders;
for (const auto& cache : _caches) {
holders.push_back(cache->get_query_context_holder(query_id));
}
return holders;
}
std::string FileCacheFactory::clear_file_caches(bool sync) {
std::vector<std::string> results(_caches.size());
#ifndef USE_LIBCPP
std::for_each(std::execution::par, _caches.begin(), _caches.end(), [&](const auto& cache) {
size_t index = &cache - &_caches[0];
results[index] =
sync ? cache->clear_file_cache_directly() : cache->clear_file_cache_async();
});
#else
// libcpp do not support std::execution::par
std::for_each(_caches.begin(), _caches.end(), [&](const auto& cache) {
size_t index = &cache - &_caches[0];
results[index] =
sync ? cache->clear_file_cache_directly() : cache->clear_file_cache_async();
});
#endif
std::stringstream ss;
for (const auto& result : results) {
ss << result << "\n";
}
return ss.str();
}
void FileCacheFactory::dump_all_caches() {
for (const auto& cache : _caches) {
cache->dump_lru_queues(true);
}
}
std::vector<std::string> FileCacheFactory::get_base_paths() {
std::vector<std::string> paths;
for (const auto& pair : _path_to_cache) {
paths.push_back(pair.first);
}
return paths;
}
std::string validate_capacity(const std::string& path, int64_t new_capacity,
int64_t& valid_capacity) {
struct statfs stat;
if (statfs(path.c_str(), &stat) < 0) {
auto ret = fmt::format("reset capacity {} statfs error {}. ", path, strerror(errno));
LOG_ERROR(ret);
valid_capacity = 0; // caller will handle the error
return ret;
}
size_t disk_capacity = static_cast<size_t>(static_cast<size_t>(stat.f_blocks) *
static_cast<size_t>(stat.f_bsize));
if (new_capacity == 0 || disk_capacity < new_capacity) {
auto ret = fmt::format(
"The cache {} config size {} is larger than disk size {} or zero, recalc "
"it to disk size. ",
path, new_capacity, disk_capacity);
valid_capacity = disk_capacity;
LOG_WARNING(ret);
return ret;
}
valid_capacity = new_capacity;
return "";
}
std::string FileCacheFactory::reset_capacity(const std::string& path, int64_t new_capacity) {
std::stringstream ss;
size_t total_capacity = 0;
if (path.empty()) {
for (auto& [p, cache] : _path_to_cache) {
int64_t valid_capacity = 0;
ss << validate_capacity(p, new_capacity, valid_capacity);
if (valid_capacity <= 0) {
return ss.str();
}
ss << cache->reset_capacity(valid_capacity);
total_capacity += cache->capacity();
}
_capacity = total_capacity;
return ss.str();
} else {
if (auto iter = _path_to_cache.find(path); iter != _path_to_cache.end()) {
int64_t valid_capacity = 0;
ss << validate_capacity(path, new_capacity, valid_capacity);
if (valid_capacity <= 0) {
return ss.str();
}
ss << iter->second->reset_capacity(valid_capacity);
for (auto& [p, cache] : _path_to_cache) {
total_capacity += cache->capacity();
}
_capacity = total_capacity;
return ss.str();
}
}
return "Unknown the cache path " + path;
}
void FileCacheFactory::get_cache_stats_block(vectorized::Block* block) {
// std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock);
TBackend be = BackendOptions::get_local_backend();
int64_t be_id = be.id;
std::string be_ip = be.host;
for (auto& cache : _caches) {
std::map<std::string, double> stats = cache->get_stats();
for (auto& [k, v] : stats) {
SchemaScannerHelper::insert_int64_value(0, be_id, block); // be id
SchemaScannerHelper::insert_string_value(1, be_ip, block); // be ip
SchemaScannerHelper::insert_string_value(2, cache->get_base_path(),
block); // cache path
SchemaScannerHelper::insert_string_value(3, k, block); // metric name
SchemaScannerHelper::insert_string_value(4, std::to_string(v), block); // metric value
}
}
}
} // namespace io
} // namespace doris