| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "io/fs/s3_file_writer.h" |
| |
| #include <aws/s3/model/CompletedPart.h> |
| #include <bvar/reducer.h> |
| #include <fmt/core.h> |
| #include <glog/logging.h> |
| |
| #include <sstream> |
| #include <tuple> |
| #include <utility> |
| |
| #include "common/config.h" |
| #include "common/status.h" |
| #include "cpp/sync_point.h" |
| #include "io/cache/block_file_cache.h" |
| #include "io/cache/block_file_cache_factory.h" |
| #include "io/cache/file_block.h" |
| #include "io/cache/file_cache_common.h" |
| #include "io/fs/file_writer.h" |
| #include "io/fs/path.h" |
| #include "io/fs/s3_file_bufferpool.h" |
| #include "io/fs/s3_file_system.h" |
| #include "io/fs/s3_obj_storage_client.h" |
| #include "runtime/exec_env.h" |
| #include "util/s3_util.h" |
| |
| namespace doris::io { |
| |
| bvar::Adder<uint64_t> s3_file_writer_total("s3_file_writer_total_num"); |
| bvar::Adder<uint64_t> s3_bytes_written_total("s3_file_writer_bytes_written"); |
| bvar::Adder<uint64_t> s3_file_created_total("s3_file_writer_file_created"); |
| bvar::Adder<uint64_t> s3_file_being_written("s3_file_writer_file_being_written"); |
| bvar::Adder<uint64_t> s3_file_writer_async_close_queuing("s3_file_writer_async_close_queuing"); |
| bvar::Adder<uint64_t> s3_file_writer_async_close_processing( |
| "s3_file_writer_async_close_processing"); |
| |
| S3FileWriter::S3FileWriter(std::shared_ptr<ObjClientHolder> client, std::string bucket, |
| std::string key, const FileWriterOptions* opts) |
| : _obj_storage_path_opts({.path = fmt::format("s3://{}/{}", bucket, key), |
| .bucket = std::move(bucket), |
| .key = std::move(key)}), |
| _used_by_s3_committer(opts ? opts->used_by_s3_committer : false), |
| _obj_client(std::move(client)) { |
| s3_file_writer_total << 1; |
| s3_file_being_written << 1; |
| Aws::Http::SetCompliantRfc3986Encoding(true); |
| if (config::enable_file_cache && opts != nullptr && opts->write_file_cache) { |
| _cache_builder = std::make_unique<FileCacheAllocatorBuilder>(FileCacheAllocatorBuilder { |
| opts ? opts->is_cold_data : false, opts ? opts->file_cache_expiration : 0, |
| BlockFileCache::hash(_obj_storage_path_opts.path.filename().native()), |
| FileCacheFactory::instance()->get_by_path( |
| BlockFileCache::hash(_obj_storage_path_opts.path.filename().native()))}); |
| } |
| } |
| |
| S3FileWriter::~S3FileWriter() { |
| if (_async_close_pack != nullptr) { |
| // For thread safety |
| std::ignore = _async_close_pack->future.get(); |
| _async_close_pack = nullptr; |
| } else { |
| // Consider one situation where the file writer is destructed after it submit at least one async task |
| // without calling close(), then there exists one occasion where the async task is executed right after |
| // the correspoding S3 file writer is already destructed |
| _wait_until_finish(fmt::format("wait s3 file {} upload to be finished", |
| _obj_storage_path_opts.path.native())); |
| } |
| // We won't do S3 abort operation in BE, we let s3 service do it own. |
| if (state() == State::OPENED && !_failed) { |
| s3_bytes_written_total << _bytes_appended; |
| } |
| s3_file_being_written << -1; |
| } |
| |
| Status S3FileWriter::_create_multi_upload_request() { |
| LOG(INFO) << "create_multi_upload_request " << _obj_storage_path_opts.path.native(); |
| const auto& client = _obj_client->get(); |
| if (nullptr == client) { |
| return Status::InternalError<false>("invalid obj storage client"); |
| } |
| auto resp = client->create_multipart_upload(_obj_storage_path_opts); |
| if (resp.resp.status.code == ErrorCode::OK) { |
| _obj_storage_path_opts.upload_id = resp.upload_id; |
| } |
| return {resp.resp.status.code, std::move(resp.resp.status.msg)}; |
| } |
| |
| void S3FileWriter::_wait_until_finish(std::string_view task_name) { |
| auto timeout_duration = config::s3_file_writer_log_interval_second; |
| auto msg = fmt::format( |
| "{} multipart upload already takes {} seconds, bucket={}, key={}, upload_id={}", |
| task_name, timeout_duration, _obj_storage_path_opts.bucket, |
| _obj_storage_path_opts.path.native(), |
| _obj_storage_path_opts.upload_id.has_value() ? *_obj_storage_path_opts.upload_id : ""); |
| timespec current_time; |
| // We don't need high accuracy here, so we use time(nullptr) |
| // since it's the fastest way to get current time(second) |
| auto current_time_second = time(nullptr); |
| current_time.tv_sec = current_time_second + timeout_duration; |
| current_time.tv_nsec = 0; |
| // bthread::countdown_event::timed_wait() should use absolute time |
| while (0 != _countdown_event.timed_wait(current_time)) { |
| current_time.tv_sec += timeout_duration; |
| LOG(WARNING) << msg; |
| } |
| } |
| |
| Status S3FileWriter::close(bool non_block) { |
| if (state() == State::CLOSED) { |
| return Status::InternalError("S3FileWriter already closed, file path {}, file key {}", |
| _obj_storage_path_opts.path.native(), |
| _obj_storage_path_opts.key); |
| } |
| if (state() == State::ASYNC_CLOSING) { |
| if (non_block) { |
| return Status::InternalError("Don't submit async close multi times"); |
| } |
| CHECK(_async_close_pack != nullptr); |
| _st = _async_close_pack->future.get(); |
| _async_close_pack = nullptr; |
| // We should wait for all the pre async task to be finished |
| _state = State::CLOSED; |
| // The next time we call close() with no matter non_block true or false, it would always return the |
| // '_st' value because this writer is already closed. |
| return _st; |
| } |
| if (non_block) { |
| _state = State::ASYNC_CLOSING; |
| _async_close_pack = std::make_unique<AsyncCloseStatusPack>(); |
| _async_close_pack->future = _async_close_pack->promise.get_future(); |
| s3_file_writer_async_close_queuing << 1; |
| return ExecEnv::GetInstance()->non_block_close_thread_pool()->submit_func([&]() { |
| s3_file_writer_async_close_queuing << -1; |
| s3_file_writer_async_close_processing << 1; |
| _async_close_pack->promise.set_value(_close_impl()); |
| s3_file_writer_async_close_processing << -1; |
| }); |
| } |
| _st = _close_impl(); |
| _state = State::CLOSED; |
| return _st; |
| } |
| |
| bool S3FileWriter::_complete_part_task_callback(Status s) { |
| bool ret = false; |
| if (!s.ok()) [[unlikely]] { |
| VLOG_NOTICE << "failed at key: " << _obj_storage_path_opts.key |
| << ", status: " << s.to_string(); |
| std::unique_lock<std::mutex> _lck {_completed_lock}; |
| _failed = true; |
| ret = true; |
| _st = std::move(s); |
| } |
| // After the signal, there is a scenario where the previous invocation of _wait_until_finish |
| // returns to the caller, and subsequently, the S3 file writer is destructed. |
| // This means that accessing _failed afterwards would result in a heap use after free vulnerability. |
| _countdown_event.signal(); |
| return ret; |
| } |
| |
| Status S3FileWriter::_build_upload_buffer() { |
| auto builder = FileBufferBuilder(); |
| builder.set_type(BufferType::UPLOAD) |
| .set_upload_callback([part_num = _cur_part_num, this](UploadFileBuffer& buf) { |
| _upload_one_part(part_num, buf); |
| }) |
| .set_file_offset(_bytes_appended) |
| .set_sync_after_complete_task([this](auto&& PH1) { |
| return _complete_part_task_callback(std::forward<decltype(PH1)>(PH1)); |
| }) |
| .set_is_cancelled([this]() { return _failed.load(); }); |
| if (_cache_builder != nullptr) { |
| // We would load the data into file cache asynchronously which indicates |
| // that this instance of S3FileWriter might have been destructed when we |
| // try to do writing into file cache, so we make the lambda capture the variable |
| // we need by value to extend their lifetime |
| builder.set_allocate_file_blocks_holder( |
| [builder = *_cache_builder, offset = _bytes_appended]() -> FileBlocksHolderPtr { |
| return builder.allocate_cache_holder(offset, config::s3_write_buffer_size); |
| }); |
| } |
| RETURN_IF_ERROR(builder.build(&_pending_buf)); |
| auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get()); |
| DCHECK(buf != nullptr); |
| return Status::OK(); |
| } |
| |
| Status S3FileWriter::_close_impl() { |
| VLOG_DEBUG << "S3FileWriter::close, path: " << _obj_storage_path_opts.path.native(); |
| |
| if (_cur_part_num == 1 && _pending_buf) { // data size is less than config::s3_write_buffer_size |
| RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size()); |
| } |
| |
| if (_bytes_appended == 0) { |
| DCHECK_EQ(_cur_part_num, 1); |
| // No data written, but need to create an empty file |
| RETURN_IF_ERROR(_build_upload_buffer()); |
| if (!_used_by_s3_committer) { |
| auto* pending_buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get()); |
| pending_buf->set_upload_to_remote([this](UploadFileBuffer& buf) { _put_object(buf); }); |
| } else { |
| RETURN_IF_ERROR(_create_multi_upload_request()); |
| } |
| } |
| |
| if (_pending_buf != nullptr) { // there is remaining data in buffer need to be uploaded |
| _countdown_event.add_count(); |
| RETURN_IF_ERROR(FileBuffer::submit(std::move(_pending_buf))); |
| _pending_buf = nullptr; |
| } |
| |
| RETURN_IF_ERROR(_complete()); |
| SYNC_POINT_RETURN_WITH_VALUE("s3_file_writer::close", Status()); |
| |
| return Status::OK(); |
| } |
| |
| Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) { |
| if (state() != State::OPENED) [[unlikely]] { |
| return Status::InternalError("append to closed file: {}", |
| _obj_storage_path_opts.path.native()); |
| } |
| |
| size_t buffer_size = config::s3_write_buffer_size; |
| TEST_SYNC_POINT_RETURN_WITH_VALUE("s3_file_writer::appenv", Status()); |
| for (size_t i = 0; i < data_cnt; i++) { |
| size_t data_size = data[i].get_size(); |
| for (size_t pos = 0, data_size_to_append = 0; pos < data_size; pos += data_size_to_append) { |
| if (_failed) { |
| return _st; |
| } |
| if (!_pending_buf) { |
| RETURN_IF_ERROR(_build_upload_buffer()); |
| } |
| // we need to make sure all parts except the last one to be 5MB or more |
| // and shouldn't be larger than buf |
| data_size_to_append = std::min(data_size - pos, _pending_buf->get_file_offset() + |
| buffer_size - _bytes_appended); |
| |
| // if the buffer has memory buf inside, the data would be written into memory first then S3 then file cache |
| // it would be written to cache then S3 if the buffer doesn't have memory preserved |
| RETURN_IF_ERROR(_pending_buf->append_data( |
| Slice {data[i].get_data() + pos, data_size_to_append})); |
| TEST_SYNC_POINT_CALLBACK("s3_file_writer::appenv_1", &_pending_buf, _cur_part_num); |
| |
| // If this is the last part and the data size is less than s3_write_buffer_size, |
| // the pending_buf will be handled by _close_impl() and _complete() |
| // If this is the last part and the data size is equal to s3_write_buffer_size, |
| // the pending_buf is handled here and submitted. it will be waited by _complete() |
| if (_pending_buf->get_size() == buffer_size) { |
| // only create multiple upload request when the data size is |
| // larger or equal to s3_write_buffer_size than one memory buffer |
| if (_cur_part_num == 1) { |
| RETURN_IF_ERROR(_create_multi_upload_request()); |
| } |
| _cur_part_num++; |
| _countdown_event.add_count(); |
| RETURN_IF_ERROR(FileBuffer::submit(std::move(_pending_buf))); |
| _pending_buf = nullptr; |
| } |
| _bytes_appended += data_size_to_append; |
| } |
| } |
| return Status::OK(); |
| } |
| |
| void S3FileWriter::_upload_one_part(int64_t part_num, UploadFileBuffer& buf) { |
| VLOG_DEBUG << "upload_one_part " << _obj_storage_path_opts.path.native() |
| << " part=" << part_num; |
| if (buf.is_cancelled()) { |
| LOG_INFO("file {} skip part {} because previous failure {}", |
| _obj_storage_path_opts.path.native(), part_num, _st); |
| return; |
| } |
| const auto& client = _obj_client->get(); |
| if (nullptr == client) { |
| LOG_WARNING("failed at key: {}, load part {} bacause of invalid obj client", |
| _obj_storage_path_opts.key, part_num); |
| buf.set_status(Status::InternalError<false>("invalid obj storage client")); |
| return; |
| } |
| auto resp = client->upload_part(_obj_storage_path_opts, buf.get_string_view_data(), part_num); |
| if (resp.resp.status.code != ErrorCode::OK) { |
| LOG_INFO("failed at key: {}, load part {}, st {}", _obj_storage_path_opts.key, part_num, |
| resp.resp.status.msg); |
| buf.set_status(Status(resp.resp.status.code, std::move(resp.resp.status.msg))); |
| return; |
| } |
| s3_bytes_written_total << buf.get_size(); |
| |
| ObjectCompleteMultiPart completed_part { |
| static_cast<int>(part_num), resp.etag.has_value() ? std::move(resp.etag.value()) : ""}; |
| |
| std::unique_lock<std::mutex> lck {_completed_lock}; |
| _completed_parts.emplace_back(std::move(completed_part)); |
| } |
| |
| Status S3FileWriter::_complete() { |
| const auto& client = _obj_client->get(); |
| if (nullptr == client) { |
| return Status::InternalError<false>("invalid obj storage client"); |
| } |
| if (_failed) { |
| _wait_until_finish("early quit"); |
| return _st; |
| } |
| // When the part num is only one, it means the data is less than 5MB so we can just put it. |
| if (_cur_part_num == 1) { |
| _wait_until_finish("PutObject"); |
| return _st; |
| } |
| // Wait multipart load and finish. |
| _wait_until_finish("Complete"); |
| TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:1", |
| std::make_pair(&_failed, &_completed_parts)); |
| if (_used_by_s3_committer) { // S3 committer will complete multipart upload file on FE side. |
| s3_file_created_total << 1; // Assume that it will be created successfully |
| return Status::OK(); |
| } |
| |
| // check number of parts |
| int expected_num_parts1 = (_bytes_appended / config::s3_write_buffer_size) + |
| !!(_bytes_appended % config::s3_write_buffer_size); |
| int expected_num_parts2 = |
| (_bytes_appended % config::s3_write_buffer_size) ? _cur_part_num : _cur_part_num - 1; |
| DCHECK_EQ(expected_num_parts1, expected_num_parts2) |
| << " bytes_appended=" << _bytes_appended << " cur_part_num=" << _cur_part_num |
| << " s3_write_buffer_size=" << config::s3_write_buffer_size; |
| if (_failed || _completed_parts.size() != expected_num_parts1 || |
| expected_num_parts1 != expected_num_parts2) { |
| _st = Status::InternalError( |
| "error status={} failed={} #complete_parts={} #expected_parts={} " |
| "completed_parts_list={} file_path={} file_size={} has left buffer not uploaded={}", |
| _st, _failed, _completed_parts.size(), expected_num_parts1, _dump_completed_part(), |
| _obj_storage_path_opts.path.native(), _bytes_appended, _pending_buf != nullptr); |
| LOG(WARNING) << _st; |
| return _st; |
| } |
| // make sure _completed_parts are ascending order |
| std::sort(_completed_parts.begin(), _completed_parts.end(), |
| [](auto& p1, auto& p2) { return p1.part_num < p2.part_num; }); |
| TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:2", &_completed_parts); |
| LOG(INFO) << "complete_multipart_upload " << _obj_storage_path_opts.path.native() |
| << " size=" << _bytes_appended << " number_parts=" << _completed_parts.size() |
| << " s3_write_buffer_size=" << config::s3_write_buffer_size; |
| auto resp = client->complete_multipart_upload(_obj_storage_path_opts, _completed_parts); |
| if (resp.status.code != ErrorCode::OK) { |
| LOG_WARNING("Compltet multi part upload failed because {}, file path {}", resp.status.msg, |
| _obj_storage_path_opts.path.native()); |
| return {resp.status.code, std::move(resp.status.msg)}; |
| } |
| s3_file_created_total << 1; |
| return Status::OK(); |
| } |
| |
| Status S3FileWriter::_set_upload_to_remote_less_than_buffer_size() { |
| auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get()); |
| DCHECK(buf != nullptr); |
| if (_used_by_s3_committer) { |
| // If used_by_s3_committer, we always use multi-parts uploading. |
| buf->set_upload_to_remote([part_num = _cur_part_num, this](UploadFileBuffer& buf) { |
| _upload_one_part(part_num, buf); |
| }); |
| DCHECK(_cur_part_num == 1); |
| RETURN_IF_ERROR(_create_multi_upload_request()); |
| } else { |
| // if we only need to upload one file less than 5MB, we can just |
| // call PutObject to reduce the network IO |
| buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); }); |
| } |
| return Status::OK(); |
| } |
| |
| void S3FileWriter::_put_object(UploadFileBuffer& buf) { |
| LOG(INFO) << "put_object " << _obj_storage_path_opts.path.native() |
| << " size=" << _bytes_appended; |
| if (state() == State::CLOSED) { |
| DCHECK(state() != State::CLOSED) |
| << "state=" << (int)state() << " path=" << _obj_storage_path_opts.path.native(); |
| LOG_WARNING("failed to put object because file closed, file path {}", |
| _obj_storage_path_opts.path.native()); |
| buf.set_status(Status::InternalError<false>("try to put closed file")); |
| return; |
| } |
| const auto& client = _obj_client->get(); |
| if (nullptr == client) { |
| buf.set_status(Status::InternalError<false>("invalid obj storage client")); |
| return; |
| } |
| TEST_SYNC_POINT_RETURN_WITH_VOID("S3FileWriter::_put_object", this, &buf); |
| auto resp = client->put_object(_obj_storage_path_opts, buf.get_string_view_data()); |
| if (resp.status.code != ErrorCode::OK) { |
| LOG_WARNING("put object failed because {}, file path {}", resp.status.msg, |
| _obj_storage_path_opts.path.native()); |
| buf.set_status({resp.status.code, std::move(resp.status.msg)}); |
| return; |
| } |
| s3_file_created_total << 1; |
| } |
| |
| std::string S3FileWriter::_dump_completed_part() const { |
| std::stringstream ss; |
| ss << "part_numbers:"; |
| for (const auto& part : _completed_parts) { |
| ss << " " << part.part_num; |
| } |
| return ss.str(); |
| } |
| |
| } // namespace doris::io |