Allow configuring custom offset-store
diff --git a/api/rocketmq/DefaultMQPushConsumer.h b/api/rocketmq/DefaultMQPushConsumer.h
index 03ac7d1..0693b8d 100644
--- a/api/rocketmq/DefaultMQPushConsumer.h
+++ b/api/rocketmq/DefaultMQPushConsumer.h
@@ -28,6 +28,7 @@
#include "MessageListener.h"
#include "rocketmq/Executor.h"
#include "rocketmq/MessageModel.h"
+#include "rocketmq/OffsetStore.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -110,6 +111,8 @@
std::string groupName() const;
+ void setOffsetStore(std::unique_ptr<OffsetStore> offset_store);
+
private:
std::shared_ptr<PushConsumerImpl> impl_;
};
diff --git a/src/main/cpp/rocketmq/DefaultMQPushConsumer.cpp b/src/main/cpp/rocketmq/DefaultMQPushConsumer.cpp
index 7a50dac..890221c 100644
--- a/src/main/cpp/rocketmq/DefaultMQPushConsumer.cpp
+++ b/src/main/cpp/rocketmq/DefaultMQPushConsumer.cpp
@@ -127,4 +127,8 @@
return impl_->getGroupName();
}
+void DefaultMQPushConsumer::setOffsetStore(std::unique_ptr<OffsetStore> offset_store) {
+ impl_->setOffsetStore(std::move(offset_store));
+}
+
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/include/PushConsumer.h b/src/main/cpp/rocketmq/include/PushConsumer.h
index fee3136..cf2da65 100644
--- a/src/main/cpp/rocketmq/include/PushConsumer.h
+++ b/src/main/cpp/rocketmq/include/PushConsumer.h
@@ -25,6 +25,7 @@
#include "rocketmq/Executor.h"
#include "rocketmq/MessageListener.h"
#include "rocketmq/MessageModel.h"
+#include "rocketmq/OffsetStore.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -53,6 +54,8 @@
virtual bool receiveMessage(const MQMessageQueue& message_queue, const FilterExpression& filter_expression) = 0;
virtual MessageListener* messageListener() = 0;
+
+ virtual void setOffsetStore(std::unique_ptr<OffsetStore> offset_store) = 0;
};
using PushConsumerSharedPtr = std::shared_ptr<PushConsumer>;
diff --git a/src/main/cpp/rocketmq/include/PushConsumerImpl.h b/src/main/cpp/rocketmq/include/PushConsumerImpl.h
index 238e377..399b87e 100644
--- a/src/main/cpp/rocketmq/include/PushConsumerImpl.h
+++ b/src/main/cpp/rocketmq/include/PushConsumerImpl.h
@@ -158,7 +158,7 @@
message_model_ = message_model;
}
- void offsetStore(std::unique_ptr<OffsetStore> offset_store) {
+ void setOffsetStore(std::unique_ptr<OffsetStore> offset_store) override {
offset_store_ = std::move(offset_store);
}
diff --git a/src/main/cpp/rocketmq/mocks/include/PushConsumerMock.h b/src/main/cpp/rocketmq/mocks/include/PushConsumerMock.h
index 63275ab..e4508ba 100644
--- a/src/main/cpp/rocketmq/mocks/include/PushConsumerMock.h
+++ b/src/main/cpp/rocketmq/mocks/include/PushConsumerMock.h
@@ -46,6 +46,8 @@
MOCK_METHOD(bool, receiveMessage, (const MQMessageQueue&, const FilterExpression&), (override));
MOCK_METHOD(MessageListener*, messageListener, (), (override));
+
+ MOCK_METHOD(void, setOffsetStore, (std::unique_ptr<OffsetStore>), (override));
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file