blob: a7998a8ecb98b6e6f5e36aced069ff99143b4b23 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use crate::config::MetricsConfig;
use crate::runtime::manager::RuntimeManager;
use log::{error, info};
use once_cell::sync::Lazy;
use prometheus::{
histogram_opts, labels, register_histogram_vec_with_registry, register_int_gauge_vec,
Histogram, HistogramOpts, HistogramVec, IntCounter, IntGauge, IntGaugeVec, Registry,
};
use std::time::Duration;
const DEFAULT_BUCKETS: &[f64; 16] = &[
0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 20.0, 40.0, 60.0, 80.0, 100.0,
];
pub static REGISTRY: Lazy<Registry> = Lazy::new(Registry::new);
pub static TOTAL_RECEIVED_DATA: Lazy<IntCounter> = Lazy::new(|| {
IntCounter::new("total_received_data", "Incoming Requests").expect("metric should be created")
});
pub static TOTAL_READ_DATA: Lazy<IntCounter> = Lazy::new(|| {
IntCounter::new("total_read_data", "Reading Data").expect("metric should be created")
});
pub static TOTAL_READ_DATA_FROM_MEMORY: Lazy<IntCounter> = Lazy::new(|| {
IntCounter::new("total_read_data_from_memory", "Reading Data from memory")
.expect("metric should be created")
});
pub static TOTAL_READ_DATA_FROM_LOCALFILE: Lazy<IntCounter> = Lazy::new(|| {
IntCounter::new(
"total_read_data_from_localfile",
"Reading Data from localfile",
)
.expect("metric should be created")
});
pub static GRPC_GET_MEMORY_DATA_TRANSPORT_TIME: Lazy<Histogram> = Lazy::new(|| {
let opts = HistogramOpts::new("grpc_get_memory_data_transport_time", "none")
.buckets(Vec::from(DEFAULT_BUCKETS as &'static [f64]));
let histogram = Histogram::with_opts(opts).unwrap();
histogram
});
pub static GRPC_GET_MEMORY_DATA_PROCESS_TIME: Lazy<Histogram> = Lazy::new(|| {
let opts = HistogramOpts::new("grpc_get_memory_data_process_time", "none")
.buckets(Vec::from(DEFAULT_BUCKETS as &'static [f64]));
let histogram = Histogram::with_opts(opts).unwrap();
histogram
});
pub static GRPC_GET_LOCALFILE_DATA_TRANSPORT_TIME: Lazy<Histogram> = Lazy::new(|| {
let opts = HistogramOpts::new("grpc_get_localfile_data_transport_time", "none")
.buckets(Vec::from(DEFAULT_BUCKETS as &'static [f64]));
let histogram = Histogram::with_opts(opts).unwrap();
histogram
});
pub static GRPC_GET_LOCALFILE_DATA_PROCESS_TIME: Lazy<Histogram> = Lazy::new(|| {
let opts = HistogramOpts::new("grpc_get_localfile_data_process_time", "none")
.buckets(Vec::from(DEFAULT_BUCKETS as &'static [f64]));
let histogram = Histogram::with_opts(opts).unwrap();
histogram
});
pub static GRPC_SEND_DATA_TRANSPORT_TIME: Lazy<Histogram> = Lazy::new(|| {
let opts = HistogramOpts::new("grpc_send_data_transport_time", "none")
.buckets(Vec::from(DEFAULT_BUCKETS as &'static [f64]));
let histogram = Histogram::with_opts(opts).unwrap();
histogram
});
pub static GRPC_SEND_DATA_PROCESS_TIME: Lazy<Histogram> = Lazy::new(|| {
let opts = HistogramOpts::new("grpc_send_data_process_time", "none")
.buckets(Vec::from(DEFAULT_BUCKETS as &'static [f64]));
let histogram = Histogram::with_opts(opts).unwrap();
histogram
});
pub static GRPC_BUFFER_REQUIRE_PROCESS_TIME: Lazy<Histogram> = Lazy::new(|| {
let opts = HistogramOpts::new("grpc_buffer_require_process_time", "none")
.buckets(Vec::from(DEFAULT_BUCKETS as &'static [f64]));
let histogram = Histogram::with_opts(opts).unwrap();
histogram
});
pub static GRPC_LATENCY_TIME_SEC: Lazy<HistogramVec> = Lazy::new(|| {
let opts = histogram_opts!(
"grpc_duration_seconds",
"gRPC latency",
Vec::from(DEFAULT_BUCKETS as &'static [f64])
);
let grpc_latency = register_histogram_vec_with_registry!(opts, &["path"], REGISTRY).unwrap();
grpc_latency
});
pub static TOTAL_MEMORY_USED: Lazy<IntCounter> = Lazy::new(|| {
IntCounter::new("total_memory_used", "Total memory used").expect("metric should be created")
});
pub static TOTAL_LOCALFILE_USED: Lazy<IntCounter> = Lazy::new(|| {
IntCounter::new("total_localfile_used", "Total localfile used")
.expect("metric should be created")
});
pub static TOTAL_HDFS_USED: Lazy<IntCounter> = Lazy::new(|| {
IntCounter::new("total_hdfs_used", "Total hdfs used").expect("metric should be created")
});
pub static GAUGE_MEMORY_USED: Lazy<IntGauge> =
Lazy::new(|| IntGauge::new("memory_used", "memory used").expect("metric should be created"));
pub static GAUGE_MEMORY_ALLOCATED: Lazy<IntGauge> = Lazy::new(|| {
IntGauge::new("memory_allocated", "memory allocated").expect("metric should be created")
});
pub static GAUGE_MEMORY_CAPACITY: Lazy<IntGauge> = Lazy::new(|| {
IntGauge::new("memory_capacity", "memory capacity").expect("metric should be created")
});
pub static TOTAL_MEMORY_SPILL_OPERATION: Lazy<IntCounter> = Lazy::new(|| {
IntCounter::new("total_memory_spill", "memory capacity").expect("metric should be created")
});
pub static TOTAL_MEMORY_SPILL_OPERATION_FAILED: Lazy<IntCounter> = Lazy::new(|| {
IntCounter::new("total_memory_spill_failed", "memory capacity")
.expect("metric should be created")
});
pub static TOTAL_MEMORY_SPILL_TO_LOCALFILE: Lazy<IntCounter> = Lazy::new(|| {
IntCounter::new(
"total_memory_spill_to_localfile",
"memory spill to localfile",
)
.expect("metric should be created")
});
pub static TOTAL_MEMORY_SPILL_TO_HDFS: Lazy<IntCounter> = Lazy::new(|| {
IntCounter::new("total_memory_spill_to_hdfs", "memory spill to hdfs")
.expect("metric should be created")
});
pub static GAUGE_MEMORY_SPILL_OPERATION: Lazy<IntGauge> =
Lazy::new(|| IntGauge::new("memory_spill", "memory spill").expect("metric should be created"));
pub static GAUGE_MEMORY_SPILL_TO_LOCALFILE: Lazy<IntGauge> = Lazy::new(|| {
IntGauge::new("memory_spill_to_localfile", "memory spill to localfile")
.expect("metric should be created")
});
pub static GAUGE_MEMORY_SPILL_TO_HDFS: Lazy<IntGauge> = Lazy::new(|| {
IntGauge::new("memory_spill_to_hdfs", "memory spill to hdfs").expect("metric should be created")
});
pub static TOTAL_APP_NUMBER: Lazy<IntCounter> = Lazy::new(|| {
IntCounter::new("total_app_number", "total_app_number").expect("metrics should be created")
});
pub static TOTAL_PARTITION_NUMBER: Lazy<IntCounter> = Lazy::new(|| {
IntCounter::new("total_partition_number", "total_partition_number")
.expect("metrics should be created")
});
pub static GAUGE_APP_NUMBER: Lazy<IntGauge> =
Lazy::new(|| IntGauge::new("app_number", "app_number").expect("metrics should be created"));
pub static GAUGE_PARTITION_NUMBER: Lazy<IntGauge> = Lazy::new(|| {
IntGauge::new("partition_number", "partition_number").expect("metrics should be created")
});
pub static TOTAL_REQUIRE_BUFFER_FAILED: Lazy<IntCounter> = Lazy::new(|| {
IntCounter::new("total_require_buffer_failed", "total_require_buffer_failed")
.expect("metrics should be created")
});
pub static TOTAL_HUGE_PARTITION_REQUIRE_BUFFER_FAILED: Lazy<IntCounter> = Lazy::new(|| {
IntCounter::new(
"total_huge_partition_require_buffer_failed",
"total_huge_partition_require_buffer_failed",
)
.expect("metrics should be created")
});
pub static GAUGE_LOCAL_DISK_CAPACITY: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"local_disk_capacity",
"local disk capacity for root path",
&["root"]
)
.unwrap()
});
pub static GAUGE_LOCAL_DISK_USED: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"local_disk_used",
"local disk used for root path",
&["root"]
)
.unwrap()
});
pub static GAUGE_LOCAL_DISK_IS_HEALTHY: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"local_disk_is_healthy",
"local disk is_healthy for root path",
&["root"]
)
.unwrap()
});
pub static GAUGE_RUNTIME_ALIVE_THREAD_NUM: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"runtime_thread_alive_gauge",
"alive thread number for runtime",
&["name"]
)
.unwrap()
});
pub static GAUGE_RUNTIME_IDLE_THREAD_NUM: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"runtime_thread_idle_gauge",
"idle thread number for runtime",
&["name"]
)
.unwrap()
});
pub static GAUGE_TOPN_APP_RESIDENT_DATA_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"topN_app_resident_data_size",
"topN app resident data size",
&["app_id"]
)
.unwrap()
});
pub static GAUGE_IN_SPILL_DATA_SIZE: Lazy<IntGauge> =
Lazy::new(|| IntGauge::new("in_spill_data_size", "total data size in spill").unwrap());
pub static TOTAL_GRPC_REQUEST: Lazy<IntCounter> =
Lazy::new(|| IntCounter::new("total_grpc_request_number", "total request number").expect(""));
pub static GAUGE_GRPC_REQUEST_QUEUE_SIZE: Lazy<IntGauge> =
Lazy::new(|| IntGauge::new("grpc_request_number", "current grpc request queue size").unwrap());
pub static TOTAL_SPILL_EVENTS_DROPPED: Lazy<IntCounter> = Lazy::new(|| {
IntCounter::new(
"total_spill_events_dropped",
"total spill events dropped number",
)
.expect("")
});
fn register_custom_metrics() {
REGISTRY
.register(Box::new(TOTAL_GRPC_REQUEST.clone()))
.expect("");
REGISTRY
.register(Box::new(GAUGE_GRPC_REQUEST_QUEUE_SIZE.clone()))
.expect("");
REGISTRY
.register(Box::new(TOTAL_SPILL_EVENTS_DROPPED.clone()))
.expect("");
REGISTRY
.register(Box::new(GAUGE_TOPN_APP_RESIDENT_DATA_SIZE.clone()))
.expect("");
REGISTRY
.register(Box::new(TOTAL_READ_DATA_FROM_LOCALFILE.clone()))
.expect("total_read_data must be registered");
REGISTRY
.register(Box::new(TOTAL_READ_DATA_FROM_MEMORY.clone()))
.expect("total_read_data must be registered");
REGISTRY
.register(Box::new(GAUGE_IN_SPILL_DATA_SIZE.clone()))
.expect("");
REGISTRY
.register(Box::new(GAUGE_LOCAL_DISK_CAPACITY.clone()))
.expect("");
REGISTRY
.register(Box::new(GAUGE_LOCAL_DISK_USED.clone()))
.expect("");
REGISTRY
.register(Box::new(GAUGE_LOCAL_DISK_IS_HEALTHY.clone()))
.expect("");
REGISTRY
.register(Box::new(GAUGE_RUNTIME_ALIVE_THREAD_NUM.clone()))
.expect("");
REGISTRY
.register(Box::new(GAUGE_RUNTIME_IDLE_THREAD_NUM.clone()))
.expect("");
REGISTRY
.register(Box::new(TOTAL_RECEIVED_DATA.clone()))
.expect("total_received_data must be registered");
REGISTRY
.register(Box::new(TOTAL_READ_DATA.clone()))
.expect("total_read_data must be registered");
REGISTRY
.register(Box::new(TOTAL_MEMORY_USED.clone()))
.expect("total_memory_used must be registered");
REGISTRY
.register(Box::new(TOTAL_LOCALFILE_USED.clone()))
.expect("total_localfile_used must be registered");
REGISTRY
.register(Box::new(TOTAL_HDFS_USED.clone()))
.expect("total_hdfs_used must be registered");
REGISTRY
.register(Box::new(TOTAL_MEMORY_SPILL_OPERATION.clone()))
.expect("total_memory_spill_operation must be registered");
REGISTRY
.register(Box::new(TOTAL_MEMORY_SPILL_OPERATION_FAILED.clone()))
.expect("total_memory_spill_operation_failed must be registered");
REGISTRY
.register(Box::new(TOTAL_APP_NUMBER.clone()))
.expect("total_app_number must be registered");
REGISTRY
.register(Box::new(TOTAL_PARTITION_NUMBER.clone()))
.expect("total_partition_number must be registered");
REGISTRY
.register(Box::new(TOTAL_REQUIRE_BUFFER_FAILED.clone()))
.expect("total_require_buffer_failed must be registered");
REGISTRY
.register(Box::new(TOTAL_HUGE_PARTITION_REQUIRE_BUFFER_FAILED.clone()))
.expect("total_huge_partition_require_buffer_failed must be registered");
REGISTRY
.register(Box::new(TOTAL_MEMORY_SPILL_TO_LOCALFILE.clone()))
.expect("total_memory_spill_to_localfile must be registered");
REGISTRY
.register(Box::new(TOTAL_MEMORY_SPILL_TO_HDFS.clone()))
.expect("total_memory_spill_to_hdfs must be registered");
REGISTRY
.register(Box::new(GAUGE_MEMORY_USED.clone()))
.expect("memory_used must be registered");
REGISTRY
.register(Box::new(GAUGE_MEMORY_ALLOCATED.clone()))
.expect("memory_allocated must be registered");
REGISTRY
.register(Box::new(GAUGE_MEMORY_CAPACITY.clone()))
.expect("memory_capacity must be registered");
REGISTRY
.register(Box::new(GAUGE_APP_NUMBER.clone()))
.expect("app_number must be registered");
REGISTRY
.register(Box::new(GAUGE_PARTITION_NUMBER.clone()))
.expect("partition_number must be registered");
REGISTRY
.register(Box::new(GAUGE_MEMORY_SPILL_OPERATION.clone()))
.expect("memory_spill_operation must be registered");
REGISTRY
.register(Box::new(GAUGE_MEMORY_SPILL_TO_LOCALFILE.clone()))
.expect("memory_spill_to_localfile must be registered");
REGISTRY
.register(Box::new(GAUGE_MEMORY_SPILL_TO_HDFS.clone()))
.expect("memory_spill_to_hdfs must be registered");
REGISTRY
.register(Box::new(GRPC_BUFFER_REQUIRE_PROCESS_TIME.clone()))
.expect("grpc_buffer_require_process_time must be registered");
REGISTRY
.register(Box::new(GRPC_SEND_DATA_TRANSPORT_TIME.clone()))
.expect("grpc_send_data_transport_time must be registered");
REGISTRY
.register(Box::new(GRPC_SEND_DATA_PROCESS_TIME.clone()))
.expect("grpc_send_data_process_time must be registered");
REGISTRY
.register(Box::new(GRPC_GET_MEMORY_DATA_PROCESS_TIME.clone()))
.expect("grpc_get_memory_data_process_time must be registered");
REGISTRY
.register(Box::new(GRPC_GET_LOCALFILE_DATA_TRANSPORT_TIME.clone()))
.expect("grpc_get_localfile_data_transport_time must be registered");
REGISTRY
.register(Box::new(GRPC_GET_LOCALFILE_DATA_PROCESS_TIME.clone()))
.expect("grpc_get_localfile_data_process_time must be registered");
REGISTRY
.register(Box::new(GRPC_GET_MEMORY_DATA_TRANSPORT_TIME.clone()))
.expect("grpc_get_memory_data_transport_time must be registered");
}
pub fn init_metric_service(
runtime_manager: RuntimeManager,
metric_config: &Option<MetricsConfig>,
worker_uid: String,
) -> bool {
if metric_config.is_none() {
return false;
}
register_custom_metrics();
let job_name = "uniffle-worker";
let cfg = metric_config.clone().unwrap();
let push_gateway_endpoint = cfg.push_gateway_endpoint;
if let Some(ref _endpoint) = push_gateway_endpoint {
let push_interval_sec = cfg.push_interval_sec.unwrap_or(60);
runtime_manager.default_runtime.spawn(async move {
info!("Starting prometheus metrics exporter...");
loop {
tokio::time::sleep(Duration::from_secs(push_interval_sec as u64)).await;
let general_metrics = prometheus::gather();
let custom_metrics = REGISTRY.gather();
let mut metrics = vec![];
metrics.extend_from_slice(&custom_metrics);
metrics.extend_from_slice(&general_metrics);
let pushed_result = prometheus::push_metrics(
job_name,
labels! {"worker_id".to_owned() => worker_uid.to_owned(),},
&push_gateway_endpoint.to_owned().unwrap().to_owned(),
metrics,
None,
);
if pushed_result.is_err() {
error!("Errors on pushing metrics. {:?}", pushed_result.err());
}
}
});
}
return true;
}