// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "s3_file_bufferpool.h"
#include <bvar/bvar.h>
#include <crc32c/crc32c.h>
#include <algorithm>
#include <chrono>
#include <cstring>
#include <memory>
#include "common/config.h"
#include "common/exception.h"
#include "common/logging.h"
#include "common/status.h"
#include "cpp/sync_point.h"
#include "io/cache/file_block.h"
#include "io/cache/file_cache_common.h"
#include "io/fs/s3_common.h"
#include "runtime/exec_env.h"
#include "runtime/thread_context.h"
#include "util/defer_op.h"
#include "util/slice.h"
#include "vec/common/arena.h"
namespace doris {
namespace io {
bvar::Adder<uint64_t> s3_file_buffer_allocated("s3_file_buffer_allocated");
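// RAII wrapper around the templated Allocator: the constructor allocates a
// fixed-size chunk and bumps the s3_file_buffer_allocated counter, and the
// destructor releases the memory and decrements the counter again.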
template <typename Allocator = Allocator<false>>
struct Memory : boost::noncopyable, Allocator {
Memory() = default;
explicit Memory(size_t size) : _size(size) {
alloc(size);
s3_file_buffer_allocated << 1;
}
~Memory() {
dealloc();
s3_file_buffer_allocated << -1;
}
void alloc(size_t size) { _data = static_cast<char*>(Allocator::alloc(size, 0)); }
void dealloc() {
if (_data == nullptr) {
return;
}
Allocator::free(_data, _size);
_data = nullptr;
}
size_t _size = 0;
char* _data = nullptr;
};
struct FileBuffer::PartData {
Memory<> _memory;
PartData() : _memory(config::s3_write_buffer_size) {}
~PartData() = default;
[[nodiscard]] Slice data() const { return Slice {_memory._data, _memory._size}; }
[[nodiscard]] size_t size() const { return _memory._size; }
};
Slice FileBuffer::get_slice() const {
return _inner_data->data();
}
FileBuffer::FileBuffer(BufferType type, std::function<FileBlocksHolderPtr()> alloc_holder,
size_t offset, OperationState state)
: _type(type),
_alloc_holder(std::move(alloc_holder)),
_offset(offset),
_size(0),
_state(std::move(state)),
_inner_data(std::make_unique<FileBuffer::PartData>()),
_capacity(_inner_data->size()) {}
FileBuffer::~FileBuffer() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->s3_file_buffer_tracker());
_inner_data.reset();
}
/**
 * Copy the incoming slice into the pre-allocated memory buffer and extend the
 * running CRC32C checksum over the appended bytes (a usage sketch follows the
 * function).
 */
Status UploadFileBuffer::append_data(const Slice& data) {
TEST_SYNC_POINT_RETURN_WITH_VALUE("UploadFileBuffer::append_data", Status::OK(), this,
data.get_size());
std::memcpy((void*)(_inner_data->data().get_data() + _size), data.get_data(), data.get_size());
_size += data.get_size();
_crc_value = crc32c::Extend(_crc_value, (const uint8_t*)data.get_data(), data.get_size());
return Status::OK();
}
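// Usage sketch (illustrative caller-side code, not part of this file; the chunk
// variables are hypothetical): data may arrive in several appends, and the
// checksum verified later in on_upload() covers every appended byte.
//
//   Status st = upload_buf->append_data(Slice(chunk1.data(), chunk1.size()));
//   if (st.ok()) {
//       st = upload_buf->append_data(Slice(chunk2.data(), chunk2.size()));
//   }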
/**
 * Submit the buffer's execute_async() callback to the S3 upload thread pool.
 * The shared_ptr captured by the task keeps the buffer alive until the upload
 * callback has finished.
 */
static Status submit_upload_buffer(std::shared_ptr<FileBuffer> buffer) {
TEST_SYNC_POINT_RETURN_WITH_VALUE("UploadFileBuffer::submit", Status::OK(), buffer.get());
return ExecEnv::GetInstance()->s3_file_upload_thread_pool()->submit_func(
[buf = std::move(buffer)]() { buf->execute_async(); });
}
std::ostream& operator<<(std::ostream& os, const BufferType& value) {
switch (value) {
case BufferType::UPLOAD:
os << "upload";
break;
case BufferType::DOWNLOAD:
os << "download";
break;
default:
auto cast_value = static_cast<uint32_t>(value);
os << cast_value;
}
return os;
}
Status FileBuffer::submit(std::shared_ptr<FileBuffer> buf) {
switch (buf->_type) {
case BufferType::UPLOAD:
return submit_upload_buffer(std::move(buf));
default:
CHECK(false) << "should never come here, the illegal type is " << buf->_type;
}
return Status::InternalError("should never come here");
}
std::string_view FileBuffer::get_string_view_data() const {
return {_inner_data->data().get_data(), _size};
}
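/**
 * Runs on the s3_file_upload_thread_pool:
 * 0. wrap the in-memory data in a StringViewStream for the upload callback
 * 1. verify the accumulated CRC32C checksum before uploading
 * 2. invoke the upload callback, then flush the data to the local file cache
 *    and publish the final status (the ordering depends on
 *    config::enable_flush_file_cache_async)
 */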
void UploadFileBuffer::on_upload() {
_stream_ptr = std::make_shared<StringViewStream>(_inner_data->data().get_data(), _size);
if (_crc_value != crc32c::Crc32c(_inner_data->data().get_data(), _size)) {
DCHECK(false);
set_status(Status::IOError("Buffer checksum does not match"));
return;
}
_upload_to_remote(*this);
if (config::enable_flush_file_cache_async) {
// If we called is_cancelled() after _state.set_status(), the S3 file writer
// might already have been destructed, so read the cancellation flag first.
bool cancelled = is_cancelled();
_state.set_status();
// In this branch the buffer and the stream share the same memory,
// so the buffer can be used directly here.
upload_to_local_file_cache(cancelled);
} else {
upload_to_local_file_cache(is_cancelled());
_state.set_status();
}
}
/**
 * Write the content of the memory buffer to the local file cache
 * (a worked example of the block-splitting loop follows the function).
 */
void UploadFileBuffer::upload_to_local_file_cache(bool is_cancelled) {
if (!config::enable_file_cache || _alloc_holder == nullptr) {
return;
}
if (_holder) {
return;
}
if (is_cancelled) {
return;
}
TEST_INJECTION_POINT_CALLBACK("UploadFileBuffer::upload_to_local_file_cache");
// The data has already been written to S3 at this point, so a file cache
// write error is only logged and not propagated.
_holder = _alloc_holder();
size_t pos = 0;
size_t data_remain_size = _size;
for (auto& block : _holder->file_blocks) {
if (data_remain_size == 0) {
break;
}
size_t block_size = block->range().size();
size_t append_size = std::min(data_remain_size, block_size);
if (block->state() == FileBlock::State::EMPTY) {
block->get_or_set_downloader();
// Another thread may already have claimed this block to download it for a
// query; in that case skip writing it to the cache from UploadFileBuffer.
if (block->is_downloader()) {
Slice s(_inner_data->data().get_data() + pos, append_size);
Status st = block->append(s);
TEST_INJECTION_POINT_CALLBACK("UploadFileBuffer::upload_to_local_file_cache_inject",
&st);
if (st.ok()) {
st = block->finalize();
}
if (!st.ok()) {
{
[[maybe_unused]] bool ret = false;
TEST_SYNC_POINT_CALLBACK("UploadFileBuffer::upload_to_local_file_cache",
&ret);
}
LOG_WARNING("failed to append data to file cache").error(st);
}
}
}
data_remain_size -= append_size;
pos += append_size;
}
}
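// Worked example of the split above (illustrative numbers): with _size == 5 MB
// and a holder covering blocks of 3 MB and 2 MB, the first iteration appends
// bytes [0, 3 MB) to the first block and the second appends [3 MB, 5 MB) to the
// second. Blocks that are not EMPTY, or whose downloader slot was won by another
// thread, are skipped, while pos and data_remain_size still advance past them.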
FileBufferBuilder& FileBufferBuilder::set_type(BufferType type) {
_type = type;
return *this;
}
FileBufferBuilder& FileBufferBuilder::set_upload_callback(
std::function<void(UploadFileBuffer& buf)> cb) {
_upload_cb = std::move(cb);
return *this;
}
// Set the callback used to synchronize task completion back to the caller.
FileBufferBuilder& FileBufferBuilder::set_sync_after_complete_task(std::function<bool(Status)> cb) {
_sync_after_complete_task = std::move(cb);
return *this;
}
FileBufferBuilder& FileBufferBuilder::set_allocate_file_blocks_holder(
std::function<FileBlocksHolderPtr()> cb) {
_alloc_holder_cb = std::move(cb);
return *this;
}
Status FileBufferBuilder::build(std::shared_ptr<FileBuffer>* buf) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->s3_file_buffer_tracker());
OperationState state(_sync_after_complete_task, _is_cancelled);
if (_type == BufferType::UPLOAD) {
RETURN_IF_CATCH_EXCEPTION(*buf = std::make_shared<UploadFileBuffer>(
std::move(_upload_cb), std::move(state), _offset,
std::move(_alloc_holder_cb)));
return Status::OK();
}
if (_type == BufferType::DOWNLOAD) {
RETURN_IF_CATCH_EXCEPTION(*buf = std::make_shared<DownloadFileBuffer>(
std::move(_download),
std::move(_write_to_local_file_cache),
std::move(_write_to_use_buffer), std::move(state),
_offset, std::move(_alloc_holder_cb)));
return Status::OK();
}
// should never come here
return Status::InternalError("unsupport buffer type {}", _type);
}
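// Minimal builder usage sketch (illustrative only; the callback bodies and the
// caller-side wiring are hypothetical and normally live in the S3 file writer,
// and submit() is assumed to be a static member):
//
//   FileBufferBuilder builder;
//   builder.set_type(BufferType::UPLOAD)
//           .set_upload_callback([](UploadFileBuffer& buf) { /* PUT the data */ })
//           .set_sync_after_complete_task([](Status s) { /* record s */ return true; })
//           .set_allocate_file_blocks_holder([]() -> FileBlocksHolderPtr { return nullptr; });
//   std::shared_ptr<FileBuffer> buf;
//   RETURN_IF_ERROR(builder.build(&buf));
//   // append data into the buffer, then hand it over to the upload thread pool:
//   RETURN_IF_ERROR(FileBuffer::submit(std::move(buf)));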
/**
 * 0. check whether the data needs to be written into the file cache
 * 1. check whether there is free space inside the file cache
 * 2. call the download callback
 * 3. write the downloaded content into the user buffer if necessary
 * (see the note on the download callback contract after the function)
 */
void DownloadFileBuffer::on_download() {
auto s = Status::OK();
Defer def {[&]() { _state.set_status(std::move(s)); }};
if (is_cancelled()) {
return;
}
FileBlocksHolderPtr holder = nullptr;
bool need_to_download_into_cache = false;
if (_alloc_holder != nullptr) {
holder = _alloc_holder();
std::for_each(holder->file_blocks.begin(), holder->file_blocks.end(),
[&need_to_download_into_cache](FileBlockSPtr& file_block) {
if (file_block->state() == FileBlock::State::EMPTY) {
file_block->get_or_set_downloader();
if (file_block->is_downloader()) {
need_to_download_into_cache = true;
}
}
});
if (!need_to_download_into_cache && !_write_to_use_buffer) [[unlikely]] {
LOG(INFO) << "Skipping download because that there is no space for catch data.";
} else {
Slice tmp = _inner_data->data();
s = _download(tmp);
if (s) {
_size = tmp.get_size();
if (_write_to_use_buffer != nullptr) {
_write_to_use_buffer({_inner_data->data().get_data(), get_size()},
get_file_offset());
}
if (need_to_download_into_cache) {
_write_to_local_file_cache(std::move(holder),
Slice {_inner_data->data().get_data(), _size});
}
} else {
LOG(WARNING) << s;
}
_state.set_status(std::move(s));
}
} else {
Slice tmp = _inner_data->data();
s = _download(tmp);
_size = tmp.get_size();
if (s.ok() && _write_to_use_buffer != nullptr) {
_write_to_use_buffer({_inner_data->data().get_data(), get_size()}, get_file_offset());
}
}
}
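// Note on the download callback contract (as used above): _download receives a
// Slice covering the whole pre-allocated buffer and is expected to shrink the
// slice to the number of bytes actually read; on_download() then takes
// tmp.get_size() as the payload size before forwarding the data to the user
// buffer and/or the local file cache.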
} // namespace io
} // namespace doris