[FLINK-21692] Update the Python SDK

This closes #210.
diff --git a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/main/protobuf/remote-module-verification.proto b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/main/protobuf/remote-module-verification.proto
index 1cafcf1..3e877ee 100644
--- a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/main/protobuf/remote-module-verification.proto
+++ b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/main/protobuf/remote-module-verification.proto
@@ -25,9 +25,6 @@
 message Invoke {
 }
 
-message InvokeCount {
-    int32 count = 1;
-}
 
 message InvokeResult {
     string id = 1;
diff --git a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/main/python/functions.py b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/main/python/functions.py
index fa94f0d..0c9f017 100644
--- a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/main/python/functions.py
+++ b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/main/python/functions.py
@@ -16,63 +16,58 @@
 # limitations under the License.
 ################################################################################
 
-from remote_module_verification_pb2 import Invoke, InvokeResult, InvokeCount
-
-from statefun import StatefulFunctions
-from statefun import StateSpec
-from statefun import RequestReplyHandler
-from statefun import kafka_egress_record
-
 import uuid
+from statefun import *
+
+from remote_module_verification_pb2 import Invoke, InvokeResult
+
+InvokeType = make_protobuf_type(namespace="statefun.e2e", cls=Invoke)
+InvokeResultType = make_protobuf_type(namespace="statefun.e2e", cls=InvokeResult)
 
 functions = StatefulFunctions()
 
 
 @functions.bind(
     typename="org.apache.flink.statefun.e2e.remote/counter",
-    states=[StateSpec('invoke_count')])
-def counter(context, invoke: Invoke):
+    specs=[ValueSpec(name='invoke_count', type=IntType)])
+def counter(context, message):
     """
     Keeps count of the number of invocations, and forwards that count
     to be sent to the Kafka egress. We do the extra forwarding instead
     of directly sending to Kafka, so that we cover inter-function
     messaging in our E2E test.
     """
-    invoke_count = context.state('invoke_count').unpack(InvokeCount)
-    if not invoke_count:
-        invoke_count = InvokeCount()
-        invoke_count.count = 1
-    else:
-        invoke_count.count += 1
-    context.state('invoke_count').pack(invoke_count)
+    n = context.storage.invoke_count or 0
+    n += 1
+    context.storage.invoke_count = n
 
     response = InvokeResult()
-    response.id = context.address.identity
-    response.invoke_count = invoke_count.count
+    response.id = context.address.id
+    response.invoke_count = n
 
-    context.pack_and_send(
-        "org.apache.flink.statefun.e2e.remote/forward-function",
-        # use random keys to simulate both local handovers and
-        # cross-partition messaging via the feedback loop
-        uuid.uuid4().hex,
-        response
-    )
+    context.send(
+        message_builder(target_typename="org.apache.flink.statefun.e2e.remote/forward-function",
+                        # use random keys to simulate both local handovers and
+                        # cross-partition messaging via the feedback loop
+                        target_id=uuid.uuid4().hex,
+                        value=response,
+                        value_type=InvokeResultType))
 
 
 @functions.bind("org.apache.flink.statefun.e2e.remote/forward-function")
-def forward_to_egress(context, invoke_result: InvokeResult):
+def forward_to_egress(context, message):
     """
     Simply forwards the results to the Kafka egress.
     """
-    egress_message = kafka_egress_record(
+    invoke_result = message.as_type(InvokeResultType)
+
+    egress_message = kafka_egress_message(
+        typename="org.apache.flink.statefun.e2e.remote/invoke-results",
         topic="invoke-results",
         key=invoke_result.id,
-        value=invoke_result
-    )
-    context.pack_and_send_egress(
-        "org.apache.flink.statefun.e2e.remote/invoke-results",
-        egress_message
-    )
+        value=invoke_result,
+        value_type=InvokeResultType)
+    context.send_egress(egress_message)
 
 
 handler = RequestReplyHandler(functions)
@@ -90,7 +85,7 @@
 
 @app.route('/service', methods=['POST'])
 def handle():
-    response_data = handler(request.data)
+    response_data = handler.handle_sync(request.data)
     response = make_response(response_data)
     response.headers.set('Content-Type', 'application/octet-stream')
     return response
diff --git a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/Dockerfile.remote-function b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/Dockerfile.remote-function
index 7eb3ffe..f013414 100644
--- a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/Dockerfile.remote-function
+++ b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/Dockerfile.remote-function
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-FROM python:3.7-alpine
+FROM python:3.8-alpine
 
 RUN mkdir -p /app
 WORKDIR /app
diff --git a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml
index 1925210..8aca93c 100644
--- a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml
+++ b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml
@@ -40,7 +40,7 @@
               type: earliest
             topics:
               - topic: invoke
-                typeUrl: com.googleapis/org.apache.flink.statefun.e2e.remote.Invoke
+                typeUrl: statefun.e2e/org.apache.flink.statefun.e2e.remote.Invoke
                 targets:
                   - org.apache.flink.statefun.e2e.remote/counter
     egresses:
diff --git a/statefun-examples/statefun-async-python-example/generator/Dockerfile b/statefun-examples/statefun-async-python-example/generator/Dockerfile
index c3cf87c..5bcca13 100644
--- a/statefun-examples/statefun-async-python-example/generator/Dockerfile
+++ b/statefun-examples/statefun-async-python-example/generator/Dockerfile
@@ -19,11 +19,9 @@
 RUN mkdir -p /app
 WORKDIR /app
 
-RUN pip install protobuf
 RUN pip install kafka-python
 
 COPY event-generator.py /app
-COPY messages_pb2.py /app
 
 CMD ["python", "/app/event-generator.py"]
 
diff --git a/statefun-examples/statefun-async-python-example/generator/event-generator.py b/statefun-examples/statefun-async-python-example/generator/event-generator.py
index 385ef7d..ff0a45b 100644
--- a/statefun-examples/statefun-async-python-example/generator/event-generator.py
+++ b/statefun-examples/statefun-async-python-example/generator/event-generator.py
@@ -20,13 +20,8 @@
 import sys
 import time
 import threading
-
 import random
-
 from kafka.errors import NoBrokersAvailable
-
-from messages_pb2 import GreetRequest, GreetResponse
-
 from kafka import KafkaProducer
 from kafka import KafkaConsumer
 
@@ -37,9 +32,7 @@
 def random_requests():
     """Generate infinite sequence of random GreetRequests."""
     while True:
-        request = GreetRequest()
-        request.name = random.choice(NAMES)
-        yield request
+        yield random.choice(NAMES)
 
 
 def produce():
@@ -48,9 +41,9 @@
     else:
         delay_seconds = 1
     producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])
-    for request in random_requests():
-        key = request.name.encode('utf-8')
-        val = request.SerializeToString()
+    for name in random_requests():
+        key = name.encode('utf-8')
+        val = key
         producer.send(topic='names', key=key, value=val)
         producer.flush()
         time.sleep(delay_seconds)
@@ -63,9 +56,9 @@
         auto_offset_reset='earliest',
         group_id='event-gen')
     for message in consumer:
-        response = GreetResponse()
-        response.ParseFromString(message.value)
-        print("%s:\t%s" % (response.name, response.greeting), flush=True)
+        who = message.key.decode('utf-8')
+        greeting = message.value.decode('utf-8')
+        print(f"{who}\t{greeting}", flush=True)
 
 
 def handler(number, frame):
