blob: f7b391ec8000f5e6cec1709af48ff987ce73da42 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from abc import ABC, abstractmethod
from io import StringIO
import os
import pstats
from threading import RLock
from typing import Any, Callable, Dict, Literal, Optional, Tuple, Union, TYPE_CHECKING, overload
import warnings
from pyspark.accumulators import (
Accumulator,
AccumulatorParam,
SpecialAccumulatorIds,
_accumulatorRegistry,
)
from pyspark.errors import PySparkValueError
from pyspark.profiler import (
CodeMapDict,
MemoryProfiler,
MemUsageParam,
PStatsParam,
has_memory_profiler,
)
if TYPE_CHECKING:
from pyspark.sql._typing import ProfileResults
class _ProfileResultsParam(AccumulatorParam[Optional["ProfileResults"]]):
    """
    AccumulatorParam for profilers.

    Merges two per-UDF result maps (UDF ID -> (perf stats, memory code map, ...))
    by combining the perf and memory components entry-wise with their own
    accumulator params.
    """

    @staticmethod
    def zero(value: Optional["ProfileResults"]) -> Optional["ProfileResults"]:
        # The identity element is simply the accumulator's initial value.
        return value

    @staticmethod
    def addInPlace(
        value1: Optional["ProfileResults"], value2: Optional["ProfileResults"]
    ) -> Optional["ProfileResults"]:
        # Normalize None / empty operands to empty result maps.
        left = value1 if value1 else {}
        right = value2 if value2 else {}
        merged = dict(left)
        for udf_id, (perf, mem, *_) in right.items():
            if udf_id in left:
                base_perf, base_mem, *_ = left[udf_id]
            else:
                # No prior entry for this UDF: start from the per-component zeros.
                base_perf = PStatsParam.zero(None)
                base_mem = MemUsageParam.zero(None)
            merged[udf_id] = (
                PStatsParam.addInPlace(base_perf, perf),
                MemUsageParam.addInPlace(base_mem, mem),
            )
        return merged


