channel try
diff --git a/rust/experimental/server/Cargo.toml b/rust/experimental/server/Cargo.toml
index 860ad43..0b55654 100644
--- a/rust/experimental/server/Cargo.toml
+++ b/rust/experimental/server/Cargo.toml
@@ -67,7 +67,6 @@
log = "0.4.17"
env_logger = "0.10.0"
crossbeam = "0.8.2"
-crossbeam-channel = "0.5"
tempdir = "0.3.7"
async-trait = "0.1.68"
futures = "0.3"
@@ -98,6 +97,7 @@
cap = "0.1.2"
spin = "0.9.8"
opendal = { version = "0.44.0", features = ["services-fs"]}
+crossbeam-channel = "0.5.8"
[dependencies.hdfs-native]
version = "0.7.0"
diff --git a/rust/experimental/server/src/channel.rs b/rust/experimental/server/src/channel.rs
new file mode 100644
index 0000000..13d9994
--- /dev/null
+++ b/rust/experimental/server/src/channel.rs
@@ -0,0 +1,56 @@
+use std::sync::Arc;
+use crossbeam_channel::{Receiver, Sender, unbounded};
+use crate::app::{AppManager, WritingViewContext};
+use crate::error::WorkerError;
+use crate::metric::TOTAL_RECEIVED_DATA;
+use crate::runtime::manager::RuntimeManager;
+
+/// Hand-off queue for [`WritingViewContext`] write requests, built on an
+/// unbounded `crossbeam_channel`.
+///
+/// Cloning a `Channel` clones both endpoints, so every clone feeds the same queue.
+#[derive(Clone)]
+pub struct Channel {
+    // NOTE(review): neither field is read anywhere in this file yet -- presumably
+    // reserved for the consumer side that will drain `receiver`; confirm.
+    app_manager: Arc<AppManager>,
+    runtime_manager: RuntimeManager,
+
+    /// Producer endpoint used by [`Channel::send`].
+    sender: Sender<WritingViewContext>,
+    /// Consumer endpoint; nothing in this file receives from it yet.
+    receiver: Receiver<WritingViewContext>,
+}
+
+impl Channel {
+    /// Creates a channel backed by a fresh unbounded queue.
+    pub fn new(app_manager: Arc<AppManager>, runtime_manager: RuntimeManager) -> Self {
+        let (sender, receiver) = unbounded();
+        Self {
+            app_manager,
+            runtime_manager,
+            sender,
+            receiver,
+        }
+    }
+
+    /// Enqueues `ctx` and returns the summed `length` of its data blocks.
+    ///
+    /// `TOTAL_RECEIVED_DATA` is bumped before the enqueue attempt, so it also
+    /// counts requests whose enqueue subsequently fails.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`WorkerError::WRITING_TO_CHANNEL_FAIL`] when the underlying
+    /// `Sender::send` call fails.
+    pub fn send(&self, ctx: WritingViewContext) -> Result<i32, WorkerError> {
+        let len: i32 = ctx.data_blocks.iter().map(|block| block.length).sum();
+        // A bare `as u64` would sign-extend a negative total into a huge increment; clamp to 0.
+        TOTAL_RECEIVED_DATA.inc_by(len.max(0) as u64);
+
+        self.sender
+            .send(ctx)
+            .map_err(|_| WorkerError::WRITING_TO_CHANNEL_FAIL)?;
+        Ok(len)
+    }
+}
\ No newline at end of file
diff --git a/rust/experimental/server/src/error.rs b/rust/experimental/server/src/error.rs
index 7a70408..cfbe608 100644
--- a/rust/experimental/server/src/error.rs
+++ b/rust/experimental/server/src/error.rs
@@ -16,7 +16,6 @@
// under the License.
use anyhow::Error;
-
use log::error;
use poem::error::ParseQueryError;
use thiserror::Error;
@@ -63,6 +62,9 @@
#[error("Spill event has been retried exceed the max limit for app: {0}")]
SPILL_EVENT_EXCEED_RETRY_MAX_LIMIT(String),
+
+ #[error("Failed to writing data to channel.")]
+ WRITING_TO_CHANNEL_FAIL,
}
impl From<AcquireError> for WorkerError {
diff --git a/rust/experimental/server/src/grpc.rs b/rust/experimental/server/src/grpc.rs
index 1ff4bd4..658b94f 100644
--- a/rust/experimental/server/src/grpc.rs
+++ b/rust/experimental/server/src/grpc.rs
@@ -48,6 +48,7 @@
};
use crate::util;
use tonic::{Request, Response, Status};
+use crate::channel::Channel;
/// Use the maximum value for HTTP/2 connection window size to avoid deadlock among multiplexed
/// streams on the same connection.
@@ -77,11 +78,12 @@
pub struct DefaultShuffleServer {
app_manager_ref: AppManagerRef,
+ writing_channel: Channel,
}
impl DefaultShuffleServer {
- pub fn from(app_manager_ref: AppManagerRef) -> DefaultShuffleServer {
- DefaultShuffleServer { app_manager_ref }
+ pub fn from(app_manager_ref: AppManagerRef, writing_channel: Channel) -> DefaultShuffleServer {
+ DefaultShuffleServer { app_manager_ref, writing_channel }
}
}
@@ -217,10 +219,12 @@
};
let ctx = WritingViewContext::from(uid.clone(), blocks);
- let inserted = app
- .insert(ctx)
- .instrument_await(format!("insert data for app. uid: {:?}", &uid))
- .await;
+ let inserted = self.writing_channel.send(ctx);
+
+ // let inserted = app
+ // .insert(ctx)
+ // .instrument_await(format!("insert data for app. uid: {:?}", &uid))
+ // .await;
if inserted.is_err() {
let err = format!(
"Errors on putting data. app_id: {}, err: {:?}",
diff --git a/rust/experimental/server/src/lib.rs b/rust/experimental/server/src/lib.rs
index 1219285..54d3f18 100644
--- a/rust/experimental/server/src/lib.rs
+++ b/rust/experimental/server/src/lib.rs
@@ -31,6 +31,7 @@
pub mod signal;
pub mod store;
pub mod util;
+pub mod channel;
use crate::app::AppManager;
use crate::grpc::DefaultShuffleServer;
@@ -69,14 +70,16 @@
HTTP_SERVICE.start(runtime_manager.clone(), http_port);
let (tx, rx) = oneshot::channel::<()>();
+ let cloned_runtime_manager = runtime_manager.clone();
+ let app_manager_ref = AppManager::get_ref(cloned_runtime_manager, config.clone());
+
+ let writing_channel = channel::Channel::new(app_manager_ref.clone(), runtime_manager.clone());
// implement server startup
- let cloned_runtime_manager = runtime_manager.clone();
runtime_manager.grpc_runtime.spawn(async move {
- let app_manager_ref = AppManager::get_ref(cloned_runtime_manager, config.clone());
let rpc_port = config.grpc_port.unwrap_or(19999);
info!("Starting GRpc server with port:[{}] ......", rpc_port);
- let shuffle_server = DefaultShuffleServer::from(app_manager_ref);
+ let shuffle_server = DefaultShuffleServer::from(app_manager_ref, writing_channel.clone());
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_port as u16);
let service = ShuffleServerServer::new(shuffle_server)
.max_decoding_message_size(usize::MAX)
diff --git a/rust/experimental/server/src/main.rs b/rust/experimental/server/src/main.rs
index f5e93c5..86c7ba9 100644
--- a/rust/experimental/server/src/main.rs
+++ b/rust/experimental/server/src/main.rs
@@ -62,6 +62,7 @@
pub mod signal;
pub mod store;
mod util;
+pub mod channel;
const MAX_MEMORY_ALLOCATION_SIZE_ENV_KEY: &str = "MAX_MEMORY_ALLOCATION_SIZE";
const DEFAULT_SHUFFLE_SERVER_TAG: &str = "ss_v4";
@@ -233,8 +234,10 @@
let available_cores = std::thread::available_parallelism()?;
debug!("GRpc service with parallelism: [{}]", &available_cores);
+ let writing_channel = channel::Channel::new(app_manager_ref.clone(), runtime_manager.clone());
+
for _ in 0..available_cores.into() {
- let shuffle_server = DefaultShuffleServer::from(app_manager_ref.clone());
+ let shuffle_server = DefaultShuffleServer::from(app_manager_ref.clone(), writing_channel.clone());
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_port as u16);
let service = ShuffleServerServer::new(shuffle_server)
.max_decoding_message_size(usize::MAX)
diff --git a/rust/experimental/server/src/store/memory.rs b/rust/experimental/server/src/store/memory.rs
index e93e4ae..fad27e7 100644
--- a/rust/experimental/server/src/store/memory.rs
+++ b/rust/experimental/server/src/store/memory.rs
@@ -256,12 +256,13 @@
async fn insert(&self, ctx: WritingViewContext) -> Result<(), WorkerError> {
let uid = ctx.uid;
- let buffer = self.get_or_create_underlying_staging_buffer(uid.clone());
- let mut buffer_guarded = buffer.lock();
-
- let blocks = ctx.data_blocks;
- let inserted_size = buffer_guarded.add(blocks)?;
- drop(buffer_guarded);
+ let inserted_size = {
+ let buffer = self.get_or_create_underlying_staging_buffer(uid);
+ let mut buffer_guarded = buffer.lock();
+ let blocks = ctx.data_blocks;
+ let inserted_size = buffer_guarded.add(blocks)?;
+ inserted_size
+ };
self.budget.allocated_to_used(inserted_size)?;