Enhance Trace Context machenism. (#20)
Add stack structure to make sure span ID and parent span ID are correctly linked
diff --git a/e2e/data/expected_context.yaml b/e2e/data/expected_context.yaml
index 5a6570d..162046e 100644
--- a/e2e/data/expected_context.yaml
+++ b/e2e/data/expected_context.yaml
@@ -24,10 +24,10 @@
endTime: gt 0
isError: false
operationName: /pong
- parentSpanId: 1
+ parentSpanId: 0
peer: consumer:8082
skipAnalysis: false
- spanId: 2
+ spanId: 1
spanLayer: Http
spanType: Exit
startTime: gt 0
@@ -35,10 +35,10 @@
endTime: gt 0
isError: false
operationName: /ping
- parentSpanId: 0
+ parentSpanId: -1
peer: ''
skipAnalysis: false
- spanId: 1
+ spanId: 0
spanLayer: Http
spanType: Entry
startTime: gt 0
@@ -51,20 +51,20 @@
endTime: gt 0
isError: false
operationName: /pong
- parentSpanId: 0
+ parentSpanId: -1
peer: ''
refs:
- networkAddress: consumer:8082
parentEndpoint: /pong
parentService: producer
parentServiceInstance: node_0
- parentSpanId: 2
+ parentSpanId: 1
parentTraceSegmentId: not null
refType: CrossProcess
traceId: not null
skipAnalysis: false
- spanId: 1
+ spanId: 0
spanLayer: Http
spanType: Entry
startTime: gt 0
- serviceName: consumer
\ No newline at end of file
+ serviceName: consumer
diff --git a/src/context/propagation/context.rs b/src/context/propagation/context.rs
index c72f3aa..17a8e34 100644
--- a/src/context/propagation/context.rs
+++ b/src/context/propagation/context.rs
@@ -16,6 +16,7 @@
pub static SKYWALKING_HTTP_CONTEXT_HEADER_KEY: &str = "sw8";
+#[derive(Debug)]
pub struct PropagationContext {
/// It defines whether next span should be trace or not.
/// In SkyWalking, If `do_sample == true`, the span should be reported to
diff --git a/src/context/propagation/encoder.rs b/src/context/propagation/encoder.rs
index f928ece..c018e77 100644
--- a/src/context/propagation/encoder.rs
+++ b/src/context/propagation/encoder.rs
@@ -25,7 +25,7 @@
res += "1-";
res += format!("{}-", encode(context.trace_id.to_string())).as_str();
res += format!("{}-", encode(context.trace_segment_id.to_string())).as_str();
- res += format!("{}-", context.next_span_id).as_str();
+ res += format!("{}-", context.peek_active_span_id().unwrap_or(0)).as_str();
res += format!("{}-", encode(context.service.as_str())).as_str();
res += format!("{}-", encode(context.service_instance.as_str())).as_str();
res += format!("{}-", encode(endpoint)).as_str();
diff --git a/src/context/system_time.rs b/src/context/system_time.rs
index 69f2423..c5e29cc 100644
--- a/src/context/system_time.rs
+++ b/src/context/system_time.rs
@@ -25,6 +25,6 @@
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
- .as_secs() as i64
+ .as_millis() as i64
}
}
diff --git a/src/context/trace_context.rs b/src/context/trace_context.rs
index 63b4af4..87911d1 100644
--- a/src/context/trace_context.rs
+++ b/src/context/trace_context.rs
@@ -21,6 +21,7 @@
KeyStringValuePair, Log, RefType, SegmentObject, SegmentReference, SpanLayer, SpanObject,
SpanType,
};
+use std::collections::LinkedList;
use std::fmt::Formatter;
use std::sync::Arc;
@@ -70,10 +71,12 @@
}
}
-static SKYWALKING_RUST_COMPONENT_ID: i32 = 11000;
+const SKYWALKING_RUST_COMPONENT_ID: i32 = 11000;
impl Span {
+ #[allow(clippy::too_many_arguments)]
pub fn new(
+ span_id: i32,
parent_span_id: i32,
operation_name: String,
remote_peer: String,
@@ -83,7 +86,7 @@
time_fetcher: Arc<dyn TimeFetcher + Sync + Send>,
) -> Self {
let span_internal = SpanObject {
- span_id: parent_span_id + 1,
+ span_id,
parent_span_id,
start_time: time_fetcher.get(),
end_time: 0, // not set
@@ -114,6 +117,10 @@
&self.span_internal
}
+ pub fn span_object_mut(&mut self) -> &mut SpanObject {
+ &mut self.span_internal
+ }
+
/// Add logs to the span.
pub fn add_log(&mut self, message: Vec<(&str, &str)>) {
let log = Log {
@@ -155,6 +162,7 @@
pub spans: Vec<Box<Span>>,
time_fetcher: Arc<dyn TimeFetcher + Sync + Send>,
segment_link: Option<PropagationContext>,
+ active_span_id_stack: LinkedList<i32>,
}
impl std::fmt::Debug for TracingContext {
@@ -192,6 +200,7 @@
time_fetcher,
spans: Vec::new(),
segment_link: None,
+ active_span_id_stack: LinkedList::new(),
}
}
@@ -227,6 +236,7 @@
time_fetcher,
spans: Vec::new(),
segment_link: Some(context),
+ active_span_id_stack: LinkedList::new(),
}
}
@@ -257,8 +267,11 @@
return Err("entry span have already exist.");
}
+ let parent_span_id = self.peek_active_span_id().unwrap_or(-1);
+
let mut span = Box::new(Span::new(
self.next_span_id,
+ parent_span_id,
operation_name.to_string(),
String::default(),
SpanType::Entry,
@@ -300,6 +313,8 @@
});
}
self.next_span_id += 1;
+ self.active_span_id_stack
+ .push_back(span.span_internal.span_id);
Ok(span)
}
@@ -335,8 +350,11 @@
return Err("entry span must be existed.");
}
+ let parent_span_id = self.peek_active_span_id().unwrap_or(-1);
+
let span = Box::new(Span::new(
self.next_span_id,
+ parent_span_id,
operation_name.to_string(),
remote_peer.to_string(),
SpanType::Exit,
@@ -345,6 +363,8 @@
self.time_fetcher.clone(),
));
self.next_span_id += 1;
+ self.active_span_id_stack
+ .push_back(span.span_internal.span_id);
Ok(span)
}
@@ -352,10 +372,7 @@
pub fn finalize_span(&mut self, mut span: Box<Span>) {
span.close();
self.spans.push(span);
- }
-
- pub fn finalize_span_for_test(&self, span: &mut Box<Span>) {
- span.close();
+ self.active_span_id_stack.pop_back();
}
/// It converts tracing context into segment object.
@@ -376,4 +393,8 @@
is_size_limited: false,
}
}
+
+ pub(crate) fn peek_active_span_id(&self) -> Option<i32> {
+ self.active_span_id_stack.back().copied()
+ }
}
diff --git a/tests/trace_context.rs b/tests/trace_context.rs
index af1eb21..a06a12d 100644
--- a/tests/trace_context.rs
+++ b/tests/trace_context.rs
@@ -16,22 +16,16 @@
#![allow(unused_imports)]
-pub mod skywalking_proto {
- pub mod v3 {
- tonic::include_proto!("skywalking.v3");
- }
-}
-
use prost::Message;
-use skywalking_proto::v3::{
- KeyStringValuePair, Log, RefType, SegmentObject, SegmentReference, SpanLayer, SpanObject,
- SpanType,
-};
use skywalking_rust::common::time::TimeFetcher;
use skywalking_rust::context::propagation::context::PropagationContext;
use skywalking_rust::context::propagation::decoder::decode_propagation;
use skywalking_rust::context::propagation::encoder::encode_propagation;
use skywalking_rust::context::trace_context::TracingContext;
+use skywalking_rust::skywalking_proto::v3::{
+ KeyStringValuePair, Log, RefType, SegmentObject, SegmentReference, SpanLayer, SpanObject,
+ SpanType,
+};
use std::{cell::Ref, sync::Arc};
/// Serialize from A should equal Serialize from B
@@ -98,9 +92,68 @@
.collect();
span1.add_tag(tags[0]);
+ {
+ let span2 = context.create_entry_span("op2");
+ assert!(span2.is_err());
+ }
+
+ {
+ let span3 = context.create_exit_span("op3", "example.com/test").unwrap();
+ context.finalize_span(span3);
+
+ let span3_expected = SpanObject {
+ span_id: 1,
+ parent_span_id: 0,
+ start_time: 100,
+ end_time: 100,
+ refs: Vec::<SegmentReference>::new(),
+ operation_name: "op3".to_string(),
+ peer: "example.com/test".to_string(),
+ span_type: SpanType::Exit as i32,
+ span_layer: SpanLayer::Http as i32,
+ component_id: 11000,
+ is_error: false,
+ tags: Vec::<KeyStringValuePair>::new(),
+ logs: Vec::<Log>::new(),
+ skip_analysis: false,
+ };
+ assert_eq!(*context.spans.last().unwrap().span_object(), span3_expected);
+ }
+
+ {
+ let span4 = context.create_exit_span("op3", "example.com/test").unwrap();
+
+ {
+ let span5 = context.create_exit_span("op4", "example.com/test").unwrap();
+ context.finalize_span(span5);
+
+ let span5_expected = SpanObject {
+ span_id: 3,
+ parent_span_id: 2,
+ start_time: 100,
+ end_time: 100,
+ refs: Vec::<SegmentReference>::new(),
+ operation_name: "op4".to_string(),
+ peer: "example.com/test".to_string(),
+ span_type: SpanType::Exit as i32,
+ span_layer: SpanLayer::Http as i32,
+ component_id: 11000,
+ is_error: false,
+ tags: Vec::<KeyStringValuePair>::new(),
+ logs: Vec::<Log>::new(),
+ skip_analysis: false,
+ };
+ assert_eq!(*context.spans.last().unwrap().span_object(), span5_expected);
+ }
+
+ context.finalize_span(span4);
+ }
+
+ context.finalize_span(span1);
+
let span1_expected = SpanObject {
- span_id: 1,
- parent_span_id: 0,
+ span_id: 0,
+ parent_span_id: -1,
start_time: 100,
end_time: 100,
refs: Vec::<SegmentReference>::new(),
@@ -114,35 +167,7 @@
logs: expected_log,
skip_analysis: false,
};
- context.finalize_span_for_test(&mut span1);
- check_serialize_equivalent(span1.span_object(), &span1_expected);
- }
-
- {
- let span2 = context.create_entry_span("op2");
- assert!(span2.is_err());
- }
-
- {
- let mut span3 = context.create_exit_span("op3", "example.com/test").unwrap();
- let span3_expected = SpanObject {
- span_id: 2,
- parent_span_id: 1,
- start_time: 100,
- end_time: 100,
- refs: Vec::<SegmentReference>::new(),
- operation_name: "op3".to_string(),
- peer: "example.com/test".to_string(),
- span_type: SpanType::Exit as i32,
- span_layer: SpanLayer::Http as i32,
- component_id: 11000,
- is_error: false,
- tags: Vec::<KeyStringValuePair>::new(),
- logs: Vec::<Log>::new(),
- skip_analysis: false,
- };
- context.finalize_span_for_test(&mut span3);
- check_serialize_equivalent(span3.span_object(), &span3_expected);
+ assert_eq!(*context.spans.last().unwrap().span_object(), span1_expected);
}
let segment = context.convert_segment_object();
@@ -181,40 +206,46 @@
assert_eq!(context1.service, "service");
assert_eq!(context1.service_instance, "instance");
- let mut span1 = context1.create_entry_span("op1").unwrap();
- context1.finalize_span_for_test(&mut span1);
+ let span1 = context1.create_entry_span("op1").unwrap();
+ {
+ let span2 = context1.create_exit_span("op2", "remote_peer").unwrap();
- let mut span2 = context1.create_exit_span("op2", "remote_peer").unwrap();
- context1.finalize_span_for_test(&mut span2);
+ {
+ let enc_prop = encode_propagation(&context1, "endpoint", "address");
+ let dec_prop = decode_propagation(&enc_prop).unwrap();
- let enc_prop = encode_propagation(&context1, "endpoint", "address");
- let dec_prop = decode_propagation(&enc_prop).unwrap();
+ let time_fetcher2 = MockTimeFetcher {};
+ let mut context2 = TracingContext::from_propagation_context_internal(
+ Arc::new(time_fetcher2),
+ "service2",
+ "instance2",
+ dec_prop,
+ );
- let time_fetcher2 = MockTimeFetcher {};
- let mut context2 = TracingContext::from_propagation_context_internal(
- Arc::new(time_fetcher2),
- "service2",
- "instance2",
- dec_prop,
- );
+ let span3 = context2.create_entry_span("op2").unwrap();
+ context2.finalize_span(span3);
- let mut span3 = context2.create_entry_span("op2").unwrap();
- context2.finalize_span_for_test(&mut span3);
+ let span3 = context2.spans.last().unwrap();
+ assert_eq!(span3.span_object().span_id, 0);
+ assert_eq!(span3.span_object().parent_span_id, -1);
+ assert_eq!(span3.span_object().refs.len(), 1);
- assert_eq!(span3.span_object().span_id, 1);
- assert_eq!(span3.span_object().parent_span_id, 0);
- assert_eq!(span3.span_object().refs.len(), 1);
+ let expected_ref = SegmentReference {
+ ref_type: RefType::CrossProcess as i32,
+ trace_id: context2.trace_id,
+ parent_trace_segment_id: context1.trace_segment_id.clone(),
+ parent_span_id: 1,
+ parent_service: context1.service.clone(),
+ parent_service_instance: context1.service_instance.clone(),
+ parent_endpoint: "endpoint".to_string(),
+ network_address_used_at_peer: "address".to_string(),
+ };
- let expected_ref = SegmentReference {
- ref_type: RefType::CrossProcess as i32,
- trace_id: context2.trace_id,
- parent_trace_segment_id: context1.trace_segment_id,
- parent_span_id: context1.next_span_id,
- parent_service: context1.service,
- parent_service_instance: context1.service_instance,
- parent_endpoint: "endpoint".to_string(),
- network_address_used_at_peer: "address".to_string(),
- };
+ check_serialize_equivalent(&expected_ref, &span3.span_object().refs[0]);
+ }
- check_serialize_equivalent(&expected_ref, &span3.span_object().refs[0]);
+ context1.finalize_span(span2);
+ }
+
+ context1.finalize_span(span1);
}