Adapt to protocol v1.0-alpha1 (#29)

Adapt to protocol v1.0-alpha1, including the following changes
* Import latest protocol files
* Replace multiplex RPC with 3 separate RPCs, PollCommand, ReportThreadStackTrace, ReportMessageConsumptionVerificationResult
diff --git a/.bazelrc b/.bazelrc
index 0f8f862..1c53b01 100644
--- a/.bazelrc
+++ b/.bazelrc
@@ -117,7 +117,7 @@
 # Use remote cache, --config=remote_cache See https://docs.bazel.build/versions/main/guide.html#--config
 build:remote_cache --remote_cache=grpc://11.122.50.85:9092
 build:remote_cache --experimental_remote_downloader=grpc://11.122.50.85:9092
-test:remote_cache --remote_cache=http://11.122.50.85:9090
+test:remote_cache --remote_cache=grpc://11.122.50.85:9090
 test:remote_cache --remote_upload_local_results=true
 
 # Coverage options
diff --git a/proto/apache/rocketmq/v1/definition.proto b/proto/apache/rocketmq/v1/definition.proto
index c92aad3..b6cb24b 100644
--- a/proto/apache/rocketmq/v1/definition.proto
+++ b/proto/apache/rocketmq/v1/definition.proto
@@ -207,6 +207,21 @@
   reserved 3 to 64;
 }
 
+// When publishing messages to or subscribing messages from brokers, clients
+// shall include or validate digests of message body to ensure data integrity.
+//
+// For message publishment, when an invalid digest were detected, brokers need
+// respond client with BAD_REQUEST.
+//
+// For messags subscription, when an invalid digest were detected, consumers
+// need to handle this case according to message type:
+// 1) Standard messages should be negatively acknowledged instantly, causing
+// immediate re-delivery; 2) FIFO messages require special RPC, to re-fetch
+// previously acquired messages batch;
+//
+// Message consumption model also affects how invalid digest are handled. When
+// messages are consumed in broadcasting way,
+// TODO: define semantics of invalid-digest-when-broadcasting.
 message Digest {
   DigestType type = 1;
   string checksum = 2;
@@ -229,7 +244,8 @@
   repeated string keys = 2;
 
   // Message identifier, client-side generated, remains unique.
-  // if message_id is empty, the send message request will be aborted with status `INVALID_ARGUMENT`
+  // if message_id is empty, the send message request will be aborted with
+  // status `INVALID_ARGUMENT`
   string message_id = 3;
 
   // Message body digest
@@ -296,13 +312,18 @@
 }
 
 message Message {
+
   Resource topic = 1;
+
   // User defined key-value pairs.
   // If user_attribute contains the reserved keys by RocketMQ,
-  // the send message request will be aborted with status `INVALID_ARGUMENT`. See below links for the reserved keys
+  // the send message request will be aborted with status `INVALID_ARGUMENT`.
+  // See below links for the reserved keys
   // https://github.com/apache/rocketmq/blob/master/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java#L58
   map<string, string> user_attribute = 2;
+
   SystemAttribute system_attribute = 3;
+
   bytes body = 4;
 
   reserved 5 to 64;
diff --git a/proto/apache/rocketmq/v1/service.proto b/proto/apache/rocketmq/v1/service.proto
index 6220f51..9d8e3b0 100644
--- a/proto/apache/rocketmq/v1/service.proto
+++ b/proto/apache/rocketmq/v1/service.proto
@@ -41,13 +41,26 @@
   reserved 7 to 64;
 }
 
