[FLINK-18518] Add an async handler integration test
diff --git a/statefun-python-sdk/tests/request_reply_test.py b/statefun-python-sdk/tests/request_reply_test.py
index 2a2a956..af28969 100644
--- a/statefun-python-sdk/tests/request_reply_test.py
+++ b/statefun-python-sdk/tests/request_reply_test.py
@@ -15,7 +15,7 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-
+import asyncio
 import unittest
 from datetime import timedelta
 
@@ -24,7 +24,7 @@
 
 from tests.examples_pb2 import LoginEvent, SeenCount
 from statefun.request_reply_pb2 import ToFunction, FromFunction
-from statefun import RequestReplyHandler
+from statefun import RequestReplyHandler, AsyncRequestReplyHandler
 from statefun.core import StatefulFunctions, kafka_egress_record
 from statefun.core import StatefulFunctions, kinesis_egress_record
 
@@ -75,6 +75,20 @@
     return MessageToDict(f, preserving_proto_field_name=True)
 
 
+def async_round_trip(typename, fn, to: InvocationBuilder) -> dict:
+    functions = StatefulFunctions()
+    functions.register(typename, fn)
+    handler = AsyncRequestReplyHandler(functions)
+
+    in_bytes = to.SerializeToString()
+    future = handler(in_bytes)
+    out_bytes = asyncio.get_event_loop().run_until_complete(future)
+
+    f = FromFunction()
+    f.ParseFromString(out_bytes)
+    return MessageToDict(f, preserving_proto_field_name=True)
+
+
 def json_at(nested_structure: dict, path):
     try:
         for next in path:
@@ -192,3 +206,38 @@
         self.assertEqual(first_egress['egress_namespace'], 'foo.bar.baz')
         self.assertEqual(first_egress['egress_type'], 'my-egress')
         self.assertEqual(first_egress['argument']['@type'], 'type.googleapis.com/k8s.demo.SeenCount')
+
+
+class AsyncRequestReplyTestCase(unittest.TestCase):
+
+    def test_integration(self):
+        async def fun(context, message):
+            any = Any()
+            any.type_url = 'type.googleapis.com/k8s.demo.SeenCount'
+            context.send("bar.baz/foo", "12345", any)
+
+        #
+        # build the invocation
+        #
+        builder = InvocationBuilder()
+        builder.with_target("org.foo", "greeter", "0")
+
+        seen = SeenCount()
+        seen.seen = 100
+        builder.with_state("seen", seen)
+
+        arg = LoginEvent()
+        arg.user_name = "user-1"
+        builder.with_invocation(arg, ("org.foo", "greeter-java", "0"))
+
+        #
+        # invoke
+        #
+        result_json = async_round_trip("org.foo/greeter", fun, builder)
+
+        # assert outgoing message
+        second_out_message = json_at(result_json, NTH_OUTGOING_MESSAGE(0))
+        self.assertEqual(second_out_message['target']['namespace'], 'bar.baz')
+        self.assertEqual(second_out_message['target']['type'], 'foo')
+        self.assertEqual(second_out_message['target']['id'], '12345')
+        self.assertEqual(second_out_message['argument']['@type'], 'type.googleapis.com/k8s.demo.SeenCount')