channel try
diff --git a/rust/experimental/server/Cargo.toml b/rust/experimental/server/Cargo.toml
index 860ad43..0b55654 100644
--- a/rust/experimental/server/Cargo.toml
+++ b/rust/experimental/server/Cargo.toml
@@ -67,7 +67,6 @@
 log = "0.4.17"
 env_logger = "0.10.0"
 crossbeam = "0.8.2"
-crossbeam-channel = "0.5"
 tempdir = "0.3.7"
 async-trait = "0.1.68"
 futures = "0.3"
@@ -98,6 +97,7 @@
 cap = "0.1.2"
 spin = "0.9.8"
 opendal = { version = "0.44.0", features = ["services-fs"]}
+crossbeam-channel = "0.5.8"
 
 [dependencies.hdfs-native]
 version = "0.7.0"
diff --git a/rust/experimental/server/src/channel.rs b/rust/experimental/server/src/channel.rs
new file mode 100644
index 0000000..13d9994
--- /dev/null
+++ b/rust/experimental/server/src/channel.rs
@@ -0,0 +1,35 @@
+use std::sync::Arc;
+use crossbeam_channel::{Receiver, Sender, unbounded};
+use crate::app::{AppManager, WritingViewContext};
+use crate::error::WorkerError;
+use crate::metric::TOTAL_RECEIVED_DATA;
+use crate::runtime::manager::RuntimeManager;
+
+#[derive(Clone)]
+pub struct Channel {
+    app_manager: Arc<AppManager>,
+    runtime_manager: RuntimeManager,
+
+    sender: Sender<WritingViewContext>,
+    receiver: Receiver<WritingViewContext>,
+}
+
+impl Channel {
+    pub fn new(app_manager: Arc<AppManager>, runtime_manager: RuntimeManager) -> Self {
+        let (s, r) = unbounded();
+        Self {
+            app_manager,
+            runtime_manager,
+            sender: s,
+            receiver: r,
+        }
+    }
+
+    pub fn send(&self, ctx: WritingViewContext) -> anyhow::Result<i32, WorkerError> {
+        let len: i32 = ctx.data_blocks.iter().map(|block| block.length).sum();
+        TOTAL_RECEIVED_DATA.inc_by(len as u64);
+
+        self.sender.send(ctx).map_err(|_| WorkerError::WRITING_TO_CHANNEL_FAIL)?;
+        Ok(len)
+    }
+}
\ No newline at end of file
diff --git a/rust/experimental/server/src/error.rs b/rust/experimental/server/src/error.rs
index 7a70408..cfbe608 100644
--- a/rust/experimental/server/src/error.rs
+++ b/rust/experimental/server/src/error.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use anyhow::Error;
-
 use log::error;
 use poem::error::ParseQueryError;
 use thiserror::Error;
@@ -63,6 +62,9 @@
 
     #[error("Spill event has been retried exceed the max limit for app: {0}")]
     SPILL_EVENT_EXCEED_RETRY_MAX_LIMIT(String),
+
+    #[error("Failed to writing data to channel.")]
+    WRITING_TO_CHANNEL_FAIL,
 }
 
 impl From<AcquireError> for WorkerError {
diff --git a/rust/experimental/server/src/grpc.rs b/rust/experimental/server/src/grpc.rs
index 1ff4bd4..658b94f 100644
--- a/rust/experimental/server/src/grpc.rs
+++ b/rust/experimental/server/src/grpc.rs
@@ -48,6 +48,7 @@
 };
 use crate::util;
 use tonic::{Request, Response, Status};
+use crate::channel::Channel;
 
 /// Use the maximum value for HTTP/2 connection window size to avoid deadlock among multiplexed
 /// streams on the same connection.
@@ -77,11 +78,12 @@
 
 pub struct DefaultShuffleServer {
     app_manager_ref: AppManagerRef,
+    writing_channel: Channel,
 }
 
 impl DefaultShuffleServer {
-    pub fn from(app_manager_ref: AppManagerRef) -> DefaultShuffleServer {
-        DefaultShuffleServer { app_manager_ref }
+    pub fn from(app_manager_ref: AppManagerRef, writing_channel: Channel) -> DefaultShuffleServer {
+        DefaultShuffleServer { app_manager_ref, writing_channel }
     }
 }
 
@@ -217,10 +219,12 @@
             };
             let ctx = WritingViewContext::from(uid.clone(), blocks);
 
