// blob: 98dc05731b9a9e8486f13220d06cf7825f02d463 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <sstream>
#include <utility>
#include "arrow/util/config.h"
#include "arrow/filesystem/filesystem.h"
#ifdef ARROW_HDFS
#include "arrow/filesystem/hdfs.h"
#endif
#ifdef ARROW_S3
#include "arrow/filesystem/s3fs.h"
#endif
#include "arrow/filesystem/localfs.h"
#include "arrow/filesystem/mockfs.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/filesystem/util_internal.h"
#include "arrow/io/slow.h"
#include "arrow/io/util_internal.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
#include "arrow/util/parallel.h"
#include "arrow/util/uri.h"
#include "arrow/util/vector.h"
#include "arrow/util/windows_fixup.h"
namespace arrow {
using internal::checked_pointer_cast;
using internal::TaskHints;
using internal::Uri;
using io::internal::SubmitIO;
namespace fs {
using internal::ConcatAbstractPath;
using internal::EnsureTrailingSlash;
using internal::GetAbstractPathParent;
using internal::kSep;
using internal::RemoveLeadingSlash;
using internal::RemoveTrailingSlash;
using internal::ToSlashes;
// Return a short lowercase label for a FileType value.
//
// A value outside the known enumerators is reported through ARROW_LOG(FATAL);
// the trailing "???" only gives that path a return value.
std::string ToString(FileType ftype) {
  switch (ftype) {
    case FileType::Directory:
      return "directory";
    case FileType::File:
      return "file";
    case FileType::Unknown:
      return "unknown";
    case FileType::NotFound:
      return "not-found";
    default:
      break;
  }
  ARROW_LOG(FATAL) << "Invalid FileType value: " << static_cast<int>(ftype);
  return "???";
}
// For googletest
//
// Streams the enumerator name prefixed with "FileType::".  An unrecognized
// value is reported through ARROW_LOG(FATAL).
ARROW_EXPORT std::ostream& operator<<(std::ostream& os, FileType ftype) {
  switch (ftype) {
    case FileType::NotFound:
      return os << "FileType::NotFound";
    case FileType::Unknown:
      return os << "FileType::Unknown";
    case FileType::File:
      return os << "FileType::File";
    case FileType::Directory:
      return os << "FileType::Directory";
    default:
      ARROW_LOG(FATAL) << "Invalid FileType value: " << static_cast<int>(ftype);
  }
  return os;
}
// The second element of the pair produced by GetAbstractPathParent(path_).
std::string FileInfo::base_name() const {
  auto parent_and_leaf = internal::GetAbstractPathParent(path_);
  return std::move(parent_and_leaf.second);
}
// The first element of the pair produced by GetAbstractPathParent(path_).
std::string FileInfo::dir_name() const {
  auto parent_and_leaf = internal::GetAbstractPathParent(path_);
  return std::move(parent_and_leaf.first);
}
// Debug helper
//
// Renders this FileInfo through its stream insertion operator and returns the
// resulting text.
std::string FileInfo::ToString() const {
  std::ostringstream buffer;
  buffer << *this;
  return buffer.str();
}
// Writes "FileInfo(<type>, <path>)" to the stream and returns the stream.
std::ostream& operator<<(std::ostream& os, const FileInfo& info) {
  os << "FileInfo(";
  os << info.type();
  os << ", ";
  os << info.path();
  os << ")";
  return os;
}
// Extension of path_, as computed by internal::GetAbstractPathExtension().
std::string FileInfo::extension() const {
  std::string ext = internal::GetAbstractPathExtension(path_);
  return ext;
}
//////////////////////////////////////////////////////////////////////////
// FileSystem default method implementations
// Out-of-line destructor; nothing to release explicitly.
FileSystem::~FileSystem() = default;
// Default implementation: the path is returned unchanged.
Result<std::string> FileSystem::NormalizePath(std::string path) {
  return Result<std::string>(std::move(path));
}
// Default multi-path lookup: calls the single-path GetFileInfo() on each entry
// in order and stops at the first error.
Result<std::vector<FileInfo>> FileSystem::GetFileInfo(
    const std::vector<std::string>& paths) {
  std::vector<FileInfo> infos;
  infos.reserve(paths.size());
  for (auto it = paths.begin(); it != paths.end(); ++it) {
    ARROW_ASSIGN_OR_RAISE(FileInfo single, GetFileInfo(*it));
    infos.push_back(std::move(single));
  }
  return infos;
}
namespace {
// Invoke `func` with a shared_ptr to `fs`, either inline or via the IO executor.
//
// - When `synchronous` is true, `func` is called directly in the current thread
//   and its result is converted to the declared return type.
// - Otherwise `func` is submitted through io::internal::SubmitIO() using
//   fs->io_context(), and the submission result is wrapped with DeferNotOk().
//
// The return type is deduced from the asynchronous submission expression.
// `fs` must be owned by a shared_ptr since shared_from_this() is called on it.
template <typename DeferredFunc>
auto FileSystemDefer(FileSystem* fs, bool synchronous, DeferredFunc&& func)
-> decltype(DeferNotOk(
fs->io_context().executor()->Submit(func, std::shared_ptr<FileSystem>{}))) {
// Obtain an owning pointer that is handed to `func` in both modes.
auto self = fs->shared_from_this();
if (synchronous) {
return std::forward<DeferredFunc>(func)(std::move(self));
}
return DeferNotOk(io::internal::SubmitIO(
fs->io_context(), std::forward<DeferredFunc>(func), std::move(self)));
}
} // namespace
// Default async lookup: defers to the synchronous GetFileInfo(paths) through
// FileSystemDefer().  `paths` is copied into the deferred task.
Future<std::vector<FileInfo>> FileSystem::GetFileInfoAsync(
    const std::vector<std::string>& paths) {
  auto lookup = [paths](std::shared_ptr<FileSystem> self) {
    return self->GetFileInfo(paths);
  };
  return FileSystemDefer(this, default_async_is_sync_, std::move(lookup));
}
// Default generator: runs the synchronous GetFileInfo(select) through
// FileSystemDefer() and exposes the resulting future as a single-item generator.
FileInfoGenerator FileSystem::GetFileInfoGenerator(const FileSelector& select) {
  auto list_all = [select](std::shared_ptr<FileSystem> self) {
    return self->GetFileInfo(select);
  };
  auto pending = FileSystemDefer(this, default_async_is_sync_, std::move(list_all));
  return MakeSingleFutureGenerator(std::move(pending));
}
// Default bulk delete: DeleteFile() is attempted for every path, and the
// individual statuses are folded together with Status::operator&=.
Status FileSystem::DeleteFiles(const std::vector<std::string>& paths) {
  Status overall = Status::OK();
  for (size_t i = 0; i < paths.size(); ++i) {
    overall &= DeleteFile(paths[i]);
  }
  return overall;
}
namespace {
// Check that `info` may designate a readable file.
//
// NotFound yields PathNotFound; File and Unknown are accepted; anything else
// yields NotAFile.
Status ValidateInputFileInfo(const FileInfo& info) {
  switch (info.type()) {
    case FileType::NotFound:
      return internal::PathNotFound(info.path());
    case FileType::File:
    case FileType::Unknown:
      return Status::OK();
    default:
      return internal::NotAFile(info.path());
  }
}
} // namespace
// Default FileInfo overload: validates `info`, then opens by path.
Result<std::shared_ptr<io::InputStream>> FileSystem::OpenInputStream(
    const FileInfo& info) {
  ARROW_RETURN_NOT_OK(ValidateInputFileInfo(info));
  const std::string& target = info.path();
  return OpenInputStream(target);
}
// Default FileInfo overload: validates `info`, then opens by path.
Result<std::shared_ptr<io::RandomAccessFile>> FileSystem::OpenInputFile(
    const FileInfo& info) {
  ARROW_RETURN_NOT_OK(ValidateInputFileInfo(info));
  const std::string& target = info.path();
  return OpenInputFile(target);
}
// Default async open: defers the synchronous OpenInputStream(path) through
// FileSystemDefer().  `path` is copied into the deferred task.
Future<std::shared_ptr<io::InputStream>> FileSystem::OpenInputStreamAsync(
    const std::string& path) {
  auto open_task = [path](std::shared_ptr<FileSystem> self) {
    return self->OpenInputStream(path);
  };
  return FileSystemDefer(this, default_async_is_sync_, std::move(open_task));
}
// Default async open from a FileInfo: `info` is validated up front (a failure
// is returned immediately), then the synchronous overload is deferred through
// FileSystemDefer().
Future<std::shared_ptr<io::InputStream>> FileSystem::OpenInputStreamAsync(
    const FileInfo& info) {
  ARROW_RETURN_NOT_OK(ValidateInputFileInfo(info));
  auto open_task = [info](std::shared_ptr<FileSystem> self) {
    return self->OpenInputStream(info);
  };
  return FileSystemDefer(this, default_async_is_sync_, std::move(open_task));
}
// Default async open: defers the synchronous OpenInputFile(path) through
// FileSystemDefer().  `path` is copied into the deferred task.
Future<std::shared_ptr<io::RandomAccessFile>> FileSystem::OpenInputFileAsync(
    const std::string& path) {
  auto open_task = [path](std::shared_ptr<FileSystem> self) {
    return self->OpenInputFile(path);
  };
  return FileSystemDefer(this, default_async_is_sync_, std::move(open_task));
}
// Default async open from a FileInfo: `info` is validated up front (a failure
// is returned immediately), then the synchronous overload is deferred through
// FileSystemDefer().
Future<std::shared_ptr<io::RandomAccessFile>> FileSystem::OpenInputFileAsync(
    const FileInfo& info) {
  ARROW_RETURN_NOT_OK(ValidateInputFileInfo(info));
  auto open_task = [info](std::shared_ptr<FileSystem> self) {
    return self->OpenInputFile(info);
  };
  return FileSystemDefer(this, default_async_is_sync_, std::move(open_task));
}
//////////////////////////////////////////////////////////////////////////
// SubTreeFileSystem implementation
// Construct a view of `base_fs` rooted at `base_path`.
//
// The IOContext is taken from `base_fs`.  The base path is passed through
// NormalizeBasePath(); ValueOrDie() is called on the result, so a
// normalization failure is not reported as a Status here
// (NOTE(review): presumably it aborts -- confirm against Result::ValueOrDie).
SubTreeFileSystem::SubTreeFileSystem(const std::string& base_path,
std::shared_ptr<FileSystem> base_fs)
: FileSystem(base_fs->io_context()),
base_path_(NormalizeBasePath(base_path, base_fs).ValueOrDie()),
base_fs_(base_fs) {}
// Out-of-line destructor; nothing to release explicitly.
SubTreeFileSystem::~SubTreeFileSystem() = default;
// Normalize `base_path` using `base_fs`, then pass the result through
// EnsureTrailingSlash().  Normalization errors are propagated.
Result<std::string> SubTreeFileSystem::NormalizeBasePath(
    std::string base_path, const std::shared_ptr<FileSystem>& base_fs) {
  auto maybe_normalized = base_fs->NormalizePath(std::move(base_path));
  ARROW_ASSIGN_OR_RAISE(base_path, std::move(maybe_normalized));
  return EnsureTrailingSlash(std::move(base_path));
}
// Two SubTreeFileSystems are equal when they are the same object, or when
// they share the same type name, the same base path and equal base filesystems.
bool SubTreeFileSystem::Equals(const FileSystem& other) const {
  if (&other == this) return true;
  if (other.type_name() != type_name()) return false;

  // Same type name, so the downcast is expected to be valid.
  const auto& rhs = ::arrow::internal::checked_cast<const SubTreeFileSystem&>(other);
  if (base_path_ != rhs.base_path_) return false;
  return base_fs_->Equals(rhs.base_fs_);
}
// Map a subtree-relative path to a path on the base filesystem.
// An empty input maps to base_path_ itself.
std::string SubTreeFileSystem::PrependBase(const std::string& s) const {
  return s.empty() ? base_path_ : ConcatAbstractPath(base_path_, s);
}
// In-place variant of PrependBase() that rejects an empty path with an IOError.
// On success `*s` is replaced by the path on the base filesystem.
Status SubTreeFileSystem::PrependBaseNonEmpty(std::string* s) const {
  if (!s->empty()) {
    *s = ConcatAbstractPath(base_path_, *s);
    return Status::OK();
  }
  return Status::IOError("Empty path");
}
// Inverse of PrependBase(): remove the base_path_ prefix from `s`.
// Returns UnknownError if `s` does not start with base_path_.
Result<std::string> SubTreeFileSystem::StripBase(const std::string& s) const {
  const auto prefix_len = base_path_.length();
  // Note base_path_ ends with a slash (if not empty)
  const bool has_base_prefix =
      s.length() >= prefix_len && s.compare(0, prefix_len, base_path_) == 0;
  if (!has_base_prefix) {
    return Status::UnknownError("Underlying filesystem returned path '", s,
                                "', which is not a subpath of '", base_path_, "'");
  }
  return s.substr(prefix_len);
}
// Rewrite the path stored in `info` so that it is relative to this subtree.
// Fails if StripBase() rejects the path.
Status SubTreeFileSystem::FixInfo(FileInfo* info) const {
  ARROW_ASSIGN_OR_RAISE(std::string relative_path, StripBase(info->path()));
  info->set_path(std::move(relative_path));
  return Status::OK();
}
// Normalize through the base filesystem: prepend the base, normalize there,
// then strip the base again.
Result<std::string> SubTreeFileSystem::NormalizePath(std::string path) {
  std::string full_path = PrependBase(path);
  ARROW_ASSIGN_OR_RAISE(std::string normalized_full,
                        base_fs_->NormalizePath(std::move(full_path)));
  return StripBase(std::move(normalized_full));
}
// Look up `path` on the base filesystem and report it relative to the subtree.
Result<FileInfo> SubTreeFileSystem::GetFileInfo(const std::string& path) {
  const std::string full_path = PrependBase(path);
  ARROW_ASSIGN_OR_RAISE(FileInfo found, base_fs_->GetFileInfo(full_path));
  ARROW_RETURN_NOT_OK(FixInfo(&found));
  return found;
}
// Run the selector against the base filesystem with its base_dir rebased, then
// make every returned path relative to the subtree.
Result<std::vector<FileInfo>> SubTreeFileSystem::GetFileInfo(const FileSelector& select) {
  FileSelector rebased(select);
  rebased.base_dir = PrependBase(select.base_dir);
  ARROW_ASSIGN_OR_RAISE(auto found, base_fs_->GetFileInfo(rebased));
  for (auto it = found.begin(); it != found.end(); ++it) {
    ARROW_RETURN_NOT_OK(FixInfo(&*it));
  }
  return found;
}
// Generator counterpart of GetFileInfo(selector).
//
// The selector's base_dir is rebased with PrependBase(), the base filesystem's
// generator is obtained, and each batch it produces is mapped through FixInfo()
// so that paths are relative to this subtree.
FileInfoGenerator SubTreeFileSystem::GetFileInfoGenerator(const FileSelector& select) {
auto selector = select;
selector.base_dir = PrependBase(selector.base_dir);
auto gen = base_fs_->GetFileInfoGenerator(selector);
// The mapping callback captures an owning pointer to this filesystem.
auto self = checked_pointer_cast<SubTreeFileSystem>(shared_from_this());
// The lambda takes its batch by value so it can fix the paths in place.
std::function<Result<std::vector<FileInfo>>(const std::vector<FileInfo>& infos)>
fix_infos = [self](std::vector<FileInfo> infos) -> Result<std::vector<FileInfo>> {
for (auto& info : infos) {
RETURN_NOT_OK(self->FixInfo(&info));
}
return infos;
};
return MakeMappedGenerator(gen, fix_infos);
}
// Create a directory below the subtree root; an empty path is rejected.
Status SubTreeFileSystem::CreateDir(const std::string& path, bool recursive) {
  std::string full_path = path;
  ARROW_RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
  return base_fs_->CreateDir(full_path, recursive);
}
// Delete a directory below the subtree root; an empty path is rejected.
Status SubTreeFileSystem::DeleteDir(const std::string& path) {
  std::string full_path = path;
  ARROW_RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
  return base_fs_->DeleteDir(full_path);
}
// Delete the contents of a directory below the subtree root.  A path that
// internal::IsEmptyPath() considers empty yields InvalidDeleteDirContents.
Status SubTreeFileSystem::DeleteDirContents(const std::string& path) {
  if (!internal::IsEmptyPath(path)) {
    const std::string full_path = PrependBase(path);
    return base_fs_->DeleteDirContents(full_path);
  }
  return internal::InvalidDeleteDirContents(path);
}
// Delete everything under the subtree root.  With an empty base path this is
// the base filesystem's root; otherwise it is the contents of base_path_.
Status SubTreeFileSystem::DeleteRootDirContents() {
  return base_path_.empty() ? base_fs_->DeleteRootDirContents()
                            : base_fs_->DeleteDirContents(base_path_);
}
// Delete a file below the subtree root; an empty path is rejected.
Status SubTreeFileSystem::DeleteFile(const std::string& path) {
  std::string full_path = path;
  ARROW_RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
  return base_fs_->DeleteFile(full_path);
}
// Move within the subtree; both paths are rebased and neither may be empty.
Status SubTreeFileSystem::Move(const std::string& src, const std::string& dest) {
  std::string full_src = src;
  ARROW_RETURN_NOT_OK(PrependBaseNonEmpty(&full_src));
  std::string full_dest = dest;
  ARROW_RETURN_NOT_OK(PrependBaseNonEmpty(&full_dest));
  return base_fs_->Move(full_src, full_dest);
}
// Copy within the subtree; both paths are rebased and neither may be empty.
Status SubTreeFileSystem::CopyFile(const std::string& src, const std::string& dest) {
  std::string full_src = src;
  ARROW_RETURN_NOT_OK(PrependBaseNonEmpty(&full_src));
  std::string full_dest = dest;
  ARROW_RETURN_NOT_OK(PrependBaseNonEmpty(&full_dest));
  return base_fs_->CopyFile(full_src, full_dest);
}
// Open an input stream on the base filesystem for a non-empty subtree path.
Result<std::shared_ptr<io::InputStream>> SubTreeFileSystem::OpenInputStream(
    const std::string& path) {
  std::string full_path = path;
  ARROW_RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
  return base_fs_->OpenInputStream(full_path);
}
// Open an input stream from a FileInfo: a copy of `info` carrying the rebased
// path is forwarded to the base filesystem.  An empty path is rejected.
Result<std::shared_ptr<io::InputStream>> SubTreeFileSystem::OpenInputStream(
    const FileInfo& info) {
  std::string full_path = info.path();
  ARROW_RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
  FileInfo rebased = info;
  rebased.set_path(std::move(full_path));
  return base_fs_->OpenInputStream(rebased);
}
// Async open of an input stream for a non-empty subtree path; the rebased path
// is forwarded to the base filesystem's async method.
Future<std::shared_ptr<io::InputStream>> SubTreeFileSystem::OpenInputStreamAsync(
    const std::string& path) {
  std::string full_path = path;
  ARROW_RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
  return base_fs_->OpenInputStreamAsync(full_path);
}
// Async open from a FileInfo: a copy of `info` carrying the rebased path is
// forwarded to the base filesystem's async method.  An empty path is rejected.
Future<std::shared_ptr<io::InputStream>> SubTreeFileSystem::OpenInputStreamAsync(
    const FileInfo& info) {
  std::string full_path = info.path();
  ARROW_RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
  FileInfo rebased = info;
  rebased.set_path(std::move(full_path));
  return base_fs_->OpenInputStreamAsync(rebased);
}
// Open a random-access file on the base filesystem for a non-empty subtree path.
Result<std::shared_ptr<io::RandomAccessFile>> SubTreeFileSystem::OpenInputFile(
    const std::string& path) {
  std::string full_path = path;
  ARROW_RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
  return base_fs_->OpenInputFile(full_path);
}
// Open a random-access file from a FileInfo: a copy of `info` carrying the
// rebased path is forwarded to the base filesystem.  An empty path is rejected.
Result<std::shared_ptr<io::RandomAccessFile>> SubTreeFileSystem::OpenInputFile(
    const FileInfo& info) {
  std::string full_path = info.path();
  ARROW_RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
  FileInfo rebased = info;
  rebased.set_path(std::move(full_path));
  return base_fs_->OpenInputFile(rebased);
}
// Async open of a random-access file for a non-empty subtree path; the rebased
// path is forwarded to the base filesystem's async method.
Future<std::shared_ptr<io::RandomAccessFile>> SubTreeFileSystem::OpenInputFileAsync(
    const std::string& path) {
  std::string full_path = path;
  ARROW_RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
  return base_fs_->OpenInputFileAsync(full_path);
}
// Async open from a FileInfo: a copy of `info` carrying the rebased path is
// forwarded to the base filesystem's async method.  An empty path is rejected.
Future<std::shared_ptr<io::RandomAccessFile>> SubTreeFileSystem::OpenInputFileAsync(
    const FileInfo& info) {
  std::string full_path = info.path();
  ARROW_RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
  FileInfo rebased = info;
  rebased.set_path(std::move(full_path));
  return base_fs_->OpenInputFileAsync(rebased);
}
// Open an output stream on the base filesystem for a non-empty subtree path.
Result<std::shared_ptr<io::OutputStream>> SubTreeFileSystem::OpenOutputStream(
    const std::string& path) {
  std::string full_path = path;
  ARROW_RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
  return base_fs_->OpenOutputStream(full_path);
}
// Open an append stream on the base filesystem for a non-empty subtree path.
Result<std::shared_ptr<io::OutputStream>> SubTreeFileSystem::OpenAppendStream(
    const std::string& path) {
  std::string full_path = path;
  ARROW_RETURN_NOT_OK(PrependBaseNonEmpty(&full_path));
  return base_fs_->OpenAppendStream(full_path);
}
//////////////////////////////////////////////////////////////////////////
// SlowFileSystem implementation
// Wrap `base_fs`, using the supplied latency generator.  The IOContext is
// taken from `base_fs`.
SlowFileSystem::SlowFileSystem(std::shared_ptr<FileSystem> base_fs,
                               std::shared_ptr<io::LatencyGenerator> latencies)
    : FileSystem(base_fs->io_context()),
      // The base-class initializer above has already run, so the by-value
      // parameters can be moved into the members.
      base_fs_(std::move(base_fs)),
      latencies_(std::move(latencies)) {}
// Wrap `base_fs`, building the latency generator from `average_latency` via
// io::LatencyGenerator::Make().  The IOContext is taken from `base_fs`.
SlowFileSystem::SlowFileSystem(std::shared_ptr<FileSystem> base_fs,
                               double average_latency)
    : FileSystem(base_fs->io_context()),
      // The base-class initializer above has already run, so `base_fs` can be
      // moved into the member.
      base_fs_(std::move(base_fs)),
      latencies_(io::LatencyGenerator::Make(average_latency)) {}
// Wrap `base_fs`, building the latency generator from `average_latency` and
// `seed` via io::LatencyGenerator::Make().  The IOContext is taken from
// `base_fs`.
SlowFileSystem::SlowFileSystem(std::shared_ptr<FileSystem> base_fs,
                               double average_latency, int32_t seed)
    : FileSystem(base_fs->io_context()),
      // The base-class initializer above has already run, so `base_fs` can be
      // moved into the member.
      base_fs_(std::move(base_fs)),
      latencies_(io::LatencyGenerator::Make(average_latency, seed)) {}
bool SlowFileSystem::Equals(const FileSystem& other) const { return this == &other; }
// Calls latencies_->Sleep(), then forwards to the wrapped filesystem.
Result<FileInfo> SlowFileSystem::GetFileInfo(const std::string& path) {
  latencies_->Sleep();
  Result<FileInfo> outcome = base_fs_->GetFileInfo(path);
  return outcome;
}
// Calls latencies_->Sleep(), then forwards to the wrapped filesystem.
Result<std::vector<FileInfo>> SlowFileSystem::GetFileInfo(const FileSelector& selector) {
  latencies_->Sleep();
  Result<std::vector<FileInfo>> outcome = base_fs_->GetFileInfo(selector);
  return outcome;
}
// Calls latencies_->Sleep(), then forwards to the wrapped filesystem.
Status SlowFileSystem::CreateDir(const std::string& path, bool recursive) {
  latencies_->Sleep();
  Status outcome = base_fs_->CreateDir(path, recursive);
  return outcome;
}
// Calls latencies_->Sleep(), then forwards to the wrapped filesystem.
Status SlowFileSystem::DeleteDir(const std::string& path) {
  latencies_->Sleep();
  Status outcome = base_fs_->DeleteDir(path);
  return outcome;
}
// Calls latencies_->Sleep(), then forwards to the wrapped filesystem.
Status SlowFileSystem::DeleteDirContents(const std::string& path) {
  latencies_->Sleep();
  Status outcome = base_fs_->DeleteDirContents(path);
  return outcome;
}
// Calls latencies_->Sleep(), then forwards to the wrapped filesystem.
Status SlowFileSystem::DeleteRootDirContents() {
  latencies_->Sleep();
  Status outcome = base_fs_->DeleteRootDirContents();
  return outcome;
}
// Calls latencies_->Sleep(), then forwards to the wrapped filesystem.
Status SlowFileSystem::DeleteFile(const std::string& path) {
  latencies_->Sleep();
  Status outcome = base_fs_->DeleteFile(path);
  return outcome;
}
// Calls latencies_->Sleep(), then forwards to the wrapped filesystem.
Status SlowFileSystem::Move(const std::string& src, const std::string& dest) {
  latencies_->Sleep();
  Status outcome = base_fs_->Move(src, dest);
  return outcome;
}
// Calls latencies_->Sleep(), then forwards to the wrapped filesystem.
Status SlowFileSystem::CopyFile(const std::string& src, const std::string& dest) {
  latencies_->Sleep();
  Status outcome = base_fs_->CopyFile(src, dest);
  return outcome;
}
// Calls latencies_->Sleep(), opens the stream on the wrapped filesystem, and
// returns it wrapped in an io::SlowInputStream sharing the same generator.
Result<std::shared_ptr<io::InputStream>> SlowFileSystem::OpenInputStream(
    const std::string& path) {
  latencies_->Sleep();
  ARROW_ASSIGN_OR_RAISE(auto inner, base_fs_->OpenInputStream(path));
  return std::make_shared<io::SlowInputStream>(inner, latencies_);
}
// Calls latencies_->Sleep(), opens the stream on the wrapped filesystem, and
// returns it wrapped in an io::SlowInputStream sharing the same generator.
Result<std::shared_ptr<io::InputStream>> SlowFileSystem::OpenInputStream(
    const FileInfo& info) {
  latencies_->Sleep();
  ARROW_ASSIGN_OR_RAISE(auto inner, base_fs_->OpenInputStream(info));
  return std::make_shared<io::SlowInputStream>(inner, latencies_);
}
// Calls latencies_->Sleep(), opens the file on the wrapped filesystem, and
// returns it wrapped in an io::SlowRandomAccessFile sharing the same generator.
Result<std::shared_ptr<io::RandomAccessFile>> SlowFileSystem::OpenInputFile(
    const std::string& path) {
  latencies_->Sleep();
  ARROW_ASSIGN_OR_RAISE(auto inner, base_fs_->OpenInputFile(path));
  return std::make_shared<io::SlowRandomAccessFile>(inner, latencies_);
}
// Calls latencies_->Sleep(), opens the file on the wrapped filesystem, and
// returns it wrapped in an io::SlowRandomAccessFile sharing the same generator.
Result<std::shared_ptr<io::RandomAccessFile>> SlowFileSystem::OpenInputFile(
    const FileInfo& info) {
  latencies_->Sleep();
  ARROW_ASSIGN_OR_RAISE(auto inner, base_fs_->OpenInputFile(info));
  return std::make_shared<io::SlowRandomAccessFile>(inner, latencies_);
}
// Calls latencies_->Sleep(), then returns the wrapped filesystem's output
// stream as-is (the stream itself is not wrapped).
Result<std::shared_ptr<io::OutputStream>> SlowFileSystem::OpenOutputStream(
    const std::string& path) {
  latencies_->Sleep();
  // XXX Should we have a SlowOutputStream that waits on Flush() and Close()?
  Result<std::shared_ptr<io::OutputStream>> outcome = base_fs_->OpenOutputStream(path);
  return outcome;
}
// Calls latencies_->Sleep(), then returns the wrapped filesystem's append
// stream as-is (the stream itself is not wrapped).
Result<std::shared_ptr<io::OutputStream>> SlowFileSystem::OpenAppendStream(
    const std::string& path) {
  latencies_->Sleep();
  Result<std::shared_ptr<io::OutputStream>> outcome = base_fs_->OpenAppendStream(path);
  return outcome;
}
// Copy sources[i] to destinations[i] for every index.
//
// The two vectors must have the same size, otherwise Status::Invalid is
// returned.  When source and destination filesystems compare equal, the
// filesystem's own CopyFile() is used; otherwise the data is streamed from an
// input stream to an output stream with internal::CopyStream().  Iteration is
// driven by OptionalParallelFor() with `use_threads` and io_context.executor().
Status CopyFiles(const std::vector<FileLocator>& sources,
                 const std::vector<FileLocator>& destinations,
                 const io::IOContext& io_context, int64_t chunk_size, bool use_threads) {
  if (sources.size() != destinations.size()) {
    return Status::Invalid("Trying to copy ", sources.size(), " files into ",
                           destinations.size(), " paths.");
  }

  auto copy_at_index = [&](int i) -> Status {
    const FileLocator& src = sources[i];
    const FileLocator& dst = destinations[i];

    if (src.filesystem->Equals(dst.filesystem)) {
      return src.filesystem->CopyFile(src.path, dst.path);
    }

    ARROW_ASSIGN_OR_RAISE(auto in_stream, src.filesystem->OpenInputStream(src.path));
    ARROW_ASSIGN_OR_RAISE(auto out_stream, dst.filesystem->OpenOutputStream(dst.path));
    ARROW_RETURN_NOT_OK(
        internal::CopyStream(in_stream, out_stream, chunk_size, io_context));
    return out_stream->Close();
  };

  const int num_files = static_cast<int>(sources.size());
  return ::arrow::internal::OptionalParallelFor(
      use_threads, num_files, std::move(copy_at_index), io_context.executor());
}
// Copy everything matched by `source_sel` on `source_fs` below
// `destination_base_dir` on `destination_fs`.
//
// Each selected entry's path is made relative to source_sel.base_dir and
// re-rooted under destination_base_dir.  Directories are created first (after
// reduction by internal::MinimalCreateDirSet()), then files are copied with the
// locator-based CopyFiles() overload.  Entries that are neither file nor
// directory are skipped.
Status CopyFiles(const std::shared_ptr<FileSystem>& source_fs,
                 const FileSelector& source_sel,
                 const std::shared_ptr<FileSystem>& destination_fs,
                 const std::string& destination_base_dir, const io::IOContext& io_context,
                 int64_t chunk_size, bool use_threads) {
  ARROW_ASSIGN_OR_RAISE(auto selected, source_fs->GetFileInfo(source_sel));
  if (selected.empty()) {
    return Status::OK();
  }

  std::vector<FileLocator> sources;
  std::vector<FileLocator> destinations;
  std::vector<std::string> dirs;

  for (const FileInfo& entry : selected) {
    auto relative = internal::RemoveAncestor(source_sel.base_dir, entry.path());
    if (!relative.has_value()) {
      return Status::Invalid("GetFileInfo() yielded path '", entry.path(),
                             "', which is outside base dir '", source_sel.base_dir, "'");
    }

    auto target_path =
        internal::ConcatAbstractPath(destination_base_dir, relative->to_string());

    if (entry.IsDirectory()) {
      dirs.push_back(target_path);
      continue;
    }
    if (entry.IsFile()) {
      sources.push_back(FileLocator{source_fs, entry.path()});
      destinations.push_back(FileLocator{destination_fs, target_path});
    }
  }

  // Create the reduced set of destination directories before copying files.
  dirs = internal::MinimalCreateDirSet(std::move(dirs));
  auto make_dir_at_index = [&](int i) { return destination_fs->CreateDir(dirs[i]); };
  ARROW_RETURN_NOT_OK(::arrow::internal::OptionalParallelFor(
      use_threads, static_cast<int>(dirs.size()), std::move(make_dir_at_index),
      io_context.executor()));

  return CopyFiles(sources, destinations, io_context, chunk_size, use_threads);
}
namespace {
// Parse `uri_string` into a Uri.
//
// On Windows only, a failed parse is retried after converting backslashes with
// ToSlashes(); the retry result is accepted only for the "file" scheme,
// otherwise the original parse error is returned.
Result<Uri> ParseFileSystemUri(const std::string& uri_string) {
  Uri uri;
  auto first_attempt = uri.Parse(uri_string);
  if (first_attempt.ok()) {
    return std::move(uri);
  }
#ifdef _WIN32
  // Could be a "file:..." URI with backslashes instead of regular slashes.
  RETURN_NOT_OK(uri.Parse(ToSlashes(uri_string)));
  if (uri.scheme() == "file") {
    return std::move(uri);
  }
#endif
  return first_attempt;
}
// Build the filesystem matching the scheme of an already-parsed URI.
//
// Recognized schemes are "file", "hdfs"/"viewfs", "s3" and "mock"; any other
// scheme yields Status::Invalid mentioning `uri_string`.  HDFS and S3 yield
// Status::NotImplemented when ARROW_HDFS / ARROW_S3 is not defined.  When
// `out_path` is non-null it receives the path component for the new filesystem.
Result<std::shared_ptr<FileSystem>> FileSystemFromUriReal(const Uri& uri,
                                                          const std::string& uri_string,
                                                          const io::IOContext& io_context,
                                                          std::string* out_path) {
  const auto scheme = uri.scheme();
  const bool wants_path = (out_path != nullptr);

  if (scheme == "file") {
    std::string local_path;
    ARROW_ASSIGN_OR_RAISE(auto local_options,
                          LocalFileSystemOptions::FromUri(uri, &local_path));
    if (wants_path) {
      *out_path = local_path;
    }
    return std::make_shared<LocalFileSystem>(local_options, io_context);
  }

  if (scheme == "hdfs" || scheme == "viewfs") {
#ifdef ARROW_HDFS
    ARROW_ASSIGN_OR_RAISE(auto hdfs_options, HdfsOptions::FromUri(uri));
    if (wants_path) {
      *out_path = uri.path();
    }
    ARROW_ASSIGN_OR_RAISE(auto hdfs, HadoopFileSystem::Make(hdfs_options, io_context));
    return hdfs;
#else
    return Status::NotImplemented("Got HDFS URI but Arrow compiled without HDFS support");
#endif
  }

  if (scheme == "s3") {
#ifdef ARROW_S3
    RETURN_NOT_OK(EnsureS3Initialized());
    ARROW_ASSIGN_OR_RAISE(auto s3_options, S3Options::FromUri(uri, out_path));
    ARROW_ASSIGN_OR_RAISE(auto s3fs, S3FileSystem::Make(s3_options, io_context));
    return s3fs;
#else
    return Status::NotImplemented("Got S3 URI but Arrow compiled without S3 support");
#endif
  }

  if (scheme == "mock") {
    // MockFileSystem does not have an absolute / relative path distinction,
    // normalize path by removing leading slash.
    if (wants_path) {
      *out_path = std::string(RemoveLeadingSlash(uri.path()));
    }
    return std::make_shared<internal::MockFileSystem>(internal::CurrentTimePoint(),
                                                      io_context);
  }

  return Status::Invalid("Unrecognized filesystem type in URI: ", uri_string);
}
} // namespace
// Convenience overload using io::default_io_context().
Result<std::shared_ptr<FileSystem>> FileSystemFromUri(const std::string& uri_string,
                                                      std::string* out_path) {
  const io::IOContext& default_context = io::default_io_context();
  return FileSystemFromUri(uri_string, default_context, out_path);
}
// Parse `uri_string` and build the matching filesystem with `io_context`.
// Parse errors are propagated; see FileSystemFromUriReal() for the schemes.
Result<std::shared_ptr<FileSystem>> FileSystemFromUri(const std::string& uri_string,
                                                      const io::IOContext& io_context,
                                                      std::string* out_path) {
  ARROW_ASSIGN_OR_RAISE(Uri parsed, ParseFileSystemUri(uri_string));
  return FileSystemFromUriReal(parsed, uri_string, io_context, out_path);
}
// Convenience overload using io::default_io_context().
Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath(const std::string& uri_string,
                                                            std::string* out_path) {
  const io::IOContext& default_context = io::default_io_context();
  return FileSystemFromUriOrPath(uri_string, default_context, out_path);
}
// Create a filesystem from either an absolute local path or a URI.
//
// If internal::DetectAbsolutePath() accepts `uri_string`, a LocalFileSystem is
// returned and, when `out_path` is non-null, it receives the input with its
// separators converted by ToSlashes().  Otherwise the string is handled by
// FileSystemFromUri().
//
// `io_context` is used for the returned filesystem in both cases.
Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath(
    const std::string& uri_string, const io::IOContext& io_context,
    std::string* out_path) {
  if (internal::DetectAbsolutePath(uri_string)) {
    // Normalize path separators
    if (out_path != nullptr) {
      *out_path = ToSlashes(uri_string);
    }
    // Pass the caller's IOContext through rather than silently falling back
    // to the default one, matching what the "file" URI branch does.
    return std::make_shared<LocalFileSystem>(LocalFileSystemOptions::Defaults(),
                                             io_context);
  }
  return FileSystemFromUri(uri_string, io_context, out_path);
}
// Status-returning variant: on success the filesystem is stored in `*out_fs`.
Status FileSystemFromUri(const std::string& uri, std::shared_ptr<FileSystem>* out_fs,
                         std::string* out_path) {
  auto maybe_fs = FileSystemFromUri(uri, out_path);
  return std::move(maybe_fs).Value(out_fs);
}
// Store `options` in internal::global_options.  Always returns OK.
Status Initialize(const FileSystemGlobalOptions& options) {
  auto& target = internal::global_options;
  target = options;
  return Status::OK();
}
} // namespace fs
} // namespace arrow