bug: write operation won't split payload into chunks following configuration (#6796)

* bug: write operation won't split payload into chunks following configuration

* try to fix error

* remove operator add unitest

* fix clippy

* Update core/src/types/context/write.rs

---------

Co-authored-by: Xuanwo <github@xuanwo.io>
diff --git a/core/src/types/context/write.rs b/core/src/types/context/write.rs
index cc6c772..11a8551 100644
--- a/core/src/types/context/write.rs
+++ b/core/src/types/context/write.rs
@@ -223,11 +223,16 @@
 
     struct MockWriter {
         buf: Arc<Mutex<Vec<u8>>>,
+        write_sizes: Arc<Mutex<Vec<usize>>>,
     }
 
     impl Write for MockWriter {
         async fn write(&mut self, bs: Buffer) -> Result<()> {
-            debug!("test_fuzz_exact_buf_writer: flush size: {}", &bs.len());
+            let size = bs.len();
+            debug!("test_fuzz_exact_buf_writer: flush size: {}", size);
+
+            let mut write_sizes = self.write_sizes.lock().await;
+            write_sizes.push(size);
 
             let mut buf = self.buf.lock().await;
             buf.put(bs);
@@ -256,7 +261,14 @@
         rng.fill_bytes(&mut expected);
 
         let buf = Arc::new(Mutex::new(vec![]));
-        let mut w = WriteGenerator::new(Box::new(MockWriter { buf: buf.clone() }), Some(10), true);
+        let mut w = WriteGenerator::new(
+            Box::new(MockWriter {
+                buf: buf.clone(),
+                write_sizes: Arc::new(Mutex::new(vec![])),
+            }),
+            Some(10),
+            true,
+        );
 
         let mut bs = Bytes::from(expected.clone());
         while !bs.is_empty() {
@@ -284,7 +296,14 @@
             .try_init();
 
         let buf = Arc::new(Mutex::new(vec![]));
-        let mut w = WriteGenerator::new(Box::new(MockWriter { buf: buf.clone() }), Some(10), false);
+        let mut w = WriteGenerator::new(
+            Box::new(MockWriter {
+                buf: buf.clone(),
+                write_sizes: Arc::new(Mutex::new(vec![])),
+            }),
+            Some(10),
+            false,
+        );
 
         let mut rng = thread_rng();
         let mut expected = vec![0; 15];
@@ -315,7 +334,14 @@
             .try_init();
 
         let buf = Arc::new(Mutex::new(vec![]));
-        let mut w = WriteGenerator::new(Box::new(MockWriter { buf: buf.clone() }), Some(10), false);
+        let mut w = WriteGenerator::new(
+            Box::new(MockWriter {
+                buf: buf.clone(),
+                write_sizes: Arc::new(Mutex::new(vec![])),
+            }),
+            Some(10),
+            false,
+        );
 
         let mut rng = thread_rng();
         let mut expected = vec![];
@@ -358,7 +384,14 @@
             .try_init();
 
         let buf = Arc::new(Mutex::new(vec![]));
-        let mut w = WriteGenerator::new(Box::new(MockWriter { buf: buf.clone() }), Some(10), false);
+        let mut w = WriteGenerator::new(
+            Box::new(MockWriter {
+                buf: buf.clone(),
+                write_sizes: Arc::new(Mutex::new(vec![])),
+            }),
+            Some(10),
+            false,
+        );
 
         let mut rng = thread_rng();
         let mut expected = vec![];
@@ -408,7 +441,10 @@
         let buf = Arc::new(Mutex::new(vec![]));
         let buffer_size = rng.gen_range(1..10);
         let mut writer = WriteGenerator::new(
-            Box::new(MockWriter { buf: buf.clone() }),
+            Box::new(MockWriter {
+                buf: buf.clone(),
+                write_sizes: Arc::new(Mutex::new(vec![])),
+            }),
             Some(buffer_size),
             true,
         );
@@ -438,4 +474,90 @@
         );
         Ok(())
     }
+
+    /// Test that when writing a large buffer in exact mode, it gets split into chunks.
+    ///
+    /// This test verifies that WriteGenerator correctly handles large buffers by
+    /// splitting them into chunks of the configured chunk size, rather than writing
+    /// everything at once.
+    #[tokio::test]
+    async fn test_exact_buf_writer_large_buffer_splits_into_chunks() -> Result<()> {
+        let _ = tracing_subscriber::fmt()
+            .pretty()
+            .with_test_writer()
+            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+            .try_init();
+
+        let chunk_size = 10;
+        let large_buffer_size = 25; // 2.5x chunk_size
+
+        let buf = Arc::new(Mutex::new(vec![]));
+        let write_sizes = Arc::new(Mutex::new(vec![]));
+        let mut writer = WriteGenerator::new(
+            Box::new(MockWriter {
+                buf: buf.clone(),
+                write_sizes: write_sizes.clone(),
+            }),
+            Some(chunk_size),
+            true, // exact mode
+        );
+
+        let mut rng = thread_rng();
+        let mut expected = vec![0; large_buffer_size];
+        rng.fill_bytes(&mut expected);
+
+        let bs = Bytes::from(expected.clone());
+        // In exact mode, a large buffer should be written in chunks.
+        // We need to call write multiple times until all data is written.
+        let mut remaining = bs.clone();
+        while !remaining.is_empty() {
+            let n = writer.write(remaining.clone().into()).await?;
+            remaining.advance(n);
+        }
+
+        writer.close().await?;
+
+        // Verify all data was written
+        let buf = buf.lock().await;
+        assert_eq!(buf.len(), expected.len());
+        assert_eq!(
+            format!("{:x}", Sha256::digest(&*buf)),
+            format!("{:x}", Sha256::digest(&expected))
+        );
+
+        // Verify that writes were split into chunks (except possibly the last one)
+        let write_sizes = write_sizes.lock().await;
+        // In exact mode, all writes except the last should be exactly chunk_size
+        // The last write might be smaller if there's a remainder
+        for (i, &size) in write_sizes.iter().enumerate() {
+            if i < write_sizes.len() - 1 {
+                // All writes except the last should be exactly chunk_size
+                assert_eq!(
+                    size, chunk_size,
+                    "Write {} should be exactly chunk_size {}, but was {}",
+                    i, chunk_size, size
+                );
+            } else {
+                // Last write should be <= chunk_size
+                assert!(
+                    size <= chunk_size,
+                    "Last write should be <= chunk_size {}, but was {}",
+                    chunk_size,
+                    size
+                );
+            }
+        }
+
+        // Verify we got the expected number of writes
+        let expected_writes = large_buffer_size.div_ceil(chunk_size);
+        assert_eq!(
+            write_sizes.len(),
+            expected_writes,
+            "Expected {} writes, but got {}",
+            expected_writes,
+            write_sizes.len()
+        );
+
+        Ok(())
+    }
 }
diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs
index 82c722e..347dece 100644
--- a/core/src/types/operator/operator.rs
+++ b/core/src/types/operator/operator.rs
@@ -849,6 +849,7 @@
         }
 
         let (args, opts) = opts.into();
+
         let context = WriteContext::new(acc, path, args, opts);
         let mut w = Writer::new(context).await?;
         w.write(bs).await?;