blob: 35ab65d0549c31e2f8fdffe250ab4cb731ec9bb8 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
from queue import Queue
from skywalking.loggings import logger
from skywalking.profile.profile_constants import ProfileConstants
from skywalking.profile.profile_task import ProfileTask
class ProfileTaskExecutionService:
def __init__(self):
# Queue is thread safe
self.__profile_task_list = Queue() # type: Queue
self.__last_command_create_time = -1 # type: int
def get_last_command_create_time(self) -> int:
return self.__last_command_create_time
def add_profile_task(self, task: ProfileTask):
# update last command create time, which will be used in command query
if task.create_time > self.__last_command_create_time:
self.__last_command_create_time = task.create_time
# check profile task object
result = self.__check_profile_task(task)
if not result.success:
logger.warning("check command error, cannot process this profile task. reason: %s", result.error_reason)
# add task to list
class CheckResult:
def __init__(self, success: bool, error_reason: str):
self.success = success # type: bool
self.error_reason = error_reason # type: str
def __check_profile_task(self, task: ProfileTask) -> CheckResult:
# endpoint name
if len(task.first_span_op_name) == 0:
return self.CheckResult(False, "endpoint name [{}] error, "
"should be str and not empty".format(task.first_span_op_name))
# duration
if task.duration < ProfileConstants.TASK_DURATION_MIN_MINUTE:
return self.CheckResult(False, "monitor duration must greater"
" than {} minutes".format(ProfileConstants.TASK_DURATION_MIN_MINUTE))
if task.duration > ProfileConstants.TASK_DURATION_MAX_MINUTE:
return self.CheckResult(False, "monitor duration must less"
" than {} minutes".format(ProfileConstants.TASK_DURATION_MAX_MINUTE))
# min duration threshold
if task.min_duration_threshold < 0:
return self.CheckResult(False, "min duration threshold must greater than or equals zero")
# dump period
if task.thread_dump_period < ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS:
return self.CheckResult(False, "dump period must be greater than or equals to {}"
" milliseconds".format(ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS))
# max sampling count
if task.max_sampling_count <= 0:
return self.CheckResult(False, "max sampling count must greater than zero")
if task.max_sampling_count >= ProfileConstants.TASK_MAX_SAMPLING_COUNT:
return self.CheckResult(False, "max sampling count must less"
" than {}".format(ProfileConstants.TASK_MAX_SAMPLING_COUNT))
# check task queue
task_finish_time = self.__cal_profile_task_finish_time(task)
# lock the self.__profile_task_list.queue when check the item in it, avoid concurrency errors
with self.__profile_task_list.mutex:
for profile_task in self.__profile_task_list.queue: # type: ProfileTask
# if the end time of the task to be added is during the execution of any data, means is a error data
if task.start_time <= task_finish_time <= self.__cal_profile_task_finish_time(profile_task):
return self.CheckResult(False, "there already have processing task in time range, "
"could not add a new task again. processing task monitor "
"endpoint name: {}".format(profile_task.first_span_op_name))
return self.CheckResult(True, "")
except TypeError:
return self.CheckResult(False, "ProfileTask attributes has type error")
def __cal_profile_task_finish_time(self, task: ProfileTask) -> int:
return task.start_time + task.duration * self.MINUTE_TO_MILLIS