Set gRPC max message size
diff --git a/example/rocketmq/ExampleAsyncProducer.cpp b/example/rocketmq/ExampleAsyncProducer.cpp
index 8cc77db..3bf3801 100644
--- a/example/rocketmq/ExampleAsyncProducer.cpp
+++ b/example/rocketmq/ExampleAsyncProducer.cpp
@@ -174,7 +174,7 @@
}
Logger& logger = getLogger();
- logger.setLevel(Level::Debug);
+ logger.setLevel(Level::Info);
logger.init();
const char* topic = "cpp_sdk_standard";
diff --git a/example/rocketmq/ExampleProducer.cpp b/example/rocketmq/ExampleProducer.cpp
index a65fc44..3551bf5 100644
--- a/example/rocketmq/ExampleProducer.cpp
+++ b/example/rocketmq/ExampleProducer.cpp
@@ -65,7 +65,7 @@
std::thread stats_thread(stats_lambda);
- std::string body = randomString(1024);
+ std::string body = randomString(1024 * 1024 * 4);
std::cout << "Message body: " << body << std::endl;
message.setBody(body);
diff --git a/example/rocketmq/ExamplePushConsumer.cpp b/example/rocketmq/ExamplePushConsumer.cpp
index 697954b..c792c9c 100644
--- a/example/rocketmq/ExamplePushConsumer.cpp
+++ b/example/rocketmq/ExamplePushConsumer.cpp
@@ -16,8 +16,9 @@
ConsumeMessageResult consumeMessage(const std::vector<MQMessageExt>& msgs) override {
for (const MQMessageExt& msg : msgs) {
SPDLOG_INFO("Consume message[Topic={}, MessageId={}] OK", msg.getTopic(), msg.getMsgId());
- std::cout << "Consume Message[MsgId=" << msg.getMsgId() << "] OK" << std::endl;
- std::this_thread::sleep_for(std::chrono::seconds(1));
+ std::cout << "Consume Message[MsgId=" << msg.getMsgId() << "] OK. Body Size: " << msg.getBody().size()
+ << std::endl;
+ // std::this_thread::sleep_for(std::chrono::seconds(1));
}
return ConsumeMessageResult::SUCCESS;
}
@@ -44,7 +45,7 @@
push_consumer.setConsumeThreadCount(4);
push_consumer.start();
- std::this_thread::sleep_for(std::chrono::seconds(300));
+ std::this_thread::sleep_for(std::chrono::minutes(30));
push_consumer.shutdown();
return EXIT_SUCCESS;
diff --git a/src/main/cpp/base/include/InvocationContext.h b/src/main/cpp/base/include/InvocationContext.h
index 3178d8c..bf6788c 100644
--- a/src/main/cpp/base/include/InvocationContext.h
+++ b/src/main/cpp/base/include/InvocationContext.h
@@ -34,6 +34,7 @@
std::string remote_address;
grpc::ClientContext context;
grpc::Status status;
+ std::string task_name;
absl::Time created_time{absl::Now()};
std::chrono::steady_clock::time_point start_time{std::chrono::steady_clock::now()};
};
@@ -42,6 +43,9 @@
struct InvocationContext : public BaseInvocationContext {
void onCompletion(bool ok) override {
+ auto elapsed =
+ std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time).count();
+ SPDLOG_DEBUG("RPC[{}] costs {}ms", task_name, elapsed);
/// Client-side Read, Server-side Read, Client-side
/// RecvInitialMetadata (which is typically included in Read if not
/// done explicitly): ok indicates whether there is a valid message
@@ -60,8 +64,6 @@
}
if (!status.ok() && grpc::StatusCode::DEADLINE_EXCEEDED == status.error_code()) {
- auto elapsed =
- std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time).count();
auto diff =
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - context.deadline())
.count();
diff --git a/src/main/cpp/client/ClientManagerImpl.cpp b/src/main/cpp/client/ClientManagerImpl.cpp
index b662e26..4fe9743 100644
--- a/src/main/cpp/client/ClientManagerImpl.cpp
+++ b/src/main/cpp/client/ClientManagerImpl.cpp
@@ -54,6 +54,10 @@
tls_channel_credential_options_.watch_root_certs();
tls_channel_credential_options_.watch_identity_key_cert_pairs();
channel_credential_ = grpc::experimental::TlsCredentials(tls_channel_credential_options_);
+
+ int max_message_size = 1024 * 1024 * 16;
+ channel_arguments_.SetMaxReceiveMessageSize(max_message_size);
+ channel_arguments_.SetMaxSendMessageSize(max_message_size);
SPDLOG_INFO("ClientManager[ResourceNamespace={}] created", resource_namespace_);
}
@@ -172,6 +176,7 @@
SPDLOG_DEBUG("Prepare to send health-check to {}. Request: {}", target_host, request.DebugString());
auto invocation_context = new InvocationContext<HealthCheckResponse>();
+ invocation_context->task_name = fmt::format("HealthCheck to {}", target_host);
invocation_context->remote_address = target_host;
invocation_context->context.set_deadline(std::chrono::system_clock::now() + timeout);
@@ -289,6 +294,7 @@
SPDLOG_DEBUG("Prepare to send heartbeat to {}. Request: {}", target_host, request.DebugString());
auto client = getRpcClient(target_host, true);
auto invocation_context = new InvocationContext<HeartbeatResponse>();
+ invocation_context->task_name = fmt::format("Heartbeat to {}", target_host);
invocation_context->remote_address = target_host;
for (const auto& item : metadata) {
invocation_context->context.AddMetadata(item.first, item.second);
@@ -390,6 +396,8 @@
RpcClientSharedPtr client = getRpcClient(target_host);
// Invocation context will be deleted in its onComplete() method.
auto invocation_context = new InvocationContext<SendMessageResponse>();
+ invocation_context->task_name =
+ fmt::format("Send message[] to {}", request.message().system_attribute().message_id(), target_host);
invocation_context->remote_address = target_host;
for (const auto& entry : metadata) {
invocation_context->context.AddMetadata(entry.first, entry.second);
@@ -551,6 +559,7 @@
}
auto invocation_context = new InvocationContext<QueryRouteResponse>();
+ invocation_context->task_name = fmt::format("Query route of topic={} from {}", request.topic().name(), target_host);
invocation_context->remote_address = target_host;
invocation_context->context.set_deadline(std::chrono::system_clock::now() + timeout);
for (const auto& item : metadata) {
@@ -703,6 +712,7 @@
};
auto invocation_context = new InvocationContext<QueryAssignmentResponse>();
+ invocation_context->task_name = fmt::format("QueryAssignment from {}", target);
invocation_context->remote_address = target;
for (const auto& item : metadata) {
invocation_context->context.AddMetadata(item.first, item.second);
@@ -719,6 +729,9 @@
RpcClientSharedPtr client = getRpcClient(target_host);
auto invocation_context = new InvocationContext<ReceiveMessageResponse>();
+ invocation_context->task_name = fmt::format("ReceiveMessage from queue[{}-{}-{}-{}], host={}", request.group().name(),
+ request.partition().topic().name(), request.partition().broker().name(),
+ request.partition().id(), target_host);
invocation_context->remote_address = target_host;
if (!metadata.empty()) {
for (const auto& item : metadata) {
@@ -1024,6 +1037,7 @@
RpcClientSharedPtr client = getRpcClient(target_host);
auto invocation_context = new InvocationContext<AckMessageResponse>();
+ invocation_context->task_name = fmt::format("Ack message[{}] against {}", request.message_id(), target);
invocation_context->remote_address = target_host;
invocation_context->context.set_deadline(std::chrono::system_clock::now() + timeout);
@@ -1078,6 +1092,7 @@
RpcClientSharedPtr client = getRpcClient(target_host);
assert(client);
auto invocation_context = new InvocationContext<NackMessageResponse>();
+ invocation_context->task_name = fmt::format("Nack Message[{}] against {}", request.message_id(), target_host);
invocation_context->remote_address = target_host;
invocation_context->context.set_deadline(std::chrono::system_clock::now() + timeout);
@@ -1144,6 +1159,8 @@
SPDLOG_DEBUG("Prepare to endTransaction. TargetHost={}, Request: {}", target_host.data(), request.DebugString());
auto invocation_context = new InvocationContext<EndTransactionResponse>();
+ invocation_context->task_name = fmt::format("End transaction[{}] of message[] against {}", request.transaction_id(),
+ request.message_id(), target_host);
invocation_context->remote_address = target_host;
for (const auto& item : metadata) {
invocation_context->context.AddMetadata(item.first, item.second);
@@ -1306,6 +1323,9 @@
SPDLOG_DEBUG("PullMessage Request: {}, target_host={}", request.DebugString(), target_host);
auto client = getRpcClient(target_host);
auto invocation_context = new InvocationContext<PullMessageResponse>();
+ invocation_context->task_name = fmt::format("PullMessage for queue[{}-{}-{}-{}] from {}", request.group().name(),
+ request.partition().topic().name(), request.partition().broker().name(),
+ request.partition().id(), target_host);
invocation_context->remote_address = target_host;
invocation_context->context.set_deadline(std::chrono::system_clock::now() + timeout);
for (const auto& item : metadata) {
@@ -1389,6 +1409,8 @@
SPDLOG_DEBUG("ForwardMessageToDeadLetterQueue Request: {}", request.DebugString());
auto client = getRpcClient(target_host);
auto invocation_context = new InvocationContext<ForwardMessageToDeadLetterQueueResponse>();
+ invocation_context->task_name =
+ fmt::format("Forward message[{}] to DLQ against {}", request.message_id(), target_host);
invocation_context->remote_address = target_host;
invocation_context->context.set_deadline(std::chrono::system_clock::now() + timeout);