Merge pull request #100 from tom0392/multi-threaded
[ISSUE #99]Add a function which shows how to use rocketmq in multi-threaded scen…
diff --git a/samples/producer.py b/samples/producer.py
index fb90b7b..f69534c 100644
--- a/samples/producer.py
+++ b/samples/producer.py
@@ -19,11 +19,12 @@
from rocketmq.client import Producer, Message, TransactionMQProducer, TransactionStatus
import time
+import threading
topic = 'TopicTest'
gid = 'test'
name_srv = '127.0.0.1:9876'
-
+MUTEX = threading.Lock()
def create_message():
msg = Message(topic)
@@ -46,6 +47,38 @@
producer.shutdown()
+def send_message_multi_threaded(retry_time):
+ producer = Producer(gid)
+ producer.set_name_server_address(name_srv)
+ msg = create_message()
+
+ global MUTEX
+ MUTEX.acquire()
+ try:
+ producer.start()
+ except Exception as e:
+ print('ProducerStartFailed:', e)
+ MUTEX.release()
+ return
+
+ try:
+ for i in range(retry_time):
+ ret = producer.send_sync(msg)
+ if ret.status == 0:
+ print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
+ break
+ else:
+ print('send message to MQ failed.')
+ if i == (retry_time - 1):
+ print('send message to MQ failed after retries.')
+ except Exception as e:
+ print('ProducerSendSyncFailed:', e)
+ finally:
+ producer.shutdown()
+ MUTEX.release()
+ return
+
+
def send_orderly_with_sharding_key(count):
producer = Producer(gid, True)
producer.set_name_server_address(name_srv)