blob: 299888ab2e1c3c2d37196c64793304b20abab608 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "io/cache/file_block.h"
#include <glog/logging.h>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <sstream>
#include <string>
#include <thread>
#include "common/status.h"
#include "cpp/sync_point.h"
#include "io/cache/block_file_cache.h"
namespace doris {
namespace io {
/// Streams the textual name of a block state (as produced by
/// FileBlock::state_to_string) into `os` and returns the same stream.
std::ostream& operator<<(std::ostream& os, const FileBlock::State& value) {
    return os << FileBlock::state_to_string(value);
}
/// Constructs a block for `key` whose inclusive byte range is
/// [key.offset, key.offset + size - 1], owned by cache `mgr`, starting in
/// `download_state`.
FileBlock::FileBlock(const FileCacheKey& key, size_t size, BlockFileCache* mgr,
                     State download_state)
        : _block_range(key.offset, key.offset + size - 1),
          _download_state(download_state),
          _mgr(mgr),
          _key(key) {
    // On creation, file block state can be EMPTY, DOWNLOADED, SKIP_CACHE;
    // starting out as DOWNLOADING is treated as a programming error.
    if (_download_state == State::DOWNLOADING) {
        DCHECK(false) << "Can create cell with either EMPTY, DOWNLOADED, SKIP_CACHE ";
    }
}
/// Returns the current download state, read while holding the block mutex.
FileBlock::State FileBlock::state() const {
    std::lock_guard<std::mutex> guard(_mutex);
    return state_unlock(guard);
}
/// Returns the current download state WITHOUT taking the block mutex.
/// Unlike state(), no synchronization is performed here.
FileBlock::State FileBlock::state_unsafe() const {
return _download_state;
}
/// Returns a non-zero 64-bit identifier for the calling thread.
/// The value is checked to be non-zero because 0 is used elsewhere in this
/// class to mean "no downloader".
uint64_t FileBlock::get_caller_id() {
#if defined(__APPLE__)
    // On macOS, use pthread_threadid_np to get the thread ID
    uint64_t id = 0;
    pthread_threadid_np(nullptr, &id);
#else
    const uint64_t id = static_cast<uint64_t>(pthread_self());
#endif
    DCHECK(id != 0);
    return id;
}
uint64_t FileBlock::get_or_set_downloader() {
std::lock_guard block_lock(_mutex);
if (_downloader_id == 0 && _download_state != State::DOWNLOADED) {
DCHECK(_download_state != State::DOWNLOADING);
_downloader_id = get_caller_id();
_download_state = State::DOWNLOADING;
} else if (_downloader_id == get_caller_id()) {
LOG(INFO) << "Attempt to set the same downloader for block " << range().to_string()
<< " for the second time";
}
return _downloader_id;
}
/// Resets the downloader of this block; the caller passes the block lock it
/// already holds. Asserts that a downloader is set and that the calling
/// thread is that downloader, then delegates to reset_downloader_impl().
void FileBlock::reset_downloader(std::lock_guard<std::mutex>& block_lock) {
DCHECK(_downloader_id != 0) << "There is no downloader";
DCHECK(get_caller_id() == _downloader_id) << "Downloader can be reset only by downloader";
reset_downloader_impl(block_lock);
}
void FileBlock::reset_downloader_impl(std::lock_guard<std::mutex>& block_lock) {
if (_downloaded_size == range().size()) {
Status st = set_downloaded(block_lock);
if (!st.ok()) {
LOG(WARNING) << "reset downloader error" << st;
}
} else {
_downloaded_size = 0;
_download_state = State::EMPTY;
_downloader_id = 0;
}
}
/// Asks the storage layer to finalize the block and records the outcome.
/// On success the state becomes DOWNLOADED; on failure the block is reset to
/// EMPTY with zero downloaded bytes. The downloader id is cleared either way.
/// The (unnamed) lock parameter is not used in the body.
/// @return the status reported by the storage finalize call.
Status FileBlock::set_downloaded(std::lock_guard<std::mutex>& /* block_lock */) {
    DCHECK(_download_state != State::DOWNLOADED);
    DCHECK_NE(_downloaded_size, 0);

    Status status = _mgr->_storage->finalize(_key, this->_block_range.size());
    _downloader_id = 0;
    if (!status.ok()) [[unlikely]] {
        _download_state = State::EMPTY;
        _downloaded_size = 0;
    } else {
        _download_state = State::DOWNLOADED;
    }
    return status;
}
/// Returns the id of the current downloader (0 when none is set), read while
/// holding the block mutex.
uint64_t FileBlock::get_downloader() const {
    std::lock_guard<std::mutex> guard(_mutex);
    return _downloader_id;
}
bool FileBlock::is_downloader() const {
std::lock_guard block_lock(_mutex);
return get_caller_id() == _downloader_id;
}
bool FileBlock::is_downloader_impl(std::lock_guard<std::mutex>& /* block_lock */) const {
return get_caller_id() == _downloader_id;
}
/// Appends `data` to the cache storage entry for this block's key and, on
/// success, adds data.size to the downloaded byte count.
/// @return the storage error if the append fails (the byte count is then left
///         unchanged), otherwise Status::OK().
Status FileBlock::append(Slice data) {
DCHECK(data.size != 0) << "Writing zero size is not allowed";
// Bail out before touching _downloaded_size when the storage write fails.
RETURN_IF_ERROR(_mgr->_storage->append(_key, data));
_downloaded_size += data.size;
return Status::OK();
}
/// Finishes the download of this block.
/// If some, but not all, of the range was written, the range is first shrunk
/// to the downloaded size and the cache is told about the new size (under the
/// cache lock). Then, under the block lock, the block is marked via
/// set_downloaded() and all waiters are woken.
/// @return the status returned by set_downloaded().
Status FileBlock::finalize() {
    const bool partially_downloaded =
            _downloaded_size != 0 && _downloaded_size != _block_range.size();
    if (partially_downloaded) {
        SCOPED_CACHE_LOCK(_mgr->_mutex, _mgr);
        const size_t old_size = _block_range.size();
        // Range bounds are inclusive, hence the "- 1".
        _block_range.right = _block_range.left + _downloaded_size - 1;
        const size_t new_size = _block_range.size();
        DCHECK(new_size < old_size);
        _mgr->reset_range(_key.hash, _block_range.left, old_size, new_size, cache_lock);
    }

    std::lock_guard block_lock(_mutex);
    Status st = set_downloaded(block_lock);
    _cv.notify_all();
    return st;
}
/// Reads from the cache storage entry for this block's key into `buffer`,
/// starting at `read_offset`, and returns the storage layer's status.
Status FileBlock::read(Slice buffer, size_t read_offset) {
return _mgr->_storage->read(_key, read_offset, buffer);
}
/// Changes the cache type of this block to `new_type`.
/// Acquires the cache lock here and forwards to change_cache_type_lock().
Status FileBlock::change_cache_type(FileCacheType new_type) {
// The macro provides the `cache_lock` object passed on below.
SCOPED_CACHE_LOCK(_mgr->_mutex, _mgr);
return change_cache_type_lock(new_type, cache_lock);
}
/// Changes the cache type of this block to `new_type`; the caller already
/// holds the cache lock and passes it in. The block lock is taken here.
/// A no-op when the type is unchanged. For a DOWNLOADED block the storage
/// layer is asked to change the key's meta type first, and an error from it
/// aborts the change before the in-memory cache bookkeeping is touched.
/// @return Status::OK() on success or the storage error.
Status FileBlock::change_cache_type_lock(FileCacheType new_type,
                                         std::lock_guard<std::mutex>& cache_lock) {
    std::lock_guard block_lock(_mutex);

    const bool same_type = (new_type == _key.meta.type);
    if (same_type) {
        return Status::OK();
    }

    const bool is_downloaded = (_download_state == State::DOWNLOADED);
    if (is_downloaded) {
        // NOTE(review): `st` is handed to the sync point but never consulted afterwards.
        Status st;
        TEST_SYNC_POINT_CALLBACK("FileBlock::change_cache_type", &st);
        RETURN_IF_ERROR(_mgr->_storage->change_key_meta_type(_key, new_type, _block_range.size()));
    }

    _mgr->change_cache_type(_key.hash, _block_range.left, new_type, cache_lock);
    _key.meta.type = new_type;
    return Status::OK();
}
/// Blocks the caller while another thread is downloading this block.
/// Returns immediately when there is no downloader or the block is not in
/// DOWNLOADING state; otherwise waits on the condition variable for at most
/// config::block_cache_wait_timeout_ms milliseconds.
/// @return the state observed after the (possible) wait; it may still be
///         DOWNLOADING, since the wait result is not inspected.
FileBlock::State FileBlock::wait() {
    std::unique_lock block_lock(_mutex);

    const bool download_in_progress =
            _downloader_id != 0 && _download_state == State::DOWNLOADING;
    if (download_in_progress) {
        // The downloader itself must not wait on its own download.
        DCHECK(_downloader_id != 0 && _downloader_id != get_caller_id());
        _cv.wait_for(block_lock, std::chrono::milliseconds(config::block_cache_wait_timeout_ms));
    }
    return _download_state;
}
/// Called with the block lock already held. If the calling thread is the
/// downloader, gives up the downloader role and wakes all waiters; otherwise
/// does nothing.
void FileBlock::complete_unlocked(std::lock_guard<std::mutex>& block_lock) {
    if (!is_downloader_impl(block_lock)) {
        return;
    }
    reset_downloader(block_lock);
    _cv.notify_all();
}
/// Returns a one-line description of this block for logging, built while
/// holding the block mutex.
std::string FileBlock::get_info_for_log() const {
    std::lock_guard<std::mutex> guard(_mutex);
    return get_info_for_log_impl(guard);
}
/// Builds the log description of this block: range, state, size, downloader
/// id and the id of the calling thread. The caller passes the block lock it
/// already holds; the parameter is not used in the body.
std::string FileBlock::get_info_for_log_impl(std::lock_guard<std::mutex>& block_lock) const {
    std::ostringstream description;
    description << "File block: " << range().to_string() << ", "
                << "state: " << state_to_string(_download_state) << ", "
                << "size: " << _block_range.size() << ", "
                << "downloader id: " << _downloader_id << ", "
                << "caller id: " << get_caller_id();
    return description.str();
}
/// Returns the current download state without locking; the caller passes the
/// block lock it already holds (the parameter is unnamed and unused).
FileBlock::State FileBlock::state_unlock(std::lock_guard<std::mutex>&) const {
return _download_state;
}
/// Maps a block state to its upper-case name ("DOWNLOADED", "EMPTY",
/// "DOWNLOADING", "SKIP_CACHE"). Any other value trips a DCHECK and yields an
/// empty string.
std::string FileBlock::state_to_string(FileBlock::State state) {
    const char* label = "";
    switch (state) {
    case FileBlock::State::EMPTY:
        label = "EMPTY";
        break;
    case FileBlock::State::DOWNLOADING:
        label = "DOWNLOADING";
        break;
    case FileBlock::State::DOWNLOADED:
        label = "DOWNLOADED";
        break;
    case FileBlock::State::SKIP_CACHE:
        label = "SKIP_CACHE";
        break;
    default:
        DCHECK(false);
        break;
    }
    return label;
}
std::string FileBlock::get_cache_file() const {
return _mgr->_storage->get_local_file(this->_key);
}
// Releases every block held by this holder, one at a time.
// For each block: the downloader role (if held by this thread) is given up,
// and the block is removed from the cache when only the cache and this holder
// still reference it and it is either marked as deleting or EMPTY. The entry
// is then erased from `file_blocks`.
FileBlocksHolder::~FileBlocksHolder() {
// No increment in the loop header: the iterator advances through erase() at the bottom.
for (auto file_block_it = file_blocks.begin(); file_block_it != file_blocks.end();) {
auto current_file_block_it = file_block_it;
// Bound by reference (not copied), so the use_count() checks below are not
// inflated by this local.
auto& file_block = *current_file_block_it;
BlockFileCache* _mgr = file_block->_mgr;
{
bool should_remove = false;
{
// First pass: only the block lock is held.
std::lock_guard block_lock(file_block->_mutex);
file_block->complete_unlocked(block_lock);
if (file_block.use_count() == 2 &&
(file_block->is_deleting() ||
file_block->state_unlock(block_lock) == FileBlock::State::EMPTY)) {
should_remove = true;
}
}
if (should_remove) {
// Second pass: the block lock was released above and is re-taken here
// after the cache lock, so the removal conditions are checked again.
SCOPED_CACHE_LOCK(_mgr->_mutex, _mgr);
std::lock_guard block_lock(file_block->_mutex);
if (file_block.use_count() == 2) {
DCHECK(file_block->state_unlock(block_lock) != FileBlock::State::DOWNLOADING);
// one in cache, one in here
if (file_block->is_deleting() ||
file_block->state_unlock(block_lock) == FileBlock::State::EMPTY) {
_mgr->remove(file_block, cache_lock, block_lock, false);
}
}
}
}
// Drop this holder's reference and move on to the next entry.
file_block_it = file_blocks.erase(current_file_block_it);
}
}
} // namespace io
} // namespace doris