Pulsarはマルチテナントで、ハイパフォーマンスなサーバ間メッセージングのソリューションです。
主な特徴は:
上位レベルでは、Pulsarインスタンスは単一または複数クラスタで構成され、それぞれのクラスタは異なる地理的地域に存在する可能性があります。
1つのPulsarクラスタはメッセージBroker, BookKeeperおよび (コンポーネント間の) 連携と構成管理のためのZooKeeperアンサンブルで構成されています。
そしてクライアントライブラリが使い勝手の良いAPIを提供します。
PulsarのBrokerはステートレスなコンポーネントで、主に2つの異なるコンポーネントを実行します:
トピックのルックアップと管理操作のためのRESTインターフェースを提供するHTTPサーバと、全てのデータ転送を独自バイナリプロトコル上で行う非同期TCPサーバであるディスパッチャです。
バックログがキャッシュサイズを超えない限り、メッセージは通常Managed Ledgerのキャッシュから送信され、超えた場合にはBrokerがBookKeeperからエントリの読み出しを始めます。
グローバルトピックでのジオレプリケーションをサポートするためにBrokerはレプリケータを管理します。
このレプリケータはPulsarクライアントライブラリを利用し、ローカルクラスタ内で発行されるエントリをtailしリモートクラスタに再発行します。
Pulsarは、Apache BookKeeperを分散ログ先行書き込みシステムの永続ストレージとして使用しています。
BookKeeperでは、アプリケーションはLedgerと呼ばれる多くの独立したログを作成できます。
Ledgerは、単一のwriterが末尾への追加のみを行うデータ構造で、複数のストレージノード (またはBookieと呼びます) に割り当てられ、エントリはこれらのノードの複数にレプリケートされます。
Ledgerのセマンティクスはとてもシンプルです:プロセスはLedgerの作成、エントリの末尾への追加、Ledgerのクローズができます。明示的またはwriterのプロセスがクラッシュしてLedgerがクローズされた後、read-onlyモードのみでオープンできます。
そしてLedger内のエントリが不要になったら、システムからそのLedger自体を削除できます。
BookKeeperの主な強みは、障害発生時のLedgerの読み取りの一貫性を保証することです。
Ledgerは単一のプロセスによってのみ書き込み可能なので、プロセスは非常に効率よく自由にエントリを追加することができ (追加のコンセンサスが不要) 、障害後には、Ledgerは状態を確定しログに最後にコミットされたエントリを確定するリカバリープロセスを実行します。
その後、Ledgerの全てのreaderは全く同じ内容を参照することが保証されます。
BookKeeperはエントリのレプリケーション、ノード障害を処理するのに非常に効率的なシーケンシャルストアであり、加えて容量とスループットが水平にスケーラブルなため、Pulsarで採用しています。
運用上の観点からも、PulsarクラスタにBookieを追加するだけで容量を即座に増やすことができます。
BookKeeperの他の強みは、Bookieは並行読み込み/書き込みで何千ものLedgerを処理するようにデザインされている点と、複数ディスクデバイス (1つはJournal用、もう1つは通常のストレージ用) を利用することで、進行中の書き込み処理のレイテンシから読み込み処理の影響を分離できる点です。
BookKeeperのLedgerが単一のログ抽象化を提供するため、単一トピックのストレージ層を表すManaged Ledgerと呼ばれるLedgerの上位概念を表すライブラリが開発されました。
Managed Ledgerは、ストリームの最後に追加し続ける単一のwriterとストリームをconsumeしている複数カーソル (それぞれが独自の関連位置を保持します) を持つメッセージストリームの抽象化を表します。
内部的には、1つのManaged Ledgerはデータを保持するために複数のBookKeeperのLedgerを使います。
複数のLedgerを持つのには2つの理由があります:
1つ目は、障害後にLedgerはもはや書き込めないため新しいLedgerを作成する必要があるためで、
2つ目は、すべてのカーソルがLedgerに含まれるすべてのメッセージをconsumeした時にLedgerを削除する可能性があるため定期的にLedgerをロールオーバーしたいためです。
Pulsarはメタデータやクラスタの構成、 (コンポーネント間の) 連携のためにApache ZooKeeperを利用します。
トピック はメッセージを発行、購読するするための論理的なエンドポイントです。 Producerはトピックに対してメッセージを発行し、Consumerはトピックに対して購読します。 Pulsarではトピックに対する複数のサブスクリプションモードでpub/sub、ロードバランサー、フェイルオーバーのユースケースをサポートします。
通常のトピック (パーティションドトピックを除く) は明示的に作成される必要はなく、 トピックに対してメッセージをproduce/consumeしようとした際に作成されます。
サブスクリプションはConsumerが最初に接続した時に与えられたサブスクリプション名によって作成される永続性のあるリソースです。 サブスクリプションは作成された後、トピックに対して発行された全てのメッセージを受け取ります。 Consumerがこのサブスクリプションに1つも接続していない場合、発行された全てのメッセージはバックログに保持されます。 最終的に、トピックからサブスクリプションを削除するために、Consumerは購読解除できます。
サブスクリプションモードはメッセージをConsumerにどのようにメッセージを配送するかを決定する設定です。
Exclusive
Shared
Failover
プロパティとネームスペースはマルチテナントをサポートするためのPulsarのキーコンセプトです。
例えば、プロパティmy-property
のための、クラスタus-w
に存在している、アプリケーションmy-app1
が使用するネームスペース、my-property/us-w/my-app1
ついて考えてみます。 そのようなネームスペースに対するトピックネームは以下のようになります:
persistent://my-property/us-w/my-app1/my-topic-1 persistent://my-property/us-w/my-app1/my-topic-2 ...
Producerはトピックに接続し、メッセージを送信します。
同期送信 vs. 非同期送信 - メッセージは同期的あるいは非同期的にBrokerに送信できます:
圧縮 - 帯域を節約するため、メッセージを圧縮することが可能です。圧縮と解凍はどちらもクライアントで動作します。以下の圧縮形式をサポートしています:
バッチ - バッチ処理が可能な場合、Producerはメッセージを蓄積し、1つのリクエストでメッセージのバッチを送信しようとします。バッチサイズはメッセージの最大数と最大発行レイテンシで定義されます。
Consumerはサブスクリプションに接続し、メッセージを受け取ります。
同期受信 vs. 非同期受信 - 同期受信はメッセージが利用可能になるまでブロックされます。同期受信ではCompletableFutureインスタンスとしてすぐに返却されます。CompletableFutureインスタンスは新しいメッセージが利用可能になった時、受信して完了します。
確認応答 - メッセージは1つ1つ個別に、あるいは累積的に確認応答が返されます。累積的な確認応答をするConsumerは、最後に受け取ったメッセージの確認応答を返します。その場合、確認応答を返したメッセージまでのストリーム内の全てのメッセージはそのConsumerに再送されません。サブスクリプションモードがSharedの場合は、累積的な確認応答は使用できません。
リスナー - カスタマイズされたMessageListenerの実装をConsumerに渡すことができます。クライアントライブラリは新しいメッセージを受け取るとリスナーを呼び出します (Consumer Receiveを呼び出す必要はありません) 。
通常のトピックは最大スループットを制限する1つのBrokerのみから提供されます。パーティションドトピックは高いスループットを達成するために複数のBrokerにまたがる特殊なトピックです。パーティションドトピックは管理API/CLIを通じて明示的に作成される必要があります。トピックを作成する際、パーティションの数を記述できます。
パーティションドトピックは実際にはN (パーティションの数) 個の内部トピックとして実装されます。内部トピックと普通のトピックについて、サブスクリプションモードの挙動に違いはありません。
ルーティングモード - ルーティングモードはどのパーティション (内部トピック) にメッセージが発行されるかを決定します:
public interface MessageRouter extends Serializable { /** * @param msg メッセージオブジェクト * @return そのメッセージのために使用するパーティションのインデックス */ int choosePartition(Message msg); }
保証されたメッセージの配送はそれらが配信され、Consumerによる確認応答を受け取るまで頑健な方法で保存される必要があります。このメッセージングの方法は一般的にPersistent Messagingと呼ばれます。
メッセージの永続性はトピックレベルで設定されます。トピックは“persistent”にも“non-persistent”にもなりえます。トピック名は以下のようになります: persistent://my-property/global/my-ns/my-topic
persistent
non-persistent
Pulsarでは異なるジオロケーションでのメッセージのproduce, consumeが可能です。例えば、アプリケーションが1つのジオ/マーケットにデータを発行し、異なるジオ/マーケットでそれをconsumeし処理したいとします。Global Replicationがそれを可能にします。
TODO
PulsarはJava言語バインディングのクライアントAPIを公開しています。 クライアントAPIはクライアント-Broker間の通信プロトコルを最適化・カプセル化し、アプリケーションから使用するためのシンプルで直感的なAPIを公開しています。 その内部では、クライアントライブラリが透過的なBrokerとの再接続や接続フェイルオーバー、BrokerからAckが返されるまでのメッセージのキューイング、バックオフによる接続再試行のようなヒューリスティクスなどをサポートしています。
アプリケーションがProducer/Consumerを作成する際、Pulsarのクライアントライブラリは2つのステップから成るセットアップフェイズを内部的に開始します。 最初のタスクは、ルックアップのためのHTTPリクエストを送信してトピックのオーナーであるBrokerを見つける事です。 このリクエストはアクティブなBrokerの中の1台に届き、 (キャッシュされた) ZooKeeperのメタデータを参照する事でどのBrokerがトピックを提供しているかを知る事ができます。 どのBrokerもリクエストされたトピックを提供していない場合には、最も負荷の小さいBrokerにそのトピックが割り当てられます。
クライアントライブラリがBrokerのアドレスを取得すると、TCPコネクションを作成 (またはプールされた既存のコネクションを再利用) し、認証を行います。 このコネクションの中で、クライアントとBrokerは独自プロトコルのバイナリコマンドを交換します。 この時点で、クライアントはProducer/Consumerを作成するためのコマンドをBrokerに送信します。 なお、このコマンドは認可ポリシーを検証した後、それに従って行われます。
TCPコネクションが切断されると、即座にクライアントはこのセットアップフェイズを再実行し、成功するまで指数バックオフを試行し続けます。
PulsarClient (TODO: javadocs) インスタンスは、メッセージのproduce/consumeを行う前に必要となります。
ClientConfiguration config = new ClientConfiguration(); PulsarClient pulsarClient = PulsarClient.create("http://broker.example.com:8080", config); ... pulsarClient.close();
PulsarClientに引数を渡すためにClientConfiguration (TODO: javadocs) を使用できます:
// Pulsarクライアントのインスタンスの中で使用する認証プロバイダをセット public void setAuthentication(Authentication authentication); public void setAuthentication(String authPluginClassName, String authParamsString); public void setAuthentication(String authPluginClassName, Map<String, String> authParams); // オペレーションのタイムアウト値をセット (デフォルト:30秒) public void setOperationTimeout(int operationTimeout, TimeUnit unit); // Brokerとのコネクションをハンドリングするスレッド数をセット (デフォルト:1スレッド) public void setIoThreads(int numIoThreads); // メッセージリスナとして使用するスレッド数をセット (デフォルト:1スレッド) public void setListenerThreads(int numListenerThreads); // 1台のBrokerとの間にオープンするコネクション数の最大値をセット public void setConnectionsPerBroker(int connectionsPerBroker); // Nagleアルゴリズムを無効化するためにTCP no-delayフラグを使用するかどうかを設定 public void setUseTcpNoDelay(boolean useTcpNoDelay);
PulsarClientを使用してConsumer (TODO javadocs) を作成し、10のメッセージを受信します。
ConsumerConfiguration conf = new ConsumerConfiguration(); conf.setSubscriptionType(SubscriptionType.Exclusive); Consumer consumer = pulsarClient.subscribe( "persistent://my-property/us-w/my-ns/my-topic", "my-subscriber-name", conf); for (int i = 0; i < 10; i++) { // メッセージを受信 Msg msg = consumer.receive(); // 何らかの処理を実行 System.out.println("Received: " + new String(msg.getData())); // メッセージの処理に成功した事を通知するAck consumer.acknowledge(msg); } consumer.close();
Consumerに引数を渡すためにConsumerConfiguration (TODO javadocs) を使用できます:
// Ackが返されていないメッセージのタイムアウト値をセット (最も近いミリ秒の値になるように切り捨て) public ConsumerConfiguration setAckTimeout(long ackTimeout, TimeUnit timeUnit); // トピックの購読時に使用するサブスクリプションタイプを選択 public ConsumerConfiguration setSubscriptionType(SubscriptionType subscriptionType); // ConsumerのMessageListenerをセット public ConsumerConfiguration setMessageListener(MessageListener messageListener); // Consumerの受信キューのサイズをセット public ConsumerConfiguration setReceiverQueueSize(int receiverQueueSize);
PulsarClientを使用してProducer (TODO javadocs) を作成し、10のメッセージを発行します。
ProducerConfiguration = config = new ProducerConfiguration(); Producer producer = pulsarClient.createProducer( "persistent://my-property/us-w/my-ns/my-topic", config); // 10のメッセージをトピックに発行 for (int i = 0; i < 10; i++) { producer.send("my-message".getBytes()); } producer.close();
Producerに引数を渡すためにProducerConfiguration (TODO javadocs) を使用できます:
// 送信タイムアウト値をセット (デフォルト:30秒) public ProducerConfiguration setSendTimeout(int sendTimeout, TimeUnit unit); // BrokerからAckを受信するためにペンディング状態になっているメッセージを保持するキューの最大サイズをセット public ProducerConfiguration setMaxPendingMessages(int maxPendingMessages); // 送信メッセージキューがいっぱいになった時に Producer#send および Producer#sendAsync をブロックするかどうかをセット public ProducerConfiguration setBlockIfQueueFull(boolean blockIfQueueFull); // Partitioned Producerのメッセージのルーティングモードをセット public ProducerConfiguration setMessageRoutingMode(MessageRoutingMode messageRouteMode); // メッセージの圧縮方式をセット public ProducerConfiguration setCompressionType(CompressionType compressionType); // MessageRouterの実装を渡してカスタマイズされたメッセージのルーティングポリシーをセット public ProducerConfiguration setMessageRouter(MessageRouter messageRouter); // メッセージの自動バッチ処理を有効にするかどうかを指定 (デフォルト:false) public ProducerConfiguration setBatchingEnabled(boolean batchMessagesEnabled); // 送信メッセージがバッチ処理される期間をセット (デフォルト:10ミリ秒) public ProducerConfiguration setBatchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit); // 1回のバッチ処理で許可されるメッセージの最大数をセット (デフォルト:1000) public ProducerConfiguration setBatchingMaxMessages(int batchMessagesMaxMessagesPerBatch);