Use latest protocol specification
diff --git a/example/rocketmq/ExampleProducer.cpp b/example/rocketmq/ExampleProducer.cpp
index 8dee436..6452509 100644
--- a/example/rocketmq/ExampleProducer.cpp
+++ b/example/rocketmq/ExampleProducer.cpp
@@ -66,12 +66,12 @@
std::thread stats_thread(stats_lambda);
std::string body = randomString(1024 * 4);
- std::cout << "Message body: " << body << std::endl;
+ std::cout << "Message body size: " << body.length() << std::endl;
message.setBody(body);
try {
producer.start();
- for (int i = 0; i < 102400; ++i) {
+ for (int i = 0; i < 16; ++i) {
SendResult sendResult = producer.send(message);
std::cout << sendResult.getMessageQueue().simpleName() << ": " << sendResult.getMsgId() << std::endl;
count++;
diff --git a/proto/apache/rocketmq/v1/admin.proto b/proto/apache/rocketmq/v1/admin.proto
index 8ed2037..554207b 100644
--- a/proto/apache/rocketmq/v1/admin.proto
+++ b/proto/apache/rocketmq/v1/admin.proto
@@ -1,3 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
syntax = "proto3";
package apache.rocketmq.v1;
@@ -7,9 +22,7 @@
option java_package = "apache.rocketmq.v1";
option java_generate_equals_and_hash = true;
option java_string_check_utf8 = true;
-
-// Ali Cloud Service
-option java_outer_classname = "ACS";
+option java_outer_classname = "MQAdmin";
message ChangeLogLevelRequest {
enum Level {
diff --git a/proto/apache/rocketmq/v1/definition.proto b/proto/apache/rocketmq/v1/definition.proto
index 9ecd46a..c92aad3 100644
--- a/proto/apache/rocketmq/v1/definition.proto
+++ b/proto/apache/rocketmq/v1/definition.proto
@@ -1,3 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
syntax = "proto3";
import "google/protobuf/timestamp.proto";
@@ -9,8 +24,6 @@
option java_package = "apache.rocketmq.v1";
option java_generate_equals_and_hash = true;
option java_string_check_utf8 = true;
-
-// Ali Cloud Service
option java_outer_classname = "MQDomain";
enum Permission {
@@ -46,7 +59,7 @@
//
// This field will be honored on a best effort basis.
//
- // If this parameter is 0, a default value of 5 is used.
+ // If this parameter is 0, a default value of 16 is used.
int32 max_delivery_attempts = 1;
reserved 2 to 64;
@@ -216,6 +229,7 @@
repeated string keys = 2;
// Message identifier, client-side generated, remains unique.
+ // If message_id is empty, the send message request will be aborted with status `INVALID_ARGUMENT`.
string message_id = 3;
// Message body digest
@@ -284,6 +298,9 @@
message Message {
Resource topic = 1;
// User defined key-value pairs.
+ // If user_attribute contains any key reserved by RocketMQ,
+ // the send message request will be aborted with status `INVALID_ARGUMENT`. See the link below for the reserved keys:
+ // https://github.com/apache/rocketmq/blob/master/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java#L58
map<string, string> user_attribute = 2;
SystemAttribute system_attribute = 3;
bytes body = 4;
diff --git a/proto/apache/rocketmq/v1/service.proto b/proto/apache/rocketmq/v1/service.proto
index 5deb63c..6220f51 100644
--- a/proto/apache/rocketmq/v1/service.proto
+++ b/proto/apache/rocketmq/v1/service.proto
@@ -1,3 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
syntax = "proto3";
import "google/protobuf/duration.proto";
@@ -13,8 +28,6 @@
option java_package = "apache.rocketmq.v1";
option java_generate_equals_and_hash = true;
option java_string_check_utf8 = true;
-
-// Ali Cloud Service
option java_outer_classname = "MQService";
message ResponseCommon {
@@ -28,10 +41,13 @@
reserved 7 to 64;
}
+// A QueryRouteRequest requests a set of Partitions of the specific topic with
+// necessary route infos.
message QueryRouteRequest {
Resource topic = 1;
- // Service access point
+ // The service access points used to issue the QueryRouteRequest.
+ // The QueryRouteResponse will indicate the address of subsequent RPCs.
Endpoints endpoints = 2;
reserved 3 to 64;
@@ -248,20 +264,6 @@
reserved 6 to 64;
}
-message UpdateOffsetRequest {
- Resource group = 1;
- Partition partition = 2;
- int64 partition_offset = 3;
-
- reserved 4 to 64;
-}
-
-message UpdateOffsetResponse {
- ResponseCommon common = 1;
-
- reserved 2 to 64;
-}
-
message GenericPollingRequest {
string client_id = 1;
repeated Resource topics = 2;
@@ -336,10 +338,13 @@
}
message NotifyClientTerminationRequest {
- Resource group = 1;
- string client_id = 2;
+ oneof group {
+ Resource producer_group = 1;
+ Resource consumer_group = 2;
+ }
+ string client_id = 3;
- reserved 3 to 64;
+ reserved 4 to 64;
}
message NotifyClientTerminationResponse {
@@ -348,37 +353,103 @@
reserved 2 to 64;
}
+// All RPCs in MessagingService may return the errors below:
+//
+// If the request doesn't have valid authentication credentials, returns `UNAUTHENTICATED`.
+// If the caller doesn't have permission to execute the specified operation, returns `PERMISSION_DENIED`.
+// If the per-user rate quota has been exceeded, returns `RESOURCE_EXHAUSTED`.
+// If any unexpected server-side exception occurs, returns `INTERNAL`.
service MessagingService {
- rpc QueryRoute(QueryRouteRequest) returns (QueryRouteResponse) {}
- rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {}
+ // Queries the route info of a topic from specific endpoints; the server returns a set of partitions on success.
+ //
+ // If the corresponding topic doesn't exist, returns `NOT_FOUND`.
+ // If the specific endpoints are empty, returns `INVALID_ARGUMENT`.
+ rpc QueryRoute(QueryRouteRequest) returns (QueryRouteResponse) {
+ }
- rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse) {}
+ // Producer or consumer sends HeartbeatRequest to server in order to report necessary
+ // client-side information, like subscription data of consumer. Returns `OK` if success.
+ //
+ // If the client language info is invalid, returns `INVALID_ARGUMENT`
+ rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {
+ }
- rpc SendMessage(SendMessageRequest) returns (SendMessageResponse) {}
+ // Checks the health status of the message server, returns `OK` if there are no network issues.
+ // Clients could use this RPC to detect the availability of the server, and adopt necessary isolation measures.
+ rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse) {
+ }
- rpc QueryAssignment(QueryAssignmentRequest)
- returns (QueryAssignmentResponse) {}
+ // Sends one message to the specific partition of a topic, returns message id or transaction id with status `OK`.
+ //
+ // If the corresponding topic doesn't exist, returns `NOT_FOUND`.
+ rpc SendMessage(SendMessageRequest) returns (SendMessageResponse) {
+ }
- rpc ReceiveMessage(ReceiveMessageRequest) returns (ReceiveMessageResponse) {}
+ // Queries the assigned partition route info of a topic for the current consumer;
+ // the returned assignment result is decided by the server-side load balancer.
+ //
+ // If the corresponding topic doesn't exist, returns `NOT_FOUND`.
+ // If the specific endpoints are empty, returns `INVALID_ARGUMENT`.
+ rpc QueryAssignment(QueryAssignmentRequest) returns (QueryAssignmentResponse) {
+ }
- rpc AckMessage(AckMessageRequest) returns (AckMessageResponse) {}
+ // Receives messages from the server in batch manner, returns a set of messages if success.
+ // The received messages should be acked or nacked after being processed.
+ //
+ // If the pending concurrent receive requests exceed the quota of the given consumer group, returns `UNAVAILABLE`.
+ // If the upstream store server hangs, returns `DEADLINE_EXCEEDED` in a timely manner.
+ // If the corresponding topic or consumer group doesn't exist, returns `NOT_FOUND`.
+ // If there is no new message in the specific topic, returns `OK` with an empty message set. Please note that client
+ // may suffer from false empty responses.
+ rpc ReceiveMessage(ReceiveMessageRequest) returns (ReceiveMessageResponse) {
+ }
- rpc NackMessage(NackMessageRequest) returns (NackMessageResponse) {}
+ // Acknowledges the message associated with the `receipt_handle` or `offset` in the
+ // `AckMessageRequest`, which means the message has been successfully processed.
+ // Returns `OK` if the message server removes the relevant message successfully.
+ //
+ // If the given receipt_handle is illegal or out of date, returns `INVALID_ARGUMENT`.
+ rpc AckMessage(AckMessageRequest) returns (AckMessageResponse) {
+ }
+ // Signals that the message has not been successfully processed. The message server should resend the message
+ // following the retry policy defined at server-side.
+ //
+ // If the corresponding topic or consumer group doesn't exist, returns `NOT_FOUND`.
+ rpc NackMessage(NackMessageRequest) returns (NackMessageResponse) {
+ }
+
+ // Forwards one message to dead letter queue if the DeadLetterPolicy is triggered by this message at client-side,
+ // returns `OK` if success.
rpc ForwardMessageToDeadLetterQueue(ForwardMessageToDeadLetterQueueRequest)
- returns (ForwardMessageToDeadLetterQueueResponse) {}
+ returns (ForwardMessageToDeadLetterQueueResponse) {
+ }
- rpc EndTransaction(EndTransactionRequest) returns (EndTransactionResponse) {}
+ // Commits or rolls back one transactional message.
+ rpc EndTransaction(EndTransactionRequest) returns (EndTransactionResponse) {
+ }
- rpc QueryOffset(QueryOffsetRequest) returns (QueryOffsetResponse) {}
+ // Queries the offset of the specific partition, returns the offset with `OK` if success.
+ // The message server should maintain a numerical offset for each message in a partition.
+ rpc QueryOffset(QueryOffsetRequest) returns (QueryOffsetResponse) {
+ }
- rpc PullMessage(PullMessageRequest) returns (PullMessageResponse) {}
+ // Pulls messages from the specific partition, returns a set of messages with next pull offset.
+ // The pulled messages can't be acked or nacked, while the client is responsible for managing offsets for the consumer,
+ // typically updating the consume offset in local memory or a third-party storage service.
+ //
+ // If the pending concurrent receive requests exceed the quota of the given consumer group, returns `UNAVAILABLE`.
+ // If the upstream store server hangs, returns `DEADLINE_EXCEEDED` in a timely manner.
+ // If the corresponding topic or consumer group doesn't exist, returns `NOT_FOUND`.
+ // If there is no new message in the specific topic, returns `OK` with an empty message set. Please note that client
+ // may suffer from false empty responses.
+ rpc PullMessage(PullMessageRequest) returns (PullMessageResponse) {
+ }
- rpc UpdateOffset(UpdateOffsetRequest) returns (UpdateOffsetResponse) {}
+ rpc MultiplexingCall(MultiplexingRequest) returns (MultiplexingResponse) {
+ }
- rpc MultiplexingCall(MultiplexingRequest) returns (MultiplexingResponse) {}
-
- rpc NotifyClientTermination(NotifyClientTerminationRequest)
- returns (NotifyClientTerminationResponse) {}
+ rpc NotifyClientTermination(NotifyClientTerminationRequest) returns (NotifyClientTerminationResponse) {
+ }
}
\ No newline at end of file
diff --git a/src/main/cpp/client/ClientManagerImpl.cpp b/src/main/cpp/client/ClientManagerImpl.cpp
index 0dfc00b..24d56be 100644
--- a/src/main/cpp/client/ClientManagerImpl.cpp
+++ b/src/main/cpp/client/ClientManagerImpl.cpp
@@ -55,9 +55,12 @@
tls_channel_credential_options_.watch_identity_key_cert_pairs();
channel_credential_ = grpc::experimental::TlsCredentials(tls_channel_credential_options_);
- int max_message_size = 1024 * 1024 * 16;
- channel_arguments_.SetMaxReceiveMessageSize(max_message_size);
- channel_arguments_.SetMaxSendMessageSize(max_message_size);
+
+ // Use unlimited receive message size.
+ channel_arguments_.SetMaxReceiveMessageSize(-1);
+
+ int max_send_message_size = 1024 * 1024 * 16;
+ channel_arguments_.SetMaxSendMessageSize(max_send_message_size);
/*
* Keep-alive settings:
diff --git a/src/main/cpp/client/include/Client.h b/src/main/cpp/client/include/Client.h
index 9a14e8c..917115d 100644
--- a/src/main/cpp/client/include/Client.h
+++ b/src/main/cpp/client/include/Client.h
@@ -12,23 +12,27 @@
public:
~Client() override = default;
- virtual void endpointsInUse(absl::flat_hash_set<std::string>& endpoints) = 0;
+ virtual void endpointsInUse(absl::flat_hash_set<std::string> &endpoints) = 0;
virtual void heartbeat() = 0;
virtual bool active() = 0;
- virtual void onRemoteEndpointRemoval(const std::vector<std::string>&) = 0;
+ virtual void onRemoteEndpointRemoval(const std::vector<std::string> &) = 0;
/**
- * For endpoints that are marked as inactive due to one or multiple business operation failure, this function is to
- * initiate health-check RPCs; Once the health-check passes, they are conceptually add back to serve further business
- * workload.
+ * For endpoints that are marked as inactive due to one or multiple business
+ * operation failure, this function is to initiate health-check RPCs; Once the
+ * health-check passes, they are conceptually add back to serve further
+ * business workload.
*/
virtual void healthCheck() = 0;
- virtual void schedule(const std::string& task_name, const std::function<void(void)>& task,
+ virtual void schedule(const std::string &task_name,
+ const std::function<void(void)> &task,
std::chrono::milliseconds delay) = 0;
+
+ virtual void notifyClientTermination() = 0;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/client/mocks/include/ClientMock.h b/src/main/cpp/client/mocks/include/ClientMock.h
index dfd7a66..1765992 100644
--- a/src/main/cpp/client/mocks/include/ClientMock.h
+++ b/src/main/cpp/client/mocks/include/ClientMock.h
@@ -19,6 +19,8 @@
MOCK_METHOD(void, schedule, (const std::string&, const std::function<void()>&, std::chrono::milliseconds),
(override));
+
+ MOCK_METHOD(void, notifyClientTermination, (), (override));
};
ROCKETMQ_NAMESPACE_END
diff --git a/src/main/cpp/rocketmq/ClientImpl.cpp b/src/main/cpp/rocketmq/ClientImpl.cpp
index e868c4c..294de27 100644
--- a/src/main/cpp/rocketmq/ClientImpl.cpp
+++ b/src/main/cpp/rocketmq/ClientImpl.cpp
@@ -10,6 +10,7 @@
#include <system_error>
#include <utility>
+#include "RpcClient.h"
#include "absl/strings/str_join.h"
#include "absl/strings/str_split.h"
#include "apache/rocketmq/v1/definition.pb.h"
@@ -69,7 +70,6 @@
if (route_update_handle_) {
client_manager_->getScheduler().cancel(route_update_handle_);
}
- notifyClientTermination();
client_manager_.reset();
} else {
SPDLOG_ERROR("Try to shutdown ClientImpl, but its state is not as expected. Expecting: {}, Actual: {}",
@@ -476,15 +476,16 @@
}
void ClientImpl::notifyClientTermination() {
+ SPDLOG_WARN("Should NOT reach here. Subclass should have overridden this function.");
+ std::abort();
+}
+
+void ClientImpl::notifyClientTermination(const NotifyClientTerminationRequest& request) {
absl::flat_hash_set<std::string> endpoints;
endpointsInUse(endpoints);
Metadata metadata;
Signature::sign(this, metadata);
- NotifyClientTerminationRequest request;
- request.mutable_group()->set_resource_namespace(resource_namespace_);
- request.mutable_group()->set_name(group_name_);
- request.set_client_id(clientId());
for (const auto& endpoint : endpoints) {
client_manager_->notifyClientTermination(endpoint, metadata, request, absl::ToChronoMilliseconds(io_timeout_));
diff --git a/src/main/cpp/rocketmq/ProducerImpl.cpp b/src/main/cpp/rocketmq/ProducerImpl.cpp
index 99a8c23..8a6ae27 100644
--- a/src/main/cpp/rocketmq/ProducerImpl.cpp
+++ b/src/main/cpp/rocketmq/ProducerImpl.cpp
@@ -5,6 +5,7 @@
#include <system_error>
#include <utility>
+#include "Client.h"
#include "absl/strings/str_join.h"
#include "opencensus/trace/propagation/trace_context.h"
#include "opencensus/trace/span.h"
@@ -59,11 +60,21 @@
return;
}
+ notifyClientTermination();
+
ClientImpl::shutdown();
assert(State::STOPPED == state_.load());
SPDLOG_INFO("Producer instance stopped");
}
+void ProducerImpl::notifyClientTermination() {
+ NotifyClientTerminationRequest request;
+ request.mutable_producer_group()->set_resource_namespace(resource_namespace_);
+ request.mutable_producer_group()->set_name(group_name_);
+ request.set_client_id(clientId());
+ ClientImpl::notifyClientTermination(request);
+}
+
bool ProducerImpl::isRunning() const {
return State::STARTED == state_.load(std::memory_order_relaxed);
}
diff --git a/src/main/cpp/rocketmq/PullConsumerImpl.cpp b/src/main/cpp/rocketmq/PullConsumerImpl.cpp
index 7bb747e..43f8203 100644
--- a/src/main/cpp/rocketmq/PullConsumerImpl.cpp
+++ b/src/main/cpp/rocketmq/PullConsumerImpl.cpp
@@ -24,6 +24,8 @@
void PullConsumerImpl::shutdown() {
// Shutdown services started by current tier
+ notifyClientTermination();
+
// Shutdown services that are started by the parent
ClientImpl::shutdown();
State expected = State::STOPPING;
@@ -160,4 +162,12 @@
}
}
+void PullConsumerImpl::notifyClientTermination() {
+ NotifyClientTerminationRequest request;
+ request.mutable_consumer_group()->set_resource_namespace(resource_namespace_);
+ request.mutable_consumer_group()->set_name(group_name_);
+ request.set_client_id(clientId());
+ ClientImpl::notifyClientTermination(request);
+}
+
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/PushConsumerImpl.cpp b/src/main/cpp/rocketmq/PushConsumerImpl.cpp
index 4386b84..a8cbc7b 100644
--- a/src/main/cpp/rocketmq/PushConsumerImpl.cpp
+++ b/src/main/cpp/rocketmq/PushConsumerImpl.cpp
@@ -630,4 +630,12 @@
return std::move(resource_bundle);
}
+void PushConsumerImpl::notifyClientTermination() {
+ NotifyClientTerminationRequest request;
+ request.mutable_consumer_group()->set_resource_namespace(resource_namespace_);
+ request.mutable_consumer_group()->set_name(group_name_);
+ request.set_client_id(clientId());
+ ClientImpl::notifyClientTermination(request);
+}
+
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/include/ClientImpl.h b/src/main/cpp/rocketmq/include/ClientImpl.h
index 3de7452..998d357 100644
--- a/src/main/cpp/rocketmq/include/ClientImpl.h
+++ b/src/main/cpp/rocketmq/include/ClientImpl.h
@@ -3,8 +3,10 @@
#include <atomic>
#include <chrono>
#include <cstdint>
+#include <cstdlib>
#include <system_error>
+#include "RpcClient.h"
#include "absl/strings/string_view.h"
#include "apache/rocketmq/v1/definition.pb.h"
@@ -118,7 +120,9 @@
void setAccessPoint(rmq::Endpoints* endpoints);
- virtual void notifyClientTermination();
+ void notifyClientTermination() override;
+
+ void notifyClientTermination(const NotifyClientTerminationRequest& request);
private:
/**
diff --git a/src/main/cpp/rocketmq/include/ProducerImpl.h b/src/main/cpp/rocketmq/include/ProducerImpl.h
index 8d03e35..dea27a2 100644
--- a/src/main/cpp/rocketmq/include/ProducerImpl.h
+++ b/src/main/cpp/rocketmq/include/ProducerImpl.h
@@ -108,6 +108,8 @@
void resolveOrphanedTransactionalMessage(const std::string& transaction_id, const MQMessageExt& message) override;
+ void notifyClientTermination() override;
+
private:
absl::flat_hash_map<std::string, TopicPublishInfoPtr> topic_publish_info_table_ GUARDED_BY(topic_publish_info_mtx_);
absl::Mutex topic_publish_info_mtx_; // protects topic_publish_info_
diff --git a/src/main/cpp/rocketmq/include/PullConsumerImpl.h b/src/main/cpp/rocketmq/include/PullConsumerImpl.h
index 058743b..c009113 100644
--- a/src/main/cpp/rocketmq/include/PullConsumerImpl.h
+++ b/src/main/cpp/rocketmq/include/PullConsumerImpl.h
@@ -36,6 +36,8 @@
return shared_from_this();
}
+ void notifyClientTermination() override;
+
MessageModel message_model_{MessageModel::CLUSTERING};
};
diff --git a/src/main/cpp/rocketmq/include/PushConsumerImpl.h b/src/main/cpp/rocketmq/include/PushConsumerImpl.h
index 44e1b54..e695810 100644
--- a/src/main/cpp/rocketmq/include/PushConsumerImpl.h
+++ b/src/main/cpp/rocketmq/include/PushConsumerImpl.h
@@ -183,6 +183,8 @@
ClientResourceBundle resourceBundle() LOCKS_EXCLUDED(topic_filter_expression_table_mtx_) override;
+ void notifyClientTermination() override;
+
private:
absl::flat_hash_map<std::string, FilterExpression>
topic_filter_expression_table_ GUARDED_BY(topic_filter_expression_table_mtx_);