blob: 722ef0054d6d2822b17956437fc82e7e10fd2c2d [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::fmt::Debug;
use std::sync::Arc;
use bytes::Bytes;
use http::Request;
use http::Response;
use http::header::CACHE_CONTROL;
use http::header::CONTENT_DISPOSITION;
use http::header::CONTENT_LENGTH;
use http::header::CONTENT_TYPE;
use http::header::IF_MATCH;
use http::header::IF_NONE_MATCH;
use opendal_core::raw::*;
use opendal_core::*;
use reqsign_core::Signer;
use reqsign_huaweicloud_obs::Credential;
use serde::Deserialize;
use serde::Serialize;
/// Header names specific to Huawei Cloud OBS.
pub mod constants {
    /// Prefix prepended to every user-metadata key to form its header name
    /// (`x-obs-meta-<key>`).
    pub const X_OBS_META_PREFIX: &str = "x-obs-meta-";
    /// The `x-obs-version-id` header name; presumably read from responses to
    /// obtain an object's version — confirm at the call sites.
    pub const X_OBS_VERSION_ID: &str = "x-obs-version-id";
}
/// Shared state used to build, sign and send requests against one OBS bucket.
pub struct ObsCore {
    /// Accessor metadata; also supplies the HTTP client used to send requests.
    pub info: Arc<AccessorInfo>,
    /// Target bucket name (used when building `x-obs-copy-source`).
    pub bucket: String,
    /// Root that every relative path is resolved against with `build_abs_path`.
    pub root: String,
    /// Base URL that encoded object paths and query strings are appended to.
    pub endpoint: String,
    /// Request signer, parameterized over the OBS `Credential` type.
    pub signer: Signer<Credential>,
}
impl Debug for ObsCore {
    /// Prints only the plain configuration (`root`, `bucket`, `endpoint`).
    ///
    /// `info` and the credential-bearing `signer` are left out on purpose,
    /// which is why the output is marked non-exhaustive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("ObsCore");
        out.field("root", &self.root);
        out.field("bucket", &self.bucket);
        out.field("endpoint", &self.endpoint);
        out.finish_non_exhaustive()
    }
}
impl ObsCore {
    /// Signs `req` with the configured signer, without an expiry.
    ///
    /// # Errors
    ///
    /// Returns a request-sign error if the signer fails.
    pub async fn sign<T>(&self, req: Request<T>) -> Result<Request<T>> {
        self.sign_with_expiry(req, None).await
    }

    /// Signs `req`, passing `duration` to the signer as the signature's
    /// validity window (the name suggests a presigned, query-string form).
    ///
    /// # Errors
    ///
    /// Returns a request-sign error if the signer fails.
    pub async fn sign_query<T>(&self, req: Request<T>, duration: Duration) -> Result<Request<T>> {
        self.sign_with_expiry(req, Some(duration)).await
    }

    /// Shared signing path: splits off the head, lets the signer mutate it,
    /// then reattaches the untouched body.
    async fn sign_with_expiry<T>(
        &self,
        req: Request<T>,
        expires_in: Option<Duration>,
    ) -> Result<Request<T>> {
        let (mut head, payload) = req.into_parts();
        self.signer
            .sign(&mut head, expires_in)
            .await
            .map_err(|e| new_request_sign_error(e.into()))?;
        Ok(Request::from_parts(head, payload))
    }