ProfileResultsParam = _ProfileResultsParam()
class ProfilerCollector(ABC):
    """
    A base class of profiler collectors for session based profilers.
    This supports cProfiler and memory-profiler enabled by setting a SQL config
    `spark.sql.pyspark.udf.profiler` to "perf" or "memory".
    """

    def __init__(self) -> None:
        # Reentrant lock guarding access to the profile results. RLock (not Lock)
        # because public methods acquire it and then read properties such as
        # _perf_profile_results, which re-acquire it on the same thread.
        self._lock = RLock()

    def show_perf_profiles(self, id: Optional[int] = None) -> None:
        """
        Show the perf profile results.
        .. versionadded:: 4.0.0
        Parameters
        ----------
        id : int, optional
            A UDF ID to be shown. If not specified, all the results will be shown.
        """
        with self._lock:
            # Snapshot under the lock; printing below then works on a stable dict.
            stats = self._perf_profile_results

        def show(id: int) -> None:
            # Print a banner and the cProfile stats for one UDF, if collected.
            s = stats.get(id)
            if s is not None:
                print("=" * 60)
                print(f"Profile of UDF<id={id}>")
                print("=" * 60)
                s.sort_stats("time", "cumulative").print_stats()

        if id is not None:
            show(id)
        else:
            # No ID given: show every collected profile in ascending UDF ID order.
            for id in sorted(stats.keys()):
                show(id)

    @property
    def _perf_profile_results(self) -> Dict[int, pstats.Stats]:
        # UDF ID -> cProfile stats, dropping entries with no perf component.
        with self._lock:
            return {
                result_id: perf
                for result_id, (perf, _, *_) in self._profile_results.items()
                if perf is not None
            }

    def show_memory_profiles(self, id: Optional[int] = None) -> None:
        """
        Show the memory profile results.
        .. versionadded:: 4.0.0
        Parameters
        ----------
        id : int, optional
            A UDF ID to be shown. If not specified, all the results will be shown.
        """
        with self._lock:
            code_map = self._memory_profile_results
        # Empty results with the library absent most likely means memory profiling
        # never ran on the executors, so hint at the missing dependency.
        if not has_memory_profiler and not code_map:
            warnings.warn(
                "Install the 'memory_profiler' library in the cluster to enable memory profiling",
                UserWarning,
            )

        def show(id: int) -> None:
            # Print a banner and the line-by-line memory report for one UDF.
            cm = code_map.get(id)
            if cm is not None:
                print("=" * 60)
                print(f"Profile of UDF<id={id}>")
                print("=" * 60)
                MemoryProfiler._show_results(cm)

        if id is not None:
            show(id)
        else:
            for id in sorted(code_map.keys()):
                show(id)

    @property
    def _memory_profile_results(self) -> Dict[int, CodeMapDict]:
        # UDF ID -> memory-profiler code map, dropping entries with no memory data.
        with self._lock:
            return {
                result_id: mem
                for result_id, (_, mem, *_) in self._profile_results.items()
                if mem is not None
            }

    @property
    @abstractmethod
    def _profile_results(self) -> "ProfileResults":
        """
        Get the profile results.
        """
        ...

    def dump_perf_profiles(self, path: str, id: Optional[int] = None) -> None:
        """
        Dump the perf profile results into directory `path`.
        .. versionadded:: 4.0.0
        Parameters
        ----------
        path: str
            A directory in which to dump the perf profile.
        id : int, optional
            A UDF ID to be shown. If not specified, all the results will be shown.
        """
        with self._lock:
            stats = self._perf_profile_results

        def dump(id: int) -> None:
            s = stats.get(id)
            if s is not None:
                # Create the target directory lazily, only when there is data to dump.
                os.makedirs(path, exist_ok=True)
                p = os.path.join(path, f"udf_{id}_perf.pstats")
                s.dump_stats(p)

        if id is not None:
            dump(id)
        else:
            for id in sorted(stats.keys()):
                dump(id)

    def dump_memory_profiles(self, path: str, id: Optional[int] = None) -> None:
        """
        Dump the memory profile results into directory `path`.
        .. versionadded:: 4.0.0
        Parameters
        ----------
        path: str
            A directory in which to dump the memory profile.
        id : int, optional
            A UDF ID to be shown. If not specified, all the results will be shown.
        """
        with self._lock:
            code_map = self._memory_profile_results
        if not has_memory_profiler and not code_map:
            warnings.warn(
                "Install the 'memory_profiler' library in the cluster to enable memory profiling",
                UserWarning,
            )

        def dump(id: int) -> None:
            cm = code_map.get(id)
            if cm is not None:
                os.makedirs(path, exist_ok=True)
                p = os.path.join(path, f"udf_{id}_memory.txt")
                with open(p, "w+") as f:
                    # Write the same report show_memory_profiles prints, into the file.
                    MemoryProfiler._show_results(cm, stream=f)

        if id is not None:
            dump(id)
        else:
            for id in sorted(code_map.keys()):
                dump(id)

    def clear_perf_profiles(self, id: Optional[int] = None) -> None:
        """
        Clear the perf profile results.
        .. versionadded:: 4.0.0
        Parameters
        ----------
        id : int, optional
            The UDF ID whose profiling results should be cleared.
            If not specified, all the results will be cleared.
        """
        with self._lock:
            if id is not None:
                if id in self._profile_results:
                    # `_` captures any extra tuple elements beyond (perf, mem) so they
                    # are preserved when the perf slot is nulled out below.
                    perf, mem, *_ = self._profile_results[id]
                    self._profile_results[id] = (None, mem, *_)
                    if mem is None:
                        # Neither perf nor memory data remains: drop the entry entirely.
                        self._profile_results.pop(id, None)
            else:
                # Iterate over a list copy since entries may be popped while looping.
                for id, (perf, mem, *_) in list(self._profile_results.items()):
                    self._profile_results[id] = (None, mem, *_)
                    if mem is None:
                        self._profile_results.pop(id, None)

    def clear_memory_profiles(self, id: Optional[int] = None) -> None:
        """
        Clear the memory profile results.
        .. versionadded:: 4.0.0
        Parameters
        ----------
        id : int, optional
            The UDF ID whose profiling results should be cleared.
            If not specified, all the results will be cleared.
        """
        with self._lock:
            if id is not None:
                if id in self._profile_results:
                    # Mirror image of clear_perf_profiles: null the memory slot and
                    # drop the entry when no perf data remains either.
                    perf, mem, *_ = self._profile_results[id]
                    self._profile_results[id] = (perf, None, *_)
                    if perf is None:
                        self._profile_results.pop(id, None)
            else:
                for id, (perf, mem, *_) in list(self._profile_results.items()):
                    self._profile_results[id] = (perf, None, *_)
                    if perf is None:
                        self._profile_results.pop(id, None)
