# blob: 5caf0ee37f418f252f0c2374fdbd5f230627e673 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from threading import Timer, RLock, Lock
from typing import Tuple
from skywalking.agent import agent
from skywalking.loggings import logger, logger_debug_enabled
from skywalking.profile.profile_constants import ProfileConstants
from skywalking.profile.profile_context import ProfileTaskExecutionContext
from skywalking.profile.profile_status import ProfileStatusReference
from skywalking.profile.profile_task import ProfileTask
from skywalking.trace.context import SpanContext
from skywalking.utils.atomic_ref import AtomicRef
from skywalking.utils.time import current_milli_time
class Scheduler:
    """Fire-and-forget one-shot timers running on daemon threads."""

    @staticmethod
    def schedule(milliseconds, func, *args, **kwargs):
        """
        Run ``func`` once after ``milliseconds`` on a daemon ``threading.Timer``.

        :param milliseconds: delay before the call; a negative delay is treated as zero
        :param func: callable to invoke when the timer fires
        :param args: forwarded verbatim to ``Timer`` after ``func``, so the first extra
                     positional is the *sequence* of arguments for ``func``
                     (e.g. ``schedule(10, f, [a, b])`` ends up calling ``f(a, b)``)
        :param kwargs: forwarded verbatim to ``Timer`` as keyword arguments
        """
        # a start time that is already in the past must fire immediately
        seconds = max(milliseconds / 1000, 0)

        t = Timer(seconds, func, *args, **kwargs)
        # daemon thread: a pending timer must never keep the interpreter alive
        t.daemon = True
        # without start() the timer thread is never launched and func never runs
        t.start()
