blob: 82b10d5e38865892a23e8b2c204d4d44154c8fbf [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::fmt::Debug;
use std::sync::Arc;
use http::Response;
use http::StatusCode;
use http::Uri;
use log::debug;
use reqsign_aliyun_oss::AssumeRoleWithOidcCredentialProvider;
use reqsign_aliyun_oss::EnvCredentialProvider;
use reqsign_aliyun_oss::RequestSigner;
use reqsign_aliyun_oss::StaticCredentialProvider;
use reqsign_core::Context;
use reqsign_core::Env as _;
use reqsign_core::OsEnv;
use reqsign_core::ProvideCredentialChain;
use reqsign_core::Signer;
use reqsign_core::StaticEnv;
use reqsign_file_read_tokio::TokioFileRead;
use super::OSS_SCHEME;
use super::config::OssConfig;
use super::core::*;
use super::deleter::OssDeleter;
use super::error::parse_error;
use super::lister::OssLister;
use super::lister::OssListers;
use super::lister::OssObjectVersionsLister;
use super::writer::OssWriter;
use super::writer::OssWriters;
use opendal_core::raw::*;
use opendal_core::*;
/// Fallback for `delete_max_size` when the user did not configure one; it bounds how
/// many deletions the batch deleter groups together.
const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1_000;
#[derive(Default)]
/// Aliyun Object Storage Service (OSS) support
#[doc = include_str!("docs.md")]
pub struct OssBuilder {
    /// Raw configuration filled in by the builder methods and consumed by `build`.
    pub(super) config: OssConfig,
}
impl Debug for OssBuilder {
    /// Render the builder by delegating to the `Debug` output of its config.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("OssBuilder");
        out.field("config", &self.config);
        out.finish_non_exhaustive()
    }
}
impl OssBuilder {
/// Set root of this backend.
///
/// All operations will happen under this root.
pub fn root(mut self, root: &str) -> Self {
self.config.root = if root.is_empty() {
None
} else {
Some(root.to_string())
};
self
}
/// Set bucket name of this backend.
pub fn bucket(mut self, bucket: &str) -> Self {
self.config.bucket = bucket.to_string();
self
}
/// Set endpoint of this backend.
pub fn endpoint(mut self, endpoint: &str) -> Self {
if !endpoint.is_empty() {
// Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string())
}
self
}
/// Set addressing style for the endpoint.
///
/// Available values: `virtual`, `cname`, `path`.
///
/// - `virtual`: Use virtual addressing style, i.e. `http://bucket.oss-<region>.aliyuncs.com/object`
/// - `cname`: Use cname addressing style, i.e. `http://mydomain.com/object` with mydomain.com bound to your bucket.
/// - `path`: Use path addressing style. i.e. `http://oss-<region>.aliyuncs.com/bucket/object`
///
/// - If not set, default value is `virtual`.
pub fn addressing_style(mut self, addressing_style: &str) -> Self {
self.config.addressing_style = Some(addressing_style.to_string());
self
}
/// Set bucket versioning status for this backend
pub fn enable_versioning(mut self, enabled: bool) -> Self {
self.config.enable_versioning = enabled;
self
}
/// Set an endpoint for generating presigned urls.
///
/// You can offer a public endpoint like <https://oss-cn-beijing.aliyuncs.com> to return a presinged url for
/// public accessors, along with an internal endpoint like <https://oss-cn-beijing-internal.aliyuncs.com>
/// to access objects in a faster path.
///
/// - If presign_endpoint is set, we will use presign_endpoint on generating presigned urls.
/// - if not, we will use endpoint as default.
pub fn presign_endpoint(mut self, endpoint: &str) -> Self {
if !endpoint.is_empty() {
// Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
self.config.presign_endpoint = Some(endpoint.trim_end_matches('/').to_string())
}
self
}
/// Set addressing style for presign endpoint.
///
/// Similar to setting addressing style for endpoint.
///
/// - If both presign_endpoint and presign_addressing_style are not set, they are the same as endpoint's configurations.
///
/// - If presign_endpoint is set, but presign_addressing_style is not set, default value is `virtual`.
pub fn presign_addressing_style(mut self, addressing_style: &str) -> Self {
self.config.presign_addressing_style = Some(addressing_style.to_string());
self
}
/// Set access_key_id of this backend.
///
/// - If access_key_id is set, we will take user's input first.
/// - If not, we will try to load it from environment.
pub fn access_key_id(mut self, v: &str) -> Self {
if !v.is_empty() {
self.config.access_key_id = Some(v.to_string())
}
self
}
/// Set access_key_secret of this backend.
///
/// - If access_key_secret is set, we will take user's input first.
/// - If not, we will try to load it from environment.
pub fn access_key_secret(mut self, v: &str) -> Self {
if !v.is_empty() {
self.config.access_key_secret = Some(v.to_string())
}
self
}
/// Set security_token for this backend.
///
/// - If security_token is set, we will take user's input first.
/// - If not, we will try to load it from environment.
pub fn security_token(mut self, security_token: &str) -> Self {
if !security_token.is_empty() {
self.config.security_token = Some(security_token.to_string())
}
self
}
/// preprocess the endpoint option
fn parse_endpoint(
&self,
endpoint: &Option<String>,
bucket: &str,
addressing_style: AddressingStyle,
) -> Result<(String, String)> {
let (endpoint, host) = match endpoint.clone() {
Some(ep) => {
let uri = ep.parse::<Uri>().map_err(|err| {
Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
.with_context("service", OSS_SCHEME)
.with_context("endpoint", &ep)
.set_source(err)
})?;
let host = uri.host().ok_or_else(|| {
Error::new(ErrorKind::ConfigInvalid, "endpoint host is empty")
.with_context("service", OSS_SCHEME)
.with_context("endpoint", &ep)
})?;
let full_host = match addressing_style {
AddressingStyle::Virtual => {
if let Some(port) = uri.port_u16() {
format!("{bucket}.{host}:{port}")
} else {
format!("{bucket}.{host}")
}
}
AddressingStyle::Cname | AddressingStyle::Path => {
if let Some(port) = uri.port_u16() {
format!("{host}:{port}")
} else {
host.to_string()
}
}
};
if let Some(port) = uri.port_u16() {
format!("{bucket}.{host}:{port}")
} else {
format!("{bucket}.{host}")
};
let endpoint = match uri.scheme_str() {
Some(scheme_str) => match scheme_str {
"http" | "https" => format!("{scheme_str}://{full_host}"),
_ => {
return Err(Error::new(
ErrorKind::ConfigInvalid,
"endpoint protocol is invalid",
)
.with_context("service", OSS_SCHEME));
}
},
None => format!("https://{full_host}"),
};
let endpoint = match addressing_style {
AddressingStyle::Path => format!("{}/{}", endpoint, bucket),
AddressingStyle::Cname | AddressingStyle::Virtual => endpoint,
};
(endpoint, full_host)
}
None => {
return Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
.with_context("service", OSS_SCHEME));
}
};
Ok((endpoint, host))
}
/// Set server_side_encryption for this backend.
///
/// Available values: `AES256`, `KMS`.
///
/// Reference: <https://www.alibabacloud.com/help/en/object-storage-service/latest/server-side-encryption-5>
/// Brief explanation:
/// There are two server-side encryption methods available:
/// SSE-AES256:
/// 1. Configure the bucket encryption mode as OSS-managed and specify the encryption algorithm as AES256.
/// 2. Include the `x-oss-server-side-encryption` parameter in the request and set its value to AES256.
/// SSE-KMS:
/// 1. To use this service, you need to first enable KMS.
/// 2. Configure the bucket encryption mode as KMS, and specify the specific CMK ID for BYOK (Bring Your Own Key)
/// or not specify the specific CMK ID for OSS-managed KMS key.
/// 3. Include the `x-oss-server-side-encryption` parameter in the request and set its value to KMS.
/// 4. If a specific CMK ID is specified, include the `x-oss-server-side-encryption-key-id` parameter in the request, and set its value to the specified CMK ID.
pub fn server_side_encryption(mut self, v: &str) -> Self {
if !v.is_empty() {
self.config.server_side_encryption = Some(v.to_string())
}
self
}
/// Set server_side_encryption_key_id for this backend.
///
/// # Notes
///
/// This option only takes effect when server_side_encryption equals to KMS.
pub fn server_side_encryption_key_id(mut self, v: &str) -> Self {
if !v.is_empty() {
self.config.server_side_encryption_key_id = Some(v.to_string())
}
self
}
/// Set maximum batch operations of this backend.
#[deprecated(
since = "0.52.0",
note = "Please use `delete_max_size` instead of `batch_max_operations`"
)]
pub fn batch_max_operations(mut self, delete_max_size: usize) -> Self {
self.config.delete_max_size = Some(delete_max_size);
self
}
/// Set maximum delete operations of this backend.
pub fn delete_max_size(mut self, delete_max_size: usize) -> Self {
self.config.delete_max_size = Some(delete_max_size);
self
}
/// Allow anonymous will allow opendal to send request without signing
/// when credential is not loaded.
pub fn allow_anonymous(mut self) -> Self {
self.config.allow_anonymous = true;
self
}
/// Set role_arn for this backend.
///
/// If `role_arn` is set, we will use already known config as source
/// credential to assume role with `role_arn`.
pub fn role_arn(mut self, role_arn: &str) -> Self {
if !role_arn.is_empty() {
self.config.role_arn = Some(role_arn.to_string())
}
self
}
/// Set role_session_name for this backend.
pub fn role_session_name(mut self, role_session_name: &str) -> Self {
if !role_session_name.is_empty() {
self.config.role_session_name = Some(role_session_name.to_string())
}
self
}
/// Set oidc_provider_arn for this backend.
pub fn oidc_provider_arn(mut self, oidc_provider_arn: &str) -> Self {
if !oidc_provider_arn.is_empty() {
self.config.oidc_provider_arn = Some(oidc_provider_arn.to_string())
}
self
}
/// Set oidc_token_file for this backend.
pub fn oidc_token_file(mut self, oidc_token_file: &str) -> Self {
if !oidc_token_file.is_empty() {
self.config.oidc_token_file = Some(oidc_token_file.to_string())
}
self
}
/// Set sts_endpoint for this backend.
pub fn sts_endpoint(mut self, sts_endpoint: &str) -> Self {
if !sts_endpoint.is_empty() {
self.config.sts_endpoint = Some(sts_endpoint.to_string())
}
self
}
}
/// How the bucket is addressed when building request URLs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AddressingStyle {
    /// Bucket as the first path segment: `<scheme>://<host>/<bucket>`.
    Path,
    /// The endpoint host is used as-is (a custom domain bound to the bucket).
    Cname,
    /// Bucket as a host prefix: `<scheme>://<bucket>.<host>`.
    Virtual,
}
impl TryFrom<&Option<String>> for AddressingStyle {
type Error = Error;
fn try_from(value: &Option<String>) -> Result<Self> {
match value.as_deref() {
None | Some("virtual") => Ok(AddressingStyle::Virtual),
Some("path") => Ok(AddressingStyle::Path),
Some("cname") => Ok(AddressingStyle::Cname),
Some(v) => Err(Error::new(
ErrorKind::ConfigInvalid,
"Invalid addressing style, available: `virtual`, `path`, `cname`",
)
.with_context("service", OSS_SCHEME)
.with_context("addressing_style", v)),
}
}
}
impl Builder for OssBuilder {
    type Config = OssConfig;

