PulsarはProducer/ConsumerとBrokerの通信に独自バイナリプロトコルを利用しています。 プロトコルは転送と実装の最大効率を保証しながら、 要求される全ての特徴 (例: Ack、フロー制御) をサポートするようにデザインされています。
クライアントとBrokerは互いに コマンド を交換します。 コマンドはProtocol Buffersによるバイナリメッセージの形式です。
Protocol Buffersによるコマンドの仕様はPulsarApi.proto
(pulsar-common/src/main/proto/PulsarApi.proto) に記述されています。
異なるProducerとConsumerのコマンドは同じ接続を介して 制限なく送信、インターリーブできます。
全てのコマンドはenum型と全ての可能なサブコマンドをオプショナルフィールドに含むProtocol Buffersのオブジェクト BaseCommand
に埋め込まれています。 いかなる時も、1つのBaseCommand
は1つのサブコマンドしか設定できません。
Protocol Buffersはメッセージ・フレームのソートを提供していないため、 Protocol Buffersのデータの前に4バイトのフィールドを追加しています。
1つのフレームの最大サイズは5 MBです。
Pulsarのプロトコルには以下の2つのタイプのコマンドがあります:
全てのサイズは4バイト符号なしビッグエンディアン整数として渡されます。
[TOTAL_SIZE] [CMD_SIZE] [CMD]
TOTAL_SIZE
→ (4バイト) フレームのサイズ。その後に続く全てのデータを数える。CMD_SIZE
→ (4バイト) シリアライズされたProtocol Buffersのコマンドのサイズ。CMD
→ バイナリ形式にシリアライズされたProtocol Buffersのオブジェクト。[TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
TOTAL_SIZE
→ (4バイト) フレームのサイズ。その後に続く全てのデータを数えます。CMD_SIZE
→ (4バイト) シリアライズされたProtocol Buffersのコマンドのサイズ。CMD
→ バイナリ形式にシリアライズされたProtocol Buffersのオブジェクト。MAGIC_NUMBER
→ (2バイト) 0x0e01
現在のフォーマットを特定するマジックナンバー。CHECKSUM
→ (4バイト) この後に続くデータに対するCRC32-Cチェックサム。METADATA_SIZE
→ (4バイト) メッセージ・メタデータのサイズ。METADATA
→ バイナリのProtocol Buffers形式のメッセージ・メタデータ。PAYLOAD
→ フレームの残りの部分はペイロードとみなされます。 任意のバイト列を含めることができます。メッセージ・メタデータは、アプリケーションに指定されたペイロードとともに、 シリアライズされたProtocol Buffersオブジェクトとして保存されます。
メタデータはProducerに作成され、変更されることなくConsumerに渡されます。
フィールド:
producer_name
→ メッセージを発行したProducerの名前。sequence_id
→ Producerから割り当てられたメッセージのシーケンスID。publish_time
→ 発行時のタイムスタンプ (UTCで1970年1月1日からのミリ秒単位の経過時間) 。properties
→ アプリケーションに定義されたPair<String, String>
形式のデータ (Pulsarの動作には影響を与えないKey-Value) 。replicated_from
→ (任意) メッセージがレプリケートされたものかを表し、 レプリケートされたものである場合レプリケート元のクラスタ名。partition_key
→ (任意) パーティションドトピックでpubulishされる間、 キーが存在すれば、そのハッシュをパーティションの選択に利用します。compression
→ (任意) ペイロードが圧縮されているか、 どの圧縮ライブラリが使用されているか。uncompressed_size
→ (任意) 圧縮されている場合、 Producerはこのフィールドに元のペイロードサイズを記述する必要があります。num_messages_in_batch
→ (任意) このメッセージが複数のメッセージのバッチである場合は、 含まれているメッセージの数が記述されている必要があります。バッチ・メッセージを利用する時、ペイロードはエントリのリストを含んでいます。 それらはSingleMessageMetadata
により定義された固有のメタデータを持ちます。
1つのバッチのペイロードは以下の通りです:
[MD_SIZE_1] [MD_1] [PAYLOAD_1] [MD_SIZE_2] [MD_2] [PAYLOAD_2] ...
ここで:
MD_SIZE_X
→ シリアライズされたProocol Buffers形式のシングル・メッセージ・メタデータのサイズ。MD_X
→ シングル・メッセージ・メタデータ。PAYLOAD_X
→ アプリケーションから渡されたメッセージペイロード。SingleMessageMetadata
のフィールド:
properties
→ アプリケーションが定義したプロパティ。partition_key
→ (任意) ハッシュによりパーティションを特定するためのKey。payload_size
→ バッチ内の1つのメッセージについてのペイロードのサイズ。圧縮が有効な場合、バッチ全体が一度に圧縮されます。
BrokerへのTCPコネクションの確立後、クライアントはセッションを開始しなければなりません。 通常これには6650番のポートが利用されます。
![Connect interaction](../../img/Binary Protocol - Connect.png)
BrokerからConnected
という応答を受け取ると、クライアントは 接続準備完了とみなします。もしBrokerが クライアントの認証を検証できなれければ、代わりにError
コマンドを返し TCPコネクションをクローズします。
例:
CommandConnect { "client_version" : "Pulsar-Client-Java-v1.15.2", "auth_method_name" : "my-authentication-plugin", "auth_data" : "my-auth-data", "protocol_version" : 6 }
フィールド:
client_version
→ フォーマットの強制されていないString形式の識別子。auth_method_name
→ (任意) 認証が有効な場合は認証プラグインの名前。auth_data
→ (任意) プラグイン固有の認証データ。protocol_version
→ クライアントのサポートするプロトコルのバージョン。 Brokerは指定されたバージョンより新しいプロトコルのコマンドを送りません。 Brokerは最低限のバージョンを要求する可能性があります。CommandConnected { "server_version" : "Pulsar-Broker-v1.15.2", "protocol_version" : 6 }
フィールド:
server_version
→ BrokerのバージョンのString形式の識別子。protocol_version
→ Brokerがサポートするプロトコルのバージョン。 クライアントは指定されたバージョンより新しいプロトコルのコマンドを送ることができません。クライアントとBrokerの長期のネットワークのパーティションやリモートエンドでの TCPコネクションを終了しないままマシンがクラッシュした場合 (例: 停電、カーネルパニック、ハードリブート) を識別するため、 リモートピアのアベイラビリティステータスを調べる仕組みが備わっています。
クライアントとBrokerはPing
コマンド定期的に送信し、 タイムアウト時間内 (Brokerが使用するデフォルトの値は60秒)にPong
レスポンスを受け取らなければ ソケットをクローズします。
Pulsarクライアントの実装において、Ping
の送信は必須ではありません。 しかし、Brokerから強制的にTCPコネクションを終了されないために、Ping
を受け取った場合は 迅速な返答が必要です。
メッセージを送るために、クライアントはProducerを作成する必要があります。 Producerを作成する時、Brokerは最初にそのクライアントがトピックへの発行を 認可されているかを検証します。
クライアントがProducerの作成を完了すると、ネゴシエートされたProducer IDを参照して Brokerにメッセージを発行できます。
![Producer interaction](../../img/Binary Protocol - Producer.png)
CommandProducer { "topic" : "persistent://my-property/my-cluster/my-namespace/my-topic", "producer_id" : 1, "request_id" : 1 }
パラメータ:
topic
→ Producerを作成したいトピックの完全な名前。producer_id
→ クライアントが生成した同一接続内で一意に定まるProducerの識別子。request_id
→ レスポンスのマッチングに用いる同一接続内で一意に定まるリクエストの識別子。producer_name
→ (任意) Producer名が指定されていればそれが利用され、 そうでなければ、Brokerが一意に定まる名前を生成します。生成されたProducer名は グローバルで一意に定まることが保証されます。Producerが最初に作成された時、 Brokerに新しいProducer名を作成させ、再接続後にProducerを再作成する時は再利用するという 実装が期待されます。BrokerはProducerSuccess
コマンドかError
コマンドを返します。
CommandProducerSuccess { "request_id" : 1, "producer_name" : "generated-unique-producer-name" }
パラメータ:
request_id
→ CreateProducer
リクエストのID。producer_name
→ 生成されたグローバルで一意に定まるProducer名、 もしくはクライアントにより指定された名前。Send
コマンドは既に存在するProducerのコンテキスト内で新しいメッセージを 発行する時に使用されます。このコマンドはコマンドだけでなくペイロードを含むフレーム内で 使用されます。ペイロードはペイロードコマンド セクションに記されている 完全なフォーマットで記述されます。
CommandSend { "producer_id" : 1, "sequence_id" : 0, "num_messages" : 1 }
パラメータ:
producer_id
→ ProducerのID。sequence_id
→ 各メッセージは関連する0からカウントが始まるような 実装が期待されるシーケンスのIDを持ちます。メッセージの効果的な発行を承認する。 SendReceipt
は、シーケンスIDによってメッセージを参照します。num_messages
→ (任意) バッチ・メッセージが発行される時に使用されます。メッセージが設定されたレプリカの数に応じて永続化されたあと、 BrokerはProducerにメッセージを受け取ったことを示すAck (確認応答) を返します。
CommandSendReceipt { "producer_id" : 1, "sequence_id" : 0, "message_id" : { "ledgerId" : 123, "entryId" : 456 } }
パラメータ:
producer_id
→ ProducerのID。sequence_id
→ pubulishされたメッセージのシーケンスのID。message_id
→ システムに割り当てられた1つのクラスタ内で一意に定まるメッセージのID。 メッセージIDはledgerId
とentryId
の2つのlong値から構成されます。 これらの値はBookKeeperのLedgerに追加された時に割り振られたIDに基づいています。注: このコマンドはProducerとBrokerの両方から送信される可能性があります。
BrokerがCloseProducer
コマンドを受け取った時、BrokerはProducerからそれ以上のメッセージの受信を 停止し、ペンディング中の全てのメッセージが永続化されるまで待ち、クライアントにSuccess
を返します。
Brokerは、正常なフェイルオーバー (例: Brokerの再起動中、またはトピックが別のBrokerに転送されるようにロードバランサによってアンロードされている場合など) を実行している時、クライアントにCloseProducer
コマンドを送ることができます。
ProducerがCloseProducer
コマンドを受け取った時、クライアントはサービスディスカバリ・ルックアップを通じて Producerを再作成することが期待されます。 この際TCPコネクションは影響を受けません。
Consumerはサブスクリプションへの接続とそこからのメッセージのconsumeに利用されます。 接続後、クライアントはトピックを購読する必要があります。 もしサブスクリプションがそのトピックになければ、新しく作成されます。
![Consumer](../../img/Binary Protocol - Consumer.png)
Consumerの準備が整ったあと、クライアントはBrokerにメッセージをプッシュするための パーミッションを与える 必要があります。これはFlow
コマンドによって成されます。
Flow
コマンドは追加の パーミッション を与えます。 一般的なConsumerの実装では、アプリケーションがメッセージをconsumeする 準備が整うまでのメッセージの蓄積にキューを利用します。
アプリケーションがメッセージをデキューしたあと、Consumerは Brokerに対してさらなるメッセージをプッシュするパーミッションを送ります。
CommandSubscribe { "topic" : "persistent://my-property/my-cluster/my-namespace/my-topic", "subscription" : "my-subscription-name", "subType" : "Exclusive", "consumer_id" : 1, "request_id" : 1 }
パラメータ:
topic
→ Consumerを作成したいトピックの完全な名前。subscription
→ サブスクリプション名。subType
→ サブスクリプションタイプ: Exclusive, Shared, Failoverconsumer_id
→ クライアントが生成した同一接続内で一意に定まるConsumerの識別子。request_id
→ レスポンスのマッチングに用いる同一接続内で一意に定まるリクエストの識別子。consumer_name
→ (任意) クライアントはConsumer名を指定できます。 この名前は、ステータス上で特定のConsumerを追跡するのに利用されます。 また、サブスクリプションタイプがFailoverの時、この名前はどのConsumerが master (メッセージを受け取るConsumer) となるかを 決めるのに使用されます。ConsumerはConsumer名によってソートされ、最初のものが masterとして選ばれます。CommandFlow { "consumer_id" : 1, "messagePermits" : 1000 }
パラメータ:
consumer_id
→ ConsumerのID。messagePermits
→ Brokerに対して追加でプッシュを許可するメッセージの数。Message
コマンドはBrokerが、与えられたパーミッションの制限内で Consumerにメッセージをプッシュする際に使用されます。
このコマンドはコマンドだけでなくペイロードを含むフレーム内で 使用されます。ペイロードはペイロードコマンド セクションに記されている 完全なフォーマットで記述されます。
CommandMessage { "consumer_id" : 1, "message_id" : { "ledgerId" : 123, "entryId" : 456 } }
Ack
コマンドは与えられたメッセージがアプリケーションによって正しく処理され、 Brokerによる破棄が可能であるというBrokerへの信号です。
加えて、BrokerはAck
の返されたメッセージに基いて Consumerの購読位置を管理します。
message CommandAck { "consumer_id" : 1, "ack_type" : "Individual", "message_id" : { "ledgerId" : 123, "entryId" : 456 } }
パラメータ:
consumer_id
→ ConsumerのID。ack_type
→ Ackのタイプ: Individual
もしくは Cumulative
message_id
→ メッセージのID。validation_error
→ (任意) Consumerが次の理由のためメッセージを 破棄したことを示します: UncompressedSizeCorruption
, DecompressionError
, ChecksumMismatch
, BatchDeSerializeError
注: このコマンドはConsumerとBrokerの両方から送信される可能性があります。
このコマンドはCloseProducer
と同様の振る舞いをします。
ConsumerはBrokerに、特定のConsumerにプッシュしたがまだAck
が返っていないメッセージの 再配送を要求できます。
そのProtocl BuffersオブジェクトはConsumerが再配送してほしい メッセージのIDのリストから構成されます。リストが空の場合は、 Brokerはペンディング中の全てのメッセージを再送します。
再配送において、メッセージは同一のConsumer、あるいはサブスクリプションタイプがSharedの場合は 全ての利用可能なConsumerに送信されます。
トピックのルックアップはクライアントがProducer, Consumerを作成、再接続する度に必要となります。 ルックアップはどの特定のBrokerが、利用しようとしているTopicを提供しているかを見つけるのに使用されます。
ルックアップはadmin APIで 説明されているREST APIのコールで行うことが可能です。
Pulsar-1.16からは、バイナリプロトコルで行うことも可能です。
例として、サービスディスカバリコンポーネントをpulsar://broker.example.com:6650
で 起動していると仮定します。
個別のBrokerはpulsar://broker-1.example.com:6650
, pulsar://broker-2.example.com:6650
, ...で起動しています。
クライアントはサービスディスカバリへの接続をLookupTopic
コマンドを 送るために利用できます。レスポンスは接続すべきBrokerのホスト名、あるいは ルックアップをリトライするためのBrokerのホスト名のどちらかです。
LookupTopic
コマンドはConnect
/ Connected
の最初のハンドシェイクを終えた 接続で使用されなければなりません。
![トピックのルックアップ](../../img/Binary Protocol - Topic lookup.png)
CommandLookupTopic { "topic" : "persistent://my-property/my-cluster/my-namespace/my-topic", "request_id" : 1, "authoritative" : false }
フィールド:
topic
→ ルックアップするトピックの名前。request_id
→ レスポンスとともに受け取るリクエストのID。authoritative
→ 最初のルックアップリクエストではfalse、その後に続くリダイレクトのレスポンスでは クライアントはレスポンスに含まれているものと同じ値を渡すべきです。成功時のレスポンス例:
Command LookupTopicResponse { "request_id" : 1, "response" : "Connect", "brokerServiceUrl" : "pulsar://broker-1.example.com:6650", "brokerServiceUrlTls" : "pulsar+ssl://broker-1.example.com:6651", "authoritative" : true }
リダイレクト時のレスポンス例:
Command LookupTopicResponse { "request_id" : 1, "response" : "Redirect", "brokerServiceUrl" : "pulsar://broker-2.example.com:6650", "brokerServiceUrlTls" : "pulsar+ssl://broker-2.example.com:6651", "authoritative" : true }
後者の場合、LookupTopic
コマンドリクエストをbroker-2.example.com
に対して再発行する必要があります。 このBrokerはルックアップリクエストに対して決定的なレスポンスを返すことができます。
パーティションドトピックのメタデータのディスカバリはトピックがパーティションドトピックかどうか、 いくつのパーティションがセットアップされているかを調べるのに利用されます。
トピックがパーティションドである場合、クライアントはpartition-X
というサフィックスを用いて 各パーティションにつき一つのProducerあるいはConsumerを作成することが期待されます。
この情報は最初にProducerあるいはConsumerが作成される時のみに利用されます。 再接続後は必要ありません。
パーティションドトピック・メタデータのディスカバリはトピックのルックアップと非常によく似た働きをします。 クライアントはサービスディスカバリに対してリクエストを送り、レスポンスには実際のメタデータが含まれます。
CommandPartitionedTopicMetadata { "topic" : "persistent://my-property/my-cluster/my-namespace/my-topic", "request_id" : 1 }
フィールド:
topic
→ パーティションのメタデータを確認するトピック。request_id
→ レスポンスに渡されるリクエストのID。メタデータ付きのレスポンスの例:
CommandPartitionedTopicMetadataResponse { "request_id" : 1, "response" : "Success", "partitions" : 32 }