blob: ade3b635cd93bd1422fb02724d300fc235894d37 [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import typing
from statefun.core import ValueSpec
from statefun.context import Context
from statefun.messages import Message
from statefun.storage import make_address_storage_spec, StorageSpec
import inspect
class StatefulFunction(object):
    """Bundles a user supplied function with its storage spec and async flag.

    Attributes are stored in ``__slots__``:
      * ``fun`` - the callable invoked with a context and a message.
      * ``storage_spec`` - the storage spec handed in as ``specs``.
      * ``is_async`` - the flag passed by the caller, stored as given.
    """
    __slots__ = ("fun", "storage_spec", "is_async")

    def __init__(self,
                 fun: typing.Callable[[Context, Message], None],
                 specs: StorageSpec,
                 is_async: bool):
        """Store the function, its storage spec and the async flag.

        :param fun: the function code; must not be None.
        :param specs: the storage spec; must not be None.
        :param is_async: stored unchanged on the instance.
        :raises ValueError: if ``fun`` or ``specs`` is None (``fun`` is
            checked first).
        """
        # Reject missing mandatory arguments before touching any attribute.
        if fun is None:
            raise ValueError("function code is missing.")
        if specs is None:
            raise ValueError("storage spec is missing.")
        self.fun, self.storage_spec, self.is_async = fun, specs, is_async
class StatefulFunctions(object):
    """A registry of stateful functions, keyed by their typename."""
    __slots__ = ("_functions",)

    def __init__(self):
        # Maps typename -> StatefulFunction.
        self._functions = {}

    def register(self, typename: str, fun, specs: typing.Optional[typing.List[ValueSpec]] = None):
        """Register a function under the given typename.

        Registering a typename that is already present replaces the
        previous entry.

        :param typename: the typename to register the function under,
            for example "com.foo.bar/greeter"; must be non empty.
        :param fun: the function to register. It must declare exactly two
            positional parameters (a context and a message). Coroutine
            functions are detected and flagged as async.
        :param specs: an optional list of value specs; an empty list is
            used when omitted.
        :raises ValueError: if ``fun`` is None, ``typename`` is empty, or
            ``fun`` does not declare exactly two positional parameters.
        """
        if fun is None:
            raise ValueError("function instance must be provided")
        if not typename:
            raise ValueError("function typename must be provided")
        # Finish validating the arguments before building anything from them.
        sig = inspect.getfullargspec(fun)
        if len(sig.args) != 2:
            raise ValueError(
                f"The registered function {typename} does not expect a context and a message "
                f"but rather {sig.args}.")
        storage_spec = make_address_storage_spec(specs if specs else [])
        is_async = inspect.iscoroutinefunction(fun)
        self._functions[typename] = StatefulFunction(fun=fun, specs=storage_spec, is_async=is_async)

    def bind(self, typename, specs: typing.Optional[typing.List[ValueSpec]] = None):
        """Return a decorator that registers the decorated function under typename.

        For example:

            s = StatefulFunctions()

            @s.bind("com.foo.bar/greeter")
            def greeter(context, message):
                print("Hi there")

        This would add an invokable stateful function that can accept messages
        sent to "com.foo.bar/greeter".

        :param typename: the typename to register the decorated function under.
        :param specs: an optional list of value specs, forwarded to ``register``.
        :return: a decorator that registers the function and returns it unchanged.
        """

        def wrapper(function):
            self.register(typename, function, specs)
            # Hand back the original function so the decorated name stays usable.
            return function

        return wrapper

    def for_typename(self, typename: str) -> StatefulFunction:
        """Return the function registered under the given typename.

        :raises KeyError: if nothing was registered under ``typename``.
        """
        return self._functions[typename]