| #!/usr/bin/env python |
| |
| ''' |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| ''' |
| |
| import json |
| import logging |
| import threading |
| from Grep import Grep |
| import Constants |
| |
| logger = logging.getLogger() |
| |
| class CommandStatusDict(): |
| """ |
| Holds results for all commands that are being executed or have finished |
| execution (but are not yet reported). Implementation is thread-safe. |
| Dict format: |
| task_id -> (command, cmd_report) |
| """ |
| |
| def __init__(self, callback_action): |
| """ |
| callback_action is called every time when status of some command is |
| updated |
| """ |
| self.current_state = {} # Contains all statuses |
| self.callback_action = callback_action |
| self.lock = threading.RLock() |
| |
| |
| def put_command_status(self, command, new_report, wakeupController=True): |
| """ |
| Stores new version of report for command (replaces previous) |
| """ |
| if 'taskId' in command: |
| key = command['taskId'] |
| else: # Status command reports has no task id |
| key = id(command) |
| with self.lock: # Synchronized |
| self.current_state[key] = (command, new_report) |
| |
| # Usually, status commands are not reported immediately |
| if wakeupController: |
| self.callback_action() |
| |
| |
| def generate_report(self): |
| """ |
| Generates status reports about commands that are IN_PROGRESS, COMPLETE or |
| FAILED. Statuses for COMPLETE or FAILED commands are forgotten after |
| generation |
| """ |
| from ActionQueue import ActionQueue |
| with self.lock: # Synchronized |
| resultReports = [] |
| resultComponentStatus = [] |
| for key, item in self.current_state.items(): |
| command = item[0] |
| report = item[1] |
| if command ['commandType'] == ActionQueue.EXECUTION_COMMAND: |
| if (report['status']) != ActionQueue.IN_PROGRESS_STATUS: |
| resultReports.append(report) |
| # Removing complete/failed command status from dict |
| del self.current_state[key] |
| else: |
| in_progress_report = \ |
| self.generate_in_progress_report(command, report) |
| resultReports.append(in_progress_report) |
| elif command ['commandType'] == ActionQueue.STATUS_COMMAND: |
| resultComponentStatus.append(report) |
| # Component status is useful once, removing it |
| del self.current_state[key] |
| result = { |
| 'reports': resultReports, |
| 'componentStatus': resultComponentStatus |
| } |
| return result |
| |
| |
| def generate_in_progress_report(self, command, report): |
| """ |
| Reads stdout/stderr for IN_PROGRESS command from disk file |
| and populates other fields of report. |
| """ |
| from ActionQueue import ActionQueue |
| try: |
| tmpout = open(report['tmpout'], 'r').read() |
| tmperr = open(report['tmperr'], 'r').read() |
| except Exception, err: |
| logger.warn(err) |
| tmpout = '...' |
| tmperr = '...' |
| try: |
| with open(report['structuredOut'], 'r') as fp: |
| tmpstructuredout = json.load(fp) |
| except Exception: |
| tmpstructuredout = '{}' |
| grep = Grep() |
| output = grep.tail(tmpout, Grep.OUTPUT_LAST_LINES) |
| inprogress = self.generate_report_template(command) |
| reportResult = CommandStatusDict.shouldReportResult(command) |
| inprogress.update({ |
| 'stdout': grep.filterMarkup(output), |
| 'stderr': tmperr, |
| 'structuredOut': tmpstructuredout, |
| Constants.EXIT_CODE: 777, |
| 'status': ActionQueue.IN_PROGRESS_STATUS, |
| 'reportResult': reportResult |
| }) |
| return inprogress |
| |
| |
| def generate_report_template(self, command): |
| """ |
| Generates stub dict for command. |
| Other fields should be populated manually |
| """ |
| stub = { |
| 'role': command['role'], |
| 'actionId': command['commandId'], |
| 'taskId': command['taskId'], |
| 'clusterName': command['clusterName'], |
| 'serviceName': command['serviceName'], |
| 'roleCommand': command['roleCommand'] |
| } |
| return stub |
| |
| |
| @staticmethod |
| def shouldReportResult(command): |
| return not (Constants.AUTO_GENERATED in command and command[Constants.AUTO_GENERATED]) |