[LIVY-409] Livy shell UX improvements
I'm not sure how large of an audience the livy-shell has, but I've come to use it quite a bit in my testing/use of Livy and added a few small improvements that have made it a bit easier to use for me (and may help others):
* Replaced the use of httplib with requests to fix dropped connection issues (and a few other robustness issues).
* Added some context to the REPL prompt, by including the session `kind` and ID.
* Ignore `SIGINT` signal so that users can type `CTRL-C` to cancel out a command.
I know the use of `requests` is pretty ubiquitous but I included `livy-shell-requirements.txt` just in case 1) someone doesn't have it installed or 2) more dependencies are added down the road.
Author: Eric Perry <eric@ericjperry.com>
Closes #55 from ericjperry/feature/livy-shell-improvements.
diff --git a/dev/livy-shell b/dev/livy-shell
index 0e00cf8..06c631c 100755
--- a/dev/livy-shell
+++ b/dev/livy-shell
@@ -30,13 +30,18 @@
# By default, a Spark (Scala) session is created.
#
-import httplib
import json
import readline
+import signal
import sys
import time
import urlparse
+
+class ControlCInterrupt(Exception):
+ pass
+
+
def check(condition, msg, *args):
if not condition:
if args:
@@ -48,7 +53,14 @@
def message(msg, *args):
if args:
msg = msg % args
- print msg
+ print(msg)
+
+
+try:
+ import requests
+except ImportError:
+ message("Unable to import 'requests' module, which is required by livy-shell.")
+ sys.exit(1)
class LiteralDict(dict):
@@ -56,33 +68,30 @@
return name
-def request(conn, method, uri, body):
- body = json.dumps(body) if body else None
- headers = { 'Content-Type' : 'application/json' }
- conn.request(method, uri, body=body, headers=headers)
-
- resp = conn.getresponse()
- data = resp.read()
- if resp.status < 200 or resp.status >= 400:
- raise httplib.HTTPException, (resp.status, resp.reason, data)
- if resp.status < 300 and resp.status != httplib.NO_CONTENT:
- return json.loads(data)
+def request(method, uri, body):
+ kwargs = { 'headers': { 'Content-Type' : 'application/json', 'X-Requested-By': 'livy' } }
+ if body:
+ kwargs['json'] = body
+ resp = requests.request(method.upper(), urlparse.urljoin(url.geturl(), uri), **kwargs)
+ resp.raise_for_status()
+ if resp.status_code < requests.codes.multiple_choices and resp.status_code != requests.codes.no_content:
+ return resp.json()
return None
-def get(conn, uri):
- return request(conn, 'GET', uri, None)
+def get(uri):
+ return request('GET', uri, None)
-def post(conn, uri, body):
- return request(conn, 'POST', uri, body)
+def post(uri, body):
+ return request('POST', uri, body)
-def delete(conn, uri):
- return request(conn, 'DELETE', uri, None)
+def delete(uri):
+ return request('DELETE', uri, None)
-def create_session(conn):
+def create_session():
request = {
"kind" : "spark"
}
@@ -91,21 +100,21 @@
key, value = opt.split('=', 1)
request[key] = eval(value, LiteralDict())
- return post(conn, "/sessions", request)
+ return post("/sessions", request)
def wait_for_idle(sid):
- session = get(conn, "/sessions/%d" % (sid, ))
+ session = get("/sessions/%d" % (sid, ))
while session['state'] == 'starting':
message("Session not ready yet (%s)", session['state'])
time.sleep(5)
- session = get(conn, "/sessions/%d" % (sid, ))
+ session = get("/sessions/%d" % (sid, ))
if session['state'] != 'idle':
raise Exception, "Session failed to start."
-def monitor_statement(conn, sid, s):
+def monitor_statement(sid, s):
cnt = 0
while True:
state = s['state']
@@ -139,23 +148,27 @@
else:
cnt += 1
time.sleep(1)
- s = get(conn, "/sessions/%d/statements/%s" % (sid, s['id']))
+ s = get("/sessions/%d/statements/%s" % (sid, s['id']))
-def run_shell(conn, sid):
+def run_shell(sid, session_kind):
+ prompt = "{} ({}) > ".format(session_kind, sid)
+ def ctrl_c_handler(signal, frame):
+ message("\nPlease type quit() to exit the livy shell.")
+ raise ControlCInterrupt()
+ signal.signal(signal.SIGINT, ctrl_c_handler)
+
while True:
- cmd = raw_input('> ')
- if cmd == "quit()":
- break
+ try:
+ cmd = raw_input(prompt)
+ if cmd == "quit()":
+ break
+ except ControlCInterrupt:
+ continue
- statement = post(conn, "/sessions/%d/statements" % (sid, ), { 'code' : cmd })
- monitor_statement(conn, sid, statement)
+ statement = post("/sessions/%d/statements" % (sid, ), { 'code' : cmd })
+ monitor_statement(sid, statement)
-def open_connection(url):
- if url.scheme == "https":
- return httplib.HTTPSConnection(url.netloc)
- else:
- return httplib.HTTPConnection(url.netloc)
#
# main()
@@ -164,24 +177,18 @@
check(len(sys.argv) > 1, "Missing arguments.")
url = urlparse.urlparse(sys.argv[1])
-conn = open_connection(url)
-
sid = -1
+
try:
message("Creating new session...")
- session = create_session(conn)
+ session = create_session()
sid = int(session['id'])
message("New session (id = %d, kind = %s), waiting for idle state...", sid, session['kind'])
wait_for_idle(sid)
message("Session ready.")
- run_shell(conn,sid)
+ run_shell(sid, session.get('kind', 'spark'))
except EOFError:
pass
finally:
- conn.close()
if sid != -1:
- conn = open_connection(url)
- try:
- delete(conn, "/sessions/%d" % (sid, ))
- finally:
- conn.close()
+ delete("/sessions/%d" % (sid, ))