-// A QueryRouteRequest requests a set of Partitions of the specific topic with
-// necessary route infos.
+// Topics are destination of messages to publish to or subscribe from. Similar
+// to domain names, they will be addressable after resolution through the
+// provided access point.
+//
+// Access points are usually the addresses of name servers, which fulfill
+// service discovery, load-balancing and other auxillary services. Name servers
+// receive periodic heartbeats from affiliate brokers and erase those which
+// failed to maintain alive status.
+//
+// Name servers answer queries of QueryRouteRequest, responding clients with
+// addressable partitions, which they may directly publish messages to or
+// subscribe messages from.
+//
+// QueryRouteRequest shall include source endpoints, aka, configured
+// access-point, which annotates tenant-id, instance-id or other
+// vendor-specific settings. Purpose-built name servers may respond customized
+// results based on these particular requirements.
 message QueryRouteRequest {
   Resource topic = 1;
 
-  // The service access points used to issue QueryRouteRequest
-  // The QueryRouteResponse will indicate the adress of subsequent RPCs.
   Endpoints endpoints = 2;
 
   reserved 3 to 64;
@@ -55,6 +68,7 @@
 
 message QueryRouteResponse {
   ResponseCommon common = 1;
+
   repeated Partition partitions = 2;
 
   reserved 3 to 64;
@@ -264,7 +278,57 @@
   reserved 6 to 64;
 }
 
-message GenericPollingRequest {
+message NoopCommand {
+  reserved 1 to 64;
+}
+
+message PrintThreadStackTraceCommand {
+  string command_id = 1;
+
+  reserved 2 to 64;
+}
+
+message ReportThreadStackTraceRequest {
+  string command_id = 1;
+  string thread_stack_trace = 2;
+
+  reserved 3 to 64;
+}
+
+message ReportThreadStackTraceResponse {
+  ResponseCommon common = 1;
+
+  reserved 2 to 64;
+}
+
+message VerifyMessageConsumptionCommand {
+  string command_id = 1;
+  Message message = 2;
+
+  reserved 3 to 64;
+}
+
+message ReportMessageConsumptionResultRequest {
+  string command_id = 1;
+  google.rpc.Status status = 2;
+
+  reserved 3 to 64;
+}
+
+message ReportMessageConsumptionResultResponse {
+  ResponseCommon common = 1;
+
+  reserved 2 to 64;
+}
+
+message RecoverOrphanedTransactionCommand {
+  Message orphaned_transactional_message = 1;
+  string transaction_id = 2;
+
+  reserved 3 to 64;
+}
+
+message PollCommandRequest {
   string client_id = 1;
   repeated Resource topics = 2;
   oneof group {
@@ -275,63 +339,16 @@
   reserved 5 to 64;
 }
 
-message GenericPollingResponse {
-  ResponseCommon common = 1;
-
-  reserved 2 to 64;
-}
-
-message PrintThreadStackRequest {
-  string mid = 1;
-
-  reserved 2 to 64;
-}
-
-message PrintThreadStackResponse {
-  ResponseCommon common = 1;
-  string mid = 2;
-  string stack_trace = 3;
-
-  reserved 4 to 64;
-}
-
-message VerifyMessageConsumptionRequest {
-  string mid = 1;
-  Message message = 2;
-
-  reserved 3 to 64;
-}
-
-message VerifyMessageConsumptionResponse {
-  string mid = 1;
-  ResponseCommon common = 2;
-
-  reserved 3 to 64;
-}
-
-message RecoverOrphanedTransactionRequest {
-  Message orphaned_transactional_message = 1;
-  string transaction_id = 2;
-
-  reserved 3 to 64;
-}
-
-message MultiplexingRequest {
+message PollCommandResponse {
   oneof type {
-    GenericPollingRequest polling_request = 1;
-    PrintThreadStackResponse print_thread_stack_response = 2;
-    VerifyMessageConsumptionResponse verify_message_consumption_response = 3;
-  }
-
-  reserved 4 to 64;
-}
-
-message MultiplexingResponse {
-  oneof type {
-    GenericPollingResponse polling_response = 1;
-    PrintThreadStackRequest print_thread_stack_request = 2;
-    VerifyMessageConsumptionRequest verify_message_consumption_request = 3;
-    RecoverOrphanedTransactionRequest recover_orphaned_transaction_request = 4;
+    // Default command when no new command need to be delivered.
+    NoopCommand noop_command = 1;
+    // Request client to print thread stack trace.
+    PrintThreadStackTraceCommand print_thread_stack_trace_command = 2;
+    // Request client to verify the consumption of the appointed message.
+    VerifyMessageConsumptionCommand verify_message_consumption_command = 3;
+    // Request client to recover the orphaned transaction message.
+    RecoverOrphanedTransactionCommand recover_orphaned_transaction_command = 4;
   }
 
   reserved 5 to 64;
@@ -353,36 +370,58 @@
   reserved 2 to 64;
 }
 
-// For all the rpcs in MessagingService may return below erros:
+// For all the RPCs in MessagingService, the following error handling policies
+// apply:
 //
-// If the request doesn't have a valid authentication credentials, returns `UNAUTHENTICATED`.
-// If the caller doesn't permission to execute the specified operation, returns `PERMISSION_DENIED`.
-// If the per-user rate quota has been exceeded, returns `RESOURCE_EXHAUSTED`.
-// If any unexpected server-side exception occurs, returns `INTERNAL`.
+// If the request doesn't bear a valid authentication credential, return a
+// response with common.status.code == `UNAUTHENTICATED`. If the authenticated
+// user is not granted with sufficient permission to execute the requested
+// operation, return a response with common.status.code == `PERMISSION_DENIED`.
+// If the per-user-resource-based quota is exhausted, return a response with
+// common.status.code == `RESOURCE_EXHAUSTED`. If any unexpected server-side
+// errors raise, return a response with common.status.code == `INTERNAL`.
 service MessagingService {
 
-  // Querys the route info of a topic from specific endpoints, the server returns a set of partition if success.
+  // Querys the route entries of the requested topic in the perspective of the
+  // given endpoints. On success, servers should return a collection of
+  // addressable partitions. Note servers may return customized route entries
+  // based on endpoints provided.
   //
-  // If the corresponding topic doesn't exist, returns `NOT_FOUND`.
+  // If the requested topic doesn't exist, returns `NOT_FOUND`.
   // If the specific endpoints is emtpy, returns `INVALID_ARGUMENT`.
   rpc QueryRoute(QueryRouteRequest) returns (QueryRouteResponse) {
   }
 
-  // Producer or consumer sends HeartbeatRequest to server in order to report necessary
-  // client-side information, like subscription data of consumer. Returns `OK` if success.
+  // Producer or consumer sends HeartbeatRequest to servers periodically to
+  // keep-alive. Additionally, it also reports client-side configuration,
+  // including topic subscription, load-balancing group name, etc.
   //
-  // If the client language info is invalid, returns `INVALID_ARGUMENT`
+  // Returns `OK` if success.
+  //
+  // If a client specifies a language that is not yet supported by servers,
+  // returns `INVALID_ARGUMENT`
   rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {
   }
 
-  // Checks the health status of message server, returns `OK` if no network issues.
-  // Clients could use this RPC to detect the availability of server, and adpot necessary isolation measures.
+  // Checks the health status of message server, returns `OK` if services are
+  // online and serving. Clients may use this RPC to detect availability of
+  // messaging service, and take isolation actions when necessary.
   rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse) {
   }
 
-  // Sends one message to the specific partition of a topic, returns message id or transaction id with status `OK`.
+  // Delivers messages to brokers.
+  // Clients may further:
+  // 1. Refine a message destination to topic partition which fulfills parts of
+  // FIFO semantic;
+  // 2. Flag a message as transactional, which keeps it invisible to consumers
+  // until it commits;
+  // 3. Time a message, making it invisible to consumers till specified
+  // time-point;
+  // 4. And more...
   //
-  // If the corresponding topic doesn't exist, returns `NOT_FOUND`.
+  // Returns message-id or transaction-id with status `OK` on success.
+  //
+  // If the destination topic doesn't exist, returns `NOT_FOUND`.
   rpc SendMessage(SendMessageRequest) returns (SendMessageResponse) {
   }
 
@@ -394,34 +433,40 @@
   rpc QueryAssignment(QueryAssignmentRequest) returns (QueryAssignmentResponse) {
   }
 
-  // Receives messages from the server in batch manner, returns a set of messages if success.
-  // The received messages should be acked or uacked after processed.
+  // Receives messages from the server in batch manner, returns a set of
+  // messages if success. The received messages should be acked or uacked after
+  // processed.
   //
-  // If the pending concurrent receive requests exceed the quota of the given consumer group, returns `UNAVAILABLE`.
-  // If the upstream store server hangs, return `DEADLINE_EXCEEDED` in a timely manner.
-  // If the corresponding topic or consumer group doesn't exist, returns `NOT_FOUND`.
-  // If there is no new message in the specific topic, returns `OK` with an empty message set. Please note that client
-  // may suffer from false empty responses.
+  // If the pending concurrent receive requests exceed the quota of the given
+  // consumer group, returns `UNAVAILABLE`. If the upstream store server hangs,
+  // return `DEADLINE_EXCEEDED` in a timely manner. If the corresponding topic
+  // or consumer group doesn't exist, returns `NOT_FOUND`. If there is no new
+  // message in the specific topic, returns `OK` with an empty message set.
+  // Please note that client may suffer from false empty responses.
   rpc ReceiveMessage(ReceiveMessageRequest) returns (ReceiveMessageResponse) {
   }
 
-  // Acknowledges the message associated with the `receipt_handle` or `offset` in the
-  // `AckMessageRequest`, it means the message has been successfully processed.
-  // Returns `OK` if the message server remove the relevant message successfully.
+  // Acknowledges the message associated with the `receipt_handle` or `offset`
+  // in the `AckMessageRequest`, it means the message has been successfully
+  // processed. Returns `OK` if the message server remove the relevant message
+  // successfully.
   //
-  // If the given receipt_handle is illegal or out of date, returns `INVALID_ARGUMENT`.
+  // If the given receipt_handle is illegal or out of date, returns
+  // `INVALID_ARGUMENT`.
   rpc AckMessage(AckMessageRequest) returns (AckMessageResponse) {
   }
 
-  // Signals that the message has not been successfully processed. The message server should resend the message
-  // follow the retry policy defined at server-side.
+  // Signals that the message has not been successfully processed. The message
+  // server should resend the message follow the retry policy defined at
+  // server-side.
   //
-  // If the corresponding topic or consumer group doesn't exist, returns `NOT_FOUND`.
+  // If the corresponding topic or consumer group doesn't exist, returns
+  // `NOT_FOUND`.
   rpc NackMessage(NackMessageRequest) returns (NackMessageResponse) {
   }
 
-  // Forwards one message to dead letter queue if the DeadLetterPolicy is triggered by this message at client-side,
-  // return `OK` if success.
+  // Forwards one message to dead letter queue if the DeadLetterPolicy is
+  // triggered by this message at client-side, return `OK` if success.
   rpc ForwardMessageToDeadLetterQueue(ForwardMessageToDeadLetterQueueRequest)
       returns (ForwardMessageToDeadLetterQueueResponse) {
   }
@@ -430,26 +475,57 @@
   rpc EndTransaction(EndTransactionRequest) returns (EndTransactionResponse) {
   }
 
-  // Querys the offset of the specific partition, returns the offset with `OK` if success.
-  // The message server should maintain a numerical offset for each message in a parition.
+  // Querys the offset of the specific partition, returns the offset with `OK`
+  // if success. The message server should maintain a numerical offset for each
+  // message in a parition.
   rpc QueryOffset(QueryOffsetRequest) returns (QueryOffsetResponse) {
   }
 
-  // Pulls messages from the specific partition, returns a set of messages with next pull offset.
-  // The pulled messages can't be acked or nacked, while the client is responsible for manage offesets for consumer,
-  // typically update consume offset to local memory or a third-party storage service.
+  // Pulls messages from the specific partition, returns a set of messages with
+  // next pull offset. The pulled messages can't be acked or nacked, while the
+  // client is responsible for manage offesets for consumer, typically update
+  // consume offset to local memory or a third-party storage service.
   //
-  // If the pending concurrent receive requests exceed the quota of the given consumer group, returns `UNAVAILABLE`.
-  // If the upstream store server hangs, return `DEADLINE_EXCEEDED` in a timely manner.
-  // If the corresponding topic or consumer group doesn't exist, returns `NOT_FOUND`.
-  // If there is no new message in the specific topic, returns `OK` with an empty message set. Please note that client
-  // may suffer from false empty responses.
+  // If the pending concurrent receive requests exceed the quota of the given
+  // consumer group, returns `UNAVAILABLE`. If the upstream store server hangs,
+  // return `DEADLINE_EXCEEDED` in a timely manner. If the corresponding topic
+  // or consumer group doesn't exist, returns `NOT_FOUND`. If there is no new
+  // message in the specific topic, returns `OK` with an empty message set.
+  // Please note that client may suffer from false empty responses.
   rpc PullMessage(PullMessageRequest) returns (PullMessageResponse) {
   }
 
-  rpc MultiplexingCall(MultiplexingRequest) returns (MultiplexingResponse) {
+  // Multiplexing RPC(s) for various polling requests, which issue different
+  // commands to client.
+  //
+  // Sometimes client may need to receive and process the command from server.
+  // To prevent the complexity of streaming RPC(s), a unary RPC using
+  // long-polling is another solution.
+  //
+  // To mark the request-response of corresponding command, `command_id` in
+  // message is recorded in the subsequent RPC(s). For example, after receiving
+  // command of printing thread stack trace, client would send
+  // `ReportMessageConsumptionResultRequest` to server, which contain both of
+  // the stack trace and `command_id`.
+  //
+  // At same time, `NoopCommand` is delivered from server when no new command is
+  // needed, it is essential for client to maintain the ping-pong.
+  //
+  rpc PollCommand(PollCommandRequest) returns (PollCommandResponse) {
   }
 
+  // After receiving the corresponding polling command, the thread stack trace
+  // is reported to the server.
+  rpc ReportThreadStackTrace(ReportThreadStackTraceRequest) returns (ReportThreadStackTraceResponse) {
+  }
+
+  // After receiving the corresponding polling command, the consumption result
+  // of appointed message is reported to the server.
+  rpc ReportMessageConsumptionResult(ReportMessageConsumptionResultRequest)
+      returns (ReportMessageConsumptionResultResponse) {
+  }
+
+  // Notify the server that the client is terminated.
   rpc NotifyClientTermination(NotifyClientTerminationRequest) returns (NotifyClientTerminationResponse) {
   }
 }
\ No newline at end of file
diff --git a/src/main/cpp/client/ClientManagerImpl.cpp b/src/main/cpp/client/ClientManagerImpl.cpp
index 24d56be..d0143e5 100644
--- a/src/main/cpp/client/ClientManagerImpl.cpp
+++ b/src/main/cpp/client/ClientManagerImpl.cpp
@@ -55,7 +55,6 @@
   tls_channel_credential_options_.watch_identity_key_cert_pairs();
   channel_credential_ = grpc::experimental::TlsCredentials(tls_channel_credential_options_);
 
-
   // Use unlimited receive message size.
   channel_arguments_.SetMaxReceiveMessageSize(-1);
 
@@ -1237,42 +1236,23 @@
   client->asyncEndTransaction(request, invocation_context);
 }
 
-void ClientManagerImpl::multiplexingCall(
-    const std::string& target_host, const Metadata& metadata, const MultiplexingRequest& request,
-    std::chrono::milliseconds timeout, const std::function<void(const InvocationContext<MultiplexingResponse>*)>& cb) {
-  RpcClientSharedPtr client = getRpcClient(target_host);
-  if (!client) {
-    SPDLOG_WARN("No RPC client for {}", target_host);
-    cb(nullptr);
-    return;
-  }
+void ClientManagerImpl::pollCommand(const std::string& target, const Metadata& metadata,
+                                    const PollCommandRequest& request, std::chrono::milliseconds timeout,
+                                    const std::function<void(const InvocationContext<PollCommandResponse>*)>& cb) {
+  auto client = getRpcClient(target);
 
-  SPDLOG_DEBUG("Prepare to endTransaction. TargetHost={}, Request: {}", target_host.data(), request.DebugString());
-
-  auto invocation_context = new InvocationContext<MultiplexingResponse>();
-  invocation_context->remote_address = target_host;
+  auto invocation_context = new InvocationContext<PollCommandResponse>();
+  invocation_context->remote_address = target;
   for (const auto& item : metadata) {
     invocation_context->context.AddMetadata(item.first, item.second);
   }
-
-  // Set RPC deadline.
   auto deadline = std::chrono::system_clock::now() + timeout;
   invocation_context->context.set_deadline(deadline);
 
-  auto callback = [cb](const InvocationContext<MultiplexingResponse>* invocation_context) {
-    if (!invocation_context->status.ok()) {
-      SPDLOG_WARN("Failed to apply multiplexing-call. TargetHost={}, gRPC statusCode={}, errorMessage={}",
-                  invocation_context->remote_address, invocation_context->status.error_message(),
-                  invocation_context->status.error_message());
-      cb(invocation_context);
-      return;
-    }
+  auto callback = [cb](const InvocationContext<PollCommandResponse>* invocation_context) { cb(invocation_context); };
 
-    SPDLOG_DEBUG("endTransaction completed OK. Response: {}", invocation_context->response.DebugString());
-    cb(invocation_context);
-  };
   invocation_context->callback = callback;
-  client->asyncMultiplexingCall(request, invocation_context);
+  client->asyncPollCommand(request, invocation_context);
 }
 
 void ClientManagerImpl::queryOffset(const std::string& target_host, const Metadata& metadata,
@@ -1458,6 +1438,93 @@
   client->asyncForwardMessageToDeadLetterQueue(request, invocation_context);
 }
 
+std::error_code ClientManagerImpl::reportThreadStackTrace(const std::string& target_host, const Metadata& metadata,
+                                                          const ReportThreadStackTraceRequest& request,
+                                                          std::chrono::milliseconds timeout) {
+  std::error_code ec;
+  auto client = getRpcClient(target_host);
+  grpc::ClientContext context;
+  auto deadline = std::chrono::system_clock::now() + timeout;
+  context.set_deadline(deadline);
+
+  for (const auto& item : metadata) {
+    context.AddMetadata(item.first, item.second);
+  }
+
+  ReportThreadStackTraceResponse response;
+  auto status = client->reportThreadStackTrace(&context, request, &response);
+  if (!status.ok()) {
+    ec = ErrorCode::RequestTimeout;
+    SPDLOG_WARN("Failed to report thread-stack-trace to {}. Cause: {}", target_host, status.error_message());
+    return ec;
+  }
+
+  switch (response.common().status().code()) {
+    case google::rpc::Code::OK: {
+      return ec;
+    }
+    case google::rpc::Code::UNAUTHENTICATED: {
+      SPDLOG_WARN("Unauthorized. Host={}, Cause: {}", target_host, response.common().status().message());
+      ec = ErrorCode::Unauthorized;
+      break;
+    }
+    case google::rpc::Code::PERMISSION_DENIED: {
+      SPDLOG_WARN("Forbidden. Host={}, Cause: {}", target_host, response.common().status().message());
+      ec = ErrorCode::Forbidden;
+      break;
+    }
+    default: {
+      ec = ErrorCode::NotImplemented;
+      SPDLOG_WARN("Unsupported response code, please update client to latest release. Host={}", target_host);
+    }
+  }
+  return ec;
+}
+
+std::error_code ClientManagerImpl::reportMessageConsumptionResult(const std::string& target_host,
+                                                                  const Metadata& metadata,
+                                                                  const ReportMessageConsumptionResultRequest& request,
+                                                                  std::chrono::milliseconds timeout) {
+  std::error_code ec;
+  auto client = getRpcClient(target_host);
+  grpc::ClientContext context;
+  auto deadline = std::chrono::system_clock::now() + timeout;
+  context.set_deadline(deadline);
+
+  for (const auto& item : metadata) {
+    context.AddMetadata(item.first, item.second);
+  }
+
+  ReportMessageConsumptionResultResponse response;
+  auto status = client->reportMessageConsumptionResult(&context, request, &response);
+  if (!status.ok()) {
+    ec = ErrorCode::RequestTimeout;
+    SPDLOG_WARN("Failed to report thread-stack-trace to {}. Cause: {}", target_host, status.error_message());
+    return ec;
+  }
+
+  switch (response.common().status().code()) {
+    case google::rpc::Code::OK: {
+      return ec;
+    }
+    case google::rpc::Code::UNAUTHENTICATED: {
+      SPDLOG_WARN("Unauthorized. Host={}, Cause: {}", target_host, response.common().status().message());
+      ec = ErrorCode::Unauthorized;
+      break;
+    }
+    case google::rpc::Code::PERMISSION_DENIED: {
+      SPDLOG_WARN("Forbidden. Host={}, Cause: {}", target_host, response.common().status().message());
+      ec = ErrorCode::Forbidden;
+      break;
+    }
+    default: {
+      ec = ErrorCode::NotImplemented;
+      SPDLOG_WARN("Unsupported response code, please update client to latest release. Host={}", target_host);
+    }
+  }
+  return ec;
+}
+
 std::error_code ClientManagerImpl::notifyClientTermination(const std::string& target_host, const Metadata& metadata,
                                                            const NotifyClientTerminationRequest& request,
                                                            std::chrono::milliseconds timeout) {
@@ -1523,6 +1590,10 @@
   SPDLOG_INFO("{}", stats);
 }
 
+void ClientManagerImpl::submit(std::function<void()> task) {
+  callback_thread_pool_->submit(task);
+}
+
 const char* ClientManagerImpl::HEARTBEAT_TASK_NAME = "heartbeat-task";
 const char* ClientManagerImpl::STATS_TASK_NAME = "stats-task";
 const char* ClientManagerImpl::HEALTH_CHECK_TASK_NAME = "health-check-task";
diff --git a/src/main/cpp/client/RpcClientImpl.cpp b/src/main/cpp/client/RpcClientImpl.cpp
index 12af6d7..cabced2 100644
--- a/src/main/cpp/client/RpcClientImpl.cpp
+++ b/src/main/cpp/client/RpcClientImpl.cpp
@@ -119,16 +119,27 @@
   need_heartbeat_ = need_heartbeat;
 }
 
-void RpcClientImpl::asyncMultiplexingCall(const MultiplexingRequest& request,
-                                          InvocationContext<MultiplexingResponse>* invocation_context) {
-  assert(invocation_context);
+void RpcClientImpl::asyncPollCommand(const PollCommandRequest& request,
+                                     InvocationContext<PollCommandResponse>* invocation_context) {
   invocation_context->response_reader =
-      stub_->PrepareAsyncMultiplexingCall(&invocation_context->context, request, completion_queue_.get());
+      stub_->PrepareAsyncPollCommand(&invocation_context->context, request, completion_queue_.get());
   invocation_context->response_reader->StartCall();
   invocation_context->response_reader->Finish(&invocation_context->response, &invocation_context->status,
                                               invocation_context);
 }
 
+grpc::Status RpcClientImpl::reportThreadStackTrace(grpc::ClientContext* context,
+                                                   const ReportThreadStackTraceRequest& request,
+                                                   ReportThreadStackTraceResponse* response) {
+  return stub_->ReportThreadStackTrace(context, request, response);
+}
+
+grpc::Status RpcClientImpl::reportMessageConsumptionResult(grpc::ClientContext* context,
+                                                           const ReportMessageConsumptionResultRequest& request,
+                                                           ReportMessageConsumptionResultResponse* response) {
+  return stub_->ReportMessageConsumptionResult(context, request, response);
+}
+
 grpc::Status RpcClientImpl::notifyClientTermination(grpc::ClientContext* context,
                                                     const NotifyClientTerminationRequest& request,
                                                     NotifyClientTerminationResponse* response) {
diff --git a/src/main/cpp/client/include/ClientManager.h b/src/main/cpp/client/include/ClientManager.h
index 20ea165..9212564 100644
--- a/src/main/cpp/client/include/ClientManager.h
+++ b/src/main/cpp/client/include/ClientManager.h
@@ -3,6 +3,7 @@
 #include <chrono>
 #include <memory>
 #include <system_error>
+#include <functional>
 
 #include "Client.h"
 #include "ReceiveMessageCallback.h"
@@ -37,10 +38,10 @@
                          std::chrono::milliseconds timeout,
                          const std::function<void(const std::error_code&, const HeartbeatResponse&)>& cb) = 0;
 
-  virtual void multiplexingCall(const std::string& target, const Metadata& metadata, const MultiplexingRequest& request,
-                                std::chrono::milliseconds timeout,
-                                const std::function<void(const InvocationContext<MultiplexingResponse>*)>& cb) = 0;
-
+  virtual void pollCommand(const std::string& target, const Metadata& metadata, const PollCommandRequest& request,
+                           std::chrono::milliseconds timeout,
+                           const std::function<void(const InvocationContext<PollCommandResponse>*)>& cb) = 0;
+  
   virtual bool wrapMessage(const rmq::Message& item, MQMessageExt& message_ext) = 0;
 
   virtual void ack(const std::string& target_host, const Metadata& metadata, const AckMessageRequest& request,
@@ -88,7 +89,19 @@
                                                   const NotifyClientTerminationRequest& request,
                                                   std::chrono::milliseconds timeout) = 0;
 
+  virtual std::error_code reportThreadStackTrace(const std::string& target_host, const Metadata& metadata,
+                                                 const ReportThreadStackTraceRequest& request,
+                                                 std::chrono::milliseconds timeout) = 0;
+  
+  virtual std::error_code reportMessageConsumptionResult(const std::string& target_host, const Metadata& metadata,
+                                                         const ReportMessageConsumptionResultRequest& request,
+                                                         std::chrono::milliseconds timeout) = 0;
+
   virtual State state() const = 0;
+
+
+  virtual void submit(std::function<void()> task) = 0;
+  
 };
 
 using ClientManagerPtr = std::shared_ptr<ClientManager>;
diff --git a/src/main/cpp/client/include/ClientManagerImpl.h b/src/main/cpp/client/include/ClientManagerImpl.h
index 321664c..086a4e1 100644
--- a/src/main/cpp/client/include/ClientManagerImpl.h
+++ b/src/main/cpp/client/include/ClientManagerImpl.h
@@ -165,9 +165,9 @@
                       std::chrono::milliseconds timeout,
                       const std::function<void(const std::error_code&, const EndTransactionResponse&)>& cb) override;
 
-  void multiplexingCall(const std::string& target, const Metadata& metadata, const MultiplexingRequest& request,
-                        std::chrono::milliseconds timeout,
-                        const std::function<void(const InvocationContext<MultiplexingResponse>*)>& cb) override;
+  void pollCommand(const std::string& target, const Metadata& metadata, const PollCommandRequest& request,
+                   std::chrono::milliseconds timeout,
+                   const std::function<void(const InvocationContext<PollCommandResponse>*)>& cb) override;
 
   void queryOffset(const std::string& target_host, const Metadata& metadata, const QueryOffsetRequest& request,
                    std::chrono::milliseconds timeout,
@@ -181,6 +181,14 @@
                                           const NotifyClientTerminationRequest& request,
                                           std::chrono::milliseconds timeout) override;
 
+  std::error_code reportThreadStackTrace(const std::string& target_host, const Metadata& metadata,
+                                         const ReportThreadStackTraceRequest& request,
+                                         std::chrono::milliseconds timeout) override;
+
+  std::error_code reportMessageConsumptionResult(const std::string& target_host, const Metadata& metadata,
+                                                 const ReportMessageConsumptionResultRequest& request,
+                                                 std::chrono::milliseconds timeout) override;
+
   void trace(bool trace) {
     trace_ = trace;
   }
@@ -191,6 +199,8 @@
 
   State state() const override;
 
+  void submit(std::function<void()> task) override;
+
 private:
   void doHeartbeat();
 
diff --git a/src/main/cpp/client/include/RpcClient.h b/src/main/cpp/client/include/RpcClient.h
index 86522bd..2b93a08 100644
--- a/src/main/cpp/client/include/RpcClient.h
+++ b/src/main/cpp/client/include/RpcClient.h
@@ -40,24 +40,25 @@
 using HealthCheckResponse = rmq::HealthCheckResponse;
 using EndTransactionRequest = rmq::EndTransactionRequest;
 using EndTransactionResponse = rmq::EndTransactionResponse;
-using MultiplexingRequest = rmq::MultiplexingRequest;
-using MultiplexingResponse = rmq::MultiplexingResponse;
-using GenericPollingRequest = rmq::GenericPollingRequest;
-using GenericPollingResponse = rmq::GenericPollingRequest;
-using PrintThreadStackRequest = rmq::PrintThreadStackRequest;
-using PrintThreadStackResponse = rmq::PrintThreadStackResponse;
-using VerifyMessageConsumptionRequest = rmq::VerifyMessageConsumptionRequest;
-using VerifyMessageConsumptionResponse = rmq::VerifyMessageConsumptionResponse;
-using RecoverOrphanedTransactionRequest = rmq::RecoverOrphanedTransactionRequest;
 using QueryOffsetRequest = rmq::QueryOffsetRequest;
 using QueryOffsetResponse = rmq::QueryOffsetResponse;
 using PullMessageRequest = rmq::PullMessageRequest;
 using PullMessageResponse = rmq::PullMessageResponse;
+using PollCommandRequest = rmq::PollCommandRequest;
+using PollCommandResponse = rmq::PollCommandResponse;
+using ReportThreadStackTraceRequest = rmq::ReportThreadStackTraceRequest;
+using ReportThreadStackTraceResponse = rmq::ReportThreadStackTraceResponse;
+using ReportMessageConsumptionResultRequest = rmq::ReportMessageConsumptionResultRequest;
+using ReportMessageConsumptionResultResponse = rmq::ReportMessageConsumptionResultResponse;
 using ForwardMessageToDeadLetterQueueRequest = rmq::ForwardMessageToDeadLetterQueueRequest;
 using ForwardMessageToDeadLetterQueueResponse = rmq::ForwardMessageToDeadLetterQueueResponse;
 using NotifyClientTerminationRequest = rmq::NotifyClientTerminationRequest;
 using NotifyClientTerminationResponse = rmq::NotifyClientTerminationResponse;
 
+/**
+ * @brief A RpcClient represents a session between client and a remote broker.
+ *
+ */
 class RpcClient {
 public:
   RpcClient() = default;
@@ -93,8 +94,8 @@
   virtual void asyncEndTransaction(const EndTransactionRequest& request,
                                    InvocationContext<EndTransactionResponse>* invocation_context) = 0;
 
-  virtual void asyncMultiplexingCall(const MultiplexingRequest& request,
-                                     InvocationContext<MultiplexingResponse>* invocation_context) = 0;
+  virtual void asyncPollCommand(const PollCommandRequest& request,
+                                InvocationContext<PollCommandResponse>* invocation_context) = 0;
 
   virtual void asyncQueryOffset(const QueryOffsetRequest& request,
                                 InvocationContext<QueryOffsetResponse>* invocation_context) = 0;
@@ -106,6 +107,14 @@
       const ForwardMessageToDeadLetterQueueRequest& request,
       InvocationContext<ForwardMessageToDeadLetterQueueResponse>* invocation_context) = 0;
 
+  virtual grpc::Status reportThreadStackTrace(grpc::ClientContext* context,
+                                              const ReportThreadStackTraceRequest& request,
+                                              ReportThreadStackTraceResponse* response) = 0;
+
+  virtual grpc::Status reportMessageConsumptionResult(grpc::ClientContext* context,
+                                                      const ReportMessageConsumptionResultRequest& request,
+                                                      ReportMessageConsumptionResultResponse* response) = 0;
+
   virtual grpc::Status notifyClientTermination(grpc::ClientContext* context,
                                                const NotifyClientTerminationRequest& request,
                                                NotifyClientTerminationResponse* response) = 0;
diff --git a/src/main/cpp/client/include/RpcClientImpl.h b/src/main/cpp/client/include/RpcClientImpl.h
index 7cc782b..e478dda 100644
--- a/src/main/cpp/client/include/RpcClientImpl.h
+++ b/src/main/cpp/client/include/RpcClientImpl.h
@@ -50,8 +50,8 @@
   void asyncEndTransaction(const EndTransactionRequest& request,
                            InvocationContext<EndTransactionResponse>* invocation_context) override;
 
-  void asyncMultiplexingCall(const MultiplexingRequest& request,
-                             InvocationContext<MultiplexingResponse>* invocation_context) override;
+  void asyncPollCommand(const PollCommandRequest& request,
+                        InvocationContext<PollCommandResponse>* invocation_context) override;
 
   void asyncQueryOffset(const QueryOffsetRequest& request,
                         InvocationContext<QueryOffsetResponse>* invocation_context) override;
@@ -63,6 +63,13 @@
       const ForwardMessageToDeadLetterQueueRequest& request,
       InvocationContext<ForwardMessageToDeadLetterQueueResponse>* invocation_context) override;
 
+  grpc::Status reportThreadStackTrace(grpc::ClientContext* context, const ReportThreadStackTraceRequest& request,
+                                      ReportThreadStackTraceResponse* response) override;
+
+  grpc::Status reportMessageConsumptionResult(grpc::ClientContext* context,
+                                              const ReportMessageConsumptionResultRequest& request,
+                                              ReportMessageConsumptionResultResponse* response) override;
+
   grpc::Status notifyClientTermination(grpc::ClientContext* context, const NotifyClientTerminationRequest& request,
                                        NotifyClientTerminationResponse* response) override;
 
diff --git a/src/main/cpp/client/mocks/include/ClientManagerMock.h b/src/main/cpp/client/mocks/include/ClientManagerMock.h
index 0f1fe7d..03f827e 100644
--- a/src/main/cpp/client/mocks/include/ClientManagerMock.h
+++ b/src/main/cpp/client/mocks/include/ClientManagerMock.h
@@ -27,9 +27,9 @@
                (const std::function<void(const std::error_code&, const HeartbeatResponse&)>&)),
               (override));
 
