chore(client) add strong response value check in callback
diff --git a/rocketmq/client.py b/rocketmq/client.py
index 81dc493..daaa6b7 100644
--- a/rocketmq/client.py
+++ b/rocketmq/client.py
@@ -359,7 +359,11 @@
def _on_check(producer, cmsg, user_args):
py_message = RecvMessage(cmsg)
- return checker_callback(py_message)
+ check_result = checker_callback(py_message)
+ if check_result != TransactionStatus.UNKNOWN and check_result != TransactionStatus.COMMIT \
+ and check_result != TransactionStatus.ROLLBACK:
+ raise ValueError('Check transaction status error, please use TransactionStatus as response')
+ return check_result
transaction_checker_callback = TRANSACTION_CHECK_CALLBACK(_on_check)
self._callback_refs.append(transaction_checker_callback)
@@ -391,7 +395,11 @@
def _on_local_execute(producer, cmsg, usr_args):
py_message = RecvMessage(cmsg)
- return local_execute(py_message, usr_args)
+ local_result = local_execute(py_message, usr_args)
+ if local_result != TransactionStatus.UNKNOWN and local_result != TransactionStatus.COMMIT \
+ and local_result != TransactionStatus.ROLLBACK:
+ raise ValueError('Local transaction status error, please use TransactionStatus as response')
+ return local_result
local_execute_callback = LOCAL_TRANSACTION_EXECUTE_CALLBACK(_on_local_execute)
self._callback_refs.append(local_execute_callback)
@@ -463,7 +471,10 @@
def _on_message(consumer, msg):
exc = None
try:
- return callback(RecvMessage(msg))
+ consume_result = callback(RecvMessage(msg))
+ if consume_result != ConsumeStatus.CONSUME_SUCCESS and consume_result != ConsumeStatus.RECONSUME_LATER:
+ raise ValueError('Consume status error, please use ConsumeStatus as response')
+ return consume_result
except BaseException as e:
exc = e
return ConsumeStatus.RECONSUME_LATER
diff --git a/samples/producer.py b/samples/producer.py
index 87fce45..83bf976 100644
--- a/samples/producer.py
+++ b/samples/producer.py
@@ -43,6 +43,7 @@
print ('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id + ' offset: ' + str(ret.offset))
print ('send sync message done')
producer.shutdown()
+ producer.destroy()
def send_orderly_with_sharding_key(count):
@@ -55,6 +56,7 @@
print ('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
print ('send sync message done')
producer.shutdown()
+ producer.destroy()
def check_callback(msg):
@@ -68,18 +70,19 @@
def send_transaction_message(count):
- transactionMQProducer = TransactionMQProducer(gid, check_callback)
- transactionMQProducer.set_namesrv_addr(name_srv)
- transactionMQProducer.start()
+ producer = TransactionMQProducer(gid, check_callback)
+ producer.set_namesrv_addr(name_srv)
+ producer.start()
for n in range(count):
msg = create_message()
- ret = transactionMQProducer.send_message_in_transaction(msg, local_execute, None)
+ ret = producer.send_message_in_transaction(msg, local_execute, None)
print ('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
print ('send transaction message done')
while True:
time.sleep(3600)
producer.shutdown()
+ producer.destroy()
if __name__ == '__main__':
diff --git a/tests/conftest.py b/tests/conftest.py
index 164b205..9895f05 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -27,6 +27,7 @@
prod.start()
yield prod
prod.shutdown()
+ prod.destroy()
@pytest.fixture(scope='function')
@@ -35,3 +36,4 @@
consumer.set_namesrv_addr('127.0.0.1:9876')
yield consumer
consumer.shutdown()
+ consumer.destroy()