| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| use std::future::Future; |
| use std::str::FromStr; |
| use std::sync::Arc; |
| |
| use super::core::*; |
| use super::error::parse_error; |
| use opendal_core::raw::*; |
| use opendal_core::*; |
| use opendal_service_azblob::core::AzblobCore; |
| use opendal_service_azblob::writer::AzblobWriter; |
| use reqsign_azure_storage::RequestSigner; |
| use reqsign_azure_storage::StaticCredentialProvider; |
| use reqsign_core::Context; |
| use reqsign_core::Signer; |
| |
| pub struct GhacWriter(pub TwoWays<GhacWriterV1, GhacWriterV2>); |
| |
| impl GhacWriter { |
| /// TODO: maybe we can move the signed url logic to azblob service instead. |
| pub fn new(core: Arc<GhacCore>, write_path: String, url: String) -> Result<Self> { |
| match core.service_version { |
| GhacVersion::V1 => Ok(GhacWriter(TwoWays::One(GhacWriterV1 { |
| core, |
| path: write_path, |
| url, |
| size: 0, |
| }))), |
| GhacVersion::V2 => { |
| let uri = http::Uri::from_str(&url) |
| .map_err(new_http_uri_invalid_error)? |
| .into_parts(); |
| let (Some(scheme), Some(authority), Some(pq)) = |
| (uri.scheme, uri.authority, uri.path_and_query) |
| else { |
| return Err(Error::new( |
| ErrorKind::Unexpected, |
| "ghac returns invalid signed url", |
| ) |
| .with_context("url", &url)); |
| }; |
| let endpoint = format!("{scheme}://{authority}"); |
| let Some((container, path)) = pq.path().trim_matches('/').split_once("/") else { |
| return Err(Error::new( |
| ErrorKind::Unexpected, |
| "ghac returns invalid signed url that bucket or path is missing", |
| ) |
| .with_context("url", &url)); |
| }; |
| let Some(query) = pq.query() else { |
| return Err(Error::new( |
| ErrorKind::Unexpected, |
| "ghac returns invalid signed url that sas is missing", |
| ) |
| .with_context("url", &url)); |
| }; |
| let signer = Signer::new( |
| Context::new(), |
| StaticCredentialProvider::new_sas_token(query), |
| RequestSigner::new(), |
| ); |
| let azure_core = Arc::new(AzblobCore { |
| info: { |
| let am = AccessorInfo::default(); |
| am.set_scheme("azblob") |
| .set_root("/") |
| .set_name(container) |
| .set_native_capability(Capability { |
| stat: true, |
| stat_with_if_match: true, |
| stat_with_if_none_match: true, |
| |
| read: true, |
| |
| read_with_if_match: true, |
| read_with_if_none_match: true, |
| read_with_override_content_disposition: true, |
| read_with_if_modified_since: true, |
| read_with_if_unmodified_since: true, |
| |
| write: true, |
| write_can_append: true, |
| write_can_empty: true, |
| write_can_multi: true, |
| write_with_cache_control: true, |
| write_with_content_type: true, |
| write_with_if_not_exists: true, |
| write_with_if_none_match: true, |
| write_with_user_metadata: true, |
| |
| copy: true, |
| |
| list: true, |
| list_with_recursive: true, |
| |
| shared: true, |
| |
| ..Default::default() |
| }); |
| |
| am.into() |
| }, |
| container: container.to_string(), |
| root: "/".to_string(), |
| endpoint, |
| encryption_key: None, |
| encryption_key_sha256: None, |
| encryption_algorithm: None, |
| signer, |
| }); |
| let w = AzblobWriter::new(azure_core, OpWrite::default(), path.to_string()); |
| let writer = oio::BlockWriter::new(core.info.clone(), w, 4); |
| Ok(GhacWriter(TwoWays::Two(GhacWriterV2 { |
| core, |
| writer, |
| path: write_path, |
| url, |
| size: 0, |
| }))) |
| } |
| } |
| } |
| } |
| |
| impl oio::Write for GhacWriter { |
| fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + MaybeSend { |
| self.0.write(bs) |
| } |
| |
| fn abort(&mut self) -> impl Future<Output = Result<()>> + MaybeSend { |
| self.0.abort() |
| } |
| |
| fn close(&mut self) -> impl Future<Output = Result<Metadata>> + MaybeSend { |
| self.0.close() |
| } |
| } |
| |
| pub struct GhacWriterV1 { |
| core: Arc<GhacCore>, |
| |
| path: String, |
| url: String, |
| size: u64, |
| } |
| |
| impl oio::Write for GhacWriterV1 { |
| async fn write(&mut self, bs: Buffer) -> Result<()> { |
| let size = bs.len() as u64; |
| let offset = self.size; |
| |
| let resp = self.core.ghac_v1_write(&self.url, size, offset, bs).await?; |
| if !resp.status().is_success() { |
| return Err(parse_error(resp).with_operation("Backend::ghac_upload")); |
| } |
| self.size += size; |
| Ok(()) |
| } |
| |
| async fn abort(&mut self) -> Result<()> { |
| Ok(()) |
| } |
| |
| async fn close(&mut self) -> Result<Metadata> { |
| self.core |
| .ghac_finalize_upload(&self.path, &self.url, self.size) |
| .await?; |
| Ok(Metadata::default().with_content_length(self.size)) |
| } |
| } |
| |
| pub struct GhacWriterV2 { |
| core: Arc<GhacCore>, |
| writer: oio::BlockWriter<AzblobWriter>, |
| |
| path: String, |
| url: String, |
| size: u64, |
| } |
| |
| impl oio::Write for GhacWriterV2 { |
| async fn write(&mut self, bs: Buffer) -> Result<()> { |
| let size = bs.len() as u64; |
| |
| self.writer.write(bs).await?; |
| self.size += size; |
| Ok(()) |
| } |
| |
| async fn close(&mut self) -> Result<Metadata> { |
| self.writer.close().await?; |
| let _ = self |
| .core |
| .ghac_finalize_upload(&self.path, &self.url, self.size) |
| .await; |
| Ok(Metadata::default().with_content_length(self.size)) |
| } |
| |
| async fn abort(&mut self) -> Result<()> { |
| Ok(()) |
| } |
| } |