Allow to specify receiving message await duration (#84)
diff --git a/cpp/include/rocketmq/SimpleConsumer.h b/cpp/include/rocketmq/SimpleConsumer.h
index 0f7030f..0550f73 100644
--- a/cpp/include/rocketmq/SimpleConsumer.h
+++ b/cpp/include/rocketmq/SimpleConsumer.h
@@ -97,6 +97,11 @@
return *this;
}
+ SimpleConsumerBuilder& withAwaitDuration(std::chrono::milliseconds await_duration) {
+ await_duration_ = await_duration;
+ return *this;
+ }
+
SimpleConsumer build();
private:
@@ -106,6 +111,8 @@
Configuration configuration_;
std::unordered_map<std::string, FilterExpression> subscriptions_;
+
+ std::chrono::milliseconds await_duration_;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/client/ReceiveMessageStreamReader.cpp b/cpp/source/client/ReceiveMessageStreamReader.cpp
index 03204c6..7cdd665 100644
--- a/cpp/source/client/ReceiveMessageStreamReader.cpp
+++ b/cpp/source/client/ReceiveMessageStreamReader.cpp
@@ -17,11 +17,12 @@
#include "ReceiveMessageStreamReader.h"
-#include "apache/rocketmq/v2/definition.pb.h"
+#include <chrono>
+#include "apache/rocketmq/v2/definition.pb.h"
+#include "rocketmq/ErrorCode.h"
#include "rocketmq/Logger.h"
#include "spdlog/spdlog.h"
-#include "rocketmq/ErrorCode.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -38,7 +39,7 @@
for (const auto& entry : context_->metadata) {
client_context_.AddMetadata(entry.first, entry.second);
}
- client_context_.set_deadline(std::chrono::system_clock::now() + context_->timeout);
+ client_context_.set_deadline(std::chrono::system_clock::now() + context_->timeout + std::chrono::milliseconds(500));
stub_->async()->ReceiveMessage(&client_context_, &request_, this);
result_.source_host = peer_address_;
diff --git a/cpp/source/rocketmq/SimpleConsumer.cpp b/cpp/source/rocketmq/SimpleConsumer.cpp
index 0c902d4..d7e94ae 100644
--- a/cpp/source/rocketmq/SimpleConsumer.cpp
+++ b/cpp/source/rocketmq/SimpleConsumer.cpp
@@ -129,6 +129,7 @@
simple_consumer.impl_->withRequestTimeout(configuration_.requestTimeout());
simple_consumer.impl_->withNameServerResolver(std::make_shared<StaticNameServerResolver>(configuration_.endpoints()));
simple_consumer.impl_->withCredentialsProvider(configuration_.credentialsProvider());
+ simple_consumer.impl_->withReceiveMessageTimeout(await_duration_);
for (const auto& entry : subscriptions_) {
simple_consumer.impl_->subscribe(entry.first, entry.second);
diff --git a/cpp/source/rocketmq/SimpleConsumerImpl.cpp b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
index 6a2dc75..1408d06 100644
--- a/cpp/source/rocketmq/SimpleConsumerImpl.cpp
+++ b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
@@ -324,9 +324,8 @@
callback(ec, result.messages);
};
- auto timeout = absl::ToChronoMilliseconds(config().subscriber.polling_timeout);
- SPDLOG_DEBUG("ReceiveMessage.polling_timeout: {}ms", timeout.count());
- manager()->receiveMessage(target, metadata, request, timeout, cb);
+ SPDLOG_DEBUG("ReceiveMessage.polling_timeout: {}ms", MixAll::millisecondsOf(long_polling_duration_));
+ manager()->receiveMessage(target, metadata, request, long_polling_duration_, cb);
}
void SimpleConsumerImpl::wrapAckRequest(const Message& message, AckMessageRequest& request) {
diff --git a/cpp/source/rocketmq/include/SimpleConsumerImpl.h b/cpp/source/rocketmq/include/SimpleConsumerImpl.h
index 9fac1bb..7ef3d8e 100644
--- a/cpp/source/rocketmq/include/SimpleConsumerImpl.h
+++ b/cpp/source/rocketmq/include/SimpleConsumerImpl.h
@@ -16,6 +16,8 @@
*/
#pragma once
+#include <chrono>
+
#include "ClientImpl.h"
#include "rocketmq/FilterExpression.h"
#include "rocketmq/SimpleConsumer.h"
@@ -55,6 +57,10 @@
std::chrono::milliseconds duration,
ChangeInvisibleDurationCallback callback);
+ void withReceiveMessageTimeout(std::chrono::milliseconds receive_timeout) {
+ long_polling_duration_ = receive_timeout;
+ }
+
protected:
void topicsOfInterest(std::vector<std::string> topics) override;
@@ -72,6 +78,8 @@
static thread_local std::size_t assignment_index_;
+ std::chrono::milliseconds long_polling_duration_{MixAll::DefaultReceiveMessageTimeout};
+
void refreshAssignments0() LOCKS_EXCLUDED(topic_assignments_mtx_, subscriptions_mtx_);
void refreshAssignments() LOCKS_EXCLUDED(subscriptions_mtx_);