blob: bcbce9e0cb98fb338db0f43f7946067225cf729d [file] [log] [blame]
#!/usr/bin/env python
'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import json
import logging
import threading
from Grep import Grep
import Constants
logger = logging.getLogger()
class CommandStatusDict():
"""
Holds results for all commands that are being executed or have finished
execution (but are not yet reported). Implementation is thread-safe.
Dict format:
task_id -> (command, cmd_report)
"""
def __init__(self, callback_action):
"""
callback_action is called every time when status of some command is
updated
"""
self.current_state = {} # Contains all statuses
self.callback_action = callback_action
self.lock = threading.RLock()
def put_command_status(self, command, new_report, wakeupController=True):
"""
Stores new version of report for command (replaces previous)
"""
if 'taskId' in command:
key = command['taskId']
else: # Status command reports has no task id
key = id(command)
with self.lock: # Synchronized
self.current_state[key] = (command, new_report)
# Usually, status commands are not reported immediately
if wakeupController:
self.callback_action()
def generate_report(self):
"""
Generates status reports about commands that are IN_PROGRESS, COMPLETE or
FAILED. Statuses for COMPLETE or FAILED commands are forgotten after
generation
"""
from ActionQueue import ActionQueue
with self.lock: # Synchronized
resultReports = []
resultComponentStatus = []
for key, item in self.current_state.items():
command = item[0]
report = item[1]
if command ['commandType'] == ActionQueue.EXECUTION_COMMAND:
if (report['status']) != ActionQueue.IN_PROGRESS_STATUS:
resultReports.append(report)
# Removing complete/failed command status from dict
del self.current_state[key]
else:
in_progress_report = \
self.generate_in_progress_report(command, report)
resultReports.append(in_progress_report)
elif command ['commandType'] == ActionQueue.STATUS_COMMAND:
resultComponentStatus.append(report)
# Component status is useful once, removing it
del self.current_state[key]
result = {
'reports': resultReports,
'componentStatus': resultComponentStatus
}
return result
def generate_in_progress_report(self, command, report):
"""
Reads stdout/stderr for IN_PROGRESS command from disk file
and populates other fields of report.
"""
from ActionQueue import ActionQueue
try:
tmpout = open(report['tmpout'], 'r').read()
tmperr = open(report['tmperr'], 'r').read()
except Exception, err:
logger.warn(err)
tmpout = '...'
tmperr = '...'
try:
with open(report['structuredOut'], 'r') as fp:
tmpstructuredout = json.load(fp)
except Exception:
tmpstructuredout = '{}'
grep = Grep()
output = grep.tail(tmpout, Grep.OUTPUT_LAST_LINES)
inprogress = self.generate_report_template(command)
reportResult = CommandStatusDict.shouldReportResult(command)
inprogress.update({
'stdout': grep.filterMarkup(output),
'stderr': tmperr,
'structuredOut': tmpstructuredout,
Constants.EXIT_CODE: 777,
'status': ActionQueue.IN_PROGRESS_STATUS,
'reportResult': reportResult
})
return inprogress
def generate_report_template(self, command):
"""
Generates stub dict for command.
Other fields should be populated manually
"""
stub = {
'role': command['role'],
'actionId': command['commandId'],
'taskId': command['taskId'],
'clusterName': command['clusterName'],
'serviceName': command['serviceName'],
'roleCommand': command['roleCommand']
}
return stub
@staticmethod
def shouldReportResult(command):
return not (Constants.AUTO_GENERATED in command and command[Constants.AUTO_GENERATED])