blob: c25b8a633c0ae92b60c1afccf9981eb783209917 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::time::Duration;
use opentelemetry::metrics::Counter;
use opentelemetry::metrics::Histogram;
use opentelemetry::metrics::Meter;
use opentelemetry::metrics::UpDownCounter;
use opentelemetry::KeyValue;
use crate::layers::observe;
use crate::raw::*;
/// Add [opentelemetry::metrics](https://docs.rs/opentelemetry/latest/opentelemetry/metrics/index.html) for every operation.
///
/// # Examples
///
/// ```no_run
/// # use opendal::layers::OtelMetricsLayer;
/// # use opendal::services;
/// # use opendal::Operator;
/// # use opendal::Result;
///
/// # fn main() -> Result<()> {
/// let meter = opentelemetry::global::meter("opendal");
/// let _ = Operator::new(services::Memory::default())?
/// .layer(OtelMetricsLayer::builder().register(&meter))
/// .finish();
/// Ok(())
/// # }
/// ```
#[derive(Clone, Debug)]
pub struct OtelMetricsLayer {
interceptor: OtelMetricsInterceptor,
}
impl OtelMetricsLayer {
/// Create a [`OtelMetricsLayerBuilder`] to set the configuration of metrics.
///
/// # Examples
///
/// ```no_run
/// # use opendal::layers::OtelMetricsLayer;
/// # use opendal::services;
/// # use opendal::Operator;
/// # use opendal::Result;
///
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let meter = opentelemetry::global::meter("opendal");
/// let op = Operator::new(services::Memory::default())?
/// .layer(OtelMetricsLayer::builder().register(&meter))
/// .finish();
///
/// Ok(())
/// # }
/// ```
pub fn builder() -> OtelMetricsLayerBuilder {
OtelMetricsLayerBuilder::default()
}
}
/// [`OtelMetricsLayerBuilder`] is a config builder to build a [`OtelMetricsLayer`].
pub struct OtelMetricsLayerBuilder {
bytes_boundaries: Vec<f64>,
bytes_rate_boundaries: Vec<f64>,
entries_boundaries: Vec<f64>,
entries_rate_boundaries: Vec<f64>,
duration_seconds_boundaries: Vec<f64>,
ttfb_boundaries: Vec<f64>,
}
impl Default for OtelMetricsLayerBuilder {
fn default() -> Self {
Self {
bytes_boundaries: observe::DEFAULT_BYTES_BUCKETS.to_vec(),
bytes_rate_boundaries: observe::DEFAULT_BYTES_RATE_BUCKETS.to_vec(),
entries_boundaries: observe::DEFAULT_ENTRIES_BUCKETS.to_vec(),
entries_rate_boundaries: observe::DEFAULT_ENTRIES_RATE_BUCKETS.to_vec(),
duration_seconds_boundaries: observe::DEFAULT_DURATION_SECONDS_BUCKETS.to_vec(),
ttfb_boundaries: observe::DEFAULT_TTFB_BUCKETS.to_vec(),
}
}
}
impl OtelMetricsLayerBuilder {
/// Set boundaries for bytes related histogram like `operation_bytes`.
pub fn bytes_boundaries(mut self, boundaries: Vec<f64>) -> Self {
if !boundaries.is_empty() {
self.bytes_boundaries = boundaries;
}
self
}
/// Set boundaries for bytes rate related histogram like `operation_bytes_rate`.
pub fn bytes_rate_boundaries(mut self, boundaries: Vec<f64>) -> Self {
if !boundaries.is_empty() {
self.bytes_rate_boundaries = boundaries;
}
self
}
/// Set boundaries for entries related histogram like `operation_entries`.
pub fn entries_boundaries(mut self, boundaries: Vec<f64>) -> Self {
if !boundaries.is_empty() {
self.entries_boundaries = boundaries;
}
self
}
/// Set boundaries for entries rate related histogram like `operation_entries_rate`.
pub fn entries_rate_boundaries(mut self, boundaries: Vec<f64>) -> Self {
if !boundaries.is_empty() {
self.entries_rate_boundaries = boundaries;
}
self
}
/// Set boundaries for duration seconds related histogram like `operation_duration_seconds`.
pub fn duration_seconds_boundaries(mut self, boundaries: Vec<f64>) -> Self {
if !boundaries.is_empty() {
self.duration_seconds_boundaries = boundaries;
}
self
}
/// Set boundaries for ttfb related histogram like `operation_ttfb_seconds`.
pub fn ttfb_boundaries(mut self, boundaries: Vec<f64>) -> Self {
if !boundaries.is_empty() {
self.ttfb_boundaries = boundaries;
}
self
}
/// Register the metrics and return a [`OtelMetricsLayer`].
///
/// # Examples
///
/// ```no_run
/// # use opendal::layers::OtelMetricsLayer;
/// # use opendal::services;
/// # use opendal::Operator;
/// # use opendal::Result;
///
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let meter = opentelemetry::global::meter("opendal");
/// let op = Operator::new(services::Memory::default())?
/// .layer(OtelMetricsLayer::builder().register(&meter))
/// .finish();
///
/// Ok(())
/// # }
/// ```
pub fn register(self, meter: &Meter) -> OtelMetricsLayer {
let operation_bytes = {
let metric = observe::MetricValue::OperationBytes(0);
register_u64_histogram_meter(
meter,
"opendal.operation.bytes",
metric,
self.bytes_boundaries.clone(),
)
};
let operation_bytes_rate = {
let metric = observe::MetricValue::OperationBytesRate(0.0);
register_f64_histogram_meter(
meter,
"opendal.operation.bytes_rate",
metric,
self.bytes_rate_boundaries.clone(),
)
};
let operation_entries = {
let metric = observe::MetricValue::OperationEntries(0);
register_u64_histogram_meter(
meter,
"opendal.operation.entries",
metric,
self.entries_boundaries.clone(),
)
};
let operation_entries_rate = {
let metric = observe::MetricValue::OperationEntriesRate(0.0);
register_f64_histogram_meter(
meter,
"opendal.operation.entries_rate",
metric,
self.entries_rate_boundaries.clone(),
)
};
let operation_duration_seconds = {
let metric = observe::MetricValue::OperationDurationSeconds(Duration::default());
register_f64_histogram_meter(
meter,
"opendal.operation.duration",
metric,
self.duration_seconds_boundaries.clone(),
)
};
let operation_errors_total = {
let metric = observe::MetricValue::OperationErrorsTotal;
meter
.u64_counter("opendal.operation.errors")
.with_description(metric.help())
.build()
};
let operation_executing = {
let metric = observe::MetricValue::OperationExecuting(0);
meter
.i64_up_down_counter("opendal.operation.executing")
.with_description(metric.help())
.build()
};
let operation_ttfb_seconds = {
let metric = observe::MetricValue::OperationTtfbSeconds(Duration::default());
register_f64_histogram_meter(
meter,
"opendal.operation.ttfb",
metric,
self.duration_seconds_boundaries.clone(),
)
};
let http_executing = {
let metric = observe::MetricValue::HttpExecuting(0);
meter
.i64_up_down_counter("opendal.http.executing")
.with_description(metric.help())
.build()
};
let http_request_bytes = {
let metric = observe::MetricValue::HttpRequestBytes(0);
register_u64_histogram_meter(
meter,
"opendal.http.request.bytes",
metric,
self.bytes_boundaries.clone(),
)
};
let http_request_bytes_rate = {
let metric = observe::MetricValue::HttpRequestBytesRate(0.0);
register_f64_histogram_meter(
meter,
"opendal.http.request.bytes_rate",
metric,
self.bytes_rate_boundaries.clone(),
)
};
let http_request_duration_seconds = {
let metric = observe::MetricValue::HttpRequestDurationSeconds(Duration::default());
register_f64_histogram_meter(
meter,
"opendal.http.request.duration",
metric,
self.duration_seconds_boundaries.clone(),
)
};
let http_response_bytes = {
let metric = observe::MetricValue::HttpResponseBytes(0);
register_u64_histogram_meter(
meter,
"opendal.http.response.bytes",
metric,
self.bytes_boundaries.clone(),
)
};
let http_response_bytes_rate = {
let metric = observe::MetricValue::HttpResponseBytesRate(0.0);
register_f64_histogram_meter(
meter,
"opendal.http.response.bytes_rate",
metric,
self.bytes_rate_boundaries.clone(),
)
};
let http_response_duration_seconds = {
let metric = observe::MetricValue::HttpResponseDurationSeconds(Duration::default());
register_f64_histogram_meter(
meter,
"opendal.http.response.duration",
metric,
self.duration_seconds_boundaries.clone(),
)
};
let http_connection_errors_total = {
let metric = observe::MetricValue::HttpConnectionErrorsTotal;
meter
.u64_counter("opendal.http.connection_errors")
.with_description(metric.help())
.build()
};
let http_status_errors_total = {
let metric = observe::MetricValue::HttpStatusErrorsTotal;
meter
.u64_counter("opendal.http.status_errors")
.with_description(metric.help())
.build()
};
OtelMetricsLayer {
interceptor: OtelMetricsInterceptor {
operation_bytes,
operation_bytes_rate,
operation_entries,
operation_entries_rate,
operation_duration_seconds,
operation_errors_total,
operation_executing,
operation_ttfb_seconds,
http_executing,
http_request_bytes,
http_request_bytes_rate,
http_request_duration_seconds,
http_response_bytes,
http_response_bytes_rate,
http_response_duration_seconds,
http_connection_errors_total,
http_status_errors_total,
},
}
}
}
impl<A: Access> Layer<A> for OtelMetricsLayer {
type LayeredAccess = observe::MetricsAccessor<A, OtelMetricsInterceptor>;
fn layer(&self, inner: A) -> Self::LayeredAccess {
observe::MetricsLayer::new(self.interceptor.clone()).layer(inner)
}
}
#[derive(Clone, Debug)]
pub struct OtelMetricsInterceptor {
operation_bytes: Histogram<u64>,
operation_bytes_rate: Histogram<f64>,
operation_entries: Histogram<u64>,
operation_entries_rate: Histogram<f64>,
operation_duration_seconds: Histogram<f64>,
operation_errors_total: Counter<u64>,
operation_executing: UpDownCounter<i64>,
operation_ttfb_seconds: Histogram<f64>,
http_executing: UpDownCounter<i64>,
http_request_bytes: Histogram<u64>,
http_request_bytes_rate: Histogram<f64>,
http_request_duration_seconds: Histogram<f64>,
http_response_bytes: Histogram<u64>,
http_response_bytes_rate: Histogram<f64>,
http_response_duration_seconds: Histogram<f64>,
http_connection_errors_total: Counter<u64>,
http_status_errors_total: Counter<u64>,
}
impl observe::MetricsIntercept for OtelMetricsInterceptor {
fn observe(&self, labels: observe::MetricLabels, value: observe::MetricValue) {
let attributes = self.create_attributes(labels);
match value {
observe::MetricValue::OperationBytes(v) => self.operation_bytes.record(v, &attributes),
observe::MetricValue::OperationBytesRate(v) => {
self.operation_bytes_rate.record(v, &attributes)
}
observe::MetricValue::OperationEntries(v) => {
self.operation_entries.record(v, &attributes)
}
observe::MetricValue::OperationEntriesRate(v) => {
self.operation_entries_rate.record(v, &attributes)
}
observe::MetricValue::OperationDurationSeconds(v) => self
.operation_duration_seconds
.record(v.as_secs_f64(), &attributes),
observe::MetricValue::OperationErrorsTotal => {
self.operation_errors_total.add(1, &attributes)
}
observe::MetricValue::OperationExecuting(v) => {
self.operation_executing.add(v as i64, &attributes)
}
observe::MetricValue::OperationTtfbSeconds(v) => self
.operation_ttfb_seconds
.record(v.as_secs_f64(), &attributes),
observe::MetricValue::HttpExecuting(v) => {
self.http_executing.add(v as i64, &attributes)
}
observe::MetricValue::HttpRequestBytes(v) => {
self.http_request_bytes.record(v, &attributes)
}
observe::MetricValue::HttpRequestBytesRate(v) => {
self.http_request_bytes_rate.record(v, &attributes)
}
observe::MetricValue::HttpRequestDurationSeconds(v) => self
.http_request_duration_seconds
.record(v.as_secs_f64(), &attributes),
observe::MetricValue::HttpResponseBytes(v) => {
self.http_response_bytes.record(v, &attributes)
}
observe::MetricValue::HttpResponseBytesRate(v) => {
self.http_response_bytes_rate.record(v, &attributes)
}
observe::MetricValue::HttpResponseDurationSeconds(v) => self
.http_response_duration_seconds
.record(v.as_secs_f64(), &attributes),
observe::MetricValue::HttpConnectionErrorsTotal => {
self.http_connection_errors_total.add(1, &attributes)
}
observe::MetricValue::HttpStatusErrorsTotal => {
self.http_status_errors_total.add(1, &attributes)
}
}
}
}
impl OtelMetricsInterceptor {
fn create_attributes(&self, attrs: observe::MetricLabels) -> Vec<KeyValue> {
let mut attributes = Vec::with_capacity(6);
attributes.extend([
KeyValue::new(observe::LABEL_SCHEME, attrs.scheme),
KeyValue::new(observe::LABEL_NAMESPACE, attrs.namespace),
KeyValue::new(observe::LABEL_ROOT, attrs.root),
KeyValue::new(observe::LABEL_OPERATION, attrs.operation),
]);
if let Some(error) = attrs.error {
attributes.push(KeyValue::new(observe::LABEL_ERROR, error.into_static()));
}
if let Some(status_code) = attrs.status_code {
attributes.push(KeyValue::new(
observe::LABEL_STATUS_CODE,
status_code.as_u16() as i64,
));
}
attributes
}
}
fn register_u64_histogram_meter(
meter: &Meter,
name: &'static str,
metric: observe::MetricValue,
boundaries: Vec<f64>,
) -> Histogram<u64> {
let (_name, unit) = metric.name_with_unit();
let description = metric.help();
let builder = meter
.u64_histogram(name)
.with_description(description)
.with_boundaries(boundaries);
if let Some(unit) = unit {
builder.with_unit(unit).build()
} else {
builder.build()
}
}
fn register_f64_histogram_meter(
meter: &Meter,
name: &'static str,
metric: observe::MetricValue,
boundaries: Vec<f64>,
) -> Histogram<f64> {
let (_name, unit) = metric.name_with_unit();
let description = metric.help();
let builder = meter
.f64_histogram(name)
.with_description(description)
.with_boundaries(boundaries);
if let Some(unit) = unit {
builder.with_unit(unit).build()
} else {
builder.build()
}
}