blob: ac63217ed33bc72b14f5c641e28f8ec15d743d90 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "io/hdfs_util.h"
#include <bthread/bthread.h>
#include <bthread/butex.h>
#include <bvar/latency_recorder.h>
#include <gen_cpp/cloud.pb.h>
#include <ostream>
#include <thread>
#include "common/logging.h"
#include "io/fs/err_utils.h"
#include "io/hdfs_builder.h"
#include "vec/common/string_ref.h"
namespace doris::io {
// Latency recorders for individual HDFS operations. Each recorder is constructed
// with the bvar name it is exposed under (e.g. "hdfs_read"); the code that records
// into them lives outside this file.
namespace hdfs_bvar {
bvar::LatencyRecorder hdfs_read_latency("hdfs_read");
bvar::LatencyRecorder hdfs_write_latency("hdfs_write");
bvar::LatencyRecorder hdfs_create_dir_latency("hdfs_create_dir");
bvar::LatencyRecorder hdfs_open_latency("hdfs_open");
bvar::LatencyRecorder hdfs_close_latency("hdfs_close");
bvar::LatencyRecorder hdfs_flush_latency("hdfs_flush");
bvar::LatencyRecorder hdfs_hflush_latency("hdfs_hflush");
bvar::LatencyRecorder hdfs_hsync_latency("hdfs_hsync");
} // namespace hdfs_bvar
// Converts `path` into an absolute path inside the HDFS filesystem identified by
// `namenode`.
//
// If `path` is a URI that begins with `namenode` (e.g. "hdfs://nn:8020/a/b" with
// namenode "hdfs://nn:8020"), that prefix is stripped. Otherwise `path` is used
// verbatim. The returned path always starts with '/'.
//
// @param path     either a full URI or a path relative/absolute to the filesystem root
// @param namenode the namenode prefix to strip (an empty namenode strips nothing)
// @return an absolute in-filesystem path
Path convert_path(const Path& path, const std::string& namenode) {
    const auto& native = path.native();
    std::string fs_path;
    // Only strip `namenode` when it is a genuine prefix of `path`. Searching for it
    // anywhere in the string and then dropping the first `namenode.size()` characters
    // corrupts paths in which the namenode text appears somewhere other than the start.
    if (native.rfind(namenode, 0) == 0) {
        fs_path = native.substr(namenode.size());
    } else {
        fs_path = native;
    }
    // Always use absolute path (start with '/') in hdfs
    if (fs_path.empty() || fs_path[0] != '/') {
        fs_path.insert(fs_path.begin(), '/');
    }
    return fs_path;
}
// Returns true iff `path_or_fs` starts with the "hdfs://" scheme prefix.
//
// `rfind(prefix, 0)` only considers a match beginning at position 0, making it a
// cheap prefix test. A bare `rfind(prefix)` scans the whole string and reports the
// *last* occurrence, which wrongly rejects inputs that contain "hdfs://" more than
// once (e.g. "hdfs://nn/hdfs://x").
bool is_hdfs(const std::string& path_or_fs) {
    return path_or_fs.rfind("hdfs://", 0) == 0;
}
// Builds the Thrift `THdfsParams` describing how to connect to HDFS from a cloud
// HDFS storage vault.
//
// `fs_name` is always copied. `user`, `hdfs_kerberos_principal` and
// `hdfs_kerberos_keytab` are only set when the vault's build_conf has them. Every
// entry of `build_conf().hdfs_confs()` is forwarded as a key/value `THdfsConf`.
//
// @param vault the vault description to convert
// @return the populated Thrift parameters
THdfsParams to_hdfs_params(const cloud::HdfsVaultInfo& vault) {
    THdfsParams params;
    // Bind by reference: `build_conf()` is a protobuf message accessor, so a plain
    // `auto` would deep-copy the whole sub-message including all hdfs_confs entries.
    const auto& build_conf = vault.build_conf();
    params.__set_fs_name(build_conf.fs_name());
    if (build_conf.has_user()) {
        params.__set_user(build_conf.user());
    }
    if (build_conf.has_hdfs_kerberos_principal()) {
        params.__set_hdfs_kerberos_principal(build_conf.hdfs_kerberos_principal());
    }
    if (build_conf.has_hdfs_kerberos_keytab()) {
        params.__set_hdfs_kerberos_keytab(build_conf.hdfs_kerberos_keytab());
    }
    std::vector<THdfsConf> tconfs;
    tconfs.reserve(static_cast<size_t>(build_conf.hdfs_confs().size()));
    for (const auto& kv : build_conf.hdfs_confs()) {
        // Construct each entry in place instead of building a local and copying it in.
        THdfsConf& conf = tconfs.emplace_back();
        conf.__set_key(kv.key());
        conf.__set_value(kv.value());
    }
    params.__set_hdfs_conf(tconfs);
    return params;
}
} // namespace doris::io