blob: bdf804df76ea084339f890b84105f81435d08e39 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/spill/spill_stream_manager.h"
#include <fmt/format.h>
#include <glog/logging.h>
#include <algorithm>
#include <filesystem>
#include <memory>
#include <numeric>
#include <random>
#include <string>
#include "common/logging.h"
#include "io/fs/file_system.h"
#include "io/fs/local_file_system.h"
#include "olap/olap_define.h"
#include "runtime/runtime_state.h"
#include "util/doris_metrics.h"
#include "util/parse_util.h"
#include "util/pretty_printer.h"
#include "util/runtime_profile.h"
#include "util/time.h"
#include "util/uid_util.h"
#include "vec/spill/spill_stream.h"
namespace doris::vectorized {
#include "common/compile_check_begin.h"
// Deregisters the manager's metric entity (`_entity`, assigned in
// _init_metrics()) from the global metric registry.
SpillStreamManager::~SpillStreamManager() {
    auto&& registry = DorisMetrics::instance()->metric_registry();
    registry->deregister_entity(_entity);
}
// Constructs the manager by taking ownership of the given spill directories.
//
// `spill_store_map` is moved into `_spill_store_map`; its string keys are
// presumably the storage root paths (other code in this file names the key
// `path`) -- confirm against the caller.
// `_stop_background_threads_latch` is constructed with 1; the GC thread loop
// waits on this latch and exits once the wait reports it has been released.
// No I/O happens here; directories and the GC thread are set up by init().
SpillStreamManager::SpillStreamManager(
std::unordered_map<std::string, std::unique_ptr<vectorized::SpillDataDir>>&&
spill_store_map)
: _spill_store_map(std::move(spill_store_map)), _stop_background_threads_latch(1) {}
// Prepares all spill stores and starts the background GC thread.
//
// Steps:
//   1. Initializes every SpillDataDir in `_spill_store_map`.
//   2. For each store, makes sure the GC root directory exists.
//   3. For each store, makes sure the spill directory exists. If it already
//      exists (left over from an earlier run), it is renamed into the GC root
//      under a timestamp-derived name and a fresh spill directory is created,
//      so the stale data is removed later by gc().
//   4. Starts the "spill_gc_thread" and registers the spill metrics.
//
// Returns the first error reported by store initialization, the existence
// checks, directory creation or thread creation. A failed rename of a stale
// spill directory is best-effort: it is logged and does not abort init.
Status SpillStreamManager::init() {
    LOG(INFO) << "init spill stream manager";
    RETURN_IF_ERROR(_init_spill_store_map());

    for (const auto& [path, store] : _spill_store_map) {
        auto gc_dir_root_dir = store->get_spill_data_gc_path();
        bool exists = true;
        RETURN_IF_ERROR(io::global_local_filesystem()->exists(gc_dir_root_dir, &exists));
        if (!exists) {
            RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(gc_dir_root_dir));
        }

        auto spill_dir = store->get_spill_data_path();
        RETURN_IF_ERROR(io::global_local_filesystem()->exists(spill_dir, &exists));
        if (!exists) {
            RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(spill_dir));
            continue;
        }

        // The spill directory survived from a previous run: move it aside for GC.
        auto suffix = ToStringFromUnixMillis(UnixMillis());
        auto gc_dir = store->get_spill_data_gc_path(suffix);

        // Use the non-throwing overload: this check is purely diagnostic and
        // must not be able to raise std::filesystem::filesystem_error here.
        std::error_code ec;
        if (std::filesystem::exists(gc_dir, ec)) {
            LOG(WARNING) << "gc dir already exists: " << gc_dir;
        }

