Support setting batchingMaxAllowedSizeInBytes in the producer batch configuration (#436)

diff --git a/index.d.ts b/index.d.ts
index 72c89af..e9bf8e8 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -69,6 +69,7 @@
   accessMode?: ProducerAccessMode;
   batchingType?: ProducerBatchType;
   messageRouter?: MessageRouter;
+  batchingMaxAllowedSizeInBytes?: number;
 }
 
 export class Producer {
diff --git a/src/ProducerConfig.cc b/src/ProducerConfig.cc
index 3889120..83afb9c 100644
--- a/src/ProducerConfig.cc
+++ b/src/ProducerConfig.cc
@@ -40,6 +40,7 @@
 static const std::string CFG_BATCH_ENABLED = "batchingEnabled";
 static const std::string CFG_BATCH_MAX_DELAY = "batchingMaxPublishDelayMs";
 static const std::string CFG_BATCH_MAX_MSG = "batchingMaxMessages";
+static const std::string CFG_BATCH_MAX_ALLOWED_SIZE_IN_BYTES = "batchingMaxAllowedSizeInBytes";
 static const std::string CFG_SCHEMA = "schema";
 static const std::string CFG_PROPS = "properties";
 static const std::string CFG_PUBLIC_KEY_PATH = "publicKeyPath";
@@ -201,6 +202,16 @@
     }
   }
 
+  if (producerConfig.Has(CFG_BATCH_MAX_ALLOWED_SIZE_IN_BYTES) &&
+      producerConfig.Get(CFG_BATCH_MAX_ALLOWED_SIZE_IN_BYTES).IsNumber()) {
+    int64_t batchingMaxAllowedSizeInBytes =
+        producerConfig.Get(CFG_BATCH_MAX_ALLOWED_SIZE_IN_BYTES).ToNumber().Int64Value();
+    if (batchingMaxAllowedSizeInBytes > 0) {
+      pulsar_producer_configuration_set_batching_max_allowed_size_in_bytes(
+          this->cProducerConfig.get(), (unsigned long)batchingMaxAllowedSizeInBytes);
+    }
+  }
+
   if (producerConfig.Has(CFG_SCHEMA) && producerConfig.Get(CFG_SCHEMA).IsObject()) {
     SchemaInfo* schemaInfo = new SchemaInfo(producerConfig.Get(CFG_SCHEMA).ToObject());
     schemaInfo->SetProducerSchema(this->cProducerConfig);
diff --git a/tests/producer.test.js b/tests/producer.test.js
index d094505..061d827 100644
--- a/tests/producer.test.js
+++ b/tests/producer.test.js
@@ -239,5 +239,79 @@
         expect(partitions.size).toBe(1);
       }, 30000);
     });
+    describe('Batching', () => {
+      function getBatchIndex(msgId) { // batch index parsed from msgId.toString(), or -1
+        const parts = msgId.toString().split(':');
+        if (parts.length > 3) { // a fourth ':'-separated field is treated as the batch index
+          return Number(parts[3]);
+        }
+        return -1; // NOTE(review): assumes an 'a:b:c:batchIndex' string form - confirm
+      }
+
+      test('should batch messages based on max allowed size in bytes', async () => {
+        const topicName = `persistent://public/default/test-batch-size-in-bytes-${Date.now()}`;
+        const subName = 'subscription-name';
+        const numOfMessages = 30;
+        const prefix = '12345678'; // 8-byte prefix; payloads are prefix + i, i.e. 9-10 bytes
+
+        let producer;
+        let consumer;
+
+        try {
+          // 1. Set up a producer with batching enabled and a 20-byte batch size limit
+          producer = await client.createProducer({
+            topic: topicName,
+            compressionType: 'LZ4',
+            batchingEnabled: true,
+            batchingMaxMessages: 10000,
+            batchingMaxAllowedSizeInBytes: 20,
+          });
+
+          // 2. Set up the consumer
+          consumer = await client.subscribe({
+            topic: topicName,
+            subscription: subName,
+          });
+
+          // 3. Queue every send without awaiting it, then flush and wait for all of them
+          const sendPromises = [];
+          for (let i = 0; i < numOfMessages; i += 1) {
+            const messageContent = prefix + i;
+            const msg = {
+              data: Buffer.from(messageContent),
+              properties: { msgIndex: String(i) },
+            };
+            sendPromises.push(producer.send(msg));
+          }
+          await producer.flush();
+          await Promise.all(sendPromises);
+
+          // 4. Receive messages in send order and run assertions
+          let receivedCount = 0;
+          for (let i = 0; i < numOfMessages; i += 1) {
+            const receivedMsg = await consumer.receive(5000);
+            const expectedMessageContent = prefix + i;
+
+            // Batch size should be 2, so batchIndex is 0 or 1. NOTE(review): -1 also passes.
+            const batchIndex = getBatchIndex(receivedMsg.getMessageId());
+            expect(batchIndex).toBeLessThan(2);
+
+            // Assert message properties and content
+            expect(receivedMsg.getProperties().msgIndex).toBe(String(i));
+            expect(receivedMsg.getData().toString()).toBe(expectedMessageContent);
+
+            await consumer.acknowledge(receivedMsg);
+            receivedCount += 1;
+          }
+
+          // 5. Final check on the number of consumed messages
+          expect(receivedCount).toBe(numOfMessages);
+        } finally {
+          // 6. Cleanup; NOTE(review): if producer.close() rejects, consumer.close() is skipped
+          if (producer) await producer.close();
+          if (consumer) await consumer.close();
+        }
+      }, 30000);
+    });
   });
 })();