Fix consumer double close (#96)
We do `Unref` more than once in the old code, now we do `Unref` only if the consumer is closed.
diff --git a/src/Consumer.cc b/src/Consumer.cc
index e2ad7b2..225ed3e 100644
--- a/src/Consumer.cc
+++ b/src/Consumer.cc
@@ -68,7 +68,8 @@
if (listenerCallback->callback.Acquire() != napi_ok) {
return;
- };
+ }
+
MessageListenerProxyData *dataPtr = new MessageListenerProxyData(cMessage, consumer);
listenerCallback->callback.BlockingCall(dataPtr, MessageListenerProxy);
listenerCallback->callback.Release();
@@ -256,17 +257,25 @@
class ConsumerCloseWorker : public Napi::AsyncWorker {
public:
- ConsumerCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer)
+ ConsumerCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer,
+ Consumer *consumer)
: AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
deferred(deferred),
- cConsumer(cConsumer) {}
+ cConsumer(cConsumer),
+ consumer(consumer) {}
+
~ConsumerCloseWorker() {}
void Execute() {
pulsar_consumer_pause_message_listener(this->cConsumer);
pulsar_result result = pulsar_consumer_close(this->cConsumer);
- if (result != pulsar_result_Ok) SetError(pulsar_result_str(result));
+ if (result != pulsar_result_Ok) {
+ SetError(pulsar_result_str(result));
+ }
}
- void OnOK() { this->deferred.Resolve(Env().Null()); }
+ void OnOK() {
+ this->consumer->Cleanup();
+ this->deferred.Resolve(Env().Null());
+ }
void OnError(const Napi::Error &e) {
this->deferred.Reject(
Napi::Error::New(Env(), std::string("Failed to close consumer: ") + e.Message()).Value());
@@ -275,15 +284,19 @@
private:
Napi::Promise::Deferred deferred;
pulsar_consumer_t *cConsumer;
+ Consumer *consumer;
};
-Napi::Value Consumer::Close(const Napi::CallbackInfo &info) {
+void Consumer::Cleanup() {
if (this->listener) {
this->Unref();
+ this->listener = nullptr;
}
+}
+Napi::Value Consumer::Close(const Napi::CallbackInfo &info) {
Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
- ConsumerCloseWorker *wk = new ConsumerCloseWorker(deferred, this->wrapper->cConsumer);
+ ConsumerCloseWorker *wk = new ConsumerCloseWorker(deferred, this->wrapper->cConsumer, this);
wk->Queue();
return deferred.Promise();
}
diff --git a/src/Consumer.h b/src/Consumer.h
index 63fb403..2c5f856 100644
--- a/src/Consumer.h
+++ b/src/Consumer.h
@@ -34,6 +34,7 @@
~Consumer();
void SetCConsumer(std::shared_ptr<CConsumerWrapper> cConsumer);
void SetListenerCallback(ListenerCallback *listener);
+ void Cleanup();
private:
std::shared_ptr<CConsumerWrapper> wrapper;
diff --git a/tests/consumer.test.js b/tests/consumer.test.js
index 65bd792..e61120c 100644
--- a/tests/consumer.test.js
+++ b/tests/consumer.test.js
@@ -87,5 +87,22 @@
})).rejects.toThrow('NAck timeout should be greater than or equal to zero');
});
});
+
+ describe('Close', () => {
+ test('throws error on subsequent calls to close', async () => {
+ const consumer = await client.subscribe({
+ topic: 'persistent://public/default/my-topic',
+ subscription: 'sub1',
+ subscriptionType: 'Shared',
+ // Test with listener since it changes the flow of close
+ // and reproduces an issue
+ listener() {},
+ });
+
+ await expect(consumer.close()).resolves.toEqual(null);
+
+ await expect(consumer.close()).rejects.toThrow('Failed to close consumer: AlreadyClosed');
+ });
+ });
});
})();