blob: b7b6f314207695a7fe723f6ef83ddcc145512b7f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <sstream>
#include <string>
#include <tuple>
#include <utility>
#include <aws/core/Aws.h>
#include <aws/core/client/RetryStrategy.h>
#include <aws/core/http/HttpTypes.h>
#include <aws/core/utils/DateTime.h>
#include <aws/core/utils/StringUtils.h>
#include "arrow/filesystem/filesystem.h"
#include "arrow/status.h"
#include "arrow/util/logging.h"
#include "arrow/util/print.h"
#include "arrow/util/string_view.h"
namespace arrow {
namespace fs {
namespace internal {
// Implementation helper for ARROW_AWS_ASSIGN_OR_RAISE.
//
// Evaluates `rexpr` once into a local variable named `outcome_name`. If the
// outcome reports failure (IsSuccess() is false), the enclosing function
// returns the outcome's error converted through ErrorToStatus(). Otherwise the
// result is moved out of the outcome and assigned to `lhs`.
//
// NOTE: this expands to several statements (a declaration, an `if`, and an
// assignment), so it must not be used as the unbraced body of an `if`/`for`.
#define ARROW_AWS_ASSIGN_OR_RAISE_IMPL(outcome_name, lhs, rexpr) \
auto outcome_name = (rexpr); \
if (!outcome_name.IsSuccess()) { \
return ErrorToStatus(outcome_name.GetError()); \
} \
lhs = std::move(outcome_name).GetResultWithOwnership();
// Builds the name of the temporary outcome variable from two tokens.
// ARROW_CONCAT is defined elsewhere (presumably token pasting -- confirm).
#define ARROW_AWS_ASSIGN_OR_RAISE_NAME(x, y) ARROW_CONCAT(x, y)
// Assign the result of the AWS outcome expression `rexpr` to `lhs`, or return
// an error Status from the enclosing function if the outcome failed.
//
// The temporary's name is derived from __COUNTER__, so several uses in the
// same scope do not clash. The expansion already ends with a semicolon.
#define ARROW_AWS_ASSIGN_OR_RAISE(lhs, rexpr) \
ARROW_AWS_ASSIGN_OR_RAISE_IMPL( \
ARROW_AWS_ASSIGN_OR_RAISE_NAME(_aws_error_or_value, __COUNTER__), lhs, rexpr);
// Kind of S3-compatible server, as reported by DetectS3Backend().
// XXX Should we expose this at some point?
enum class S3Backend {
  Amazon,  // "server" header mentions AmazonS3
  Minio,   // "server" header mentions MinIO
  Other    // anything else, including a missing header
};
// Detect the S3 backend type from the S3 server's response headers.
//
// Looks up the "server" header and matches its value against known vendor
// markers: a value containing "AmazonS3" maps to S3Backend::Amazon, and one
// containing "MinIO" maps to S3Backend::Minio. A missing header or an
// unrecognized value yields S3Backend::Other.
//
// Declared `inline` because this non-template function is defined in a header:
// without it, each translation unit including the header would emit its own
// external definition, and linking more than one of them would fail.
inline S3Backend DetectS3Backend(const Aws::Http::HeaderValueCollection& headers) {
  const auto it = headers.find("server");
  if (it == headers.end()) {
    // No "server" header at all: nothing to identify the backend with.
    return S3Backend::Other;
  }
  // Non-owning view over the header value; valid while `headers` is alive.
  const util::string_view value(it->second);
  if (value.find("AmazonS3") != std::string::npos) {
    return S3Backend::Amazon;
  }
  if (value.find("MinIO") != std::string::npos) {
    return S3Backend::Minio;
  }
  return S3Backend::Other;
}
// Detect the S3 backend type from the response headers attached to an AWS
// error, by delegating to the header-based overload.
template <typename Error>
S3Backend DetectS3Backend(const Aws::Client::AWSError<Error>& error) {
  const auto& response_headers = error.GetResponseHeaders();
  return DetectS3Backend(response_headers);
}
// Tell whether `error` should be treated as a connection-level error.
//
// True when the SDK itself flags the error as retryable, or when the error
// carries the Minio exception name "XMinioServerNotInitialized".
// (Sometimes Minio may fail with a 503 error whose message is
// "Server not initialized, please try again".)
template <typename Error>
inline bool IsConnectError(const Aws::Client::AWSError<Error>& error) {
  return error.ShouldRetry() ||
         error.GetExceptionName() == "XMinioServerNotInitialized";
}
// Tell whether an S3 error means the target does not exist
// (NO_SUCH_BUCKET or RESOURCE_NOT_FOUND).
inline bool IsNotFound(const Aws::Client::AWSError<Aws::S3::S3Errors>& error) {
  switch (error.GetErrorType()) {
    case Aws::S3::S3Errors::NO_SUCH_BUCKET:
    case Aws::S3::S3Errors::RESOURCE_NOT_FOUND:
      return true;
    default:
      return false;
  }
}
// Tell whether an S3 error means the bucket already exists
// (BUCKET_ALREADY_EXISTS or BUCKET_ALREADY_OWNED_BY_YOU).
inline bool IsAlreadyExists(const Aws::Client::AWSError<Aws::S3::S3Errors>& error) {
  const auto kind = error.GetErrorType();
  if (kind == Aws::S3::S3Errors::BUCKET_ALREADY_EXISTS) {
    return true;
  }
  return kind == Aws::S3::S3Errors::BUCKET_ALREADY_OWNED_BY_YOU;
}
// Convert an AWS error into an IOError Status.
//
// The message is `prefix` followed by "AWS Error [code N]: <message>", where
// N is the error type cast to int and <message> comes from the error itself.
//
// TODO qualify error messages with a prefix indicating context
// (e.g. "When completing multipart upload to bucket 'xxx', key 'xxx': ...")
template <typename ErrorType>
Status ErrorToStatus(const std::string& prefix,
                     const Aws::Client::AWSError<ErrorType>& error) {
  // XXX Handle fine-grained error types
  // See
  // https://sdk.amazonaws.com/cpp/api/LATEST/namespace_aws_1_1_s3.html#ae3f82f8132b619b6e91c88a9f1bde371
  const int error_code = static_cast<int>(error.GetErrorType());
  return Status::IOError(prefix, "AWS Error [code ", error_code, "]: ",
                         error.GetMessage());
}
// Convert an AWS error into a Status, using a tuple of references as the
// message prefix. The tuple is printed into a string with PrintTuple(), then
// the string-prefix overload builds the Status.
template <typename ErrorType, typename... Args>
Status ErrorToStatus(const std::tuple<Args&...>& prefix,
                     const Aws::Client::AWSError<ErrorType>& error) {
  std::stringstream prefix_stream;
  ::arrow::internal::PrintTuple(&prefix_stream, prefix);
  const std::string prefix_text = prefix_stream.str();
  return ErrorToStatus(prefix_text, error);
}
// Convert an AWS error into a Status with no message prefix.
template <typename ErrorType>
Status ErrorToStatus(const Aws::Client::AWSError<ErrorType>& error) {
  const std::string no_prefix;
  return ErrorToStatus(no_prefix, error);
}
// Convert an AWS outcome into a Status: OK on success, otherwise the
// outcome's error converted by ErrorToStatus() with `prefix` prepended.
template <typename AwsResult, typename Error>
Status OutcomeToStatus(const std::string& prefix,
                       const Aws::Utils::Outcome<AwsResult, Error>& outcome) {
  if (!outcome.IsSuccess()) {
    return ErrorToStatus(prefix, outcome.GetError());
  }
  return Status::OK();
}
// Convert an AWS outcome into a Status, using a tuple of references as the
// error message prefix: OK on success, otherwise the outcome's error
// converted by the tuple-prefix ErrorToStatus() overload.
template <typename AwsResult, typename Error, typename... Args>
Status OutcomeToStatus(const std::tuple<Args&...>& prefix,
                       const Aws::Utils::Outcome<AwsResult, Error>& outcome) {
  if (!outcome.IsSuccess()) {
    return ErrorToStatus(prefix, outcome.GetError());
  }
  return Status::OK();
}
// Convert an AWS outcome into a Status with no message prefix.
template <typename AwsResult, typename Error>
Status OutcomeToStatus(const Aws::Utils::Outcome<AwsResult, Error>& outcome) {
  const std::string no_prefix;
  return OutcomeToStatus(no_prefix, outcome);
}
// Convert an AWS outcome into an arrow Result.
//
// On failure, the result holds the Status built from the outcome's error.
// On success, the value is moved out of `outcome` (which is taken by value).
template <typename AwsResult, typename Error>
Result<AwsResult> OutcomeToResult(Aws::Utils::Outcome<AwsResult, Error> outcome) {
  if (!outcome.IsSuccess()) {
    return ErrorToStatus(outcome.GetError());
  }
  return std::move(outcome).GetResultWithOwnership();
}
// Copy a std::string into an Aws::String.
inline Aws::String ToAwsString(const std::string& s) {
  // Aws::String uses a specific Allocator class, so it cannot be constructed
  // directly from a std::string; copy the characters instead.
  return Aws::String(s.data(), s.size());
}
// Return a non-owning view over the characters of an Aws::String.
// The view refers to `s`'s own buffer, so it must not outlive `s`.
inline util::string_view FromAwsString(const Aws::String& s) {
  return util::string_view(s.data(), s.size());
}
// URL-encode `s` with the AWS SDK's StringUtils::URLEncode() and return the
// result as an Aws::String. The input is handed over as a C string.
inline Aws::String ToURLEncodedAwsString(const std::string& s) {
  const char* raw = s.c_str();
  return Aws::Utils::StringUtils::URLEncode(raw);
}
// Convert an AWS DateTime into a TimePoint by casting its underlying
// timestamp to nanosecond resolution.
inline TimePoint FromAwsDatetime(const Aws::Utils::DateTime& dt) {
  const auto timestamp = dt.UnderlyingTimestamp();
  return std::chrono::time_point_cast<std::chrono::nanoseconds>(timestamp);
}
// A connect retry strategy with a controlled max duration.
//
// Only errors recognized by IsConnectError() are retried. Each retry waits a
// fixed `retry_interval`, and retrying stops once the number of attempted
// retries times that interval reaches `max_retry_duration`.
class ConnectRetryStrategy : public Aws::Client::RetryStrategy {
 public:
  static const int32_t kDefaultRetryInterval = 200;     /* milliseconds */
  static const int32_t kDefaultMaxRetryDuration = 6000; /* milliseconds */

  // Both arguments are expressed in milliseconds.
  explicit ConnectRetryStrategy(int32_t retry_interval = kDefaultRetryInterval,
                                int32_t max_retry_duration = kDefaultMaxRetryDuration)
      : retry_interval_(retry_interval), max_retry_duration_(max_retry_duration) {}

  // Retry only connect errors, and only while the time nominally spent
  // waiting so far is below the maximum retry duration.
  bool ShouldRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors>& error,
                   long attempted_retries) const override {  // NOLINT
    return IsConnectError(error) &&
           (attempted_retries * retry_interval_ < max_retry_duration_);
  }

  // The delay is constant: it depends neither on the error nor on the number
  // of retries already attempted.
  long CalculateDelayBeforeNextRetry(  // NOLINT
      const Aws::Client::AWSError<Aws::Client::CoreErrors>& error,
      long attempted_retries) const override {  // NOLINT
    return retry_interval_;
  }

 protected:
  int32_t retry_interval_;      // fixed wait between retries
  int32_t max_retry_duration_;  // upper bound on the total retry time
};
} // namespace internal
} // namespace fs
} // namespace arrow