blob: 142e7ca6059f4f91b7c41a01419be14a7ef3009f [file] [log] [blame]
#!/usr/bin/env python
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import Queue
import logging
import multiprocessing
import os
import pprint
import threading
import time
import signal
from ambari_agent.RemoteDebugUtils import bind_debug_signal_handlers
logger = logging.getLogger(__name__)
class StatusCommandsExecutor(object):
  """
  Abstract interface for executors of agent status commands.

  Concrete implementations (single-process and multi-process) must override
  all four methods below.

  BUGFIX: the original methods did ``raise NotImplemented()``.
  ``NotImplemented`` is the rich-comparison sentinel value, not an exception
  class, and is not callable — calling it raises
  ``TypeError: 'NotImplementedType' object is not callable`` instead of the
  intended ``NotImplementedError``.
  """

  def put_commands(self, commands):
    """Queue the given status commands for execution."""
    raise NotImplementedError()

  def process_results(self):
    """Execute queued commands and/or process their results."""
    raise NotImplementedError()

  def relaunch(self, reason=None):
    """Restart the executor's worker (if any).

    :param reason: human-readable reason for the restart
    """
    raise NotImplementedError()

  def kill(self, reason=None, can_relaunch=True):
    """Stop the executor's worker (if any).

    :param reason: human-readable reason for the kill
    :param can_relaunch: whether a subsequent relaunch is allowed
    """
    raise NotImplementedError()
class SingleProcessStatusCommandsExecutor(StatusCommandsExecutor):
  """
  In-process implementation: status commands are executed synchronously on
  the thread that calls process_results(), with no worker process to manage.
  """

  def __init__(self, config, actionQueue):
    self.config = config
    self.actionQueue = actionQueue
    self.statusCommandQueue = Queue.Queue()
    # tuple (bool, str|None) with flag to relaunch and reason of relaunch;
    # always (False, None) here since there is no child process to restart
    self.need_relaunch = (False, None)

  def put_commands(self, commands):
    """
    Replace any stale queued status commands with the given fresh batch.
    """
    # drop whatever is still pending; the new batch supersedes it
    with self.statusCommandQueue.mutex:
      stale_count = len(self.statusCommandQueue.queue)
      if stale_count:
        logger.info("Removing %s stale status commands from queue", stale_count)
      self.statusCommandQueue.queue.clear()

    for command in commands:
      logger.info("Adding " + command['commandType'] + " for component " +
                  command['componentName'] + " of service " +
                  command['serviceName'] + " of cluster " +
                  command['clusterName'] + " to the queue.")
      self.statusCommandQueue.put(command)
      logger.debug(pprint.pformat(command))

  def process_results(self):
    """
    Drain the queue, executing each status command and processing its result
    synchronously.
    """
    while not self.statusCommandQueue.empty():
      try:
        cmd = self.statusCommandQueue.get(False)
        result = self.actionQueue.execute_status_command_and_security_status(cmd)
        self.actionQueue.process_status_command_result(result)
      except Queue.Empty:
        # another consumer beat us to the item; nothing to do
        pass

  def relaunch(self, reason=None):
    # no worker process exists, so relaunch is a no-op
    pass

  def kill(self, reason=None, can_relaunch=True):
    # no worker process exists, so kill is a no-op
    pass
