Support setting batchingMaxAllowedSizeInBytes in the producer batching configuration (#436)
diff --git a/index.d.ts b/index.d.ts
index 72c89af..e9bf8e8 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -69,6 +69,7 @@
accessMode?: ProducerAccessMode;
batchingType?: ProducerBatchType;
messageRouter?: MessageRouter;
+ batchingMaxAllowedSizeInBytes?: number;
}
export class Producer {
diff --git a/src/ProducerConfig.cc b/src/ProducerConfig.cc
index 3889120..83afb9c 100644
--- a/src/ProducerConfig.cc
+++ b/src/ProducerConfig.cc
@@ -40,6 +40,7 @@
static const std::string CFG_BATCH_ENABLED = "batchingEnabled";
static const std::string CFG_BATCH_MAX_DELAY = "batchingMaxPublishDelayMs";
static const std::string CFG_BATCH_MAX_MSG = "batchingMaxMessages";
+static const std::string CFG_BATCH_MAX_ALLOWED_SIZE_IN_BYTES = "batchingMaxAllowedSizeInBytes";
static const std::string CFG_SCHEMA = "schema";
static const std::string CFG_PROPS = "properties";
static const std::string CFG_PUBLIC_KEY_PATH = "publicKeyPath";
@@ -201,6 +202,16 @@
}
}
+ if (producerConfig.Has(CFG_BATCH_MAX_ALLOWED_SIZE_IN_BYTES) &&
+ producerConfig.Get(CFG_BATCH_MAX_ALLOWED_SIZE_IN_BYTES).IsNumber()) {
+ int64_t batchingMaxAllowedSizeInBytes =
+ producerConfig.Get(CFG_BATCH_MAX_ALLOWED_SIZE_IN_BYTES).ToNumber().Int64Value();
+ if (batchingMaxAllowedSizeInBytes > 0) {
+ pulsar_producer_configuration_set_batching_max_allowed_size_in_bytes(
+ this->cProducerConfig.get(), (unsigned long)batchingMaxAllowedSizeInBytes);
+ }
+ }
+
if (producerConfig.Has(CFG_SCHEMA) && producerConfig.Get(CFG_SCHEMA).IsObject()) {
SchemaInfo* schemaInfo = new SchemaInfo(producerConfig.Get(CFG_SCHEMA).ToObject());
schemaInfo->SetProducerSchema(this->cProducerConfig);
diff --git a/tests/producer.test.js b/tests/producer.test.js
index d094505..061d827 100644
--- a/tests/producer.test.js
+++ b/tests/producer.test.js
@@ -239,5 +239,79 @@
expect(partitions.size).toBe(1);
}, 30000);
});
+ describe('Batching', () => {
+ function getBatchIndex(msgId) {
+ const parts = msgId.toString().split(':');
+ if (parts.length > 3) {
+ return Number(parts[3]);
+ }
+ return -1;
+ }
+
+ test('should batch messages based on max allowed size in bytes', async () => {
+ const topicName = `persistent://public/default/test-batch-size-in-bytes-${Date.now()}`;
+ const subName = 'subscription-name';
+ const numOfMessages = 30;
+ const prefix = '12345678'; // 8 bytes message prefix
+
+ let producer;
+ let consumer;
+
+ try {
+ // 1. Setup Producer with batching enabled and size limit
+ producer = await client.createProducer({
+ topic: topicName,
+ compressionType: 'LZ4',
+ batchingEnabled: true,
+ batchingMaxMessages: 10000,
+ batchingMaxAllowedSizeInBytes: 20,
+ });
+
+ // 2. Setup Consumer
+ consumer = await client.subscribe({
+ topic: topicName,
+ subscription: subName,
+ });
+
+ // 3. Send messages asynchronously
+ const sendPromises = [];
+ for (let i = 0; i < numOfMessages; i += 1) {
+ const messageContent = prefix + i;
+ const msg = {
+ data: Buffer.from(messageContent),
+ properties: { msgIndex: String(i) },
+ };
+ sendPromises.push(producer.send(msg));
+ }
+ await producer.flush();
+ await Promise.all(sendPromises);
+
+ // 4. Receive messages and run assertions
+ let receivedCount = 0;
+ for (let i = 0; i < numOfMessages; i += 1) {
+ const receivedMsg = await consumer.receive(5000);
+ const expectedMessageContent = prefix + i;
+
+ // Assert that batchIndex is 0 or 1, since batch size should be 2
+ const batchIndex = getBatchIndex(receivedMsg.getMessageId());
+ expect(batchIndex).toBeLessThan(2);
+
+ // Assert message properties and content
+ expect(receivedMsg.getProperties().msgIndex).toBe(String(i));
+ expect(receivedMsg.getData().toString()).toBe(expectedMessageContent);
+
+ await consumer.acknowledge(receivedMsg);
+ receivedCount += 1;
+ }
+
+ // 5. Final check on the number of consumed messages
+ expect(receivedCount).toBe(numOfMessages);
+ } finally {
+ // 6. Cleanup
+ if (producer) await producer.close();
+ if (consumer) await consumer.close();
+ }
+ }, 30000);
+ });
});
})();