blob: 627795c41005003f0cb89f0ba3a888fce3a037cc [file] [log] [blame]
#!/usr/bin/env python
import datetime
import json
import signal
import sys
from optparse import OptionParser
from mesos import http
from mesos.cli import *
from mesos.futures import *
# Refuse to run on interpreters older than Python 2.6.
# NOTE(review): this file also uses `except ... as e` and the `with`
# statement, which Python 2.5 and older cannot parse as written, so such
# interpreters fail with a SyntaxError before this check is reached.
if tuple(sys.version_info)[:3] < (2, 6, 0):
    fatal('Expecting Python >= 2.6')
# Number of spaces written after each column's title in the header; a
# column's total width is len(title) + padding (see Column.width).
USER_COLUMN_PADDING      = 4
FRAMEWORK_COLUMN_PADDING = 4
TASK_COLUMN_PADDING      = 6
SLAVE_COLUMN_PADDING     = 14
MEM_COLUMN_PADDING       = 16
TIME_COLUMN_PADDING      = 14
# Defines the column structure for printing to the terminal.
class Column:
    """A fixed-width column of the terminal table.

    The column is as wide as its title plus `padding` trailing spaces.
    """

    def __init__(self, title, padding):
        self.title = title
        self.padding = padding

    def width(self):
        """Return the column width: the title length plus the padding."""
        return len(self.title) + self.padding

    def truncate(self, text):
        """Render `text` (converted with str) as a cell of this column.

        None becomes an all-blank cell. Text shorter than the column width
        is padded with spaces on the right. Anything else is cut and
        suffixed with '... ', so the cell always ends with a space that
        separates it from the next column.
        """
        size = self.width()
        if text is None:
            return ' ' * size
        cell = str(text)
        if len(cell) >= size:
            return cell[:size - 4] + '... '
        return cell.ljust(size)
# Helper for formatting the CPU column for a task.
# TODO(everpeace): Support to display not only CPU limit but CPU usage.
def cpus(task, statistics):
    """Return the CPU limit of the executor running `task`, as a string.

    `statistics` is the decoded list of per-executor entries fetched from a
    slave's '/monitor/statistics' endpoint, or None if it is unavailable.
    The first entry whose 'framework_id' and 'executor_id' match the task
    is used. Returns None if `statistics` is None, no entry matches, or the
    matching entry has no 'cpus_limit'.
    """
    if statistics is None:
        return None
    framework_id = task['framework_id']
    # The master reports an empty executor ID for an executorless task,
    # while the slave records it under the task ID.
    executor_id = task['id'] if task['executor_id'] == '' else task['executor_id']
    matching = next((entry for entry in statistics
                     if entry['framework_id'] == framework_id
                     and entry['executor_id'] == executor_id), None)
    if matching is None:
        return None
    limit = matching['statistics'].get('cpus_limit', None)
    return None if limit is None else str(limit)
# Helper for formatting the MEM column for a task.
def mem(task, statistics):
    """Return '<rss>/<limit>' memory usage for the executor running `task`.

    `statistics` is the decoded list of per-executor entries fetched from a
    slave's '/monitor/statistics' endpoint, or None if it is unavailable.
    The first entry whose 'framework_id' and 'executor_id' match the task
    is used, and both values are rendered with `data_size` to one decimal
    place (e.g. '12.0 MB/128.0 MB').

    Returns None if `statistics` is None, no entry matches, or either
    'mem_rss_bytes' or 'mem_limit_bytes' is missing.
    """
    if statistics is None:
        return None
    framework_id = task['framework_id']
    executor_id = task['executor_id']
    # An executorless task has an empty executor ID in the master but
    # uses the same executor ID as task ID in the slave.
    if executor_id == '':
        executor_id = task['id']
    mem_rss_bytes = None
    mem_limit_bytes = None
    for entry in statistics:
        if (entry['framework_id'] == framework_id and
                entry['executor_id'] == executor_id):
            mem_rss_bytes = entry['statistics'].get('mem_rss_bytes', None)
            mem_limit_bytes = entry['statistics'].get('mem_limit_bytes', None)
            break
    if mem_rss_bytes is None or mem_limit_bytes is None:
        return None
    return '{usage}/{limit}'.format(usage=data_size(mem_rss_bytes, "%.1f"),
                                    limit=data_size(mem_limit_bytes, "%.1f"))
def data_size(bytes, format):
    """Render a byte count in B, KB, MB or GB (binary, 1024-based units).

    `format` is a %-style format string applied to the scaled number, e.g.
    data_size(1536, '%.1f') == '1.5 KB'. GB is the largest unit, so very
    large values are shown as a large number of GB.
    """
    # Work in floating point so the divisions below are not truncated.
    value = float(bytes)
    for suffix in (' B', ' KB', ' MB'):
        if value < 1024:
            return (format % value) + suffix
        value /= 1024
    return (format % value) + ' GB'
