blob: d54eb914e0efcd2c2c485f40c9587fcd2cc603a1 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::future::Future;
use std::str::FromStr;
use std::sync::Arc;
use super::core::*;
use super::error::parse_error;
use opendal_core::raw::*;
use opendal_core::*;
use opendal_service_azblob::core::AzblobCore;
use opendal_service_azblob::writer::AzblobWriter;
use reqsign_azure_storage::RequestSigner;
use reqsign_azure_storage::StaticCredentialProvider;
use reqsign_core::Context;
use reqsign_core::Signer;
/// Writer for a GitHub Actions Cache (ghac) entry.
///
/// Wraps one of two concrete writers selected in [`GhacWriter::new`] based on
/// `GhacCore::service_version`: v1 uploads chunks directly to the ghac
/// service, while v2 writes blocks through a signed Azure Blob url.
pub struct GhacWriter(pub TwoWays<GhacWriterV1, GhacWriterV2>);
impl GhacWriter {
    /// Build a writer for the given cache entry.
    ///
    /// For the v1 service this wraps the plain chunked uploader. For v2 the
    /// signed url is decomposed into endpoint, container, blob path and SAS
    /// token so the upload can be driven by the azblob block writer.
    ///
    /// TODO: maybe we can move the signed url logic to azblob service instead.
    pub fn new(core: Arc<GhacCore>, write_path: String, url: String) -> Result<Self> {
        if matches!(core.service_version, GhacVersion::V1) {
            return Ok(GhacWriter(TwoWays::One(GhacWriterV1 {
                core,
                path: write_path,
                url,
                size: 0,
            })));
        }

        // v2: tear the signed url apart into its blob-storage components.
        let parts = http::Uri::from_str(&url)
            .map_err(new_http_uri_invalid_error)?
            .into_parts();
        let (scheme, authority, pq) = match (parts.scheme, parts.authority, parts.path_and_query) {
            (Some(scheme), Some(authority), Some(pq)) => (scheme, authority, pq),
            _ => {
                return Err(Error::new(
                    ErrorKind::Unexpected,
                    "ghac returns invalid signed url",
                )
                .with_context("url", &url))
            }
        };
        let endpoint = format!("{scheme}://{authority}");

        // The first path segment is the container, the remainder the blob path.
        let (container, blob_path) = match pq.path().trim_matches('/').split_once("/") {
            Some(pair) => pair,
            None => {
                return Err(Error::new(
                    ErrorKind::Unexpected,
                    "ghac returns invalid signed url that bucket or path is missing",
                )
                .with_context("url", &url))
            }
        };

        // The query string carries the SAS token used to sign blob requests.
        let sas_token = match pq.query() {
            Some(q) => q,
            None => {
                return Err(Error::new(
                    ErrorKind::Unexpected,
                    "ghac returns invalid signed url that sas is missing",
                )
                .with_context("url", &url))
            }
        };
        let signer = Signer::new(
            Context::new(),
            StaticCredentialProvider::new_sas_token(sas_token),
            RequestSigner::new(),
        );

        // Advertise the capabilities of the backing azblob service.
        let am = AccessorInfo::default();
        am.set_scheme("azblob")
            .set_root("/")
            .set_name(container)
            .set_native_capability(Capability {
                stat: true,
                stat_with_if_match: true,
                stat_with_if_none_match: true,
                read: true,
                read_with_if_match: true,
                read_with_if_none_match: true,
                read_with_override_content_disposition: true,
                read_with_if_modified_since: true,
                read_with_if_unmodified_since: true,
                write: true,
                write_can_append: true,
                write_can_empty: true,
                write_can_multi: true,
                write_with_cache_control: true,
                write_with_content_type: true,
                write_with_if_not_exists: true,
                write_with_if_none_match: true,
                write_with_user_metadata: true,
                copy: true,
                list: true,
                list_with_recursive: true,
                shared: true,
                ..Default::default()
            });

        let blob_core = Arc::new(AzblobCore {
            info: am.into(),
            container: container.to_string(),
            root: "/".to_string(),
            endpoint,
            encryption_key: None,
            encryption_key_sha256: None,
            encryption_algorithm: None,
            signer,
        });
        let inner = AzblobWriter::new(blob_core, OpWrite::default(), blob_path.to_string());
        let writer = oio::BlockWriter::new(core.info.clone(), inner, 4);

        Ok(GhacWriter(TwoWays::Two(GhacWriterV2 {
            core,
            writer,
            path: write_path,
            url,
            size: 0,
        })))
    }
}
/// Forward every `oio::Write` call to whichever concrete writer
/// ([`GhacWriterV1`] or [`GhacWriterV2`]) was selected in [`GhacWriter::new`].
impl oio::Write for GhacWriter {
    fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + MaybeSend {
        self.0.write(bs)
    }
    fn abort(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
        self.0.abort()
    }
    fn close(&mut self) -> impl Future<Output = Result<Metadata>> + MaybeSend {
        self.0.close()
    }
}
/// Writer for the v1 ghac upload protocol: sequential chunk uploads followed
/// by an explicit finalize call.
pub struct GhacWriterV1 {
    core: Arc<GhacCore>,
    // Cache path passed to `ghac_finalize_upload` on close.
    path: String,
    // Upload url for this cache entry.
    url: String,
    // Bytes written so far; doubles as the offset of the next chunk and as
    // the final content length reported on close.
    size: u64,
}
impl oio::Write for GhacWriterV1 {
    /// Upload one chunk at the current offset and advance the running size.
    async fn write(&mut self, bs: Buffer) -> Result<()> {
        let len = bs.len() as u64;
        let resp = self
            .core
            .ghac_v1_write(&self.url, len, self.size, bs)
            .await?;
        if resp.status().is_success() {
            self.size += len;
            Ok(())
        } else {
            Err(parse_error(resp).with_operation("Backend::ghac_upload"))
        }
    }
    /// Nothing to clean up: partially uploaded chunks are simply abandoned.
    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }
    /// Tell the ghac service the upload is complete and report its length.
    async fn close(&mut self) -> Result<Metadata> {
        self.core
            .ghac_finalize_upload(&self.path, &self.url, self.size)
            .await?;
        Ok(Metadata::default().with_content_length(self.size))
    }
}
/// Writer for the v2 ghac protocol: block uploads to Azure Blob Storage via a
/// signed url, followed by a best-effort finalize call against ghac.
pub struct GhacWriterV2 {
    core: Arc<GhacCore>,
    // Block writer targeting the Azure Blob container extracted from the
    // signed url in `GhacWriter::new`.
    writer: oio::BlockWriter<AzblobWriter>,
    // Cache path passed to `ghac_finalize_upload` on close.
    path: String,
    // Signed url passed to `ghac_finalize_upload` on close.
    url: String,
    // Total bytes accepted so far; reported as content length on close.
    size: u64,
}
impl oio::Write for GhacWriterV2 {
    /// Hand the buffer to the azblob block writer and track the byte count.
    async fn write(&mut self, bs: Buffer) -> Result<()> {
        let len = bs.len() as u64;
        self.writer.write(bs).await?;
        self.size += len;
        Ok(())
    }
    /// Nothing to clean up beyond dropping the underlying writer.
    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }
    /// Flush the blob writer, then finalize the cache entry.
    async fn close(&mut self) -> Result<Metadata> {
        self.writer.close().await?;
        // NOTE(review): the finalize result is deliberately discarded here
        // (unlike v1) — presumably best-effort once the blob is uploaded;
        // confirm before tightening.
        let _ = self
            .core
            .ghac_finalize_upload(&self.path, &self.url, self.size)
            .await;
        Ok(Metadata::default().with_content_length(self.size))
    }
}