blob: 218383ccd676f69bdb49a08dbcc80afe47ea0053 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
use crate::context::trace_context::TracingContext;
use crate::skywalking_proto::v3::trace_segment_report_service_client::TraceSegmentReportServiceClient;
use crate::skywalking_proto::v3::SegmentObject;
use std::error::Error;
use tokio::sync::mpsc;
use tonic::transport::Channel;
pub type ReporterClient = TraceSegmentReportServiceClient<Channel>;
async fn flush(client: &mut ReporterClient, context: SegmentObject) -> Result<(), tonic::Status> {
let stream = async_stream::stream! {
yield context;
};
match client.collect(stream).await {
Ok(_) => Ok(()),
Err(e) => Err(e),
}
}
pub struct Reporter {
tx: mpsc::Sender<TracingContext>,
shutdown_tx: mpsc::Sender<()>,
}
static CHANNEL_BUF_SIZE: usize = 1024;
impl Reporter {
/// Open gRPC client stream to send collected trace context.
/// This function generates a new async task which watch to arrive new trace context.
/// We can send collected context to push into sender.
///
/// # Example
///
/// ```no_run
/// use std::error::Error;
///
/// use tokio;
///
/// use skywalking_rust::context::trace_context::TracingContext;
/// use skywalking_rust::reporter::grpc::Reporter;
///
/// #[tokio::main]
/// async fn main () -> Result<(), Box<dyn Error>> {
/// let reporter = Reporter::start("localhost:12800").await;
/// let mut context = TracingContext::default("service", "instance");
/// reporter.sender().send(context).await?;
/// reporter.shutdown().await?;
/// Ok(())
/// }
/// ```
pub async fn start(address: impl Into<String>) -> Self {
let (tx, mut rx): (mpsc::Sender<TracingContext>, mpsc::Receiver<TracingContext>) =
mpsc::channel(CHANNEL_BUF_SIZE);
let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
let mut reporter = ReporterClient::connect(address.into()).await.unwrap();
tokio::spawn(async move {
loop {
tokio::select! {
message = rx.recv() => {
if let Some(message) = message {
flush(&mut reporter, message.convert_segment_object()).await.unwrap();
} else {
break;
}
},
_ = shutdown_rx.recv() => {
break;
}
}
}
rx.close();
while let Some(message) = rx.recv().await {
flush(&mut reporter, message.convert_segment_object())
.await
.unwrap();
}
});
Self { tx, shutdown_tx }
}
pub async fn shutdown(self) -> Result<(), Box<dyn Error>> {
self.shutdown_tx.send(()).await?;
self.shutdown_tx.closed().await;
Ok(())
}
pub fn sender(&self) -> mpsc::Sender<TracingContext> {
self.tx.clone()
}
}