Merge pull request #20 from ifplusor/fixed-deadlock

Fixed deadlock, and add args field for python callback.
diff --git a/.gitignore b/.gitignore
index 990936c..b232131 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,8 @@
 .idea/
-cmake-build-debug/
+cmake-build-*/
+
+*.pyc
+*.so
+
+bin/
+
diff --git a/doc/Introduction.md b/doc/Introduction.md
index 371ad97..628079e 100644
--- a/doc/Introduction.md
+++ b/doc/Introduction.md
@@ -73,8 +73,10 @@
 ----------
 ## How to use
 - set LD_LIBRARY_PATH
-  ``````
+  ```
   export LD_LIBRARY_PATH=/usr/local/lib
+  ```
+  
 - import module
   ```
   from librocketmqclientpython import *
@@ -90,28 +92,28 @@
 - producer must invoke following interface:
   ```
   - producer = CreateProducer("please_rename_unique_group_name");
-  - SetProducerNameServerAddress(producer,"please_rename_unique_name_server")
+  - SetProducerNameServerAddress(producer, "please_rename_unique_name_server")
   - StartProducer(producer)
-  - SendMessageSync(producer,msg)
+  - SendMessageSync(producer, msg)
   - ShutdownProducer(producer)
   - DestroyProducer(producer)
   ```
 - how to consumer messages
   ```
-  - def consumerMessage(msg):
-  - topic = GetMessageTopic(msg)
-  - body = GetMessageBody(msg)
-  - tags = GetMessageTags(msg)
-  - msgid = GetMessageId(msg)
-  - handle message
-  - return 0
+  - def consumerMessage(msg, args):
+  -     topic = GetMessageTopic(msg)
+  -     body = GetMessageBody(msg)
+  -     tags = GetMessageTags(msg)
+  -     msgid = GetMessageId(msg)
+  -     # handle message...
+  -     return 0
   ```
 - pushconsumer must invoke following interface:
   ```
   - consumer = CreatePushConsumer("please_rename_unique_group_name_1");
-  - SetPushConsumerNameServerAddress(consumer,"please_rename_unique_name_server")
+  - SetPushConsumerNameServerAddress(consumer, "please_rename_unique_name_server")
   - Subscribe(consumer, "your_topic", "*")
-  - RegisterMessageCallback(consumer,consumerMessage)
+  - RegisterMessageCallback(consumer, consumerMessage, args)
   - StartPushConsumer(consumer)
   - ShutdownPushConsumer(consumer)
   - DestroyPushConsumer(consumer)
@@ -122,3 +124,4 @@
   - python testProducer.py
 - push consumer
   - python testConsumer.py
+
diff --git a/doc/api-doc/consumer-push.md b/doc/api-doc/consumer-push.md
index a783e47..6ee498e 100644
--- a/doc/api-doc/consumer-push.md
+++ b/doc/api-doc/consumer-push.md
@@ -31,13 +31,14 @@
     topic: topic name
     tag: topic tag
 
-* RegisterMessageCallback(consumer, pyCallBack) <br />
+* RegisterMessageCallback(consumer, pyCallBack, pyArgs) <br />
   - function description<br />
     set callback for push consumer instance <br />
 
   - input <br />
     consumer: consumer intance<br />
-    pyCallBack: py callback method. when message pulled, they would be send to a pyCallback method
+    pyCallBack: py callback method. when message pulled, they would be send to a pyCallback method<br />
+    pyArgs: the arguments will be passed to pyCallBack
 
 * SetPushConsumerThreadCount(consumer, threadCount)
   - function description<br />
diff --git a/sample/testConsumer.py b/sample/testConsumer.py
index 93665ed..03ca587 100644
--- a/sample/testConsumer.py
+++ b/sample/testConsumer.py
@@ -18,8 +18,10 @@
 import base
 import time
 from librocketmqclientpython import *
+
 totalMsg = 0
-def consumerMessage(msg):
+
+def consumerMessage(msg, args):
      global totalMsg
      totalMsg += 1
      print(">>ConsumerMessage Called:",totalMsg)
