| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| use std::fmt::Debug; |
| use std::sync::Arc; |
| |
| use bytes::Bytes; |
| use http::Request; |
| use http::Response; |
| use http::header::CACHE_CONTROL; |
| use http::header::CONTENT_DISPOSITION; |
| use http::header::CONTENT_LENGTH; |
| use http::header::CONTENT_TYPE; |
| use http::header::IF_MATCH; |
| use http::header::IF_NONE_MATCH; |
| use opendal_core::raw::*; |
| use opendal_core::*; |
| use reqsign_core::Signer; |
| use reqsign_huaweicloud_obs::Credential; |
| use serde::Deserialize; |
| use serde::Serialize; |
| |
/// Header name constants specific to Huawei Cloud OBS.
pub mod constants {
    /// Prefix for user-defined object metadata headers (`x-obs-meta-<key>`).
    pub const X_OBS_META_PREFIX: &str = "x-obs-meta-";
    /// Header carrying the version id of an object.
    pub const X_OBS_VERSION_ID: &str = "x-obs-version-id";
}
| |
/// Core state shared by the OBS backend: connection settings and the request signer.
pub struct ObsCore {
    /// Shared accessor info (provides the HTTP client used by `send`/`fetch`).
    pub info: Arc<AccessorInfo>,
    /// Target OBS bucket name (used when building the copy-source header).
    pub bucket: String,
    /// Working directory prefix; request paths are resolved relative to it.
    pub root: String,
    /// Base endpoint URL; request URLs are built as `{endpoint}/{path}`.
    pub endpoint: String,

