WIP: synchronize with composer master
diff --git a/src/composer/__init__.py b/src/composer/__init__.py
index d0b3c60..7356318 100644
--- a/src/composer/__init__.py
+++ b/src/composer/__init__.py
@@ -1,80 +1,82 @@
__version__ = '0.1.0'
-from .composer import Composer, ComposerError, parse_action_name
-_composer = Composer()
-def composition(name, task):
- return _composer.composition(name, task)
+# from .composer import Composer, ComposerError, parse_action_name
-def seq(*arguments):
- return _composer.sequence(*arguments)
+# _composer = Composer()
-def sequence(*arguments):
- return _composer.sequence(*arguments)
+# def composition(name, task):
+# return _composer.composition(name, task)
-def literal(value):
- return _composer.literal(value)
+# def seq(*arguments):
+# return _composer.sequence(*arguments)
-def action(name, action=None):
- return _composer.action(name, action)
+# def sequence(*arguments):
+# return _composer.sequence(*arguments)
-def task(task):
- return _composer.task(task)
+# def literal(value):
+# return _composer.literal(value)
-def function(value):
- return _composer.function(value)
+# def action(name, action=None):
+# return _composer.action(name, action)
-def when(test, consequent, alternate=None):
- return _composer.when(test, consequent, alternate)
+# def task(task):
+# return _composer.task(task)
-def when_nosave(test, consequent, alternate=None):
- return _composer.when_nosave(test, consequent, alternate)
+# def function(value):
+# return _composer.function(value)
-def loop(test, body):
- return _composer.loop(test, body)
+# def when(test, consequent, alternate=None):
+# return _composer.when(test, consequent, alternate)
-def loop_nosave(test, body):
- return _composer.loop_nosave(test, body)
+# def when_nosave(test, consequent, alternate=None):
+# return _composer.when_nosave(test, consequent, alternate)
-def do(body, handler):
- return _composer._compose('try', (body, handler))
+# def loop(test, body):
+# return _composer.loop(test, body)
-def doloop(body, test):
- return _composer.doloop(body, test)
+# def loop_nosave(test, body):
+# return _composer.loop_nosave(test, body)
-def doloop_nosave(body, test):
- return _composer.doloop_nosave(body, test)
+# def do(body, handler):
+# return _composer._compose('try', (body, handler))
-def ensure(body, finalizer):
- return _composer._compose('finally', (body, finalizer))
+# def doloop(body, test):
+# return _composer.doloop(body, test)
-def let(declarations, *arguments):
- return _composer._compose('let', (declarations, *arguments))
+# def doloop_nosave(body, test):
+# return _composer.doloop_nosave(body, test)
-def mask(*arguments):
- return _composer._compose('mask', arguments)
+# def ensure(body, finalizer):
+# return _composer._compose('finally', (body, finalizer))
-def retain(*arguments):
- return _composer._compose('retain', arguments)
+# def let(declarations, *arguments):
+# return _composer._compose('let', (declarations, *arguments))
-def retain_catch(*arguments):
- return _composer.retain_catch(*arguments)
+# def mask(*arguments):
+# return _composer._compose('mask', arguments)
-def repeat(count, *arguments):
- return _composer._compose('repeat', (count, *arguments))
+# def retain(*arguments):
+# return _composer._compose('retain', arguments)
-def retry(count, *arguments):
- return _composer._compose('retry', (count, *arguments))
+# def retain_catch(*arguments):
+# return _composer.retain_catch(*arguments)
-def deserialize(composition):
- return _composer.deserialize(composition)
+# def repeat(count, *arguments):
+# return _composer._compose('repeat', (count, *arguments))
-def encode(composition, localcombinators=[]):
- return _composer.encode(composition, localcombinators)
+# def retry(count, *arguments):
+# return _composer._compose('retry', (count, *arguments))
-def lower(composition, combinators = []):
- return _composer.lower(composition, combinators)
+# def deserialize(composition):
+# return _composer.deserialize(composition)
-def openwhisk(options):
- return _composer.openwhisk(options)
+# def encode(composition, localcombinators=[]):
+# return _composer.encode(composition, localcombinators)
+
+# def lower(composition, combinators = []):
+# return _composer.lower(composition, combinators)
+
+# def openwhisk(options):
+# return _composer.openwhisk(options)
diff --git a/src/composer/composer.py b/src/composer/composer.py
index 57ec5e1..141bbd7 100644
--- a/src/composer/composer.py
+++ b/src/composer/composer.py
@@ -1,17 +1,3 @@
-# Copyright 2018 IBM Corporation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
import json
import os
import sys
@@ -19,68 +5,19 @@
import re
import base64
import marshal
+import types
+
+from .composer import __version__
+from .fqn import parse_action_name, ComposerError
-from composer import conductor
-from composer import __version__
+undefined = object() # special undefined value
+composer = types.SimpleNamespace() # Simple object with attributes
-# standard combinators
-
-combinators = {
- 'empty': {'since': '0.4.0'},
- 'seq': {'components': True, 'since': '0.4.0'},
- 'sequence': {'components': True, 'since': '0.4.0'},
- 'if': {'args': [{'_': 'test'}, {'_': 'consequent'}, {'_': 'alternate', 'optional': True}], 'since': '0.4.0'},
- 'if_nosave': {'args': [{'_': 'test'}, {'_': 'consequent'}, {'_': 'alternate', 'optional': True}], 'since': '0.4.0'},
- 'while': {'args': [{'_': 'test'}, {'_': 'body'}], 'since': '0.4.0'},
- 'while_nosave': {'args': [{'_': 'test'}, {'_': 'body'}], 'since': '0.4.0'},
- 'dowhile': {'args': [{'_': 'body'}, {'_': 'test'}], 'since': '0.4.0'},
- 'dowhile_nosave': {'args': [{'_': 'body'}, {'_': 'test'}], 'since': '0.4.0'},
- 'try': {'args': [{'_': 'body'}, {'_': 'handler'}], 'since': '0.4.0'},
- 'finally': {'args': [{'_': 'body'}, {'_': 'finalizer'}], 'since': '0.4.0'},
- 'retain': {'components': True, 'since': '0.4.0'},
- 'retain_catch': {'components': True, 'since': '0.4.0'},
- 'let': {'args': [{'_': 'declarations', 'type': 'dict'}], 'components': True, 'since': '0.4.0'},
- 'mask': {'components': True, 'since': '0.4.0'},
- 'action': {'args': [{'_': 'name', 'type': 'string'}, {'_': 'action', 'type': 'dict', 'optional': True}], 'since': '0.4.0'},
- 'composition': {'args': [{'_': 'name', 'type': 'string'}, {'_': 'composition'}], 'since': '0.4.0'},
- 'repeat': {'args': [{'_': 'count', 'type': 'int'}], 'components': True, 'since': '0.4.0'},
- 'retry': {'args': [{'_': 'count', 'type': 'int'}], 'components': True, 'since': '0.4.0'},
- 'value': {'args': [{'_': 'value', 'type': 'value'}], 'since': '0.4.0'},
- 'literal': {'args': [{'_': 'value', 'type': 'value'}], 'since': '0.4.0'},
- 'function': {'args': [{'_': 'function', 'type': 'dict'}], 'since': '0.4.0'}
+composer.util = {
+ 'version': __version__
}
-class ComposerError(Exception):
- def __init__(self, message, *arguments):
- self.message = message
- self.argument = arguments
-
-def serialize(obj):
- return obj.__dict__
-
-class Composition:
- def __init__(self, obj):
- items = obj.items() if isinstance(obj, dict) else obj.__dict__.items() if isinstance(obj, Composition) else None
- if items is None:
- raise ComposerError('Invalid argument', obj)
- for k, v in items:
- setattr(self, k, v)
-
- def __str__(self):
- return json.dumps(self.__dict__, default=serialize, ensure_ascii=True)
-
- def visit(self, f):
- ''' apply f to all fields of type composition '''
-
- combinator = combinators[getattr(self, 'type')]
- if 'components' in combinator:
- self.components = [f(c, str(idx), True) for idx,c in enumerate(self.components)]
-
- if 'args' in combinator:
- for arg in combinator['args']:
- if 'type' not in arg:
- setattr(self, arg['_'], f(getattr(self, arg['_']), arg['_'], False))
-
+# Utility functions
def get_value(env, args):
return env['value']
@@ -120,453 +57,418 @@
env['count'] -= 1
return 'error' in result and count > 0
-class Compiler:
+# lowerer
- def empty(self):
- return self._compose('empty', ())
+lowerer = types.SimpleNamespace()
+lowerer.literal = lambda value: composer.let({ 'value': value }, lambda env, args: env['value'])
- def literal(self, value):
- return self._compose('literal', (value,))
-
- def seq(self, *arguments):
- return self._compose('seq', arguments)
-
- def sequence(self, *arguments):
- return self._compose('sequence', arguments)
-
- def action(self, name, action=None):
- return self._compose('action', (name, action))
-
- def when(self, test, consequent, alternate=None):
- return self._compose('if', (test, consequent, alternate))
-
- def when_nosave(self, test, consequent, alternate=None):
- return self._compose('if_nosave', (test, consequent, alternate))
-
- def loop(self, test, body):
- return self._compose('while', (test, body))
-
- def loop_nosave(self, test, body):
- return self._compose('while_nosave', (test, body))
-
- def doloop(self, body, test):
- return self._compose('dowhile', (body, test))
-
- def doloop_nosave(self, body, test):
- return self._compose('dowhile_nosave', (body, test))
-
- def ensure(self, body, finalizer):
- return self._compose('finally', (body, finalizer))
-
- def retain(self, *arguments):
- return self._compose('retain', arguments)
-
- def retain_catch(self, *arguments):
- return self._compose('retain_catch', arguments)
-
- def mask(self, *arguments):
- return self._compose('mask', arguments)
-
- def let(self, declarations, *arguments):
- return self._compose('let', (declarations, *arguments))
-
- def task(self, task):
- '''detect task type and create corresponding composition object'''
- if task is None:
- return self.empty()
-
- if isinstance(task, Composition):
- return task
-
- if callable(task):
- return self.function(task)
-
- if isinstance(task, str): # python3 only
- return self.action(task)
-
- raise ComposerError('Invalid argument', task)
-
- def function(self, fun):
- ''' function combinator: stringify def/lambda code '''
- if fun.__name__ == '<lambda>':
- exc = str(base64.b64encode(marshal.dumps(fun.__code__)), 'ASCII')
- elif callable(fun):
- try:
- exc = inspect.getsource(fun)
- except OSError:
- raise ComposerError('Invalid argument', fun)
- else:
- exc = fun
-
- if isinstance(exc, str):
- if exc.startswith('def'):
- # standardize function name
- pattern = re.compile('def\s+([a-zA-Z_][a-zA-Z_0-9]*)\s*\(')
- match = pattern.match(exc)
- functionName = match.group(1)
-# exc = pattern.sub('def func(', exc)
- exc = { 'kind': 'python:3', 'code': exc, 'functionName': functionName }
- else: # lambda
- exc = { 'kind': 'python:3+lambda', 'code': exc }
-
- if not isinstance(exc, dict) or exc is None:
- raise ComposerError('Invalid argument', fun)
-
- return Composition({'type':'function', 'function':{ 'exec': exc }})
-
- # lowering
-
- def _empty(self, composition):
- return self.sequence()
-
- def _seq(self, composition):
- return self.sequence(*composition.components)
-
- def _value(self, composition):
- return self._literal(composition)
-
- def _literal(self, composition):
- return self.let({ 'value': composition.value }, get_value)
-
- def _retain(self, composition):
- return self.let(
- { 'params': None },
+def retain(*components):
+ return composer.let(
+ { 'params': None },
+ composer.ensure(
set_params,
- self.mask(*composition.components),
- retain_result)
+ composer.seq(composer.mask(*components), retain_result)))
- def _retain_catch(self, composition):
- return self.seq(
- self.retain(
- self.ensure(
- self.seq(*composition.components),
- set_nested_result)),
- retain_nested_result)
+lowerer.retain = retain
- def _if(self, composition):
- return self.let(
- { 'params': None },
+def retain_catch(*components):
+ return composer.seq(
+ composer.retain(
+ composer.ensure(
+ composer.seq(*components),
+ lambda env, args: { 'result' : args })),
+ retain_nested_result)
+
+lowerer.retain_catch = retain_catch
+
+def when(test, consequent, alternate):
+ return composer.let(
+ { 'params': None },
+ set_params,
+ composer.ensure(
+ lambda env, args: { 'params' : args },
+ composer.when_nosave(
+ composer.mask(test),
+ composer.ensure(get_params, composer.mask(consequent)),
+ composer.ensure(get_params, composer.mask(alternate)))))
+
+lowerer.when = when
+
+def loop(test, body):
+ return composer.let(
+ { 'params': None },
+ composer.ensure(
set_params,
- self.when_nosave(
- self.mask(composition.test),
- self.seq(get_params, self.mask(composition.consequent)),
- self.seq(get_params, self.mask(composition.alternate))))
+ composer.seq(composer.loop_nosave(
+ composer.mask(test),
+ composer.ensure(get_params, composer.seq(composer.mask(body), set_params)),
+ get_params))))
- def _while(self, composition):
- return self.let(
- { 'params': None },
+lowerer.loop = loop
+
+def dowhile(body, test):
+ return composer.let(
+ { 'params': None },
+ composer.ensure(
set_params,
- self.loop_nosave(
- self.mask(composition.test),
- self.seq(get_params, self.mask(composition.body), set_params)),
- get_params)
+ composer.seq(composer.dowhile_nosave(
+ composer.ensure(get_params, composer.seq(composer.mask(body), set_params)),
+ composer.mask(test)),
+ get_params)))
- def _dowhile(self, composition):
- return self.let(
- { 'params': None },
- set_params,
- self.doloop_nosave(
- self.seq(get_params, self.mask(composition.body), set_params),
- self.mask(composition.test)),
- get_params)
+lowerer.dowhile = dowhile
+def repeat(count, *components):
+ return composer.let(
+ { 'count': count },
+ composer.loop(
+ dec_count,
+ composer.mask(*components)))
- def _repeat(self, composition):
- return self.let(
- { 'count': composition.count },
- self.loop(
- dec_count,
- self.mask(self.seq(*composition.components))))
+lowerer.repeat = repeat
- def _retry(self, composition):
- return self.let(
- { 'count': composition.count },
- set_nested_params,
- self.doloop(
- self.ensure(get_nested_params, self.mask(self.retain_catch(*composition.components))),
- retry_cond),
- get_nested_result)
+def retry(count, *components):
+ return composer.let(
+ { 'count': count },
+ set_nested_params,
+ composer.dowhile(
+ composer.ensure(get_nested_params, composer.mask(composer.retain_catch(*components))),
+ retry_cond,
+ get_nested_result))
- def _compose(self, type_, arguments):
- combinator = combinators[type_]
- skip = len(combinator['args']) if 'args' in combinator else 0
- composition = Composition({'type':type_})
+lowerer.retry = retry
- # process declared arguments
- for i in range(skip):
- arg = combinator['args'][i]
- argument = arguments[i] if len(arguments) > i else None
+def prepare_payload(env, args):
+ envs = env['req'].copy()
+ del envs['action']
+ return { 'action': env['req']['action'], 'args': envs, 'payload': args['payload'] , 'timeout': envs['timeout'] }
- if 'type' not in arg:
- setattr(composition, arg['_'], self.task(argument))
- elif arg['type'] == 'value':
- if type(argument).__name__ == 'function':
- raise ComposerError('Invalid argument', argument)
- setattr(composition, arg['_'], argument)
- else:
- if type(argument).__name__ != arg['type']:
- raise ComposerError('Invalid argument', argument)
+def invoke (req, timeout=None):
+ return composer.let(
+ { 'req': req, 'timeout': timeout },
+ prepare_payload,
+ composer.execute())
- setattr(composition, arg['_'], argument)
+lowerer.invoke = invoke
- if 'components' in combinator:
- setattr(composition, 'components', tuple(map(lambda obj: self.task(obj), arguments[skip:])))
+def sleep(ms):
+ return composer.invoke({ 'action': 'sleep', 'ms': ms })
- return composition
+lowerer.sleep = sleep
- def deserialize(self, composition):
- ''' recursively deserialize composition '''
+def merge (*components):
+ return composer.seq(composer.retain(*components), lambda env, args: args['params'].update(args['result']))
- composition = Composition(composition)
- composition.visit(lambda composition, name, ignore=False: self.deserialize(composition))
- return composition
+lowerer.merge = merge
- def label(self, composition):
- ''' label combinators with the json path '''
+# == Done lowerer
- if not isinstance(composition, Composition):
- raise ComposerError('Invalid argument', composition)
+def visit(composition, f):
+ ''' apply f to all fields of type composition '''
- def label(path):
- def labeler(composition, name, array=False):
- nonlocal path
- composition = Composition(composition)
- segment = ''
- if name is not None:
- if array:
- segment = '['+name+']'
- else:
- segment = '.'+name
+ combinator = getattr(composition, '.combinator')
+ if 'components' in combinator:
+ composition.components = composition.components.map(f)
- composition.path = path + segment
+ if 'args' in combinator:
+ for arg in combinator['args']:
+ if 'type' not in arg and arg.name in composition:
+ setattr(composition, arg.name, f(getattr(composition, arg.name), arg.name))
+ return Composition(composition)
- # label nested combinators
- composition.visit(label(composition.path))
- return composition
+def label(composition):
+ ''' recursively label combinators with the json path '''
+ def label(path):
+ def labeler(composition, name=None, array=False):
+ nonlocal path
+ segment = ''
+ if name is not None:
+ if array:
+ segment = '['+name+']'
+ else:
+ segment = '.'+name
- return labeler
-
- return label('')(composition, None, None)
-
-
- def lower(self, composition, combinators = []):
- ''' recursively label and lower combinators to the desired set of combinators (including primitive combinators) '''
- if not isinstance(composition, Composition):
- raise ComposerError('Invalid argument', composition)
-
- if combinators is False:
- return composition # no lowering
-
- if combinators is True or combinators == '':
- combinators = [] # maximal lowering
-
- # no semver in openwhisk python runtime
- # if isinstance(combinators, str): # lower to combinators of specific composer version
- # combinators = Object.keys(this.combinators).filter(key => semver.gte(combinators, this.combinators[key].since))
-
- def lower(composition, name, ignore=False):
- composition = Composition(composition) # copy
- # repeatedly lower root combinator
-
- while composition.type not in combinators and hasattr(self, '_'+composition.type):
- path = composition.path if hasattr(composition, 'path') else None
- composition = getattr(self, '_'+composition.type)(composition)
- if path is not None:
- composition.path = path
-
- # lower nested combinators
- composition.visit(lower)
+ p = path + segment
+ composition = visit(composition, label(p))
+ composition.path = p
return composition
- return lower(composition, None)
+ return labeler
-def parse_action_name(name):
+ return label('')(composition)
+
+
+def declare(combinators, prefix=None):
+ '''
+ derive combinator methods from combinator table
+ check argument count and map argument positions to argument names
+ delegate to Composition constructor for the rest of the validation
'''
- Parses a (possibly fully qualified) resource name and validates it. If it's not a fully qualified name,
- then attempts to qualify it.
+ if not isinstance(combinators, dict):
+ raise ComposerError('Invalid argument "combinators" in "declare"', combinators)
+
+ if prefix is not None and not isinstance(prefix, str):
+ raise ComposerError('Invalid argument "prefix" in "declare"', prefix)
- Examples string to namespace, [package/]action name
- foo => /_/foo
- pkg/foo => /_/pkg/foo
- /ns/foo => /ns/foo
- /ns/pkg/foo => /ns/pkg/foo
- '''
- if not isinstance(name, str):
- raise ComposerError('Name is not valid')
- name = name.strip()
- if len(name) == 0:
- raise ComposerError('Name is not specified')
+ composer = types.SimpleNamespace()
+ for key in combinators:
+ type_ = prefix + '.' + key if prefix is not None else key
+ combinator = combinators[key]
- delimiter = '/'
- parts = name.split(delimiter)
- n = len(parts)
- leadingSlash = name[0] == delimiter if len(name) > 0 else False
- # no more than /ns/p/a
- if n < 1 or n > 4 or (leadingSlash and n == 2) or (not leadingSlash and n == 4):
- raise ComposerError('Name is not valid')
+ if not isinstance(combinator, dict) or ('args' in combinator and not isinstance(combinator['args'], list)):
+ raise ComposerError('Invalid "'+type_+'" combinator specification in "declare"', combinator)
- # skip leading slash, all parts must be non empty (could tighten this check to match EntityName regex)
- for part in parts[1:]:
- if len(part.strip()) == 0:
- raise ComposerError('Name is not valid')
+ if 'args' in combinator:
+ for arg in combinator.args:
+ if not isinstance(arg['name'], str):
+ raise ComposerError('Invalid "'+type_+'" combinator specification in "declare"', combinator)
+
+ def combine(*arguments):
+ composition = { 'type': type_, '.combinator': lambda : combinator }
+ skip = len(combinator['args']) if 'args' in combinator else 0
- newName = delimiter.join(parts)
- if leadingSlash:
- return newName
- elif n < 3:
- return delimiter+'_'+delimiter+newName
- else:
- return delimiter+newName
+ if 'components' not in combinator and len(arguments) > skip:
+ raise ComposerError('Too many arguments in "'+type_+'" combinator')
+
+ for i in range(skip):
+ composition[combinator['args'][i]['name']] = arguments[i]
+
+ if 'components' in combinator:
+ composition['components'] = arguments[skip:]
+
+ return Composition(composition)
-class Compositions:
- ''' management class for compositions '''
- def __init__(self, wsk, composer):
- self.actions = wsk.actions
- self.composer = composer
+ setattr(composer, key, combine)
+
+ return composer
- def deploy(self, composition, combinators=[]):
- if not isinstance(composition, Composition):
+def serialize(obj):
+ return obj.__dict__
+
+class Composition:
+
+ def __init__(self, composition):
+ ''' construct a composition object with the specified fields '''
+ combinator = getattr(composition, '.combinator')()
+
+ # shallow copy of obj attributes
+ items = composition.items() if isinstance(composition, dict) else composition.__dict__.items() if isinstance(composition, Composition) else None
+ if items is None:
raise ComposerError('Invalid argument', composition)
-
- if composition.type != 'composition':
- raise ComposerError('Cannot deploy anonymous composition')
-
- obj = self.composer.encode(composition, combinators)
-
- if 'actions' in obj:
- for action in obj['actions']:
- try:
- self.actions.delete(action)
- except Exception:
- pass
- self.actions.update(action)
- return obj
-
-class Composer(Compiler):
- def action(self, name, options = {}):
- ''' enhanced action combinator: mangle name, capture code '''
- name = parse_action_name(name) # raise ComposerError if name is not valid
- exec = None
- if hasattr(options, 'sequence'): # native sequence
- exec = { 'kind': 'sequence', 'components': tuple(map(parse_action_name, options['sequence'])) }
-
- if hasattr(options, 'filename') and isinstance(options['filename'], str): # read action code from file
- raise ComposerError('read from file not implemented')
- # exec = fs.readFileSync(options.filename, { encoding: 'utf8' })
-
- # if (typeof options.action === 'function') { // capture function
- # exec = `const main = ${options.action}`
- # if (exec.indexOf('[native code]') !== -1) throw new ComposerError('Cannot capture native function', options.action)
- # }
-
- composition = {'type':'action', 'name':name}
-
- if hasattr(options, 'action') and (isinstance(options['action'], str) or isinstance(options['action'], dict)):
- exec = options['action']
-
- if isinstance(exec, str):
- exec = { 'kind': 'python:3', 'code': exec }
-
- if exec is not None:
- composition['exec'] = exec
-
- return Composition(composition)
-
- def openwhisk(self, options):
- ''' return enhanced openwhisk client capable of deploying compositions '''
- # try to extract apihost and key first from whisk property file file and then from os.environ
-
- wskpropsPath = os.environ['WSK_CONFIG_FILE'] if 'WSK_CONFIG_FILE' in os.environ else os.path.expanduser('~/.wskprops')
- with open(wskpropsPath) as f:
- lines = f.readlines()
-
- options = dict(options)
-
- for line in lines:
- parts = line.strip().split('=')
- if len(parts) == 2:
- if parts[0] == 'APIHOST':
- options['apihost'] = parts[1]
- elif parts[0] == 'AUTH':
- options['api_key'] = parts[1]
+ for k, v in items:
+ setattr(self, k, v)
+
+ if 'args' in combinator:
+ for arg in combinator.args:
+ if arg.name not in composition and optional and 'type' in arg:
+ continue
+ optional = getattr(arg, 'optional', False)
+ if 'type' not in arg:
+ try:
+ value = getattr(composition, arg.name, None if optional else undefined)
+ setattr(self, arg.name, composer.task(value))
+ except Exception:
+ raise ComposerError('Invalid argument "'+arg.name+'" in "'+composition.type+' combinator"', value)
+ elif arg.type == 'name':
+ try:
+ setattr(self, arg.name, parse_action_name(getattr(composition, arg.name)))
+ except ComposerError as ce:
+ raise ComposerError(ce.message + 'in "'+composition.type+' combinator"', getattr(composition, arg.name))
+ elif arg.type == 'value':
+ if arg.name not in composition or callable(getattr(composition, arg.name)):
+ raise ComposerError('Invalid argument "' + arg.name+'" in "'+ composition.type+'combinator"', getattr(composition, arg.name))
+ elif arg.type == 'object':
+ if arg.name not in composition or not isinstance(getattr(composition, arg.name), Composition):
+ raise ComposerError('Invalid argument "' + arg.name+'" in "'+ composition.type+'combinator"', getattr(composition, arg.name))
+ else:
+ if type(getattr(composition, arg.name)) != arg.type:
+ raise ComposerError('Invalid argument "' + arg.name+'" in "'+ composition.type+'combinator"', getattr(composition, arg.name))
+
+ if 'components' in combinator:
+ self.components = map(composer.task, getattr(composition, 'components', []))
- if '__OW_API_HOST' in os.environ:
- options['apihost'] = os.environ['__OW_API_HOST']
+ def __str__(self):
+ return json.dumps(self.__dict__, default=serialize, ensure_ascii=True)
- if '__OW_API_KEY' in os.environ:
- options['api_key'] = os.environ['__OW_API_KEY']
-
- import openwhisk
- wsk = openwhisk.Client(options)
- wsk.compositions = Compositions(wsk, self)
- return wsk
-
-
- def composition(self, name, composition):
- ''' enhanced composition combinator: mangle name '''
-
- if not isinstance(name, str):
- raise ComposerError('Invalid argument', name)
-
- name = parse_action_name(name)
- return Composition({'type':'composition', 'name':name, 'composition': self.task(composition)})
-
-
- def encode(self, composition, localcombinators=[]):
- ''' recursively encode composition into { composition, actions }
- by encoding nested compositions into actions and extracting nested action definitions '''
-
- if not isinstance(composition, Composition):
- raise ComposerError('Invalid argument', composition)
-
- composition = self.lower(composition, localcombinators)
-
+ def compile(self):
+ ''' compile composition. Returns a dictionary '''
actions = []
- def escape(str):
- return re.sub(r'(\n|\t|\r|\f|\v|\\|\')', lambda m:{'\n':'\\n','\t':'\\t','\r':'\\r','^\f':'\\f','\v':'\\v','\\':'\\\\','\'':'\\\''}[m.group()], str)
-
- def encode(composition, name, ignore=False):
- composition = Composition(composition)
- composition.visit(encode)
- if composition.type == 'composition':
- code = '# generated by composer v'+__version__+'\n\nimport functools\nimport json\nimport inspect\nimport re\nimport base64\nimport marshal\nimport types'
- code += '\n\n' + inspect.getsource(ComposerError)
- code += '\ncomposition=json.loads(\''+escape(str(encode(composition.composition, '')))+'\')'
-
- src = inspect.getsource(conductor)
- code += '\n'+ src[src.index('def conductor'):]
- code += '\ncombinators ='+ str(combinators)
- code += '\n' + inspect.getsource(serialize)
- code += '\n' + inspect.getsource(Composition)
- code += '\n' + inspect.getsource(get_value)
- code += '\n' + inspect.getsource(get_params)
- code += '\n' + inspect.getsource(set_params)
- code += '\n' + inspect.getsource(retain_result)
- code += '\n' + inspect.getsource(retain_nested_result)
- code += '\n' + inspect.getsource(dec_count)
- code += '\n' + inspect.getsource(set_nested_params)
- code += '\n' + inspect.getsource(get_nested_params)
- code += '\n' + inspect.getsource(set_nested_result)
- code += '\n' + inspect.getsource(get_nested_result)
- code += '\n' + inspect.getsource(retry_cond)
-
- code += '\n' + inspect.getsource(Compiler)
- code += 'def main(args):'
- code += '\n return conductor()(args)'
-
- composition.action = { 'exec': { 'kind': 'python:3', 'code':code }, 'annotations': [{ 'key': 'conductor', 'value': str(composition.composition) }, { 'key': 'composer', 'value': __version__ }] }
-
- del composition.composition
- composition.type = 'action'
-
- if composition.type == 'action' and hasattr(composition, 'action'):
+ def flatten(composition, _=None):
+ composition = visit(composition, flatten)
+ if composition.type == 'action' and hasattr(composition, 'action'): # pylint: disable=E1101
actions.append({ 'name': composition.name, 'action': composition.action })
- del composition.action
-
+ del composition.action # pylint: disable=E1101
return composition
+
+ obj = { 'composition': label(flatten(self)).lower(), 'ast': self, 'version': __version__ }
+ if len(actions) > 0:
+ obj['actions'] = actions
+ return obj
+
+ def lower(self, combinators = []):
+ ''' recursively lower combinators to the desired set of combinators (including primitive combinators) '''
+ if not isinstance(combinators, list) and not isinstance(combinators, str):
+ raise ComposerError('Invalid argument "combinators" in "lower"', combinators)
+
+ def lower(composition, _):
+ # repeatedly lower root combinator
+
+ while getattr(getattr(composition, '.combinator')(), 'def', False):
+ path = composition.path if hasattr(composition, 'path') else None
+ combinator = getattr(composition, '.combinator')()
+ if isinstance(combinator, list) and combinator.indexOf(composition.type) >= 0:
+ break
+
+ # no semver in openwhisk python runtime
+ # if isinstance(combinator, str) and getattr(combinator, 'since', False):
+ # break;
+
+ # map argument names to positions
+ args = []
+ skip = len(getattr(combinator, 'args', []))
+ for i in range(skip):
+ args.append(getattr(composition, combinator.args[i].name))
+
+ if 'components' in combinator:
+ args.extend(composition.components)
+
+ composition = combinator['def'](args)
+
+ # preserve path
+ if path is not None:
+ composition.path = path
+
+ return visit(composition, lower)
+
+ return lower(self, None)
+
+
+# primitive combinators
+combinators = {
+ 'sequence': { 'components': True, 'since': '0.4.0' },
+ 'if_nosave': { 'args': [{ 'name': 'test' }, { 'name': 'consequent' }, { 'name': 'alternate', 'optional': True }], 'since': '0.4.0' },
+ 'while_nosave': { 'args': [{ 'name': 'test' }, { 'name': 'body' }], 'since': '0.4.0' },
+ 'dowhile_nosave': { 'args': [{ 'name': 'body' }, { 'name': 'test' }], 'since': '0.4.0' },
+ 'try': { 'args': [{ 'name': 'body' }, { 'name': 'handler' }], 'since': '0.4.0' },
+ 'finally': { 'args': [{ 'name': 'body' }, { 'name': 'finalizer' }], 'since': '0.4.0' },
+ 'let': { 'args': [{ 'name': 'declarations', 'type': 'object' }], 'components': True, 'since': '0.4.0' },
+ 'mask': { 'components': True, 'since': '0.4.0' },
+ 'action': { 'args': [{ 'name': 'name', 'type': 'name' }, { 'name': 'action', 'type': 'object', 'optional': True }], 'since': '0.4.0' },
+ 'function': { 'args': [{ 'name': 'function', 'type': 'object' }], 'since': '0.4.0' },
+ 'async': { 'components': True, 'since': '0.6.0' },
+ 'execute': { 'since': '0.5.2' },
+ 'parallel': { 'components': True, 'since': '0.6.0' },
+ 'map': { 'components': True, 'since': '0.6.0' },
+ 'composition': { 'args': [{ 'name': 'name', 'type': 'name' }], 'since': '0.6.0' }
+}
+
+composer.__dict__.update(declare(combinators).__dict__)
+
+# derived combinators
+extra = {
+ 'empty': { 'since': '0.4.0', 'def': composer.sequence },
+ 'seq': { 'components': True, 'since': '0.4.0', 'def': composer.sequence },
+ 'if': { 'args': [{ 'name': 'test' }, { 'name': 'consequent' }, { 'name': 'alternate', 'optional': True }], 'since': '0.4.0', 'def': lowerer.when },
+ 'while': { 'args': [{ 'name': 'test' }, { 'name': 'body' }], 'since': '0.4.0', 'def': lowerer.loop },
+ 'dowhile': { 'args': [{ 'name': 'body' }, { 'name': 'test' }], 'since': '0.4.0', 'def': lowerer.doloop },
+ 'repeat': { 'args': [{ 'name': 'count', 'type': 'number' }], 'components': True, 'since': '0.4.0', 'def': lowerer.repeat },
+ 'retry': { 'args': [{ 'name': 'count', 'type': 'number' }], 'components': True, 'since': '0.4.0', 'def': lowerer.retry },
+ 'retain': { 'components': True, 'since': '0.4.0', 'def': lowerer.retain },
+ 'retain_catch': { 'components': True, 'since': '0.4.0', 'def': lowerer.retain_catch },
+ 'value': { 'args': [{ 'name': 'value', 'type': 'value' }], 'since': '0.4.0', 'def': lowerer.literal },
+ 'literal': { 'args': [{ 'name': 'value', 'type': 'value' }], 'since': '0.4.0', 'def': lowerer.literal },
+ 'sleep': { 'args': [{ 'name': 'ms', 'type': 'number' }], 'since': '0.5.0', 'def': lowerer.sleep },
+ 'invoke': { 'args': [{ 'name': 'req', 'type': 'object' }, { 'name': 'timeout', 'type': 'number', 'optional': True }], 'since': '0.5.0', 'def': lowerer.invoke },
+ 'par': { 'components': True, 'since': '0.8.2', 'def': composer.parallel },
+ 'merge': { 'components': True, 'since': '0.13.0', 'def': lowerer.merge }
+}
+
+composer.__dict__.update(declare(extra).__dict__)
+
+# add or override definitions of some combinators
+
+def task(task):
+ ''' detect task type and create corresponding composition object '''
+ if task is undefined:
+ raise ComposerError('Invalid argument in "task" combinator', task)
+
+ if task is None:
+ return composer.empty()
+
+ if isinstance(task, Composition):
+ return task
+
+ if callable(task):
+ return composer.function(task)
+
+ if isinstance(task, str): # python3 only
+ return composer.action(task)
+
+ raise ComposerError('Invalid argument "task" in "task" combinator', task)
+
+composer.task = task
- composition = encode(composition, None)
- return { 'composition': composition, 'actions': actions }
+def function(fun):
+ ''' function combinator: stringify def/lambda code '''
+
+ if fun.__name__ == '<lambda>':
+ exc = str(base64.b64encode(marshal.dumps(fun.__code__)), 'ASCII')
+ elif callable(fun):
+ try:
+ exc = inspect.getsource(fun)
+ except OSError:
+ raise ComposerError('Invalid argument', fun)
+ else:
+ exc = fun
+
+ if isinstance(exc, str):
+ if exc.startswith('def'):
+ # standardize function name
+ pattern = re.compile(r'def\s+([a-zA-Z_][a-zA-Z_0-9]*)\s*\(')
+ match = pattern.match(exc)
+ functionName = match.group(1)
+
+ exc = { 'kind': 'python:3', 'code': exc, 'functionName': functionName }
+ else: # lambda
+ exc = { 'kind': 'python:3+lambda', 'code': exc }
+
+ if not isinstance(exc, dict) or exc is None:
+ raise ComposerError('Invalid argument "function" in "function" combinator', fun)
+
+ return Composition({'type':'function', 'function':{ 'exec': exc }, '.combinator': lambda: combinators['function'] })
+
+composer.function = function
+
+def action(name, options = {}):
+ ''' action combinator '''
+ if not isinstance(options, dict):
+ raise ComposerError('Invalid argument "options" in "action" combinator', options)
+ exc = None
+ if 'sequence' in options and isinstance(options['sequence'], list): # native sequence
+ exc = { 'kind': 'sequence', 'components': tuple(map(parse_action_name, options['sequence'])) }
+ elif 'filename' in options and isinstance(options['filename'], str): # read action code from file
+ raise ComposerError('read from file not implemented')
+ # exc = fs.readFileSync(options.filename, { encoding: 'utf8' })
+
+ elif 'action' in options and callable(options['action']):
+ if options['action'].__name__ == '<lambda>':
+ exc = str(base64.b64encode(marshal.dumps(options['action'].__code__)), 'ASCII')
+ else:
+ try:
+ exc = inspect.getsource(options['action'])
+ except OSError:
+ raise ComposerError('Invalid argument "options" in "action" combinator', options['action'])
+ elif 'action' in options and (isinstance(options['action'], str) or isinstance(options['action'], dict)):
+ exc = options['action']
+
+ if isinstance(exc, str):
+ exc = { 'kind': 'python:3', 'code': exc }
+
+ composition = { 'type': 'action', 'name': name, '.combinator': lambda: combinators['action']}
+ if exc is not None:
+ composition.action = exc
+
+ return Composition(composition)
diff --git a/src/composer/conductor.py b/src/composer/conductor.py
deleted file mode 100644
index 139212d..0000000
--- a/src/composer/conductor.py
+++ /dev/null
@@ -1,262 +0,0 @@
-# Copyright 2018 IBM Corporation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import functools
-import json
-import marshal
-import base64
-import types
-
-# dummy composition and compiler
-composition = {} # will be overridden
-class Compiler:
- def lower(self, composition, combinators = []):
- pass
- def deserialize(self, composition):
- pass
- def label(self, composition):
- pass
-
-def conductor():
- global composition
- compiler = Compiler()
-
- def chain(front, back):
- front[-1]['next'] = 1
- front.extend(back)
- return front
-
- def sequence(components):
- if len(components) == 0:
- return [{ 'type': 'empty' }]
- return functools.reduce(chain, map(compile, components))
-
- def compile(json):
- path = json.path if hasattr(json, 'path') else None
- type_ = json.type
- if type_ == 'sequence':
- return chain([{ 'type': 'pass', 'path':path }], sequence(json.components))
- elif type_ == 'action':
- return [{ 'type': 'action', 'name': json.name, 'path': path }]
- elif type_ == 'function':
- return [{ 'type': 'function', 'exec': json.function['exec'], 'path':path }]
- elif type_ == 'finally':
- body = compile(json.body)
- finalizer = compile(json.finalizer)
- fsm = functools.reduce(chain, [[{'type': 'try', 'path': path}], body, [{ 'type': 'exit' }], finalizer])
- fsm[0]['catch'] = len(fsm) - len(finalizer)
- return fsm
- elif type_ == 'let':
- body = sequence(json.components)
- return functools.reduce(chain, [[{ 'type': 'let', 'let': json.declarations, 'path':path }], body, [{ 'type': 'exit' }]])
- elif type_ == 'mask':
- body = sequence(json.components)
- return functools.reduce(chain, [[{ 'type': 'let', 'let': None, 'path': path }], body, [{ 'type': 'exit' }]])
- elif type_ == 'try':
- body = compile(json.body)
- handler = chain(compile(json.handler), [{ 'type': 'pass' }])
- fsm = functools.reduce(chain, [[{ 'type': 'try', 'path':path }], body, [{ 'type': 'exit' }]])
- fsm[0]['catch'] = len(fsm)
- fsm[-1]['next'] = len(handler)
- fsm.extend(handler)
- return fsm
- elif type_ == 'if_nosave':
- consequent = compile(json.consequent)
- alternate = chain(compile(json.alternate), [{ 'type': 'pass' }])
- fsm = functools.reduce(chain, [[{ 'type': 'pass', 'path':path }], compile(json.test), [{ 'type': 'choice', 'then': 1, 'else': len(consequent) + 1 }]])
- consequent[-1]['next'] = len(alternate)
- fsm.extend(consequent)
- fsm.extend(alternate)
- return fsm
- elif type_ == 'while_nosave':
- consequent = compile(json.body)
- alternate = [{ 'type': 'pass' }]
- fsm = functools.reduce(chain, [[{ 'type': 'pass', 'path':path }], compile(json.test), [{ 'type': 'choice', 'then': 1, 'else': len(consequent) + 1 }]])
- consequent[-1]['next'] = 1 - len(fsm) - len(consequent)
- fsm.extend(consequent)
- fsm.extend(alternate)
- return fsm
- elif type_ == 'dowhile_nosave':
- test = compile(json.test)
- fsm = functools.reduce(chain, [[{ 'type': 'pass', 'path':path }], compile(json.body), test, [{ 'type': 'choice', 'then': 1, 'else': 2 }]])
- fsm[-1]['then'] = 1 - len(fsm)
- fsm[-1]['else'] = 1
- alternate = [{ 'type': 'pass' }]
- fsm.extend(alternate)
- return fsm
-
- fsm = compile(compiler.lower(compiler.label(compiler.deserialize(composition))))
-
- isObject = lambda x: isinstance(x, dict)
-
- def encodeError(error):
- if isinstance(error, str) or not hasattr(error, "__getitem__"):
- return {
- 'code': 500,
- 'error': error
- }
- else:
- return {
- 'code': error['code'] if isinstance(error['code'], int) else 500,
- 'error': error['error'] if isinstance(error['error'], str) else (error['message'] if 'message' in error else 'An internal error occurred')
- }
-
- # error status codes
- badRequest = lambda error: { 'code': 400, 'error': error }
- internalError = lambda error: encodeError(error)
-
- def guarded_invoke(params):
- try:
- return invoke(params)
- except Exception as err:
- return internalError(err)
-
- def invoke(params):
- ''' do invocation '''
- # initial state and stack
- state = 0
- stack = []
-
- # wrap params if not a dictionary, branch to error handler if error
- def inspect_errors():
- nonlocal params
- nonlocal state
- nonlocal stack
- params = params if isObject(params) else { 'value': params }
- if 'error' in params:
- params = { 'error': params['error'] } # discard all fields but the error field
- state = None # abort unless there is a handler in the stack
- while len(stack) > 0:
- first = stack[0]
- stack = stack[1:]
- if 'catch' in first:
- state = first['catch']
- if isinstance(state, int):
- break
-
-
- # restore state and stack when resuming
- if '$resume' in params:
- if not isObject(params['$resume']):
- return badRequest('The type of optional $resume parameter must be object')
- if not 'state' in params['$resume'] and not isinstance(params['$resume']['state'], int):
- return badRequest('The type of optional $resume["state"] parameter must be number')
- state = params['$resume']['state']
- stack = params['$resume']['stack']
- if not isinstance(stack, list):
- return badRequest('The type of $resume["stack"] must be an array')
- del params['$resume']
- inspect_errors() # handle error objects when resuming
-
- # run function f on current stack
- def run(f, kind, functionName=None):
- # handle let/mask pairs
- view = []
- n = 0
- for frame in stack:
- if 'let' in frame and frame['let'] is None:
- n += 1
- elif 'let' in frame:
- if n == 0:
- view.append(frame)
- else:
- n -= 1
-
-
- # update value of topmost matching symbol on stack if any
- def set(symbol, value):
- lets = [element for element in view if 'let' in element and symbol in element['let']]
- if len(lets) > 0:
- element = lets[0]
- element['let'][symbol] = value # TODO: JSON.parse(JSON.stringify(value))
-
-
- def reduceRight(func, init, seq):
- if not seq:
- return init
- else:
- return func(reduceRight(func, init, seq[1:]), seq[0])
-
- def update(dict, dict2):
- dict.update(dict2)
- return dict
-
- # collapse stack for invocation
- env = reduceRight(lambda acc, cur: update(acc, cur['let']) if 'let' in cur and isinstance(cur['let'], dict) else acc, {}, view)
- if kind == 'python:3':
- main = '''exec(code + "\\n__out__['value'] = ''' + functionName + '''(env, args)", {'env': env, 'args': args, '__out__':__out__})'''
- code = f
- else: # lambda
- main = '''__out__['value'] = code(env, args)'''
- code = types.FunctionType(marshal.loads(base64.b64decode(bytearray(f, 'ASCII'))), {})
-
- try:
- out = {'value': None}
- exec(main, {'env': env, 'args': params, 'code': code, '__out__': out})
- return out['value']
- finally:
- for name in env:
- set(name, env[name])
-
- while True:
- # final state, return composition result
- if state is None:
- print('Entering final state')
- print(json.dumps(params))
- if 'error' in params:
- return params
- else:
- return { 'params': params }
-
- # process one state
- jsonv = fsm[state] # jsonv definition for current state
- if 'path' in jsonv:
- print('Entering composition'+jsonv['path'])
- current = state
- state = current + jsonv['next'] if 'next' in jsonv else None # default next state
- if jsonv['type'] == 'choice':
- state = current + (jsonv['then'] if params['value'] else jsonv['else'])
- elif jsonv['type'] == 'try':
- stack.insert(0, { 'catch': current + jsonv['catch'] })
- elif jsonv['type'] == 'let':
- stack.insert(0, { 'let': jsonv['let'] }) # JSON.parse(JSON.stringify(jsonv.let))
- elif jsonv['type'] == 'exit':
- if len(stack) == 0:
- return internalError('State '+str(current)+' attempted to pop from an empty stack')
- stack = stack[1:]
- elif jsonv['type'] == 'action':
- return { 'action': jsonv['name'], 'params': params, 'state': { '$resume': { 'state': state, 'stack': stack } } } # invoke continuation
- elif jsonv['type'] == 'function':
- result = None
- try:
- functionName = jsonv['exec']['functionName'] if 'functionName' in jsonv['exec'] else None
- result = run(jsonv['exec']['code'], jsonv['exec']['kind'], functionName)
- except Exception as error:
- print(error)
- result = { 'error': 'An exception was caught at state '+str(current)+' (see log for details)' }
-
- if callable(result):
- result = { 'error': 'State '+str(current)+' evaluated to a function' }
-
- # if a function has only side effects and no return value (or return None), return params
- params = params if result is None else result
- inspect_errors()
- elif jsonv['type'] == 'empty':
- inspect_errors()
- elif jsonv['type'] == 'pass':
- pass
- else:
- return internalError('State '+str(current)+ 'has an unknown type')
-
- return guarded_invoke
-
diff --git a/src/composer/fqn.py b/src/composer/fqn.py
new file mode 100644
index 0000000..b6dfef5
--- /dev/null
+++ b/src/composer/fqn.py
@@ -0,0 +1,44 @@
+
+def parse_action_name(name):
+ '''
+ Parses a (possibly fully qualified) resource name and validates it. If it's not a fully qualified name,
+ then attempts to qualify it.
+
+ Examples string to namespace, [package/]action name
+ foo => /_/foo
+ pkg/foo => /_/pkg/foo
+ /ns/foo => /ns/foo
+ /ns/pkg/foo => /ns/pkg/foo
+ '''
+ if not isinstance(name, str):
+ raise ComposerError('Name is not valid')
+ name = name.strip()
+ if len(name) == 0:
+ raise ComposerError('Name is not specified')
+
+ delimiter = '/'
+ parts = name.split(delimiter)
+ n = len(parts)
+ leadingSlash = name[0] == delimiter if len(name) > 0 else False
+ # no more than /ns/p/a
+ if n < 1 or n > 4 or (leadingSlash and n == 2) or (not leadingSlash and n == 4):
+ raise ComposerError('Name is not valid')
+
+ # skip leading slash, all parts must be non empty (could tighten this check to match EntityName regex)
+ for part in parts[1:]:
+ if len(part.strip()) == 0:
+ raise ComposerError('Name is not valid')
+
+ newName = delimiter.join(parts)
+ if leadingSlash:
+ return newName
+ elif n < 3:
+ return delimiter+'_'+delimiter+newName
+ else:
+ return delimiter+newName
+
+class ComposerError(Exception):
+ def __init__(self, message, *arguments):
+ self.message = message
+ self.argument = arguments
+
\ No newline at end of file