Adapt to protocol v1.0-alpha1 (#29)
Adapt to protocol v1.0-alpha1, including the following changes
* Import latest protocol files
* Replace multiplex RPC with 3 separate RPCs, PollCommand, ReportThreadStackTrace, ReportMessageConsumptionVerificationResult
diff --git a/.bazelrc b/.bazelrc
index 0f8f862..1c53b01 100644
--- a/.bazelrc
+++ b/.bazelrc
@@ -117,7 +117,7 @@
# Use remote cache, --config=remote_cache See https://docs.bazel.build/versions/main/guide.html#--config
build:remote_cache --remote_cache=grpc://11.122.50.85:9092
build:remote_cache --experimental_remote_downloader=grpc://11.122.50.85:9092
-test:remote_cache --remote_cache=http://11.122.50.85:9090
+test:remote_cache --remote_cache=grpc://11.122.50.85:9090
test:remote_cache --remote_upload_local_results=true
# Coverage options
diff --git a/proto/apache/rocketmq/v1/definition.proto b/proto/apache/rocketmq/v1/definition.proto
index c92aad3..b6cb24b 100644
--- a/proto/apache/rocketmq/v1/definition.proto
+++ b/proto/apache/rocketmq/v1/definition.proto
@@ -207,6 +207,21 @@
reserved 3 to 64;
}
+// When publishing messages to or subscribing messages from brokers, clients
+// shall include or validate digests of message body to ensure data integrity.
+//
+// For message publishment, when an invalid digest were detected, brokers need
+// respond client with BAD_REQUEST.
+//
+// For messags subscription, when an invalid digest were detected, consumers
+// need to handle this case according to message type:
+// 1) Standard messages should be negatively acknowledged instantly, causing
+// immediate re-delivery; 2) FIFO messages require special RPC, to re-fetch
+// previously acquired messages batch;
+//
+// Message consumption model also affects how invalid digest are handled. When
+// messages are consumed in broadcasting way,
+// TODO: define semantics of invalid-digest-when-broadcasting.
message Digest {
DigestType type = 1;
string checksum = 2;
@@ -229,7 +244,8 @@
repeated string keys = 2;
// Message identifier, client-side generated, remains unique.
- // if message_id is empty, the send message request will be aborted with status `INVALID_ARGUMENT`
+ // if message_id is empty, the send message request will be aborted with
+ // status `INVALID_ARGUMENT`
string message_id = 3;
// Message body digest
@@ -296,13 +312,18 @@
}
message Message {
+
Resource topic = 1;
+
// User defined key-value pairs.
// If user_attribute contains the reserved keys by RocketMQ,
- // the send message request will be aborted with status `INVALID_ARGUMENT`. See below links for the reserved keys
+ // the send message request will be aborted with status `INVALID_ARGUMENT`.
+ // See below links for the reserved keys
// https://github.com/apache/rocketmq/blob/master/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java#L58
map<string, string> user_attribute = 2;
+
SystemAttribute system_attribute = 3;
+
bytes body = 4;
reserved 5 to 64;
diff --git a/proto/apache/rocketmq/v1/service.proto b/proto/apache/rocketmq/v1/service.proto
index 6220f51..9d8e3b0 100644
--- a/proto/apache/rocketmq/v1/service.proto
+++ b/proto/apache/rocketmq/v1/service.proto
@@ -41,13 +41,26 @@
reserved 7 to 64;
}
-// A QueryRouteRequest requests a set of Partitions of the specific topic with
-// necessary route infos.
+// Topics are destination of messages to publish to or subscribe from. Similar
+// to domain names, they will be addressable after resolution through the
+// provided access point.
+//
+// Access points are usually the addresses of name servers, which fulfill
+// service discovery, load-balancing and other auxillary services. Name servers
+// receive periodic heartbeats from affiliate brokers and erase those which
+// failed to maintain alive status.
+//
+// Name servers answer queries of QueryRouteRequest, responding clients with
+// addressable partitions, which they may directly publish messages to or
+// subscribe messages from.
+//
+// QueryRouteRequest shall include source endpoints, aka, configured
+// access-point, which annotates tenant-id, instance-id or other
+// vendor-specific settings. Purpose-built name servers may respond customized
+// results based on these particular requirements.
message QueryRouteRequest {
Resource topic = 1;
- // The service access points used to issue QueryRouteRequest
- // The QueryRouteResponse will indicate the adress of subsequent RPCs.
Endpoints endpoints = 2;
reserved 3 to 64;
@@ -55,6 +68,7 @@
message QueryRouteResponse {
ResponseCommon common = 1;
+
repeated Partition partitions = 2;
reserved 3 to 64;
@@ -264,7 +278,57 @@
reserved 6 to 64;
}
-message GenericPollingRequest {
+message NoopCommand {
+ reserved 1 to 64;
+}
+
+message PrintThreadStackTraceCommand {
+ string command_id = 1;
+
+ reserved 2 to 64;
+}
+
+message ReportThreadStackTraceRequest {
+ string command_id = 1;
+ string thread_stack_trace = 2;
+
+ reserved 3 to 64;
+}
+
+message ReportThreadStackTraceResponse {
+ ResponseCommon common = 1;
+
+ reserved 2 to 64;
+}
+
+message VerifyMessageConsumptionCommand {
+ string command_id = 1;
+ Message message = 2;
+
+ reserved 3 to 64;
+}
+
+message ReportMessageConsumptionResultRequest {
+ string command_id = 1;
+ google.rpc.Status status = 2;
+
+ reserved 3 to 64;
+}
+
+message ReportMessageConsumptionResultResponse {
+ ResponseCommon common = 1;
+
+ reserved 2 to 64;
+}
+
+message RecoverOrphanedTransactionCommand {
+ Message orphaned_transactional_message = 1;
+ string transaction_id = 2;
+
+ reserved 3 to 64;
+}
+
+message PollCommandRequest {
string client_id = 1;
repeated Resource topics = 2;
oneof group {
@@ -275,63 +339,16 @@
reserved 5 to 64;
}
-message GenericPollingResponse {
- ResponseCommon common = 1;
-
- reserved 2 to 64;
-}
-
-message PrintThreadStackRequest {
- string mid = 1;
-
- reserved 2 to 64;
-}
-
-message PrintThreadStackResponse {
- ResponseCommon common = 1;
- string mid = 2;
- string stack_trace = 3;
-
- reserved 4 to 64;
-}
-
-message VerifyMessageConsumptionRequest {
- string mid = 1;
- Message message = 2;
-
- reserved 3 to 64;
-}
-
-message VerifyMessageConsumptionResponse {
- string mid = 1;
- ResponseCommon common = 2;
-
- reserved 3 to 64;
-}
-
-message RecoverOrphanedTransactionRequest {
- Message orphaned_transactional_message = 1;
- string transaction_id = 2;
-
- reserved 3 to 64;
-}
-
-message MultiplexingRequest {
+message PollCommandResponse {
oneof type {
- GenericPollingRequest polling_request = 1;
- PrintThreadStackResponse print_thread_stack_response = 2;
- VerifyMessageConsumptionResponse verify_message_consumption_response = 3;
- }
-
- reserved 4 to 64;
-}
-
-message MultiplexingResponse {
- oneof type {
- GenericPollingResponse polling_response = 1;
- PrintThreadStackRequest print_thread_stack_request = 2;
- VerifyMessageConsumptionRequest verify_message_consumption_request = 3;
- RecoverOrphanedTransactionRequest recover_orphaned_transaction_request = 4;
+ // Default command when no new command need to be delivered.
+ NoopCommand noop_command = 1;
+ // Request client to print thread stack trace.
+ PrintThreadStackTraceCommand print_thread_stack_trace_command = 2;
+ // Request client to verify the consumption of the appointed message.
+ VerifyMessageConsumptionCommand verify_message_consumption_command = 3;
+ // Request client to recover the orphaned transaction message.
+ RecoverOrphanedTransactionCommand recover_orphaned_transaction_command = 4;
}
reserved 5 to 64;
@@ -353,36 +370,58 @@
reserved 2 to 64;
}
-// For all the rpcs in MessagingService may return below erros:
+// For all the RPCs in MessagingService, the following error handling policies
+// apply:
//
-// If the request doesn't have a valid authentication credentials, returns `UNAUTHENTICATED`.
-// If the caller doesn't permission to execute the specified operation, returns `PERMISSION_DENIED`.
-// If the per-user rate quota has been exceeded, returns `RESOURCE_EXHAUSTED`.
-// If any unexpected server-side exception occurs, returns `INTERNAL`.
+// If the request doesn't bear a valid authentication credential, return a
+// response with common.status.code == `UNAUTHENTICATED`. If the authenticated
+// user is not granted with sufficient permission to execute the requested
+// operation, return a response with common.status.code == `PERMISSION_DENIED`.
+// If the per-user-resource-based quota is exhausted, return a response with
+// common.status.code == `RESOURCE_EXHAUSTED`. If any unexpected server-side
+// errors raise, return a response with common.status.code == `INTERNAL`.
service MessagingService {
- // Querys the route info of a topic from specific endpoints, the server returns a set of partition if success.
+ // Querys the route entries of the requested topic in the perspective of the
+ // given endpoints. On success, servers should return a collection of
+ // addressable partitions. Note servers may return customized route entries
+ // based on endpoints provided.
//
- // If the corresponding topic doesn't exist, returns `NOT_FOUND`.
+ // If the requested topic doesn't exist, returns `NOT_FOUND`.
// If the specific endpoints is emtpy, returns `INVALID_ARGUMENT`.
rpc QueryRoute(QueryRouteRequest) returns (QueryRouteResponse) {
}
- // Producer or consumer sends HeartbeatRequest to server in order to report necessary
- // client-side information, like subscription data of consumer. Returns `OK` if success.
+ // Producer or consumer sends HeartbeatRequest to servers periodically to
+ // keep-alive. Additionally, it also reports client-side configuration,
+ // including topic subscription, load-balancing group name, etc.
//
- // If the client language info is invalid, returns `INVALID_ARGUMENT`
+ // Returns `OK` if success.
+ //
+ // If a client specifies a language that is not yet supported by servers,
+ // returns `INVALID_ARGUMENT`
rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {
}
- // Checks the health status of message server, returns `OK` if no network issues.
- // Clients could use this RPC to detect the availability of server, and adpot necessary isolation measures.
+ // Checks the health status of message server, returns `OK` if services are
+ // online and serving. Clients may use this RPC to detect availability of
+ // messaging service, and take isolation actions when necessary.
rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse) {
}
- // Sends one message to the specific partition of a topic, returns message id or transaction id with status `OK`.
+ // Delivers messages to brokers.
+ // Clients may further:
+ // 1. Refine a message destination to topic partition which fulfills parts of
+ // FIFO semantic;
+ // 2. Flag a message as transactional, which keeps it invisible to consumers
+ // until it commits;
+ // 3. Time a message, making it invisible to consumers till specified
+ // time-point;
+ // 4. And more...
//
- // If the corresponding topic doesn't exist, returns `NOT_FOUND`.
+ // Returns message-id or transaction-id with status `OK` on success.
+ //
+ // If the destination topic doesn't exist, returns `NOT_FOUND`.
rpc SendMessage(SendMessageRequest) returns (SendMessageResponse) {
}
@@ -394,34 +433,40 @@
rpc QueryAssignment(QueryAssignmentRequest) returns (QueryAssignmentResponse) {
}
- // Receives messages from the server in batch manner, returns a set of messages if success.
- // The received messages should be acked or uacked after processed.
+ // Receives messages from the server in batch manner, returns a set of
+ // messages if success. The received messages should be acked or uacked after
+ // processed.
//
- // If the pending concurrent receive requests exceed the quota of the given consumer group, returns `UNAVAILABLE`.
- // If the upstream store server hangs, return `DEADLINE_EXCEEDED` in a timely manner.
- // If the corresponding topic or consumer group doesn't exist, returns `NOT_FOUND`.
- // If there is no new message in the specific topic, returns `OK` with an empty message set. Please note that client
- // may suffer from false empty responses.
+ // If the pending concurrent receive requests exceed the quota of the given
+ // consumer group, returns `UNAVAILABLE`. If the upstream store server hangs,
+ // return `DEADLINE_EXCEEDED` in a timely manner. If the corresponding topic
+ // or consumer group doesn't exist, returns `NOT_FOUND`. If there is no new
+ // message in the specific topic, returns `OK` with an empty message set.
+ // Please note that client may suffer from false empty responses.
rpc ReceiveMessage(ReceiveMessageRequest) returns (ReceiveMessageResponse) {
}
- // Acknowledges the message associated with the `receipt_handle` or `offset` in the
- // `AckMessageRequest`, it means the message has been successfully processed.
- // Returns `OK` if the message server remove the relevant message successfully.
+ // Acknowledges the message associated with the `receipt_handle` or `offset`
+ // in the `AckMessageRequest`, it means the message has been successfully
+ // processed. Returns `OK` if the message server remove the relevant message
+ // successfully.
//
- // If the given receipt_handle is illegal or out of date, returns `INVALID_ARGUMENT`.
+ // If the given receipt_handle is illegal or out of date, returns
+ // `INVALID_ARGUMENT`.
rpc AckMessage(AckMessageRequest) returns (AckMessageResponse) {
}
- // Signals that the message has not been successfully processed. The message server should resend the message
- // follow the retry policy defined at server-side.
+ // Signals that the message has not been successfully processed. The message
+ // server should resend the message follow the retry policy defined at
+ // server-side.
//
- // If the corresponding topic or consumer group doesn't exist, returns `NOT_FOUND`.
+ // If the corresponding topic or consumer group doesn't exist, returns
+ // `NOT_FOUND`.
rpc NackMessage(NackMessageRequest) returns (NackMessageResponse) {
}
- // Forwards one message to dead letter queue if the DeadLetterPolicy is triggered by this message at client-side,
- // return `OK` if success.
+ // Forwards one message to dead letter queue if the DeadLetterPolicy is
+ // triggered by this message at client-side, return `OK` if success.
rpc ForwardMessageToDeadLetterQueue(ForwardMessageToDeadLetterQueueRequest)
returns (ForwardMessageToDeadLetterQueueResponse) {
}
@@ -430,26 +475,57 @@
rpc EndTransaction(EndTransactionRequest) returns (EndTransactionResponse) {
}
- // Querys the offset of the specific partition, returns the offset with `OK` if success.
- // The message server should maintain a numerical offset for each message in a parition.
+ // Querys the offset of the specific partition, returns the offset with `OK`
+ // if success. The message server should maintain a numerical offset for each
+ // message in a parition.
rpc QueryOffset(QueryOffsetRequest) returns (QueryOffsetResponse) {
}
- // Pulls messages from the specific partition, returns a set of messages with next pull offset.
- // The pulled messages can't be acked or nacked, while the client is responsible for manage offesets for consumer,
- // typically update consume offset to local memory or a third-party storage service.
+ // Pulls messages from the specific partition, returns a set of messages with
+ // next pull offset. The pulled messages can't be acked or nacked, while the
+ // client is responsible for manage offesets for consumer, typically update
+ // consume offset to local memory or a third-party storage service.
//
- // If the pending concurrent receive requests exceed the quota of the given consumer group, returns `UNAVAILABLE`.
- // If the upstream store server hangs, return `DEADLINE_EXCEEDED` in a timely manner.
- // If the corresponding topic or consumer group doesn't exist, returns `NOT_FOUND`.
- // If there is no new message in the specific topic, returns `OK` with an empty message set. Please note that client
- // may suffer from false empty responses.
+ // If the pending concurrent receive requests exceed the quota of the given
+ // consumer group, returns `UNAVAILABLE`. If the upstream store server hangs,
+ // return `DEADLINE_EXCEEDED` in a timely manner. If the corresponding topic
+ // or consumer group doesn't exist, returns `NOT_FOUND`. If there is no new
+ // message in the specific topic, returns `OK` with an empty message set.
+ // Please note that client may suffer from false empty responses.
rpc PullMessage(PullMessageRequest) returns (PullMessageResponse) {
}
- rpc MultiplexingCall(MultiplexingRequest) returns (MultiplexingResponse) {
+ // Multiplexing RPC(s) for various polling requests, which issue different
+ // commands to client.
+ //
+ // Sometimes client may need to receive and process the command from server.
+ // To prevent the complexity of streaming RPC(s), a unary RPC using
+ // long-polling is another solution.
+ //
+ // To mark the request-response of corresponding command, `command_id` in
+ // message is recorded in the subsequent RPC(s). For example, after receiving
+ // command of printing thread stack trace, client would send
+ // `ReportMessageConsumptionResultRequest` to server, which contain both of
+ // the stack trace and `command_id`.
+ //
+ // At same time, `NoopCommand` is delivered from server when no new command is
+ // needed, it is essential for client to maintain the ping-pong.
+ //
+ rpc PollCommand(PollCommandRequest) returns (PollCommandResponse) {
}
+ // After receiving the corresponding polling command, the thread stack trace
+ // is reported to the server.
+ rpc ReportThreadStackTrace(ReportThreadStackTraceRequest) returns (ReportThreadStackTraceResponse) {
+ }
+
+ // After receiving the corresponding polling command, the consumption result
+ // of appointed message is reported to the server.
+ rpc ReportMessageConsumptionResult(ReportMessageConsumptionResultRequest)
+ returns (ReportMessageConsumptionResultResponse) {
+ }
+
+ // Notify the server that the client is terminated.
rpc NotifyClientTermination(NotifyClientTerminationRequest) returns (NotifyClientTerminationResponse) {
}
}
\ No newline at end of file
diff --git a/src/main/cpp/client/ClientManagerImpl.cpp b/src/main/cpp/client/ClientManagerImpl.cpp
index 24d56be..d0143e5 100644
--- a/src/main/cpp/client/ClientManagerImpl.cpp
+++ b/src/main/cpp/client/ClientManagerImpl.cpp
@@ -55,7 +55,6 @@
tls_channel_credential_options_.watch_identity_key_cert_pairs();
channel_credential_ = grpc::experimental::TlsCredentials(tls_channel_credential_options_);
-
// Use unlimited receive message size.
channel_arguments_.SetMaxReceiveMessageSize(-1);
@@ -1237,42 +1236,23 @@
client->asyncEndTransaction(request, invocation_context);
}
-void ClientManagerImpl::multiplexingCall(
- const std::string& target_host, const Metadata& metadata, const MultiplexingRequest& request,
- std::chrono::milliseconds timeout, const std::function<void(const InvocationContext<MultiplexingResponse>*)>& cb) {
- RpcClientSharedPtr client = getRpcClient(target_host);
- if (!client) {
- SPDLOG_WARN("No RPC client for {}", target_host);
- cb(nullptr);
- return;
- }
+void ClientManagerImpl::pollCommand(const std::string& target, const Metadata& metadata,
+ const PollCommandRequest& request, std::chrono::milliseconds timeout,
+ const std::function<void(const InvocationContext<PollCommandResponse>*)>& cb) {
+ auto client = getRpcClient(target);
- SPDLOG_DEBUG("Prepare to endTransaction. TargetHost={}, Request: {}", target_host.data(), request.DebugString());
-
- auto invocation_context = new InvocationContext<MultiplexingResponse>();
- invocation_context->remote_address = target_host;
+ auto invocation_context = new InvocationContext<PollCommandResponse>();
+ invocation_context->remote_address = target;
for (const auto& item : metadata) {
invocation_context->context.AddMetadata(item.first, item.second);
}
-
- // Set RPC deadline.
auto deadline = std::chrono::system_clock::now() + timeout;
invocation_context->context.set_deadline(deadline);
- auto callback = [cb](const InvocationContext<MultiplexingResponse>* invocation_context) {
- if (!invocation_context->status.ok()) {
- SPDLOG_WARN("Failed to apply multiplexing-call. TargetHost={}, gRPC statusCode={}, errorMessage={}",
- invocation_context->remote_address, invocation_context->status.error_message(),
- invocation_context->status.error_message());
- cb(invocation_context);
- return;
- }
+ auto callback = [cb](const InvocationContext<PollCommandResponse>* invocation_context) { cb(invocation_context); };
- SPDLOG_DEBUG("endTransaction completed OK. Response: {}", invocation_context->response.DebugString());
- cb(invocation_context);
- };
invocation_context->callback = callback;
- client->asyncMultiplexingCall(request, invocation_context);
+ client->asyncPollCommand(request, invocation_context);
}
void ClientManagerImpl::queryOffset(const std::string& target_host, const Metadata& metadata,
@@ -1458,6 +1438,93 @@
client->asyncForwardMessageToDeadLetterQueue(request, invocation_context);
}
+std::error_code ClientManagerImpl::reportThreadStackTrace(const std::string& target_host, const Metadata& metadata,
+ const ReportThreadStackTraceRequest& request,
+ std::chrono::milliseconds timeout) {
+ std::error_code ec;
+ auto client = getRpcClient(target_host);
+ grpc::ClientContext context;
+ auto deadline = std::chrono::system_clock::now() + timeout;
+ context.set_deadline(deadline);
+
+ for (const auto& item : metadata) {
+ context.AddMetadata(item.first, item.second);
+ }
+
+ ReportThreadStackTraceResponse response;
+ auto status = client->reportThreadStackTrace(&context, request, &response);
+ if (!status.ok()) {
+ ec = ErrorCode::RequestTimeout;
+ SPDLOG_WARN("Failed to report thread-stack-trace to {}. Cause: {}", target_host, status.error_message());
+ return ec;
+ }
+
+ switch (response.common().status().code()) {
+ case google::rpc::Code::OK: {
+ return ec;
+ }
+ case google::rpc::Code::UNAUTHENTICATED: {
+ SPDLOG_WARN("Unauthorized. Host={}, Cause: {}", target_host, response.common().status().message());
+ ec = ErrorCode::Unauthorized;
+ break;
+ }
+ case google::rpc::Code::PERMISSION_DENIED: {
+ SPDLOG_WARN("Forbidden. Host={}, Cause: {}", target_host, response.common().status().message());
+ ec = ErrorCode::Forbidden;
+ break;
+ }
+ default: {
+ ec = ErrorCode::NotImplemented;
+ SPDLOG_WARN("Unsupported response code, please update client to latest release. Host={}", target_host);
+ }
+ }
+ return ec;
+}
+
+std::error_code ClientManagerImpl::reportMessageConsumptionResult(const std::string& target_host,
+ const Metadata& metadata,
+ const ReportMessageConsumptionResultRequest& request,
+ std::chrono::milliseconds timeout) {
+ std::error_code ec;
+ auto client = getRpcClient(target_host);
+ grpc::ClientContext context;
+ auto deadline = std::chrono::system_clock::now() + timeout;
+ context.set_deadline(deadline);
+
+ for (const auto& item : metadata) {
+ context.AddMetadata(item.first, item.second);
+ }
+
+ ReportMessageConsumptionResultResponse response;
+ auto status = client->reportMessageConsumptionResult(&context, request, &response);
+ if (!status.ok()) {
+ ec = ErrorCode::RequestTimeout;
+ SPDLOG_WARN("Failed to report thread-stack-trace to {}. Cause: {}", target_host, status.error_message());
+ return ec;
+ }
+
+ switch (response.common().status().code()) {
+ case google::rpc::Code::OK: {
+ return ec;
+ }
+ case google::rpc::Code::UNAUTHENTICATED: {
+ SPDLOG_WARN("Unauthorized. Host={}, Cause: {}", target_host, response.common().status().message());
+ ec = ErrorCode::Unauthorized;
+ break;
+ }
+ case google::rpc::Code::PERMISSION_DENIED: {
+ SPDLOG_WARN("Forbidden. Host={}, Cause: {}", target_host, response.common().status().message());
+ ec = ErrorCode::Forbidden;
+ break;
+ }
+ default: {
+ ec = ErrorCode::NotImplemented;
+ SPDLOG_WARN("Unsupported response code, please update client to latest release. Host={}", target_host);
+ }
+ }
+ return ec;
+}
+
std::error_code ClientManagerImpl::notifyClientTermination(const std::string& target_host, const Metadata& metadata,
const NotifyClientTerminationRequest& request,
std::chrono::milliseconds timeout) {
@@ -1523,6 +1590,10 @@
SPDLOG_INFO("{}", stats);
}
+void ClientManagerImpl::submit(std::function<void()> task) {
+ callback_thread_pool_->submit(task);
+}
+
const char* ClientManagerImpl::HEARTBEAT_TASK_NAME = "heartbeat-task";
const char* ClientManagerImpl::STATS_TASK_NAME = "stats-task";
const char* ClientManagerImpl::HEALTH_CHECK_TASK_NAME = "health-check-task";
diff --git a/src/main/cpp/client/RpcClientImpl.cpp b/src/main/cpp/client/RpcClientImpl.cpp
index 12af6d7..cabced2 100644
--- a/src/main/cpp/client/RpcClientImpl.cpp
+++ b/src/main/cpp/client/RpcClientImpl.cpp
@@ -119,16 +119,27 @@
need_heartbeat_ = need_heartbeat;
}
-void RpcClientImpl::asyncMultiplexingCall(const MultiplexingRequest& request,
- InvocationContext<MultiplexingResponse>* invocation_context) {
- assert(invocation_context);
+void RpcClientImpl::asyncPollCommand(const PollCommandRequest& request,
+ InvocationContext<PollCommandResponse>* invocation_context) {
invocation_context->response_reader =
- stub_->PrepareAsyncMultiplexingCall(&invocation_context->context, request, completion_queue_.get());
+ stub_->PrepareAsyncPollCommand(&invocation_context->context, request, completion_queue_.get());
invocation_context->response_reader->StartCall();
invocation_context->response_reader->Finish(&invocation_context->response, &invocation_context->status,
invocation_context);
}
+grpc::Status RpcClientImpl::reportThreadStackTrace(grpc::ClientContext* context,
+ const ReportThreadStackTraceRequest& request,
+ ReportThreadStackTraceResponse* response) {
+ return stub_->ReportThreadStackTrace(context, request, response);
+}
+
+grpc::Status RpcClientImpl::reportMessageConsumptionResult(grpc::ClientContext* context,
+ const ReportMessageConsumptionResultRequest& request,
+ ReportMessageConsumptionResultResponse* response) {
+ return stub_->ReportMessageConsumptionResult(context, request, response);
+}
+
grpc::Status RpcClientImpl::notifyClientTermination(grpc::ClientContext* context,
const NotifyClientTerminationRequest& request,
NotifyClientTerminationResponse* response) {
diff --git a/src/main/cpp/client/include/ClientManager.h b/src/main/cpp/client/include/ClientManager.h
index 20ea165..9212564 100644
--- a/src/main/cpp/client/include/ClientManager.h
+++ b/src/main/cpp/client/include/ClientManager.h
@@ -3,6 +3,7 @@
#include <chrono>
#include <memory>
#include <system_error>
+#include <functional>
#include "Client.h"
#include "ReceiveMessageCallback.h"
@@ -37,10 +38,10 @@
std::chrono::milliseconds timeout,
const std::function<void(const std::error_code&, const HeartbeatResponse&)>& cb) = 0;
- virtual void multiplexingCall(const std::string& target, const Metadata& metadata, const MultiplexingRequest& request,
- std::chrono::milliseconds timeout,
- const std::function<void(const InvocationContext<MultiplexingResponse>*)>& cb) = 0;
-
+ virtual void pollCommand(const std::string& target, const Metadata& metadata, const PollCommandRequest& request,
+ std::chrono::milliseconds timeout,
+ const std::function<void(const InvocationContext<PollCommandResponse>*)>& cb) = 0;
+
virtual bool wrapMessage(const rmq::Message& item, MQMessageExt& message_ext) = 0;
virtual void ack(const std::string& target_host, const Metadata& metadata, const AckMessageRequest& request,
@@ -88,7 +89,19 @@
const NotifyClientTerminationRequest& request,
std::chrono::milliseconds timeout) = 0;
+ virtual std::error_code reportThreadStackTrace(const std::string& target_host, const Metadata& metadata,
+ const ReportThreadStackTraceRequest& request,
+ std::chrono::milliseconds timeout) = 0;
+
+ virtual std::error_code reportMessageConsumptionResult(const std::string& target_host, const Metadata& metadata,
+ const ReportMessageConsumptionResultRequest& request,
+ std::chrono::milliseconds timeout) = 0;
+
virtual State state() const = 0;
+
+
+ virtual void submit(std::function<void()> task) = 0;
+
};
using ClientManagerPtr = std::shared_ptr<ClientManager>;
diff --git a/src/main/cpp/client/include/ClientManagerImpl.h b/src/main/cpp/client/include/ClientManagerImpl.h
index 321664c..086a4e1 100644
--- a/src/main/cpp/client/include/ClientManagerImpl.h
+++ b/src/main/cpp/client/include/ClientManagerImpl.h
@@ -165,9 +165,9 @@
std::chrono::milliseconds timeout,
const std::function<void(const std::error_code&, const EndTransactionResponse&)>& cb) override;
- void multiplexingCall(const std::string& target, const Metadata& metadata, const MultiplexingRequest& request,
- std::chrono::milliseconds timeout,
- const std::function<void(const InvocationContext<MultiplexingResponse>*)>& cb) override;
+ void pollCommand(const std::string& target, const Metadata& metadata, const PollCommandRequest& request,
+ std::chrono::milliseconds timeout,
+ const std::function<void(const InvocationContext<PollCommandResponse>*)>& cb) override;
void queryOffset(const std::string& target_host, const Metadata& metadata, const QueryOffsetRequest& request,
std::chrono::milliseconds timeout,
@@ -181,6 +181,14 @@
const NotifyClientTerminationRequest& request,
std::chrono::milliseconds timeout) override;
+ std::error_code reportThreadStackTrace(const std::string& target_host, const Metadata& metadata,
+ const ReportThreadStackTraceRequest& request,
+ std::chrono::milliseconds timeout) override;
+
+ std::error_code reportMessageConsumptionResult(const std::string& target_host, const Metadata& metadata,
+ const ReportMessageConsumptionResultRequest& request,
+ std::chrono::milliseconds timeout) override;
+
void trace(bool trace) {
trace_ = trace;
}
@@ -191,6 +199,8 @@
State state() const override;
+ void submit(std::function<void()> task) override;
+
private:
void doHeartbeat();
diff --git a/src/main/cpp/client/include/RpcClient.h b/src/main/cpp/client/include/RpcClient.h
index 86522bd..2b93a08 100644
--- a/src/main/cpp/client/include/RpcClient.h
+++ b/src/main/cpp/client/include/RpcClient.h
@@ -40,24 +40,25 @@
using HealthCheckResponse = rmq::HealthCheckResponse;
using EndTransactionRequest = rmq::EndTransactionRequest;
using EndTransactionResponse = rmq::EndTransactionResponse;
-using MultiplexingRequest = rmq::MultiplexingRequest;
-using MultiplexingResponse = rmq::MultiplexingResponse;
-using GenericPollingRequest = rmq::GenericPollingRequest;
-using GenericPollingResponse = rmq::GenericPollingRequest;
-using PrintThreadStackRequest = rmq::PrintThreadStackRequest;
-using PrintThreadStackResponse = rmq::PrintThreadStackResponse;
-using VerifyMessageConsumptionRequest = rmq::VerifyMessageConsumptionRequest;
-using VerifyMessageConsumptionResponse = rmq::VerifyMessageConsumptionResponse;
-using RecoverOrphanedTransactionRequest = rmq::RecoverOrphanedTransactionRequest;
using QueryOffsetRequest = rmq::QueryOffsetRequest;
using QueryOffsetResponse = rmq::QueryOffsetResponse;
using PullMessageRequest = rmq::PullMessageRequest;
using PullMessageResponse = rmq::PullMessageResponse;
+using PollCommandRequest = rmq::PollCommandRequest;
+using PollCommandResponse = rmq::PollCommandResponse;
+using ReportThreadStackTraceRequest = rmq::ReportThreadStackTraceRequest;
+using ReportThreadStackTraceResponse = rmq::ReportThreadStackTraceResponse;
+using ReportMessageConsumptionResultRequest = rmq::ReportMessageConsumptionResultRequest;
+using ReportMessageConsumptionResultResponse = rmq::ReportMessageConsumptionResultResponse;
using ForwardMessageToDeadLetterQueueRequest = rmq::ForwardMessageToDeadLetterQueueRequest;
using ForwardMessageToDeadLetterQueueResponse = rmq::ForwardMessageToDeadLetterQueueResponse;
using NotifyClientTerminationRequest = rmq::NotifyClientTerminationRequest;
using NotifyClientTerminationResponse = rmq::NotifyClientTerminationResponse;
+/**
+ * @brief A RpcClient represents a session between client and a remote broker.
+ *
+ */
class RpcClient {
public:
RpcClient() = default;
@@ -93,8 +94,8 @@
virtual void asyncEndTransaction(const EndTransactionRequest& request,
InvocationContext<EndTransactionResponse>* invocation_context) = 0;
- virtual void asyncMultiplexingCall(const MultiplexingRequest& request,
- InvocationContext<MultiplexingResponse>* invocation_context) = 0;
+ virtual void asyncPollCommand(const PollCommandRequest& request,
+ InvocationContext<PollCommandResponse>* invocation_context) = 0;
virtual void asyncQueryOffset(const QueryOffsetRequest& request,
InvocationContext<QueryOffsetResponse>* invocation_context) = 0;
@@ -106,6 +107,14 @@
const ForwardMessageToDeadLetterQueueRequest& request,
InvocationContext<ForwardMessageToDeadLetterQueueResponse>* invocation_context) = 0;
+ virtual grpc::Status reportThreadStackTrace(grpc::ClientContext* context,
+ const ReportThreadStackTraceRequest& request,
+ ReportThreadStackTraceResponse* response) = 0;
+
+ virtual grpc::Status reportMessageConsumptionResult(grpc::ClientContext* context,
+ const ReportMessageConsumptionResultRequest& request,
+ ReportMessageConsumptionResultResponse* response) = 0;
+
virtual grpc::Status notifyClientTermination(grpc::ClientContext* context,
const NotifyClientTerminationRequest& request,
NotifyClientTerminationResponse* response) = 0;
diff --git a/src/main/cpp/client/include/RpcClientImpl.h b/src/main/cpp/client/include/RpcClientImpl.h
index 7cc782b..e478dda 100644
--- a/src/main/cpp/client/include/RpcClientImpl.h
+++ b/src/main/cpp/client/include/RpcClientImpl.h
@@ -50,8 +50,8 @@
void asyncEndTransaction(const EndTransactionRequest& request,
InvocationContext<EndTransactionResponse>* invocation_context) override;
- void asyncMultiplexingCall(const MultiplexingRequest& request,
- InvocationContext<MultiplexingResponse>* invocation_context) override;
+ void asyncPollCommand(const PollCommandRequest& request,
+ InvocationContext<PollCommandResponse>* invocation_context) override;
void asyncQueryOffset(const QueryOffsetRequest& request,
InvocationContext<QueryOffsetResponse>* invocation_context) override;
@@ -63,6 +63,13 @@
const ForwardMessageToDeadLetterQueueRequest& request,
InvocationContext<ForwardMessageToDeadLetterQueueResponse>* invocation_context) override;
+ grpc::Status reportThreadStackTrace(grpc::ClientContext* context, const ReportThreadStackTraceRequest& request,
+ ReportThreadStackTraceResponse* response) override;
+
+ grpc::Status reportMessageConsumptionResult(grpc::ClientContext* context,
+ const ReportMessageConsumptionResultRequest& request,
+ ReportMessageConsumptionResultResponse* response) override;
+
grpc::Status notifyClientTermination(grpc::ClientContext* context, const NotifyClientTerminationRequest& request,
NotifyClientTerminationResponse* response) override;
diff --git a/src/main/cpp/client/mocks/include/ClientManagerMock.h b/src/main/cpp/client/mocks/include/ClientManagerMock.h
index 0f1fe7d..03f827e 100644
--- a/src/main/cpp/client/mocks/include/ClientManagerMock.h
+++ b/src/main/cpp/client/mocks/include/ClientManagerMock.h
@@ -27,9 +27,9 @@
(const std::function<void(const std::error_code&, const HeartbeatResponse&)>&)),
(override));
- MOCK_METHOD(void, multiplexingCall,
- (const std::string&, const Metadata&, const MultiplexingRequest&, std::chrono::milliseconds,
- (const std::function<void(const InvocationContext<MultiplexingResponse>*)>&)),
+ MOCK_METHOD(void, pollCommand,
+ (const std::string&, const Metadata&, const PollCommandRequest&, std::chrono::milliseconds,
+ const std::function<void(const InvocationContext<PollCommandResponse>*)>&),
(override));
MOCK_METHOD(bool, wrapMessage, (const rmq::Message&, MQMessageExt&), (override));
@@ -88,7 +88,18 @@
(const std::string&, const Metadata&, const NotifyClientTerminationRequest&, std::chrono::milliseconds),
(override));
+ MOCK_METHOD(std::error_code, reportThreadStackTrace,
+ (const std::string&, const Metadata&, const ReportThreadStackTraceRequest&, std::chrono::milliseconds),
+ (override));
+
+ MOCK_METHOD(std::error_code, reportMessageConsumptionResult,
+ (const std::string&, const Metadata&, const ReportMessageConsumptionResultRequest&,
+ std::chrono::milliseconds),
+ (override));
+
MOCK_METHOD(State, state, (), (const override));
+
+ MOCK_METHOD(void, submit, (std::function<void()>), (override));
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/client/mocks/include/RpcClientMock.h b/src/main/cpp/client/mocks/include/RpcClientMock.h
index a4363fd..dadafe0 100644
--- a/src/main/cpp/client/mocks/include/RpcClientMock.h
+++ b/src/main/cpp/client/mocks/include/RpcClientMock.h
@@ -38,9 +38,6 @@
MOCK_METHOD(void, asyncEndTransaction, (const EndTransactionRequest&, InvocationContext<EndTransactionResponse>*),
(override));
- MOCK_METHOD(void, asyncMultiplexingCall, (const MultiplexingRequest&, InvocationContext<MultiplexingResponse>*),
- (override));
-
MOCK_METHOD(void, asyncQueryOffset, (const QueryOffsetRequest&, InvocationContext<QueryOffsetResponse>*), (override));
MOCK_METHOD(void, asyncPull, (const PullMessageRequest&, InvocationContext<PullMessageResponse>*), (override));
@@ -50,6 +47,17 @@
InvocationContext<ForwardMessageToDeadLetterQueueResponse>*),
(override));
+ MOCK_METHOD(void, asyncPollCommand, (const PollCommandRequest&, InvocationContext<PollCommandResponse>*), (override));
+
+ MOCK_METHOD(grpc::Status, reportThreadStackTrace,
+ (grpc::ClientContext*, const ReportThreadStackTraceRequest&, ReportThreadStackTraceResponse*),
+ (override));
+
+ MOCK_METHOD(grpc::Status, reportMessageConsumptionResult,
+ (grpc::ClientContext*, const ReportMessageConsumptionResultRequest&,
+ ReportMessageConsumptionResultResponse*),
+ (override));
+
MOCK_METHOD(grpc::Status, notifyClientTermination,
(grpc::ClientContext*, const NotifyClientTerminationRequest&, NotifyClientTerminationResponse* response),
(override));
diff --git a/src/main/cpp/rocketmq/ClientImpl.cpp b/src/main/cpp/rocketmq/ClientImpl.cpp
index 294de27..920187c 100644
--- a/src/main/cpp/rocketmq/ClientImpl.cpp
+++ b/src/main/cpp/rocketmq/ClientImpl.cpp
@@ -24,6 +24,7 @@
#include "MessageAccessor.h"
#include "Signature.h"
#include "rocketmq/MQMessageExt.h"
+#include "rocketmq/MessageListener.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -303,94 +304,133 @@
}
}
-void ClientImpl::multiplexing(const std::string& target, const MultiplexingRequest& request) {
+void ClientImpl::pollCommand(const std::string& target) {
absl::flat_hash_map<std::string, std::string> metadata;
Signature::sign(this, metadata);
- client_manager_->multiplexingCall(target, metadata, request, absl::ToChronoMilliseconds(long_polling_timeout_),
- std::bind(&ClientImpl::onMultiplexingResponse, this, std::placeholders::_1));
+
+ PollCommandRequest request;
+ auto&& resource_bundle = resourceBundle();
+ request.set_client_id(resource_bundle.client_id);
+ switch (resource_bundle.group_type) {
+ case GroupType::PUBLISHER:
+ request.mutable_producer_group()->set_resource_namespace(resource_namespace_);
+ request.mutable_producer_group()->set_name(group_name_);
+ break;
+
+ case GroupType::SUBSCRIBER:
+ request.mutable_consumer_group()->set_resource_namespace(resource_namespace_);
+ request.mutable_consumer_group()->set_name(group_name_);
+ break;
+ }
+ auto topics = request.mutable_topics();
+ for (const auto& item : resource_bundle.topics) {
+ auto topic = new rmq::Resource();
+ topic->set_resource_namespace(resource_namespace_);
+ topic->set_name(item);
+ topics->AddAllocated(topic);
+ }
+
+ client_manager_->pollCommand(target, metadata, request, absl::ToChronoMilliseconds(long_polling_timeout_),
+ std::bind(&ClientImpl::onPollCommandResponse, this, std::placeholders::_1));
}
-void ClientImpl::onMultiplexingResponse(const InvocationContext<MultiplexingResponse>* ctx) {
+void ClientImpl::verifyMessageConsumption(std::string remote_address, std::string command_id, MQMessageExt message) {
+ MessageListener* listener = messageListener();
+
+ Metadata metadata;
+ Signature::sign(this, metadata);
+ ReportMessageConsumptionResultRequest request;
+ request.set_command_id(command_id);
+
+ if (!listener) {
+ request.mutable_status()->set_code(google::rpc::Code::FAILED_PRECONDITION);
+ request.mutable_status()->set_message("Target is not a push consumer client");
+ client_manager_->reportMessageConsumptionResult(remote_address, metadata, request,
+ absl::ToChronoMilliseconds(io_timeout_));
+ return;
+ }
+
+ if (MessageListenerType::FIFO == listener->listenerType()) {
+ request.mutable_status()->set_code(google::rpc::Code::FAILED_PRECONDITION);
+ request.mutable_status()->set_message("FIFO message does NOT support verification of message consumption");
+ client_manager_->reportMessageConsumptionResult(remote_address, metadata, request,
+ absl::ToChronoMilliseconds(io_timeout_));
+ return;
+ }
+
+ // Execute the actual verification task in dedicated thread-pool.
+ client_manager_->submit(std::bind(&ClientImpl::doVerify, this, remote_address, command_id, message));
+}
+
+void ClientImpl::onPollCommandResponse(const InvocationContext<PollCommandResponse>* ctx) {
if (!ctx->status.ok()) {
- std::string remote_address = ctx->remote_address;
- auto multiplexingLater = [this, remote_address]() {
- MultiplexingRequest request;
- fillGenericPollingRequest(request);
- multiplexing(remote_address, request);
- };
- static std::string task_name = "Initiate multiplex request later";
- client_manager_->getScheduler().schedule(multiplexingLater, task_name, std::chrono::seconds(3),
- std::chrono::seconds(0));
+ static std::string task_name = "Poll-Command-Later";
+ client_manager_->getScheduler().schedule(std::bind(&ClientImpl::pollCommand, this, ctx->remote_address), task_name,
+ std::chrono::seconds(3), std::chrono::seconds(0));
return;
}
switch (ctx->response.type_case()) {
- case MultiplexingResponse::TypeCase::kPrintThreadStackRequest: {
- MultiplexingRequest request;
- request.mutable_print_thread_stack_response()->mutable_common()->mutable_status()->set_code(
- google::rpc::Code::UNIMPLEMENTED);
- request.mutable_print_thread_stack_response()->set_stack_trace(
- "Print thread stack trace is not supported by C++ SDK");
+ case PollCommandResponse::TypeCase::kPrintThreadStackTraceCommand: {
absl::flat_hash_map<std::string, std::string> metadata;
Signature::sign(this, metadata);
- client_manager_->multiplexingCall(ctx->remote_address, metadata, request, absl::ToChronoMilliseconds(io_timeout_),
- std::bind(&ClientImpl::onMultiplexingResponse, this, std::placeholders::_1));
+ ReportThreadStackTraceRequest request;
+ auto command_id = ctx->response.print_thread_stack_trace_command().command_id();
+ request.set_command_id(command_id);
+ request.set_thread_stack_trace("--RocketMQ-Client-CPP does NOT support thread stack trace report--");
+ client_manager_->reportThreadStackTrace(ctx->remote_address, metadata, request,
+ absl::ToChronoMilliseconds(io_timeout_));
break;
}
- case MultiplexingResponse::TypeCase::kVerifyMessageConsumptionRequest: {
- auto data = ctx->response.verify_message_consumption_request().message();
+ case PollCommandResponse::TypeCase::kVerifyMessageConsumptionCommand: {
+ auto command_id = ctx->response.verify_message_consumption_command().command_id();
+ auto data = ctx->response.verify_message_consumption_command().message();
MQMessageExt message;
- MultiplexingRequest request;
+ ReportMessageConsumptionResultRequest request;
+ request.set_command_id(command_id);
+ Metadata metadata;
+ Signature::sign(this, metadata);
if (!client_manager_->wrapMessage(data, message)) {
SPDLOG_WARN("Message to verify consumption is corrupted");
- request.mutable_verify_message_consumption_response()->mutable_common()->mutable_status()->set_code(
- google::rpc::Code::INVALID_ARGUMENT);
- request.mutable_verify_message_consumption_response()->mutable_common()->mutable_status()->set_message(
- "Message to verify is corrupted");
- multiplexing(ctx->remote_address, request);
- return;
+ request.mutable_status()->set_code(google::rpc::Code::INVALID_ARGUMENT);
+ request.mutable_status()->set_message("Data corrupted");
+ client_manager_->reportMessageConsumptionResult(ctx->remote_address, metadata, request,
+ absl::ToChronoMilliseconds(io_timeout_));
}
- std::string&& result = verifyMessageConsumption(message);
- request.mutable_verify_message_consumption_response()->mutable_common()->mutable_status()->set_code(
- google::rpc::Code::OK);
- request.mutable_verify_message_consumption_response()->mutable_common()->mutable_status()->set_message(result);
- multiplexing(ctx->remote_address, request);
+ verifyMessageConsumption(std::move(ctx->remote_address), std::move(command_id), std::move(message));
break;
}
- case MultiplexingResponse::TypeCase::kRecoverOrphanedTransactionRequest: {
- auto orphan = ctx->response.recover_orphaned_transaction_request().orphaned_transactional_message();
+ case PollCommandResponse::TypeCase::kRecoverOrphanedTransactionCommand: {
+ auto orphan = ctx->response.recover_orphaned_transaction_command().orphaned_transactional_message();
MQMessageExt message;
if (client_manager_->wrapMessage(orphan, message)) {
MessageAccessor::setTargetEndpoint(message, ctx->remote_address);
- const std::string& transaction_id = ctx->response.recover_orphaned_transaction_request().transaction_id();
- resolveOrphanedTransactionalMessage(transaction_id, message);
+ const std::string& transaction_id = ctx->response.recover_orphaned_transaction_command().transaction_id();
+ // Dispatch task to thread-pool.
+ client_manager_->submit(
+ std::bind(&ClientImpl::resolveOrphanedTransactionalMessage, this, transaction_id, message));
} else {
SPDLOG_WARN("Failed to resolve orphaned transactional message, potentially caused by message-body checksum "
"verification failure.");
}
- MultiplexingRequest request;
- fillGenericPollingRequest(request);
- multiplexing(ctx->remote_address, request);
break;
}
- case MultiplexingResponse::TypeCase::kPollingResponse: {
- MultiplexingRequest request;
- fillGenericPollingRequest(request);
- multiplexing(ctx->remote_address, request);
+ case PollCommandResponse::TypeCase::kNoopCommand: {
+ SPDLOG_DEBUG("A long-polling-command period completed.");
break;
}
default: {
SPDLOG_WARN("Unsupported multiplex type");
- MultiplexingRequest request;
- fillGenericPollingRequest(request);
- multiplexing(ctx->remote_address, request);
break;
}
}
+
+ // Initiate next round of long-polling-command immediately.
+ pollCommand(ctx->remote_address);
}
void ClientImpl::onRemoteEndpointRemoval(const std::vector<std::string>& hosts) {
@@ -451,30 +491,6 @@
}
}
-void ClientImpl::fillGenericPollingRequest(MultiplexingRequest& request) {
- auto&& resource_bundle = resourceBundle();
- auto polling_request = request.mutable_polling_request();
- polling_request->set_client_id(clientId());
- switch (resource_bundle.group_type) {
- case GroupType::PUBLISHER:
- polling_request->mutable_producer_group()->set_resource_namespace(resource_namespace_);
- polling_request->mutable_producer_group()->set_name(group_name_);
- break;
-
- case GroupType::SUBSCRIBER:
- polling_request->mutable_consumer_group()->set_resource_namespace(resource_namespace_);
- polling_request->mutable_consumer_group()->set_name(group_name_);
- break;
- }
- auto topics = polling_request->mutable_topics();
- for (const auto& item : resource_bundle.topics) {
- auto topic = new rmq::Resource();
- topic->set_resource_namespace(resource_namespace_);
- topic->set_name(item);
- topics->AddAllocated(topic);
- }
-}
-
void ClientImpl::notifyClientTermination() {
SPDLOG_WARN("Should NOT reach here. Subclass should have overridden this function.");
std::abort();
@@ -492,4 +508,38 @@
}
}
+void ClientImpl::doVerify(std::string target, std::string command_id, MQMessageExt message) {
+ ReportMessageConsumptionResultRequest request;
+ request.set_command_id(command_id);
+ StandardMessageListener* callback = reinterpret_cast<StandardMessageListener*>(messageListener());
+ try {
+ std::vector<MQMessageExt> batch = {message};
+ auto result = callback->consumeMessage(batch);
+ switch (result) {
+ case ConsumeMessageResult::SUCCESS: {
+ SPDLOG_DEBUG("Verify message[MsgId={}] OK", message.getMsgId());
+ request.mutable_status()->set_message("Consume Success");
+ break;
+ }
+ case ConsumeMessageResult::FAILURE: {
+ SPDLOG_WARN("Message Listener failed to consume message[MsgId={}] when verifying", message.getMsgId());
+ request.mutable_status()->set_code(google::rpc::Code::INTERNAL);
+ request.mutable_status()->set_message("Consume Failed");
+ break;
+ }
+ }
+ } catch (...) {
+ SPDLOG_WARN("Exception raised when invoking message listener provided by application developer. MsgId of message "
+ "to verify: {}",
+ message.getMsgId());
+ request.mutable_status()->set_code(google::rpc::Code::INTERNAL);
+ request.mutable_status()->set_message(
+ "Unexpected exception raised while invoking message listener provided by application developer");
+ }
+
+ Metadata metadata;
+ Signature::sign(this, metadata);
+ client_manager_->reportMessageConsumptionResult(target, metadata, request, absl::ToChronoMilliseconds(io_timeout_));
+}
+
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/include/ClientImpl.h b/src/main/cpp/rocketmq/include/ClientImpl.h
index 998d357..679f7b3 100644
--- a/src/main/cpp/rocketmq/include/ClientImpl.h
+++ b/src/main/cpp/rocketmq/include/ClientImpl.h
@@ -18,6 +18,7 @@
#include "NameServerResolver.h"
#include "OtlpExporter.h"
#include "rocketmq/MQMessageExt.h"
+#include "rocketmq/MessageListener.h"
#include "rocketmq/State.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -100,10 +101,16 @@
virtual void prepareHeartbeatData(HeartbeatRequest& request) = 0;
- virtual std::string verifyMessageConsumption(const MQMessageExt& message) {
- return "Unsupported";
- }
+ virtual void verifyMessageConsumption(std::string remote_address, std::string command_id, MQMessageExt message);
+ /**
+ * @brief Execute transaction-state-checker to commit or roll-back the orphan transactional message.
+ *
+ * It is no-op by default and Producer-subclass is supposed to override it.
+ *
+ * @param transaction_id
+ * @param message
+ */
virtual void resolveOrphanedTransactionalMessage(const std::string& transaction_id, const MQMessageExt& message) {
}
@@ -124,6 +131,17 @@
void notifyClientTermination(const NotifyClientTerminationRequest& request);
+ /**
+ * @brief Return application developer provided message listener if this client is of PushConsumer type.
+ *
+ * By default, it returns nullptr such that error messages are generated and directed to server immediately.
+ *
+ * @return nullptr by default.
+ */
+ virtual MessageListener* messageListener() {
+ return nullptr;
+ }
+
private:
/**
* This is a low-level API that fetches route data from name server through
@@ -153,14 +171,14 @@
void updateRouteCache(const std::string& topic, const std::error_code& ec, const TopicRouteDataPtr& route)
LOCKS_EXCLUDED(topic_route_table_mtx_);
- void multiplexing(const std::string& target, const MultiplexingRequest& request);
+ void pollCommand(const std::string& target);
- void onMultiplexingResponse(const InvocationContext<MultiplexingResponse>* ctx);
+ void onPollCommandResponse(const InvocationContext<PollCommandResponse>* ctx);
void onHealthCheckResponse(const std::error_code& endpoint, const InvocationContext<HealthCheckResponse>* ctx)
LOCKS_EXCLUDED(isolated_endpoints_mtx_);
- void fillGenericPollingRequest(MultiplexingRequest& request);
+ void doVerify(std::string target, std::string command_id, MQMessageExt message);
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file