fix: ReaderListenerProxy will make a segfault (#376)
diff --git a/src/Reader.cc b/src/Reader.cc
index bd6c7f0..4fe7c63 100644
--- a/src/Reader.cc
+++ b/src/Reader.cc
@@ -60,17 +60,19 @@
void ReaderListenerProxy(Napi::Env env, Napi::Function jsCallback, ReaderListenerProxyData *data) {
Napi::Object msg = Message::NewInstance({}, data->cMessage);
Reader *reader = data->reader;
-
- Napi::Value ret = jsCallback.Call({msg, reader->Value()});
- if (ret.IsPromise()) {
- Napi::Promise promise = ret.As<Napi::Promise>();
- Napi::Value thenValue = promise.Get("then");
- if (thenValue.IsFunction()) {
- Napi::Function then = thenValue.As<Napi::Function>();
- Napi::Function callback =
- Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { data->callback(); });
- then.Call(promise, {callback});
- return;
+ // `reader` might be null in certain cases, segmentation fault might happend without this null check.
+ if (reader) {
+ Napi::Value ret = jsCallback.Call({msg, reader->Value()});
+ if (ret.IsPromise()) {
+ Napi::Promise promise = ret.As<Napi::Promise>();
+ Napi::Value thenValue = promise.Get("then");
+ if (thenValue.IsFunction()) {
+ Napi::Function then = thenValue.As<Napi::Function>();
+ Napi::Function callback =
+ Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { data->callback(); });
+ then.Call(promise, {callback});
+ return;
+ }
}
}
data->callback();
diff --git a/tests/reader.test.js b/tests/reader.test.js
index 673ae60..56d1b48 100644
--- a/tests/reader.test.js
+++ b/tests/reader.test.js
@@ -130,5 +130,46 @@
await reader.close();
await client.close();
});
+
+ test('Reader should not throw segmentation fault when create and close', async () => {
+ const NUM_ITS = 1000;
+ const its = Array.from({ length: NUM_ITS }, (_, i) => i);
+
+ const client = new Pulsar.Client({
+ serviceUrl: 'pulsar://localhost:6650',
+ });
+
+ const producer = await client.createProducer({
+ topic: 'persistent://public/default/my-topic',
+ sendTimeoutMs: 30000,
+ batchingEnabled: true,
+ });
+
+ // Send messages
+ for (let i = 0; i < 10; i += 1) {
+ const msg = `my-message-${i}`;
+ producer.send({
+ data: Buffer.from(msg),
+ });
+ console.log(`Sent message: ${msg}`);
+ }
+ await producer.flush();
+
+ await Promise.all(
+ its.map(async () => {
+ const reader = await client.createReader({
+ topic: 'persistent://public/default/my-topic',
+ startMessageId: Pulsar.MessageId.earliest(),
+ listener: (message) => {
+ console.log(message.getData().toString());
+ },
+ });
+ await reader.close();
+ }),
+ );
+ await producer.close();
+ await client.close();
+ expect(true).toBe(true);
+ });
});
})();