| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "io/fs/s3_file_reader.h" |
| |
| #include <aws/core/http/URI.h> |
| #include <aws/core/utils/Outcome.h> |
| #include <aws/s3/S3Client.h> |
| #include <aws/s3/S3Errors.h> |
| #include <aws/s3/model/GetObjectRequest.h> |
| #include <aws/s3/model/GetObjectResult.h> |
| #include <bvar/latency_recorder.h> |
| #include <bvar/reducer.h> |
| #include <fmt/format.h> |
| #include <glog/logging.h> |
| |
#include <algorithm>
#include <chrono>
#include <thread>
#include <utility>
| |
| #include "common/compiler_util.h" // IWYU pragma: keep |
| #include "io/cache/block_file_cache.h" |
| #include "io/fs/err_utils.h" |
| #include "io/fs/obj_storage_client.h" |
| #include "io/fs/s3_common.h" |
| #include "runtime/thread_context.h" |
| #include "runtime/workload_management/io_throttle.h" |
| #include "util/bvar_helper.h" |
| #include "util/debug_points.h" |
| #include "util/doris_metrics.h" |
| #include "util/runtime_profile.h" |
| #include "util/s3_util.h" |
| |
| namespace doris::io { |
| |
| bvar::Adder<uint64_t> s3_file_reader_read_counter("s3_file_reader", "read_at"); |
| bvar::Adder<uint64_t> s3_file_reader_total("s3_file_reader", "total_num"); |
| bvar::Adder<uint64_t> s3_bytes_read_total("s3_file_reader", "bytes_read"); |
| bvar::Adder<uint64_t> s3_file_being_read("s3_file_reader", "file_being_read"); |
| bvar::Adder<uint64_t> s3_file_reader_too_many_request_counter("s3_file_reader", "too_many_request"); |
| bvar::LatencyRecorder s3_bytes_per_read("s3_file_reader", "bytes_per_read"); // also QPS |
| bvar::PerSecond<bvar::Adder<uint64_t>> s3_read_througthput("s3_file_reader", "s3_read_throughput", |
| &s3_bytes_read_total); |
// Although QPS could be derived from s3_bytes_per_read, that recorder only
// counts successful requests; s3_get_request_qps counts all requests.
| bvar::PerSecond<bvar::Adder<uint64_t>> s3_get_request_qps("s3_file_reader", "s3_get_request", |
| &s3_file_reader_read_counter); |
| bvar::LatencyRecorder s3_file_reader_latency("s3_file_reader", "s3_latency"); |
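
// A sketch of how these metrics can be read at runtime (assuming the standard
// brpc bvar exposure, where an Adder constructed as ("s3_file_reader",
// "bytes_read") is published as "s3_file_reader_bytes_read"):
//
//   curl http://<be_host>:<brpc_port>/vars/s3_file_reader_bytes_read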
| |
| Result<FileReaderSPtr> S3FileReader::create(std::shared_ptr<const ObjClientHolder> client, |
| std::string bucket, std::string key, int64_t file_size, |
| RuntimeProfile* profile) { |
| if (file_size < 0) { |
| auto res = client->object_file_size(bucket, key); |
| if (!res.has_value()) { |
| return ResultError(std::move(res.error())); |
| } |
| |
| file_size = res.value(); |
| } |
| |
| return std::make_shared<S3FileReader>(std::move(client), std::move(bucket), std::move(key), |
| file_size, profile); |
| } |
| |
| S3FileReader::S3FileReader(std::shared_ptr<const ObjClientHolder> client, std::string bucket, |
| std::string key, size_t file_size, RuntimeProfile* profile) |
| : _path(fmt::format("s3://{}/{}", bucket, key)), |
| _file_size(file_size), |
| _bucket(std::move(bucket)), |
| _key(std::move(key)), |
| _client(std::move(client)), |
| _profile(profile) { |
| DorisMetrics::instance()->s3_file_open_reading->increment(1); |
| DorisMetrics::instance()->s3_file_reader_total->increment(1); |
| s3_file_reader_total << 1; |
| s3_file_being_read << 1; |
| |
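    // Prefer RFC 3986 compliant URI encoding so object keys containing
    // special characters are percent-encoded correctly; note this is a
    // process-wide AWS SDK flag rather than a per-reader setting.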
| Aws::Http::SetCompliantRfc3986Encoding(true); |
| } |
| |
| S3FileReader::~S3FileReader() { |
| static_cast<void>(close()); |
| s3_file_being_read << -1; |
| } |
| |
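// close() may run more than once (explicitly and again from the destructor);
// the compare-exchange ensures the open-reading gauge is decremented exactly
// once.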
| Status S3FileReader::close() { |
| bool expected = false; |
| if (_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { |
| DorisMetrics::instance()->s3_file_open_reading->increment(-1); |
| } |
| return Status::OK(); |
| } |
| |
| Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, |
| const IOContext* /*io_ctx*/) { |
| DCHECK(!closed()); |
| if (offset > _file_size) { |
| return Status::InternalError( |
| "offset exceeds file size(offset: {}, file size: {}, path: {})", offset, _file_size, |
| _path.native()); |
| } |
| size_t bytes_req = result.size; |
| char* to = result.data; |
| bytes_req = std::min(bytes_req, _file_size - offset); |
    VLOG_DEBUG << fmt::format(
            "S3FileReader::read_at_impl offset={} bytes_req={} req_size={} file_size={} path={} "
            "hash={}",
            offset, bytes_req, result.size, _file_size, _path.native(),
            io::BlockFileCache::hash(_path.native()).to_string());
| if (UNLIKELY(bytes_req == 0)) { |
| *bytes_read = 0; |
| return Status::OK(); |
| } |
| |
| auto client = _client->get(); |
| if (!client) { |
| return Status::InternalError("init s3 client error"); |
| } |
| |
| int retry_count = 0; |
    const int base_wait_time = config::s3_read_base_wait_time_ms; // base backoff wait in ms
    const int max_wait_time = config::s3_read_max_wait_time_ms;   // cap on a single backoff wait in ms
    const int max_retries = config::max_s3_client_retry; // exponential backoff, capped at max_wait_time
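    // Worked example (hypothetical config values): with base_wait_time = 1000 ms
    // and max_wait_time = 8000 ms, retry_count is incremented before each wait is
    // computed, so successive sleeps are 2000, 4000, 8000, 8000, ... ms.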
| |
| int64_t begin_ts = std::chrono::duration_cast<std::chrono::microseconds>( |
| std::chrono::system_clock::now().time_since_epoch()) |
| .count(); |
| LIMIT_REMOTE_SCAN_IO(bytes_read); |
    DBUG_EXECUTE_IF("S3FileReader::read_at_impl.io_slow", {
        auto sleep_time = dp->param("sleep", 3);
        LOG_INFO("S3FileReader::read_at_impl.io_slow inject sleep {} s", sleep_time)
                .tag("bucket", _bucket)
                .tag("key", _key);
        std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
    });
| }); |
| Defer defer_latency {[&]() { |
| int64_t end_ts = std::chrono::duration_cast<std::chrono::microseconds>( |
| std::chrono::system_clock::now().time_since_epoch()) |
| .count(); |
| s3_file_reader_latency << (end_ts - begin_ts); |
| }}; |
| SCOPED_RAW_TIMER(&_s3_stats.total_get_request_time_ns); |
| |
| int total_sleep_time = 0; |
| while (retry_count <= max_retries) { |
| *bytes_read = 0; |
| s3_file_reader_read_counter << 1; |
        // clang-format off
        auto resp = client->get_object({.bucket = _bucket, .key = _key},
                                       to, offset, bytes_req, bytes_read);
        // clang-format on
| _s3_stats.total_get_request_counter++; |
| if (resp.status.code != ErrorCode::OK) { |
| if (resp.http_code == |
| static_cast<int>(Aws::Http::HttpResponseCode::TOO_MANY_REQUESTS)) { |
| s3_file_reader_too_many_request_counter << 1; |
| retry_count++; |
| int wait_time = std::min(base_wait_time * (1 << retry_count), |
| max_wait_time); // Exponential backoff |
| std::this_thread::sleep_for(std::chrono::milliseconds(wait_time)); |
| _s3_stats.too_many_request_err_counter++; |
| _s3_stats.too_many_request_sleep_time_ms += wait_time; |
| total_sleep_time += wait_time; |
| continue; |
| } else { |
| // Handle other errors |
| return std::move(Status(resp.status.code, std::move(resp.status.msg)) |
| .append("failed to read")); |
| } |
| } |
| if (*bytes_read != bytes_req) { |
| std::string msg = fmt::format( |
| "failed to get object, path={} offset={} bytes_req={} bytes_read={} " |
| "file_size={} tries={}", |
| _path.native(), offset, bytes_req, *bytes_read, _file_size, (retry_count + 1)); |
| LOG(WARNING) << msg; |
| return Status::InternalError(msg); |
| } |
| _s3_stats.total_bytes_read += bytes_req; |
| s3_bytes_read_total << bytes_req; |
| s3_bytes_per_read << bytes_req; |
| DorisMetrics::instance()->s3_bytes_read_total->increment(bytes_req); |
| if (retry_count > 0) { |
            LOG(INFO) << fmt::format(
                    "read s3 file {} succeeded after {} retries ({} ms total sleep)",
                    _path.native(), retry_count, total_sleep_time);
| } |
| return Status::OK(); |
| } |
| std::string msg = fmt::format( |
| "failed to get object, path={} offset={} bytes_req={} bytes_read={} file_size={} " |
| "tries={}", |
| _path.native(), offset, bytes_req, *bytes_read, _file_size, (max_retries + 1)); |
| LOG(WARNING) << msg; |
| return Status::InternalError(msg); |
| } |
| |
| void S3FileReader::_collect_profile_before_close() { |
| if (_profile != nullptr) { |
| const char* s3_profile_name = "S3Profile"; |
| ADD_TIMER(_profile, s3_profile_name); |
| RuntimeProfile::Counter* total_get_request_counter = |
| ADD_CHILD_COUNTER(_profile, "TotalGetRequest", TUnit::UNIT, s3_profile_name); |
| RuntimeProfile::Counter* too_many_request_err_counter = |
| ADD_CHILD_COUNTER(_profile, "TooManyRequestErr", TUnit::UNIT, s3_profile_name); |
| RuntimeProfile::Counter* too_many_request_sleep_time = ADD_CHILD_COUNTER( |
| _profile, "TooManyRequestSleepTime", TUnit::TIME_MS, s3_profile_name); |
| RuntimeProfile::Counter* total_bytes_read = |
| ADD_CHILD_COUNTER(_profile, "TotalBytesRead", TUnit::BYTES, s3_profile_name); |
| RuntimeProfile::Counter* total_get_request_time_ns = |
| ADD_CHILD_TIMER(_profile, "TotalGetRequestTime", s3_profile_name); |
| |
| COUNTER_UPDATE(total_get_request_counter, _s3_stats.total_get_request_counter); |
| COUNTER_UPDATE(too_many_request_err_counter, _s3_stats.too_many_request_err_counter); |
| COUNTER_UPDATE(too_many_request_sleep_time, _s3_stats.too_many_request_sleep_time_ms); |
| COUNTER_UPDATE(total_bytes_read, _s3_stats.total_bytes_read); |
| COUNTER_UPDATE(total_get_request_time_ns, _s3_stats.total_get_request_time_ns); |
| } |
| } |
| |
| } // namespace doris::io |