    /// Sends an already-built (and normally already-signed) request through
    /// the accessor's HTTP client, buffering the response body.
    #[inline]
    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
        self.info.http_client().send(req).await
    }
}
impl ObsCore {
/// Reads the object at `path`, honouring `range` and the conditions in `args`.
///
/// The request comes from [`Self::obs_get_object_request`]. It is signed and
/// then sent with `fetch`, which yields an `HttpBody` rather than a collected
/// `Buffer`.
pub async fn obs_get_object(
    &self,
    path: &str,
    range: BytesRange,
    args: &OpRead,
) -> Result<Response<HttpBody>> {
    let signed = self
        .sign(self.obs_get_object_request(path, range, args)?)
        .await?;
    self.info.http_client().fetch(signed).await
}
/// Builds an unsigned `GET` request for `path`.
///
/// Headers are added in this order:
/// - `If-Match` when `args` carries one;
/// - `Range` unless `range` covers the whole object;
/// - `If-None-Match` when `args` carries one.
pub fn obs_get_object_request(
    &self,
    path: &str,
    range: BytesRange,
    args: &OpRead,
) -> Result<Request<Buffer>> {
    let abs_path = build_abs_path(&self.root, path);
    let url = format!("{}/{}", self.endpoint, percent_encode_path(&abs_path));

    let mut builder = Request::get(&url);
    if let Some(etag) = args.if_match() {
        builder = builder.header(IF_MATCH, etag);
    }
    if !range.is_full() {
        builder = builder.header(http::header::RANGE, range.to_header());
    }
    if let Some(etag) = args.if_none_match() {
        builder = builder.header(IF_NONE_MATCH, etag);
    }

    builder
        .extension(Operation::Read)
        .body(Buffer::new())
        .map_err(new_request_build_error)
}
/// Builds an unsigned `PUT` request that uploads `body` to `path`.
///
/// - `Content-Length` is set only when `size` is known.
/// - `Cache-Control` and `Content-Type` are copied from `args`.
/// - Each user-metadata entry becomes an `x-obs-meta-<key>` header.
pub fn obs_put_object_request(
    &self,
    path: &str,
    size: Option<u64>,
    args: &OpWrite,
    body: Buffer,
) -> Result<Request<Buffer>> {
    let abs_path = build_abs_path(&self.root, path);
    let url = format!("{}/{}", self.endpoint, percent_encode_path(&abs_path));

    let mut builder = Request::put(&url);
    if let Some(len) = size {
        builder = builder.header(CONTENT_LENGTH, len);
    }
    if let Some(value) = args.cache_control() {
        builder = builder.header(CACHE_CONTROL, value);
    }
    if let Some(value) = args.content_type() {
        builder = builder.header(CONTENT_TYPE, value);
    }
    // User metadata travels as one prefixed header per entry.
    for (key, value) in args.user_metadata().into_iter().flatten() {
        builder = builder.header(format!("{}{}", constants::X_OBS_META_PREFIX, key), value);
    }

    builder
        .extension(Operation::Write)
        .body(body)
        .map_err(new_request_build_error)
}
/// Sends a signed `HEAD` for `path`, applying the conditional headers in `args`.
pub async fn obs_head_object(&self, path: &str, args: &OpStat) -> Result<Response<Buffer>> {
    let unsigned = self.obs_head_object_request(path, args)?;
    let signed = self.sign(unsigned).await?;
    self.send(signed).await
}
/// Builds an unsigned `HEAD` request for `path`.
///
/// `If-Match` and `If-None-Match` are added when `args` carries them.
pub fn obs_head_object_request(&self, path: &str, args: &OpStat) -> Result<Request<Buffer>> {
    let abs_path = build_abs_path(&self.root, path);
    let url = format!("{}/{}", self.endpoint, percent_encode_path(&abs_path));

    // The `Origin` header is optional for this API call. The official
    // documentation is mistaken on this point, as confirmed with Huawei Cloud
    // customer service.
    // https://support.huaweicloud.com/intl/en-us/api-obs/obs_04_0084.html
    let mut builder = Request::head(&url);
    if let Some(etag) = args.if_match() {
        builder = builder.header(IF_MATCH, etag);
    }
    if let Some(etag) = args.if_none_match() {
        builder = builder.header(IF_NONE_MATCH, etag);
    }

    builder
        .extension(Operation::Stat)
        .body(Buffer::new())
        .map_err(new_request_build_error)
}
/// Sends a signed `DELETE` for the object at `path`.
pub async fn obs_delete_object(&self, path: &str) -> Result<Response<Buffer>> {
    let abs_path = build_abs_path(&self.root, path);
    let url = format!("{}/{}", self.endpoint, percent_encode_path(&abs_path));
    let unsigned = Request::delete(&url)
        .extension(Operation::Delete)
        .body(Buffer::new())
        .map_err(new_request_build_error)?;
    let signed = self.sign(unsigned).await?;
    self.send(signed).await
}
/// Builds an unsigned append request (`POST <path>?append&position=<position>`)
/// that writes `body` to the object at `path`.
///
/// - `Content-Length` is always set to `size`.
/// - `Content-Type`, `Content-Disposition` and `Cache-Control` are copied
///   from `args` when present.
pub fn obs_append_object_request(
    &self,
    path: &str,
    position: u64,
    size: u64,
    args: &OpWrite,
    body: Buffer,
) -> Result<Request<Buffer>> {
    let abs_path = build_abs_path(&self.root, path);
    let url = format!(
        "{}/{}?append&position={}",
        self.endpoint,
        percent_encode_path(&abs_path),
        position
    );

    // Unlike the plain PUT path, append always carries an explicit length.
    let mut builder = Request::post(&url).header(CONTENT_LENGTH, size);
    if let Some(mime) = args.content_type() {
        builder = builder.header(CONTENT_TYPE, mime);
    }
    if let Some(disposition) = args.content_disposition() {
        builder = builder.header(CONTENT_DISPOSITION, disposition);
    }
    if let Some(cache_control) = args.cache_control() {
        builder = builder.header(CACHE_CONTROL, cache_control);
    }

    builder
        .extension(Operation::Write)
        .body(body)
        .map_err(new_request_build_error)
}
/// Copies `from` to `to` inside this bucket.
///
/// Sends a signed, empty-bodied `PUT` to the destination with an
/// `x-obs-copy-source` header naming the source.
pub async fn obs_copy_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
    let src_path = build_abs_path(&self.root, from);
    let dst_path = build_abs_path(&self.root, to);
    // The copy source is addressed as `/<bucket>/<percent-encoded key>`.
    let copy_source = format!("/{}/{}", self.bucket, percent_encode_path(&src_path));
    let url = format!("{}/{}", self.endpoint, percent_encode_path(&dst_path));

