blob: 283ad367b07a3991954e23de99e2649ebc017081 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import sys
import time
import traceback
from packaging import version
from threading import Thread, Event, current_thread
from typing import Optional
from skywalking import agent
from skywalking import config
from skywalking import profile
from skywalking.loggings import logger
from skywalking.profile.profile_status import ProfileStatusReference, ProfileStatus
from skywalking.profile.profile_task import ProfileTask
from skywalking.profile.snapshot import TracingThreadSnapshot
from skywalking.trace.context import SpanContext
from skywalking.utils.array import AtomicArray
from skywalking.utils.integer import AtomicInteger
from skywalking.utils.time import current_milli_time
# Concurrency model used by the profiler: 'thread' by default, switched to 'greenlet'
# when gevent has monkey-patched ``threading`` and a supported greenlet version is installed.
# NOTE: ``greenlet`` and ``BlockingSwitchOutError`` are only bound when the gevent imports
# below succeed; code using them must only run when THREAD_MODEL == 'greenlet'.
THREAD_MODEL = 'thread'
try:
    from gevent import monkey
    import greenlet
    from gevent.exceptions import BlockingSwitchOutError

    if monkey.is_module_patched('threading'):
        if version.parse(greenlet.__version__) <= version.parse('1.1.3.post0'):
            # TODO: greenlet 2.0.0 crashes with a segmentation fault (signal 11), probably a
            # gevent/greenlet compatibility issue; re-test when gevent releases a new version.
            THREAD_MODEL = 'greenlet'
        else:
            # Logger.warn is a deprecated alias of Logger.warning
            logger.warning('greenlet profiler can not work with version >= 2.0.0')
except ImportError:
    # gevent/greenlet not installed: keep the thread model
    pass
