| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| // |
| |
| //! TracingContext is the context of the tracing process. Span should only be |
| //! created through context, and be archived into the context after the span |
| //! finished. |
| |
| use crate::{ |
| common::{ |
| random_generator::RandomGenerator, |
| system_time::{fetch_time, TimePeriod}, |
| wait_group::WaitGroup, |
| }, |
| error::LOCK_MSG, |
| proto::v3::{RefType, SegmentObject, SegmentReference, SpanLayer, SpanObject, SpanType}, |
| trace::{ |
| propagation::context::PropagationContext, |
| span::{AbstractSpan, Span}, |
| tracer::{Tracer, WeakTracer}, |
| }, |
| }; |
| use parking_lot::{ |
| MappedRwLockReadGuard, MappedRwLockWriteGuard, RwLock, RwLockReadGuard, RwLockWriteGuard, |
| }; |
| use std::{ |
| fmt::Formatter, |
| mem::take, |
| sync::{ |
| atomic::{AtomicUsize, Ordering}, |
| Arc, |
| }, |
| }; |
| |
| /// The span uid is to identify the [Span] for crate. |
| pub(crate) type SpanUid = usize; |
| |
| pub(crate) struct ActiveSpan { |
| uid: SpanUid, |
| span_id: i32, |
| /// For [TracingContext::continued] used. |
| r#ref: Option<SegmentReference>, |
| } |
| |
| impl ActiveSpan { |
| fn new(uid: SpanUid, span_id: i32) -> Self { |
| Self { |
| uid, |
| span_id, |
| r#ref: None, |
| } |
| } |
| |
| #[inline] |
| pub(crate) fn uid(&self) -> SpanUid { |
| self.uid |
| } |
| } |
| |
| pub(crate) struct FinalizeSpan { |
| uid: SpanUid, |
| /// When the span is [AsyncSpan] and unfinished, it is None. |
| obj: Option<SpanObject>, |
| /// For [TracingContext::continued] used. |
| r#ref: Option<SegmentReference>, |
| } |
| |
| impl FinalizeSpan { |
| pub(crate) fn new( |
| uid: usize, |
| obj: Option<SpanObject>, |
| r#ref: Option<SegmentReference>, |
| ) -> Self { |
| Self { uid, obj, r#ref } |
| } |
| } |
| |
| #[derive(Default)] |
| pub(crate) struct SpanStack { |
| pub(crate) finalized: RwLock<Vec<FinalizeSpan>>, |
| pub(crate) active: RwLock<Vec<ActiveSpan>>, |
| } |
| |
| impl SpanStack { |
| pub(crate) fn finalized(&self) -> RwLockReadGuard<'_, Vec<FinalizeSpan>> { |
| self.finalized.try_read().expect(LOCK_MSG) |
| } |
| |
| pub(crate) fn finalized_mut(&self) -> RwLockWriteGuard<'_, Vec<FinalizeSpan>> { |
| self.finalized.try_write().expect(LOCK_MSG) |
| } |
| |
| pub(crate) fn active(&self) -> RwLockReadGuard<'_, Vec<ActiveSpan>> { |
| self.active.try_read().expect(LOCK_MSG) |
| } |
| |
| pub(crate) fn active_mut(&self) -> RwLockWriteGuard<'_, Vec<ActiveSpan>> { |
| self.active.try_write().expect(LOCK_MSG) |
| } |
| |
| fn pop_active(&self, uid: SpanUid) -> Option<ActiveSpan> { |
| let mut stack = self.active_mut(); |
| if stack |
| .last() |
| .map(|span| span.uid() == uid) |
| .unwrap_or_default() |
| { |
| stack.pop() |
| } else { |
| None |
| } |
| } |
| |
| /// Close span. We can't use closed span after finalize called. |
| pub(crate) fn finalize_span(&self, uid: SpanUid, obj: Option<SpanObject>) { |
| let Some(active_span) = self.pop_active(uid) else { |
| panic!("Finalize span isn't the active span"); |
| }; |
| |
| let finalize_span = match obj { |
| Some(mut obj) => { |
| obj.end_time = fetch_time(TimePeriod::End); |
| if let Some(r#ref) = active_span.r#ref { |
| obj.refs.push(r#ref); |
| } |
| FinalizeSpan::new(uid, Some(obj), None) |
| } |
| None => FinalizeSpan::new(uid, None, active_span.r#ref), |
| }; |
| |
| self.finalized_mut().push(finalize_span); |
| } |
| |
| /// Close async span, fill the span object. |
| pub(crate) fn finalize_async_span(&self, uid: SpanUid, mut obj: SpanObject) { |
| for finalize_span in &mut *self.finalized_mut() { |
| if finalize_span.uid == uid { |
| obj.end_time = fetch_time(TimePeriod::End); |
| if let Some(r#ref) = take(&mut finalize_span.r#ref) { |
| obj.refs.push(r#ref); |
| } |
| finalize_span.obj = Some(obj); |
| return; |
| } |
| } |
| |
| unreachable!() |
| } |
| } |
| |
| /// TracingContext is the context of the tracing process. Span should only be |
| /// created through context, and be archived into the context after the span |
| /// finished. |
| #[must_use = "call `create_entry_span` after `TracingContext` created."] |
| pub struct TracingContext { |
| trace_id: String, |
| trace_segment_id: String, |
| service: String, |
| service_instance: String, |
| next_span_id: i32, |
| span_stack: Arc<SpanStack>, |
| primary_endpoint_name: String, |
| span_uid_generator: AtomicUsize, |
| wg: WaitGroup, |
| tracer: WeakTracer, |
| } |
| |
| impl std::fmt::Debug for TracingContext { |
| fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { |
| f.debug_struct("TracingContext") |
| .field("trace_id", &self.trace_id) |
| .field("trace_segment_id", &self.trace_segment_id) |
| .field("service", &self.service) |
| .field("service_instance", &self.service_instance) |
| .field("next_span_id", &self.next_span_id) |
| .finish() |
| } |
| } |
| |
| impl TracingContext { |
| /// Generate a new trace context. |
| pub(crate) fn new( |
| service_name: impl Into<String>, |
| instance_name: impl Into<String>, |
| tracer: WeakTracer, |
| ) -> Self { |
| TracingContext { |
| trace_id: RandomGenerator::generate(), |
| trace_segment_id: RandomGenerator::generate(), |
| service: service_name.into(), |
| service_instance: instance_name.into(), |
| next_span_id: Default::default(), |
| span_stack: Default::default(), |
| primary_endpoint_name: Default::default(), |
| span_uid_generator: AtomicUsize::new(0), |
| wg: Default::default(), |
| tracer, |
| } |
| } |
| |
| /// Get trace id. |
| #[inline] |
| pub fn trace_id(&self) -> &str { |
| &self.trace_id |
| } |
| |
| /// Get trace segment id. |
| #[inline] |
| pub fn trace_segment_id(&self) -> &str { |
| &self.trace_segment_id |
| } |
| |
| /// Get service name. |
| #[inline] |
| pub fn service(&self) -> &str { |
| &self.service |
| } |
| |
| /// Get service instance. |
| #[inline] |
| pub fn service_instance(&self) -> &str { |
| &self.service_instance |
| } |
| |
| fn next_span_id(&self) -> i32 { |
| self.next_span_id |
| } |
| |
| #[inline] |
| fn inc_next_span_id(&mut self) -> i32 { |
| let span_id = self.next_span_id; |
| self.next_span_id += 1; |
| span_id |
| } |
| |
| /// The span uid is to identify the [Span] for crate. |
| fn generate_span_uid(&self) -> SpanUid { |
| self.span_uid_generator.fetch_add(1, Ordering::SeqCst) |
| } |
| |
| /// Clone the last finalized span. |
| #[doc(hidden)] |
| pub fn last_span(&self) -> Option<SpanObject> { |
| let spans = &*self.span_stack.finalized(); |
| spans.iter().rev().find_map(|span| span.obj.clone()) |
| } |
| |
| fn finalize_spans_mut(&mut self) -> RwLockWriteGuard<'_, Vec<FinalizeSpan>> { |
| self.span_stack.finalized.try_write().expect(LOCK_MSG) |
| } |
| |
| pub(crate) fn active_span_stack(&self) -> RwLockReadGuard<'_, Vec<ActiveSpan>> { |
| self.span_stack.active() |
| } |
| |
| pub(crate) fn active_span_stack_mut(&mut self) -> RwLockWriteGuard<'_, Vec<ActiveSpan>> { |
| self.span_stack.active_mut() |
| } |
| |
| pub(crate) fn active_span(&self) -> Option<MappedRwLockReadGuard<'_, ActiveSpan>> { |
| RwLockReadGuard::try_map(self.active_span_stack(), |stack| stack.last()).ok() |
| } |
| |
| pub(crate) fn active_span_mut(&mut self) -> Option<MappedRwLockWriteGuard<'_, ActiveSpan>> { |
| RwLockWriteGuard::try_map(self.active_span_stack_mut(), |stack| stack.last_mut()).ok() |
| } |
| |
| /// Create a new entry span, which is an initiator of collection of spans. |
| /// This should be called by invocation of the function which is triggered |
| /// by external service. |
| /// |
| /// Typically called when no context has |
| /// been propagated and a new trace is to be started. |
| pub fn create_entry_span(&mut self, operation_name: &str) -> Span { |
| let span = Span::new_obj( |
| self.inc_next_span_id(), |
| self.peek_active_span_id().unwrap_or(-1), |
| operation_name.to_string(), |
| String::default(), |
| SpanType::Entry, |
| SpanLayer::Http, |
| false, |
| ); |
| |
| let index = self.push_active_span(&span); |
| Span::new(index, span, self.wg.clone(), self.span_stack.clone()) |
| } |
| |
| /// Create a new entry span, which is an initiator of collection of spans. |
| /// This should be called by invocation of the function which is triggered |
| /// by external service. |
| /// |
| /// They should be propagated on `sw8` header in HTTP request with encoded |
| /// form. You can retrieve decoded context with |
| /// `skywalking::context::propagation::encoder::encode_propagation` |
| pub fn create_entry_span_with_propagation( |
| &mut self, |
| operation_name: &str, |
| propagation: &PropagationContext, |
| ) -> Span { |
| let mut span = self.create_entry_span(operation_name); |
| self.trace_id = propagation.parent_trace_id.clone(); |
| span.span_object_mut().refs.push(SegmentReference { |
| ref_type: RefType::CrossProcess as i32, |
| trace_id: self.trace_id().to_owned(), |
| parent_trace_segment_id: propagation.parent_trace_segment_id.clone(), |
| parent_span_id: propagation.parent_span_id, |
| parent_service: propagation.parent_service.clone(), |
| parent_service_instance: propagation.parent_service_instance.clone(), |
| parent_endpoint: propagation.destination_endpoint.clone(), |
| network_address_used_at_peer: propagation.destination_address.clone(), |
| }); |
| span |
| } |
| |
| /// Create a new exit span, which will be created when tracing context will |
| /// generate new span for function invocation. |
| /// |
| /// Currently, this SDK supports RPC call. So we must set `remote_peer`. |
| /// |
| /// # Panics |
| /// |
| /// Panic if entry span not existed. |
| #[inline] |
| pub fn create_exit_span(&mut self, operation_name: &str, remote_peer: &str) -> Span { |
| self.create_common_span( |
| operation_name, |
| remote_peer, |
| SpanType::Exit, |
| self.peek_active_span_id().unwrap_or(-1), |
| ) |
| } |
| |
| /// Create a new local span. |
| /// |
| /// # Panics |
| /// |
| /// Panic if entry span not existed. |
| #[inline] |
| pub fn create_local_span(&mut self, operation_name: &str) -> Span { |
| self.create_common_span( |
| operation_name, |
| "", |
| SpanType::Local, |
| self.peek_active_span_id().unwrap_or(-1), |
| ) |
| } |
| |
| /// create exit or local span common logic. |
| fn create_common_span( |
| &mut self, |
| operation_name: &str, |
| remote_peer: &str, |
| span_type: SpanType, |
| parent_span_id: i32, |
| ) -> Span { |
| if self.next_span_id() == 0 { |
| panic!("entry span must be existed."); |
| } |
| |
| let span = Span::new_obj( |
| self.inc_next_span_id(), |
| parent_span_id, |
| operation_name.to_string(), |
| remote_peer.to_string(), |
| span_type, |
| SpanLayer::Unknown, |
| false, |
| ); |
| |
| let uid = self.push_active_span(&span); |
| Span::new(uid, span, self.wg.clone(), self.span_stack.clone()) |
| } |
| |
| /// Capture a snapshot for cross-thread propagation. |
| pub fn capture(&self) -> ContextSnapshot { |
| ContextSnapshot { |
| trace_id: self.trace_id().to_owned(), |
| trace_segment_id: self.trace_segment_id().to_owned(), |
| span_id: self.peek_active_span_id().unwrap_or(-1), |
| parent_endpoint: self.primary_endpoint_name.clone(), |
| } |
| } |
| |
| /// Build the reference between this segment and a cross-thread segment. |
| pub fn continued(&mut self, snapshot: ContextSnapshot) { |
| if snapshot.is_valid() { |
| self.trace_id = snapshot.trace_id.clone(); |
| |
| let tracer = self.upgrade_tracer(); |
| |
| let segment_ref = SegmentReference { |
| ref_type: RefType::CrossThread as i32, |
| trace_id: snapshot.trace_id, |
| parent_trace_segment_id: snapshot.trace_segment_id, |
| parent_span_id: snapshot.span_id, |
| parent_service: tracer.service_name().to_owned(), |
| parent_service_instance: tracer.instance_name().to_owned(), |
| parent_endpoint: snapshot.parent_endpoint, |
| network_address_used_at_peer: Default::default(), |
| }; |
| |
| if let Some(mut span) = self.active_span_mut() { |
| span.r#ref = Some(segment_ref); |
| } |
| } |
| } |
| |
| /// Wait all async span dropped which, created by [Span::prepare_for_async]. |
| pub fn wait(self) { |
| self.wg.clone().wait(); |
| } |
| |
| /// It converts tracing context into segment object. |
| /// This conversion should be done before sending segments into OAP. |
| /// |
| /// Notice: The spans will be taken, so this method shouldn't be called |
| /// twice. |
| pub(crate) fn convert_to_segment_object(&mut self) -> SegmentObject { |
| let trace_id = self.trace_id().to_owned(); |
| let trace_segment_id = self.trace_segment_id().to_owned(); |
| let service = self.service().to_owned(); |
| let service_instance = self.service_instance().to_owned(); |
| let spans = take(&mut *self.finalize_spans_mut()); |
| |
| let spans = spans |
| .into_iter() |
| .map(|span| span.obj.expect("Some async span haven't finished")) |
| .collect(); |
| |
| SegmentObject { |
| trace_id, |
| trace_segment_id, |
| spans, |
| service, |
| service_instance, |
| is_size_limited: false, |
| } |
| } |
| |
| pub(crate) fn peek_active_span_id(&self) -> Option<i32> { |
| self.active_span().map(|span| span.span_id) |
| } |
| |
| fn push_active_span(&mut self, span: &SpanObject) -> SpanUid { |
| let uid = self.generate_span_uid(); |
| |
| self.primary_endpoint_name = span.operation_name.clone(); |
| let mut stack = self.active_span_stack_mut(); |
| stack.push(ActiveSpan::new(uid, span.span_id)); |
| |
| uid |
| } |
| |
| fn upgrade_tracer(&self) -> Tracer { |
| self.tracer.upgrade().expect("Tracer has dropped") |
| } |
| } |
| |
| impl Drop for TracingContext { |
| /// Convert to segment object, and send to tracer for reporting. |
| /// |
| /// # Panics |
| /// |
| /// Panic if tracer is dropped. |
| fn drop(&mut self) { |
| self.upgrade_tracer().finalize_context(self) |
| } |
| } |
| |
| /// Cross threads context snapshot. |
| #[derive(Debug)] |
| pub struct ContextSnapshot { |
| trace_id: String, |
| trace_segment_id: String, |
| span_id: i32, |
| parent_endpoint: String, |
| } |
| |
| impl ContextSnapshot { |
| /// Check if the snapshot is created from current context. |
| pub fn is_from_current(&self, context: &TracingContext) -> bool { |
| !self.trace_segment_id.is_empty() && self.trace_segment_id == context.trace_segment_id() |
| } |
| |
| /// Check if the snapshot is valid. |
| pub fn is_valid(&self) -> bool { |
| !self.trace_segment_id.is_empty() && self.span_id > -1 && !self.trace_id.is_empty() |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| |
| trait AssertSend: Send {} |
| |
| impl AssertSend for TracingContext {} |
| } |