class AccumulatorProfilerCollector(ProfilerCollector):
    """
    A :class:`ProfilerCollector` that gathers UDF profile results through an
    :class:`Accumulator` registered under the reserved SQL UDF profiler ID.
    """

    def __init__(self) -> None:
        super().__init__()
        acc_id = SpecialAccumulatorIds.SQL_UDF_PROFIER
        try:
            # Reuse the accumulator if one was already registered for this ID.
            self._accumulator = _accumulatorRegistry[acc_id]
        except KeyError:
            self._accumulator = Accumulator(acc_id, None, ProfileResultsParam)

    @property
    def _profile_results(self) -> "ProfileResults":
        with self._lock:
            current = self._accumulator.value
            # The accumulator starts out as None; present that as an empty map.
            return {} if current is None else current
class Profile:
    """User-facing profile API. This instance can be accessed by
    :attr:`spark.profile`.
    .. versionadded:: 4.0.0
    """

    def __init__(self, profiler_collector: ProfilerCollector):
        # Collector actually holding/aggregating the per-UDF results.
        self.profiler_collector = profiler_collector

    @staticmethod
    def _check_type(type: Optional[str]) -> None:
        """
        Validate the profiler ``type`` argument shared by the public methods.

        Raises
        ------
        PySparkValueError
            If ``type`` is not None, "perf", or "memory".
        """
        if type is not None and type not in ("perf", "memory"):
            raise PySparkValueError(
                errorClass="VALUE_NOT_ALLOWED",
                messageParameters={
                    "arg_name": "type",
                    "allowed_values": str(["perf", "memory"]),
                },
            )

    def show(self, id: Optional[int] = None, *, type: Optional[str] = None) -> None:
        """
        Show the profile results.
        .. versionadded:: 4.0.0
        Parameters
        ----------
        id : int, optional
            A UDF ID to be shown. If not specified, all the results will be shown.
        type : str, optional
            The profiler type, which can be either "perf" or "memory".
            If not specified, both are shown.
        Notes
        -----
        The results are gathered from all Python executions. For example, if there are
        8 tasks, each processing 1,000 rows, the total output will display the results
        for 8,000 rows.
        """
        self._check_type(type)
        # When no type is given, show both profiles, perf first.
        if type is None or type == "perf":
            self.profiler_collector.show_perf_profiles(id)
        if type is None or type == "memory":
            self.profiler_collector.show_memory_profiles(id)

    def dump(self, path: str, id: Optional[int] = None, *, type: Optional[str] = None) -> None:
        """
        Dump the profile results into directory `path`.
        .. versionadded:: 4.0.0
        Parameters
        ----------
        path: str
            A directory in which to dump the profile.
        id : int, optional
            A UDF ID to be shown. If not specified, all the results will be shown.
        type : str, optional
            The profiler type, which can be either "perf" or "memory".
            If not specified, both are dumped.
        """
        self._check_type(type)
        # When no type is given, dump both profiles, perf first.
        if type is None or type == "perf":
            self.profiler_collector.dump_perf_profiles(path, id)
        if type is None or type == "memory":
            self.profiler_collector.dump_memory_profiles(path, id)

    @overload
    def render(self, id: int, *, type: Optional[str] = None, renderer: Optional[str] = None) -> Any:
        ...

    @overload
    def render(
        self, id: int, *, type: Optional[Literal["perf"]], renderer: Callable[[pstats.Stats], Any]
    ) -> Any:
        ...

    @overload
    def render(
        self, id: int, *, type: Literal["memory"], renderer: Callable[[CodeMapDict], Any]
    ) -> Any:
        ...

    def render(
        self,
        id: int,
        *,
        type: Optional[str] = None,
        renderer: Optional[
            Union[str, Callable[[pstats.Stats], Any], Callable[[CodeMapDict], Any]]
        ] = None,
    ) -> Any:
        """
        Render the profile results.
        .. versionadded:: 4.0.0
        Parameters
        ----------
        id : int
            The UDF ID whose profiling results should be rendered.
        type : str, optional
            The profiler type to render results for, which can be either "perf" or "memory".
            If not specified, defaults to "perf".
        renderer : str or callable, optional
            The renderer to use. If not specified, the default renderer will be "flameprof"
            for the "perf" profiler, which returns an :class:`IPython.display.HTML` object in
            an IPython environment to draw the figure; otherwise, it returns the SVG source string.
            For the "memory" profiler, no default renderer is provided.
            If a callable is provided, it should take a `pstats.Stats` object for "perf" profiler,
            and `CodeMapDict` for "memory" profiler, and return the rendered result.
        """
        self._check_type(type)
        if type is None:
            type = "perf"
        # Look up the collected result for this UDF; may be None if nothing was collected.
        result: Optional[Union[pstats.Stats, CodeMapDict]]
        if type == "perf":
            result = self.profiler_collector._perf_profile_results.get(id)
        else:
            result = self.profiler_collector._memory_profile_results.get(id)

        # Resolve the renderer: a string (or None) selects from the registry keyed by
        # (type, name); a callable is used directly; anything else is rejected.
        render: Optional[Union[Callable[[pstats.Stats], Any], Callable[[CodeMapDict], Any]]] = None
        if renderer is None or isinstance(renderer, str):
            render = _renderers.get((type, renderer))
        elif callable(renderer):
            render = renderer
        if render is None:
            raise PySparkValueError(
                errorClass="VALUE_NOT_ALLOWED",
                messageParameters={
                    "arg_name": "(type, renderer)",
                    "allowed_values": str(list(_renderers.keys())),
                },
            )
        # No result for this UDF: implicitly return None, matching the original behavior.
        if result is not None:
            return render(result)  # type:ignore[arg-type]

    def clear(self, id: Optional[int] = None, *, type: Optional[str] = None) -> None:
        """
        Clear the profile results.
        .. versionadded:: 4.0.0
        Parameters
        ----------
        id : int, optional
            The UDF ID whose profiling results should be cleared.
            If not specified, all the results will be cleared.
        type : str, optional
            The profiler type to clear results for, which can be either "perf" or "memory".
            If not specified, both are cleared.
        """
        self._check_type(type)
        # When no type is given, clear both profiles, perf first.
        if type is None or type == "perf":
            self.profiler_collector.clear_perf_profiles(id)
        if type is None or type == "memory":
            self.profiler_collector.clear_memory_profiles(id)
