[FLINK-18518] Add an async handler integration test
diff --git a/statefun-python-sdk/tests/request_reply_test.py b/statefun-python-sdk/tests/request_reply_test.py
index 2a2a956..af28969 100644
--- a/statefun-python-sdk/tests/request_reply_test.py
+++ b/statefun-python-sdk/tests/request_reply_test.py
@@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-
+import asyncio
import unittest
from datetime import timedelta
@@ -24,7 +24,7 @@
from tests.examples_pb2 import LoginEvent, SeenCount
from statefun.request_reply_pb2 import ToFunction, FromFunction
-from statefun import RequestReplyHandler
+from statefun import RequestReplyHandler, AsyncRequestReplyHandler
from statefun.core import StatefulFunctions, kafka_egress_record
from statefun.core import StatefulFunctions, kinesis_egress_record
@@ -75,6 +75,20 @@
return MessageToDict(f, preserving_proto_field_name=True)
+def async_round_trip(typename, fn, to: InvocationBuilder) -> dict:
+ functions = StatefulFunctions()
+ functions.register(typename, fn)
+ handler = AsyncRequestReplyHandler(functions)
+
+ in_bytes = to.SerializeToString()
+ future = handler(in_bytes)
+ out_bytes = asyncio.get_event_loop().run_until_complete(future)
+
+ f = FromFunction()
+ f.ParseFromString(out_bytes)
+ return MessageToDict(f, preserving_proto_field_name=True)
+
+
def json_at(nested_structure: dict, path):
try:
for next in path:
@@ -192,3 +206,38 @@
self.assertEqual(first_egress['egress_namespace'], 'foo.bar.baz')
self.assertEqual(first_egress['egress_type'], 'my-egress')
self.assertEqual(first_egress['argument']['@type'], 'type.googleapis.com/k8s.demo.SeenCount')
+
+
+class AsyncRequestReplyTestCase(unittest.TestCase):
+
+ def test_integration(self):
+ async def fun(context, message):
+ any = Any()
+ any.type_url = 'type.googleapis.com/k8s.demo.SeenCount'
+ context.send("bar.baz/foo", "12345", any)
+
+ #
+ # build the invocation
+ #
+ builder = InvocationBuilder()
+ builder.with_target("org.foo", "greeter", "0")
+
+ seen = SeenCount()
+ seen.seen = 100
+ builder.with_state("seen", seen)
+
+ arg = LoginEvent()
+ arg.user_name = "user-1"
+ builder.with_invocation(arg, ("org.foo", "greeter-java", "0"))
+
+ #
+ # invoke
+ #
+ result_json = async_round_trip("org.foo/greeter", fun, builder)
+
+ # assert outgoing message
+ second_out_message = json_at(result_json, NTH_OUTGOING_MESSAGE(0))
+ self.assertEqual(second_out_message['target']['namespace'], 'bar.baz')
+ self.assertEqual(second_out_message['target']['type'], 'foo')
+ self.assertEqual(second_out_message['target']['id'], '12345')
+ self.assertEqual(second_out_message['argument']['@type'], 'type.googleapis.com/k8s.demo.SeenCount')