    let unsigned = Request::put(&url)
        .extension(Operation::Copy)
        .header("x-obs-copy-source", &copy_source)
        .body(Buffer::new())
        .map_err(new_request_build_error)?;
    let signed = self.sign(unsigned).await?;
    self.send(signed).await
}
/// Lists one page of objects under `path` (signed `GET` on the bucket endpoint).
///
/// Query parameters:
/// - `prefix`: the absolute path. It is omitted when that path is empty.
/// - `delimiter`: grouping delimiter (e.g. `/`). An empty string disables grouping.
/// - `max-keys`: page size, if `limit` is set.
/// - `marker`: the pagination marker from a previous page, e.g.
///   `ListObjectsOutput::next_marker`. Pass it as a raw key; it is encoded
///   here. An empty string requests the first page.
///
/// # Errors
///
/// Returns an error if the request cannot be built, signed or sent.
pub async fn obs_list_objects(
    &self,
    path: &str,
    next_marker: &str,
    delimiter: &str,
    limit: Option<usize>,
) -> Result<Response<Buffer>> {
    let p = build_abs_path(&self.root, path);
    let mut url = QueryPairsWriter::new(&self.endpoint);

    // Test the absolute path, not `path`: under a non-empty root the prefix
    // must still be sent even if `path` is empty, otherwise the whole bucket
    // would be listed.
    if !p.is_empty() {
        url = url.push("prefix", &percent_encode_path(&p));
    }
    // `push` takes values verbatim (note the prefix is encoded by hand above).
    // Encode every caller-supplied value the same way. Markers are object keys
    // and may contain spaces, `&`, `=` or `+`, which would otherwise produce
    // an invalid or misparsed URL.
    if !delimiter.is_empty() {
        url = url.push("delimiter", &percent_encode_path(delimiter));
    }
    if let Some(limit) = limit {
        url = url.push("max-keys", &limit.to_string());
    }
    if !next_marker.is_empty() {
        url = url.push("marker", &percent_encode_path(next_marker));
    }

    let req = Request::get(url.finish())
        .extension(Operation::List)
        .body(Buffer::new())
        .map_err(new_request_build_error)?;
    let req = self.sign(req).await?;
    self.send(req).await
}
/// Starts a multipart upload for `path` with a signed `POST <path>?uploads`.
///
/// `content_type`, when given, is sent as `Content-Type`. The raw response is
/// returned; its body is presumably an `InitiateMultipartUploadResult` XML
/// document (confirm at the caller).
pub async fn obs_initiate_multipart_upload(
    &self,
    path: &str,
    content_type: Option<&str>,
) -> Result<Response<Buffer>> {
    let abs_path = build_abs_path(&self.root, path);
    let url = format!("{}/{}?uploads", self.endpoint, percent_encode_path(&abs_path));

    let builder = match content_type {
        Some(mime) => Request::post(&url).header(CONTENT_TYPE, mime),
        None => Request::post(&url),
    };
    let unsigned = builder
        .extension(Operation::Write)
        .body(Buffer::new())
        .map_err(new_request_build_error)?;
    let signed = self.sign(unsigned).await?;
    self.send(signed).await
}
/// Uploads one part of a multipart upload and returns the raw response.
///
/// Despite the `_request` suffix, this both signs and sends the request.
/// `part_number` and the percent-encoded `upload_id` go in the query string.
/// `Content-Length` is set only when `size` is known.
pub async fn obs_upload_part_request(
    &self,
    path: &str,
    upload_id: &str,
    part_number: usize,
    size: Option<u64>,
    body: Buffer,
) -> Result<Response<Buffer>> {
    let abs_path = build_abs_path(&self.root, path);
    let url = format!(
        "{}/{}?partNumber={}&uploadId={}",
        self.endpoint,
        percent_encode_path(&abs_path),
        part_number,
        percent_encode_path(upload_id)
    );

    let builder = Request::put(&url);
    let builder = match size {
        Some(len) => builder.header(CONTENT_LENGTH, len),
        None => builder,
    };
    let unsigned = builder
        .extension(Operation::Write)
        .body(body)
        .map_err(new_request_build_error)?;
    let signed = self.sign(unsigned).await?;
    self.send(signed).await
}
/// Completes the multipart upload `upload_id` for `path`.
///
/// Serializes `parts` as a `<CompleteMultipartUpload>` XML document and POSTs
/// it to `<path>?uploadId=<id>`. The request carries an explicit
/// `Content-Length` and an `application/xml` content type.
///
/// # Errors
///
/// Returns an error if XML serialization fails, or if the request cannot be
/// built, signed or sent.
pub async fn obs_complete_multipart_upload(
    &self,
    path: &str,
    upload_id: &str,
    parts: Vec<CompleteMultipartUploadRequestPart>,
) -> Result<Response<Buffer>> {
    let p = build_abs_path(&self.root, path);
    let url = format!(
        "{}/{}?uploadId={}",
        self.endpoint,
        percent_encode_path(&p),
        percent_encode_path(upload_id)
    );

    // `parts` is already owned, so move it into the payload instead of
    // cloning the whole vector.
    let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest { part: parts })
        .map_err(new_xml_serialize_error)?;

    let req = Request::post(&url)
        // Make sure content length has been set to avoid post with chunked encoding.
        .header(CONTENT_LENGTH, content.len())
        // Set content-type to `application/xml` so the body is not mistaken for a form post.
        .header(CONTENT_TYPE, "application/xml")
        .extension(Operation::Write)
        .body(Buffer::from(Bytes::from(content)))
        .map_err(new_request_build_error)?;
    let req = self.sign(req).await?;
    self.send(req).await
}
/// Aborts the in-progress multipart upload `upload_id` for `path`.
///
/// Sends a signed `DELETE <path>?uploadId=<id>`. The request is tagged as a
/// `Write` operation.
pub async fn obs_abort_multipart_upload(
    &self,
    path: &str,
    upload_id: &str,
) -> Result<Response<Buffer>> {
    let abs_path = build_abs_path(&self.root, path);
    let url = format!(
        "{}/{}?uploadId={}",
        self.endpoint,
        percent_encode_path(&abs_path),
        percent_encode_path(upload_id)
    );
    let unsigned = Request::delete(&url)
        .extension(Operation::Write)
        .body(Buffer::new())
        .map_err(new_request_build_error)?;
    let signed = self.sign(unsigned).await?;
    self.send(signed).await
}
}
/// Response body of the initiate-multipart-upload call.
///
/// Missing elements fall back to their defaults.
#[derive(Default, Debug, Deserialize)]
#[serde(default)]
#[serde(rename_all = "PascalCase")]
pub struct InitiateMultipartUploadResult {
    /// Identifier of the new upload (`<UploadId>`).
    pub upload_id: String,
}
/// XML payload of the complete-multipart-upload request.
///
/// Serialized as `<CompleteMultipartUpload>`, with one `<Part>` element per entry.
#[derive(Default, Debug, Serialize)]
#[serde(default)]
#[serde(rename = "CompleteMultipartUpload", rename_all = "PascalCase")]
pub struct CompleteMultipartUploadRequest {
    /// The uploaded parts, each emitted as a `<Part>` element.
    pub part: Vec<CompleteMultipartUploadRequestPart>,
}
/// One `<Part>` entry of a complete-multipart-upload request.
#[derive(Clone, Default, Debug, Serialize)]
#[serde(default)]
#[serde(rename_all = "PascalCase")]
pub struct CompleteMultipartUploadRequestPart {
    /// Part number (`<PartNumber>`).
    #[serde(rename = "PartNumber")]
    pub part_number: usize,
    /// ETag of the uploaded part (`<ETag>`); presumably taken from the
    /// upload-part response.
    ///
    /// quick-xml escapes `"` when serializing, so our output differs from
    /// the AWS S3 example, which keeps the quotes verbatim.
    ///
    /// Ideally we could address this with `serialize_with`, but that attempt
    /// failed:
    ///
    /// ```ignore
    /// #[derive(Default, Debug, Serialize)]
    /// #[serde(default, rename_all = "PascalCase")]
    /// struct CompleteMultipartUploadRequestPart {
    ///     #[serde(rename = "PartNumber")]
    ///     part_number: usize,
    ///     #[serde(rename = "ETag", serialize_with = "partial_escape")]
    ///     etag: String,
    /// }
    ///
    /// fn partial_escape<S>(s: &str, ser: S) -> Result<S::Ok, S::Error>
    /// where
    ///     S: serde::Serializer,
    /// {
    ///     ser.serialize_str(&String::from_utf8_lossy(
    ///         &quick_xml::escape::partial_escape(s.as_bytes()),
    ///     ))
    /// }
    /// ```
    ///
    /// ref: <https://github.com/tafia/quick-xml/issues/362>
    #[serde(rename = "ETag")]
    pub etag: String,
}
/// Response body of the `CompleteMultipartUpload` operation.
///
/// Missing elements fall back to their defaults.
#[derive(Debug, Default, Deserialize)]
#[serde(default)]
#[serde(rename_all = "PascalCase")]
pub struct CompleteMultipartUploadResult {
    /// `<Location>` of the assembled object.
    pub location: String,
    /// `<Bucket>` holding the object.
    pub bucket: String,
    /// `<Key>` of the object.
    pub key: String,
    /// `<ETag>` of the assembled object.
    #[serde(rename = "ETag")]
    pub etag: String,
}
/// One page of a `ListObjects` response (`<ListBucketResult>` XML).
///
/// Elements absent from the document fall back to their defaults.
#[derive(Default, Debug, Deserialize)]
#[serde(default)]
#[serde(rename_all = "PascalCase")]
pub struct ListObjectsOutput {
    /// Bucket name (`<Name>`).
    pub name: String,
    /// Prefix echoed back by the service (`<Prefix>`).
    pub prefix: String,
    /// Objects in this page (repeated `<Contents>`).
    pub contents: Vec<ListObjectsOutputContent>,
    /// Grouped prefixes (repeated `<CommonPrefixes>`).
    pub common_prefixes: Vec<CommonPrefix>,
    /// Marker this page started from (`<Marker>`).
    pub marker: String,
    /// Marker for the next page, when the service returned one (`<NextMarker>`).
    pub next_marker: Option<String>,
}
/// One `<CommonPrefixes>` entry of a `ListObjects` response.
#[derive(Default, Debug, Deserialize)]
#[serde(default)]
#[serde(rename_all = "PascalCase")]
pub struct CommonPrefix {
    /// The grouped prefix (`<Prefix>`).
    pub prefix: String,
}
/// One `<Contents>` entry (a single object) of a `ListObjects` response.
///
/// Only the key and size are extracted; other elements are ignored.
#[derive(Default, Debug, Deserialize)]
#[serde(default)]
#[serde(rename_all = "PascalCase")]
pub struct ListObjectsOutputContent {
    /// Object key (`<Key>`).
    pub key: String,
    /// Object size (`<Size>`).
    pub size: u64,
}
#[cfg(test)]
mod tests {
    use bytes::Buf;

