ARROW-12402: [Rust] [DataFusion] Implement SQL metrics example

This introduces a new method on `ExecutionPlan` that exposes generic metrics from any physical operator.

One metric is implemented to demonstrate usage: an `outputRows` counter on `HashAggregateExec` that tracks the number of rows produced by the aggregation.
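
As a rough sketch of how a caller might consume these metrics (the `report_output_rows` helper and its `plan` argument are illustrative only and not part of this change; it assumes a physical plan containing a `HashAggregateExec` has already been built):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::{collect, ExecutionPlan};

/// Hypothetical helper: execute a plan and print the "outputRows" metric
/// recorded by HashAggregateExec, if present.
async fn report_output_rows(plan: Arc<dyn ExecutionPlan>) -> Result<()> {
    // Run the plan to completion; metrics are updated as batches are produced.
    let _batches = collect(plan.clone()).await?;

    // `metrics()` returns a snapshot; operators that record nothing fall back
    // to the default empty map.
    let metrics = plan.metrics();
    if let Some(output_rows) = metrics.get("outputRows") {
        println!("output rows: {}", output_rows.value());
    }
    Ok(())
}
```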

Closes #10049 from andygrove/ARROW-12402

Authored-by: Andy Grove <andygrove73@gmail.com>
Signed-off-by: Krisztián Szűcs <szucs.krisztian@gmail.com>
diff --git a/rust/datafusion/src/physical_plan/hash_aggregate.rs b/rust/datafusion/src/physical_plan/hash_aggregate.rs
index 1a4cb17..b78e8bc 100644
--- a/rust/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/rust/datafusion/src/physical_plan/hash_aggregate.rs
@@ -18,7 +18,7 @@
 //! Defines the execution plan for the hash aggregate operation
 
 use std::any::Any;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 use std::task::{Context, Poll};
 
 use ahash::RandomState;
@@ -28,7 +28,7 @@
 };
 
 use crate::error::{DataFusionError, Result};
-use crate::physical_plan::{Accumulator, AggregateExpr};
+use crate::physical_plan::{Accumulator, AggregateExpr, MetricType, SQLMetric};
 use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning, PhysicalExpr};
 
 use arrow::{
@@ -94,6 +94,8 @@
     /// same as input.schema() but for the final aggregate it will be the same as the input
     /// to the partial aggregate
     input_schema: SchemaRef,
+    /// Metric to track number of output rows
+    output_rows: Arc<Mutex<SQLMetric>>,
 }
 
 fn create_schema(
@@ -142,6 +144,11 @@
 
         let schema = Arc::new(schema);
 
+        let output_rows = Arc::new(Mutex::new(SQLMetric::new(
+            "outputRows",
+            MetricType::Counter,
+        )));
+
         Ok(HashAggregateExec {
             mode,
             group_expr,
@@ -149,6 +156,7 @@
             input,
             schema,
             input_schema,
+            output_rows,
         })
     }
 
@@ -223,6 +231,7 @@
                 group_expr,
                 self.aggr_expr.clone(),
                 input,
+                self.output_rows.clone(),
             )))
         }
     }
@@ -244,6 +253,15 @@
             )),
         }
     }
+
+    fn metrics(&self) -> HashMap<String, SQLMetric> {
+        let mut metrics = HashMap::new();
+        metrics.insert(
+            "outputRows".to_owned(),
+            self.output_rows.lock().unwrap().clone(),
+        );
+        metrics
+    }
 }
 
 /*
@@ -277,6 +295,7 @@
         #[pin]
         output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
         finished: bool,
+        output_rows: Arc<Mutex<SQLMetric>>,
     }
 }
 
@@ -628,6 +647,7 @@
         group_expr: Vec<Arc<dyn PhysicalExpr>>,
         aggr_expr: Vec<Arc<dyn AggregateExpr>>,
         input: SendableRecordBatchStream,
+        output_rows: Arc<Mutex<SQLMetric>>,
     ) -> Self {
         let (tx, rx) = futures::channel::oneshot::channel();
 
@@ -648,6 +668,7 @@
             schema,
             output: rx,
             finished: false,
+            output_rows,
         }
     }
 }
@@ -667,6 +688,8 @@
             return Poll::Ready(None);
         }
 
+        let output_rows = self.output_rows.clone();
+
         // is the output ready?
         let this = self.project();
         let output_poll = this.output.poll(cx);
@@ -680,6 +703,12 @@
                     Err(e) => Err(ArrowError::ExternalError(Box::new(e))), // error receiving
                     Ok(result) => result,
                 };
+
+                if let Ok(batch) = &result {
+                    let mut output_rows = output_rows.lock().unwrap();
+                    output_rows.add(batch.num_rows())
+                }
+
                 Poll::Ready(Some(result))
             }
             Poll::Pending => Poll::Pending,
@@ -1255,6 +1284,11 @@
         ];
 
         assert_batches_sorted_eq!(&expected, &result);
+
+        let metrics = merged_aggregate.metrics();
+        let output_rows = metrics.get("outputRows").unwrap();
+        assert_eq!(3, output_rows.value());
+
         Ok(())
     }
 
diff --git a/rust/datafusion/src/physical_plan/mod.rs b/rust/datafusion/src/physical_plan/mod.rs
index d529e98..054d585 100644
--- a/rust/datafusion/src/physical_plan/mod.rs
+++ b/rust/datafusion/src/physical_plan/mod.rs
@@ -33,6 +33,7 @@
 use futures::stream::Stream;
 
 use self::merge::MergeExec;
+use hashbrown::HashMap;
 
 /// Trait for types that stream [arrow::record_batch::RecordBatch]
 pub trait RecordBatchStream: Stream<Item = ArrowResult<RecordBatch>> {
@@ -46,6 +47,46 @@
 /// Trait for a stream of record batches.
 pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send + Sync>>;
 
+/// SQL metric type
+#[derive(Debug, Clone)]
+pub enum MetricType {
+    /// Simple counter
+    Counter,
+}
+
+/// SQL metric such as counter (number of input or output rows) or timing information about
+/// a physical operator.
+#[derive(Debug, Clone)]
+pub struct SQLMetric {
+    /// Metric name
+    name: String,
+    /// Metric value
+    value: usize,
+    /// Metric type
+    metric_type: MetricType,
+}
+
+impl SQLMetric {
+    /// Create a new SQLMetric
+    pub fn new(name: &str, metric_type: MetricType) -> Self {
+        Self {
+            name: name.to_owned(),
+            value: 0,
+            metric_type,
+        }
+    }
+
+    /// Add to the value
+    pub fn add(&mut self, n: usize) {
+        self.value += n;
+    }
+
+    /// Get the current value
+    pub fn value(&self) -> usize {
+        self.value
+    }
+}
+
 /// Physical query planner that converts a `LogicalPlan` to an
 /// `ExecutionPlan` suitable for execution.
 pub trait PhysicalPlanner {
@@ -84,6 +125,11 @@
 
     /// creates an iterator
     async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream>;
+
+    /// Return a snapshot of the metrics collected during execution
+    fn metrics(&self) -> HashMap<String, SQLMetric> {
+        HashMap::new()
+    }
 }
 
 /// Execute the [ExecutionPlan] and collect the results in memory