blob: 31676e7ca2991b1100b19f17cc5f580905329f7b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//! Grpc implementation of [Report].
use super::{CollectItemConsume, CollectItemProduce};
#[cfg(feature = "management")]
use crate::proto::v3::management_service_client::ManagementServiceClient;
use crate::{
proto::v3::{
log_report_service_client::LogReportServiceClient,
meter_report_service_client::MeterReportServiceClient,
trace_segment_report_service_client::TraceSegmentReportServiceClient, LogData, MeterData,
SegmentObject,
},
reporter::{CollectItem, Report},
};
use futures_core::Stream;
use futures_util::future::{try_join_all, TryJoinAll};
use std::{
error::Error,
future::{pending, Future},
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
task::{Context, Poll},
time::Duration,
};
use tokio::{
select,
sync::{
mpsc::{self, Receiver, Sender},
Mutex,
},
task::JoinHandle,
try_join,
};
use tokio_stream::StreamExt;
use tonic::{
metadata::{Ascii, MetadataValue},
service::{interceptor::InterceptedService, Interceptor},
transport::{self, Channel, Endpoint},
Request, Status,
};
use tracing::error;
/// Type-erased user interceptor: takes the outgoing request and returns it
/// (possibly modified), or a [Status] to reject the call.
type DynInterceptHandler = dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync;
/// Type-erased callback invoked with a description and the underlying error
/// when handing an item over for reporting fails.
type DynErrHandler = dyn Fn(&str, &dyn Error) + Send + Sync + 'static;
/// Type-erased callback invoked with a description and the [Status] returned
/// by a failed gRPC call.
type DynStatusHandler = dyn Fn(&str, &Status) + Send + Sync + 'static;
/// Default error handle: emits a `tracing` error event whose message is
/// `message` and which carries `err` as a field.
fn default_err_handle(message: &str, err: &dyn Error) {
error!(?err, "{}", message);
}
/// Default status handle: emits a `tracing` error event whose message is
/// `message` and which carries the failed gRPC `status` as a field.
fn default_status_handle(message: &str, status: &Status) {
error!(?status, "{}", message);
}
/// Interceptor installed on every gRPC client created by the reporter.
///
/// Both parts are optional; with neither configured a request passes through
/// unchanged.
#[derive(Default, Clone)]
struct CustomInterceptor {
    /// Raw value for the `authentication` metadata entry.
    authentication: Option<Arc<String>>,
    /// User callback applied after the authentication entry has been handled.
    custom_intercept: Option<Arc<DynInterceptHandler>>,
}

impl Interceptor for CustomInterceptor {
    /// Decorates an outgoing request.
    ///
    /// The `authentication` metadata entry is inserted when a value is
    /// configured and it parses as an ASCII metadata value; a value that does
    /// not parse is skipped silently. Afterwards the request is handed to the
    /// custom intercept callback, if any, and its result is returned as is.
    fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> {
        let token = self
            .authentication
            .as_deref()
            .and_then(|raw| raw.parse::<MetadataValue<Ascii>>().ok());
        if let Some(token) = token {
            request.metadata_mut().insert("authentication", token);
        }

        match &self.custom_intercept {
            Some(intercept) => intercept(request),
            None => Ok(request),
        }
    }
}
/// Lifecycle flags shared by a reporter, its clones and the background
/// report sender.
struct State {
    /// Raised by the first `reporting` call so that a second call can be
    /// detected.
    is_reporting: AtomicBool,
    /// Raised once the report sender has stopped consuming new items.
    is_closing: AtomicBool,
}