    /// Validate the configuration and assemble an OSS backend.
    ///
    /// Steps:
    /// 1. Normalize `root` and require a non-empty `bucket`.
    /// 2. Resolve the request endpoint/host and the presign endpoint.
    /// 3. Validate server-side encryption settings as HTTP header values.
    /// 4. Build the credential chain: static AK/SK (only when both are set), then
    ///    environment credentials, then OIDC assume-role.
    /// 5. Declare the native capabilities and wrap everything into an `OssBackend`.
    ///
    /// # Errors
    ///
    /// Returns `ConfigInvalid` when the bucket is empty, or when an endpoint or
    /// addressing style is invalid. Errors from `build_header_value` are propagated
    /// with the offending config key attached.
    fn build(self) -> Result<impl Access> {
        debug!("backend build started: {:?}", &self);

        let root = normalize_root(self.config.root.as_deref().unwrap_or_default());
        debug!("backend use root {}", &root);

        // Handle endpoint, region and bucket name.
        let bucket = &self.config.bucket;
        if bucket.is_empty() {
            return Err(
                Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
                    .with_context("service", OSS_SCHEME),
            );
        }

        // Retrieve endpoint and host by parsing the endpoint option and bucket. If presign_endpoint is not
        // set, take endpoint as default presign_endpoint.
        let (endpoint, host) = self.parse_endpoint(
            &self.config.endpoint,
            bucket,
            (&self.config.addressing_style).try_into()?,
        )?;
        debug!("backend use bucket {}, endpoint: {}", &bucket, &endpoint);

        let presign_endpoint = if self.config.presign_endpoint.is_some() {
            self.parse_endpoint(
                &self.config.presign_endpoint,
                bucket,
                (&self.config.presign_addressing_style).try_into()?,
            )?
            .0
        } else {
            endpoint.clone()
        };
        debug!("backend use presign_endpoint: {}", &presign_endpoint);

        let server_side_encryption = match &self.config.server_side_encryption {
            None => None,
            Some(v) => Some(
                build_header_value(v)
                    .map_err(|err| err.with_context("key", "server_side_encryption"))?,
            ),
        };

        let server_side_encryption_key_id = match &self.config.server_side_encryption_key_id {
            None => None,
            Some(v) => Some(
                build_header_value(v)
                    .map_err(|err| err.with_context("key", "server_side_encryption_key_id"))?,
            ),
        };

        // NOTE: `AssumeRoleWithOidcCredentialProvider` still reads `role_arn`, `oidc_provider_arn`
        // and `oidc_token_file` from `Context` environment variables at runtime. Until reqsign
        // exposes typed builder APIs for all of them, we overlay config values into a `StaticEnv`
        // snapshot here.
        let os_env = OsEnv;
        let mut envs = os_env.vars();
        if let Some(v) = &self.config.role_arn {
            envs.insert("ALIBABA_CLOUD_ROLE_ARN".to_string(), v.clone());
        }
        if let Some(v) = &self.config.oidc_provider_arn {
            envs.insert("ALIBABA_CLOUD_OIDC_PROVIDER_ARN".to_string(), v.clone());
        }
        if let Some(v) = &self.config.oidc_token_file {
            envs.insert("ALIBABA_CLOUD_OIDC_TOKEN_FILE".to_string(), v.clone());
        }

        let mut assume_role = AssumeRoleWithOidcCredentialProvider::new();
        if let Some(sts_endpoint) = &self.config.sts_endpoint {
            // Full URLs go to the provider directly; anything else is exported via env.
            if sts_endpoint.starts_with("http://") || sts_endpoint.starts_with("https://") {
                assume_role = assume_role.with_sts_endpoint(sts_endpoint.clone());
            } else {
                envs.insert(
                    "ALIBABA_CLOUD_STS_ENDPOINT".to_string(),
                    sts_endpoint.clone(),
                );
            }
        }
        if let Some(role_session_name) = &self.config.role_session_name {
            assume_role = assume_role.with_role_session_name(role_session_name.clone());
        }

        let info = Arc::new(AccessorInfo::default());
        let ctx = Context::new()
            .with_file_read(TokioFileRead)
            .with_http_send(AccessorInfoHttpSend::new(info.clone()))
            .with_env(StaticEnv {
                home_dir: os_env.home_dir(),
                envs,
            });

        let mut provider = ProvideCredentialChain::new()
            .push(EnvCredentialProvider::new())
            .push(assume_role);

        // Explicitly configured static credentials take precedence over everything else.
        if let (Some(ak), Some(sk)) = (&self.config.access_key_id, &self.config.access_key_secret) {
            let static_provider = if let Some(token) = self.config.security_token.as_deref() {
                StaticCredentialProvider::new(ak, sk).with_security_token(token)
            } else {
                StaticCredentialProvider::new(ak, sk)
            };
            provider = provider.push_front(static_provider);
        }

        let request_signer = RequestSigner::new(bucket);
        let signer = Signer::new(ctx, provider, request_signer);

        let delete_max_size = self
            .config
            .delete_max_size
            .unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);

