"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import Queue
import logging
import threading
import pprint
import os
import ambari_simplejson as json
import time
import signal
import re
from AgentException import AgentException
from ambari_agent.BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle
from ambari_agent.models.commands import AgentCommand, CommandStatus
from ambari_commons.str_utils import split_on_chunks
logger = logging.getLogger()
installScriptHash = -1
MAX_SYMBOLS_PER_LOG_MESSAGE = 7900
PASSWORD_REPLACEMENT = '[PROTECTED]'
PASSWORD_PATTERN = re.compile(r"('\S*password':\s*u?')(\S+)(')")
def hide_passwords(text):
""" Replaces the matching passwords with **** in the given text """
return None if text is None else PASSWORD_PATTERN.sub(r'\1{}\3'.format(PASSWORD_REPLACEMENT), text)
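# Example (illustrative input, not from the original code): PASSWORD_PATTERN matches a quoted key
# ending in "password" followed by a quoted value, so
#   hide_passwords("'db_password': u'secret123'")
# returns
#   "'db_password': u'[PROTECTED]'"
# while text without such a key/value pair is returned unchanged.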
class ActionQueue(threading.Thread):
""" Action Queue for the agent. We pick one command at a time from the queue
and execute it
Note: the terms "action" and "command" are used interchangeably in this and related classes
"""
# How many actions can be performed in parallel. Feel free to change
MAX_CONCURRENT_ACTIONS = 5
# How long (in seconds) to wait for a new incoming execution command before checking the status command queue
EXECUTION_COMMAND_WAIT_TIME = 2
# key name in command dictionary
IS_RECOVERY_COMMAND = "isRecoveryCommand"
def __init__(self, initializer_module):
super(ActionQueue, self).__init__()
self.commandQueue = Queue.Queue()
self.backgroundCommandQueue = Queue.Queue()
self.commandStatuses = initializer_module.commandStatuses
self.config = initializer_module.config
self.recovery_manager = initializer_module.recovery_manager
self.configTags = {}
self.stop_event = initializer_module.stop_event
self.tmpdir = self.config.get('agent', 'prefix')
self.customServiceOrchestrator = initializer_module.customServiceOrchestrator
self.parallel_execution = self.config.get_parallel_exec_option()
self.taskIdsToCancel = set()
self.cancelEvent = threading.Event()
self.component_status_executor = initializer_module.component_status_executor
if self.parallel_execution == 1:
logger.info("Parallel execution is enabled, will execute agent commands in parallel")
self.lock = threading.Lock()
def put(self, commands):
for command in commands:
if "serviceName" not in command:
command["serviceName"] = "null"
if "clusterId" not in command:
command["clusterId"] = "null"
logger.info("Adding {commandType} for role {role} for service {serviceName} of cluster_id {clusterId} to the queue".format(**command))
if command['commandType'] == AgentCommand.background_execution:
self.backgroundCommandQueue.put(self.create_command_handle(command))
else:
self.commandQueue.put(command)
def interrupt(self):
self.commandQueue.put(None)
def cancel(self, commands):
for command in commands:
logger.info("Canceling command with taskId = {tid}".format(tid = str(command['target_task_id'])))
if logger.isEnabledFor(logging.DEBUG):
logger.debug(pprint.pformat(command))
task_id = command['target_task_id']
reason = command['reason']
# Remove from the command queue by task_id
queue = self.commandQueue
self.commandQueue = Queue.Queue()
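# Drain the old queue into the fresh one, re-adding every command except the one with the
# matching taskId; a command that is already running is killed below via cancel_command().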
while not queue.empty():
queued_command = queue.get(False)
if queued_command['taskId'] != task_id:
self.commandQueue.put(queued_command)
else:
logger.info("Canceling {commandType} for service {serviceName} and role {role} with taskId {taskId}".format(
**queued_command
))
# Kill if in progress
self.customServiceOrchestrator.cancel_command(task_id, reason)
self.taskIdsToCancel.add(task_id)
self.cancelEvent.set()
def run(self):
while not self.stop_event.is_set():
try:
self.process_background_queue_safe_empty()
self.fill_recovery_commands()
try:
if self.parallel_execution == 0:
command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
if command is None:
break
# Recovery commands should be run in parallel (since we don't know their ordering on the agent)
if self.IS_RECOVERY_COMMAND in command and command[self.IS_RECOVERY_COMMAND]:
self.start_parallel_command(command)
else:
self.process_command(command)
else:
# If parallel execution is enabled, just kick off all available
# commands using separate threads
while not self.stop_event.is_set():
command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
if command is None:
break
# If the command is not retry-enabled then do not start it in parallel.
# Checking just one command is enough, as all commands for a stage are sent
# at the same time and retry is only enabled for the initial start/install
retry_able = False
if 'commandParams' in command and 'command_retry_enabled' in command['commandParams']:
retry_able = command['commandParams']['command_retry_enabled'] == "true"
if retry_able:
self.start_parallel_command(command)
else:
self.process_command(command)
break
except Queue.Empty:
pass
except Exception:
logger.exception("ActionQueue thread failed with exception. Re-running it")
logger.info("ActionQueue thread has successfully finished")
def start_parallel_command(self, command):
logger.info("Kicking off a thread for the command, id={} taskId={}".format(command['commandId'], command['taskId']))
t = threading.Thread(target=self.process_command, args=(command,))
t.daemon = True
t.start()
def fill_recovery_commands(self):
if self.recovery_manager.enabled() and not self.tasks_in_progress_or_pending():
self.put(self.recovery_manager.get_recovery_commands())
def process_background_queue_safe_empty(self):
while not self.backgroundCommandQueue.empty():
try:
command = self.backgroundCommandQueue.get(False)
if "__handle" in command and command["__handle"].status is None:
self.process_command(command)
except Queue.Empty:
pass
def create_command_handle(self, command):
if "__handle" in command:
raise AgentException("Command already has __handle")
command['__handle'] = BackgroundCommandExecutionHandle(command, command['commandId'], None, self.on_background_command_complete_callback)
return command
def process_command(self, command):
# make sure we log failures
command_type = command['commandType']
logger.debug("Took an element of Queue (command type = %s).", command_type)
try:
if command_type in AgentCommand.AUTO_EXECUTION_COMMAND_GROUP:
try:
if self.recovery_manager.enabled():
self.recovery_manager.on_execution_command_start()
self.recovery_manager.process_execution_command(command)
self.execute_command(command)
finally:
if self.recovery_manager.enabled():
self.recovery_manager.on_execution_command_finish()
else:
logger.error("Unrecognized command %s", pprint.pformat(command))
except Exception:
logger.exception("Exception while processing {0} command".format(command_type))
def tasks_in_progress_or_pending(self):
return not self.commandQueue.empty() or self.recovery_manager.has_active_command()
def execute_command(self, command):
"""
Executes commands of type EXECUTION_COMMAND
"""
cluster_id = command['clusterId']
command_id = command['commandId']
command_type = command['commandType']
num_attempts = 0
retry_duration = 0 # even with 0 allow one attempt
retry_able = False
delay = 1
log_command_output = True
command_canceled = False
command_result = {}
message = "Executing command with id = {commandId}, taskId = {taskId} for role = {role} of " \
"cluster_id {cluster}.".format(commandId=str(command_id), taskId=str(command['taskId']),
role=command['role'], cluster=cluster_id)
logger.info(message)
taskId = command['taskId']
# Preparing 'IN_PROGRESS' report
in_progress_status = self.commandStatuses.generate_report_template(command)
# The paths of the files that contain the output log and error log use a prefix that the agent advertises to the
# server. The prefix is defined in agent-config.ini
if command_type != AgentCommand.auto_execution:
in_progress_status.update({
'tmpout': self.tmpdir + os.sep + 'output-' + str(taskId) + '.txt',
'tmperr': self.tmpdir + os.sep + 'errors-' + str(taskId) + '.txt',
'structuredOut': self.tmpdir + os.sep + 'structured-out-' + str(taskId) + '.json',
'status': CommandStatus.in_progress
})
else:
in_progress_status.update({
'tmpout': self.tmpdir + os.sep + 'auto_output-' + str(taskId) + '.txt',
'tmperr': self.tmpdir + os.sep + 'auto_errors-' + str(taskId) + '.txt',
'structuredOut': self.tmpdir + os.sep + 'auto_structured-out-' + str(taskId) + '.json',
'status': CommandStatus.in_progress
})
self.commandStatuses.put_command_status(command, in_progress_status)
if 'commandParams' in command:
if 'max_duration_for_retries' in command['commandParams']:
retry_duration = int(command['commandParams']['max_duration_for_retries'])
if 'command_retry_enabled' in command['commandParams'] and command_type != AgentCommand.auto_execution:
# for AgentCommand.auto_execution commands retry_able should always be false
retry_able = command['commandParams']['command_retry_enabled'] == "true"
if 'log_output' in command['commandParams']:
log_command_output = command['commandParams']['log_output'] != "false"
logger.info("Command execution metadata - taskId = {taskId}, retry enabled = {retryAble}, max retry duration (sec)"
" = {retryDuration}, log_output = {log_command_output}".format(
taskId=taskId, retryAble=retry_able, retryDuration=retry_duration, log_command_output=log_command_output))
self.cancelEvent.clear()
# for the case of a command reschedule (e.g. a command and its cancel for the same taskId are sent at the same time)
self.taskIdsToCancel.discard(taskId)
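# For example, with max_duration_for_retries=600 retry_duration acts as a wall-clock budget in
# seconds: each attempt subtracts its own elapsed time and each back-off wait subtracts the delay,
# so a failed attempt that ran for 300 seconds leaves roughly 300 seconds for further attempts.
# With retry disabled the budget stays at 0 and exactly one attempt is made.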
while retry_duration >= 0:
if taskId in self.taskIdsToCancel:
logger.info('Command with taskId = {0} canceled'.format(taskId))
command_canceled = True
self.taskIdsToCancel.discard(taskId)
break
num_attempts += 1
start = 0
if retry_able:
start = int(time.time())
# running command
command_result = self.customServiceOrchestrator.runCommand(command,
in_progress_status['tmpout'],
in_progress_status['tmperr'],
override_output_files=num_attempts == 1,
retry=num_attempts > 1)
end = 1
if retry_able:
end = int(time.time())
retry_duration -= (end - start)
# dumping results
if command_type == AgentCommand.background_execution:
logger.info("Command is background command, quit retrying. Exit code: {exitCode}, retryAble: {retryAble}, retryDuration (sec): {retryDuration}, last delay (sec): {delay}"
.format(cid=taskId, exitCode=command_result['exitcode'], retryAble=retry_able, retryDuration=retry_duration, delay=delay))
return
else:
if command_result['exitcode'] == 0:
status = CommandStatus.completed
else:
status = CommandStatus.failed
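# A negative exit code means the process was terminated by a signal (-15 for SIGTERM, -9 for
# SIGKILL), which is treated here as the command having been killed by a cancel request.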
if (command_result['exitcode'] == -signal.SIGTERM) or (command_result['exitcode'] == -signal.SIGKILL):
logger.info('Command with taskId = {cid} was canceled!'.format(cid=taskId))
command_canceled = True
self.taskIdsToCancel.discard(taskId)
break
if status != CommandStatus.completed and retry_able and retry_duration > 0:
delay = self.get_retry_delay(delay)
if delay > retry_duration:
delay = retry_duration
retry_duration -= delay # allow one last attempt
command_result['stderr'] += "\n\nCommand failed. Retrying command execution ...\n\n"
logger.info("Retrying command with taskId = {cid} after a wait of {delay}".format(cid=taskId, delay=delay))
if 'agentLevelParams' not in command:
command['agentLevelParams'] = {}
command['agentLevelParams']['commandBeingRetried'] = "true"
self.cancelEvent.wait(delay) # wake up if something was canceled
continue
else:
logger.info("Quit retrying for command with taskId = {cid}. Status: {status}, retryAble: {retryAble}, retryDuration (sec): {retryDuration}, last delay (sec): {delay}"
.format(cid=taskId, status=status, retryAble=retry_able, retryDuration=retry_duration, delay=delay))
break
self.taskIdsToCancel.discard(taskId)
# do not fail a task which was rescheduled by the server
if command_canceled:
with self.lock, self.commandQueue.mutex:
for com in self.commandQueue.queue:
if com['taskId'] == command['taskId']:
logger.info("Command with taskId = {cid} was rescheduled by server. "
"Fail report on cancelled command won't be sent with heartbeat.".format(cid=taskId))
self.commandStatuses.delete_command_data(command['taskId'])
return
# final result to stdout
command_result['stdout'] += '\n\nCommand completed successfully!\n' if status == CommandStatus.completed else '\n\nCommand failed after ' + str(num_attempts) + ' tries\n'
logger.info('Command with taskId = {cid} completed successfully!'.format(cid=taskId) if status == CommandStatus.completed else 'Command with taskId = {cid} failed after {attempts} tries'.format(cid=taskId, attempts=num_attempts))
role_result = self.commandStatuses.generate_report_template(command)
role_result.update({
'stdout': command_result['stdout'],
'stderr': command_result['stderr'],
'exitCode': command_result['exitcode'],
'status': status,
})
if self.config.has_option("logging", "log_command_executes") \
and int(self.config.get("logging", "log_command_executes")) == 1 \
and log_command_output:
if role_result['stdout'] != '':
logger.info("Begin command output log for command with id = " + str(command['taskId']) + ", role = "
+ command['role'] + ", roleCommand = " + command['roleCommand'])
self.log_command_output(role_result['stdout'], str(command['taskId']))
logger.info("End command output log for command with id = " + str(command['taskId']) + ", role = "
+ command['role'] + ", roleCommand = " + command['roleCommand'])
if role_result['stderr'] != '':
logger.info("Begin command stderr log for command with id = " + str(command['taskId']) + ", role = "
+ command['role'] + ", roleCommand = " + command['roleCommand'])
self.log_command_output(role_result['stderr'], str(command['taskId']))
logger.info("End command stderr log for command with id = " + str(command['taskId']) + ", role = "
+ command['role'] + ", roleCommand = " + command['roleCommand'])
if role_result['stdout'] == '':
role_result['stdout'] = 'None'
if role_result['stderr'] == '':
role_result['stderr'] = 'None'
# let ambari know name of custom command
if 'commandParams' in command and 'custom_command' in command['commandParams']:
role_result['customCommand'] = command['commandParams']['custom_command']
if 'structuredOut' in command_result:
role_result['structuredOut'] = str(json.dumps(command_result['structuredOut']))
else:
role_result['structuredOut'] = ''
self.recovery_manager.process_execution_command_result(command, status)
self.commandStatuses.put_command_status(command, role_result)
cluster_id = str(command['clusterId'])
if cluster_id != '-1' and cluster_id != 'null':
service_name = command['serviceName']
if service_name != 'null':
component_name = command['role']
self.component_status_executor.check_component_status(cluster_id, service_name, component_name, "STATUS", report=True)
def log_command_output(self, text, taskId):
"""
Logs a message as multiple enumerated log messages, each of which is no larger than MAX_SYMBOLS_PER_LOG_MESSAGE.
This is useful for logging big messages when logs are redirected to syslog (syslog_enabled=1),
since syslog usually truncates long messages.
"""
chunks = split_on_chunks(hide_passwords(text), MAX_SYMBOLS_PER_LOG_MESSAGE)
if len(chunks) > 1:
for i in range(len(chunks)):
logger.info("Cmd log for taskId={0} and chunk {1}/{2} of log for command: \n".format(taskId, i+1, len(chunks)) + chunks[i])
else:
logger.info("Cmd log for taskId={0}: ".format(taskId) + chunks[0])
def get_retry_delay(self, last_delay):
"""
Returns an exponentially growing delay. The idea is that if the number of retries is high,
the failure is probably caused by a host- or environment-specific issue that requires longer waits
"""
return last_delay * 2
def on_background_command_complete_callback(self, process_condensed_result, handle):
logger.debug('Start callback: %s', process_condensed_result)
logger.debug('The handle is: %s', handle)
status = CommandStatus.completed if handle.exitCode == 0 else CommandStatus.failed
aborted_postfix = self.customServiceOrchestrator.command_canceled_reason(handle.command['taskId'])
if aborted_postfix:
status = CommandStatus.failed
logger.debug('Set status to: %s , reason = %s', status, aborted_postfix)
else:
aborted_postfix = ''
role_result = self.commandStatuses.generate_report_template(handle.command)
role_result.update({
'stdout': process_condensed_result['stdout'] + aborted_postfix,
'stderr': process_condensed_result['stderr'] + aborted_postfix,
'exitCode': process_condensed_result['exitcode'],
'structuredOut': str(json.dumps(process_condensed_result['structuredOut'])) if 'structuredOut' in process_condensed_result else '',
'status': status,
})
self.commandStatuses.put_command_status(handle.command, role_result)
def reset(self):
with self.commandQueue.mutex:
self.commandQueue.queue.clear()