blob: 07a3fe34c1dd0fdf42226afaf83c76f187eb8929 [file] [log] [blame]
/*
* Copyright 2024-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "paimon/fs/local/local_file_system.h"
#include <fcntl.h>
#include <cassert>
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <utility>
#include "fmt/format.h"
#include "paimon/common/utils/path_util.h"
#include "paimon/common/utils/string_utils.h"
#include "paimon/fs/local/local_file_status.h"
namespace paimon {
Result<LocalFile> LocalFileSystem::ToFile(const std::string& path_string) const {
// local file system does not support path_string with scheme, e.g., "file:/tmp" will be
// rewritten to "/tmp"
PAIMON_ASSIGN_OR_RAISE(Path path, PathUtil::ToPath(path_string));
if (!path.scheme.empty() && StringUtils::ToLowerCase(path.scheme) != "file") {
return Status::Invalid(fmt::format("invalid scheme {} for local file system", path.scheme));
}
return LocalFile(path.path);
}
Result<bool> LocalFileSystem::Exists(const std::string& path) const {
PAIMON_ASSIGN_OR_RAISE(LocalFile file, ToFile(path));
return file.Exists();
}
Result<std::unique_ptr<InputStream>> LocalFileSystem::Open(const std::string& path) const {
PAIMON_ASSIGN_OR_RAISE(bool is_exist, Exists(path));
if (!is_exist) {
return Status::NotExist(fmt::format("File '{}' not exists", path));
}
PAIMON_ASSIGN_OR_RAISE(LocalFile file, ToFile(path));
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalInputStream> in, LocalInputStream::Create(file));
return in;
}
Result<std::unique_ptr<OutputStream>> LocalFileSystem::Create(const std::string& path,
bool overwrite) const {
PAIMON_ASSIGN_OR_RAISE(bool is_exist, Exists(path));
if (is_exist && !overwrite) {
return Status::Invalid(
fmt::format("do not allow overwrite, but the file {} already exists", path));
}
PAIMON_ASSIGN_OR_RAISE(LocalFile file, ToFile(path));
LocalFile parent = file.GetParentFile();
PAIMON_RETURN_NOT_OK(Mkdirs(parent.GetAbsolutePath()));
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalOutputStream> out, LocalOutputStream::Create(file));
return out;
}
Status LocalFileSystem::Mkdirs(const std::string& path) const {
PAIMON_ASSIGN_OR_RAISE(LocalFile file, ToFile(path));
return MkdirsInternal(file);
}
Status LocalFileSystem::MkdirsInternal(const LocalFile& file) const {
// Important: The 'Exists()' check above must come before the 'IsDir()'
// check to be safe when multiple parallel instances try to create the directory
PAIMON_ASSIGN_OR_RAISE(bool is_exist, file.Exists());
if (is_exist) {
PAIMON_ASSIGN_OR_RAISE(bool is_dir, file.IsDir());
if (is_dir) {
return Status::OK();
} else {
// exists and is not a directory -> is a regular file
return Status::IOError(fmt::format("file {} already exists and is not a directory",
file.GetAbsolutePath()));
}
}
auto parent = file.GetParentFile();
if (!parent.IsEmpty()) {
PAIMON_RETURN_NOT_OK(MkdirsInternal(parent));
}
PAIMON_ASSIGN_OR_RAISE(bool success, file.Mkdir());
if (!success) {
PAIMON_ASSIGN_OR_RAISE(bool is_dir, file.IsDir());
if (is_dir) {
return Status::OK();
} else {
return Status::IOError(
fmt::format("create directory '{}' failed", file.GetAbsolutePath()));
}
}
return Status::OK();
}
Result<std::unique_ptr<FileStatus>> LocalFileSystem::GetFileStatus(const std::string& path) const {
PAIMON_ASSIGN_OR_RAISE(LocalFile file, ToFile(path));
PAIMON_ASSIGN_OR_RAISE(bool is_exist, file.Exists());
if (is_exist) {
return file.GetFileStatus();
} else {
return Status::NotExist(
fmt::format("File {} does not exist or the user running "
"Paimon has insufficient permissions to access it.",
file.GetAbsolutePath()));
}
}
Status LocalFileSystem::ListDir(
const std::string& directory,
std::vector<std::unique_ptr<BasicFileStatus>>* file_status_list) const {
PAIMON_ASSIGN_OR_RAISE(LocalFile file, ToFile(directory));
PAIMON_ASSIGN_OR_RAISE(bool is_exist, file.Exists());
if (!is_exist) {
return Status::OK();
}
PAIMON_ASSIGN_OR_RAISE(bool is_file, file.IsFile());
if (is_file) {
return Status::IOError(
fmt::format("file {} already exists and is not a directory", file.GetAbsolutePath()));
} else {
std::vector<std::string> file_list;
PAIMON_RETURN_NOT_OK(file.List(&file_list));
file_status_list->reserve(file_status_list->size() + file_list.size());
for (const auto& f : file_list) {
Result<std::unique_ptr<FileStatus>> file_status =
GetFileStatus(PathUtil::JoinPath(directory, f));
if (!file_status.ok() && !file_status.status().IsNotExist()) {
return file_status.status();
} else if (file_status.ok()) {
file_status_list->emplace_back(std::make_unique<LocalBasicFileStatus>(
file_status.value()->GetPath(), file_status.value()->IsDir()));
}
}
return Status::OK();
}
}
Status LocalFileSystem::ListFileStatus(
const std::string& path, std::vector<std::unique_ptr<FileStatus>>* file_status_list) const {
PAIMON_ASSIGN_OR_RAISE(LocalFile file, ToFile(path));
PAIMON_ASSIGN_OR_RAISE(bool is_exist, file.Exists());
if (!is_exist) {
return Status::OK();
}
PAIMON_ASSIGN_OR_RAISE(bool is_file, file.IsFile());
if (is_file) {
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<FileStatus> file_status, file.GetFileStatus());
file_status_list->emplace_back(std::move(file_status));
} else {
std::vector<std::string> file_list;
PAIMON_RETURN_NOT_OK(file.List(&file_list));
file_status_list->reserve(file_status_list->size() + file_list.size());
for (const auto& f : file_list) {
Result<std::unique_ptr<FileStatus>> file_status =
GetFileStatus(PathUtil::JoinPath(path, f));
if (!file_status.ok() && !file_status.status().IsNotExist()) {
return file_status.status();
} else if (file_status.ok()) {
file_status_list->emplace_back(std::move(file_status).value());
}
}
}
return Status::OK();
}
Status LocalFileSystem::Delete(const std::string& path, bool recursive) const {
PAIMON_ASSIGN_OR_RAISE(LocalFile file, ToFile(path));
PAIMON_ASSIGN_OR_RAISE(bool is_file, file.IsFile());
if (is_file) {
return file.Delete();
}
return Delete(file, recursive);
}
Status LocalFileSystem::Delete(const LocalFile& f, bool recursive) const {
PAIMON_ASSIGN_OR_RAISE(bool is_dir, f.IsDir());
if (is_dir) {
std::vector<LocalFile> files;
PAIMON_RETURN_NOT_OK(f.ListFiles(&files));
if (recursive == false && !files.empty()) {
return Status::IOError(
fmt::format("cannot delete {}, directory is not empty", f.GetAbsolutePath()));
}
for (const auto& file : files) {
PAIMON_RETURN_NOT_OK(Delete(file));
}
}
// Now directory is empty
return f.Delete();
}
Status LocalFileSystem::Rename(const std::string& src, const std::string& dst) const {
std::string err_msg = fmt::format("rename '{}' to '{}' failed, because: ", src, dst);
PAIMON_ASSIGN_OR_RAISE(bool is_src_exist, Exists(src));
if (!is_src_exist) {
return Status::NotExist(err_msg, "src file not exist");
}
PAIMON_ASSIGN_OR_RAISE(bool is_dst_exist, Exists(dst));
if (is_dst_exist) {
return Status::Invalid(err_msg, "dst file already exist");
}
PAIMON_ASSIGN_OR_RAISE(LocalFile src_file, ToFile(src));
PAIMON_ASSIGN_OR_RAISE(bool is_file, src_file.IsFile());
std::string new_file_name = dst;
if (is_file && new_file_name[new_file_name.length() - 1] == '/') {
return Status::Invalid(err_msg, "src file is not a dir");
}
PAIMON_ASSIGN_OR_RAISE(LocalFile dst_file, ToFile(dst));
auto parent = dst_file.GetParentFile();
PAIMON_RETURN_NOT_OK(Mkdirs(parent.GetAbsolutePath()));
if (::rename(src.c_str(), dst.c_str()) != 0) {
int32_t cur_errno = errno;
return Status::IOError(err_msg, std::strerror(cur_errno));
}
return Status::OK();
}
// input stream
Result<std::unique_ptr<LocalInputStream>> LocalInputStream::Create(LocalFile& file) {
PAIMON_RETURN_NOT_OK(file.OpenFile(/*is_read_file=*/true));
return std::unique_ptr<LocalInputStream>(new LocalInputStream(file));
}
LocalInputStream::LocalInputStream(const LocalFile& file) : file_(file) {}
Status LocalInputStream::Seek(int64_t offset, SeekOrigin origin) {
if (origin != FS_SEEK_SET && origin != FS_SEEK_CUR && origin != FS_SEEK_END) {
return Status::Invalid(
"invalid SeekOrigin, only support FS_SEEK_SET, FS_SEEK_CUR, and FS_SEEK_END");
}
auto convert_origin = [](SeekOrigin origin) -> int32_t {
switch (origin) {
case FS_SEEK_SET:
return SEEK_SET;
case FS_SEEK_CUR:
return SEEK_CUR;
case FS_SEEK_END:
return SEEK_END;
default:
return SEEK_SET;
}
};
return file_.Seek(offset, convert_origin(origin));
}
Result<int64_t> LocalInputStream::GetPos() const {
return file_.Tell();
}
Result<int32_t> LocalInputStream::Read(char* buffer, uint32_t size) {
PAIMON_ASSIGN_OR_RAISE(int32_t read_length, file_.Read(buffer, size));
if (read_length != static_cast<int32_t>(size)) {
return Status::IOError(fmt::format("file '{}' read size {} != expected {}",
file_.GetAbsolutePath(), read_length, size));
}
return read_length;
}
Result<int32_t> LocalInputStream::Read(char* buffer, uint32_t size, uint64_t offset) {
PAIMON_ASSIGN_OR_RAISE(int32_t read_length, file_.Read(buffer, size, offset));
if (read_length != static_cast<int32_t>(size)) {
return Status::IOError(fmt::format("file '{}' read size {} != expected {}",
file_.GetAbsolutePath(), read_length, size));
}
return read_length;
}
void LocalInputStream::ReadAsync(char* buffer, uint32_t size, uint64_t offset,
std::function<void(Status)>&& callback) {
Result<int32_t> read_size = Read(buffer, size, offset);
Status status = Status::OK();
if (!read_size.ok()) {
status = read_size.status();
} else {
assert(read_size.value() == static_cast<int32_t>(size));
}
callback(status);
}
Result<uint64_t> LocalInputStream::Length() const {
return file_.Length();
}
Status LocalInputStream::Close() {
return file_.Close();
}
// output stream
Result<std::unique_ptr<LocalOutputStream>> LocalOutputStream::Create(LocalFile& file) {
PAIMON_RETURN_NOT_OK(file.OpenFile(/*is_read_file=*/false));
return std::unique_ptr<LocalOutputStream>(new LocalOutputStream(file));
}
LocalOutputStream::LocalOutputStream(const LocalFile& file) : file_(file) {}
Result<int64_t> LocalOutputStream::GetPos() const {
return file_.Tell();
}
Result<int32_t> LocalOutputStream::Write(const char* buffer, uint32_t size) {
return file_.Write(buffer, size);
}
Status LocalOutputStream::Flush() {
return file_.Flush();
}
Status LocalOutputStream::Close() {
return file_.Close();
}
} // namespace paimon