// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "io/fs/hdfs_file_system.h"
#include <errno.h>
#include <fcntl.h>
#include <gen_cpp/PlanNodes_types.h>
#include <algorithm>
#include <filesystem>
#include <map>
#include <mutex>
#include <ostream>
#include <unordered_map>
#include <utility>
#include "common/config.h"
#include "common/status.h"
#include "io/fs/err_utils.h"
#include "io/fs/hdfs/hdfs_mgr.h"
#include "io/fs/hdfs_file_reader.h"
#include "io/fs/hdfs_file_writer.h"
#include "io/fs/local_file_system.h"
#include "io/hdfs_builder.h"
#include "io/hdfs_util.h"
#include "runtime/exec_env.h"
#include "util/obj_lru_cache.h"
#include "util/slice.h"
namespace doris::io {
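
// Return an IOError from the enclosing function when the HDFS handler has not
// been initialized. Used as a guard at the top of each filesystem operation.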
#ifndef CHECK_HDFS_HANDLER
#define CHECK_HDFS_HANDLER(handler) \
if (!handler) { \
return Status::IOError("init Hdfs handler error"); \
}
#endif
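
// Overload that accepts raw string properties: parse them into THdfsParams
// first, then delegate to the THdfsParams-based factory below.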
Result<std::shared_ptr<HdfsFileSystem>> HdfsFileSystem::create(
const std::map<std::string, std::string>& properties, std::string fs_name, std::string id,
RuntimeProfile* profile, std::string root_path) {
return HdfsFileSystem::create(parse_properties(properties), std::move(fs_name), std::move(id),
profile, std::move(root_path));
}
Result<std::shared_ptr<HdfsFileSystem>> HdfsFileSystem::create(const THdfsParams& hdfs_params,
std::string fs_name, std::string id,
RuntimeProfile* profile,
std::string root_path) {
#ifdef USE_HADOOP_HDFS
if (!config::enable_java_support) {
return ResultError(Status::InternalError(
"hdfs file system is not enabled, you can change be config enable_java_support to "
"true."));
}
#endif
std::shared_ptr<HdfsFileSystem> fs(new HdfsFileSystem(
hdfs_params, std::move(fs_name), std::move(id), profile, std::move(root_path)));
RETURN_IF_ERROR_RESULT(fs->init());
return fs;
}
HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, std::string fs_name, std::string id,
RuntimeProfile* profile, std::string root_path)
: RemoteFileSystem(std::move(root_path), std::move(id), FileSystemType::HDFS),
_hdfs_params(hdfs_params),
_fs_name(std::move(fs_name)),
_profile(profile) {
if (_fs_name.empty()) {
_fs_name = hdfs_params.fs_name;
}
}
HdfsFileSystem::~HdfsFileSystem() = default;
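
// Fetch a shared hdfsFS handle from the global HdfsMgr, creating and caching
// one if none exists yet for these params.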
Status HdfsFileSystem::init() {
RETURN_IF_ERROR(ExecEnv::GetInstance()->hdfs_mgr()->get_or_create_fs(_hdfs_params, _fs_name,
&_fs_handler));
if (!_fs_handler) {
return Status::InternalError("failed to init Hdfs handler with, please check hdfs params.");
}
return Status::OK();
}
Status HdfsFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer,
const FileWriterOptions* opts) {
auto res = io::HdfsFileWriter::create(file, _fs_handler, _fs_name, opts);
if (res.has_value()) {
*writer = std::move(res).value();
return Status::OK();
} else {
return std::move(res).error();
}
}
Status HdfsFileSystem::open_file_internal(const Path& file, FileReaderSPtr* reader,
const FileReaderOptions& opts) {
CHECK_HDFS_HANDLER(_fs_handler);
*reader =
DORIS_TRY(HdfsFileReader::create(file, _fs_handler->hdfs_fs, _fs_name, opts, _profile));
return Status::OK();
}
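
// NOTE: failed_if_exists is currently ignored; hdfsCreateDirectory() behaves
// like `mkdir -p` and succeeds when the directory already exists.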
Status HdfsFileSystem::create_directory_impl(const Path& dir, bool failed_if_exists) {
CHECK_HDFS_HANDLER(_fs_handler);
Path real_path = convert_path(dir, _fs_name);
int res = hdfsCreateDirectory(_fs_handler->hdfs_fs, real_path.string().c_str());
if (res == -1) {
return Status::IOError("failed to create directory {}: {}", dir.native(), hdfs_error());
}
return Status::OK();
}
Status HdfsFileSystem::delete_file_impl(const Path& file) {
return delete_internal(file, 0);
}
Status HdfsFileSystem::delete_directory_impl(const Path& dir) {
return delete_internal(dir, 1);
}
Status HdfsFileSystem::batch_delete_impl(const std::vector<Path>& files) {
for (auto& file : files) {
RETURN_IF_ERROR(delete_file_impl(file));
}
return Status::OK();
}
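
// is_recursive follows the hdfsDelete() convention: pass 1 to remove a
// directory and everything under it, 0 to remove a single file.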
Status HdfsFileSystem::delete_internal(const Path& path, int is_recursive) {
bool exists = true;
RETURN_IF_ERROR(exists_impl(path, &exists));
if (!exists) {
return Status::OK();
}
CHECK_HDFS_HANDLER(_fs_handler);
Path real_path = convert_path(path, _fs_name);
int res = hdfsDelete(_fs_handler->hdfs_fs, real_path.string().c_str(), is_recursive);
if (res == -1) {
return Status::IOError("failed to delete directory {}: {}", path.native(), hdfs_error());
}
return Status::OK();
}
Status HdfsFileSystem::exists_impl(const Path& path, bool* res) const {
CHECK_HDFS_HANDLER(_fs_handler);
Path real_path = convert_path(path, _fs_name);
int is_exists = hdfsExists(_fs_handler->hdfs_fs, real_path.string().c_str());
#ifdef USE_HADOOP_HDFS
    // When hdfsExists() returns a non-zero code:
    // - if errno is ENOENT, the path simply does not exist;
    // - otherwise some other error occurred and should be returned.
    // NOTE: this does not apply to libhdfs3, which is only used on macOS,
    // so we don't have to support it there.
    //
    // See details:
    // https://github.com/apache/hadoop/blob/5cda162a804fb0cfc2a5ac0058ab407662c5fb00/
    // hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c#L1923-L1924
if (is_exists != 0 && errno != ENOENT) {
char* root_cause = hdfsGetLastExceptionRootCause();
return Status::IOError("failed to check path existence {}: {}", path.native(),
(root_cause ? root_cause : "unknown"));
}
#endif
*res = (is_exists == 0);
return Status::OK();
}
Status HdfsFileSystem::file_size_impl(const Path& path, int64_t* file_size) const {
CHECK_HDFS_HANDLER(_fs_handler);
Path real_path = convert_path(path, _fs_name);
hdfsFileInfo* file_info = hdfsGetPathInfo(_fs_handler->hdfs_fs, real_path.string().c_str());
if (file_info == nullptr) {
return Status::IOError("failed to get file size of {}: {}", path.native(), hdfs_error());
}
*file_size = file_info->mSize;
hdfsFreeFileInfo(file_info, 1);
return Status::OK();
}
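
// List the entries under `path`. When only_file is true, sub-directories are
// skipped. Each returned mName is a full path, so the directory prefix is
// stripped to leave just the base name in FileInfo::file_name.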
Status HdfsFileSystem::list_impl(const Path& path, bool only_file, std::vector<FileInfo>* files,
bool* exists) {
RETURN_IF_ERROR(exists_impl(path, exists));
if (!(*exists)) {
return Status::OK();
}
CHECK_HDFS_HANDLER(_fs_handler);
Path real_path = convert_path(path, _fs_name);
int numEntries = 0;
hdfsFileInfo* hdfs_file_info =
hdfsListDirectory(_fs_handler->hdfs_fs, real_path.c_str(), &numEntries);
if (hdfs_file_info == nullptr) {
return Status::IOError("failed to list files/directors {}: {}", path.native(),
hdfs_error());
}
for (int idx = 0; idx < numEntries; ++idx) {
auto& file = hdfs_file_info[idx];
if (only_file && file.mKind == kObjectKindDirectory) {
continue;
}
auto& file_info = files->emplace_back();
std::string_view fname(file.mName);
fname.remove_prefix(fname.rfind('/') + 1);
file_info.file_name = fname;
file_info.file_size = file.mSize;
file_info.is_file = (file.mKind != kObjectKindDirectory);
}
hdfsFreeFileInfo(hdfs_file_info, numEntries);
return Status::OK();
}
Status HdfsFileSystem::rename_impl(const Path& orig_name, const Path& new_name) {
    CHECK_HDFS_HANDLER(_fs_handler);
    Path normal_orig_name = convert_path(orig_name, _fs_name);
    Path normal_new_name = convert_path(new_name, _fs_name);
    int ret = hdfsRename(_fs_handler->hdfs_fs, normal_orig_name.c_str(), normal_new_name.c_str());
    if (ret != 0) {
        return Status::IOError("failed to rename from {} to {}: {}", normal_orig_name.native(),
                               normal_new_name.native(), hdfs_error());
    }
    LOG(INFO) << "successfully renamed file. orig: " << normal_orig_name
              << ", new: " << normal_new_name;
    return Status::OK();
}
Status HdfsFileSystem::upload_impl(const Path& local_file, const Path& remote_file) {
// 1. open local file for read
FileSystemSPtr local_fs = global_local_filesystem();
FileReaderSPtr local_reader = nullptr;
RETURN_IF_ERROR(local_fs->open_file(local_file, &local_reader));
int64_t file_len = local_reader->size();
if (file_len == -1) {
return Status::IOError("failed to get size of file: {}", local_file.string());
}
// 2. open remote file for write
FileWriterPtr hdfs_writer = nullptr;
RETURN_IF_ERROR(create_file_impl(remote_file, &hdfs_writer, nullptr));
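    // 3. stream the local file to the remote writer in 1MB chunks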
constexpr size_t buf_sz = 1024 * 1024;
char read_buf[buf_sz];
size_t left_len = file_len;
size_t read_offset = 0;
size_t bytes_read = 0;
    while (left_len > 0) {
        size_t read_len = left_len > buf_sz ? buf_sz : left_len;
        RETURN_IF_ERROR(local_reader->read_at(read_offset, {read_buf, read_len}, &bytes_read));
        if (bytes_read == 0) {
            return Status::IOError("unexpected EOF when reading local file: {}",
                                   local_file.string());
        }
        // Append only the bytes actually read; read_at() may return fewer than requested.
        RETURN_IF_ERROR(hdfs_writer->append({read_buf, bytes_read}));
        read_offset += bytes_read;
        left_len -= bytes_read;
    }
return hdfs_writer->close();
}
Status HdfsFileSystem::batch_upload_impl(const std::vector<Path>& local_files,
const std::vector<Path>& remote_files) {
DCHECK(local_files.size() == remote_files.size());
    for (size_t i = 0; i < local_files.size(); ++i) {
RETURN_IF_ERROR(upload_impl(local_files[i], remote_files[i]));
}
return Status::OK();
}
Status HdfsFileSystem::download_impl(const Path& remote_file, const Path& local_file) {
// 1. open remote file for read
FileReaderSPtr hdfs_reader = nullptr;
RETURN_IF_ERROR(open_file_internal(remote_file, &hdfs_reader, FileReaderOptions::DEFAULT));
    // 2. remove the local file if it already exists
    if (std::filesystem::remove(local_file)) {
        LOG(INFO) << "removed the previously existing local file: " << local_file;
}
// 3. open local file for write
FileSystemSPtr local_fs = global_local_filesystem();
FileWriterPtr local_writer = nullptr;
RETURN_IF_ERROR(local_fs->create_file(local_file, &local_writer));
// 4. read remote and write to local
LOG(INFO) << "read remote file: " << remote_file << " to local: " << local_file;
constexpr size_t buf_sz = 1024 * 1024;
std::unique_ptr<char[]> read_buf(new char[buf_sz]);
size_t cur_offset = 0;
while (true) {
size_t read_len = 0;
Slice file_slice(read_buf.get(), buf_sz);
RETURN_IF_ERROR(hdfs_reader->read_at(cur_offset, file_slice, &read_len));
cur_offset += read_len;
if (read_len == 0) {
break;
}
RETURN_IF_ERROR(local_writer->append({read_buf.get(), read_len}));
}
return local_writer->close();
}
} // namespace doris::io