// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "io/fs/s3_file_reader.h"
#include <aws/core/http/URI.h>
#include <aws/core/utils/Outcome.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/S3Errors.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/GetObjectResult.h>
#include <bvar/latency_recorder.h>
#include <bvar/reducer.h>
#include <fmt/format.h>
#include <glog/logging.h>
#include <algorithm>
#include <utility>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "io/cache/block_file_cache.h"
#include "io/fs/err_utils.h"
#include "io/fs/obj_storage_client.h"
#include "io/fs/s3_common.h"
#include "runtime/thread_context.h"
#include "runtime/workload_management/io_throttle.h"
#include "util/bvar_helper.h"
#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "util/runtime_profile.h"
#include "util/s3_util.h"

namespace doris::io {

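// Per-process bvar metrics for the S3 read path: request counts, bytes read,
// throughput, and latency.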
bvar::Adder<uint64_t> s3_file_reader_read_counter("s3_file_reader", "read_at");
bvar::Adder<uint64_t> s3_file_reader_total("s3_file_reader", "total_num");
bvar::Adder<uint64_t> s3_bytes_read_total("s3_file_reader", "bytes_read");
bvar::Adder<uint64_t> s3_file_being_read("s3_file_reader", "file_being_read");
bvar::Adder<uint64_t> s3_file_reader_too_many_request_counter("s3_file_reader", "too_many_request");
bvar::LatencyRecorder s3_bytes_per_read("s3_file_reader", "bytes_per_read"); // also QPS
bvar::PerSecond<bvar::Adder<uint64_t>> s3_read_througthput("s3_file_reader", "s3_read_throughput",
&s3_bytes_read_total);
// Although QPS could be derived from s3_bytes_per_read, s3_bytes_per_read only
// records successful requests, while s3_get_request_qps records all requests.
bvar::PerSecond<bvar::Adder<uint64_t>> s3_get_request_qps("s3_file_reader", "s3_get_request",
&s3_file_reader_read_counter);
bvar::LatencyRecorder s3_file_reader_latency("s3_file_reader", "s3_latency");
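
// Usage sketch (illustrative only; `client_holder`, `buf`, `len`, `offset`, and
// `io_ctx` are placeholders, not names defined in this file):
//
//   auto res = S3FileReader::create(client_holder, "my-bucket", "path/to/key",
//                                   /*file_size=*/-1, /*profile=*/nullptr);
//   if (res.has_value()) {
//       size_t bytes_read = 0;
//       static_cast<void>((*res)->read_at(offset, Slice(buf, len), &bytes_read, &io_ctx));
//   }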
Result<FileReaderSPtr> S3FileReader::create(std::shared_ptr<const ObjClientHolder> client,
std::string bucket, std::string key, int64_t file_size,
RuntimeProfile* profile) {
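    // A negative file_size means the size is unknown; ask the object storage
    // client for the object's size before constructing the reader.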
if (file_size < 0) {
auto res = client->object_file_size(bucket, key);
if (!res.has_value()) {
return ResultError(std::move(res.error()));
}
file_size = res.value();
}
return std::make_shared<S3FileReader>(std::move(client), std::move(bucket), std::move(key),
file_size, profile);
}

S3FileReader::S3FileReader(std::shared_ptr<const ObjClientHolder> client, std::string bucket,
std::string key, size_t file_size, RuntimeProfile* profile)
: _path(fmt::format("s3://{}/{}", bucket, key)),
_file_size(file_size),
_bucket(std::move(bucket)),
_key(std::move(key)),
_client(std::move(client)),
_profile(profile) {
DorisMetrics::instance()->s3_file_open_reading->increment(1);
DorisMetrics::instance()->s3_file_reader_total->increment(1);
s3_file_reader_total << 1;
s3_file_being_read << 1;
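    // Use RFC 3986-compliant URI encoding in the AWS SDK so that object keys
    // containing special characters are percent-encoded consistently.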
Aws::Http::SetCompliantRfc3986Encoding(true);
}

S3FileReader::~S3FileReader() {
static_cast<void>(close());
s3_file_being_read << -1;
}

Status S3FileReader::close() {
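    // The compare-exchange ensures the open-reading gauge is decremented only
    // once, even if close() is called again (e.g. explicitly and then from the
    // destructor).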
bool expected = false;
if (_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
DorisMetrics::instance()->s3_file_open_reading->increment(-1);
}
return Status::OK();
}

Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* /*io_ctx*/) {
DCHECK(!closed());
if (offset > _file_size) {
return Status::InternalError(
"offset exceeds file size(offset: {}, file size: {}, path: {})", offset, _file_size,
_path.native());
}
size_t bytes_req = result.size;
char* to = result.data;
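    // Clamp the request so it never reads past the end of the object.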
bytes_req = std::min(bytes_req, _file_size - offset);
VLOG_DEBUG << fmt::format("S3FileReader::read_at_impl offset={} size={} path={} hash={}",
offset, result.size, _path.native(),
io::BlockFileCache::hash(_path.native()).to_string());
VLOG_DEBUG << "enter s3 read_at_impl, off=" << offset << " n=" << bytes_req
<< " req=" << result.size << " file size=" << _file_size;
if (UNLIKELY(bytes_req == 0)) {
*bytes_read = 0;
return Status::OK();
}
auto client = _client->get();
if (!client) {
return Status::InternalError("init s3 client error");
}
int retry_count = 0;
const int base_wait_time = config::s3_read_base_wait_time_ms; // Base wait time in milliseconds
const int max_wait_time = config::s3_read_max_wait_time_ms; // Maximum wait time in milliseconds
    const int max_retries = config::max_s3_client_retry; // Max retries; the backoff doubles on each attempt, capped at max_wait_time
int64_t begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
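    // Apply remote-scan IO throttling for the current workload before issuing
    // the GET request (see runtime/workload_management/io_throttle.h).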
LIMIT_REMOTE_SCAN_IO(bytes_read);
    DBUG_EXECUTE_IF("S3FileReader::read_at_impl.io_slow", {
        // Debug injection point: simulate a slow S3 read by sleeping for the
        // configured number of seconds before issuing the request.
        auto sleep_time = dp->param("sleep", 3);
        LOG_INFO("S3FileReader::read_at_impl.io_slow inject sleep {} s", sleep_time)
                .tag("bucket", _bucket)
                .tag("key", _key);
        std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
    });
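    // Record the end-to-end latency of this read, including any backoff
    // sleeps, when this scope exits.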
Defer defer_latency {[&]() {
int64_t end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
s3_file_reader_latency << (end_ts - begin_ts);
}};
SCOPED_RAW_TIMER(&_s3_stats.total_get_request_time_ns);
int total_sleep_time = 0;
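    // Issue the GET request, retrying with exponential backoff only on
    // HTTP 429 (TOO_MANY_REQUESTS); any other error fails the read immediately.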
while (retry_count <= max_retries) {
*bytes_read = 0;
s3_file_reader_read_counter << 1;
// clang-format off
auto resp = client->get_object( { .bucket = _bucket, .key = _key, },
to, offset, bytes_req, bytes_read);
// clang-format on
_s3_stats.total_get_request_counter++;
if (resp.status.code != ErrorCode::OK) {
if (resp.http_code ==
static_cast<int>(Aws::Http::HttpResponseCode::TOO_MANY_REQUESTS)) {
s3_file_reader_too_many_request_counter << 1;
retry_count++;
int wait_time = std::min(base_wait_time * (1 << retry_count),
max_wait_time); // Exponential backoff
std::this_thread::sleep_for(std::chrono::milliseconds(wait_time));
_s3_stats.too_many_request_err_counter++;
_s3_stats.too_many_request_sleep_time_ms += wait_time;
total_sleep_time += wait_time;
continue;
} else {
// Handle other errors
return std::move(Status(resp.status.code, std::move(resp.status.msg))
.append("failed to read"));
}
}
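        // A successful response that returned fewer bytes than requested is
        // treated as an error rather than retried.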
if (*bytes_read != bytes_req) {
std::string msg = fmt::format(
"failed to get object, path={} offset={} bytes_req={} bytes_read={} "
"file_size={} tries={}",
_path.native(), offset, bytes_req, *bytes_read, _file_size, (retry_count + 1));
LOG(WARNING) << msg;
return Status::InternalError(msg);
}
_s3_stats.total_bytes_read += bytes_req;
s3_bytes_read_total << bytes_req;
s3_bytes_per_read << bytes_req;
DorisMetrics::instance()->s3_bytes_read_total->increment(bytes_req);
if (retry_count > 0) {
LOG(INFO) << fmt::format("read s3 file {} succeed after {} times with {} ms sleeping",
_path.native(), retry_count, total_sleep_time);
}
return Status::OK();
}
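    // Reaching here means every attempt was rejected with TOO_MANY_REQUESTS.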
std::string msg = fmt::format(
"failed to get object, path={} offset={} bytes_req={} bytes_read={} file_size={} "
"tries={}",
_path.native(), offset, bytes_req, *bytes_read, _file_size, (max_retries + 1));
LOG(WARNING) << msg;
return Status::InternalError(msg);
}

void S3FileReader::_collect_profile_before_close() {
if (_profile != nullptr) {
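        // Register the S3 read counters under a dedicated "S3Profile" node and
        // flush the per-reader statistics accumulated during reads.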
const char* s3_profile_name = "S3Profile";
ADD_TIMER(_profile, s3_profile_name);
RuntimeProfile::Counter* total_get_request_counter =
ADD_CHILD_COUNTER(_profile, "TotalGetRequest", TUnit::UNIT, s3_profile_name);
RuntimeProfile::Counter* too_many_request_err_counter =
ADD_CHILD_COUNTER(_profile, "TooManyRequestErr", TUnit::UNIT, s3_profile_name);
RuntimeProfile::Counter* too_many_request_sleep_time = ADD_CHILD_COUNTER(
_profile, "TooManyRequestSleepTime", TUnit::TIME_MS, s3_profile_name);
RuntimeProfile::Counter* total_bytes_read =
ADD_CHILD_COUNTER(_profile, "TotalBytesRead", TUnit::BYTES, s3_profile_name);
RuntimeProfile::Counter* total_get_request_time_ns =
ADD_CHILD_TIMER(_profile, "TotalGetRequestTime", s3_profile_name);
COUNTER_UPDATE(total_get_request_counter, _s3_stats.total_get_request_counter);
COUNTER_UPDATE(too_many_request_err_counter, _s3_stats.too_many_request_err_counter);
COUNTER_UPDATE(too_many_request_sleep_time, _s3_stats.too_many_request_sleep_time_ms);
COUNTER_UPDATE(total_bytes_read, _s3_stats.total_bytes_read);
COUNTER_UPDATE(total_get_request_time_ns, _s3_stats.total_get_request_time_ns);
}
}
} // namespace doris::io