Wrap the interruption to a custom exception when a blocking API is interrupted (#99)

### Motivation

Currently, when a blocking API is interrupted by a signal, `SystemError`
will be thrown. However, in this case, `PyErr_SetInterrupt` will be
called and next time a blocking API is called, `std::system_error` will
be somehow thrown.

The failure of
https://lists.apache.org/thread/cmzykd9qz9x1d0s35nc5912o3slwpxpv is
caused by this issue. The `SystemError` is not caught, so
`client.close()` is skipped, which leads to the `bad_weak_ptr`
error.

P.S. Currently we have to call `client.close()` on a `Client` instance;
otherwise, a `bad_weak_ptr` error will be thrown.

However, even if we caught the `SystemError` like:

```python
    try:
        msg = consumer.receive()
        # ...
    except SystemError:
        break
```

we would still see the following error:

```
terminate called after throwing an instance of 'std::system_error'
  what():  Operation not permitted
Aborted
```

### Modifications

- Wrap `ResultInterrupted` into the `pulsar.Interrupted` exception.
- Refactor the `waitForAsyncValue` and `waitForAsyncResult` functions
  and raise `pulsar.Interrupted` when `PyErr_CheckSignals` detects a
  signal.
- Add `InterruptedTest` to cover this case.
- Remove `future.h` since we now use `std::future` instead of the
  manually implemented `Future`.
- Fix `examples/consumer.py` to support stopping with Ctrl+C.

(cherry picked from commit ec05f50bf489aef85532d61f577c62649a5b71a6)
diff --git a/examples/consumer.py b/examples/consumer.py
index 8c2985e..d698f48 100755
--- a/examples/consumer.py
+++ b/examples/consumer.py
@@ -29,8 +29,12 @@
                             })
 
 while True:
-    msg = consumer.receive()
-    print("Received message '{0}' id='{1}'".format(msg.data().decode('utf-8'), msg.message_id()))
-    consumer.acknowledge(msg)
+    try:
+        msg = consumer.receive()
+        print("Received message '{0}' id='{1}'".format(msg.data().decode('utf-8'), msg.message_id()))
+        consumer.acknowledge(msg)
+    except pulsar.Interrupted:
+        print("Stop receiving messages")
+        break
 
 client.close()
diff --git a/pulsar/exceptions.py b/pulsar/exceptions.py
index d151564..1b425c8 100644
--- a/pulsar/exceptions.py
+++ b/pulsar/exceptions.py
@@ -25,4 +25,4 @@
     ProducerBlockedQuotaExceededException, ProducerQueueIsFull, MessageTooBig, TopicNotFound, SubscriptionNotFound, \
     ConsumerNotFound, UnsupportedVersionError, TopicTerminated, CryptoError, IncompatibleSchema, ConsumerAssignError, \
     CumulativeAcknowledgementNotAllowedError, TransactionCoordinatorNotFoundError, InvalidTxnStatusError, \