class ProfileTaskExecutionService:
    """
    Accepts profile tasks, validates them, schedules their start and stop,
    and exposes the currently running task context to the tracing code.
    """

    # task.duration is multiplied by this wherever it is combined with millisecond timestamps
    MINUTE_TO_MILLIS = 60 * 1000

    def __init__(self):
        # Queue is thread safe
        self._profile_task_list = Queue()  # type: Queue
        # queue_lock for making sure complex operation of this profile_task_list is thread safe
        self.queue_lock = Lock()

        self._last_command_create_time = -1  # type: int
        # single thread executor
        self.profile_executor = ThreadPoolExecutor(thread_name_prefix='profile-executor', max_workers=1)
        self.task_execution_context = AtomicRef(None)

        self.profile_task_scheduler = Scheduler()

        # rlock for process_profile_task and stop_current_profile_task
        self._rlock = RLock()

    def remove_from_profile_task_list(self, task: ProfileTask) -> bool:
        """
        Remove a task from profile_task_list in a thread safe state

        :param task: the task to drop; every queued item equal to it is removed
        :return: True when at least one equal item was found, False otherwise
        """
        with self.queue_lock:
            item_left = []
            result = False

            # drain the whole queue, remembering everything except the task to remove
            while not self._profile_task_list.empty():
                item = self._profile_task_list.get()
                if item == task:
                    result = True
                    # not put in item_left for removing it
                    continue
                item_left.append(item)

            # re-queue the survivors in their original order
            for item in item_left:
                self._profile_task_list.put(item)

            return result

    def get_last_command_create_time(self) -> int:
        """Return the greatest ``create_time`` seen by add_profile_task, or -1 if none yet."""
        return self._last_command_create_time

    def add_profile_task(self, task: ProfileTask):
        """
        Validate a task and, when valid, queue it and schedule its start.

        An invalid task is logged and dropped, but still advances the
        last-command create time.
        """
        # update last command create time, which will be used in command query
        if task.create_time > self._last_command_create_time:
            self._last_command_create_time = task.create_time

        # check profile task object
        success, error_reason = self._check_profile_task(task)
        if not success:
            logger.warning('check command error, cannot process this profile task. reason: %s', error_reason)
            # a rejected task must not be queued or scheduled
            return

        # add task to list
        self._profile_task_list.put(task)

        delay_millis = task.start_time - current_milli_time()
        # schedule to start task
        self.profile_task_scheduler.schedule(delay_millis, self.process_profile_task, [task])

    def add_profiling(self, context: SpanContext, segment_id: str, first_span_opname: str) -> ProfileStatusReference:
        """
        Ask the running task context (if any) to profile this segment.

        :return: the status reference from the running context, or
                 ``ProfileStatusReference.create_with_none()`` when no task is running
        """
        execution_context = self.task_execution_context.get()  # type: ProfileTaskExecutionContext
        if execution_context is None:
            return ProfileStatusReference.create_with_none()

        return execution_context.attempt_profiling(context, segment_id, first_span_opname)

    def profiling_recheck(self, trace_context: SpanContext, segment_id: str, first_span_opname: str):
        """
        Re-check current trace need profiling, in case that third-party plugins change the operation name.
        """
        execution_context = self.task_execution_context.get()  # type: ProfileTaskExecutionContext
        if execution_context is None:
            # no task is running, nothing to re-check
            return

        execution_context.profiling_recheck(trace_context, segment_id, first_span_opname)

    # using reentrant lock for process_profile_task and stop_current_profile_task,
    # to make sure thread safe.
    def process_profile_task(self, task: ProfileTask):
        """Start profiling for ``task`` and schedule its stop after ``task.duration`` minutes."""
        with self._rlock:
            # make sure prev profile task already stopped
            # (re-enters self._rlock, which is why the lock is reentrant)
            self.stop_current_profile_task(self.task_execution_context.get())

            # make stop task schedule and task context
            current_context = ProfileTaskExecutionContext(task)
            # NOTE(review): assumes AtomicRef exposes set() next to get()/compare_and_set() - confirm
            self.task_execution_context.set(current_context)

            # start profiling this task
            # NOTE(review): assumes ProfileTaskExecutionContext.start_profiling() exists - confirm
            current_context.start_profiling()
            if logger_debug_enabled:
                logger.debug('profile task [%s] for endpoint [%s] started', task.task_id, task.first_span_op_name)

            millis = task.duration * self.MINUTE_TO_MILLIS
            self.profile_task_scheduler.schedule(millis, self.stop_current_profile_task, [current_context])

    def stop_current_profile_task(self, need_stop: ProfileTaskExecutionContext):
        """
        Stop ``need_stop`` if, and only if, it is still the running context.

        Does nothing when ``need_stop`` is None or another context has already
        replaced it.
        """
        with self._rlock:
            # need_stop is None or task_execution_context is not need_stop context
            if need_stop is None or not self.task_execution_context.compare_and_set(need_stop, None):
                return

            # NOTE(review): assumes ProfileTaskExecutionContext.stop_profiling() exists - confirm
            need_stop.stop_profiling()
            if logger_debug_enabled:
                logger.debug('profile task [%s] for endpoint [%s] stopped', need_stop.task.task_id,
                             need_stop.task.first_span_op_name)

            # the finished task must no longer block later tasks in the overlap check
            self.remove_from_profile_task_list(need_stop.task)

            # notify profiling task has finished
            # NOTE(review): assumes agent.notify_profile_finish(task) is the reporting hook - confirm
            agent.notify_profile_finish(need_stop.task)

    def _check_profile_task(self, task: ProfileTask) -> Tuple[bool, str]:
        """
        Validate a task's attributes and check it against the queued tasks.

        :return: ``(True, '')`` when the task is acceptable, otherwise
                 ``(False, reason)``; a TypeError raised by any check is
                 reported as a failed check rather than propagated
        """
        try:
            # endpoint name
            if len(task.first_span_op_name) == 0:
                return (False, f'endpoint name [{task.first_span_op_name}] error, '
                               f'should be str and not empty')

            # duration
            if task.duration < ProfileConstants.TASK_DURATION_MIN_MINUTE:
                return (False, f'monitor duration must be greater '
                               f'than {ProfileConstants.TASK_DURATION_MIN_MINUTE} minutes')
            if task.duration > ProfileConstants.TASK_DURATION_MAX_MINUTE:
                return (False, f'monitor duration must be less '
                               f'than {ProfileConstants.TASK_DURATION_MAX_MINUTE} minutes')

            # min duration threshold
            if task.min_duration_threshold < 0:
                return False, 'min duration threshold must be greater than or equals zero'

            # dump period
            if task.thread_dump_period < ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS:
                return (False,
                        f'dump period must be greater than or equals to '
                        f'{ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS} milliseconds')

            # max sampling count
            if task.max_sampling_count <= 0:
                return False, 'max sampling count must be greater than zero'
            if task.max_sampling_count >= ProfileConstants.TASK_MAX_SAMPLING_COUNT:
                return (False, f'max sampling count must be less than '
                               f'{ProfileConstants.TASK_MAX_SAMPLING_COUNT}')

            # check task queue
            conflicting_task = self._find_conflicting_task(task)
            if conflicting_task is not None:
                return (False,
                        f'there already have processing task in time range, '
                        f'could not add a new task again. processing task '
                        f'monitor endpoint name: {conflicting_task.first_span_op_name}')

            return True, ''

        except TypeError:
            return False, 'ProfileTask attributes have a type error'

    def _find_conflicting_task(self, task: ProfileTask):
        """Return the first queued task whose execution window contains ``task``'s finish time, else None."""
        task_finish_time = self._cal_profile_task_finish_time(task)

        # lock the self._profile_task_list.queue when check the item in it, avoid concurrency errors
        with self._profile_task_list.mutex:
            for profile_task in self._profile_task_list.queue:  # type: ProfileTask
                # if the end time of the task to be added is during the execution of any data, means is a error data
                # (the window is the QUEUED task's [start, finish]; comparing against the
                # new task's own start time would make the lower bound always true)
                if profile_task.start_time <= task_finish_time <= self._cal_profile_task_finish_time(profile_task):
                    return profile_task

        return None

    def _cal_profile_task_finish_time(self, task: ProfileTask) -> int:
        """Return ``task.start_time`` plus ``task.duration`` scaled by MINUTE_TO_MILLIS."""
        return task.start_time + task.duration * self.MINUTE_TO_MILLIS