blob: ee0937e257e3166c6185f7691b1c794778135a38 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::collections::HashMap;
use std::sync::Arc;
use chrono::{Datelike, Timelike};
use datafusion::physical_plan::metrics::{Metric, MetricValue, MetricsSet, Timestamp};
use pyo3::prelude::*;
/// Python-facing wrapper around a DataFusion [`MetricsSet`].
///
/// Exposed to Python as `datafusion.MetricsSet`. The class is `frozen`, so the
/// Python side only ever gets shared (read-only) access to the wrapped set.
#[pyclass(from_py_object, frozen, name = "MetricsSet", module = "datafusion")]
#[derive(Debug, Clone)]
pub struct PyMetricsSet {
    /// The wrapped DataFusion metrics collection.
    metrics: MetricsSet,
}

impl PyMetricsSet {
    /// Wraps `metrics` so it can be handed across the Python boundary.
    pub fn new(metrics: MetricsSet) -> Self {
        PyMetricsSet { metrics }
    }
}
#[pymethods]
impl PyMetricsSet {
fn metrics(&self) -> Vec<PyMetric> {
self.metrics
.iter()
.map(|m| PyMetric::new(Arc::clone(m)))
.collect()
}
fn output_rows(&self) -> Option<usize> {
self.metrics.output_rows()
}
fn elapsed_compute(&self) -> Option<usize> {
self.metrics.elapsed_compute()
}
fn spill_count(&self) -> Option<usize> {
self.metrics.spill_count()
}
fn spilled_bytes(&self) -> Option<usize> {
self.metrics.spilled_bytes()
}
fn spilled_rows(&self) -> Option<usize> {
self.metrics.spilled_rows()
}
fn sum_by_name(&self, name: &str) -> Option<usize> {
self.metrics.sum_by_name(name).map(|v| v.as_usize())
}
fn __repr__(&self) -> String {
format!("{}", self.metrics)
}
}
/// Python-facing wrapper around a single DataFusion [`Metric`].
///
/// Exposed to Python as `datafusion.Metric`. The metric is held behind an
/// [`Arc`] so wrapping a metric taken from a [`MetricsSet`] shares it rather
/// than copying it.
#[pyclass(from_py_object, frozen, name = "Metric", module = "datafusion")]
#[derive(Debug, Clone)]
pub struct PyMetric {
    metric: Arc<Metric>,
}

impl PyMetric {
    /// Wraps a shared [`Metric`] for exposure to Python.
    pub fn new(metric: Arc<Metric>) -> Self {
        Self { metric }
    }

    /// Converts a DataFusion [`Timestamp`] into a timezone-aware Python
    /// `datetime.datetime` whose `tzinfo` is `datetime.timezone.utc`.
    ///
    /// Returns `Ok(None)` when the timestamp has no value set.
    ///
    /// # Errors
    ///
    /// Propagates any Python exception raised while importing `datetime` or
    /// constructing the object (e.g. a year outside Python's supported range).
    fn timestamp_to_pyobject<'py>(
        py: Python<'py>,
        ts: &Timestamp,
    ) -> PyResult<Option<Bound<'py, PyAny>>> {
        match ts.value() {
            Some(dt) => {
                let datetime_mod = py.import("datetime")?;
                let datetime_cls = datetime_mod.getattr("datetime")?;
                let tz_utc = datetime_mod.getattr("timezone")?.getattr("utc")?;
                // chrono encodes a leap second as a sub-second part of at least
                // 1_000_000 µs, which Python's `datetime` rejects with
                // `ValueError`. Clamp to the last representable microsecond so
                // reading the metric never fails and start/end ordering holds.
                let micros = dt.timestamp_subsec_micros().min(999_999);
                let result = datetime_cls.call1((
                    dt.year(),
                    dt.month(),
                    dt.day(),
                    dt.hour(),
                    dt.minute(),
                    dt.second(),
                    micros,
                    tz_utc,
                ))?;
                Ok(Some(result))
            }
            None => Ok(None),
        }
    }
}
#[pymethods]
impl PyMetric {
    /// The metric's name (e.g. the name DataFusion assigns to its value kind).
    #[getter]
    fn name(&self) -> String {
        self.metric.value().name().to_owned()
    }

    /// The metric's value converted to a Python object.
    ///
    /// Counter, gauge and timer metrics are returned as a Python `int`.
    /// Start/end timestamps are returned as a UTC `datetime`, or `None` when
    /// unset. Any other metric kind yields `None`.
    #[getter]
    fn value<'py>(&self, py: Python<'py>) -> PyResult<Option<Bound<'py, PyAny>>> {
        // Integer-valued variants produce a plain number here; timestamp and
        // unsupported variants return early with their own result.
        let number = match self.metric.value() {
            MetricValue::OutputRows(c) => c.value(),
            MetricValue::OutputBytes(c) => c.value(),
            MetricValue::ElapsedCompute(t) => t.value(),
            MetricValue::SpillCount(c) => c.value(),
            MetricValue::SpilledBytes(c) => c.value(),
            MetricValue::SpilledRows(c) => c.value(),
            MetricValue::CurrentMemoryUsage(g) => g.value(),
            MetricValue::Count { count, .. } => count.value(),
            MetricValue::Gauge { gauge, .. } => gauge.value(),
            MetricValue::Time { time, .. } => time.value(),
            MetricValue::StartTimestamp(ts) | MetricValue::EndTimestamp(ts) => {
                return Self::timestamp_to_pyobject(py, ts);
            }
            _ => return Ok(None),
        };
        Ok(Some(number.into_pyobject(py)?.into_any()))
    }

    /// The metric as a UTC `datetime` when it is a start or end timestamp.
    ///
    /// Returns `None` for every other metric kind.
    #[getter]
    fn value_as_datetime<'py>(&self, py: Python<'py>) -> PyResult<Option<Bound<'py, PyAny>>> {
        if let MetricValue::StartTimestamp(ts) | MetricValue::EndTimestamp(ts) =
            self.metric.value()
        {
            Self::timestamp_to_pyobject(py, ts)
        } else {
            Ok(None)
        }
    }

    /// The partition this metric was recorded for, if any.
    #[getter]
    fn partition(&self) -> Option<usize> {
        self.metric.partition()
    }

    /// The metric's labels as a `name -> value` dictionary.
    ///
    /// If two labels share a name, the later one wins.
    fn labels(&self) -> HashMap<String, String> {
        let mut by_name = HashMap::new();
        for label in self.metric.labels() {
            by_name.insert(label.name().to_owned(), label.value().to_owned());
        }
        by_name
    }

    /// Renders only the metric's value, using DataFusion's `Display` for it.
    fn __repr__(&self) -> String {
        self.metric.value().to_string()
    }
}