// blob: 8bd75729c87e08867cd04247e205cb2766271c33 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
// This schema is written in proto2 syntax (explicit required/optional labels
// and per-field defaults).
syntax = "proto2";
// Protobuf package that scopes the messages and enums declared in this file.
package pulsar.proto;
// Java package used for the generated classes.
option java_package = "org.apache.pulsar.common.api.proto";
// Go import path used for the generated package.
option go_package = "./proto";
// Generate code against the lite runtime.
option optimize_for = LITE_RUNTIME;
// A named schema: its type, its raw schema data and optional key/value
// properties.
message Schema {
  // Kinds of schema that can be declared in `Schema.type`.
  enum Type {
    None = 0;
    String = 1;
    Json = 2;
    Protobuf = 3;
    Avro = 4;
    Bool = 5;
    Int8 = 6;
    Int16 = 7;
    Int32 = 8;
    Int64 = 9;
    Float = 10;
    Double = 11;
    Date = 12;
    Time = 13;
    Timestamp = 14;
    KeyValue = 15;
    Instant = 16;
    LocalDate = 17;
    LocalTime = 18;
    LocalDateTime = 19;
    ProtobufNative = 20;
  }

  required string name = 1;
  // NOTE(review): field number 2 is not used in this message. If it belonged
  // to a removed field it should be reserved -- confirm against history.
  required bytes schema_data = 3;
  required Type type = 4;
  repeated KeyValue properties = 5;
}
// Identifies a message by ledger id and entry id, with optional partition,
// batch and chunk information.
message MessageIdData {
  required uint64 ledgerId = 1;
  required uint64 entryId = 2;
  // Reads as -1 when the sender did not set it.
  optional int32 partition = 3 [default = -1];
  // Reads as -1 when the sender did not set it.
  optional int32 batch_index = 4 [default = -1];
  repeated int64 ack_set = 5;
  optional int32 batch_size = 6;

  // For the chunk message id, we need to specify the first chunk message id.
  optional MessageIdData first_chunk_message_id = 7;
}
// A string key paired with a string value.
message KeyValue {
  required string key = 1;
  required string value = 2;
}
// A string key paired with an unsigned 64-bit value.
message KeyLongValue {
  required string key = 1;
  required uint64 value = 2;
}
// A range of 32-bit integers described by its two bounds.
// NOTE(review): whether `end` is inclusive is not stated here -- confirm.
message IntRange {
  required int32 start = 1;
  required int32 end = 2;
}
// One encryption key entry: the key name, the key bytes and optional
// key/value metadata describing the key.
message EncryptionKeys {
  required string key = 1;
  required bytes value = 2;
  repeated KeyValue metadata = 3;
}
// Compression codecs that can be named in `MessageMetadata.compression`.
enum CompressionType {
  NONE = 0;
  LZ4 = 1;
  ZLIB = 2;
  ZSTD = 3;
  SNAPPY = 4;
}
// Access modes a producer can ask for on a topic.
enum ProducerAccessMode {
  // By default multiple producers can publish on a topic
  Shared = 0;
  // Require exclusive access for producer. Fail immediately if there's
  // already a producer connected.
  Exclusive = 1;
  // Producer creation is pending until it can acquire exclusive access
  WaitForExclusive = 2;
  // Require exclusive access for producer. Fence out old producer.
  ExclusiveWithFencing = 3;
}
// Metadata attached to a published message (or to a batch of messages).
message MessageMetadata {
  // Field 10 used to carry the checksum and was removed (see the note next to
  // `uncompressed_size`). The number and name are reserved so that they can
  // never be reused with a different meaning.
  reserved 10;
  reserved "checksum";

  required string producer_name = 1;
  required uint64 sequence_id = 2;
  required uint64 publish_time = 3;
  repeated KeyValue properties = 4;

  // Property set on replicated message,
  // includes the source cluster name
  optional string replicated_from = 5;
  // Key to decide partition for the msg
  optional string partition_key = 6;
  // Override namespace's replication
  repeated string replicate_to = 7;
  optional CompressionType compression = 8 [default = NONE];
  optional uint32 uncompressed_size = 9 [default = 0];
  // Removed below checksum field from Metadata as
  // it should be part of send-command which keeps checksum of header + payload
  //optional sfixed64 checksum = 10;

  // Differentiate single and batch message metadata
  optional int32 num_messages_in_batch = 11 [default = 1];

  // The timestamp that this event occurs. It is typically set by applications.
  // If this field is omitted, `publish_time` can be used for the purpose of
  // `event_time`.
  optional uint64 event_time = 12 [default = 0];
  // Contains encryption key name, encrypted key and metadata to describe the key
  repeated EncryptionKeys encryption_keys = 13;
  // Algorithm used to encrypt data key
  optional string encryption_algo = 14;
  // Additional parameters required by encryption
  optional bytes encryption_param = 15;
  optional bytes schema_version = 16;

  optional bool partition_key_b64_encoded = 17 [default = false];
  // Specify a key to overwrite the message key which is used for ordering
  // dispatch in Key_Shared mode.
  optional bytes ordering_key = 18;

  // Mark the message to be delivered at or after the specified timestamp
  optional int64 deliver_at_time = 19;

  // Identify whether a message is a "marker" message used for
  // internal metadata instead of application published data.
  // Markers will generally not be propagated back to clients
  optional int32 marker_type = 20;

  // NOTE(review): field number 21 is not used in this message. If it belonged
  // to a removed field it should be reserved as well -- confirm against
  // history.

  // Transaction related message info
  optional uint64 txnid_least_bits = 22;
  optional uint64 txnid_most_bits = 23;

  // Add highest sequence id to support batch message with external sequence id
  optional uint64 highest_sequence_id = 24 [default = 0];

  // Indicate if the message payload value is set
  optional bool null_value = 25 [default = false];
  optional string uuid = 26;
  optional int32 num_chunks_from_msg = 27;
  optional int32 total_chunk_msg_size = 28;
  optional int32 chunk_id = 29;

  // Indicate if the message partition key is set
  optional bool null_partition_key = 30 [default = false];
}
// Metadata for a single message; `payload_size` is the only required field.
message SingleMessageMetadata {
  repeated KeyValue properties = 1;
  optional string partition_key = 2;
  required int32 payload_size = 3;
  optional bool compacted_out = 4 [default = false];

  // The timestamp that this event occurs. It is typically set by applications.
  // If this field is omitted, `publish_time` can be used for the purpose of
  // `event_time`.
  optional uint64 event_time = 5 [default = 0];
  optional bool partition_key_b64_encoded = 6 [default = false];
  // Specific a key to overwrite the message key which used for ordering
  // dispatch in Key_Shared mode.
  optional bytes ordering_key = 7;
  // Allows consumer retrieve the sequence id that the producer set.
  optional uint64 sequence_id = 8;
  // Indicate if the message payload value is set
  optional bool null_value = 9 [default = false];
  // Indicate if the message partition key is set
  optional bool null_partition_key = 10 [default = false];
}
// Metadata added for an entry by the broker: an optional timestamp and an
// optional index.
message BrokerEntryMetadata {
  optional uint64 broker_timestamp = 1;
  optional uint64 index = 2;
}
// Error codes carried in the `error` / `error_code` fields of the command
// messages in this file.
enum ServerError {
  UnknownError = 0;
  // Error with ZK/metadata
  MetadataError = 1;
  // Error writing reading from BK
  PersistenceError = 2;
  // Non valid authentication
  AuthenticationError = 3;
  // Not authorized to use resource
  AuthorizationError = 4;

