blob: f2f755ee181e39667697ab884f2cce35fcd8d900 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::sync::Arc;
use super::core::{BucketOperation, HfCore, LfsFile};
use opendal_core::raw::*;
use opendal_core::*;
use xet::error::XetError;
use xet::xet_session::{Sha256Policy, XetFileInfo, XetStreamUpload, XetUploadCommit};
/// Writer that always uses the XET protocol for uploads.
pub struct HfWriter {
core: Arc<HfCore>,
path: String,
xet_commit: XetUploadCommit,
xet_stream: XetStreamUpload,
// RetryLayer can replay Writer::close after a temporary commit failure,
// so writer finalization must be re-entrant and reuse the first finish result.
finished_info: Option<XetFileInfo>,
}
impl HfWriter {
/// Create a new writer for the given path.
///
/// All uploads go through the XET protocol regardless of file size.
/// An XET upload commit and stream are created eagerly so that
/// subsequent [`write`](oio::Write::write) calls can stream data
/// directly into the XET CAS.
pub async fn try_new(core: Arc<HfCore>, path: String) -> Result<Self> {
let commit = core.xet_upload_commit().await?;
let stream = commit
.upload_stream(None, Sha256Policy::Compute)
.await
.map_err(|err| {
Error::new(ErrorKind::Unexpected, "failed to start xet upload stream")
.set_source(err)
})?;
Ok(HfWriter {
core,
path,
xet_commit: commit,
xet_stream: stream,
finished_info: None,
})
}
/// Finalize the XET upload and register the file with HF.
///
/// 1. Commit data to XET CAS (may already be done by `stream.finish()`)
/// 2. Register the file via git LFS commit or bucket batch API
///
/// Retries on transient CAS propagation delays.
async fn commit(&mut self, file_info: &XetFileInfo) -> Result<Metadata> {
// Finalize the XET CAS upload. May return AlreadyCompleted
// if stream.finish() already committed internally.
match self.xet_commit.commit().await {
Ok(_) | Err(XetError::AlreadyCompleted) => {}
Err(e) => {
return Err(
Error::new(ErrorKind::Unexpected, "failed to commit xet upload").set_source(e),
);
}
}
let content_length = file_info
.file_size()
.expect("file_size must be set after finish()");
let meta = Metadata::default().with_content_length(content_length);
let repo_path = self.core.repo_path(&self.path);
if self.core.repo.is_bucket() {
let xet_hash = file_info.hash().to_string();
self.core
.commit_bucket(vec![BucketOperation::AddFile {
path: repo_path,
xet_hash,
}])
.await?;
} else {
let sha256 = file_info.sha256().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "xet upload returned no sha256")
})?;
let lfs_file = LfsFile {
path: repo_path,
oid: sha256.to_string(),
algo: "sha256".to_string(),
size: content_length,
};
self.core
.commit_git(vec![], vec![lfs_file], vec![], vec![])
.await?;
}
Ok(meta)
}
}
impl oio::Write for HfWriter {
async fn write(&mut self, bs: Buffer) -> Result<()> {
self.xet_stream.write(bs.to_bytes()).await.map_err(|err| {
Error::new(ErrorKind::Unexpected, "failed to write xet chunk").set_source(err)
})
}
async fn close(&mut self) -> Result<Metadata> {
let file_info = match self.finished_info.clone() {
Some(file_info) => file_info,
None => match self.xet_stream.finish().await {
Ok(file_meta) => {
// Cache the finalized metadata so a retried close can skip
// finish() and only replay the registration step.
let file_info = file_meta.xet_info;
self.finished_info = Some(file_info.clone());
file_info
}
Err(XetError::AlreadyCompleted) => {
return Err(Error::new(
ErrorKind::Unexpected,
"xet upload finished before metadata was cached",
));
}
Err(err) => {
return Err(
Error::new(ErrorKind::Unexpected, "failed to finish xet upload")
.set_source(err),
);
}
},
};
self.commit(&file_info).await
}
async fn abort(&mut self) -> Result<()> {
self.xet_stream.abort();
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::super::backend::test_utils::testing_bucket_operator;
/// Bucket writes always use XET and commit via NDJSON batch — this code
/// path is not covered by behavior tests unless run with a bucket config.
/// Requires HF_OPENDAL_BUCKET and HF_OPENDAL_TOKEN.
#[tokio::test]
#[ignore]
async fn test_bucket_write_roundtrip() {
let op = testing_bucket_operator();
let path = "tests/bucket-roundtrip.bin";
let content = b"Binary content for bucket roundtrip test";
op.write(path, content.as_slice())
.await
.expect("bucket write should succeed");
let data = op.read(path).await.expect("read should succeed");
assert_eq!(data.to_bytes().as_ref(), content);
let _ = op.delete(path).await;
}
}