impl State {
    /// Returns the current value of the closing flag (relaxed load).
    fn is_closing(&self) -> bool {
        let State { is_closing, .. } = self;
        is_closing.load(Ordering::Relaxed)
    }
}
/// Reporter which will report to Skywalking OAP server via grpc protocol.
///
/// `P` is the producing half and `C` the consuming half of the queue that
/// carries collected items from [Report::report] to the background tasks
/// started through [GrpcReporter::reporting].
pub struct GrpcReporter<P, C> {
/// Lifecycle flags, shared with every clone and with the report sender.
state: Arc<State>,
/// Producing half of the queue; `report` pushes items into it.
producer: Arc<P>,
/// Consuming half of the queue; taken out (leaving `None`) by `reporting`.
consumer: Arc<Mutex<Option<C>>>,
/// Called when producing an item, or forwarding it to a sender task, fails.
err_handle: Arc<DynErrHandler>,
/// Connection that is cloned into every gRPC service client.
channel: Channel,
/// Interceptor that is cloned into every gRPC service client.
interceptor: CustomInterceptor,
}
impl GrpcReporter<mpsc::UnboundedSender<CollectItem>, mpsc::UnboundedReceiver<CollectItem>> {
    /// Create a reporter on top of an existing [Channel], so the same
    /// [Channel] can be cloned and multiplexed with other clients.
    ///
    /// Collected items are queued in an unbounded tokio mpsc channel.
    pub fn new(channel: Channel) -> Self {
        let (producer, consumer) = mpsc::unbounded_channel();
        Self::new_with_pc(channel, producer, consumer)
    }