-  MOCK_METHOD(void, multiplexingCall,
-              (const std::string&, const Metadata&, const MultiplexingRequest&, std::chrono::milliseconds,
-               (const std::function<void(const InvocationContext<MultiplexingResponse>*)>&)),
+  MOCK_METHOD(void, pollCommand,
+              (const std::string&, const Metadata&, const PollCommandRequest&, std::chrono::milliseconds,
+               const std::function<void(const InvocationContext<PollCommandResponse>*)>&),
               (override));
 
   MOCK_METHOD(bool, wrapMessage, (const rmq::Message&, MQMessageExt&), (override));
@@ -88,7 +88,18 @@
               (const std::string&, const Metadata&, const NotifyClientTerminationRequest&, std::chrono::milliseconds),
               (override));
 
+  MOCK_METHOD(std::error_code, reportThreadStackTrace,
+              (const std::string&, const Metadata&, const ReportThreadStackTraceRequest&, std::chrono::milliseconds),
+              (override));
+
+  MOCK_METHOD(std::error_code, reportMessageConsumptionResult,
+              (const std::string&, const Metadata&, const ReportMessageConsumptionResultRequest&,
+               std::chrono::milliseconds),
+              (override));
+
   MOCK_METHOD(State, state, (), (const override));
+
+  MOCK_METHOD(void, submit, (std::function<void()>), (override));
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/client/mocks/include/RpcClientMock.h b/src/main/cpp/client/mocks/include/RpcClientMock.h
index a4363fd..dadafe0 100644
--- a/src/main/cpp/client/mocks/include/RpcClientMock.h
+++ b/src/main/cpp/client/mocks/include/RpcClientMock.h
@@ -38,9 +38,6 @@
   MOCK_METHOD(void, asyncEndTransaction, (const EndTransactionRequest&, InvocationContext<EndTransactionResponse>*),
               (override));
 
