blob: 141bbd75cf4bdc82300e60bcc13d0851d2e2e102 [file] [log] [blame]
import json
import os
import sys
import inspect
import re
import base64
import marshal
import types
from .composer import __version__
from .fqn import parse_action_name, ComposerError
undefined = object() # special undefined value
composer = types.SimpleNamespace() # Simple object with attributes
composer.util = {
'version': __version__
}
# Utility functions
def get_value(env, args):
return env['value']
def set_params(env, args):
env['params'] = args
def get_params(env, args):
return env['params']
def retain_result(env, args):
return { 'params': env['params'], 'result': args }
def retain_nested_result(env, args):
return { 'params': args['params'], 'result': args['result']['result'] }
def dec_count(env, args):
c = env['count']
env['count'] -= 1
return c > 0
def set_nested_params(env, args):
return { 'params': args }
def get_nested_params(env, args):
return args['params']
def set_nested_result(env, args):
return { 'result': args }
def get_nested_result(env, args):
return args['result']
def retry_cond(env, args):
result = args['result']
count = env['count']
env['count'] -= 1
return 'error' in result and count > 0
# lowerer
lowerer = types.SimpleNamespace()
lowerer.literal = lambda value: composer.let({ 'value': value }, lambda env, args: env['value'])
def retain(*components):
return composer.let(
{ 'params': None },
composer.ensure(
set_params,
composer.seq(composer.mask(*components), retain_result)))
lowerer.retain = retain
def retain_catch(*components):
return composer.seq(
composer.retain(
composer.ensure(
composer.seq(*components),
lambda env, args: { 'result' : args })),
retain_nested_result)
lowerer.retain_catch = retain_catch
def when(test, consequent, alternate):
return composer.let(
{ 'params': None },
set_params,
composer.ensure(
lambda env, args: { 'params' : args },
composer.when_nosave(
composer.mask(test),
composer.ensure(get_params, composer.mask(consequent)),
composer.ensure(get_params, composer.mask(alternate)))))
lowerer.when = when
def loop(test, body):
return composer.let(
{ 'params': None },
composer.ensure(
set_params,
composer.seq(composer.loop_nosave(
composer.mask(test),
composer.ensure(get_params, composer.seq(composer.mask(body), set_params)),
get_params))))
lowerer.loop = loop
def dowhile(body, test):
return composer.let(
{ 'params': None },
composer.ensure(
set_params,
composer.seq(composer.dowhile_nosave(
composer.ensure(get_params, composer.seq(composer.mask(body), set_params)),
composer.mask(test)),
get_params)))
lowerer.dowhile = dowhile
def repeat(count, *components):
return composer.let(
{ 'count': count },
composer.loop(
dec_count,
composer.mask(*components)))
lowerer.repeat = repeat
def retry(count, *components):
return composer.let(
{ 'count': count },
set_nested_params,
composer.dowhile(
composer.ensure(get_nested_params, composer.mask(composer.retain_catch(*components))),
retry_cond,
get_nested_result))
lowerer.retry = retry
def prepare_payload(env, args):
envs = env['req'].copy()
del envs['action']
return { 'action': env['req']['action'], 'args': envs, 'payload': args['payload'] , 'timeout': envs['timeout'] }
def invoke (req, timeout=None):
return composer.let(
{ 'req': req, 'timeout': timeout },
prepare_payload,
composer.execute())
lowerer.invoke = invoke
def sleep(ms):
return composer.invoke({ 'action': 'sleep', 'ms': ms })
lowerer.sleep = sleep
def merge (*components):
return composer.seq(composer.retain(*components), lambda env, args: args['params'].update(args['result']))
lowerer.merge = merge
# == Done lowerer
def visit(composition, f):
''' apply f to all fields of type composition '''
combinator = getattr(composition, '.combinator')
if 'components' in combinator:
composition.components = composition.components.map(f)
if 'args' in combinator:
for arg in combinator['args']:
if 'type' not in arg and arg.name in composition:
setattr(composition, arg.name, f(getattr(composition, arg.name), arg.name))
return Composition(composition)
def label(composition):
''' recursively label combinators with the json path '''
def label(path):
def labeler(composition, name=None, array=False):
nonlocal path
segment = ''
if name is not None:
if array:
segment = '['+name+']'
else:
segment = '.'+name
p = path + segment
composition = visit(composition, label(p))
composition.path = p
return composition
return labeler
return label('')(composition)
def declare(combinators, prefix=None):
'''
derive combinator methods from combinator table
check argument count and map argument positions to argument names
delegate to Composition constructor for the rest of the validation
'''
if not isinstance(combinators, dict):
raise ComposerError('Invalid argument "combinators" in "declare"', combinators)
if prefix is not None and not isinstance(prefix, str):
raise ComposerError('Invalid argument "prefix" in "declare"', prefix)
composer = types.SimpleNamespace()
for key in combinators:
type_ = prefix + '.' + key if prefix is not None else key
combinator = combinators[key]
if not isinstance(combinator, dict) or ('args' in combinator and not isinstance(combinator['args'], list)):
raise ComposerError('Invalid "'+type_+'" combinator specification in "declare"', combinator)
if 'args' in combinator:
for arg in combinator.args:
if not isinstance(arg['name'], str):
raise ComposerError('Invalid "'+type_+'" combinator specification in "declare"', combinator)
def combine(*arguments):
composition = { 'type': type_, '.combinator': lambda : combinator }
skip = len(combinator['args']) if 'args' in combinator else 0
if 'components' not in combinator and len(arguments) > skip:
raise ComposerError('Too many arguments in "'+type_+'" combinator')
for i in range(skip):
composition[combinator['args'][i]['name']] = arguments[i]
if 'components' in combinator:
composition['components'] = arguments[skip:]
return Composition(composition)
setattr(composer, key, combine)
return composer
def serialize(obj):
return obj.__dict__
class Composition:
def __init__(self, composition):
''' construct a composition object with the specified fields '''
combinator = getattr(composition, '.combinator')()
# shallow copy of obj attributes
items = composition.items() if isinstance(composition, dict) else composition.__dict__.items() if isinstance(composition, Composition) else None
if items is None:
raise ComposerError('Invalid argument', composition)
for k, v in items:
setattr(self, k, v)
if 'args' in combinator:
for arg in combinator.args:
if arg.name not in composition and optional and 'type' in arg:
continue
optional = getattr(arg, 'optional', False)
if 'type' not in arg:
try:
value = getattr(composition, arg.name, None if optional else undefined)
setattr(self, arg.name, composer.task(value))
except Exception:
raise ComposerError('Invalid argument "'+arg.name+'" in "'+composition.type+' combinator"', value)
elif arg.type == 'name':
try:
setattr(self, arg.name, parse_action_name(getattr(composition, arg.name)))
except ComposerError as ce:
raise ComposerError(ce.message + 'in "'+composition.type+' combinator"', getattr(composition, arg.name))
elif arg.type == 'value':
if arg.name not in composition or callable(getattr(composition, arg.name)):
raise ComposerError('Invalid argument "' + arg.name+'" in "'+ composition.type+'combinator"', getattr(composition, arg.name))
elif arg.type == 'object':
if arg.name not in composition or not isinstance(getattr(composition, arg.name), Composition):
raise ComposerError('Invalid argument "' + arg.name+'" in "'+ composition.type+'combinator"', getattr(composition, arg.name))
else:
if type(getattr(composition, arg.name)) != arg.type:
raise ComposerError('Invalid argument "' + arg.name+'" in "'+ composition.type+'combinator"', getattr(composition, arg.name))
if 'components' in combinator:
self.components = map(composer.task, getattr(composition, 'components', []))
def __str__(self):
return json.dumps(self.__dict__, default=serialize, ensure_ascii=True)
def compile(self):
''' compile composition. Returns a dictionary '''
actions = []
def flatten(composition, _=None):
composition = visit(composition, flatten)
if composition.type == 'action' and hasattr(composition, 'action'): # pylint: disable=E1101
actions.append({ 'name': composition.name, 'action': composition.action })
del composition.action # pylint: disable=E1101
return composition
obj = { 'composition': label(flatten(self)).lower(), 'ast': self, 'version': __version__ }
if len(actions) > 0:
obj['actions'] = actions
return obj
def lower(self, combinators = []):
''' recursively lower combinators to the desired set of combinators (including primitive combinators) '''
if not isinstance(combinators, list) and not isinstance(combinators, str):
raise ComposerError('Invalid argument "combinators" in "lower"', combinators)
def lower(composition, _):
# repeatedly lower root combinator
while getattr(getattr(composition, '.combinator')(), 'def', False):
path = composition.path if hasattr(composition, 'path') else None
combinator = getattr(composition, '.combinator')()
if isinstance(combinator, list) and combinator.indexOf(composition.type) >= 0:
break
# no semver in openwhisk python runtime
# if isinstance(combinator, str) and getattr(combinator, 'since', False):
# break;
# map argument names to positions
args = []
skip = len(getattr(combinator, 'args', []))
for i in range(skip):
args.append(getattr(composition, combinator.args[i].name))
if 'components' in combinator:
args.extend(composition.components)
composition = combinator['def'](args)
# preserve path
if path is not None:
composition.path = path
return visit(composition, lower)
return lower(self, None)
# primitive combinators
combinators = {
'sequence': { 'components': True, 'since': '0.4.0' },
'if_nosave': { 'args': [{ 'name': 'test' }, { 'name': 'consequent' }, { 'name': 'alternate', 'optional': True }], 'since': '0.4.0' },
'while_nosave': { 'args': [{ 'name': 'test' }, { 'name': 'body' }], 'since': '0.4.0' },
'dowhile_nosave': { 'args': [{ 'name': 'body' }, { 'name': 'test' }], 'since': '0.4.0' },
'try': { 'args': [{ 'name': 'body' }, { 'name': 'handler' }], 'since': '0.4.0' },
'finally': { 'args': [{ 'name': 'body' }, { 'name': 'finalizer' }], 'since': '0.4.0' },
'let': { 'args': [{ 'name': 'declarations', 'type': 'object' }], 'components': True, 'since': '0.4.0' },
'mask': { 'components': True, 'since': '0.4.0' },
'action': { 'args': [{ 'name': 'name', 'type': 'name' }, { 'name': 'action', 'type': 'object', 'optional': True }], 'since': '0.4.0' },
'function': { 'args': [{ 'name': 'function', 'type': 'object' }], 'since': '0.4.0' },
'async': { 'components': True, 'since': '0.6.0' },
'execute': { 'since': '0.5.2' },
'parallel': { 'components': True, 'since': '0.6.0' },
'map': { 'components': True, 'since': '0.6.0' },
'composition': { 'args': [{ 'name': 'name', 'type': 'name' }], 'since': '0.6.0' }
}
composer.__dict__.update(declare(combinators).__dict__)
# derived combinators
extra = {
'empty': { 'since': '0.4.0', 'def': composer.sequence },
'seq': { 'components': True, 'since': '0.4.0', 'def': composer.sequence },
'if': { 'args': [{ 'name': 'test' }, { 'name': 'consequent' }, { 'name': 'alternate', 'optional': True }], 'since': '0.4.0', 'def': lowerer.when },
'while': { 'args': [{ 'name': 'test' }, { 'name': 'body' }], 'since': '0.4.0', 'def': lowerer.loop },
'dowhile': { 'args': [{ 'name': 'body' }, { 'name': 'test' }], 'since': '0.4.0', 'def': lowerer.doloop },
'repeat': { 'args': [{ 'name': 'count', 'type': 'number' }], 'components': True, 'since': '0.4.0', 'def': lowerer.repeat },
'retry': { 'args': [{ 'name': 'count', 'type': 'number' }], 'components': True, 'since': '0.4.0', 'def': lowerer.retry },
'retain': { 'components': True, 'since': '0.4.0', 'def': lowerer.retain },
'retain_catch': { 'components': True, 'since': '0.4.0', 'def': lowerer.retain_catch },
'value': { 'args': [{ 'name': 'value', 'type': 'value' }], 'since': '0.4.0', 'def': lowerer.literal },
'literal': { 'args': [{ 'name': 'value', 'type': 'value' }], 'since': '0.4.0', 'def': lowerer.literal },
'sleep': { 'args': [{ 'name': 'ms', 'type': 'number' }], 'since': '0.5.0', 'def': lowerer.sleep },
'invoke': { 'args': [{ 'name': 'req', 'type': 'object' }, { 'name': 'timeout', 'type': 'number', 'optional': True }], 'since': '0.5.0', 'def': lowerer.invoke },
'par': { 'components': True, 'since': '0.8.2', 'def': composer.parallel },
'merge': { 'components': True, 'since': '0.13.0', 'def': lowerer.merge }
}
composer.__dict__.update(declare(extra).__dict__)
# add or override definitions of some combinators
def task(task):
''' detect task type and create corresponding composition object '''
if task is undefined:
raise ComposerError('Invalid argument in "task" combinator', task)
if task is None:
return composer.empty()
if isinstance(task, Composition):
return task
if callable(task):
return composer.function(task)
if isinstance(task, str): # python3 only
return composer.action(task)
raise ComposerError('Invalid argument "task" in "task" combinator', task)
composer.task = task
def function(fun):
''' function combinator: stringify def/lambda code '''
if fun.__name__ == '<lambda>':
exc = str(base64.b64encode(marshal.dumps(fun.__code__)), 'ASCII')
elif callable(fun):
try:
exc = inspect.getsource(fun)
except OSError:
raise ComposerError('Invalid argument', fun)
else:
exc = fun
if isinstance(exc, str):
if exc.startswith('def'):
# standardize function name
pattern = re.compile(r'def\s+([a-zA-Z_][a-zA-Z_0-9]*)\s*\(')
match = pattern.match(exc)
functionName = match.group(1)
exc = { 'kind': 'python:3', 'code': exc, 'functionName': functionName }
else: # lambda
exc = { 'kind': 'python:3+lambda', 'code': exc }
if not isinstance(exc, dict) or exc is None:
raise ComposerError('Invalid argument "function" in "function" combinator', fun)
return Composition({'type':'function', 'function':{ 'exec': exc }, '.combinator': lambda: combinators['function'] })
composer.function = function
def action(name, options = {}):
''' action combinator '''
if not isinstance(options, dict):
raise ComposerError('Invalid argument "options" in "action" combinator', options)
exc = None
if 'sequence' in options and isinstance(options['sequence'], list): # native sequence
exc = { 'kind': 'sequence', 'components': tuple(map(parse_action_name, options['sequence'])) }
elif 'filename' in options and isinstance(options['filename'], str): # read action code from file
raise ComposerError('read from file not implemented')
# exc = fs.readFileSync(options.filename, { encoding: 'utf8' })
elif 'action' in options and callable(options['action']):
if options['action'].__name__ == '<lambda>':
exc = str(base64.b64encode(marshal.dumps(options['action'].__code__)), 'ASCII')
else:
try:
exc = inspect.getsource(options['action'])
except OSError:
raise ComposerError('Invalid argument "options" in "action" combinator', options['action'])
elif 'action' in options and (isinstance(options['action'], str) or isinstance(options['action'], dict)):
exc = options['action']
if isinstance(exc, str):
exc = { 'kind': 'python:3', 'code': exc }
composition = { 'type': 'action', 'name': name, '.combinator': lambda: combinators['action']}
if exc is not None:
composition.action = exc
return Composition(composition)