    /// Signer holding Huawei Cloud OBS credentials.
    pub signer: Signer<Credential>,
}
| |
impl Debug for ObsCore {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only print connection settings. `info` and `signer` are omitted
        // (hence `finish_non_exhaustive`); the signer carries `Credential`
        // data that should not end up in debug logs.
        f.debug_struct("ObsCore")
            .field("root", &self.root)
            .field("bucket", &self.bucket)
            .field("endpoint", &self.endpoint)
            .finish_non_exhaustive()
    }
}
| |
| impl ObsCore { |
| pub async fn sign<T>(&self, req: Request<T>) -> Result<Request<T>> { |
| let (mut parts, body) = req.into_parts(); |
| |
| self.signer |
| .sign(&mut parts, None) |
| .await |
| .map_err(|e| new_request_sign_error(e.into()))?; |
| |
| Ok(Request::from_parts(parts, body)) |
| } |
| |
| pub async fn sign_query<T>(&self, req: Request<T>, duration: Duration) -> Result<Request<T>> { |
| let (mut parts, body) = req.into_parts(); |
| |
| self.signer |
| .sign(&mut parts, Some(duration)) |
| .await |
| .map_err(|e| new_request_sign_error(e.into()))?; |
| |
| Ok(Request::from_parts(parts, body)) |
| } |
| |
| #[inline] |
| pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> { |
| self.info.http_client().send(req).await |
| } |
| } |
| |
| impl ObsCore { |
| pub async fn obs_get_object( |
| &self, |
| path: &str, |
| range: BytesRange, |
| args: &OpRead, |
| ) -> Result<Response<HttpBody>> { |
| let req = self.obs_get_object_request(path, range, args)?; |
| |
| let req = self.sign(req).await?; |
| |
| self.info.http_client().fetch(req).await |
| } |
| |
| pub fn obs_get_object_request( |
| &self, |
| path: &str, |
| range: BytesRange, |
| args: &OpRead, |
| ) -> Result<Request<Buffer>> { |
| let p = build_abs_path(&self.root, path); |
| |
| let url = format!("{}/{}", self.endpoint, percent_encode_path(&p)); |
| |
| let mut req = Request::get(&url); |
| |
| if let Some(if_match) = args.if_match() { |
| req = req.header(IF_MATCH, if_match); |
| } |
| |
| if !range.is_full() { |
| req = req.header(http::header::RANGE, range.to_header()) |
| } |
| |
| if let Some(if_none_match) = args.if_none_match() { |
| req = req.header(IF_NONE_MATCH, if_none_match); |
| } |
| |
| let req = req |
| .extension(Operation::Read) |
| .body(Buffer::new()) |
| .map_err(new_request_build_error)?; |
| |
| Ok(req) |
| } |
| |
| pub fn obs_put_object_request( |
| &self, |
| path: &str, |
| size: Option<u64>, |
| args: &OpWrite, |
| body: Buffer, |
| ) -> Result<Request<Buffer>> { |
| let p = build_abs_path(&self.root, path); |
| |
| let url = format!("{}/{}", self.endpoint, percent_encode_path(&p)); |
| |
| let mut req = Request::put(&url); |
| |
| if let Some(size) = size { |
| req = req.header(CONTENT_LENGTH, size) |
| } |
| if let Some(cache_control) = args.cache_control() { |
| req = req.header(CACHE_CONTROL, cache_control) |
| } |
| |
| if let Some(mime) = args.content_type() { |
| req = req.header(CONTENT_TYPE, mime) |
| } |
| |
| // Set user metadata headers. |
| if let Some(user_metadata) = args.user_metadata() { |
| for (key, value) in user_metadata { |
| req = req.header(format!("{}{}", constants::X_OBS_META_PREFIX, key), value) |
| } |
| } |
| |
| let req = req |
| .extension(Operation::Write) |
| .body(body) |
| .map_err(new_request_build_error)?; |
| |
| Ok(req) |
| } |
| |
| pub async fn obs_head_object(&self, path: &str, args: &OpStat) -> Result<Response<Buffer>> { |
| let req = self.obs_head_object_request(path, args)?; |
| |
| let req = self.sign(req).await?; |
| |
| self.send(req).await |
| } |
| |
| pub fn obs_head_object_request(&self, path: &str, args: &OpStat) -> Result<Request<Buffer>> { |
| let p = build_abs_path(&self.root, path); |
| |
| let url = format!("{}/{}", self.endpoint, percent_encode_path(&p)); |
| |
| // The header 'Origin' is optional for API calling, the doc has mistake, confirmed with customer service of huaweicloud. |
| // https://support.huaweicloud.com/intl/en-us/api-obs/obs_04_0084.html |
| |
| let mut req = Request::head(&url); |
| |
| if let Some(if_match) = args.if_match() { |
| req = req.header(IF_MATCH, if_match); |
| } |
| |
| if let Some(if_none_match) = args.if_none_match() { |
| req = req.header(IF_NONE_MATCH, if_none_match); |
| } |
| |
| let req = req |
| .extension(Operation::Stat) |
| .body(Buffer::new()) |
| .map_err(new_request_build_error)?; |
| |
| Ok(req) |
| } |
| |
| pub async fn obs_delete_object(&self, path: &str) -> Result<Response<Buffer>> { |
| let p = build_abs_path(&self.root, path); |
| |
| let url = format!("{}/{}", self.endpoint, percent_encode_path(&p)); |
| |
| let req = Request::delete(&url); |
| |
| let req = req |
| .extension(Operation::Delete) |
| .body(Buffer::new()) |
| .map_err(new_request_build_error)?; |
| |
| let req = self.sign(req).await?; |
| |
| self.send(req).await |
| } |
| |
| pub fn obs_append_object_request( |
| &self, |
| path: &str, |
| position: u64, |
| size: u64, |
| args: &OpWrite, |
| body: Buffer, |
| ) -> Result<Request<Buffer>> { |
| let p = build_abs_path(&self.root, path); |
| let url = format!( |
| "{}/{}?append&position={}", |
| self.endpoint, |
| percent_encode_path(&p), |
| position |
| ); |
| |
| let mut req = Request::post(&url); |
| |
| req = req.header(CONTENT_LENGTH, size); |
| |
| if let Some(mime) = args.content_type() { |
| req = req.header(CONTENT_TYPE, mime); |
| } |
| |
| if let Some(pos) = args.content_disposition() { |
| req = req.header(CONTENT_DISPOSITION, pos); |
| } |
| |
| if let Some(cache_control) = args.cache_control() { |
| req = req.header(CACHE_CONTROL, cache_control) |
| } |
| |
| let req = req |
| .extension(Operation::Write) |
| .body(body) |
| .map_err(new_request_build_error)?; |
| |
| Ok(req) |
| } |
| |
| pub async fn obs_copy_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> { |
| let source = build_abs_path(&self.root, from); |
| let target = build_abs_path(&self.root, to); |
| |
| let source = format!("/{}/{}", self.bucket, percent_encode_path(&source)); |
| let url = format!("{}/{}", self.endpoint, percent_encode_path(&target)); |
| |
| let req = Request::put(&url) |
| .extension(Operation::Copy) |
| .header("x-obs-copy-source", &source) |
| .body(Buffer::new()) |
| .map_err(new_request_build_error)?; |
| |
| let req = self.sign(req).await?; |
| |
| self.send(req).await |
| } |
| |
| pub async fn obs_list_objects( |
| &self, |
| path: &str, |
| next_marker: &str, |
| delimiter: &str, |
| limit: Option<usize>, |
| ) -> Result<Response<Buffer>> { |
| let p = build_abs_path(&self.root, path); |
| let mut url = QueryPairsWriter::new(&self.endpoint); |
| |
| if !path.is_empty() { |
| url = url.push("prefix", &percent_encode_path(&p)); |
| } |
| if !delimiter.is_empty() { |
| url = url.push("delimiter", delimiter); |
| } |
| if let Some(limit) = limit { |
| url = url.push("max-keys", &limit.to_string()); |
| } |
| if !next_marker.is_empty() { |
| url = url.push("marker", next_marker); |
| } |
| |
| let req = Request::get(url.finish()) |
| .extension(Operation::List) |
| .body(Buffer::new()) |
| .map_err(new_request_build_error)?; |
| |
| let req = self.sign(req).await?; |
| |
| self.send(req).await |
| } |
| pub async fn obs_initiate_multipart_upload( |
| &self, |
| path: &str, |
| content_type: Option<&str>, |
| ) -> Result<Response<Buffer>> { |
| let p = build_abs_path(&self.root, path); |
| |
| let url = format!("{}/{}?uploads", self.endpoint, percent_encode_path(&p)); |
| let mut req = Request::post(&url); |
| |
| if let Some(mime) = content_type { |
| req = req.header(CONTENT_TYPE, mime) |
| } |
| |
| let req = req |
| .extension(Operation::Write) |
| .body(Buffer::new()) |
| .map_err(new_request_build_error)?; |
| |
| let req = self.sign(req).await?; |
| |
| self.send(req).await |
| } |
| pub async fn obs_upload_part_request( |
| &self, |
| path: &str, |
| upload_id: &str, |
| part_number: usize, |
| size: Option<u64>, |
| body: Buffer, |
| ) -> Result<Response<Buffer>> { |
| let p = build_abs_path(&self.root, path); |
| |
| let url = format!( |
| "{}/{}?partNumber={}&uploadId={}", |
| self.endpoint, |
| percent_encode_path(&p), |
| part_number, |
| percent_encode_path(upload_id) |
| ); |
| |
| let mut req = Request::put(&url); |
| |
| if let Some(size) = size { |
| req = req.header(CONTENT_LENGTH, size); |
| } |
| |
| let req = req |
| .extension(Operation::Write) |
| // Set body |
| .body(body) |
| .map_err(new_request_build_error)?; |
| |
| let req = self.sign(req).await?; |
| |
| self.send(req).await |
| } |
| |
| pub async fn obs_complete_multipart_upload( |
| &self, |
| path: &str, |
| upload_id: &str, |
| parts: Vec<CompleteMultipartUploadRequestPart>, |
| ) -> Result<Response<Buffer>> { |
| let p = build_abs_path(&self.root, path); |
| let url = format!( |
| "{}/{}?uploadId={}", |
| self.endpoint, |
| percent_encode_path(&p), |
| percent_encode_path(upload_id) |
| ); |
| |
| let req = Request::post(&url); |
| |
| let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest { |
| part: parts.to_vec(), |
| }) |
| .map_err(new_xml_serialize_error)?; |
| // Make sure content length has been set to avoid post with chunked encoding. |
| let req = req.header(CONTENT_LENGTH, content.len()); |
| // Set content-type to `application/xml` to avoid mixed with form post. |
| let req = req.header(CONTENT_TYPE, "application/xml"); |
| |
| let req = req |
| .extension(Operation::Write) |
| .body(Buffer::from(Bytes::from(content))) |
| .map_err(new_request_build_error)?; |
| |
| let req = self.sign(req).await?; |
| self.send(req).await |
| } |
| |
| /// Abort an on-going multipart upload. |
| pub async fn obs_abort_multipart_upload( |
| &self, |
| path: &str, |
| upload_id: &str, |
| ) -> Result<Response<Buffer>> { |
| let p = build_abs_path(&self.root, path); |
| |
| let url = format!( |
| "{}/{}?uploadId={}", |
| self.endpoint, |
| percent_encode_path(&p), |
| percent_encode_path(upload_id) |
| ); |
| |
| let req = Request::delete(&url) |
| .extension(Operation::Write) |
| .body(Buffer::new()) |
| .map_err(new_request_build_error)?; |
| |
| let req = self.sign(req).await?; |
| self.send(req).await |
| } |
| } |
| |
| /// Result of CreateMultipartUpload |
/// Result of CreateMultipartUpload (deserialized from the
/// `InitiateMultipartUploadResult` XML response).
#[derive(Default, Debug, Deserialize)]
#[serde(default, rename_all = "PascalCase")]
pub struct InitiateMultipartUploadResult {
    // Server-assigned id identifying this multipart upload session.
    pub upload_id: String,
}
| |
| /// Request of CompleteMultipartUploadRequest |
/// Request of CompleteMultipartUploadRequest (serialized as the
/// `<CompleteMultipartUpload>` XML request body).
#[derive(Default, Debug, Serialize)]
#[serde(default, rename = "CompleteMultipartUpload", rename_all = "PascalCase")]
pub struct CompleteMultipartUploadRequest {
    // One `<Part>` element per uploaded part.
    pub part: Vec<CompleteMultipartUploadRequestPart>,
}
| |
/// A single `<Part>` element inside a [`CompleteMultipartUploadRequest`].
#[derive(Clone, Default, Debug, Serialize)]
#[serde(default, rename_all = "PascalCase")]
pub struct CompleteMultipartUploadRequestPart {
    // Part number as sent alongside each uploaded part.
    #[serde(rename = "PartNumber")]
    pub part_number: usize,
    /// ETag returned by the service for this part.
    ///
    /// quick-xml will do escape on `"` which leads to our serialized output is
    /// not the same as aws s3's example.
    ///
    /// Ideally, we could use `serialize_with` to address this (but failed)
    ///
    /// ```ignore
    /// #[derive(Default, Debug, Serialize)]
    /// #[serde(default, rename_all = "PascalCase")]
    /// struct CompleteMultipartUploadRequestPart {
    ///     #[serde(rename = "PartNumber")]
    ///     part_number: usize,
    ///     #[serde(rename = "ETag", serialize_with = "partial_escape")]
    ///     etag: String,
    /// }
    ///
    /// fn partial_escape<S>(s: &str, ser: S) -> Result<S::Ok, S::Error>
    /// where
    ///     S: serde::Serializer,
    /// {
    ///     ser.serialize_str(&String::from_utf8_lossy(
    ///         &quick_xml::escape::partial_escape(s.as_bytes()),
    ///     ))
    /// }
    /// ```
    ///
    /// ref: <https://github.com/tafia/quick-xml/issues/362>
    #[serde(rename = "ETag")]
    pub etag: String,
}
| |
| /// Output of `CompleteMultipartUpload` operation |
| #[derive(Debug, Default, Deserialize)] |
| #[serde(default, rename_all = "PascalCase")] |
| pub struct CompleteMultipartUploadResult { |
| pub location: String, |
| pub bucket: String, |
| pub key: String, |
| #[serde(rename = "ETag")] |
| pub etag: String, |
| } |
| |
/// Deserialized `ListBucketResult` XML returned by a bucket listing request.
#[derive(Default, Debug, Deserialize)]
#[serde(default, rename_all = "PascalCase")]
pub struct ListObjectsOutput {
    // Bucket name the listing was performed on.
    pub name: String,
    // Prefix the listing was filtered by.
    pub prefix: String,
    // One entry per returned object.
    pub contents: Vec<ListObjectsOutputContent>,
    // Grouped key prefixes produced by the delimiter.
    pub common_prefixes: Vec<CommonPrefix>,
    // Marker this page started from.
    pub marker: String,
    // Marker for fetching the next page; `None` when the listing is complete.
    pub next_marker: Option<String>,
}
| |
/// A single `<CommonPrefixes>` entry in a listing response.
#[derive(Default, Debug, Deserialize)]
#[serde(default, rename_all = "PascalCase")]
pub struct CommonPrefix {
    // The grouped key prefix (directory-like entry).
    pub prefix: String,
}
| |
/// A single `<Contents>` entry (one object) in a listing response.
#[derive(Default, Debug, Deserialize)]
#[serde(default, rename_all = "PascalCase")]
pub struct ListObjectsOutputContent {
    // Full object key.
    pub key: String,
    // Object size in bytes.
    pub size: u64,
}
| |
#[cfg(test)]
mod tests {
    use bytes::Buf;