        Ok(OssBackend {
            core: Arc::new(OssCore {
                info: {
                    info.set_scheme(OSS_SCHEME)
                        .set_root(&root)
                        .set_name(bucket)
                        .set_native_capability(Capability {
                            stat: true,
                            stat_with_if_match: true,
                            stat_with_if_none_match: true,
                            stat_with_version: self.config.enable_versioning,

                            read: true,

                            read_with_if_match: true,
                            read_with_if_none_match: true,
                            read_with_version: self.config.enable_versioning,
                            read_with_if_modified_since: true,
                            read_with_if_unmodified_since: true,

                            write: true,
                            write_can_empty: true,
                            write_can_append: true,
                            write_can_multi: true,
                            write_with_cache_control: true,
                            write_with_content_type: true,
                            write_with_content_disposition: true,
                            // `if_not_exists` writes are only advertised while bucket
                            // versioning is disabled.
                            write_with_if_not_exists: !self.config.enable_versioning,

                            // The min multipart size of OSS is 100 KiB.
                            //
                            // ref: <https://www.alibabacloud.com/help/en/oss/user-guide/multipart-upload-12>
                            write_multi_min_size: Some(100 * 1024),
                            // The max multipart size of OSS is 5 GiB.
                            //
                            // ref: <https://www.alibabacloud.com/help/en/oss/user-guide/multipart-upload-12>
                            write_multi_max_size: if cfg!(target_pointer_width = "64") {
                                Some(5 * 1024 * 1024 * 1024)
                            } else {
                                Some(usize::MAX)
                            },
                            write_with_user_metadata: true,

                            delete: true,
                            delete_with_version: self.config.enable_versioning,
                            delete_max_size: Some(delete_max_size),

                            copy: true,

                            list: true,
                            list_with_limit: true,
                            list_with_start_after: true,
                            list_with_recursive: true,
                            list_with_versions: self.config.enable_versioning,
                            list_with_deleted: self.config.enable_versioning,

                            presign: true,
                            presign_stat: true,
                            presign_read: true,
                            presign_write: true,

                            shared: true,

                            ..Default::default()
                        });

                    info.clone()
                },
                root,
                bucket: bucket.to_owned(),
                endpoint,
                host,
                presign_endpoint,
                allow_anonymous: self.config.allow_anonymous,
                signer,
                server_side_encryption,
                server_side_encryption_key_id,
            }),
        })
    }
}
/// Aliyun Object Storage Service backend
#[derive(Debug, Clone)]
pub struct OssBackend {
    /// Shared state (accessor info, endpoints, signer, encryption settings) used by
    /// every operation.
    core: Arc<OssCore>,
}
impl Access for OssBackend {
type Reader = HttpBody;
type Writer = OssWriters;
type Lister = OssListers;
type Deleter = oio::BatchDeleter<OssDeleter>;
fn info(&self) -> Arc<AccessorInfo> {
self.core.info.clone()
}
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
let resp = self.core.oss_head_object(path, &args).await?;
let status = resp.status();
match status {
StatusCode::OK => {
let headers = resp.headers();
let mut meta = self.core.parse_metadata(path, resp.headers())?;
if let Some(v) = parse_header_to_str(headers, constants::X_OSS_VERSION_ID)? {
meta.set_version(v);
}
Ok(RpStat::new(meta))
}
_ => Err(parse_error(resp)),
}
}
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
let resp = self.core.oss_get_object(path, &args).await?;
let status = resp.status();
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
Ok((RpRead::default(), resp.into_body()))
}
_ => {
let (part, mut body) = resp.into_parts();
let buf = body.to_buffer().await?;
Err(parse_error(Response::from_parts(part, buf)))
}
}
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let writer = OssWriter::new(self.core.clone(), path, args.clone());
let w = if args.append() {
OssWriters::Two(oio::AppendWriter::new(writer))
} else {
OssWriters::One(oio::MultipartWriter::new(
self.core.info.clone(),
writer,
args.concurrent(),
))
};
Ok((RpWrite::default(), w))
}
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
Ok((
RpDelete::default(),
oio::BatchDeleter::new(
OssDeleter::new(self.core.clone()),
self.core.info.full_capability().delete_max_size,
),
))
}
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
let l = if args.versions() || args.deleted() {
TwoWays::Two(oio::PageLister::new(OssObjectVersionsLister::new(
self.core.clone(),
path,
args,
)))
} else {
TwoWays::One(oio::PageLister::new(OssLister::new(
self.core.clone(),
path,
args.recursive(),
args.limit(),
args.start_after(),
)))
};
Ok((RpList::default(), l))
}
async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
let resp = self.core.oss_copy_object(from, to).await?;
let status = resp.status();
match status {
StatusCode::OK => Ok(RpCopy::default()),
_ => Err(parse_error(resp)),
}
}
async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
// We will not send this request out, just for signing.
let req = match args.operation() {
PresignOperation::Stat(v) => self.core.oss_head_object_request(path, true, v),
PresignOperation::Read(v) => self.core.oss_get_object_request(path, true, v),
PresignOperation::Write(v) => {
self.core
.oss_put_object_request(path, None, v, Buffer::new(), true)
}
PresignOperation::Delete(_) => Err(Error::new(
ErrorKind::Unsupported,
"operation is not supported",
)),
_ => Err(Error::new(
ErrorKind::Unsupported,
"operation is not supported",
)),
};
let req = req?;
let req = self.core.sign_query(req, args.expire()).await?;
// We don't need this request anymore, consume it directly.
let (parts, _) = req.into_parts();
Ok(RpPresign::new(PresignedRequest::new(
parts.method,
parts.uri,
parts.headers,
)))
}
}