complete stream rpc function & rpc_client_mock (#528)

* stream rpc functions

* use log
diff --git a/python/rocketmq/rpc_client.py b/python/rocketmq/rpc_client.py
index 420559c..545419c 100644
--- a/python/rocketmq/rpc_client.py
+++ b/python/rocketmq/rpc_client.py
@@ -67,6 +67,20 @@
     ):
         return await self.__stub.SendMessage(request, timeout=timeout_seconds)
 
+    async def receive_message(
+        self, request: service_pb2.ReceiveMessageRequest, timeout_seconds: int
+    ):
+        results = self.__stub.ReceiveMessage(request, timeout=timeout_seconds)
+        response = []
+        try:
+            async for result in results:
+                if result.HasField('message'):
+                    response.append(result.message)
+        except Exception as e:
+            logger.info("An error occurred: %s", e)
+            # Handle error as appropriate for your use case
+        return response
+
     async def query_assignment(
         self, request: service_pb2.QueryAssignmentRequest, timeout_seconds: int
     ):
@@ -105,6 +119,26 @@
             request, timeout=timeout_seconds
         )
 
+    async def send_requests(self, requests, stream):
+        for request in requests:
+            await stream.send_message(request)
+
+    async def telemetry(
+        self, timeout_seconds: int, requests
+    ):
+        responses = []
+        async with self.__stub.Telemetry() as stream:
+            # Create a task for sending requests
+            send_task = asyncio.create_task(self.send_requests(requests, stream))
+            # Receiving responses
+            async for response in stream:
+                responses.append(response)
+
+            # Await the send task to ensure all requests have been sent
+            await send_task
+
+        return responses
+
 
 async def test():
     client = RpcClient("rmq-cn-72u353icd01.cn-hangzhou.rmq.aliyuncs.com:8080")