blob: c2d1e89ac577cc6bf33788e20e1e9544ac0af6c1 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import queue
from collections import deque
from skywalking.protocol.common.Command_pb2 import Commands, Command
from skywalking.command.base_command import BaseCommand
from skywalking.command.executors import noop_command_executor_instance
from skywalking.command.executors.profile_task_command_executor import ProfileTaskCommandExecutor
from skywalking.command.profile_task_command import ProfileTaskCommand
from skywalking.loggings import logger
class CommandService:
    """
    Queue received commands and dispatch each one at most once.

    ``receive_command`` deserializes incoming commands and puts them on an
    internal queue; ``dispatch`` drains that queue and passes every command to
    the module-level ``command_executor_service``. Serial numbers of executed
    commands are remembered in a ``CommandSerialNumberCache`` so that a command
    seen again is not executed a second time.
    """

    def __init__(self):
        self._commands = queue.Queue()  # type: queue.Queue
        # don't execute same command twice
        self._command_serial_number_cache = CommandSerialNumberCache()

    def dispatch(self):
        """
        Execute queued commands in an endless loop.

        The loop has no exit condition. An exception raised by an executor
        propagates out of this method, and the failing command's serial number
        is not recorded as executed.
        """
        while True:
            # block until a command is available
            command = self._commands.get()  # type: BaseCommand
            # a command can be queued more than once before its first execution,
            # so the cache is checked again here
            if self.__is_command_executed(command):
                continue
            command_executor_service.execute(command)
            self._command_serial_number_cache.add(command.serial_number)

    def __is_command_executed(self, command: BaseCommand):
        """
        Return True when the command's serial number is in the executed cache.
        """
        return self._command_serial_number_cache.contains(command.serial_number)

    def receive_command(self, commands: Commands):
        """
        Deserialize every entry of ``commands.commands`` and queue it for dispatch.

        Unsupported commands, and commands whose serial number is already in the
        executed cache, are logged and skipped.

        :param commands: message whose ``commands`` field is iterated
        """
        for command in commands.commands:
            # only deserialization can raise UnsupportedCommandException,
            # so keep the try body down to that single call
            try:
                base_command = CommandDeserializer.deserialize(command)
            except UnsupportedCommandException as e:
                logger.warning('received unsupported command[{%s}].', e.command.command)
                continue

            logger.debug('received command [{%s} {%s}]', base_command.command, base_command.serial_number)

            if self.__is_command_executed(base_command):
                logger.warning('command[{%s}] is executed, ignored.', base_command.command)
                continue

            try:
                # put() blocks by default and never raises queue.Full;
                # put_nowait() makes the handler below reachable if the queue
                # is ever given a maxsize, instead of stalling the receiver
                self._commands.put_nowait(base_command)
            except queue.Full:
                logger.warning('command[{%s}, {%s}] cannot add to command list. because the command list is full.',
                               base_command.command, base_command.serial_number)
class CommandSerialNumberCache:
    """
    Record of the most recently added command serial numbers.

    At most ``maxlen`` numbers are kept; adding to a full cache discards the
    oldest entry.
    """

    def __init__(self, maxlen=64):
        """
        :param maxlen: maximum number of serial numbers kept, passed straight
            to ``collections.deque``
        """
        # Once a bounded length deque is full, when new items are added,
        # a corresponding number of items are discarded from the opposite end.
        self.queue = deque(maxlen=maxlen)

    def add(self, number: str):
        """
        Remember ``number``; the oldest entry is dropped when the cache is full.
        """
        self.queue.append(number)

    def contains(self, number: str) -> bool:
        """
        Return True when ``number`` is currently held in the cache.
        """
        return number in self.queue

    def __contains__(self, number) -> bool:
        """
        Support ``number in cache`` as an alias of ``contains``.
        """
        return self.contains(number)
class CommandExecutorService:
    """
    Pick the executor registered under a command's name and run the command with it.
    """

    def __init__(self):
        # executors keyed by command name
        self.__command_executor_map = {
            ProfileTaskCommand.NAME: ProfileTaskCommandExecutor(),
        }

    def execute(self, command: BaseCommand):
        """
        Run ``command`` with the executor selected for it.
        """
        handler = self.__executor_for_command(command)
        handler.execute(command)

    def __executor_for_command(self, command: BaseCommand):
        """
        Return the executor registered for ``command.command``, or the no-op
        executor when none is registered.
        """
        registered = self.__command_executor_map.get(command.command)
        return registered or noop_command_executor_instance
class CommandDeserializer:
    """
    Turn a raw ``Command`` message into the matching ``BaseCommand``.
    """

    @staticmethod
    def deserialize(command: Command) -> BaseCommand:
        """
        Deserialize ``command`` according to its ``command`` name.

        :raises UnsupportedCommandException: when the name is not
            ``ProfileTaskCommand.NAME``
        """
        requested = command.command
        # guard clause: anything other than a profile task is unsupported
        if ProfileTaskCommand.NAME != requested:
            raise UnsupportedCommandException(command)
        return ProfileTaskCommand.deserialize(command)
class UnsupportedCommandException(Exception):
    """
    Raised when a command cannot be deserialized because its name is unknown.

    The offending command is available as the ``command`` attribute.
    """

    def __init__(self, command):
        # initialise the base class explicitly so ``args`` does not depend on
        # the implicit behaviour of BaseException.__new__
        super().__init__(command)
        self.command = command
# module-level executor service, created once at import time;
# CommandService.dispatch routes every command through it
command_executor_service = CommandExecutorService()