diff --git a/statefun-examples/statefun-async-python-example/generator/messages_pb2.py b/statefun-examples/statefun-async-python-example/generator/messages_pb2.py
deleted file mode 100644
index bce4d24..0000000
--- a/statefun-examples/statefun-async-python-example/generator/messages_pb2.py
+++ /dev/null
@@ -1,172 +0,0 @@
-# -*- coding: utf-8 -*-
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-# Generated by the protocol buffer compiler.  DO NOT EDIT!
-# source: messages.proto
-
-import sys
-_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
-from google.protobuf import descriptor as _descriptor
-from google.protobuf import message as _message
-from google.protobuf import reflection as _reflection
-from google.protobuf import symbol_database as _symbol_database
-# @@protoc_insertion_point(imports)
-
-_sym_db = _symbol_database.Default()
-
-
-
-
-DESCRIPTOR = _descriptor.FileDescriptor(
-  name='messages.proto',
-  package='example',
-  syntax='proto3',
-  serialized_options=None,
-  serialized_pb=_b('\n\x0emessages.proto\x12\x07\x65xample\"\x1c\n\x0cGreetRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\"/\n\rGreetResponse\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x10\n\x08greeting\x18\x02 \x01(\t\"\x19\n\tSeenCount\x12\x0c\n\x04seen\x18\x01 \x01(\x03\x62\x06proto3')
-)
-
-
-
-
-_GREETREQUEST = _descriptor.Descriptor(
-  name='GreetRequest',
-  full_name='example.GreetRequest',
-  filename=None,
-  file=DESCRIPTOR,
-  containing_type=None,
-  fields=[
-    _descriptor.FieldDescriptor(
-      name='name', full_name='example.GreetRequest.name', index=0,
-      number=1, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
-  ],
-  extensions=[
-  ],
-  nested_types=[],
-  enum_types=[
-  ],
-  serialized_options=None,
-  is_extendable=False,
-  syntax='proto3',
-  extension_ranges=[],
-  oneofs=[
-  ],
-  serialized_start=27,
-  serialized_end=55,
-)
-
-
-_GREETRESPONSE = _descriptor.Descriptor(
-  name='GreetResponse',
-  full_name='example.GreetResponse',
-  filename=None,
-  file=DESCRIPTOR,
-  containing_type=None,
-  fields=[
-    _descriptor.FieldDescriptor(
-      name='name', full_name='example.GreetResponse.name', index=0,
-      number=1, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
-    _descriptor.FieldDescriptor(
-      name='greeting', full_name='example.GreetResponse.greeting', index=1,
-      number=2, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
-  ],
-  extensions=[
-  ],
-  nested_types=[],
-  enum_types=[
-  ],
-  serialized_options=None,
-  is_extendable=False,
-  syntax='proto3',
-  extension_ranges=[],
-  oneofs=[
-  ],
-  serialized_start=57,
-  serialized_end=104,
-)
-
-
-_SEENCOUNT = _descriptor.Descriptor(
-  name='SeenCount',
-  full_name='example.SeenCount',
-  filename=None,
-  file=DESCRIPTOR,
-  containing_type=None,
-  fields=[
-    _descriptor.FieldDescriptor(
-      name='seen', full_name='example.SeenCount.seen', index=0,
-      number=1, type=3, cpp_type=2, label=1,
-      has_default_value=False, default_value=0,
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
-  ],
-  extensions=[
-  ],
-  nested_types=[],
-  enum_types=[
-  ],
-  serialized_options=None,
-  is_extendable=False,
-  syntax='proto3',
-  extension_ranges=[],
-  oneofs=[
-  ],
-  serialized_start=106,
-  serialized_end=131,
-)
-
-DESCRIPTOR.message_types_by_name['GreetRequest'] = _GREETREQUEST
-DESCRIPTOR.message_types_by_name['GreetResponse'] = _GREETRESPONSE
-DESCRIPTOR.message_types_by_name['SeenCount'] = _SEENCOUNT
-_sym_db.RegisterFileDescriptor(DESCRIPTOR)
-
-GreetRequest = _reflection.GeneratedProtocolMessageType('GreetRequest', (_message.Message,), dict(
-  DESCRIPTOR = _GREETREQUEST,
-  __module__ = 'messages_pb2'
-  # @@protoc_insertion_point(class_scope:example.GreetRequest)
-  ))
-_sym_db.RegisterMessage(GreetRequest)
-
-GreetResponse = _reflection.GeneratedProtocolMessageType('GreetResponse', (_message.Message,), dict(
-  DESCRIPTOR = _GREETRESPONSE,
-  __module__ = 'messages_pb2'
-  # @@protoc_insertion_point(class_scope:example.GreetResponse)
-  ))
-_sym_db.RegisterMessage(GreetResponse)
-
-SeenCount = _reflection.GeneratedProtocolMessageType('SeenCount', (_message.Message,), dict(
-  DESCRIPTOR = _SEENCOUNT,
-  __module__ = 'messages_pb2'
-  # @@protoc_insertion_point(class_scope:example.SeenCount)
-  ))
-_sym_db.RegisterMessage(SeenCount)
-
-
-# @@protoc_insertion_point(module_scope)
diff --git a/statefun-examples/statefun-async-python-example/greeter/Dockerfile b/statefun-examples/statefun-async-python-example/greeter/Dockerfile
index dfbf4f7..6e0b22c 100644
--- a/statefun-examples/statefun-async-python-example/greeter/Dockerfile
+++ b/statefun-examples/statefun-async-python-example/greeter/Dockerfile
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-FROM python:3.7-slim-buster
+FROM python:3.9-slim-buster
 
 RUN mkdir -p /app
 WORKDIR /app
@@ -26,9 +26,8 @@
 RUN pip install -r requirements.txt
 
 COPY greeter.py /app
-COPY messages_pb2.py /app
 
 EXPOSE 8000
 
-CMD ["gunicorn", "-b", "0.0.0.0:8000", "-w", "4", "--worker-class", "aiohttp.GunicornWebWorker", "greeter:app"]
+CMD ["python3", "/app/greeter.py"]
 
diff --git a/statefun-examples/statefun-async-python-example/greeter/greeter.py b/statefun-examples/statefun-async-python-example/greeter/greeter.py
index 80f1552..be0f97d 100644
--- a/statefun-examples/statefun-async-python-example/greeter/greeter.py
+++ b/statefun-examples/statefun-async-python-example/greeter/greeter.py
@@ -15,34 +15,29 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-from messages_pb2 import SeenCount, GreetRequest, GreetResponse
 
-from statefun import StatefulFunctions
-from statefun import StateSpec
-from statefun import AsyncRequestReplyHandler
-from statefun import kafka_egress_record
-
+from statefun import *
 import asyncio
 
 functions = StatefulFunctions()
 
-
 @functions.bind(
     typename="example/greeter",
-    states=[StateSpec('seen_count')])
-async def greet(context, greet_request: GreetRequest):
-    state = context.state('seen_count').unpack(SeenCount)
-    if not state:
-        state = SeenCount()
-        state.seen = 1
+    specs=[ValueSpec(name='seen_count', type=IntType)])
+async def greet(context, greet_request):
+    storage = context.storage
+
+    seen = storage.seen_count
+    if not seen:
+        seen = 1
     else:
-        state.seen += 1
-    context.state('seen_count').pack(state)
+        seen += 1
+    storage.seen_count = seen
 
-    response = await compute_greeting(greet_request.name, state.seen)
-
-    egress_message = kafka_egress_record(topic="greetings", key=greet_request.name, value=response)
-    context.pack_and_send_egress("example/greets", egress_message)
+    who = context.address.id # the person name whom we want to great, is the id part of our address.
+    response = await compute_greeting(who, seen)
+    egress_message = kafka_egress_message(typename="example/greets", topic="greetings", key=who, value=response)
+    context.send_egress(egress_message)
 
 
 async def compute_greeting(name, seen):
@@ -53,18 +48,13 @@
     if seen < len(templates):
         greeting = templates[seen] % name
     else:
-        greeting = "Nice to see you at the %d-nth time %s!" % (seen, name)
+        greeting = f"Nice to see you at the {seen}-nth time {name}!"
 
     await asyncio.sleep(1)
-
-    response = GreetResponse()
-    response.name = name
-    response.greeting = greeting
-
-    return response
+    return greeting
 
 
-handler = AsyncRequestReplyHandler(functions)
+handler = RequestReplyHandler(functions)
 
 #
 # Serve the endpoint
@@ -72,15 +62,15 @@
 
 from aiohttp import web
 
-handler = AsyncRequestReplyHandler(functions)
+handler = RequestReplyHandler(functions)
 
 async def handle(request):
     req = await request.read()
-    res = await handler(req)
+    res = await handler.handle_async(req)
     return web.Response(body=res, content_type="application/octet-stream")
 
 app = web.Application()
 app.add_routes([web.post('/statefun', handle)])
 
 if __name__ == '__main__':
-    web.run_app(app, port=5000)
+    web.run_app(app, port=8000)
diff --git a/statefun-examples/statefun-async-python-example/greeter/messages.proto b/statefun-examples/statefun-async-python-example/greeter/messages.proto
deleted file mode 100644
index 3dc1a50..0000000
--- a/statefun-examples/statefun-async-python-example/greeter/messages.proto
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-syntax = "proto3";
-
-// protoc *.proto --python_out=.
-
-package example;
-
-// External request sent by a user who wants to be greeted
-message GreetRequest {
-    // The name of the user to greet
-    string name = 1;
-}
-
-// A customized response sent to the user
-message GreetResponse {
-    // The name of the user being greeted
-    string name = 1;
-    // The users customized greeting
-    string greeting = 2;
-}
-
-// An internal message used to store state
-message SeenCount {
-    // The number of times a users has been seen so far
-    int64 seen = 1;
-}
-
-
diff --git a/statefun-examples/statefun-async-python-example/greeter/messages_pb2.py b/statefun-examples/statefun-async-python-example/greeter/messages_pb2.py
deleted file mode 100644
index bce4d24..0000000
--- a/statefun-examples/statefun-async-python-example/greeter/messages_pb2.py
+++ /dev/null
@@ -1,172 +0,0 @@
-# -*- coding: utf-8 -*-
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-# Generated by the protocol buffer compiler.  DO NOT EDIT!
-# source: messages.proto
-
-import sys
-_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
-from google.protobuf import descriptor as _descriptor
-from google.protobuf import message as _message
-from google.protobuf import reflection as _reflection
-from google.protobuf import symbol_database as _symbol_database
-# @@protoc_insertion_point(imports)
-
-_sym_db = _symbol_database.Default()
-
-
-
-
-DESCRIPTOR = _descriptor.FileDescriptor(
-  name='messages.proto',
-  package='example',
-  syntax='proto3',
-  serialized_options=None,
-  serialized_pb=_b('\n\x0emessages.proto\x12\x07\x65xample\"\x1c\n\x0cGreetRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\"/\n\rGreetResponse\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x10\n\x08greeting\x18\x02 \x01(\t\"\x19\n\tSeenCount\x12\x0c\n\x04seen\x18\x01 \x01(\x03\x62\x06proto3')
-)
-
-
-
-
-_GREETREQUEST = _descriptor.Descriptor(
-  name='GreetRequest',
-  full_name='example.GreetRequest',
-  filename=None,
-  file=DESCRIPTOR,
-  containing_type=None,
-  fields=[
-    _descriptor.FieldDescriptor(
-      name='name', full_name='example.GreetRequest.name', index=0,
-      number=1, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
-  ],
-  extensions=[
-  ],
-  nested_types=[],
-  enum_types=[
-  ],
-  serialized_options=None,
-  is_extendable=False,
-  syntax='proto3',
-  extension_ranges=[],
-  oneofs=[
-  ],
-  serialized_start=27,
-  serialized_end=55,
-)
-
-
-_GREETRESPONSE = _descriptor.Descriptor(
-  name='GreetResponse',
-  full_name='example.GreetResponse',
-  filename=None,
-  file=DESCRIPTOR,
-  containing_type=None,
-  fields=[
-    _descriptor.FieldDescriptor(
-      name='name', full_name='example.GreetResponse.name', index=0,
-      number=1, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
-    _descriptor.FieldDescriptor(
-      name='greeting', full_name='example.GreetResponse.greeting', index=1,
-      number=2, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
-  ],
-  extensions=[
-  ],
-  nested_types=[],
-  enum_types=[
-  ],
-  serialized_options=None,
-  is_extendable=False,
-  syntax='proto3',
-  extension_ranges=[],
-  oneofs=[
-  ],
-  serialized_start=57,
-  serialized_end=104,
-)
-
-
-_SEENCOUNT = _descriptor.Descriptor(
-  name='SeenCount',
-  full_name='example.SeenCount',
-  filename=None,
-  file=DESCRIPTOR,
-  containing_type=None,
-  fields=[
-    _descriptor.FieldDescriptor(
-      name='seen', full_name='example.SeenCount.seen', index=0,
-      number=1, type=3, cpp_type=2, label=1,
-      has_default_value=False, default_value=0,
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
-  ],
-  extensions=[
-  ],
-  nested_types=[],
-  enum_types=[
-  ],
-  serialized_options=None,
-  is_extendable=False,
-  syntax='proto3',
-  extension_ranges=[],
-  oneofs=[
-  ],
-  serialized_start=106,
-  serialized_end=131,
-)
-
-DESCRIPTOR.message_types_by_name['GreetRequest'] = _GREETREQUEST
-DESCRIPTOR.message_types_by_name['GreetResponse'] = _GREETRESPONSE
-DESCRIPTOR.message_types_by_name['SeenCount'] = _SEENCOUNT
-_sym_db.RegisterFileDescriptor(DESCRIPTOR)
-
-GreetRequest = _reflection.GeneratedProtocolMessageType('GreetRequest', (_message.Message,), dict(
-  DESCRIPTOR = _GREETREQUEST,
-  __module__ = 'messages_pb2'
-  # @@protoc_insertion_point(class_scope:example.GreetRequest)
-  ))
-_sym_db.RegisterMessage(GreetRequest)
-
-GreetResponse = _reflection.GeneratedProtocolMessageType('GreetResponse', (_message.Message,), dict(
-  DESCRIPTOR = _GREETRESPONSE,
-  __module__ = 'messages_pb2'
-  # @@protoc_insertion_point(class_scope:example.GreetResponse)
-  ))
-_sym_db.RegisterMessage(GreetResponse)
-
-SeenCount = _reflection.GeneratedProtocolMessageType('SeenCount', (_message.Message,), dict(
-  DESCRIPTOR = _SEENCOUNT,
-  __module__ = 'messages_pb2'
-  # @@protoc_insertion_point(class_scope:example.SeenCount)
-  ))
-_sym_db.RegisterMessage(SeenCount)
-
-
-# @@protoc_insertion_point(module_scope)
diff --git a/statefun-examples/statefun-async-python-example/greeter/requirements.txt b/statefun-examples/statefun-async-python-example/greeter/requirements.txt
index 0a84e21..0796d9b 100644
--- a/statefun-examples/statefun-async-python-example/greeter/requirements.txt
+++ b/statefun-examples/statefun-async-python-example/greeter/requirements.txt
@@ -15,6 +15,4 @@
 # limitations under the License.
 
 aiohttp
-flask==1.1.1
-gunicorn==20.0.4
 apache-flink-statefun
diff --git a/statefun-examples/statefun-python-greeter-example/Dockerfile b/statefun-examples/statefun-python-greeter-example/Dockerfile
deleted file mode 100644
index d35d2af..0000000
--- a/statefun-examples/statefun-python-greeter-example/Dockerfile
+++ /dev/null
@@ -1,22 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-FROM flink-statefun:2.3-SNAPSHOT
-
-RUN mkdir -p /opt/statefun/modules/greeter
-ADD module.yaml /opt/statefun/modules/greeter
-
-
diff --git a/statefun-examples/statefun-python-greeter-example/README.md b/statefun-examples/statefun-python-greeter-example/README.md
deleted file mode 100644
index aee4826..0000000
--- a/statefun-examples/statefun-python-greeter-example/README.md
+++ /dev/null
@@ -1,32 +0,0 @@
-# The Greeter Example
-
-This is a simple example that runs a simple stateful function that accepts requests from a Kafka ingress,
-and then responds by sending greeting responses to a Kafka egress. It demonstrates the primitive building blocks
-of a Stateful Functions applications, such as ingresses, handling state in functions,
-and sending messages to egresses.
-
-
-## Building the example
-
-1) Make sure that you have built the Python distribution
-   To build the distribution
-    -  `cd statefun-python-sdk/`
-    -  `./build-distribution.sh`
-    
-2) Run `./build-example.sh` 
-
-## Running the example
-
-To run the example:
-
-```
-./build-example.sh
-docker-compose up -d
-```
-
-Then, to see the example in actions, see what comes out of the topic `greetings`:
-
-```
-docker-compose logs -f event-generator 
-```
-
diff --git a/statefun-examples/statefun-python-greeter-example/build-example.sh b/statefun-examples/statefun-python-greeter-example/build-example.sh
deleted file mode 100755
index 9440c5d..0000000
--- a/statefun-examples/statefun-python-greeter-example/build-example.sh
+++ /dev/null
@@ -1,40 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# clean
-rm -f apache_flink_statefun-*-py3-none-any.whl
-rm -rf __pycache__
-
-# copy the whl distribution
-cp ../../statefun-python-sdk/dist/apache_flink_statefun-*-py3-none-any.whl greeter/apache_flink_statefun-snapshot-py3-none-any.whl 2>/dev/null
-rc=$?
-if [[ ${rc} -ne 0 ]]; then
-    echo "Failed copying the whl distribution, please build the Python distribution first."
-    echo "To build the distribution:"
-    echo "  goto to statefun-python-sdk/"
-    echo "  call ./build-distribution.sh"
-    exit 1;
-fi
-
-# build
-
-docker-compose build
-
-rm -f greeter/apache_flink_statefun-*-py3-none-any.whl
-
-echo "Done. To start the example run: docker-compose up"
-
diff --git a/statefun-examples/statefun-python-greeter-example/docker-compose.yml b/statefun-examples/statefun-python-greeter-example/docker-compose.yml
deleted file mode 100644
index 501e36b..0000000
--- a/statefun-examples/statefun-python-greeter-example/docker-compose.yml
+++ /dev/null
@@ -1,73 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-version: "2.1"
-services:
-  zookeeper:
-    image: wurstmeister/zookeeper
-    ports:
-      - "2181:2181"
-  kafka-broker:
-    image: wurstmeister/kafka:2.12-2.0.1
-    ports:
-      - "9092:9092"
-    environment:
-      HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
-      KAFKA_CREATE_TOPICS: "names:1:1,greetings:1:1"
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
-    depends_on:
-      - zookeeper
-    volumes:
-      - /var/run/docker.sock:/var/run/docker.sock
-  master:
-    build:
-      context: .
-    expose:
-      - "6123"
-    ports:
-      - "8081:8081"
-    environment:
-      - ROLE=master
-      - MASTER_HOST=master
-    volumes:
-      - ./checkpoint-dir:/checkpoint-dir
-  worker:
-    build:
-      context: .
-    expose:
-      - "6121"
-      - "6122"
-    depends_on:
-      - master
-      - kafka-broker
-    links:
-      - "master:master"
-      - "kafka-broker:kafka-broker"
-    environment:
-      - ROLE=worker
-      - MASTER_HOST=master
-    volumes:
-      - ./checkpoint-dir:/checkpoint-dir
-  python-worker:
-    build:
-      context: ./greeter
-    expose:
-      - "8000"
-  event-generator:
-    build:
-      context: generator
-      dockerfile: Dockerfile
-    depends_on:
-      - kafka-broker
diff --git a/statefun-examples/statefun-python-greeter-example/generator/event-generator.py b/statefun-examples/statefun-python-greeter-example/generator/event-generator.py
deleted file mode 100644
index 385ef7d..0000000
--- a/statefun-examples/statefun-python-greeter-example/generator/event-generator.py
+++ /dev/null
@@ -1,104 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-import signal
-import sys
-import time
-import threading
-
-import random
-
-from kafka.errors import NoBrokersAvailable
-
-from messages_pb2 import GreetRequest, GreetResponse
-
-from kafka import KafkaProducer
-from kafka import KafkaConsumer
-
-KAFKA_BROKER = "kafka-broker:9092"
-NAMES = ["Jerry", "George", "Elaine", "Kramer", "Newman", "Frank"]
-
-
-def random_requests():
-    """Generate infinite sequence of random GreetRequests."""
-    while True:
-        request = GreetRequest()
-        request.name = random.choice(NAMES)
-        yield request
-
-
-def produce():
-    if len(sys.argv) == 2:
-        delay_seconds = int(sys.argv[1])
-    else:
-        delay_seconds = 1
-    producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])
-    for request in random_requests():
-        key = request.name.encode('utf-8')
-        val = request.SerializeToString()
-        producer.send(topic='names', key=key, value=val)
-        producer.flush()
-        time.sleep(delay_seconds)
-
-
-def consume():
-    consumer = KafkaConsumer(
-        'greetings',
-        bootstrap_servers=[KAFKA_BROKER],
-        auto_offset_reset='earliest',
-        group_id='event-gen')
-    for message in consumer:
-        response = GreetResponse()
-        response.ParseFromString(message.value)
-        print("%s:\t%s" % (response.name, response.greeting), flush=True)
-
-
-def handler(number, frame):
-    sys.exit(0)
-
-
-def safe_loop(fn):
-    while True:
-        try:
-            fn()
-        except SystemExit:
-            print("Good bye!")
-            return
-        except NoBrokersAvailable:
-            time.sleep(2)
-            continue
-        except Exception as e:
-            print(e)
-            return
-
-
-def main():
-    signal.signal(signal.SIGTERM, handler)
-
-    producer = threading.Thread(target=safe_loop, args=[produce])
-    producer.start()
-
-    consumer = threading.Thread(target=safe_loop, args=[consume])
-    consumer.start()
-
-    producer.join()
-    consumer.join()
-
-
-if __name__ == "__main__":
-    main()
diff --git a/statefun-examples/statefun-python-greeter-example/generator/messages_pb2.py b/statefun-examples/statefun-python-greeter-example/generator/messages_pb2.py
deleted file mode 100644
index bce4d24..0000000
--- a/statefun-examples/statefun-python-greeter-example/generator/messages_pb2.py
+++ /dev/null
@@ -1,172 +0,0 @@
-# -*- coding: utf-8 -*-
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-# Generated by the protocol buffer compiler.  DO NOT EDIT!
-# source: messages.proto
-
-import sys
-_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
-from google.protobuf import descriptor as _descriptor
-from google.protobuf import message as _message
-from google.protobuf import reflection as _reflection
-from google.protobuf import symbol_database as _symbol_database
-# @@protoc_insertion_point(imports)
-
-_sym_db = _symbol_database.Default()
-
-
-
-
-DESCRIPTOR = _descriptor.FileDescriptor(
-  name='messages.proto',
-  package='example',
-  syntax='proto3',
-  serialized_options=None,
-  serialized_pb=_b('\n\x0emessages.proto\x12\x07\x65xample\"\x1c\n\x0cGreetRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\"/\n\rGreetResponse\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x10\n\x08greeting\x18\x02 \x01(\t\"\x19\n\tSeenCount\x12\x0c\n\x04seen\x18\x01 \x01(\x03\x62\x06proto3')
-)
-
-
-
-
-_GREETREQUEST = _descriptor.Descriptor(
-  name='GreetRequest',
-  full_name='example.GreetRequest',
-  filename=None,
-  file=DESCRIPTOR,
-  containing_type=None,
-  fields=[
-    _descriptor.FieldDescriptor(
-      name='name', full_name='example.GreetRequest.name', index=0,
-      number=1, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
-  ],
-  extensions=[
-  ],
-  nested_types=[],
-  enum_types=[
-  ],
-  serialized_options=None,
-  is_extendable=False,
-  syntax='proto3',
-  extension_ranges=[],
-  oneofs=[
-  ],
-  serialized_start=27,
-  serialized_end=55,
-)
-
-
-_GREETRESPONSE = _descriptor.Descriptor(
-  name='GreetResponse',
-  full_name='example.GreetResponse',
-  filename=None,
-  file=DESCRIPTOR,
-  containing_type=None,
-  fields=[
-    _descriptor.FieldDescriptor(
-      name='name', full_name='example.GreetResponse.name', index=0,
-      number=1, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
-    _descriptor.FieldDescriptor(
-      name='greeting', full_name='example.GreetResponse.greeting', index=1,
-      number=2, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
-  ],
-  extensions=[
-  ],
-  nested_types=[],
-  enum_types=[
-  ],
-  serialized_options=None,
-  is_extendable=False,
-  syntax='proto3',
-  extension_ranges=[],
-  oneofs=[
-  ],
-  serialized_start=57,
-  serialized_end=104,
-)
-
-
-_SEENCOUNT = _descriptor.Descriptor(
-  name='SeenCount',
-  full_name='example.SeenCount',
-  filename=None,
-  file=DESCRIPTOR,
-  containing_type=None,
-  fields=[
-    _descriptor.FieldDescriptor(
-      name='seen', full_name='example.SeenCount.seen', index=0,
-      number=1, type=3, cpp_type=2, label=1,
-      has_default_value=False, default_value=0,
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
-  ],
-  extensions=[
-  ],
-  nested_types=[],
-  enum_types=[
-  ],
-  serialized_options=None,
-  is_extendable=False,
-  syntax='proto3',
-  extension_ranges=[],
-  oneofs=[
-  ],
-  serialized_start=106,
-  serialized_end=131,
-)
-
-DESCRIPTOR.message_types_by_name['GreetRequest'] = _GREETREQUEST
-DESCRIPTOR.message_types_by_name['GreetResponse'] = _GREETRESPONSE
-DESCRIPTOR.message_types_by_name['SeenCount'] = _SEENCOUNT
-_sym_db.RegisterFileDescriptor(DESCRIPTOR)
-
-GreetRequest = _reflection.GeneratedProtocolMessageType('GreetRequest', (_message.Message,), dict(
-  DESCRIPTOR = _GREETREQUEST,
-  __module__ = 'messages_pb2'
-  # @@protoc_insertion_point(class_scope:example.GreetRequest)
-  ))
-_sym_db.RegisterMessage(GreetRequest)
-
-GreetResponse = _reflection.GeneratedProtocolMessageType('GreetResponse', (_message.Message,), dict(
-  DESCRIPTOR = _GREETRESPONSE,
-  __module__ = 'messages_pb2'
-  # @@protoc_insertion_point(class_scope:example.GreetResponse)
-  ))
-_sym_db.RegisterMessage(GreetResponse)
-
-SeenCount = _reflection.GeneratedProtocolMessageType('SeenCount', (_message.Message,), dict(
-  DESCRIPTOR = _SEENCOUNT,
-  __module__ = 'messages_pb2'
-  # @@protoc_insertion_point(class_scope:example.SeenCount)
-  ))
-_sym_db.RegisterMessage(SeenCount)
-
-
-# @@protoc_insertion_point(module_scope)
diff --git a/statefun-examples/statefun-python-greeter-example/greeter/Dockerfile b/statefun-examples/statefun-python-greeter-example/greeter/Dockerfile
deleted file mode 100644
index d313fdd..0000000
--- a/statefun-examples/statefun-python-greeter-example/greeter/Dockerfile
+++ /dev/null
@@ -1,34 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-FROM python:3.7-alpine
-
-RUN mkdir -p /app
-WORKDIR /app
-
-COPY apache_flink_statefun-snapshot-py3-none-any.whl /app
-RUN pip install apache_flink_statefun-snapshot-py3-none-any.whl
-
-COPY requirements.txt /app
-RUN pip install -r requirements.txt
-
-COPY greeter.py /app
-COPY messages_pb2.py /app
-
-EXPOSE 8000
-
-CMD ["gunicorn", "-b", "0.0.0.0:8000", "-w 4", "greeter:app"]
-
diff --git a/statefun-examples/statefun-python-greeter-example/greeter/greeter.py b/statefun-examples/statefun-python-greeter-example/greeter/greeter.py
deleted file mode 100644
index 8f9c5bf..0000000
--- a/statefun-examples/statefun-python-greeter-example/greeter/greeter.py
+++ /dev/null
@@ -1,85 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-from messages_pb2 import SeenCount, GreetRequest, GreetResponse
-
-from statefun import StatefulFunctions
-from statefun import StateSpec
-from statefun import RequestReplyHandler
-from statefun import kafka_egress_record
-
-functions = StatefulFunctions()
-
-
-@functions.bind(
-    typename="example/greeter",
-    states=[StateSpec('seen_count')])
-def greet(context, greet_request: GreetRequest):
-    state = context.state('seen_count').unpack(SeenCount)
-    if not state:
-        state = SeenCount()
-        state.seen = 1
-    else:
-        state.seen += 1
-    context.state('seen_count').pack(state)
-
-    response = compute_greeting(greet_request.name, state.seen)
-
-    egress_message = kafka_egress_record(topic="greetings", key=greet_request.name, value=response)
-    context.pack_and_send_egress("example/greets", egress_message)
-
-
-def compute_greeting(name, seen):
-    """
-    Compute a personalized greeting, based on the number of times this @name had been seen before.
-    """
-    templates = ["", "Welcome %s", "Nice to see you again %s", "Third time is a charm %s"]
-    if seen < len(templates):
-        greeting = templates[seen] % name
-    else:
-        greeting = "Nice to see you at the %d-nth time %s!" % (seen, name)
-
-    response = GreetResponse()
-    response.name = name
-    response.greeting = greeting
-
-    return response
-
-
-handler = RequestReplyHandler(functions)
-
-#
-# Serve the endpoint
-#
-
-from flask import request
-from flask import make_response
-from flask import Flask
-
-app = Flask(__name__)
-
-
-@app.route('/statefun', methods=['POST'])
-def handle():
-    response_data = handler(request.data)
-    response = make_response(response_data)
-    response.headers.set('Content-Type', 'application/octet-stream')
-    return response
-
-
-if __name__ == "__main__":
-    app.run()
diff --git a/statefun-examples/statefun-python-greeter-example/greeter/messages.proto b/statefun-examples/statefun-python-greeter-example/greeter/messages.proto
deleted file mode 100644
index 3dc1a50..0000000
--- a/statefun-examples/statefun-python-greeter-example/greeter/messages.proto
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-syntax = "proto3";
-
-// protoc *.proto --python_out=.
-
-package example;
-
-// External request sent by a user who wants to be greeted
-message GreetRequest {
-    // The name of the user to greet
-    string name = 1;
-}
-
-// A customized response sent to the user
-message GreetResponse {
-    // The name of the user being greeted
-    string name = 1;
-    // The users customized greeting
-    string greeting = 2;
-}
-
-// An internal message used to store state
-message SeenCount {
-    // The number of times a users has been seen so far
-    int64 seen = 1;
-}
-
-
diff --git a/statefun-examples/statefun-python-greeter-example/greeter/messages_pb2.py b/statefun-examples/statefun-python-greeter-example/greeter/messages_pb2.py
deleted file mode 100644
index bce4d24..0000000
--- a/statefun-examples/statefun-python-greeter-example/greeter/messages_pb2.py
+++ /dev/null
@@ -1,172 +0,0 @@
-# -*- coding: utf-8 -*-
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-# Generated by the protocol buffer compiler.  DO NOT EDIT!
-# source: messages.proto
-
-import sys
-_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
-from google.protobuf import descriptor as _descriptor
-from google.protobuf import message as _message
-from google.protobuf import reflection as _reflection
-from google.protobuf import symbol_database as _symbol_database
-# @@protoc_insertion_point(imports)
-
-_sym_db = _symbol_database.Default()
-
-
-
-
-DESCRIPTOR = _descriptor.FileDescriptor(
-  name='messages.proto',
-  package='example',
-  syntax='proto3',
-  serialized_options=None,
-  serialized_pb=_b('\n\x0emessages.proto\x12\x07\x65xample\"\x1c\n\x0cGreetRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\"/\n\rGreetResponse\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x10\n\x08greeting\x18\x02 \x01(\t\"\x19\n\tSeenCount\x12\x0c\n\x04seen\x18\x01 \x01(\x03\x62\x06proto3')
-)
-
-
-
-
-_GREETREQUEST = _descriptor.Descriptor(
-  name='GreetRequest',
-  full_name='example.GreetRequest',
-  filename=None,
-  file=DESCRIPTOR,
-  containing_type=None,
-  fields=[
-    _descriptor.FieldDescriptor(
-      name='name', full_name='example.GreetRequest.name', index=0,
-      number=1, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
-  ],
-  extensions=[
-  ],
-  nested_types=[],
-  enum_types=[
-  ],
-  serialized_options=None,
-  is_extendable=False,
-  syntax='proto3',
-  extension_ranges=[],
-  oneofs=[
-  ],
-  serialized_start=27,
-  serialized_end=55,
-)
-
-
-_GREETRESPONSE = _descriptor.Descriptor(
-  name='GreetResponse',
-  full_name='example.GreetResponse',
-  filename=None,
-  file=DESCRIPTOR,
-  containing_type=None,
-  fields=[
-    _descriptor.FieldDescriptor(
-      name='name', full_name='example.GreetResponse.name', index=0,
-      number=1, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
-    _descriptor.FieldDescriptor(
-      name='greeting', full_name='example.GreetResponse.greeting', index=1,
-      number=2, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
-  ],
-  extensions=[
-  ],
-  nested_types=[],
-  enum_types=[
-  ],
-  serialized_options=None,
-  is_extendable=False,
-  syntax='proto3',
-  extension_ranges=[],
-  oneofs=[
-  ],
-  serialized_start=57,
-  serialized_end=104,
-)
-
-
-_SEENCOUNT = _descriptor.Descriptor(
-  name='SeenCount',
-  full_name='example.SeenCount',
-  filename=None,
-  file=DESCRIPTOR,
-  containing_type=None,
-  fields=[
-    _descriptor.FieldDescriptor(
-      name='seen', full_name='example.SeenCount.seen', index=0,
-      number=1, type=3, cpp_type=2, label=1,
-      has_default_value=False, default_value=0,
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
-  ],
-  extensions=[
-  ],
-  nested_types=[],
-  enum_types=[
-  ],
-  serialized_options=None,
-  is_extendable=False,
-  syntax='proto3',
-  extension_ranges=[],
-  oneofs=[
-  ],
-  serialized_start=106,
-  serialized_end=131,
-)
-
-DESCRIPTOR.message_types_by_name['GreetRequest'] = _GREETREQUEST
-DESCRIPTOR.message_types_by_name['GreetResponse'] = _GREETRESPONSE
-DESCRIPTOR.message_types_by_name['SeenCount'] = _SEENCOUNT
-_sym_db.RegisterFileDescriptor(DESCRIPTOR)
-
-GreetRequest = _reflection.GeneratedProtocolMessageType('GreetRequest', (_message.Message,), dict(
-  DESCRIPTOR = _GREETREQUEST,
-  __module__ = 'messages_pb2'
-  # @@protoc_insertion_point(class_scope:example.GreetRequest)
-  ))
-_sym_db.RegisterMessage(GreetRequest)
-
-GreetResponse = _reflection.GeneratedProtocolMessageType('GreetResponse', (_message.Message,), dict(
-  DESCRIPTOR = _GREETRESPONSE,
-  __module__ = 'messages_pb2'
-  # @@protoc_insertion_point(class_scope:example.GreetResponse)
-  ))
-_sym_db.RegisterMessage(GreetResponse)
-
-SeenCount = _reflection.GeneratedProtocolMessageType('SeenCount', (_message.Message,), dict(
-  DESCRIPTOR = _SEENCOUNT,
-  __module__ = 'messages_pb2'
-  # @@protoc_insertion_point(class_scope:example.SeenCount)
-  ))
-_sym_db.RegisterMessage(SeenCount)
-
-
-# @@protoc_insertion_point(module_scope)
diff --git a/statefun-examples/statefun-python-greeter-example/greeter/requirements.txt b/statefun-examples/statefun-python-greeter-example/greeter/requirements.txt
deleted file mode 100644
index fcbc07c..0000000
--- a/statefun-examples/statefun-python-greeter-example/greeter/requirements.txt
+++ /dev/null
@@ -1,22 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-apache-flink-statefun
-flask==1.1.1
-gunicorn==20.0.4
-
-
-
diff --git a/statefun-examples/statefun-python-greeter-example/module.yaml b/statefun-examples/statefun-python-greeter-example/module.yaml
deleted file mode 100644
index f4b197d..0000000
--- a/statefun-examples/statefun-python-greeter-example/module.yaml
+++ /dev/null
@@ -1,55 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-version: "3.0"
-module:
-  meta:
-    type: remote
-  spec:
-    endpoints:
-      - endpoint:
-          meta:
-            kind: http
-          spec:
-            typename:
-              namespace: example
-              type: greeter
-            urlPathTemplate: http://python-worker:8000/statefun
-            maxNumBatchRequests: 500
-            timeouts:
-              call: 2min
-    ingresses:
-      - ingress:
-          meta:
-            type: statefun.kafka.io/routable-protobuf-ingress
-            id: example/names
-          spec:
-            address: kafka-broker:9092
-            consumerGroupId: my-group-id
-            topics:
-              - topic: names
-                typeUrl: com.googleapis/example.GreetRequest
-                targets:
-                  - example/greeter
-    egresses:
-      - egress:
-          meta:
-            type: statefun.kafka.io/generic-egress
-            id: example/greets
-          spec:
-            address: kafka-broker:9092
-            deliverySemantic:
-              type: exactly-once
-              transactionTimeoutMillis: 100000
-
diff --git a/statefun-python-sdk/README.md b/statefun-python-sdk/README.md
index 03a48c0..286060a 100755
--- a/statefun-python-sdk/README.md
+++ b/statefun-python-sdk/README.md
@@ -40,44 +40,40 @@
 #### Define and Declare a Function
 
 ```
-from statefun import StatefulFunctions, StateSpec
+from statefun import *
 
 functions = StatefulFunctions()
 
-@functions.bind("demo/greeter")
-def greet(context, message: LoginEvent):
-    print("Hey " + message.user_name)
+@functions.bind(typename="demo/greeter")
+def greet(context, message):
+    print(f"Hey {message.as_string()}!")
 ```
 
-This code declares a function with of type `FunctionType("demo", "greeter")` and binds it to the instance.
+This code declares a function with of type `demo/greeter` and binds it to the instance.
 
 #### Registering and accessing persisted state
 
 You can register persistent state that will be managed by the Stateful Functions workers
-for state consistency and fault-tolerance. The state values could be absent (`None` or a `google.protobuf.Any`) and
-they can be generally obtained via the context parameter:
+for state consistency and fault-tolerance. Values can be generally obtained via the context parameter:
 
 ```
-from statefun import StatefulFunctions, StateSpec
+from statefun import * 
 
 functions = StatefulFunctions()
 
 @functions.bind(
     typename="demo/greeter",
-    states=[StateSpec('session')])
-def greet(context, message: LoginEvent):
-    session = context['session']
-    if not session:
-       session = start_session(message)
-       context['session'] = session
-    ...
+    specs=[ValueSpec(name="seen", type=IntType)])
+def greet(context, message):
+    seen = context.storage.seen or 0
+    seen += 1
+    context.storage.seen = seen
+    print(f"Hey {message.as_string()} I've seen you {seen} times")
 ```
 
 #### Expose with a Request Reply Handler
 
 ```
-from statefun import RequestReplyHandler
-
 handler = RequestReplyHandler(functions)
 ```
 
@@ -88,7 +84,7 @@
 ``` 
 @app.route('/statefun', methods=['POST'])
 def handle():
-    response_data = handler(request.data)
+    response_data = handler.handle_sync(request.data)
     response = make_response(response_data)
     response.headers.set('Content-Type', 'application/octet-stream')
     return response
diff --git a/statefun-examples/statefun-python-greeter-example/generator/Dockerfile b/statefun-python-sdk/generte-dev-protos.sh
old mode 100644
new mode 100755
similarity index 71%
rename from statefun-examples/statefun-python-greeter-example/generator/Dockerfile
rename to statefun-python-sdk/generte-dev-protos.sh
index c3cf87c..b5f72e4
--- a/statefun-examples/statefun-python-greeter-example/generator/Dockerfile
+++ b/statefun-python-sdk/generte-dev-protos.sh
@@ -1,3 +1,4 @@
+#!/bin/bash
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
@@ -14,17 +15,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-FROM python:3.7-alpine
 
-RUN mkdir -p /app
-WORKDIR /app
-
-RUN pip install protobuf
-RUN pip install kafka-python
-
-COPY event-generator.py /app
-COPY messages_pb2.py /app
-
-CMD ["python", "/app/event-generator.py"]
+CURR_DIR=`pwd`
+BASE_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )"
+SDK_PROTOS_DIR="${BASE_DIR}/../statefun-sdk-protos/src/main/protobuf"
 
 
+cd ${BASE_DIR}
+find ${SDK_PROTOS_DIR} -type f -name "*proto" -exec cp {} . \;
+protoc *proto --python_out=statefun/
+rm *proto
+cd ${CURR_DIR}
diff --git a/statefun-python-sdk/setup.py b/statefun-python-sdk/setup.py
index bd91a7b..efe73de 100644
--- a/statefun-python-sdk/setup.py
+++ b/statefun-python-sdk/setup.py
@@ -39,7 +39,7 @@
     long_description_content_type='text/markdown',
     install_requires=['protobuf>=3.11.3,<4.0.0'],
     tests_require=['pytest'],
-    python_requires='>=3.5',
+    python_requires='>=3.8',
     classifiers=[
         'License :: OSI Approved :: Apache Software License',
         'Programming Language :: Python :: 3.5',
diff --git a/statefun-python-sdk/statefun/__init__.py b/statefun-python-sdk/statefun/__init__.py
index 3c04a8e..52651dc 100644
--- a/statefun-python-sdk/statefun/__init__.py
+++ b/statefun-python-sdk/statefun/__init__.py
@@ -16,13 +16,28 @@
 # limitations under the License.
 ################################################################################
 
-from statefun.core import StatefulFunctions
-from statefun.core import StateSpec
-from statefun.core import AfterInvoke, AfterWrite
-from statefun.core import StateRegistrationError
+# type API
+from statefun.core import TypeSerializer, Type, simple_type
+from statefun.core import ValueSpec
+from statefun.core import SdkAddress
 
-from statefun.request_reply import RequestReplyHandler
-from statefun.request_reply import AsyncRequestReplyHandler
+# wrapper types
+from statefun.wrapper_types import BoolType, IntType, FloatType, DoubleType, LongType, StringType
 
-from statefun.core import kafka_egress_record
-from statefun.core import kinesis_egress_record
\ No newline at end of file
+# messaging
+from statefun.messages import Message, EgressMessage, message_builder, egress_message_builder
+
+# egress io
+from statefun.egress_io import kafka_egress_message, kinesis_egress_message
+
+# context
+from statefun.context import Context
+
+# statefun builder
+from statefun.statefun_builder import StatefulFunctions
+
+# request reply protocol handler
+from statefun.request_reply_v3 import RequestReplyHandler
+
+# utilits
+from statefun.utils import make_protobuf_type
diff --git a/statefun-python-sdk/statefun/context.py b/statefun-python-sdk/statefun/context.py
new file mode 100644
index 0000000..b1692a5
--- /dev/null
+++ b/statefun-python-sdk/statefun/context.py
@@ -0,0 +1,79 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import abc
+import typing
+from datetime import timedelta
+
+from statefun.core import SdkAddress
+from statefun.messages import Message, EgressMessage
+
+
+class Context(abc.ABC):
+
+    __slots__ = ()
+
+    @property
+    @abc.abstractmethod
+    def address(self) -> SdkAddress:
+        """
+
+        :return: the address of the currently executing function. the address is of the form (typename, id)
+        """
+        pass
+
+    @property
+    @abc.abstractmethod
+    def storage(self):
+        """
+
+        :return: the address scoped storage.
+        """
+        pass
+
+    @property
+    @abc.abstractmethod
+    def caller(self) -> typing.Union[None, SdkAddress]:
+        """
+
+        :return: the address of the caller or None if this function was triggered by the ingress.
+        """
+        pass
+
+    def send(self, message: Message):
+        """
+        Send a message to a function.
+
+        :param message: a message to send.
+        """
+        pass
+
+    def send_after(self, duration: timedelta, message: Message):
+        """
+        Send a message to a target function after a specified delay.
+
+        :param duration: the amount of time to wait before sending this message out.
+        :param message: the message to send.
+        """
+
+    def send_egress(self, message: EgressMessage):
+        """
+        Send a message to an egress.
+
+        :param message: the EgressMessage to send.
+        """
+        pass
diff --git a/statefun-python-sdk/statefun/core.py b/statefun-python-sdk/statefun/core.py
index 8e342d0..31cd46c 100644
--- a/statefun-python-sdk/statefun/core.py
+++ b/statefun-python-sdk/statefun/core.py
@@ -15,160 +15,134 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-
-from google.protobuf.any_pb2 import Any
-import inspect
-
-from enum import Enum
-from typing import List
+import abc
+from dataclasses import dataclass
 from datetime import timedelta
-
-from statefun.kafka_egress_pb2 import KafkaProducerRecord
-from statefun.kinesis_egress_pb2 import KinesisEgressRecord
-
-class SdkAddress(object):
-    def __init__(self, namespace, type, identity):
-        self.namespace = namespace
-        self.type = type
-        self.identity = identity
-
-    def __repr__(self):
-        return "%s/%s/%s" % (self.namespace, self.type, self.identity)
-
-    def typename(self):
-        return "%s/%s" % (self.namespace, self.type)
+from keyword import iskeyword, kwlist
 
 
-class AnyStateHandle(object):
-    def __init__(self, any_bytes):
-        self.any = None
-        self.value_bytes = any_bytes
-        self.modified = False
-        self.deleted = False
-
-    #
-    # TODO This should reflect the actual type URL.
-    # TODO we can support that only after reworking the SDK.
-    #
-    def typename(self):
-        return "type.googleapis.com/google.protobuf.Any"
-
-    def bytes(self):
-        if self.deleted:
-            raise AssertionError("can not obtain the bytes of a delete handle")
-        if self.modified:
-            return self.value.SerializeToString()
-        else:
-            return self.value_bytes
-
-    def unpack(self, into_class):
-        if self.value:
-            into_ref = into_class()
-            self.value.Unpack(into_ref)
-            return into_ref
-        else:
-            return None
-
-    def pack(self, message):
-        any = Any()
-        any.Pack(message)
-        self.value = any
-
-    @property
-    def value(self):
-        """returns the current value of this state"""
-        if self.deleted:
-            return None
-        if self.any:
-            return self.any
-        if not self.value_bytes:
-            return None
-        self.any = Any()
-        self.any.ParseFromString(self.value_bytes)
-        return self.any
-
-    @value.setter
-    def value(self, any):
-        """updates this value to the supplied value, and also marks this state as modified"""
-        self.any = any
-        self.value_bytes = None
-        self.modified = True
-        self.deleted = False
-
-    @value.deleter
-    def value(self):
-        """marks this state as deleted and also as modified"""
-        self.any = None
-        self.value_bytes = None
-        self.deleted = True
-        self.modified = True
+@dataclass(repr=True, eq=True, order=False, frozen=True)
+class SdkAddress:
+    namespace: str
+    name: str
+    id: str
+    typename: str
 
 
-class Expiration(object):
-    class Mode(Enum):
-        AFTER_INVOKE = 0
-        AFTER_WRITE = 1
+class TypeSerializer(metaclass=abc.ABCMeta):
+    __slots__ = ()
 
-    def __init__(self, expire_after: timedelta, expire_mode: Mode=Mode.AFTER_INVOKE):
-        self.expire_mode = expire_mode
-        self.expire_after_millis = self.total_milliseconds(expire_after)
-        if self.expire_after_millis <= 0:
-            raise ValueError("expire_after_millis must be a positive number.")
+    """
+    A base class for a TypeSerializer. A TypeSerializer is responsible of serialising and deserializing a specific
+    value type.
+    """
 
-    @staticmethod
-    def total_milliseconds(expire_after: timedelta):
-        return int(expire_after.total_seconds() * 1000.0)
+    @abc.abstractmethod
+    def serialize(self, value) -> bytes:
+        """
+        Serialize the given value to bytes.
+
+        :param value: the value to serialize.
+        :return: a byte representation of the given value.
+        """
+        pass
+
+    @abc.abstractmethod
+    def deserialize(self, byte: bytes):
+        """
+        Deserialize a value from the given byte representation.
+
+        :param byte: the bytes that represent the value to deseralize.
+        :return: the deserialized value.
+        """
+        pass
 
 
-class AfterInvoke(Expiration):
-    def __init__(self, expire_after: timedelta):
-        super().__init__(expire_after, expire_mode=Expiration.Mode.AFTER_INVOKE)
+class Type(metaclass=abc.ABCMeta):
+    __slots__ = ("typename",)
+
+    """
+    A base representation of a Stateful Function value type.
+    """
+
+    def __init__(self, typename: str):
+        """
+        :param typename: a TypeName represented as a string of the form <namespace>/<name> of this type.
+        """
+        if not typename:
+            raise ValueError("typename can not be missing")
+        self.typename = typename
+
+    @abc.abstractmethod
+    def serializer(self) -> TypeSerializer:
+        """
+        :return: a serializer for this type.
+        """
+        pass
 
 
-class AfterWrite(Expiration):
-    def __init__(self, expire_after: timedelta):
-        super().__init__(expire_after, expire_mode=Expiration.Mode.AFTER_WRITE)
+class ValueSpec(object):
+    __slots__ = ("name", "type", "duration", "after_call", "after_write")
 
+    def __init__(self,
+                 name: str,
+                 type: Type,
+                 expire_after_call: timedelta = None,
+                 expire_after_write: timedelta = None):
+        """
+        ValueSpec a specification of a persisted value.
 
-class StateSpec(object):
-    def __init__(self, name, expire_after: Expiration=None):
-        self.name = name
-        self.expiration = expire_after
+        :param name: a user defined state name, used to represent this value. A name must be lower case, alphanumeric string
+        without spaces, that starts with either a letter or an underscore (_)
+        :param type: the value's Type. (see
+        statefun.Type) :param expire_after_call: expire (remove) this value if a call to this function hasn't been
+        made for the given duration.
+        :param expire_after_write: expire this value if it wasn't written to for the given duration.
+        """
         if not name:
-            raise ValueError("state name must be provided")
-
-
-class StateRegistrationError(Exception):
-    pass
-
-
-class StatefulFunction(object):
-    def __init__(self, fun, state_specs: List[StateSpec], expected_messages=None):
-        self.known_messages = expected_messages[:] if expected_messages else None
-        self.func = fun
-        if not fun:
-            raise ValueError("function code is missing.")
-        self.registered_state_specs = {}
-        if state_specs:
-            for state_spec in state_specs:
-                if state_spec.name in self.registered_state_specs:
-                    raise StateRegistrationError("duplicate registered state name: " + state_spec.name)
-                self.registered_state_specs[state_spec.name] = state_spec
-
-    def unpack_any(self, any: Any):
-        if self.known_messages is None:
-            return None
-        for cls in self.known_messages:
-            if any.Is(cls.DESCRIPTOR):
-                instance = cls()
-                any.Unpack(instance)
-                return instance
-
-        raise ValueError("Unknown message type " + any.type_url)
+            raise ValueError("name can not be missing.")
+        if not name.isidentifier():
+            raise ValueError(
+                f"invalid name {name}. A spec name can only contains alphanumeric letters (a-z) and (0-9), "
+                f"or underscores ( "
+                f"_). A valid identifier cannot start with a number, or contain any spaces.")
+        if iskeyword(name):
+            forbidden = '\n'.join(kwlist)
+            raise ValueError(
+                f"invalid spec name {name} (Python SDK specifically). since {name} will result as an attribute on "
+                f"context.store.\n"
+                f"The following names are forbidden:\n {forbidden}")
+        if not name.islower():
+            raise ValueError(f"Only lower case names are allowed, {name} is given.")
+        self.name = name
+        if not type:
+            raise ValueError("type can not be missing.")
+        if not isinstance(type, Type):
+            raise TypeError("type is not a StateFun type.")
+        self.type = type
+        if expire_after_call and expire_after_write:
+            # both can not be set.
+            raise ValueError("Either expire_after_call or expire_after_write can be set, but not both.")
+        if expire_after_call:
+            self.duration = int(expire_after_call.total_seconds() * 1000.0)
+            self.after_call = True
+            self.after_write = False
+        elif expire_after_write:
+            self.duration = int(expire_after_write.total_seconds() * 1000.0)
+            self.after_call = False
+            self.after_write = True
+        else:
+            self.duration = 0
+            self.after_call = False
+            self.after_write = False
 
 
 def parse_typename(typename):
-    """parses a string of type namespace/type into a tuple of (namespace, type)"""
+    """
+    Parse a TypeName string into a namespace, type pair.
+    :param typename: a string of the form <namespace>/<type>
+    :return: a tuple of a namespace type.
+    """
     if typename is None:
         raise ValueError("function type must be provided")
     idx = typename.rfind("/")
@@ -183,107 +157,54 @@
     return namespace, type
 
 
-def deduce_protobuf_types(fn):
+def simple_type(typename=None, serialize_fn=None, deserialize_fn=None) -> Type:
     """
-    Try to extract the class names that are attached as the typing annotation.
+    Create a user defined Type, simply by providing two functions. One for serializing a value to bytes, and one for
+    deserializing the value from bytes.
+    For example:
 
-    :param fn: the function with the annotated parameters.
-    :return: a list of classes or None.
+    import json
+    tpe = simple_type("org.foo.bar/User", serialize_fn=json.dumps, deserialize_fn=json.loads)
+
+    this defines a StateFun type (Type) of the typename org.foo.bar/User and it is a JSON object.
+
+    :param typename: this type's TypeName.
+    :param serialize_fn: a function that is able to serialize values of this type.
+    :param deserialize_fn: a function that is able to deserialize bytes into values of this type.
+    :return: a Type definition.
     """
-    spec = inspect.getfullargspec(fn)
-    if not spec:
-        return None
-    if len(spec.args) != 2:
-        raise TypeError("A stateful function must have two arguments: a context and a message. but got ", spec.args)
-    message_arg_name = spec.args[1]  # has to be the second element
-    if message_arg_name not in spec.annotations:
-        return None
-    message_annotation = spec.annotations[message_arg_name]
-    if inspect.isclass(message_annotation):
-        return [message_annotation]
-    try:
-        # it is not a class, then it is only allowed to be
-        # typing.SpecialForm('Union')
-        return list(message_annotation.__args__)
-    except Exception:
-        return None
+    return SimpleType(typename, serialize_fn, deserialize_fn)
 
 
-class StatefulFunctions:
-    def __init__(self):
-        self.functions = {}
-
-    def register(self, typename: str, fun, state_specs: List[StateSpec]=None):
-        """registers a StatefulFunction function instance, under the given namespace with the given function type. """
-        if fun is None:
-            raise ValueError("function instance must be provided")
-        namespace, type = parse_typename(typename)
-        expected_messages = deduce_protobuf_types(fun)
-        self.functions[(namespace, type)] = StatefulFunction(fun, state_specs, expected_messages)
-
-    def bind(self, typename, states: List[StateSpec]=None):
-        """wraps a StatefulFunction instance with a given namespace and type.
-           for example:
-            s = StateFun()
-
-            @s.define("com.foo.bar/greeter")
-            def greeter(context, message):
-                print("Hi there")
-
-            This would add an invokable stateful function that can accept messages
-            sent to "com.foo.bar/greeter".
-         """
-
-        def wrapper(function):
-            self.register(typename, function, states)
-            return function
-
-        return wrapper
-
-    def for_type(self, namespace, type):
-        return self.functions[(namespace, type)]
+# ----------------------------------------------------------------------------------------------------------
+# Internal
+# ----------------------------------------------------------------------------------------------------------
 
 
-def kafka_egress_record(topic: str, value, key: str = None):
-    """
-    Build a ProtobufMessage that can be emitted to a Kafka generic egress.
+class SimpleType(Type):
+    __slots__ = ("typename", "_ser")
 
-    :param topic: The Kafka destination topic for that record
-    :param key: the utf8 encoded string key to produce (can be empty)
-    :param value: the Protobuf value to produce
-    :return: A Protobuf message representing the record to be produced via the Kafka generic egress.
-    """
-    if not topic:
-        raise ValueError("A destination Kafka topic is missing")
-    if not value:
-        raise ValueError("Missing value")
-    record = KafkaProducerRecord()
-    record.topic = topic
-    record.value_bytes = value.SerializeToString()
-    if key is not None:
-        record.key = key
-    return record
+    def __init__(self, typename, serialize_fn, deserialize_fn):
+        super().__init__(typename)
+        if not serialize_fn:
+            raise ValueError("serialize_fn is missing")
+        if not deserialize_fn:
+            raise ValueError("deserialize_fn is missing")
+        self._ser = UserTypeSerializer(serialize_fn, deserialize_fn)
 
-def kinesis_egress_record(stream: str, value, partition_key: str, explicit_hash_key: str = None):
-    """
-    Build a ProtobufMessage that can be emitted to a Kinesis generic egress.
+    def serializer(self) -> TypeSerializer:
+        return self._ser
 
-    :param stream: The AWS Kinesis destination stream for that record
-    :param partition_key: the utf8 encoded string partition key to use
-    :param value: the Protobuf value to produce
-    :param explicit_hash_key: a utf8 encoded string explicit hash key to use (can be empty)
-    :return: A Protobuf message representing the record to be produced to AWS Kinesis via the Kinesis generic egress.
-    """
-    if not stream:
-        raise ValueError("Missing destination Kinesis stream")
-    if not value:
-        raise ValueError("Missing value")
-    if not partition_key:
-        raise ValueError("Missing partition key")
-    record = KinesisEgressRecord()
-    record.stream = stream
-    record.value_bytes = value.SerializeToString()
-    record.partition_key = partition_key
-    if explicit_hash_key is not None:
-        record.explicit_hash_key = explicit_hash_key
-    return record
+
+class UserTypeSerializer(TypeSerializer):
+    __slots__ = ("serialize_fn", "deserialize_fn")
+
+    def __init__(self, serialize_fn, deserialize_fn):
+        self.serialize_fn = serialize_fn
+        self.deserialize_fn = deserialize_fn
+
+    def serialize(self, value):
+        return self.serialize_fn(value)
+
+    def deserialize(self, byte_string):
+        return self.deserialize_fn(byte_string)
diff --git a/statefun-python-sdk/statefun/egress_io.py b/statefun-python-sdk/statefun/egress_io.py
new file mode 100644
index 0000000..b6ec1f2
--- /dev/null
+++ b/statefun-python-sdk/statefun/egress_io.py
@@ -0,0 +1,133 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import struct
+import typing
+
+from statefun.core import Type
+from statefun.messages import EgressMessage
+
+from statefun.kafka_egress_pb2 import KafkaProducerRecord
+from statefun.kinesis_egress_pb2 import KinesisEgressRecord
+from statefun.request_reply_pb2 import TypedValue
+
+
+def kafka_egress_message(typename: str,
+                         topic: str,
+                         value: typing.Union[str, bytes, bytearray, int, float],
+                         value_type: Type = None,
+                         key: str = None):
+    """
+    Build a message that can be emitted to a Kafka generic egress.
+
+    If a value_type is provided, then @value will be serialized according to the
+    provided value_type's serializer. Otherwise we will try to convert @value to bytes
+    if it is one of:
+    - utf-8 string
+    - bytes
+    - bytearray
+    - an int (as defined by Kafka's serialization format)
+    - float (as defined by Kafka's serialization format)
+
+    :param typename: the target egress to emit to (as defined in the module.yaml)
+    :param topic: The Kafka destination topic for that record
+    :param key: the utf8 encoded string key to produce (can be empty)
+    :param value: the value to produce
+    :param value_type: an optional hint to this value type.
+    :return: A Protobuf message representing the record to be produced via the Kafka generic egress.
+    """
+    if not topic:
+        raise ValueError("A destination Kafka topic is missing")
+    if value is None:
+        raise ValueError("Missing value")
+    record = KafkaProducerRecord()
+    record.topic = topic
+    if value_type:
+        ser = value_type.serializer()
+        record.value_bytes = ser.serialize(value)
+    elif isinstance(value, str):
+        record.value_bytes = bytes(value, 'utf-8')
+    elif isinstance(value, (bytes, bytearray)):
+        record.value_bytes = bytes(value)
+    elif isinstance(value, int):
+        # see:
+        # IntegerSerializer Javadoc
+        # https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/IntegerSerializer.html
+        record.value_bytes = struct.pack('>i', value)
+    elif isinstance(value, float):
+        # see:
+        # DoubleDeserializer Javadoc
+        # https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/DoubleDeserializer.html
+        record.value_bytes = struct.pack('>d', value)
+    else:
+        raise TypeError("Unable to convert value to bytes.")
+    if key is not None:
+        record.key = key
+
+    typed_value = TypedValue()
+    typed_value.typename = "type.googleapis.com/io.statefun.sdk.egress.KafkaProducerRecord"
+    typed_value.has_value = True
+    typed_value.value = record.SerializeToString()
+
+    return EgressMessage(typename, typed_value)
+
+
+def kinesis_egress_message(typename: str,
+                           stream: str,
+                           value: typing.Union[str, bytes, bytearray],
+                           partition_key: str,
+                           value_type: typing.Union[None, Type] = None,
+                           explicit_hash_key: str = None):
+    """
+    Build a message that can be emitted to a Kinesis generic egress.
+
+    :param typename: the typename as specified in module.yaml
+    :param stream: The AWS Kinesis destination stream for that record
+    :param partition_key: the utf8 encoded string partition key to use
+    :param value: the value to produce
+    :param explicit_hash_key: a utf8 encoded string explicit hash key to use (can be empty)
+    :param value_type: an optional hint to this value type
+    :return: A Protobuf message representing the record to be produced to AWS Kinesis via the Kinesis generic egress.
+    """
+    if not stream:
+        raise ValueError("Missing destination Kinesis stream")
+    if value is None:
+        raise ValueError("Missing value")
+    if partition_key is None:
+        raise ValueError("Missing partition key")
+    record = KinesisEgressRecord()
+    record.stream = stream
+    if value_type:
+        ser = value_type.serializer()
+        record.value_bytes = ser.serialize(value)
+    elif isinstance(value, str):
+        record.value_bytes = bytes(value, 'utf-8')
+    elif isinstance(value, (bytes, bytearray)):
+        record.value_bytes = bytes(value)
+    else:
+        raise TypeError("Unable to convert value to bytes.")
+    record.partition_key = partition_key
+    if explicit_hash_key is not None:
+        record.explicit_hash_key = explicit_hash_key
+
+    typed_value = TypedValue()
+    typed_value.typename = "type.googleapis.com/io.statefun.sdk.egress.KinesisEgressRecord"
+    typed_value.has_value = True
+    typed_value.value = record.SerializeToString()
+
+    return EgressMessage(typename, typed_value)
diff --git a/statefun-python-sdk/statefun/messages.py b/statefun-python-sdk/statefun/messages.py
new file mode 100644
index 0000000..cda78ff
--- /dev/null
+++ b/statefun-python-sdk/statefun/messages.py
@@ -0,0 +1,190 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import typing
+
+import statefun.wrapper_types as wrapper_types
+from statefun.core import Type
+from statefun.request_reply_pb2 import TypedValue
+from statefun.utils import to_typed_value
+
+
+class Message(object):
+    __slots__ = ("target_typename", "target_id", "typed_value")
+
+    def __init__(self, target_typename: str, target_id: str, typed_value: TypedValue):
+        """
+        A Stateful Functions Message.
+
+        :param target_typename: The TypeName represented as a string of the form <namespace>/<name> of the
+        target function.
+        :param target_id: The id of the target function
+        :param typed_value: The internal protobuf representation of the typed_value.
+        """
+        if not target_typename:
+            raise ValueError("target_typename can not be missing")
+        if not target_id:
+            raise ValueError("target_id can not be missing")
+        if not typed_value:
+            raise ValueError("typed_value can not be missing")
+        self.target_typename = target_typename
+        self.target_id = target_id
+        self.typed_value = typed_value
+
+    def is_int(self):
+        return self.is_type(wrapper_types.IntType)
+
+    def as_int(self):
+        return self.as_type(wrapper_types.IntType)
+
+    def is_bool(self):
+        return self.is_type(wrapper_types.BoolType)
+
+    def as_bool(self) -> typing.Optional[bool]:
+        return self.as_type(wrapper_types.BoolType)
+
+    def is_long(self):
+        return self.is_type(wrapper_types.LongType)
+
+    def as_long(self) -> typing.Optional[int]:
+        return self.as_type(wrapper_types.LongType)
+
+    def is_string(self):
+        return self.is_type(wrapper_types.StringType)
+
+    def as_string(self) -> typing.Optional[str]:
+        return self.as_type(wrapper_types.StringType)
+
+    def is_float(self):
+        return self.is_type(wrapper_types.FloatType)
+
+    def as_float(self) -> typing.Optional[float]:
+        return self.as_type(wrapper_types.FloatType)
+
+    def is_double(self):
+        return self.is_type(wrapper_types.DoubleType)
+
+    def as_double(self) -> typing.Optional[float]:
+        return self.as_type(wrapper_types.DoubleType)
+
+    def is_type(self, tpe: Type) -> bool:
+        return self.typed_value.typename == tpe.typename
+
+    def value_typename(self) -> str:
+        return self.typed_value.typename
+
+    def raw_value(self) -> typing.Optional[bytes]:
+        tv = self.typed_value
+        return tv.value if tv.has_value else None
+
+    def as_type(self, tpe: Type) -> typing.Optional[bytes]:
+        tv = self.typed_value
+        if tv.has_value:
+            serializer = tpe.serializer()
+            return serializer.deserialize(tv.value)
+        else:
+            return None
+
+
+class EgressMessage(object):
+    __slots__ = ("typename", "typed_value")
+
+    def __init__(self, typename: str, typed_value: TypedValue):
+        if not typename:
+            raise ValueError("typename is missing")
+        if not typed_value:
+            raise ValueError("value is missing")
+        self.typename = typename
+        self.typed_value = typed_value
+
+    def value_typename(self) -> str:
+        return self.typed_value.typename
+
+    def raw_value(self) -> typing.Optional[bytes]:
+        tv = self.typed_value
+        if tv.has_value:
+            return tv.value
+        else:
+            return None
+
+
+PRIMITIVE_GETTERS = {"int_value": wrapper_types.IntType,
+                     "float_value": wrapper_types.FloatType,
+                     "long_value": wrapper_types.LongType,
+                     "str_value": wrapper_types.StringType,
+                     "double_value": wrapper_types.DoubleType,
+                     "bool_value": wrapper_types.BoolType}
+
+
+def message_builder(target_typename: str, target_id: str, **kwargs) -> Message:
+    """
+    Build a Message that can be sent to any other function.
+    :param target_typename: The TypeName represented as a string of the form <namespace>/<name> of the
+           target function.
+    :param target_id: The id of the target function
+    :param kwargs: This specify the value type to attach to this message. The following arguments are supported:
+                    int_value=<an int>,
+                    float_value=<a float>
+                    long_value=<a signed 64 bit integer>
+                    str_value=<str>
+                    double_value=<double>
+                    bool_value=<bool>
+                    ...
+                    value=<arbitrary value>, value_type=<a StateFun Type for this value>
+    :return: A Message object, that can be sent.
+    """
+    if len(kwargs) == 2:
+        value, value_type = kwargs["value"], kwargs["value_type"]
+    elif len(kwargs) == 1:
+        # expecting: <type>_value : value
+        # for example one of the following:
+        #              int_value=1
+        #              str_value="hello world"
+        #              long_value= 5511
+        type_keyword, value = next(iter(kwargs.items()))
+        value_type = PRIMITIVE_GETTERS.get(type_keyword)
+    else:
+        raise TypeError(f"Wrong number of value keywords given: {kwargs}, there must be exactly one of:"
+                        f"\nint_value=.."
+                        f"\nfloat_value.."
+                        f"\netc'"
+                        f"\nor:"
+                        f"\nvalue=.. ,value_type=.. ")
+    if value is None:
+        raise ValueError("value can not be missing")
+    if not value_type:
+        raise ValueError(
+            "Could not deduce the value type, please specify the type explicitly. via passing: value=<the value>, "
+            "value_type=<the type>")
+    typed_value = to_typed_value(type=value_type, value=value)
+    return Message(target_typename=target_typename, target_id=target_id, typed_value=typed_value)
+
+
+def egress_message_builder(target_typename: str, value: typing.Any, value_type: Type):
+    """
+    Create a generic egress record.
+
+    To use Kafka specific egress please use kafka_egress_message(), and for Kinesis please use
+    kinesis_egress_message().
+    """
+    if not target_typename:
+        raise ValueError("target typename is missing")
+    if value is None:
+        raise ValueError("value can not be missing")
+    typed_value = to_typed_value(type=value_type, value=value)
+    return EgressMessage(typename=target_typename, typed_value=typed_value)
diff --git a/statefun-python-sdk/statefun/request_reply.py b/statefun-python-sdk/statefun/request_reply.py
deleted file mode 100644
index f58e6d7..0000000
--- a/statefun-python-sdk/statefun/request_reply.py
+++ /dev/null
@@ -1,381 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-from datetime import timedelta
-
-from google.protobuf.any_pb2 import Any
-
-from statefun.core import SdkAddress
-from statefun.core import Expiration
-from statefun.core import parse_typename
-from statefun.core import StateRegistrationError
-
-# generated function protocol
-from statefun.request_reply_pb2 import FromFunction
-from statefun.request_reply_pb2 import ToFunction
-from statefun.typed_value_utils import to_proto_any, from_proto_any, to_proto_any_state, from_proto_any_state
-
-class InvocationContext:
-    def __init__(self, functions):
-        self.functions = functions
-        self.missing_state_specs = None
-        self.batch = None
-        self.context = None
-        self.target_function = None
-
-    def setup(self, request_bytes):
-        to_function = ToFunction()
-        to_function.ParseFromString(request_bytes)
-        #
-        # setup
-        #
-        target_address = to_function.invocation.target
-        target_function = self.functions.for_type(target_address.namespace, target_address.type)
-        if target_function is None:
-            raise ValueError("Unable to find a function of type ", target_function)
-
-        # for each state spec defined in target function
-        #    if state name is in request -> add to Batch Context
-        #    if state name is not in request -> add to missing_state_specs
-        provided_state_values = self.provided_state_values(to_function)
-        missing_state_specs = []
-        resolved_state_values = {}
-        for state_name, state_spec in target_function.registered_state_specs.items():
-            if state_name in provided_state_values:
-                resolved_state_values[state_name] = provided_state_values[state_name]
-            else:
-                missing_state_specs.append(state_spec)
-
-        self.batch = to_function.invocation.invocations
-        self.context = BatchContext(target_address, resolved_state_values)
-        self.target_function = target_function
-        if missing_state_specs:
-            self.missing_state_specs = missing_state_specs
-
-    def complete(self):
-        from_function = FromFunction()
-        if not self.missing_state_specs:
-            invocation_result = from_function.invocation_result
-            context = self.context
-            self.add_mutations(context, invocation_result)
-            self.add_outgoing_messages(context, invocation_result)
-            self.add_delayed_messages(context, invocation_result)
-            self.add_egress(context, invocation_result)
-            # reset the state for the next invocation
-            self.batch = None
-            self.context = None
-            self.target_function = None
-            # return the result
-        else:
-            incomplete_context = from_function.incomplete_invocation_context
-            self.add_missing_state_specs(self.missing_state_specs, incomplete_context)
-        return from_function.SerializeToString()
-
-    @staticmethod
-    def provided_state_values(to_function):
-        return {s.state_name: to_proto_any_state(s.state_value) for s in to_function.invocation.state}
-
-    @staticmethod
-    def add_outgoing_messages(context, invocation_result):
-        outgoing_messages = invocation_result.outgoing_messages
-        for typename, id, message in context.messages:
-            outgoing = outgoing_messages.add()
-
-            namespace, type = parse_typename(typename)
-            outgoing.target.namespace = namespace
-            outgoing.target.type = type
-            outgoing.target.id = id
-            outgoing.argument.CopyFrom(from_proto_any(message))
-
-    @staticmethod
-    def add_mutations(context, invocation_result):
-        for name, handle in context.states.items():
-            if not handle.modified:
-                continue
-            mutation = invocation_result.state_mutations.add()
-
-            mutation.state_name = name
-            if handle.deleted:
-                mutation.mutation_type = FromFunction.PersistedValueMutation.MutationType.Value('DELETE')
-            else:
-                mutation.mutation_type = FromFunction.PersistedValueMutation.MutationType.Value('MODIFY')
-                mutation.state_value.CopyFrom(from_proto_any_state(handle))
-
-    @staticmethod
-    def add_delayed_messages(context, invocation_result):
-        delayed_invocations = invocation_result.delayed_invocations
-        for delay, typename, id, message in context.delayed_messages:
-            outgoing = delayed_invocations.add()
-
-            namespace, type = parse_typename(typename)
-            outgoing.target.namespace = namespace
-            outgoing.target.type = type
-            outgoing.target.id = id
-            outgoing.delay_in_ms = delay
-            outgoing.argument.CopyFrom(from_proto_any(message))
-
-    @staticmethod
-    def add_egress(context, invocation_result):
-        outgoing_egresses = invocation_result.outgoing_egresses
-        for typename, message in context.egresses:
-            outgoing = outgoing_egresses.add()
-
-            namespace, type = parse_typename(typename)
-            outgoing.egress_namespace = namespace
-            outgoing.egress_type = type
-            outgoing.argument.CopyFrom(from_proto_any(message))
-
-    @staticmethod
-    def add_missing_state_specs(missing_state_specs, incomplete_context_response):
-        missing_values = incomplete_context_response.missing_values
-        for state_spec in missing_state_specs:
-            missing_value = missing_values.add()
-            missing_value.state_name = state_spec.name
-
-            # TODO see the comment in typed_value_utils.from_proto_any_state on
-            # TODO the reason to use this specific typename
-            missing_value.type_typename = "type.googleapis.com/google.protobuf.Any"
-
-            protocol_expiration_spec = FromFunction.ExpirationSpec()
-            sdk_expiration_spec = state_spec.expiration
-            if not sdk_expiration_spec:
-                protocol_expiration_spec.mode = FromFunction.ExpirationSpec.ExpireMode.NONE
-            else:
-                protocol_expiration_spec.expire_after_millis = sdk_expiration_spec.expire_after_millis
-                if sdk_expiration_spec.expire_mode is Expiration.Mode.AFTER_INVOKE:
-                    protocol_expiration_spec.mode = FromFunction.ExpirationSpec.ExpireMode.AFTER_INVOKE
-                elif sdk_expiration_spec.expire_mode is Expiration.Mode.AFTER_WRITE:
-                    protocol_expiration_spec.mode = FromFunction.ExpirationSpec.ExpireMode.AFTER_WRITE
-                else:
-                    raise ValueError("Unexpected state expiration mode.")
-            missing_value.expiration_spec.CopyFrom(protocol_expiration_spec)
-
-
-class RequestReplyHandler:
-    def __init__(self, functions):
-        self.functions = functions
-
-    def __call__(self, request_bytes):
-        ic = InvocationContext(self.functions)
-        ic.setup(request_bytes)
-        if not ic.missing_state_specs:
-            self.handle_invocation(ic)
-        return ic.complete()
-
-    @staticmethod
-    def handle_invocation(ic: InvocationContext):
-        batch = ic.batch
-        context = ic.context
-        target_function = ic.target_function
-        fun = target_function.func
-        for invocation in batch:
-            context.prepare(invocation)
-            any_arg = to_proto_any(invocation.argument)
-            unpacked = target_function.unpack_any(any_arg)
-            if not unpacked:
-                fun(context, any_arg)
-            else:
-                fun(context, unpacked)
-
-
-class AsyncRequestReplyHandler:
-    def __init__(self, functions):
-        self.functions = functions
-
-    async def __call__(self, request_bytes):
-        ic = InvocationContext(self.functions)
-        ic.setup(request_bytes)
-        if not ic.missing_state_specs:
-            await self.handle_invocation(ic)
-        return ic.complete()
-
-    @staticmethod
-    async def handle_invocation(ic: InvocationContext):
-        batch = ic.batch
-        context = ic.context
-        target_function = ic.target_function
-        fun = target_function.func
-        for invocation in batch:
-            context.prepare(invocation)
-            any_arg = to_proto_any(invocation.argument)
-            unpacked = target_function.unpack_any(any_arg)
-            if not unpacked:
-                await fun(context, any_arg)
-            else:
-                await fun(context, unpacked)
-
-
-class BatchContext(object):
-    def __init__(self, target, states):
-        self.states = states
-        # remember own address
-        self.address = SdkAddress(target.namespace, target.type, target.id)
-        # the caller address would be set for each individual invocation in the batch
-        self.caller = None
-        # outgoing messages
-        self.messages = []
-        self.delayed_messages = []
-        self.egresses = []
-
-    def prepare(self, invocation):
-        """setup per invocation """
-        if invocation.caller:
-            caller = invocation.caller
-            self.caller = SdkAddress(caller.namespace, caller.type, caller.id)
-        else:
-            self.caller = None
-
-    # --------------------------------------------------------------------------------------
-    # state access
-    # --------------------------------------------------------------------------------------
-
-    def state(self, name):
-        if name not in self.states:
-            raise StateRegistrationError(
-                'unknown state name ' + name + '; states need to be explicitly registered when binding functions.')
-        return self.states[name]
-
-    def __getitem__(self, name):
-        return self.state(name).value
-
-    def __delitem__(self, name):
-        state = self.state(name)
-        del state.value
-
-    def __setitem__(self, name, value):
-        state = self.state(name)
-        state.value = value
-
-    # --------------------------------------------------------------------------------------
-    # messages
-    # --------------------------------------------------------------------------------------
-
-    def send(self, typename: str, id: str, message: Any):
-        """
-        Send a message to a function of type and id.
-
-        :param typename: the target function type name, for example: "org.apache.flink.statefun/greeter"
-        :param id: the id of the target function
-        :param message: the message to send
-        """
-        if not typename:
-            raise ValueError("missing type name")
-        if not id:
-            raise ValueError("missing id")
-        if not message:
-            raise ValueError("missing message")
-        out = (typename, id, message)
-        self.messages.append(out)
-
-    def pack_and_send(self, typename: str, id: str, message):
-        """
-        Send a Protobuf message to a function.
-
-        This variant of send, would first pack this message
-        into a google.protobuf.Any and then send it.
-
-        :param typename: the target function type name, for example: "org.apache.flink.statefun/greeter"
-        :param id: the id of the target function
-        :param message: the message to pack into an Any and the send.
-        """
-        if not message:
-            raise ValueError("missing message")
-        any = Any()
-        any.Pack(message)
-        self.send(typename, id, any)
-
-    def reply(self, message: Any):
-        """
-        Reply to the sender (assuming there is a sender)
-
-        :param message: the message to reply to.
-        """
-        caller = self.caller
-        if not caller:
-            raise AssertionError(
-                "Unable to reply without a caller. Was this message was sent directly from an ingress?")
-        self.send(caller.typename(), caller.identity, message)
-
-    def pack_and_reply(self, message):
-        """
-        Reply to the sender (assuming there is a sender)
-
-        :param message: the message to reply to.
-        """
-        any = Any()
-        any.Pack(message)
-        self.reply(any)
-
-    def send_after(self, delay: timedelta, typename: str, id: str, message: Any):
-        """
-        Send a message to a function of type and id.
-
-        :param delay: the amount of time to wait before sending this message.
-        :param typename: the target function type name, for example: "org.apache.flink.statefun/greeter"
-        :param id: the id of the target function
-        :param message: the message to send
-        """
-        if not delay:
-            raise ValueError("missing delay")
-        if not typename:
-            raise ValueError("missing type name")
-        if not id:
-            raise ValueError("missing id")
-        if not message:
-            raise ValueError("missing message")
-        duration_ms = int(delay.total_seconds() * 1000.0)
-        out = (duration_ms, typename, id, message)
-        self.delayed_messages.append(out)
-
-    def pack_and_send_after(self, delay: timedelta, typename: str, id: str, message):
-        """
-        Send a message to a function of type and id.
-
-        :param delay: the amount of time to wait before sending this message.
-        :param typename: the target function type name, for example: "org.apache.flink.statefun/greeter"
-        :param id: the id of the target function
-        :param message: the message to send
-        """
-        if not message:
-            raise ValueError("missing message")
-        any = Any()
-        any.Pack(message)
-        self.send_after(delay, typename, id, any)
-
-    def send_egress(self, typename, message: Any):
-        """
-        Sends a message to an egress defined by @typename
-        :param typename: an egress identifier of the form <namespace>/<name>
-        :param message: the message to send.
-        """
-        if not typename:
-            raise ValueError("missing type name")
-        if not message:
-            raise ValueError("missing message")
-        self.egresses.append((typename, message))
-
-    def pack_and_send_egress(self, typename, message):
-        """
-        Sends a message to an egress defined by @typename
-        :param typename: an egress identifier of the form <namespace>/<name>
-        :param message: the message to send.
-        """
-        if not message:
-            raise ValueError("missing message")
-        any = Any()
-        any.Pack(message)
-        self.send_egress(typename, any)
diff --git a/statefun-python-sdk/statefun/request_reply_v3.py b/statefun-python-sdk/statefun/request_reply_v3.py
new file mode 100644
index 0000000..3eb258f
--- /dev/null
+++ b/statefun-python-sdk/statefun/request_reply_v3.py
@@ -0,0 +1,237 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import asyncio
+import typing
+from datetime import timedelta
+
+import statefun.context
+from statefun.core import parse_typename, ValueSpec, SdkAddress
+from statefun.messages import Message, EgressMessage
+from statefun.statefun_builder import StatefulFunctions, StatefulFunction
+
+# generated function protocol
+from statefun.request_reply_pb2 import ToFunction, FromFunction, Address, TypedValue
+from statefun.storage import resolve, Cell
+
+
+class UserFacingContext(statefun.context.Context):
+    __slots__ = (
+        "_self_address", "_outgoing_messages", "_outgoing_delayed_messages", "_outgoing_egress_messages", "_storage",
+        "_caller")
+
+    def __init__(self, address, storage):
+        self._self_address = address
+        self._outgoing_messages = []
+        self._outgoing_delayed_messages = []
+        self._outgoing_egress_messages = []
+        self._storage = storage
+        self._caller = None
+
+    @property
+    def address(self) -> SdkAddress:
+        """
+
+        :return: the address of the currently executing function. the address is of the form (typename, id)
+        """
+        return self._self_address
+
+    @property
+    def storage(self):
+        return self._storage
+
+    @property
+    def caller(self):
+        return self._caller
+
+    def send(self, message: Message):
+        """
+        Send a message to a function.
+
+        :param message: a message to send.
+        """
+        self._outgoing_messages.append(message)
+
+    def send_after(self, duration: timedelta, message: Message):
+        """
+        Send a message to a target function after a specified delay.
+
+        :param duration: the amount of time to wait before sending this message out.
+        :param message: the message to send.
+        """
+        ms = int(duration.total_seconds() * 1000.0)
+        self._outgoing_delayed_messages.append((ms, message))
+
+    def send_egress(self, message: EgressMessage):
+        """
+        Send a message to an egress.
+
+        :param message: the EgressMessage to send.
+        """
+        self._outgoing_egress_messages.append(message)
+
+
+# -------------------------------------------------------------------------------------------------------------------
+# Protobuf Helpers
+# -------------------------------------------------------------------------------------------------------------------
+
+def sdk_address_from_pb(addr: Address) -> typing.Optional[SdkAddress]:
+    if not addr:
+        return None
+    return SdkAddress(namespace=addr.namespace,
+                      name=addr.type,
+                      id=addr.id,
+                      typename=f"{addr.namespace}/{addr.type}")
+
+
+# noinspection PyProtectedMember
+def collect_success(ctx: UserFacingContext) -> FromFunction:
+    pb_from_function = FromFunction()
+    pb_invocation_result = pb_from_function.invocation_result
+    collect_messages(ctx._outgoing_messages, pb_invocation_result)
+    collect_delayed(ctx._outgoing_delayed_messages, pb_invocation_result)
+    collect_egress(ctx._outgoing_egress_messages, pb_invocation_result)
+    collect_mutations(ctx._storage._cells, pb_invocation_result)
+    return pb_from_function
+
+
+def collect_failure(missing_state_specs: typing.List[ValueSpec]) -> FromFunction:
+    pb_from_function = FromFunction()
+    incomplete_context = pb_from_function.incomplete_invocation_context
+    missing_values = incomplete_context.missing_values
+    for state_spec in missing_state_specs:
+        missing_value = missing_values.add()
+        missing_value.state_name = state_spec.name
+        missing_value.type_typename = state_spec.type.typename
+
+        protocol_expiration_spec = FromFunction.ExpirationSpec()
+        if not state_spec.after_write and not state_spec.after_call:
+            protocol_expiration_spec.mode = FromFunction.ExpirationSpec.ExpireMode.NONE
+        else:
+            protocol_expiration_spec.expire_after_millis = state_spec.duration
+            if state_spec.after_call:
+                protocol_expiration_spec.mode = FromFunction.ExpirationSpec.ExpireMode.AFTER_INVOKE
+            elif state_spec.after_write:
+                protocol_expiration_spec.mode = FromFunction.ExpirationSpec.ExpireMode.AFTER_WRITE
+            else:
+                raise ValueError("Unexpected state expiration mode.")
+        missing_value.expiration_spec.CopyFrom(protocol_expiration_spec)
+    return pb_from_function
+
+
+def collect_messages(messages: typing.List[Message], pb_invocation_result):
+    pb_outgoing_messages = pb_invocation_result.outgoing_messages
+    for message in messages:
+        outgoing = pb_outgoing_messages.add()
+
+        namespace, type = parse_typename(message.target_typename)
+        outgoing.target.namespace = namespace
+        outgoing.target.type = type
+        outgoing.target.id = message.target_id
+        outgoing.argument.CopyFrom(message.typed_value)
+
+
+def collect_delayed(delayed_messages: typing.List[typing.Tuple[timedelta, Message]], invocation_result):
+    delayed_invocations = invocation_result.delayed_invocations
+    for delay, message in delayed_messages:
+        outgoing = delayed_invocations.add()
+
+        namespace, type = parse_typename(message.target_typename)
+        outgoing.target.namespace = namespace
+        outgoing.target.type = type
+        outgoing.target.id = message.target_id
+        outgoing.delay_in_ms = delay
+        outgoing.argument.CopyFrom(message.typed_value)
+
+
+def collect_egress(egresses: typing.List[EgressMessage], invocation_result):
+    outgoing_egresses = invocation_result.outgoing_egresses
+    for message in egresses:
+        outgoing = outgoing_egresses.add()
+
+        namespace, type = parse_typename(message.typename)
+        outgoing.egress_namespace = namespace
+        outgoing.egress_type = type
+        outgoing.argument.CopyFrom(message.typed_value)
+
+
+def collect_mutations(cells: typing.Dict[str, Cell], invocation_result):
+    for key, cell in cells.items():
+        if not cell.dirty:
+            continue
+        mutation = invocation_result.state_mutations.add()
+        mutation.state_name = key
+        val: typing.Optional[TypedValue] = cell.typed_value
+        if val is None:
+            # it is deleted.
+            mutation.mutation_type = FromFunction.PersistedValueMutation.MutationType.Value('DELETE')
+        else:
+            mutation.mutation_type = FromFunction.PersistedValueMutation.MutationType.Value('MODIFY')
+            mutation.state_value.CopyFrom(val)
+
+
+# --------------------------------------------------------------------------------------------------------------------
+# The main Request Reply Handler.
+# --------------------------------------------------------------------------------------------------------------------
+
+class RequestReplyHandler(object):
+    def __init__(self, functions: StatefulFunctions):
+        if not functions:
+            raise ValueError("functions must be provided.")
+        self.functions = functions
+
+    def handle_sync(self, request_bytes: typing.Union[str, bytes, bytearray]) -> bytes:
+        return asyncio.run(self.handle_async(request_bytes))
+
+    async def handle_async(self, request_bytes: typing.Union[str, bytes, bytearray]) -> bytes:
+        # parse
+        pb_to_function = ToFunction()
+        pb_to_function.ParseFromString(request_bytes)
+        # target address
+        pb_target_address = pb_to_function.invocation.target
+        sdk_address = sdk_address_from_pb(pb_target_address)
+        # target stateful function
+        target_fn: StatefulFunction = self.functions.for_typename(sdk_address.typename)
+        if not target_fn:
+            raise ValueError(f"Unable to find a function of type {sdk_address.typename}")
+        # resolve state
+        res = resolve(target_fn.storage_spec, pb_to_function.invocation.state)
+        if res.missing_specs:
+            pb_from_function = collect_failure(res.missing_specs)
+            return pb_from_function.SerializeToString()
+        # invoke the batch
+        ctx = UserFacingContext(sdk_address, res.storage)
+        fun = target_fn.fun
+        pb_batch = pb_to_function.invocation.invocations
+        if target_fn.is_async:
+            for pb_invocation in pb_batch:
+                msg = Message(target_typename=sdk_address.typename, target_id=sdk_address.id,
+                              typed_value=pb_invocation.argument)
+                ctx._caller = sdk_address_from_pb(pb_invocation.caller)
+                # await for an async function to complete.
+                # noinspection PyUnresolvedReferences
+                await fun(ctx, msg)
+        else:
+            for pb_invocation in pb_batch:
+                msg = Message(target_typename=sdk_address.typename, target_id=sdk_address.id,
+                              typed_value=pb_invocation.argument)
+                ctx._caller = sdk_address_from_pb(pb_invocation.caller)
+                # we need to call the function directly ¯\_(ツ)_/¯
+                fun(ctx, msg)
+        # collect the results
+        pb_from_function = collect_success(ctx)
+        return pb_from_function.SerializeToString()
diff --git a/statefun-python-sdk/statefun/statefun_builder.py b/statefun-python-sdk/statefun/statefun_builder.py
new file mode 100644
index 0000000..ade3b63
--- /dev/null
+++ b/statefun-python-sdk/statefun/statefun_builder.py
@@ -0,0 +1,84 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import typing
+
+from statefun.core import ValueSpec
+from statefun.context import Context
+from statefun.messages import Message
+from statefun.storage import make_address_storage_spec, StorageSpec
+import inspect
+
+
+class StatefulFunction(object):
+    __slots__ = ("fun", "storage_spec", "is_async")
+
+    def __init__(self,
+                 fun: typing.Callable[[Context, Message], None],
+                 specs: StorageSpec,
+                 is_async: bool):
+        if fun is None:
+            raise ValueError("function code is missing.")
+        self.fun = fun
+        if specs is None:
+            raise ValueError("storage spec is missing.")
+        self.storage_spec = specs
+        self.is_async = is_async
+
+
+class StatefulFunctions(object):
+    __slots__ = ("_functions",)
+
+    def __init__(self):
+        self._functions = {}
+
+    def register(self, typename: str, fun, specs: typing.Optional[typing.List[ValueSpec]] = None):
+        """registers a StatefulFunction function instance, under the given namespace with the given function type. """
+        if fun is None:
+            raise ValueError("function instance must be provided")
+        if not typename:
+            raise ValueError("function typename must be provided")
+        storage_spec = make_address_storage_spec(specs if specs else [])
+        is_async = inspect.iscoroutinefunction(fun)
+        sig = inspect.getfullargspec(fun)
+        if len(sig.args) != 2:
+            raise ValueError(
+                f"The registered function ${typename} does not expect a context and a message but rather {sig.args}.")
+        self._functions[typename] = StatefulFunction(fun=fun, specs=storage_spec, is_async=is_async)
+
+    def bind(self, typename, specs: typing.List[ValueSpec] = None):
+        """wraps a StatefulFunction instance with a given namespace and type.
+           for example:
+            s = StatefulFunctions()
+
+            @s.define("com.foo.bar/greeter")
+            def greeter(context, message):
+                print("Hi there")
+
+            This would add an invokable stateful function that can accept messages
+            sent to "com.foo.bar/greeter".
+         """
+
+        def wrapper(function):
+            self.register(typename, function, specs)
+            return function
+
+        return wrapper
+
+    def for_typename(self, typename: str) -> StatefulFunction:
+        return self._functions[typename]
diff --git a/statefun-python-sdk/statefun/storage.py b/statefun-python-sdk/statefun/storage.py
new file mode 100644
index 0000000..0eafe94
--- /dev/null
+++ b/statefun-python-sdk/statefun/storage.py
@@ -0,0 +1,152 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import typing
+from dataclasses import dataclass
+
+from statefun.core import ValueSpec, Type
+from statefun.request_reply_pb2 import TypedValue, ToFunction
+
+from statefun.utils import to_typed_value, from_typed_value
+
+
+@dataclass(frozen=True, repr=True, eq=True, order=False)
+class StorageSpec:
+    make_instance: typing.Any
+    names: typing.FrozenSet[str]
+    specs: typing.List[ValueSpec]
+
+
+@dataclass(frozen=True, eq=False, repr=False, order=False)
+class Resolution:
+    missing_specs: typing.Union[None, typing.List[ValueSpec]]
+    storage: typing.Union[None, typing.Any]  # this holds the storage instance.
+
+
+class Cell(object):
+    __slots__ = ("tpe", "typed_value", "dirty")
+
+    def __init__(self, tpe: Type, typed_value: typing.Optional[TypedValue]):
+        # read only
+        self.tpe = tpe
+        # mutable
+        self.typed_value = typed_value
+        self.dirty = False
+
+    def get(self):
+        typed_value = self.typed_value
+        return from_typed_value(self.tpe, typed_value)
+
+    def set(self, val):
+        if val is None:
+            raise ValueError('provided value must not be None. To delete a value, please use del.')
+        tpe = self.tpe
+        typed_value = to_typed_value(tpe, val)
+        self.typed_value = typed_value
+        self.dirty = True
+
+    def delete(self):
+        self.typed_value = None
+        self.dirty = True
+
+
+# self.cells: typing.Dict[str, Cell] = {name: Cell(name, tpe, vals[name]) for name, tpe in types.items()}
+
+
+def storage_constructor(self, cells: typing.Dict[str, Cell]):
+    self._cells = cells
+
+
+def property_named(name):
+    """
+    Creates a property, that delegates all the operations to an instance
+    of a HiddenClass, that is expected to be member of target class (the class that
+    this property will be part of).
+
+    :param name: the name of this field
+    :return: a property as described above.
+    """
+
+    def fget(self):
+        cell: Cell = self._cells.get(name)
+        if not cell:
+            raise AttributeError(name)
+        return cell.get()
+
+    def fset(self, val):
+        cell: Cell = self._cells.get(name)
+        if not cell:
+            raise AttributeError(name)
+        cell.set(val)
+
+    def fdel(self):
+        cell: Cell = self._cells.get(name)
+        if not cell:
+            raise AttributeError(name)
+        cell.delete()
+
+    return property(fget, fset, fdel)
+
+
+def make_address_storage_spec(specs: typing.List[ValueSpec]) -> StorageSpec:
+    """
+    Creates an StorageSpec from user supplied value specs.
+    :param specs: a list of specs as supplied by the user.
+    :return: a StorageSpec.
+    """
+    props = {"__init__": storage_constructor, "__slots__": ["_cells"]}
+    for spec in specs:
+        if spec.name in props:
+            raise ValueError("duplicate registered value name: " + spec.name)
+        props[spec.name] = property_named(spec.name)
+    cls = type("GeneratedAddressedScopedStorage", (object,), props)
+    return StorageSpec(make_instance=cls,
+                       names=frozenset(spec.name for spec in specs),
+                       specs=specs)
+
+
+def resolve(storage: StorageSpec,
+            values: typing.List[ToFunction.PersistedValue]) -> Resolution:
+    """
+    Resolve the registered specs and the actually received values.
+
+    :param storage: a storage factory
+    :param values: the actually received values
+    :return: a Resolution result, that might have either a list of missing specs
+    (specs that were defined by the user but didn't arrived from StateFun) or a
+     successful resolution, with an instance of an addressed scoped storage.
+    """
+    #
+    # index the actually received values (TypedValue) by name.
+    #
+    received: typing.Dict[str, TypedValue] = {state.state_name: state.state_value for state in values}
+    #
+    # see if any of the specs are missing.
+    #
+    missing_keys = storage.names - received.keys()
+    if missing_keys:
+        # keep the missing specs in exactly the same order as they were originally defined.
+        # This is not strictly required from the protocol point of view, but it makes it a bit easier
+        # to troubleshoot.
+        missing = [spec for spec in storage.specs if spec.name in missing_keys]
+        return Resolution(missing_specs=missing, storage=None)
+    else:
+        cells: typing.Dict[str, Cell] = {spec.name: Cell(tpe=spec.type, typed_value=received[spec.name]) for spec in
+                                         storage.specs}
+        s = storage.make_instance(cells)
+        return Resolution(missing_specs=None, storage=s)
diff --git a/statefun-python-sdk/statefun/typed_value_utils.py b/statefun-python-sdk/statefun/typed_value_utils.py
deleted file mode 100644
index 8706800..0000000
--- a/statefun-python-sdk/statefun/typed_value_utils.py
+++ /dev/null
@@ -1,49 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-from google.protobuf.any_pb2 import Any
-
-from statefun.core import AnyStateHandle
-from statefun.request_reply_pb2 import TypedValue
-
-#
-# Utility methods to covert back and forth from Protobuf Any to our TypedValue.
-# TODO this conversion needs to take place only because the Python SDK still works with Protobuf Any's
-# TODO this would soon go away by letting the SDK work directly with TypedValues.
-#
-
-def to_proto_any(typed_value: TypedValue):
-    proto_any = Any()
-    proto_any.type_url = typed_value.typename
-    proto_any.value = typed_value.value
-    return proto_any
-
-def from_proto_any(proto_any: Any):
-    typed_value = TypedValue()
-    typed_value.typename = proto_any.type_url
-    typed_value.value = proto_any.value
-    return typed_value
-
-def from_proto_any_state(any_state_handle: AnyStateHandle):
-    typed_value = TypedValue()
-    typed_value.typename = any_state_handle.typename()
-    typed_value.value = any_state_handle.bytes()
-    return typed_value
-
-def to_proto_any_state(typed_value: TypedValue) -> AnyStateHandle:
-    return AnyStateHandle(typed_value.value)
diff --git a/statefun-python-sdk/statefun/utils.py b/statefun-python-sdk/statefun/utils.py
new file mode 100644
index 0000000..e04d30c
--- /dev/null
+++ b/statefun-python-sdk/statefun/utils.py
@@ -0,0 +1,79 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import typing
+
+from statefun.request_reply_pb2 import TypedValue
+from statefun.core import Type, simple_type
+
+
+def to_typed_value_deduce(value: typing.Union[str, bytes, bytearray],
+                          value_type: typing.Union[None, Type] = None) -> bytes:
+    if value_type:
+        ser = value_type.serializer()
+        return ser.serialize(value)
+    if isinstance(value, (bytes, bytearray)):
+        return bytes(value)
+    try:
+        return value.encode('utf-8')
+    except AttributeError:
+        pass
+    raise TypeError(
+        "Unable to convert value to bytes. strings, bytes, and anytype that has __bytes__ attribute is supported.")
+
+
+def from_typed_value(tpe: Type, typed_value: typing.Optional[TypedValue]):
+    if not typed_value or not typed_value.has_value:
+        return None
+    if tpe.typename != typed_value.typename:
+        raise TypeError(
+            f"Type mismatch: "
+            f"type {typed_value.typename} (remote), yet it is specified locally "
+            f"as {tpe.typename} ")
+    ser = tpe.serializer()
+    return ser.deserialize(typed_value.value)
+
+
+def to_typed_value(type, value):
+    typed_value = TypedValue()
+    typed_value.typename = type.typename
+    if value is None:
+        typed_value.has_value = False
+        return typed_value
+    typed_value.has_value = True
+    ser = type.serializer()
+    typed_value.value = ser.serialize(value)
+    return typed_value
+
+
+def make_protobuf_type(cls, namespace: str = None) -> Type:
+    def deserialize_fn(b):
+        v = cls()
+        v.ParseFromString(b)
+        return v
+
+    def serialize_fn(v):
+        return v.SerializeToString()
+
+    if not namespace:
+        namespace = "type.googleapis.com"
+    if not cls:
+        raise ValueError("The Protobuf generated class is missing.")
+    name = cls().DESCRIPTOR.full_name
+    if not name:
+        raise TypeError("Unable to deduce the Protobuf Message full name.")
+    return simple_type(typename=f"{namespace}/{name}", serialize_fn=serialize_fn, deserialize_fn=deserialize_fn)
diff --git a/statefun-python-sdk/statefun/wrapper_types.py b/statefun-python-sdk/statefun/wrapper_types.py
new file mode 100644
index 0000000..42aeba2
--- /dev/null
+++ b/statefun-python-sdk/statefun/wrapper_types.py
@@ -0,0 +1,70 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from statefun.core import Type, TypeSerializer
+from statefun.types_pb2 import *
+
+
+class ProtobufWrappingTypeSerializer(TypeSerializer):
+    __slots__ = ("wrapper",)
+
+    def __init__(self, wrapper):
+        self.wrapper = wrapper
+
+    def serialize(self, value):
+        instance = self.wrapper()
+        instance.value = value
+        return instance.SerializeToString()
+
+    def deserialize(self, string):
+        instance = self.wrapper()
+        instance.ParseFromString(string)
+        return instance.value
+
+
+class ProtobufWrappingType(Type):
+    __slots__ = ("wrapper",)
+
+    def __init__(self, typename, wrapper_message_type):
+        super().__init__(typename)
+        self.wrapper = wrapper_message_type
+
+    def serializer(self) -> TypeSerializer:
+        return ProtobufWrappingTypeSerializer(self.wrapper)
+
+
+BoolType = ProtobufWrappingType("io.statefun.types/bool", BooleanWrapper)
+IntType = ProtobufWrappingType("io.statefun.types/int", IntWrapper)
+FloatType = ProtobufWrappingType("io.statefun.types/float", FloatWrapper)
+LongType = ProtobufWrappingType("io.statefun.types/long", LongWrapper)
+DoubleType = ProtobufWrappingType("io.statefun.types/double", DoubleWrapper)
+StringType = ProtobufWrappingType("io.statefun.types/string", StringWrapper)
+
+PY_TYPE_TO_WRAPPER_TYPE = {
+    int: IntType,
+    bool: BoolType,
+    float: FloatType,
+    str: StringType
+}
+
+WRAPPER_TYPE_TO_PY_TYPE = {
+    IntType: int,
+    BoolType: bool,
+    FloatType: float,
+    StringType: str
+}
diff --git a/statefun-python-sdk/tests/__init__.py b/statefun-python-sdk/tests/__init__.py
index bef0767..da24a0e 100644
--- a/statefun-python-sdk/tests/__init__.py
+++ b/statefun-python-sdk/tests/__init__.py
@@ -16,8 +16,10 @@
 # limitations under the License.
 ################################################################################
 
-from tests.any_state_handle_test import *
 from tests.typename_test import *
 from tests.statefun_test import *
 from tests.request_reply_test import *
-from tests.type_deduction_test import *
+from tests.types_test import *
+from tests.storage_test import *
+from tests.valuespec_test import *
+from tests.message_test import *
diff --git a/statefun-python-sdk/tests/any_state_handle_test.py b/statefun-python-sdk/tests/any_state_handle_test.py
deleted file mode 100644
index d26f1c0..0000000
--- a/statefun-python-sdk/tests/any_state_handle_test.py
+++ /dev/null
@@ -1,74 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-import unittest
-
-from google.protobuf.any_pb2 import Any
-
-from statefun.core import AnyStateHandle
-
-
-def typed_any(type_url):
-    any = Any()
-    if type_url:
-        any.type_url = type_url
-    return any
-
-
-def typed_any_bytes(type_url=None):
-    return typed_any(type_url).SerializeToString()
-
-
-class AnyHandleTestCase(unittest.TestCase):
-
-    def test_example(self):
-        handle = AnyStateHandle(typed_any_bytes("com/hello"))
-
-        self.assertEqual(handle.value.type_url, "com/hello")
-
-    def test_delete_handle(self):
-        handle = AnyStateHandle(typed_any_bytes("com/hello"))
-
-        del handle.value
-
-        self.assertTrue(handle.value is None)
-
-    def test_modify_handle(self):
-        handle = AnyStateHandle(typed_any_bytes("com/hello"))
-
-        handle.value = typed_any("com/world")
-
-        self.assertEqual(handle.value.type_url, "com/world")
-
-    def test_un_modified_bytes(self):
-        handle = AnyStateHandle(typed_any_bytes("com/hello"))
-        handle = AnyStateHandle(handle.bytes())
-
-        self.assertEqual(handle.value.type_url, "com/hello")
-
-    def test_modified_bytes(self):
-        handle = AnyStateHandle(typed_any_bytes("com/hello"))
-
-        handle.value = typed_any("com/world")
-        handle = AnyStateHandle(handle.bytes())
-
-        self.assertEqual(handle.value.type_url, "com/world")
-
-    def test_missing_value(self):
-        handle = AnyStateHandle(None)
-        self.assertIsNone(handle.value)
diff --git a/statefun-python-sdk/tests/examples_pb2.py b/statefun-python-sdk/tests/examples_pb2.py
deleted file mode 100644
index f917ca1..0000000
--- a/statefun-python-sdk/tests/examples_pb2.py
+++ /dev/null
@@ -1,124 +0,0 @@
-# -*- coding: utf-8 -*-
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-# Generated by the protocol buffer compiler.  DO NOT EDIT!
-# source: flask.proto
-
-from google.protobuf import descriptor as _descriptor
-from google.protobuf import message as _message
-from google.protobuf import reflection as _reflection
-from google.protobuf import symbol_database as _symbol_database
-# @@protoc_insertion_point(imports)
-
-_sym_db = _symbol_database.Default()
-
-
-
-
-DESCRIPTOR = _descriptor.FileDescriptor(
-  name='flask.proto',
-  package='k8s.demo',
-  syntax='proto3',
-  serialized_options=None,
-  serialized_pb=b'\n\x0b\x66lask.proto\x12\x08k8s.demo\"\x1f\n\nLoginEvent\x12\x11\n\tuser_name\x18\x01 \x01(\t\"\x19\n\tSeenCount\x12\x0c\n\x04seen\x18\x01 \x01(\x03\x62\x06proto3'
-)
-
-
-
-
-_LOGINEVENT = _descriptor.Descriptor(
-  name='LoginEvent',
-  full_name='k8s.demo.LoginEvent',
-  filename=None,
-  file=DESCRIPTOR,
-  containing_type=None,
-  fields=[
-    _descriptor.FieldDescriptor(
-      name='user_name', full_name='k8s.demo.LoginEvent.user_name', index=0,
-      number=1, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=b"".decode('utf-8'),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
-  ],
-  extensions=[
-  ],
-  nested_types=[],
-  enum_types=[
-  ],
-  serialized_options=None,
-  is_extendable=False,
-  syntax='proto3',
-  extension_ranges=[],
-  oneofs=[
-  ],
-  serialized_start=25,
-  serialized_end=56,
-)
-
-
-_SEENCOUNT = _descriptor.Descriptor(
-  name='SeenCount',
-  full_name='k8s.demo.SeenCount',
-  filename=None,
-  file=DESCRIPTOR,
-  containing_type=None,
-  fields=[
-    _descriptor.FieldDescriptor(
-      name='seen', full_name='k8s.demo.SeenCount.seen', index=0,
-      number=1, type=3, cpp_type=2, label=1,
-      has_default_value=False, default_value=0,
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
-  ],
-  extensions=[
-  ],
-  nested_types=[],
-  enum_types=[
-  ],
-  serialized_options=None,
-  is_extendable=False,
-  syntax='proto3',
-  extension_ranges=[],
-  oneofs=[
-  ],
-  serialized_start=58,
-  serialized_end=83,
-)
-
-DESCRIPTOR.message_types_by_name['LoginEvent'] = _LOGINEVENT
-DESCRIPTOR.message_types_by_name['SeenCount'] = _SEENCOUNT
-_sym_db.RegisterFileDescriptor(DESCRIPTOR)
-
-LoginEvent = _reflection.GeneratedProtocolMessageType('LoginEvent', (_message.Message,), {
-  'DESCRIPTOR' : _LOGINEVENT,
-  '__module__' : 'flask_pb2'
-  # @@protoc_insertion_point(class_scope:k8s.demo.LoginEvent)
-  })
-_sym_db.RegisterMessage(LoginEvent)
-
-SeenCount = _reflection.GeneratedProtocolMessageType('SeenCount', (_message.Message,), {
-  'DESCRIPTOR' : _SEENCOUNT,
-  '__module__' : 'flask_pb2'
-  # @@protoc_insertion_point(class_scope:k8s.demo.SeenCount)
-  })
-_sym_db.RegisterMessage(SeenCount)
-
-
-# @@protoc_insertion_point(module_scope)
diff --git a/statefun-python-sdk/tests/message_test.py b/statefun-python-sdk/tests/message_test.py
new file mode 100644
index 0000000..d265a62
--- /dev/null
+++ b/statefun-python-sdk/tests/message_test.py
@@ -0,0 +1,42 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import unittest
+
+import statefun
+
+from statefun import message_builder
+
+
+class MessageTestCase(unittest.TestCase):
+
+    def test_example(self):
+        m = message_builder(target_typename="foo/bar", target_id="a", int_value=1)
+
+        self.assertTrue(m.is_int())
+        self.assertEqual(m.as_int(), 1)
+
+    def test_with_type(self):
+        m = message_builder(target_typename="foo/bar", target_id="a", value=5.0, value_type=statefun.FloatType)
+        self.assertTrue(m.is_float())
+        self.assertEqual(m.as_float(), 5.0)
+
+    def test_kafka_egress(self):
+        record = statefun.kafka_egress_message(typename="foo/bar", topic="topic", value=1337420)
+
+        self.assertEqual(record.typed_value.typename, "type.googleapis.com/io.statefun.sdk.egress.KafkaProducerRecord")
+        self.assertTrue(record.typed_value.has_value)
diff --git a/statefun-python-sdk/tests/request_reply_test.py b/statefun-python-sdk/tests/request_reply_test.py
index 157bba6..612750f 100644
--- a/statefun-python-sdk/tests/request_reply_test.py
+++ b/statefun-python-sdk/tests/request_reply_test.py
@@ -15,18 +15,14 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-import asyncio
 import unittest
 from datetime import timedelta
 
 from google.protobuf.json_format import MessageToDict
-from google.protobuf.any_pb2 import Any
 
-from tests.examples_pb2 import LoginEvent, SeenCount
-from statefun.request_reply_pb2 import ToFunction, FromFunction, TypedValue
-from statefun import RequestReplyHandler, AsyncRequestReplyHandler
-from statefun import StatefulFunctions, StateSpec, AfterWrite, StateRegistrationError
-from statefun import kafka_egress_record, kinesis_egress_record
+from statefun import *
+from statefun.request_reply_pb2 import ToFunction, FromFunction
+from statefun.utils import to_typed_value
 
 
 class InvocationBuilder(object):
@@ -39,43 +35,25 @@
         InvocationBuilder.set_address(ns, type, id, self.to_function.invocation.target)
         return self
 
-    def with_state(self, name, value=None):
+    def with_state(self, name, value=None, type=None):
         state = self.to_function.invocation.state.add()
         state.state_name = name
-        if value:
-            state.state_value.CopyFrom(self.to_typed_value_any_state(value))
+        if value is not None:
+            state.state_value.CopyFrom(to_typed_value(type, value))
         return self
 
-    def with_invocation(self, arg, caller=None):
+    def with_invocation(self, arg, tpe, caller=None):
         invocation = self.to_function.invocation.invocations.add()
         if caller:
             (ns, type, id) = caller
             InvocationBuilder.set_address(ns, type, id, invocation.caller)
-        invocation.argument.CopyFrom(self.to_typed_value(arg))
+        invocation.argument.CopyFrom(to_typed_value(tpe, arg))
         return self
 
     def SerializeToString(self):
         return self.to_function.SerializeToString()
 
     @staticmethod
-    def to_typed_value(proto_msg):
-        any = Any()
-        any.Pack(proto_msg)
-        typed_value = TypedValue()
-        typed_value.typename = any.type_url
-        typed_value.value = any.value
-        return typed_value
-
-    @staticmethod
-    def to_typed_value_any_state(proto_msg):
-        any = Any()
-        any.Pack(proto_msg)
-        typed_value = TypedValue()
-        typed_value.typename = "type.googleapis.com/google.protobuf.Any"
-        typed_value.value = any.SerializeToString()
-        return typed_value
-
-    @staticmethod
     def set_address(namespace, type, id, address):
         address.namespace = namespace
         address.type = type
@@ -84,17 +62,9 @@
 
 def round_trip(functions: StatefulFunctions, to: InvocationBuilder) -> dict:
     handler = RequestReplyHandler(functions)
-    f = FromFunction()
-    f.ParseFromString(handler(to.SerializeToString()))
-    return MessageToDict(f, preserving_proto_field_name=True)
-
-
-def async_round_trip(functions: StatefulFunctions, to: InvocationBuilder) -> dict:
-    handler = AsyncRequestReplyHandler(functions)
 
     in_bytes = to.SerializeToString()
-    future = handler(in_bytes)
-    out_bytes = asyncio.get_event_loop().run_until_complete(future)
+    out_bytes = handler.handle_sync(in_bytes)
 
     f = FromFunction()
     f.ParseFromString(out_bytes)
@@ -132,63 +102,46 @@
 
         @functions.bind(
             typename='org.foo/greeter',
-            states=[StateSpec('seen')])
-        def fun(context, message):
+            specs=[ValueSpec(name='seen', type=IntType)])
+        def fun(context: Context, message: Message):
+            # messaging
+            if message.is_string():
+                unused = message.as_string()
+                pass
+                # print(f"A string message {message.as_string()}")
+
             # state access
-            seen = context.state('seen').unpack(SeenCount)
-            seen.seen += 1
-            context.state('seen').pack(seen)
+            seen = context.storage.seen
+            context.storage.seen += 1
 
-            # regular state access
-            seenAny = context['seen']
-            seenAny.Unpack(seen)
-
-            # sending and replying
-            context.pack_and_reply(seen)
-
-            any = Any()
-            any.type_url = 'type.googleapis.com/k8s.demo.SeenCount'
-            context.send("bar.baz/foo", "12345", any)
-
+            # sending
+            context.send(message_builder(target_typename="org.foo/greeter-java",
+                                         target_id="0",
+                                         int_value=seen))
             # delayed messages
-            context.send_after(timedelta(hours=1), "night/owl", "1", any)
-
-            # egresses
-            context.send_egress("foo.bar.baz/my-egress", any)
-            context.pack_and_send_egress("foo.bar.baz/my-egress", seen)
-
-            # kafka egress
-            context.pack_and_send_egress("sdk/kafka",
-                                         kafka_egress_record(topic="hello", key=u"hello world", value=seen))
-            context.pack_and_send_egress("sdk/kafka",
-                                         kafka_egress_record(topic="hello", value=seen))
-
-            # AWS Kinesis generic egress
-            context.pack_and_send_egress("sdk/kinesis",
-                                         kinesis_egress_record(
-                                             stream="hello",
-                                             partition_key=u"hello world",
-                                             value=seen,
-                                             explicit_hash_key=u"1234"))
-            context.pack_and_send_egress("sdk/kinesis",
-                                         kinesis_egress_record(
-                                             stream="hello",
-                                             partition_key=u"hello world",
-                                             value=seen))
+            context.send_after(timedelta(hours=1),
+                               message_builder(target_typename="night/owl",
+                                               target_id="1",
+                                               str_value="hoo hoo"))
+            # kafka egresses
+            context.send_egress(
+                kafka_egress_message(typename="e/kafka",
+                                     topic="out",
+                                     key="abc",
+                                     value=1337420))
+            # kinesis egress
+            context.send_egress(kinesis_egress_message(typename="e/kinesis",
+                                                       stream="out",
+                                                       partition_key="abc",
+                                                       value="hello there"))
 
         #
         # build the invocation
         #
         builder = InvocationBuilder()
         builder.with_target("org.foo", "greeter", "0")
-
-        seen = SeenCount()
-        seen.seen = 100
-        builder.with_state("seen", seen)
-
-        arg = LoginEvent()
-        arg.user_name = "user-1"
-        builder.with_invocation(arg, ("org.foo", "greeter-java", "0"))
+        builder.with_state("seen", 1, IntType)
+        builder.with_invocation("Hello", StringType, ("org.foo", "greeter-java", "0"))
 
         #
         # invoke
@@ -200,14 +153,7 @@
         self.assertEqual(first_out_message['target']['namespace'], 'org.foo')
         self.assertEqual(first_out_message['target']['type'], 'greeter-java')
         self.assertEqual(first_out_message['target']['id'], '0')
-        self.assertEqual(first_out_message['argument']['typename'], 'type.googleapis.com/k8s.demo.SeenCount')
-
-        # assert second outgoing message
-        second_out_message = json_at(result_json, NTH_OUTGOING_MESSAGE(1))
-        self.assertEqual(second_out_message['target']['namespace'], 'bar.baz')
-        self.assertEqual(second_out_message['target']['type'], 'foo')
-        self.assertEqual(second_out_message['target']['id'], '12345')
-        self.assertEqual(second_out_message['argument']['typename'], 'type.googleapis.com/k8s.demo.SeenCount')
+        self.assertEqual(first_out_message['argument']['typename'], 'io.statefun.types/int')
 
         # assert state mutations
         first_mutation = json_at(result_json, NTH_STATE_MUTATION(0))
@@ -221,19 +167,20 @@
 
         # assert egresses
         first_egress = json_at(result_json, NTH_EGRESS(0))
-        self.assertEqual(first_egress['egress_namespace'], 'foo.bar.baz')
-        self.assertEqual(first_egress['egress_type'], 'my-egress')
-        self.assertEqual(first_egress['argument']['typename'], 'type.googleapis.com/k8s.demo.SeenCount')
+        self.assertEqual(first_egress['egress_namespace'], 'e')
+        self.assertEqual(first_egress['egress_type'], 'kafka')
+        self.assertEqual(first_egress['argument']['typename'],
+                         'type.googleapis.com/io.statefun.sdk.egress.KafkaProducerRecord')
 
     def test_integration_incomplete_context(self):
         functions = StatefulFunctions()
 
         @functions.bind(
             typename='org.foo/bar',
-            states=[
-                StateSpec('seen'),
-                StateSpec('missing_state_1'),
-                StateSpec('missing_state_2', expire_after=AfterWrite(timedelta(milliseconds=2000)))
+            specs=[
+                ValueSpec(name='seen', type=IntType),
+                ValueSpec('missing_state_1', type=StringType),
+                ValueSpec('missing_state_2', type=FloatType, expire_after_write=timedelta(milliseconds=2000))
             ])
         def fun(context, message):
             pass
@@ -244,11 +191,8 @@
         builder = InvocationBuilder()
         builder.with_target("org.foo", "bar", "0")
 
-        seen = SeenCount()
-        seen.seen = 100
-        builder.with_state("seen", seen)
-
-        builder.with_invocation(Any(), None)
+        builder.with_state("seen")
+        builder.with_invocation(arg=1, tpe=IntType)
 
         #
         # invoke
@@ -265,132 +209,3 @@
         self.assertEqual(missing_state_2_spec['state_name'], 'missing_state_2')
         self.assertEqual(missing_state_2_spec['expiration_spec']['mode'], 'AFTER_WRITE')
         self.assertEqual(missing_state_2_spec['expiration_spec']['expire_after_millis'], '2000')
-
-    def test_integration_access_non_registered_state(self):
-        functions = StatefulFunctions()
-
-        @functions.bind(
-            typename='org.foo/bar',
-            states=[StateSpec('seen')])
-        def fun(context, message):
-            ignored = context['non_registered_state']
-
-        #
-        # build the invocation
-        #
-        builder = InvocationBuilder()
-        builder.with_target("org.foo", "bar", "0")
-
-        seen = SeenCount()
-        seen.seen = 100
-        builder.with_state("seen", seen)
-
-        builder.with_invocation(Any(), None)
-
-        #
-        # assert error is raised on invoke
-        #
-        with self.assertRaises(StateRegistrationError):
-            round_trip(functions, builder)
-
-
-class AsyncRequestReplyTestCase(unittest.TestCase):
-
-    def test_integration(self):
-        functions = StatefulFunctions()
-
-        @functions.bind('org.foo/greeter')
-        async def fun(context, message):
-            any = Any()
-            any.type_url = 'type.googleapis.com/k8s.demo.SeenCount'
-            context.send("bar.baz/foo", "12345", any)
-
-        #
-        # build the invocation
-        #
-        builder = InvocationBuilder()
-        builder.with_target("org.foo", "greeter", "0")
-
-        arg = LoginEvent()
-        arg.user_name = "user-1"
-        builder.with_invocation(arg, ("org.foo", "greeter-java", "0"))
-
-        #
-        # invoke
-        #
-        result_json = async_round_trip(functions, builder)
-
-        # assert outgoing message
-        second_out_message = json_at(result_json, NTH_OUTGOING_MESSAGE(0))
-        self.assertEqual(second_out_message['target']['namespace'], 'bar.baz')
-        self.assertEqual(second_out_message['target']['type'], 'foo')
-        self.assertEqual(second_out_message['target']['id'], '12345')
-        self.assertEqual(second_out_message['argument']['typename'], 'type.googleapis.com/k8s.demo.SeenCount')
-
-    def test_integration_incomplete_context(self):
-        functions = StatefulFunctions()
-
-        @functions.bind(
-            typename='org.foo/bar',
-            states=[
-                StateSpec('seen'),
-                StateSpec('missing_state_1'),
-                StateSpec('missing_state_2', expire_after=AfterWrite(timedelta(milliseconds=2000)))
-            ])
-        async def fun(context, message):
-            pass
-
-        #
-        # build an invocation that provides only 'seen' state
-        #
-        builder = InvocationBuilder()
-        builder.with_target("org.foo", "bar", "0")
-
-        seen = SeenCount()
-        seen.seen = 100
-        builder.with_state("seen", seen)
-
-        builder.with_invocation(Any(), None)
-
-        #
-        # invoke
-        #
-        result_json = async_round_trip(functions, builder)
-
-        #
-        # assert indicated missing states
-        #
-        missing_state_1_spec = json_at(result_json, NTH_MISSING_STATE_SPEC(0))
-        self.assertEqual(missing_state_1_spec['state_name'], 'missing_state_1')
-
-        missing_state_2_spec = json_at(result_json, NTH_MISSING_STATE_SPEC(1))
-        self.assertEqual(missing_state_2_spec['state_name'], 'missing_state_2')
-        self.assertEqual(missing_state_2_spec['expiration_spec']['mode'], 'AFTER_WRITE')
-        self.assertEqual(missing_state_2_spec['expiration_spec']['expire_after_millis'], '2000')
-
-    def test_integration_access_non_registered_state(self):
-        functions = StatefulFunctions()
-
-        @functions.bind(
-            typename='org.foo/bar',
-            states=[StateSpec('seen')])
-        async def fun(context, message):
-            ignored = context['non_registered_state']
-
-        #
-        # build the invocation
-        #
-        builder = InvocationBuilder()
-        builder.with_target("org.foo", "bar", "0")
-
-        seen = SeenCount()
-        seen.seen = 100
-        builder.with_state("seen", seen)
-
-        builder.with_invocation(Any(), None)
-
-        #
-        # assert error is raised on invoke
-        #
-        with self.assertRaises(StateRegistrationError):
-            async_round_trip(functions, builder)
diff --git a/statefun-python-sdk/tests/statefun_test.py b/statefun-python-sdk/tests/statefun_test.py
index ad3b436..9bf444a 100644
--- a/statefun-python-sdk/tests/statefun_test.py
+++ b/statefun-python-sdk/tests/statefun_test.py
@@ -17,12 +17,10 @@
 ################################################################################
 
 import unittest
+from statefun import StatefulFunctions, ValueSpec, IntType, StringType
 
-from google.protobuf.any_pb2 import Any
 
-from statefun.core import StatefulFunctions, StatefulFunction, StateSpec, StateRegistrationError
-from tests.examples_pb2 import LoginEvent
-
+# noinspection PyUnusedLocal
 class StatefulFunctionsTestCase(unittest.TestCase):
 
     def test_example(self):
@@ -30,56 +28,66 @@
 
         @functions.bind(
             typename="org.foo/greeter",
-            states=[StateSpec('seen_count')])
+            specs=[ValueSpec(name='seen_count', type=IntType)])
         def greeter(context, message):
             pass
 
-        @functions.bind("org.foo/echo")
-        def echo(context, message):
+        fun = functions.for_typename("org.foo/greeter")
+        self.assertFalse(fun.is_async)
+        self.assertIsNotNone(fun.storage_spec)
+
+    def test_async(self):
+        functions = StatefulFunctions()
+
+        @functions.bind(
+            typename="org.foo/greeter",
+            specs=[ValueSpec(name='seen_count', type=IntType)])
+        async def greeter(context, message):
             pass
 
-        self.assertIn(("org.foo", "greeter"), functions.functions)
-        self.assertIn(("org.foo", "echo"), functions.functions)
+        fun = functions.for_typename("org.foo/greeter")
+        self.assertTrue(fun.is_async)
+        self.assertIsNotNone(fun.storage_spec)
 
-        registered_state_specs = functions.functions[("org.foo", "greeter")].registered_state_specs
-        self.assertIs(len(registered_state_specs), 1)
-        self.assertIn("seen_count", registered_state_specs)
+    def test_state_spec(self):
+        functions = StatefulFunctions()
+
+        foo = ValueSpec(name='foo', type=IntType)
+        bar = ValueSpec(name='bar', type=StringType)
+
+        @functions.bind(typename="org.foo/greeter", specs=[foo, bar])
+        def greeter(context, message):
+            pass
+
+        fun = functions.for_typename("org.foo/greeter")
+        self.assertListEqual(fun.storage_spec.specs, [foo, bar])
+
+    def test_stateless(self):
+        functions = StatefulFunctions()
+
+        @functions.bind(typename="org.foo/greeter")
+        def greeter(context, message):
+            pass
+
+        fun = functions.for_typename("org.foo/greeter")
+        self.assertListEqual(fun.storage_spec.specs, [])
 
     def test_duplicate_state(self):
         functions = StatefulFunctions()
 
-        with self.assertRaises(StateRegistrationError):
+        with self.assertRaises(ValueError):
             @functions.bind(
                 typename="org.foo/greeter",
-                states=[StateSpec("bar"), StateSpec("bar")])
+                specs=[ValueSpec(name="bar", type=IntType), ValueSpec(name="bar", type=IntType)])
             def foo(context, message):
                 pass
 
-    def test_type_deduction(self):
+    def test_wrong_signature(self):
         functions = StatefulFunctions()
 
-        @functions.bind("org.foo/greeter")
-        def greeter(context, message: int):
-            pass
-
-        x: StatefulFunction = functions.functions[("org.foo", "greeter")]
-        self.assertEqual(x.known_messages, [int])
-
-    def test_unpacking(self):
-        functions = StatefulFunctions()
-
-        @functions.bind("org.foo/greeter")
-        def greeter(context, message: LoginEvent):
-            pass
-
-        greeter_fn = functions.functions[("org.foo", "greeter")]
-
-        # pack the function argument as an Any
-        argument = LoginEvent()
-        any_argument = Any()
-        any_argument.Pack(argument)
-
-        # unpack Any automatically
-        unpacked_argument = greeter_fn.unpack_any(any_argument)
-
-        self.assertEqual(argument, unpacked_argument)
+        with self.assertRaises(ValueError):
+            @functions.bind(
+                typename="org.foo/greeter",
+                specs=[ValueSpec(name="bar", type=IntType)])
+            def foo(message):  # missing context
+                pass
diff --git a/statefun-python-sdk/tests/storage_test.py b/statefun-python-sdk/tests/storage_test.py
new file mode 100644
index 0000000..2e99cd2
--- /dev/null
+++ b/statefun-python-sdk/tests/storage_test.py
@@ -0,0 +1,161 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import unittest
+
+from statefun import IntType, StringType
+from statefun.storage import *
+from statefun.utils import to_typed_value
+
+
+class PbPersistedValueLike:
+
+    def __init__(self, name, val, tpe):
+        self.state_name = name
+        self.state_value = to_typed_value(tpe, val)
+
+
+class StorageTestCase(unittest.TestCase):
+
+    def test_example(self):
+        specs = [ValueSpec(name="a", type=IntType), ValueSpec(name="b", type=StringType)]
+        storage_spec = make_address_storage_spec(specs)
+
+        # simulate values received from the StateFun cluster
+        values = [PbPersistedValueLike("a", 1, IntType), PbPersistedValueLike("b", "hello", StringType)]
+
+        # resolve spec and values
+        resolution = resolve(storage_spec, values)
+        store = resolution.storage
+
+        self.assertEqual(store.a, 1)
+        self.assertEqual(store.b, "hello")
+
+    def test_failed_resolution(self):
+        specs = [ValueSpec(name="a", type=IntType), ValueSpec(name="b", type=StringType)]
+        storage_spec = make_address_storage_spec(specs)
+
+        # simulate values received from the StateFun cluster
+        values = []
+
+        # resolve spec and values
+        resolution = resolve(storage_spec, values)
+        self.assertListEqual(resolution.missing_specs, specs)
+
+    def test_partial_failed_resolution(self):
+        specs = [ValueSpec(name="a", type=IntType), ValueSpec(name="b", type=StringType)]
+        storage_spec = make_address_storage_spec(specs)
+
+        # simulate values received from the StateFun cluster
+        values = [PbPersistedValueLike("a", 1, IntType)]
+
+        # resolve spec and values
+        resolution = resolve(storage_spec, values)
+        self.assertListEqual(resolution.missing_specs, specs[1:])
+
+    def test_ignore_unknown(self):
+        specs = [ValueSpec(name="a", type=IntType)]
+        storage_spec = make_address_storage_spec(specs)
+
+        # simulate values received from the StateFun cluster
+        values = [PbPersistedValueLike("a", 1, IntType), PbPersistedValueLike("b", "hello", StringType)]
+
+        # resolve spec and values
+        resolution = resolve(storage_spec, values)
+        store = resolution.storage
+
+        self.assertEqual(store.a, 1)
+        with self.assertRaises(AttributeError):
+            print(store.b)
+
+    def test_attribute_manipulation(self):
+        store = store_from(ValueSpec("a", IntType),
+                           PbPersistedValueLike("a", 1, IntType))
+
+        store.a += 1
+        self.assertEqual(store.a, 2)
+
+        store.a = 1337
+        self.assertEqual(store.a, 1337)
+
+        del store.a
+        self.assertIsNone(store.a)
+
+        store.a = 0
+        self.assertEqual(store.a, 0)
+
+    def test_no_modifications(self):
+        store = store_from(ValueSpec("a", IntType),
+                           PbPersistedValueLike("a", 1, IntType))
+
+        # noinspection PyUnusedLocal
+        unused_a = store.a
+
+        cell = store._cells["a"]
+        self.assertFalse(cell.dirty)
+
+    def test_modification(self):
+        store = store_from(ValueSpec("a", IntType),
+                           PbPersistedValueLike("a", 1, IntType))
+
+        cell = store._cells["a"]
+
+        store.a = 23249425
+        self.assertTrue(cell.dirty)
+        self.assertTrue(cell.typed_value.has_value)
+
+        modified_a = IntType.serializer().deserialize(cell.typed_value.value)
+        self.assertEqual(modified_a, 23249425)
+
+    def test_deletion(self):
+        store = store_from(ValueSpec("a", IntType),
+                           PbPersistedValueLike("a", 1, IntType))
+
+        cell = store._cells["a"]
+
+        del store.a
+        self.assertTrue(cell.dirty)
+        self.assertIsNone(cell.typed_value)
+
+    def test_missing_val(self):
+        store = store_from(ValueSpec("a", IntType), PbPersistedValueLike("a", None, IntType))
+        self.assertIsNone(store.a)
+
+        store.a = 121314
+
+        cell = store._cells["a"]
+        self.assertTrue(cell.dirty)
+        self.assertIsNotNone(cell.typed_value)
+        self.assertTrue(cell.typed_value.has_value)
+
+    def test_stateless(self):
+        store = store_from()
+        self.assertTrue(len(store._cells) == 0)
+
+
+def store_from(*args):
+    """test helper that creates an already resolved store from specs and pb values."""
+    specs = []
+    vals = []
+    for arg in args:
+        if isinstance(arg, ValueSpec):
+            specs.append(arg)
+        else:
+            vals.append(arg)
+    storage_spec = make_address_storage_spec(specs)
+    resolution = resolve(storage_spec, vals)
+    return resolution.storage
diff --git a/statefun-python-sdk/tests/type_deduction_test.py b/statefun-python-sdk/tests/type_deduction_test.py
deleted file mode 100644
index f0572db..0000000
--- a/statefun-python-sdk/tests/type_deduction_test.py
+++ /dev/null
@@ -1,57 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-import unittest
-import typing
-
-from statefun.core import deduce_protobuf_types
-
-
-class TypeDeductionTestCase(unittest.TestCase):
-
-    def test_simple_annotation(self):
-        def foo(context, message: int):
-            pass
-
-        types = deduce_protobuf_types(foo)
-
-        self.assertEqual(types, [int])
-
-    def test_simple_union(self):
-        def foo(context, message: typing.Union[int]):
-            pass
-
-        types = deduce_protobuf_types(foo)
-
-        self.assertEqual(types, [int])
-
-    def test_union_annotations(self):
-        def foo(context, message: typing.Union[int, str]):
-            pass
-
-        types = deduce_protobuf_types(foo)
-
-        self.assertEqual(types, [int, str])
-
-    def test_no_annotations(self):
-        def foo(context, message):
-            pass
-
-        types = deduce_protobuf_types(foo)
-
-        self.assertTrue(types is None)
diff --git a/statefun-python-sdk/tests/types_test.py b/statefun-python-sdk/tests/types_test.py
new file mode 100644
index 0000000..b1f150b
--- /dev/null
+++ b/statefun-python-sdk/tests/types_test.py
@@ -0,0 +1,53 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import unittest
+
+import statefun
+from statefun.utils import to_typed_value
+
+
+class TypeNameTestCase(unittest.TestCase):
+
+    def assertRoundTrip(self, tpe, value):
+        serializer = tpe.serializer()
+        out = serializer.serialize(value)
+        got = serializer.deserialize(out)
+        self.assertEqual(got, value)
+
+    def test_built_ins(self):
+        self.assertRoundTrip(statefun.BoolType, True)
+        self.assertRoundTrip(statefun.IntType, 0)
+        self.assertRoundTrip(statefun.FloatType, float(0.5))
+        self.assertRoundTrip(statefun.DoubleType, 1e-20)
+        self.assertRoundTrip(statefun.LongType, 1 << 45)
+        self.assertRoundTrip(statefun.StringType, "hello world")
+
+    def test_json_type(self):
+        import json
+        tpe = statefun.simple_type(typename="org.foo.bar/UserJson",
+                                   serialize_fn=json.dumps,
+                                   deserialize_fn=json.loads)
+
+        self.assertRoundTrip(tpe, {"name": "bob", "last": "mop"})
+
+    def test_message(self):
+        typed_value = to_typed_value(statefun.StringType, "hello world")
+        msg = statefun.Message(target_typename="foo/bar", target_id="1", typed_value=typed_value)
+
+        self.assertTrue(msg.is_string())
+        self.assertEqual(msg.as_string(), "hello world")
diff --git a/statefun-python-sdk/tests/valuespec_test.py b/statefun-python-sdk/tests/valuespec_test.py
new file mode 100644
index 0000000..912e735
--- /dev/null
+++ b/statefun-python-sdk/tests/valuespec_test.py
@@ -0,0 +1,52 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import unittest
+
+from statefun import IntType
+from statefun.storage import *
+from datetime import timedelta
+
+
+class ValueSpecTestCase(unittest.TestCase):
+
+    def test_example(self):
+        a = ValueSpec(name="a", type=IntType)
+        self.assertEqual(a.name, "a")
+        self.assertEqual(a.type, IntType)
+        self.assertFalse(a.after_write)
+        self.assertFalse(a.after_call)
+
+    def test_expire_after_access(self):
+        a = ValueSpec(name="a", type=IntType, expire_after_call=timedelta(seconds=1))
+        self.assertTrue(a.after_call)
+        self.assertEqual(a.duration, 1000)
+
+        self.assertFalse(a.after_write)
+
+    def test_expire_after_write(self):
+        a = ValueSpec(name="a", type=IntType, expire_after_write=timedelta(seconds=1))
+        self.assertTrue(a.after_write)
+        self.assertEqual(a.duration, 1000)
+
+        self.assertFalse(a.after_call)
+
+    def test_illegal_name(self):
+        with self.assertRaises(ValueError):
+            ValueSpec(name="-a", type=IntType, expire_after_call=timedelta(1))
+        with self.assertRaises(ValueError):
+            ValueSpec(name="def", type=IntType, expire_after_call=timedelta(1))