blob: 55ddaf2c6b7fa4a90d3abab857df8393cdb7d482 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//! Manager methods.
use super::instance::Properties;
use crate::reporter::{CollectItem, DynReport, Report};
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use tokio::{
spawn,
task::{JoinError, JoinHandle},
time,
};
/// Manager handles skywalking management operations, integrate with reporter.
pub struct Manager {
service_name: String,
instance_name: String,
reporter: Arc<DynReport>,
}
impl Manager {
/// New with service info and reporter.
pub fn new(
service_name: impl Into<String>,
instance_name: impl Into<String>,
reporter: impl Report + Send + Sync + 'static,
) -> Self {
Self {
service_name: service_name.into(),
instance_name: instance_name.into(),
reporter: Arc::new(reporter),
}
}
/// Get service name.
pub fn service_name(&self) -> &str {
&self.service_name
}
/// Get instance name.
pub fn instance_name(&self) -> &str {
&self.instance_name
}
/// Report instance properties.
pub fn report_properties(&self, properties: Properties) {
Self::reporter_report_properties(
&self.reporter,
self.service_name.clone(),
self.instance_name.clone(),
properties,
);
}
fn reporter_report_properties(
reporter: &Arc<DynReport>,
service_name: String,
instance_name: String,
properties: Properties,
) {
let props = properties.convert_to_instance_properties(service_name, instance_name);
reporter.report(CollectItem::Instance(Box::new(props)));
}
/// Do keep alive once.
pub fn keep_alive(&self) {
Self::reporter_keep_alive(
&self.reporter,
self.service_name.clone(),
self.instance_name.clone(),
);
}
fn reporter_keep_alive(reporter: &Arc<DynReport>, service_name: String, instance_name: String) {
reporter.report(CollectItem::Ping(Box::new(
crate::proto::v3::InstancePingPkg {
service: service_name,
service_instance: instance_name,
layer: Default::default(),
},
)));
}
/// Continuously report instance properties and keep alive. Run in
/// background.
///
/// Parameter `heartbeat_period` represents agent heartbeat report period.
///
/// Parameter `properties_report_period_factor` represents agent sends the
/// instance properties to the backend every `heartbeat_period` *
/// `properties_report_period_factor` seconds.
pub fn report_and_keep_alive(
&self,
properties: impl Fn() -> Properties + Send + 'static,
heartbeat_period: Duration,
properties_report_period_factor: usize,
) -> ReportAndKeepAlive {
let service_name = self.service_name.clone();
let instance_name = self.instance_name.clone();
let reporter = self.reporter.clone();
let handle = spawn(async move {
let mut counter = 0;
let mut ticker = time::interval(heartbeat_period);
loop {
ticker.tick().await;
if counter == 0 {
Self::reporter_report_properties(
&reporter,
service_name.clone(),
instance_name.clone(),
properties(),
);
} else {
Self::reporter_keep_alive(
&reporter,
service_name.clone(),
instance_name.clone(),
);
}
counter += 1;
if counter >= properties_report_period_factor {
counter = 0;
}
}
});
ReportAndKeepAlive { handle }
}
}
/// Handle of [Manager::report_and_keep_alive].
pub struct ReportAndKeepAlive {
handle: JoinHandle<()>,
}
impl ReportAndKeepAlive {
/// Get the inner tokio join handle.
pub fn handle(&self) -> &JoinHandle<()> {
&self.handle
}
}
impl Future for ReportAndKeepAlive {
type Output = Result<(), JoinError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.handle).poll(cx)
}
}