#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
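"""Tests for PySpark UDF profilers: the legacy accumulator-based profiler enabled with
``spark.python.profile`` and the perf profiler enabled with the
``spark.sql.pyspark.udf.profiler`` SQL conf and inspected through ``spark.profile``."""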
from contextlib import contextmanager
import inspect
import tempfile
import unittest
import os
import sys
import warnings
from io import StringIO
from typing import Iterator, cast
from pyspark import SparkConf
from pyspark.errors import PySparkValueError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, arrow_udf, pandas_udf, udf
from pyspark.sql.window import Window
from pyspark.profiler import UDFBasicProfiler
from pyspark.testing.sqlutils import ReusedSQLTestCase
from pyspark.testing.utils import (
have_pandas,
have_pyarrow,
have_flameprof,
pandas_requirement_message,
pyarrow_requirement_message,
)
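# Shared workload: runs two simple Python UDFs (optionally Arrow-optimized) through an
# action so that their execution is captured by the profilers under test.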
def _do_computation(spark, *, action=lambda df: df.collect(), use_arrow=False):
@udf("long", useArrow=use_arrow)
def add1(x):
return x + 1
@udf("long", useArrow=use_arrow)
def add2(x):
return x + 2
df = spark.range(10).select(add1("id"), add2("id"), add1("id"), add2(col("id") + 1))
action(df)
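# Tests for the legacy accumulator-based UDF profiler enabled via spark.python.profile.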
class UDFProfilerTests(unittest.TestCase):
def setUp(self):
self._old_sys_path = list(sys.path)
class_name = self.__class__.__name__
conf = SparkConf().set("spark.python.profile", "true")
self.spark = (
SparkSession.builder.master("local[4]")
.config(conf=conf)
.appName(class_name)
.getOrCreate()
)
self.sc = self.spark.sparkContext
def tearDown(self):
self.spark.stop()
sys.path = self._old_sys_path
def test_udf_profiler(self):
_do_computation(self.spark)
profilers = self.sc.profiler_collector.profilers
self.assertEqual(4, len(profilers))
old_stdout = sys.stdout
try:
sys.stdout = io = StringIO()
self.sc.show_profiles()
finally:
sys.stdout = old_stdout
with tempfile.TemporaryDirectory(prefix="test_udf_profiler") as d:
self.sc.dump_profiles(d)
for i, udf_name in enumerate(["add1", "add2", "add1", "add2"]):
id, profiler, _ = profilers[i]
with self.subTest(id=id, udf_name=udf_name):
stats = profiler.stats()
                    self.assertIsNotNone(stats)
                    width, stat_list = stats.get_print_list([])
                    func_names = [func_name for fname, n, func_name in stat_list]
                    self.assertIn(udf_name, func_names)
                    self.assertIn(udf_name, io.getvalue())
                    self.assertIn("udf_%d.pstats" % id, os.listdir(d))
def test_custom_udf_profiler(self):
class TestCustomProfiler(UDFBasicProfiler):
def show(self, id):
self.result = "Custom formatting"
self.sc.profiler_collector.udf_profiler_cls = TestCustomProfiler
_do_computation(self.spark)
profilers = self.sc.profiler_collector.profilers
self.assertEqual(4, len(profilers))
_, profiler, _ = profilers[0]
        self.assertIsInstance(profiler, TestCustomProfiler)
self.sc.show_profiles()
self.assertEqual("Custom formatting", profiler.result)
    # Unsupported: iterator-based pandas UDFs are not profiled; a UserWarning is emitted instead.
def exec_pandas_udf_iter_to_iter(self):
import pandas as pd
@pandas_udf("int")
def iter_to_iter(batch_ser: Iterator[pd.Series]) -> Iterator[pd.Series]:
for ser in batch_ser:
yield ser + 1
self.spark.range(10).select(iter_to_iter("id")).collect()
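    # Unsupported: iterator-based Arrow UDFs are not profiled; a UserWarning is emitted instead.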
def exec_arrow_udf_iter_to_iter(self):
import pyarrow as pa
@arrow_udf("int")
def iter_to_iter(iter: Iterator[pa.Array]) -> Iterator[pa.Array]:
for s in iter:
yield pa.compute.add(s, 1)
self.spark.range(10).select(iter_to_iter("id")).collect()
    # Unsupported: mapInPandas functions are not profiled; a UserWarning is emitted instead.
def exec_map(self):
import pandas as pd
def map(pdfs: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
for pdf in pdfs:
yield pdf[pdf.id == 1]
df = self.spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0)], ("id", "v"))
df.mapInPandas(map, schema=df.schema).collect()
@unittest.skipIf(not have_pyarrow, pyarrow_requirement_message)
def test_unsupported(self):
with warnings.catch_warnings(record=True) as warns:
warnings.simplefilter("always")
self.exec_pandas_udf_iter_to_iter()
user_warns = [warn.message for warn in warns if isinstance(warn.message, UserWarning)]
            self.assertGreater(len(user_warns), 0)
            self.assertIn(
                "Profiling UDFs with iterators input/output is not supported", str(user_warns[0])
            )
with warnings.catch_warnings(record=True) as warns:
warnings.simplefilter("always")
self.exec_arrow_udf_iter_to_iter()
user_warns = [warn.message for warn in warns if isinstance(warn.message, UserWarning)]
            self.assertGreater(len(user_warns), 0)
            self.assertIn(
                "Profiling UDFs with iterators input/output is not supported", str(user_warns[0])
            )
with warnings.catch_warnings(record=True) as warns:
warnings.simplefilter("always")
self.exec_map()
user_warns = [warn.message for warn in warns if isinstance(warn.message, UserWarning)]
            self.assertGreater(len(user_warns), 0)
            self.assertIn(
                "Profiling UDFs with iterators input/output is not supported", str(user_warns[0])
            )
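# Tests for the perf UDF profiler enabled via the spark.sql.pyspark.udf.profiler SQL conf
# and inspected through the spark.profile API (show/dump/render/clear).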
class UDFProfiler2TestsMixin:
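    # Capture stdout so that the output of spark.profile.show() can be asserted on.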
@contextmanager
def trap_stdout(self):
old_stdout = sys.stdout
sys.stdout = io = StringIO()
try:
yield io
finally:
sys.stdout = old_stdout
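    # Perf profile results collected on the driver, keyed by UDF id.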
@property
def profile_results(self):
return self.spark._profiler_collector._perf_profile_results
def assert_udf_profile_present(self, udf_id, expected_line_count_prefix):
"""
Assert that the performance profile for a given UDF ID is present and correctly formatted.
This checks the output of `spark.profile.show()` for the specified UDF ID, ensures that
it includes a line matching the expected line count prefix and file name, and optionally
verifies that `spark.profile.render()` produces SVG output when flameprof is available.
"""
with self.trap_stdout() as io:
self.spark.profile.show(udf_id, type="perf")
self.assertIn(f"Profile of UDF<id={udf_id}>", io.getvalue())
self.assertRegex(
io.getvalue(),
f"{expected_line_count_prefix}.*{os.path.basename(inspect.getfile(_do_computation))}",
)
if have_flameprof:
self.assertIn("svg", self.spark.profile.render(udf_id))
def test_perf_profiler_udf(self):
_do_computation(self.spark)
# Without the conf enabled, no profile results are collected.
self.assertEqual(0, len(self.profile_results), str(list(self.profile_results)))
with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
_do_computation(self.spark)
self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
with self.trap_stdout():
self.spark.profile.show(type="perf")
with tempfile.TemporaryDirectory(prefix="test_perf_profiler_udf") as d:
self.spark.profile.dump(d, type="perf")
for id in self.profile_results:
self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=10)
self.assertTrue(f"udf_{id}_perf.pstats" in os.listdir(d))
@unittest.skipIf(
not have_pandas or not have_pyarrow,
cast(str, pandas_requirement_message or pyarrow_requirement_message),
)
def test_perf_profiler_udf_with_arrow(self):
with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
_do_computation(self.spark, use_arrow=True)
self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
for id in self.profile_results:
self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=10)
def test_perf_profiler_udf_multiple_actions(self):
def action(df):
df.collect()
df.show()
with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
_do_computation(self.spark, action=action)
self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
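        # The action runs the UDFs twice (collect and show), doubling the profiled call counts.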
for id in self.profile_results:
self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=20)
def test_perf_profiler_udf_registered(self):
@udf("long")
def add1(x):
return x + 1
self.spark.udf.register("add1", add1)
with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
self.spark.sql("SELECT id, add1(id) add1 FROM range(10)").collect()
self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
for id in self.profile_results:
self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=10)
@unittest.skipIf(
not have_pandas or not have_pyarrow,
cast(str, pandas_requirement_message or pyarrow_requirement_message),
)
def test_perf_profiler_pandas_udf(self):
@pandas_udf("long")
def add1(x):
return x + 1
@pandas_udf("long")
def add2(x):
return x + 2
with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
df = self.spark.range(10, numPartitions=2).select(
add1("id"), add2("id"), add1("id"), add2(col("id") + 1)
)
df.collect()
self.assertEqual(3, len(self.profile_results), str(self.profile_results.keys()))
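        # Vectorized UDFs run once per batch, so with two partitions the profiled call count is 2.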
for id in self.profile_results:
self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2)
@unittest.skipIf(not have_pyarrow, pyarrow_requirement_message)
def test_perf_profiler_arrow_udf(self):
import pyarrow as pa
@arrow_udf("long")
def add1(x):
return pa.compute.add(x, 1)
@arrow_udf("long")
def add2(x):
return pa.compute.add(x, 2)
with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
df = self.spark.range(10, numPartitions=2).select(
add1("id"), add2("id"), add1("id"), add2(col("id") + 1)
)
df.collect()
self.assertEqual(3, len(self.profile_results), str(self.profile_results.keys()))
for id in self.profile_results:
self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2)
@unittest.skipIf(
not have_pandas or not have_pyarrow,
cast(str, pandas_requirement_message or pyarrow_requirement_message),
)
def test_perf_profiler_pandas_udf_iterator_not_supported(self):
import pandas as pd
@pandas_udf("long")
def add1(x):
return x + 1
@pandas_udf("long")
def add2(iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for s in iter:
yield s + 2
with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
df = self.spark.range(10, numPartitions=2).select(
add1("id"), add2("id"), add1("id"), add2(col("id") + 1)
)
df.collect()
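        # Only add1 is profiled; the iterator-based add2 is skipped with a warning.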
self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
for id in self.profile_results:
self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2)
@unittest.skipIf(not have_pyarrow, pyarrow_requirement_message)
def test_perf_profiler_arrow_udf_iterator_not_supported(self):
import pyarrow as pa
@arrow_udf("long")
def add1(x):
return pa.compute.add(x, 1)
@arrow_udf("long")
def add2(iter: Iterator[pa.Array]) -> Iterator[pa.Array]:
for s in iter:
yield pa.compute.add(s, 2)
with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
df = self.spark.range(10, numPartitions=2).select(
add1("id"), add2("id"), add1("id"), add2(col("id") + 1)
)
df.collect()
self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
for id in self.profile_results:
self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2)
@unittest.skipIf(
not have_pandas or not have_pyarrow,
cast(str, pandas_requirement_message or pyarrow_requirement_message),
)
def test_perf_profiler_map_in_pandas_not_supported(self):
df = self.spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
df.mapInPandas(filter_func, df.schema).show()
self.assertEqual(0, len(self.profile_results), str(self.profile_results.keys()))
@unittest.skipIf(
not have_pandas or not have_pyarrow,
cast(str, pandas_requirement_message or pyarrow_requirement_message),
)
def test_perf_profiler_pandas_udf_window(self):
# WindowInPandasExec
import pandas as pd
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()
df = self.spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
)
w = Window.partitionBy("id").orderBy("v").rowsBetween(-1, 0)
with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
df.withColumn("mean_v", mean_udf("v").over(w)).show()
self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
for id in self.profile_results:
self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=5)
@unittest.skipIf(not have_pyarrow, pyarrow_requirement_message)
def test_perf_profiler_arrow_udf_window(self):
import pyarrow as pa
@arrow_udf("double")
def mean_udf(v: pa.Array) -> float:
return pa.compute.mean(v)
df = self.spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
)
w = Window.partitionBy("id").orderBy("v").rowsBetween(-1, 0)
with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
df.withColumn("mean_v", mean_udf("v").over(w)).show()
self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
for id in self.profile_results:
self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=5)
@unittest.skipIf(
not have_pandas or not have_pyarrow,
cast(str, pandas_requirement_message or pyarrow_requirement_message),
)
def test_perf_profiler_aggregate_in_pandas(self):
# AggregateInPandasExec
import pandas as pd
@pandas_udf("double")
def min_udf(v: pd.Series) -> float:
return v.min()
with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
df = self.spark.createDataFrame(
[(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")], ["age", "name"]
)
df.groupBy(df.name).agg(min_udf(df.age)).show()
self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
for id in self.profile_results:
self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2)
@unittest.skipIf(not have_pyarrow, pyarrow_requirement_message)
def test_perf_profiler_arrow_udf_agg(self):
import pyarrow as pa
@arrow_udf("double")
def min_udf(v: pa.Array) -> float:
return pa.compute.min(v)
with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
df = self.spark.createDataFrame(
[(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")], ["age", "name"]
)
df.groupBy(df.name).agg(min_udf(df.age)).show()
self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
for id in self.profile_results:
self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2)
@unittest.skipIf(
not have_pandas or not have_pyarrow,
cast(str, pandas_requirement_message or pyarrow_requirement_message),
)
def test_perf_profiler_group_apply_in_pandas(self):
# FlatMapGroupsInBatchExec
df = self.spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
)
def normalize(pdf):
v = pdf.v
return pdf.assign(v=(v - v.mean()) / v.std())
with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
df.groupby("id").applyInPandas(normalize, schema="id long, v double").show()
self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
for id in self.profile_results:
self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2)
@unittest.skipIf(
not have_pandas or not have_pyarrow,
cast(str, pandas_requirement_message or pyarrow_requirement_message),
)
def test_perf_profiler_cogroup_apply_in_pandas(self):
# FlatMapCoGroupsInBatchExec
import pandas as pd
df1 = self.spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"),
)
df2 = self.spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")], ("time", "id", "v2")
)
def asof_join(left, right):
return pd.merge_asof(left, right, on="time", by="id")
with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string"
).show()
self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
for id in self.profile_results:
self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2)
@unittest.skipIf(
not have_pandas or not have_pyarrow,
cast(str, pandas_requirement_message or pyarrow_requirement_message),
)
def test_perf_profiler_group_apply_in_arrow(self):
# FlatMapGroupsInBatchExec
import pyarrow.compute as pc
df = self.spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
)
def normalize(table):
v = table.column("v")
norm = pc.divide(pc.subtract(v, pc.mean(v)), pc.stddev(v, ddof=1))
return table.set_column(1, "v", norm)
with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
df.groupby("id").applyInArrow(normalize, schema="id long, v double").show()
self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
for id in self.profile_results:
self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2)
@unittest.skipIf(
not have_pandas or not have_pyarrow,
cast(str, pandas_requirement_message or pyarrow_requirement_message),
)
def test_perf_profiler_cogroup_apply_in_arrow(self):
import pyarrow as pa
df1 = self.spark.createDataFrame([(1, 1.0), (2, 2.0), (1, 3.0), (2, 4.0)], ("id", "v1"))
df2 = self.spark.createDataFrame([(1, "x"), (2, "y")], ("id", "v2"))
def summarize(left, right):
return pa.Table.from_pydict({"left": [left.num_rows], "right": [right.num_rows]})
with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
df1.groupby("id").cogroup(df2.groupby("id")).applyInArrow(
summarize, schema="left long, right long"
).show()
self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
for id in self.profile_results:
self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2)
def test_perf_profiler_render(self):
with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
_do_computation(self.spark)
self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
id = list(self.profile_results.keys())[0]
if have_flameprof:
self.assertIn("svg", self.spark.profile.render(id))
self.assertIn("svg", self.spark.profile.render(id, type="perf"))
self.assertIn("svg", self.spark.profile.render(id, renderer="flameprof"))
with self.assertRaises(PySparkValueError) as pe:
self.spark.profile.render(id, type="unknown")
self.check_error(
exception=pe.exception,
errorClass="VALUE_NOT_ALLOWED",
messageParameters={
"arg_name": "type",
"allowed_values": "['perf', 'memory']",
},
)
with self.assertRaises(PySparkValueError) as pe:
self.spark.profile.render(id, type="memory")
self.check_error(
exception=pe.exception,
errorClass="VALUE_NOT_ALLOWED",
messageParameters={
"arg_name": "(type, renderer)",
"allowed_values": "[('perf', None), ('perf', 'flameprof')]",
},
)
with self.assertRaises(PySparkValueError) as pe:
self.spark.profile.render(id, renderer="unknown")
self.check_error(
exception=pe.exception,
errorClass="VALUE_NOT_ALLOWED",
messageParameters={
"arg_name": "(type, renderer)",
"allowed_values": "[('perf', None), ('perf', 'flameprof')]",
},
)
with self.trap_stdout() as io:
self.spark.profile.show(id, type="perf")
show_value = io.getvalue()
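        # A callable renderer receives the pstats.Stats object; its printed output should
        # appear within the show() output captured above.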
with self.trap_stdout() as io:
self.spark.profile.render(
id, renderer=lambda s: s.sort_stats("time", "cumulative").print_stats()
)
render_value = io.getvalue()
self.assertIn(render_value, show_value)
def test_perf_profiler_clear(self):
with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
_do_computation(self.spark)
self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
for id in self.profile_results:
self.spark.profile.clear(id)
self.assertNotIn(id, self.profile_results)
self.assertEqual(0, len(self.profile_results), str(list(self.profile_results)))
with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
_do_computation(self.spark)
self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
self.spark.profile.clear(type="memory")
self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
self.spark.profile.clear(type="perf")
self.assertEqual(0, len(self.profile_results), str(list(self.profile_results)))
with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
_do_computation(self.spark)
self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
self.spark.profile.clear()
self.assertEqual(0, len(self.profile_results), str(list(self.profile_results)))
class UDFProfiler2Tests(UDFProfiler2TestsMixin, ReusedSQLTestCase):
def setUp(self) -> None:
super().setUp()
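        # Reset profile results accumulated by other tests sharing this session.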
self.spark._profiler_collector._accumulator._value = None
if __name__ == "__main__":
from pyspark.sql.tests.test_udf_profiler import * # noqa: F401
try:
import xmlrunner # type: ignore
testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
except ImportError:
testRunner = None
unittest.main(testRunner=testRunner, verbosity=2)