Wrap the interruption in a custom exception when a blocking API is interrupted (#99)
### Motivation
Currently, when a blocking API is interrupted by a signal, a `SystemError`
is thrown. However, in this case `PyErr_SetInterrupt` is also called, and
the next time a blocking API is called, a `std::system_error` is thrown
for a reason that is not yet fully understood.
The failure of
https://lists.apache.org/thread/cmzykd9qz9x1d0s35nc5912o3slwpxpv is
caused by this issue. The `SystemError` is not caught, so
`client.close()` is skipped, which leads to the `bad_weak_ptr`
error.
P.S. Currently we have to call `client.close()` on a `Client` instance;
otherwise, a `bad_weak_ptr` error will be thrown.
However, even if we catch the `SystemError` like this:
```python
try:
    msg = consumer.receive()
    # ...
except SystemError:
    break
```
we would still see the following error:
```
terminate called after throwing an instance of 'std::system_error'
what(): Operation not permitted
Aborted
```
### Modifications
- Wrap `ResultInterrupted` into the `pulsar.Interrupted` exception.
- Refactor the `waitForAsyncValue` and `waitForAsyncResult` functions
and raise `pulsar.Interrupted` when `PyErr_CheckSignals` detects a
signal.
- Add `InterruptedTest` to cover this case.
- Remove `future.h` since we now use `std::future` instead of the
manually implemented `Future`.
- Fix `examples/consumer.py` to support stopping with Ctrl+C.
(cherry picked from commit ec05f50bf489aef85532d61f577c62649a5b71a6)
diff --git a/examples/consumer.py b/examples/consumer.py
index 8c2985e..d698f48 100755
--- a/examples/consumer.py
+++ b/examples/consumer.py
@@ -29,8 +29,12 @@
})
while True:
- msg = consumer.receive()
- print("Received message '{0}' id='{1}'".format(msg.data().decode('utf-8'), msg.message_id()))
- consumer.acknowledge(msg)
+ try:
+ msg = consumer.receive()
+ print("Received message '{0}' id='{1}'".format(msg.data().decode('utf-8'), msg.message_id()))
+ consumer.acknowledge(msg)
+ except pulsar.Interrupted:
+ print("Stop receiving messages")
+ break
client.close()
diff --git a/pulsar/exceptions.py b/pulsar/exceptions.py
index d151564..1b425c8 100644
--- a/pulsar/exceptions.py
+++ b/pulsar/exceptions.py
@@ -25,4 +25,4 @@
ProducerBlockedQuotaExceededException, ProducerQueueIsFull, MessageTooBig, TopicNotFound, SubscriptionNotFound, \
ConsumerNotFound, UnsupportedVersionError, TopicTerminated, CryptoError, IncompatibleSchema, ConsumerAssignError, \
CumulativeAcknowledgementNotAllowedError, TransactionCoordinatorNotFoundError, InvalidTxnStatusError, \
- NotAllowedError, TransactionConflict, TransactionNotFound, ProducerFenced, MemoryBufferIsFull
+ NotAllowedError, TransactionConflict, TransactionNotFound, ProducerFenced, MemoryBufferIsFull, Interrupted
diff --git a/src/client.cc b/src/client.cc
index 206c4e2..0103309 100644
--- a/src/client.cc
+++ b/src/client.cc
@@ -24,73 +24,38 @@
namespace py = pybind11;
Producer Client_createProducer(Client& client, const std::string& topic, const ProducerConfiguration& conf) {
- Producer producer;
-
- waitForAsyncValue(std::function<void(CreateProducerCallback)>([&](CreateProducerCallback callback) {
- client.createProducerAsync(topic, conf, callback);
- }),
- producer);
-
- return producer;
+ return waitForAsyncValue<Producer>(
+ [&](CreateProducerCallback callback) { client.createProducerAsync(topic, conf, callback); });
}
Consumer Client_subscribe(Client& client, const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration& conf) {
- Consumer consumer;
-
- waitForAsyncValue(std::function<void(SubscribeCallback)>([&](SubscribeCallback callback) {
- client.subscribeAsync(topic, subscriptionName, conf, callback);
- }),
- consumer);
-
- return consumer;
+ return waitForAsyncValue<Consumer>(
+ [&](SubscribeCallback callback) { client.subscribeAsync(topic, subscriptionName, conf, callback); });
}
Consumer Client_subscribe_topics(Client& client, const std::vector<std::string>& topics,
const std::string& subscriptionName, const ConsumerConfiguration& conf) {
- Consumer consumer;
-
- waitForAsyncValue(std::function<void(SubscribeCallback)>([&](SubscribeCallback callback) {
- client.subscribeAsync(topics, subscriptionName, conf, callback);
- }),
- consumer);
-
- return consumer;
+ return waitForAsyncValue<Consumer>(
+ [&](SubscribeCallback callback) { client.subscribeAsync(topics, subscriptionName, conf, callback); });
}
Consumer Client_subscribe_pattern(Client& client, const std::string& topic_pattern,
const std::string& subscriptionName, const ConsumerConfiguration& conf) {
- Consumer consumer;
-
- waitForAsyncValue(std::function<void(SubscribeCallback)>([&](SubscribeCallback callback) {
- client.subscribeWithRegexAsync(topic_pattern, subscriptionName, conf, callback);
- }),
- consumer);
-
- return consumer;
+ return waitForAsyncValue<Consumer>([&](SubscribeCallback callback) {
+ client.subscribeWithRegexAsync(topic_pattern, subscriptionName, conf, callback);
+ });
}
Reader Client_createReader(Client& client, const std::string& topic, const MessageId& startMessageId,
const ReaderConfiguration& conf) {
- Reader reader;
-
- waitForAsyncValue(std::function<void(ReaderCallback)>([&](ReaderCallback callback) {
- client.createReaderAsync(topic, startMessageId, conf, callback);
- }),
- reader);
-
- return reader;
+ return waitForAsyncValue<Reader>(
+ [&](ReaderCallback callback) { client.createReaderAsync(topic, startMessageId, conf, callback); });
}
std::vector<std::string> Client_getTopicPartitions(Client& client, const std::string& topic) {
- std::vector<std::string> partitions;
-
- waitForAsyncValue(std::function<void(GetPartitionsCallback)>([&](GetPartitionsCallback callback) {
- client.getPartitionsForTopicAsync(topic, callback);
- }),
- partitions);
-
- return partitions;
+ return waitForAsyncValue<std::vector<std::string>>(
+ [&](GetPartitionsCallback callback) { client.getPartitionsForTopicAsync(topic, callback); });
}
void Client_close(Client& client) {
diff --git a/src/consumer.cc b/src/consumer.cc
index 972bd0b..4b44775 100644
--- a/src/consumer.cc
+++ b/src/consumer.cc
@@ -29,13 +29,7 @@
}
Message Consumer_receive(Consumer& consumer) {
- Message msg;
-
- waitForAsyncValue(std::function<void(ReceiveCallback)>(
- [&consumer](ReceiveCallback callback) { consumer.receiveAsync(callback); }),
- msg);
-
- return msg;
+ return waitForAsyncValue<Message>([&](ReceiveCallback callback) { consumer.receiveAsync(callback); });
}
Message Consumer_receive_timeout(Consumer& consumer, int timeoutMs) {
@@ -59,32 +53,27 @@
void Consumer_acknowledge(Consumer& consumer, const Message& msg) { consumer.acknowledgeAsync(msg, nullptr); }
void Consumer_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) {
- Py_BEGIN_ALLOW_THREADS
- consumer.acknowledgeAsync(msgId, nullptr);
+ Py_BEGIN_ALLOW_THREADS consumer.acknowledgeAsync(msgId, nullptr);
Py_END_ALLOW_THREADS
}
void Consumer_negative_acknowledge(Consumer& consumer, const Message& msg) {
- Py_BEGIN_ALLOW_THREADS
- consumer.negativeAcknowledge(msg);
+ Py_BEGIN_ALLOW_THREADS consumer.negativeAcknowledge(msg);
Py_END_ALLOW_THREADS
}
void Consumer_negative_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) {
- Py_BEGIN_ALLOW_THREADS
- consumer.negativeAcknowledge(msgId);
+ Py_BEGIN_ALLOW_THREADS consumer.negativeAcknowledge(msgId);
Py_END_ALLOW_THREADS
}
void Consumer_acknowledge_cumulative(Consumer& consumer, const Message& msg) {
- Py_BEGIN_ALLOW_THREADS
- consumer.acknowledgeCumulativeAsync(msg, nullptr);
+ Py_BEGIN_ALLOW_THREADS consumer.acknowledgeCumulativeAsync(msg, nullptr);
Py_END_ALLOW_THREADS
}
void Consumer_acknowledge_cumulative_message_id(Consumer& consumer, const MessageId& msgId) {
- Py_BEGIN_ALLOW_THREADS
- consumer.acknowledgeCumulativeAsync(msgId, nullptr);
+ Py_BEGIN_ALLOW_THREADS consumer.acknowledgeCumulativeAsync(msgId, nullptr);
Py_END_ALLOW_THREADS
}
diff --git a/src/future.h b/src/future.h
deleted file mode 100644
index 6754c89..0000000
--- a/src/future.h
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#ifndef LIB_FUTURE_H_
-#define LIB_FUTURE_H_
-
-#include <functional>
-#include <mutex>
-#include <memory>
-#include <condition_variable>
-
-#include <list>
-
-typedef std::unique_lock<std::mutex> Lock;
-
-namespace pulsar {
-
-template <typename Result, typename Type>
-struct InternalState {
- std::mutex mutex;
- std::condition_variable condition;
- Result result;
- Type value;
- bool complete;
-
- std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
- public:
- typedef std::function<void(Result, const Type&)> ListenerCallback;
-
- Future& addListener(ListenerCallback callback) {
- InternalState<Result, Type>* state = state_.get();
- Lock lock(state->mutex);
-
- if (state->complete) {
- lock.unlock();
- callback(state->result, state->value);
- } else {
- state->listeners.push_back(callback);
- }
-
- return *this;
- }
-
- Result get(Type& result) {
- InternalState<Result, Type>* state = state_.get();
- Lock lock(state->mutex);
-
- if (!state->complete) {
- // Wait for result
- while (!state->complete) {
- state->condition.wait(lock);
- }
- }
-
- result = state->value;
- return state->result;
- }
-
- template <typename Duration>
- bool get(Result& res, Type& value, Duration d) {
- InternalState<Result, Type>* state = state_.get();
- Lock lock(state->mutex);
-
- if (!state->complete) {
- // Wait for result
- while (!state->complete) {
- if (!state->condition.wait_for(lock, d, [&state] { return state->complete; })) {
- // Timeout while waiting for the future to complete
- return false;
- }
- }
- }
-
- value = state->value;
- res = state->result;
- return true;
- }
-
- private:
- typedef std::shared_ptr<InternalState<Result, Type> > InternalStatePtr;
- Future(InternalStatePtr state) : state_(state) {}
-
- std::shared_ptr<InternalState<Result, Type> > state_;
-
- template <typename U, typename V>
- friend class Promise;
-};
-
-template <typename Result, typename Type>
-class Promise {
- public:
- Promise() : state_(std::make_shared<InternalState<Result, Type> >()) {}
-
- bool setValue(const Type& value) const {
- static Result DEFAULT_RESULT;
- InternalState<Result, Type>* state = state_.get();
- Lock lock(state->mutex);
-
- if (state->complete) {
- return false;
- }
-
- state->value = value;
- state->result = DEFAULT_RESULT;
- state->complete = true;
-
- decltype(state->listeners) listeners;
- listeners.swap(state->listeners);
-
- lock.unlock();
-
- for (auto& callback : listeners) {
- callback(DEFAULT_RESULT, value);
- }
-
- state->condition.notify_all();
- return true;
- }
-
- bool setFailed(Result result) const {
- static Type DEFAULT_VALUE;
- InternalState<Result, Type>* state = state_.get();
- Lock lock(state->mutex);
-
- if (state->complete) {
- return false;
- }
-
- state->result = result;
- state->complete = true;
-
- decltype(state->listeners) listeners;
- listeners.swap(state->listeners);
-
- lock.unlock();
-
- for (auto& callback : listeners) {
- callback(result, DEFAULT_VALUE);
- }
-
- state->condition.notify_all();
- return true;
- }
-
- bool isComplete() const {
- InternalState<Result, Type>* state = state_.get();
- Lock lock(state->mutex);
- return state->complete;
- }
-
- Future<Result, Type> getFuture() const { return Future<Result, Type>(state_); }
-
- private:
- typedef std::function<void(Result, const Type&)> ListenerCallback;
- std::shared_ptr<InternalState<Result, Type> > state_;
-};
-
-class Void {};
-
-} /* namespace pulsar */
-
-#endif /* LIB_FUTURE_H_ */
diff --git a/src/producer.cc b/src/producer.cc
index 1dd5a76..7027185 100644
--- a/src/producer.cc
+++ b/src/producer.cc
@@ -25,21 +25,15 @@
namespace py = pybind11;
MessageId Producer_send(Producer& producer, const Message& message) {
- MessageId messageId;
-
- waitForAsyncValue(std::function<void(SendCallback)>(
- [&](SendCallback callback) { producer.sendAsync(message, callback); }),
- messageId);
-
- return messageId;
+ return waitForAsyncValue<MessageId>(
+ [&](SendCallback callback) { producer.sendAsync(message, callback); });
}
void Producer_sendAsync(Producer& producer, const Message& msg, SendCallback callback) {
- Py_BEGIN_ALLOW_THREADS
- producer.sendAsync(msg, callback);
+ Py_BEGIN_ALLOW_THREADS producer.sendAsync(msg, callback);
Py_END_ALLOW_THREADS
- if (PyErr_CheckSignals() == -1) {
+ if (PyErr_CheckSignals() == -1) {
PyErr_SetInterrupt();
}
}
diff --git a/src/reader.cc b/src/reader.cc
index 7194c29..0126f3f 100644
--- a/src/reader.cc
+++ b/src/reader.cc
@@ -62,14 +62,8 @@
}
bool Reader_hasMessageAvailable(Reader& reader) {
- bool available = false;
-
- waitForAsyncValue(
- std::function<void(HasMessageAvailableCallback)>(
- [&](HasMessageAvailableCallback callback) { reader.hasMessageAvailableAsync(callback); }),
- available);
-
- return available;
+ return waitForAsyncValue<bool>(
+ [&](HasMessageAvailableCallback callback) { reader.hasMessageAvailableAsync(callback); });
}
void Reader_close(Reader& reader) {
diff --git a/src/utils.cc b/src/utils.cc
index cf8f6f4..8ebc3f9 100644
--- a/src/utils.cc
+++ b/src/utils.cc
@@ -20,28 +20,29 @@
#include "utils.h"
void waitForAsyncResult(std::function<void(ResultCallback)> func) {
- Result res = ResultOk;
- bool b;
- Promise<bool, Result> promise;
- Future<bool, Result> future = promise.getFuture();
+ auto promise = std::make_shared<std::promise<Result>>();
+ func([promise](Result result) { promise->set_value(result); });
+ internal::waitForResult(*promise);
+}
- Py_BEGIN_ALLOW_THREADS func(WaitForCallback(promise));
- Py_END_ALLOW_THREADS
+namespace internal {
- bool isComplete;
+void waitForResult(std::promise<pulsar::Result>& promise) {
+ auto future = promise.get_future();
while (true) {
- // Check periodically for Python signals
- Py_BEGIN_ALLOW_THREADS isComplete = future.get(b, std::ref(res), std::chrono::milliseconds(100));
- Py_END_ALLOW_THREADS
-
- if (isComplete) {
- CHECK_RESULT(res);
- return;
+ {
+ py::gil_scoped_release release;
+ auto status = future.wait_for(std::chrono::milliseconds(100));
+ if (status == std::future_status::ready) {
+ CHECK_RESULT(future.get());
+ return;
+ }
}
-
- if (PyErr_CheckSignals() == -1) {
- PyErr_SetInterrupt();
- return;
+ py::gil_scoped_acquire acquire;
+ if (PyErr_CheckSignals() != 0) {
+ raiseException(ResultInterrupted);
}
}
}
+
+} // namespace internal
diff --git a/src/utils.h b/src/utils.h
index fb700c6..bbe202e 100644
--- a/src/utils.h
+++ b/src/utils.h
@@ -21,12 +21,14 @@
#include <pulsar/Client.h>
#include <pulsar/MessageBatch.h>
+#include <chrono>
#include <exception>
-#include <Python.h>
+#include <future>
+#include <pybind11/pybind11.h>
#include "exceptions.h"
-#include "future.h"
using namespace pulsar;
+namespace py = pybind11;
inline void CHECK_RESULT(Result res) {
if (res != ResultOk) {
@@ -34,56 +36,26 @@
}
}
-struct WaitForCallback {
- Promise<bool, Result> m_promise;
+namespace internal {
- WaitForCallback(Promise<bool, Result> promise) : m_promise(promise) {}
+void waitForResult(std::promise<pulsar::Result>& promise);
- void operator()(Result result) { m_promise.setValue(result); }
-};
-
-template <typename T>
-struct WaitForCallbackValue {
- Promise<Result, T>& m_promise;
-
- WaitForCallbackValue(Promise<Result, T>& promise) : m_promise(promise) {}
-
- void operator()(Result result, const T& value) {
- if (result == ResultOk) {
- m_promise.setValue(value);
- } else {
- m_promise.setFailed(result);
- }
- }
-};
+} // namespace internal
void waitForAsyncResult(std::function<void(ResultCallback)> func);
-template <typename T, typename Callback>
-inline void waitForAsyncValue(std::function<void(Callback)> func, T& value) {
- Result res = ResultOk;
- Promise<Result, T> promise;
- Future<Result, T> future = promise.getFuture();
+template <typename T>
+inline T waitForAsyncValue(std::function<void(std::function<void(Result, const T&)>)> func) {
+ auto resultPromise = std::make_shared<std::promise<Result>>();
+ auto valuePromise = std::make_shared<std::promise<T>>();
- Py_BEGIN_ALLOW_THREADS func(WaitForCallbackValue<T>(promise));
- Py_END_ALLOW_THREADS
+ func([resultPromise, valuePromise](Result result, const T& value) {
+ valuePromise->set_value(value);
+ resultPromise->set_value(result);
+ });
- bool isComplete;
- while (true) {
- // Check periodically for Python signals
- Py_BEGIN_ALLOW_THREADS isComplete = future.get(res, std::ref(value), std::chrono::milliseconds(100));
- Py_END_ALLOW_THREADS
-
- if (isComplete) {
- CHECK_RESULT(res);
- return;
- }
-
- if (PyErr_CheckSignals() == -1) {
- PyErr_SetInterrupt();
- return;
- }
- }
+ internal::waitForResult(*resultPromise);
+ return valuePromise->get_future().get();
}
struct CryptoKeyReaderWrapper {
diff --git a/tests/interrupted_test.py b/tests/interrupted_test.py
new file mode 100644
index 0000000..6d61f99
--- /dev/null
+++ b/tests/interrupted_test.py
@@ -0,0 +1,51 @@
+#!/usr/bin/env python3
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from unittest import TestCase, main
+import pulsar
+import signal
+import time
+import threading
+
+class InterruptedTest(TestCase):
+
+ service_url = 'pulsar://localhost:6650'
+
+ def test_sigint(self):
+ def thread_function():
+ time.sleep(1)
+ signal.raise_signal(signal.SIGINT)
+
+ client = pulsar.Client(self.service_url)
+ consumer = client.subscribe('test-sigint', "my-sub")
+ thread = threading.Thread(target=thread_function)
+ thread.start()
+
+ start = time.time()
+ with self.assertRaises(pulsar.Interrupted):
+ consumer.receive()
+ finish = time.time()
+ print(f"time: {finish - start}")
+ self.assertGreater(finish - start, 1)
+ self.assertLess(finish - start, 1.5)
+ client.close()
+
+if __name__ == '__main__':
+ main()
diff --git a/tests/run-unit-tests.sh b/tests/run-unit-tests.sh
index 13349f9..5168f94 100755
--- a/tests/run-unit-tests.sh
+++ b/tests/run-unit-tests.sh
@@ -24,4 +24,5 @@
cd $ROOT_DIR/tests
python3 custom_logger_test.py
+python3 interrupted_test.py
python3 pulsar_test.py