    /// Connect to the Skywalking OAP server.
    ///
    /// # Errors
    ///
    /// Returns an error when `address` cannot be converted into an
    /// [Endpoint] or when establishing the connection fails.
    pub async fn connect(
        address: impl TryInto<Endpoint, Error = transport::Error>,
    ) -> crate::Result<Self> {
        let endpoint: Endpoint = address.try_into()?;
        Ok(Self::new(endpoint.connect().await?))
    }
}
impl<P: CollectItemProduce, C: CollectItemConsume> GrpcReporter<P, C> {
/// Special purpose, used for user-defined produce and consume operations,
/// usually you can use [GrpcReporter::connect] and [GrpcReporter::new].
pub fn new_with_pc(channel: Channel, producer: P, consumer: C) -> Self {
Self {
state: Arc::new(State {
is_reporting: Default::default(),
is_closing: Default::default(),
}),
producer: Arc::new(producer),
consumer: Arc::new(Mutex::new(Some(consumer))),
err_handle: Arc::new(default_err_handle),
channel,
interceptor: Default::default(),
}
}
/// Set error handle. By default, the error will not be handle.
pub fn with_err_handle(
mut self,
handle: impl Fn(&str, &dyn Error) + Send + Sync + 'static,
) -> Self {
self.err_handle = Arc::new(handle);
self
}
/// Set the authentication header value. By default, the authentication is
/// not set.
pub fn with_authentication(mut self, authentication: impl Into<String>) -> Self {
self.interceptor.authentication = Some(Arc::new(authentication.into()));
self
}
/// Set the custom intercept. By default, the custom intercept is not set.
pub fn with_custom_intercept(
mut self,
custom_intercept: impl Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static,
) -> Self {
self.interceptor.custom_intercept = Some(Arc::new(custom_intercept));
self
}
/// Start to reporting.
///
/// # Panics
///
/// Panic if call more than once.
pub async fn reporting(&self) -> Reporting<C> {
if self.state.is_reporting.swap(true, Ordering::Relaxed) {
panic!("reporting already called");
}
let (trace_sender, trace_receiver) = mpsc::channel(255);
let (log_sender, log_receiver) = mpsc::channel(255);
let (meter_sender, meter_receiver) = mpsc::channel(255);
let status_handle = Arc::new(default_status_handle);
Reporting {
report_sender: ReportSender {
state: Arc::clone(&self.state),
inner_report_sender: InnerReportSender {
status_handle: Arc::new(default_status_handle),
err_handle: self.err_handle.clone(),
trace_sender,
log_sender,
meter_sender,
#[cfg(feature = "management")]
management_client: ManagementServiceClient::with_interceptor(
self.channel.clone(),
self.interceptor.clone(),
),
},
shutdown_signal: Box::pin(pending()),
consumer: self.consumer.lock().await.take().unwrap(),
},
trace_receive_reporter: TraceReceiveReporter {
trace_client: TraceSegmentReportServiceClient::with_interceptor(
self.channel.clone(),
self.interceptor.clone(),
),
trace_receiver,
status_handle: status_handle.clone(),
},
log_receive_reporter: LogReceiveReporter {
log_client: LogReportServiceClient::with_interceptor(
self.channel.clone(),
self.interceptor.clone(),
),
log_receiver,
status_handle: status_handle.clone(),
},
meter_receive_reporter: MeterReceiveReporter {
meter_client: MeterReportServiceClient::with_interceptor(
self.channel.clone(),
self.interceptor.clone(),
),
meter_receiver,
status_handle,
},
}
}
}
impl<P, C> Clone for GrpcReporter<P, C> {
fn clone(&self) -> Self {
Self {
state: self.state.clone(),
producer: self.producer.clone(),
consumer: self.consumer.clone(),
err_handle: self.err_handle.clone(),
channel: self.channel.clone(),
interceptor: self.interceptor.clone(),
}
}
}
impl<P: CollectItemProduce, C: CollectItemConsume> Report for GrpcReporter<P, C> {
    /// Queues `item` for the background report sender.
    ///
    /// Items are dropped silently once the reporter is closing; a failure of
    /// the producer is passed to the error handle instead of being returned.
    fn report(&self, item: CollectItem) {
        if self.state.is_closing() {
            return;
        }
        if let Err(err) = self.producer.produce(item) {
            (self.err_handle)("report collect item failed", &*err);
        }
    }
}
struct InnerReportSender {
status_handle: Arc<DynStatusHandler>,
err_handle: Arc<DynErrHandler>,
trace_sender: Sender<SegmentObject>,
log_sender: Sender<LogData>,
meter_sender: Sender<MeterData>,
#[cfg(feature = "management")]
management_client: ManagementServiceClient<InterceptedService<Channel, CustomInterceptor>>,
}
impl InnerReportSender {
async fn report(&mut self, item: CollectItem) {
match item {
CollectItem::Trace(item) => {
if let Err(e) = self.trace_sender.try_send(*item) {
(self.err_handle)("report trace segment failed", &e as &dyn Error);
}
}
CollectItem::Log(item) => {
if let Err(e) = self.log_sender.try_send(*item) {
(self.err_handle)("report log data failed", &e as &dyn Error);
}
}
CollectItem::Meter(item) => {
if let Err(e) = self.meter_sender.try_send(*item) {
(self.err_handle)("report meter data failed", &e as &dyn Error);
}
}
#[cfg(feature = "management")]
CollectItem::Instance(item) => {
if let Err(e) = self
.management_client
.report_instance_properties(*item)
.await
{
(self.status_handle)("Report instance properties failed", &e);
}
}
#[cfg(feature = "management")]
CollectItem::Ping(item) => {
if let Err(e) = self.management_client.keep_alive(*item).await {
(self.status_handle)("Ping failed", &e);
}
}
}
}
}
/// Background worker that drains the consumer and dispatches every item
/// through an [InnerReportSender] until the consumer ends or shutdown is
/// requested.
struct ReportSender<C> {
/// Shared lifecycle flags; `is_closing` is raised when the main loop ends.
state: Arc<State>,
/// Dispatcher for the consumed items.
inner_report_sender: InnerReportSender,
/// Source of collected items.
consumer: C,
/// Resolves when a graceful shutdown is requested. `reporting` installs a
/// future that never resolves; `with_graceful_shutdown` replaces it.
shutdown_signal: Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>,
}
impl<C: CollectItemConsume> ReportSender<C> {
    /// Runs the report sender until the consumer ends or shutdown is
    /// requested.
    ///
    /// Two futures are driven together:
    ///
    /// * the work future consumes items and dispatches them; once the
    ///   consumer returns `Ok(None)` or a shutdown notification arrives it
    ///   raises `is_closing` and flushes whatever `try_consume` still yields;
    /// * the shutdown future waits for the user supplied signal and then
    ///   notifies the work future.
    ///
    /// The shutdown future also finishes as soon as the work future is done
    /// (its receiver is dropped). Otherwise a consumer that ends on its own
    /// would leave this function waiting on the shutdown signal forever, and
    /// a signal arriving afterwards would surface as a spurious send error.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `consume` or `try_consume`.
    async fn start(self) -> crate::Result<()> {
        let (shutdown_tx, mut shutdown_rx) = mpsc::unbounded_channel();
        let ReportSender {
            state,
            mut inner_report_sender,
            consumer: mut collect_item_consumer,
            shutdown_signal,
        } = self;

        let work_fut = async move {
            loop {
                select! {
                    item = collect_item_consumer.consume() => {
                        match item {
                            Ok(Some(item)) => {
                                inner_report_sender.report(item).await;
                            }
                            Ok(None) => break,
                            Err(err) => return Err(crate::Error::Other(err)),
                        }
                    }
                    _ = shutdown_rx.recv() => break,
                }
            }

            // From here on `Report::report` drops new items.
            state.is_closing.store(true, Ordering::Relaxed);

            // Flush.
            loop {
                match collect_item_consumer.try_consume().await {
                    Ok(Some(item)) => {
                        inner_report_sender.report(item).await;
                    }
                    Ok(None) => break,
                    Err(err) => return Err(err.into()),
                }
            }

            Ok::<_, crate::Error>(())
        };

        let shutdown_fut = async move {
            select! {
                _ = shutdown_signal => {
                    // A failed send means the work future has already
                    // finished, so there is nothing left to stop.
                    let _ = shutdown_tx.send(());
                }
                // The receiver is dropped together with the finished work
                // future; stop waiting for a signal nobody needs any more.
                _ = shutdown_tx.closed() => {}
            }
            Ok::<_, crate::Error>(())
        };

        try_join!(work_fut, shutdown_fut)?;
        Ok(())
    }
}
/// Handle of [GrpcReporter::reporting].
pub struct Reporting<C> {
report_sender: ReportSender<C>,
trace_receive_reporter: TraceReceiveReporter,
log_receive_reporter: LogReceiveReporter,
meter_receive_reporter: MeterReceiveReporter,
}
impl<C: CollectItemConsume> Reporting<C> {
/// Quit when shutdown_signal received.
///
/// Accept a `shutdown_signal` argument as a graceful shutdown signal.
pub fn with_graceful_shutdown(
mut self,
shutdown_signal: impl Future<Output = ()> + Send + Sync + 'static,
) -> Self {
self.report_sender.shutdown_signal = Box::pin(shutdown_signal);
self
}
/// Set the failed status handle. By default, the status will not be handle.
pub fn with_status_handle(
mut self,
handle: impl Fn(&str, &Status) + Send + Sync + 'static,
) -> Self {
let handle = Arc::new(handle);
self.report_sender.inner_report_sender.status_handle = handle.clone();
self.trace_receive_reporter.status_handle = handle.clone();
self.log_receive_reporter.status_handle = handle.clone();
self.meter_receive_reporter.status_handle = handle;
self
}
/// Spawn the reporting in background.
pub fn spawn(self) -> ReportingJoinHandle {
ReportingJoinHandle {
handles: try_join_all(vec![
tokio::spawn(self.report_sender.start()),
tokio::spawn(self.trace_receive_reporter.start()),
tokio::spawn(self.log_receive_reporter.start()),
tokio::spawn(self.meter_receive_reporter.start()),
]),
}
}
}
/// Handle of [Reporting::spawn].
pub struct ReportingJoinHandle {
handles: TryJoinAll<JoinHandle<crate::Result<()>>>,
}
impl Future for ReportingJoinHandle {
type Output = crate::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.handles).poll(cx).map(|rs| {
let rs = rs?;
for r in rs {
r?;
}
Ok(())
})
}
}
/// Forwards queued trace segments to the server over client streams.
struct TraceReceiveReporter {
    /// Client of the trace segment report service.
    trace_client: TraceSegmentReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
    /// Queue filled by the report sender.
    trace_receiver: Receiver<SegmentObject>,
    /// Called with the status of a failed `collect` call.
    status_handle: Arc<DynStatusHandler>,
}

