// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "io/fs/s3_file_writer.h"
#include <aws/s3/model/CompletedPart.h>
#include <bvar/recorder.h>
#include <bvar/reducer.h>
#include <bvar/window.h>
#include <fmt/core.h>
#include <glog/logging.h>
#include <sstream>
#include <tuple>
#include <utility>
#include "common/config.h"
#include "common/status.h"
#include "cpp/sync_point.h"
#include "io/cache/block_file_cache.h"
#include "io/cache/block_file_cache_factory.h"
#include "io/cache/file_block.h"
#include "io/cache/file_cache_common.h"
#include "io/fs/file_writer.h"
#include "io/fs/path.h"
#include "io/fs/s3_file_bufferpool.h"
#include "io/fs/s3_file_system.h"
#include "io/fs/s3_obj_storage_client.h"
#include "runtime/exec_env.h"
#include "util/s3_util.h"
#include "util/stopwatch.hpp"
namespace doris::io {
#include "common/compile_check_begin.h"
bvar::Adder<uint64_t> s3_file_writer_total("s3_file_writer_total_num");
bvar::Adder<uint64_t> s3_bytes_written_total("s3_file_writer_bytes_written");
bvar::Adder<uint64_t> s3_file_created_total("s3_file_writer_file_created");
bvar::Adder<uint64_t> s3_file_being_written("s3_file_writer_file_being_written");
bvar::Adder<uint64_t> s3_file_writer_async_close_queuing("s3_file_writer_async_close_queuing");
bvar::Adder<uint64_t> s3_file_writer_async_close_processing(
"s3_file_writer_async_close_processing");
bvar::IntRecorder s3_file_writer_first_append_to_close_ms_recorder;
bvar::Window<bvar::IntRecorder> s3_file_writer_first_append_to_close_ms_window(
"s3_file_writer_first_append_to_close_ms",
&s3_file_writer_first_append_to_close_ms_recorder, /*window_size=*/10);
S3FileWriter::S3FileWriter(std::shared_ptr<ObjClientHolder> client, std::string bucket,
std::string key, const FileWriterOptions* opts)
: _obj_storage_path_opts({.path = fmt::format("s3://{}/{}", bucket, key),
.bucket = std::move(bucket),
.key = std::move(key)}),
_used_by_s3_committer(opts ? opts->used_by_s3_committer : false),
_obj_client(std::move(client)) {
s3_file_writer_total << 1;
s3_file_being_written << 1;
Aws::Http::SetCompliantRfc3986Encoding(true);
init_cache_builder(opts, _obj_storage_path_opts.path);
}
S3FileWriter::~S3FileWriter() {
if (_async_close_pack != nullptr) {
        // Join the async close task so it cannot touch this object's members
        // after destruction.
std::ignore = _async_close_pack->future.get();
_async_close_pack = nullptr;
} else {
        // Consider the situation where the file writer is destructed after submitting
        // at least one async task without calling close(): the async task may then run
        // after the corresponding S3 file writer has already been destructed.
_wait_until_finish(fmt::format("wait s3 file {} upload to be finished",
_obj_storage_path_opts.path.native()));
}
    // We don't abort the multipart upload in BE; we let the S3 service clean it up on its own.
if (state() == State::OPENED && !_failed) {
s3_bytes_written_total << _bytes_appended;
}
s3_file_being_written << -1;
}
Status S3FileWriter::_create_multi_upload_request() {
LOG(INFO) << "create_multi_upload_request " << _obj_storage_path_opts.path.native();
const auto& client = _obj_client->get();
if (nullptr == client) {
return Status::InternalError<false>("invalid obj storage client");
}
auto resp = client->create_multipart_upload(_obj_storage_path_opts);
if (resp.resp.status.code == ErrorCode::OK) {
_obj_storage_path_opts.upload_id = resp.upload_id;
}
return {resp.resp.status.code, std::move(resp.resp.status.msg)};
}
void S3FileWriter::_wait_until_finish(std::string_view task_name) {
auto timeout_duration = config::s3_file_writer_log_interval_second;
auto msg = fmt::format(
"{} multipart upload already takes {} seconds, bucket={}, key={}, upload_id={}",
task_name, timeout_duration, _obj_storage_path_opts.bucket,
_obj_storage_path_opts.path.native(),
_obj_storage_path_opts.upload_id.has_value() ? *_obj_storage_path_opts.upload_id : "");
timespec current_time;
    // We don't need high accuracy here, so we use time(nullptr)
    // since it's the fastest way to get the current time in seconds.
auto current_time_second = time(nullptr);
current_time.tv_sec = current_time_second + timeout_duration;
current_time.tv_nsec = 0;
// bthread::countdown_event::timed_wait() should use absolute time
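    // Each timeout only re-arms the deadline and logs a progress warning; the loop
    // exits once every in-flight task has called _countdown_event.signal().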
while (0 != _countdown_event.timed_wait(current_time)) {
current_time.tv_sec += timeout_duration;
LOG(WARNING) << msg;
}
}
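// close() drives the writer through its states:
//   OPENED --close(non_block=true)--> ASYNC_CLOSING --task finishes--> CLOSED
//   OPENED --close(non_block=false)-----------------------------------> CLOSED
// A blocking close() issued after an async close joins the async task and returns its
// result; once CLOSED, later calls replay '_st' (or ALREADY_CLOSED on success).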
Status S3FileWriter::close(bool non_block) {
auto record_close_latency = [this]() {
if (_close_latency_recorded || !_first_append_timestamp.has_value()) {
return;
}
auto now = std::chrono::steady_clock::now();
auto latency_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
now - *_first_append_timestamp)
.count();
s3_file_writer_first_append_to_close_ms_recorder << latency_ms;
if (auto* sampler = s3_file_writer_first_append_to_close_ms_recorder.get_sampler()) {
sampler->take_sample();
}
_close_latency_recorded = true;
};
if (state() == State::CLOSED) {
if (_async_close_pack != nullptr) {
_st = _async_close_pack->future.get();
_async_close_pack = nullptr;
// Return the final close status so that a blocking close issued after
// an async close observes the real result just like the legacy behavior.
if (!non_block && _st.ok()) {
record_close_latency();
}
return _st;
}
if (non_block) {
if (_st.ok()) {
record_close_latency();
return Status::Error<ErrorCode::ALREADY_CLOSED>(
"S3FileWriter already closed, file path {}, file key {}",
_obj_storage_path_opts.path.native(), _obj_storage_path_opts.key);
}
return _st;
}
if (_st.ok()) {
record_close_latency();
return Status::Error<ErrorCode::ALREADY_CLOSED>(
"S3FileWriter already closed, file path {}, file key {}",
_obj_storage_path_opts.path.native(), _obj_storage_path_opts.key);
}
return _st;
}
if (state() == State::ASYNC_CLOSING) {
if (non_block) {
return Status::InternalError("Don't submit async close multi times");
}
CHECK(_async_close_pack != nullptr);
_st = _async_close_pack->future.get();
_async_close_pack = nullptr;
        // We must wait for all previously submitted async tasks to finish
_state = State::CLOSED;
        // Any subsequent close() call, whether non_block is true or false, will simply
        // return '_st' because this writer is already closed.
if (!non_block && _st.ok()) {
record_close_latency();
}
return _st;
}
if (non_block) {
_state = State::ASYNC_CLOSING;
_async_close_pack = std::make_unique<AsyncCloseStatusPack>();
_async_close_pack->future = _async_close_pack->promise.get_future();
s3_file_writer_async_close_queuing << 1;
return ExecEnv::GetInstance()->non_block_close_thread_pool()->submit_func([&]() {
s3_file_writer_async_close_queuing << -1;
s3_file_writer_async_close_processing << 1;
_st = _close_impl();
_state = State::CLOSED;
_async_close_pack->promise.set_value(_st);
s3_file_writer_async_close_processing << -1;
});
}
_st = _close_impl();
_state = State::CLOSED;
if (!non_block && _st.ok()) {
record_close_latency();
}
return _st;
}
bool S3FileWriter::_complete_part_task_callback(Status s) {
bool ret = false;
if (!s.ok()) [[unlikely]] {
VLOG_NOTICE << "failed at key: " << _obj_storage_path_opts.key
<< ", status: " << s.to_string();
std::unique_lock<std::mutex> _lck {_completed_lock};
_failed = true;
ret = true;
_st = std::move(s);
}
    // After the signal, a pending _wait_until_finish() call may return to its caller,
    // and the S3 file writer may then be destructed. Accessing members such as _failed
    // after signaling would therefore be a heap-use-after-free.
_countdown_event.signal();
return ret;
}
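// Builds the next upload buffer: a filled buffer is uploaded as one part through
// _upload_one_part(), completion is reported via _complete_part_task_callback(),
// and, when file cache is enabled, cache blocks are pre-allocated for the range.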
Status S3FileWriter::_build_upload_buffer() {
auto builder = FileBufferBuilder();
builder.set_type(BufferType::UPLOAD)
.set_upload_callback([part_num = _cur_part_num, this](UploadFileBuffer& buf) {
_upload_one_part(part_num, buf);
})
.set_file_offset(_bytes_appended)
.set_sync_after_complete_task([this](auto&& PH1) {
return _complete_part_task_callback(std::forward<decltype(PH1)>(PH1));
})
.set_is_cancelled([this]() { return _failed.load(); });
if (_cache_builder != nullptr) {
        // The data is loaded into the file cache asynchronously, so this S3FileWriter
        // instance might already be destructed when the cache write runs; the lambda
        // therefore captures everything it needs by value to extend its lifetime.
int64_t id = get_tablet_id(_obj_storage_path_opts.path.native()).value_or(0);
builder.set_allocate_file_blocks_holder([builder = *_cache_builder,
offset = _bytes_appended,
tablet_id = id]() -> FileBlocksHolderPtr {
return builder.allocate_cache_holder(offset, config::s3_write_buffer_size, tablet_id);
});
}
RETURN_IF_ERROR(builder.build(&_pending_buf));
auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
DCHECK(buf != nullptr);
return Status::OK();
}
Status S3FileWriter::_close_impl() {
VLOG_DEBUG << "S3FileWriter::close, path: " << _obj_storage_path_opts.path.native();
if (_cur_part_num == 1 && _pending_buf) { // data size is less than config::s3_write_buffer_size
RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size());
}
if (_bytes_appended == 0) {
DCHECK_EQ(_cur_part_num, 1);
// No data written, but need to create an empty file
RETURN_IF_ERROR(_build_upload_buffer());
if (!_used_by_s3_committer) {
auto* pending_buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
pending_buf->set_upload_to_remote([this](UploadFileBuffer& buf) { _put_object(buf); });
} else {
RETURN_IF_ERROR(_create_multi_upload_request());
}
}
if (_pending_buf != nullptr) { // there is remaining data in buffer need to be uploaded
_countdown_event.add_count();
RETURN_IF_ERROR(FileBuffer::submit(std::move(_pending_buf)));
_pending_buf = nullptr;
}
RETURN_IF_ERROR(_complete());
SYNC_POINT_RETURN_WITH_VALUE("s3_file_writer::close", Status());
return Status::OK();
}
Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
if (state() != State::OPENED) [[unlikely]] {
return Status::InternalError("append to closed file: {}",
_obj_storage_path_opts.path.native());
}
if (!_first_append_timestamp.has_value()) {
_first_append_timestamp = std::chrono::steady_clock::now();
}
size_t buffer_size = config::s3_write_buffer_size;
TEST_SYNC_POINT_RETURN_WITH_VALUE("s3_file_writer::appenv", Status());
for (size_t i = 0; i < data_cnt; i++) {
size_t data_size = data[i].get_size();
for (size_t pos = 0, data_size_to_append = 0; pos < data_size; pos += data_size_to_append) {
if (_failed) {
return _st;
}
if (!_pending_buf) {
RETURN_IF_ERROR(_build_upload_buffer());
}
            // We need to make sure every part except the last one is 5MB or more,
            // and no part is larger than the buffer.
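            // Example: with s3_write_buffer_size = 5MB and a single 12MB appendv() at
            // offset 0, the loop fills part 1 with 5MB, part 2 with 5MB, and leaves the
            // trailing 2MB in _pending_buf for _close_impl() to submit.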
data_size_to_append = std::min(data_size - pos, _pending_buf->get_file_offset() +
buffer_size - _bytes_appended);
            // If the buffer has a memory buffer inside, the data is written to memory
            // first, then to S3, then to the file cache; if no memory is reserved, it
            // is written to the file cache first and then to S3.
RETURN_IF_ERROR(_pending_buf->append_data(
Slice {data[i].get_data() + pos, data_size_to_append}));
TEST_SYNC_POINT_CALLBACK("s3_file_writer::appenv_1", &_pending_buf, _cur_part_num);
            // If this is the last part and the data size is less than s3_write_buffer_size,
            // the pending_buf will be handled by _close_impl() and _complete().
            // If this is the last part and the data size equals s3_write_buffer_size,
            // the pending_buf is handled and submitted here; _complete() will wait on it.
if (_pending_buf->get_size() == buffer_size) {
                // Only create the multipart upload request once the data size reaches
                // s3_write_buffer_size, i.e. fills one memory buffer.
if (_cur_part_num == 1) {
RETURN_IF_ERROR(_create_multi_upload_request());
}
_cur_part_num++;
_countdown_event.add_count();
RETURN_IF_ERROR(FileBuffer::submit(std::move(_pending_buf)));
_pending_buf = nullptr;
}
_bytes_appended += data_size_to_append;
}
}
return Status::OK();
}
void S3FileWriter::_upload_one_part(int part_num, UploadFileBuffer& buf) {
VLOG_DEBUG << "upload_one_part " << _obj_storage_path_opts.path.native()
<< " part=" << part_num;
if (buf.is_cancelled()) {
LOG_INFO("file {} skip part {} because previous failure {}",
_obj_storage_path_opts.path.native(), part_num, _st);
return;
}
const auto& client = _obj_client->get();
if (nullptr == client) {
LOG_WARNING("failed to upload part, key={}, part_num={} bacause of null obj client",
_obj_storage_path_opts.key, part_num);
buf.set_status(Status::InternalError<false>("invalid obj storage client"));
return;
}
auto resp = client->upload_part(_obj_storage_path_opts, buf.get_string_view_data(), part_num);
if (resp.resp.status.code != ErrorCode::OK) {
LOG_WARNING("failed to upload part, key={}, part_num={}, status={}",
_obj_storage_path_opts.key, part_num, resp.resp.status.msg);
buf.set_status(Status(resp.resp.status.code, std::move(resp.resp.status.msg)));
return;
}
s3_bytes_written_total << buf.get_size();
ObjectCompleteMultiPart completed_part {
static_cast<int>(part_num), resp.etag.has_value() ? std::move(resp.etag.value()) : ""};
std::unique_lock<std::mutex> lck {_completed_lock};
_completed_parts.emplace_back(std::move(completed_part));
}
// If enabled, check the upload result:
// 1. issue a HeadObject request as an existence check
// 2. verify the file size
Status check_after_upload(ObjStorageClient* client, const ObjectStorageResponse& upload_res,
const ObjectStoragePathOptions& path_opt, int64_t bytes_appended,
const std::string& put_or_comp) {
if (!config::enable_s3_object_check_after_upload) return Status::OK();
auto head_res = client->head_object(path_opt);
// clang-format off
auto err_msg = [&]() {
std::stringstream ss;
ss << "failed to check object after upload=" << put_or_comp
<< " file_path=" << path_opt.path.native()
<< fmt::format(" {}_err=", put_or_comp) << upload_res.status.msg
<< fmt::format(" {}_code=", put_or_comp) << upload_res.status.code
<< fmt::format(" {}_http_code=", put_or_comp) << upload_res.http_code
<< fmt::format(" {}_request_id=", put_or_comp) << upload_res.request_id
<< " head_err=" << head_res.resp.status.msg
<< " head_code=" << head_res.resp.status.code
<< " head_http_code=" << head_res.resp.http_code
<< " head_request_id=" << head_res.resp.request_id;
return ss.str();
};
// clang-format on
// TODO(gavin): make it fail by injection
TEST_SYNC_POINT_CALLBACK("S3FileWriter::check_after_load", &head_res);
if (head_res.resp.status.code != ErrorCode::OK && head_res.resp.http_code != 200) {
LOG(WARNING) << "failed to issue head object after upload, " << err_msg();
DCHECK(false) << "failed to issue head object after upload, " << err_msg();
// FIXME(gavin): we should retry if this HEAD fails?
return Status::IOError(
"failed to issue head object after upload, status_code={}, http_code={}, err={}",
head_res.resp.status.code, head_res.resp.http_code, head_res.resp.status.msg);
}
if (head_res.file_size != bytes_appended) {
LOG(WARNING) << "failed to check size after upload, expected_size=" << bytes_appended
<< " actual_size=" << head_res.file_size << err_msg();
DCHECK_EQ(bytes_appended, head_res.file_size)
<< "failed to check size after upload," << err_msg();
return Status::IOError(
"failed to check object size after upload, expected_size={} actual_size={}",
bytes_appended, head_res.file_size);
}
return Status::OK();
}
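// Finalizes the upload: waits for all in-flight part uploads, then either leaves the
// CompleteMultipartUpload to the FE-side S3 committer or validates the part count and
// completes the multipart upload itself (single-part files go through PutObject).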
Status S3FileWriter::_complete() {
const auto& client = _obj_client->get();
if (nullptr == client) {
return Status::InternalError<false>("invalid obj storage client");
}
if (_failed) {
_wait_until_finish("early quit");
return _st;
}
    // When the part num is only one, the data is smaller than one buffer, so a single PutObject suffices.
if (_cur_part_num == 1) {
_wait_until_finish("PutObject");
return _st;
}
    // Wait for the multipart upload to finish.
_wait_until_finish("Complete");
TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:1",
std::make_pair(&_failed, &_completed_parts));
if (_used_by_s3_committer) { // S3 committer will complete multipart upload file on FE side.
s3_file_created_total << 1; // Assume that it will be created successfully
return Status::OK();
}
// check number of parts
int64_t expected_num_parts1 = (_bytes_appended / config::s3_write_buffer_size) +
!!(_bytes_appended % config::s3_write_buffer_size);
int64_t expected_num_parts2 =
(_bytes_appended % config::s3_write_buffer_size) ? _cur_part_num : _cur_part_num - 1;
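    // e.g., with 5MB buffers: 12MB yields ceil(12/5) = 3 parts and a non-empty tail, so
    // _cur_part_num == 3; exactly 10MB yields 2 parts, but _cur_part_num was already
    // bumped to 3 when the second buffer filled, hence the '_cur_part_num - 1' branch.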
DCHECK_EQ(expected_num_parts1, expected_num_parts2)
<< " bytes_appended=" << _bytes_appended << " cur_part_num=" << _cur_part_num
<< " s3_write_buffer_size=" << config::s3_write_buffer_size;
if (_failed || _completed_parts.size() != static_cast<size_t>(expected_num_parts1) ||
expected_num_parts1 != expected_num_parts2) {
_st = Status::InternalError(
"failed to complete multipart upload, error status={} failed={} #complete_parts={} "
"#expected_parts={} "
"completed_parts_list={} file_path={} file_size={} has left buffer not uploaded={}",
_st, _failed, _completed_parts.size(), expected_num_parts1, _dump_completed_part(),
_obj_storage_path_opts.path.native(), _bytes_appended, _pending_buf != nullptr);
LOG(WARNING) << _st;
return _st;
}
    // make sure _completed_parts are in ascending part-number order
std::sort(_completed_parts.begin(), _completed_parts.end(),
[](auto& p1, auto& p2) { return p1.part_num < p2.part_num; });
TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:2", &_completed_parts);
LOG(INFO) << "complete_multipart_upload " << _obj_storage_path_opts.path.native()
<< " size=" << _bytes_appended << " number_parts=" << _completed_parts.size()
<< " s3_write_buffer_size=" << config::s3_write_buffer_size;
auto resp = client->complete_multipart_upload(_obj_storage_path_opts, _completed_parts);
if (resp.status.code != ErrorCode::OK) {
LOG_WARNING("failed to complete multipart upload, err={}, file_path={}", resp.status.msg,
_obj_storage_path_opts.path.native());
return {resp.status.code, std::move(resp.status.msg)};
}
RETURN_IF_ERROR(check_after_upload(client.get(), resp, _obj_storage_path_opts, _bytes_appended,
"complete_multipart"));
s3_file_created_total << 1;
return Status::OK();
}
Status S3FileWriter::_set_upload_to_remote_less_than_buffer_size() {
auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
DCHECK(buf != nullptr);
if (_used_by_s3_committer) {
// If used_by_s3_committer, we always use multi-parts uploading.
buf->set_upload_to_remote([part_num = _cur_part_num, this](UploadFileBuffer& buf) {
_upload_one_part(part_num, buf);
});
DCHECK(_cur_part_num == 1);
RETURN_IF_ERROR(_create_multi_upload_request());
} else {
        // If we only need to upload one file smaller than the buffer size, we can
        // just call PutObject to reduce the network IO.
buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); });
}
return Status::OK();
}
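// Uploads the whole file in a single PutObject request; selected by _close_impl() and
// _set_upload_to_remote_less_than_buffer_size() when all data fits in one buffer and
// the writer is not used by the S3 committer.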
void S3FileWriter::_put_object(UploadFileBuffer& buf) {
MonotonicStopWatch timer;
timer.start();
if (state() == State::CLOSED) {
DCHECK(state() != State::CLOSED)
<< "state=" << (int)state() << " path=" << _obj_storage_path_opts.path.native();
LOG_WARNING("failed to put object because file closed, file path {}",
_obj_storage_path_opts.path.native());
buf.set_status(Status::InternalError<false>("try to put closed file"));
return;
}
const auto& client = _obj_client->get();
if (nullptr == client) {
buf.set_status(Status::InternalError<false>("invalid obj storage client"));
return;
}
TEST_SYNC_POINT_RETURN_WITH_VOID("S3FileWriter::_put_object", this, &buf);
auto resp = client->put_object(_obj_storage_path_opts, buf.get_string_view_data());
timer.stop();
if (resp.status.code != ErrorCode::OK) {
LOG_WARNING("failed to put object, put object failed because {}, file path {}, time={}ms",
resp.status.msg, _obj_storage_path_opts.path.native(),
timer.elapsed_time_milliseconds());
buf.set_status({resp.status.code, std::move(resp.status.msg)});
return;
}
auto st = check_after_upload(client.get(), resp, _obj_storage_path_opts, _bytes_appended,
"put_object");
if (!st.ok()) {
buf.set_status(st);
return;
}
LOG(INFO) << "put_object " << _obj_storage_path_opts.path.native()
<< " size=" << _bytes_appended << " time=" << timer.elapsed_time_milliseconds()
<< "ms";
s3_file_created_total << 1;
s3_bytes_written_total << buf.get_size();
}
std::string S3FileWriter::_dump_completed_part() const {
std::stringstream ss;
ss << "part_numbers:";
for (const auto& part : _completed_parts) {
ss << " " << part.part_num;
}
return ss.str();
}
#include "common/compile_check_end.h"
} // namespace doris::io