Rpc context implementation #58 (#83)
* Rpc context implementation #58
* 修复建议补充测试场景
* 修复建议补充测试场景
* advice: use tracing to replace with println
* advice: use tracing to replace with println
Co-authored-by: qunwei <qunwei@prevailcloud.com>
diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml
index 7b38a28..bdde2ea 100644
--- a/dubbo/Cargo.toml
+++ b/dubbo/Cargo.toml
@@ -34,3 +34,6 @@
flate2 = "1.0"
dubbo-config = {path = "../config", version = "0.2.0"}
+
+#对象存储
+state = { version = "0.5", features = ["tls"] }
\ No newline at end of file
diff --git a/dubbo/src/context.rs b/dubbo/src/context.rs
index 2cf3692..c438bcd 100644
--- a/dubbo/src/context.rs
+++ b/dubbo/src/context.rs
@@ -15,33 +15,14 @@
* limitations under the License.
*/
-use core::cell::RefCell;
-use std::any::Any;
use std::collections::HashMap;
-use std::fmt;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
+use std::thread;
-///
-/// ```rust
-/// use std::collections::HashMap;
-/// use std::sync::Arc;
-///
-/// let mut map = HashMap::<String, SafetyValue>::new();
-/// map.insert("key1".into(), Arc::new("data-1"));
-///
-/// // get a typed value from SafetyValue
-/// let value = map
-/// .get("key1")
-/// .and_then(|f| f.downcast_ref::<String>())
-/// .unwrap();
-///
-/// assert_eq!(value, "data-1");
-/// ```
-type SafetyValue = Arc<dyn Any + Sync + Send>;
+use serde_json::Value;
+use state::Container;
-thread_local! {
- static SERVICE_CONTEXT: RefCell<RpcContext> = RefCell::new(RpcContext::default());
-}
+pub static APPLICATION_CONTEXT: Container![Send + Sync] = <Container![Send + Sync]>::new();
///
/// All environment information of during the current call will put into the context
@@ -53,37 +34,38 @@
/// After B call C,the RpcContext record the information of B call C
///
#[derive(Clone, Default)]
-pub struct RpcContext {
- pub attachments: HashMap<String, SafetyValue>,
- // TODO
+pub struct RpcContext {}
+
+pub trait Context {
+ fn get_attachments() -> Option<Arc<Mutex<HashMap<String, Value>>>>;
}
-impl RpcContext {
- pub fn current() -> Self {
- get_current(|ctx| ctx.clone())
- }
+impl Context for RpcContext {
+ fn get_attachments() -> Option<Arc<Mutex<HashMap<String, Value>>>> {
+ let local = APPLICATION_CONTEXT.try_get_local::<Arc<Mutex<HashMap<String, Value>>>>();
- pub fn clear(&mut self) {
- self.attachments.clear();
- }
-}
+ tracing::debug!("{:?} - {:?}", thread::current().id(), local);
-fn get_current<F: FnMut(&RpcContext) -> T, T>(mut f: F) -> T {
- SERVICE_CONTEXT.try_with(|ctx| f(&ctx.borrow())).unwrap()
-}
-
-impl fmt::Debug for RpcContext {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("Context")
- .field("attachments", &self.attachments)
- .finish()
+ match local {
+ Some(attachment) => Some(attachment.clone()),
+ None => {
+ let attachment = HashMap::<String, Value>::new();
+ let mutex = Arc::new(Mutex::new(attachment));
+ let mutex_clone = Arc::clone(&mutex);
+ APPLICATION_CONTEXT.set_local(move || {
+ return Arc::clone(&mutex_clone);
+ });
+ Some(Arc::clone(&mutex))
+ }
+ }
}
}
#[cfg(test)]
mod tests {
+ use tokio::time;
+
use super::*;
- use std::thread::sleep;
use std::time::Duration;
#[test]
@@ -96,25 +78,26 @@
let mut handles = Vec::with_capacity(10);
- for i in 0..10 {
+ for i in 0..=10 {
handles.push(rt.spawn(async move {
- let mut attachments = RpcContext::current().attachments;
- attachments.insert("key1".into(), Arc::new(format!("data-{i}")));
+ if let Some(attachments) = RpcContext::get_attachments() {
+ let mut attachments = attachments.lock().unwrap();
+ attachments.insert("key1".into(), Value::from(format!("data-{i}")));
- if i == 10 {
- attachments.insert("key2".into(), Arc::new(2));
- assert_eq!(attachments.len(), 2);
- } else {
- assert_eq!(attachments.len(), 1);
- }
+ assert!(attachments.len() > 0);
+ };
+
+ time::sleep(Duration::from_millis(1000)).await;
+
+ if let Some(attachments) = RpcContext::get_attachments() {
+ let attachments = attachments.lock().unwrap();
+ assert!(attachments.len() > 0);
+ };
}));
}
- sleep(Duration::from_millis(500));
-
for handle in handles {
rt.block_on(handle).unwrap();
}
- assert_eq!(RpcContext::current().attachments.len(), 0);
}
}