// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "io/fs/hdfs_file_reader.h"
#include <stdint.h>
#include <algorithm>
#include <filesystem>
#include <limits>
#include <ostream>
#include <utility>
#include "bvar/latency_recorder.h"
#include "bvar/reducer.h"
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/logging.h"
#include "cpp/sync_point.h"
#include "io/fs/err_utils.h"
#include "io/hdfs_util.h"
#include "runtime/thread_context.h"
#include "runtime/workload_management/io_throttle.h"
#include "service/backend_options.h"
#include "util/doris_metrics.h"
namespace doris::io {
#include "common/compile_check_begin.h"
bvar::Adder<uint64_t> hdfs_bytes_read_total("hdfs_file_reader", "bytes_read");
bvar::LatencyRecorder hdfs_bytes_per_read("hdfs_file_reader", "bytes_per_read"); // also QPS
bvar::PerSecond<bvar::Adder<uint64_t>> hdfs_read_throughput("hdfs_file_reader",
                                                            "hdfs_read_throughput",
                                                            &hdfs_bytes_read_total);
namespace {
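// Fetches an accessor to a cached hdfs file handle, opening the file on a cache
// miss. The FileHandleCache is a function-local static shared by all readers and
// is sized/expired via config::max_hdfs_file_handle_cache_num and
// config::max_hdfs_file_handle_cache_time_sec.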
Result<FileHandleCache::Accessor> get_file(const hdfsFS& fs, const Path& file, int64_t mtime,
int64_t file_size) {
static FileHandleCache cache(config::max_hdfs_file_handle_cache_num, 16,
config::max_hdfs_file_handle_cache_time_sec);
bool cache_hit;
FileHandleCache::Accessor accessor;
RETURN_IF_ERROR_RESULT(cache.get_file_handle(fs, file.native(), mtime, file_size, false,
&accessor, &cache_hit));
return accessor;
}
} // namespace
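
// Factory method: converts `full_path` for the given `fs_name`, obtains a
// (possibly cached) file handle via get_file(), and wraps both in an HdfsFileReader.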
Result<FileReaderSPtr> HdfsFileReader::create(Path full_path, const hdfsFS& fs, std::string fs_name,
const FileReaderOptions& opts,
RuntimeProfile* profile) {
auto path = convert_path(full_path, fs_name);
return get_file(fs, path, opts.mtime, opts.file_size).transform([&](auto&& accessor) {
return std::make_shared<HdfsFileReader>(std::move(path), std::move(fs_name),
std::move(accessor), profile);
});
}
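
// When a profile is supplied and the target is a real HDFS cluster (USE_HADOOP_HDFS),
// the constructor registers the "HdfsIO" counters that are later filled from libhdfs
// read statistics in _collect_profile_before_close().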
HdfsFileReader::HdfsFileReader(Path path, std::string fs_name, FileHandleCache::Accessor accessor,
RuntimeProfile* profile)
: _path(std::move(path)),
_fs_name(std::move(fs_name)),
_accessor(std::move(accessor)),
_profile(profile) {
_handle = _accessor.get();
DorisMetrics::instance()->hdfs_file_open_reading->increment(1);
DorisMetrics::instance()->hdfs_file_reader_total->increment(1);
if (_profile != nullptr && is_hdfs(_fs_name)) {
#ifdef USE_HADOOP_HDFS
const char* hdfs_profile_name = "HdfsIO";
ADD_TIMER(_profile, hdfs_profile_name);
_hdfs_profile.total_bytes_read =
ADD_CHILD_COUNTER(_profile, "TotalBytesRead", TUnit::BYTES, hdfs_profile_name);
_hdfs_profile.total_local_bytes_read =
ADD_CHILD_COUNTER(_profile, "TotalLocalBytesRead", TUnit::BYTES, hdfs_profile_name);
_hdfs_profile.total_short_circuit_bytes_read = ADD_CHILD_COUNTER(
_profile, "TotalShortCircuitBytesRead", TUnit::BYTES, hdfs_profile_name);
_hdfs_profile.total_total_zero_copy_bytes_read = ADD_CHILD_COUNTER(
_profile, "TotalZeroCopyBytesRead", TUnit::BYTES, hdfs_profile_name);
_hdfs_profile.total_hedged_read =
ADD_CHILD_COUNTER(_profile, "TotalHedgedRead", TUnit::UNIT, hdfs_profile_name);
_hdfs_profile.hedged_read_in_cur_thread = ADD_CHILD_COUNTER(
_profile, "HedgedReadInCurThread", TUnit::UNIT, hdfs_profile_name);
_hdfs_profile.hedged_read_wins =
ADD_CHILD_COUNTER(_profile, "HedgedReadWins", TUnit::UNIT, hdfs_profile_name);
#endif
}
}
HdfsFileReader::~HdfsFileReader() {
static_cast<void>(close());
}
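
// close() is idempotent: the compare-and-swap guarantees the open-reading metric is
// decremented exactly once, even if close() is called both explicitly and from the
// destructor.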
Status HdfsFileReader::close() {
bool expected = false;
if (_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
DorisMetrics::instance()->hdfs_file_open_reading->increment(-1);
}
return Status::OK();
}
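
// If a read fails, drop the cached handle and its accessor so a possibly broken
// hdfsFile is not reused; subsequent reads on this reader return InternalError.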
Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) {
auto st = do_read_at_impl(offset, result, bytes_read, io_ctx);
if (!st.ok()) {
_handle = nullptr;
_accessor.destroy();
}
return st;
}
#ifdef USE_HADOOP_HDFS
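// Positional read path: uses hdfsPread(), so no seek state is involved, and loops
// until the requested range (clamped to the file size) is fully read or EOF is hit.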
Status HdfsFileReader::do_read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* /*io_ctx*/) {
if (closed()) [[unlikely]] {
return Status::InternalError("read closed file: {}", _path.native());
}
if (_handle == nullptr) [[unlikely]] {
return Status::InternalError("cached hdfs file handle has been destroyed: {}",
_path.native());
}
if (offset > _handle->file_size()) {
return Status::IOError("offset exceeds file size(offset: {}, file size: {}, path: {})",
offset, _handle->file_size(), _path.native());
}
size_t bytes_req = result.size;
char* to = result.data;
bytes_req = std::min(bytes_req, (size_t)(_handle->file_size() - offset));
*bytes_read = 0;
if (UNLIKELY(bytes_req == 0)) {
return Status::OK();
}
LIMIT_REMOTE_SCAN_IO(bytes_read);
size_t has_read = 0;
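    // hdfsPread() takes a tSize (int32_t) length, so large requests are split into
    // chunks of at most std::numeric_limits<tSize>::max() bytes.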
while (has_read < bytes_req) {
int64_t max_to_read = bytes_req - has_read;
tSize to_read = static_cast<tSize>(
std::min(max_to_read, static_cast<int64_t>(std::numeric_limits<tSize>::max())));
tSize loop_read = hdfsPread(_handle->fs(), _handle->file(), offset + has_read,
to + has_read, to_read);
{
[[maybe_unused]] Status error_ret;
TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileReader:read_error", error_ret);
}
if (loop_read < 0) {
            // The caller may want to skip Status::NotFound and continue,
            // so we need to distinguish it from other kinds of errors.
std::string _err_msg = hdfs_error();
if (_err_msg.find("No such file or directory") != std::string::npos) {
return Status::NotFound(_err_msg);
}
return Status::InternalError(
"Read hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}",
BackendOptions::get_localhost(), _fs_name, _path.string(), _err_msg);
}
if (loop_read == 0) {
break;
}
has_read += loop_read;
}
*bytes_read = has_read;
hdfs_bytes_read_total << *bytes_read;
hdfs_bytes_per_read << *bytes_read;
return Status::OK();
}
#else
// Hedged reads are only supported via hdfsPread().
// TODO: rethink whether there is any behavioral difference between hdfsPread() and hdfsRead().
Status HdfsFileReader::do_read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* /*io_ctx*/) {
    if (closed()) [[unlikely]] {
        return Status::InternalError("read closed file: {}", _path.native());
    }
    // Mirror the USE_HADOOP_HDFS branch: the handle may have been destroyed by a
    // previous failed read in read_at_impl().
    if (_handle == nullptr) [[unlikely]] {
        return Status::InternalError("cached hdfs file handle has been destroyed: {}",
                                     _path.native());
    }
    if (offset > _handle->file_size()) {
        return Status::IOError("offset exceeds file size(offset: {}, file size: {}, path: {})",
                               offset, _handle->file_size(), _path.native());
    }
int res = hdfsSeek(_handle->fs(), _handle->file(), offset);
if (res != 0) {
        // The caller may want to skip Status::NotFound and continue,
        // so we need to distinguish it from other kinds of errors.
std::string _err_msg = hdfs_error();
if (_err_msg.find("No such file or directory") != std::string::npos) {
return Status::NotFound(_err_msg);
}
return Status::InternalError("Seek to offset failed. (BE: {}) offset={}, err: {}",
BackendOptions::get_localhost(), offset, _err_msg);
}
size_t bytes_req = result.size;
char* to = result.data;
bytes_req = std::min(bytes_req, (size_t)(_handle->file_size() - offset));
*bytes_read = 0;
if (UNLIKELY(bytes_req == 0)) {
return Status::OK();
}
LIMIT_REMOTE_SCAN_IO(bytes_read);
size_t has_read = 0;
while (has_read < bytes_req) {
int64_t loop_read = hdfsRead(_handle->fs(), _handle->file(), to + has_read,
static_cast<int32_t>(bytes_req - has_read));
if (loop_read < 0) {
            // The caller may want to skip Status::NotFound and continue,
            // so we need to distinguish it from other kinds of errors.
std::string _err_msg = hdfs_error();
if (_err_msg.find("No such file or directory") != std::string::npos) {
return Status::NotFound(_err_msg);
}
return Status::InternalError(
"Read hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}",
BackendOptions::get_localhost(), _fs_name, _path.string(), _err_msg);
}
if (loop_read == 0) {
break;
}
has_read += loop_read;
}
*bytes_read = has_read;
hdfs_bytes_read_total << *bytes_read;
hdfs_bytes_per_read << *bytes_read;
return Status::OK();
}
#endif
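
// Pulls libhdfs read statistics (total/local/short-circuit/zero-copy bytes) and
// hedged-read metrics into the profile counters registered in the constructor,
// then clears the per-file statistics.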
void HdfsFileReader::_collect_profile_before_close() {
if (_profile != nullptr && is_hdfs(_fs_name)) {
#ifdef USE_HADOOP_HDFS
if (_handle == nullptr) [[unlikely]] {
return;
}
struct hdfsReadStatistics* hdfs_statistics = nullptr;
auto r = hdfsFileGetReadStatistics(_handle->file(), &hdfs_statistics);
if (r != 0) {
LOG(WARNING) << "Failed to run hdfsFileGetReadStatistics(): " << r
<< ", name node: " << _fs_name;
return;
}
COUNTER_UPDATE(_hdfs_profile.total_bytes_read, hdfs_statistics->totalBytesRead);
COUNTER_UPDATE(_hdfs_profile.total_local_bytes_read, hdfs_statistics->totalLocalBytesRead);
COUNTER_UPDATE(_hdfs_profile.total_short_circuit_bytes_read,
hdfs_statistics->totalShortCircuitBytesRead);
COUNTER_UPDATE(_hdfs_profile.total_total_zero_copy_bytes_read,
hdfs_statistics->totalZeroCopyBytesRead);
hdfsFileFreeReadStatistics(hdfs_statistics);
struct hdfsHedgedReadMetrics* hdfs_hedged_read_statistics = nullptr;
r = hdfsGetHedgedReadMetrics(_handle->fs(), &hdfs_hedged_read_statistics);
if (r != 0) {
LOG(WARNING) << "Failed to run hdfsGetHedgedReadMetrics(): " << r
<< ", name node: " << _fs_name;
return;
}
COUNTER_UPDATE(_hdfs_profile.total_hedged_read, hdfs_hedged_read_statistics->hedgedReadOps);
COUNTER_UPDATE(_hdfs_profile.hedged_read_in_cur_thread,
hdfs_hedged_read_statistics->hedgedReadOpsInCurThread);
COUNTER_UPDATE(_hdfs_profile.hedged_read_wins,
hdfs_hedged_read_statistics->hedgedReadOpsWin);
hdfsFreeHedgedReadMetrics(hdfs_hedged_read_statistics);
hdfsFileClearReadStatistics(_handle->file());
#endif
}
}
#include "common/compile_check_end.h"
} // namespace doris::io