| #!/usr/bin/env python |
| |
| import datetime |
| import json |
| import signal |
| import sys |
| |
| from optparse import OptionParser |
| |
| from mesos import http |
| from mesos.cli import * |
| from mesos.futures import * |
| |
| |
# Refuse to run on interpreters older than Python 2.6; the rest of this
# script is written against that version as a minimum.
if sys.version_info < (2, 6, 0):
    fatal('Expecting Python >= 2.6')
| |
| |
# Number of blank characters that follow each column title in the header.
# A column's total width is len(title) + padding (see Column.width()).
USER_COLUMN_PADDING = 4
FRAMEWORK_COLUMN_PADDING = 4
TASK_COLUMN_PADDING = 6
SLAVE_COLUMN_PADDING = 14
MEM_COLUMN_PADDING = 16
TIME_COLUMN_PADDING = 14
| |
# Describes one column of the terminal output: a header title plus the
# amount of blank padding that follows it.
class Column:
    def __init__(self, title, padding):
        # 'title' is the header text; 'padding' is the number of extra
        # characters the column occupies beyond the title.
        self.title = title
        self.padding = padding

    def width(self):
        """Return the total column width: title length plus padding."""
        return self.padding + len(self.title)

    def truncate(self, text):
        """Fit 'text' into this column.

        None yields a blank cell. Any other value is converted with
        str(); values shorter than the column are padded with trailing
        spaces, and values that are as long as the column or longer are
        cut and finished with '... ' so that a separating space always
        remains before the next column.
        """
        limit = self.width()

        if text is None:
            return ' ' * limit

        cell = str(text)

        # Too long (or an exact fit, which would leave no separator):
        # abbreviate and end with a space.
        if len(cell) >= limit:
            return cell[:limit - 4] + '... '

        # Short enough: pad on the right up to the column width.
        return cell.ljust(limit)
| |
# Helper for formatting the CPU column for a task.
# TODO(everpeace): Support to display not only CPU limit but CPU usage.
def cpus(task, statistics):
    """Return the 'cpus_limit' of the task's executor as a string.

    'statistics' is the list of per-executor entries fetched from a
    slave, or None when it could not be obtained. Returns None when
    there are no statistics, when no entry matches the task, or when
    the matching entry carries no 'cpus_limit'.
    """
    if statistics is None:
        return None

    wanted_framework = task['framework_id']
    wanted_executor = task['executor_id']

    # An executorless task has an empty executor ID in the master but
    # uses the same executor ID as task ID in the slave.
    if wanted_executor == '':
        wanted_executor = task['id']

    # Lazily pick the first entry belonging to this task's executor.
    match = next(
        (entry for entry in statistics
         if entry['framework_id'] == wanted_framework
         and entry['executor_id'] == wanted_executor),
        None)
    if match is None:
        return None

    limit = match['statistics'].get('cpus_limit')
    return None if limit is None else str(limit)
| |
# Helper for formatting the MEM column for a task.
def mem(task, statistics):
    """Return the memory cell for 'task' as '<rss>/<limit>'.

    'statistics' is the list of per-executor entries fetched from a
    slave, or None when it could not be obtained. Both figures are
    rendered with data_size() using one decimal place. Returns None
    when there are no statistics, when no entry matches the task, or
    when either 'mem_rss_bytes' or 'mem_limit_bytes' is missing from
    the matching entry.
    """
    if statistics is None:
        return None

    framework_id = task['framework_id']
    executor_id = task['executor_id']

    # An executorless task has an empty executor ID in the master but
    # uses the same executor ID as task ID in the slave.
    if executor_id == '':
        executor_id = task['id']

    mem_rss_bytes = None
    mem_limit_bytes = None
    for entry in statistics:
        if (entry['framework_id'] == framework_id and
                entry['executor_id'] == executor_id):
            mem_rss_bytes = entry['statistics'].get('mem_rss_bytes', None)
            mem_limit_bytes = entry['statistics'].get('mem_limit_bytes', None)
            break

    # Only print the cell when both halves are known.
    if mem_rss_bytes is None or mem_limit_bytes is None:
        return None

    return '{usage}/{limit}'.format(
        usage=data_size(mem_rss_bytes, "%.1f"),
        limit=data_size(mem_limit_bytes, "%.1f"))
| |
def data_size(bytes, format):
    """Render a byte count with a binary unit suffix.

    'bytes' is converted to float and scaled to the largest of B, KB,
    MB or GB (powers of 1024) that keeps the value below 1024, with GB
    used for everything larger. 'format' is a %-style format for the
    scaled number, e.g. "%.1f"; the result looks like '1.5 MB'.
    """
    # Ensure the amount is treated as floating point for the math below.
    amount = float(bytes)

    kilo = 1024
    mega = kilo * 1024
    giga = mega * 1024

    # (exclusive upper bound, divisor, suffix), smallest unit first.
    scales = (
        (kilo, 1, ' B'),
        (mega, kilo, ' KB'),
        (giga, mega, ' MB'),
    )
    for bound, divisor, suffix in scales:
        if amount < bound:
            return (format % (amount / divisor)) + suffix

    return (format % (amount / giga)) + ' GB'
