| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "olap/options.h" |
| |
| #include <absl/strings/ascii.h> |
| #include <absl/strings/str_split.h> |
| #include <ctype.h> |
| #include <rapidjson/document.h> |
| #include <rapidjson/encodings.h> |
| #include <rapidjson/rapidjson.h> |
| #include <stdlib.h> |
| |
| #include <algorithm> |
| #include <memory> |
| #include <ostream> |
| |
| #include "common/config.h" |
| #include "common/logging.h" |
| #include "common/status.h" |
| #include "io/cache/file_cache_common.h" |
| #include "io/fs/local_file_system.h" |
| #include "olap/olap_define.h" |
| #include "olap/utils.h" |
| #include "util/path_util.h" |
| #include "util/string_util.h" |
| |
| namespace doris { |
| using namespace ErrorCode; |
| |
| using std::string; |
| using std::vector; |
| |
// Upper-cased property names and medium values recognised in a store path entry.
// They are compared against upper-cased user input, hence the _UC suffix.
static const std::string CAPACITY_UC = "CAPACITY";
static const std::string MEDIUM_UC = "MEDIUM";
static const std::string SSD_UC = "SSD";
static const std::string HDD_UC = "HDD";
static const std::string REMOTE_CACHE_UC = "REMOTE_CACHE";

// JSON member names (and accepted "storage" values) of a file cache path entry.
static const std::string CACHE_PATH = "path";
static const std::string CACHE_TOTAL_SIZE = "total_size";
static const std::string CACHE_QUERY_LIMIT_SIZE = "query_limit";
static const std::string CACHE_NORMAL_PERCENT = "normal_percent";
static const std::string CACHE_DISPOSABLE_PERCENT = "disposable_percent";
static const std::string CACHE_INDEX_PERCENT = "index_percent";
static const std::string CACHE_TTL_PERCENT = "ttl_percent";
static const std::string CACHE_STORAGE = "storage";
static const std::string CACHE_STORAGE_DISK = "disk";
static const std::string CACHE_STORAGE_MEMORY = "memory";
| |
| // TODO: should be a general util method |
| // static std::string to_upper(const std::string& str) { |
| // std::string out = str; |
| // std::transform(out.begin(), out.end(), out.begin(), [](auto c) { return std::toupper(c); }); |
| // return out; |
| // } |
| |
| // Currently, both of three following formats are supported(see be.conf), remote cache is the |
| // local cache path for remote storage. |
| // format 1: /home/disk1/palo.HDD,50 |
| // format 2: /home/disk1/palo,medium:ssd,capacity:50 |
| // remote cache format: /home/disk/palo/cache,medium:remote_cache,capacity:50 |
| Status parse_root_path(const string& root_path, StorePath* path) { |
| std::vector<string> tmp_vec = absl::StrSplit(root_path, ",", absl::SkipWhitespace()); |
| |
| // parse root path name |
| absl::StripAsciiWhitespace(&tmp_vec[0]); |
| tmp_vec[0].erase(tmp_vec[0].find_last_not_of('/') + 1); |
| if (tmp_vec[0].empty() || tmp_vec[0][0] != '/') { |
| return Status::Error<INVALID_ARGUMENT>("invalid store path. path={}", tmp_vec[0]); |
| } |
| |
| string canonicalized_path; |
| RETURN_IF_ERROR(io::global_local_filesystem()->canonicalize(tmp_vec[0], &canonicalized_path)); |
| path->path = tmp_vec[0]; |
| |
| // parse root path capacity and storage medium |
| string capacity_str; |
| string medium_str = HDD_UC; |
| |
| string extension = path_util::file_extension(canonicalized_path); |
| if (!extension.empty()) { |
| medium_str = to_upper(extension.substr(1)); |
| } |
| |
| for (int i = 1; i < tmp_vec.size(); i++) { |
| // <property>:<value> or <value> |
| string property; |
| string value; |
| std::pair<string, string> pair = absl::StrSplit(tmp_vec[i], absl::MaxSplits(":", 1)); |
| if (pair.second.empty()) { |
| // format_1: <value> only supports setting capacity |
| property = CAPACITY_UC; |
| value = tmp_vec[i]; |
| } else { |
| // format_2 |
| property = to_upper(pair.first); |
| value = pair.second; |
| } |
| |
| absl::StripAsciiWhitespace(&property); |
| absl::StripAsciiWhitespace(&value); |
| if (property == CAPACITY_UC) { |
| capacity_str = value; |
| } else if (property == MEDIUM_UC) { |
| // property 'medium' has a higher priority than the extension of |
| // path, so it can override medium_str |
| medium_str = to_upper(value); |
| } else { |
| return Status::Error<INVALID_ARGUMENT>("invalid property of store path, {}", |
| tmp_vec[i]); |
| } |
| } |
| |
| path->capacity_bytes = -1; |
| if (!capacity_str.empty()) { |
| if (!valid_signed_number<int64_t>(capacity_str) || |
| strtol(capacity_str.c_str(), nullptr, 10) < 0) { |
| LOG(WARNING) << "invalid capacity of store path, capacity=" << capacity_str; |
| return Status::Error<INVALID_ARGUMENT>("invalid capacity of store path, capacity={}", |
| capacity_str); |
| } |
| path->capacity_bytes = strtol(capacity_str.c_str(), nullptr, 10) * GB_EXCHANGE_BYTE; |
| } |
| |
| path->storage_medium = TStorageMedium::HDD; |
| if (!medium_str.empty()) { |
| if (medium_str == SSD_UC) { |
| path->storage_medium = TStorageMedium::SSD; |
| } else if (medium_str == HDD_UC) { |
| path->storage_medium = TStorageMedium::HDD; |
| } else if (medium_str == REMOTE_CACHE_UC) { |
| path->storage_medium = TStorageMedium::REMOTE_CACHE; |
| } else { |
| return Status::Error<INVALID_ARGUMENT>("invalid storage medium. medium={}", medium_str); |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status parse_conf_store_paths(const string& config_path, std::vector<StorePath>* paths) { |
| std::vector<string> path_vec = absl::StrSplit(config_path, ";", absl::SkipWhitespace()); |
| if (path_vec.empty()) { |
| // means compute node |
| return Status::OK(); |
| } |
| if (path_vec.back().empty()) { |
| // deal with the case that user add `;` to the tail |
| path_vec.pop_back(); |
| } |
| |
| std::set<std::string> real_paths; |
| for (auto& item : path_vec) { |
| StorePath path; |
| auto res = parse_root_path(item, &path); |
| if (res.ok()) { |
| auto success = real_paths.emplace(path.path).second; |
| if (success) { |
| paths->emplace_back(std::move(path)); |
| } else { |
| LOG(WARNING) << "a duplicated path is found " << path.path; |
| return Status::Error<INVALID_ARGUMENT>("a duplicated path is found, path={}", |
| path.path); |
| } |
| } else { |
| LOG(WARNING) << "failed to parse store path " << item << ", res=" << res; |
| } |
| } |
| if ((path_vec.size() != paths->size() && !config::ignore_broken_disk)) { |
| return Status::Error<INVALID_ARGUMENT>("fail to parse storage_root_path config. value={}", |
| config_path); |
| } |
| return Status::OK(); |
| } |
| |
| void parse_conf_broken_store_paths(const string& config_path, std::set<std::string>* paths) { |
| std::vector<string> path_vec = absl::StrSplit(config_path, ";", absl::SkipWhitespace()); |
| if (path_vec.empty()) { |
| return; |
| } |
| if (path_vec.back().empty()) { |
| // deal with the case that user add `;` to the tail |
| path_vec.pop_back(); |
| } |
| for (auto& item : path_vec) { |
| paths->emplace(item); |
| } |
| return; |
| } |
| |
| /** format: |
| * [ |
| * {"path": "storage1", "total_size":53687091200,"query_limit": "10737418240"}, |
| * {"path": "storage2", "total_size":53687091200}, |
| * {"path": "storage3", "total_size":53687091200, "ttl_percent":50, "normal_percent":40, "disposable_percent":5, "index_percent":5} |
| * {"path": "xxx", "total_size":53687091200, "storage": "memory"} |
| * ] |
| */ |
| Status parse_conf_cache_paths(const std::string& config_path, std::vector<CachePath>& paths) { |
| using namespace rapidjson; |
| Document document; |
| document.Parse(config_path.c_str()); |
| DCHECK(document.IsArray()) << config_path << " " << document.GetType(); |
| for (auto& config : document.GetArray()) { |
| auto map = config.GetObject(); |
| DCHECK(map.HasMember(CACHE_PATH.c_str())); |
| std::string path = map.FindMember(CACHE_PATH.c_str())->value.GetString(); |
| std::string storage = CACHE_STORAGE_DISK; // disk storage by default |
| if (map.HasMember(CACHE_STORAGE.c_str())) { |
| storage = map.FindMember(CACHE_STORAGE.c_str())->value.GetString(); |
| if (storage != CACHE_STORAGE_DISK && storage != CACHE_STORAGE_MEMORY) [[unlikely]] { |
| return Status::InvalidArgument("invalid file cache storage type: " + storage); |
| } |
| if (storage == CACHE_STORAGE_MEMORY) { |
| // set path to "memory" for memory storage |
| // so that we can track it by path (use _path_to_cache map) |
| path = CACHE_STORAGE_MEMORY; |
| } |
| } |
| int64_t total_size = 0, query_limit_bytes = 0; |
| if (map.HasMember(CACHE_TOTAL_SIZE.c_str())) { |
| auto& value = map.FindMember(CACHE_TOTAL_SIZE.c_str())->value; |
| if (value.IsInt64()) { |
| total_size = value.GetInt64(); |
| } else { |
| total_size = 0; |
| LOG(WARNING) << "[FileCache] the value of " << CACHE_TOTAL_SIZE.c_str() |
| << " is not int64: " << value.GetString() << " , use 0 as default"; |
| } |
| } |
| if (config::enable_file_cache_query_limit) { |
| if (map.HasMember(CACHE_QUERY_LIMIT_SIZE.c_str())) { |
| auto& value = map.FindMember(CACHE_QUERY_LIMIT_SIZE.c_str())->value; |
| if (value.IsInt64()) { |
| query_limit_bytes = value.GetInt64(); |
| } else { |
| query_limit_bytes = 0; |
| LOG(WARNING) << "[FileCache] the value of " << CACHE_QUERY_LIMIT_SIZE.c_str() |
| << " is not int64: " << value.GetString() << " , use 0 as default"; |
| } |
| } |
| } |
| if (total_size < 0 || (config::enable_file_cache_query_limit && query_limit_bytes < 0)) { |
| return Status::InvalidArgument("total_size or query_limit should not less than zero"); |
| } |
| |
| // percent |
| auto get_percent_value = [&](const std::string& key, size_t& percent) { |
| auto& value = map.FindMember(key.c_str())->value; |
| if (value.IsUint()) { |
| percent = value.GetUint(); |
| } else { |
| return Status::InvalidArgument("percent should be uint"); |
| } |
| return Status::OK(); |
| }; |
| |
| size_t normal_percent = io::DEFAULT_NORMAL_PERCENT; |
| size_t disposable_percent = io::DEFAULT_DISPOSABLE_PERCENT; |
| size_t index_percent = io::DEFAULT_INDEX_PERCENT; |
| size_t ttl_percent = io::DEFAULT_TTL_PERCENT; |
| bool has_normal_percent = map.HasMember(CACHE_NORMAL_PERCENT.c_str()); |
| bool has_disposable_percent = map.HasMember(CACHE_DISPOSABLE_PERCENT.c_str()); |
| bool has_index_percent = map.HasMember(CACHE_INDEX_PERCENT.c_str()); |
| bool has_ttl_percent = map.HasMember(CACHE_TTL_PERCENT.c_str()); |
| if (has_normal_percent && has_disposable_percent && has_index_percent && has_ttl_percent) { |
| RETURN_IF_ERROR(get_percent_value(CACHE_NORMAL_PERCENT, normal_percent)); |
| RETURN_IF_ERROR(get_percent_value(CACHE_DISPOSABLE_PERCENT, disposable_percent)); |
| RETURN_IF_ERROR(get_percent_value(CACHE_INDEX_PERCENT, index_percent)); |
| RETURN_IF_ERROR(get_percent_value(CACHE_TTL_PERCENT, ttl_percent)); |
| } else if (has_normal_percent || has_disposable_percent || has_index_percent || |
| has_ttl_percent) { |
| return Status::InvalidArgument( |
| "cache percent (ttl_percent, index_percent, normal_percent, " |
| "disposable_percent) must either be all set or all unset. " |
| "when all unset, use default: ttl_percent=50, index_percent=5, " |
| "normal_percent=40, disposable_percent=5."); |
| } |
| if ((normal_percent + disposable_percent + index_percent + ttl_percent) != 100) { |
| return Status::InvalidArgument("The sum of cache percent config must equal 100."); |
| } |
| |
| paths.emplace_back(std::move(path), total_size, query_limit_bytes, normal_percent, |
| disposable_percent, index_percent, ttl_percent, storage); |
| } |
| if (paths.empty()) { |
| return Status::InvalidArgument("fail to parse storage_root_path config. value={}", |
| config_path); |
| } |
| return Status::OK(); |
| } |
| |
| io::FileCacheSettings CachePath::init_settings() const { |
| return io::get_file_cache_settings(total_bytes, query_limit_bytes, normal_percent, |
| disposable_percent, index_percent, ttl_percent, storage); |
| } |
| |
| } // end namespace doris |