-  MOCK_METHOD(void, asyncMultiplexingCall, (const MultiplexingRequest&, InvocationContext<MultiplexingResponse>*),
-              (override));
-
   MOCK_METHOD(void, asyncQueryOffset, (const QueryOffsetRequest&, InvocationContext<QueryOffsetResponse>*), (override));
 
   MOCK_METHOD(void, asyncPull, (const PullMessageRequest&, InvocationContext<PullMessageResponse>*), (override));
@@ -50,6 +47,17 @@
                InvocationContext<ForwardMessageToDeadLetterQueueResponse>*),
               (override));
 
+  MOCK_METHOD(void, asyncPollCommand, (const PollCommandRequest&, InvocationContext<PollCommandResponse>*), (override));
+
+  MOCK_METHOD(grpc::Status, reportThreadStackTrace,
+              (grpc::ClientContext*, const ReportThreadStackTraceRequest&, ReportThreadStackTraceResponse*),
+              (override));
+
+  MOCK_METHOD(grpc::Status, reportMessageConsumptionResult,
+              (grpc::ClientContext*, const ReportMessageConsumptionResultRequest&,
+               ReportMessageConsumptionResultResponse*),
+              (override));
+
   MOCK_METHOD(grpc::Status, notifyClientTermination,
               (grpc::ClientContext*, const NotifyClientTerminationRequest&, NotifyClientTerminationResponse* response),
               (override));
