Add context capture and continued methods. (#29)

diff --git a/Cargo.toml b/Cargo.toml
index 5ed4d53..b1c1476 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -45,7 +45,7 @@
 prost = "0.10.4"
 prost-derive = "0.10.1"
 thiserror = "1.0.31"
-tokio = { version = "1.18.2", features = ["full"] }
+tokio = { version = "1.18.2", features = ["parking_lot"] }
 tonic = { version = "0.7.2", features = ["codegen"] }
 tracing = "0.1.35"
 uuid = { version = "1.1.0", features = ["serde", "v4"] }
@@ -54,6 +54,7 @@
 tonic-build = "0.7.2"
 
 [dev-dependencies]
+tokio = { version = "1.18.2", features = ["rt-multi-thread"] }
 tokio-stream = { version = "0.1.8", features = ["net"] }
 
 [[test]]
diff --git a/e2e/data/expected_context.yaml b/e2e/data/expected_context.yaml
index 162046e..f4ff6e1 100644
--- a/e2e/data/expected_context.yaml
+++ b/e2e/data/expected_context.yaml
@@ -16,13 +16,35 @@
 # under the License.
 #
 segmentItems:
-- segmentSize: 1
+- segmentSize: gt 1
   segments:
   - segmentId: not null
     spans:
     - componentId: 11000
       endTime: gt 0
       isError: false
+      operationName: async-callback
+      parentSpanId: -1
+      peer: ''
+      refs:
+      - networkAddress: ''
+        parentEndpoint: async-job
+        parentService: producer
+        parentServiceInstance: node_0
+        parentSpanId: 2
+        parentTraceSegmentId: not null
+        refType: CrossThread
+        traceId: not null
+      skipAnalysis: false
+      spanId: 0
+      spanLayer: Http
+      spanType: Entry
+      startTime: gt 0
+  - segmentId: not null
+    spans:
+    - componentId: 11000
+      endTime: gt 0
+      isError: false
       operationName: /pong
       parentSpanId: 0
       peer: consumer:8082
@@ -34,6 +56,17 @@
     - componentId: 11000
       endTime: gt 0
       isError: false
+      operationName: async-job
+      parentSpanId: 0
+      peer: ''
+      skipAnalysis: false
+      spanId: 2
+      spanLayer: Unknown
+      spanType: Local
+      startTime: gt 0
+    - componentId: 11000
+      endTime: gt 0
+      isError: false
       operationName: /ping
       parentSpanId: -1
       peer: ''
diff --git a/e2e/src/main.rs b/e2e/src/main.rs
index ae2432b..5611d9f 100644
--- a/e2e/src/main.rs
+++ b/e2e/src/main.rs
@@ -50,6 +50,18 @@
 
         client.request(req).await.unwrap();
     }
+    {
+        let _span3 = context.create_local_span("async-job");
+        let snapshot = context.capture();
+
+        tokio::spawn(async move {
+            let mut context2 = tracer::create_trace_context();
+            let _span3 = context2.create_entry_span("async-callback");
+            context2.continued(snapshot);
+        })
+        .await
+        .unwrap();
+    }
     Ok(Response::new(Body::from("hoge")))
 }
 
