Add context capture and continued methods. (#29)
diff --git a/Cargo.toml b/Cargo.toml
index 5ed4d53..b1c1476 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -45,7 +45,7 @@
prost = "0.10.4"
prost-derive = "0.10.1"
thiserror = "1.0.31"
-tokio = { version = "1.18.2", features = ["full"] }
+tokio = { version = "1.18.2", features = ["parking_lot"] }
tonic = { version = "0.7.2", features = ["codegen"] }
tracing = "0.1.35"
uuid = { version = "1.1.0", features = ["serde", "v4"] }
@@ -54,6 +54,7 @@
tonic-build = "0.7.2"
[dev-dependencies]
+tokio = { version = "1.18.2", features = ["rt-multi-thread"] }
tokio-stream = { version = "0.1.8", features = ["net"] }
[[test]]
diff --git a/e2e/data/expected_context.yaml b/e2e/data/expected_context.yaml
index 162046e..f4ff6e1 100644
--- a/e2e/data/expected_context.yaml
+++ b/e2e/data/expected_context.yaml
@@ -16,13 +16,35 @@
# under the License.
#
segmentItems:
-- segmentSize: 1
+- segmentSize: gt 1
segments:
- segmentId: not null
spans:
- componentId: 11000
endTime: gt 0
isError: false
+ operationName: async-callback
+ parentSpanId: -1
+ peer: ''
+ refs:
+ - networkAddress: ''
+ parentEndpoint: async-job
+ parentService: producer
+ parentServiceInstance: node_0
+ parentSpanId: 2
+ parentTraceSegmentId: not null
+ refType: CrossThread
+ traceId: not null
+ skipAnalysis: false
+ spanId: 0
+ spanLayer: Http
+ spanType: Entry
+ startTime: gt 0
+ - segmentId: not null
+ spans:
+ - componentId: 11000
+ endTime: gt 0
+ isError: false
operationName: /pong
parentSpanId: 0
peer: consumer:8082
@@ -34,6 +56,17 @@
- componentId: 11000
endTime: gt 0
isError: false
+ operationName: async-job
+ parentSpanId: 0
+ peer: ''
+ skipAnalysis: false
+ spanId: 2
+ spanLayer: Unknown
+ spanType: Local
+ startTime: gt 0
+ - componentId: 11000
+ endTime: gt 0
+ isError: false
operationName: /ping
parentSpanId: -1
peer: ''
diff --git a/e2e/src/main.rs b/e2e/src/main.rs
index ae2432b..5611d9f 100644
--- a/e2e/src/main.rs
+++ b/e2e/src/main.rs
@@ -50,6 +50,18 @@
client.request(req).await.unwrap();
}
+ {
+ let _span3 = context.create_local_span("async-job");
+ let snapshot = context.capture();
+
+ tokio::spawn(async move {
+ let mut context2 = tracer::create_trace_context();
+ let _span3 = context2.create_entry_span("async-callback");
+ context2.continued(snapshot);
+ })
+ .await
+ .unwrap();
+ }
Ok(Response::new(Body::from("hoge")))
}
diff --git a/examples/simple_trace_report.rs b/examples/simple_trace_report.rs
index bad1fff..f7f321b 100644
--- a/examples/simple_trace_report.rs
+++ b/examples/simple_trace_report.rs
@@ -26,7 +26,7 @@
{
// Generate an Entry Span when a request is received.
// An Entry Span is generated only once per context.
- // You should assign a variable name to guard the span not be dropped immediately.
+ // Assign a variable name to guard the span not to be dropped immediately.
let _span = ctx.create_entry_span("op1");
// Something...
diff --git a/src/context/span.rs b/src/context/span.rs
index 6d0bf21..c6c5e18 100644
--- a/src/context/span.rs
+++ b/src/context/span.rs
@@ -37,7 +37,7 @@
/// {
/// // Generate an Entry Span when a request is received.
/// // An Entry Span is generated only once per context.
-/// // You should assign a variable name to guard the span not be dropped immediately.
+/// // Assign a variable name to guard the span not to be dropped immediately.
/// let _span = ctx.create_entry_span("op1");
///
/// // Something...
@@ -57,7 +57,7 @@
/// // Auto report ctx when dropped.
/// }
/// ```
-#[derive(Clone)]
+#[must_use = "assign a variable name to guard the span not be dropped immediately."]
pub struct Span {
index: usize,
context: WeakTracingContext,
@@ -118,6 +118,8 @@
self.context.upgrade().expect("Context has dropped")
}
+ // Notice: Perhaps in the future, `RwLock` can be used instead of `Mutex`, so `with_*` can be nested.
+ // (Although I can't find the meaning of such use at present.)
pub fn with_span_object<T>(&self, f: impl FnOnce(&SpanObject) -> T) -> T {
self.upgrade_context()
.with_active_span_stack(|stack| f(&stack[self.index]))
diff --git a/src/context/trace_context.rs b/src/context/trace_context.rs
index 014c75b..e56d8a9 100644
--- a/src/context/trace_context.rs
+++ b/src/context/trace_context.rs
@@ -33,12 +33,12 @@
mem::take,
sync::{
atomic::{AtomicI32, Ordering},
- Arc, Mutex, Weak,
+ Arc, Mutex, RwLock, Weak,
},
};
struct Inner {
- trace_id: String,
+ trace_id: RwLock<String>,
trace_segment_id: String,
service: String,
service_instance: String,
@@ -49,8 +49,7 @@
primary_endpoint_name: Mutex<String>,
}
-#[derive(Clone)]
-#[must_use = "You should call `create_entry_span` after `TracingContext` created."]
+#[must_use = "call `create_entry_span` after `TracingContext` created."]
pub struct TracingContext {
inner: Arc<Inner>,
tracer: WeakTracer,
@@ -79,7 +78,7 @@
) -> Self {
TracingContext {
inner: Arc::new(Inner {
- trace_id: RandomGenerator::generate(),
+ trace_id: RwLock::new(RandomGenerator::generate()),
trace_segment_id: RandomGenerator::generate(),
service: service_name.to_string(),
service_instance: instance_name.to_string(),
@@ -104,7 +103,7 @@
) -> Self {
TracingContext {
inner: Arc::new(Inner {
- trace_id: context.parent_trace_id.clone(),
+ trace_id: RwLock::new(context.parent_trace_id.clone()),
trace_segment_id: RandomGenerator::generate(),
service: service_name.to_string(),
service_instance: instance_name.to_string(),
@@ -119,8 +118,16 @@
}
#[inline]
- pub fn trace_id(&self) -> &str {
- &self.inner.trace_id
+ pub fn trace_id(&self) -> String {
+ self.with_trace_id(ToString::to_string)
+ }
+
+ fn with_trace_id<T>(&self, f: impl FnOnce(&String) -> T) -> T {
+ f(&*self.inner.trace_id.try_read().expect(LOCK_MSG))
+ }
+
+ fn with_trace_id_mut<T>(&mut self, f: impl FnOnce(&mut String) -> T) -> T {
+ f(&mut *self.inner.trace_id.try_write().expect(LOCK_MSG))
}
#[inline]
@@ -182,8 +189,13 @@
self.with_active_span_stack(|stack| stack.last().map(f))
}
- // TODO Using for capture and continued.
- #[allow(dead_code)]
+ pub(crate) fn with_active_span_mut<T>(
+ &mut self,
+ f: impl FnOnce(&mut SpanObject) -> T,
+ ) -> Option<T> {
+ self.with_active_span_stack_mut(|stack| stack.last_mut().map(f))
+ }
+
fn with_primary_endpoint_name<T>(&self, f: impl FnOnce(&String) -> T) -> T {
f(&*self.inner.primary_endpoint_name.try_lock().expect(LOCK_MSG))
}
@@ -209,7 +221,7 @@
if let Some(segment_link) = &self.inner.segment_link {
span.refs.push(SegmentReference {
ref_type: RefType::CrossProcess as i32,
- trace_id: self.inner.trace_id.clone(),
+ trace_id: self.trace_id(),
parent_trace_segment_id: segment_link.parent_trace_segment_id.clone(),
parent_span_id: segment_link.parent_span_id,
parent_service: segment_link.parent_service.clone(),
@@ -273,6 +285,40 @@
Span::new(index, self.downgrade())
}
+ /// Capture a snapshot for cross-thread propagation.
+ pub fn capture(&self) -> ContextSnapshot {
+ ContextSnapshot {
+ trace_id: self.trace_id(),
+ trace_segment_id: self.trace_segment_id().to_owned(),
+ span_id: self.peek_active_span_id().unwrap_or(-1),
+ parent_endpoint: self.with_primary_endpoint_name(Clone::clone),
+ }
+ }
+
+ /// Build the reference between this segment and a cross-thread segment.
+ pub fn continued(&mut self, snapshot: ContextSnapshot) {
+ if snapshot.is_valid() {
+ self.with_trace_id_mut(|trace_id| *trace_id = snapshot.trace_id.clone());
+
+ let tracer = self.upgrade_tracer();
+
+ let segment_ref = SegmentReference {
+ ref_type: RefType::CrossThread as i32,
+ trace_id: snapshot.trace_id,
+ parent_trace_segment_id: snapshot.trace_segment_id,
+ parent_span_id: snapshot.span_id,
+ parent_service: tracer.service_name().to_owned(),
+ parent_service_instance: tracer.instance_name().to_owned(),
+ parent_endpoint: snapshot.parent_endpoint,
+ network_address_used_at_peer: Default::default(),
+ };
+
+ self.with_active_span_mut(|span| {
+ span.refs.push(segment_ref);
+ });
+ }
+ }
+
/// Close span. We can't use closed span after finalize called.
pub(crate) fn finalize_span(&mut self, index: usize) -> Result<(), ()> {
let span = self.pop_active_span(index);
@@ -290,7 +336,7 @@
///
/// Notice: The spans will taked, so this method shouldn't be called twice.
pub(crate) fn convert_segment_object(&mut self) -> SegmentObject {
- let trace_id = self.trace_id().to_owned();
+ let trace_id = self.trace_id();
let trace_segment_id = self.trace_segment_id().to_owned();
let service = self.service().to_owned();
let service_instance = self.service_instance().to_owned();
@@ -368,12 +414,11 @@
}
}
+#[derive(Debug)]
pub struct ContextSnapshot {
trace_id: String,
trace_segment_id: String,
span_id: i32,
- // TODO Using for capture and continued.
- #[allow(dead_code)]
parent_endpoint: String,
}
diff --git a/src/reporter/log.rs b/src/reporter/log.rs
index b08ef3c..39fe7e0 100644
--- a/src/reporter/log.rs
+++ b/src/reporter/log.rs
@@ -16,7 +16,6 @@
use super::Reporter;
use crate::skywalking_proto::v3::SegmentObject;
-
use std::collections::LinkedList;
use tonic::async_trait;
diff --git a/tests/trace_context.rs b/tests/trace_context.rs
index 3d376de..0cd992d 100644
--- a/tests/trace_context.rs
+++ b/tests/trace_context.rs
@@ -27,9 +27,10 @@
SpanType,
};
use std::collections::LinkedList;
-use std::future;
use std::sync::Mutex;
use std::{cell::Ref, sync::Arc};
+use std::{future, thread};
+use tokio::runtime::Handle;
/// Serialize from A should equal Serialize from B
#[allow(dead_code)]
@@ -237,9 +238,6 @@
context2.with_spans(|spans| {
let span3 = spans.last().unwrap();
- return;
-
- let span3 = spans.last().unwrap();
assert_eq!(span3.span_id, 0);
assert_eq!(span3.parent_span_id, -1);
assert_eq!(span3.refs.len(), 1);
@@ -261,6 +259,53 @@
}
}
+#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+async fn cross_threads_test() {
+ MockReporter::with_many(
+ |reporter| {
+ let tracer = Tracer::new("service", "instance", reporter);
+ let mut ctx1 = tracer.create_trace_context();
+ let _span1 = ctx1.create_entry_span("op1");
+ let _span2 = ctx1.create_local_span("op2");
+ let snapshot = ctx1.capture();
+
+ let tracer_ = tracer.clone();
+ thread::spawn(move || {
+ let mut ctx2 = tracer_.create_trace_context();
+ let _span3 = ctx2.create_entry_span("op3");
+ ctx2.continued(snapshot);
+ })
+ .join()
+ .unwrap();
+
+ tracer
+ },
+ |segments| {
+ let iter = segments.iter();
+ let first = iter.nth(0).unwrap();
+ let second = iter.nth(1).unwrap();
+
+ assert_eq!(first.trace_id, second.trace_id);
+ assert_eq!(first.spans.refs.len(), 1);
+ assert_eq!(
+ first.spans.refs[0],
+ SegmentReference {
+ ref_type: RefType::CrossThread as i32,
+ trace_id: second.trace_id.clone(),
+ parent_trace_segment_id: second.trace_segment_id.clone(),
+ parent_span_id: 1,
+ parent_service: "service".to_owned(),
+ parent_service_instance: "instance".to_owned(),
+ parent_endpoint: "op2".to_owned(),
+ ..Default::default()
+ }
+ );
+ assert_eq!(second.spans.len(), 2);
+ },
+ )
+ .await;
+}
+
#[derive(Default, Clone)]
struct MockReporter {
segments: Arc<Mutex<LinkedList<SegmentObject>>>,
@@ -268,6 +313,13 @@
impl MockReporter {
async fn with(f1: impl FnOnce(MockReporter) -> Tracer, f2: impl FnOnce(&SegmentObject)) {
+ Self::with_many(f1, |segments| f2(&segments.front().unwrap())).await;
+ }
+
+ async fn with_many(
+ f1: impl FnOnce(MockReporter) -> Tracer,
+ f2: impl FnOnce(&LinkedList<SegmentObject>),
+ ) {
let reporter = MockReporter::default();
let tracer = f1(reporter.clone());
@@ -275,8 +327,7 @@
tracer.reporting(future::ready(())).await.unwrap();
let segments = reporter.segments.try_lock().unwrap();
- let segment = segments.front().unwrap();
- f2(segment);
+ f2(&*segments);
}
}