Remove redundant __init__.py (#554)
diff --git a/python/rocketmq/__init__.py b/python/rocketmq/__init__.py
deleted file mode 100644
index 3fc459f..0000000
--- a/python/rocketmq/__init__.py
+++ /dev/null
@@ -1,38 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-import os
-
-logger = logging.getLogger("rocketmqlogger")
-logger.setLevel(logging.DEBUG)
-
-log_path = os.path.join(
- os.path.expanduser("~"), "logs", "rocketmq", "rocketmq-client.log"
-)
-file_handler = logging.FileHandler(log_path)
-file_handler.setLevel(logging.DEBUG)
-
-console_handler = logging.StreamHandler()
-console_handler.setLevel(logging.DEBUG)
-
-formatter = logging.Formatter(
- "%(asctime)s [%(levelname)s] [%(process)d] [%(filename)s#%(funcName)s:%(lineno)d] %(message)s"
-)
-file_handler.setFormatter(formatter)
-console_handler.setFormatter(formatter)
-
-logger.addHandler(file_handler)
-logger.addHandler(console_handler)
diff --git a/python/rocketmq/client.py b/python/rocketmq/client.py
index 4ccda2b..0ef32e6 100644
--- a/python/rocketmq/client.py
+++ b/python/rocketmq/client.py
@@ -39,7 +39,7 @@
self.sessionsLock = threading.Lock()
self.client_manager = ClientManager(self)
- async def start_up(self):
+ async def start(self):
# get topic route
for topic in self.topics:
self.topic_route_cache[topic] = await self.fetch_topic_route(topic)
diff --git a/python/rocketmq/log.py b/python/rocketmq/log.py
index a9c82e4..f3e4eae 100644
--- a/python/rocketmq/log.py
+++ b/python/rocketmq/log.py
@@ -29,8 +29,7 @@
console_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter(
- "%(asctime)s [%(levelname)s] [%(process)d] [%(threadName)s] [%(filename)s#\
- %(funcName)s:%(lineno)d] %(message)s"
+ "%(asctime)s [%(levelname)s] [%(process)d] [%(threadName)s] [%(filename)s#%(funcName)s:%(lineno)d] %(message)s"
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
diff --git a/python/rocketmq/producer.py b/python/rocketmq/producer.py
index c9c0d35..3d604a2 100644
--- a/python/rocketmq/producer.py
+++ b/python/rocketmq/producer.py
@@ -21,6 +21,7 @@
from rocketmq.client import Client
from rocketmq.client_config import ClientConfig
from rocketmq.definition import TopicRouteData
+from rocketmq.log import logger
from rocketmq.message_id_codec import MessageIdCodec
from rocketmq.protocol.definition_pb2 import Message as ProtoMessage
from rocketmq.protocol.definition_pb2 import Resource, SystemProperties
@@ -92,8 +93,20 @@
self.client_id, self.endpoints, None, 10, topics
)
- async def start_up(self):
- await super().start_up()
+ async def __aenter__(self):
+ await self.start()
+
+ async def __aexit__(self, exc_type, exc_val, exc_tb):
+ await self.shutdown()
+
+ async def start(self):
+ logger.info(f"Begin to start the rocketmq producer, client_id={self.client_id}")
+ await super().start()
+ logger.info(f"The rocketmq producer starts successfully, client_id={self.client_id}")
+
+ async def shutdown(self):
+ logger.info(f"Begin to shutdown the rocketmq producer, client_id={self.client_id}")
+ logger.info(f"Shutdown the rocketmq producer successfully, client_id={self.client_id}")
async def send_message(self, message):
req = SendMessageRequest()
@@ -107,14 +120,13 @@
async def test():
- creds = SessionCredentials("username", "password")
- creds_provider = SessionCredentialsProvider(creds)
+ credentials = SessionCredentials("username", "password")
+ credentials_provider = SessionCredentialsProvider(credentials)
client_config = ClientConfig(
endpoints=Endpoints("rmq-cn-jaj390gga04.cn-hangzhou.rmq.aliyuncs.com:8080"),
- session_credentials_provider=creds_provider,
+ session_credentials_provider=credentials_provider,
ssl_enabled=True,
)
- producer = Producer(client_config, topics={"normal_topic"})
topic = Resource()
topic.name = "normal_topic"
msg = ProtoMessage()
@@ -123,8 +135,9 @@
sysperf = SystemProperties()
sysperf.message_id = MessageIdCodec.next_message_id()
msg.system_properties.CopyFrom(sysperf)
- print(msg)
- await producer.start_up()
+ logger.info(f"{msg}")
+ producer = Producer(client_config, topics={"normal_topic"})
+ await producer.start()
result = await producer.send_message(msg)
print(result)
diff --git a/python/rocketmq/rpc_client.py b/python/rocketmq/rpc_client.py
index a737d3d..c1f129c 100644
--- a/python/rocketmq/rpc_client.py
+++ b/python/rocketmq/rpc_client.py
@@ -24,7 +24,7 @@
import certifi
from grpc import aio, ssl_channel_credentials
from protocol import service_pb2
-from rocketmq import logger
+from rocketmq.log import logger
from rocketmq.protocol import service_pb2_grpc
from rocketmq.protocol.definition_pb2 import Address as ProtoAddress
from rocketmq.protocol.definition_pb2 import \