Fixed deadlock in producer.send_async (#87)
Fix #84
Release the GIL while calling `producer.sendAsync()` (and the consumer acknowledge calls) to avoid a deadlock when pybind11 invokes the Python callback.
* Main thread
1. Holds the Python GIL
2. Calls `producer.send_async()`
3. Tries to acquire the internal `ClientConnection` lock (blocked: the client thread holds it)
* Pulsar client internal thread
1. Holds the lock on `ClientConnection`
2. Receives an ack from the broker
3. Triggers the send callback
4. pybind11 tries to acquire the GIL (blocked: the main thread holds it) <---- Deadlock
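The root cause is a behavioral difference between pybind11 and Boost.Python: pybind11 acquires the GIL before invoking the Python callback, so each thread ends up waiting for a lock the other one holds (a lock-order inversion).
We must always release the GIL before calling into C++ code that may acquire a mutex.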
diff --git a/src/consumer.cc b/src/consumer.cc
index a77bb50..972bd0b 100644
--- a/src/consumer.cc
+++ b/src/consumer.cc
@@ -59,23 +59,33 @@
-void Consumer_acknowledge(Consumer& consumer, const Message& msg) { consumer.acknowledgeAsync(msg, nullptr); }
+void Consumer_acknowledge(Consumer& consumer, const Message& msg) {
+    // Do not hold the GIL across acknowledgeAsync(): it may wait on a client-internal
+    // mutex whose holder may itself need the GIL to run a Python callback (see #84).
+    // The RAII guard re-acquires the GIL on exit, including when the call throws.
+    pybind11::gil_scoped_release release;
+    consumer.acknowledgeAsync(msg, nullptr);
+}
void Consumer_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) {
+    // Release the GIL for the duration of the call (see #84). Unlike the
+    // Py_BEGIN/END_ALLOW_THREADS pair, this RAII guard re-acquires the GIL even if
+    // the C++ call throws, so pybind11 never translates an exception without the GIL.
+    pybind11::gil_scoped_release release;
consumer.acknowledgeAsync(msgId, nullptr);
}
void Consumer_negative_acknowledge(Consumer& consumer, const Message& msg) {
+    // Release the GIL for the duration of the call (see #84). The RAII guard
+    // re-acquires it on every exit path, including when the C++ call throws.
+    pybind11::gil_scoped_release release;
consumer.negativeAcknowledge(msg);
}
void Consumer_negative_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) {
+    // Release the GIL for the duration of the call (see #84). The RAII guard
+    // re-acquires it on every exit path, including when the C++ call throws.
+    pybind11::gil_scoped_release release;
consumer.negativeAcknowledge(msgId);
}
void Consumer_acknowledge_cumulative(Consumer& consumer, const Message& msg) {
+    // Release the GIL for the duration of the call (see #84). The RAII guard
+    // re-acquires it on every exit path, including when the C++ call throws.
+    pybind11::gil_scoped_release release;
consumer.acknowledgeCumulativeAsync(msg, nullptr);
}
void Consumer_acknowledge_cumulative_message_id(Consumer& consumer, const MessageId& msgId) {
+    // Release the GIL for the duration of the call (see #84). The RAII guard
+    // re-acquires it on every exit path, including when the C++ call throws.
+    pybind11::gil_scoped_release release;
consumer.acknowledgeCumulativeAsync(msgId, nullptr);
}
void Consumer_close(Consumer& consumer) {
diff --git a/src/producer.cc b/src/producer.cc
index bba262a..1dd5a76 100644
--- a/src/producer.cc
+++ b/src/producer.cc
@@ -34,6 +34,16 @@
return messageId;
}
+// Python-facing wrapper for Producer::sendAsync().
+//
+// @param producer the producer to publish with
+// @param msg      message to publish
+// @param callback invoked by the client once the send completes
+// @throws pybind11::error_already_set if a Python signal handler raised
+//         (e.g. KeyboardInterrupt) while the GIL was released
+void Producer_sendAsync(Producer& producer, const Message& msg, SendCallback callback) {
+    {
+        // Do not hold the GIL across sendAsync(): it may wait on the internal
+        // ClientConnection lock while the thread holding that lock waits for the GIL
+        // to run a Python callback (see #84). The RAII guard re-acquires the GIL at
+        // the end of this scope, even if sendAsync() throws.
+        pybind11::gil_scoped_release release;
+        producer.sendAsync(msg, callback);
+    }
+
+    // Run any Python signal handlers that became pending while the GIL was released.
+    // On failure PyErr_CheckSignals() leaves the Python error set; rethrowing it lets
+    // pybind11 raise that exception in the caller instead of returning normally with
+    // an error pending (which Python reports as a SystemError).
+    if (PyErr_CheckSignals() != 0) {
+        throw pybind11::error_already_set();
+    }
+}
+
void Producer_flush(Producer& producer) {
waitForAsyncResult([&](ResultCallback callback) { producer.flushAsync(callback); });
}
@@ -67,7 +77,7 @@
"This method is equivalent to asyncSend() and wait until the callback is triggered.\n"
"\n"
"@param msg message to publish\n")
- .def("send_async", &Producer::sendAsync)
+ .def("send_async", &Producer_sendAsync)
.def("flush", &Producer_flush,
"Flush all the messages buffered in the client and wait until all messages have been\n"
"successfully persisted\n")
diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py
index feba877..00e2466 100755
--- a/tests/pulsar_test.py
+++ b/tests/pulsar_test.py
@@ -1424,5 +1424,19 @@
with self.assertRaises(RuntimeError):
AuthenticationBasic(auth_params_string='invalid auth params')
+    def test_send_async_no_deadlock(self):
+        # Regression test for #84: send_async() used to keep the GIL while the
+        # client's internal thread, holding the ClientConnection lock, waited for
+        # the GIL to run the Python callback. A regression shows up as a hang here.
+        import threading
+
+        num_messages = 30
+        results = []
+        all_done = threading.Event()
+
+        def send_callback(res, msg):
+            # Callbacks run with the GIL held, so appends are serialized.
+            results.append(res)
+            if len(results) == num_messages:
+                all_done.set()
+
+        client = Client(self.serviceUrl)
+        try:
+            producer = client.create_producer('test_send_async_no_deadlock')
+            for i in range(num_messages):
+                producer.send_async(f"Hello-{i}".encode('utf-8'), callback=send_callback)
+            producer.flush()
+            # Every send must have reported back through its callback.
+            self.assertTrue(all_done.wait(timeout=10))
+            self.assertEqual(len(results), num_messages)
+        finally:
+            client.close()
+
+
if __name__ == "__main__":
main()