    use super::*;

    /// Deserializing a sample `ListBucketResult` must populate every field we
    /// model, including repeated `Contents` and `CommonPrefixes` entries.
    #[test]
    fn test_parse_xml() {
        let body = bytes::Bytes::from(
            r#"<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ListBucketResult xmlns="http://obs.cn-north-4.myhuaweicloud.com/doc/2015-06-30/">
<Name>examplebucket</Name>
<Prefix>obj</Prefix>
<Marker>obj002</Marker>
<NextMarker>obj004</NextMarker>
<MaxKeys>1000</MaxKeys>
<IsTruncated>false</IsTruncated>
<Contents>
<Key>obj002</Key>
<LastModified>2015-07-01T02:11:19.775Z</LastModified>
<ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
<Size>9</Size>
<Owner>
<ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
</Owner>
<StorageClass>STANDARD</StorageClass>
</Contents>
<Contents>
<Key>obj003</Key>
<LastModified>2015-07-01T02:11:19.775Z</LastModified>
<ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
<Size>10</Size>
<Owner>
<ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
</Owner>
<StorageClass>STANDARD</StorageClass>
</Contents>
<CommonPrefixes>
<Prefix>hello</Prefix>
</CommonPrefixes>
<CommonPrefixes>
<Prefix>world</Prefix>
</CommonPrefixes>
</ListBucketResult>"#,
        );

        let output: ListObjectsOutput =
            quick_xml::de::from_reader(body.reader()).expect("must success");

        // Scalar fields.
        assert_eq!(output.name, "examplebucket");
        assert_eq!(output.prefix, "obj");
        assert_eq!(output.marker, "obj002");
        assert_eq!(output.next_marker.as_deref(), Some("obj004"));

        // Repeated `Contents` entries, in document order.
        let keys: Vec<&str> = output.contents.iter().map(|c| c.key.as_str()).collect();
        assert_eq!(keys, ["obj002", "obj003"]);

        let sizes: Vec<u64> = output.contents.iter().map(|c| c.size).collect();
        assert_eq!(sizes, [9, 10]);

        // Repeated `CommonPrefixes` entries, in document order.
        let prefixes: Vec<&str> = output
            .common_prefixes
            .iter()
            .map(|c| c.prefix.as_str())
            .collect();
        assert_eq!(prefixes, ["hello", "world"]);
    }
}