Pulsarはpub-subパラダイムに基づいて構築されたメッセージングシステムです。 トピックはProducerとConsumerを結びつける基本的なリソースです。
Producerはトピックに接続してメッセージを発行する事ができます。 Consumerはトピックを購読してメッセージを受信する事ができます。
一度サブスクリプションが作成されると、たとえConsumerが切断された場合でも、 Consumerから処理の成功を通知する**Ack (確認応答) **が返されるまで全てのメッセージは システムによって保持されます。
トピック名は次のようになります:
persistent://my-property/us-west/my-namespace/my-topic
トピック名の構造は、Pulsarのマルチテナント性に関連しています。 この例では:
persistent
→ 全てのメッセージが複数のディスクに永続化されるトピックである事を示します。 これは現時点でサポートされている唯一のトピック形式です。my-property
→ プロパティはPulsarインスタンスにおけるテナントを示す識別子です。us-west
→ トピックが存在するクラスタです。 典型的には、地理的地域やデータセンタごとにクラスタが存在する事になります。my-namespace
→ ネームスペースは管理単位であり、関連するトピックのグループを表します。 ほとんどの設定はネームスペースレベルで行われます。各プロパティは複数のネームスペースを持つ事が可能です。my-topic
→ トピック名の最後の部分です。この部分は自由形式であり、システム上の特別な意味は持ちません。各トピックは複数のサブスクリプションを持つ事ができます。 それぞれのサブスクリプションは異なる名前を持ち、またサブスクリプションごとに異なるタイプを指定できます:
詳しい説明はアーキテクチャのページを参照してください。
次の場所から最新のバイナリリリースをダウンロードしてください。
https://github.com/apache/incubator-pulsar/releases/latest
$ tar xvfz pulsar-X.Y-bin.tar.gz $ cd pulsar-X.Y
アプリケーション開発や実際に稼働するサービスの迅速なセットアップのために、 Pulsarのスタンドアローンモードを使用する事ができます。 このモードでは、Broker, ZooKeeper, BookKeeperの3コンポーネントを 単一のJVMプロセスで起動します。
$ bin/pulsar standalone
Pulsarのサービスはすぐに利用可能となり、http://localhost:8080/
をサービスのURLとしてクライアントに使用させる事ができます。
サンプルのネームスペース sample/standalone/ns1
が既に利用可能な状態になっています。
Pulsarクライアントライブラリの依存関係をインクルードしてください。
<dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client</artifactId> <version>${pulsar.version}</version> </dependency>
PulsarClient client = PulsarClient.create("http://localhost:8080"); Consumer consumer = client.subscribe( "persistent://sample/standalone/ns1/my-topic", "my-subscribtion-name"); while (true) { // 1つのメッセージを待ち受け Message msg = consumer.receive(); System.out.println("Received message: " + msg.getData()); // Brokerがメッセージを削除できるようにするためのAck consumer.acknowledge(msg); } client.close();
PulsarClient client = PulsarClient.create("http://localhost:8080"); Producer producer = client.createProducer( "persistent://sample/standalone/ns1/my-topic"); // 10のメッセージをトピックに発行 for (int i = 0; i < 10; i++) { producer.send("my-message".getBytes()); } client.close();