| #!/usr/bin/env python |
| # |
| # Very bare bones shell for driving a Livy session. Usage: |
| # |
| # livy-shell url [option=value ...] |
| # |
| # Options are set directly in the session creation request, so they must match the names of fields |
| # in the CreateInteractiveRequest structure. Option values should be python-like objects (should be |
| # parseable by python's "eval" function; naked strings are allowed). For example: |
| # |
| # kind=spark |
| # jars=[ '/foo.jar', '/bar.jar' ] |
| # conf={ foo : bar, 'spark.option' : opt_value } |
| # |
| # By default, a Spark (Scala) session is created. |
| # |
| |
| import httplib |
| import json |
| import readline |
| import sys |
| import time |
| import urlparse |
| |
| def check(condition, msg, *args): |
| if not condition: |
| if args: |
| msg = msg % args |
| print >> sys.stderr, msg |
| sys.exit(1) |
| |
| |
| def message(msg, *args): |
| if args: |
| msg = msg % args |
| print msg |
| |
| |
| class LiteralDict(dict): |
| def __missing__(self, name): |
| return name |
| |
| |
| def request(conn, method, uri, body): |
| body = json.dumps(body) if body else None |
| headers = { 'Content-Type' : 'application/json' } |
| conn.request(method, uri, body=body, headers=headers) |
| |
| resp = conn.getresponse() |
| data = resp.read() |
| if resp.status < 200 or resp.status >= 400: |
| raise httplib.HTTPException, (resp.status, resp.reason, data) |
| if resp.status < 300 and resp.status != httplib.NO_CONTENT: |
| return json.loads(data) |
| return None |
| |
| |
| def get(conn, uri): |
| return request(conn, 'GET', uri, None) |
| |
| |
| def post(conn, uri, body): |
| return request(conn, 'POST', uri, body) |
| |
| |
| def delete(conn, uri): |
| return request(conn, 'DELETE', uri, None) |
| |
| |
| def create_session(conn): |
| request = { |
| "kind" : "spark" |
| } |
| for opt in sys.argv[2:]: |
| check(opt.find('=') > 0, "Invalid option: %s.", opt) |
| key, value = opt.split('=', 1) |
| request[key] = eval(value, LiteralDict()) |
| |
| return post(conn, "/sessions", request) |
| |
| |
| def wait_for_idle(sid): |
| session = get(conn, "/sessions/%d" % (sid, )) |
| while session['state'] == 'starting': |
| message("Session not ready yet (%s)", session['state']) |
| time.sleep(5) |
| session = get(conn, "/sessions/%d" % (sid, )) |
| |
| if session['state'] != 'idle': |
| raise Exception, "Session failed to start." |
| |
| |
| def monitor_statement(conn, sid, s): |
| cnt = 0 |
| while True: |
| state = s['state'] |
| if state == 'available': |
| output = s['output'] |
| status = output['status'] |
| if status == 'ok': |
| result = output['data'] |
| text = result.get('text/plain', None) |
| if text is None: |
| message("Success (non-text result).") |
| elif text.rstrip(): |
| message("%s", text) |
| elif status == 'error': |
| ename = output['ename'] |
| evalue = output['evalue'] |
| traceback = "\n".join(output.get('traceback', [])) |
| message("%s: %s", ename, evalue) |
| if traceback: |
| message("%s", traceback) |
| else: |
| message("Statement finished with unknown status '%s'.", status) |
| break |
| elif state == 'error': |
| message("%s", s['error']) |
| break |
| else: |
| if cnt == 10: |
| message("(waiting for result...)") |
| cnt = 0 |
| else: |
| cnt += 1 |
| time.sleep(1) |
| s = get(conn, "/sessions/%d/statements/%s" % (sid, s['id'])) |
| |
| |
| def run_shell(conn, sid): |
| while True: |
| cmd = raw_input('> ') |
| if cmd == "quit()": |
| break |
| |
| statement = post(conn, "/sessions/%d/statements" % (sid, ), { 'code' : cmd }) |
| monitor_statement(conn, sid, statement) |
| |
| def open_connection(url): |
| if url.scheme == "https": |
| return httplib.HTTPSConnection(url.netloc) |
| else: |
| return httplib.HTTPConnection(url.netloc) |
| |
| # |
| # main() |
| # |
| |
| check(len(sys.argv) > 1, "Missing arguments.") |
| |
| url = urlparse.urlparse(sys.argv[1]) |
| conn = open_connection(url) |
| |
| sid = -1 |
| try: |
| message("Creating new session...") |
| session = create_session(conn) |
| sid = int(session['id']) |
| message("New session (id = %d, kind = %s), waiting for idle state...", sid, session['kind']) |
| wait_for_idle(sid) |
| message("Session ready.") |
| run_shell(conn,sid) |
| except EOFError: |
| pass |
| finally: |
| conn.close() |
| if sid != -1: |
| conn = open_connection(url) |
| try: |
| delete(conn, "/sessions/%d" % (sid, )) |
| finally: |
| conn.close() |