Add metrics APIs. (#44)
diff --git a/Cargo.toml b/Cargo.toml
index aef34c9..b4de204 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -43,6 +43,7 @@
cfg-if = "1.0.0"
futures-core = "0.3.21"
futures-util = "0.3.21"
+portable-atomic = { version = "0.3.13", features = ["float"] }
prost = "0.11.0"
prost-derive = "0.11.0"
serde = { version = "1.0.143", features = ["derive"] }
@@ -68,6 +69,10 @@
name = "logging"
required-features = ["mock"]
+[[test]]
+name = "metrics"
+required-features = ["mock"]
+
[[example]]
name = "simple_trace_report"
path = "examples/simple_trace_report.rs"
diff --git a/README.md b/README.md
index 7cf924a..97211d4 100644
--- a/README.md
+++ b/README.md
@@ -47,6 +47,14 @@
LogRecord is the simple builder for the LogData, which is the Log format of Skywalking.
+## Metrics
+
+### Meter
+
+- **Counter** API represents a single monotonically increasing counter which automatically collects data and reports to the backend.
+- **Gauge** API represents a single numerical value.
+- **Histogram** API represents a summary sample observations with customized buckets.
+
# Example
```rust, no_run
@@ -54,6 +62,7 @@
logging::{logger::Logger, record::{LogRecord, RecordType}},
reporter::grpc::GrpcReporter,
trace::tracer::Tracer,
+ metrics::{meter::Counter, metricer::Metricer},
};
use std::error::Error;
use tokio::signal;
@@ -94,6 +103,18 @@
// Auto report ctx when dropped.
}
+async fn handle_metric(mut metricer: Metricer) {
+ let counter = metricer.register(
+ Counter::new("instance_trace_count")
+ .add_label("region", "us-west")
+ .add_label("az", "az-1"),
+ );
+
+ metricer.boot().await;
+
+ counter.increment(10.);
+}
+
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// Connect to skywalking oap server.
@@ -109,7 +130,10 @@
.spawn();
let tracer = Tracer::new("service", "instance", reporter.clone());
- let logger = Logger::new("service", "instance", reporter);
+ let logger = Logger::new("service", "instance", reporter.clone());
+ let metricer = Metricer::new("service", "instance", reporter);
+
+ handle_metric(metricer).await;
handle_request(tracer, logger).await;
@@ -167,7 +191,7 @@
# Release
-The SkyWalking committer(PMC included) could follow [this doc](Release-guide.md) to release an official version.
+The SkyWalking committer(PMC included) could follow [this doc](https://github.com/apache/skywalking-rust/blob/master/Release-guide.md) to release an official version.
# License
diff --git a/build.rs b/build.rs
index fa64567..7ef9a20 100644
--- a/build.rs
+++ b/build.rs
@@ -24,6 +24,7 @@
.compile(
&[
"./skywalking-data-collect-protocol/language-agent/Tracing.proto",
+ "./skywalking-data-collect-protocol/language-agent/Meter.proto",
"./skywalking-data-collect-protocol/logging/Logging.proto",
],
&["./skywalking-data-collect-protocol"],
diff --git a/e2e/data/expected_context.yaml b/e2e/data/expected_context.yaml
index 9778a2a..6551e9e 100644
--- a/e2e/data/expected_context.yaml
+++ b/e2e/data/expected_context.yaml
@@ -146,3 +146,33 @@
value: DEBUG
timestamp: gt 0
serviceName: consumer
+
+meterItems:
+- serviceName: consumer
+ meterSize: 3
+ meters:
+ - meterId:
+ name: instance_trace_count
+ tags:
+ - name: region
+ value: us-west
+ - name: az
+ value: az-1
+ singleValue: 30.0
+ - meterId:
+ name: instance_trace_count
+ tags:
+ - name: region
+ value: us-east
+ - name: az
+ value: az-3
+ singleValue: 20.0
+ - meterId:
+ name: instance_trace_count
+ tags:
+ - name: region
+ value: us-north
+ - name: az
+ value: az-1
+ histogramBuckets: [10.0, 20.0, 30.0]
+ histogramValues: [1, 2, 0]
diff --git a/e2e/src/main.rs b/e2e/src/main.rs
index f3ffd57..dc88364 100644
--- a/e2e/src/main.rs
+++ b/e2e/src/main.rs
@@ -25,6 +25,10 @@
logger::{self, Logger},
record::{LogRecord, RecordType},
},
+ metrics::{
+ meter::{Counter, Gauge, Histogram},
+ metricer::Metricer,
+ },
reporter::grpc::GrpcReporter,
trace::{
propagation::{
@@ -172,6 +176,33 @@
}
}
+fn run_consumer_metric(mut metricer: Metricer) {
+ let counter = metricer.register(
+ Counter::new("instance_trace_count")
+ .add_label("region", "us-west")
+ .add_label("az", "az-1"),
+ );
+ metricer.register(
+ Gauge::new("instance_trace_count", || 20.)
+ .add_label("region", "us-east")
+ .add_label("az", "az-3"),
+ );
+ let histogram = metricer.register(
+ Histogram::new("instance_trace_count", vec![10., 20., 30.])
+ .add_label("region", "us-north")
+ .add_label("az", "az-1"),
+ );
+
+ counter.increment(10.);
+ counter.increment(20.);
+
+ histogram.add_value(10.);
+ histogram.add_value(29.);
+ histogram.add_value(20.);
+
+ metricer.boot();
+}
+
#[derive(StructOpt)]
#[structopt(name = "basic")]
struct Opt {
@@ -187,7 +218,8 @@
if opt.mode == "consumer" {
tracer::set_global_tracer(Tracer::new("consumer", "node_0", reporter.clone()));
- logger::set_global_logger(Logger::new("consumer", "node_0", reporter));
+ logger::set_global_logger(Logger::new("consumer", "node_0", reporter.clone()));
+ run_consumer_metric(Metricer::new("consumer", "node_0", reporter));
run_consumer_service([0, 0, 0, 0]).await;
} else if opt.mode == "producer" {
tracer::set_global_tracer(Tracer::new("producer", "node_0", reporter.clone()));
diff --git a/examples/simple_metric_report.rs b/examples/simple_metric_report.rs
new file mode 100644
index 0000000..ef4f9c8
--- /dev/null
+++ b/examples/simple_metric_report.rs
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+use skywalking::{
+ metrics::{meter::Counter, metricer::Metricer},
+ reporter::grpc::GrpcReporter,
+};
+use std::error::Error;
+use tokio::signal;
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error>> {
+ // Connect to skywalking oap server.
+ let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?;
+
+ // Spawn the reporting in background, with listening the graceful shutdown
+ // signal.
+ let handle = reporter
+ .reporting()
+ .await
+ .with_graceful_shutdown(async move {
+ signal::ctrl_c().await.expect("failed to listen for event");
+ })
+ .spawn();
+
+ // Do metrics.
+ let mut metricer = Metricer::new("service", "instance", reporter.clone());
+ let counter = metricer.register(
+ Counter::new("instance_trace_count")
+ .add_label("region", "us-west")
+ .add_label("az", "az-1"),
+ );
+
+ counter.increment(1.);
+
+ metricer.boot().await.unwrap();
+ handle.await.unwrap();
+
+ Ok(())
+}
diff --git a/src/common/system_time.rs b/src/common/system_time.rs
index ee97b4e..6513e81 100644
--- a/src/common/system_time.rs
+++ b/src/common/system_time.rs
@@ -19,6 +19,7 @@
pub(crate) enum TimePeriod {
Start,
Log,
+ Metric,
End,
}
@@ -28,6 +29,7 @@
match period {
TimePeriod::Start => 1,
TimePeriod::Log => 10,
+ TimePeriod::Metric => 10,
TimePeriod::End => 100,
}
}
diff --git a/src/lib.rs b/src/lib.rs
index 9cfd58b..7d4cc0c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -20,6 +20,7 @@
pub mod common;
pub(crate) mod error;
pub mod logging;
+pub mod metrics;
pub mod reporter;
pub mod skywalking_proto;
pub mod trace;
diff --git a/src/logging/logger.rs b/src/logging/logger.rs
index 2571ea7..f1bb2dc 100644
--- a/src/logging/logger.rs
+++ b/src/logging/logger.rs
@@ -22,8 +22,8 @@
static GLOBAL_LOGGER: OnceCell<Logger> = OnceCell::const_new();
/// Set the global logger.
-pub fn set_global_logger(tracer: Logger) {
- if GLOBAL_LOGGER.set(tracer).is_err() {
+pub fn set_global_logger(logger: Logger) {
+ if GLOBAL_LOGGER.set(logger).is_err() {
panic!("global logger has setted")
}
}
diff --git a/src/logging/record.rs b/src/logging/record.rs
index 87f22cf..02f5a83 100644
--- a/src/logging/record.rs
+++ b/src/logging/record.rs
@@ -55,16 +55,19 @@
Default::default()
}
+ #[inline]
pub fn custome_time(mut self, time: SystemTime) -> Self {
self.time = Some(time);
self
}
+ #[inline]
pub fn ignore_time(mut self) -> Self {
self.is_ignore_time = true;
self
}
+ #[inline]
pub fn endpoint(mut self, endpoint: impl ToString) -> Self {
self.endpoint = endpoint.to_string();
self
diff --git a/src/metrics/meter.rs b/src/metrics/meter.rs
new file mode 100644
index 0000000..ea97852
--- /dev/null
+++ b/src/metrics/meter.rs
@@ -0,0 +1,344 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+use crate::{
+ common::system_time::{fetch_time, TimePeriod},
+ metrics::metricer::Metricer,
+ skywalking_proto::v3::{
+ meter_data::Metric, Label, MeterBucketValue, MeterData, MeterHistogram, MeterSingleValue,
+ },
+};
+use portable_atomic::AtomicF64;
+use std::{
+ cmp::Ordering::Equal,
+ sync::atomic::{AtomicI64, Ordering},
+};
+
+pub trait Transform: Send + Sync {
+ fn meter_id(&self) -> MeterId;
+
+ fn transform(&self, metricer: &Metricer) -> MeterData;
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub(crate) enum MeterType {
+ Counter,
+ Gauge,
+ Histogram,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct MeterId {
+ name: String,
+ typ: MeterType,
+ labels: Vec<(String, String)>,
+}
+
+impl MeterId {
+ fn add_label(mut self, key: impl ToString, value: impl ToString) -> Self {
+ self.labels.push((key.to_string(), value.to_string()));
+ self
+ }
+
+ fn add_labels<K, V, I>(mut self, tags: I) -> Self
+ where
+ K: ToString,
+ V: ToString,
+ I: IntoIterator<Item = (K, V)>,
+ {
+ self.labels.extend(
+ tags.into_iter()
+ .map(|(k, v)| (k.to_string(), v.to_string())),
+ );
+ self
+ }
+}
+
+/// Counter mode.
+pub enum CounterMode {
+ /// INCREMENT mode represents reporting the latest value.
+ INCREMENT,
+
+ /// RATE mode represents reporting the increment rate. Value = latest value
+ /// - last reported value.
+ RATE,
+}
+
+pub struct Counter {
+ id: MeterId,
+ mode: CounterMode,
+ count: AtomicF64,
+ previous_count: AtomicF64,
+}
+
+impl Counter {
+ #[inline]
+ pub fn new(name: impl ToString) -> Self {
+ Self {
+ id: MeterId {
+ name: name.to_string(),
+ typ: MeterType::Counter,
+ labels: vec![],
+ },
+ mode: CounterMode::INCREMENT,
+ count: AtomicF64::new(0.),
+ previous_count: AtomicF64::new(0.),
+ }
+ }
+
+ #[inline]
+ pub fn add_label(mut self, key: impl ToString, value: impl ToString) -> Self {
+ self.id = self.id.add_label(key, value);
+ self
+ }
+
+ #[inline]
+ pub fn add_labels<K, V, I>(mut self, tags: I) -> Self
+ where
+ K: ToString,
+ V: ToString,
+ I: IntoIterator<Item = (K, V)>,
+ {
+ self.id = self.id.add_labels(tags);
+ self
+ }
+
+ #[inline]
+ pub fn mode(mut self, mode: CounterMode) -> Self {
+ self.mode = mode;
+ self
+ }
+
+ pub fn increment(&self, count: f64) {
+ self.count.fetch_add(count, Ordering::Acquire);
+ }
+
+ pub fn get(&self) -> f64 {
+ self.count.load(Ordering::Acquire)
+ }
+}
+
+impl Transform for Counter {
+ fn meter_id(&self) -> MeterId {
+ self.id.clone()
+ }
+
+ fn transform(&self, metricer: &Metricer) -> MeterData {
+ MeterData {
+ service: metricer.service_name().to_owned(),
+ service_instance: metricer.instance_name().to_owned(),
+ timestamp: fetch_time(TimePeriod::Metric),
+ metric: Some(Metric::SingleValue(MeterSingleValue {
+ name: self.id.name.to_owned(),
+ labels: self
+ .id
+ .labels
+ .iter()
+ .map(|(name, value)| Label {
+ name: name.clone(),
+ value: value.clone(),
+ })
+ .collect(),
+ value: match self.mode {
+ CounterMode::INCREMENT => self.get(),
+ CounterMode::RATE => {
+ let current_count = self.get();
+ let previous_count =
+ self.previous_count.swap(current_count, Ordering::Acquire);
+ current_count - previous_count
+ }
+ },
+ })),
+ }
+ }
+}
+
+pub struct Gauge<G> {
+ id: MeterId,
+ getter: G,
+}
+
+impl<G: Fn() -> f64> Gauge<G> {
+ #[inline]
+ pub fn new(name: impl ToString, getter: G) -> Self {
+ Self {
+ id: MeterId {
+ name: name.to_string(),
+ typ: MeterType::Gauge,
+ labels: vec![],
+ },
+ getter,
+ }
+ }
+
+ #[inline]
+ pub fn add_label(mut self, key: impl ToString, value: impl ToString) -> Self {
+ self.id = self.id.add_label(key, value);
+ self
+ }
+
+ #[inline]
+ pub fn add_labels<K, V, I>(mut self, tags: I) -> Self
+ where
+ K: ToString,
+ V: ToString,
+ I: IntoIterator<Item = (K, V)>,
+ {
+ self.id = self.id.add_labels(tags);
+ self
+ }
+
+ pub fn get(&self) -> f64 {
+ (self.getter)()
+ }
+}
+
+impl<G: Fn() -> f64 + Send + Sync> Transform for Gauge<G> {
+ fn meter_id(&self) -> MeterId {
+ self.id.clone()
+ }
+
+ fn transform(&self, metricer: &Metricer) -> MeterData {
+ MeterData {
+ service: metricer.service_name().to_owned(),
+ service_instance: metricer.instance_name().to_owned(),
+ timestamp: fetch_time(TimePeriod::Metric),
+ metric: Some(Metric::SingleValue(MeterSingleValue {
+ name: self.id.name.to_owned(),
+ labels: self
+ .id
+ .labels
+ .iter()
+ .map(|(name, value)| Label {
+ name: name.clone(),
+ value: value.clone(),
+ })
+ .collect(),
+ value: self.get(),
+ })),
+ }
+ }
+}
+
+struct Bucket {
+ bucket: f64,
+ count: AtomicI64,
+}
+
+impl Bucket {
+ fn new(bucker: f64) -> Self {
+ Self {
+ bucket: bucker,
+ count: Default::default(),
+ }
+ }
+}
+
+pub struct Histogram {
+ id: MeterId,
+ buckets: Vec<Bucket>,
+}
+
+impl Histogram {
+ pub fn new(name: impl ToString, mut steps: Vec<f64>) -> Self {
+ Self {
+ id: MeterId {
+ name: name.to_string(),
+ typ: MeterType::Histogram,
+ labels: vec![],
+ },
+ buckets: {
+ steps.sort_by(|a, b| a.partial_cmp(b).unwrap_or(Equal));
+ steps.dedup();
+ steps.into_iter().map(Bucket::new).collect()
+ },
+ }
+ }
+
+ #[inline]
+ pub fn add_label(mut self, key: impl ToString, value: impl ToString) -> Self {
+ self.id = self.id.add_label(key, value);
+ self
+ }
+
+ #[inline]
+ pub fn add_labels<K, V, I>(mut self, tags: I) -> Self
+ where
+ K: ToString,
+ V: ToString,
+ I: IntoIterator<Item = (K, V)>,
+ {
+ self.id = self.id.add_labels(tags);
+ self
+ }
+
+ pub fn add_value(&self, value: f64) {
+ if let Some(index) = self.find_bucket(value) {
+ self.buckets[index].count.fetch_add(1, Ordering::Acquire);
+ }
+ }
+
+ fn find_bucket(&self, value: f64) -> Option<usize> {
+ match self
+ .buckets
+ .binary_search_by(|bucket| bucket.bucket.partial_cmp(&value).unwrap_or(Equal))
+ {
+ Ok(i) => Some(i),
+ Err(i) => {
+ if i >= 1 {
+ Some(i - 1)
+ } else {
+ None
+ }
+ }
+ }
+ }
+}
+
+impl Transform for Histogram {
+ fn meter_id(&self) -> MeterId {
+ self.id.clone()
+ }
+
+ fn transform(&self, metricer: &Metricer) -> MeterData {
+ MeterData {
+ service: metricer.service_name().to_owned(),
+ service_instance: metricer.instance_name().to_owned(),
+ timestamp: fetch_time(TimePeriod::Metric),
+ metric: Some(Metric::Histogram(MeterHistogram {
+ name: self.id.name.to_owned(),
+ labels: self
+ .id
+ .labels
+ .iter()
+ .map(|(name, value)| Label {
+ name: name.clone(),
+ value: value.clone(),
+ })
+ .collect(),
+ values: self
+ .buckets
+ .iter()
+ .map(|bucket| MeterBucketValue {
+ bucket: bucket.bucket,
+ count: bucket.count.load(Ordering::Acquire),
+ is_negative_infinity: false,
+ })
+ .collect(),
+ })),
+ }
+ }
+}
diff --git a/src/metrics/metricer.rs b/src/metrics/metricer.rs
new file mode 100644
index 0000000..81b8351
--- /dev/null
+++ b/src/metrics/metricer.rs
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+use super::meter::{MeterId, Transform};
+use crate::reporter::{CollectItem, DynReport, Report};
+use std::{
+ collections::HashMap,
+ future::Future,
+ pin::Pin,
+ sync::Arc,
+ task::{Context, Poll},
+ time::Duration,
+};
+use tokio::{
+ select, spawn,
+ sync::mpsc,
+ task::{spawn_blocking, JoinError, JoinHandle},
+ time::interval,
+};
+
+pub struct Metricer {
+ service_name: String,
+ instance_name: String,
+ reporter: Box<DynReport>,
+ meter_map: HashMap<MeterId, Arc<dyn Transform>>,
+ report_interval: Duration,
+}
+
+impl Metricer {
+ /// New with service info and reporter.
+ pub fn new(
+ service_name: impl ToString,
+ instance_name: impl ToString,
+ reporter: impl Report + Send + Sync + 'static,
+ ) -> Self {
+ Self {
+ service_name: service_name.to_string(),
+ instance_name: instance_name.to_string(),
+ reporter: Box::new(reporter),
+ meter_map: Default::default(),
+ report_interval: Duration::from_secs(20),
+ }
+ }
+
+ pub fn service_name(&self) -> &str {
+ &self.service_name
+ }
+
+ pub fn instance_name(&self) -> &str {
+ &self.instance_name
+ }
+
+ pub fn set_report_interval(&mut self, report_interval: Duration) {
+ self.report_interval = report_interval;
+ }
+
+ pub fn register<T: Transform + 'static>(&mut self, transform: T) -> Arc<T> {
+ let transform = Arc::new(transform);
+ self.meter_map
+ .insert(transform.meter_id(), transform.clone());
+ transform
+ }
+
+ pub fn boot(self) -> Booting {
+ let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
+
+ let handle = spawn(async move {
+ let mut ticker = interval(self.report_interval);
+ let metricer = Arc::new(self);
+ loop {
+ let metricer_ = metricer.clone();
+ let _ = spawn_blocking(move || {
+ for trans in metricer_.meter_map.values() {
+ metricer_
+ .reporter
+ .report(CollectItem::Meter(trans.transform(&metricer_)));
+ }
+ })
+ .await;
+
+ select! {
+ _ = ticker.tick() => {}
+ _ = shutdown_rx.recv() => { return; }
+ }
+ }
+ });
+ Booting {
+ handle,
+ shutdown_tx,
+ }
+ }
+}
+
+pub struct Booting {
+ handle: JoinHandle<()>,
+ shutdown_tx: mpsc::Sender<()>,
+}
+
+impl Booting {
+ pub async fn shutdown(self) -> crate::Result<()> {
+ self.shutdown_tx.send(()).await.unwrap();
+ Ok(self.await?)
+ }
+}
+
+impl Future for Booting {
+ type Output = Result<(), JoinError>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ Pin::new(&mut self.handle).poll(cx)
+ }
+}
diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs
new file mode 100644
index 0000000..3c67e08
--- /dev/null
+++ b/src/metrics/mod.rs
@@ -0,0 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+pub mod meter;
+pub mod metricer;
diff --git a/src/reporter/grpc.rs b/src/reporter/grpc.rs
index eb92517..fcea63d 100644
--- a/src/reporter/grpc.rs
+++ b/src/reporter/grpc.rs
@@ -18,7 +18,8 @@
reporter::{CollectItem, Report},
skywalking_proto::v3::{
log_report_service_client::LogReportServiceClient,
- trace_segment_report_service_client::TraceSegmentReportServiceClient, LogData,
+ meter_report_service_client::MeterReportServiceClient,
+ trace_segment_report_service_client::TraceSegmentReportServiceClient, LogData, MeterData,
SegmentObject,
},
};
@@ -102,6 +103,7 @@
struct Inner<P, C> {
trace_client: Mutex<TraceSegmentReportServiceClient<Channel>>,
log_client: Mutex<LogReportServiceClient<Channel>>,
+ meter_client: Mutex<MeterReportServiceClient<Channel>>,
producer: P,
consumer: Mutex<Option<C>>,
is_reporting: AtomicBool,
@@ -135,7 +137,8 @@
Self {
inner: Arc::new(Inner {
trace_client: Mutex::new(TraceSegmentReportServiceClient::new(channel.clone())),
- log_client: Mutex::new(LogReportServiceClient::new(channel)),
+ log_client: Mutex::new(LogReportServiceClient::new(channel.clone())),
+ meter_client: Mutex::new(MeterReportServiceClient::new(channel)),
producer,
consumer: Mutex::new(Some(consumer)),
is_reporting: Default::default(),
@@ -168,9 +171,10 @@
Reporting {
rb: ReporterAndBuffer {
inner: Arc::clone(&self.inner),
+ status_handle: None,
trace_buffer: Default::default(),
log_buffer: Default::default(),
- status_handle: None,
+ meter_buffer: Default::default(),
},
shutdown_signal: Box::pin(pending()),
consumer: self.inner.consumer.lock().await.take().unwrap(),
@@ -201,9 +205,10 @@
struct ReporterAndBuffer<P, C> {
inner: Arc<Inner<P, C>>,
+ status_handle: Option<Box<dyn Fn(tonic::Status) + Send + 'static>>,
trace_buffer: LinkedList<SegmentObject>,
log_buffer: LinkedList<LogData>,
- status_handle: Option<Box<dyn Fn(tonic::Status) + Send + 'static>>,
+ meter_buffer: LinkedList<MeterData>,
}
impl<P: CollectItemProduce, C: ColletcItemConsume> ReporterAndBuffer<P, C> {
@@ -216,6 +221,9 @@
CollectItem::Log(item) => {
self.log_buffer.push_back(item);
}
+ CollectItem::Meter(item) => {
+ self.meter_buffer.push_back(item);
+ }
}
if !self.trace_buffer.is_empty() {
@@ -248,6 +256,22 @@
}
}
}
+
+ if !self.meter_buffer.is_empty() {
+ let buffer = take(&mut self.meter_buffer);
+ if let Err(e) = self
+ .inner
+ .meter_client
+ .lock()
+ .await
+ .collect(stream::iter(buffer))
+ .await
+ {
+ if let Some(status_handle) = &self.status_handle {
+ status_handle(e);
+ }
+ }
+ }
}
}
diff --git a/src/reporter/mod.rs b/src/reporter/mod.rs
index 044afa2..79735bf 100644
--- a/src/reporter/mod.rs
+++ b/src/reporter/mod.rs
@@ -17,7 +17,7 @@
pub mod grpc;
pub mod print;
-use crate::skywalking_proto::v3::{LogData, SegmentObject};
+use crate::skywalking_proto::v3::{LogData, MeterData, SegmentObject};
use serde::{Deserialize, Serialize};
use std::{ops::Deref, sync::Arc};
use tokio::sync::OnceCell;
@@ -27,6 +27,7 @@
pub enum CollectItem {
Trace(SegmentObject),
Log(LogData),
+ Meter(MeterData),
}
pub(crate) type DynReport = dyn Report + Send + Sync + 'static;
diff --git a/src/reporter/print.rs b/src/reporter/print.rs
index 6b355e5..e5b640f 100644
--- a/src/reporter/print.rs
+++ b/src/reporter/print.rs
@@ -36,11 +36,11 @@
impl Report for PrintReporter {
fn report(&self, items: CollectItem) {
match items {
- CollectItem::Trace(segment) => {
+ CollectItem::Trace(data) => {
if self.use_stderr {
- eprintln!("trace segment={:?}", segment);
+ eprintln!("trace segment={:?}", data);
} else {
- println!("trace segment={:?}", segment);
+ println!("trace segment={:?}", data);
}
}
CollectItem::Log(data) => {
@@ -50,6 +50,13 @@
println!("log data={:?}", data);
}
}
+ CollectItem::Meter(data) => {
+ if self.use_stderr {
+ eprintln!("meter data={:?}", data);
+ } else {
+ println!("meter data={:?}", data);
+ }
+ }
}
}
}
diff --git a/tests/metrics.rs b/tests/metrics.rs
new file mode 100644
index 0000000..f1d95d0
--- /dev/null
+++ b/tests/metrics.rs
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+use skywalking::{
+ metrics::{
+ meter::{Counter, Gauge, Histogram},
+ metricer::Metricer,
+ },
+ reporter::{CollectItem, Report},
+ skywalking_proto::v3::{
+ meter_data::Metric, Label, MeterBucketValue, MeterData, MeterHistogram, MeterSingleValue,
+ },
+};
+use std::{
+ collections::LinkedList,
+ sync::{Arc, Mutex},
+};
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+async fn metrics() {
+ let reporter = Arc::new(MockReporter::default());
+
+ {
+ let mut metricer = Metricer::new("service_name", "instance_name", reporter.clone());
+ let counter = metricer.register(
+ Counter::new("instance_trace_count")
+ .add_label("region", "us-west")
+ .add_labels([("az", "az-1")]),
+ );
+ counter.increment(100.);
+
+ metricer.boot().shutdown().await.unwrap();
+
+ assert_eq!(
+ reporter.pop(),
+ MeterData {
+ timestamp: 10,
+ service: "service_name".to_owned(),
+ service_instance: "instance_name".to_owned(),
+ metric: Some(Metric::SingleValue(MeterSingleValue {
+ name: "instance_trace_count".to_owned(),
+ labels: vec![
+ Label {
+ name: "region".to_owned(),
+ value: "us-west".to_owned()
+ },
+ Label {
+ name: "az".to_owned(),
+ value: "az-1".to_owned()
+ },
+ ],
+ value: 100.
+ })),
+ }
+ );
+ }
+
+ {
+ let mut metricer = Metricer::new("service_name", "instance_name", reporter.clone());
+ metricer.register(
+ Gauge::new("instance_trace_count", || 100.)
+ .add_label("region", "us-west")
+ .add_labels([("az", "az-1")]),
+ );
+
+ metricer.boot().shutdown().await.unwrap();
+
+ assert_eq!(
+ reporter.pop(),
+ MeterData {
+ timestamp: 10,
+ service: "service_name".to_owned(),
+ service_instance: "instance_name".to_owned(),
+ metric: Some(Metric::SingleValue(MeterSingleValue {
+ name: "instance_trace_count".to_owned(),
+ labels: vec![
+ Label {
+ name: "region".to_owned(),
+ value: "us-west".to_owned()
+ },
+ Label {
+ name: "az".to_owned(),
+ value: "az-1".to_owned()
+ },
+ ],
+ value: 100.
+ })),
+ }
+ );
+ }
+
+ {
+ let mut metricer = Metricer::new("service_name", "instance_name", reporter.clone());
+ let histogram = metricer.register(
+ Histogram::new("instance_trace_count", vec![1., 2.])
+ .add_label("region", "us-west")
+ .add_labels([("az", "az-1")]),
+ );
+ histogram.add_value(1.);
+ histogram.add_value(1.5);
+ histogram.add_value(2.);
+
+ metricer.boot().shutdown().await.unwrap();
+
+ assert_eq!(
+ reporter.pop(),
+ MeterData {
+ timestamp: 10,
+ service: "service_name".to_owned(),
+ service_instance: "instance_name".to_owned(),
+ metric: Some(Metric::Histogram(MeterHistogram {
+ name: "instance_trace_count".to_owned(),
+ labels: vec![
+ Label {
+ name: "region".to_owned(),
+ value: "us-west".to_owned()
+ },
+ Label {
+ name: "az".to_owned(),
+ value: "az-1".to_owned()
+ },
+ ],
+ values: vec![
+ MeterBucketValue {
+ bucket: 1.,
+ count: 2,
+ is_negative_infinity: false
+ },
+ MeterBucketValue {
+ bucket: 2.,
+ count: 1,
+ is_negative_infinity: false
+ },
+ ]
+ })),
+ }
+ );
+ }
+}
+
+#[derive(Default, Clone)]
+struct MockReporter {
+ items: Arc<Mutex<LinkedList<MeterData>>>,
+}
+
+impl MockReporter {
+ fn pop(&self) -> MeterData {
+ self.items.try_lock().unwrap().pop_back().unwrap()
+ }
+}
+
+impl Report for MockReporter {
+ fn report(&self, item: CollectItem) {
+ match item {
+ CollectItem::Meter(data) => {
+ self.items.try_lock().unwrap().push_back(data);
+ }
+ _ => {}
+ }
+ }
+}