// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "recycler/hdfs_accessor.h"
#include <bvar/latency_recorder.h>
#include <gen_cpp/cloud.pb.h>
#include "common/stopwatch.h"
#include "recycler/util.h"
#ifdef USE_HADOOP_HDFS
#include <hadoop_hdfs/hdfs.h> // IWYU pragma: export
#else
#include <hdfs/hdfs.h> // IWYU pragma: export
#endif
#include <cerrno>
#include <cstring>
#include <string_view>
#include "common/config.h"
#include "common/defer.h"
#include "common/logging.h"
#include "common/string_util.h"
#include "cpp/sync_point.h"
#include "recycler/storage_vault_accessor.h"
namespace doris::cloud {
namespace {
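// Returns a human-readable description of the last HDFS error, combining the
// current errno with the hdfs client's last error message (the exception root
// cause when built against hadoop hdfs).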
std::string hdfs_error() {
#ifdef USE_HADOOP_HDFS
const char* err_msg = hdfsGetLastExceptionRootCause();
#else
const char* err_msg = hdfsGetLastError();
#endif
return fmt::format("({}): {}", std::strerror(errno), err_msg ? err_msg : "");
}
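// Per-operation latency metrics, exported via bvar.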
bvar::LatencyRecorder hdfs_write_latency("hdfs_write");
bvar::LatencyRecorder hdfs_open_latency("hdfs_open");
bvar::LatencyRecorder hdfs_close_latency("hdfs_close");
bvar::LatencyRecorder hdfs_list_dir("hdfs_list_dir");
bvar::LatencyRecorder hdfs_exist_latency("hdfs_exist");
bvar::LatencyRecorder hdfs_delete_latency("hdfs_delete");
} // namespace
class HDFSBuilder {
public:
~HDFSBuilder() {
if (hdfs_builder_ != nullptr) {
hdfsFreeBuilder(hdfs_builder_);
}
}
// TODO(plat1ko): template <class Params>
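// Usage sketch (field values below are illustrative, not from a real conf):
//   HdfsBuildConf conf;
//   conf.set_fs_name("hdfs://nameservice1");
//   conf.set_user("hadoop");
//   HdfsSPtr fs = HDFSBuilder::create_fs(conf);
//   if (!fs) { /* connection failed, error already logged */ }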
static HdfsSPtr create_fs(const HdfsBuildConf& params) {
HDFSBuilder builder;
int ret = builder.init_hdfs_builder();
if (ret != 0) {
return nullptr;
}
ret = builder.init(params);
if (ret != 0) {
return nullptr;
}
auto* fs = hdfsBuilderConnect(builder.hdfs_builder_);
#ifdef USE_HADOOP_HDFS
// For hadoop hdfs, the `hdfs_builder_` will be freed in hdfsBuilderConnect
builder.hdfs_builder_ = nullptr;
#endif
if (!fs) {
LOG(WARNING) << "failed to connect hdfs: " << hdfs_error();
return nullptr;
}
return {fs, [fs_name = params.fs_name()](auto fs) {
LOG_INFO("disconnect hdfs").tag("fs_name", fs_name);
hdfsDisconnect(fs);
}};
}
private:
HDFSBuilder() = default;
// Returns 0 on success, non-zero on error.
int init(const HdfsBuildConf& conf) {
DCHECK(hdfs_builder_);
hdfsBuilderSetNameNode(hdfs_builder_, conf.fs_name().c_str());
// Set kerberos conf. A configured kerberos principal takes precedence over a
// plain user name; with a plain user the kerberos settings are cleared
// (hadoop hdfs only).
bool kerberos_login = false;
if (conf.has_hdfs_kerberos_keytab()) {
kerberos_login = true;
#ifdef USE_HADOOP_HDFS
hdfsBuilderSetKerb5Conf(hdfs_builder_, config::kerberos_krb5_conf_path.c_str());
hdfsBuilderSetKeyTabFile(hdfs_builder_, conf.hdfs_kerberos_keytab().c_str());
#endif
}
if (conf.has_hdfs_kerberos_principal()) {
kerberos_login = true;
hdfsBuilderSetPrincipal(hdfs_builder_, conf.hdfs_kerberos_principal().c_str());
} else if (conf.has_user()) {
hdfsBuilderSetUserName(hdfs_builder_, conf.user().c_str());
#ifdef USE_HADOOP_HDFS
hdfsBuilderSetKerb5Conf(hdfs_builder_, nullptr);
hdfsBuilderSetKeyTabFile(hdfs_builder_, nullptr);
#endif
}
// set other conf
for (const auto& kv : conf.hdfs_confs()) {
hdfsBuilderConfSetStr(hdfs_builder_, kv.key().c_str(), kv.value().c_str());
#ifdef USE_HADOOP_HDFS
// Set krb5.conf; java.security.krb5.conf should be defined in catalog properties.
if (kv.key() == "java.security.krb5.conf") {
hdfsBuilderSetKerb5Conf(hdfs_builder_, kv.value().c_str());
}
#endif
}
if (kerberos_login) {
int ret = check_krb_params(conf.hdfs_kerberos_principal(), conf.hdfs_kerberos_keytab());
if (ret != 0) {
return ret;
}
}
hdfsBuilderConfSetStr(hdfs_builder_, "ipc.client.fallback-to-simple-auth-allowed", "true");
return 0;
}
// Returns 0 on success, non-zero on error.
int init_hdfs_builder() {
hdfs_builder_ = hdfsNewBuilder();
if (hdfs_builder_ == nullptr) {
LOG(WARNING) << "failed to init HDFSBuilder, please check check be/conf/hdfs-site.xml";
return -1;
}
hdfsBuilderSetForceNewInstance(hdfs_builder_);
return 0;
}
// Returns 0 on success, non-zero on error.
int check_krb_params(std::string_view hdfs_kerberos_principal,
std::string_view hdfs_kerberos_keytab) {
const auto& ticket_path = config::kerberos_ccache_path;
if (!ticket_path.empty()) {
hdfsBuilderConfSetStr(hdfs_builder_, "hadoop.security.kerberos.ticket.cache.path",
ticket_path.c_str());
return 0;
}
// Both hdfs_kerberos_principal and hdfs_kerberos_keytab must be non-empty to
// log in to the KDC.
if (hdfs_kerberos_principal.empty() || hdfs_kerberos_keytab.empty()) {
LOG(WARNING) << "Invalid hdfs_kerberos_principal or hdfs_kerberos_keytab";
return -1;
}
// enable auto-renew thread
hdfsBuilderConfSetStr(hdfs_builder_, "hadoop.kerberos.keytab.login.autorenewal.enabled",
"true");
return 0;
}
hdfsBuilder* hdfs_builder_ = nullptr;
};
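// Lists all files under a directory by walking the directory tree depth-first
// with an explicit stack (`level_dirs_`); the back of the vector holds the
// entries of the deepest directory currently being expanded.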
class HdfsListIterator final : public ListIterator {
private:
class DirEntries {
public:
DirEntries() = default;
DirEntries(hdfsFileInfo* entries, size_t num_entries)
: entries_(entries), num_entries_(num_entries) {
DCHECK_EQ(!!entries, num_entries > 0);
TEST_SYNC_POINT_CALLBACK("DirEntries", &num_entries_);
}
~DirEntries() {
if (entries_) {
TEST_SYNC_POINT_CALLBACK("~DirEntries", &num_entries_);
hdfsFreeFileInfo(entries_, num_entries_);
}
}
DirEntries(const DirEntries&) = delete;
DirEntries& operator=(const DirEntries&) = delete;
DirEntries(DirEntries&& rhs) noexcept
: entries_(rhs.entries_), num_entries_(rhs.num_entries_), offset_(rhs.offset_) {
rhs.entries_ = nullptr;
rhs.num_entries_ = 0;
rhs.offset_ = 0;
}
DirEntries& operator=(DirEntries&& rhs) noexcept {
std::swap(entries_, rhs.entries_);
std::swap(num_entries_, rhs.num_entries_);
std::swap(offset_, rhs.offset_);
return *this;
}
bool empty() const { return offset_ >= num_entries_; }
hdfsFileInfo& front() { return entries_[offset_]; }
void pop_front() { ++offset_; }
private:
hdfsFileInfo* entries_ {nullptr};
size_t num_entries_ {0};
size_t offset_ {0};
};
public:
HdfsListIterator(HdfsSPtr hdfs, std::string dir_path, std::string uri)
: hdfs_(std::move(hdfs)), dir_path_(std::move(dir_path)), uri_(std::move(uri)) {}
~HdfsListIterator() override = default;
bool is_valid() override { return is_valid_; }
bool has_next() override {
if (!is_valid_) {
return false;
}
if (!has_init_) {
has_init_ = true;
auto next_level_dir_entries = list_directory(dir_path_.c_str());
if (!next_level_dir_entries) {
is_valid_ = false;
return false;
}
if (!next_level_dir_entries->empty()) {
level_dirs_.emplace_back(std::move(next_level_dir_entries.value()));
}
}
while (!level_dirs_.empty()) {
auto& dir_entries = level_dirs_.back();
auto& entry = dir_entries.front();
if (entry.mKind == kObjectKindFile) {
return true;
}
// entry is a dir
auto next_level_dir_entries = list_directory(entry.mName);
if (!next_level_dir_entries) {
is_valid_ = false;
return false;
}
dir_entries.pop_front();
if (dir_entries.empty()) {
level_dirs_.pop_back();
}
if (!next_level_dir_entries->empty()) {
level_dirs_.emplace_back(std::move(next_level_dir_entries.value()));
}
}
return false;
}
std::optional<FileMeta> next() override {
std::optional<FileMeta> res;
if (!has_next()) {
return res;
}
auto& dir_entries = level_dirs_.back();
auto& entry = dir_entries.front();
DCHECK_EQ(entry.mKind, kObjectKindFile) << entry.mName;
std::string_view path {entry.mName};
DCHECK(path.starts_with(uri_)) << path << ' ' << uri_;
path.remove_prefix(uri_.length());
res = FileMeta {.path = std::string(path), .size = entry.mSize, .mtime_s = entry.mLastMod};
dir_entries.pop_front();
if (dir_entries.empty()) {
level_dirs_.pop_back();
}
return res;
}
private:
// Returns std::nullopt if an error occurred; returns an empty DirEntries if
// the dir is empty or doesn't exist.
std::optional<DirEntries> list_directory(const char* dir_path) {
int num_entries = 0;
SCOPED_BVAR_LATENCY(hdfs_list_dir);
auto* file_infos = hdfsListDirectory(hdfs_.get(), dir_path, &num_entries);
if (errno != 0 && errno != ENOENT) {
LOG_WARNING("failed to list hdfs directory")
.tag("uri", uri_)
.tag("dir_path", dir_path)
.tag("error", hdfs_error());
return std::nullopt;
}
return DirEntries {file_infos, static_cast<size_t>(num_entries)};
}
HdfsSPtr hdfs_;
std::string dir_path_; // absolute path start with '/'
std::string uri_; // {fs_name}/{prefix}/
bool is_valid_ {true};
bool has_init_ {false};
std::vector<DirEntries> level_dirs_;
};
HdfsAccessor::HdfsAccessor(const HdfsVaultInfo& info)
: StorageVaultAccessor(AccessorType::HDFS), info_(info), prefix_(info.prefix()) {
if (!prefix_.empty() && prefix_[0] != '/') {
prefix_.insert(prefix_.begin(), '/');
}
uri_ = info.build_conf().fs_name() + prefix_;
}
HdfsAccessor::~HdfsAccessor() = default;
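// Path mapping example (names are illustrative): for fs_name "hdfs://ns" and
// prefix "/prefix", a relative path "data/1/x.dat" maps to the fs path
// "/prefix/data/1/x.dat" and the uri "hdfs://ns/prefix/data/1/x.dat".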
std::string HdfsAccessor::to_fs_path(const std::string& relative_path) {
return prefix_ + '/' + relative_path;
}
std::string HdfsAccessor::to_uri(const std::string& relative_path) {
return uri_ + '/' + relative_path;
}
// extract parent path from prefix
// e.g.
// data/492211/02000000008a012957476a3e174dfdaa71ee5f80a3abafa3_ -> data/492211/
std::string extract_parent_path(const std::string& path) {
// Find the last '/'
size_t last_slash = path.find_last_of('/');
if (last_slash == std::string::npos) {
LOG_WARNING("no '/' found in path").tag("path", path);
return "";
}
return path.substr(0, last_slash + 1);
}
int HdfsAccessor::init() {
TEST_SYNC_POINT_RETURN_WITH_VALUE("HdfsAccessor::init.hdfs_init_failed", (int)-1);
// TODO(plat1ko): Cache hdfsFS
fs_ = HDFSBuilder::create_fs(info_.build_conf());
if (!fs_) {
LOG(WARNING) << "failed to init hdfs accessor. uri=" << uri_;
return -1;
}
return 0;
}
int HdfsAccessor::delete_prefix(const std::string& path_prefix, int64_t expiration_time) {
auto uri = to_uri(path_prefix);
LOG(INFO) << "delete prefix, uri=" << uri;
// If the path prefix exists as-is, it is a dir or a file;
// delete_directory can delete either.
if (exists(path_prefix) == 0) {
if (delete_directory(path_prefix) == 0) {
LOG(INFO) << "delete prefix succ"
<< ", is dir or file = true"
<< ", uri=" << uri;
return 0;
}
// delete failed, return err
LOG_WARNING("delete prefix failed, this is a dir or a file")
.tag("path prefix", path_prefix);
return -1;
}
// If the path prefix is not a dir or a file,
// e.g. data/492211/02000000008a012957476a3e174dfdaa71ee5f80a3abafa3_,
// extract the parent path from the given prefix, traverse all files under
// the parent path, and delete the files that match the prefix.
std::unique_ptr<ListIterator> list_iter;
auto parent_path = extract_parent_path(path_prefix);
if (parent_path.empty()) {
LOG_WARNING("extract parent path failed").tag("path prefix", path_prefix);
return -1;
}
LOG_INFO("path prefix is not a dir, extract parent path success")
.tag("path prefix", path_prefix)
.tag("parent path", parent_path);
int ret = list_directory(parent_path, &list_iter);
if (ret != 0) {
LOG(WARNING) << "delete prefix, failed to list" << uri;
return ret;
}
size_t num_listed = 0, num_deleted = 0;
for (auto file = list_iter->next(); file; file = list_iter->next()) {
++num_listed;
if (file->path.find(path_prefix) != 0) continue;
if (int del_ret = delete_file(file->path); del_ret != 0) {
ret = del_ret;
break;
}
++num_deleted;
}
if (num_deleted == 0) {
LOG_WARNING("recycler delete prefix num = 0, maybe there are some problems?")
.tag("path prefix", path_prefix);
}
LOG(INFO) << "delete prefix " << (ret != 0 ? "failed" : "succ") << " ret=" << ret
<< " uri=" << uri << " num_listed=" << num_listed << " num_deleted=" << num_deleted;
return ret;
}
int HdfsAccessor::delete_directory_impl(const std::string& dir_path) {
// FIXME(plat1ko): Can we distinguish between an RPC failure and the file not existing via
// `hdfsDelete`'s return value or errno, to avoid the extra exists() RPC?
int ret = exists(dir_path);
if (ret == 1) {
// dir does not exist
return 0;
} else if (ret < 0) {
return ret;
}
// Path exists
auto path = to_fs_path(dir_path);
LOG_INFO("delete directory").tag("uri", to_uri(dir_path)); // Audit log
ret = hdfsDelete(fs_.get(), path.c_str(), 1);
if (ret != 0) {
LOG_WARNING("failed to delete directory")
.tag("path", to_uri(dir_path))
.tag("error", hdfs_error());
return ret;
}
return 0;
}
int HdfsAccessor::delete_directory(const std::string& dir_path) {
auto norm_dir_path = dir_path;
strip_leading(norm_dir_path, "/");
if (norm_dir_path.empty()) {
LOG_WARNING("invalid dir_path {}", dir_path);
return -1;
}
return delete_directory_impl(norm_dir_path);
}
int HdfsAccessor::delete_all(int64_t expiration_time) {
if (expiration_time <= 0) {
return delete_directory_impl("");
}
std::unique_ptr<ListIterator> list_iter;
int ret = list_all(&list_iter);
if (ret != 0) {
return ret;
}
for (auto file = list_iter->next(); file; file = list_iter->next()) {
if (file->mtime_s > expiration_time) {
continue;
}
if (int del_ret = delete_file(file->path); del_ret != 0) {
ret = del_ret;
}
}
return ret;
}
int HdfsAccessor::delete_files(const std::vector<std::string>& paths) {
int ret = 0;
for (auto&& path : paths) {
int del_ret = delete_file(path);
if (del_ret != 0) {
ret = del_ret;
}
}
return ret;
}
int HdfsAccessor::delete_file(const std::string& relative_path) {
// FIXME(plat1ko): Can we distinguish between an RPC failure and the file not existing via
// `hdfsDelete`'s return value or errno, to avoid the extra exists() RPC?
int ret = exists(relative_path);
if (ret == 1) {
return 0;
} else if (ret < 0) {
return ret;
}
// Path exists
auto path = to_fs_path(relative_path);
LOG_INFO("delete object").tag("uri", to_uri(relative_path)); // Audit log
SCOPED_BVAR_LATENCY(hdfs_delete_latency);
ret = hdfsDelete(fs_.get(), path.c_str(), 0);
if (ret != 0) {
LOG_WARNING("failed to delete object")
.tag("path", to_uri(relative_path))
.tag("error", hdfs_error());
return ret;
}
return 0;
}
int HdfsAccessor::put_file(const std::string& relative_path, const std::string& content) {
auto path = to_fs_path(relative_path);
hdfsFile file;
{
SCOPED_BVAR_LATENCY(hdfs_open_latency);
file = hdfsOpenFile(fs_.get(), path.c_str(), O_WRONLY, 0, 0, 0);
}
if (!file) {
LOG_WARNING("failed to create file")
.tag("uri", to_uri(relative_path))
.tag("error", hdfs_error());
return -1;
}
DORIS_CLOUD_DEFER {
if (file) {
SCOPED_BVAR_LATENCY(hdfs_close_latency);
hdfsCloseFile(fs_.get(), file);
}
};
int64_t written_bytes = 0;
{
SCOPED_BVAR_LATENCY(hdfs_write_latency);
written_bytes = hdfsWrite(fs_.get(), file, content.data(), content.size());
}
// Cast so that a -1 error return from hdfsWrite is not wrapped into a huge
// unsigned value by the signed/unsigned comparison.
if (written_bytes < static_cast<int64_t>(content.size())) {
LOG_WARNING("failed to write file")
.tag("uri", to_uri(relative_path))
.tag("error", hdfs_error());
return -1;
}
int ret = 0;
{
SCOPED_BVAR_LATENCY(hdfs_close_latency);
ret = hdfsCloseFile(fs_.get(), file);
}
file = nullptr;
if (ret != 0) {
LOG_WARNING("failed to close file")
.tag("uri", to_uri(relative_path))
.tag("error", hdfs_error());
return -1;
}
return 0;
}
int HdfsAccessor::list_directory(const std::string& dir_path, std::unique_ptr<ListIterator>* res) {
auto norm_dir_path = dir_path;
strip_leading(norm_dir_path, "/");
if (norm_dir_path.empty()) {
LOG_WARNING("invalid dir_path {}", dir_path);
return -1;
}
*res = std::make_unique<HdfsListIterator>(fs_, to_fs_path(norm_dir_path), uri_ + '/');
return 0;
}
int HdfsAccessor::list_all(std::unique_ptr<ListIterator>* res) {
*res = std::make_unique<HdfsListIterator>(fs_, to_fs_path(""), uri_ + '/');
return 0;
}
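// Returns 0 if the path exists, 1 if it does not exist, and -1 on error
// (errors are only distinguished when built against hadoop hdfs).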
int HdfsAccessor::exists(const std::string& relative_path) {
auto path = to_fs_path(relative_path);
SCOPED_BVAR_LATENCY(hdfs_exist_latency);
int ret = hdfsExists(fs_.get(), path.c_str());
#ifdef USE_HADOOP_HDFS
// When hdfsExists() returns a non-zero code:
// - if errno is ENOENT, the file does not exist;
// - otherwise another error occurred and we should return.
// NOTE: this does not apply to libhdfs3, which is only used on macOS and does
// not need to be supported here.
//
// See details:
// https://github.com/apache/hadoop/blob/5cda162a804fb0cfc2a5ac0058ab407662c5fb00/
// hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c#L1923-L1924
if (ret != 0 && errno != ENOENT) {
LOG_WARNING("failed to check object existence")
.tag("uri", to_uri(relative_path))
.tag("error", hdfs_error());
return -1;
}
#endif
return ret != 0;
}
int HdfsAccessor::abort_multipart_upload(const std::string& path, const std::string& upload_id) {
LOG_WARNING("abort_multipart_upload not implemented")
.tag("path", path)
.tag("upload_id", upload_id);
return -1;
}
} // namespace doris::cloud