def _render_flameprof(stats: pstats.Stats) -> Any:
    """
    Default "perf" renderer: build a flamegraph SVG from ``stats`` with flameprof.

    Returns an :class:`IPython.display.HTML` wrapper when running inside IPython,
    otherwise the raw SVG source string. Raises :class:`PySparkValueError` when
    the ``flameprof`` package is not installed.
    """
    try:
        from flameprof import render
    except ImportError:
        raise PySparkValueError(
            errorClass="PACKAGE_NOT_INSTALLED",
            messageParameters={"package_name": "flameprof", "minimum_version": "0.4"},
        )
    buffer = StringIO()
    render(stats.stats, buffer)  # type: ignore[attr-defined]
    svg_source = buffer.getvalue()

    try:
        import IPython

        shell = IPython.get_ipython()
    except ImportError:
        shell = None
    if not shell:
        # Plain Python session: hand back the SVG source directly.
        return svg_source
    from IPython.display import HTML

    return HTML(svg_source)
# Registry mapping (profiler type, renderer name) to a rendering callable.
# A renderer name of None selects the default renderer for that profiler type;
# no entry exists for the "memory" profiler, so it has no default renderer.
_renderers: Dict[
    Tuple[str, Optional[str]], Union[Callable[[pstats.Stats], Any], Callable[[CodeMapDict], Any]]
] = {
    ("perf", None): _render_flameprof,
    ("perf", "flameprof"): _render_flameprof,
}