blob: b788f90bec26374980470317c3cb9828842ed964 [file]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Benchmark: Arrow columnar vs row-based input for scalar Arrow Python UDFs.
Compares end-to-end execution time of applying a scalar pandas_udf to the
same data from ArrowBackedDataSourceV2 (which produces ColumnarBatch with
ArrowColumnVector), with spark.sql.execution.arrow.pythonUDF.columnarInput.enabled
toggled between two runs:
1. enabled=true -- ArrowEvalPythonExec extracts Arrow FieldVectors directly
via the columnar path (no ColumnarToRow conversion).
2. enabled=false -- the batches are converted by ColumnarToRow and
ArrowEvalPythonExec uses the standard path: InternalRow -> ArrowWriter.
The UDF is an identity function (it returns its input unchanged), so the
benchmark isolates the data transfer overhead between JVM and Python.
Usage:
cd $SPARK_HOME
python python/pyspark/sql/tests/pandas/bench_arrow_columnar_udf.py \
[--rows N] [--iterations N] [--partitions N]
"""
import argparse
import sys
import os
import time
# Allow running from the Spark root directory, e.g.
#   python python/pyspark/sql/tests/pandas/bench_arrow_columnar_udf.py
# The in-tree ``pyspark`` package lives in $SPARK_HOME/python, which is four
# directory levels above this file's directory (pandas -> tests -> sql ->
# pyspark -> python). The path is made absolute so it does not depend on the
# current working directory.
# NOTE(review): py4j must still be importable (installed, or its zip from
# $SPARK_HOME/python/lib on PYTHONPATH) -- confirm for your setup.
sys.path.insert(
    0,
    os.path.abspath(
        os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "..", "..", "..")
    ),
)
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, col
# Fully-qualified class name passed to ``spark.read.format`` to load the
# benchmark's input data (ArrowBackedDataSourceV2 produces ColumnarBatch with
# ArrowColumnVector). NOTE(review): the class must be on the driver JVM's
# classpath -- presumably a Spark build that includes it; confirm.
ARROW_SOURCE = "org.apache.spark.sql.execution.python.ArrowBackedDataSourceV2"
def create_spark():
    """Build the local SparkSession used by the benchmark.

    The session uses master ``local[1]`` and the app name
    ``ArrowColumnarUDFBenchmark``. It enables Arrow for PySpark and Python
    worker reuse, disables the Spark UI, and uses a single shuffle partition.
    It is obtained via ``getOrCreate``.
    """
    # Applied in this order; dict insertion order is preserved.
    session_conf = {
        "spark.sql.execution.arrow.pyspark.enabled": "true",
        "spark.python.worker.reuse": "true",
        "spark.ui.enabled": "false",
        "spark.sql.shuffle.partitions": "1",
    }
    builder = SparkSession.builder.master("local[1]").appName("ArrowColumnarUDFBenchmark")
    for key, value in session_conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
def timed_run(df, warmup=2, iterations=5):
    """Execute ``df`` repeatedly against the ``noop`` sink and time each run.

    Each execution is ``df.write.format("noop").mode("overwrite").save()``.

    Args:
        df: DataFrame to execute.
        warmup: Number of untimed executions performed before measuring.
        iterations: Number of timed executions.

    Returns:
        A list with one wall-clock duration in seconds (measured with
        ``time.perf_counter``) per timed execution, in execution order.
    """

    def run_once():
        df.write.format("noop").mode("overwrite").save()

    def measure():
        started = time.perf_counter()
        run_once()
        return time.perf_counter() - started

    # Warm-up executions are run but not recorded.
    for _ in range(warmup):
        run_once()
    return [measure() for _ in range(iterations)]
def print_stats(label, times):
    """Print the average/min/max of ``times`` in milliseconds under ``label``.

    Args:
        label: Heading printed on its own line above the statistics.
        times: Non-empty sequence of durations in seconds (as returned by
            ``timed_run``).

    Returns:
        The average duration in seconds.

    Raises:
        ValueError: If ``times`` is empty, since no average can be computed.
    """
    if not times:
        # Fail with a clear message instead of a bare ZeroDivisionError.
        raise ValueError(f"no timings to summarize for {label!r}")
    avg = sum(times) / len(times)
    mn = min(times)
    mx = max(times)
    print(f" {label}")
    print(
        f" avg = {avg * 1000:8.1f} ms "
        f"min = {mn * 1000:8.1f} ms "
        f"max = {mx * 1000:8.1f} ms "
        f"({len(times)} iterations)"
    )
    return avg
def _parse_args(argv=None):
    """Parse and validate the benchmark's command-line options.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        An ``argparse.Namespace`` with ``rows``, ``iterations``,
        ``partitions`` and ``warmup`` attributes.
    """
    parser = argparse.ArgumentParser(
        description="Benchmark Arrow columnar vs row-based Python UDF input"
    )
    parser.add_argument(
        "--rows", type=int, default=500_000, help="Number of rows (default: 500000)"
    )
    parser.add_argument("--iterations", type=int, default=5, help="Timed iterations (default: 5)")
    parser.add_argument(
        "--partitions", type=int, default=1, help="Number of partitions (default: 1)"
    )
    parser.add_argument(
        "--warmup", type=int, default=2, help="Untimed warm-up runs per case (default: 2)"
    )
    args = parser.parse_args(argv)
    # parser.error() prints the usage plus message and exits with status 2,
    # the same way argparse reports other malformed options. Validation runs
    # before any SparkSession is started.
    if args.rows < 0:
        parser.error("--rows must be >= 0")
    if args.iterations < 1:
        # At least one timed run is needed to compute an average.
        parser.error("--iterations must be >= 1")
    if args.partitions < 1:
        parser.error("--partitions must be >= 1")
    if args.warmup < 0:
        parser.error("--warmup must be >= 0")
    return args


def _run_benchmarks(spark, args):
    """Time the UDF query with columnar input enabled, then disabled, and print results.

    Both cases read the same ``ARROW_SOURCE`` data. Only the value of
    ``spark.sql.execution.arrow.pythonUDF.columnarInput.enabled`` differs
    between the two cases.
    """
    print("=" * 70)
    print("Arrow Columnar vs Row-Based Input for Scalar Arrow Python UDF")
    print("=" * 70)
    print(f" rows={args.rows} partitions={args.partitions} iterations={args.iterations}")
    print()

    # Identity UDF -- returns input as-is to minimize Python-side cost
    # and isolate JVM data transfer overhead.
    @pandas_udf("string")
    def identity_udf(data: pd.Series) -> pd.Series:
        return data

    conf_key = "spark.sql.execution.arrow.pythonUDF.columnarInput.enabled"

    def make_df():
        # Rebuilt per case so each case's plan is created after its conf is set.
        return (
            spark.read.format(ARROW_SOURCE)
            .option("numRows", str(args.rows))
            .option("numPartitions", str(args.partitions))
            .load()
            .select(col("id"), col("name"), col("value"), col("data"), identity_udf(col("data")))
        )

    # ----- Benchmark 1: Arrow columnar (config=true) -----
    spark.conf.set(conf_key, "true")
    arrow_df = make_df()
    print("--- Physical Plan: Arrow columnar source ---")
    arrow_df.explain()
    print()
    print("--- Results ---")
    print()
    arrow_times = timed_run(arrow_df, warmup=args.warmup, iterations=args.iterations)
    arrow_avg = print_stats("Arrow columnar (direct FieldVector extraction)", arrow_times)
    print()

    # ----- Benchmark 2: Row-based with ColumnarToRow (config=false) -----
    spark.conf.set(conf_key, "false")
    try:
        row_df = make_df()
        print("--- Physical Plan: Row-based (ColumnarToRow) source ---")
        row_df.explain()
        print()
        row_times = timed_run(row_df, warmup=args.warmup, iterations=args.iterations)
        row_avg = print_stats("Row-based (ColumnarToRow + ArrowWriter)", row_times)
    finally:
        # Restore the columnar-input setting used by benchmark 1, even on failure.
        spark.conf.set(conf_key, "true")

    print()
    # Ratio of row-based to columnar average time; > 1 means columnar input was faster.
    if arrow_avg > 0:
        speedup = row_avg / arrow_avg
        faster = "faster" if speedup > 1.0 else "slower"
        print(f" Speedup: {speedup:.2f}x ({faster} with Arrow columnar)")
    print()


def main():
    """Entry point: parse options, run both benchmark cases, and print a summary.

    The SparkSession is stopped even if a benchmark case raises.
    """
    args = _parse_args()
    spark = create_spark()
    try:
        _run_benchmarks(spark, args)
    finally:
        spark.stop()


if __name__ == "__main__":
    main()