impl TraceReceiveReporter {
    /// Opens one `collect` stream after another until the queue is closed.
    ///
    /// A failed call is passed to the status handle and the loop carries on
    /// with a fresh stream.
    async fn start(mut self) -> crate::Result<()> {
        let source = ReceiveFrom::new(self.trace_receiver);
        loop {
            let stream = match source.stream() {
                Some(stream) => stream,
                None => break Ok(()),
            };
            if let Err(status) = self.trace_client.collect(stream).await {
                (self.status_handle)("Collect trace segment by stream failed", &status);
            }
        }
    }
}
/// Forwards queued log data to the server over client streams.
struct LogReceiveReporter {
    /// Client of the log report service.
    log_client: LogReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
    /// Queue filled by the report sender.
    log_receiver: Receiver<LogData>,
    /// Called with the status of a failed `collect` call.
    status_handle: Arc<DynStatusHandler>,
}

impl LogReceiveReporter {
    /// Opens one `collect` stream after another until the queue is closed.
    ///
    /// A failed call is passed to the status handle and the loop carries on
    /// with a fresh stream.
    async fn start(mut self) -> crate::Result<()> {
        let source = ReceiveFrom::new(self.log_receiver);
        loop {
            let stream = match source.stream() {
                Some(stream) => stream,
                None => break Ok(()),
            };
            if let Err(status) = self.log_client.collect(stream).await {
                (self.status_handle)("Collect log data by stream failed", &status);
            }
        }
    }
}
/// Forwards queued meter data to the server over client streams.
struct MeterReceiveReporter {
    /// Client of the meter report service.
    meter_client: MeterReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
    /// Queue filled by the report sender.
    meter_receiver: Receiver<MeterData>,
    /// Called with the status of a failed `collect` call.
    status_handle: Arc<DynStatusHandler>,
}