class ProfileTaskExecutionContext:
    """
    Execution state of one running profile task.

    Tracks the profilers of the segments currently being profiled
    (``profiling_segment_slots``), how many slots are in use, how many profilings have
    been started, and - in the thread model - owns the background sampling thread.
    """

    def __init__(self, task: ProfileTask):
        self.task = task  # type: ProfileTask
        # number of occupied entries in profiling_segment_slots
        self._current_profiling_cnt = AtomicInteger(var=0)
        # incremented by is_start_profileable(), i.e. when a profiler attempts its first snapshot
        self._total_started_profiling_cnt = AtomicInteger(var=0)
        # fixed-size table of active profilers (ThreadProfiler / GreenletProfiler); free entries are None
        self.profiling_segment_slots = AtomicArray(length=config.agent_profile_max_parallel)
        self._profiling_thread = None  # type: Optional[Thread]
        self._profiling_stop_event = None  # type: Optional[Event]

    def start_profiling(self):
        """
        Start sampling for this task.

        Thread model: spawn a daemon thread running ``ProfileThread.start``.
        Greenlet model: nothing to do here; each GreenletProfiler installs its own
        tracer when attempt_profiling() registers it.
        """
        if THREAD_MODEL == 'greenlet':
            return

        profile_thread = ProfileThread(self)
        self._profiling_stop_event = Event()
        self._profiling_thread = Thread(target=profile_thread.start, args=[self._profiling_stop_event], daemon=True)
        self._profiling_thread.start()

    def stop_profiling(self):
        """
        Stop sampling for this task.

        Greenlet model: stop every registered GreenletProfiler.
        Thread model: set the sampling thread's stop event (the thread is not joined).
        """
        if THREAD_MODEL == 'greenlet':
            for profiler in self.profiling_segment_slots:
                # isinstance() is False for empty (None) slots
                if isinstance(profiler, GreenletProfiler):
                    profiler.stop_profiling()
        elif self._profiling_thread is not None and self._profiling_stop_event is not None:
            self._profiling_stop_event.set()

    def attempt_profiling(self, trace_context: SpanContext, segment_id: str, first_span_opname: str) -> \
            ProfileStatusReference:
        """
        Try to register the segment for profiling.

        :param trace_context: context of the segment to profile
        :param segment_id: id of the segment
        :param first_span_opname: operation name of the segment's first span
        :return: the new profiler's status reference when a slot was obtained,
                 otherwise ``ProfileStatusReference.create_with_none()``
        """
        # check has available slot
        using_slot_cnt = self._current_profiling_cnt.get()
        if using_slot_cnt >= config.agent_profile_max_parallel:
            return ProfileStatusReference.create_with_none()

        # check first operation name matches
        if self.task.first_span_op_name != first_span_opname:
            return ProfileStatusReference.create_with_none()

        # if out limit started profiling count then stop add profiling
        if self._total_started_profiling_cnt.get() > self.task.max_sampling_count:
            return ProfileStatusReference.create_with_none()

        # try to occupy slot
        if not self._current_profiling_cnt.compare_and_set(using_slot_cnt, using_slot_cnt + 1):
            return ProfileStatusReference.create_with_none()

        if THREAD_MODEL == 'greenlet':
            thread_profiler = GreenletProfiler(
                trace_context=trace_context,
                segment_id=segment_id,
                profiling_thread=greenlet.getcurrent(),
                profile_context=self,
            )
        else:
            # default is thread
            thread_profiler = ThreadProfiler(
                trace_context=trace_context,
                segment_id=segment_id,
                profiling_thread=current_thread(),
                profile_context=self,
            )

        for idx in range(self.profiling_segment_slots.length()):
            # occupy slot success
            if self.profiling_segment_slots.compare_and_set(idx, None, thread_profiler):
                if THREAD_MODEL == 'greenlet':
                    # install the switch tracer only once the profiler sits in a slot, so that
                    # stop_tracing_profile() can find it and a failed registration leaves no tracer behind
                    thread_profiler.start_profiling(self)
                return thread_profiler.profile_status

        # no free slot after all: give back the count taken by compare_and_set above
        self._current_profiling_cnt.add_and_get(-1)
        return ProfileStatusReference.create_with_none()

    def profiling_recheck(self, trace_context: SpanContext, segment_id: str, first_span_opname: str):
        """
        Retry attempt_profiling() for a segment that is not watched yet and store the
        resulting status into ``trace_context.profile_status``.
        """
        if trace_context.profile_status.is_being_watched():
            return

        # if first_span_opname was changed by other plugin, there can start profile as well
        status = self.attempt_profiling(trace_context, segment_id, first_span_opname)
        trace_context.profile_status.update_status(status.get())

    def stop_tracing_profile(self, trace_context: SpanContext):
        """
        Find the profiler attached to *trace_context*, free its slot and stop it.
        """
        for idx, profiler in enumerate(self.profiling_segment_slots):
            if profiler and profiler.matches(trace_context):
                self.profiling_segment_slots.set(idx, None)
                profiler.stop_profiling()
                self._current_profiling_cnt.add_and_get(-1)
                break

    def is_start_profileable(self):
        """
        Count one more started profiling and report whether the total is still within
        the task's ``max_sampling_count``.
        """
        return self._total_started_profiling_cnt.add_and_get(1) <= self.task.max_sampling_count
class ProfileThread:
    """
    Sampling loop of the thread model, run on the daemon thread created by
    ProfileTaskExecutionContext.start_profiling().
    """

    def __init__(self, context: ProfileTaskExecutionContext):
        self._task_execution_context = context
        self._task_execution_service = profile.profile_task_execution_service
        self._stop_event = None  # type: Optional[Event]

    def start(self, stop_event: Event):
        """
        Run the sampling loop until *stop_event* is set or the loop fails, then ask the
        execution service to stop the current profile task.
        """
        self._stop_event = stop_event
        try:
            self.profiling(self._task_execution_context)
        except Exception as e:
            logger.error('profiling task fail. task_id:[%s] error:[%s]', self._task_execution_context.task.task_id, e)
        finally:
            self._task_execution_service.stop_current_profile_task(self._task_execution_context)

    def profiling(self, context: ProfileTaskExecutionContext):
        """
        Every ``thread_dump_period`` milliseconds, promote PENDING thread profilers via
        start_profiling_if_need() and snapshot the stack of PROFILING ones.
        """
        max_sleep_period = context.task.thread_dump_period
        while not self._stop_event.is_set():
            current_loop_start_time = current_milli_time()
            for profiler in context.profiling_segment_slots:  # type: ThreadProfiler
                # skip empty slots and greenlet profilers (they snapshot from their own switch tracer)
                if profiler is None or isinstance(profiler, GreenletProfiler):
                    continue

                status = profiler.profile_status.get()
                if status is ProfileStatus.PENDING:
                    profiler.start_profiling_if_need()
                elif status is ProfileStatus.PROFILING:
                    snapshot = profiler.build_snapshot()
                    if snapshot is not None:
                        agent.add_profiling_snapshot(snapshot)
                    else:
                        # tell execution context current tracing thread dump failed, stop it
                        context.stop_tracing_profile(profiler.trace_context)

            need_sleep = (current_loop_start_time + max_sleep_period) - current_milli_time()
            if need_sleep <= 0:
                # this round overran the period: still wait a full period before the next one
                need_sleep = max_sleep_period

            # Event.wait() returns as soon as stop_profiling() sets the event, whereas
            # time.sleep() kept the loop alive for up to one more period; ms -> s
            self._stop_event.wait(need_sleep / 1000)
