[chore](file-cache) Enable file cache for cloud mode by force (#41357)
## Proposed changes
The temporary local rowset writer used for external sorting relies on the file cache, so the file cache must be enabled in cloud mode.
diff --git a/be/src/cloud/cloud_rowset_writer.cpp b/be/src/cloud/cloud_rowset_writer.cpp
index 7753bf7..5f878f5 100644
--- a/be/src/cloud/cloud_rowset_writer.cpp
+++ b/be/src/cloud/cloud_rowset_writer.cpp
@@ -17,6 +17,7 @@
#include "cloud/cloud_rowset_writer.h"
+#include "common/status.h"
#include "io/cache/block_file_cache_factory.h"
#include "io/fs/file_system.h"
#include "olap/rowset/rowset_factory.h"
@@ -34,7 +35,7 @@
if (_context.is_local_rowset()) {
// In cloud mode, this branch implies it is an intermediate rowset for external merge sort,
// we use `global_local_filesystem` to write data to `tmp_file_dir`(see `local_segment_path`).
- _context.tablet_path = io::FileCacheFactory::instance()->get_cache_path();
+ _context.tablet_path = io::FileCacheFactory::instance()->pick_one_cache_path();
} else {
_rowset_meta->set_remote_storage_resource(*_context.storage_resource);
}
diff --git a/be/src/io/cache/block_file_cache_factory.h b/be/src/io/cache/block_file_cache_factory.h
index 6365fab..d7b7108 100644
--- a/be/src/io/cache/block_file_cache_factory.h
+++ b/be/src/io/cache/block_file_cache_factory.h
@@ -21,7 +21,9 @@
#pragma once
#include <memory>
+#include <optional>
#include <string>
+#include <string_view>
#include <vector>
#include "common/status.h"
@@ -46,7 +48,8 @@
size_t try_release(const std::string& base_path);
- const std::string& get_cache_path() {
+ std::string_view pick_one_cache_path() {
+ DCHECK(!_caches.empty());
size_t cur_index = _next_index.fetch_add(1);
return _caches[cur_index % _caches.size()]->get_base_path();
}
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 758a2f3..adb6b7f 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -270,7 +270,6 @@
init_file_cache_factory(cache_paths);
doris::io::BeConfDataDirReader::init_be_conf_data_dir(store_paths, spill_store_paths,
cache_paths);
-
_pipeline_tracer_ctx = std::make_unique<pipeline::PipelineTracerContext>(); // before query
RETURN_IF_ERROR(init_pipeline_task_scheduler());
_workload_group_manager = new WorkloadGroupMgr();
@@ -392,51 +391,58 @@
void ExecEnv::init_file_cache_factory(std::vector<doris::CachePath>& cache_paths) {
// Load file cache before starting up daemon threads to make sure StorageEngine is read.
- if (doris::config::enable_file_cache) {
- if (config::file_cache_each_block_size > config::s3_write_buffer_size ||
- config::s3_write_buffer_size % config::file_cache_each_block_size != 0) {
- LOG_FATAL(
- "The config file_cache_each_block_size {} must less than or equal to config "
- "s3_write_buffer_size {} and config::s3_write_buffer_size % "
- "config::file_cache_each_block_size must be zero",
- config::file_cache_each_block_size, config::s3_write_buffer_size);
+ if (!config::enable_file_cache) {
+ if (config::is_cloud_mode()) {
+ LOG(FATAL) << "Cloud mode requires to enable file cache, plz set "
+ "config::enable_file_cache "
+ "= true";
exit(-1);
}
- std::unordered_set<std::string> cache_path_set;
- Status rest = doris::parse_conf_cache_paths(doris::config::file_cache_path, cache_paths);
- if (!rest) {
- LOG(FATAL) << "parse config file cache path failed, path="
- << doris::config::file_cache_path;
+ return;
+ }
+ if (config::file_cache_each_block_size > config::s3_write_buffer_size ||
+ config::s3_write_buffer_size % config::file_cache_each_block_size != 0) {
+ LOG_FATAL(
+ "The config file_cache_each_block_size {} must less than or equal to config "
+ "s3_write_buffer_size {} and config::s3_write_buffer_size % "
+ "config::file_cache_each_block_size must be zero",
+ config::file_cache_each_block_size, config::s3_write_buffer_size);
+ exit(-1);
+ }
+ std::unordered_set<std::string> cache_path_set;
+ Status rest = doris::parse_conf_cache_paths(doris::config::file_cache_path, cache_paths);
+ if (!rest) {
+ LOG(FATAL) << "parse config file cache path failed, path="
+ << doris::config::file_cache_path;
+ exit(-1);
+ }
+ std::vector<std::thread> file_cache_init_threads;
+
+ std::list<doris::Status> cache_status;
+ for (auto& cache_path : cache_paths) {
+ if (cache_path_set.find(cache_path.path) != cache_path_set.end()) {
+ LOG(WARNING) << fmt::format("cache path {} is duplicate", cache_path.path);
+ continue;
+ }
+
+ file_cache_init_threads.emplace_back([&, status = &cache_status.emplace_back()]() {
+ *status = doris::io::FileCacheFactory::instance()->create_file_cache(
+ cache_path.path, cache_path.init_settings());
+ });
+
+ cache_path_set.emplace(cache_path.path);
+ }
+
+ for (std::thread& thread : file_cache_init_threads) {
+ if (thread.joinable()) {
+ thread.join();
+ }
+ }
+ for (const auto& status : cache_status) {
+ if (!status.ok()) {
+ LOG(FATAL) << "failed to init file cache, err: " << status;
exit(-1);
}
- std::vector<std::thread> file_cache_init_threads;
-
- std::list<doris::Status> cache_status;
- for (auto& cache_path : cache_paths) {
- if (cache_path_set.find(cache_path.path) != cache_path_set.end()) {
- LOG(WARNING) << fmt::format("cache path {} is duplicate", cache_path.path);
- continue;
- }
-
- file_cache_init_threads.emplace_back([&, status = &cache_status.emplace_back()]() {
- *status = doris::io::FileCacheFactory::instance()->create_file_cache(
- cache_path.path, cache_path.init_settings());
- });
-
- cache_path_set.emplace(cache_path.path);
- }
-
- for (std::thread& thread : file_cache_init_threads) {
- if (thread.joinable()) {
- thread.join();
- }
- }
- for (const auto& status : cache_status) {
- if (!status.ok()) {
- LOG(FATAL) << "failed to init file cache, err: " << status;
- exit(-1);
- }
- }
}
}