blob: 711e39de4723b7c2a79abe89cab279bd7eb45f63 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from abc import ABC, abstractmethod
import os
import pstats
from threading import RLock
from typing import Dict, Optional, TYPE_CHECKING
from pyspark.accumulators import (
Accumulator,
AccumulatorParam,
SpecialAccumulatorIds,
_accumulatorRegistry,
)
from pyspark.errors import PySparkValueError
from pyspark.profiler import CodeMapDict, MemoryProfiler, MemUsageParam, PStatsParam
if TYPE_CHECKING:
from pyspark.sql._typing import ProfileResults
class _ProfileResultsParam(AccumulatorParam[Optional["ProfileResults"]]):
    """
    AccumulatorParam that merges per-UDF profile results.

    A result maps a key to a tuple whose first two items are the perf result
    and the memory result. Merging delegates to ``PStatsParam`` for the first
    item and to ``MemUsageParam`` for the second.
    """

    @staticmethod
    def zero(value: Optional["ProfileResults"]) -> Optional["ProfileResults"]:
        # The given initial value is used unchanged as the zero element.
        return value

    @staticmethod
    def addInPlace(
        value1: Optional["ProfileResults"], value2: Optional["ProfileResults"]
    ) -> Optional["ProfileResults"]:
        """
        Merge `value2` into a copy of `value1` and return the copy.

        ``None`` and empty inputs are treated as an empty mapping. Entries
        that only exist in `value1` are carried over untouched; every entry of
        `value2` is combined with the matching entry of `value1` (or with the
        zero values when there is none) into a two-item tuple.
        """
        left = value1 or {}
        right = value2 or {}

        merged = left.copy()
        for key, (new_perf, new_mem, *_) in right.items():
            if key in left:
                base_perf, base_mem, *_ = left[key]
            else:
                base_perf = PStatsParam.zero(None)
                base_mem = MemUsageParam.zero(None)
            combined_perf = PStatsParam.addInPlace(base_perf, new_perf)
            combined_mem = MemUsageParam.addInPlace(base_mem, new_mem)
            merged[key] = (combined_perf, combined_mem)
        return merged