| |
| |
# Helper for formatting the TIME column for a task.
def time(task, statistics):
    """Return the CPU time consumed by the task's executor, or None.

    'statistics' is the list of per-executor entries fetched from a
    slave, or None when it could not be obtained. The result is the sum
    of 'cpus_system_time_secs' and 'cpus_user_time_secs' formatted as
    'HH:MM:SS.ffffff'. Returns None when there are no statistics, when
    no entry matches the task, or when either figure is missing.

    NOTE: the sum is formatted by treating it as a UTC timestamp and
    printing only the time-of-day part, so totals of 24 hours or more
    wrap around.
    """
    if statistics is None:
        return None

    framework_id = task['framework_id']
    executor_id = task['executor_id']

    # An executorless task has an empty executor ID in the master but
    # uses the same executor ID as task ID in the slave.
    if executor_id == '':
        executor_id = task['id']

    # Both values must start out as None: if no entry matches they are
    # never assigned in the loop, and reading them afterwards would
    # otherwise raise UnboundLocalError.
    cpus_system_time_secs = None
    cpus_user_time_secs = None
    for entry in statistics:
        if (entry['framework_id'] == framework_id and
                entry['executor_id'] == executor_id):
            cpus_system_time_secs = entry['statistics'].get(
                'cpus_system_time_secs', None)
            cpus_user_time_secs = entry['statistics'].get(
                'cpus_user_time_secs', None)
            break

    if cpus_system_time_secs is None or cpus_user_time_secs is None:
        return None

    return (datetime.datetime
            .utcfromtimestamp(cpus_system_time_secs + cpus_user_time_secs)
            .strftime('%H:%M:%S.%f'))
| |
| |
def main():
    """Print a table of every task the master reports, one row per task.

    Reads --master (required) and --timeout (seconds, default 5.0) from
    the command line, fetches '/master/state' from the master, groups
    the tasks by slave, then fetches '/monitor/statistics' from every
    slave that runs at least one task in order to fill in the MEM, TIME
    and CPU columns. A slave whose statistics cannot be fetched or
    parsed still gets its rows printed, with those three columns left
    blank. Finishes by calling sys.exit(0).
    """
    # Parse options for this script.
    parser = OptionParser()
    parser.add_option('--master')
    parser.add_option('--timeout', default=5.0)
    parser.add_option('--verbose', default=False)
    (options, _) = parser.parse_args(sys.argv)

    if options.master is None:
        usage('Missing --master', parser)

    try:
        timeout = float(options.timeout)
    except (TypeError, ValueError):
        fatal('Expecting --timeout to be a floating point number')

    # Get the master's state. Catch Exception rather than everything so
    # that SystemExit and KeyboardInterrupt are not swallowed.
    try:
        state = json.loads(http.get(resolve(options.master),
                                    '/master/state'))
    except Exception:
        fatal('Failed to get the master state')

    # Collect all the active frameworks and tasks by slave ID.
    active = {}
    for framework in state['frameworks']:
        for task in framework['tasks']:
            active.setdefault(task['slave_id'], []).append((framework, task))

    # Now set up the columns. A list keeps them in display order.
    columns = [
        Column('USER', USER_COLUMN_PADDING),
        Column('FRAMEWORK', FRAMEWORK_COLUMN_PADDING),
        Column('TASK', TASK_COLUMN_PADDING),
        Column('SLAVE', SLAVE_COLUMN_PADDING),
        Column('MEM', MEM_COLUMN_PADDING),
        Column('TIME', TIME_COLUMN_PADDING),
        # Padding is not needed for the right-most column.
        Column('CPU (allocated)', 0),
    ]

    # Output the header.
    for column in columns:
        sys.stdout.write(column.title)
        sys.stdout.write(' ' * column.padding)

    with ThreadingExecutor() as executor:
        # Grab all the slaves with active tasks.
        slaves = [slave for slave in state['slaves'] if slave['id'] in active]

        # Now submit calls to get the statistics for each slave.
        path = '/monitor/statistics'
        futures = dict((executor.submit(http.get, slave['pid'], path), slave)
                       for slave in slaves)

        # And wait for each future to complete!
        #
        # NOTE(review): assuming mesos.futures.as_completed behaves like
        # concurrent.futures.as_completed, an expired timeout is raised
        # while iterating rather than by future.result(), so it has to
        # be caught around the loop -- confirm against mesos.futures.
        try:
            for future in as_completed(futures, timeout):
                slave = futures[future]

                # Reset for every slave so that a failed fetch shows
                # blank columns instead of the previous slave's numbers
                # (or an unbound name on the very first slave).
                statistics = None
                try:
                    statistics = json.loads(future.result())
                except TimeoutError:
                    fatal('Timed out while waiting for slaves')
                except Exception:
                    # TODO(benh): Print error if 'verbose'.
                    pass

                for framework, task in active[slave['id']]:
                    cells = (
                        framework['user'],
                        framework['name'],
                        task['name'],
                        slave['hostname'],
                        mem(task, statistics),
                        time(task, statistics),
                        cpus(task, statistics),
                    )
                    sys.stdout.write('\n')
                    for column, cell in zip(columns, cells):
                        sys.stdout.write(column.truncate(cell))
        except TimeoutError:
            # Terminate the partially printed table before reporting.
            sys.stdout.write('\n')
            fatal('Timed out while waiting for slaves')

    sys.stdout.write('\n')
    sys.exit(0)
| |
| |
if __name__ == '__main__':
    def handler(signum, frame):
        # On Ctrl-C, finish the current output line and exit with 130,
        # the conventional shell status for termination by SIGINT.
        sys.stdout.write('\n')
        sys.exit(130)

    signal.signal(signal.SIGINT, handler)

    main()