from typing import (
    Any,
    Callable,
    Dict,
    Iterator,
    List,
    Optional,
    Tuple,
    Type,
    TYPE_CHECKING,
    Union,
    cast,
)

import cProfile
import inspect
import pstats
import linecache
import os
import atexit
import sys
import warnings

try:
    from memory_profiler import CodeMap, LineProfiler

    has_memory_profiler = True
except Exception:
    has_memory_profiler = False

from pyspark.accumulators import AccumulatorParam
from pyspark.errors import PySparkRuntimeError

if TYPE_CHECKING:
    from pyspark.core.context import SparkContext
MemoryTuple = Tuple[float, float, int]
LineProfile = Tuple[int, Optional[MemoryTuple]]
CodeMapDict = Dict[str, List[LineProfile]]
class ProfilerCollector:
    """
    This class keeps track of different profilers on a per
    stage/UDF basis. Also this is used to create new profilers for
    the different stages/UDFs.

    Each tracked entry in ``self.profilers`` is a mutable
    ``[id, profiler, showed]`` list, where ``showed`` records whether the
    profile has already been printed by :meth:`show_profiles`.
    """

    def __init__(
        self,
        profiler_cls: Type["Profiler"],
        udf_profiler_cls: Type["Profiler"],
        memory_profiler_cls: Type["Profiler"],
        dump_path: Optional[str] = None,
    ):
        self.profiler_cls: Type[Profiler] = profiler_cls
        self.udf_profiler_cls: Type[Profiler] = udf_profiler_cls
        self.memory_profiler_cls: Type[Profiler] = memory_profiler_cls
        self.profile_dump_path: Optional[str] = dump_path
        self.profilers: List[List[Any]] = []

    def new_profiler(self, ctx: "SparkContext") -> "Profiler":
        """Create a new profiler using class `profiler_cls`"""
        return self.profiler_cls(ctx)

    def new_udf_profiler(self, ctx: "SparkContext") -> "Profiler":
        """Create a new profiler using class `udf_profiler_cls`"""
        return self.udf_profiler_cls(ctx)

    def new_memory_profiler(self, ctx: "SparkContext") -> "Profiler":
        """Create a new profiler using class `memory_profiler_cls`"""
        return self.memory_profiler_cls(ctx)

    def add_profiler(self, id: int, profiler: "Profiler") -> None:
        """Add a profiler for RDD/UDF `id`"""
        if not self.profilers:
            # First profiler since the list was (re)initialised: make sure the
            # collected profiles are emitted when the interpreter exits.
            if self.profile_dump_path:
                atexit.register(self.dump_profiles, self.profile_dump_path)
            else:
                atexit.register(self.show_profiles)

        self.profilers.append([id, profiler, False])

    def dump_profiles(self, path: str) -> None:
        """Dump the profile stats into directory `path`"""
        for id, profiler, _ in self.profilers:
            profiler.dump(id, path)
        # Dumped profiles are forgotten so they are not written twice.
        self.profilers = []

    def show_profiles(self) -> None:
        """Print the profile stats to stdout"""
        for i, (id, profiler, showed) in enumerate(self.profilers):
            if not showed and profiler:
                profiler.show(id)
                # mark it as showed
                self.profilers[i][2] = True
