Allow to specify receiving message await duration (#84)

diff --git a/cpp/include/rocketmq/SimpleConsumer.h b/cpp/include/rocketmq/SimpleConsumer.h
index 0f7030f..0550f73 100644
--- a/cpp/include/rocketmq/SimpleConsumer.h
+++ b/cpp/include/rocketmq/SimpleConsumer.h
@@ -97,6 +97,11 @@
     return *this;
   }
 
+  SimpleConsumerBuilder& withAwaitDuration(std::chrono::milliseconds await_duration) {
+    await_duration_ = await_duration;
+    return *this;
+  }
+
   SimpleConsumer build();
 
 private:
@@ -106,6 +111,8 @@
   Configuration configuration_;
 
   std::unordered_map<std::string, FilterExpression> subscriptions_;
+
+  std::chrono::milliseconds await_duration_;
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/client/ReceiveMessageStreamReader.cpp b/cpp/source/client/ReceiveMessageStreamReader.cpp
index 03204c6..7cdd665 100644
--- a/cpp/source/client/ReceiveMessageStreamReader.cpp
+++ b/cpp/source/client/ReceiveMessageStreamReader.cpp
@@ -17,11 +17,12 @@
 
 #include "ReceiveMessageStreamReader.h"
 
-#include "apache/rocketmq/v2/definition.pb.h"
+#include <chrono>
 
+#include "apache/rocketmq/v2/definition.pb.h"
+#include "rocketmq/ErrorCode.h"
 #include "rocketmq/Logger.h"
 #include "spdlog/spdlog.h"
-#include "rocketmq/ErrorCode.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -38,7 +39,7 @@
   for (const auto& entry : context_->metadata) {
     client_context_.AddMetadata(entry.first, entry.second);
   }
-  client_context_.set_deadline(std::chrono::system_clock::now() + context_->timeout);
+  client_context_.set_deadline(std::chrono::system_clock::now() + context_->timeout + std::chrono::milliseconds(500));
 
   stub_->async()->ReceiveMessage(&client_context_, &request_, this);
   result_.source_host = peer_address_;
diff --git a/cpp/source/rocketmq/SimpleConsumer.cpp b/cpp/source/rocketmq/SimpleConsumer.cpp
index 0c902d4..d7e94ae 100644
--- a/cpp/source/rocketmq/SimpleConsumer.cpp
+++ b/cpp/source/rocketmq/SimpleConsumer.cpp
@@ -129,6 +129,7 @@
   simple_consumer.impl_->withRequestTimeout(configuration_.requestTimeout());
   simple_consumer.impl_->withNameServerResolver(std::make_shared<StaticNameServerResolver>(configuration_.endpoints()));
   simple_consumer.impl_->withCredentialsProvider(configuration_.credentialsProvider());
+  simple_consumer.impl_->withReceiveMessageTimeout(await_duration_);
 
   for (const auto& entry : subscriptions_) {
     simple_consumer.impl_->subscribe(entry.first, entry.second);
diff --git a/cpp/source/rocketmq/SimpleConsumerImpl.cpp b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
index 6a2dc75..1408d06 100644
--- a/cpp/source/rocketmq/SimpleConsumerImpl.cpp
+++ b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
@@ -324,9 +324,8 @@
     callback(ec, result.messages);
   };
 
-  auto timeout = absl::ToChronoMilliseconds(config().subscriber.polling_timeout);
-  SPDLOG_DEBUG("ReceiveMessage.polling_timeout: {}ms", timeout.count());
-  manager()->receiveMessage(target, metadata, request, timeout, cb);
+  SPDLOG_DEBUG("ReceiveMessage.polling_timeout: {}ms", MixAll::millisecondsOf(long_polling_duration_));
+  manager()->receiveMessage(target, metadata, request, long_polling_duration_, cb);
 }
 
 void SimpleConsumerImpl::wrapAckRequest(const Message& message, AckMessageRequest& request) {
diff --git a/cpp/source/rocketmq/include/SimpleConsumerImpl.h b/cpp/source/rocketmq/include/SimpleConsumerImpl.h
index 9fac1bb..7ef3d8e 100644
--- a/cpp/source/rocketmq/include/SimpleConsumerImpl.h
+++ b/cpp/source/rocketmq/include/SimpleConsumerImpl.h
@@ -16,6 +16,8 @@
  */
 #pragma once
 
+#include <chrono>
+
 #include "ClientImpl.h"
 #include "rocketmq/FilterExpression.h"
 #include "rocketmq/SimpleConsumer.h"
@@ -55,6 +57,10 @@
                                std::chrono::milliseconds duration,
                                ChangeInvisibleDurationCallback callback);
 
+  void withReceiveMessageTimeout(std::chrono::milliseconds receive_timeout) {
+    long_polling_duration_ = receive_timeout;
+  }
+
 protected:
   void topicsOfInterest(std::vector<std::string> topics) override;
 
@@ -72,6 +78,8 @@
 
   static thread_local std::size_t assignment_index_;
 
+  std::chrono::milliseconds long_polling_duration_{MixAll::DefaultReceiveMessageTimeout};
+
   void refreshAssignments0() LOCKS_EXCLUDED(topic_assignments_mtx_, subscriptions_mtx_);
 
   void refreshAssignments() LOCKS_EXCLUDED(subscriptions_mtx_);