Fixed deadlock in producer.send_async (#87)

Fix #84

Release the GIL while calling `producer.sendAsync()` to avoid a deadlock when PyBind is triggering the Python callback. 


  * Main Thread 
     1. Holds the Python GIL
     2. Call `producer.send_async()`
     3. Tries to acquire internal `ClientConnetion` lock

 * Pulsar client internal thread
     1. Holds lock on `ClientConnection`
     2. Receives ack from the broker
     3. Triggers callback
     4. PyBind11 acquires GIL <---- Deadlock

The problem is the different behavior in PyBind from Boost::Python. 

We always need to make sure we release the GIL before making any call to C++ that potentially acquires any mutexes
diff --git a/src/consumer.cc b/src/consumer.cc
index a77bb50..972bd0b 100644
--- a/src/consumer.cc
+++ b/src/consumer.cc
@@ -59,23 +59,33 @@
 void Consumer_acknowledge(Consumer& consumer, const Message& msg) { consumer.acknowledgeAsync(msg, nullptr); }
 
 void Consumer_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) {
+    Py_BEGIN_ALLOW_THREADS
     consumer.acknowledgeAsync(msgId, nullptr);
+    Py_END_ALLOW_THREADS
 }
 
 void Consumer_negative_acknowledge(Consumer& consumer, const Message& msg) {
+    Py_BEGIN_ALLOW_THREADS
     consumer.negativeAcknowledge(msg);
+    Py_END_ALLOW_THREADS
 }
 
 void Consumer_negative_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) {
+    Py_BEGIN_ALLOW_THREADS
     consumer.negativeAcknowledge(msgId);
+    Py_END_ALLOW_THREADS
 }
 
 void Consumer_acknowledge_cumulative(Consumer& consumer, const Message& msg) {
+    Py_BEGIN_ALLOW_THREADS
     consumer.acknowledgeCumulativeAsync(msg, nullptr);
+    Py_END_ALLOW_THREADS
 }
 
 void Consumer_acknowledge_cumulative_message_id(Consumer& consumer, const MessageId& msgId) {
+    Py_BEGIN_ALLOW_THREADS
     consumer.acknowledgeCumulativeAsync(msgId, nullptr);
+    Py_END_ALLOW_THREADS
 }
 
 void Consumer_close(Consumer& consumer) {
diff --git a/src/producer.cc b/src/producer.cc
index bba262a..1dd5a76 100644
--- a/src/producer.cc
+++ b/src/producer.cc
@@ -34,6 +34,16 @@
     return messageId;
 }
 
+void Producer_sendAsync(Producer& producer, const Message& msg, SendCallback callback) {
+    Py_BEGIN_ALLOW_THREADS
+    producer.sendAsync(msg, callback);
+    Py_END_ALLOW_THREADS
+
+    if (PyErr_CheckSignals() == -1) {
+        PyErr_SetInterrupt();
+    }
+}
+
 void Producer_flush(Producer& producer) {
     waitForAsyncResult([&](ResultCallback callback) { producer.flushAsync(callback); });
 }
@@ -67,7 +77,7 @@
              "This method is equivalent to asyncSend() and wait until the callback is triggered.\n"
              "\n"
              "@param msg message to publish\n")
-        .def("send_async", &Producer::sendAsync)
+        .def("send_async", &Producer_sendAsync)
         .def("flush", &Producer_flush,
              "Flush all the messages buffered in the client and wait until all messages have been\n"
              "successfully persisted\n")
diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py
index feba877..00e2466 100755
--- a/tests/pulsar_test.py
+++ b/tests/pulsar_test.py
@@ -1424,5 +1424,19 @@
         with self.assertRaises(RuntimeError):
             AuthenticationBasic(auth_params_string='invalid auth params')
 
+    def test_send_async_no_deadlock(self):
+        client = Client(self.serviceUrl)
+        producer = client.create_producer('test_send_async_no_deadlock')
+
+        def send_callback(res, msg):
+            print(f"Message '{msg}' published res={res}")
+
+        for i in range(30):
+            producer.send_async(f"Hello-{i}".encode('utf-8'), callback=send_callback)
+
+        producer.flush()
+        client.close()
+
+
 if __name__ == "__main__":
     main()