Includes Support for ZSTD and SNAPPY Compression (#121)
* Includes Support for ZSTD and SNAPPY Compression
A version bump to the latest version of the Pulsar C++ library (2.6.1) was required to make use of the compression types. As the latest version of the C++ library requires GLIBCXX_3.4.22, the testing suite code was altered to automatically update the Docker image with the latest version of libstdc++6 and gcc-4.9 before executing the library tests.
* Update pulsar-version.txt
Co-authored-by: savearray2 <savearray2>
diff --git a/pulsar-version.txt b/pulsar-version.txt
index 437459c..e70b452 100755
--- a/pulsar-version.txt
+++ b/pulsar-version.txt
@@ -1 +1 @@
-2.5.0
+2.6.0
diff --git a/run-unit-tests.sh b/run-unit-tests.sh
index 7f2accc..5e2994f 100755
--- a/run-unit-tests.sh
+++ b/run-unit-tests.sh
@@ -30,6 +30,10 @@
for pkg in apache-pulsar-client-dev.deb apache-pulsar-client.deb;do
curl -L --create-dir "https://archive.apache.org/dist/pulsar/pulsar-${VERSION}/DEB/${pkg}" -o $PULSAR_PKG_DIR/$pkg
done;
+apt-get -y update
+apt-get install -y software-properties-common
+add-apt-repository ppa:ubuntu-toolchain-r/test && apt-get -y update
+apt-get -y install gcc-4.9 && apt-get upgrade -y libstdc++6
apt install $PULSAR_PKG_DIR/apache-pulsar-client*.deb
./pulsar-test-service-start.sh
diff --git a/src/ProducerConfig.cc b/src/ProducerConfig.cc
index 76fd25c..b9c0a75 100644
--- a/src/ProducerConfig.cc
+++ b/src/ProducerConfig.cc
@@ -46,8 +46,12 @@
{"JavaStringHash", pulsar_JavaStringHash},
};
-static std::map<std::string, pulsar_compression_type> COMPRESSION_TYPE = {{"Zlib", pulsar_CompressionZLib},
- {"LZ4", pulsar_CompressionLZ4}};
+static std::map<std::string, pulsar_compression_type> COMPRESSION_TYPE = {
+ {"Zlib", pulsar_CompressionZLib},
+ {"LZ4", pulsar_CompressionLZ4},
+ {"ZSTD", pulsar_CompressionZSTD},
+ {"SNAPPY", pulsar_CompressionSNAPPY},
+};
ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") {
this->cProducerConfig = pulsar_producer_configuration_create();
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index 1a6b60c..b17e67e 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -438,5 +438,53 @@
await producer.close();
await client.close();
});
+
+ test('Produce/Read (Compression)', async () => {
+ const client = new Pulsar.Client({
+ serviceUrl: 'pulsar://localhost:6650',
+ operationTimeoutSeconds: 30,
+ });
+ expect(client).not.toBeNull();
+
+ const topic = 'persistent://public/default/produce-read-compression';
+ const producer = await client.createProducer({
+ topic,
+ sendTimeoutMs: 30000,
+ batchingEnabled: true,
+ compressionType: 'ZSTD',
+ });
+ expect(producer).not.toBeNull();
+
+ const reader = await client.createReader({
+ topic,
+ startMessageId: Pulsar.MessageId.latest(),
+ });
+ expect(reader).not.toBeNull();
+
+ const messages = [];
+ for (let i = 0; i < 10; i += 1) {
+ const msg = `my-message-${i}`;
+ producer.send({
+ data: Buffer.from(msg),
+ });
+ messages.push(msg);
+ }
+ await producer.flush();
+
+ expect(reader.hasNext()).toBe(true);
+
+ const results = [];
+ for (let i = 0; i < 10; i += 1) {
+ const msg = await reader.readNext();
+ results.push(msg.getData().toString());
+ }
+ expect(lodash.difference(messages, results)).toEqual([]);
+
+ expect(reader.hasNext()).toBe(false);
+
+ await producer.close();
+ await reader.close();
+ await client.close();
+ });
});
})();