| #!/usr/bin/env python |
| |
| import json |
| import os |
| import signal |
| import sys |
| import urllib2 |
| |
| from optparse import OptionParser |
| from urllib2 import HTTPError |
| |
| from mesos import http |
| from mesos.cli import * |
| from mesos.futures import * |
| |
| |
# Refuse to run on interpreters older than Python 2.6; only the
# (major, minor) prefix of the version tuple matters for this check.
if sys.version_info[:2] < (2, 6):
    fatal('Expecting Python >= 2.6')
| |
| |
def _find_executor_directory(state, framework_id, executor_id):
    """Return the directory recorded for an executor, or None if not found.

    `state` is the decoded slave state.  The lists are searched in this
    order: 'executors' and then 'completed_executors' of every matching
    entry in state['frameworks'], then 'completed_executors' of every
    matching entry in state['completed_frameworks'].  Within one list the
    first executor with the given ID is used; a match in a later list
    overrides a match from an earlier one.
    """
    directory = None
    search_order = (
        ('frameworks', ('executors', 'completed_executors')),
        ('completed_frameworks', ('completed_executors',)),
    )
    for frameworks_key, executors_keys in search_order:
        for framework in state[frameworks_key]:
            if framework['id'] != framework_id:
                continue
            for executors_key in executors_keys:
                for executor in framework[executors_key]:
                    if executor['id'] == executor_id:
                        directory = executor['directory']
                        break
    return directory


def read(slave, task, file):
    """Yield the contents of a file from a task's executor directory.

    This is a generator: nothing is requested from the slave until the
    first item is consumed.

    Args:
        slave: slave entry; only slave['pid'] is used, as the target
            passed to http.get().
        task: task entry; its 'framework_id', 'executor_id' and 'id'
            keys are read.
        file: path of the file, relative to the executor directory.

    Yields:
        The 'data' field of successive '/files/read' responses, each
        requested with at most PAGE_LENGTH as its 'length'.  Reading stops
        at the file length measured before streaming started, or earlier
        if the slave returns an empty page.  Nothing is yielded for an
        empty file.

    Every failure is reported through fatal(), which is presumably
    expected not to return (the code after each call relies on that).
    """
    framework_id = task['framework_id']
    executor_id = task['executor_id']

    # An executorless task has an empty executor ID in the master but
    # uses the same executor ID as task ID in the slave.
    if executor_id == '':
        executor_id = task['id']

    # Get 'state' json to determine the executor directory.  Only
    # Exception is caught so that SystemExit / KeyboardInterrupt (e.g.
    # raised from a signal handler) are not converted into fatal().
    try:
        state = json.loads(http.get(slave['pid'], '/state'))
    except Exception:
        fatal('Failed to get state from slave')

    directory = _find_executor_directory(state, framework_id, executor_id)
    if directory is None:
        fatal('File not found')

    path = os.path.join(directory, file)

    # Determine the current length of the file: a read at offset -1 is
    # answered with the length in the 'offset' field of the response.
    try:
        length = json.loads(http.get(
            slave['pid'],
            '/files/read',
            {'path': path,
             'offset': -1}))['offset']
    except HTTPError as error:
        if error.code == 404:
            fatal('No such file or directory')
        else:
            fatal('Failed to determine length of file')
    except Exception:
        # Connection problems or a malformed response.
        fatal('Failed to determine length of file')

    # Start streaming "pages" up to length.
    PAGE_LENGTH = 1024
    offset = 0

    while offset < length:
        # Keep only the request and decoding inside the try block so that
        # exceptions raised at the yield below (GeneratorExit, errors
        # thrown in by the consumer) are not reported as read failures.
        try:
            result = json.loads(http.get(
                slave['pid'],
                '/files/read',
                {'path': path,
                 'offset': offset,
                 # Never ask for more than what is left of the measured
                 # length, so a file that grew meanwhile cannot push the
                 # offset past it.
                 'length': min(PAGE_LENGTH, length - offset)}))
            data = result['data']
        except Exception:
            fatal('Failed to read file from slave')

        # An empty page means no progress is possible (e.g. the file was
        # truncated after its length was measured); stop instead of
        # requesting the same page forever.
        if not data:
            break

        offset += len(data)
        yield data
| |
| |
def _find_task(state, framework_id, task_id):
    """Return the first task matching the IDs in the master state, or None.

    The search order is: 'tasks' and then 'completed_tasks' of every
    matching entry in state['frameworks'], then 'completed_tasks' of every
    matching entry in state['completed_frameworks'].
    """
    search_order = (
        ('frameworks', ('tasks', 'completed_tasks')),
        ('completed_frameworks', ('completed_tasks',)),
    )
    for frameworks_key, tasks_keys in search_order:
        for framework in state[frameworks_key]:
            if framework['id'] != framework_id:
                continue
            for tasks_key in tasks_keys:
                for task in framework[tasks_key]:
                    if task['id'] == task_id:
                        return task
    return None


def main():
    """Write a file from a task's executor directory to standard output.

    Requires the --master, --framework, --task and --file options; a
    missing one is reported through usage().  The master is resolved with
    resolve(), its '/master/state' is fetched, the task is looked up under
    the given framework, and the file is streamed from the task's slave
    via read().  Exits with status 0 after the file has been written;
    every failure is reported through fatal().
    """
    # Parse options for this script.
    parser = OptionParser()
    required = ('master', 'framework', 'task', 'file')
    for name in required:
        parser.add_option('--' + name)
    options, _ = parser.parse_args(sys.argv)

    for name in required:
        if getattr(options, name) is None:
            usage('Missing --' + name, parser)

    # Get master info.
    try:
        master = resolve(options.master)
    except Exception as e:
        fatal('Failed to get the master: %s' % str(e))

    # Get the master's state.
    try:
        state = json.loads(http.get(master, '/master/state'))
    except Exception as e:
        fatal('Failed to get the master state: %s' % str(e))

    # Build a dict from slave ID to slaves.
    slaves = dict((slave['id'], slave) for slave in state['slaves'])

    task = _find_task(state, options.framework, options.task)

    if task is None:
        fatal('No task found!')
    elif task['slave_id'] not in slaves:
        # The master state lists the task but not the slave it refers to;
        # report that instead of failing with a KeyError traceback.
        fatal('Slave %s of the task is not known to the master'
              % task['slave_id'])
    else:
        for data in read(slaves[task['slave_id']], task, options.file):
            sys.stdout.write(data)
        sys.exit(0)
| |
| |
if __name__ == '__main__':
    def _exit_on_interrupt(signum, frame):
        # Terminate the output with a newline, then exit with status 130
        # (presumably the shell convention of 128 + SIGINT).
        sys.stdout.write('\n')
        sys.exit(130)

    # Install the handler before doing any work so that an interrupt at
    # any point during main() is handled by it.
    signal.signal(signal.SIGINT, _exit_on_interrupt)

    main()