blob: 31cd46ce2aef93b327f856b009901f19262004d7 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
from dataclasses import dataclass
from datetime import timedelta
from keyword import iskeyword, kwlist
@dataclass(repr=True, eq=True, order=False, frozen=True)
class SdkAddress:
namespace: str
name: str
id: str
typename: str
class TypeSerializer(metaclass=abc.ABCMeta):
__slots__ = ()
A base class for a TypeSerializer. A TypeSerializer is responsible of serialising and deserializing a specific
value type.
def serialize(self, value) -> bytes:
Serialize the given value to bytes.
:param value: the value to serialize.
:return: a byte representation of the given value.
def deserialize(self, byte: bytes):
Deserialize a value from the given byte representation.
:param byte: the bytes that represent the value to deseralize.
:return: the deserialized value.
class Type(metaclass=abc.ABCMeta):
__slots__ = ("typename",)
A base representation of a Stateful Function value type.
def __init__(self, typename: str):
:param typename: a TypeName represented as a string of the form <namespace>/<name> of this type.
if not typename:
raise ValueError("typename can not be missing")
self.typename = typename
def serializer(self) -> TypeSerializer:
:return: a serializer for this type.
class ValueSpec(object):
__slots__ = ("name", "type", "duration", "after_call", "after_write")
def __init__(self,
name: str,
type: Type,
expire_after_call: timedelta = None,
expire_after_write: timedelta = None):
ValueSpec a specification of a persisted value.
:param name: a user defined state name, used to represent this value. A name must be lower case, alphanumeric string
without spaces, that starts with either a letter or an underscore (_)
:param type: the value's Type. (see
statefun.Type) :param expire_after_call: expire (remove) this value if a call to this function hasn't been
made for the given duration.
:param expire_after_write: expire this value if it wasn't written to for the given duration.
if not name:
raise ValueError("name can not be missing.")
if not name.isidentifier():
raise ValueError(
f"invalid name {name}. A spec name can only contains alphanumeric letters (a-z) and (0-9), "
f"or underscores ( "
f"_). A valid identifier cannot start with a number, or contain any spaces.")
if iskeyword(name):
forbidden = '\n'.join(kwlist)
raise ValueError(
f"invalid spec name {name} (Python SDK specifically). since {name} will result as an attribute on "
f"The following names are forbidden:\n {forbidden}")
if not name.islower():
raise ValueError(f"Only lower case names are allowed, {name} is given.") = name
if not type:
raise ValueError("type can not be missing.")
if not isinstance(type, Type):
raise TypeError("type is not a StateFun type.")
self.type = type
if expire_after_call and expire_after_write:
# both can not be set.
raise ValueError("Either expire_after_call or expire_after_write can be set, but not both.")
if expire_after_call:
self.duration = int(expire_after_call.total_seconds() * 1000.0)
self.after_call = True
self.after_write = False
elif expire_after_write:
self.duration = int(expire_after_write.total_seconds() * 1000.0)
self.after_call = False
self.after_write = True
self.duration = 0
self.after_call = False
self.after_write = False
def parse_typename(typename):
Parse a TypeName string into a namespace, type pair.
:param typename: a string of the form <namespace>/<type>
:return: a tuple of a namespace type.
if typename is None:
raise ValueError("function type must be provided")
idx = typename.rfind("/")
if idx < 0:
raise ValueError("function type must be of the from namespace/name")
namespace = typename[:idx]
if not namespace:
raise ValueError("function type's namespace must not be empty")
type = typename[idx + 1:]
if not type:
raise ValueError("function type's name must not be empty")
return namespace, type
def simple_type(typename=None, serialize_fn=None, deserialize_fn=None) -> Type:
Create a user defined Type, simply by providing two functions. One for serializing a value to bytes, and one for
deserializing the value from bytes.
For example:
import json
tpe = simple_type("", serialize_fn=json.dumps, deserialize_fn=json.loads)
this defines a StateFun type (Type) of the typename and it is a JSON object.
:param typename: this type's TypeName.
:param serialize_fn: a function that is able to serialize values of this type.
:param deserialize_fn: a function that is able to deserialize bytes into values of this type.
:return: a Type definition.
return SimpleType(typename, serialize_fn, deserialize_fn)
# ----------------------------------------------------------------------------------------------------------
# Internal
# ----------------------------------------------------------------------------------------------------------
class SimpleType(Type):
__slots__ = ("typename", "_ser")
def __init__(self, typename, serialize_fn, deserialize_fn):
if not serialize_fn:
raise ValueError("serialize_fn is missing")
if not deserialize_fn:
raise ValueError("deserialize_fn is missing")
self._ser = UserTypeSerializer(serialize_fn, deserialize_fn)
def serializer(self) -> TypeSerializer:
return self._ser
class UserTypeSerializer(TypeSerializer):
__slots__ = ("serialize_fn", "deserialize_fn")
def __init__(self, serialize_fn, deserialize_fn):
self.serialize_fn = serialize_fn
self.deserialize_fn = deserialize_fn
def serialize(self, value):
return self.serialize_fn(value)
def deserialize(self, byte_string):
return self.deserialize_fn(byte_string)