diff --git a/examples/simple_trace_report.rs b/examples/simple_trace_report.rs
index bad1fff..f7f321b 100644
--- a/examples/simple_trace_report.rs
+++ b/examples/simple_trace_report.rs
@@ -26,7 +26,7 @@
     {
         // Generate an Entry Span when a request is received.
         // An Entry Span is generated only once per context.
-        // You should assign a variable name to guard the span not be dropped immediately.
+        // Assign a variable name to guard the span not to be dropped immediately.
         let _span = ctx.create_entry_span("op1");
 
         // Something...
diff --git a/src/context/span.rs b/src/context/span.rs
index 6d0bf21..c6c5e18 100644
--- a/src/context/span.rs
+++ b/src/context/span.rs
@@ -37,7 +37,7 @@
 ///     {
 ///         // Generate an Entry Span when a request is received.
 ///         // An Entry Span is generated only once per context.
-///         // You should assign a variable name to guard the span not be dropped immediately.
+///         // Assign a variable name to guard the span not to be dropped immediately.
 ///         let _span = ctx.create_entry_span("op1");
 ///
 ///         // Something...
@@ -57,7 +57,7 @@
 ///     // Auto report ctx when dropped.
 /// }
 /// ```
-#[derive(Clone)]
+#[must_use = "assign a variable name to guard the span not be dropped immediately."]
 pub struct Span {
     index: usize,
     context: WeakTracingContext,
@@ -118,6 +118,8 @@
         self.context.upgrade().expect("Context has dropped")
     }
 
+    // Notice: Perhaps in the future, `RwLock` can be used instead of `Mutex`, so `with_*` can be nested.
+    // (Although I can't find the meaning of such use at present.)
     pub fn with_span_object<T>(&self, f: impl FnOnce(&SpanObject) -> T) -> T {
         self.upgrade_context()
             .with_active_span_stack(|stack| f(&stack[self.index]))
diff --git a/src/context/trace_context.rs b/src/context/trace_context.rs
index 014c75b..e56d8a9 100644
--- a/src/context/trace_context.rs
+++ b/src/context/trace_context.rs
@@ -33,12 +33,12 @@
     mem::take,
     sync::{
         atomic::{AtomicI32, Ordering},
-        Arc, Mutex, Weak,
+        Arc, Mutex, RwLock, Weak,
     },
 };
 
 struct Inner {
-    trace_id: String,
+    trace_id: RwLock<String>,
     trace_segment_id: String,
     service: String,
     service_instance: String,
@@ -49,8 +49,7 @@
     primary_endpoint_name: Mutex<String>,
 }
 
-#[derive(Clone)]
-#[must_use = "You should call `create_entry_span` after `TracingContext` created."]
+#[must_use = "call `create_entry_span` after `TracingContext` created."]
 pub struct TracingContext {
     inner: Arc<Inner>,
     tracer: WeakTracer,
@@ -79,7 +78,7 @@
     ) -> Self {
         TracingContext {
             inner: Arc::new(Inner {
-                trace_id: RandomGenerator::generate(),
+                trace_id: RwLock::new(RandomGenerator::generate()),
                 trace_segment_id: RandomGenerator::generate(),
                 service: service_name.to_string(),
                 service_instance: instance_name.to_string(),
@@ -104,7 +103,7 @@
     ) -> Self {
         TracingContext {
             inner: Arc::new(Inner {
-                trace_id: context.parent_trace_id.clone(),
+                trace_id: RwLock::new(context.parent_trace_id.clone()),
                 trace_segment_id: RandomGenerator::generate(),
                 service: service_name.to_string(),
                 service_instance: instance_name.to_string(),
@@ -119,8 +118,16 @@
     }
 
     #[inline]
-    pub fn trace_id(&self) -> &str {
-        &self.inner.trace_id
+    pub fn trace_id(&self) -> String {
+        self.with_trace_id(ToString::to_string)
+    }
+
+    fn with_trace_id<T>(&self, f: impl FnOnce(&String) -> T) -> T {
+        f(&*self.inner.trace_id.try_read().expect(LOCK_MSG))
+    }
+
+    fn with_trace_id_mut<T>(&mut self, f: impl FnOnce(&mut String) -> T) -> T {
+        f(&mut *self.inner.trace_id.try_write().expect(LOCK_MSG))
     }
 
     #[inline]
@@ -182,8 +189,13 @@
         self.with_active_span_stack(|stack| stack.last().map(f))
     }
 
-    // TODO Using for capture and continued.
-    #[allow(dead_code)]
+    pub(crate) fn with_active_span_mut<T>(
+        &mut self,
+        f: impl FnOnce(&mut SpanObject) -> T,
+    ) -> Option<T> {
+        self.with_active_span_stack_mut(|stack| stack.last_mut().map(f))
+    }
+
     fn with_primary_endpoint_name<T>(&self, f: impl FnOnce(&String) -> T) -> T {
         f(&*self.inner.primary_endpoint_name.try_lock().expect(LOCK_MSG))
     }
@@ -209,7 +221,7 @@
         if let Some(segment_link) = &self.inner.segment_link {
             span.refs.push(SegmentReference {
                 ref_type: RefType::CrossProcess as i32,
-                trace_id: self.inner.trace_id.clone(),
+                trace_id: self.trace_id(),
                 parent_trace_segment_id: segment_link.parent_trace_segment_id.clone(),
                 parent_span_id: segment_link.parent_span_id,
                 parent_service: segment_link.parent_service.clone(),
@@ -273,6 +285,40 @@
         Span::new(index, self.downgrade())
     }
 
+    /// Capture a snapshot for cross-thread propagation.
+    pub fn capture(&self) -> ContextSnapshot {
+        ContextSnapshot {
+            trace_id: self.trace_id(),
+            trace_segment_id: self.trace_segment_id().to_owned(),
+            span_id: self.peek_active_span_id().unwrap_or(-1),
+            parent_endpoint: self.with_primary_endpoint_name(Clone::clone),
+        }
+    }
+
+    /// Build the reference between this segment and a cross-thread segment.
+    pub fn continued(&mut self, snapshot: ContextSnapshot) {
+        if snapshot.is_valid() {
+            self.with_trace_id_mut(|trace_id| *trace_id = snapshot.trace_id.clone());
+
+            let tracer = self.upgrade_tracer();
+
+            let segment_ref = SegmentReference {
+                ref_type: RefType::CrossThread as i32,
+                trace_id: snapshot.trace_id,
+                parent_trace_segment_id: snapshot.trace_segment_id,
+                parent_span_id: snapshot.span_id,
+                parent_service: tracer.service_name().to_owned(),
+                parent_service_instance: tracer.instance_name().to_owned(),
+                parent_endpoint: snapshot.parent_endpoint,
+                network_address_used_at_peer: Default::default(),
+            };
+
+            self.with_active_span_mut(|span| {
+                span.refs.push(segment_ref);
+            });
+        }
+    }
+
     /// Close span. We can't use closed span after finalize called.
     pub(crate) fn finalize_span(&mut self, index: usize) -> Result<(), ()> {
         let span = self.pop_active_span(index);
@@ -290,7 +336,7 @@
     ///
     /// Notice: The spans will taked, so this method shouldn't be called twice.
     pub(crate) fn convert_segment_object(&mut self) -> SegmentObject {
-        let trace_id = self.trace_id().to_owned();
+        let trace_id = self.trace_id();
         let trace_segment_id = self.trace_segment_id().to_owned();
         let service = self.service().to_owned();
         let service_instance = self.service_instance().to_owned();
@@ -368,12 +414,11 @@
     }
 }
 
+#[derive(Debug)]
 pub struct ContextSnapshot {
     trace_id: String,
     trace_segment_id: String,
     span_id: i32,
-    // TODO Using for capture and continued.
-    #[allow(dead_code)]
     parent_endpoint: String,
 }
 
diff --git a/src/reporter/log.rs b/src/reporter/log.rs
index b08ef3c..39fe7e0 100644
--- a/src/reporter/log.rs
+++ b/src/reporter/log.rs
@@ -16,7 +16,6 @@
 
 use super::Reporter;
 use crate::skywalking_proto::v3::SegmentObject;
-
 use std::collections::LinkedList;
 use tonic::async_trait;
 
diff --git a/tests/trace_context.rs b/tests/trace_context.rs
index 3d376de..0cd992d 100644
--- a/tests/trace_context.rs
+++ b/tests/trace_context.rs
@@ -27,9 +27,10 @@
     SpanType,
 };
 use std::collections::LinkedList;
-use std::future;
 use std::sync::Mutex;
 use std::{cell::Ref, sync::Arc};
+use std::{future, thread};
+use tokio::runtime::Handle;
 
 /// Serialize from A should equal Serialize from B
 #[allow(dead_code)]
@@ -237,9 +238,6 @@
 
             context2.with_spans(|spans| {
                 let span3 = spans.last().unwrap();
-                return;
-
-                let span3 = spans.last().unwrap();
                 assert_eq!(span3.span_id, 0);
                 assert_eq!(span3.parent_span_id, -1);
                 assert_eq!(span3.refs.len(), 1);
@@ -261,6 +259,53 @@
     }
 }
 
+#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+async fn cross_threads_test() {
+    MockReporter::with_many(
+        |reporter| {
+            let tracer = Tracer::new("service", "instance", reporter);
+            let mut ctx1 = tracer.create_trace_context();
+            let _span1 = ctx1.create_entry_span("op1");
+            let _span2 = ctx1.create_local_span("op2");
+            let snapshot = ctx1.capture();
+
+            let tracer_ = tracer.clone();
+            thread::spawn(move || {
+                let mut ctx2 = tracer_.create_trace_context();
+                let _span3 = ctx2.create_entry_span("op3");
+                ctx2.continued(snapshot);
+            })
+            .join()
+            .unwrap();
+
+            tracer
+        },
+        |segments| {
+            let iter = segments.iter();
+            let first = iter.nth(0).unwrap();
+            let second = iter.nth(1).unwrap();
+
+            assert_eq!(first.trace_id, second.trace_id);
+            assert_eq!(first.spans.refs.len(), 1);
+            assert_eq!(
+                first.spans.refs[0],
+                SegmentReference {
+                    ref_type: RefType::CrossThread as i32,
+                    trace_id: second.trace_id.clone(),
+                    parent_trace_segment_id: second.trace_segment_id.clone(),
+                    parent_span_id: 1,
+                    parent_service: "service".to_owned(),
+                    parent_service_instance: "instance".to_owned(),
+                    parent_endpoint: "op2".to_owned(),
+                    ..Default::default()
+                }
+            );
+            assert_eq!(second.spans.len(), 2);
+        },
+    )
+    .await;
+}
+
 #[derive(Default, Clone)]
 struct MockReporter {
     segments: Arc<Mutex<LinkedList<SegmentObject>>>,
@@ -268,6 +313,13 @@
 
 impl MockReporter {
     async fn with(f1: impl FnOnce(MockReporter) -> Tracer, f2: impl FnOnce(&SegmentObject)) {
+        Self::with_many(f1, |segments| f2(&segments.front().unwrap())).await;
+    }
+
+    async fn with_many(
+        f1: impl FnOnce(MockReporter) -> Tracer,
+        f2: impl FnOnce(&LinkedList<SegmentObject>),
+    ) {
         let reporter = MockReporter::default();
 
         let tracer = f1(reporter.clone());
@@ -275,8 +327,7 @@
         tracer.reporting(future::ready(())).await.unwrap();
 
         let segments = reporter.segments.try_lock().unwrap();
-        let segment = segments.front().unwrap();
-        f2(segment);
+        f2(&*segments);
     }
 }