feat(producer) add thread lock for send message async
diff --git a/src/PythonWrapper.cpp b/src/PythonWrapper.cpp
index 0c3657d..f515c9e 100644
--- a/src/PythonWrapper.cpp
+++ b/src/PythonWrapper.cpp
@@ -116,6 +116,7 @@
//producer
void *PyCreateProducer(const char *groupId) {
+ PyEval_InitThreads(); // ensure create GIL, for call Python callback from C.
return (void *) CreateProducer(groupId);
}
int PyDestroyProducer(void *producer) {
@@ -125,6 +126,7 @@
return StartProducer((CProducer *) producer);
}
int PyShutdownProducer(void *producer) {
+ PyThreadStateUnlock PyThreadUnlock; // Shutdown Producer is a block call, ensure thread don't hold GIL.
return ShutdownProducer((CProducer *) producer);
}
int PySetProducerNameServerAddress(void *producer, const char *namesrv) {
@@ -162,20 +164,22 @@
return SendMessageOneway((CProducer *) producer, (CMessage *) msg);
}
-void PySendSuccessCallback(CSendResult result, CMessage* msg, void* pyCallback){
+void PySendSuccessCallback(CSendResult result, CMessage *msg, void *pyCallback){
+ PyThreadStateLock PyThreadLock; // ensure hold GIL, before call python callback
PySendResult sendResult;
- PyCallback *callback = (PyCallback *)pyCallback;
sendResult.sendStatus = result.sendStatus;
sendResult.offset = result.offset;
strncpy(sendResult.msgId, result.msgId, MAX_MESSAGE_ID_LENGTH - 1);
sendResult.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
+ PyCallback *callback = (PyCallback *)pyCallback;
boost::python::call<void>(callback->successCallback, sendResult, (void *) msg);
}
-void PySendExceptionCallback(CMQException e, CMessage* msg, void* pyCallback){
+void PySendExceptionCallback(CMQException e, CMessage *msg, void *pyCallback){
+ PyThreadStateLock PyThreadLock; // ensure hold GIL, before call python callback
PyCallback *callback = (PyCallback *)pyCallback;
- boost::python::call<void>(callback->execptionCallback, (void *) msg, e);
+ boost::python::call<void>(callback->exceptionCallback, (void *) msg, e);
}
int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback, PyObject *sendExceptionCallback){
@@ -197,6 +201,12 @@
return ret;
}
+int PyOrderlyCallbackInner(int size, CMessage *msg, void *args) {
+ PyUserData *userData = (PyUserData *)args;
+ int index = boost::python::call<int>(userData->pyObject, size, (void *) msg, userData->pData);
+ return index;
+}
+
PySendResult PySendMessageOrderlyByShardingKey(void *producer, void *msg, const char *shardingKey) {
PySendResult ret;
CSendResult result;
@@ -208,13 +218,6 @@
return ret;
}
-
-int PyOrderlyCallbackInner(int size, CMessage *msg, void *args) {
- PyUserData *userData = (PyUserData *)args;
- int index = boost::python::call<int>(userData->pyObject, size, (void *) msg, userData->pData);
- return index;
-}
-
//SendResult
const char *PyGetSendResultMsgID(CSendResult &sendResult) {
return (const char *) (sendResult.msgId);
@@ -358,8 +361,6 @@
def("SetProducerMaxMessageSize", PySetProducerMaxMessageSize);
def("SendMessageSync", PySendMessageSync);
- def("SendSuccessCallback", PySendSuccessCallback);
- def("SendExceptionCallback", PySendExceptionCallback);
def("SendMessageAsync", PySendMessageAsync);
def("SendMessageOneway", PySendMessageOneway);
diff --git a/src/PythonWrapper.h b/src/PythonWrapper.h
index f9beea9..d978984 100644
--- a/src/PythonWrapper.h
+++ b/src/PythonWrapper.h
@@ -48,7 +48,7 @@
typedef struct _PyCallback_ {
PyObject *successCallback;
- PyObject *execptionCallback;
+ PyObject *exceptionCallback;
} PyCallback;
#define PYTHON_CLIENT_VERSION "1.2.0"
@@ -92,8 +92,8 @@
PySendResult PySendMessageSync(void *producer, void *msg);
int PySendMessageOneway(void *producer, void *msg);
-void PySendSuccessCallback(CSendResult result, CMessage* msg, void* pyCallback);
-void PySendExceptionCallback(CMQException e, CMessage* msg, void* pyCallback);
+void PySendSuccessCallback(CSendResult result, CMessage *msg, void *pyCallback);
+void PySendExceptionCallback(CMQException e, CMessage *msg, void *pyCallback);
int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback, PyObject *sendExceptionCallback);
diff --git a/test/TestSendMessages.py b/test/TestSendMessages.py
index dc80784..7da9157 100644
--- a/test/TestSendMessages.py
+++ b/test/TestSendMessages.py
@@ -231,6 +231,29 @@
def calc_which_queue_to_send(size, msg, arg): ## it is index start with 0....
return 0
-
+
+def send_message_async(count):
+ key = 'rmq-key'
+ print 'start sending message'
+ tag = 'test'
+ for n in range(count):
+ body = 'hi rmq message, now is' + str(n)
+ msg = CreateMessage(topic)
+ SetMessageBody(msg, body)
+ SetMessageKeys(msg, key)
+ SetMessageTags(msg, tag)
+
+ SendMessageAsync(producer, msg, send_message_async_success, send_message_async_fail)
+ DestroyMessage(msg)
+ print 'send done'
+
+def send_message_async_success(result, msg):
+ print 'send success'
+ pass
+
+def send_message_async_fail(msg, exception):
+ print 'send failed'
+ pass
+
if __name__ == '__main__':
send_message_orderly(10)