Merge pull request #20 from ifplusor/fixed-deadlock
Fix deadlock and add an args field for the Python callback.
diff --git a/.gitignore b/.gitignore
index 990936c..b232131 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,8 @@
.idea/
-cmake-build-debug/
+cmake-build-*/
+
+*.pyc
+*.so
+
+bin/
+
diff --git a/doc/Introduction.md b/doc/Introduction.md
index 371ad97..628079e 100644
--- a/doc/Introduction.md
+++ b/doc/Introduction.md
@@ -73,8 +73,10 @@
----------
## How to use
- set LD_LIBRARY_PATH
- ``````
+ ```
export LD_LIBRARY_PATH=/usr/local/lib
+ ```
+
- import module
```
from librocketmqclientpython import *
@@ -90,28 +92,28 @@
- producer must invoke following interface:
```
- producer = CreateProducer("please_rename_unique_group_name");
- - SetProducerNameServerAddress(producer,"please_rename_unique_name_server")
+ - SetProducerNameServerAddress(producer, "please_rename_unique_name_server")
- StartProducer(producer)
- - SendMessageSync(producer,msg)
+ - SendMessageSync(producer, msg)
- ShutdownProducer(producer)
- DestroyProducer(producer)
```
- how to consumer messages
```
- - def consumerMessage(msg):
- - topic = GetMessageTopic(msg)
- - body = GetMessageBody(msg)
- - tags = GetMessageTags(msg)
- - msgid = GetMessageId(msg)
- - handle message
- - return 0
+ - def consumerMessage(msg, args):
+ - topic = GetMessageTopic(msg)
+ - body = GetMessageBody(msg)
+ - tags = GetMessageTags(msg)
+ - msgid = GetMessageId(msg)
+ - # handle message...
+ - return 0
```
- pushconsumer must invoke following interface:
```
- consumer = CreatePushConsumer("please_rename_unique_group_name_1");
- - SetPushConsumerNameServerAddress(consumer,"please_rename_unique_name_server")
+ - SetPushConsumerNameServerAddress(consumer, "please_rename_unique_name_server")
- Subscribe(consumer, "your_topic", "*")
- - RegisterMessageCallback(consumer,consumerMessage)
+ - RegisterMessageCallback(consumer, consumerMessage, args)
- StartPushConsumer(consumer)
- ShutdownPushConsumer(consumer)
- DestroyPushConsumer(consumer)
@@ -122,3 +124,4 @@
- python testProducer.py
- push consumer
- python testConsumer.py
+
diff --git a/doc/api-doc/consumer-push.md b/doc/api-doc/consumer-push.md
index a783e47..6ee498e 100644
--- a/doc/api-doc/consumer-push.md
+++ b/doc/api-doc/consumer-push.md
@@ -31,13 +31,14 @@
topic: topic name
tag: topic tag
-* RegisterMessageCallback(consumer, pyCallBack) <br />
+* RegisterMessageCallback(consumer, pyCallBack, pyArgs) <br />
- function description<br />
set callback for push consumer instance <br />
- input <br />
consumer: consumer intance<br />
- pyCallBack: py callback method. when message pulled, they would be send to a pyCallback method
+ pyCallBack: Python callback method. When messages are pulled, they are sent to this callback method<br />
+ pyArgs: the arguments that will be passed to pyCallBack
* SetPushConsumerThreadCount(consumer, threadCount)
- function description<br />
diff --git a/sample/testConsumer.py b/sample/testConsumer.py
index 93665ed..03ca587 100644
--- a/sample/testConsumer.py
+++ b/sample/testConsumer.py
@@ -18,8 +18,10 @@
import base
import time
from librocketmqclientpython import *
+
totalMsg = 0
-def consumerMessage(msg):
+
+def consumerMessage(msg, args):
global totalMsg
totalMsg += 1
print(">>ConsumerMessage Called:",totalMsg)
@@ -33,11 +35,12 @@
consumer = CreatePushConsumer("awtTest_Producer_Python_Test")
print(consumer)
-SetPushConsumerNameServerAddress(consumer,"172.17.0.2:9876")
-SetPushConsumerThreadCount(consumer,1)
+SetPushConsumerNameServerAddress(consumer, "172.17.0.2:9876")
+SetPushConsumerThreadCount(consumer, 1)
Subscribe(consumer, "T_TestTopic", "*")
-RegisterMessageCallback(consumer,consumerMessage)
+RegisterMessageCallback(consumer, consumerMessage, None)
StartPushConsumer(consumer)
+
i = 1
while i <= 60:
print(i)
diff --git a/src/PythonWrapper.cpp b/src/PythonWrapper.cpp
index 8c7c1e2..919ba5b 100644
--- a/src/PythonWrapper.cpp
+++ b/src/PythonWrapper.cpp
@@ -30,24 +30,37 @@
const char *VERSION =
"PYTHON_CLIENT_VERSION: " PYTHON_CLIENT_VERSION ", BUILD DATE: " PYCLI_BUILD_DATE " ";
-map<CPushConsumer *, PyObject *> g_CallBackMap;
+map<CPushConsumer *, pair<PyObject *, object>> g_CallBackMap;
class PyThreadStateLock {
public:
- PyThreadStateLock(void) {
+ PyThreadStateLock() {
state = PyGILState_Ensure();
}
- ~PyThreadStateLock(void) {
- if (state == PyGILState_LOCKED) {
- PyGILState_Release(state);
- }
+ ~PyThreadStateLock() {
+ // NOTE: must be paired with PyGILState_Ensure; otherwise it will cause a deadlock!
+ PyGILState_Release(state);
}
private:
PyGILState_STATE state;
};
+class PyThreadStateUnlock {
+public:
+ PyThreadStateUnlock() : _save(NULL) {
+ Py_UNBLOCK_THREADS
+ }
+
+ ~PyThreadStateUnlock() {
+ Py_BLOCK_THREADS
+ }
+
+private:
+ PyThreadState *_save;
+};
+
#ifdef __cplusplus
extern "C" {
#endif
@@ -146,22 +159,25 @@
}
//consumer
void *PyCreatePushConsumer(const char *groupId) {
- //Py_Initialize();
- PyEval_InitThreads();
-// PyEval_ReleaseThread(PyThreadState_Get());
+ PyEval_InitThreads(); // ensure the GIL is created, so Python callbacks can be called from C.
return (void *) CreatePushConsumer(groupId);
}
int PyDestroyPushConsumer(void *consumer) {
- return DestroyPushConsumer((CPushConsumer *) consumer);
+ CPushConsumer *consumerInner = (CPushConsumer *) consumer;
+ map<CPushConsumer *, pair<PyObject *, object>>::iterator iter;
+ iter = g_CallBackMap.find(consumerInner);
+ if (iter != g_CallBackMap.end()) {
+ UnregisterMessageCallback(consumerInner);
+ g_CallBackMap.erase(iter);
+ }
+ return DestroyPushConsumer(consumerInner);
}
int PyStartPushConsumer(void *consumer) {
return StartPushConsumer((CPushConsumer *) consumer);
}
int PyShutdownPushConsumer(void *consumer) {
- int ret = ShutdownPushConsumer((CPushConsumer *) consumer);
- //PyGILState_Ensure();
- //Py_Finalize();
- return ret;
+ PyThreadStateUnlock PyThreadUnlock; // ShutdownPushConsumer is a blocking call; ensure this thread does not hold the GIL.
+ return ShutdownPushConsumer((CPushConsumer *) consumer);
}
int PySetPushConsumerNameServerAddress(void *consumer, const char *namesrv) {
return SetPushConsumerNameServerAddress((CPushConsumer *) consumer, namesrv);
@@ -172,29 +188,27 @@
int PySubscribe(void *consumer, const char *topic, const char *expression) {
return Subscribe((CPushConsumer *) consumer, topic, expression);
}
-int PyRegisterMessageCallback(void *consumer, PyObject *pCallback) {
+int PyRegisterMessageCallback(void *consumer, PyObject *pCallback, object args) {
CPushConsumer *consumerInner = (CPushConsumer *) consumer;
- g_CallBackMap[consumerInner] = pCallback;
+ g_CallBackMap[consumerInner] = make_pair(pCallback, std::move(args));
return RegisterMessageCallback(consumerInner, &PythonMessageCallBackInner);
}
int PythonMessageCallBackInner(CPushConsumer *consumer, CMessageExt *msg) {
-
- class PyThreadStateLock PyThreadLock;
- PyMessageExt message;
- message.pMessageExt = msg;
- map<CPushConsumer *, PyObject *>::iterator iter;
+ PyThreadStateLock PyThreadLock; // ensure the GIL is held before calling the Python callback
+ PyMessageExt message = { .pMessageExt = msg };
+ map<CPushConsumer *, pair<PyObject *, object>>::iterator iter;
iter = g_CallBackMap.find(consumer);
if (iter != g_CallBackMap.end()) {
- PyObject * pCallback = iter->second;
+ pair<PyObject *, object> callback = iter->second;
+ PyObject * pCallback = callback.first;
+ object& args = callback.second;
if (pCallback != NULL) {
- int status =
- boost::python::call<int>(pCallback, message);
+ int status = boost::python::call<int>(pCallback, message, args);
return status;
}
}
return 1;
-
}
int PySetPushConsumerThreadCount(void *consumer, int threadCount) {
@@ -212,7 +226,7 @@
}
//push consumer
-int PySetPullConsumerNameServerDomain(void *consumer, const char *domain){
+int PySetPullConsumerNameServerDomain(void *consumer, const char *domain) {
return SetPullConsumerNameServerDomain((CPullConsumer *) consumer, domain);
}
//version
diff --git a/src/PythonWrapper.h b/src/PythonWrapper.h
index 324676b..2ad8255 100644
--- a/src/PythonWrapper.h
+++ b/src/PythonWrapper.h
@@ -89,7 +89,7 @@
int PySetPushConsumerNameServerAddress(void *consumer, const char *namesrv);
int PySetPushConsumerNameServerDomain(void *consumer, const char *domain);
int PySubscribe(void *consumer, const char *topic, const char *expression);
-int PyRegisterMessageCallback(void *consumer, PyObject *pCallback);
+int PyRegisterMessageCallback(void *consumer, PyObject *pCallback, object args);
int PythonMessageCallBackInner(CPushConsumer *consumer, CMessageExt *msg);
int PySetPushConsumerThreadCount(void *consumer, int threadCount);
int PySetPushConsumerMessageBatchMaxSize(void *consumer, int batchSize);
diff --git a/test/TestConsumeMessages.py b/test/TestConsumeMessages.py
index 6b9f6d2..da4b0b6 100644
--- a/test/TestConsumeMessages.py
+++ b/test/TestConsumeMessages.py
@@ -34,7 +34,7 @@
sys.exit(0)
-def consumer_message(msg):
+def consumer_message(msg, args):
global totalMsg
totalMsg += 1
print 'total count %d' % totalMsg
@@ -55,7 +55,7 @@
SetPushConsumerNameServerAddress(consumer, name_srv)
SetPushConsumerThreadCount(consumer, 1)
Subscribe(consumer, _topic, _tag)
- RegisterMessageCallback(consumer, consumerMessage)
+ RegisterMessageCallback(consumer, consumer_message, None)
StartPushConsumer(consumer)
print 'consumer is ready...'
return consumer