PulsarのJavaクライアントでは{% popover_ja Producer %}と{% popover_ja Consumer %}の両方が利用可能であるだけでなく、管理タスクも実行可能です。
Javaクライアントの現在のバージョンは {{ site.current_version }} です。
PulsarクライアントのJavadocはパッケージ名によって2つに分割されています:
パッケージ | 説明 |
---|---|
org.apache.pulsar.client.api | {% popover_ja Producer %}と{% popover_ja Consumer %}のAPI |
org.apache.pulsar.client.admin | Java admin API |
このドキュメントはPulsarの{% popover_ja トピック %}に対して、メッセージのproduceとconsumeを行うクライアントAPIのみに焦点を当てています。Java admin APIについては、Pulsar admin APIをご確認ください。
最新バージョンのPulsar Javaクライアントライブラリは[Maven Central](http://search.maven.org/#artifactdetails%7Corg.apache.pulsar%7Cpulsar-client%7C{{ site.current_version }}%7Cjar)から利用可能です。 最新のバージョンを使うために、ビルド設定にpulsar-client
ライブラリを追加してください。
Mavenを利用している場合は、以下をpom.xml
に記述してください:
<!-- <properties>ブロック内 --> <pulsar.version>{{ site.current_version }}</pulsar.version> <!-- <dependencies>ブロック内 --> <dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client</artifactId> <version>${pulsar.version}</version> </dependency>
Gradleを利用している場合は、build.gradle
ファイルに以下を記述してください:
def pulsarVersion = '{{ site.current_version }}' dependencies { compile group: 'org.apache.pulsar', name: 'pulsar-client', version: pulsarVersion }
{% include explanations/ja/client-url.md %}
以下のように、{% javadoc PulsarClient client org.apache.pulsar.client.api.PulsarClient %}オブジェクトを接続先のPulsar{% popover_ja クラスタ %}のURLのみを用いて生成できます:
String pulsarBrokerRootUrl = "pulsar://localhost:6650"; PulsarClient client = PulsarClient.create(pulsarBrokerRootUrl);
このPulsarClient
オブジェクトはデフォルトの設定を使用します。デフォルト以外の設定を適用する方法はJavadocの{% javadoc ClientConfiguration client org.apache.pulsar.client.api.ClientConfiguration %}をご確認ください。
{% include admonition.html type=“info” content=" クライアントレベルの設定に加えて、以下のセクションで説明するProducerあるいはConsumer固有の設定も可能です。 " %}
Pulsarでは{% popover_ja Producer %}は{% popover_ja メッセージ %}を{% popover_ja トピック %}に書き込みます。Producerのオブジェクトを生成するため、まずは{% javadoc PulsarClient client org.apache.pulsar.client.api.PulsarClient %}オブジェクトをPulsar {% popover_ja Broker %}のURLを用いて生成します。
String pulsarBrokerRootUrl = "pulsar://localhost:6650"; PulsarClient client = PulsarClient.create(pulsarBrokerRootUrl);
{% include admonition.html type=‘info’ title=‘スタンドアローンクラスタのデフォルトのBroker URL’ content=" Pulsarをスタンドアローンモードで起動している場合、デフォルトではpulsar://localhost:6650
というURLでBrokerが利用できます。" %}
{% javadoc PulsarClient client org.apache.pulsar.client.api.PulsarClient %}オブジェクトを生成したら、{% popover_ja トピック %}に対して{% javadoc Producer client org.apache.pulsar.client.api.Producer %}オブジェクトを生成できます。
String topic = "persistent://sample/standalone/ns1/my-topic"; Producer producer = client.createProducer(topic);
指定したBroker, トピックに対してメッセージを送信できます。
// トピックに対して10個のメッセージを発行 for (int i = 0; i < 10; i++) { producer.send("my-message".getBytes()); }
{% include admonition.html type=‘warning’ content=" Producer, Consumer, クライアントはそれらが必要ではなくなった時にクローズしてください:
producer.close(); consumer.close(); client.close();
クローズ処理は非同期でも可能です:
producer.asyncClose(); consumer.asyncClose(); clioent.asyncClose();
" %}
上記の例のように、Producer
オブジェクトをトピック名のみで生成すると、Producerはデフォルトの設定を利用します。デフォルト以外の設定を利用したい場合は、Producer
を{% javadoc ProducerConfiguration client org.apache.pulsar.client.api.ProducerConfiguration %}を用いて生成してください。
以下は設定の例です:
PulsarClient client = PulsarClient.create(pulsarBrokerRootUrl); ProducerConfiguration config = new ProducerConfiguration(); config.setBatchingEnabled(true); config.setSendTimeout(10, TimeUnit.SECONDS); Producer producer = client.createProducer(topic, config);
{% popover_ja パーティションドトピック %}を使用している場合は、{% popover_ja Producer %}を用いたメッセージの発行時のルーティングモードを指定できます。Javaクライアントを使用したルーティングモードの指定方法の詳細は、パーティションドトピックのドキュメントを御覧ください。
Javaクライアントを使ってメッセージを非同期で発行できます。非同期送信では、Producerはメッセージをブロッキングキューに入れ、制御を戻します。クライアントライブラリはバックグラウンドで{% popover_ja Broker %}に送信します。キューが最大量 (設定可能) に達した場合、Producerは送信APIを呼び出した時、Producerに渡される引数に応じてブロックされる、あるいは失敗する可能性があります。
非同期送信処理の実装例は以下のようになります:
CompletableFuture<MessageId> future = producer.sendAsync("my-async-message".getBytes());
非同期送信処理では、CompletableFuture
によってラップされた{% javadoc MessageId client org.apache.pulsar.client.api.MessageId %}がリターンされます。
Pulsarでは{% popover_ja Consumer %}は{% popover_ja トピック %}を購読し、{% popover_ja Producer %}がトピックに発行した{% popover_ja メッセージ %}を処理します。Consumerのオブジェクトを生成するため、まずは{% javadoc PulsarClient client org.apache.pulsar.client.api.PulsarClient %}のオブジェクトをPulsar {% popover_ja Broker %}のURLを用いて生成します (上記のProducerの例のようにclient
オブジェクトを使用) 。
{% javadoc PulsarClient client org.apache.pulsar.client.api.PulsarClient %}オブジェクトを生成したら、{% popover_ja トピック %}に対して{% javadoc Consumer client org.apache.pulsar.client.api.Consumer %}オブジェクトを生成できます。{% popover_ja サブスクリプション %}名の指定も必要です。
String topic = "persistent://sample/standalone/ns1/my-topic"; // from above String subscription = "my-subscription"; Consumer consumer = client.subscribe(topic, subscription);
トピック上のメッセージを取得するためにreceive
メソッドを使用できます。このwhile
ループはpersistent://sample/standalone/ns1/my-topic
トピックに対する長期のリスナーです。メッセージを受信するとその内容を出力し、メッセージが処理された後{% popover_ja Ack %} (確認応答) を送信します:
while (true) { // メッセージを待ち受ける Message msg = consumer.receive(); System.out.println("Received message: " + msg.getData()); // Brokerがメッセージを削除できるようにAckを送信する consumer.acknowledge(msg); }
上記の例のように、Consumer
オブジェクトをトピック名とサブスクリプション名のみで生成すると、Consumerはデフォルトの設定を利用します。デフォルト以外の設定を利用したい場合は、Consumer
を{% javadoc ConsumerConfiguration client org.apache.pulsar.client.api.ConsumerConfiguration %}を用いて生成してください。
以下は設定の例です:
PulsarClient client = PulsarClient.create(pulsarBrokerRootUrl); ConsumerConfiguration config = new ConsumerConfiguration(); config.setSubscriptionType(SubscriptionType.Shared); config.setReceiverQueueSize(10); Consumer consumer = client.createConsumer(topic, config);
receive
メソッドはメッセージを同期的 (メッセージが利用できるようになるまでConsumerプロセスがブロックされる) に受信します。非同期受信も利用可能です。このメソッドはCompletableFuture
オブジェクトとしてすぐにリターンします。CompletableFutureオブジェクトは新しいメッセージが利用可能になった時、受信して完了します。
以下は実装例です:
CompletableFuture<Message> asyncMessage = consumer.receiveAsync();
非同期受信ではCompletableFuture
でラップした{% javadoc Message client org.apache.pulsar.client.api.Message %}がリターンされます。
Pulsarは現在、TLSとAthenzの2つの認証スキームをサポートしています。Pulsar Javaクライアントでは両方が利用可能です。
TLSを利用するために、setUseTls
メソッドでtrue
をセットし、TLS通信を有効にする必要があります。また、CAの証明書、クライアントの証明書、秘密鍵のパスを指定する必要があります。
以下は設定の例です:
ClientConfiguration conf = new ClientConfiguration(); conf.setUseTls(true); conf.setTlsTrustCertsFilePath("/path/to/cacert.pem"); Map<String, String> authParams = new HashMap<>(); authParams.put("tlsCertFile", "/path/to/client-cert.pem"); authParams.put("tlsKeyFile", "/path/to/client-key.pem"); conf.setAuthentication(AuthenticationTls.class.getName(), authParams); PulsarClient client = PulsarClient.create( "pulsar+ssl://my-broker.com:6651", conf);
Athenzを利用するために、TLS通信を有効にし、Map
として以下の4つのパラメータを与える必要があります:
tenantDomain
tenantService
providerDomain
privateKeyPath
keyId
というパラメータも任意で設定可能です。
以下は設定の例です:
ClientConfiguration conf = new ClientConfiguration(); // TLSを有効に conf.setUseTls(true); conf.setTlsTrustCertsFilePath("/path/to/cacert.pem"); // Athenz認証プラグインのパラメータをセット Map<String, String> authParams = new HashMap<>(); authParams.put("tenantDomain", "shopping"); // テナントドメイン名 authParams.put("tenantService", "some_app"); // テナントサービス名 authParams.put("providerDomain", "pulsar"); // プロバイダドメイン名 authParams.put("privateKeyPath", "/path/to/private.pem"); // テナントの秘密鍵のパス authParams.put("keyId", "v1"); // テナントの秘密鍵のID (任意, デフォルト: "0") conf.setAuthentication(AuthenticationAthenz.class.getName(), authParams); PulsarClient client = PulsarClient.create( "pulsar+ssl://my-broker.com:6651", conf);