blob: c041002abc1374fb8b52f18b5d4adb07685d0fcc [file] [log] [blame]
/*
* Copyright 2024-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "paimon/common/fs/resolving_file_system.h"
#include <mutex>
#include <shared_mutex>
#include <utility>
#include "paimon/common/utils/path_util.h"
#include "paimon/fs/file_system_factory.h"
namespace paimon {
// Builds a resolving file system from a scheme -> file-system-identifier map, a fallback
// identifier, and the options that are handed to FileSystemFactory::Get when a concrete
// file system has to be created.
ResolvingFileSystem::ResolvingFileSystem(
    const std::map<std::string, std::string>& scheme_to_fs_identifier,
    const std::string& default_fs_identifier, const std::map<std::string, std::string>& options)
    : scheme_to_fs_identifier_(scheme_to_fs_identifier),
      default_fs_identifier_(default_fs_identifier),
      options_(options) {
    // A local path may carry the 'file' scheme or no scheme at all, so the empty scheme is
    // aliased to the identifier registered for 'file' (overwriting any existing "" entry).
    if (const auto file_entry = scheme_to_fs_identifier_.find("file");
        file_entry != scheme_to_fs_identifier_.end()) {
        scheme_to_fs_identifier_[""] = file_entry->second;
    }
}
// Returns the concrete file system responsible for `uri`.
//
// The uri is parsed with PathUtil::ToPath and the (scheme, authority) pair of the parsed
// path is used as the cache key. On a cache miss the identifier mapped to the scheme (or
// default_fs_identifier_ when the scheme has no mapping) is passed to
// FileSystemFactory::Get, and the resulting file system is cached before being returned.
// A failure from parsing or from the factory is propagated by PAIMON_ASSIGN_OR_RAISE.
Result<std::shared_ptr<FileSystem>> ResolvingFileSystem::GetRealFileSystem(
    const std::string& uri) const {
    PAIMON_ASSIGN_OR_RAISE(Path path, PathUtil::ToPath(uri));

    // Fast path: look the entry up while holding only the shared (read) lock.
    {
        std::shared_lock<std::shared_mutex> shared_guard(fs_cache_mutex_);
        if (auto cached = fs_cache_.find({path.scheme, path.authority});
            cached != fs_cache_.end()) {
            return cached->second;
        }
    }

    // Slow path: take the exclusive (write) lock for creation and insertion.
    std::unique_lock<std::shared_mutex> exclusive_guard(fs_cache_mutex_);

    // Look again: another thread may have inserted the entry between the release of the
    // shared lock above and the acquisition of the exclusive lock.
    if (auto cached = fs_cache_.find({path.scheme, path.authority}); cached != fs_cache_.end()) {
        return cached->second;
    }

    // Pick the identifier registered for this scheme, falling back to the default one.
    const auto mapped = scheme_to_fs_identifier_.find(path.scheme);
    std::string identifier =
        (mapped != scheme_to_fs_identifier_.end()) ? mapped->second : default_fs_identifier_;

    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileSystem> created_fs,
                           FileSystemFactory::Get(identifier, uri, options_));
    fs_cache_.emplace(std::make_pair(path.scheme, path.authority), created_fs);
    return created_fs;
}
// Opens `path` for reading on the file system that GetRealFileSystem resolves for it.
// The result of the underlying Open call is returned as-is; a resolution failure is
// propagated by PAIMON_ASSIGN_OR_RAISE.
Result<std::unique_ptr<InputStream>> ResolvingFileSystem::Open(const std::string& path) const {
    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileSystem> real_fs, GetRealFileSystem(path));
    return real_fs->Open(path);
}
// Creates an output stream for `path` on the file system that GetRealFileSystem resolves
// for it. `overwrite` is forwarded unchanged to the underlying Create call, whose result
// is returned as-is; a resolution failure is propagated by PAIMON_ASSIGN_OR_RAISE.
Result<std::unique_ptr<OutputStream>> ResolvingFileSystem::Create(const std::string& path,
                                                                  bool overwrite) const {
    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileSystem> real_fs, GetRealFileSystem(path));
    return real_fs->Create(path, overwrite);
}
// Forwards Mkdirs for `path` to the file system that GetRealFileSystem resolves for it
// and returns that call's status; a resolution failure is propagated by
// PAIMON_ASSIGN_OR_RAISE.
Status ResolvingFileSystem::Mkdirs(const std::string& path) const {
    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileSystem> real_fs, GetRealFileSystem(path));
    return real_fs->Mkdirs(path);
}
// Forwards Rename(src, dst) to the file system that GetRealFileSystem resolves for `src`
// and returns that call's status; a resolution failure is propagated by
// PAIMON_ASSIGN_OR_RAISE.
//
// Only `src` takes part in choosing the file system; `dst` is passed through unchanged.
// NOTE(review): a `dst` with a different scheme/authority is not detected here - confirm
// whether the underlying file systems reject such renames themselves.
Status ResolvingFileSystem::Rename(const std::string& src, const std::string& dst) const {
    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileSystem> src_fs, GetRealFileSystem(src));
    return src_fs->Rename(src, dst);
}
// Forwards Delete for `path` to the file system that GetRealFileSystem resolves for it.
// `recursive` is handed to the underlying Delete call unchanged and that call's status is
// returned; a resolution failure is propagated by PAIMON_ASSIGN_OR_RAISE.
Status ResolvingFileSystem::Delete(const std::string& path, bool recursive) const {
    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileSystem> real_fs, GetRealFileSystem(path));
    return real_fs->Delete(path, recursive);
}
// Fetches the status of `path` from the file system that GetRealFileSystem resolves for
// it. The result of the underlying GetFileStatus call is returned as-is; a resolution
// failure is propagated by PAIMON_ASSIGN_OR_RAISE.
Result<std::unique_ptr<FileStatus>> ResolvingFileSystem::GetFileStatus(
    const std::string& path) const {
    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileSystem> real_fs, GetRealFileSystem(path));
    return real_fs->GetFileStatus(path);
}
// Forwards ListDir for `directory` to the file system that GetRealFileSystem resolves for
// it. `file_status_list` is the caller-provided output vector and is handed to the
// underlying call untouched; that call's status is returned. A resolution failure is
// propagated by PAIMON_ASSIGN_OR_RAISE.
Status ResolvingFileSystem::ListDir(
    const std::string& directory,
    std::vector<std::unique_ptr<BasicFileStatus>>* file_status_list) const {
    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileSystem> real_fs, GetRealFileSystem(directory));
    return real_fs->ListDir(directory, file_status_list);
}
// Forwards ListFileStatus for `path` to the file system that GetRealFileSystem resolves
// for it. `file_status_list` is the caller-provided output vector and is handed to the
// underlying call untouched; that call's status is returned. A resolution failure is
// propagated by PAIMON_ASSIGN_OR_RAISE.
Status ResolvingFileSystem::ListFileStatus(
    const std::string& path, std::vector<std::unique_ptr<FileStatus>>* file_status_list) const {
    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileSystem> real_fs, GetRealFileSystem(path));
    return real_fs->ListFileStatus(path, file_status_list);
}
// Asks the file system that GetRealFileSystem resolves for `path` whether the path
// exists. The result of the underlying Exists call is returned as-is; a resolution
// failure is propagated by PAIMON_ASSIGN_OR_RAISE.
Result<bool> ResolvingFileSystem::Exists(const std::string& path) const {
    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileSystem> real_fs, GetRealFileSystem(path));
    return real_fs->Exists(path);
}
} // namespace paimon