class Profiler:
    """
    PySpark supports custom profilers, this is to allow for different profilers to
    be used as well as outputting to different formats than what is provided in the
    BasicProfiler.

    A custom profiler has to define or inherit the following methods:
        profile - will produce a system profile of some sort.
        stats - return the collected stats.
        dump - dumps the profiles to a path
        add - adds a profile to the existing accumulated profile

    The profiler class is chosen when creating a SparkContext

    Examples
    --------
    >>> from pyspark import SparkConf, SparkContext
    >>> from pyspark import BasicProfiler
    >>> class MyCustomProfiler(BasicProfiler):
    ...     def show(self, id):
    ...         print("My custom profiles for RDD:%s" % id)
    ...
    >>> conf = SparkConf().set("spark.python.profile", "true")
    >>> sc = SparkContext('local', 'test', conf=conf, profiler_cls=MyCustomProfiler)
    >>> sc.parallelize(range(1000)).map(lambda x: 2 * x).take(10)
    [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    >>> sc.parallelize(range(1000)).count()
    1000
    >>> sc.show_profiles()
    My custom profiles for RDD:1
    My custom profiles for RDD:3
    >>> sc.stop()

    Notes
    -----
    This API is a developer API.
    """

    def __init__(self, ctx: "SparkContext") -> None:
        # The base class keeps no state; subclasses use `ctx` as they need.
        pass

    def profile(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        """Do profiling on the function `func`"""
        raise NotImplementedError

    def stats(self) -> Union[pstats.Stats, Dict]:
        """Return the collected profiling stats"""
        raise NotImplementedError

    def show(self, id: int) -> None:
        """Print the profile stats to stdout"""
        raise NotImplementedError

    def dump(self, id: int, path: str) -> None:
        """Dump the profile into path"""
        raise NotImplementedError
if has_memory_profiler:
    # These helpers subclass memory_profiler types, so they are only defined
    # when that optional package could be imported.

    class CodeMapForUDF(CodeMap):
        """Code map that accepts explicit source lines for a top-level code object."""

        def add(
            self,
            code: Any,
            toplevel_code: Optional[Any] = None,
            sub_lines: Optional[List] = None,
            start_line: Optional[int] = None,
        ) -> None:
            """
            Register `code` and, recursively, the code objects nested in it.

            `sub_lines` and `start_line` give the source lines of a top-level
            code object and the line number they start at; when either is
            missing they are looked up with ``inspect.getsourcelines``.
            """
            if code in self:
                # Already registered.
                return

            if toplevel_code is None:
                toplevel_code = code
                filename = code.co_filename
                if sub_lines is None or start_line is None:
                    (sub_lines, start_line) = inspect.getsourcelines(code)
                linenos = range(start_line, start_line + len(sub_lines))
                self._toplevel.append((filename, code, linenos))
                self[code] = {}
            else:
                # Nested code shares the measurement dict of its top-level code.
                self[code] = self[toplevel_code]
            for subcode in filter(inspect.iscode, code.co_consts):
                self.add(subcode, toplevel_code=toplevel_code)

    class CodeMapForUDFV2(CodeMap):
        """Code map that records top-level code objects without source line ranges."""

        def add(
            self,
            code: Any,
            toplevel_code: Optional[Any] = None,
        ) -> None:
            """Register `code` and, recursively, the code objects nested in it."""
            if code in self:
                # Already registered.
                return

            if toplevel_code is None:
                toplevel_code = code
                filename = code.co_filename
                self._toplevel.append((filename, code))
                self[code] = {}
            else:
                # Nested code shares the measurement dict of its top-level code.
                self[code] = self[toplevel_code]
            for subcode in filter(inspect.iscode, code.co_consts):
                self.add(subcode, toplevel_code=toplevel_code)

        def items(self) -> Iterator[Tuple[str, Iterator[Tuple[int, Any]]]]:
            """Iterate on the toplevel code blocks."""
            for filename, code in self._toplevel:
                measures = self[code]
                if not measures:
                    continue  # skip if no measurement
                line_iterator = ((line, measures[line]) for line in measures.keys())
                yield (filename, line_iterator)

    class UDFLineProfiler(LineProfiler):
        """Line profiler whose code map is a `CodeMapForUDF`."""

        def __init__(self, **kw: Any) -> None:
            # Let the base class set up its own state before the code map is
            # replaced with the UDF-aware one.
            super().__init__(**kw)
            include_children = kw.get("include_children", False)
            backend = kw.get("backend", "psutil")
            self.code_map = CodeMapForUDF(include_children=include_children, backend=backend)

        def __call__(
            self,
            func: Optional[Callable[..., Any]] = None,
            precision: int = 1,
            sub_lines: Optional[List] = None,
            start_line: Optional[int] = None,
        ) -> Callable[..., Any]:
            """
            Wrap `func` for profiling, or return a decorator when `func` is None.

            `sub_lines` and `start_line` are forwarded to `add_function`.
            """
            if func is not None:
                self.add_function(func, sub_lines=sub_lines, start_line=start_line)
                f = self.wrap_function(func)
                # Make the wrapper look like the wrapped function.
                f.__module__ = func.__module__
                f.__name__ = func.__name__
                f.__doc__ = func.__doc__
                f.__dict__.update(getattr(func, "__dict__", {}))
                return f
            else:

                def inner_partial(f: Callable[..., Any]) -> Any:
                    return self.__call__(f, precision=precision)

                return inner_partial

        def add_function(
            self,
            func: Callable[..., Any],
            sub_lines: Optional[List] = None,
            start_line: Optional[int] = None,
        ) -> None:
            """Record line profiling information for the given Python function."""
            try:
                # func_code does not exist in Python3
                code = func.__code__
            except AttributeError:
                warnings.warn("Could not extract a code object for the object %r" % func)
            else:
                self.code_map.add(code, sub_lines=sub_lines, start_line=start_line)

    class UDFLineProfilerV2(LineProfiler):
        """Line profiler whose code map is a `CodeMapForUDFV2`."""

        def __init__(self, **kw: Any) -> None:
            # Let the base class set up its own state before the code map is
            # replaced with the UDF-aware one.
            super().__init__(**kw)
            include_children = kw.get("include_children", False)
            backend = kw.get("backend", "psutil")
            self.code_map = CodeMapForUDFV2(include_children=include_children, backend=backend)
class PStatsParam(AccumulatorParam[Optional[pstats.Stats]]):
    """PStatsParam is used to merge pstats.Stats"""

    @staticmethod
    def zero(value: Optional[pstats.Stats]) -> None:
        """Return the zero value of the accumulator, which is always None."""
        return None

    @staticmethod
    def addInPlace(
        value1: Optional[pstats.Stats], value2: Optional[pstats.Stats]
    ) -> Optional[pstats.Stats]:
        """
        Merge `value2` into `value1` and return the merged stats.

        When `value1` is None, `value2` is returned unchanged.
        """
        if value1 is None:
            return value2
        if value2 is not None:
            # Without this merge the stats carried by value2 would be lost.
            value1.add(value2)
        return value1
class MemUsageParam(AccumulatorParam[Optional[CodeMapDict]]):
    """MemUsageParam is used to merge memory usage code map"""

    @staticmethod
    def zero(value: Optional[CodeMapDict]) -> None:
        """Return the zero value of the accumulator, which is always None."""
        return None

    @staticmethod
    def addInPlace(
        value1: Optional[CodeMapDict], value2: Optional[CodeMapDict]
    ) -> Optional[CodeMapDict]:
        """
        Merge two code maps line by line and return the result.

        For a line measured in both maps the increment, memory usage and
        occurrence count are summed; a line measured in only one map keeps
        that measurement; a line measured in neither stays None. `value1` is
        updated in place. If either argument is None or empty, the other one
        is returned as is.
        """
        # An example value looks as below
        # {'<command-1598004922717618>': [(3, (144.2578125, 144.2578125, 1)),
        # (4, (0.0, 144.2578125, 1))]}
        if value1 is None or len(value1) == 0:
            return value2
        if value2 is None or len(value2) == 0:
            return value1

        # value1, value2 should have same keys - file name
        for filename in value1:
            l1 = cast(List[LineProfile], value1.get(filename))
            # A file absent from value2 is treated as having no measurements
            # rather than failing on None.
            l2 = cast(List[LineProfile], value2.get(filename) or [])
            c1 = dict(l1)
            c2 = dict(l2)
            udf_code_map: Dict[int, Optional[MemoryTuple]] = {}
            all_line_numbers = set(c1.keys()) | set(c2.keys())
            for lineno in all_line_numbers:
                c1_line = c1.get(lineno)
                c2_line = c2.get(lineno)
                if c1_line and c2_line:
                    # c1, c2 should have same keys - line number
                    udf_code_map[lineno] = (
                        cast(MemoryTuple, c1_line)[0] + cast(MemoryTuple, c2_line)[0],  # increment
                        cast(MemoryTuple, c1_line)[1] + cast(MemoryTuple, c2_line)[1],  # mem_usage
                        cast(MemoryTuple, c1_line)[2]
                        + cast(MemoryTuple, c2_line)[2],  # occurrences
                    )
                elif c1_line:
                    udf_code_map[lineno] = cast(MemoryTuple, c1_line)
                elif c2_line:
                    udf_code_map[lineno] = cast(MemoryTuple, c2_line)
                else:
                    udf_code_map[lineno] = None
            value1[filename] = [(k, v) for k, v in udf_code_map.items()]
        return value1
class BasicProfiler(Profiler):
    """
    BasicProfiler is the default profiler, which is implemented based on
    cProfile and Accumulator
    """

    def __init__(self, ctx: "SparkContext") -> None:
        Profiler.__init__(self, ctx)
        # Creates a new accumulator for combining the profiles of different
        # partitions of a stage
        self._accumulator = ctx.accumulator(None, PStatsParam)  # type: ignore[arg-type]

    def profile(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        """Run `func` under cProfile, accumulate its stats and return its result."""
        pr = cProfile.Profile()
        ret = pr.runcall(func, *args, **kwargs)
        st = pstats.Stats(pr)
        st.stream = None  # type: ignore[attr-defined]  # make it picklable
        # Remove leading path information from the recorded file names.
        st.strip_dirs()

        # Adds a new profile to the existing accumulated value
        self._accumulator.add(st)  # type: ignore[arg-type]

        return ret

    def stats(self) -> pstats.Stats:
        """Return the accumulated profiling stats."""
        return cast(pstats.Stats, self._accumulator.value)

    def show(self, id: int) -> None:
        """Print the profile stats to stdout, id is the RDD id"""
        stats = self.stats()
        if stats:
            print("=" * 60)
            print("Profile of RDD<id=%d>" % id)
            print("=" * 60)
            stats.sort_stats("time", "cumulative").print_stats()

    def dump(self, id: int, path: str) -> None:
        """Dump the profile into path, id is the RDD id"""
        if not os.path.exists(path):
            os.makedirs(path)
        stats = self.stats()
        if stats:
            p = os.path.join(path, "rdd_%d.pstats" % id)
            stats.dump_stats(p)
class UDFBasicProfiler(BasicProfiler):
    """
    UDFBasicProfiler is the profiler for Python/Pandas UDFs.
    """

    def show(self, id: int) -> None:
        """Print the profile stats to stdout, id is the PythonUDF id"""
        stats = self.stats()
        if stats:
            print("=" * 60)
            print("Profile of UDF<id=%d>" % id)
            print("=" * 60)
            stats.sort_stats("time", "cumulative").print_stats()

    def dump(self, id: int, path: str) -> None:
        """Dump the profile into path, id is the PythonUDF id"""
        if not os.path.exists(path):
            os.makedirs(path)
        stats = self.stats()
        if stats:
            p = os.path.join(path, "udf_%d.pstats" % id)
            stats.dump_stats(p)
class MemoryProfiler(Profiler):
    """
    MemoryProfiler, which is implemented based on memory profiler and Accumulator
    """

    def __init__(self, ctx: "SparkContext") -> None:
        Profiler.__init__(self, ctx)
        # Creates a new accumulator for combining the profiles
        self._accumulator = ctx.accumulator(None, MemUsageParam)  # type: ignore[arg-type]

    def profile(  # type: ignore
        self,
        sub_lines: Optional[List],
        start_line: Optional[int],
        func: Callable[..., Any],
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        """
        Run `func` under the line-by-line memory profiler and return its result.

        `sub_lines` and `start_line` are passed through to `UDFLineProfiler`.
        The measured code map is added to the accumulator.

        Raises PySparkRuntimeError when memory_profiler is not available.
        """
        if not has_memory_profiler:
            # NOTE(review): arguments are passed positionally, assumed to be
            # (message, error class, message parameters) - confirm against the
            # PySpark version in use.
            raise PySparkRuntimeError(None, "MISSING_LIBRARY_FOR_PROFILER", {})

        profiler = UDFLineProfiler()

        wrapped = profiler(func, sub_lines=sub_lines, start_line=start_line)
        ret = wrapped(*args, **kwargs)
        # Materialise the per-file line iterators into plain lists.
        codemap_dict = {
            filename: list(line_iterator)
            for filename, line_iterator in profiler.code_map.items()
        }
        # Adds a new profile to the existing accumulated value
        self._accumulator.add(codemap_dict)  # type: ignore[arg-type]
        return ret

    def stats(self) -> CodeMapDict:
        """Return the collected memory profiles"""
        return cast(CodeMapDict, self._accumulator.value)

    @staticmethod
    def _show_results(
        code_map: CodeMapDict, stream: Optional[Any] = None, precision: int = 1
    ) -> None:
        """
        Write a per-line memory usage table for every file in `code_map`.

        Output goes to `stream`, or to stdout when `stream` is None.
        `precision` is the number of decimals used for the MiB values.
        """
        if stream is None:
            stream = sys.stdout
        template = "{0:>6} {1:>12} {2:>12} {3:>10} {4:<}"

        for filename, lines in code_map.items():
            header = template.format(
                "Line #", "Mem usage", "Increment", "Occurrences", "Line Contents"
            )

            stream.write("Filename: " + filename + "\n\n")
            stream.write(header + "\n")
            stream.write("=" * len(header) + "\n")

            all_lines = linecache.getlines(filename)

            float_format = "{0}.{1}f".format(precision + 4, precision)
            template_mem = "{0:" + float_format + "} MiB"

            lines_dict = {line[0]: line[1] for line in lines}
            if not lines_dict:
                # No measured lines: min()/max() below would fail on an empty dict.
                stream.write("\n\n")
                continue
            linenos = range(min(lines_dict), max(lines_dict) + 1)
            for lineno in linenos:
                total_mem: Union[float, str]
                inc: Union[float, str]
                occurrences: Union[float, str]
                mem = lines_dict.get(lineno)
                if mem:
                    inc = mem[0]
                    total_mem = mem[1]
                    total_mem = template_mem.format(total_mem)
                    occurrences = mem[2]
                    inc = template_mem.format(inc)
                else:
                    total_mem = ""
                    inc = ""
                    occurrences = ""
                # Fall back to an empty line when linecache has no text for
                # this line number.
                if 0 < lineno <= len(all_lines):
                    source_line = all_lines[lineno - 1]
                else:
                    source_line = "\n"
                tmp = template.format(lineno, total_mem, inc, occurrences, source_line)
                stream.write(tmp)
            stream.write("\n\n")

    def show(self, id: int) -> None:
        """Print the profile stats to stdout, id is the PythonUDF id"""
        code_map = self.stats()
        if code_map:
            print("=" * 60)
            print("Profile of UDF<id=%d>" % id)
            print("=" * 60)
            self._show_results(code_map)

    def dump(self, id: int, path: str) -> None:
        """Dump the memory profile into path, id is the PythonUDF id"""
        if not os.path.exists(path):
            os.makedirs(path)
        stats = self.stats()  # dict
        if stats:
            p = os.path.join(path, "udf_%d_memory.txt" % id)
            with open(p, "w+") as f:
                self._show_results(stats, stream=f)
if __name__ == "__main__":
    # Run this module's doctests when it is executed as a script.
    import doctest

    (failure_count, test_count) = doctest.testmod()
    if failure_count:
        # Exit with a non-zero status when any doctest failed.
        sys.exit(-1)