@@ -33,11 +35,12 @@
 
 consumer = CreatePushConsumer("awtTest_Producer_Python_Test")
 print(consumer)
-SetPushConsumerNameServerAddress(consumer,"172.17.0.2:9876")
-SetPushConsumerThreadCount(consumer,1)
+SetPushConsumerNameServerAddress(consumer, "172.17.0.2:9876")
+SetPushConsumerThreadCount(consumer, 1)
 Subscribe(consumer, "T_TestTopic", "*")
-RegisterMessageCallback(consumer,consumerMessage)
+RegisterMessageCallback(consumer, consumerMessage, None)
 StartPushConsumer(consumer)
+
 i = 1
 while i <= 60:
     print(i)
diff --git a/src/PythonWrapper.cpp b/src/PythonWrapper.cpp
index 8c7c1e2..919ba5b 100644
--- a/src/PythonWrapper.cpp
+++ b/src/PythonWrapper.cpp
@@ -30,24 +30,37 @@
 const char *VERSION =
         "PYTHON_CLIENT_VERSION: " PYTHON_CLIENT_VERSION ", BUILD DATE: " PYCLI_BUILD_DATE " ";
 
-map<CPushConsumer *, PyObject *> g_CallBackMap;
+map<CPushConsumer *, pair<PyObject *, object>> g_CallBackMap;
 
 class PyThreadStateLock {
 public:
-    PyThreadStateLock(void) {
+    PyThreadStateLock() {
         state = PyGILState_Ensure();
     }
 
-    ~PyThreadStateLock(void) {
-        if (state == PyGILState_LOCKED) {
-            PyGILState_Release(state);
-        }
+    ~PyThreadStateLock() {
+        // NOTE: must paired with PyGILState_Ensure, otherwise it will cause deadlock!!!
+        PyGILState_Release(state);
     }
 
 private:
     PyGILState_STATE state;
 };
 
+class PyThreadStateUnlock {
+public:
+    PyThreadStateUnlock() : _save(NULL) {
+        Py_UNBLOCK_THREADS
+    }
+
+    ~PyThreadStateUnlock() {
+        Py_BLOCK_THREADS
+    }
+
+private:
+    PyThreadState *_save;
+};
+
 #ifdef __cplusplus
 extern "C" {
 #endif
@@ -146,22 +159,25 @@
 }
 //consumer
 void *PyCreatePushConsumer(const char *groupId) {
-    //Py_Initialize();
-    PyEval_InitThreads();
-//    PyEval_ReleaseThread(PyThreadState_Get());
+    PyEval_InitThreads();  // ensure create GIL, for call Python callback from C.
     return (void *) CreatePushConsumer(groupId);
 }
 int PyDestroyPushConsumer(void *consumer) {
-    return DestroyPushConsumer((CPushConsumer *) consumer);
+    CPushConsumer *consumerInner = (CPushConsumer *) consumer;
+    map<CPushConsumer *, pair<PyObject *, object>>::iterator iter;
+    iter = g_CallBackMap.find(consumerInner);
+    if (iter != g_CallBackMap.end()) {
+        UnregisterMessageCallback(consumerInner);
+        g_CallBackMap.erase(iter);
+    }
+    return DestroyPushConsumer(consumerInner);
 }
 int PyStartPushConsumer(void *consumer) {
     return StartPushConsumer((CPushConsumer *) consumer);
 }
 int PyShutdownPushConsumer(void *consumer) {
-    int ret = ShutdownPushConsumer((CPushConsumer *) consumer);
-    //PyGILState_Ensure();
-    //Py_Finalize();
-    return ret;
+    PyThreadStateUnlock PyThreadUnlock;  // ShutdownPushConsumer is a block call, ensure thread don't hold GIL.
+    return ShutdownPushConsumer((CPushConsumer *) consumer);
 }
 int PySetPushConsumerNameServerAddress(void *consumer, const char *namesrv) {
     return SetPushConsumerNameServerAddress((CPushConsumer *) consumer, namesrv);
@@ -172,29 +188,27 @@
 int PySubscribe(void *consumer, const char *topic, const char *expression) {
     return Subscribe((CPushConsumer *) consumer, topic, expression);
 }
-int PyRegisterMessageCallback(void *consumer, PyObject *pCallback) {
+int PyRegisterMessageCallback(void *consumer, PyObject *pCallback, object args) {
     CPushConsumer *consumerInner = (CPushConsumer *) consumer;
-    g_CallBackMap[consumerInner] = pCallback;
+    g_CallBackMap[consumerInner] = make_pair(pCallback, std::move(args));
     return RegisterMessageCallback(consumerInner, &PythonMessageCallBackInner);
 }
 
 int PythonMessageCallBackInner(CPushConsumer *consumer, CMessageExt *msg) {
-
-    class PyThreadStateLock PyThreadLock;
-    PyMessageExt message;
-    message.pMessageExt = msg;
-    map<CPushConsumer *, PyObject *>::iterator iter;
+    PyThreadStateLock PyThreadLock;  // ensure hold GIL, before call python callback
+    PyMessageExt message = { .pMessageExt = msg };
+    map<CPushConsumer *, pair<PyObject *, object>>::iterator iter;
     iter = g_CallBackMap.find(consumer);
     if (iter != g_CallBackMap.end()) {
-        PyObject * pCallback = iter->second;
+        pair<PyObject *, object> callback = iter->second;
+        PyObject * pCallback = callback.first;
+        object& args = callback.second;
         if (pCallback != NULL) {
-            int status =
-                    boost::python::call<int>(pCallback, message);
+            int status = boost::python::call<int>(pCallback, message, args);
             return status;
         }
     }
     return 1;
-
 }
 
 int PySetPushConsumerThreadCount(void *consumer, int threadCount) {
@@ -212,7 +226,7 @@
 }
 
 //push consumer
-int PySetPullConsumerNameServerDomain(void *consumer, const char *domain){
+int PySetPullConsumerNameServerDomain(void *consumer, const char *domain) {
     return SetPullConsumerNameServerDomain((CPullConsumer *) consumer, domain);
 }
 //version
diff --git a/src/PythonWrapper.h b/src/PythonWrapper.h
index 324676b..2ad8255 100644
--- a/src/PythonWrapper.h
+++ b/src/PythonWrapper.h
@@ -89,7 +89,7 @@
 int PySetPushConsumerNameServerAddress(void *consumer, const char *namesrv);
 int PySetPushConsumerNameServerDomain(void *consumer, const char *domain);
 int PySubscribe(void *consumer, const char *topic, const char *expression);
-int PyRegisterMessageCallback(void *consumer, PyObject *pCallback);
+int PyRegisterMessageCallback(void *consumer, PyObject *pCallback, object args);
 int PythonMessageCallBackInner(CPushConsumer *consumer, CMessageExt *msg);
 int PySetPushConsumerThreadCount(void *consumer, int threadCount);
 int PySetPushConsumerMessageBatchMaxSize(void *consumer, int batchSize);
diff --git a/test/TestConsumeMessages.py b/test/TestConsumeMessages.py
index 6b9f6d2..da4b0b6 100644
--- a/test/TestConsumeMessages.py
+++ b/test/TestConsumeMessages.py
@@ -34,7 +34,7 @@
     sys.exit(0)
 
 
-def consumer_message(msg):
+def consumer_message(msg, args):
     global totalMsg
     totalMsg += 1
     print 'total count %d' % totalMsg
@@ -55,7 +55,7 @@
     SetPushConsumerNameServerAddress(consumer, name_srv)
     SetPushConsumerThreadCount(consumer, 1)
     Subscribe(consumer, _topic, _tag)
-    RegisterMessageCallback(consumer, consumerMessage)
+    RegisterMessageCallback(consumer, consumer_message, None)
     StartPushConsumer(consumer)
     print 'consumer is ready...'
     return consumer