-    NotAllowedError, TransactionConflict, TransactionNotFound, ProducerFenced, MemoryBufferIsFull
+    NotAllowedError, TransactionConflict, TransactionNotFound, ProducerFenced, MemoryBufferIsFull, Interrupted
diff --git a/src/client.cc b/src/client.cc
index 206c4e2..0103309 100644
--- a/src/client.cc
+++ b/src/client.cc
@@ -24,73 +24,38 @@
 namespace py = pybind11;
 
 Producer Client_createProducer(Client& client, const std::string& topic, const ProducerConfiguration& conf) {
-    Producer producer;
-
-    waitForAsyncValue(std::function<void(CreateProducerCallback)>([&](CreateProducerCallback callback) {
-                          client.createProducerAsync(topic, conf, callback);
-                      }),
-                      producer);
-
-    return producer;
+    return waitForAsyncValue<Producer>(
+        [&](CreateProducerCallback callback) { client.createProducerAsync(topic, conf, callback); });
 }
 
 Consumer Client_subscribe(Client& client, const std::string& topic, const std::string& subscriptionName,
                           const ConsumerConfiguration& conf) {
-    Consumer consumer;
-
-    waitForAsyncValue(std::function<void(SubscribeCallback)>([&](SubscribeCallback callback) {
-                          client.subscribeAsync(topic, subscriptionName, conf, callback);
-                      }),
-                      consumer);
-
-    return consumer;
+    return waitForAsyncValue<Consumer>(
+        [&](SubscribeCallback callback) { client.subscribeAsync(topic, subscriptionName, conf, callback); });
 }
 
 Consumer Client_subscribe_topics(Client& client, const std::vector<std::string>& topics,
                                  const std::string& subscriptionName, const ConsumerConfiguration& conf) {
-    Consumer consumer;
-
-    waitForAsyncValue(std::function<void(SubscribeCallback)>([&](SubscribeCallback callback) {
-                          client.subscribeAsync(topics, subscriptionName, conf, callback);
-                      }),
-                      consumer);
-
-    return consumer;
+    return waitForAsyncValue<Consumer>(
+        [&](SubscribeCallback callback) { client.subscribeAsync(topics, subscriptionName, conf, callback); });
 }
 
 Consumer Client_subscribe_pattern(Client& client, const std::string& topic_pattern,
                                   const std::string& subscriptionName, const ConsumerConfiguration& conf) {
-    Consumer consumer;
-
-    waitForAsyncValue(std::function<void(SubscribeCallback)>([&](SubscribeCallback callback) {
-                          client.subscribeWithRegexAsync(topic_pattern, subscriptionName, conf, callback);
-                      }),
-                      consumer);
-
-    return consumer;
+    return waitForAsyncValue<Consumer>([&](SubscribeCallback callback) {
+        client.subscribeWithRegexAsync(topic_pattern, subscriptionName, conf, callback);
+    });
 }
 
 Reader Client_createReader(Client& client, const std::string& topic, const MessageId& startMessageId,
                            const ReaderConfiguration& conf) {
-    Reader reader;
-
-    waitForAsyncValue(std::function<void(ReaderCallback)>([&](ReaderCallback callback) {
-                          client.createReaderAsync(topic, startMessageId, conf, callback);
-                      }),
-                      reader);
-
-    return reader;
+    return waitForAsyncValue<Reader>(
+        [&](ReaderCallback callback) { client.createReaderAsync(topic, startMessageId, conf, callback); });
 }
 
 std::vector<std::string> Client_getTopicPartitions(Client& client, const std::string& topic) {
-    std::vector<std::string> partitions;
-
-    waitForAsyncValue(std::function<void(GetPartitionsCallback)>([&](GetPartitionsCallback callback) {
-                          client.getPartitionsForTopicAsync(topic, callback);
-                      }),
-                      partitions);
-
-    return partitions;
+    return waitForAsyncValue<std::vector<std::string>>(
+        [&](GetPartitionsCallback callback) { client.getPartitionsForTopicAsync(topic, callback); });
 }
 
 void Client_close(Client& client) {
diff --git a/src/consumer.cc b/src/consumer.cc
index 972bd0b..4b44775 100644
--- a/src/consumer.cc
+++ b/src/consumer.cc
@@ -29,13 +29,7 @@
 }
 
 Message Consumer_receive(Consumer& consumer) {
-    Message msg;
-
-    waitForAsyncValue(std::function<void(ReceiveCallback)>(
-                          [&consumer](ReceiveCallback callback) { consumer.receiveAsync(callback); }),
-                      msg);
-
-    return msg;
+    return waitForAsyncValue<Message>([&](ReceiveCallback callback) { consumer.receiveAsync(callback); });
 }
 
 Message Consumer_receive_timeout(Consumer& consumer, int timeoutMs) {
@@ -59,32 +53,27 @@
 void Consumer_acknowledge(Consumer& consumer, const Message& msg) { consumer.acknowledgeAsync(msg, nullptr); }
 
 void Consumer_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) {
-    Py_BEGIN_ALLOW_THREADS
-    consumer.acknowledgeAsync(msgId, nullptr);
+    Py_BEGIN_ALLOW_THREADS consumer.acknowledgeAsync(msgId, nullptr);
     Py_END_ALLOW_THREADS
 }
 
 void Consumer_negative_acknowledge(Consumer& consumer, const Message& msg) {
-    Py_BEGIN_ALLOW_THREADS
-    consumer.negativeAcknowledge(msg);
+    Py_BEGIN_ALLOW_THREADS consumer.negativeAcknowledge(msg);
     Py_END_ALLOW_THREADS
 }
 
 void Consumer_negative_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) {
-    Py_BEGIN_ALLOW_THREADS
-    consumer.negativeAcknowledge(msgId);
+    Py_BEGIN_ALLOW_THREADS consumer.negativeAcknowledge(msgId);
     Py_END_ALLOW_THREADS
 }
 
 void Consumer_acknowledge_cumulative(Consumer& consumer, const Message& msg) {
-    Py_BEGIN_ALLOW_THREADS
-    consumer.acknowledgeCumulativeAsync(msg, nullptr);
+    Py_BEGIN_ALLOW_THREADS consumer.acknowledgeCumulativeAsync(msg, nullptr);
     Py_END_ALLOW_THREADS
 }
 
 void Consumer_acknowledge_cumulative_message_id(Consumer& consumer, const MessageId& msgId) {
-    Py_BEGIN_ALLOW_THREADS
-    consumer.acknowledgeCumulativeAsync(msgId, nullptr);
+    Py_BEGIN_ALLOW_THREADS consumer.acknowledgeCumulativeAsync(msgId, nullptr);
     Py_END_ALLOW_THREADS
 }
 
