style(client) polish code style
style(client) polish code style
diff --git a/.bumpversion.cfg b/.bumpversion.cfg
index daf3a98..8ce077d 100644
--- a/.bumpversion.cfg
+++ b/.bumpversion.cfg
@@ -18,5 +18,5 @@
files = setup.py
commit = True
tag = True
-current_version = 0.4.3
+current_version = 0.5.0-rc3
diff --git a/MANIFEST.in b/MANIFEST.in
index a82d516..67afa4c 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -1,3 +1,3 @@
-include README.md rocketmq/librocketmq.dylib rocketmq/librocketmq.so
+include README.md rocketmq/librocketmq.dylib rocketmq/librocketmq_client_core.dylib rocketmq/librocketmq.so rocketmq/librocketmq_client_core.so
recursive-exclude * .DS_Store
diff --git a/README.md b/README.md
index 033feca..15d2c2f 100644
--- a/README.md
+++ b/README.md
@@ -20,7 +20,7 @@
from rocketmq.client import Producer, Message
producer = Producer('PID-XXX')
-producer.set_namesrv_addr('127.0.0.1:9876')
+producer.set_name_server_address('127.0.0.1:9876')
producer.start()
msg = Message('YOUR-TOPIC')
@@ -45,7 +45,7 @@
consumer = PushConsumer('CID_XXX')
-consumer.set_namesrv_addr('127.0.0.1:9876')
+consumer.set_name_server_address('127.0.0.1:9876')
consumer.subscribe('YOUR-TOPIC', callback)
consumer.start()
diff --git a/manylinux.sh b/manylinux.sh
index 5bfac7f..0328103 100755
--- a/manylinux.sh
+++ b/manylinux.sh
@@ -23,6 +23,6 @@
# Audit wheels
for wheel in dist/*-linux_*.whl; do
- auditwheel repair $wheel -w dist/
+ auditwheel -v repair $wheel -w dist/
rm $wheel
done
diff --git a/rocketmq/client.py b/rocketmq/client.py
index 9defaf2..7d2ce0c 100644
--- a/rocketmq/client.py
+++ b/rocketmq/client.py
@@ -18,21 +18,19 @@
# under the License.
import sys
import ctypes
-from ctypes import c_void_p, c_int, POINTER
from enum import IntEnum
from collections import namedtuple
from .ffi import (
- dll, _CSendResult, MSG_CALLBACK_FUNC, MessageModel, QUEUE_SELECTOR_CALLBACK, TRANSACTION_CHECK_CALLBACK,
+ dll, _CSendResult, MSG_CALLBACK_FUNC, MessageModel, TRANSACTION_CHECK_CALLBACK,
LOCAL_TRANSACTION_EXECUTE_CALLBACK
)
from .exceptions import (
- ffi_check, PushConsumerStartFailed, ProducerSendAsyncFailed,
- NullPointerException,
+ ffi_check, NullPointerException,
)
from .consts import MessageProperty
-__all__ = ['SendStatus', 'Message', 'RecvMessage', 'Producer', 'PushConsumer', 'TransactionMQProducer',
+__all__ = ['SendStatus', 'Message', 'ReceivedMessage', 'Producer', 'PushConsumer', 'TransactionMQProducer',
'TransactionStatus', 'ConsumeStatus']
PY2 = sys.version_info[0] == 2
@@ -74,10 +72,6 @@
def __init__(self, topic):
self._handle = dll.CreateMessage(_to_bytes(topic))
- def destroy(self):
- if self._handle is not None:
- ffi_check(dll.DestroyMessage(self._handle))
-
def set_keys(self, keys):
ffi_check(dll.SetMessageKeys(self._handle, _to_bytes(keys)))
@@ -103,10 +97,10 @@
return val.decode('utf-8')
elif isinstance(val, text_type):
return val
- raise TypeError('Expects string types, got %s', type(val))
+ raise TypeError('Expects string types, but got %s', type(val))
-class RecvMessage(object):
+class ReceivedMessage(object):
def __init__(self, handle):
self._handle = handle
@@ -182,7 +176,7 @@
return self.body
def __repr__(self):
- return '<RecvMessage topic={} id={} body={}>'.format(
+ return '<ReceivedMessage topic={} id={} body={}>'.format(
repr(self.topic),
repr(self.id),
repr(self.body),
@@ -196,7 +190,7 @@
else:
self._handle = dll.CreateProducer(_to_bytes(group_id))
if self._handle is None:
- raise NullPointerException('CreateProducer returned null pointer')
+ raise NullPointerException('Returned null pointer when create Producer')
if timeout is not None:
self.set_timeout(timeout)
if compress_level is not None:
@@ -208,41 +202,38 @@
def __enter__(self):
self.start()
- def __exit__(self, type, value, traceback):
+ def __exit__(self, exec_type, value, traceback):
self.shutdown()
- def destroy(self):
- if self._handle is not None:
- ffi_check(dll.DestroyProducer(self._handle))
-
def send_sync(self, msg):
- cres = _CSendResult()
- ffi_check(dll.SendMessageSync(self._handle, msg, ctypes.pointer(cres)))
+ c_result = _CSendResult()
+ ffi_check(dll.SendMessageSync(self._handle, msg, ctypes.pointer(c_result)))
return SendResult(
- SendStatus(cres.sendStatus),
- cres.msgId.decode('utf-8'),
- cres.offset
+ SendStatus(c_result.sendStatus),
+ c_result.msgId.decode('utf-8'),
+ c_result.offset
)
def send_oneway(self, msg):
ffi_check(dll.SendMessageOneway(self._handle, msg))
def send_orderly_with_sharding_key(self, msg, sharding_key):
- cres = _CSendResult()
- ffi_check(dll.SendMessageOrderlyByShardingKey(self._handle, msg, _to_bytes(sharding_key), ctypes.pointer(cres)))
+ c_result = _CSendResult()
+ ffi_check(
+ dll.SendMessageOrderlyByShardingKey(self._handle, msg, _to_bytes(sharding_key), ctypes.pointer(c_result)))
return SendResult(
- SendStatus(cres.sendStatus),
- cres.msgId.decode('utf-8'),
- cres.offset
+ SendStatus(c_result.sendStatus),
+ c_result.msgId.decode('utf-8'),
+ c_result.offset
)
def set_group(self, group_name):
ffi_check(dll.SetProducerGroupName(self._handle, _to_bytes(group_name)))
- def set_namesrv_addr(self, addr):
+ def set_name_server_address(self, addr):
ffi_check(dll.SetProducerNameServerAddress(self._handle, _to_bytes(addr)))
- def set_namesrv_domain(self, domain):
+ def set_name_server_domain(self, domain):
ffi_check(dll.SetProducerNameServerDomain(self._handle, _to_bytes(domain)))
def set_session_credentials(self, access_key, access_secret, channel):
@@ -272,16 +263,18 @@
class TransactionMQProducer(Producer):
def __init__(self, group_id, checker_callback, user_args=None, timeout=None, compress_level=None,
max_message_size=None):
+ super(TransactionMQProducer, self).__init__(group_id, timeout, compress_level, max_message_size)
self._callback_refs = []
- def _on_check(producer, cmsg, user_args):
+ def _on_check(producer, c_message, user_data):
exc = None
try:
- py_message = RecvMessage(cmsg)
+ py_message = ReceivedMessage(c_message)
check_result = checker_callback(py_message)
if check_result != TransactionStatus.UNKNOWN and check_result != TransactionStatus.COMMIT \
and check_result != TransactionStatus.ROLLBACK:
- raise ValueError('Check transaction status error, please use TransactionStatus as response')
+ raise ValueError(
+ 'Check transaction status error, please use enum \'TransactionStatus\' as response')
return check_result
except BaseException as e:
exc = e
@@ -289,14 +282,13 @@
finally:
if exc:
raise exc
- return ConsumeStatus.UNKNOWN
transaction_checker_callback = TRANSACTION_CHECK_CALLBACK(_on_check)
self._callback_refs.append(transaction_checker_callback)
self._handle = dll.CreateTransactionProducer(_to_bytes(group_id), transaction_checker_callback, user_args)
if self._handle is None:
- raise NullPointerException('Create TransactionProducer returned null pointer')
+ raise NullPointerException('Returned null pointer when create transaction producer')
if timeout is not None:
self.set_timeout(timeout)
if compress_level is not None:
@@ -307,26 +299,26 @@
def __enter__(self):
self.start()
- def __exit__(self, type, value, traceback):
+ def __exit__(self, exec_type, value, traceback):
self.shutdown()
- def destroy(self):
- if self._handle is not None:
- ffi_check(dll.DestroyProducer(self._handle))
+ def set_name_server_address(self, addr):
+ ffi_check(dll.SetProducerNameServerAddress(self._handle, _to_bytes(addr)))
def start(self):
ffi_check(dll.StartProducer(self._handle))
def send_message_in_transaction(self, message, local_execute, user_args=None):
- def _on_local_execute(producer, cmsg, usr_args):
+ def _on_local_execute(producer, c_message, usr_args):
exc = None
try:
- py_message = RecvMessage(cmsg)
+ py_message = ReceivedMessage(c_message)
local_result = local_execute(py_message, usr_args)
if local_result != TransactionStatus.UNKNOWN and local_result != TransactionStatus.COMMIT \
and local_result != TransactionStatus.ROLLBACK:
- raise ValueError('Local transaction status error, please use TransactionStatus as response')
+ raise ValueError(
+ 'Local transaction status error, please use enum \'TransactionStatus\' as response')
return local_result
except BaseException as e:
exc = e
@@ -334,7 +326,6 @@
finally:
if exc:
raise exc
- return ConsumeStatus.UNKNOWN
local_execute_callback = LOCAL_TRANSACTION_EXECUTE_CALLBACK(_on_local_execute)
self._callback_refs.append(local_execute_callback)
@@ -361,7 +352,7 @@
def __init__(self, group_id, orderly=False, message_model=MessageModel.CLUSTERING):
self._handle = dll.CreatePushConsumer(_to_bytes(group_id))
if self._handle is None:
- raise NullPointerException('CreatePushConsumer returned null pointer')
+ raise NullPointerException('Returned null pointer when create PushConsumer')
self._orderly = orderly
self.set_message_model(message_model)
self._callback_refs = []
@@ -369,13 +360,9 @@
def __enter__(self):
self.start()
- def __exit__(self, type, value, traceback):
+ def __exit__(self, exec_type, value, traceback):
self.shutdown()
- def destroy(self):
- if self._handle is not None:
- ffi_check(dll.DestroyPushConsumer(self._handle))
-
def set_message_model(self, model):
ffi_check(dll.SetPushConsumerMessageModel(self._handle, model))
@@ -388,10 +375,10 @@
def set_group(self, group_id):
ffi_check(dll.SetPushConsumerGroupID(self._handle, _to_bytes(group_id)))
- def set_namesrv_addr(self, addr):
+ def set_name_server_address(self, addr):
ffi_check(dll.SetPushConsumerNameServerAddress(self._handle, _to_bytes(addr)))
- def set_namesrv_domain(self, domain):
+ def set_name_server_domain(self, domain):
ffi_check(dll.SetPushConsumerNameServerDomain(self._handle, _to_bytes(domain)))
def set_session_credentials(self, access_key, access_secret, channel):
@@ -406,9 +393,9 @@
def _on_message(consumer, msg):
exc = None
try:
- consume_result = callback(RecvMessage(msg))
+ consume_result = callback(ReceivedMessage(msg))
if consume_result != ConsumeStatus.CONSUME_SUCCESS and consume_result != ConsumeStatus.RECONSUME_LATER:
- raise ValueError('Consume status error, please use ConsumeStatus as response')
+ raise ValueError('Consume status error, please use enum \'ConsumeStatus\' as response')
return consume_result
except BaseException as e:
exc = e
@@ -417,8 +404,6 @@
if exc:
raise exc
- return ConsumeStatus.CONSUME_SUCCESS
-
ffi_check(dll.Subscribe(self._handle, _to_bytes(topic), _to_bytes(expression)))
self._register_callback(_on_message)
diff --git a/samples/consumer.py b/samples/consumer.py
index 5e6c0eb..b95da79 100644
--- a/samples/consumer.py
+++ b/samples/consumer.py
@@ -21,13 +21,13 @@
import time
def callback(msg):
- print(msg.id, msg.body)
+ print(msg.id, msg.body, msg.get_property('property'))
return ConsumeStatus.CONSUME_SUCCESS
def start_consume_message():
consumer = PushConsumer('consumer_group')
- consumer.set_namesrv_addr('127.0.0.1:9876')
- consumer.subscribe(topic, callback)
+ consumer.set_name_server_address('127.0.0.1:9876')
+ consumer.subscribe('TopicTest', callback)
print ('start consume message')
consumer.start()
diff --git a/samples/producer.py b/samples/producer.py
index fda2e9b..fb90b7b 100644
--- a/samples/producer.py
+++ b/samples/producer.py
@@ -29,13 +29,14 @@
msg = Message(topic)
msg.set_keys('XXX')
msg.set_tags('XXX')
+ msg.set_property('property', 'test')
msg.set_body('message body')
return msg
def send_message_sync(count):
producer = Producer(gid)
- producer.set_namesrv_addr(name_srv)
+ producer.set_name_server_address(name_srv)
producer.start()
for n in range(count):
msg = create_message()
@@ -43,35 +44,33 @@
print ('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
print ('send sync message done')
producer.shutdown()
- producer.destroy()
def send_orderly_with_sharding_key(count):
producer = Producer(gid, True)
- producer.set_namesrv_addr(name_srv)
+ producer.set_name_server_address(name_srv)
producer.start()
for n in range(count):
msg = create_message()
ret = producer.send_orderly_with_sharding_key(msg, 'orderId')
print ('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
- print ('send sync message done')
+ print ('send sync order message done')
producer.shutdown()
- producer.destroy()
def check_callback(msg):
- print ('check: ' + msg.id.decode('utf-8'))
+ print ('check: ' + msg.body.decode('utf-8'))
return TransactionStatus.COMMIT
def local_execute(msg, user_args):
- print ('local: ' + msg.id.decode('utf-8'))
+ print ('local: ' + msg.body.decode('utf-8'))
return TransactionStatus.UNKNOWN
def send_transaction_message(count):
producer = TransactionMQProducer(gid, check_callback)
- producer.set_namesrv_addr(name_srv)
+ producer.set_name_server_address(name_srv)
producer.start()
for n in range(count):
msg = create_message()
@@ -81,8 +80,6 @@
while True:
time.sleep(3600)
- producer.shutdown()
- producer.destroy()
if __name__ == '__main__':
diff --git a/setup.py b/setup.py
index a1abd68..72952b2 100755
--- a/setup.py
+++ b/setup.py
@@ -64,7 +64,7 @@
setup(
name='rocketmq-client-python',
- version='0.5.0-rc1',
+ version='0.5.0-rc3',
author='apache.rocketmq',
author_email='dev@rocketmq.apache.org',
packages=find_packages(exclude=('tests', 'tests.*')),
diff --git a/tests/conftest.py b/tests/conftest.py
index 8f9e920..d649e12 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -23,22 +23,24 @@
@pytest.fixture(scope='session')
def producer():
prod = Producer('producer_group')
- prod.set_namesrv_addr('127.0.0.1:9876')
+ prod.set_name_server_address('127.0.0.1:9876')
prod.start()
yield prod
prod.shutdown()
+
@pytest.fixture(scope='session')
def orderly_producer():
prod = Producer('orderly_producer_group', True)
- prod.set_namesrv_addr('127.0.0.1:9876')
+ prod.set_name_server_address('127.0.0.1:9876')
prod.start()
yield prod
prod.shutdown()
+
@pytest.fixture(scope='function')
def push_consumer():
consumer = PushConsumer('push_consumer_group')
- consumer.set_namesrv_addr('127.0.0.1:9876')
+ consumer.set_name_server_address('127.0.0.1:9876')
yield consumer
consumer.shutdown()
diff --git a/tests/test_consumer.py b/tests/test_consumer.py
index 50dc850..e83d3c0 100644
--- a/tests/test_consumer.py
+++ b/tests/test_consumer.py
@@ -23,7 +23,6 @@
from rocketmq.client import Message, SendStatus, ConsumeStatus, PushConsumer
from rocketmq.exceptions import PushConsumerStartFailed
-from rocketmq.consts import MessageProperty
def _send_test_msg(producer):
@@ -31,13 +30,14 @@
msg.set_keys('XXX')
msg.set_tags('XXX')
msg.set_body('XXXX')
+ msg.set_property('property', 'test')
ret = producer.send_sync(msg)
assert ret.status == SendStatus.OK
def test_push_consumer_no_subscription_start_fail():
consumer = PushConsumer('testGroup')
- consumer.set_namesrv_addr("127.0.0.1:9876")
+ consumer.set_name_server_address("127.0.0.1:9876")
with pytest.raises(PushConsumerStartFailed):
consumer.start()
@@ -52,6 +52,7 @@
try:
assert msg.body.decode('utf-8') == 'XXXX'
assert msg.keys.decode('utf-8') == 'XXX'
+ assert msg.get_property('property').decode('utf-8') == 'test'
return ConsumeStatus.CONSUME_SUCCESS
except Exception as exc:
errors.append(exc)
diff --git a/tests/test_producer.py b/tests/test_producer.py
index c8eb964..ee71ac8 100644
--- a/tests/test_producer.py
+++ b/tests/test_producer.py
@@ -47,6 +47,7 @@
msg.set_keys('sharding_message')
msg.set_tags('sharding')
msg.set_body('sharding message')
+ msg.set_property('property', 'test')
ret = orderly_producer.send_orderly_with_sharding_key(msg, 'order1')
assert ret.status == SendStatus.OK
@@ -64,7 +65,7 @@
return TransactionStatus.COMMIT
producer = TransactionMQProducer('transactionTestGroup' + str(PY_VERSION), on_check)
- producer.set_namesrv_addr('127.0.0.1:9876')
+ producer.set_name_server_address('127.0.0.1:9876')
producer.start()
msg = Message('test')
msg.set_keys('transaction')