blob: c24d64f2a926d508fe36f33e0e11ec8e3c12f7f6 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "io/cache/whole_file_cache.h"

#include <fmt/format.h>
#include <glog/logging.h>

#include <filesystem>
#include <future>
#include <mutex>
#include <ostream>
#include <string>
#include <utility>

#include "io/fs/local_file_system.h"
#include "io/io_common.h"
#include "runtime/exec_env.h"
#include "util/threadpool.h"
namespace doris {
using namespace ErrorCode;
namespace io {
// File name, inside the cache directory, under which the cached copy of the remote file is
// stored. The done-marker file uses this name followed by CACHE_DONE_FILE_SUFFIX.
static const std::string WHOLE_FILE_CACHE_NAME {"WHOLE_FILE_CACHE"};
// Builds a cache object for one remote file.
//
// cache_dir          directory that holds (or will hold) the local copy and its done marker.
// alive_time_sec     idle time, in seconds, after which clean_timeout_cache() removes the cache.
// remote_file_reader reader for the remote file; used for the download and for reads that
//                    bypass the cache.
//
// The local reader stays null until the first cached read creates it.
WholeFileCache::WholeFileCache(const Path& cache_dir, int64_t alive_time_sec,
                               io::FileReaderSPtr remote_file_reader)
        : _cache_dir(cache_dir),
          _alive_time_sec(alive_time_sec),
          // The parameter is a by-value sink: move it instead of paying for a second
          // reference-count increment/decrement.
          _remote_file_reader(std::move(remote_file_reader)),
          _cache_file_reader(nullptr) {}
// Defaulted destructor: no explicit cleanup is performed here; each member is released by its
// own destructor. Files on disk are only removed by the clean_* methods.
WholeFileCache::~WholeFileCache() = default;
// Reads `result.size` bytes starting at `offset` into `result`.
//
// Reads whose io_ctx is set and whose reader_type is not READER_QUERY bypass the cache and are
// forwarded to the remote reader. All other reads are served from the local cache file, which
// is created on demand through _generate_cache_reader().
//
// bytes_read receives the number of bytes actually read.
//
// Returns an error if the cache cannot be created, if the local read fails, or (OS_ERROR) if
// the local read returns fewer bytes than requested. On a successful cached read the
// last-match time is refreshed via update_last_match_time().
Status WholeFileCache::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                                    const IOContext* io_ctx) {
    if (io_ctx != nullptr && io_ctx->reader_type != ReaderType::READER_QUERY) {
        return _remote_file_reader->read_at(offset, result, bytes_read, io_ctx);
    }

    // _cache_file_reader is reset by the clean_* methods under the exclusive lock, so it must
    // only be inspected and dereferenced while the shared lock is held.
    std::shared_lock<std::shared_mutex> rlock(_cache_lock);
    if (_cache_file_reader == nullptr) {
        // _generate_cache_reader() acquires _cache_lock exclusively; release the shared lock
        // first so this thread does not block on itself.
        rlock.unlock();
        RETURN_IF_ERROR(_generate_cache_reader(offset, result.size));
        rlock.lock();
        if (_cache_file_reader == nullptr) {
            // The cache was cleaned again between generation and re-locking. Serve this read
            // from the remote file instead of dereferencing a null reader.
            rlock.unlock();
            return _remote_file_reader->read_at(offset, result, bytes_read, io_ctx);
        }
    }

    RETURN_NOT_OK_STATUS_WITH_WARN(
            _cache_file_reader->read_at(offset, result, bytes_read, io_ctx),
            fmt::format("Read local cache file failed: {}", _cache_file_reader->path().native()));
    if (*bytes_read != result.size) {
        return Status::Error<OS_ERROR>(
                "read cache file failed: {}, bytes read: {} vs required size: {}",
                _cache_file_reader->path().native(), *bytes_read, result.size);
    }
    update_last_match_time();
    return Status::OK();
}
// Makes sure the local copy of the remote file exists and opens it as _cache_file_reader.
//
// _cache_lock is held exclusively for the whole call. If the done-marker file is missing, a
// download task is submitted through the token returned by
// ExecEnv::get_serial_download_cache_thread_token() and this call blocks until the task
// reports its Status.
//
// offset / req_size are not used: the download task always requests
// _remote_file_reader->size() bytes.
//
// Returns the first failing Status from the filesystem checks, the download or the open, and
// InternalError when no download thread token is available.
Status WholeFileCache::_generate_cache_reader(size_t offset, size_t req_size) {
    std::unique_lock<std::shared_mutex> wrlock(_cache_lock);
    // Another reader may have created the cache reader while this thread was waiting for the
    // lock; in that case there is nothing left to do.
    if (_cache_file_reader != nullptr) {
        return Status::OK();
    }

    Path cache_file = _cache_dir / WHOLE_FILE_CACHE_NAME;
    Path cache_done_file =
            _cache_dir / fmt::format("{}{}", WHOLE_FILE_CACHE_NAME, CACHE_DONE_FILE_SUFFIX);
    bool done_file_exist = false;
    RETURN_NOT_OK_STATUS_WITH_WARN(
            io::global_local_filesystem()->exists(cache_done_file, &done_file_exist),
            "Check local cache done file exist failed.");

    // The promise is captured by reference below; that is safe only because this function
    // blocks on future.get() before the promise goes out of scope.
    std::promise<Status> download_st;
    std::future<Status> future = download_st.get_future();
    if (!done_file_exist) {
        ThreadPoolToken* thread_token =
                ExecEnv::GetInstance()->get_serial_download_cache_thread_token();
        if (thread_token != nullptr) {
            auto st = thread_token->submit_func([this, &download_st, cache_done_file, cache_file] {
                auto func = [this, cache_done_file, cache_file] {
                    bool done_file_exist = false;
                    bool cache_dir_exist = false;
                    RETURN_NOT_OK_STATUS_WITH_WARN(
                            io::global_local_filesystem()->exists(_cache_dir, &cache_dir_exist),
                            fmt::format("Check local cache dir exist failed. {}",
                                        _cache_dir.native()));
                    if (!cache_dir_exist) {
                        RETURN_NOT_OK_STATUS_WITH_WARN(
                                io::global_local_filesystem()->create_directory(_cache_dir),
                                fmt::format("Create local cache dir failed. {}",
                                            _cache_dir.native()));
                    } else {
                        // Judge again whether cache_done_file exists, it is possible that the
                        // cache is downloaded while waiting in the thread pool
                        RETURN_NOT_OK_STATUS_WITH_WARN(io::global_local_filesystem()->exists(
                                                               cache_done_file, &done_file_exist),
                                                       "Check local cache done file exist failed.");
                    }
                    bool cache_file_exist = false;
                    RETURN_NOT_OK_STATUS_WITH_WARN(
                            io::global_local_filesystem()->exists(cache_file, &cache_file_exist),
                            "Check local cache file exist failed.");
                    if (done_file_exist && cache_file_exist) {
                        return Status::OK();
                    } else if (!done_file_exist && cache_file_exist) {
                        // A cache file without its done marker is treated as incomplete:
                        // remove it before downloading again.
                        RETURN_NOT_OK_STATUS_WITH_WARN(
                                io::global_local_filesystem()->delete_file(cache_file),
                                fmt::format("Delete local cache file failed. {}",
                                            cache_file.native()));
                    }
                    size_t req_size = _remote_file_reader->size();
                    RETURN_NOT_OK_STATUS_WITH_WARN(
                            download_cache_to_local(cache_file, cache_done_file,
                                                    _remote_file_reader, req_size),
                            "Download cache from remote to local failed.");
                    return Status::OK();
                };
                download_st.set_value(func());
            });
            if (!st.ok()) {
                // NOTE(review): glog's LOG(FATAL) terminates the process after logging, so the
                // return below is effectively unreachable. Confirm that aborting is intended.
                LOG(FATAL) << "Failed to submit download cache task to thread pool! " << st;
                return st;
            }
        } else {
            return Status::InternalError("Failed to get download cache thread token");
        }
        auto st = future.get();
        if (!st.ok()) {
            return st;
        }
    }
    RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cache_file, &_cache_file_reader));
    _cache_file_size = _cache_file_reader->size();
    LOG(INFO) << "Create cache file from remote file successfully: "
              << _remote_file_reader->path().native() << " -> " << cache_file.native();
    return Status::OK();
}
// Removes the cache if it has not been matched for longer than _alive_time_sec seconds.
//
// Under the exclusive lock, the current _last_match_time is first copied into _gc_match_time
// (clean_one_cache() later compares the two). The cache is then removed when
// time(nullptr) - _last_match_time exceeds _alive_time_sec.
//
// Returns OK when nothing had to be cleaned, otherwise the Status of the cleanup so that a
// failed removal is visible to the caller instead of being silently dropped.
Status WholeFileCache::clean_timeout_cache() {
    std::unique_lock<std::shared_mutex> wrlock(_cache_lock);
    _gc_match_time = _last_match_time;
    if (time(nullptr) - _last_match_time > _alive_time_sec) {
        return _clean_cache_internal(nullptr);
    }
    return Status::OK();
}
// Unconditionally removes the cache while holding _cache_lock exclusively.
// The amount of cleaned data is not reported (nullptr is passed as the size out-parameter).
// Returns the Status produced by _clean_cache_internal().
Status WholeFileCache::clean_all_cache() {
    std::lock_guard<std::shared_mutex> exclusive_guard(_cache_lock);
    Status cleanup_status = _clean_cache_internal(nullptr);
    return cleanup_status;
}
// Removes the cache only if _last_match_time still equals the value that
// clean_timeout_cache() stored in _gc_match_time; otherwise does nothing.
//
// cleaned_size is forwarded to _clean_cache_internal().
// Returns OK when the cache is kept, otherwise the Status of the cleanup.
Status WholeFileCache::clean_one_cache(size_t* cleaned_size) {
    std::lock_guard<std::shared_mutex> exclusive_guard(_cache_lock);
    if (!(_gc_match_time == _last_match_time)) {
        // The two timestamps differ, so the cache is left in place.
        return Status::OK();
    }
    return _clean_cache_internal(cleaned_size);
}
// Drops the local reader, zeroes the recorded cache size, removes the cache file together
// with its done marker, and finally hands the cache directory to
// _check_and_delete_empty_dir().
//
// This function takes no lock itself; every caller in this file holds _cache_lock
// exclusively when calling it.
//
// cleaned_size is forwarded to _remove_cache_and_done().
// Returns the first failing Status, or the result of _check_and_delete_empty_dir().
Status WholeFileCache::_clean_cache_internal(size_t* cleaned_size) {
    // Release the reader before touching the files on disk.
    _cache_file_reader.reset();
    _cache_file_size = 0;

    const Path data_path = _cache_dir / WHOLE_FILE_CACHE_NAME;
    const std::string marker_name =
            fmt::format("{}{}", WHOLE_FILE_CACHE_NAME, CACHE_DONE_FILE_SUFFIX);
    const Path marker_path = _cache_dir / marker_name;

    RETURN_IF_ERROR(_remove_cache_and_done(data_path, marker_path, cleaned_size));
    return _check_and_delete_empty_dir(_cache_dir);
}
// Returns true when no local cache reader is currently held, which is the state left behind
// by _clean_cache_internal(). The check is made under the shared lock.
bool WholeFileCache::is_gc_finish() const {
    std::shared_lock<std::shared_mutex> shared_guard(_cache_lock);
    const bool reader_released = !_cache_file_reader;
    return reader_released;
}
} // namespace io
} // namespace doris