# Shared instance of the param defined above.
ProfileResultsParam = _ProfileResultsParam()
class ProfilerCollector(ABC):
    """
    A base class of profiler collectors for session based profilers.

    This supports cProfiler and memory-profiler enabled by setting a SQL config
    `spark.sql.pyspark.udf.profiler` to "perf" or "memory".

    Subclasses supply the raw results through ``_profile_results``: a mapping
    from a UDF ID to a tuple whose first item is the perf result and whose
    second item is the memory result. Either item may be ``None``.
    """

    def __init__(self) -> None:
        # Re-entrant on purpose: methods hold the lock while reading
        # properties that acquire the same lock again.
        self._lock = RLock()

    @staticmethod
    def _selected_results(results: dict, id: Optional[int]) -> list:
        """
        Return the ``(udf_id, result)`` pairs selected by `id`.

        With ``id=None`` every entry is selected, in ascending ID order.
        Otherwise only the entry for `id` is selected. IDs without a result
        are skipped, so the returned list may be empty.
        """
        udf_ids = sorted(results) if id is None else [id]
        return [
            (udf_id, results[udf_id])
            for udf_id in udf_ids
            if results.get(udf_id) is not None
        ]

    def _clear_results(self, id: Optional[int], index: int) -> None:
        """
        Reset one slot of the stored results to ``None``.

        `index` is 0 for the perf slot and 1 for the memory slot. With
        ``id=None`` every entry is processed, otherwise only the entry for
        `id` (if present). An entry whose perf and memory slots are both
        ``None`` afterwards is removed from the mapping.
        """
        with self._lock:
            # Read the property once so the membership test and the
            # mutations all target the same mapping object.
            results = self._profile_results
            if id is None:
                udf_ids = list(results)
            elif id in results:
                udf_ids = [id]
            else:
                udf_ids = []

            for udf_id in udf_ids:
                perf, mem, *rest = results[udf_id]
                if index == 0:
                    perf = None
                else:
                    mem = None
                if perf is None and mem is None:
                    results.pop(udf_id, None)
                else:
                    results[udf_id] = (perf, mem, *rest)

    def show_perf_profiles(self, id: Optional[int] = None) -> None:
        """
        Show the perf profile results.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        id : int, optional
            A UDF ID to be shown. If not specified, all the results will be shown.
        """
        with self._lock:
            stats = self._perf_profile_results

        # Printing happens on the snapshot taken above, outside the lock.
        for udf_id, s in self._selected_results(stats, id):
            print("=" * 60)
            print(f"Profile of UDF<id={udf_id}>")
            print("=" * 60)
            s.sort_stats("time", "cumulative").print_stats()

    @property
    def _perf_profile_results(self) -> Dict[int, pstats.Stats]:
        """Perf results keyed by UDF ID; entries without a perf result are omitted."""
        with self._lock:
            return {
                result_id: perf
                for result_id, (perf, _, *_) in self._profile_results.items()
                if perf is not None
            }

    def show_memory_profiles(self, id: Optional[int] = None) -> None:
        """
        Show the memory profile results.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        id : int, optional
            A UDF ID to be shown. If not specified, all the results will be shown.
        """
        with self._lock:
            code_map = self._memory_profile_results

        for udf_id, cm in self._selected_results(code_map, id):
            print("=" * 60)
            print(f"Profile of UDF<id={udf_id}>")
            print("=" * 60)
            MemoryProfiler._show_results(cm)

    @property
    def _memory_profile_results(self) -> Dict[int, "CodeMapDict"]:
        """Memory results keyed by UDF ID; entries without a memory result are omitted."""
        with self._lock:
            return {
                result_id: mem
                for result_id, (_, mem, *_) in self._profile_results.items()
                if mem is not None
            }

    @property
    @abstractmethod
    def _profile_results(self) -> "ProfileResults":
        """
        Get the profile results.
        """
        ...

    def dump_perf_profiles(self, path: str, id: Optional[int] = None) -> None:
        """
        Dump the perf profile results into directory `path`.

        One file named ``udf_<id>_perf.pstats`` is written per dumped UDF.
        The directory is only created when there is something to dump.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        path: str
            A directory in which to dump the perf profile.
        id : int, optional
            A UDF ID to be dumped. If not specified, all the results will be dumped.
        """
        with self._lock:
            stats = self._perf_profile_results

        for udf_id, s in self._selected_results(stats, id):
            # exist_ok avoids the race between checking for the directory
            # and creating it when several dumps run concurrently.
            os.makedirs(path, exist_ok=True)
            s.dump_stats(os.path.join(path, f"udf_{udf_id}_perf.pstats"))

    def dump_memory_profiles(self, path: str, id: Optional[int] = None) -> None:
        """
        Dump the memory profile results into directory `path`.

        One file named ``udf_<id>_memory.txt`` is written per dumped UDF.
        The directory is only created when there is something to dump.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        path: str
            A directory in which to dump the memory profile.
        id : int, optional
            A UDF ID to be dumped. If not specified, all the results will be dumped.
        """
        with self._lock:
            code_map = self._memory_profile_results

        for udf_id, cm in self._selected_results(code_map, id):
            # exist_ok avoids the race between checking for the directory
            # and creating it when several dumps run concurrently.
            os.makedirs(path, exist_ok=True)
            p = os.path.join(path, f"udf_{udf_id}_memory.txt")
            with open(p, "w+") as f:
                MemoryProfiler._show_results(cm, stream=f)

    def clear_perf_profiles(self, id: Optional[int] = None) -> None:
        """
        Clear the perf profile results.

        An entry that has no memory result either is removed entirely.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        id : int, optional
            The UDF ID whose profiling results should be cleared.
            If not specified, all the results will be cleared.
        """
        self._clear_results(id, 0)

    def clear_memory_profiles(self, id: Optional[int] = None) -> None:
        """
        Clear the memory profile results.

        An entry that has no perf result either is removed entirely.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        id : int, optional
            The UDF ID whose profiling results should be cleared.
            If not specified, all the results will be cleared.
        """
        self._clear_results(id, 1)
