.. Licensed to the Apache Software Foundation (ASF) under one
.. or more contributor license agreements. See the NOTICE file
.. distributed with this work for additional information
.. regarding copyright ownership. The ASF licenses this file
.. to you under the Apache License, Version 2.0 (the
.. "License"); you may not use this file except in compliance
.. with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
.. software distributed under the License is distributed on an
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
.. KIND, either express or implied. See the License for the
.. specific language governing permissions and limitations
.. under the License.

.. _execution_metrics:

Execution Metrics
=================

Overview
--------

When DataFusion executes a query it compiles the logical plan into a tree of
*physical plan operators* (e.g. ``FilterExec``, ``ProjectionExec``,
``HashAggregateExec``). Each operator can record runtime statistics while it
runs. These statistics are called **execution metrics**.

Typical metrics include:

- **output_rows**: number of rows produced by the operator
- **elapsed_compute**: total CPU time (in nanoseconds) spent inside the operator
- **spill_count**: number of times the operator spilled data to disk
- **spilled_bytes**: total bytes written to disk during spills
- **spilled_rows**: total rows written to disk during spills

Metrics are collected *per-partition*: DataFusion may execute each operator
in parallel across several partitions. The convenience properties on
:py:class:`~datafusion.MetricsSet` (e.g. ``output_rows``, ``elapsed_compute``)
automatically sum the named metric across **all** partitions, giving a single
aggregate value for the operator as a whole. You can also access the raw
per-partition :py:class:`~datafusion.Metric` objects via
:py:meth:`~datafusion.MetricsSet.metrics`.
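
To make the aggregation concrete, here is a plain-Python sketch (with
hypothetical partition values) of what a convenience property like
``output_rows`` computes:

.. code-block:: python

    # Hypothetical "output_rows" values recorded by three partitions
    # of a single operator.
    per_partition_output_rows = {0: 10, 1: 20, 2: 5}

    # A convenience property such as MetricsSet.output_rows sums the
    # named metric across all partitions.
    total_output_rows = sum(per_partition_output_rows.values())
    print(total_output_rows)  # 35
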

When Are Metrics Available?
---------------------------

Some operators (for example ``DataSourceExec``) eagerly create a
:py:class:`~datafusion.MetricsSet` when the physical plan is built, so
:py:meth:`~datafusion.ExecutionPlan.metrics` may return a set even before any
rows have been processed. However, metric **values** such as ``output_rows``
are only meaningful **after** the DataFrame has been executed via one of the
terminal operations:

- :py:meth:`~datafusion.DataFrame.collect`
- :py:meth:`~datafusion.DataFrame.collect_partitioned`
- :py:meth:`~datafusion.DataFrame.execute_stream`
  (metrics are available once the stream has been fully consumed)
- :py:meth:`~datafusion.DataFrame.execute_stream_partitioned`
  (metrics are available once all partition streams have been fully consumed)

Before execution, metric values will be ``0`` or ``None``.
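
As a minimal sketch of the workflow (assuming the ``datafusion`` package and
the API described on this page), metrics are read only after a terminal
operation has run:

.. code-block:: python

    from datafusion import SessionContext

    ctx = SessionContext()
    df = ctx.sql("SELECT 1 AS a")

    # Terminal operation: executes the plan and populates metric values.
    results = df.collect()

    # The physical plan now carries metrics from the execution above.
    plan = df.execution_plan()
    for operator_name, metrics_set in plan.collect_metrics():
        print(operator_name, metrics_set.output_rows)
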

.. note::

   **display() does not populate metrics.**

   When a DataFrame is displayed in a notebook (e.g. via ``display(df)`` or
   automatic ``repr`` output), DataFusion runs a *limited* internal execution
   to fetch preview rows. This internal execution does **not** cache the
   physical plan used, so :py:meth:`~datafusion.ExecutionPlan.collect_metrics`
   will not reflect the display execution. To access metrics you must call
   one of the terminal operations listed above.

If you call :py:meth:`~datafusion.DataFrame.collect` (or another terminal
operation) multiple times on the same DataFrame, each call creates a fresh
physical plan. Metrics from :py:meth:`~datafusion.DataFrame.execution_plan`
always reflect the **most recent** execution.

Reading the Physical Plan Tree
------------------------------

:py:meth:`~datafusion.DataFrame.execution_plan` returns the root
:py:class:`~datafusion.ExecutionPlan` node of the physical plan tree. The tree
mirrors the operator pipeline: the root is typically a projection or
coalescing node; its children are filters, aggregates, scans, etc.

The ``operator_name`` string returned by
:py:meth:`~datafusion.ExecutionPlan.collect_metrics` is the *display* name of
the node, for example ``"FilterExec: column1@0 > 1"``. This is the same string
you would see when calling ``plan.display()``.

Aggregated vs Per-Partition Metrics
-----------------------------------

DataFusion executes each operator across one or more **partitions** in
parallel. The :py:class:`~datafusion.MetricsSet` convenience properties
(``output_rows``, ``elapsed_compute``, etc.) automatically **sum** the named
metric across all partitions, giving a single aggregate value.

To inspect individual partitions, for example to detect data skew where one
partition processes far more rows than others, iterate over the raw
:py:class:`~datafusion.Metric` objects:

.. code-block:: python

    for metric in metrics_set.metrics():
        print(f"  partition={metric.partition} {metric.name}={metric.value}")

The ``partition`` property is a 0-based index (``0``, ``1``, …) identifying
which parallel slot processed this metric. It is ``None`` for metrics that
apply globally (not tied to a specific partition).
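
Building on that loop, a small helper can flag skew from ``(partition,
value)`` pairs. The helper and its threshold are illustrative only, not part
of the DataFusion API:

.. code-block:: python

    def looks_skewed(per_partition_rows, ratio=2.0):
        # per_partition_rows: iterable of (partition_index, row_count) pairs.
        # Flags skew when the busiest partition handled more than `ratio`
        # times the average row count (a simple, arbitrary heuristic).
        values = [count for _partition, count in per_partition_rows]
        if not values or sum(values) == 0:
            return False
        average = sum(values) / len(values)
        return max(values) > ratio * average

    # Hypothetical per-partition "output_rows" readings:
    balanced = [(0, 100), (1, 110), (2, 95)]
    skewed = [(0, 5), (1, 5), (2, 900)]

    print(looks_skewed(balanced))  # False
    print(looks_skewed(skewed))    # True
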

Available Metrics
-----------------

The following metrics are directly accessible as properties on
:py:class:`~datafusion.MetricsSet`:

.. list-table::
   :header-rows: 1
   :widths: 25 75

   * - Property
     - Description
   * - ``output_rows``
     - Number of rows emitted by the operator (summed across partitions).
   * - ``elapsed_compute``
     - CPU time **in nanoseconds** spent inside the operator's compute loop,
       excluding I/O wait. Useful for identifying which operators are most
       expensive (summed across partitions).
   * - ``spill_count``
     - Number of spill-to-disk events triggered by memory pressure. This is
       a unitless count of events, not a measure of data volume (summed
       across partitions).
   * - ``spilled_bytes``
     - Total bytes written to disk during spill events (summed across
       partitions).
   * - ``spilled_rows``
     - Total rows written to disk during spill events (summed across
       partitions).

Any metric not listed above can be accessed via
:py:meth:`~datafusion.MetricsSet.sum_by_name`, or by iterating over the raw
:py:class:`~datafusion.Metric` objects returned by
:py:meth:`~datafusion.MetricsSet.metrics`.
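
As an illustration of the aggregation ``sum_by_name`` performs, here is a
plain-Python equivalent over hypothetical ``(name, partition, value)``
tuples (the metric names below are made up):

.. code-block:: python

    # Hypothetical raw metrics, shaped like the name/partition/value
    # data exposed by the Metric objects.
    raw_metrics = [
        ("output_rows", 0, 10),
        ("output_rows", 1, 20),
        ("some_custom_metric", 0, 7),  # a made-up metric name
    ]

    def sum_metric_by_name(metrics, name):
        # Sum every value recorded under `name`, across all partitions,
        # mirroring what MetricsSet.sum_by_name does.
        return sum(value for n, _partition, value in metrics if n == name)

    print(sum_metric_by_name(raw_metrics, "output_rows"))  # 30
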

Labels
------

A :py:class:`~datafusion.Metric` may carry *labels*: key/value pairs that
provide additional context. Labels are operator-specific; most metrics have
an empty label dict.

Some operators tag their metrics with labels to distinguish variants. For
example, a ``HashAggregateExec`` may record separate ``output_rows`` metrics
for intermediate and final output:

.. code-block:: python

    for metric in metrics_set.metrics():
        print(metric.name, metric.labels())
    # output_rows {'output_type': 'final'}
    # output_rows {'output_type': 'intermediate'}

When summing by name (via :py:attr:`~datafusion.MetricsSet.output_rows` or
:py:meth:`~datafusion.MetricsSet.sum_by_name`), **all** metrics with that
name are summed regardless of labels. To filter by label, iterate over the
raw :py:class:`~datafusion.Metric` objects directly.
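
Since there is no built-in label filter, a label-aware sum has to be written
by hand. A sketch over hypothetical ``(name, value, labels)`` tuples shaped
like the ``Metric`` accessors:

.. code-block:: python

    # Hypothetical metrics shaped like Metric.name / Metric.value /
    # Metric.labels() for a single operator.
    raw_metrics = [
        ("output_rows", 40, {"output_type": "final"}),
        ("output_rows", 100, {"output_type": "intermediate"}),
    ]

    def sum_where(metrics, name, **wanted_labels):
        # Sum values for `name` whose labels contain all `wanted_labels` pairs.
        return sum(
            value
            for n, value, labels in metrics
            if n == name
            and all(labels.get(k) == v for k, v in wanted_labels.items())
        )

    print(sum_where(raw_metrics, "output_rows"))                       # 140
    print(sum_where(raw_metrics, "output_rows", output_type="final"))  # 40
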

End-to-End Example
------------------

.. code-block:: python

    from datafusion import SessionContext

    ctx = SessionContext()
    ctx.sql("CREATE TABLE sales AS VALUES (1, 100), (2, 200), (3, 50)")
    df = ctx.sql("SELECT * FROM sales WHERE column1 > 1")

    # Execute the query; this populates the metrics
    results = df.collect()

    # Retrieve the physical plan with metrics
    plan = df.execution_plan()

    # Walk every operator and print its metrics
    for operator_name, ms in plan.collect_metrics():
        if ms.output_rows is not None:
            print(operator_name)
            print(f"  output_rows     = {ms.output_rows}")
            print(f"  elapsed_compute = {ms.elapsed_compute} ns")

    # Access raw per-partition metrics
    for operator_name, ms in plan.collect_metrics():
        for metric in ms.metrics():
            print(
                f"  partition={metric.partition} "
                f"{metric.name}={metric.value} "
                f"labels={metric.labels()}"
            )

API Reference
-------------

- :py:class:`datafusion.ExecutionPlan` — physical plan node
- :py:meth:`datafusion.ExecutionPlan.collect_metrics` — walk the tree and
  return ``(operator_name, MetricsSet)`` pairs
- :py:meth:`datafusion.ExecutionPlan.metrics` — return the
  :py:class:`~datafusion.MetricsSet` for a single node
- :py:class:`datafusion.MetricsSet` — aggregated metrics for one operator
- :py:class:`datafusion.Metric` — a single per-partition metric value