blob: 00339ccf6a2af94ac739ac3f2f7e8eac48c80cf6 [file] [log] [blame]
"""
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import functools
import json
import marshal
import base64
import types
import os
import inspect
import composer
import re
import requests
import traceback
from conductor import __version__
def escape(str):
return re.sub(r'(\n|\t|\r|\f|\v|\\|\')', lambda m:{'\n':'\\n','\t':'\\t','\r':'\\r','^\f':'\\f','\v':'\\v','\\':'\\\\','\'':'\\\''}[m.group()], str)
def synthesize(composition): # dict
code = '# generated by composer v'+composition['version']+' and conductor v'+__version__+'\n\nimport os\nimport functools\nimport json\nimport inspect\nimport re\nimport base64\nimport marshal\nimport types\nimport requests\nimport urllib.parse'
code += '\n\n' + inspect.getsource(composer.ComposerError)
code += '\ncomposition=json.loads(\''+escape(json.dumps(composition['composition'], default=composer.serialize, ensure_ascii=True))+'\')'
code += '\n' + inspect.getsource(conductor)
code += '\n' + inspect.getsource(openwhisk)
code += '\n' + inspect.getsource(Compositions)
code += '\n' + inspect.getsource(composer.serialize)
code += '\n' + inspect.getsource(composer.Composition)
code += '\n' + inspect.getsource(composer.get_value)
code += '\n' + inspect.getsource(composer.get_params)
code += '\n' + inspect.getsource(composer.set_params)
code += '\n' + inspect.getsource(composer.retain_result)
code += '\n' + inspect.getsource(composer.retain_nested_result)
code += '\n' + inspect.getsource(composer.dec_count)
code += '\n' + inspect.getsource(composer.set_nested_params)
code += '\n' + inspect.getsource(composer.get_nested_params)
code += '\n' + inspect.getsource(composer.set_nested_result)
code += '\n' + inspect.getsource(composer.get_nested_result)
code += '\n' + inspect.getsource(composer.retry_cond)
import openwhisk as ow
code += '\n' + inspect.getsource(ow.Client)
code += '\n' + inspect.getsource(ow.BaseOperation)
code += '\n' + inspect.getsource(ow.Resource)
code += '\n' + inspect.getsource(ow.Action)
code += '\n' + inspect.getsource(ow.parse_id_and_ns)
code += '\n' + inspect.getsource(ow.parse_id)
code += '\n' + inspect.getsource(ow.parse_namespace)
code += 'def main(args):'
code += '\n return conductor(composition)(args)'
annotations = [
{ 'key': 'conductor', 'value': str(composition['ast']) },
{ 'key': 'composerVersion', 'value': composition['version'] },
{ 'key': 'conductorVersion', 'value': __version__ },
{ 'key': 'provide-api-key', 'value': True },
*composition["annotations"]
]
return { 'name': composition['name'], 'action': { 'exec': { 'kind': 'python:3', 'code':code }, 'annotations': annotations, 'limits': composition['limits'] } }
def openwhisk(options):
''' return enhanced openwhisk client capable of deploying compositions '''
# try to extract apihost and key first from whisk property file file and then from os.environ
try:
wskpropsPath = os.environ['WSK_CONFIG_FILE'] if 'WSK_CONFIG_FILE' in os.environ else os.path.expanduser('~/.wskprops')
with open(wskpropsPath) as f:
lines = f.readlines()
options = dict(options)
for line in lines:
parts = line.strip().split('=')
if len(parts) == 2:
if parts[0] == 'APIHOST':
options['apihost'] = parts[1]
elif parts[0] == 'AUTH':
options['api_key'] = parts[1]
except:
pass
if '__OW_API_HOST' in os.environ:
options['apihost'] = os.environ['__OW_API_HOST']
if '__OW_API_KEY' in os.environ:
options['api_key'] = os.environ['__OW_API_KEY']
try:
import openwhisk
wsk = openwhisk.Client(options)
except:
wsk = Client(options)
wsk.compositions = Compositions(wsk)
return wsk
class Compositions:
''' management class for compositions '''
def __init__(self, wsk):
self.actions = wsk.actions
def deploy(self, composition, overwrite):
actions = composition.get('actions', [])
actions.append(synthesize(composition))
for action in actions:
if overwrite:
try:
self.actions.delete(action)
except Exception:
pass
self.actions.create(action)
return actions
def conductor(composition): # main.
wsk = None
isObject = lambda x: isinstance(x, dict)
# compile AST to FSM
compiler = {}
astnode = lambda f: compiler.setdefault(f.__name__[1:], f)
@astnode
def _sequence(parent, node):
fsm = [{ 'parent': parent, 'type': 'pass' }]
fsm.extend(compile(parent, *node['components']))
return fsm
@astnode
def _action(parent, node):
return [{ 'parent': parent, 'type': 'action', 'name': node['name'] }]
@astnode
def _asynchronous(parent, node):
body = compile(parent, *node['components'])
return [{ 'parent': parent, 'type': 'async', 'return': len(body) + 2}, *body, {'parent': parent, 'type': 'stop' }, {'parent': parent, 'type': 'pass' }]
@astnode
def _function(parent, node):
return [{ 'parent': parent, 'type': 'function', 'exec': node['function']['exec'] }]
@astnode
def _ensure(parent, node):
body = compile(parent, node['body'])
finalizer = compile(parent, node['finalizer'])
fsm = [{ 'parent': parent, 'type': 'try'}, *body, { 'parent': parent, 'type': 'exit' }, *finalizer]
fsm[0]['catch'] = len(fsm) - len(finalizer)
return fsm
@astnode
def _let(parent, node):
body = compile(parent, *node['components'])
return [{'parent': parent, 'type': 'let', 'let': node['declarations']}, *body, { 'parent': parent, 'type': 'exit' }]
@astnode
def _mask(parent, node):
body = compile(parent, *node['components'])
return [{'parent': parent, 'type': 'let', 'let': None}, *body, { 'parent': parent, 'type': 'exit' }]
@astnode
def _do(parent, node):
handler = [ *compile(parent, node['handler']), {'parent': parent, 'type': 'pass'}]
body = compile(parent, node['body'])
fsm = [{ 'parent': parent, 'type': 'try' }, *body, { 'parent': parent, 'type': 'exit' }, *handler]
fsm[0]['catch'] = len(fsm) - len(handler)
fsm[len(fsm) - len(handler) - 1]['next'] = len(handler)
return fsm
@astnode
def _when_nosave(parent, node):
consequent = compile(parent, node['consequent'])
alternate = [ *compile(parent, node['alternate']), { parent: 'parent', 'type': 'pass' }]
fsm = [{ 'parent': parent, 'type': 'pass' },
*compile(parent, node['test']),
{ 'parent': parent, 'type': 'choice', 'then': 1, 'else': len(consequent) + 1 },
*consequent,
*alternate]
fsm[len(fsm) - len(alternate) - 1]['next'] = len(alternate)
return fsm
@astnode
def _loop_nosave(parent, node):
body = compile(parent, node['body'])
test = compile(parent, node['test'])
fsm = [{ 'parent': parent, 'type': 'pass' }, *test,
{ 'parent': parent, 'type': 'choice', 'then': 1, 'else': len(body) + 1 },
*body, { parent: 'parent', 'type': 'pass' }]
fsm[len(fsm) - 2]['next'] = 2 - len(fsm)
return fsm
@astnode
def _doloop_nosave(parent, node):
body = compile(parent, node['body'])
test = compile(parent, node['test'])
fsm = [{ 'parent': parent, 'type': 'pass' }, *body, *test,
{ 'parent': parent, 'type': 'choice', 'else': 1}, { parent: 'parent', 'type': 'pass' }]
fsm[len(fsm) - 2]['then'] = 2 - len(fsm)
return fsm
def compile(parent, *node):
nonlocal compiler
if len(node) == 0:
return [{'parent': parent, 'type': 'empty'}]
if len(node) == 1:
return compiler[node[0]['type']](node[0]['path'] if 'path' in node[0] else parent, node[0])
return functools.reduce(lambda fsm, node: extends(fsm, compile(parent, node)), node, [])
def extends(l, items):
l.extend(items)
return l
fsm = compile('', composition)
conductor = {}
operator = lambda f: conductor.setdefault(f.__name__[1:], f)
@operator
def _choice(p, node, index, inspect, step):
p['s']['state'] = index + (node['then'] if p['params']['value'] else node['else'])
return None
@operator
def _try(p, node, index, inspect, step):
p['s']['stack'].insert(0, { 'catch': index + node['catch'] })
@operator
def _let(p, node, index, inspect, step):
p['s']['stack'].insert(0, { 'let': node['let'] }) # JSON.parse(JSON.stringify(jsonv.let))
@operator
def _exit(p, node, index, inspect, step):
if len(p['s']['stack']) == 0:
return internalError('pop from an empty stack')
p['s']['stack'].pop(0)
@operator
def _action(p, node, index, inspect, step):
return { 'method': 'action', 'action': node['name'], 'params': p['params'], 'state': { '$composer': p['s'] } }
@operator
def _function(p, node, index, inspect, step):
result = None
try:
functionName = node['exec']['functionName'] if 'functionName' in node['exec'] else None
result = run(node['exec']['code'], p, node['exec']['kind'], functionName)
except Exception as error:
result = { 'error': 'Function combinator threw an exception at AST node root'+node['parent']+' (see log for details)' }
if callable(result):
result = { 'error': 'Function combinator evaluated to a function type at AST node root'+node['parent']}
# if a function has only side effects and no return value (or return None), return params
p['params'] = p['params'] if result is None else result
inspect_errors(p)
return step(p)
@operator
def _empty(p, node, index, inspect, step):
inspect_errors(p)
@operator
def _pass(p, node, index, inspect, step):
pass
@operator
def _async(p, node, index, inspect, step):
nonlocal wsk
p['params']['$composer'] = { 'state': p['s']['state'], 'stack': [{ 'marker': True }] + p['s']['stack'] }
p['s']['state'] = index + node['return']
if wsk is None:
wsk = openwhisk({ 'ignore_certs': True })
try:
response = wsk.actions.invoke({ 'name': os.getenv('__OW_ACTION_NAME'), 'params': p['params'] })
result = { 'method': 'async', 'activationId': response['activationId'], 'sessionId': p['s']['session'] }
except Exception as err:
print(err) # invoke failed
result = { 'error': 'Async combinator failed to invoke composition at AST node root'+node['parent']+' (see log for details)' }
p['params'] = result
inspect_errors(p)
return step(p)
def finish(q):
return q['params'] if 'error' in q['params'] else { 'params': q['params'] }
def encodeError(error):
if isinstance(error, str) or not hasattr(error, "__getitem__"):
return {
'code': 500,
'error': error
}
else:
return {
'code': error['code'] if isinstance(error['code'], int) else 500,
'error': error['error'] if isinstance(error['error'], str) else (error['message'] if 'message' in error else 'An internal error occurred')
}
# error status codes
#badRequest = lambda error: { 'code': 400, 'error': error }
internalError = lambda error: encodeError(error)
def inspect_errors(p):
if not isObject(p['params']):
p['params'] = { 'value': p['params'] }
if 'error' in p['params']:
p['params'] = { 'error': p['params']['error'] } # discard all fields but the error field
p['s']['state'] = -1 # abort unless there is a handler in the stack
while len(p['s']['stack']) > 0 and 'marker' not in p['s']['stack'][0]:
first = p['s']['stack'][0]
p['s']['stack'] = p['s']['stack'][1:]
if 'catch' in first:
p['s']['state'] = first['catch']
if p['s']['state'] >= 0:
break
def reduceRight(func, init, seq):
if not seq:
return init
else:
return func(reduceRight(func, init, seq[1:]), seq[0])
def update(dict, dict2):
dict.update(dict2)
return dict
# run function f on current stack
def run(f, p, kind, functionName=None):
# handle let/mask pairs
view = []
n = 0
for frame in p['s']['stack']:
if 'let' in frame and frame['let'] is None:
n += 1
elif 'let' in frame:
if n == 0:
view.append(frame)
else:
n -= 1
# update value of topmost matching symbol on stack if any
def set(symbol, value):
lets = [element for element in view if 'let' in element and symbol in element['let']]
if len(lets) > 0:
element = lets[0]
element['let'][symbol] = value # TODO: JSON.parse(JSON.stringify(value))
# collapse stack for invocation
env = reduceRight(lambda acc, cur: update(acc, cur['let']) if 'let' in cur and isinstance(cur['let'], dict) else acc, {}, view)
if kind == 'python:3':
main = '''exec(code + "\\n__out__['value'] = ''' + functionName + '''(env, args)", {'env': env, 'args': args, '__out__':__out__})'''
code = f
else: # lambda
main = '''__out__['value'] = code(env, args)'''
code = types.FunctionType(marshal.loads(base64.b64decode(bytearray(f, 'ASCII'))), {})
try:
out = {'value': None}
exec(main, {'env': env, 'args': p['params'], 'code': code, '__out__': out})
return out['value']
finally:
for name in env:
set(name, env[name])
def step(p):
# final state, return composition result
if p['s']['state'] < 0 or p['s']['state'] >= len(fsm):
print('Entering final state')
print(json.dumps(p['params']))
return None
# process one state
node = fsm[p['s']['state']] # json definition for current state
if 'path' in node:
print('Entering composition'+node['path'])
index = p['s']['state']
p['s']['state'] = p['s']['state'] + node.get('next', 1)
if not callable(conductor[node['type']]):
return internalError('unexpected '+node['type']+' combinator')
result = conductor[node['type']](p, node, index, inspect, step)
return result if result is not None else step(p)
def invoke(params):
''' do invocation '''
pcomposer = params.get('$composer', {})
if '$composer' in params:
del params['$composer']
pcomposer['session'] = pcomposer.get('session', os.getenv('__OW_ACTIVATION_ID'))
# current state
s = { 'state': 0, 'stack': [], 'resuming': True }
s.update(pcomposer)
p = { 's': s, 'params': params }
if not isinstance(p['s']['state'], int):
return internalError('state parameter is not a number')
if not isinstance(p['s']['stack'], list):
return internalError('stack parameter is not an array')
if 'resuming' in pcomposer:
inspect_errors(p) # handle error objects when resuming
result = None
try:
result = step(p)
except Exception as err:
p['params'] = {'error': internalError(err)}
return result if result is not None else finish(p)
return invoke