class ThreadProfiler:
    """
    Profiler for one traced segment running on an OS thread; its snapshots are pulled
    periodically by ProfileThread through build_snapshot().
    """

    def __init__(self, trace_context: SpanContext, segment_id: str, profiling_thread: Thread,
                 profile_context: ProfileTaskExecutionContext):
        self.trace_context = trace_context
        self._segment_id = segment_id
        self._profiling_thread = profiling_thread
        self._profile_context = profile_context
        self._profile_start_time = -1
        self.profiling_max_time_mills = config.agent_profile_duration * 60 * 1000
        self.dump_sequence = 0

        if trace_context.profile_status is None:
            self.profile_status = ProfileStatusReference.create_with_pending()
        else:
            self.profile_status = trace_context.profile_status  # type: ProfileStatusReference
            self.profile_status.update_status(ProfileStatus.PENDING)

    def start_profiling_if_need(self):
        """
        Switch to PROFILING once the segment has existed longer than the task's
        ``min_duration_threshold`` (milliseconds, compared with current_milli_time()).
        """
        if current_milli_time() - self.trace_context.create_time > self._profile_context.task.min_duration_threshold:
            self._profile_start_time = current_milli_time()
            # update our own reference: trace_context.profile_status may be None (see __init__),
            # and self.profile_status is what ProfileThread reads
            self.profile_status.update_status(ProfileStatus.PROFILING)

    def stop_profiling(self):
        """Mark this profiling as STOPPED."""
        self.profile_status.update_status(ProfileStatus.STOPPED)

    def build_snapshot(self) -> Optional[TracingThreadSnapshot]:
        """
        Capture the current stack of the profiled thread.

        :return: a snapshot, or None when the thread is no longer alive, its frame is not
                 available, or this is the first dump and is_start_profileable() refuses it
        """
        if not self._profiling_thread.is_alive():
            return None

        current_time = current_milli_time()
        # get thread stack of target thread
        stack = sys._current_frames().get(int(self._profiling_thread.ident))
        if not stack:
            return None

        # keep at most agent_profile_dump_max_stack_depth frames (outermost first)
        max_depth = max(config.agent_profile_dump_max_stack_depth, 0)
        extracted = traceback.extract_stack(stack)[:max_depth]
        stack_list = [f'{item.filename}.{item.name}: {item.lineno}' for item in extracted]

        # if is first dump, check is can start profiling
        if self.dump_sequence == 0 and not self._profile_context.is_start_profileable():
            return None

        snapshot = TracingThreadSnapshot(self._profile_context.task.task_id,
                                         self._segment_id,
                                         self.dump_sequence,
                                         current_time,
                                         stack_list)
        self.dump_sequence += 1
        return snapshot

    def matches(self, trace_context: SpanContext) -> bool:
        """Return True if this profiler belongs to *trace_context*."""
        return self.trace_context == trace_context