diff --git a/src/main/cpp/rocketmq/ClientImpl.cpp b/src/main/cpp/rocketmq/ClientImpl.cpp
index 294de27..920187c 100644
--- a/src/main/cpp/rocketmq/ClientImpl.cpp
+++ b/src/main/cpp/rocketmq/ClientImpl.cpp
@@ -24,6 +24,7 @@
 #include "MessageAccessor.h"
 #include "Signature.h"
 #include "rocketmq/MQMessageExt.h"
+#include "rocketmq/MessageListener.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -303,94 +304,133 @@
   }
 }
 
-void ClientImpl::multiplexing(const std::string& target, const MultiplexingRequest& request) {
+void ClientImpl::pollCommand(const std::string& target) {
   absl::flat_hash_map<std::string, std::string> metadata;
   Signature::sign(this, metadata);
-  client_manager_->multiplexingCall(target, metadata, request, absl::ToChronoMilliseconds(long_polling_timeout_),
-                                    std::bind(&ClientImpl::onMultiplexingResponse, this, std::placeholders::_1));
+
+  PollCommandRequest request;
+  auto&& resource_bundle = resourceBundle();
+  request.set_client_id(resource_bundle.client_id);
+  switch (resource_bundle.group_type) {
+    case GroupType::PUBLISHER:
+      request.mutable_producer_group()->set_resource_namespace(resource_namespace_);
+      request.mutable_producer_group()->set_name(group_name_);
+      break;
+
+    case GroupType::SUBSCRIBER:
+      request.mutable_consumer_group()->set_resource_namespace(resource_namespace_);
+      request.mutable_consumer_group()->set_name(group_name_);
+      break;
+  }
+  auto topics = request.mutable_topics();
+  for (const auto& item : resource_bundle.topics) {
+    auto topic = new rmq::Resource();
+    topic->set_resource_namespace(resource_namespace_);
+    topic->set_name(item);
+    topics->AddAllocated(topic);
+  }
+
+  client_manager_->pollCommand(target, metadata, request, absl::ToChronoMilliseconds(long_polling_timeout_),
+                               std::bind(&ClientImpl::onPollCommandResponse, this, std::placeholders::_1));
 }
 
-void ClientImpl::onMultiplexingResponse(const InvocationContext<MultiplexingResponse>* ctx) {
+void ClientImpl::verifyMessageConsumption(std::string remote_address, std::string command_id, MQMessageExt message) {
+  MessageListener* listener = messageListener();
+
+  Metadata metadata;
+  Signature::sign(this, metadata);
+  ReportMessageConsumptionResultRequest request;
+  request.set_command_id(command_id);
+
+  if (!listener) {
+    request.mutable_status()->set_code(google::rpc::Code::FAILED_PRECONDITION);
+    request.mutable_status()->set_message("Target is not a push consumer client");
+    client_manager_->reportMessageConsumptionResult(remote_address, metadata, request,
+                                                    absl::ToChronoMilliseconds(io_timeout_));
+    return;
+  }
+
+  if (MessageListenerType::FIFO == listener->listenerType()) {
+    request.mutable_status()->set_code(google::rpc::Code::FAILED_PRECONDITION);
+    request.mutable_status()->set_message("FIFO message does NOT support verification of message consumption");
+    client_manager_->reportMessageConsumptionResult(remote_address, metadata, request,
+                                                    absl::ToChronoMilliseconds(io_timeout_));
+    return;
+  }
+
+  // Execute the actual verification task in dedicated thread-pool.
+  client_manager_->submit(std::bind(&ClientImpl::doVerify, this, remote_address, command_id, message));
+}
+
+void ClientImpl::onPollCommandResponse(const InvocationContext<PollCommandResponse>* ctx) {
   if (!ctx->status.ok()) {
-    std::string remote_address = ctx->remote_address;
-    auto multiplexingLater = [this, remote_address]() {
-      MultiplexingRequest request;
-      fillGenericPollingRequest(request);
-      multiplexing(remote_address, request);
-    };
-    static std::string task_name = "Initiate multiplex request later";
-    client_manager_->getScheduler().schedule(multiplexingLater, task_name, std::chrono::seconds(3),
-                                             std::chrono::seconds(0));
+    static std::string task_name = "Poll-Command-Later";
+    client_manager_->getScheduler().schedule(std::bind(&ClientImpl::pollCommand, this, ctx->remote_address), task_name,
+                                             std::chrono::seconds(3), std::chrono::seconds(0));
     return;
   }
 
   switch (ctx->response.type_case()) {
-    case MultiplexingResponse::TypeCase::kPrintThreadStackRequest: {
-      MultiplexingRequest request;
-      request.mutable_print_thread_stack_response()->mutable_common()->mutable_status()->set_code(
-          google::rpc::Code::UNIMPLEMENTED);
-      request.mutable_print_thread_stack_response()->set_stack_trace(
-          "Print thread stack trace is not supported by C++ SDK");
+    case PollCommandResponse::TypeCase::kPrintThreadStackTraceCommand: {
       absl::flat_hash_map<std::string, std::string> metadata;
       Signature::sign(this, metadata);
-      client_manager_->multiplexingCall(ctx->remote_address, metadata, request, absl::ToChronoMilliseconds(io_timeout_),
-                                        std::bind(&ClientImpl::onMultiplexingResponse, this, std::placeholders::_1));
+      ReportThreadStackTraceRequest request;
+      auto command_id = ctx->response.print_thread_stack_trace_command().command_id();
+      request.set_command_id(command_id);
+      request.set_thread_stack_trace("--RocketMQ-Client-CPP does NOT support thread stack trace report--");
+      client_manager_->reportThreadStackTrace(ctx->remote_address, metadata, request,
+                                              absl::ToChronoMilliseconds(io_timeout_));
       break;
     }
 
-    case MultiplexingResponse::TypeCase::kVerifyMessageConsumptionRequest: {
-      auto data = ctx->response.verify_message_consumption_request().message();
+    case PollCommandResponse::TypeCase::kVerifyMessageConsumptionCommand: {
+      auto command_id = ctx->response.verify_message_consumption_command().command_id();
+      auto data = ctx->response.verify_message_consumption_command().message();
       MQMessageExt message;
-      MultiplexingRequest request;
+      ReportMessageConsumptionResultRequest request;
+      request.set_command_id(command_id);
+      Metadata metadata;
+      Signature::sign(this, metadata);
       if (!client_manager_->wrapMessage(data, message)) {
         SPDLOG_WARN("Message to verify consumption is corrupted");
-        request.mutable_verify_message_consumption_response()->mutable_common()->mutable_status()->set_code(
-            google::rpc::Code::INVALID_ARGUMENT);
-        request.mutable_verify_message_consumption_response()->mutable_common()->mutable_status()->set_message(
-            "Message to verify is corrupted");
-        multiplexing(ctx->remote_address, request);
-        return;
+        request.mutable_status()->set_code(google::rpc::Code::INVALID_ARGUMENT);
+        request.mutable_status()->set_message("Data corrupted");
+        client_manager_->reportMessageConsumptionResult(ctx->remote_address, metadata, request,
+                                                        absl::ToChronoMilliseconds(io_timeout_));
       }
-      std::string&& result = verifyMessageConsumption(message);
-      request.mutable_verify_message_consumption_response()->mutable_common()->mutable_status()->set_code(
-          google::rpc::Code::OK);
-      request.mutable_verify_message_consumption_response()->mutable_common()->mutable_status()->set_message(result);
-      multiplexing(ctx->remote_address, request);
+      verifyMessageConsumption(std::move(ctx->remote_address), std::move(command_id), std::move(message));
       break;
     }
 
-    case MultiplexingResponse::TypeCase::kRecoverOrphanedTransactionRequest: {
-      auto orphan = ctx->response.recover_orphaned_transaction_request().orphaned_transactional_message();
+    case PollCommandResponse::TypeCase::kRecoverOrphanedTransactionCommand: {
+      auto orphan = ctx->response.recover_orphaned_transaction_command().orphaned_transactional_message();
       MQMessageExt message;
       if (client_manager_->wrapMessage(orphan, message)) {
         MessageAccessor::setTargetEndpoint(message, ctx->remote_address);
-        const std::string& transaction_id = ctx->response.recover_orphaned_transaction_request().transaction_id();
-        resolveOrphanedTransactionalMessage(transaction_id, message);
+        const std::string& transaction_id = ctx->response.recover_orphaned_transaction_command().transaction_id();
+        // Dispatch task to thread-pool.
+        client_manager_->submit(
+            std::bind(&ClientImpl::resolveOrphanedTransactionalMessage, this, transaction_id, message));
       } else {
         SPDLOG_WARN("Failed to resolve orphaned transactional message, potentially caused by message-body checksum "
                     "verification failure.");
       }
-      MultiplexingRequest request;
-      fillGenericPollingRequest(request);
-      multiplexing(ctx->remote_address, request);
       break;
     }
 
-    case MultiplexingResponse::TypeCase::kPollingResponse: {
-      MultiplexingRequest request;
-      fillGenericPollingRequest(request);
-      multiplexing(ctx->remote_address, request);
+    case PollCommandResponse::TypeCase::kNoopCommand: {
+      SPDLOG_DEBUG("A long-polling-command period completed.");
       break;
     }
 
     default: {
       SPDLOG_WARN("Unsupported multiplex type");
-      MultiplexingRequest request;
-      fillGenericPollingRequest(request);
-      multiplexing(ctx->remote_address, request);
       break;
     }
   }
+
+  // Initiate next round of long-polling-command immediately.
+  pollCommand(ctx->remote_address);
 }
 
 void ClientImpl::onRemoteEndpointRemoval(const std::vector<std::string>& hosts) {
@@ -451,30 +491,6 @@
   }
 }
 
-void ClientImpl::fillGenericPollingRequest(MultiplexingRequest& request) {
-  auto&& resource_bundle = resourceBundle();
-  auto polling_request = request.mutable_polling_request();
-  polling_request->set_client_id(clientId());
-  switch (resource_bundle.group_type) {
-    case GroupType::PUBLISHER:
-      polling_request->mutable_producer_group()->set_resource_namespace(resource_namespace_);
-      polling_request->mutable_producer_group()->set_name(group_name_);
-      break;
-
-    case GroupType::SUBSCRIBER:
-      polling_request->mutable_consumer_group()->set_resource_namespace(resource_namespace_);
-      polling_request->mutable_consumer_group()->set_name(group_name_);
-      break;
-  }
-  auto topics = polling_request->mutable_topics();
-  for (const auto& item : resource_bundle.topics) {
-    auto topic = new rmq::Resource();
-    topic->set_resource_namespace(resource_namespace_);
-    topic->set_name(item);
-    topics->AddAllocated(topic);
-  }
-}
-
 void ClientImpl::notifyClientTermination() {
   SPDLOG_WARN("Should NOT reach here. Subclass should have overridden this function.");
   std::abort();
@@ -492,4 +508,38 @@
   }
 }
 
+void ClientImpl::doVerify(std::string target, std::string command_id, MQMessageExt message) {
+  ReportMessageConsumptionResultRequest request;
+  request.set_command_id(command_id);
+  StandardMessageListener* callback = reinterpret_cast<StandardMessageListener*>(messageListener());
+  try {
+    std::vector<MQMessageExt> batch = {message};
+    auto result = callback->consumeMessage(batch);
+    switch (result) {
+      case ConsumeMessageResult::SUCCESS: {
+        SPDLOG_DEBUG("Verify message[MsgId={}] OK", message.getMsgId());
+        request.mutable_status()->set_message("Consume Success");
+        break;
+      }
+      case ConsumeMessageResult::FAILURE: {
+        SPDLOG_WARN("Message Listener failed to consume message[MsgId={}] when verifying", message.getMsgId());
+        request.mutable_status()->set_code(google::rpc::Code::INTERNAL);
+        request.mutable_status()->set_message("Consume Failed");
+        break;
+      }
+    }
+  } catch (...) {
+    SPDLOG_WARN("Exception raised when invoking message listener provided by application developer. MsgId of message "
+                "to verify: {}",
+                message.getMsgId());
+    request.mutable_status()->set_code(google::rpc::Code::INTERNAL);
+    request.mutable_status()->set_message(
+        "Unexpected exception raised while invoking message listener provided by application developer");
+  }
+
+  Metadata metadata;
+  Signature::sign(this, metadata);
+  client_manager_->reportMessageConsumptionResult(target, metadata, request, absl::ToChronoMilliseconds(io_timeout_));
+}
+
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/include/ClientImpl.h b/src/main/cpp/rocketmq/include/ClientImpl.h
index 998d357..679f7b3 100644
--- a/src/main/cpp/rocketmq/include/ClientImpl.h
+++ b/src/main/cpp/rocketmq/include/ClientImpl.h
@@ -18,6 +18,7 @@
 #include "NameServerResolver.h"
 #include "OtlpExporter.h"
 #include "rocketmq/MQMessageExt.h"
+#include "rocketmq/MessageListener.h"
 #include "rocketmq/State.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
@@ -100,10 +101,16 @@
 
   virtual void prepareHeartbeatData(HeartbeatRequest& request) = 0;
 
-  virtual std::string verifyMessageConsumption(const MQMessageExt& message) {
-    return "Unsupported";
-  }
+  virtual void verifyMessageConsumption(std::string remote_address, std::string command_id, MQMessageExt message);
 
+  /**
+   * @brief Execute transaction-state-checker to commit or roll-back the orphan transactional message.
+   *
+   * It is no-op by default and Producer-subclass is supposed to override it.
+   *
+   * @param transaction_id
+   * @param message
+   */
   virtual void resolveOrphanedTransactionalMessage(const std::string& transaction_id, const MQMessageExt& message) {
   }
 
@@ -124,6 +131,17 @@
 
   void notifyClientTermination(const NotifyClientTerminationRequest& request);
 
+  /**
+   * @brief Return application developer provided message listener if this client is of PushConsumer type.
+   *
+   * By default, it returns nullptr such that error messages are generated and directed to server immediately.
+   *
+   * @return nullptr by default.
+   */
+  virtual MessageListener* messageListener() {
+    return nullptr;
+  }
+
 private:
   /**
    * This is a low-level API that fetches route data from name server through
@@ -153,14 +171,14 @@
   void updateRouteCache(const std::string& topic, const std::error_code& ec, const TopicRouteDataPtr& route)
       LOCKS_EXCLUDED(topic_route_table_mtx_);
 
-  void multiplexing(const std::string& target, const MultiplexingRequest& request);
+  void pollCommand(const std::string& target);
 
-  void onMultiplexingResponse(const InvocationContext<MultiplexingResponse>* ctx);
+  void onPollCommandResponse(const InvocationContext<PollCommandResponse>* ctx);
 
   void onHealthCheckResponse(const std::error_code& endpoint, const InvocationContext<HealthCheckResponse>* ctx)
       LOCKS_EXCLUDED(isolated_endpoints_mtx_);
 
-  void fillGenericPollingRequest(MultiplexingRequest& request);
+  void doVerify(std::string target, std::string command_id, MQMessageExt message);
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file