Add read compacted option to consumer options. (#73)
* 1.1.0-rc.1
* Add read compacted option to consumer options.
* Adds read compacted to reader.
Co-authored-by: hrsakai <hsakai@yahoo-corp.jp>
diff --git a/src/ConsumerConfig.cc b/src/ConsumerConfig.cc
index 10963bd..1f734df 100644
--- a/src/ConsumerConfig.cc
+++ b/src/ConsumerConfig.cc
@@ -35,6 +35,7 @@
static const std::string CFG_CONSUMER_NAME = "consumerName";
static const std::string CFG_PROPS = "properties";
static const std::string CFG_LISTENER = "listener";
+static const std::string CFG_READ_COMPACTED = "readCompacted";
static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
{"Exclusive", pulsar_ConsumerExclusive},
@@ -163,6 +164,13 @@
pulsar_consumer_configuration_set_message_listener(this->cConsumerConfig, &MessageListener,
this->listener);
}
+
+ if (consumerConfig.Has(CFG_READ_COMPACTED) && consumerConfig.Get(CFG_READ_COMPACTED).IsBoolean()) {
+ bool readCompacted = consumerConfig.Get(CFG_READ_COMPACTED).ToBoolean();
+ if (readCompacted) {
+ pulsar_consumer_set_read_compacted(this->cConsumerConfig, 1);
+ }
+ }
}
ConsumerConfig::~ConsumerConfig() {
diff --git a/src/ReaderConfig.cc b/src/ReaderConfig.cc
index 7e15807..040d98f 100644
--- a/src/ReaderConfig.cc
+++ b/src/ReaderConfig.cc
@@ -26,6 +26,7 @@
static const std::string CFG_RECV_QUEUE = "receiverQueueSize";
static const std::string CFG_READER_NAME = "readerName";
static const std::string CFG_SUBSCRIPTION_ROLE_PREFIX = "subscriptionRolePrefix";
+static const std::string CFG_READ_COMPACTED = "readCompacted";
ReaderConfig::ReaderConfig(const Napi::Object &readerConfig) : topic(""), cStartMessageId(NULL) {
this->cReaderConfig = pulsar_reader_configuration_create();
@@ -59,6 +60,13 @@
if (!subscriptionRolePrefix.empty())
pulsar_reader_configuration_set_reader_name(this->cReaderConfig, subscriptionRolePrefix.c_str());
}
+
+ if (readerConfig.Has(CFG_READ_COMPACTED) && readerConfig.Get(CFG_READ_COMPACTED).IsBoolean()) {
+ bool readCompacted = readerConfig.Get(CFG_READ_COMPACTED).ToBoolean();
+ if (readCompacted) {
+ pulsar_reader_configuration_set_read_compacted(this->cReaderConfig, 1);
+ }
+ }
}
ReaderConfig::~ReaderConfig() { pulsar_reader_configuration_free(this->cReaderConfig); }