blob: 5113c7f8b3e9439061a81ed70febb46bb5298f5f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileCacheFactory.h
// and modified by Doris
#include "io/cache/block_file_cache_downloader.h"
#include <bthread/bthread.h>
#include <bthread/countdown_event.h>
#include <bthread/unstable.h>
#include <bvar/bvar.h>
#include <fmt/core.h>
#include <gen_cpp/internal_service.pb.h>
#include <memory>
#include <mutex>
#include <unordered_set>
#include <variant>
#include "cloud/cloud_tablet_mgr.h"
#include "cloud/cloud_warm_up_manager.h"
#include "common/config.h"
#include "common/logging.h"
#include "cpp/sync_point.h"
#include "io/fs/file_reader.h"
#include "io/io_common.h"
#include "olap/rowset/beta_rowset.h"
#include "util/bvar_helper.h"
namespace doris::io {
// Counters exported by the file cache block downloader.
//
// Bytes requested by submitted tasks; only positive sizes are added.
bvar::Adder<uint64_t> g_file_cache_download_submitted_size("file_cache_download_submitted_size");
// Bytes accounted as downloaded; grows by the chunk size after each successful read.
bvar::Adder<uint64_t> g_file_cache_download_finished_size("file_cache_download_finished_size");
// Number of tasks pushed into the queue by submit_download_task().
bvar::Adder<uint64_t> g_file_cache_download_submitted_num("file_cache_download_submitted_num");
// Number of download_segment_file() calls that read their whole range without error.
bvar::Adder<uint64_t> g_file_cache_download_finished_num("file_cache_download_finished_num");
// Number of failed downloads: open/read errors in download_segment_file(), plus segment-file
// tasks evicted from a full queue.
bvar::Adder<uint64_t> g_file_cache_download_failed_num("file_cache_download_failed_num");
// Number of tasks currently queued: +1 on every push, -1 on every pop or eviction.
bvar::Adder<uint64_t> block_file_cache_downloader_task_total("file_cache_downloader_queue_total");
// Creates the worker thread pool and then starts the polling thread.
//
// The pool is built before the poller is launched so that `_workers` is already valid by the
// time polling_download_task() can run: the poller dereferences `_workers` whenever it
// dispatches a task. Aborts the process (CHECK) if the pool cannot be created.
FileCacheBlockDownloader::FileCacheBlockDownloader(CloudStorageEngine& engine) : _engine(engine) {
    auto st = ThreadPoolBuilder("FileCacheBlockDownloader")
                      .set_min_threads(config::file_cache_downloader_thread_num_min)
                      .set_max_threads(config::file_cache_downloader_thread_num_max)
                      .build(&_workers);
    CHECK(st.ok()) << "failed to create FileCacheBlockDownloader" << ": " << st;
    _poller = std::thread(&FileCacheBlockDownloader::polling_download_task, this);
}
// Stops the downloader: marks it closed, wakes the polling thread and waits for it to exit,
// then shuts the worker pool down.
FileCacheBlockDownloader::~FileCacheBlockDownloader() {
    // The flag is flipped while holding `_mtx`, the same mutex the poller holds when it
    // evaluates its wait predicate.
    {
        std::unique_lock guard(_mtx);
        _closed = true;
    }
    _empty.notify_all();

    if (_poller.joinable()) {
        _poller.join();
    }

    if (_workers != nullptr) {
        _workers->shutdown();
    }
}
// Queues `task` for asynchronous download by the polling thread.
//
// For a file-cache-block task, every block meta takes one in-flight reference on its tablet
// (the table read by check_download_task()). When the queue already holds `_max_size` tasks
// the oldest queued task is evicted to make room. An evicted task will never run, so what it
// holds is released here:
//   - a segment-file task gets its `download_done` callback invoked with an error;
//   - a block task gives back the tablet references it took when it was submitted, otherwise
//     those tablets would stay in `_inflight_tablets` forever.
// Both happen after `_mtx` has been released, so no user callback runs under the queue lock
// and `_mtx` / `_inflight_mtx` are never held at the same time.
//
// Does nothing except logging when the file cache is disabled.
void FileCacheBlockDownloader::submit_download_task(DownloadTask task) {
    if (!config::enable_file_cache) [[unlikely]] {
        LOG(INFO) << "Skip submit download file task because file cache is not enabled";
        return;
    }

    if (task.task_message.index() == 0) { // download file cache block task
        std::lock_guard lock(_inflight_mtx);
        for (const auto& meta : std::get<0>(task.task_message)) {
            // Single lookup: operator[] creates the entry on first use.
            auto& ref_cnt = _inflight_tablets[meta.tablet_id()];
            ++ref_cnt;
            if (meta.size() > 0) {
                g_file_cache_download_submitted_size << meta.size();
            }
            LOG(INFO) << "submit_download_task: inflight_tablets[" << meta.tablet_id()
                      << "] = " << ref_cnt;
        }
    } else {
        int64_t download_size = std::get<1>(task.task_message).download_size;
        if (download_size > 0) {
            g_file_cache_download_submitted_size << download_size;
        }
    }

    // The evicted task (if any) is moved out under `_mtx` and finalized after unlocking.
    bool has_evicted = false;
    DownloadTask evicted;
    {
        std::lock_guard lock(_mtx);
        if (_task_queue.size() == _max_size) {
            LOG(INFO) << "submit_download_task: task queue full, pop front";
            evicted = std::move(_task_queue.front());
            _task_queue.pop_front(); // Eliminate the earliest task in the queue
            has_evicted = true;
            block_file_cache_downloader_task_total << -1;
        }
        VLOG_DEBUG << "submit_download_task: push task, queue size before push: "
                   << _task_queue.size();
        _task_queue.push_back(std::move(task));
        block_file_cache_downloader_task_total << 1;
        _empty.notify_all();
    }

    if (has_evicted) {
        if (evicted.task_message.index() == 1) { // download segment file task
            auto& download_file_meta = std::get<1>(evicted.task_message);
            if (download_file_meta.download_done) {
                download_file_meta.download_done(
                        Status::InternalError("The downloader queue is full"));
            }
            g_file_cache_download_failed_num << 1;
        } else if (evicted.task_message.index() == 0) { // download file cache block task
            std::lock_guard lock(_inflight_mtx);
            for (const auto& meta : std::get<0>(evicted.task_message)) {
                auto it = _inflight_tablets.find(meta.tablet_id());
                if (it == _inflight_tablets.end()) {
                    continue;
                }
                if (--it->second <= 0) {
                    _inflight_tablets.erase(it);
                }
            }
        }
    }

    g_file_cache_download_submitted_num << 1;
}
void FileCacheBlockDownloader::polling_download_task() {
constexpr int64_t hot_interval = 2 * 60 * 60; // 2 hours
while (!_closed) {
DownloadTask task;
{
std::unique_lock lock(_mtx);
_empty.wait(lock, [this]() { return !_task_queue.empty() || _closed; });
if (_closed) {
LOG(INFO) << "polling_download_task: downloader closed, exit polling";
break;
}
task = std::move(_task_queue.front());
_task_queue.pop_front();
block_file_cache_downloader_task_total << -1;
VLOG_DEBUG << "polling_download_task: pop task, queue size after pop: "
<< _task_queue.size();
}
if (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() -
task.atime)
.count() < hot_interval) {
VLOG_DEBUG << "polling_download_task: submit download_blocks to thread pool";
auto st = _workers->submit_func(
[this, task_ = std::move(task)]() mutable { download_blocks(task_); });
if (!st.ok()) {
LOG(WARNING) << "submit download blocks failed: " << st;
}
}
}
}
// Reports, for every id in `tablets`, whether that tablet currently has no entry in the
// in-flight table.
//
// Adds `{tablet_id, true}` to `*done` when the tablet is absent from `_inflight_tablets` and
// `{tablet_id, false}` otherwise. An id already present in `*done` keeps its existing value.
void FileCacheBlockDownloader::check_download_task(const std::vector<int64_t>& tablets,
                                                   std::map<int64_t, bool>* done) {
    std::lock_guard guard(_inflight_mtx);
    for (const int64_t id : tablets) {
        const bool finished = _inflight_tablets.find(id) == _inflight_tablets.end();
        done->emplace(id, finished);
    }
}
// Collects the rowset metas visited by `tablet->traverse_rowsets()` into a map keyed by the
// string form of each rowset id. The traversal is requested with `include_stale = false`.
std::unordered_map<std::string, RowsetMetaSharedPtr> snapshot_rs_metas(BaseTablet* tablet) {
    std::unordered_map<std::string, RowsetMetaSharedPtr> metas_by_id;
    tablet->traverse_rowsets(
            [&metas_by_id](const RowsetSharedPtr& rowset) {
                const auto& rs_meta = rowset->rowset_meta();
                metas_by_id.emplace(rs_meta->rowset_id().to_string(), rs_meta);
            },
            /*include_stale=*/false);
    return metas_by_id;
}
// Downloads into the file cache every block described by `metas`.
//
// For each block meta: look up the tablet, sync its rowsets (once per tablet per call), find
// the rowset and its remote storage resource, build the remote path of the segment file or
// inverted index file, and pass the byte range to download_segment_file().
//
// submit_download_task() takes one in-flight reference on the tablet for every block meta, so
// every meta has to give exactly one back. The `download_done` callback does that. It is
// invoked by download_segment_file() on the normal path and directly here on the early-exit
// paths (tablet, rowset or storage resource not found); without that, a failed lookup would
// leave the tablet in `_inflight_tablets` forever.
void FileCacheBlockDownloader::download_file_cache_block(
        const DownloadTask::FileCacheBlockMetaVec& metas) {
    std::unordered_set<int64_t> synced_tablets;
    std::ranges::for_each(metas, [&](const FileCacheBlockMeta& meta) {
        VLOG_DEBUG << "download_file_cache_block: start, tablet_id=" << meta.tablet_id()
                   << ", rowset_id=" << meta.rowset_id() << ", segment_id=" << meta.segment_id()
                   << ", offset=" << meta.offset() << ", size=" << meta.size()
                   << ", type=" << meta.cache_type();

        // Drops one in-flight reference of the tablet and logs the outcome. The callback is
        // stored in the download meta below, so it captures only what it needs by value.
        auto download_done = [this, tablet_id = meta.tablet_id()](Status st) {
            std::lock_guard lock(_inflight_mtx);
            auto it = _inflight_tablets.find(tablet_id);
            TEST_SYNC_POINT_CALLBACK("FileCacheBlockDownloader::download_file_cache_block");
            if (it == _inflight_tablets.end()) {
                LOG(WARNING) << "inflight ref cnt not exist, tablet id " << tablet_id;
            } else {
                it->second--;
                VLOG_DEBUG << "download_file_cache_block: inflight_tablets[" << tablet_id
                           << "] = " << it->second;
                if (it->second <= 0) {
                    DCHECK_EQ(it->second, 0) << it->first;
                    _inflight_tablets.erase(it);
                    VLOG_DEBUG << "download_file_cache_block: erase inflight_tablets[" << tablet_id
                               << "]";
                }
            }
            LOG(INFO) << "download_file_cache_block: download_done, tablet_Id=" << tablet_id
                      << " status=" << st.to_string();
        };

        CloudTabletSPtr tablet;
        if (auto res = _engine.tablet_mgr().get_tablet(meta.tablet_id(), false); !res.has_value()) {
            LOG(INFO) << "failed to find tablet " << meta.tablet_id() << " : " << res.error();
            download_done(Status::InternalError("failed to find tablet"));
            return;
        } else {
            tablet = std::move(res).value();
        }

        if (!synced_tablets.contains(meta.tablet_id())) {
            auto st = tablet->sync_rowsets();
            if (!st) {
                // just log failed, try it best
                LOG(WARNING) << "failed to sync rowsets: " << meta.tablet_id()
                             << " err msg: " << st.to_string();
            }
            synced_tablets.insert(meta.tablet_id());
        }

        // NOTE(review): the rowset map is rebuilt for every block meta, even when consecutive
        // metas belong to the same tablet; consider caching it per tablet if this shows up in
        // profiles.
        auto id_to_rowset_meta_map = snapshot_rs_metas(tablet.get());
        auto find_it = id_to_rowset_meta_map.find(meta.rowset_id());
        if (find_it == id_to_rowset_meta_map.end()) {
            LOG(WARNING) << "download_file_cache_block: tablet_id=" << meta.tablet_id()
                         << " rowset_id not found, rowset_id=" << meta.rowset_id();
            download_done(Status::InternalError("rowset not found"));
            return;
        }

        auto storage_resource = find_it->second->remote_storage_resource();
        if (!storage_resource) {
            LOG(WARNING) << storage_resource.error();
            download_done(Status::InternalError("failed to get remote storage resource"));
            return;
        }

        std::string path;
        doris::FileType file_type =
                meta.has_file_type() ? meta.file_type() : doris::FileType::SEGMENT_FILE;
        bool is_index = (file_type == doris::FileType::INVERTED_INDEX_FILE);
        if (is_index) {
            path = storage_resource.value()->remote_idx_v2_path(*find_it->second,
                                                                meta.segment_id());
        } else {
            // default .dat
            path = storage_resource.value()->remote_segment_path(*find_it->second,
                                                                 meta.segment_id());
        }

        DownloadFileMeta download_meta {
                .path = path,
                .file_size = meta.has_file_size() ? meta.file_size()
                                                  : -1, // To avoid trigger get file size IO
                .offset = meta.offset(),
                .download_size = meta.size(),
                .file_system = storage_resource.value()->fs,
                .ctx =
                        {
                                .is_index_data = meta.cache_type() == ::doris::FileCacheType::INDEX,
                                .expiration_time = meta.expiration_time(),
                                .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
                                .is_warmup = true,
                        },
                .download_done = std::move(download_done),
        };
        download_segment_file(download_meta);
    });
}
// Reads the byte range described by `meta` through a file reader opened with the
// FILE_BLOCK_CACHE policy.
//
// The range starts at `meta.offset`. Its length is `meta.download_size`, or the size of the
// opened file when `meta.download_size` is not positive. The range is read in chunks of at
// most `config::s3_write_buffer_size` bytes.
//
// `meta.download_done`, when set, is invoked exactly once: with the failing status if opening
// the file or any read fails, or with Status::OK() after the last chunk.
void FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta& meta) {
    LOG(INFO) << "download_segment_file: start, path=" << meta.path << ", offset=" << meta.offset
              << ", download_size=" << meta.download_size << ", file_size=" << meta.file_size;

    FileReaderSPtr file_reader;
    FileReaderOptions opts {
            .cache_type = FileCachePolicy::FILE_BLOCK_CACHE,
            .is_doris_table = true,
            .cache_base_path {},
            .file_size = meta.file_size,
    };
    auto st = meta.file_system->open_file(meta.path, &file_reader, &opts);
    if (!st.ok()) {
        LOG(WARNING) << "failed to download file path=" << meta.path << ", st=" << st;
        if (meta.download_done) {
            meta.download_done(std::move(st));
        }
        g_file_cache_download_failed_num << 1;
        return;
    }

    const size_t one_single_task_size = config::s3_write_buffer_size;
    // Effective number of bytes to download; every size computation below must use this
    // value rather than `meta.download_size`, which may be zero or negative.
    const int64_t download_size =
            meta.download_size > 0 ? meta.download_size : file_reader->size();
    const size_t task_num = (download_size + one_single_task_size - 1) / one_single_task_size;

    std::unique_ptr<char[]> buffer(new char[one_single_task_size]);

    DBUG_EXECUTE_IF("FileCacheBlockDownloader::download_segment_file_sleep", {
        auto sleep_time = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
                "FileCacheBlockDownloader::download_segment_file_sleep", "sleep_time", 3);
        LOG(INFO) << "FileCacheBlockDownloader::download_segment_file_sleep: sleep_time="
                  << sleep_time;
        sleep(sleep_time);
    });

    size_t task_offset = 0;
    for (size_t i = 0; i < task_num; i++) {
        size_t offset = meta.offset + task_offset;
        // Remaining bytes of the effective range. With `meta.download_size` here, a zero
        // size made every read zero-length and a negative size wrapped around to a huge
        // unsigned value.
        size_t size = std::min(one_single_task_size,
                               static_cast<size_t>(download_size - task_offset));
        size_t bytes_read = 0;
        VLOG_DEBUG << "download_segment_file, path=" << meta.path << ", read_at offset=" << offset
                   << ", size=" << size;
        // TODO(plat1ko):
        //  1. Directly append buffer data to file cache
        //  2. Provide `FileReader::async_read()` interface
        DCHECK(meta.ctx.is_dryrun == config::enable_reader_dryrun_when_download_file_cache);
        st = file_reader->read_at(offset, {buffer.get(), size}, &bytes_read, &meta.ctx);
        if (!st.ok()) {
            LOG(WARNING) << "failed to download file path=" << meta.path << ", st=" << st;
            if (meta.download_done) {
                meta.download_done(std::move(st));
            }
            g_file_cache_download_failed_num << 1;
            return;
        }
        task_offset += size;
        g_file_cache_download_finished_size << size;
    }

    if (meta.download_done) {
        LOG(INFO) << "download_segment_file: download finished, path=" << meta.path;
        meta.download_done(Status::OK());
    }
    g_file_cache_download_finished_num << 1;
}
// Executes `task`, dispatching on the alternative currently held by `task.task_message`:
// alternative 0 (file cache block metas) goes to download_file_cache_block(), alternative 1
// (a segment file meta) goes to download_segment_file(). Any other state is ignored.
void FileCacheBlockDownloader::download_blocks(DownloadTask& task) {
    if (auto* block_metas = std::get_if<0>(&task.task_message)) {
        // The debug point below lets tests turn the block download into a no-op.
        bool should_balance_task = true;
        DBUG_EXECUTE_IF("FileCacheBlockDownloader.download_blocks.balance_task",
                        { should_balance_task = false; });
        if (!should_balance_task) {
            return;
        }
        download_file_cache_block(*block_metas);
        return;
    }

    if (auto* file_meta = std::get_if<1>(&task.task_message)) {
        download_segment_file(*file_meta);
    }
}
} // namespace doris::io