# Helper for formatting the TIME column for a task.
def time(task, statistics):
    """Return the CPU time consumed by the executor running `task`.

    `statistics` is the decoded list of per-executor entries fetched from a
    slave's '/monitor/statistics' endpoint, or None if it is unavailable.
    The first entry whose 'framework_id' and 'executor_id' match the task
    is used. The result is 'cpus_system_time_secs' + 'cpus_user_time_secs',
    formatted as HH:MM:SS.ffffff. Hours are not wrapped at 24, so more than
    a day of CPU time is shown as e.g. '25:00:00.000000'.

    Returns None if `statistics` is None, no entry matches, or either time
    statistic is missing.
    """
    if statistics is None:
        return None
    framework_id = task['framework_id']
    executor_id = task['executor_id']
    # An executorless task has an empty executor ID in the master but
    # uses the same executor ID as task ID in the slave.
    if executor_id == '':
        executor_id = task['id']
    cpus_system_time_secs = None
    cpus_user_time_secs = None
    for entry in statistics:
        if (entry['framework_id'] == framework_id and
                entry['executor_id'] == executor_id):
            cpus_system_time_secs = entry['statistics'].get('cpus_system_time_secs', None)
            cpus_user_time_secs = entry['statistics'].get('cpus_user_time_secs', None)
            break
    if cpus_system_time_secs is None or cpus_user_time_secs is None:
        return None
    # Use a duration rather than a timestamp. Formatting a timestamp with
    # '%H' silently drops whole days of accumulated CPU time.
    elapsed = datetime.timedelta(
        seconds=cpus_system_time_secs + cpus_user_time_secs)
    hours = elapsed.days * 24 + elapsed.seconds // 3600
    minutes = (elapsed.seconds // 60) % 60
    seconds = elapsed.seconds % 60
    return '%02d:%02d:%02d.%06d' % (hours, minutes, seconds,
                                    elapsed.microseconds)
def _print_rows(columns, slave, tasks, statistics):
    """Write one table row for each (framework, task) pair in `tasks`.

    `statistics` is the slave's decoded '/monitor/statistics' payload, or
    None when it could not be fetched (the resource columns are then blank).
    Each row begins with a newline, which also terminates the header line.
    """
    for framework, task in tasks:
        sys.stdout.write('\n')
        sys.stdout.write(columns[0].truncate(framework['user']))
        sys.stdout.write(columns[1].truncate(framework['name']))
        sys.stdout.write(columns[2].truncate(task['name']))
        sys.stdout.write(columns[3].truncate(slave['hostname']))
        sys.stdout.write(columns[4].truncate(mem(task, statistics)))
        sys.stdout.write(columns[5].truncate(time(task, statistics)))
        sys.stdout.write(columns[6].truncate(cpus(task, statistics)))


def main():
    """Print one row per active task in the cluster whose master is --master.

    Reads '/master/state' from the master, then concurrently fetches
    '/monitor/statistics' from every slave that runs an active task and
    prints a table row per task. Calls `fatal` on an invalid --timeout,
    when the master or its state cannot be fetched, or when waiting for the
    slaves times out. Per-slave fetch failures are tolerated (reported on
    stderr with --verbose).
    """
    # Parse options for this script.
    parser = OptionParser()
    parser.add_option('--master')
    parser.add_option('--timeout', default=5.0)
    # 'store_true' makes --verbose a plain flag; without it optparse
    # requires the option to be followed by a value.
    parser.add_option('--verbose', action='store_true', default=False)
    options, _args = parser.parse_args(sys.argv)

    if options.master is None:
        usage('Missing --master', parser)

    try:
        timeout = float(options.timeout)
    except (TypeError, ValueError):
        fatal('Expecting --timeout to be a floating point number')

    # Get master info.
    try:
        master = resolve(options.master)
    except Exception as e:
        fatal('Failed to get the master: %s' % str(e))

    # Get the master's state.
    try:
        state = json.loads(http.get(master, '/master/state'))
    except Exception as e:
        fatal('Failed to get the master state: %s' % str(e))

    # Collect all the active frameworks and tasks by slave ID.
    active = {}
    for framework in state['frameworks']:
        for task in framework['tasks']:
            active.setdefault(task['slave_id'], []).append((framework, task))

    # The list order is the on-screen column order for the header and rows.
    columns = [
        Column('USER', USER_COLUMN_PADDING),
        Column('FRAMEWORK', FRAMEWORK_COLUMN_PADDING),
        Column('TASK', TASK_COLUMN_PADDING),
        Column('SLAVE', SLAVE_COLUMN_PADDING),
        Column('MEM', MEM_COLUMN_PADDING),
        Column('TIME', TIME_COLUMN_PADDING),
        Column('CPU (allocated)', 0),  # padding is not needed for right most column
    ]

    # Output the header; the first row's leading newline terminates it.
    for column in columns:
        sys.stdout.write(column.title)
        sys.stdout.write(' ' * column.padding)

    with ThreadingExecutor() as executor:
        # Grab all the slaves with active tasks.
        slaves = [slave for slave in state['slaves'] if slave['id'] in active]

        # Now submit calls to get the statistics for each slave.
        path = '/monitor/statistics'
        futures = dict((executor.submit(http.get, slave['pid'], path), slave)
                       for slave in slaves)

        # And wait for each future to complete!
        try:
            for future in as_completed(futures, timeout):
                slave = futures[future]
                # Reset for every slave so a failed request never reuses the
                # previous slave's statistics (or an unbound name).
                statistics = None
                try:
                    statistics = json.loads(future.result())
                except TimeoutError:
                    fatal('Timed out while waiting for slaves')
                except Exception as e:
                    # Best effort: still list this slave's tasks, just with
                    # blank resource columns.
                    if options.verbose:
                        sys.stderr.write(
                            'Failed to get statistics from slave %s: %s\n'
                            % (slave['hostname'], e))
                _print_rows(columns, slave, active[slave['id']], statistics)
        except TimeoutError:
            # NOTE(review): depending on the mesos.futures implementation, the
            # overall timeout may be raised by the as_completed iterator
            # itself rather than by future.result().
            fatal('Timed out while waiting for slaves')

    sys.stdout.write('\n')
    sys.exit(0)
if __name__ == '__main__':
    def _exit_on_interrupt(signum, frame):
        # On Ctrl-C, end the partially written line so the shell prompt
        # starts cleanly. Exit status 130 is the shell convention for
        # termination by SIGINT (128 + 2).
        sys.stdout.write('\n')
        sys.exit(130)

    signal.signal(signal.SIGINT, _exit_on_interrupt)
    main()