| #!/usr/bin/env python |
| |
| # Uses 'scp' to copy local files to all slaves reported by the master. |
| |
| import json |
| import signal |
| import subprocess |
| import sys |
| |
| from optparse import OptionParser |
| |
| from mesos import http |
| from mesos.cli import * |
| from mesos.futures import * |
| |
| |
# Bail out early on interpreters older than 2.6.
if (2, 6, 0) > sys.version_info:
    fatal('Expecting Python >= 2.6')
| |
| |
def scp(host, src, dst):
    """Copy local path(s) to `dst` on `host` by running 'scp -pr'.

    Args:
        host: Remote host name; the target is passed as '<host>:<dst>'.
        src: One or more local paths in a single string, separated by
            whitespace. Shell-style quoting is honoured, so a path that
            contains spaces can be passed quoted.
        dst: Remote destination path.

    Returns:
        True if the scp process exited with status 0. False if it exited
        non-zero, or if building or launching the command raised (the
        exception is reported on stderr).
    """
    # Local import so this block does not depend on the file's import list.
    import shlex

    # Only used for the error message below.
    description = 'scp -pr %s %s' % (src, host + ':' + dst)
    try:
        # Build an argument vector and run it WITHOUT a shell, so that
        # characters such as ';', '$(...)' or backticks in `src`, `dst`
        # or `host` are never interpreted as shell syntax. shlex.split
        # tokenizes `src` the way a POSIX shell would, which keeps
        # multi-file `src` strings working. It raises ValueError on
        # unbalanced quotes, hence it lives inside the try.
        argv = ['scp', '-pr'] + shlex.split(src) + [host + ':' + dst]
        process = subprocess.Popen(
            argv,
            stdin=None,
            stdout=None,
            stderr=None)
        return process.wait() == 0
    except Exception as e:
        sys.stderr.write('Exception %s when doing %s\n' % (e, description))
        return False
| |
| |
def main():
    """Upload local file(s) to a remote directory on every known slave.

    Command line: [options] local-file(s) remote-directory, where the
    '--master' option is required.

    The master is resolved with `resolve`, its state is fetched from
    '/master/state' and parsed as JSON, and the 'hostname' of every
    entry in state['slaves'] becomes a copy target. One `scp` call per
    slave is submitted to a ThreadingExecutor. A per-slave report and a
    summary line are printed to stdout at the end.

    `usage`, `fatal`, `resolve`, `ThreadingExecutor` and `as_completed`
    come from the mesos star imports and are not visible here.
    NOTE(review): the control flow relies on `usage` and `fatal` not
    returning (presumably they exit the process) -- confirm.
    """
    # shlex.quote where available, pipes.quote on Python 2.
    try:
        from shlex import quote
    except ImportError:
        from pipes import quote

    # Parse options for this script.
    parser = OptionParser()
    parser.add_option('--master')
    parser.usage = '%prog [options] local-file(s) remote-directory'
    parser.epilog = ('This command uploads the specified local file(s) '
                     'to a remote directory on all slaves known by the '
                     'master. The current implementation assumes '
                     'passwordless scp')
    (options, args) = parser.parse_args(sys.argv)

    if options.master is None:
        usage('Missing --master', parser)

    # Validate the positional arguments before talking to the master, so
    # that a malformed command line fails without any network round trip.
    # args[0] is the program name because sys.argv is parsed as-is.
    if len(args) < 3:
        usage('Missing arguments', parser)

    # All arguments after args[0] until the last argument are the local
    # files. `scp` takes them as one string that is later tokenized, so
    # each path is shell-quoted to survive that step intact even when it
    # contains spaces or shell metacharacters.
    src = ' '.join(quote(path) for path in args[1:-1])

    # Remote directory is the very last argument.
    dst = args[-1]

    # Get master info.
    try:
        master = resolve(options.master)
    except Exception as e:
        fatal('Failed to get the master: %s' % str(e))

    # Get the master's state.
    try:
        state = json.loads(http.get(master, '/master/state'))
    except Exception as e:
        fatal('Failed to get the master state: %s' % str(e))

    # All slaves that the master is aware of (a set, so duplicate
    # hostnames are copied to only once).
    slaves = set(slave['hostname'] for slave in state['slaves'])

    success = set()
    with ThreadingExecutor() as executor:
        # Map each future back to its slave so results can be attributed.
        futures = dict((executor.submit(scp, slave, src, dst), slave)
                       for slave in slaves)
        for future in as_completed(futures):
            slave = futures[future]
            try:
                if future.result():
                    success.add(slave)
            except Exception as e:
                sys.stderr.write('Failed to copy to %s: %s\n' % (slave, e))

    # Single-argument parenthesized prints produce the same output as the
    # Python 2 print statement; print('') emits just a newline.
    print('')

    # Sorted so the report order is stable from run to run.
    for slave in sorted(success):
        print('%s\t%s' % (slave, 'uploaded'))

    print('')

    # Anything not recorded as a success counts as failed, including
    # slaves whose future raised.
    failed = slaves - success
    for slave in sorted(failed):
        print('%s\t%s' % (slave, 'failed'))

    print('')

    print('----- %d uploaded, %d failed of total %d slaves'
          % (len(success), len(failed), len(slaves)))

    print('')
| |
| |
if __name__ == '__main__':
    def on_interrupt(signum, frame):
        # Ctrl-C: finish the current terminal line, then exit with
        # status 130 (presumably 128 + SIGINT, the usual shell
        # convention -- confirm).
        sys.stdout.write('\n')
        sys.exit(130)

    # Install the SIGINT handler before any work starts.
    signal.signal(signal.SIGINT, on_interrupt)

    main()