class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor):
  """
  Runs status commands in a separate child process so that a hung command
  cannot block the agent. The parent communicates with the child via three
  multiprocessing queues (tasks in, results out, log records out) and two
  events (mustDieEvent to request shutdown, timedOutEvent to report a
  command exceeding status_command_timeout). The child is expected to be
  killed and relaunched by the owner whenever ``need_relaunch`` says so.
  """

  def __init__(self, config, actionQueue):
    """
    :param config: agent configuration; 'agent/status_command_timeout' is read here
    :param actionQueue: ActionQueue used to execute commands and process results
    """
    self.config = config
    self.actionQueue = actionQueue
    # set to False by kill(can_relaunch=False) to permanently disable restarts
    self.can_relaunch = True

    # used to prevent queues from been used during creation of new one to prevent threads messing up with combination of
    # old and new queues
    self.usage_lock = threading.RLock()
    # protects against simultaneous killing/creating from different threads.
    self.kill_lock = threading.RLock()

    # per-command execution budget (seconds) before the child is flagged for relaunch
    self.status_command_timeout = int(self.config.get('agent', 'status_command_timeout', 5))
    self.customServiceOrchestrator = self.actionQueue.customServiceOrchestrator

    # handle to the child process; None until relaunch() starts one
    self.worker_process = None
    self.mustDieEvent = multiprocessing.Event()
    self.timedOutEvent = multiprocessing.Event()

    # multiprocessing stuff that need to be cleaned every time
    self.mp_result_queue = multiprocessing.Queue()
    self.mp_result_logs = multiprocessing.Queue()
    self.mp_task_queue = multiprocessing.Queue()

  def _drain_queue(self, target_queue, max_time=5, max_empty_count=15, read_break=.001):
    """
    Read everything that available in queue. Using not reliable multiprocessing.Queue methods(qsize, empty), so contains
    extremely dumb protection against blocking too much at this method: will try to get all possible items for not more
    than ``max_time`` seconds; will return after ``max_empty_count`` calls of ``target_queue.get(False)`` that raised
    ``Queue.Empty`` exception. Notice ``read_break`` argument, with default values this method will be able to read
    ~4500 ``range(1,10000)`` objects for 5 seconds. So don't fill queue too fast.

    :param target_queue: queue to read from
    :param max_time: maximum time to spend in this method call
    :param max_empty_count: maximum allowed ``Queue.Empty`` in a row
    :param read_break: time to wait before next read cycle iteration
    :return: list of resulting objects
    """
    results = []
    _empty = 0
    _start = time.time()
    # usage_lock guards against kill() swapping the queues out from under us
    with self.usage_lock:
      try:
        while (not target_queue.empty() or target_queue.qsize() > 0) and time.time() - _start < max_time and _empty < max_empty_count:
          try:
            results.append(target_queue.get(False))
            _empty = 0
            time.sleep(read_break) # sleep a little to get more accurate empty and qsize results
          except Queue.Empty:
            _empty += 1
          except IOError:
            # queue pipe may be closed concurrently by kill(); treat as end of data
            pass
          except UnicodeDecodeError:
            # skip items that fail to unpickle cleanly
            pass
      except IOError:
        # empty()/qsize() themselves can fail on a closed queue
        pass
    return results

  def _log_message(self, level, message, exception=None):
    """
    Put log message to logging queue. Must be used only for logging from child process(in _worker_process_target).

    :param level: logging level constant (logging.INFO etc.)
    :param message: message text
    :param exception: optional exception to attach for exc_info logging
    :return:
    """
    result_message = "StatusCommandExecutor reporting at {0}: ".format(time.time()) + message
    self.mp_result_logs.put((level, result_message, exception))

  def _process_logs(self):
    """
    Get all available at this moment logs and prints them to logger.
    """
    for level, message, exception in self._drain_queue(self.mp_result_logs):
      # NOTE(review): ERROR-level records are emitted via logger.debug here —
      # possibly to keep child-process tracebacks out of the main log; confirm intent
      if level == logging.ERROR:
        logger.debug(message, exc_info=exception)
      if level == logging.WARN:
        logger.warn(message)
      if level == logging.INFO:
        logger.info(message)

  def _worker_process_target(self):
    """
    Internal method that running in separate process.
    """
    # cleanup monkey-patching results in child process, as it causing problems
    import subprocess
    reload(subprocess)
    import multiprocessing
    reload(multiprocessing)

    bind_debug_signal_handlers()
    self._log_message(logging.INFO, "StatusCommandsExecutor process started")

    # region StatusCommandsExecutor process internals
    # plain (intra-process) queues between this loop and the executing thread
    internal_in_queue = Queue.Queue()
    internal_out_queue = Queue.Queue()

    def _internal_worker():
      """
      thread that actually executes status commands
      """
      while True:
        _cmd = internal_in_queue.get()
        internal_out_queue.put(self.actionQueue.execute_status_command_and_security_status(_cmd))

    # daemon thread: abandoned (not joined) if a command hangs past the timeout
    worker = threading.Thread(target=_internal_worker)
    worker.daemon = True
    worker.start()

    def _internal_process_command(_command):
      """
      Hand one command to the worker thread and wait up to
      status_command_timeout seconds for its result.
      :return: True if a result was produced in time, False on timeout/shutdown
      """
      internal_in_queue.put(_command)
      start_time = time.time()
      result = None
      # poll in 1s slices so a mustDieEvent shutdown is noticed promptly
      while not self.mustDieEvent.is_set() and not result and time.time() - start_time < self.status_command_timeout:
        try:
          result = internal_out_queue.get(timeout=1)
        except Queue.Empty:
          pass

      if result:
        self.mp_result_queue.put(result)
        return True
      else:
        # do not set timed out event twice
        if not self.timedOutEvent.is_set():
          self._set_timed_out(_command)
        return False
    # endregion

    try:
      # main child loop: pull tasks from the parent until told to die
      while not self.mustDieEvent.is_set():
        try:
          command = self.mp_task_queue.get(False)
        except Queue.Empty:
          # no command, lets try in other loop iteration
          time.sleep(.1)
          continue

        self._log_message(logging.DEBUG, "Running status command for {0}".format(command['componentName']))

        if _internal_process_command(command):
          self._log_message(logging.DEBUG, "Completed status command for {0}".format(command['componentName']))

    except Exception as e:
      self._log_message(logging.ERROR, "StatusCommandsExecutor process failed with exception:", e)
      raise

    self._log_message(logging.INFO, "StatusCommandsExecutor subprocess finished")

  def _set_timed_out(self, command):
    """
    Set timeout event and adding log entry for given command.

    :param command: the status command dict that exceeded the timeout
    :return:
    """
    msg = "Command {0} for {1} is running for more than {2} seconds. Terminating it due to timeout.".format(
      command['commandType'],
      command['componentName'],
      self.status_command_timeout
    )
    self._log_message(logging.WARN, msg)
    # signals the parent (via need_relaunch) that the child must be restarted
    self.timedOutEvent.set()

  def put_commands(self, commands):
    """
    Put given commands to command executor.

    :param commands: status commands to execute
    :return:
    """
    # NOTE(review): unlike the single-process executor, stale queued commands
    # are NOT cleared here — presumably relaunch handles staleness; verify
    with self.usage_lock:
      for command in commands:
        logger.info("Adding " + command['commandType'] + " for component " + \
                    command['componentName'] + " of service " + \
                    command['serviceName'] + " of cluster " + \
                    command['clusterName'] + " to the queue.")
        self.mp_task_queue.put(command)
        logger.debug(pprint.pformat(command))

  def process_results(self):
    """
    Process all the results from the SCE worker process.
    """
    # forward any child-process log records first
    self._process_logs()
    results = self._drain_queue(self.mp_result_queue)
    logger.debug("Drained %s status commands results, ~%s remains in queue", len(results), self.mp_result_queue.qsize())
    for result in results:
      try:
        self.actionQueue.process_status_command_result(result)
      except UnicodeDecodeError:
        # skip results that fail to decode rather than abort the whole batch
        pass

  @property
  def need_relaunch(self):
    """
    Indicates if process need to be relaunched due to timeout or it is dead or even was not created.

    :return: tuple (bool, str|None) with flag to relaunch and reason of relaunch
    """
    if not self.worker_process or not self.worker_process.is_alive():
      return True, "WORKER_DEAD"
    elif self.timedOutEvent.is_set():
      return True, "COMMAND_TIMEOUT"
    return False, None

  def relaunch(self, reason=None):
    """
    Restart status command executor internal process.

    :param reason: reason of restart
    :return:
    """
    with self.kill_lock:
      logger.info("Relaunching child process reason:" + str(reason))
      if self.can_relaunch:
        # kill() also recreates the queues and clears the events
        self.kill(reason)
        self.worker_process = multiprocessing.Process(target=self._worker_process_target)
        self.worker_process.start()
        logger.info("Started process with pid {0}".format(self.worker_process.pid))
      else:
        logger.debug("Relaunch does not allowed, can not relaunch")

  def kill(self, reason=None, can_relaunch=True):
    """
    Tries to stop command executor internal process for sort time, otherwise killing it. Closing all possible queues to
    unblock threads that probably blocked on read or write operations to queues. Must be called from threads different
    from threads that calling read or write methods(get_log_messages, get_results, put_commands).

    :param can_relaunch: indicates if StatusCommandsExecutor can be relaunched after this kill
    :param reason: reason of killing
    :return:
    """
    with self.kill_lock:
      self.can_relaunch = can_relaunch

      if not self.can_relaunch:
        logger.info("Killing without possibility to relaunch...")

      # try graceful stop, otherwise hard-kill
      if self.worker_process and self.worker_process.is_alive():
        self.mustDieEvent.set()
        self.worker_process.join(timeout=3)
        if self.worker_process.is_alive():
          os.kill(self.worker_process.pid, signal.SIGKILL)
          logger.info("Child process killed by -9")
        else:
          # get log messages only if we died gracefully, otherwise we will have chance to block here forever, in most cases
          # this call will do nothing, as all logs will be processed in ActionQueue loop
          self._process_logs()
          logger.info("Child process died gracefully")
      else:
        logger.info("Child process already dead")

      # close queues and acquire usage lock
      # closing both sides of pipes here, we need this hack in case of blocking on recv() call
      # NOTE(review): _writer is a private multiprocessing.Queue attribute; this
      # relies on CPython internals and is deliberate (unblocks a pending recv)
      self.mp_result_queue.close()
      self.mp_result_queue._writer.close()
      self.mp_result_logs.close()
      self.mp_result_logs._writer.close()
      self.mp_task_queue.close()
      self.mp_task_queue._writer.close()

      # recreate queues under usage_lock so readers/writers never see a mix of
      # old (closed) and new queues
      with self.usage_lock:
        self.mp_result_queue.join_thread()
        self.mp_result_queue = multiprocessing.Queue()
        self.mp_task_queue.join_thread()
        self.mp_task_queue = multiprocessing.Queue()
        self.mp_result_logs.join_thread()
        self.mp_result_logs = multiprocessing.Queue()
        self.customServiceOrchestrator = self.actionQueue.customServiceOrchestrator
        # reset events so a relaunched child starts with a clean slate
        self.mustDieEvent.clear()
        self.timedOutEvent.clear()