// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
import "google/rpc/error_details.proto";
import "google/rpc/status.proto";
import "apache/rocketmq/v1/definition.proto";
package apache.rocketmq.v1;
option java_multiple_files = true;
option java_package = "apache.rocketmq.v1";
option java_generate_equals_and_hash = true;
option java_string_check_utf8 = true;
option java_outer_classname = "MQService";
option csharp_namespace = "apache.rocketmq.v1";
message ResponseCommon {
google.rpc.Status status = 1;
google.rpc.RequestInfo request_info = 2;
google.rpc.Help help = 3;
google.rpc.RetryInfo retry_info = 4;
google.rpc.DebugInfo debug_info = 5;
google.rpc.ErrorInfo error_info = 6;
reserved 7 to 64;
// Topics are destination of messages to publish to or subscribe from. Similar
// to domain names, they will be addressable after resolution through the
// provided access point.
// Access points are usually the addresses of name servers, which fulfill
// service discovery, load-balancing and other auxillary services. Name servers
// receive periodic heartbeats from affiliate brokers and erase those which
// failed to maintain alive status.
// Name servers answer queries of QueryRouteRequest, responding clients with
// addressable partitions, which they may directly publish messages to or
// subscribe messages from.
// QueryRouteRequest shall include source endpoints, aka, configured
// access-point, which annotates tenant-id, instance-id or other
// vendor-specific settings. Purpose-built name servers may respond customized
// results based on these particular requirements.
message QueryRouteRequest {
Resource topic = 1;
Endpoints endpoints = 2;
reserved 3 to 64;
message QueryRouteResponse {
ResponseCommon common = 1;
repeated Partition partitions = 2;
reserved 3 to 64;
message SendMessageRequest {
Message message = 1;
Partition partition = 2;
reserved 3 to 64;
message SendMessageResponse {
ResponseCommon common = 1;
string message_id = 2;
string transaction_id = 3;
reserved 4 to 64;
message QueryAssignmentRequest {
Resource topic = 1;
Resource group = 2;
string client_id = 3;
// Service access point
Endpoints endpoints = 4;
reserved 5 to 64;
message QueryAssignmentResponse {
ResponseCommon common = 1;
repeated Assignment assignments = 2;
reserved 3 to 64;
message ReceiveMessageRequest {
Resource group = 1;
string client_id = 2;
Partition partition = 3;
FilterExpression filter_expression = 4;
ConsumePolicy consume_policy = 5;
google.protobuf.Timestamp initialization_timestamp = 6;
int32 batch_size = 7;
google.protobuf.Duration invisible_duration = 8;
google.protobuf.Duration await_time = 9;
bool fifo_flag = 10;
reserved 11 to 64;
message ReceiveMessageResponse {
ResponseCommon common = 1;
repeated Message messages = 2;
google.protobuf.Timestamp delivery_timestamp = 3;
google.protobuf.Duration invisible_duration = 4;
reserved 5 to 64;
message AckMessageRequest {
Resource group = 1;
Resource topic = 2;
string client_id = 3;
oneof handle {
string receipt_handle = 4;
int64 offset = 5;
string message_id = 6;
reserved 7 to 64;
message AckMessageResponse {
ResponseCommon common = 1;
reserved 2 to 64;
message NackMessageRequest {
Resource group = 1;
Resource topic = 2;
string client_id = 3;
string receipt_handle = 4;
string message_id = 5;
int32 delivery_attempt = 6;
int32 max_delivery_attempts = 7;
reserved 8 to 64;
message NackMessageResponse {
ResponseCommon common = 1;
reserved 2 to 64;
message ForwardMessageToDeadLetterQueueRequest {
Resource group = 1;
Resource topic = 2;
string client_id = 3;
string receipt_handle = 4;
string message_id = 5;
int32 delivery_attempt = 6;
int32 max_delivery_attempts = 7;
reserved 8 to 64;
message ForwardMessageToDeadLetterQueueResponse {
ResponseCommon common = 1;
reserved 2 to 64;
message HeartbeatRequest {
string client_id = 1;
oneof client_data {
ProducerData producer_data = 2;
ConsumerData consumer_data = 3;
bool fifo_flag = 4;
reserved 5 to 64;
message HeartbeatResponse {
ResponseCommon common = 1;
reserved 2 to 64;
message HealthCheckRequest {
Resource group = 1;
string client_host = 2;
reserved 3 to 64;
message HealthCheckResponse {
ResponseCommon common = 1;
reserved 2 to 64;
message EndTransactionRequest {
Resource group = 1;
string message_id = 2;
string transaction_id = 3;
enum TransactionResolution {
TransactionResolution resolution = 4;
enum Source {
Source source = 5;
string trace_context = 6;
reserved 7 to 64;
message EndTransactionResponse {
ResponseCommon common = 1;
reserved 2 to 64;
message QueryOffsetRequest {
Partition partition = 1;
QueryOffsetPolicy policy = 2;
google.protobuf.Timestamp time_point = 3;
reserved 4 to 64;
message QueryOffsetResponse {
ResponseCommon common = 1;
int64 offset = 2;
reserved 3 to 64;
message PullMessageRequest {
Resource group = 1;
Partition partition = 2;
int64 offset = 3;
int32 batch_size = 4;
google.protobuf.Duration await_time = 5;
FilterExpression filter_expression = 6;
string client_id = 7;
reserved 8 to 64;
message PullMessageResponse {
ResponseCommon common = 1;
int64 min_offset = 2;
int64 next_offset = 3;
int64 max_offset = 4;
repeated Message messages = 5;
reserved 6 to 64;
message NoopCommand { reserved 1 to 64; }
message PrintThreadStackTraceCommand {
string command_id = 1;
reserved 2 to 64;
message ReportThreadStackTraceRequest {
string command_id = 1;
string thread_stack_trace = 2;
reserved 3 to 64;
message ReportThreadStackTraceResponse {
ResponseCommon common = 1;
reserved 2 to 64;
message VerifyMessageConsumptionCommand {
string command_id = 1;
Message message = 2;
reserved 3 to 64;
message ReportMessageConsumptionResultRequest {
string command_id = 1;
// 1. Return `INVALID_ARGUMENT` if message is corrupted.
// 2. Return `INTERNAL` if failed to consume message.
// 3. Return `OK` if success.
google.rpc.Status status = 2;
reserved 3 to 64;
message ReportMessageConsumptionResultResponse {
ResponseCommon common = 1;
reserved 2 to 64;
message RecoverOrphanedTransactionCommand {
Message orphaned_transactional_message = 1;
string transaction_id = 2;
reserved 3 to 64;
message PollCommandRequest {
string client_id = 1;
repeated Resource topics = 2;
oneof group {
Resource producer_group = 3;
Resource consumer_group = 4;
reserved 5 to 64;
message PollCommandResponse {
oneof type {
// Default command when no new command need to be delivered.
NoopCommand noop_command = 1;
// Request client to print thread stack trace.
PrintThreadStackTraceCommand print_thread_stack_trace_command = 2;
// Request client to verify the consumption of the appointed message.
VerifyMessageConsumptionCommand verify_message_consumption_command = 3;
// Request client to recover the orphaned transaction message.
RecoverOrphanedTransactionCommand recover_orphaned_transaction_command = 4;
reserved 5 to 64;
message NotifyClientTerminationRequest {
oneof group {
Resource producer_group = 1;
Resource consumer_group = 2;
string client_id = 3;
reserved 4 to 64;
message NotifyClientTerminationResponse {
ResponseCommon common = 1;
reserved 2 to 64;
// For all the RPCs in MessagingService, the following error handling policies
// apply:
// If the request doesn't bear a valid authentication credential, return a
// response with common.status.code == `UNAUTHENTICATED`. If the authenticated
// user is not granted with sufficient permission to execute the requested
// operation, return a response with common.status.code == `PERMISSION_DENIED`.
// If the per-user-resource-based quota is exhausted, return a response with
// common.status.code == `RESOURCE_EXHAUSTED`. If any unexpected server-side
// errors raise, return a response with common.status.code == `INTERNAL`.
service MessagingService {
// Queries the route entries of the requested topic in the perspective of the
// given endpoints. On success, servers should return a collection of
// addressable partitions. Note servers may return customized route entries
// based on endpoints provided.
// If the requested topic doesn't exist, returns `NOT_FOUND`.
// If the specific endpoints is empty, returns `INVALID_ARGUMENT`.
rpc QueryRoute(QueryRouteRequest) returns (QueryRouteResponse) {}
// Producer or consumer sends HeartbeatRequest to servers periodically to
// keep-alive. Additionally, it also reports client-side configuration,
// including topic subscription, load-balancing group name, etc.
// Returns `OK` if success.
// If a client specifies a language that is not yet supported by servers,
rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {}
// Checks the health status of message server, returns `OK` if services are
// online and serving. Clients may use this RPC to detect availability of
// messaging service, and take isolation actions when necessary.
rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse) {}
// Delivers messages to brokers.
// Clients may further:
// 1. Refine a message destination to topic partition which fulfills parts of
// FIFO semantic;
// 2. Flag a message as transactional, which keeps it invisible to consumers
// until it commits;
// 3. Time a message, making it invisible to consumers till specified
// time-point;
// 4. And more...
// Returns message-id or transaction-id with status `OK` on success.
// If the destination topic doesn't exist, returns `NOT_FOUND`.
rpc SendMessage(SendMessageRequest) returns (SendMessageResponse) {}
// Queries the assigned partition route info of a topic for current consumer,
// the returned assignment result is decided by server-side load balancer.
// If the corresponding topic doesn't exist, returns `NOT_FOUND`.
// If the specific endpoints is empty, returns `INVALID_ARGUMENT`.
rpc QueryAssignment(QueryAssignmentRequest)
returns (QueryAssignmentResponse) {}
// Receives messages from the server in batch manner, returns a batch of
// messages if success. The received messages should be ACKed or NACKed after
// processing.
// If the pending concurrent receive requests exceed the quota of the given
// consumer group, returns `UNAVAILABLE`. If the upstream store server hangs,
// return `DEADLINE_EXCEEDED` in a timely manner. If the corresponding topic
// or consumer group doesn't exist, returns `NOT_FOUND`. If there is no new
// message in the specific topic, returns `OK` with an empty message set.
// Please note that client may suffer from false empty responses.
rpc ReceiveMessage(ReceiveMessageRequest) returns (ReceiveMessageResponse) {}
// Acknowledges the message associated with the `receipt_handle` or `offset`
// in the `AckMessageRequest`, it means the message has been successfully
// processed. Returns `OK` if the message server remove the relevant message
// successfully.
// If the given receipt_handle is illegal or out of date, returns
rpc AckMessage(AckMessageRequest) returns (AckMessageResponse) {}
// Signals that the message has not been successfully processed. The message
// server should resend the message follow the retry policy defined at
// server-side.
// If the corresponding topic or consumer group doesn't exist, returns
rpc NackMessage(NackMessageRequest) returns (NackMessageResponse) {}
// Forwards one message to dead letter queue if the DeadLetterPolicy is
// triggered by this message at client-side, return `OK` if success.
rpc ForwardMessageToDeadLetterQueue(ForwardMessageToDeadLetterQueueRequest)
returns (ForwardMessageToDeadLetterQueueResponse) {}
// Commits or rollback one transactional message.
rpc EndTransaction(EndTransactionRequest) returns (EndTransactionResponse) {}
// Queries the offset of the specific partition, returns the offset with `OK`
// if success. The message server should maintain a numerical offset for each
// message in a partition.
rpc QueryOffset(QueryOffsetRequest) returns (QueryOffsetResponse) {}
// Pulls messages from the specific partition, returns a set of messages with
// next pull offset. The pulled messages can't be ACKed or NACKed, while the
// client is responsible for manage offsets for consumer, typically update
// consume offset to local memory or a third-party storage service.
// If the pending concurrent receive requests exceed the quota of the given
// consumer group, returns `UNAVAILABLE`. If the upstream store server hangs,
// return `DEADLINE_EXCEEDED` in a timely manner. If the corresponding topic
// or consumer group doesn't exist, returns `NOT_FOUND`. If there is no new
// message in the specific topic, returns `OK` with an empty message set.
// Please note that client may suffer from false empty responses.
rpc PullMessage(PullMessageRequest) returns (PullMessageResponse) {}
// Multiplexing RPC(s) for various polling requests, which issue different
// commands to client.
// Sometimes client may need to receive and process the command from server.
// To prevent the complexity of streaming RPC(s), a unary RPC using
// long-polling is another solution.
// To mark the request-response of corresponding command, `command_id` in
// message is recorded in the subsequent RPC(s). For example, after receiving
// command of printing thread stack trace, client would send
// `ReportMessageConsumptionResultRequest` to server, which contain both of
// the stack trace and `command_id`.
// At same time, `NoopCommand` is delivered from server when no new command is
// needed, it is essential for client to maintain the ping-pong.
rpc PollCommand(PollCommandRequest) returns (PollCommandResponse) {}
// After receiving the corresponding polling command, the thread stack trace
// is reported to the server.
rpc ReportThreadStackTrace(ReportThreadStackTraceRequest)
returns (ReportThreadStackTraceResponse) {}
// After receiving the corresponding polling command, the consumption result
// of appointed message is reported to the server.
rpc ReportMessageConsumptionResult(ReportMessageConsumptionResultRequest)
returns (ReportMessageConsumptionResultResponse) {}
// Notify the server that the client is terminated.
rpc NotifyClientTermination(NotifyClientTerminationRequest)
returns (NotifyClientTerminationResponse) {}