complete stream rpc function & rpc_client_mock (#528)
* stream rpc functions
* use log
diff --git a/python/rocketmq/rpc_client.py b/python/rocketmq/rpc_client.py
index 420559c..545419c 100644
--- a/python/rocketmq/rpc_client.py
+++ b/python/rocketmq/rpc_client.py
@@ -67,6 +67,20 @@
):
return await self.__stub.SendMessage(request, timeout=timeout_seconds)
+ async def receive_message(
+ self, request: service_pb2.ReceiveMessageRequest, timeout_seconds: int
+ ):
+ results = self.__stub.ReceiveMessage(request, timeout=timeout_seconds)
+ response = []
+ try:
+ async for result in results:
+ if result.HasField('message'):
+ response.append(result.message)
+ except Exception as e:
+ logger.info("An error occurred: %s", e)
+ # Handle error as appropriate for your use case
+ return response
+
async def query_assignment(
self, request: service_pb2.QueryAssignmentRequest, timeout_seconds: int
):
@@ -105,6 +119,26 @@
request, timeout=timeout_seconds
)
+ async def send_requests(self, requests, stream):
+ for request in requests:
+ await stream.send_message(request)
+
+ async def telemetry(
+ self, timeout_seconds: int, requests
+ ):
+ responses = []
+ async with self.__stub.Telemetry() as stream:
+ # Create a task for sending requests
+ send_task = asyncio.create_task(self.send_requests(requests, stream))
+ # Receiving responses
+ async for response in stream:
+ responses.append(response)
+
+ # Await the send task to ensure all requests have been sent
+ await send_task
+
+ return responses
+
async def test():
client = RpcClient("rmq-cn-72u353icd01.cn-hangzhou.rmq.aliyuncs.com:8080")