impl MeterReceiveReporter {
    /// Opens one `collect` stream after another until the queue is closed.
    ///
    /// A failed call is passed to the status handle and the loop carries on
    /// with a fresh stream.
    async fn start(mut self) -> crate::Result<()> {
        let source = ReceiveFrom::new(self.meter_receiver);
        loop {
            let stream = match source.stream() {
                Some(stream) => stream,
                None => break Ok(()),
            };
            if let Err(status) = self.meter_client.collect(stream).await {
                (self.status_handle)("Collect meter data by stream failed", &status);
            }
        }
    }
}
/// Factory for successive streams that all read from one shared receiver.
struct ReceiveFrom<I> {
    /// Receiver shared by every stream handed out.
    receiver: Arc<Mutex<Receiver<I>>>,
    /// Raised by a stream once the receiver has reported the end of the
    /// channel.
    is_closed: Arc<AtomicBool>,
}

impl<I> ReceiveFrom<I> {
    /// Wraps `receiver`; the closed flag starts out unset.
    fn new(receiver: Receiver<I>) -> Self {
        let receiver = Arc::new(Mutex::new(receiver));
        let is_closed = Arc::new(AtomicBool::new(false));
        Self {
            receiver,
            is_closed,
        }
    }

    /// Hands out a new stream over the shared receiver, or `None` once the
    /// receiver has been observed as closed.
    ///
    /// The stream ends on its own when no item arrives within 30 seconds, so
    /// a caller looping over `stream()` keeps replacing idle streams with
    /// fresh ones.
    fn stream(&self) -> Option<impl Stream<Item = I>> {
        let closed = self.is_closed.load(Ordering::Relaxed);
        if closed {
            return None;
        }

        let source = ReceiveFromStream {
            receiver: Arc::clone(&self.receiver),
            is_closed: Arc::clone(&self.is_closed),
        };
        // A timeout surfaces as `Err`, which `map_while` turns into the end
        // of the stream.
        let bounded = source
            .timeout(Duration::from_secs(30))
            .map_while(Result::ok);
        Some(bounded)
    }
}
/// Stream over the receiver shared with the [ReceiveFrom] that created it.
struct ReceiveFromStream<I> {
/// Receiver shared with the factory and with any other stream it created.
receiver: Arc<Mutex<Receiver<I>>>,
/// Raised when the receiver yields `None`, which makes the factory stop
/// handing out streams.
is_closed: Arc<AtomicBool>,
}
impl<I> Stream for ReceiveFromStream<I> {
    type Item = I;

    /// Polls the shared receiver for the next item.
    ///
    /// When the receiver reports the end of the channel the shared closed
    /// flag is raised before `None` is returned.
    ///
    /// The receiver is shared with every other stream created by the same
    /// factory, and an older stream may still be polled by the transport
    /// while a newer one is already in use. If the lock is held at this
    /// moment, the poll yields `Pending` and asks to be polled again instead
    /// of panicking; the lock is only held for the duration of a single
    /// `poll_recv`, so the retry is short-lived.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut receiver = match self.receiver.try_lock() {
            Ok(guard) => guard,
            Err(_) => {
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
        };

        let polled = receiver.poll_recv(cx);
        if matches!(polled, Poll::Ready(None)) {
            self.is_closed.store(true, Ordering::Relaxed);
        }
        polled
    }
}