blob: bc6d5179b2ff17f514d97f52e458b1afefd021b9 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/options.h"

#include <absl/strings/ascii.h>
#include <absl/strings/str_split.h>
#include <ctype.h>
#include <rapidjson/document.h>
#include <rapidjson/encodings.h>
#include <rapidjson/rapidjson.h>
#include <stdlib.h>

#include <algorithm>
#include <limits>
#include <memory>
#include <ostream>
#include <set>
#include <string>
#include <vector>

#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "io/cache/file_cache_common.h"
#include "io/fs/local_file_system.h"
#include "olap/olap_define.h"
#include "olap/utils.h"
#include "util/path_util.h"
#include "util/string_util.h"
namespace doris {
using namespace ErrorCode;
using std::string;
using std::vector;
// Upper-cased property names and storage medium names recognized in
// storage root path entries (compared after to_upper()).
static const std::string CAPACITY_UC = "CAPACITY";
static const std::string MEDIUM_UC = "MEDIUM";
static const std::string SSD_UC = "SSD";
static const std::string HDD_UC = "HDD";
static const std::string REMOTE_CACHE_UC = "REMOTE_CACHE";
// JSON member names and values of the file cache path config.
static const std::string CACHE_PATH = "path";
static const std::string CACHE_TOTAL_SIZE = "total_size";
static const std::string CACHE_QUERY_LIMIT_SIZE = "query_limit";
static const std::string CACHE_NORMAL_PERCENT = "normal_percent";
static const std::string CACHE_DISPOSABLE_PERCENT = "disposable_percent";
static const std::string CACHE_INDEX_PERCENT = "index_percent";
static const std::string CACHE_TTL_PERCENT = "ttl_percent";
static const std::string CACHE_STORAGE = "storage";
static const std::string CACHE_STORAGE_DISK = "disk";
static const std::string CACHE_STORAGE_MEMORY = "memory";
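// NOTE: this local helper is retired; the to_upper() calls below resolve to a shared utility
// from an included header (presumably util/string_util.h — confirm). Kept for reference only.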
// static std::string to_upper(const std::string& str) {
// std::string out = str;
// std::transform(out.begin(), out.end(), out.begin(), [](auto c) { return std::toupper(c); });
// return out;
// }
// Parses a single storage root path entry into `path`.
//
// Three formats are supported (see be.conf). A remote cache path is the local
// cache directory used for remote storage:
//   format 1:            /home/disk1/palo.HDD,50
//   format 2:            /home/disk1/palo,medium:ssd,capacity:50
//   remote cache format: /home/disk/palo/cache,medium:remote_cache,capacity:50
//
// The first comma-separated field is the path. It must be absolute, its trailing
// '/' characters are dropped, and it must be canonicalizable by the local file
// system. The remaining fields are `<property>:<value>` pairs (`capacity` or
// `medium`, case-insensitive), or a bare `<value>`, which is read as capacity.
//
// - capacity: non-negative integer in GB, stored in bytes (value * GB_EXCHANGE_BYTE);
//   -1 when not given.
// - medium: SSD, HDD or REMOTE_CACHE. It defaults to HDD, may come from the path's
//   extension (format 1), and an explicit `medium:` property overrides the extension.
//
// Returns INVALID_ARGUMENT for any malformed field, or the canonicalize() error.
Status parse_root_path(const string& root_path, StorePath* path) {
    std::vector<string> tmp_vec = absl::StrSplit(root_path, ",", absl::SkipWhitespace());
    // A separator-only entry such as "," splits into nothing; guard before tmp_vec[0].
    if (tmp_vec.empty()) {
        return Status::Error<INVALID_ARGUMENT>("invalid store path. path={}", root_path);
    }
    // parse root path name
    absl::StripAsciiWhitespace(&tmp_vec[0]);
    tmp_vec[0].erase(tmp_vec[0].find_last_not_of('/') + 1);
    if (tmp_vec[0].empty() || tmp_vec[0][0] != '/') {
        return Status::Error<INVALID_ARGUMENT>("invalid store path. path={}", tmp_vec[0]);
    }
    string canonicalized_path;
    RETURN_IF_ERROR(io::global_local_filesystem()->canonicalize(tmp_vec[0], &canonicalized_path));
    // The configured (non-canonicalized) spelling is what gets stored; the canonical
    // form is only used below to derive the medium from the file extension.
    path->path = tmp_vec[0];

    // parse root path capacity and storage medium
    string capacity_str;
    string medium_str = HDD_UC;
    string extension = path_util::file_extension(canonicalized_path);
    if (!extension.empty()) {
        medium_str = to_upper(extension.substr(1));
    }
    for (size_t i = 1; i < tmp_vec.size(); ++i) {
        // <property>:<value> or <value>
        string property;
        string value;
        std::pair<string, string> pair = absl::StrSplit(tmp_vec[i], absl::MaxSplits(":", 1));
        if (pair.second.empty()) {
            // format_1: <value> only supports setting capacity
            property = CAPACITY_UC;
            value = tmp_vec[i];
        } else {
            // format_2
            property = to_upper(pair.first);
            value = pair.second;
        }
        absl::StripAsciiWhitespace(&property);
        absl::StripAsciiWhitespace(&value);
        if (property == CAPACITY_UC) {
            capacity_str = value;
        } else if (property == MEDIUM_UC) {
            // property 'medium' has a higher priority than the extension of
            // path, so it can override medium_str
            medium_str = to_upper(value);
        } else {
            return Status::Error<INVALID_ARGUMENT>("invalid property of store path, {}",
                                                   tmp_vec[i]);
        }
    }

    path->capacity_bytes = -1;
    if (!capacity_str.empty()) {
        // Parse once. Reject negatives, and values whose byte count would not fit in
        // int64 (the multiplication below would otherwise overflow).
        const int64_t bytes_per_gb = static_cast<int64_t>(GB_EXCHANGE_BYTE);
        const int64_t capacity_gb = valid_signed_number<int64_t>(capacity_str)
                                            ? strtoll(capacity_str.c_str(), nullptr, 10)
                                            : -1;
        if (capacity_gb < 0 ||
            capacity_gb > std::numeric_limits<int64_t>::max() / bytes_per_gb) {
            LOG(WARNING) << "invalid capacity of store path, capacity=" << capacity_str;
            return Status::Error<INVALID_ARGUMENT>("invalid capacity of store path, capacity={}",
                                                   capacity_str);
        }
        path->capacity_bytes = capacity_gb * bytes_per_gb;
    }

    path->storage_medium = TStorageMedium::HDD;
    if (!medium_str.empty()) {
        if (medium_str == SSD_UC) {
            path->storage_medium = TStorageMedium::SSD;
        } else if (medium_str == HDD_UC) {
            path->storage_medium = TStorageMedium::HDD;
        } else if (medium_str == REMOTE_CACHE_UC) {
            path->storage_medium = TStorageMedium::REMOTE_CACHE;
        } else {
            return Status::Error<INVALID_ARGUMENT>("invalid storage medium. medium={}", medium_str);
        }
    }
    return Status::OK();
}
// Parses the ';'-separated storage root path config and appends each entry to `paths`.
//
// Each entry is parsed with parse_root_path(). An entry that fails to parse is logged
// and skipped; if any entry failed and config::ignore_broken_disk is false, the whole
// config is rejected. A duplicated path is always rejected. A config with no entries
// returns OK without adding anything (compute node).
Status parse_conf_store_paths(const string& config_path, std::vector<StorePath>* paths) {
    // SkipWhitespace drops empty and whitespace-only tokens, so a trailing ';'
    // (or ";;") never yields an empty entry here.
    std::vector<string> path_vec = absl::StrSplit(config_path, ";", absl::SkipWhitespace());
    if (path_vec.empty()) {
        // means compute node
        return Status::OK();
    }
    std::set<std::string> real_paths;
    // Count successes locally: comparing against paths->size() would be wrong if
    // the caller passed in a non-empty vector.
    size_t parsed_count = 0;
    for (const auto& item : path_vec) {
        StorePath path;
        auto res = parse_root_path(item, &path);
        if (!res.ok()) {
            LOG(WARNING) << "failed to parse store path " << item << ", res=" << res;
            continue;
        }
        if (!real_paths.emplace(path.path).second) {
            LOG(WARNING) << "a duplicated path is found " << path.path;
            return Status::Error<INVALID_ARGUMENT>("a duplicated path is found, path={}",
                                                   path.path);
        }
        paths->emplace_back(std::move(path));
        ++parsed_count;
    }
    if (parsed_count != path_vec.size() && !config::ignore_broken_disk) {
        return Status::Error<INVALID_ARGUMENT>("fail to parse storage_root_path config. value={}",
                                               config_path);
    }
    return Status::OK();
}
// Parses the ';'-separated broken storage path config into the set `paths`.
//
// Empty and whitespace-only entries are dropped by the splitter (SkipWhitespace).
// Each kept entry has its surrounding ASCII whitespace removed, so "/a; /b" records
// "/b" rather than " /b". This matches the trimming parse_root_path applies to
// storage root paths. NOTE(review): confirm callers compare these entries against
// StorePath::path values.
void parse_conf_broken_store_paths(const string& config_path, std::set<std::string>* paths) {
    std::vector<string> path_vec = absl::StrSplit(config_path, ";", absl::SkipWhitespace());
    for (auto& item : path_vec) {
        absl::StripAsciiWhitespace(&item);
        paths->emplace(std::move(item));
    }
}
/** Parses the file cache path config, a JSON array of objects, and appends to `paths`.
 *
 * format:
 * [
 *   {"path": "storage1", "total_size":53687091200, "query_limit": 10737418240},
 *   {"path": "storage2", "total_size":53687091200},
 *   {"path": "storage3", "total_size":53687091200, "ttl_percent":50, "normal_percent":40, "disposable_percent":5, "index_percent":5},
 *   {"path": "xxx", "total_size":53687091200, "storage": "memory"}
 * ]
 *
 * - "path" (string) is required. When "storage" is "memory", the stored path is
 *   replaced by "memory" so the cache can still be tracked by path.
 * - "storage" (string, optional): "disk" (default) or "memory".
 * - "total_size" / "query_limit" (int64, optional, default 0). A value that is present
 *   but not an int64 (e.g. a quoted number) is logged and treated as 0. "query_limit"
 *   is only read when config::enable_file_cache_query_limit is on. Negative values
 *   are rejected.
 * - The four *_percent members (uint) must be either all set or all unset, and must
 *   sum to 100. When they are unset, the io::DEFAULT_*_PERCENT values are used.
 *
 * Returns InvalidArgument on malformed JSON, a malformed entry, or an empty array.
 */
Status parse_conf_cache_paths(const std::string& config_path, std::vector<CachePath>& paths) {
    using namespace rapidjson;
    Document document;
    document.Parse(config_path.c_str());
    // These shape checks used to be DCHECKs only, which compile out in release builds.
    // Calling GetArray()/GetObject()/GetString() on a value of the wrong type is then
    // undefined behavior.
    if (document.HasParseError() || !document.IsArray()) {
        return Status::InvalidArgument(
                "invalid file cache path config, expect a json array. value={}", config_path);
    }
    if (document.Empty()) {
        return Status::InvalidArgument("fail to parse file cache path config. value={}",
                                       config_path);
    }
    // Renders a JSON value for log/error messages. GetString() is only valid on string
    // values, so other values are described by their rapidjson type id.
    auto describe = [](const rapidjson::Value& value) -> std::string {
        if (value.IsString()) {
            return value.GetString();
        }
        return "<json type " + std::to_string(static_cast<int>(value.GetType())) + ">";
    };
    for (auto& config : document.GetArray()) {
        if (!config.IsObject()) {
            return Status::InvalidArgument(
                    "invalid file cache path config, every entry must be a json object. "
                    "value={}",
                    config_path);
        }
        auto map = config.GetObject();
        auto path_it = map.FindMember(CACHE_PATH.c_str());
        if (path_it == map.MemberEnd() || !path_it->value.IsString()) {
            return Status::InvalidArgument(
                    "invalid file cache path config, every entry needs a string \"{}\". "
                    "value={}",
                    CACHE_PATH, config_path);
        }
        std::string path = path_it->value.GetString();

        std::string storage = CACHE_STORAGE_DISK; // disk storage by default
        if (auto it = map.FindMember(CACHE_STORAGE.c_str()); it != map.MemberEnd()) {
            if (!it->value.IsString()) [[unlikely]] {
                return Status::InvalidArgument("invalid file cache storage type: " +
                                               describe(it->value));
            }
            storage = it->value.GetString();
            if (storage != CACHE_STORAGE_DISK && storage != CACHE_STORAGE_MEMORY) [[unlikely]] {
                return Status::InvalidArgument("invalid file cache storage type: " + storage);
            }
            if (storage == CACHE_STORAGE_MEMORY) {
                // set path to "memory" for memory storage
                // so that we can track it by path (use _path_to_cache map)
                path = CACHE_STORAGE_MEMORY;
            }
        }

        // Reads an optional int64 member into `out`. Leaves `out` untouched when the
        // member is absent; a present but non-int64 value is logged and yields 0.
        auto get_int64_value = [&](const std::string& key, int64_t& out) {
            auto it = map.FindMember(key.c_str());
            if (it == map.MemberEnd()) {
                return;
            }
            if (it->value.IsInt64()) {
                out = it->value.GetInt64();
            } else {
                out = 0;
                LOG(WARNING) << "[FileCache] the value of " << key
                             << " is not int64: " << describe(it->value)
                             << " , use 0 as default";
            }
        };
        int64_t total_size = 0;
        int64_t query_limit_bytes = 0;
        get_int64_value(CACHE_TOTAL_SIZE, total_size);
        if (config::enable_file_cache_query_limit) {
            get_int64_value(CACHE_QUERY_LIMIT_SIZE, query_limit_bytes);
        }
        if (total_size < 0 || (config::enable_file_cache_query_limit && query_limit_bytes < 0)) {
            return Status::InvalidArgument("total_size or query_limit should not less than zero");
        }

        // percent; only called for members already known to exist
        auto get_percent_value = [&](const std::string& key, size_t& percent) -> Status {
            auto& value = map.FindMember(key.c_str())->value;
            if (!value.IsUint()) {
                return Status::InvalidArgument("percent should be uint");
            }
            percent = value.GetUint();
            return Status::OK();
        };
        size_t normal_percent = io::DEFAULT_NORMAL_PERCENT;
        size_t disposable_percent = io::DEFAULT_DISPOSABLE_PERCENT;
        size_t index_percent = io::DEFAULT_INDEX_PERCENT;
        size_t ttl_percent = io::DEFAULT_TTL_PERCENT;
        bool has_normal_percent = map.HasMember(CACHE_NORMAL_PERCENT.c_str());
        bool has_disposable_percent = map.HasMember(CACHE_DISPOSABLE_PERCENT.c_str());
        bool has_index_percent = map.HasMember(CACHE_INDEX_PERCENT.c_str());
        bool has_ttl_percent = map.HasMember(CACHE_TTL_PERCENT.c_str());
        if (has_normal_percent && has_disposable_percent && has_index_percent && has_ttl_percent) {
            RETURN_IF_ERROR(get_percent_value(CACHE_NORMAL_PERCENT, normal_percent));
            RETURN_IF_ERROR(get_percent_value(CACHE_DISPOSABLE_PERCENT, disposable_percent));
            RETURN_IF_ERROR(get_percent_value(CACHE_INDEX_PERCENT, index_percent));
            RETURN_IF_ERROR(get_percent_value(CACHE_TTL_PERCENT, ttl_percent));
        } else if (has_normal_percent || has_disposable_percent || has_index_percent ||
                   has_ttl_percent) {
            return Status::InvalidArgument(
                    "cache percent (ttl_percent, index_percent, normal_percent, "
                    "disposable_percent) must either be all set or all unset. "
                    "when all unset, use default: ttl_percent=50, index_percent=5, "
                    "normal_percent=40, disposable_percent=5.");
        }
        if ((normal_percent + disposable_percent + index_percent + ttl_percent) != 100) {
            return Status::InvalidArgument("The sum of cache percent config must equal 100.");
        }
        paths.emplace_back(std::move(path), total_size, query_limit_bytes, normal_percent,
                           disposable_percent, index_percent, ttl_percent, storage);
    }
    return Status::OK();
}
// Builds the io-layer file cache settings for this cache path by forwarding its
// configured capacity, per-query limit, percent split and storage type to
// io::get_file_cache_settings().
io::FileCacheSettings CachePath::init_settings() const {
    return io::get_file_cache_settings(/*total_bytes=*/total_bytes,
                                       /*query_limit_bytes=*/query_limit_bytes,
                                       /*normal_percent=*/normal_percent,
                                       /*disposable_percent=*/disposable_percent,
                                       /*index_percent=*/index_percent,
                                       /*ttl_percent=*/ttl_percent,
                                       /*storage=*/storage);
}
} // end namespace doris