Return MessageId as promise resolve at Producer#send (#137)
* feat: return MessageId as promise resolve at Producer#send
* test: delete unused conf from Produce/Consume and validate MessageId test
diff --git a/pulsar-version.txt b/pulsar-version.txt
index e70b452..24ba9a3 100755
--- a/pulsar-version.txt
+++ b/pulsar-version.txt
@@ -1 +1 @@
-2.6.0
+2.7.0
diff --git a/src/MessageId.cc b/src/MessageId.cc
index 729b481..ad9a195 100644
--- a/src/MessageId.cc
+++ b/src/MessageId.cc
@@ -58,6 +58,13 @@
return obj;
}
+Napi::Object MessageId::NewInstance(pulsar_message_id_t *cMessageId) {
+ Napi::Object obj = constructor.New({});
+ MessageId *msgId = Unwrap(obj);
+ msgId->cMessageId = cMessageId;
+ return obj;
+}
+
void MessageId::Free(const Napi::CallbackInfo &info) {
Napi::Object obj = info[0].As<Napi::Object>();
MessageId *msgId = Unwrap(obj);
diff --git a/src/MessageId.h b/src/MessageId.h
index 35fdbee..e4df2db 100644
--- a/src/MessageId.h
+++ b/src/MessageId.h
@@ -28,6 +28,7 @@
public:
static Napi::Object Init(Napi::Env env, Napi::Object exports);
static Napi::Object NewInstance(Napi::Value arg);
+ static Napi::Object NewInstance(pulsar_message_id_t *cMessageId);
static Napi::Object NewInstanceFromMessage(const Napi::CallbackInfo &info, pulsar_message_t *cMessage);
static Napi::Value Earliest(const Napi::CallbackInfo &info);
static Napi::Value Latest(const Napi::CallbackInfo &info);
diff --git a/src/Producer.cc b/src/Producer.cc
index f623c34..b5881ac 100644
--- a/src/Producer.cc
+++ b/src/Producer.cc
@@ -20,6 +20,7 @@
#include "Producer.h"
#include "ProducerConfig.h"
#include "Message.h"
+#include "MessageId.h"
#include <pulsar/c/result.h>
#include <memory>
Napi::FunctionReference Producer::constructor;
@@ -103,7 +104,10 @@
pulsar_result result = pulsar_producer_send(this->cProducer, this->cMessage);
if (result != pulsar_result_Ok) SetError(pulsar_result_str(result));
}
- void OnOK() { this->deferred.Resolve(Env().Null()); }
+ void OnOK() {
+ Napi::Object messageId = MessageId::NewInstance(pulsar_message_get_message_id(this->cMessage));
+ this->deferred.Resolve(messageId);
+ }
void OnError(const Napi::Error &e) {
this->deferred.Reject(
Napi::Error::New(Env(), std::string("Failed to send message: ") + e.Message()).Value());
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index e7fb8f6..8ca3f84 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -665,5 +665,53 @@
await consumer.close();
await client.close();
});
+
+ test('Produce/Consume and validate MessageId', async () => {
+ const client = new Pulsar.Client({
+ serviceUrl: 'pulsar://localhost:6650',
+ operationTimeoutSeconds: 30,
+ });
+
+ const topic = 'persistent://public/default/produce-consume-message-id';
+ const producer = await client.createProducer({
+ topic,
+ sendTimeoutMs: 30000,
+ batchingEnabled: true,
+ });
+ expect(producer).not.toBeNull();
+
+ const consumer = await client.subscribe({
+ topic,
+ subscription: 'sub1',
+ });
+
+ expect(consumer).not.toBeNull();
+
+ const messages = [];
+ const messageIds = [];
+ for (let i = 0; i < 10; i += 1) {
+ const msg = `my-message-${i}`;
+ const msgId = await producer.send({
+ data: Buffer.from(msg),
+ });
+ messages.push(msg);
+ messageIds.push(msgId.toString());
+ }
+
+ const results = [];
+ const resultIds = [];
+ for (let i = 0; i < 10; i += 1) {
+ const msg = await consumer.receive();
+ consumer.acknowledge(msg);
+ results.push(msg.getData().toString());
+ resultIds.push(msg.getMessageId().toString());
+ }
+ expect(lodash.difference(messages, results)).toEqual([]);
+ expect(lodash.difference(messageIds, resultIds)).toEqual([]);
+
+ await producer.close();
+ await consumer.close();
+ await client.close();
+ });
});
})();