diff --git a/src/future.h b/src/future.h
deleted file mode 100644
index 6754c89..0000000
--- a/src/future.h
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#ifndef LIB_FUTURE_H_
-#define LIB_FUTURE_H_
-
-#include <functional>
-#include <mutex>
-#include <memory>
-#include <condition_variable>
-
-#include <list>
-
-typedef std::unique_lock<std::mutex> Lock;
-
-namespace pulsar {
-
-template <typename Result, typename Type>
-struct InternalState {
-    std::mutex mutex;
-    std::condition_variable condition;
-    Result result;
-    Type value;
-    bool complete;
-
-    std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
-   public:
-    typedef std::function<void(Result, const Type&)> ListenerCallback;
-
-    Future& addListener(ListenerCallback callback) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
-
-        if (state->complete) {
-            lock.unlock();
-            callback(state->result, state->value);
-        } else {
-            state->listeners.push_back(callback);
-        }
-
-        return *this;
-    }
-
-    Result get(Type& result) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
-
-        if (!state->complete) {
-            // Wait for result
-            while (!state->complete) {
-                state->condition.wait(lock);
-            }
-        }
-
-        result = state->value;
-        return state->result;
-    }
-
-    template <typename Duration>
-    bool get(Result& res, Type& value, Duration d) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
-
-        if (!state->complete) {
-            // Wait for result
-            while (!state->complete) {
-                if (!state->condition.wait_for(lock, d, [&state] { return state->complete; })) {
-                    // Timeout while waiting for the future to complete
-                    return false;
-                }
-            }
-        }
-
-        value = state->value;
-        res = state->result;
-        return true;
-    }
-
-   private:
-    typedef std::shared_ptr<InternalState<Result, Type> > InternalStatePtr;
-    Future(InternalStatePtr state) : state_(state) {}
-
-    std::shared_ptr<InternalState<Result, Type> > state_;
-
-    template <typename U, typename V>
-    friend class Promise;
-};
-
-template <typename Result, typename Type>
-class Promise {
-   public:
-    Promise() : state_(std::make_shared<InternalState<Result, Type> >()) {}
-
-    bool setValue(const Type& value) const {
-        static Result DEFAULT_RESULT;
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
-
-        if (state->complete) {
-            return false;
-        }
-
-        state->value = value;
-        state->result = DEFAULT_RESULT;
-        state->complete = true;
-
-        decltype(state->listeners) listeners;
-        listeners.swap(state->listeners);
-
-        lock.unlock();
-
-        for (auto& callback : listeners) {
-            callback(DEFAULT_RESULT, value);
-        }
-
-        state->condition.notify_all();
-        return true;
-    }
-
-    bool setFailed(Result result) const {
-        static Type DEFAULT_VALUE;
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
-
-        if (state->complete) {
-            return false;
-        }
-
-        state->result = result;
-        state->complete = true;
-
-        decltype(state->listeners) listeners;
-        listeners.swap(state->listeners);
-
-        lock.unlock();
-
-        for (auto& callback : listeners) {
-            callback(result, DEFAULT_VALUE);
-        }
-
-        state->condition.notify_all();
-        return true;
-    }
-
-    bool isComplete() const {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
-        return state->complete;
-    }
-
-    Future<Result, Type> getFuture() const { return Future<Result, Type>(state_); }
-
-   private:
-    typedef std::function<void(Result, const Type&)> ListenerCallback;
-    std::shared_ptr<InternalState<Result, Type> > state_;
-};
-
-class Void {};
-
-} /* namespace pulsar */
-
-#endif /* LIB_FUTURE_H_ */
diff --git a/src/producer.cc b/src/producer.cc
index 1dd5a76..7027185 100644
--- a/src/producer.cc
+++ b/src/producer.cc
@@ -25,21 +25,15 @@
 namespace py = pybind11;
 
 MessageId Producer_send(Producer& producer, const Message& message) {
-    MessageId messageId;
-
-    waitForAsyncValue(std::function<void(SendCallback)>(
-                          [&](SendCallback callback) { producer.sendAsync(message, callback); }),
-                      messageId);
-
-    return messageId;
+    return waitForAsyncValue<MessageId>(
+        [&](SendCallback callback) { producer.sendAsync(message, callback); });
 }
 
 void Producer_sendAsync(Producer& producer, const Message& msg, SendCallback callback) {
-    Py_BEGIN_ALLOW_THREADS
-    producer.sendAsync(msg, callback);
+    Py_BEGIN_ALLOW_THREADS producer.sendAsync(msg, callback);
     Py_END_ALLOW_THREADS
 
-    if (PyErr_CheckSignals() == -1) {
+        if (PyErr_CheckSignals() == -1) {
         PyErr_SetInterrupt();
     }
 }
diff --git a/src/reader.cc b/src/reader.cc
index 7194c29..0126f3f 100644
--- a/src/reader.cc
+++ b/src/reader.cc
@@ -62,14 +62,8 @@
 }
 
 bool Reader_hasMessageAvailable(Reader& reader) {
-    bool available = false;
-
-    waitForAsyncValue(
-        std::function<void(HasMessageAvailableCallback)>(
-            [&](HasMessageAvailableCallback callback) { reader.hasMessageAvailableAsync(callback); }),
-        available);
-
-    return available;
+    return waitForAsyncValue<bool>(
+        [&](HasMessageAvailableCallback callback) { reader.hasMessageAvailableAsync(callback); });
 }
 
 void Reader_close(Reader& reader) {
diff --git a/src/utils.cc b/src/utils.cc
index cf8f6f4..8ebc3f9 100644
--- a/src/utils.cc
+++ b/src/utils.cc
@@ -20,28 +20,29 @@
 #include "utils.h"
 
 void waitForAsyncResult(std::function<void(ResultCallback)> func) {
-    Result res = ResultOk;
-    bool b;
-    Promise<bool, Result> promise;
-    Future<bool, Result> future = promise.getFuture();
+    auto promise = std::make_shared<std::promise<Result>>();
+    func([promise](Result result) { promise->set_value(result); });
+    internal::waitForResult(*promise);
+}
 
-    Py_BEGIN_ALLOW_THREADS func(WaitForCallback(promise));
-    Py_END_ALLOW_THREADS
+namespace internal {
 
-        bool isComplete;
+void waitForResult(std::promise<pulsar::Result>& promise) {
+    auto future = promise.get_future();
     while (true) {
-        // Check periodically for Python signals
-        Py_BEGIN_ALLOW_THREADS isComplete = future.get(b, std::ref(res), std::chrono::milliseconds(100));
-        Py_END_ALLOW_THREADS
-
-            if (isComplete) {
-            CHECK_RESULT(res);
-            return;
+        {
+            py::gil_scoped_release release;
+            auto status = future.wait_for(std::chrono::milliseconds(100));
+            if (status == std::future_status::ready) {
+                CHECK_RESULT(future.get());
+                return;
+            }
         }
-
-        if (PyErr_CheckSignals() == -1) {
-            PyErr_SetInterrupt();
-            return;
+        py::gil_scoped_acquire acquire;
+        if (PyErr_CheckSignals() != 0) {
+            raiseException(ResultInterrupted);
         }
     }
 }
+
+}  // namespace internal
diff --git a/src/utils.h b/src/utils.h
index fb700c6..bbe202e 100644
--- a/src/utils.h
+++ b/src/utils.h
@@ -21,12 +21,14 @@
 
 #include <pulsar/Client.h>
 #include <pulsar/MessageBatch.h>
+#include <chrono>
 #include <exception>
-#include <Python.h>
+#include <future>
+#include <pybind11/pybind11.h>
 #include "exceptions.h"
-#include "future.h"
 
 using namespace pulsar;
+namespace py = pybind11;
 
 inline void CHECK_RESULT(Result res) {
     if (res != ResultOk) {
@@ -34,56 +36,26 @@
     }
 }
 
-struct WaitForCallback {
-    Promise<bool, Result> m_promise;
+namespace internal {
 
-    WaitForCallback(Promise<bool, Result> promise) : m_promise(promise) {}
+void waitForResult(std::promise<pulsar::Result>& promise);
 
-    void operator()(Result result) { m_promise.setValue(result); }
-};
-
-template <typename T>
-struct WaitForCallbackValue {
-    Promise<Result, T>& m_promise;
-
-    WaitForCallbackValue(Promise<Result, T>& promise) : m_promise(promise) {}
-
-    void operator()(Result result, const T& value) {
-        if (result == ResultOk) {
-            m_promise.setValue(value);
-        } else {
-            m_promise.setFailed(result);
-        }
-    }
-};
+}  // namespace internal
 
 void waitForAsyncResult(std::function<void(ResultCallback)> func);
 
-template <typename T, typename Callback>
-inline void waitForAsyncValue(std::function<void(Callback)> func, T& value) {
-    Result res = ResultOk;
-    Promise<Result, T> promise;
-    Future<Result, T> future = promise.getFuture();
+template <typename T>
+inline T waitForAsyncValue(std::function<void(std::function<void(Result, const T&)>)> func) {
+    auto resultPromise = std::make_shared<std::promise<Result>>();
+    auto valuePromise = std::make_shared<std::promise<T>>();
 
-    Py_BEGIN_ALLOW_THREADS func(WaitForCallbackValue<T>(promise));
-    Py_END_ALLOW_THREADS
+    func([resultPromise, valuePromise](Result result, const T& value) {
+        valuePromise->set_value(value);
+        resultPromise->set_value(result);
+    });
 
-        bool isComplete;
-    while (true) {
-        // Check periodically for Python signals
-        Py_BEGIN_ALLOW_THREADS isComplete = future.get(res, std::ref(value), std::chrono::milliseconds(100));
-        Py_END_ALLOW_THREADS
-
-            if (isComplete) {
-            CHECK_RESULT(res);
-            return;
-        }
-
-        if (PyErr_CheckSignals() == -1) {
-            PyErr_SetInterrupt();
-            return;
-        }
-    }
+    internal::waitForResult(*resultPromise);
+    return valuePromise->get_future().get();
 }
 
 struct CryptoKeyReaderWrapper {
diff --git a/tests/interrupted_test.py b/tests/interrupted_test.py
new file mode 100644
index 0000000..6d61f99
--- /dev/null
+++ b/tests/interrupted_test.py
@@ -0,0 +1,51 @@
+#!/usr/bin/env python3
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from unittest import TestCase, main
+import pulsar
+import signal
+import time
+import threading
+
+class InterruptedTest(TestCase):
+
+    service_url = 'pulsar://localhost:6650'
+
+    def test_sigint(self):
+        def thread_function():
+            time.sleep(1)
+            signal.raise_signal(signal.SIGINT)
+
+        client = pulsar.Client(self.service_url)
+        consumer = client.subscribe('test-sigint', "my-sub")
+        thread = threading.Thread(target=thread_function)
+        thread.start()
+        
+        start = time.time()
+        with self.assertRaises(pulsar.Interrupted):
+            consumer.receive()
+        finish = time.time()
+        print(f"time: {finish - start}")
+        self.assertGreater(finish - start, 1)
+        self.assertLess(finish - start, 1.5)
+        client.close()
+
+if __name__ == '__main__':
+    main()
diff --git a/tests/run-unit-tests.sh b/tests/run-unit-tests.sh
index 13349f9..5168f94 100755
--- a/tests/run-unit-tests.sh
+++ b/tests/run-unit-tests.sh
@@ -24,4 +24,5 @@
 cd $ROOT_DIR/tests
 
 python3 custom_logger_test.py
+python3 interrupted_test.py
 python3 pulsar_test.py