class AccumulatorProfilerCollector(ProfilerCollector):
    """
    A profiler collector that reads its results from the accumulator
    registered under ``SpecialAccumulatorIds.SQL_UDF_PROFIER``.
    """

    def __init__(self) -> None:
        super().__init__()
        accumulator_id = SpecialAccumulatorIds.SQL_UDF_PROFIER
        # Reuse the accumulator already present in the registry; otherwise
        # create one with no initial value.
        self._accumulator = (
            _accumulatorRegistry[accumulator_id]
            if accumulator_id in _accumulatorRegistry
            else Accumulator(accumulator_id, None, ProfileResultsParam)
        )

    @property
    def _profile_results(self) -> "ProfileResults":
        """Return the accumulator's current value, or an empty dict when it is ``None``."""
        with self._lock:
            current = self._accumulator.value
            if current is None:
                return {}
            return current
class Profile:
    """User-facing profile API. This instance can be accessed by
    :attr:`spark.profile`.

    .. versionadded:: 4.0.0
    """

    def __init__(self, profiler_collector: ProfilerCollector):
        self.profiler_collector = profiler_collector

    def show(self, id: Optional[int] = None, *, type: Optional[str] = None) -> None:
        """
        Show the profile results.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        id : int, optional
            A UDF ID to be shown. If not specified, all the results will be shown.
        type : str, optional
            The profiler type, which can be either "perf" or "memory".
            If not specified, perf results are shown first, then memory results.
        """
        collector = self.profiler_collector
        if type == "memory":
            collector.show_memory_profiles(id)
            return
        if not (type == "perf" or type is None):
            raise PySparkValueError(
                error_class="VALUE_NOT_ALLOWED",
                message_parameters={
                    "arg_name": "type",
                    "allowed_values": str(["perf", "memory"]),
                },
            )
        collector.show_perf_profiles(id)
        if type is None:
            # No type given: follow the perf output with the memory output.
            collector.show_memory_profiles(id)

    def dump(self, path: str, id: Optional[int] = None, *, type: Optional[str] = None) -> None:
        """
        Dump the profile results into directory `path`.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        path: str
            A directory in which to dump the profile.
        id : int, optional
            A UDF ID to be dumped. If not specified, all the results will be dumped.
        type : str, optional
            The profiler type, which can be either "perf" or "memory".
            If not specified, perf results are dumped first, then memory results.
        """
        collector = self.profiler_collector
        if type == "memory":
            collector.dump_memory_profiles(path, id)
            return
        if not (type == "perf" or type is None):
            raise PySparkValueError(
                error_class="VALUE_NOT_ALLOWED",
                message_parameters={
                    "arg_name": "type",
                    "allowed_values": str(["perf", "memory"]),
                },
            )
        collector.dump_perf_profiles(path, id)
        if type is None:
            # No type given: follow the perf dump with the memory dump.
            collector.dump_memory_profiles(path, id)

    def clear(self, id: Optional[int] = None, *, type: Optional[str] = None) -> None:
        """
        Clear the profile results.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        id : int, optional
            The UDF ID whose profiling results should be cleared.
            If not specified, all the results will be cleared.
        type : str, optional
            The profiler type to clear results for, which can be either "perf" or "memory".
            If not specified, perf results are cleared first, then memory results.
        """
        collector = self.profiler_collector
        if type == "memory":
            collector.clear_memory_profiles(id)
            return
        if not (type == "perf" or type is None):
            raise PySparkValueError(
                error_class="VALUE_NOT_ALLOWED",
                message_parameters={
                    "arg_name": "type",
                    "allowed_values": str(["perf", "memory"]),
                },
            )
        collector.clear_perf_profiles(id)
        if type is None:
            # No type given: clear the memory results as well.
            collector.clear_memory_profiles(id)