feat(producer) add batch message send support
diff --git a/src/PythonWrapper.cpp b/src/PythonWrapper.cpp
index dec63ce..56dc58a 100644
--- a/src/PythonWrapper.cpp
+++ b/src/PythonWrapper.cpp
@@ -94,6 +94,18 @@
int PySetMessageDelayTimeLevel(void *msg, int level) {
return SetDelayTimeLevel((CMessage *) msg, level);
}
+
+//batch message
+void *PyCreateBatchMessage() {
+ return (void *) CreateBatchMessage();
+}
+int PyAddMessage(void *batchMsg, void *msg) {
+ return AddMessage((CBatchMessage *) batchMsg, (CMessage *) msg);
+}
+int PyDestroyBatchMessage(void *batchMsg) {
+ return DestroyBatchMessage((CBatchMessage *) batchMsg);
+}
+
//messageExt
const char *PyGetMessageTopic(PyMessageExt msgExt) {
return GetMessageTopic((CMessageExt *) msgExt.pMessageExt);
@@ -200,6 +212,16 @@
return SendAsync((CProducer *) producer, (CMessage *) msg, &PySendSuccessCallback, &PySendExceptionCallback, (void *)pyCallback);
}
+PySendResult PySendBatchMessage(void *producer, CBatchMessage *batchMessage) {
+ PySendResult ret;
+ CSendResult result;
+ SendBatchMessage((CProducer *) producer, (CBatchMessage *) batchMessage, &result);
+ ret.sendStatus = result.sendStatus;
+ ret.offset = result.offset;
+ strncpy(ret.msgId, result.msgId, MAX_MESSAGE_ID_LENGTH - 1);
+ ret.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
+ return ret;
+}
PySendResult PySendMessageOrderly(void *producer, void *msg, int autoRetryTimes, void *args, PyObject *queueSelector) {
@@ -360,6 +382,11 @@
def("SetMessageProperty", PySetMessageProperty);
def("SetDelayTimeLevel", PySetMessageDelayTimeLevel);
+ //For batch message
+ def("CreateBatchMessage", PyCreateBatchMessage, return_value_policy<return_opaque_pointer>());
+ def("AddMessage", PyAddMessage);
+ def("DestroyBatchMessage", PyDestroyBatchMessage);
+
//For MessageExt
def("GetMessageTopic", PyGetMessageTopic);
def("GetMessageTags", PyGetMessageTags);
@@ -382,6 +409,7 @@
def("SendMessageSync", PySendMessageSync);
def("SendMessageAsync", PySendMessageAsync);
+ def("SendBatchMessage", PySendBatchMessage);
def("SendMessageOneway", PySendMessageOneway);
def("SendMessageOrderly", PySendMessageOrderly);
diff --git a/src/PythonWrapper.h b/src/PythonWrapper.h
index c5bc5b9..dc4a519 100644
--- a/src/PythonWrapper.h
+++ b/src/PythonWrapper.h
@@ -47,9 +47,11 @@
const char *GetFile() {
return (const char *) file;
}
+
const char *GetMsg() {
return (const char *) msg;
}
+
const char *GetType() {
return (const char *) type;
}
@@ -88,6 +90,11 @@
int PySetMessageProperty(void *msg, const char *key, const char *value);
int PySetMessageDelayTimeLevel(void *msg, int level);
+//batch message
+void *PyCreateBatchMessage();
+int PyAddMessage(void *batchMsg, void *msg);
+int PyDestroyBatchMessage(void *batchMsg);
+
//messageExt
const char *PyGetMessageTopic(PyMessageExt msgExt);
const char *PyGetMessageTags(PyMessageExt msgExt);
@@ -115,7 +122,7 @@
void PySendExceptionCallback(CMQException e, CMessage *msg, void *pyCallback);
int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback, PyObject *sendExceptionCallback);
-
+PySendResult PySendBatchMessage(void *producer, CBatchMessage *msg);
PySendResult PySendMessageOrderly(void *producer, void *msg, int autoRetryTimes, void *args, PyObject *queueSelector);
PySendResult PySendMessageOrderlyByShardingKey(void *producer, void *msg, const char *shardingKey);
@@ -138,7 +145,7 @@
int PySetPushConsumerMessageBatchMaxSize(void *consumer, int batchSize);
int PySetPushConsumerInstanceName(void *consumer, const char *instanceName);
int PySetPushConsumerSessionCredentials(void *consumer, const char *accessKey, const char *secretKey,
- const char *channel);
+ const char *channel);
//push consumer
int PySetPullConsumerNameServerDomain(void *consumer, const char *domain);