// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "arrow/filesystem/s3fs.h"
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <sstream>
#include <unordered_map>
#include <utility>
#ifdef _WIN32
// Undefine preprocessor macros that interfere with AWS function / method names
#ifdef GetMessage
#undef GetMessage
#endif
#ifdef GetObject
#undef GetObject
#endif
#endif
#include <aws/core/Aws.h>
#include <aws/core/Region.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/core/auth/AWSCredentialsProviderChain.h>
#include <aws/core/client/RetryStrategy.h>
#include <aws/core/http/HttpResponse.h>
#include <aws/core/utils/logging/ConsoleLogSystem.h>
#include <aws/core/utils/stream/PreallocatedStreamBuf.h>
#include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/AbortMultipartUploadRequest.h>
#include <aws/s3/model/CompleteMultipartUploadRequest.h>
#include <aws/s3/model/CompletedMultipartUpload.h>
#include <aws/s3/model/CompletedPart.h>
#include <aws/s3/model/CopyObjectRequest.h>
#include <aws/s3/model/CreateBucketRequest.h>
#include <aws/s3/model/CreateMultipartUploadRequest.h>
#include <aws/s3/model/DeleteBucketRequest.h>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/HeadBucketRequest.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <aws/s3/model/ListBucketsResult.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/s3/model/UploadPartRequest.h>
#include "arrow/buffer.h"
#include "arrow/filesystem/filesystem.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/filesystem/s3_internal.h"
#include "arrow/filesystem/util_internal.h"
#include "arrow/io/interfaces.h"
#include "arrow/io/memory.h"
#include "arrow/io/util_internal.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/atomic_shared_ptr.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/future.h"
#include "arrow/util/logging.h"
#include "arrow/util/optional.h"
#include "arrow/util/task_group.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/windows_fixup.h"
namespace arrow {
using internal::TaskGroup;
using internal::Uri;
using io::internal::SubmitIO;
namespace fs {
using ::Aws::Client::AWSError;
using ::Aws::S3::S3Errors;
namespace S3Model = Aws::S3::Model;
using internal::ConnectRetryStrategy;
using internal::DetectS3Backend;
using internal::ErrorToStatus;
using internal::FromAwsDatetime;
using internal::FromAwsString;
using internal::IsAlreadyExists;
using internal::IsNotFound;
using internal::OutcomeToResult;
using internal::OutcomeToStatus;
using internal::S3Backend;
using internal::ToAwsString;
using internal::ToURLEncodedAwsString;
static const char kSep = '/';
namespace {
std::mutex aws_init_lock;
Aws::SDKOptions aws_options;
std::atomic<bool> aws_initialized(false);
Status DoInitializeS3(const S3GlobalOptions& options) {
Aws::Utils::Logging::LogLevel aws_log_level;
#define LOG_LEVEL_CASE(level_name) \
case S3LogLevel::level_name: \
aws_log_level = Aws::Utils::Logging::LogLevel::level_name; \
break;
switch (options.log_level) {
LOG_LEVEL_CASE(Fatal)
LOG_LEVEL_CASE(Error)
LOG_LEVEL_CASE(Warn)
LOG_LEVEL_CASE(Info)
LOG_LEVEL_CASE(Debug)
LOG_LEVEL_CASE(Trace)
default:
aws_log_level = Aws::Utils::Logging::LogLevel::Off;
}
#undef LOG_LEVEL_CASE
aws_options.loggingOptions.logLevel = aws_log_level;
// By default the AWS SDK logs to files; log to the console instead
aws_options.loggingOptions.logger_create_fn = [] {
return std::make_shared<Aws::Utils::Logging::ConsoleLogSystem>(
aws_options.loggingOptions.logLevel);
};
Aws::InitAPI(aws_options);
aws_initialized.store(true);
return Status::OK();
}
} // namespace
Status InitializeS3(const S3GlobalOptions& options) {
std::lock_guard<std::mutex> lock(aws_init_lock);
return DoInitializeS3(options);
}
Status FinalizeS3() {
std::lock_guard<std::mutex> lock(aws_init_lock);
Aws::ShutdownAPI(aws_options);
aws_initialized.store(false);
return Status::OK();
}
Status EnsureS3Initialized() {
std::lock_guard<std::mutex> lock(aws_init_lock);
if (!aws_initialized.load()) {
S3GlobalOptions options{S3LogLevel::Fatal};
return DoInitializeS3(options);
}
return Status::OK();
}
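// Example initialization (a minimal sketch; error handling elided):
//
//   S3GlobalOptions global_options;
//   global_options.log_level = S3LogLevel::Error;
//   RETURN_NOT_OK(InitializeS3(global_options));
//   ... use S3 filesystems ...
//   RETURN_NOT_OK(FinalizeS3());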
// -----------------------------------------------------------------------
// S3Options implementation
void S3Options::ConfigureDefaultCredentials() {
credentials_provider =
std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
}
void S3Options::ConfigureAnonymousCredentials() {
credentials_provider = std::make_shared<Aws::Auth::AnonymousAWSCredentialsProvider>();
}
void S3Options::ConfigureAccessKey(const std::string& access_key,
const std::string& secret_key,
const std::string& session_token) {
credentials_provider = std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(
ToAwsString(access_key), ToAwsString(secret_key), ToAwsString(session_token));
}
void S3Options::ConfigureAssumeRoleCredentials(
const std::string& role_arn, const std::string& session_name,
const std::string& external_id, int load_frequency,
const std::shared_ptr<Aws::STS::STSClient>& stsClient) {
credentials_provider = std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>(
ToAwsString(role_arn), ToAwsString(session_name), ToAwsString(external_id),
load_frequency, stsClient);
}
std::string S3Options::GetAccessKey() const {
auto credentials = credentials_provider->GetAWSCredentials();
return std::string(FromAwsString(credentials.GetAWSAccessKeyId()));
}
std::string S3Options::GetSecretKey() const {
auto credentials = credentials_provider->GetAWSCredentials();
return std::string(FromAwsString(credentials.GetAWSSecretKey()));
}
std::string S3Options::GetSessionToken() const {
auto credentials = credentials_provider->GetAWSCredentials();
return std::string(FromAwsString(credentials.GetSessionToken()));
}
S3Options S3Options::Defaults() {
S3Options options;
options.ConfigureDefaultCredentials();
return options;
}
S3Options S3Options::Anonymous() {
S3Options options;
options.ConfigureAnonymousCredentials();
return options;
}
S3Options S3Options::FromAccessKey(const std::string& access_key,
const std::string& secret_key,
const std::string& session_token) {
S3Options options;
options.ConfigureAccessKey(access_key, secret_key, session_token);
return options;
}
S3Options S3Options::FromAssumeRole(
const std::string& role_arn, const std::string& session_name,
const std::string& external_id, int load_frequency,
const std::shared_ptr<Aws::STS::STSClient>& stsClient) {
S3Options options;
options.role_arn = role_arn;
options.session_name = session_name;
options.external_id = external_id;
options.load_frequency = load_frequency;
options.ConfigureAssumeRoleCredentials(role_arn, session_name, external_id,
load_frequency, stsClient);
return options;
}
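// Example URIs accepted by FromUri (a sketch of the forms handled below):
//   s3://bucket/path/to/object
//   s3://access_key:secret_key@bucket/path?region=us-east-2
//   s3://bucket?scheme=http&endpoint_override=localhost:9000
// If no region is given and no endpoint override is set, the bucket's region
// is resolved with ResolveBucketRegion().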
Result<S3Options> S3Options::FromUri(const Uri& uri, std::string* out_path) {
S3Options options;
const auto bucket = uri.host();
auto path = uri.path();
if (bucket.empty()) {
if (!path.empty()) {
return Status::Invalid("Missing bucket name in S3 URI");
}
} else {
if (path.empty()) {
path = bucket;
} else {
if (path[0] != '/') {
return Status::Invalid("S3 URI should absolute, not relative");
}
path = bucket + path;
}
}
if (out_path != nullptr) {
*out_path = std::string(internal::RemoveTrailingSlash(path));
}
std::unordered_map<std::string, std::string> options_map;
ARROW_ASSIGN_OR_RAISE(const auto options_items, uri.query_items());
for (const auto& kv : options_items) {
options_map.emplace(kv.first, kv.second);
}
const auto username = uri.username();
if (!username.empty()) {
options.ConfigureAccessKey(username, uri.password());
} else {
options.ConfigureDefaultCredentials();
}
bool region_set = false;
for (const auto& kv : options_map) {
if (kv.first == "region") {
options.region = kv.second;
region_set = true;
} else if (kv.first == "scheme") {
options.scheme = kv.second;
} else if (kv.first == "endpoint_override") {
options.endpoint_override = kv.second;
} else {
return Status::Invalid("Unexpected query parameter in S3 URI: '", kv.first, "'");
}
}
if (!region_set && !bucket.empty() && options.endpoint_override.empty()) {
// XXX Should we use a dedicated resolver with the given credentials?
ARROW_ASSIGN_OR_RAISE(options.region, ResolveBucketRegion(bucket));
}
return options;
}
Result<S3Options> S3Options::FromUri(const std::string& uri_string,
std::string* out_path) {
Uri uri;
RETURN_NOT_OK(uri.Parse(uri_string));
return FromUri(uri, out_path);
}
bool S3Options::Equals(const S3Options& other) const {
return (region == other.region && endpoint_override == other.endpoint_override &&
scheme == other.scheme && background_writes == other.background_writes &&
GetAccessKey() == other.GetAccessKey() &&
GetSecretKey() == other.GetSecretKey() &&
GetSessionToken() == other.GetSessionToken());
}
namespace {
Status CheckS3Initialized() {
if (!aws_initialized.load()) {
return Status::Invalid(
"S3 subsystem not initialized; please call InitializeS3() "
"before carrying out any S3-related operation");
}
return Status::OK();
}
// XXX Sanitize paths by removing leading slash?
struct S3Path {
std::string full_path;
std::string bucket;
std::string key;
std::vector<std::string> key_parts;
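// For example, FromString("bucket/foo/bar") gives
// {full_path="bucket/foo/bar", bucket="bucket", key="foo/bar", key_parts={"foo", "bar"}}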
static Result<S3Path> FromString(const std::string& s) {
const auto src = internal::RemoveTrailingSlash(s);
auto first_sep = src.find_first_of(kSep);
if (first_sep == 0) {
return Status::Invalid("Path cannot start with a separator ('", s, "')");
}
if (first_sep == std::string::npos) {
return S3Path{std::string(src), std::string(src), "", {}};
}
S3Path path;
path.full_path = std::string(src);
path.bucket = std::string(src.substr(0, first_sep));
path.key = std::string(src.substr(first_sep + 1));
path.key_parts = internal::SplitAbstractPath(path.key);
RETURN_NOT_OK(Validate(&path));
return path;
}
static Status Validate(const S3Path* path) {
auto result = internal::ValidateAbstractPathParts(path->key_parts);
if (!result.ok()) {
return Status::Invalid(result.message(), " in path ", path->full_path);
} else {
return result;
}
}
Aws::String ToURLEncodedAwsString() const {
// URL-encode individual parts, not the '/' separator
Aws::String res;
res += internal::ToURLEncodedAwsString(bucket);
for (const auto& part : key_parts) {
res += kSep;
res += internal::ToURLEncodedAwsString(part);
}
return res;
}
S3Path parent() const {
DCHECK(!key_parts.empty());
auto parent = S3Path{"", bucket, "", key_parts};
parent.key_parts.pop_back();
parent.key = internal::JoinAbstractPath(parent.key_parts);
parent.full_path = parent.bucket + kSep + parent.key;
return parent;
}
bool has_parent() const { return !key.empty(); }
bool empty() const { return bucket.empty() && key.empty(); }
bool operator==(const S3Path& other) const {
return bucket == other.bucket && key == other.key;
}
};
// XXX return in OutcomeToStatus instead?
Status PathNotFound(const S3Path& path) {
return ::arrow::fs::internal::PathNotFound(path.full_path);
}
Status PathNotFound(const std::string& bucket, const std::string& key) {
return ::arrow::fs::internal::PathNotFound(bucket + kSep + key);
}
Status NotAFile(const S3Path& path) {
return ::arrow::fs::internal::NotAFile(path.full_path);
}
Status ValidateFilePath(const S3Path& path) {
if (path.bucket.empty() || path.key.empty()) {
return NotAFile(path);
}
return Status::OK();
}
std::string FormatRange(int64_t start, int64_t length) {
// Format an HTTP Range header value
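// e.g. FormatRange(10, 90) gives "bytes=10-99"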
std::stringstream ss;
ss << "bytes=" << start << "-" << start + length - 1;
return ss.str();
}
class S3Client : public Aws::S3::S3Client {
public:
using Aws::S3::S3Client::S3Client;
// To get a bucket's region, we must extract the "x-amz-bucket-region" header
// from the response to a HEAD bucket request.
// Unfortunately, the S3Client APIs don't let us access the headers of successful
// responses. So we have to cook up an AWS request and issue it ourselves.
Result<std::string> GetBucketRegion(const S3Model::HeadBucketRequest& request) {
auto uri = GeneratePresignedUrl(request.GetBucket(),
/*key=*/"", Aws::Http::HttpMethod::HTTP_HEAD);
// NOTE: The signer region argument isn't passed here, as there's no easy
// way of computing it (the relevant method is private).
auto outcome = MakeRequest(uri, request, Aws::Http::HttpMethod::HTTP_HEAD,
Aws::Auth::SIGV4_SIGNER);
const auto code = outcome.IsSuccess() ? outcome.GetResult().GetResponseCode()
: outcome.GetError().GetResponseCode();
const auto& headers = outcome.IsSuccess()
? outcome.GetResult().GetHeaderValueCollection()
: outcome.GetError().GetResponseHeaders();
const auto it = headers.find(ToAwsString("x-amz-bucket-region"));
if (it == headers.end()) {
if (code == Aws::Http::HttpResponseCode::NOT_FOUND) {
return Status::IOError("Bucket '", request.GetBucket(), "' not found");
} else if (!outcome.IsSuccess()) {
return ErrorToStatus(std::forward_as_tuple("When resolving region for bucket '",
request.GetBucket(), "': "),
outcome.GetError());
} else {
return Status::IOError("When resolving region for bucket '", request.GetBucket(),
"': missing 'x-amz-bucket-region' header in response");
}
}
return std::string(FromAwsString(it->second));
}
Result<std::string> GetBucketRegion(const std::string& bucket) {
S3Model::HeadBucketRequest req;
req.SetBucket(ToAwsString(bucket));
return GetBucketRegion(req);
}
};
// In AWS SDK < 1.8, Aws::Client::ClientConfiguration::followRedirects is a bool.
template <bool Never = false>
void DisableRedirectsImpl(bool* followRedirects) {
*followRedirects = false;
}
// In AWS SDK >= 1.8, it's a Aws::Client::FollowRedirectsPolicy scoped enum.
template <typename PolicyEnum, PolicyEnum Never = PolicyEnum::NEVER>
void DisableRedirectsImpl(PolicyEnum* followRedirects) {
*followRedirects = Never;
}
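// DisableRedirects() relies on overload resolution: with the scoped enum, the bool*
// overload isn't viable; with a bool, substituting bool for PolicyEnum makes
// PolicyEnum::NEVER ill-formed and SFINAE removes the enum overload.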
void DisableRedirects(Aws::Client::ClientConfiguration* c) {
DisableRedirectsImpl(&c->followRedirects);
}
class ClientBuilder {
public:
explicit ClientBuilder(S3Options options) : options_(std::move(options)) {}
const Aws::Client::ClientConfiguration& config() const { return client_config_; }
Aws::Client::ClientConfiguration* mutable_config() { return &client_config_; }
Result<std::shared_ptr<S3Client>> BuildClient() {
credentials_provider_ = options_.credentials_provider;
if (!options_.region.empty()) {
client_config_.region = ToAwsString(options_.region);
}
client_config_.endpointOverride = ToAwsString(options_.endpoint_override);
if (options_.scheme == "http") {
client_config_.scheme = Aws::Http::Scheme::HTTP;
} else if (options_.scheme == "https") {
client_config_.scheme = Aws::Http::Scheme::HTTPS;
} else {
return Status::Invalid("Invalid S3 connection scheme '", options_.scheme, "'");
}
client_config_.retryStrategy = std::make_shared<ConnectRetryStrategy>();
if (!internal::global_options.tls_ca_file_path.empty()) {
client_config_.caFile = ToAwsString(internal::global_options.tls_ca_file_path);
}
if (!internal::global_options.tls_ca_dir_path.empty()) {
client_config_.caPath = ToAwsString(internal::global_options.tls_ca_dir_path);
}
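// Virtual host-style bucket addressing is only used with the default AWS endpoint;
// a custom endpoint (e.g. Minio) typically expects path-style access.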
const bool use_virtual_addressing = options_.endpoint_override.empty();
return std::make_shared<S3Client>(
credentials_provider_, client_config_,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
use_virtual_addressing);
}
const S3Options& options() const { return options_; }
protected:
S3Options options_;
Aws::Client::ClientConfiguration client_config_;
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider_;
};
// -----------------------------------------------------------------------
// S3 region resolver
class RegionResolver {
public:
static Result<std::shared_ptr<RegionResolver>> Make(S3Options options) {
std::shared_ptr<RegionResolver> resolver(new RegionResolver(std::move(options)));
RETURN_NOT_OK(resolver->Init());
return resolver;
}
static Result<std::shared_ptr<RegionResolver>> DefaultInstance() {
static std::shared_ptr<RegionResolver> instance;
auto resolver = arrow::internal::atomic_load(&instance);
if (resolver) {
return resolver;
}
auto maybe_resolver = Make(S3Options::Anonymous());
if (!maybe_resolver.ok()) {
return maybe_resolver;
}
// Make sure to always return the same instance even if several threads
// call DefaultInstance at once.
std::shared_ptr<RegionResolver> existing;
if (arrow::internal::atomic_compare_exchange_strong(&instance, &existing,
*maybe_resolver)) {
return *maybe_resolver;
} else {
return existing;
}
}
Result<std::string> ResolveRegion(const std::string& bucket) {
std::unique_lock<std::mutex> lock(cache_mutex_);
auto it = cache_.find(bucket);
if (it != cache_.end()) {
return it->second;
}
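// Drop the lock while resolving over the network, so that concurrent lookups
// for other buckets aren't blocked.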
lock.unlock();
ARROW_ASSIGN_OR_RAISE(auto region, ResolveRegionUncached(bucket));
lock.lock();
// Note we don't cache a non-existent bucket, as the bucket could be created later
cache_[bucket] = region;
return region;
}
Result<std::string> ResolveRegionUncached(const std::string& bucket) {
return client_->GetBucketRegion(bucket);
}
protected:
explicit RegionResolver(S3Options options) : builder_(std::move(options)) {}
Status Init() {
DCHECK(builder_.options().endpoint_override.empty());
// On Windows with AWS SDK >= 1.8, it is necessary to disable redirects (ARROW-10085).
DisableRedirects(builder_.mutable_config());
return builder_.BuildClient().Value(&client_);
}
ClientBuilder builder_;
std::shared_ptr<S3Client> client_;
std::mutex cache_mutex_;
// XXX Should cache size be bounded? It must be quite unusual to query millions
// of different buckets in a single program invocation...
std::unordered_map<std::string, std::string> cache_;
};
// -----------------------------------------------------------------------
// S3 file stream implementations
// A non-copying iostream.
// See https://stackoverflow.com/questions/35322033/aws-c-sdk-uploadpart-times-out
// https://stackoverflow.com/questions/13059091/creating-an-input-stream-from-constant-memory
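// The PreallocatedStreamBuf base is declared first so it is constructed before the
// std::iostream base, making it safe to pass `this` to std::iostream below.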
class StringViewStream : Aws::Utils::Stream::PreallocatedStreamBuf, public std::iostream {
public:
StringViewStream(const void* data, int64_t nbytes)
: Aws::Utils::Stream::PreallocatedStreamBuf(
reinterpret_cast<unsigned char*>(const_cast<void*>(data)),
static_cast<size_t>(nbytes)),
std::iostream(this) {}
};
// By default, the AWS SDK reads object data into an auto-growing StringStream.
// To avoid copies, read directly into our preallocated buffer instead.
// See https://github.com/aws/aws-sdk-cpp/issues/64 for an alternative but
// functionally similar recipe.
Aws::IOStreamFactory AwsWriteableStreamFactory(void* data, int64_t nbytes) {
return [=]() { return Aws::New<StringViewStream>("", data, nbytes); };
}
Result<S3Model::GetObjectResult> GetObjectRange(Aws::S3::S3Client* client,
const S3Path& path, int64_t start,
int64_t length, void* out) {
S3Model::GetObjectRequest req;
req.SetBucket(ToAwsString(path.bucket));
req.SetKey(ToAwsString(path.key));
req.SetRange(ToAwsString(FormatRange(start, length)));
req.SetResponseStreamFactory(AwsWriteableStreamFactory(out, length));
return OutcomeToResult(client->GetObject(req));
}
// A RandomAccessFile that reads from a S3 object
class ObjectInputFile final : public io::RandomAccessFile {
public:
ObjectInputFile(std::shared_ptr<Aws::S3::S3Client> client,
const io::IOContext& io_context, const S3Path& path,
int64_t size = kNoSize)
: client_(std::move(client)),
io_context_(io_context),
path_(path),
content_length_(size) {}
Status Init() {
// Issue a HEAD Object to get the content-length and ensure any
// errors (e.g. file not found) don't wait until the first Read() call.
if (content_length_ != kNoSize) {
DCHECK_GE(content_length_, 0);
return Status::OK();
}
S3Model::HeadObjectRequest req;
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
auto outcome = client_->HeadObject(req);
if (!outcome.IsSuccess()) {
if (IsNotFound(outcome.GetError())) {
return PathNotFound(path_);
} else {
return ErrorToStatus(
std::forward_as_tuple("When reading information for key '", path_.key,
"' in bucket '", path_.bucket, "': "),
outcome.GetError());
}
}
content_length_ = outcome.GetResult().GetContentLength();
DCHECK_GE(content_length_, 0);
return Status::OK();
}
Status CheckClosed() const {
if (closed_) {
return Status::Invalid("Operation on closed stream");
}
return Status::OK();
}
Status CheckPosition(int64_t position, const char* action) const {
if (position < 0) {
return Status::Invalid("Cannot ", action, " from negative position");
}
if (position > content_length_) {
return Status::IOError("Cannot ", action, " past end of file");
}
return Status::OK();
}
// RandomAccessFile APIs
Status Close() override {
client_ = nullptr;
closed_ = true;
return Status::OK();
}
bool closed() const override { return closed_; }
Result<int64_t> Tell() const override {
RETURN_NOT_OK(CheckClosed());
return pos_;
}
Result<int64_t> GetSize() override {
RETURN_NOT_OK(CheckClosed());
return content_length_;
}
Status Seek(int64_t position) override {
RETURN_NOT_OK(CheckClosed());
RETURN_NOT_OK(CheckPosition(position, "seek"));
pos_ = position;
return Status::OK();
}
Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override {
RETURN_NOT_OK(CheckClosed());
RETURN_NOT_OK(CheckPosition(position, "read"));
nbytes = std::min(nbytes, content_length_ - position);
if (nbytes == 0) {
return 0;
}
// Read the desired range of bytes
ARROW_ASSIGN_OR_RAISE(S3Model::GetObjectResult result,
GetObjectRange(client_.get(), path_, position, nbytes, out));
auto& stream = result.GetBody();
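// The object data was already written into `out` through AwsWriteableStreamFactory;
// ignore() just advances the get pointer so that gcount() reports how many bytes
// were actually received.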
stream.ignore(nbytes);
// NOTE: the stream is a stringstream by default, so there is no actual error
// to check for. However, stream.fail() may return true if EOF is reached.
return stream.gcount();
}
Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) override {
RETURN_NOT_OK(CheckClosed());
RETURN_NOT_OK(CheckPosition(position, "read"));
// No need to allocate more than the remaining number of bytes
nbytes = std::min(nbytes, content_length_ - position);
ARROW_ASSIGN_OR_RAISE(auto buf, AllocateResizableBuffer(nbytes, io_context_.pool()));
if (nbytes > 0) {
ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
ReadAt(position, nbytes, buf->mutable_data()));
DCHECK_LE(bytes_read, nbytes);
RETURN_NOT_OK(buf->Resize(bytes_read));
}
return std::move(buf);
}
Result<int64_t> Read(int64_t nbytes, void* out) override {
ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out));
pos_ += bytes_read;
return bytes_read;
}
Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes));
pos_ += buffer->size();
return std::move(buffer);
}
protected:
std::shared_ptr<Aws::S3::S3Client> client_;
const io::IOContext io_context_;
S3Path path_;
bool closed_ = false;
int64_t pos_ = 0;
int64_t content_length_ = kNoSize;
};
// Minimum size for each part of a multipart upload, except for the last part.
// AWS doc says "5 MB" but it's not clear whether those are MB or MiB,
// so I chose the safer value.
// (see https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html)
static constexpr int64_t kMinimumPartUpload = 5 * 1024 * 1024;
// An OutputStream that writes to a S3 object
class ObjectOutputStream final : public io::OutputStream {
protected:
struct UploadState;
public:
ObjectOutputStream(std::shared_ptr<Aws::S3::S3Client> client,
const io::IOContext& io_context, const S3Path& path,
const S3Options& options)
: client_(std::move(client)),
io_context_(io_context),
path_(path),
background_writes_(options.background_writes) {}
~ObjectOutputStream() override {
// For compliance with the rest of the IO stack, we Close() rather than Abort(),
// even though it may be more expensive.
io::internal::CloseFromDestructor(this);
}
Status Init() {
// Initiate the multi-part upload
S3Model::CreateMultipartUploadRequest req;
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
auto outcome = client_->CreateMultipartUpload(req);
if (!outcome.IsSuccess()) {
return ErrorToStatus(
std::forward_as_tuple("When initiating multiple part upload for key '",
path_.key, "' in bucket '", path_.bucket, "': "),
outcome.GetError());
}
upload_id_ = outcome.GetResult().GetUploadId();
upload_state_ = std::make_shared<UploadState>();
closed_ = false;
return Status::OK();
}
Status Abort() override {
if (closed_) {
return Status::OK();
}
S3Model::AbortMultipartUploadRequest req;
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
req.SetUploadId(upload_id_);
auto outcome = client_->AbortMultipartUpload(req);
if (!outcome.IsSuccess()) {
return ErrorToStatus(
std::forward_as_tuple("When aborting multiple part upload for key '", path_.key,
"' in bucket '", path_.bucket, "': "),
outcome.GetError());
}
current_part_.reset();
client_ = nullptr;
closed_ = true;
return Status::OK();
}
// OutputStream interface
Status Close() override {
if (closed_) {
return Status::OK();
}
if (current_part_) {
// Upload last part
RETURN_NOT_OK(CommitCurrentPart());
}
// S3 mandates at least one part, upload an empty one if necessary
if (part_number_ == 1) {
RETURN_NOT_OK(UploadPart("", 0));
}
// Wait for in-progress uploads to finish (if async writes are enabled)
RETURN_NOT_OK(Flush());
// At this point, all part uploads have finished successfully
DCHECK_GT(part_number_, 1);
DCHECK_EQ(upload_state_->completed_parts.size(),
static_cast<size_t>(part_number_ - 1));
S3Model::CompletedMultipartUpload completed_upload;
completed_upload.SetParts(upload_state_->completed_parts);
S3Model::CompleteMultipartUploadRequest req;
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
req.SetUploadId(upload_id_);
req.SetMultipartUpload(std::move(completed_upload));
auto outcome = client_->CompleteMultipartUpload(req);
if (!outcome.IsSuccess()) {
return ErrorToStatus(
std::forward_as_tuple("When completing multiple part upload for key '",
path_.key, "' in bucket '", path_.bucket, "': "),
outcome.GetError());
}
client_ = nullptr;
closed_ = true;
return Status::OK();
}
bool closed() const override { return closed_; }
Result<int64_t> Tell() const override {
if (closed_) {
return Status::Invalid("Operation on closed stream");
}
return pos_;
}
Status Write(const std::shared_ptr<Buffer>& buffer) override {
return DoWrite(buffer->data(), buffer->size(), buffer);
}
Status Write(const void* data, int64_t nbytes) override {
return DoWrite(data, nbytes);
}
Status DoWrite(const void* data, int64_t nbytes,
std::shared_ptr<Buffer> owned_buffer = nullptr) {
if (closed_) {
return Status::Invalid("Operation on closed stream");
}
if (!current_part_ && nbytes >= part_upload_threshold_) {
// No current part and data large enough, upload it directly
// (without copying if the buffer is owned)
RETURN_NOT_OK(UploadPart(data, nbytes, owned_buffer));
pos_ += nbytes;
return Status::OK();
}
// Can't upload data on its own, need to buffer it
if (!current_part_) {
ARROW_ASSIGN_OR_RAISE(
current_part_,
io::BufferOutputStream::Create(part_upload_threshold_, io_context_.pool()));
current_part_size_ = 0;
}
RETURN_NOT_OK(current_part_->Write(data, nbytes));
pos_ += nbytes;
current_part_size_ += nbytes;
if (current_part_size_ >= part_upload_threshold_) {
// Current part large enough, upload it
RETURN_NOT_OK(CommitCurrentPart());
}
return Status::OK();
}
Status Flush() override {
if (closed_) {
return Status::Invalid("Operation on closed stream");
}
// Wait for background writes to finish
std::unique_lock<std::mutex> lock(upload_state_->mutex);
upload_state_->cv.wait(lock,
[this]() { return upload_state_->parts_in_progress == 0; });
return upload_state_->status;
}
// Upload-related helpers
Status CommitCurrentPart() {
ARROW_ASSIGN_OR_RAISE(auto buf, current_part_->Finish());
current_part_.reset();
current_part_size_ = 0;
return UploadPart(buf);
}
Status UploadPart(std::shared_ptr<Buffer> buffer) {
return UploadPart(buffer->data(), buffer->size(), buffer);
}
Status UploadPart(const void* data, int64_t nbytes,
std::shared_ptr<Buffer> owned_buffer = nullptr) {
S3Model::UploadPartRequest req;
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
req.SetUploadId(upload_id_);
req.SetPartNumber(part_number_);
req.SetContentLength(nbytes);
if (!background_writes_) {
req.SetBody(std::make_shared<StringViewStream>(data, nbytes));
auto outcome = client_->UploadPart(req);
if (!outcome.IsSuccess()) {
return UploadPartError(req, outcome);
} else {
AddCompletedPart(upload_state_, part_number_, outcome.GetResult());
}
} else {
// If the data isn't owned, make an immutable copy for the lifetime of the closure
if (owned_buffer == nullptr) {
ARROW_ASSIGN_OR_RAISE(owned_buffer, AllocateBuffer(nbytes, io_context_.pool()));
memcpy(owned_buffer->mutable_data(), data, nbytes);
} else {
DCHECK_EQ(data, owned_buffer->data());
DCHECK_EQ(nbytes, owned_buffer->size());
}
req.SetBody(
std::make_shared<StringViewStream>(owned_buffer->data(), owned_buffer->size()));
{
std::unique_lock<std::mutex> lock(upload_state_->mutex);
++upload_state_->parts_in_progress;
}
auto client = client_;
ARROW_ASSIGN_OR_RAISE(auto fut, SubmitIO(io_context_, [client, req]() {
return client->UploadPart(req);
}));
// The closure keeps the buffer and the upload state alive
auto state = upload_state_;
auto part_number = part_number_;
auto handler = [owned_buffer, state, part_number,
req](const Result<S3Model::UploadPartOutcome>& result) -> void {
HandleUploadOutcome(state, part_number, req, result);
};
fut.AddCallback(std::move(handler));
}
++part_number_;
// With up to 10000 parts in an upload (S3 limit), a stream writing chunks
// of exactly 5MB would be limited to 50GB total. To avoid that, we bump
// the upload threshold every 100 parts. So the pattern is:
// - part 1 to 99: 5MB threshold
// - part 100 to 199: 10MB threshold
// - part 200 to 299: 15MB threshold
// ...
// - part 9900 to 9999: 500MB threshold
// So the total size limit is about 2,525,000 MB (~2.5 TB), while keeping manageable
// chunk sizes and avoiding too much buffering in the common case of a small-ish
// stream. If the limit's not enough, we can revisit.
if (part_number_ % 100 == 0) {
part_upload_threshold_ += kMinimumPartUpload;
}
return Status::OK();
}
static void HandleUploadOutcome(const std::shared_ptr<UploadState>& state,
int part_number, const S3Model::UploadPartRequest& req,
const Result<S3Model::UploadPartOutcome>& result) {
std::unique_lock<std::mutex> lock(state->mutex);
if (!result.ok()) {
state->status &= result.status();
} else {
const auto& outcome = *result;
if (!outcome.IsSuccess()) {
state->status &= UploadPartError(req, outcome);
} else {
AddCompletedPart(state, part_number, outcome.GetResult());
}
}
// Notify completion
if (--state->parts_in_progress == 0) {
state->cv.notify_all();
}
}
static void AddCompletedPart(const std::shared_ptr<UploadState>& state, int part_number,
const S3Model::UploadPartResult& result) {
S3Model::CompletedPart part;
// Append ETag and part number for this uploaded part
// (will be needed for upload completion in Close())
part.SetPartNumber(part_number);
part.SetETag(result.GetETag());
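// With background writes enabled, parts may complete out of order, so store this
// part at its slot and grow the vector if needed.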
int slot = part_number - 1;
if (state->completed_parts.size() <= static_cast<size_t>(slot)) {
state->completed_parts.resize(slot + 1);
}
DCHECK(!state->completed_parts[slot].PartNumberHasBeenSet());
state->completed_parts[slot] = std::move(part);
}
static Status UploadPartError(const S3Model::UploadPartRequest& req,
const S3Model::UploadPartOutcome& outcome) {
return ErrorToStatus(
std::forward_as_tuple("When uploading part for key '", req.GetKey(),
"' in bucket '", req.GetBucket(), "': "),
outcome.GetError());
}
protected:
std::shared_ptr<Aws::S3::S3Client> client_;
const io::IOContext io_context_;
const S3Path path_;
const bool background_writes_;
Aws::String upload_id_;
bool closed_ = true;
int64_t pos_ = 0;
int32_t part_number_ = 1;
std::shared_ptr<io::BufferOutputStream> current_part_;
int64_t current_part_size_ = 0;
int64_t part_upload_threshold_ = kMinimumPartUpload;
// This struct is kept alive through background writes to avoid problems
// in the completion handler.
struct UploadState {
std::mutex mutex;
std::condition_variable cv;
Aws::Vector<S3Model::CompletedPart> completed_parts;
int64_t parts_in_progress = 0;
Status status;
};
std::shared_ptr<UploadState> upload_state_;
};
// This function assumes info->path() is already set
void FileObjectToInfo(const S3Model::HeadObjectResult& obj, FileInfo* info) {
info->set_type(FileType::File);
info->set_size(static_cast<int64_t>(obj.GetContentLength()));
info->set_mtime(FromAwsDatetime(obj.GetLastModified()));
}
void FileObjectToInfo(const S3Model::Object& obj, FileInfo* info) {
info->set_type(FileType::File);
info->set_size(static_cast<int64_t>(obj.GetSize()));
info->set_mtime(FromAwsDatetime(obj.GetLastModified()));
}
struct TreeWalker : public std::enable_shared_from_this<TreeWalker> {
using ResultHandler = std::function<Status(const std::string& prefix,
const S3Model::ListObjectsV2Result&)>;
using ErrorHandler = std::function<Status(const AWSError<S3Errors>& error)>;
using RecursionHandler = std::function<Result<bool>(int32_t nesting_depth)>;
std::shared_ptr<Aws::S3::S3Client> client_;
io::IOContext io_context_;
const std::string bucket_;
const std::string base_dir_;
const int32_t max_keys_;
const ResultHandler result_handler_;
const ErrorHandler error_handler_;
const RecursionHandler recursion_handler_;
template <typename... Args>
static Status Walk(Args&&... args) {
return WalkAsync(std::forward<Args>(args)...).status();
}
template <typename... Args>
static Future<> WalkAsync(Args&&... args) {
auto self = std::make_shared<TreeWalker>(std::forward<Args>(args)...);
return self->DoWalk();
}
TreeWalker(std::shared_ptr<Aws::S3::S3Client> client, io::IOContext io_context,
std::string bucket, std::string base_dir, int32_t max_keys,
ResultHandler result_handler, ErrorHandler error_handler,
RecursionHandler recursion_handler)
: client_(std::move(client)),
io_context_(io_context),
bucket_(std::move(bucket)),
base_dir_(std::move(base_dir)),
max_keys_(max_keys),
result_handler_(std::move(result_handler)),
error_handler_(std::move(error_handler)),
recursion_handler_(std::move(recursion_handler)) {}
private:
std::shared_ptr<TaskGroup> task_group_;
std::mutex mutex_;
Future<> DoWalk() {
task_group_ =
TaskGroup::MakeThreaded(io_context_.executor(), io_context_.stop_token());
WalkChild(base_dir_, /*nesting_depth=*/0);
// When this returns, ListObjectsV2 tasks either have finished or will exit early
return task_group_->FinishAsync();
}
bool ok() const { return task_group_->ok(); }
struct ListObjectsV2Handler {
std::shared_ptr<TreeWalker> walker;
std::string prefix;
int32_t nesting_depth;
S3Model::ListObjectsV2Request req;
Status operator()(const Result<S3Model::ListObjectsV2Outcome>& result) {
// Serialize calls to operation-specific handlers
if (!walker->ok()) {
// Early exit: avoid executing handlers if DoWalk() returned
return Status::OK();
}
if (!result.ok()) {
return result.status();
}
const auto& outcome = *result;
if (!outcome.IsSuccess()) {
{
std::lock_guard<std::mutex> guard(walker->mutex_);
return walker->error_handler_(outcome.GetError());
}
}
return HandleResult(outcome.GetResult());
}
void SpawnListObjectsV2() {
auto cb = *this;
walker->task_group_->Append([cb]() mutable {
Result<S3Model::ListObjectsV2Outcome> result =
cb.walker->client_->ListObjectsV2(cb.req);
return cb(result);
});
}
Status HandleResult(const S3Model::ListObjectsV2Result& result) {
bool recurse;
{
// Only one thread should be running result_handler_/recursion_handler_ at a time
std::lock_guard<std::mutex> guard(walker->mutex_);
recurse = result.GetCommonPrefixes().size() > 0;
if (recurse) {
ARROW_ASSIGN_OR_RAISE(auto maybe_recurse,
walker->recursion_handler_(nesting_depth + 1));
recurse &= maybe_recurse;
}
RETURN_NOT_OK(walker->result_handler_(prefix, result));
}
if (recurse) {
walker->WalkChildren(result, nesting_depth + 1);
}
// If the result was truncated, issue a continuation request to get
// further directory entries.
if (result.GetIsTruncated()) {
DCHECK(!result.GetNextContinuationToken().empty());
req.SetContinuationToken(result.GetNextContinuationToken());
SpawnListObjectsV2();
}
return Status::OK();
}
void Start() {
req.SetBucket(ToAwsString(walker->bucket_));
if (!prefix.empty()) {
req.SetPrefix(ToAwsString(prefix) + kSep);
}
req.SetDelimiter(Aws::String() + kSep);
req.SetMaxKeys(walker->max_keys_);
SpawnListObjectsV2();
}
};
void WalkChild(std::string key, int32_t nesting_depth) {
ListObjectsV2Handler handler{shared_from_this(), std::move(key), nesting_depth, {}};
handler.Start();
}
void WalkChildren(const S3Model::ListObjectsV2Result& result, int32_t nesting_depth) {
for (const auto& prefix : result.GetCommonPrefixes()) {
const auto child_key =
internal::RemoveTrailingSlash(FromAwsString(prefix.GetPrefix()));
WalkChild(std::string{child_key}, nesting_depth);
}
}
friend struct ListObjectsV2Handler;
};
} // namespace
// -----------------------------------------------------------------------
// S3 filesystem implementation
class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Impl> {
public:
ClientBuilder builder_;
io::IOContext io_context_;
std::shared_ptr<Aws::S3::S3Client> client_;
util::optional<S3Backend> backend_;
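// At most 1000 keys per ListObjectsV2 request (the S3 maximum)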
const int32_t kListObjectsMaxKeys = 1000;
// At most 1000 keys per multiple-delete request
const int32_t kMultipleDeleteMaxKeys = 1000;
// Limit recursion depth, since a recursion bomb can be created
const int32_t kMaxNestingDepth = 100;
explicit Impl(S3Options options, io::IOContext io_context)
: builder_(std::move(options)), io_context_(io_context) {}
Status Init() { return builder_.BuildClient().Value(&client_); }
const S3Options& options() const { return builder_.options(); }
std::string region() const {
return std::string(FromAwsString(builder_.config().region));
}
template <typename Error>
void SaveBackend(const Aws::Client::AWSError<Error>& error) {
if (!backend_ || *backend_ == S3Backend::Other) {
backend_ = DetectS3Backend(error);
}
}
// Create a bucket. Successful if bucket already exists.
Status CreateBucket(const std::string& bucket) {
S3Model::CreateBucketConfiguration config;
S3Model::CreateBucketRequest req;
config.SetLocationConstraint(
S3Model::BucketLocationConstraintMapper::GetBucketLocationConstraintForName(
ToAwsString(options().region)));
req.SetBucket(ToAwsString(bucket));
req.SetCreateBucketConfiguration(config);
auto outcome = client_->CreateBucket(req);
if (!outcome.IsSuccess() && !IsAlreadyExists(outcome.GetError())) {
return ErrorToStatus(std::forward_as_tuple("When creating bucket '", bucket, "': "),
outcome.GetError());
}
return Status::OK();
}
// Create an object with empty contents. Successful if object already exists.
Status CreateEmptyObject(const std::string& bucket, const std::string& key) {
S3Model::PutObjectRequest req;
req.SetBucket(ToAwsString(bucket));
req.SetKey(ToAwsString(key));
return OutcomeToStatus(
std::forward_as_tuple("When creating key '", key, "' in bucket '", bucket, "': "),
client_->PutObject(req));
}
Status CreateEmptyDir(const std::string& bucket, const std::string& key) {
DCHECK(!key.empty());
return CreateEmptyObject(bucket, key + kSep);
}
Status DeleteObject(const std::string& bucket, const std::string& key) {
S3Model::DeleteObjectRequest req;
req.SetBucket(ToAwsString(bucket));
req.SetKey(ToAwsString(key));
return OutcomeToStatus(
std::forward_as_tuple("When delete key '", key, "' in bucket '", bucket, "': "),
client_->DeleteObject(req));
}
Status CopyObject(const S3Path& src_path, const S3Path& dest_path) {
S3Model::CopyObjectRequest req;
req.SetBucket(ToAwsString(dest_path.bucket));
req.SetKey(ToAwsString(dest_path.key));
// Copy source "Must be URL-encoded" according to AWS SDK docs.
req.SetCopySource(src_path.ToURLEncodedAwsString());
return OutcomeToStatus(
std::forward_as_tuple("When copying key '", src_path.key, "' in bucket '",
src_path.bucket, "' to key '", dest_path.key,
"' in bucket '", dest_path.bucket, "': "),
client_->CopyObject(req));
}
// On Minio, an empty "directory" doesn't satisfy the same API requests as
// a non-empty "directory". This is a Minio-specific quirk, but we need
// to handle it for unit testing.
Status IsEmptyDirectory(const std::string& bucket, const std::string& key, bool* out) {
S3Model::HeadObjectRequest req;
req.SetBucket(ToAwsString(bucket));
if (backend_ && *backend_ == S3Backend::Minio) {
// Minio wants a slash at the end, Amazon doesn't
req.SetKey(ToAwsString(key) + kSep);
} else {
req.SetKey(ToAwsString(key));
}
auto outcome = client_->HeadObject(req);
if (outcome.IsSuccess()) {
*out = true;
return Status::OK();
}
if (!backend_) {
SaveBackend(outcome.GetError());
DCHECK(backend_);
if (*backend_ == S3Backend::Minio) {
// Try again with separator-terminated key (see above)
return IsEmptyDirectory(bucket, key, out);
}
}
if (IsNotFound(outcome.GetError())) {
*out = false;
return Status::OK();
}
return ErrorToStatus(std::forward_as_tuple("When reading information for key '", key,
"' in bucket '", bucket, "': "),
outcome.GetError());
}
Status IsEmptyDirectory(const S3Path& path, bool* out) {
return IsEmptyDirectory(path.bucket, path.key, out);
}
Status IsNonEmptyDirectory(const S3Path& path, bool* out) {
S3Model::ListObjectsV2Request req;
req.SetBucket(ToAwsString(path.bucket));
req.SetPrefix(ToAwsString(path.key) + kSep);
req.SetDelimiter(Aws::String() + kSep);
req.SetMaxKeys(1);
auto outcome = client_->ListObjectsV2(req);
if (outcome.IsSuccess()) {
*out = outcome.GetResult().GetKeyCount() > 0;
return Status::OK();
}
if (IsNotFound(outcome.GetError())) {
*out = false;
return Status::OK();
}
return ErrorToStatus(
std::forward_as_tuple("When listing objects under key '", path.key,
"' in bucket '", path.bucket, "': "),
outcome.GetError());
}
Status CheckNestingDepth(int32_t nesting_depth) {
if (nesting_depth >= kMaxNestingDepth) {
return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (",
kMaxNestingDepth, ")");
}
return Status::OK();
}
// A helper class for Walk and WalkAsync
struct FileInfoCollector {
FileInfoCollector(std::string bucket, std::string key, const FileSelector& select)
: bucket(std::move(bucket)),
key(std::move(key)),
allow_not_found(select.allow_not_found) {}
Status Collect(const std::string& prefix, const S3Model::ListObjectsV2Result& result,
std::vector<FileInfo>* out) {
// Walk "directories"
for (const auto& child_prefix : result.GetCommonPrefixes()) {
is_empty = false;
const auto child_key =
internal::RemoveTrailingSlash(FromAwsString(child_prefix.GetPrefix()));
std::stringstream child_path;
child_path << bucket << kSep << child_key;
FileInfo info;
info.set_path(child_path.str());
info.set_type(FileType::Directory);
out->push_back(std::move(info));
}
// Walk "files"
for (const auto& obj : result.GetContents()) {
is_empty = false;
FileInfo info;
const auto child_key = internal::RemoveTrailingSlash(FromAwsString(obj.GetKey()));
if (child_key == util::string_view(prefix)) {
// Amazon can return the "directory" key itself as part of the results, skip
continue;
}
std::stringstream child_path;
child_path << bucket << kSep << child_key;
info.set_path(child_path.str());
FileObjectToInfo(obj, &info);
out->push_back(std::move(info));
}
return Status::OK();
}
Status Finish(Impl* impl) {
// If no contents were found, perhaps it's an empty "directory",
// or perhaps it's a nonexistent entry. Check.
if (is_empty && !allow_not_found) {
bool is_actually_empty;
RETURN_NOT_OK(impl->IsEmptyDirectory(bucket, key, &is_actually_empty));
if (!is_actually_empty) {
return PathNotFound(bucket, key);
}
}
return Status::OK();
}
std::string bucket;
std::string key;
bool allow_not_found;
bool is_empty = true;
};
// Workhorse for GetFileInfo(FileSelector...)
Status Walk(const FileSelector& select, const std::string& bucket,
const std::string& key, std::vector<FileInfo>* out) {
FileInfoCollector collector(bucket, key, select);
auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
if (select.allow_not_found && IsNotFound(error)) {
return Status::OK();
}
return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
"' in bucket '", bucket, "': "),
error);
};
auto handle_recursion = [&](int32_t nesting_depth) -> Result<bool> {
RETURN_NOT_OK(CheckNestingDepth(nesting_depth));
return select.recursive && nesting_depth <= select.max_recursion;
};
auto handle_results = [&](const std::string& prefix,
const S3Model::ListObjectsV2Result& result) -> Status {
return collector.Collect(prefix, result, out);
};
RETURN_NOT_OK(TreeWalker::Walk(client_, io_context_, bucket, key, kListObjectsMaxKeys,
handle_results, handle_error, handle_recursion));
// If no contents were found, perhaps it's an empty "directory",
// or perhaps it's a nonexistent entry. Check.
RETURN_NOT_OK(collector.Finish(this));
// Sort results for convenience, since they can come massively out of order
std::sort(out->begin(), out->end(), FileInfo::ByPath{});
return Status::OK();
}
// Workhorse for GetFileInfoGenerator(FileSelector...)
FileInfoGenerator WalkAsync(const FileSelector& select, const std::string& bucket,
const std::string& key) {
PushGenerator<std::vector<FileInfo>> gen;
auto producer = gen.producer();
auto collector = std::make_shared<FileInfoCollector>(bucket, key, select);
auto self = shared_from_this();
auto handle_error = [select, bucket, key](const AWSError<S3Errors>& error) -> Status {
if (select.allow_not_found && IsNotFound(error)) {
return Status::OK();
}
return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
"' in bucket '", bucket, "': "),
error);
};
auto handle_recursion = [select, self](int32_t nesting_depth) -> Result<bool> {
RETURN_NOT_OK(self->CheckNestingDepth(nesting_depth));
return select.recursive && nesting_depth <= select.max_recursion;
};
auto handle_results =
[collector, producer](
const std::string& prefix,
const S3Model::ListObjectsV2Result& result) mutable -> Status {
std::vector<FileInfo> out;
RETURN_NOT_OK(collector->Collect(prefix, result, &out));
if (!out.empty()) {
producer.Push(std::move(out));
}
return Status::OK();
};
TreeWalker::WalkAsync(client_, io_context_, bucket, key, kListObjectsMaxKeys,
handle_results, handle_error, handle_recursion)
.AddCallback([collector, producer,
self](const Result<::arrow::detail::Empty>& res) mutable {
auto st = collector->Finish(self.get());
if (!st.ok()) {
producer.Push(st);
}
producer.Close();
});
return gen;
}
Status WalkForDeleteDir(const std::string& bucket, const std::string& key,
std::vector<std::string>* file_keys,
std::vector<std::string>* dir_keys) {
auto handle_results = [&](const std::string& prefix,
const S3Model::ListObjectsV2Result& result) -> Status {
// Walk "files"
file_keys->reserve(file_keys->size() + result.GetContents().size());
for (const auto& obj : result.GetContents()) {
file_keys->emplace_back(FromAwsString(obj.GetKey()));
}
// Walk "directories"
dir_keys->reserve(dir_keys->size() + result.GetCommonPrefixes().size());
for (const auto& prefix : result.GetCommonPrefixes()) {
dir_keys->emplace_back(FromAwsString(prefix.GetPrefix()));
}
return Status::OK();
};
auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
"' in bucket '", bucket, "': "),
error);
};
auto handle_recursion = [&](int32_t nesting_depth) -> Result<bool> {
RETURN_NOT_OK(CheckNestingDepth(nesting_depth));
return true; // Recurse
};
return TreeWalker::Walk(client_, io_context_, bucket, key, kListObjectsMaxKeys,
handle_results, handle_error, handle_recursion);
}
// Delete multiple objects at once
Future<> DeleteObjectsAsync(const std::string& bucket,
const std::vector<std::string>& keys) {
struct DeleteCallback {
const std::string bucket;
Status operator()(const Result<S3Model::DeleteObjectsOutcome>& result) {
if (!result.ok()) {
return result.status();
}
const auto& outcome = *result;
if (!outcome.IsSuccess()) {
return ErrorToStatus(outcome.GetError());
}
// Also need to check per-key errors, even on successful outcome
// See
// https://docs.aws.amazon.com/fr_fr/AmazonS3/latest/API/multiobjectdeleteapi.html
const auto& errors = outcome.GetResult().GetErrors();
if (!errors.empty()) {
std::stringstream ss;
ss << "Got the following " << errors.size()
<< " errors when deleting objects in S3 bucket '" << bucket << "':\n";
for (const auto& error : errors) {
ss << "- key '" << error.GetKey() << "': " << error.GetMessage() << "\n";
}
return Status::IOError(ss.str());
}
return Status::OK();
}
};
const auto chunk_size = static_cast<size_t>(kMultipleDeleteMaxKeys);
DeleteCallback delete_cb{bucket};
auto client = client_;
std::vector<Future<>> futures;
futures.reserve(keys.size() / chunk_size + 1);
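// Issue one DeleteObjects request per chunk of at most kMultipleDeleteMaxKeys keys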
for (size_t start = 0; start < keys.size(); start += chunk_size) {
S3Model::DeleteObjectsRequest req;
S3Model::Delete del;
for (size_t i = start; i < std::min(keys.size(), start + chunk_size); ++i) {
del.AddObjects(S3Model::ObjectIdentifier().WithKey(ToAwsString(keys[i])));
}
req.SetBucket(ToAwsString(bucket));
req.SetDelete(std::move(del));
ARROW_ASSIGN_OR_RAISE(auto fut, SubmitIO(io_context_, [client, req]() {
return client->DeleteObjects(req);
}));
futures.push_back(std::move(fut).Then(delete_cb));
}
return AllComplete(futures);
}
Status DeleteObjects(const std::string& bucket, const std::vector<std::string>& keys) {
return DeleteObjectsAsync(bucket, keys).status();
}
Status DeleteDirContents(const std::string& bucket, const std::string& key) {
std::vector<std::string> file_keys;
std::vector<std::string> dir_keys;
RETURN_NOT_OK(WalkForDeleteDir(bucket, key, &file_keys, &dir_keys));
if (file_keys.empty() && dir_keys.empty() && !key.empty()) {
// No contents found, is it an empty directory?
bool exists = false;
RETURN_NOT_OK(IsEmptyDirectory(bucket, key, &exists));
if (!exists) {
return PathNotFound(bucket, key);
}
}
// First delete all "files", then delete all child "directories"
RETURN_NOT_OK(DeleteObjects(bucket, file_keys));
// Delete directories in reverse lexicographic order, to ensure children
// are deleted before their parents (Minio).
std::sort(dir_keys.rbegin(), dir_keys.rend());
return DeleteObjects(bucket, dir_keys);
}
Status EnsureDirectoryExists(const S3Path& path) {
if (!path.key.empty()) {
return CreateEmptyDir(path.bucket, path.key);
}
return Status::OK();
}
Status EnsureParentExists(const S3Path& path) {
if (path.has_parent()) {
return EnsureDirectoryExists(path.parent());
}
return Status::OK();
}
static Result<std::vector<std::string>> ProcessListBuckets(
const Aws::S3::Model::ListBucketsOutcome& outcome) {
if (!outcome.IsSuccess()) {
return ErrorToStatus(std::forward_as_tuple("When listing buckets: "),
outcome.GetError());
}
std::vector<std::string> buckets;
buckets.reserve(outcome.GetResult().GetBuckets().size());
for (const auto& bucket : outcome.GetResult().GetBuckets()) {
buckets.emplace_back(FromAwsString(bucket.GetName()));
}
return buckets;
}
Result<std::vector<std::string>> ListBuckets() {
auto outcome = client_->ListBuckets();
return ProcessListBuckets(outcome);
}
Future<std::vector<std::string>> ListBucketsAsync(io::IOContext ctx) {
auto self = shared_from_this();
return DeferNotOk(SubmitIO(ctx, [self]() { return self->client_->ListBuckets(); }))
.Then(Impl::ProcessListBuckets);
}
Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const std::string& s,
S3FileSystem* fs) {
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
RETURN_NOT_OK(ValidateFilePath(path));
auto ptr = std::make_shared<ObjectInputFile>(client_, fs->io_context(), path);
RETURN_NOT_OK(ptr->Init());
return ptr;
}
Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const FileInfo& info,
S3FileSystem* fs) {
if (info.type() == FileType::NotFound) {
return ::arrow::fs::internal::PathNotFound(info.path());
}
if (info.type() != FileType::File && info.type() != FileType::Unknown) {
return ::arrow::fs::internal::NotAFile(info.path());
}
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(info.path()));
RETURN_NOT_OK(ValidateFilePath(path));
auto ptr =
std::make_shared<ObjectInputFile>(client_, fs->io_context(), path, info.size());
RETURN_NOT_OK(ptr->Init());
return ptr;
}
};
S3FileSystem::S3FileSystem(const S3Options& options, const io::IOContext& io_context)
: FileSystem(io_context), impl_(std::make_shared<Impl>(options, io_context)) {
default_async_is_sync_ = false;
}
S3FileSystem::~S3FileSystem() {}
Result<std::shared_ptr<S3FileSystem>> S3FileSystem::Make(
const S3Options& options, const io::IOContext& io_context) {
RETURN_NOT_OK(CheckS3Initialized());
std::shared_ptr<S3FileSystem> ptr(new S3FileSystem(options, io_context));
RETURN_NOT_OK(ptr->impl_->Init());
return ptr;
}
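// Example usage (a minimal sketch; error handling elided, default IOContext
// arguments assumed, bucket and key names illustrative):
//
//   ARROW_ASSIGN_OR_RAISE(auto options,
//                         S3Options::FromUri("s3://my-bucket/data?region=us-east-1"));
//   ARROW_ASSIGN_OR_RAISE(auto fs, S3FileSystem::Make(options));
//   ARROW_ASSIGN_OR_RAISE(auto file, fs->OpenInputFile("my-bucket/data/part-0.parquet"));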
bool S3FileSystem::Equals(const FileSystem& other) const {
if (this == &other) {
return true;
}
if (other.type_name() != type_name()) {
return false;
}
const auto& s3fs = ::arrow::internal::checked_cast<const S3FileSystem&>(other);
return options().Equals(s3fs.options());
}
S3Options S3FileSystem::options() const { return impl_->options(); }
std::string S3FileSystem::region() const { return impl_->region(); }
Result<FileInfo> S3FileSystem::GetFileInfo(const std::string& s) {
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
FileInfo info;
info.set_path(s);
if (path.empty()) {
// It's the root path ""
info.set_type(FileType::Directory);
return info;
} else if (path.key.empty()) {
// It's a bucket
S3Model::HeadBucketRequest req;
req.SetBucket(ToAwsString(path.bucket));
auto outcome = impl_->client_->HeadBucket(req);
if (!outcome.IsSuccess()) {
if (!IsNotFound(outcome.GetError())) {
return ErrorToStatus(
std::forward_as_tuple("When getting information for bucket '", path.bucket,
"': "),
outcome.GetError());
}
info.set_type(FileType::NotFound);
return info;
}
// NOTE: S3 doesn't have a bucket modification time. Only a creation
// time is available, and you have to list all buckets to get it.
info.set_type(FileType::Directory);
return info;
} else {
// It's an object
S3Model::HeadObjectRequest req;
req.SetBucket(ToAwsString(path.bucket));
req.SetKey(ToAwsString(path.key));
auto outcome = impl_->client_->HeadObject(req);
if (outcome.IsSuccess()) {
// "File" object found
FileObjectToInfo(outcome.GetResult(), &info);
return info;
}
if (!IsNotFound(outcome.GetError())) {
return ErrorToStatus(
std::forward_as_tuple("When getting information for key '", path.key,
"' in bucket '", path.bucket, "': "),
outcome.GetError());
}
// Not found => perhaps it's an empty "directory"
bool is_dir = false;
RETURN_NOT_OK(impl_->IsEmptyDirectory(path, &is_dir));
if (is_dir) {
info.set_type(FileType::Directory);
return info;
}
// Not found => perhaps it's a non-empty "directory"
RETURN_NOT_OK(impl_->IsNonEmptyDirectory(path, &is_dir));
if (is_dir) {
info.set_type(FileType::Directory);
} else {
info.set_type(FileType::NotFound);
}
return info;
}
}
Result<FileInfoVector> S3FileSystem::GetFileInfo(const FileSelector& select) {
ARROW_ASSIGN_OR_RAISE(auto base_path, S3Path::FromString(select.base_dir));
FileInfoVector results;
if (base_path.empty()) {
// List all buckets
ARROW_ASSIGN_OR_RAISE(auto buckets, impl_->ListBuckets());
for (const auto& bucket : buckets) {
FileInfo info;
info.set_path(bucket);
info.set_type(FileType::Directory);
results.push_back(std::move(info));
if (select.recursive) {
RETURN_NOT_OK(impl_->Walk(select, bucket, "", &results));
}
}
return results;
}
// Nominal case -> walk a single bucket
RETURN_NOT_OK(impl_->Walk(select, base_path.bucket, base_path.key, &results));
return results;
}
FileInfoGenerator S3FileSystem::GetFileInfoGenerator(const FileSelector& select) {
auto maybe_base_path = S3Path::FromString(select.base_dir);
if (!maybe_base_path.ok()) {
return MakeFailingGenerator<FileInfoVector>(maybe_base_path.status());
}
auto base_path = *std::move(maybe_base_path);
if (base_path.empty()) {
// List all buckets, then possibly recurse
PushGenerator<AsyncGenerator<FileInfoVector>> gen;
auto producer = gen.producer();
auto fut = impl_->ListBucketsAsync(io_context());
auto impl = impl_->shared_from_this();
fut.AddCallback(
[producer, select, impl](const Result<std::vector<std::string>>& res) mutable {
if (!res.ok()) {
producer.Push(res.status());
producer.Close();
return;
}
FileInfoVector buckets;
for (const auto& bucket : *res) {
buckets.push_back(FileInfo{bucket, FileType::Directory});
}
// Generate all bucket infos
auto buckets_fut = Future<FileInfoVector>::MakeFinished(std::move(buckets));
producer.Push(MakeSingleFutureGenerator(buckets_fut));
if (select.recursive) {
// Generate recursive walk for each bucket in turn
for (const auto& bucket : *buckets_fut.result()) {
producer.Push(impl->WalkAsync(select, bucket.path(), ""));
}
}
producer.Close();
});
return MakeConcatenatedGenerator(
AsyncGenerator<AsyncGenerator<FileInfoVector>>{std::move(gen)});
}
// Nominal case -> walk a single bucket
return impl_->WalkAsync(select, base_path.bucket, base_path.key);
}
Status S3FileSystem::CreateDir(const std::string& s, bool recursive) {
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
if (path.key.empty()) {
// Create bucket
return impl_->CreateBucket(path.bucket);
}
// Create object
if (recursive) {
// Ensure bucket exists
RETURN_NOT_OK(impl_->CreateBucket(path.bucket));
// Ensure that all parents exist, then the directory itself
std::string parent_key;
for (const auto& part : path.key_parts) {
parent_key += part;
parent_key += kSep;
RETURN_NOT_OK(impl_->CreateEmptyObject(path.bucket, parent_key));
}
return Status::OK();
} else {
// Check parent dir exists
if (path.has_parent()) {
S3Path parent_path = path.parent();
bool exists;
RETURN_NOT_OK(impl_->IsNonEmptyDirectory(parent_path, &exists));
if (!exists) {
RETURN_NOT_OK(impl_->IsEmptyDirectory(parent_path, &exists));
}
if (!exists) {
return Status::IOError("Cannot create directory '", path.full_path,
"': parent directory does not exist");
}
}
// XXX Should we check that no non-directory entry exists?
// Minio does it for us, not sure about other S3 implementations.
return impl_->CreateEmptyDir(path.bucket, path.key);
}
}
Status S3FileSystem::DeleteDir(const std::string& s) {
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
if (path.empty()) {
return Status::NotImplemented("Cannot delete all S3 buckets");
}
RETURN_NOT_OK(impl_->DeleteDirContents(path.bucket, path.key));
if (path.key.empty()) {
// Delete bucket
S3Model::DeleteBucketRequest req;
req.SetBucket(ToAwsString(path.bucket));
return OutcomeToStatus(
std::forward_as_tuple("When deleting bucket '", path.bucket, "': "),
impl_->client_->DeleteBucket(req));
} else {
// Delete "directory"
RETURN_NOT_OK(impl_->DeleteObject(path.bucket, path.key + kSep));
// Parent may be implicitly deleted if it became empty, recreate it
return impl_->EnsureParentExists(path);
}
}
Status S3FileSystem::DeleteDirContents(const std::string& s) {
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
if (path.empty()) {
return Status::NotImplemented("Cannot delete all S3 buckets");
}
RETURN_NOT_OK(impl_->DeleteDirContents(path.bucket, path.key));
// Directory may be implicitly deleted, recreate it
return impl_->EnsureDirectoryExists(path);
}
Status S3FileSystem::DeleteRootDirContents() {
return Status::NotImplemented("Cannot delete all S3 buckets");
}
Status S3FileSystem::DeleteFile(const std::string& s) {
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
RETURN_NOT_OK(ValidateFilePath(path));
// Check the object exists
S3Model::HeadObjectRequest req;
req.SetBucket(ToAwsString(path.bucket));
req.SetKey(ToAwsString(path.key));
auto outcome = impl_->client_->HeadObject(req);
if (!outcome.IsSuccess()) {
if (IsNotFound(outcome.GetError())) {
return PathNotFound(path);
} else {
return ErrorToStatus(
std::forward_as_tuple("When getting information for key '", path.key,
"' in bucket '", path.bucket, "': "),
outcome.GetError());
}
}
// Object found, delete it
RETURN_NOT_OK(impl_->DeleteObject(path.bucket, path.key));
// Parent may be implicitly deleted if it became empty, recreate it
return impl_->EnsureParentExists(path);
}
Status S3FileSystem::Move(const std::string& src, const std::string& dest) {
// XXX We don't implement moving directories as it would be too expensive:
// one must copy all directory contents one by one (including object data),
// then delete the original contents.
ARROW_ASSIGN_OR_RAISE(auto src_path, S3Path::FromString(src));
RETURN_NOT_OK(ValidateFilePath(src_path));
ARROW_ASSIGN_OR_RAISE(auto dest_path, S3Path::FromString(dest));
RETURN_NOT_OK(ValidateFilePath(dest_path));
if (src_path == dest_path) {
return Status::OK();
}
RETURN_NOT_OK(impl_->CopyObject(src_path, dest_path));
RETURN_NOT_OK(impl_->DeleteObject(src_path.bucket, src_path.key));
// Source parent may be implicitly deleted if it became empty, recreate it
return impl_->EnsureParentExists(src_path);
}
Status S3FileSystem::CopyFile(const std::string& src, const std::string& dest) {
ARROW_ASSIGN_OR_RAISE(auto src_path, S3Path::FromString(src));
RETURN_NOT_OK(ValidateFilePath(src_path));
ARROW_ASSIGN_OR_RAISE(auto dest_path, S3Path::FromString(dest));
RETURN_NOT_OK(ValidateFilePath(dest_path));
if (src_path == dest_path) {
return Status::OK();
}
return impl_->CopyObject(src_path, dest_path);
}
Result<std::shared_ptr<io::InputStream>> S3FileSystem::OpenInputStream(
const std::string& s) {
return impl_->OpenInputFile(s, this);
}
Result<std::shared_ptr<io::InputStream>> S3FileSystem::OpenInputStream(
const FileInfo& info) {
return impl_->OpenInputFile(info, this);
}
Result<std::shared_ptr<io::RandomAccessFile>> S3FileSystem::OpenInputFile(
const std::string& s) {
return impl_->OpenInputFile(s, this);
}
Result<std::shared_ptr<io::RandomAccessFile>> S3FileSystem::OpenInputFile(
const FileInfo& info) {
return impl_->OpenInputFile(info, this);
}
Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenOutputStream(
const std::string& s) {
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
RETURN_NOT_OK(ValidateFilePath(path));
auto ptr = std::make_shared<ObjectOutputStream>(impl_->client_, io_context(), path,
impl_->options());
RETURN_NOT_OK(ptr->Init());
return ptr;
}
Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenAppendStream(
const std::string& path) {
// XXX Investigate UploadPartCopy? Does it work with source == destination?
// https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPartCopy.html
// (but would need to fall back to GET if the current data is < 5 MB)
return Status::NotImplemented("It is not possible to append efficiently to S3 objects");
}
//
// Top-level utility functions
//
Result<std::string> ResolveBucketRegion(const std::string& bucket) {
ARROW_ASSIGN_OR_RAISE(auto resolver, RegionResolver::DefaultInstance());
return resolver->ResolveRegion(bucket);
}
} // namespace fs
} // namespace arrow