-            let inserted = app
-                .insert(ctx)
-                .instrument_await(format!("insert data for app. uid: {:?}", &uid))
-                .await;
+            let inserted = self.writing_channel.send(ctx);
+
+            // let inserted = app
+            //     .insert(ctx)
+            //     .instrument_await(format!("insert data for app. uid: {:?}", &uid))
+            //     .await;
             if inserted.is_err() {
                 let err = format!(
                     "Errors on putting data. app_id: {}, err: {:?}",
diff --git a/rust/experimental/server/src/lib.rs b/rust/experimental/server/src/lib.rs
index 1219285..54d3f18 100644
--- a/rust/experimental/server/src/lib.rs
+++ b/rust/experimental/server/src/lib.rs
@@ -31,6 +31,7 @@
 pub mod signal;
 pub mod store;
 pub mod util;
+pub mod channel;
 
 use crate::app::AppManager;
 use crate::grpc::DefaultShuffleServer;
@@ -69,14 +70,16 @@
     HTTP_SERVICE.start(runtime_manager.clone(), http_port);
 
     let (tx, rx) = oneshot::channel::<()>();
+    let cloned_runtime_manager = runtime_manager.clone();
+    let app_manager_ref = AppManager::get_ref(cloned_runtime_manager, config.clone());
+
+    let writing_channel = channel::Channel::new(app_manager_ref.clone(), runtime_manager.clone());
 
     // implement server startup
-    let cloned_runtime_manager = runtime_manager.clone();
     runtime_manager.grpc_runtime.spawn(async move {
-        let app_manager_ref = AppManager::get_ref(cloned_runtime_manager, config.clone());
         let rpc_port = config.grpc_port.unwrap_or(19999);
         info!("Starting GRpc server with port:[{}] ......", rpc_port);
-        let shuffle_server = DefaultShuffleServer::from(app_manager_ref);
+        let shuffle_server = DefaultShuffleServer::from(app_manager_ref, writing_channel.clone());
         let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_port as u16);
         let service = ShuffleServerServer::new(shuffle_server)
             .max_decoding_message_size(usize::MAX)
diff --git a/rust/experimental/server/src/main.rs b/rust/experimental/server/src/main.rs
index f5e93c5..86c7ba9 100644
--- a/rust/experimental/server/src/main.rs
+++ b/rust/experimental/server/src/main.rs
@@ -62,6 +62,7 @@
 pub mod signal;
 pub mod store;
 mod util;
+pub mod channel;
 
 const MAX_MEMORY_ALLOCATION_SIZE_ENV_KEY: &str = "MAX_MEMORY_ALLOCATION_SIZE";
 const DEFAULT_SHUFFLE_SERVER_TAG: &str = "ss_v4";
@@ -233,8 +234,10 @@
     let available_cores = std::thread::available_parallelism()?;
     debug!("GRpc service with parallelism: [{}]", &available_cores);
 
+    let writing_channel = channel::Channel::new(app_manager_ref.clone(), runtime_manager.clone());
+
     for _ in 0..available_cores.into() {
-        let shuffle_server = DefaultShuffleServer::from(app_manager_ref.clone());
+        let shuffle_server = DefaultShuffleServer::from(app_manager_ref.clone(), writing_channel.clone());
         let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_port as u16);
         let service = ShuffleServerServer::new(shuffle_server)
             .max_decoding_message_size(usize::MAX)
diff --git a/rust/experimental/server/src/store/memory.rs b/rust/experimental/server/src/store/memory.rs
index e93e4ae..fad27e7 100644
--- a/rust/experimental/server/src/store/memory.rs
+++ b/rust/experimental/server/src/store/memory.rs
@@ -256,12 +256,13 @@
 
     async fn insert(&self, ctx: WritingViewContext) -> Result<(), WorkerError> {
         let uid = ctx.uid;
-        let buffer = self.get_or_create_underlying_staging_buffer(uid.clone());
-        let mut buffer_guarded = buffer.lock();
-
-        let blocks = ctx.data_blocks;
-        let inserted_size = buffer_guarded.add(blocks)?;
-        drop(buffer_guarded);
+        let inserted_size = {
+            let buffer = self.get_or_create_underlying_staging_buffer(uid);
+            let mut buffer_guarded = buffer.lock();
+            let blocks = ctx.data_blocks;
+            let inserted_size = buffer_guarded.add(blocks)?;
+            inserted_size
+        };
 
         self.budget.allocated_to_used(inserted_size)?;