bug: write operation won't split payload into chunks following configuration (#6796)
* bug: write operation won't split payload into chunks following configuration
* try to fix error
* remove operator change; add unit test
* fix clippy
* Update core/src/types/context/write.rs
---------
Co-authored-by: Xuanwo <github@xuanwo.io>
diff --git a/core/src/types/context/write.rs b/core/src/types/context/write.rs
index cc6c772..11a8551 100644
--- a/core/src/types/context/write.rs
+++ b/core/src/types/context/write.rs
@@ -223,11 +223,16 @@
struct MockWriter {
buf: Arc<Mutex<Vec<u8>>>,
+ write_sizes: Arc<Mutex<Vec<usize>>>,
}
impl Write for MockWriter {
async fn write(&mut self, bs: Buffer) -> Result<()> {
- debug!("test_fuzz_exact_buf_writer: flush size: {}", &bs.len());
+ let size = bs.len();
+ debug!("test_fuzz_exact_buf_writer: flush size: {}", size);
+
+ let mut write_sizes = self.write_sizes.lock().await;
+ write_sizes.push(size);
let mut buf = self.buf.lock().await;
buf.put(bs);
@@ -256,7 +261,14 @@
rng.fill_bytes(&mut expected);
let buf = Arc::new(Mutex::new(vec![]));
- let mut w = WriteGenerator::new(Box::new(MockWriter { buf: buf.clone() }), Some(10), true);
+ let mut w = WriteGenerator::new(
+ Box::new(MockWriter {
+ buf: buf.clone(),
+ write_sizes: Arc::new(Mutex::new(vec![])),
+ }),
+ Some(10),
+ true,
+ );
let mut bs = Bytes::from(expected.clone());
while !bs.is_empty() {
@@ -284,7 +296,14 @@
.try_init();
let buf = Arc::new(Mutex::new(vec![]));
- let mut w = WriteGenerator::new(Box::new(MockWriter { buf: buf.clone() }), Some(10), false);
+ let mut w = WriteGenerator::new(
+ Box::new(MockWriter {
+ buf: buf.clone(),
+ write_sizes: Arc::new(Mutex::new(vec![])),
+ }),
+ Some(10),
+ false,
+ );
let mut rng = thread_rng();
let mut expected = vec![0; 15];
@@ -315,7 +334,14 @@
.try_init();
let buf = Arc::new(Mutex::new(vec![]));
- let mut w = WriteGenerator::new(Box::new(MockWriter { buf: buf.clone() }), Some(10), false);
+ let mut w = WriteGenerator::new(
+ Box::new(MockWriter {
+ buf: buf.clone(),
+ write_sizes: Arc::new(Mutex::new(vec![])),
+ }),
+ Some(10),
+ false,
+ );
let mut rng = thread_rng();
let mut expected = vec![];
@@ -358,7 +384,14 @@
.try_init();
let buf = Arc::new(Mutex::new(vec![]));
- let mut w = WriteGenerator::new(Box::new(MockWriter { buf: buf.clone() }), Some(10), false);
+ let mut w = WriteGenerator::new(
+ Box::new(MockWriter {
+ buf: buf.clone(),
+ write_sizes: Arc::new(Mutex::new(vec![])),
+ }),
+ Some(10),
+ false,
+ );
let mut rng = thread_rng();
let mut expected = vec![];
@@ -408,7 +441,10 @@
let buf = Arc::new(Mutex::new(vec![]));
let buffer_size = rng.gen_range(1..10);
let mut writer = WriteGenerator::new(
- Box::new(MockWriter { buf: buf.clone() }),
+ Box::new(MockWriter {
+ buf: buf.clone(),
+ write_sizes: Arc::new(Mutex::new(vec![])),
+ }),
Some(buffer_size),
true,
);
@@ -438,4 +474,90 @@
);
Ok(())
}
+
+ /// Regression test for #6796: in exact mode, a buffer larger than the chunk size is split.
+ ///
+ /// `MockWriter` records the size of every payload it receives in `write_sizes`. The test
+ /// checks that the data round-trips intact, that every recorded write except the last is
+ /// exactly `chunk_size`, and that there are `large_buffer_size.div_ceil(chunk_size)` writes.
+ #[tokio::test]
+ async fn test_exact_buf_writer_large_buffer_splits_into_chunks() -> Result<()> {
+ let _ = tracing_subscriber::fmt()
+ .pretty()
+ .with_test_writer()
+ .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+ .try_init();
+
+ let chunk_size = 10;
+ let large_buffer_size = 25; // 2.5x chunk_size: two full chunks plus a 5-byte remainder
+
+ let buf = Arc::new(Mutex::new(vec![]));
+ let write_sizes = Arc::new(Mutex::new(vec![]));
+ let mut writer = WriteGenerator::new(
+ Box::new(MockWriter {
+ buf: buf.clone(),
+ write_sizes: write_sizes.clone(),
+ }),
+ Some(chunk_size),
+ true, // exact mode
+ );
+
+ let mut rng = thread_rng();
+ let mut expected = vec![0; large_buffer_size];
+ rng.fill_bytes(&mut expected);
+
+ let bs = Bytes::from(expected.clone());
+ // `write` returns how many bytes it consumed, which may be fewer than offered,
+ // so keep feeding the unconsumed tail until the whole payload has been handed over.
+ let mut remaining = bs.clone();
+ while !remaining.is_empty() {
+ let n = writer.write(remaining.clone().into()).await?;
+ remaining.advance(n);
+ }
+
+ writer.close().await?;
+
+ // Verify all data was written, byte-for-byte (length plus SHA-256 digest)
+ let buf = buf.lock().await;
+ assert_eq!(buf.len(), expected.len());
+ assert_eq!(
+ format!("{:x}", Sha256::digest(&*buf)),
+ format!("{:x}", Sha256::digest(&expected))
+ );
+
+ // Verify the inner writer received chunk_size pieces instead of one oversized write
+ let write_sizes = write_sizes.lock().await;
+ // Every write except the last must be exactly chunk_size; the last may be shorter
+ // because it carries any remainder. (`len() - 1` is only evaluated when non-empty.)
+ for (i, &size) in write_sizes.iter().enumerate() {
+ if i < write_sizes.len() - 1 {
+ // Non-final write: must be a full chunk
+ assert_eq!(
+ size, chunk_size,
+ "Write {} should be exactly chunk_size {}, but was {}",
+ i, chunk_size, size
+ );
+ } else {
+ // Final write: may be a partial chunk holding the remainder
+ assert!(
+ size <= chunk_size,
+ "Last write should be <= chunk_size {}, but was {}",
+ chunk_size,
+ size
+ );
+ }
+ }
+
+ // Verify the write count: 25 bytes in 10-byte chunks => 3 writes
+ let expected_writes = large_buffer_size.div_ceil(chunk_size);
+ assert_eq!(
+ write_sizes.len(),
+ expected_writes,
+ "Expected {} writes, but got {}",
+ expected_writes,
+ write_sizes.len()
+ );
+
+ Ok(())
+ }
}
diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs
index 82c722e..347dece 100644
--- a/core/src/types/operator/operator.rs
+++ b/core/src/types/operator/operator.rs
@@ -849,6 +849,7 @@
}
let (args, opts) = opts.into();
+
let context = WriteContext::new(acc, path, args, opts);
let mut w = Writer::new(context).await?;
w.write(bs).await?;