    use super::*;

    /// Decodes a sample `ListBucketResult` document and checks every field
    /// that `ListObjectsOutput` extracts from it.
    #[test]
    fn test_parse_xml() {
        let bs = bytes::Bytes::from(
            r#"<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ListBucketResult xmlns="http://obs.cn-north-4.myhuaweicloud.com/doc/2015-06-30/">
<Name>examplebucket</Name>
<Prefix>obj</Prefix>
<Marker>obj002</Marker>
<NextMarker>obj004</NextMarker>
<MaxKeys>1000</MaxKeys>
<IsTruncated>false</IsTruncated>
<Contents>
<Key>obj002</Key>
<LastModified>2015-07-01T02:11:19.775Z</LastModified>
<ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
<Size>9</Size>
<Owner>
<ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
</Owner>
<StorageClass>STANDARD</StorageClass>
</Contents>
<Contents>
<Key>obj003</Key>
<LastModified>2015-07-01T02:11:19.775Z</LastModified>
<ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
<Size>10</Size>
<Owner>
<ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
</Owner>
<StorageClass>STANDARD</StorageClass>
</Contents>
<CommonPrefixes>
<Prefix>hello</Prefix>
</CommonPrefixes>
<CommonPrefixes>
<Prefix>world</Prefix>
</CommonPrefixes>
</ListBucketResult>"#,
        );
        let out: ListObjectsOutput =
            quick_xml::de::from_reader(bs.reader()).expect("must success");

        assert_eq!(out.name, "examplebucket");
        assert_eq!(out.prefix, "obj");
        assert_eq!(out.marker, "obj002");
        assert_eq!(out.next_marker.as_deref(), Some("obj004"));

        let keys: Vec<&str> = out.contents.iter().map(|c| c.key.as_str()).collect();
        assert_eq!(keys, ["obj002", "obj003"]);

        let sizes: Vec<u64> = out.contents.iter().map(|c| c.size).collect();
        assert_eq!(sizes, [9, 10]);

        let prefixes: Vec<&str> = out
            .common_prefixes
            .iter()
            .map(|cp| cp.prefix.as_str())
            .collect();
        assert_eq!(prefixes, ["hello", "world"]);
    }
}