blob: 778978e4e90e5a2d700699321868ad854e02e615 [file] [log] [blame]
#!/usr/bin/env python
import json
import os
import signal
import sys
import urllib2
from optparse import OptionParser
from urllib2 import HTTPError
from mesos import http
from mesos.cli import *
from mesos.futures import *
# Refuse to run on interpreters older than Python 2.6. `fatal` is not
# defined in this file; presumably it comes from one of the star imports
# above and terminates the script -- TODO confirm.
if not sys.version_info >= (2, 6):
    fatal('Expecting Python >= 2.6')
def _find_executor_directory(state, framework_id, executor_id):
    """Return the directory of an executor listed in a slave's state.

    The search visits, in order, the 'executors' and then the
    'completed_executors' of every matching entry in state['frameworks'],
    followed by the 'completed_executors' of every matching entry in
    state['completed_frameworks'].  A later match overrides an earlier
    one.  Returns None when no executor matches.
    """
    search_order = (
        ('frameworks', ('executors', 'completed_executors')),
        ('completed_frameworks', ('completed_executors',)),
    )

    directory = None
    for frameworks_key, executors_keys in search_order:
        for framework in state[frameworks_key]:
            if framework['id'] != framework_id:
                continue
            for executors_key in executors_keys:
                for executor in framework[executors_key]:
                    if executor['id'] == executor_id:
                        directory = executor['directory']
                        break
    return directory


def read(slave, task, file):
    """Stream a file from a task's executor directory, one page at a time.

    This is a generator.  It asks the slave for its state to locate the
    executor directory, asks '/files/read' with offset -1 for the current
    length of the file, and then yields the 'data' field of successive
    '/files/read' responses until that length has been reached.

    Args:
        slave: dict describing the slave; only slave['pid'] is used.
        task: dict describing the task; 'framework_id', 'executor_id'
            and 'id' are read.
        file: path of the file, joined onto the executor directory.

    Yields:
        The 'data' value of each page, in order.

    Failures are reported through `fatal`, which is assumed to terminate
    the process (the code after each call relies on that) -- TODO confirm.
    """
    framework_id = task['framework_id']
    executor_id = task['executor_id']

    # An executorless task has an empty executor ID in the master but
    # uses the same executor ID as task ID in the slave.
    if executor_id == '':
        executor_id = task['id']

    # Get 'state' json to determine the executor directory.
    # `except Exception` (not a bare except) so that SystemExit and
    # KeyboardInterrupt are never reported as a slave failure.
    try:
        state = json.loads(http.get(slave['pid'], '/state'))
    except Exception:
        fatal('Failed to get state from slave')

    directory = _find_executor_directory(state, framework_id, executor_id)
    if directory is None:
        fatal('File not found')

    path = os.path.join(directory, file)

    # Determine the current length of the file: with offset -1 the
    # response's 'offset' field is used as the length.
    try:
        result = json.loads(http.get(
            slave['pid'],
            '/files/read',
            {'path': path,
             'offset': -1}))
        length = result['offset']
    except HTTPError as error:
        if error.code == 404:
            fatal('No such file or directory')
        else:
            fatal('Failed to determine length of file')
    except Exception:
        # Connection errors, malformed JSON, missing 'offset', ...
        fatal('Failed to determine length of file')

    # Start streaming "pages" up to length.
    PAGE_LENGTH = 1024
    offset = 0

    while True:
        # Only the request/decoding is guarded.  The yield stays outside
        # the try so that closing the generator, or the SIGINT handler's
        # sys.exit(), is not turned into a bogus read failure.
        try:
            result = json.loads(http.get(
                slave['pid'],
                '/files/read',
                {'path': path,
                 'offset': offset,
                 'length': PAGE_LENGTH}))
            data = result['data']
        except Exception:
            fatal('Failed to read file from slave')

        offset += len(data)
        yield data

        # Stop once the measured length is reached.  Also stop if the
        # file grew (offset overshoots length) or shrank (an empty page
        # comes back); an exact-equality test would loop forever there.
        if offset >= length or not data:
            return
def _find_task(state, framework_id, task_id):
    """Return the first task with the given IDs in the master's state.

    The search visits, in order, the 'tasks' and then the
    'completed_tasks' of every matching entry in state['frameworks'],
    followed by the 'completed_tasks' of every matching entry in
    state['completed_frameworks'].  Returns None when nothing matches.
    """
    search_order = (
        ('frameworks', ('tasks', 'completed_tasks')),
        ('completed_frameworks', ('completed_tasks',)),
    )

    for frameworks_key, tasks_keys in search_order:
        for framework in state[frameworks_key]:
            if framework['id'] != framework_id:
                continue
            for tasks_key in tasks_keys:
                for task in framework[tasks_key]:
                    if task['id'] == task_id:
                        return task
    return None


def main():
    """Write a file from a task's executor directory to stdout.

    Requires the --master, --framework, --task and --file options.  The
    master's state is fetched to find the task and the slave it ran on;
    the file is then streamed from that slave with `read` and the script
    exits with status 0.

    Errors are reported through `usage` and `fatal`, both of which are
    assumed to terminate the process (the code after each call relies on
    that) -- TODO confirm.
    """
    # Parse options for this script.
    parser = OptionParser()
    required = ('master', 'framework', 'task', 'file')
    for name in required:
        parser.add_option('--' + name)

    (options, _) = parser.parse_args(sys.argv)

    for name in required:
        if getattr(options, name) is None:
            usage('Missing --%s' % name, parser)

    # Get master info.
    try:
        master = resolve(options.master)
    except Exception as e:
        fatal('Failed to get the master: %s' % str(e))

    # Get the master's state.
    try:
        state = json.loads(http.get(master, '/master/state'))
    except Exception as e:
        fatal('Failed to get the master state: %s' % str(e))

    # Build a dict from slave ID to slaves.
    slaves = dict((slave['id'], slave) for slave in state['slaves'])

    task = _find_task(state, options.framework, options.task)
    if task is None:
        fatal('No task found!')

    # The task may reference a slave that is not (or no longer) listed
    # in the master's state; report that instead of raising KeyError.
    slave = slaves.get(task['slave_id'])
    if slave is None:
        fatal('Failed to find the slave of the task: %s' % task['slave_id'])

    for data in read(slave, task, options.file):
        sys.stdout.write(data)

    sys.exit(0)
if __name__ == '__main__':
    def signal_handler(signum, frame):
        # On SIGINT, terminate the current output line and exit with
        # status 130 (conventionally 128 + the SIGINT signal number).
        sys.stdout.write('\n')
        sys.exit(130)

    # Install the handler before any work starts so that an interrupt
    # at any point exits through signal_handler.
    signal.signal(signal.SIGINT, signal_handler)

    main()