class GreenletProfiler:
    """
    Profiler for one traced segment running in a greenlet (gevent model).

    It is not polled by ProfileThread. Instead, it installs a greenlet trace function
    and records a snapshot every time the profiled greenlet is switched from or to.
    """

    def __init__(
        self,
        trace_context: SpanContext,
        segment_id: str,
        profiling_thread,  # greenlet
        profile_context: ProfileTaskExecutionContext,
    ):
        self._task_execution_service = profile.profile_task_execution_service
        self.trace_context = trace_context
        self._segment_id = segment_id
        self._profiling_thread = profiling_thread
        self._profile_context = profile_context
        self._profile_start_time = -1
        self.profiling_max_time_mills = config.agent_profile_duration * 60 * 1000
        self.dump_sequence = 0
        self.profile_status = ProfileStatusReference.create_with_pending()
        # trace function that was active before ours; restored by stop_profiling()
        self._old_trace = None
        # True while our trace function is installed (set by start_profiling)
        self._trace_installed = False

    def stop_profiling(self):
        """
        Restore the previously installed greenlet trace function and mark the status STOPPED.

        Safe to call when start_profiling() never installed a tracer, and safe to call twice.
        """
        if self._trace_installed:
            self._trace_installed = False
            self._profiling_thread.settrace(self._old_trace)
        self.profile_status.update_status(ProfileStatus.STOPPED)

    def build_snapshot(self) -> Optional[TracingThreadSnapshot]:
        """
        Capture the stack of the profiled greenlet.

        :return: a snapshot, or None when this is the first dump and
                 is_start_profileable() refuses it
        """
        # NOTE(review): gr_frame is None while the greenlet is the running one (or dead);
        # extract_stack(None) then captures the caller's stack instead - confirm this is intended
        extracted = traceback.extract_stack(self._profiling_thread.gr_frame)
        # keep at most agent_profile_dump_max_stack_depth frames (outermost first)
        max_depth = max(config.agent_profile_dump_max_stack_depth, 0)
        stack_list = [f'{item.filename}.{item.name}: {item.lineno}' for item in extracted[:max_depth]]

        # if is first dump, check is can start profiling
        if self.dump_sequence == 0 and not self._profile_context.is_start_profileable():
            return None

        snapshot = TracingThreadSnapshot(
            self._profile_context.task.task_id,
            self._segment_id,
            self.dump_sequence,
            current_milli_time(),
            stack_list,
        )
        self.dump_sequence += 1
        return snapshot

    def start_profiling(self, context: ProfileTaskExecutionContext):
        """
        Install a greenlet trace function that snapshots the profiled greenlet on every
        switch from or to it, and mark the status PROFILING.

        On failure the error is logged and the current profile task is stopped.
        """
        self._task_execution_context = context
        try:
            curr = self._profiling_thread

            def callback(event, args):
                # keep the previously installed tracer (e.g. another GreenletProfiler) working;
                # settrace() replaces it otherwise
                if self._old_trace is not None:
                    self._old_trace(event, args)
                if not self._trace_installed:
                    # already stopped but still reachable through another tracer: pass-through only
                    return

                origin, target = args
                if origin != curr and target != curr:
                    return
                try:
                    snapshot = self.build_snapshot()
                    if snapshot is not None:
                        agent.add_profiling_snapshot(snapshot)
                    else:
                        # tell execution context current tracing thread dump failed, stop it
                        self._profile_context.stop_tracing_profile(self.trace_context)
                except BlockingSwitchOutError:
                    self._profile_context.stop_tracing_profile(self.trace_context)
                except Exception as e:
                    logger.error('build and add snapshot failed. error: %s', e)
                    self._profile_context.stop_tracing_profile(self.trace_context)
                    raise

            self.profile_status.update_status(ProfileStatus.PROFILING)
            self._old_trace = curr.settrace(callback)
            self._trace_installed = True
        except Exception as e:
            logger.error('profiling task fail. task_id:[%s] error:[%s]', self._profile_context.task.task_id, e)
            # TODO: verify this actually stops the current profile task
            self._task_execution_service.stop_current_profile_task(
                self._task_execution_context
            )

    def matches(self, trace_context: SpanContext) -> bool:
        """Return True if this profiler belongs to *trace_context*."""
        return self.trace_context == trace_context