blob: 43f96648ad698d3e9e563bafddf9c5c2c87c144a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
use crate::context::trace_context::TracingContext;
use crate::skywalking_proto::v3::trace_segment_report_service_client::TraceSegmentReportServiceClient;
use crate::skywalking_proto::v3::SegmentObject;
use tokio::sync::mpsc;
use tonic::transport::Channel;
pub type ReporterClient = TraceSegmentReportServiceClient<Channel>;
async fn flush(client: &mut ReporterClient, context: SegmentObject) -> Result<(), tonic::Status> {
let stream = async_stream::stream! {
yield context;
};
match client.collect(stream).await {
Ok(_) => Ok(()),
Err(e) => Err(e),
}
}
pub struct Reporter {}
static CHANNEL_BUF_SIZE: usize = 1024;
pub type ContextReporter = mpsc::Sender<TracingContext>;
impl Reporter {
/// Open gRPC client stream to send collected trace context.
/// This function generates a new async task which watch to arrive new trace context.
/// We can send collected context to push into sender.
///
/// # Example
///
/// ```
/// use tokio;
///
/// #[tokio::main]
/// async fn main {
/// let tx = Reporter::start("localhost:12800");
/// tx.send(context).await;
/// }
/// ```
pub async fn start(address: &str) -> ContextReporter {
let (tx, mut rx): (mpsc::Sender<TracingContext>, mpsc::Receiver<TracingContext>) =
mpsc::channel(CHANNEL_BUF_SIZE);
let mut reporter = ReporterClient::connect(address.to_string()).await.unwrap();
tokio::spawn(async move {
while let Some(message) = rx.recv().await {
flush(&mut reporter, message.convert_segment_object())
.await
.unwrap();
}
});
tx
}
}