ARROW-12402: [Rust] [DataFusion] Implement SQL metrics example
This introduces a new `metrics()` method on `ExecutionPlan` for accessing generic metrics from any physical operator.
One metric (`outputRows` on `HashAggregateExec`) is implemented to demonstrate usage.
Closes #10049 from andygrove/ARROW-12402
Authored-by: Andy Grove <andygrove73@gmail.com>
Signed-off-by: Krisztián Szűcs <szucs.krisztian@gmail.com>
diff --git a/rust/datafusion/src/physical_plan/hash_aggregate.rs b/rust/datafusion/src/physical_plan/hash_aggregate.rs
index 1a4cb17..b78e8bc 100644
--- a/rust/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/rust/datafusion/src/physical_plan/hash_aggregate.rs
@@ -18,7 +18,7 @@
//! Defines the execution plan for the hash aggregate operation
use std::any::Any;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use ahash::RandomState;
@@ -28,7 +28,7 @@
};
use crate::error::{DataFusionError, Result};
-use crate::physical_plan::{Accumulator, AggregateExpr};
+use crate::physical_plan::{Accumulator, AggregateExpr, MetricType, SQLMetric};
use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning, PhysicalExpr};
use arrow::{
@@ -94,6 +94,8 @@
/// same as input.schema() but for the final aggregate it will be the same as the input
/// to the partial aggregate
input_schema: SchemaRef,
+ /// Metric to track number of output rows
+ output_rows: Arc<Mutex<SQLMetric>>,
}
fn create_schema(
@@ -142,6 +144,11 @@
let schema = Arc::new(schema);
+ let output_rows = Arc::new(Mutex::new(SQLMetric::new(
+ "outputRows",
+ MetricType::Counter,
+ )));
+
Ok(HashAggregateExec {
mode,
group_expr,
@@ -149,6 +156,7 @@
input,
schema,
input_schema,
+ output_rows,
})
}
@@ -223,6 +231,7 @@
group_expr,
self.aggr_expr.clone(),
input,
+ self.output_rows.clone(),
)))
}
}
@@ -244,6 +253,15 @@
)),
}
}
+
+ fn metrics(&self) -> HashMap<String, SQLMetric> { // snapshot containing the single "outputRows" metric
+ let mut metrics = HashMap::new();
+ metrics.insert(
+ "outputRows".to_owned(),
+ self.output_rows.lock().unwrap().clone(), // clone under the lock so the caller gets a copy, not the shared metric; unwrap panics if the mutex is poisoned
+ );
+ metrics
+ }
}
/*
@@ -277,6 +295,7 @@
#[pin]
output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
finished: bool,
+ output_rows: Arc<Mutex<SQLMetric>>,
}
}
@@ -628,6 +647,7 @@
group_expr: Vec<Arc<dyn PhysicalExpr>>,
aggr_expr: Vec<Arc<dyn AggregateExpr>>,
input: SendableRecordBatchStream,
+ output_rows: Arc<Mutex<SQLMetric>>,
) -> Self {
let (tx, rx) = futures::channel::oneshot::channel();
@@ -648,6 +668,7 @@
schema,
output: rx,
finished: false,
+ output_rows,
}
}
}
@@ -667,6 +688,8 @@
return Poll::Ready(None);
}
+ let output_rows = self.output_rows.clone();
+
// is the output ready?
let this = self.project();
let output_poll = this.output.poll(cx);
@@ -680,6 +703,12 @@
Err(e) => Err(ArrowError::ExternalError(Box::new(e))), // error receiving
Ok(result) => result,
};
+
+ if let Ok(batch) = &result {
+ let mut output_rows = output_rows.lock().unwrap();
+ output_rows.add(batch.num_rows())
+ }
+
Poll::Ready(Some(result))
}
Poll::Pending => Poll::Pending,
@@ -1255,6 +1284,11 @@
];
assert_batches_sorted_eq!(&expected, &result);
+
+ let metrics = merged_aggregate.metrics();
+ let output_rows = metrics.get("outputRows").unwrap();
+ assert_eq!(3, output_rows.value());
+
Ok(())
}
diff --git a/rust/datafusion/src/physical_plan/mod.rs b/rust/datafusion/src/physical_plan/mod.rs
index d529e98..054d585 100644
--- a/rust/datafusion/src/physical_plan/mod.rs
+++ b/rust/datafusion/src/physical_plan/mod.rs
@@ -33,6 +33,7 @@
use futures::stream::Stream;
use self::merge::MergeExec;
+use hashbrown::HashMap;
/// Trait for types that stream [arrow::record_batch::RecordBatch]
pub trait RecordBatchStream: Stream<Item = ArrowResult<RecordBatch>> {
@@ -46,6 +47,46 @@
/// Trait for a stream of record batches.
pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send + Sync>>;
+/// Type tag recorded on each `SQLMetric` (stored in its `metric_type` field)
+#[derive(Debug, Clone)]
+pub enum MetricType {
+ /// Simple counter; the only variant defined here
+ Counter,
+}
+
+/// SQL metric, such as a counter (number of input or output rows) or timing information, about
+/// a physical operator.
+#[derive(Debug, Clone)]
+pub struct SQLMetric {
+ /// Metric name, copied from the `name` argument of `new`
+ name: String,
+ /// Metric value; initialised to 0 by `new` and increased by `add`
+ value: usize,
+ /// Metric type, as passed to `new`
+ metric_type: MetricType,
+}
+
+impl SQLMetric {
+ /// Create a new SQLMetric with the given name and type and an initial value of 0
+ pub fn new(name: &str, metric_type: MetricType) -> Self {
+ Self {
+ name: name.to_owned(),
+ value: 0,
+ metric_type,
+ }
+ }
+
+ /// Add `n` to the value (plain `+=` on `usize`, so overflow is not handled specially here)
+ pub fn add(&mut self, n: usize) {
+ self.value += n;
+ }
+
+ /// Get the current value (returned by copy)
+ pub fn value(&self) -> usize {
+ self.value
+ }
+}
+
/// Physical query planner that converts a `LogicalPlan` to an
/// `ExecutionPlan` suitable for execution.
pub trait PhysicalPlanner {
@@ -84,6 +125,11 @@
/// creates an iterator
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream>;
+
+ /// Return a snapshot of the metrics collected during execution; this default implementation returns an empty map
+ fn metrics(&self) -> HashMap<String, SQLMetric> {
+ HashMap::new()
+ }
}
/// Execute the [ExecutionPlan] and collect the results in memory