blob: d716beef572bc9f2ae89a383be08fae73d3c4a4c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::{
collections::HashMap,
sync::{Arc, Mutex},
thread,
};
use crate::logger::tracing::debug;
use serde_json::Value;
use state::Container;
pub static APPLICATION_CONTEXT: Container![Send + Sync] = <Container![Send + Sync]>::new();
///
/// All environment information of during the current call will put into the context
/// on the filter composing process,and all configuration information will convert the parameters of URL instance.
///
/// RpcContext is a temporary status recorder of [thread_local],when accept RPC request or send RPC request,
/// The RpcContext will be changed.Such as: A call B and B call C.
/// On B machine,before B call C,the RpcContext will record the information of A call B.
/// After B call C,the RpcContext record the information of B call C
///
#[derive(Clone, Default)]
pub struct RpcContext {}
pub trait Context {
fn get_attachments() -> Option<Arc<Mutex<HashMap<String, Value>>>>;
}
impl Context for RpcContext {
fn get_attachments() -> Option<Arc<Mutex<HashMap<String, Value>>>> {
let local = APPLICATION_CONTEXT.try_get_local::<Arc<Mutex<HashMap<String, Value>>>>();
debug!("{:?} - {:?}", thread::current().id(), local);
match local {
Some(attachment) => Some(attachment.clone()),
None => {
let attachment = HashMap::<String, Value>::new();
let mutex = Arc::new(Mutex::new(attachment));
let mutex_clone = Arc::clone(&mutex);
APPLICATION_CONTEXT.set_local(move || {
return Arc::clone(&mutex_clone);
});
Some(Arc::clone(&mutex))
}
}
}
}
#[cfg(test)]
mod tests {
use tokio::time;
use super::*;
use std::time::Duration;
#[test]
fn context_with_thread_local() {
let rt = tokio::runtime::Builder::new_multi_thread()
.max_blocking_threads(2)
.enable_all()
.build()
.unwrap();
let mut handles = Vec::with_capacity(10);
for i in 0..=10 {
handles.push(rt.spawn(async move {
if let Some(attachments) = RpcContext::get_attachments() {
let mut attachments = attachments.lock().unwrap();
attachments.insert("key1".into(), Value::from(format!("data-{i}")));
assert!(attachments.len() > 0);
};
time::sleep(Duration::from_millis(1000)).await;
if let Some(attachments) = RpcContext::get_attachments() {
let attachments = attachments.lock().unwrap();
assert!(attachments.len() > 0);
};
}));
}
for handle in handles {
rt.block_on(handle).unwrap();
}
}
}