        // Best effort: keep going when the rename fails, but leave a trace so
        // stale data remaining in the spill directory can be diagnosed.
        auto rename_st = io::global_local_filesystem()->rename(spill_dir, gc_dir);
        if (!rename_st.ok()) {
            LOG(WARNING) << "failed to move stale spill dir " << spill_dir << " to " << gc_dir
                         << ": " << rename_st.to_string();
        }
        RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(spill_dir));
    }

    RETURN_IF_ERROR(Thread::create(
            "Spill", "spill_gc_thread", [this]() { this->_spill_gc_thread_callback(); },
            &_spill_gc_thread));
    LOG(INFO) << "spill gc thread started";

    _init_metrics();
    return Status::OK();
}
// Registers the "spill" metric entity together with its two byte counters:
// "spill_write_bytes" and "spill_read_bytes".
//
// The prototypes are owned by this object (`_spill_*_bytes_metric`); the
// counter pointers come from registering those prototypes on `_entity`.
void SpillStreamManager::_init_metrics() {
    auto&& registry = DorisMetrics::instance()->metric_registry();
    _entity = registry->register_entity("spill", {{"name", "spill"}});

    _spill_write_bytes_metric = std::make_unique<doris::MetricPrototype>(
            doris::MetricType::COUNTER, doris::MetricUnit::BYTES, "spill_write_bytes");
    auto* write_metric =
            _entity->register_metric<IntAtomicCounter>(_spill_write_bytes_metric.get());
    _spill_write_bytes_counter = (IntAtomicCounter*)(write_metric);

    _spill_read_bytes_metric = std::make_unique<doris::MetricPrototype>(
            doris::MetricType::COUNTER, doris::MetricUnit::BYTES, "spill_read_bytes");
    auto* read_metric =
            _entity->register_metric<IntAtomicCounter>(_spill_read_bytes_metric.get());
    _spill_read_bytes_counter = (IntAtomicCounter*)(read_metric);
}
// Body of the background GC thread: periodically cleans up stale spilled
// files and refreshes the capacity figures of every spill directory.
//
// Each round first waits on the stop latch for `config::spill_gc_interval_ms`;
// the loop ends as soon as that wait returns true.
void SpillStreamManager::_spill_gc_thread_callback() {
    for (;;) {
        const bool stop_requested = _stop_background_threads_latch.wait_for(
                std::chrono::milliseconds(config::spill_gc_interval_ms));
        if (stop_requested) {
            break;
        }

        gc(config::spill_gc_work_time_ms);

        // Capacity refresh is best-effort; a failure for one directory is ignored.
        for (auto& entry : _spill_store_map) {
            static_cast<void>(entry.second->update_capacity());
        }
    }
}
// Initializes every spill directory held in `_spill_store_map`.
// Stops at, and returns, the first failure; returns OK when all succeed.
Status SpillStreamManager::_init_spill_store_map() {
    for (const auto& [root_path, data_dir] : _spill_store_map) {
        RETURN_IF_ERROR(data_dir->init());
    }
    return Status::OK();
}
// Returns the spill directories of the requested storage medium that have not
// reached their capacity limit, ordered by ascending disk usage.
// The result is empty when no directory qualifies.
std::vector<SpillDataDir*> SpillStreamManager::_get_stores_for_spill(
        TStorageMedium::type storage_medium) {
    std::vector<std::pair<SpillDataDir*, double>> candidates;
    for (auto& entry : _spill_store_map) {
        auto& data_dir = entry.second;
        if (data_dir->storage_medium() != storage_medium) {
            continue;
        }
        // Only consulted for directories of the right medium.
        if (data_dir->reach_capacity_limit(0)) {
            continue;
        }
        candidates.emplace_back(data_dir.get(), data_dir->_get_disk_usage(0));
    }

    std::sort(candidates.begin(), candidates.end(),
              [](auto&& lhs, auto&& rhs) { return lhs.second < rhs.second; });

    std::vector<SpillDataDir*> ordered;
    ordered.reserve(candidates.size());
    for (const auto& candidate : candidates) {
        ordered.emplace_back(candidate.first);
    }
    return ordered;
}
// Creates a new spill stream backed by a freshly created directory and
// returns it through `spill_stream`.
//
// Candidate directories are taken from SSD stores first and from HDD stores
// only when no SSD store is usable. Candidates are tried in the order returned
// by _get_stores_for_spill(); the first one on which the stream directory can
// be created wins.
//
// Parameters:
//   state            - runtime state; its task id becomes part of the dir name.
//   spill_stream     - out: the created stream (set before prepare() runs).
//   query_id         - name of the per-query sub directory.
//   operator_name    - operator name used in the stream directory name.
//   node_id          - plan node id used in the stream directory name.
//   batch_rows       - forwarded to the SpillStream constructor.
//   batch_bytes      - forwarded to the SpillStream constructor.
//   operator_profile - forwarded to the SpillStream constructor.
//
// Returns:
//   NO_AVAILABLE_ROOT_PATH when no store is eligible, CE_CMD_PARAMS_ERROR when
//   the directory could not be created on any eligible store, otherwise the
//   status of SpillStream::prepare().
Status SpillStreamManager::register_spill_stream(RuntimeState* state, SpillStreamSPtr& spill_stream,
                                                 const std::string& query_id,
                                                 const std::string& operator_name, int32_t node_id,
                                                 int32_t batch_rows, size_t batch_bytes,
                                                 RuntimeProfile* operator_profile) {
    auto data_dirs = _get_stores_for_spill(TStorageMedium::type::SSD);
    if (data_dirs.empty()) {
        data_dirs = _get_stores_for_spill(TStorageMedium::type::HDD);
    }
    if (data_dirs.empty()) {
        return Status::Error<ErrorCode::NO_AVAILABLE_ROOT_PATH>(
                "no available disk can be used for spill.");
    }

    uint64_t id = id_++;
    std::string spill_dir;
    SpillDataDir* data_dir = nullptr;
    for (auto* dir : data_dirs) {
        std::string spill_root_dir = dir->get_spill_data_path();
        // storage_root/spill/query_id/partitioned_hash_join-node_id-task_id-stream_id
        spill_dir = fmt::format("{}/{}/{}-{}-{}-{}", spill_root_dir, query_id, operator_name,
                                node_id, state->task_id(), id);
        auto st = io::global_local_filesystem()->create_directory(spill_dir);
        if (!st.ok()) {
            // Report through the regular log (not stderr) and fall back to the
            // next candidate directory.
            LOG(WARNING) << "create spill dir failed, path: " << spill_dir
                         << ", error: " << st.to_string();
            continue;
        }
        data_dir = dir;
        break;
    }
    if (!data_dir) {
        return Status::Error<ErrorCode::CE_CMD_PARAMS_ERROR>(
                "there is no available disk that can be used to spill.");
    }

    spill_stream = std::make_shared<SpillStream>(state, id, data_dir, spill_dir, batch_rows,
                                                 batch_bytes, operator_profile);
    RETURN_IF_ERROR(spill_stream->prepare());
    return Status::OK();
}
// Releases the resources of a spill stream by delegating to the stream's own
// gc() method. `stream` is dereferenced unconditionally, so it must not be null.
void SpillStreamManager::delete_spill_stream(SpillStreamSPtr stream) {
stream->gc();
}
// Deletes stale spill data from the GC root directory of every spill store.
//
// Layout processed: <gc_root>/<query dir>/<operator spill entry>. For each
// query directory its entries are deleted one by one; a query directory that
// is already empty is removed itself. All filesystem errors are ignored
// (best-effort cleanup, retried on the next round).
//
// `max_work_time_ms` is the time budget of one call. The budget is checked
// after every deleted entry, and once it is exceeded the whole pass stops --
// remaining query directories and stores are left for the next call.
//
// If any entry was found under a GC root, the elapsed time and a summary of
// all spill stores are logged when the function exits.
void SpillStreamManager::gc(int32_t max_work_time_ms) {
    bool exists = true;
    bool has_work = false;
    // Widen before multiplying so the ms -> ns conversion cannot overflow.
    const int64_t max_work_time_ns = static_cast<int64_t>(max_work_time_ms) * 1000 * 1000;
    MonotonicStopWatch watch;
    watch.start();
    Defer defer {[&]() {
        if (has_work) {
            std::string msg(
                    fmt::format("spill gc time: {}",
                                PrettyPrinter::print(watch.elapsed_time(), TUnit::TIME_NS)));
            msg += ", spill storage:\n";
            for (const auto& [path, store_dir] : _spill_store_map) {
                msg += "    " + store_dir->debug_string();
                msg += "\n";
            }
            LOG(INFO) << msg;
        }
    }};

    for (const auto& [path, store_dir] : _spill_store_map) {
        std::string gc_root_dir = store_dir->get_spill_data_gc_path();

        std::error_code ec;
        exists = std::filesystem::exists(gc_root_dir, ec);
        if (ec || !exists) {
            continue;
        }

        // dirs of queries
        std::vector<io::FileInfo> dirs;
        auto st = io::global_local_filesystem()->list(gc_root_dir, false, &dirs, &exists);
        if (!st.ok()) {
            continue;
        }

        for (const auto& dir : dirs) {
            has_work = true;
            if (dir.is_file) {
                continue;
            }
            std::string abs_dir = fmt::format("{}/{}", gc_root_dir, dir.file_name);

            // operator spill sub dirs of a query
            std::vector<io::FileInfo> files;
            st = io::global_local_filesystem()->list(abs_dir, false, &files, &exists);
            if (!st.ok()) {
                continue;
            }
            if (files.empty()) {
                static_cast<void>(io::global_local_filesystem()->delete_directory(abs_dir));
                continue;
            }

            for (const auto& file : files) {
                auto abs_file_path = fmt::format("{}/{}", abs_dir, file.file_name);
                if (file.is_file) {
                    static_cast<void>(io::global_local_filesystem()->delete_file(abs_file_path));
                } else {
                    static_cast<void>(
                            io::global_local_filesystem()->delete_directory(abs_file_path));
                }
                if (watch.elapsed_time() > max_work_time_ns) {
                    // Budget used up: end the whole pass, not only this query
                    // directory. The deferred summary log still runs.
                    return;
                }
            }
        }
    }
}
// Gauge metric prototypes for one spill directory. They are registered on the
// per-directory metric entity in the SpillDataDir constructor and updated in
// SpillDataDir::update_capacity().
// Total capacity of the disk holding the spill directory.
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(spill_disk_capacity, MetricUnit::BYTES);
// Effective limit for spill data on this disk.
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(spill_disk_limit, MetricUnit::BYTES);
// Available space on the disk.
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(spill_disk_avail_capacity, MetricUnit::BYTES);
// NOTE(review): registered but never set in this file -- presumably updated
// elsewhere; confirm.
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(spill_disk_data_size, MetricUnit::BYTES);
// The next two gauges are set to 0 or 1 by update_capacity() (directory empty
// or not), although they are declared with a BYTES unit.
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(spill_disk_has_spill_data, MetricUnit::BYTES);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(spill_disk_has_spill_gc_data, MetricUnit::BYTES);
// Describes one spill directory rooted at `path`.
//
// Stores the path, the initial disk capacity and the storage medium, then
// registers a metric entity named "spill_data_dir.<path>" (labelled with the
// spill sub directory of that path) and attaches the spill disk gauges to it.
SpillDataDir::SpillDataDir(std::string path, int64_t capacity_bytes,
                           TStorageMedium::type storage_medium)
        : _path(std::move(path)),
          _disk_capacity_bytes(capacity_bytes),
          _storage_medium(storage_medium) {
    const std::string entity_name = std::string("spill_data_dir.") + _path;
    const std::string labelled_path = _path + "/" + SPILL_DIR_PREFIX;
    spill_data_dir_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(
            entity_name, {{"path", labelled_path}});

    INT_GAUGE_METRIC_REGISTER(spill_data_dir_metric_entity, spill_disk_capacity);
    INT_GAUGE_METRIC_REGISTER(spill_data_dir_metric_entity, spill_disk_limit);
    INT_GAUGE_METRIC_REGISTER(spill_data_dir_metric_entity, spill_disk_avail_capacity);
    INT_GAUGE_METRIC_REGISTER(spill_data_dir_metric_entity, spill_disk_data_size);
    INT_GAUGE_METRIC_REGISTER(spill_data_dir_metric_entity, spill_disk_has_spill_data);
    INT_GAUGE_METRIC_REGISTER(spill_data_dir_metric_entity, spill_disk_has_spill_gc_data);
}
// Reports whether `dir` is an existing directory without any entries.
//
// Returns false when `dir` is not a directory (including when it does not
// exist). Returns true when a filesystem error is raised while inspecting it:
// the check is not synchronized with other threads, and the directory may be
// moved into the spill GC directory while it is being examined.
bool is_directory_empty(const std::filesystem::path& dir) {
    try {
        if (!std::filesystem::is_directory(dir)) {
            return false;
        }
        // A default-constructed directory_iterator is the end iterator.
        const std::filesystem::directory_iterator no_more_entries {};
        return std::filesystem::directory_iterator(dir) == no_more_entries;
    } catch (const std::filesystem::filesystem_error&) {
        return true;
    }
}
// Verifies that the storage path exists and loads the initial capacity data.
//
// Returns an IOError (also logged as a warning) when `_path` does not exist,
// or the error from the existence check / update_capacity(). On success the
// path, capacity, spill limit and available space are logged.
Status SpillDataDir::init() {
    bool path_exists = false;
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(_path, &path_exists));
    if (!path_exists) {
        RETURN_NOT_OK_STATUS_WITH_WARN(Status::IOError("opendir failed, path={}", _path),
                                       "check file exist failed");
    }

    RETURN_IF_ERROR(update_capacity());

    const auto capacity_text = PrettyPrinter::print_bytes(_disk_capacity_bytes);
    const auto limit_text = PrettyPrinter::print_bytes(_spill_data_limit_bytes);
    const auto available_text = PrettyPrinter::print_bytes(_available_bytes);
    LOG(INFO) << fmt::format(
            "spill storage path: {}, capacity: {}, limit: {}, available: "
            "{}",
            _path, capacity_text, limit_text, available_text);
    return Status::OK();
}
// Returns "<_path>/<SPILL_DIR_PREFIX>", with "/<query_id>" appended when
// `query_id` is not empty.
std::string SpillDataDir::get_spill_data_path(const std::string& query_id) const {
    std::string spill_root = fmt::format("{}/{}", _path, SPILL_DIR_PREFIX);
    if (query_id.empty()) {
        return spill_root;
    }
    return fmt::format("{}/{}", spill_root, query_id);
}
// Returns "<_path>/<SPILL_GC_DIR_PREFIX>", with "/<sub_dir_name>" appended
// when `sub_dir_name` is not empty.
std::string SpillDataDir::get_spill_data_gc_path(const std::string& sub_dir_name) const {
    std::string gc_root = fmt::format("{}/{}", _path, SPILL_GC_DIR_PREFIX);
    if (sub_dir_name.empty()) {
        return gc_root;
    }
    return fmt::format("{}/{}", gc_root, sub_dir_name);
}
// Refreshes disk capacity / available space and recomputes the spill limit,
// publishing the values to the spill disk gauges. Runs under `_mutex`.
//
// The limit is parsed from `config::spill_storage_limit` relative to the disk
// capacity. A percentage value is additionally scaled by
// `config::storage_flood_stage_usage_percent`, and the result never exceeds
// that same percentage of the disk capacity.
//
// Returns the error from get_space_info(), or InvalidArgument when the
// configured limit parses to a value <= 0 (that value is still published to
// the limit gauge). On success the two "has data" gauges are set to 1 when
// the spill / spill GC directory is not empty and to 0 otherwise.
Status SpillDataDir::update_capacity() {
    std::lock_guard<std::mutex> guard(_mutex);

    RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info(_path, &_disk_capacity_bytes,
                                                                  &_available_bytes));
    spill_disk_capacity->set_value(_disk_capacity_bytes);
    spill_disk_avail_capacity->set_value(_available_bytes);

    // Upper bound derived from the flood stage percentage of the whole disk.
    const auto flood_stage_cap_bytes = static_cast<int64_t>(
            _disk_capacity_bytes * config::storage_flood_stage_usage_percent / 100);

    bool is_percent = true;
    _spill_data_limit_bytes = ParseUtil::parse_mem_spec(config::spill_storage_limit, -1,
                                                        _disk_capacity_bytes, &is_percent);
    if (_spill_data_limit_bytes <= 0) {
        spill_disk_limit->set_value(_spill_data_limit_bytes);
        auto parse_error = fmt::format("Failed to parse spill storage limit from '{}'",
                                       config::spill_storage_limit);
        LOG(WARNING) << parse_error;
        return Status::InvalidArgument(parse_error);
    }

    if (is_percent) {
        _spill_data_limit_bytes = static_cast<int64_t>(
                _spill_data_limit_bytes * config::storage_flood_stage_usage_percent / 100);
    }
    if (_spill_data_limit_bytes > flood_stage_cap_bytes) {
        _spill_data_limit_bytes = flood_stage_cap_bytes;
    }
    spill_disk_limit->set_value(_spill_data_limit_bytes);

    const std::string spill_root = get_spill_data_path();
    const std::string gc_root = get_spill_data_gc_path();
    const bool spill_root_empty = is_directory_empty(spill_root);
    spill_disk_has_spill_data->set_value(spill_root_empty ? 0 : 1);
    const bool gc_root_empty = is_directory_empty(gc_root);
    spill_disk_has_spill_gc_data->set_value(gc_root_empty ? 0 : 1);
    return Status::OK();
}
// Checks whether writing `incoming_data_size` more bytes would push the disk
// into the flood stage.
//
// Returns true (and logs a warning) only when both conditions hold: the usage
// ratio is at least `config::storage_flood_stage_usage_percent` percent, and
// the space left afterwards is at most
// `config::storage_flood_stage_left_capacity_bytes`.
bool SpillDataDir::_reach_disk_capacity_limit(int64_t incoming_data_size) {
    const double usage_ratio = _get_disk_usage(incoming_data_size);
    const int64_t remaining_bytes = _available_bytes - incoming_data_size;

    const bool flood_stage_reached =
            usage_ratio >= config::storage_flood_stage_usage_percent / 100.0 &&
            remaining_bytes <= config::storage_flood_stage_left_capacity_bytes;
    if (!flood_stage_reached) {
        return false;
    }

    LOG(WARNING) << "reach capacity limit. used pct: " << usage_ratio
                 << ", left bytes: " << remaining_bytes << ", path: " << _path;
    return true;
}
// Reports whether this directory cannot accept `incoming_data_size` more
// bytes of spill data. Runs under `_mutex`.
//
// Returns true when the disk-level flood stage check trips, or when the
// current spill data size plus the incoming size would exceed the spill limit
// (logged as a rate-limited warning). Returns false otherwise.
bool SpillDataDir::reach_capacity_limit(int64_t incoming_data_size) {
    std::lock_guard<std::mutex> guard(_mutex);

    if (_reach_disk_capacity_limit(incoming_data_size)) {
        return true;
    }

    const bool over_spill_limit =
            _spill_data_bytes + incoming_data_size > _spill_data_limit_bytes;
    if (!over_spill_limit) {
        return false;
    }

    LOG_EVERY_T(WARNING, 1) << fmt::format(
            "spill data reach limit, path: {}, capacity: {}, limit: {}, used: {}, available: "
            "{}, "
            "incoming "
            "bytes: {}",
            _path, PrettyPrinter::print_bytes(_disk_capacity_bytes),
            PrettyPrinter::print_bytes(_spill_data_limit_bytes),
            PrettyPrinter::print_bytes(_spill_data_bytes),
            PrettyPrinter::print_bytes(_available_bytes),
            PrettyPrinter::print_bytes(incoming_data_size));
    return true;
}
std::string SpillDataDir::debug_string() {
return fmt::format(
"path: {}, capacity: {}, limit: {}, used: {}, available: "
"{}",
_path, PrettyPrinter::print_bytes(_disk_capacity_bytes),
PrettyPrinter::print_bytes(_spill_data_limit_bytes),
PrettyPrinter::print_bytes(_spill_data_bytes),
PrettyPrinter::print_bytes(_available_bytes));
}
} // namespace doris::vectorized