  // Unable to subscribe/unsubscribe because
  // other consumers are connected
  ConsumerBusy = 5;
  // Any error that requires client retry operation with a fresh lookup
  ServiceNotReady = 6;
  // Unable to create producer because backlog quota exceeded
  ProducerBlockedQuotaExceededError = 7;
  // Exception while creating producer because quota exceeded
  ProducerBlockedQuotaExceededException = 8;
  // Error while verifying message checksum
  ChecksumError = 9;
  // Error when an older client/version doesn't support a required feature
  UnsupportedVersionError = 10;
  // Topic not found
  TopicNotFound = 11;
  // Subscription not found
  SubscriptionNotFound = 12;
  // Consumer not found
  ConsumerNotFound = 13;
  // Error with too many simultaneously request
  TooManyRequests = 14;
  // The topic has been terminated
  TopicTerminatedError = 15;

  // Producer with same name is already connected
  ProducerBusy = 16;
  // The topic name is not valid
  InvalidTopicName = 17;

  // Specified schema was incompatible with topic schema
  IncompatibleSchema = 18;
  // Dispatcher assign consumer error
  ConsumerAssignError = 19;

  // Transaction coordinator not found error
  TransactionCoordinatorNotFound = 20;
  // Invalid txn status error
  InvalidTxnStatus = 21;
  // Not allowed error
  NotAllowedError = 22;

  // Ack with transaction conflict
  TransactionConflict = 23;
  // Transaction not found
  TransactionNotFound = 24;

