#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from threading import Timer, RLock, Lock
from typing import Tuple
from skywalking.agent import agent
from skywalking.loggings import logger, logger_debug_enabled
from skywalking.profile.profile_constants import ProfileConstants
from skywalking.profile.profile_context import ProfileTaskExecutionContext
from skywalking.profile.profile_status import ProfileStatusReference
from skywalking.profile.profile_task import ProfileTask
from skywalking.trace.context import SpanContext
from skywalking.utils.atomic_ref import AtomicRef
from skywalking.utils.time import current_milli_time


class Scheduler:
@staticmethod
def schedule(milliseconds, func, *args, **kwargs):
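        """
        Run func once after roughly `milliseconds` (a negative delay fires
        immediately). Extra positional arguments are forwarded to
        threading.Timer, so callers pass the callee's argument list as a
        single list, e.g. schedule(delay, fn, [arg]) calls fn(arg).
        """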
seconds = milliseconds / 1000
if seconds < 0:
seconds = 0
t = Timer(seconds, func, *args, **kwargs)
t.daemon = True
t.start()


class ProfileTaskExecutionService:
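    """
    Receives profiling task commands pushed from the OAP backend, validates
    and queues them, and drives each task's start/stop life cycle through the
    Scheduler above.
    """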
MINUTE_TO_MILLIS = 60000

    def __init__(self):
# Queue is thread safe
self._profile_task_list = Queue() # type: Queue
        # queue_lock makes compound operations on profile_task_list (e.g. drain and refill) thread safe
self.queue_lock = Lock()
self._last_command_create_time = -1 # type: int
# single thread executor
self.profile_executor = ThreadPoolExecutor(thread_name_prefix='profile-executor', max_workers=1)
self.task_execution_context = AtomicRef(None)
self.profile_task_scheduler = Scheduler()
# rlock for process_profile_task and stop_current_profile_task
self._rlock = RLock()

    def remove_from_profile_task_list(self, task: ProfileTask) -> bool:
        """
        Remove a task from profile_task_list in a thread-safe way.
        """
with self.queue_lock:
item_left = []
result = False
while not self._profile_task_list.empty():
item = self._profile_task_list.get()
if item == task:
result = True
                    # do not re-queue the matching task, which removes it from the list
continue
item_left.append(item)
for item in item_left:
self._profile_task_list.put(item)
return result

    def get_last_command_create_time(self) -> int:
return self._last_command_create_time

    def add_profile_task(self, task: ProfileTask):
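        """
        Validate the task, record its create time for later command queries,
        queue it, and schedule it to start at task.start_time.
        """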
        # update the last command create time, which is used in later command queries
if task.create_time > self._last_command_create_time:
self._last_command_create_time = task.create_time
# check profile task object
success, error_reason = self._check_profile_task(task)
if not success:
logger.warning('check command error, cannot process this profile task. reason: %s', error_reason)
return
# add task to list
self._profile_task_list.put(task)
delay_millis = task.start_time - current_milli_time()
        # schedule the task to start after the delay
self.profile_task_scheduler.schedule(delay_millis, self.process_profile_task, [task])

    def add_profiling(self, context: SpanContext, segment_id: str, first_span_opname: str) -> ProfileStatusReference:
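        """
        Attempt to start profiling the given trace segment under the currently
        active task; returns a NONE profile status reference when no task is
        active.
        """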
execution_context = self.task_execution_context.get() # type: ProfileTaskExecutionContext
if execution_context is None:
return ProfileStatusReference.create_with_none()
return execution_context.attempt_profiling(context, segment_id, first_span_opname)

    def profiling_recheck(self, trace_context: SpanContext, segment_id: str, first_span_opname: str):
        """
        Re-check whether the current trace needs profiling, in case a
        third-party plugin has changed the operation name.
        """
execution_context = self.task_execution_context.get() # type: ProfileTaskExecutionContext
if execution_context is None:
return
execution_context.profiling_recheck(trace_context, segment_id, first_span_opname)

    # process_profile_task and stop_current_profile_task share a reentrant
    # lock, so starting a new task and stopping the current one cannot
    # interleave
def process_profile_task(self, task: ProfileTask):
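        """
        Make the given task the active execution context (stopping the
        previous one first) and schedule the automatic stop after
        task.duration minutes.
        """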
with self._rlock:
            # make sure the previous profile task has stopped
self.stop_current_profile_task(self.task_execution_context.get())
            # create and publish the new task's execution context
current_context = ProfileTaskExecutionContext(task)
self.task_execution_context.set(current_context)
# start profiling this task
current_context.start_profiling()
if logger_debug_enabled:
logger.debug('profile task [%s] for endpoint [%s] started', task.task_id, task.first_span_op_name)
millis = task.duration * self.MINUTE_TO_MILLIS
self.profile_task_scheduler.schedule(millis, self.stop_current_profile_task, [current_context])

    def stop_current_profile_task(self, need_stop: ProfileTaskExecutionContext):
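        """
        Stop the given execution context if it is still the active one. The
        compare-and-set on task_execution_context guarantees a context is
        stopped at most once, even when a scheduled stop races with a newly
        arriving task.
        """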
with self._rlock:
            # nothing to stop, or the active context has already been replaced
            # (the compare-and-set fails), so return without stopping anything
if need_stop is None or not self.task_execution_context.compare_and_set(need_stop, None):
return
need_stop.stop_profiling()
if logger_debug_enabled:
logger.debug('profile task [%s] for endpoint [%s] stopped', need_stop.task.task_id,
need_stop.task.first_span_op_name)
self.remove_from_profile_task_list(need_stop.task)
            # notify the agent that the profiling task has finished
agent.notify_profile_finish(need_stop.task)

    def _check_profile_task(self, task: ProfileTask) -> Tuple[bool, str]:
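        """
        Validate a task against the agent-side limits and against the tasks
        already queued; returns a (success, error_reason) tuple.
        """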
try:
# endpoint name
            if len(task.first_span_op_name) == 0:
                return (False, f'endpoint name [{task.first_span_op_name}] error, '
                               f'should be a non-empty str')
# duration
            if task.duration < ProfileConstants.TASK_DURATION_MIN_MINUTE:
                return (False, f'monitor duration must be at least '
                               f'{ProfileConstants.TASK_DURATION_MIN_MINUTE} minutes')
            if task.duration > ProfileConstants.TASK_DURATION_MAX_MINUTE:
                return (False, f'monitor duration must not exceed '
                               f'{ProfileConstants.TASK_DURATION_MAX_MINUTE} minutes')
# min duration threshold
            if task.min_duration_threshold < 0:
                return False, 'min duration threshold must be greater than or equal to zero'
# dump period
            if task.thread_dump_period < ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS:
                return (False,
                        f'dump period must be greater than or equal to '
                        f'{ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS} milliseconds')
# max sampling count
if task.max_sampling_count <= 0:
return False, 'max sampling count must be greater than zero'
if task.max_sampling_count >= ProfileConstants.TASK_MAX_SAMPLING_COUNT:
return (False, f'max sampling count must be less than '
f'{ProfileConstants.TASK_MAX_SAMPLING_COUNT}')
# check task queue
task_finish_time = self._cal_profile_task_finish_time(task)
            # hold the queue's internal mutex while inspecting its items, to
            # avoid concurrent modification
with self._profile_task_list.mutex:
for profile_task in self._profile_task_list.queue: # type: ProfileTask
                    # reject the new task if its finish time falls inside the
                    # execution window of an already-queued task
                    if profile_task.start_time <= task_finish_time <= self._cal_profile_task_finish_time(profile_task):
                        return (False,
                                f'a task is already being processed in this time range, '
                                f'cannot add a new task. processing task '
                                f'monitor endpoint name: {profile_task.first_span_op_name}')
return True, ''
except TypeError:
return False, 'ProfileTask attributes have a type error'

    def _cal_profile_task_finish_time(self, task: ProfileTask) -> int:
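        """Finish time in epoch milliseconds: start time plus duration (minutes)."""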
return task.start_time + task.duration * self.MINUTE_TO_MILLIS
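

# A minimal usage sketch (illustrative only; in a running agent, tasks are
# built from profile task commands pushed by the OAP backend rather than
# constructed by hand, and any field values would come from those commands):
#
#   service = ProfileTaskExecutionService()
#   task = ProfileTask(...)          # fields include first_span_op_name,
#                                    # duration, start_time, create_time
#   service.add_profile_task(task)   # validated, queued, and scheduled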