Support sending orderly message.
Add sendorderly test cases and doc
diff --git a/.gitignore b/.gitignore
index b232131..72dcbef 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,4 +5,4 @@
*.so
bin/
-
+.vscode/
\ No newline at end of file
diff --git a/doc/api-doc/producer.md b/doc/api-doc/producer.md
index 6af4c99..9d329f4 100644
--- a/doc/api-doc/producer.md
+++ b/doc/api-doc/producer.md
@@ -91,4 +91,16 @@
- input<br />
producer : a producer instance <br />
- msg : a message instance <br />
\ No newline at end of file
+ msg : a message instance <br />
+
+* SendMessageOrderly(producer, msg, autoRetryTimes, arg, queueSelectorCallback)
+ - function description<br />
+ send a message orderly
+
+ - input<br />
+ producer : a producer instance <br />
+ msg : a message instance <br />
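+ autoRetryTimes: number of automatic retries when sending fails<br />
+ arg: user-defined argument that is passed through unchanged to queueSelectorCallback<br />
+ queueSelectorCallback: callback that chooses the queue to send the message to. It is called as (queue count, msg, arg) and must return a queue index from 0 to (queue count - 1)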
+
diff --git a/doc/quick-start.md b/doc/quick-start.md
new file mode 100644
index 0000000..245eacb
--- /dev/null
+++ b/doc/quick-start.md
@@ -0,0 +1,64 @@
+----------
+## Quick start
+
+* install the cpp dependencies
+ ```bash
+ wget https://opensource-rocketmq-client.oss-cn-hangzhou.aliyuncs.com/cpp-client/linux/1.0.2/RHEL7.x/librocketmq.tar.gz
+
+ tar -zxvf librocketmq.tar.gz
+
+ cd librocketmq
+
+ cp -R rocketmq /usr/local/include
+
+ cp librocketmq.a librocketmq.so /usr/local/lib
+
+ export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH
+
+ ```
+
+* build the python client from source (skip this step if you already have it)
+ - [how to build](https://github.com/apache/rocketmq-client-python/blob/master/doc/Introduction.md)
+
+ - copy the build result [librocketmqclientpython.so](#) to /usr/local/lib
+
+* how to produce a message<br />
+ ```python
+ from librocketmqclientpython import *
+ ### how to init a producer instance
+ def init_producer():
+ producer = CreateProducer('your producer group name')
+ SetProducerNameServerAddress(producer, 'your name srv address')
+ StartProducer(producer)
+ return producer
+ ### how to send a message
+ def send(body):
+ msg = CreateMessage(topic)
+ SetMessageBody(msg, body)
+ result = SendMessageSync(producer, msg)
+ DestroyMessage(msg)
+ print 'done . msg id = ' + result.GetMsgId()
+ ```
+
+* how to consume the message
+ ```python
+ from librocketmqclientpython import *
+ ## how to init a consumer instance
+ def build_consumer(_group, _topic, _tag):
+ consumer = CreatePushConsumer(_group)
+ SetPushConsumerNameServerAddress(consumer, name_srv)
+ SetPushConsumerThreadCount(consumer, 1)
+ Subscribe(consumer, _topic, _tag)
+ RegisterMessageCallback(consumer, callback)
+ StartPushConsumer(consumer)
+ print 'consumer is ready...'
+ return consumer
+ ## callback to consume the messages
+ def callback(msg):
+ print 'topic=%s' % GetMessageTopic(msg)
+ print 'tag=%s' % GetMessageTags(msg)
+ print 'body=%s' % GetMessageBody(msg)
+ print 'msg id=%s' % GetMessageId(msg)
+ print 'map.keys %s' % GetMessageKeys(msg)
+ return 0
+ ```
\ No newline at end of file
diff --git a/src/PythonWrapper.cpp b/src/PythonWrapper.cpp
index 919ba5b..f924e56 100644
--- a/src/PythonWrapper.cpp
+++ b/src/PythonWrapper.cpp
@@ -153,6 +153,24 @@
return SendMessageOneway((CProducer *) producer, (CMessage *) msg);
}
+PySendResult PySendMessageOrderly(void *producer, void *msg, int autoRetryTimes, void *args, PyObject *queueSelector) {
+ PySendResult ret;
+ CSendResult result;
+ PyUserData userData = {queueSelector,args};
+ SendMessageOrderly((CProducer *) producer, (CMessage *) msg, &PyOrderlyCallbackInner, &userData, autoRetryTimes, &result);
+ ret.sendStatus = result.sendStatus;
+ ret.offset = result.offset;
+ strncpy(ret.msgId, result.msgId, MAX_MESSAGE_ID_LENGTH - 1);
+ ret.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
+ return ret;
+}
+
+int PyOrderlyCallbackInner(int size, CMessage *msg, void *args) {
+ PyUserData *userData = (PyUserData *)args;
+ int index = boost::python::call<int>(userData->pyObject, size, (void *) msg, userData->pData);
+ return index;
+}
+
//SendResult
const char *PyGetSendResultMsgID(CSendResult &sendResult) {
return (const char *) (sendResult.msgId);
@@ -294,6 +312,7 @@
def("SetProducerSessionCredentials", PySetProducerSessionCredentials);
def("SendMessageSync", PySendMessageSync);
def("SendMessageOneway", PySendMessageOneway);
+ def("SendMessageOrderly", PySendMessageOrderly);
//For Consumer
def("CreatePushConsumer", PyCreatePushConsumer, return_value_policy<return_opaque_pointer>());
diff --git a/src/PythonWrapper.h b/src/PythonWrapper.h
index 7e66948..987d839 100644
--- a/src/PythonWrapper.h
+++ b/src/PythonWrapper.h
@@ -40,6 +40,11 @@
CMessageExt *pMessageExt;
} PyMessageExt;
+typedef struct _PyUserData_ {
+ PyObject *pyObject;
+ void *pData;
+} PyUserData;
+
#define PYTHON_CLIENT_VERSION "1.2.0"
#define PYCLI_BUILD_DATE "04-12-2018"
@@ -77,6 +82,9 @@
int PySetProducerSessionCredentials(void *producer, const char *accessKey, const char *secretKey, const char *channel);
PySendResult PySendMessageSync(void *producer, void *msg);
int PySendMessageOneway(void *producer, void *msg);
+// Orderly send: queueSelector is a Python callable invoked as (size, msg, args) that returns the queue index.
+PySendResult PySendMessageOrderly(void *producer, void *msg, int autoRetryTimes, void *args, PyObject *queueSelector);
+int PyOrderlyCallbackInner(int size, CMessage *msg, void *args);
//sendResult
const char *PyGetSendResultMsgID(CSendResult &sendResult);
diff --git a/test/TestConsumeMessages.py b/test/TestConsumeMessages.py
index da4b0b6..5017a81 100644
--- a/test/TestConsumeMessages.py
+++ b/test/TestConsumeMessages.py
@@ -21,10 +21,13 @@
import time
import sys
-topic = 'test'
+topic = 'test-topic-normal'
+topic_orderly = 'test-topic-normal-orderly'
+
name_srv = '127.0.0.1:9876'
tag = 'rmq-tag'
consumer_group = 'test-consumer-group'
+consumer_group_orderly = 'test-topic-normal-orderly_group'
totalMsg = 0
@@ -50,7 +53,7 @@
return 0
-def init_producer(_group, _topic, _tag):
+def init_consumer(_group, _topic, _tag):
consumer = CreatePushConsumer(_group)
SetPushConsumerNameServerAddress(consumer, name_srv)
SetPushConsumerThreadCount(consumer, 1)
@@ -62,7 +65,19 @@
def start_one_consumer(_group, _topic, _tag):
- consumer = init_producer(_group, _topic, _tag)
+ consumer = init_consumer(_group, _topic, _tag)
+ i = 1
+ while i <= 10:
+ print 'clock: ' + str(i)
+ i += 1
+ time.sleep(10)
+
+ ShutdownPushConsumer(consumer)
+ DestroyPushConsumer(consumer)
+ print("Consumer Down....")
+
+def start_orderly_consumer():
+ consumer = init_consumer(consumer_group_orderly, topic_orderly, "*")
i = 1
while i <= 10:
print 'clock: ' + str(i)
@@ -75,4 +90,4 @@
if __name__ == '__main__':
- start_one_consumer(consumer_group, topic, '*')
+ start_orderly_consumer()
diff --git a/test/TestSendMessages.py b/test/TestSendMessages.py
index 69871a2..7258d70 100644
--- a/test/TestSendMessages.py
+++ b/test/TestSendMessages.py
@@ -19,7 +19,8 @@
from librocketmqclientpython import *
import time
-topic = 'test'
+topic = 'test-topic-normal'
+topic_orderly = 'test-topic-normal-orderly'
name_srv = '127.0.0.1:9876'
@@ -198,9 +199,23 @@
DestroyMessage(msg)
print 'msg id =' + result.GetMsgId()
+def send_message_orderly(count):
+ key = 'rmq-key'
+ print 'start sending order-ly message'
+ tag = 'test'
+ for n in range(count):
+ body = 'hi rmq orderly-message, now is' + str(n)
+ msg = CreateMessage(topic_orderly)
+ SetMessageBody(msg, body)
+ SetMessageKeys(msg, key)
+ SetMessageTags(msg, tag)
+ result = SendMessageOrderly(producer, msg, 1, None, calc_which_queue_to_send)
+ DestroyMessage(msg)
+ print 'msg id =' + result.GetMsgId()
+
+def calc_which_queue_to_send(size, msg, arg): ## it is index start with 0....
+ return 0
+
if __name__ == '__main__':
- # print GetVersion()
- while True:
- send_messages_oneway(1)
- time.sleep(1)
+ send_message_orderly(10)