  // When a producer asks and fail to get exclusive producer access,
  // or loses the exclusive status after a reconnection, the broker will
  // use this error to indicate that this producer is now permanently
  // fenced. Applications are now supposed to close it and create a
  // new producer
  ProducerFenced = 25;
}
// Authentication methods for `CommandConnect.auth_method`, which that message
// marks as deprecated in favour of `auth_method_name`.
enum AuthMethod {
  AuthMethodNone = 0;
  AuthMethodYcaV1 = 1;
  AuthMethodAthens = 2;
}
// Each protocol version identifies new features that are
// incrementally added to the protocol
enum ProtocolVersion {
  // Initial versioning
  v0 = 0;
  // Added application keep-alive
  v1 = 1;
  // Added RedeliverUnacknowledgedMessages Command
  v2 = 2;
  // Added compression with LZ4 and ZLib
  v3 = 3;
  // Added batch message support
  v4 = 4;
  // Added disconnect client w/o closing connection
  v5 = 5;
  // Added checksum computation for metadata + payload
  v6 = 6;
  // Added CommandLookupTopic - Binary Lookup
  v7 = 7;
  // Added CommandConsumerStats - Client fetches broker side consumer stats
  v8 = 8;
  // Added end of topic notification
  v9 = 9;
  // Added proxy to broker
  v10 = 10;
  // C++ consumers before this version are not correctly handling the checksum field
  v11 = 11;
  // Added get topic's last messageId from broker
  // Added CommandActiveConsumerChange
  // Added CommandGetTopicsOfNamespace
  v12 = 12;
  // Schema-registry : added avro schema format for json
  v13 = 13;
  // Add CommandAuthChallenge and CommandAuthResponse for mutual auth
  // Added Key_Shared subscription
  v14 = 14;
  // Add CommandGetOrCreateSchema and CommandGetOrCreateSchemaResponse
  v15 = 15;
  // Add support for broker entry metadata
  v16 = 16;
  // Added support ack receipt
  v17 = 17;
  // Add client support for broker entry metadata
  v18 = 18;
  // Add CommandTcClientConnectRequest and CommandTcClientConnectResponse
  v19 = 19;
}
// Connect command: client version, authentication information, protocol
// version, optional proxy-related fields and feature flags.
message CommandConnect {
  required string client_version = 1;
  // Deprecated. Use "auth_method_name" instead.
  optional AuthMethod auth_method = 2;
  optional string auth_method_name = 5;
  optional bytes auth_data = 3;
  optional int32 protocol_version = 4 [default = 0];

  // Client can ask to be proxied to a specific broker
  // This is only honored by a Pulsar proxy
  optional string proxy_to_broker_url = 6;

  // Original principal that was verified by
  // a Pulsar proxy. In this case the auth info above
  // will be the auth of the proxy itself
  optional string original_principal = 7;

  // Original auth role and auth Method that was passed
  // to the proxy. In this case the auth info above
  // will be the auth of the proxy itself
  optional string original_auth_data = 8;
  optional string original_auth_method = 9;

  // Feature flags
  optional FeatureFlags feature_flags = 10;
}
// Boolean capability flags; every flag reads as false when it is not set.
message FeatureFlags {
  optional bool supports_auth_refresh = 1 [default = false];
  optional bool supports_broker_entry_metadata = 2 [default = false];
  optional bool supports_partial_producer = 3 [default = false];
  optional bool supports_topic_watchers = 4 [default = false];
}
// Connected command: server version plus optional protocol version, maximum
// message size and feature flags.
message CommandConnected {
  required string server_version = 1;
  optional int32 protocol_version = 2 [default = 0];
  optional int32 max_message_size = 3;
  optional FeatureFlags feature_flags = 4;
}
// Authentication response: optional client version, auth data and protocol
// version.
message CommandAuthResponse {
  optional string client_version = 1;
  optional AuthData response = 2;
  optional int32 protocol_version = 3 [default = 0];
}
// Authentication challenge: optional server version, auth data and protocol
// version.
message CommandAuthChallenge {
  optional string server_version = 1;
  optional AuthData challenge = 2;
  optional int32 protocol_version = 3 [default = 0];
}
// Authentication method name plus opaque authentication bytes.
// To support mutual authentication types, such as Sasl, this message is
// reused for mutual auth.
message AuthData {
  optional string auth_method_name = 1;
  optional bytes auth_data = 2;
}
// Modes selectable through `KeySharedMeta.keySharedMode`.
enum KeySharedMode {
  AUTO_SPLIT = 0;
  STICKY = 1;
}
// Key_Shared settings: the mode, a list of hash ranges and whether
// out-of-order delivery is allowed (false unless set).
message KeySharedMeta {
  required KeySharedMode keySharedMode = 1;
  // NOTE(review): field number 2 is not used in this message. If it belonged
  // to a removed field it should be reserved -- confirm against history.
  repeated IntRange hashRanges = 3;
  optional bool allowOutOfOrderDelivery = 4 [default = false];
}
// Subscribe command: names the topic and subscription, the subscription type,
// the consumer/request ids and a set of optional subscription settings.
message CommandSubscribe {
  // Subscription types.
  enum SubType {
    Exclusive = 0;
    Shared = 1;
    Failover = 2;
    Key_Shared = 3;
  }

  required string topic = 1;
  required string subscription = 2;
  required SubType subType = 3;

  required uint64 consumer_id = 4;
  required uint64 request_id = 5;
  optional string consumer_name = 6;
  optional int32 priority_level = 7;

  // Signal whether the subscription should be backed by a
  // durable cursor or not
  optional bool durable = 8 [default = true];

  // If specified, the subscription will position the cursor
  // mark-delete position on the particular message id and
  // will send messages from that point
  optional MessageIdData start_message_id = 9;

  // Add optional metadata key=value to this consumer
  repeated KeyValue metadata = 10;

  optional bool read_compacted = 11;

  optional Schema schema = 12;

  // Where a subscription starts reading.
  enum InitialPosition {
    Latest = 0;
    Earliest = 1;
  }

  // Signal whether the subscription will initialize on latest
  // or not -- earliest
  optional InitialPosition initialPosition = 13 [default = Latest];

  // Mark the subscription as "replicated". Pulsar will make sure
  // to periodically sync the state of replicated subscriptions
  // across different clusters (when using geo-replication).
  optional bool replicate_subscription_state = 14;

  // If true, the subscribe operation will cause a topic to be
  // created if it does not exist already (and if topic auto-creation
  // is allowed by broker).
  // If false, the subscribe operation will fail if the topic
  // does not exist.
  optional bool force_topic_creation = 15 [default = true];

  // If specified, the subscription will reset cursor's position back
  // to specified seconds and will send messages from that point
  optional uint64 start_message_rollback_duration_sec = 16 [default = 0];

  optional KeySharedMeta keySharedMeta = 17;

  repeated KeyValue subscription_properties = 18;

  // The consumer epoch, when exclusive and failover consumer redeliver unack
  // message will increase the epoch
  optional uint64 consumer_epoch = 19;
}
// Partitioned-topic metadata command for a topic, tagged with a request id.
message CommandPartitionedTopicMetadata {
  required string topic = 1;
  required uint64 request_id = 2;

  // TODO - Remove original_principal, original_auth_data, original_auth_method

  // Original principal that was verified by
  // a Pulsar proxy.
  optional string original_principal = 3;

  // Original auth role and auth Method that was passed
  // to the proxy.
  optional string original_auth_data = 4;
  optional string original_auth_method = 5;
}
// Partitioned-topic metadata response: the partition count or, on failure,
// an error code and message.
message CommandPartitionedTopicMetadataResponse {
  // Outcome of the lookup.
  enum LookupType {
    Success = 0;
    Failed = 1;
  }

  // Optional in case of error
  optional uint32 partitions = 1;
  required uint64 request_id = 2;
  optional LookupType response = 3;
  optional ServerError error = 4;
  optional string message = 5;
}
// Topic lookup command for a topic, tagged with a request id.
message CommandLookupTopic {
  required string topic = 1;
  required uint64 request_id = 2;
  optional bool authoritative = 3 [default = false];

  // TODO - Remove original_principal, original_auth_data, original_auth_method

  // Original principal that was verified by
  // a Pulsar proxy.
  optional string original_principal = 4;

  // Original auth role and auth Method that was passed
  // to the proxy.
  optional string original_auth_data = 5;
  optional string original_auth_method = 6;

  // NOTE(review): this field had an empty comment; presumably it is the name
  // of the advertised listener to use for the lookup -- confirm.
  optional string advertised_listener_name = 7;
}
// Topic lookup response: broker service URLs and the lookup outcome, or an
// error code and message.
message CommandLookupTopicResponse {
  // Outcome of the lookup.
  enum LookupType {
    Redirect = 0;
    Connect = 1;
    Failed = 2;
  }

  // Optional in case of error
  optional string brokerServiceUrl = 1;
  optional string brokerServiceUrlTls = 2;
  optional LookupType response = 3;
  required uint64 request_id = 4;
  optional bool authoritative = 5 [default = false];
  optional ServerError error = 6;
  optional string message = 7;

  // If it's true, indicates to the client that it must
  // always connect through the service url after the
  // lookup has been completed.
  optional bool proxy_through_service_url = 8 [default = false];
}
// Create a new Producer on a topic, assigning the given producer_id,
// all messages sent with this producer_id will be persisted on the topic
message CommandProducer {
  required string topic = 1;
  required uint64 producer_id = 2;
  required uint64 request_id = 3;

  // If a producer name is specified, the name will be used,
  // otherwise the broker will generate a unique name
  optional string producer_name = 4;

  optional bool encrypted = 5 [default = false];

  // Add optional metadata key=value to this producer
  repeated KeyValue metadata = 6;

  optional Schema schema = 7;

  // If producer reconnect to broker, the epoch of this producer will +1
  optional uint64 epoch = 8 [default = 0];

  // Indicate the name of the producer is generated or user provided
  // Use default true here is in order to be forward compatible with the client
  optional bool user_provided_producer_name = 9 [default = true];

  // Require that this producers will be the only producer allowed on the topic
  optional ProducerAccessMode producer_access_mode = 10 [default = Shared];

  // Topic epoch is used to fence off producers that reconnects after a new
  // exclusive producer has already taken over. This id is assigned by the
  // broker on the CommandProducerSuccess. The first time, the client will
  // leave it empty and then it will always carry the same epoch number on
  // the subsequent reconnections.
  optional uint64 topic_epoch = 11;

  optional bool txn_enabled = 12 [default = false];

  // Name of the initial subscription of the topic.
  // If this field is not set, the initial subscription will not be created.
  // If this field is set but the broker's `allowAutoSubscriptionCreation`
  // is disabled, the producer will fail to be created.
  optional string initial_subscription_name = 13;
}
// Send command: identifies the producer and sequence id of what is being
// published, with optional batch, transaction, chunk and marker information.
message CommandSend {
  required uint64 producer_id = 1;
  required uint64 sequence_id = 2;
  optional int32 num_messages = 3 [default = 1];
  optional uint64 txnid_least_bits = 4 [default = 0];
  optional uint64 txnid_most_bits = 5 [default = 0];

  // Add highest sequence id to support batch message with external sequence id
  optional uint64 highest_sequence_id = 6 [default = 0];
  optional bool is_chunk = 7 [default = false];

  // Specify if the message being published is a Pulsar marker or not
  optional bool marker = 8 [default = false];

  // Message id of this message, currently is used in replicator for shadow topic.
  optional MessageIdData message_id = 9;
}
// Send receipt: producer id and sequence id, plus the optional message id and
// highest sequence id.
message CommandSendReceipt {
  required uint64 producer_id = 1;
  required uint64 sequence_id = 2;
  optional MessageIdData message_id = 3;
  optional uint64 highest_sequence_id = 4 [default = 0];
}
// Send error: producer id and sequence id together with an error code and
// message.
message CommandSendError {
  required uint64 producer_id = 1;
  required uint64 sequence_id = 2;
  required ServerError error = 3;
  required string message = 4;
}
// Message command: the consumer id and message id, with optional redelivery
// count, ack set and consumer epoch.
message CommandMessage {
  required uint64 consumer_id = 1;
  required MessageIdData message_id = 2;
  optional uint32 redelivery_count = 3 [default = 0];
  repeated int64 ack_set = 4;
  optional uint64 consumer_epoch = 5;
}
// Acknowledgement command for a consumer: the ack type, the acknowledged
// message ids and optional validation/transaction information.
message CommandAck {
  // Acknowledgement types.
  enum AckType {
    Individual = 0;
    Cumulative = 1;
  }

  required uint64 consumer_id = 1;
  required AckType ack_type = 2;

  // In case of individual acks, the client can pass a list of message ids
  repeated MessageIdData message_id = 3;

  // Acks can contain a flag to indicate the consumer
  // received an invalid message that got discarded
  // before being passed on to the application.
  enum ValidationError {
    UncompressedSizeCorruption = 0;
    DecompressionError = 1;
    ChecksumMismatch = 2;
    BatchDeSerializeError = 3;
    DecryptionError = 4;
  }

  optional ValidationError validation_error = 4;
  repeated KeyLongValue properties = 5;

  optional uint64 txnid_least_bits = 6 [default = 0];
  optional uint64 txnid_most_bits = 7 [default = 0];
  optional uint64 request_id = 8;
}
// Acknowledgement response: consumer id with optional transaction id bits,
// error details and request id.
message CommandAckResponse {
  required uint64 consumer_id = 1;
  optional uint64 txnid_least_bits = 2 [default = 0];
  optional uint64 txnid_most_bits = 3 [default = 0];
  optional ServerError error = 4;
  optional string message = 5;
  optional uint64 request_id = 6;
}
// Changes on active consumer: carries the consumer id and whether it is
// active (false unless set).
message CommandActiveConsumerChange {
  required uint64 consumer_id = 1;
  optional bool is_active = 2 [default = false];
}
// Flow command: message permits for a consumer.
message CommandFlow {
  required uint64 consumer_id = 1;

  // Max number of messages to prefetch, in addition
  // of any number previously specified
  required uint32 messagePermits = 2;
}
// Unsubscribe command: a consumer id tagged with a request id.
message CommandUnsubscribe {
  required uint64 consumer_id = 1;
  required uint64 request_id = 2;
}
// Reset an existing consumer to a particular message id.
// A publish time may be supplied as well; both targets are optional.
message CommandSeek {
  required uint64 consumer_id = 1;
  required uint64 request_id = 2;

  optional MessageIdData message_id = 3;
  optional uint64 message_publish_time = 4;
}
// Message sent by broker to client when a topic
// has been forcefully terminated and there are no more
// messages left to consume
message CommandReachedEndOfTopic {
  required uint64 consumer_id = 1;
}
// Close-producer command: a producer id tagged with a request id.
message CommandCloseProducer {
  required uint64 producer_id = 1;
  required uint64 request_id = 2;
}
// Close-consumer command: a consumer id tagged with a request id.
message CommandCloseConsumer {
  required uint64 consumer_id = 1;
  required uint64 request_id = 2;
}
// Redeliver-unacknowledged-messages command for a consumer, with an optional
// list of message ids and an optional consumer epoch.
message CommandRedeliverUnacknowledgedMessages {
  required uint64 consumer_id = 1;
  repeated MessageIdData message_ids = 2;
  optional uint64 consumer_epoch = 3;
}
// Success response: the request id and an optional schema.
message CommandSuccess {
  required uint64 request_id = 1;
  optional Schema schema = 2;
}
// Response from CommandProducer
message CommandProducerSuccess {
  required uint64 request_id = 1;
  required string producer_name = 2;

  // The last sequence id that was stored by this producer in the previous session
  // This will only be meaningful if deduplication has been enabled.
  optional int64 last_sequence_id = 3 [default = -1];
  optional bytes schema_version = 4;

  // The topic epoch assigned by the broker. This field will only be set if we
  // were requiring exclusive access when creating the producer.
  optional uint64 topic_epoch = 5;

  // If producer is not "ready", the client will avoid to timeout the request
  // for creating the producer. Instead it will wait indefinitely until it gets
  // a subsequent `CommandProducerSuccess` with `producer_ready==true`.
  optional bool producer_ready = 6 [default = true];
}
// Error response: the request id with an error code and message.
message CommandError {
  required uint64 request_id = 1;
  required ServerError error = 2;
  required string message = 3;
}
// Commands to probe the state of connection.
// When either client or broker doesn't receive commands for certain
// amount of time, they will send a Ping probe.
// This message has no fields.
message CommandPing {
}
// Pong command; it has no fields.
// NOTE(review): presumably the reply to CommandPing -- confirm.
message CommandPong {
}
// Consumer stats command: a consumer id tagged with a request id.
message CommandConsumerStats {
  // Fields 2 (topic_name) and 3 (subscription_name) were removed and are only
  // kept below as commented-out declarations. The numbers and names are
  // reserved so that they can never be reused with a different meaning.
  reserved 2, 3;
  reserved "topic_name", "subscription_name";

  required uint64 request_id = 1;
  // required string topic_name = 2;
  // required string subscription_name = 3;
  required uint64 consumer_id = 4;
}
// Consumer stats response: optional error details followed by the individual
// consumer statistics.
message CommandConsumerStatsResponse {
  required uint64 request_id = 1;
  optional ServerError error_code = 2;
  optional string error_message = 3;

  // Total rate of messages delivered to the consumer. msg/s
  optional double msgRateOut = 4;

  // Total throughput delivered to the consumer. bytes/s
  optional double msgThroughputOut = 5;

  // Total rate of messages redelivered by this consumer. msg/s
  optional double msgRateRedeliver = 6;

  // Name of the consumer
  optional string consumerName = 7;

  // Number of available message permits for the consumer
  optional uint64 availablePermits = 8;

  // Number of unacknowledged messages for the consumer
  optional uint64 unackedMessages = 9;

  // Flag to verify if consumer is blocked due to reaching threshold of
  // unacked messages
  optional bool blockedConsumerOnUnackedMsgs = 10;

  // Address of this consumer
  optional string address = 11;

  // Timestamp of connection
  optional string connectedSince = 12;

  // Whether this subscription is Exclusive or Shared or Failover
  optional string type = 13;

  // Total rate of messages expired on this subscription. msg/s
  optional double msgRateExpired = 14;

  // Number of messages in the subscription backlog
  optional uint64 msgBacklog = 15;

  // Total rate of messages ack. msg/s
  optional double messageAckRate = 16;
}
// Get-last-message-id command: a consumer id tagged with a request id.
message CommandGetLastMessageId {
  required uint64 consumer_id = 1;
  required uint64 request_id = 2;
}
// Get-last-message-id response: the last message id, the request id and an
// optional consumer mark-delete position.
message CommandGetLastMessageIdResponse {
  required MessageIdData last_message_id = 1;
  required uint64 request_id = 2;
  optional MessageIdData consumer_mark_delete_position = 3;
}
// Get-topics-of-namespace command: the namespace, a mode (PERSISTENT unless
// set) and an optional topics pattern and topics hash.
message CommandGetTopicsOfNamespace {
  // Which topics to consider.
  enum Mode {
    PERSISTENT = 0;
    NON_PERSISTENT = 1;
    ALL = 2;
  }

  required uint64 request_id = 1;
  required string namespace = 2;
  optional Mode mode = 3 [default = PERSISTENT];
  optional string topics_pattern = 4;
  optional string topics_hash = 5;
}
// Get-topics-of-namespace response: the topic list plus filtering and change
// information.
message CommandGetTopicsOfNamespaceResponse {
  required uint64 request_id = 1;
  repeated string topics = 2;

  // true iff the topic list was filtered by the pattern supplied by the client
  optional bool filtered = 3 [default = false];

  // hash computed from the names of matching topics
  optional string topics_hash = 4;

  // if false, topics is empty and the list of matching topics has not changed
  optional bool changed = 5 [default = true];
}
// Watch-topic-list command: a watcher id, namespace and topics pattern,
// tagged with a request id.
message CommandWatchTopicList {
  required uint64 request_id = 1;
  required uint64 watcher_id = 2;
  required string namespace = 3;
  required string topics_pattern = 4;

  // Only present when the client reconnects:
  optional string topics_hash = 5;
}
// Watch-topic-list success: request id, watcher id, a list of topics and the
// topics hash.
message CommandWatchTopicListSuccess {
  required uint64 request_id = 1;
  required uint64 watcher_id = 2;
  repeated string topic = 3;
  required string topics_hash = 4;
}
// Watch-topic update: for a watcher id, the new topics, the deleted topics
// and the topics hash.
message CommandWatchTopicUpdate {
  required uint64 watcher_id = 1;
  repeated string new_topics = 2;
  repeated string deleted_topics = 3;
  required string topics_hash = 4;
}
// Watch-topic-list close command: a watcher id tagged with a request id.
message CommandWatchTopicListClose {
  required uint64 request_id = 1;
  required uint64 watcher_id = 2;
}
// Get-schema command: a topic and an optional schema version, tagged with a
// request id.
message CommandGetSchema {
  required uint64 request_id = 1;
  required string topic = 2;
  optional bytes schema_version = 3;
}
// Get-schema response: the request id plus optional error details, schema and
// schema version.
message CommandGetSchemaResponse {
  required uint64 request_id = 1;
  optional ServerError error_code = 2;
  optional string error_message = 3;

  optional Schema schema = 4;
  optional bytes schema_version = 5;
}
// Get-or-create-schema command: a topic and a schema, tagged with a request
// id.
message CommandGetOrCreateSchema {
  required uint64 request_id = 1;
  required string topic = 2;
  required Schema schema = 3;
}
// Get-or-create-schema response: the request id plus optional error details
// and schema version.
message CommandGetOrCreateSchemaResponse {
  required uint64 request_id = 1;
  optional ServerError error_code = 2;
  optional string error_message = 3;

  optional bytes schema_version = 4;
}
// --- transaction related ---

