# blob: d8aa8fb695c5f4e99979d7d024ec88ec2721c408 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
``ctx`` proxy server implementation.
"""
import json
import socket
import Queue
import StringIO
import threading
import traceback
import wsgiref.simple_server
import bottle
from aria import modeling
from .. import exceptions
class CtxProxy(object):
    """
    Serves a ``ctx`` object over a local HTTP endpoint.

    Creating an instance picks a port with ``_get_unused_port``, starts a Bottle application on a
    daemon thread and waits (up to 5 seconds) until the server thread reports that the WSGI
    server object exists. ``POST`` requests to ``/`` are passed to :meth:`_process` and answered
    with a JSON document. The proxy can be used as a context manager; leaving the ``with`` block
    calls :meth:`close`.
    """

    def __init__(self, ctx, ctx_patcher=(lambda *args, **kwargs: None)):
        """
        :param ctx: context object that incoming requests are evaluated against
        :param ctx_patcher: callable invoked as ``ctx_patcher(ctx)`` inside the server thread
         before the server is started; the default does nothing
        """
        self.ctx = ctx
        self._ctx_patcher = ctx_patcher
        self.server = None
        # Single-slot queue the server thread uses to signal that ``self.server`` has been set.
        self._started = Queue.Queue(1)
        self.port = _get_unused_port()
        self.socket_url = 'http://localhost:{0:d}'.format(self.port)
        self.thread = self._start_server()
        # Blocks until the server thread reports in; raises ``Queue.Empty`` after 5 seconds.
        self._started.get(timeout=5)

    def _start_server(self):
        """
        Start the HTTP server on a daemon thread.

        :return: the started ``threading.Thread``
        """

        class BottleServerAdapter(bottle.ServerAdapter):
            # The enclosing ``CtxProxy`` instance.
            proxy = self

            def close_session(self):
                self.proxy.ctx.model.log._session.remove()

            def run(self, app):
                # Captured explicitly so the nested classes below can reach the adapter.
                adapter = self

                class Server(wsgiref.simple_server.WSGIServer):
                    allow_reuse_address = True
                    bottle_server = adapter

                    def handle_error(self, request, client_address):
                        # Overridden with a no-op: request handling errors are not reported.
                        pass

                    def serve_forever(self, poll_interval=0.5):
                        try:
                            wsgiref.simple_server.WSGIServer.serve_forever(self, poll_interval)
                        finally:
                            # Once shutdown is called, we need to close the session.
                            # If the session is not closed properly, it might raise warnings,
                            # or even lock the database.
                            self.bottle_server.close_session()

                class Handler(wsgiref.simple_server.WSGIRequestHandler):
                    def address_string(self):
                        return self.client_address[0]

                    def log_request(self, *args, **kwargs):
                        # Request logging follows the adapter's ``quiet`` flag.
                        if adapter.quiet:
                            return None
                        return wsgiref.simple_server.WSGIRequestHandler.log_request(
                            self, *args, **kwargs)

                wsgi_server = wsgiref.simple_server.make_server(
                    host=self.host,
                    port=self.port,
                    app=app,
                    server_class=Server,
                    handler_class=Handler)
                # Publish the server before signalling, so ``close`` can find it once
                # ``__init__`` returns.
                self.proxy.server = wsgi_server
                self.proxy._started.put(True)
                wsgi_server.serve_forever(poll_interval=0.1)

        def serve():
            # Since task is a thread_local object, we need to patch it inside the server thread.
            self._ctx_patcher(self.ctx)

            application = bottle.Bottle()
            application.post('/', callback=self._request_handler)
            bottle.run(
                app=application,
                host='localhost',
                port=self.port,
                quiet=True,
                server=BottleServerAdapter)

        worker = threading.Thread(target=serve)
        worker.daemon = True
        worker.start()
        return worker

    def close(self):
        """
        Shut the HTTP server down and close its socket; does nothing if no server was created.
        """
        if not self.server:
            return
        self.server.shutdown()
        self.server.server_close()

    def _request_handler(self):
        """
        Bottle callback: read the request body, process it and reply with JSON (status 200).
        """
        raw_request = bottle.request.body.read()  # pylint: disable=no-member
        outcome = self._process(raw_request)
        body = json.dumps(outcome, cls=modeling.utils.ModelJSONEncoder)
        return bottle.LocalResponse(
            body=body,
            status=200,
            headers={'content-type': 'application/json'}
        )

    def _process(self, request):
        """
        Evaluate a raw request against ``self.ctx`` and describe the outcome as a dict.

        The returned dict has a ``type`` key (``result``, ``stop_operation`` or ``error``) and a
        ``payload`` key. Any exception raised while processing is reported as an ``error`` whose
        payload holds the exception type name, its message and the formatted traceback.

        :param request: raw request body (a JSON string)
        :return: dict with ``type`` and ``payload`` keys
        """
        try:
            with self.ctx.model.instrument(*self.ctx.INSTRUMENTATION_FIELDS):
                payload = _process_request(self.ctx, request)
                if isinstance(payload, exceptions.ScriptException):
                    result = {'type': 'stop_operation', 'payload': dict(message=str(payload))}
                else:
                    result = {'type': 'result', 'payload': payload}
        except Exception as e:
            traceback_buffer = StringIO.StringIO()
            traceback.print_exc(file=traceback_buffer)
            result = {
                'type': 'error',
                'payload': {
                    'type': type(e).__name__,
                    'message': str(e),
                    'traceback': traceback_buffer.getvalue()
                }
            }
        return result

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()
class CtxError(RuntimeError):
    """
    Base class for errors raised while evaluating ``ctx`` arguments.
    """
class CtxParsingError(CtxError):
    """
    Raised when a ``ctx`` argument list cannot be parsed.
    """
def _process_request(ctx, request):
request = json.loads(request)
args = request['args']
return _process_arguments(ctx, args)
def _process_arguments(obj, args):
# Modifying?
try:
# TODO: should there be a way to escape "=" in case it is needed as real argument?
equals_index = args.index('=') # raises ValueError if not found
except ValueError:
equals_index = None
if equals_index is not None:
if equals_index == 0:
raise CtxParsingError('The "=" argument cannot be first')
elif equals_index != len(args) - 2:
raise CtxParsingError('The "=" argument must be penultimate')
modifying = True
modifying_key = args[-3]
modifying_value = args[-1]
args = args[:-3]
else:
modifying = False
modifying_key = None
modifying_value = None
# Parse all arguments
while len(args) > 0:
obj, args = _process_next_operation(obj, args, modifying)
if modifying:
if hasattr(obj, '__setitem__'):
# Modify item value (dict, list, and similar)
if isinstance(obj, (list, tuple)):
modifying_key = int(modifying_key)
obj[modifying_key] = modifying_value
elif hasattr(obj, modifying_key):
# Modify object attribute
setattr(obj, modifying_key, modifying_value)
else:
raise CtxError(u'Cannot modify `{0}` of `{1!r}`'.format(modifying_key, obj))
return obj
def _process_next_operation(obj, args, modifying):
args = list(args)
arg = args.pop(0)
# Call?
if arg == '[':
# TODO: should there be a way to escape "[" and "]" in case they are needed as real
# arguments?
try:
closing_index = args.index(']') # raises ValueError if not found
except ValueError:
raise CtxParsingError('Opening "[" without a closing "]')
callable_args = args[:closing_index]
args = args[closing_index + 1:]
if not callable(obj):
raise CtxError('Used "[" and "] on an object that is not callable')
return obj(*callable_args), args
# Attribute?
if isinstance(arg, basestring):
if hasattr(obj, arg):
return getattr(obj, arg), args
token_sugared = arg.replace('-', '_')
if hasattr(obj, token_sugared):
return getattr(obj, token_sugared), args
# Item? (dict, lists, and similar)
if hasattr(obj, '__getitem__'):
if modifying and (arg not in obj) and hasattr(obj, '__setitem__'):
# Create nested dict
obj[arg] = {}
return obj[arg], args
raise CtxParsingError(u'Cannot parse argument: `{0!r}`'.format(arg))
def _get_unused_port():
sock = socket.socket()
sock.bind(('127.0.0.1', 0))
_, port = sock.getsockname()
sock.close()
return port