// Actions selectable through the `txn_action` fields of the end-transaction
// commands.
enum TxnAction {
  COMMIT = 0;
  ABORT = 1;
}
// TC client connect request: a tc_id tagged with a request id.
message CommandTcClientConnectRequest {
  required uint64 request_id = 1;
  // NOTE(review): this field is required yet also declares a default of 0.
  required uint64 tc_id = 2 [default = 0];
}
// TC client connect response: the request id plus optional error details.
message CommandTcClientConnectResponse {
  required uint64 request_id = 1;
  optional ServerError error = 2;
  optional string message = 3;
}
// New-transaction command: an optional TTL in seconds and tc_id (both 0
// unless set), tagged with a request id.
message CommandNewTxn {
  required uint64 request_id = 1;
  optional uint64 txn_ttl_seconds = 2 [default = 0];
  optional uint64 tc_id = 3 [default = 0];
}
// New-transaction response: the request id, the transaction id bits and
// optional error details.
message CommandNewTxnResponse {
  required uint64 request_id = 1;
  optional uint64 txnid_least_bits = 2 [default = 0];
  optional uint64 txnid_most_bits = 3 [default = 0];
  optional ServerError error = 4;
  optional string message = 5;
}
// Add-partition-to-transaction command: the transaction id bits and a list of
// partitions, tagged with a request id.
message CommandAddPartitionToTxn {
  required uint64 request_id = 1;
  optional uint64 txnid_least_bits = 2 [default = 0];
  optional uint64 txnid_most_bits = 3 [default = 0];
  repeated string partitions = 4;
}
// Add-partition-to-transaction response: the request id, the transaction id
// bits and optional error details.
message CommandAddPartitionToTxnResponse {
  required uint64 request_id = 1;
  optional uint64 txnid_least_bits = 2 [default = 0];
  optional uint64 txnid_most_bits = 3 [default = 0];
  optional ServerError error = 4;
  optional string message = 5;
}
// A subscription identified by its topic and its subscription name.
message Subscription {
  required string topic = 1;
  required string subscription = 2;
}
// Add-subscription-to-transaction command: the transaction id bits and a list
// of subscriptions, tagged with a request id.
message CommandAddSubscriptionToTxn {
  required uint64 request_id = 1;
  optional uint64 txnid_least_bits = 2 [default = 0];
  optional uint64 txnid_most_bits = 3 [default = 0];
  repeated Subscription subscription = 4;
}
// Add-subscription-to-transaction response: the request id, the transaction
// id bits and optional error details.
message CommandAddSubscriptionToTxnResponse {
  required uint64 request_id = 1;
  optional uint64 txnid_least_bits = 2 [default = 0];
  optional uint64 txnid_most_bits = 3 [default = 0];
  optional ServerError error = 4;
  optional string message = 5;
}
// End-transaction command: the transaction id bits and the action to apply,
// tagged with a request id.
message CommandEndTxn {
  required uint64 request_id = 1;
  optional uint64 txnid_least_bits = 2 [default = 0];
  optional uint64 txnid_most_bits = 3 [default = 0];
  optional TxnAction txn_action = 4;
}
// End-transaction response: the request id, the transaction id bits and
// optional error details.
message CommandEndTxnResponse {
  required uint64 request_id = 1;
  optional uint64 txnid_least_bits = 2 [default = 0];
  optional uint64 txnid_most_bits = 3 [default = 0];
  optional ServerError error = 4;
  optional string message = 5;
}
// End-transaction-on-partition command: the transaction id bits, a topic, the
// action to apply and an optional low-watermark value.
message CommandEndTxnOnPartition {
  required uint64 request_id = 1;
  optional uint64 txnid_least_bits = 2 [default = 0];
  optional uint64 txnid_most_bits = 3 [default = 0];
  optional string topic = 4;
  optional TxnAction txn_action = 5;
  optional uint64 txnid_least_bits_of_low_watermark = 6;
}
// End-transaction-on-partition response: the request id, the transaction id
// bits and optional error details.
message CommandEndTxnOnPartitionResponse {
  required uint64 request_id = 1;
  optional uint64 txnid_least_bits = 2 [default = 0];
  optional uint64 txnid_most_bits = 3 [default = 0];
  optional ServerError error = 4;
  optional string message = 5;
}
// End-transaction-on-subscription command: the transaction id bits, a
// subscription, the action to apply and an optional low-watermark value.
message CommandEndTxnOnSubscription {
  required uint64 request_id = 1;
  optional uint64 txnid_least_bits = 2 [default = 0];
  optional uint64 txnid_most_bits = 3 [default = 0];
  optional Subscription subscription = 4;
  optional TxnAction txn_action = 5;
  optional uint64 txnid_least_bits_of_low_watermark = 6;
}
// End-transaction-on-subscription response: the request id, the transaction
// id bits and optional error details.
message CommandEndTxnOnSubscriptionResponse {
  required uint64 request_id = 1;
  optional uint64 txnid_least_bits = 2 [default = 0];
  optional uint64 txnid_most_bits = 3 [default = 0];
  optional ServerError error = 4;
  optional string message = 5;
}
// Top-level command message: a required `type` plus one optional field per
// command message.
// Each Type value uses the same number as the command field it is named
// after; presumably `type` tells the receiver which field is populated --
// confirm against the protocol handling code.
message BaseCommand {
  // Command kinds. Numbering starts at 2, and 41-49 are unused.
  enum Type {
    CONNECT = 2;
    CONNECTED = 3;
    SUBSCRIBE = 4;

    PRODUCER = 5;

    SEND = 6;
    SEND_RECEIPT = 7;
    SEND_ERROR = 8;

    MESSAGE = 9;
    ACK = 10;
    FLOW = 11;

    UNSUBSCRIBE = 12;

    SUCCESS = 13;
    ERROR = 14;

    CLOSE_PRODUCER = 15;
    CLOSE_CONSUMER = 16;

    PRODUCER_SUCCESS = 17;

    PING = 18;
    PONG = 19;

    REDELIVER_UNACKNOWLEDGED_MESSAGES = 20;

    PARTITIONED_METADATA = 21;
    PARTITIONED_METADATA_RESPONSE = 22;

    LOOKUP = 23;
    LOOKUP_RESPONSE = 24;

    CONSUMER_STATS = 25;
    CONSUMER_STATS_RESPONSE = 26;

    REACHED_END_OF_TOPIC = 27;

    SEEK = 28;

    GET_LAST_MESSAGE_ID = 29;
    GET_LAST_MESSAGE_ID_RESPONSE = 30;

    ACTIVE_CONSUMER_CHANGE = 31;

    GET_TOPICS_OF_NAMESPACE = 32;
    GET_TOPICS_OF_NAMESPACE_RESPONSE = 33;

    GET_SCHEMA = 34;
    GET_SCHEMA_RESPONSE = 35;

    AUTH_CHALLENGE = 36;
    AUTH_RESPONSE = 37;

    ACK_RESPONSE = 38;

    GET_OR_CREATE_SCHEMA = 39;
    GET_OR_CREATE_SCHEMA_RESPONSE = 40;

    // transaction related
    NEW_TXN = 50;
    NEW_TXN_RESPONSE = 51;

    ADD_PARTITION_TO_TXN = 52;
    ADD_PARTITION_TO_TXN_RESPONSE = 53;

    ADD_SUBSCRIPTION_TO_TXN = 54;
    ADD_SUBSCRIPTION_TO_TXN_RESPONSE = 55;

    END_TXN = 56;
    END_TXN_RESPONSE = 57;

    END_TXN_ON_PARTITION = 58;
    END_TXN_ON_PARTITION_RESPONSE = 59;

    END_TXN_ON_SUBSCRIPTION = 60;
    END_TXN_ON_SUBSCRIPTION_RESPONSE = 61;

    TC_CLIENT_CONNECT_REQUEST = 62;
    TC_CLIENT_CONNECT_RESPONSE = 63;

    WATCH_TOPIC_LIST = 64;
    WATCH_TOPIC_LIST_SUCCESS = 65;
    WATCH_TOPIC_UPDATE = 66;
    WATCH_TOPIC_LIST_CLOSE = 67;
  }

  required Type type = 1;

  optional CommandConnect connect = 2;
  optional CommandConnected connected = 3;

  optional CommandSubscribe subscribe = 4;
  optional CommandProducer producer = 5;
  optional CommandSend send = 6;
  optional CommandSendReceipt send_receipt = 7;
  optional CommandSendError send_error = 8;
  optional CommandMessage message = 9;
  optional CommandAck ack = 10;
  optional CommandFlow flow = 11;
  optional CommandUnsubscribe unsubscribe = 12;

  optional CommandSuccess success = 13;
  optional CommandError error = 14;

  optional CommandCloseProducer close_producer = 15;
  optional CommandCloseConsumer close_consumer = 16;

  optional CommandProducerSuccess producer_success = 17;
  optional CommandPing ping = 18;
  optional CommandPong pong = 19;
  optional CommandRedeliverUnacknowledgedMessages redeliverUnacknowledgedMessages = 20;

  optional CommandPartitionedTopicMetadata partitionMetadata = 21;
  optional CommandPartitionedTopicMetadataResponse partitionMetadataResponse = 22;

  optional CommandLookupTopic lookupTopic = 23;
  optional CommandLookupTopicResponse lookupTopicResponse = 24;

  optional CommandConsumerStats consumerStats = 25;
  optional CommandConsumerStatsResponse consumerStatsResponse = 26;

  optional CommandReachedEndOfTopic reachedEndOfTopic = 27;

  optional CommandSeek seek = 28;

  optional CommandGetLastMessageId getLastMessageId = 29;
  optional CommandGetLastMessageIdResponse getLastMessageIdResponse = 30;

  optional CommandActiveConsumerChange active_consumer_change = 31;

  optional CommandGetTopicsOfNamespace getTopicsOfNamespace = 32;
  optional CommandGetTopicsOfNamespaceResponse getTopicsOfNamespaceResponse = 33;

  optional CommandGetSchema getSchema = 34;
  optional CommandGetSchemaResponse getSchemaResponse = 35;

  optional CommandAuthChallenge authChallenge = 36;
  optional CommandAuthResponse authResponse = 37;

  optional CommandAckResponse ackResponse = 38;

  optional CommandGetOrCreateSchema getOrCreateSchema = 39;
  optional CommandGetOrCreateSchemaResponse getOrCreateSchemaResponse = 40;

  // transaction related
  optional CommandNewTxn newTxn = 50;
  optional CommandNewTxnResponse newTxnResponse = 51;
  optional CommandAddPartitionToTxn addPartitionToTxn = 52;
  optional CommandAddPartitionToTxnResponse addPartitionToTxnResponse = 53;
  optional CommandAddSubscriptionToTxn addSubscriptionToTxn = 54;
  optional CommandAddSubscriptionToTxnResponse addSubscriptionToTxnResponse = 55;
  optional CommandEndTxn endTxn = 56;
  optional CommandEndTxnResponse endTxnResponse = 57;
  optional CommandEndTxnOnPartition endTxnOnPartition = 58;
  optional CommandEndTxnOnPartitionResponse endTxnOnPartitionResponse = 59;
  optional CommandEndTxnOnSubscription endTxnOnSubscription = 60;
  optional CommandEndTxnOnSubscriptionResponse endTxnOnSubscriptionResponse = 61;
  optional CommandTcClientConnectRequest tcClientConnectRequest = 62;
  optional CommandTcClientConnectResponse tcClientConnectResponse = 63;

  optional CommandWatchTopicList watchTopicList = 64;
  optional CommandWatchTopicListSuccess watchTopicListSuccess = 65;
  optional CommandWatchTopicUpdate watchTopicUpdate = 66;
  optional CommandWatchTopicListClose watchTopicListClose = 67;
}