PulsarにおけるWebSocket APIは公式のクライアントライブラリのない言語からもPulsarとメッセージをやり取りする簡単な方法を提供することを意味します。WebSocketを通してメッセージの送受信とJava, Python, C++クライアントライブラリから利用可能な全機能を利用できます。
{% include admonition.html type=“success” content=“PulsarのWebSocket APIはどのWebSocketクライアントライブラリからでも利用できます。PythonとNode.jsのサンプルはこちらにあります。” %}
ローカルでの開発で推奨しているPulsarの{% popover_ja スタンドアローン %}モードではすでにWebSocketサービスは利用可能な状態です。
スタンドアローン以外のモードではWebSocketサービスをデプロイする方法が2つあります:
このモードでは、WebSocketサービスはBrokerですでに起動されている同じHTTPサービス内に起動します。このモードを有効にするには、 conf/broker.conf
ファイルにwebSocketServiceEnabled
パラメータをセットします。
webSocketServiceEnabled=true
このモードでは、Pulsar {% popover_ja Broker %}とは別サービスとして起動されます。このモードの設定はconf/websocket.conf
ファイルでハンドリングされます。少なくとも以下のパラメータを設定する必要があります:
こちらは設定例です:
globalZookeeperServers=zk1:2181,zk2:2181,zk3:2181 webServicePort=8080 clusterName=my-cluster
設定を行なったら、pulsar-daemon
ツールを利用してサービスを起動できます:
$ bin/pulsar-daemon start websocket
PulsarのWebSocket APIは2つのエンドポイントを提供します。メッセージをproduceするためのエンドポイントとconsumeするためのエンドポイントです。
WebSocket APIを介した全てのやり取りはJSONで行われます。
ProducerエンドポイントはURLで{% popover_ja プロパティ %}、{% popover_ja クラスタ %}、{% popover_ja ネームスペース %}、{% popover_ja トピック %}を指定します:
{% endpoint ws://broker-service-url:8080/ws/producer/persistent/:property/:cluster/:namespace/:topic %}
{ "payload": "SGVsbG8gV29ybGQ=", "properties": {"key1": "value1", "key2": "value2"}, "context": "1" }
キー | 型 | 必須? | 説明 |
---|---|---|---|
payload | string | 必須 | Base-64エンコードされたペイロード |
properties | key-value pairs | 任意 | アプリケーションによって定義されたプロパティ |
context | string | 任意 | アプリケーションによって定義されたリクエスト識別子 |
key | string | 任意 | パーティションドトピックの場合、使用するパーティション |
replicationClusters | array | 任意 | レプリケーションを行う{% popover_ja クラスタ %}をここで指定したものだけに制限 |
{ "result": "ok", "messageId": "CAAQAw==", "context": "1" }
{ "result": "send-error:3", "errorMsg": "Failed to de-serialize from JSON", "context": "1" }
キー | 型 | 必須? | 説明 |
---|---|---|---|
result | string | 必須 | 成功時はok 、失敗時はエラーメッセージ |
messageId | string | 必須 | produceされたメッセージに割り当てられたメッセージID |
context | string | 任意 | アプリケーションによって定義されたリクエスト識別子 |
Consumerエンドポイントは{% popover_ja プロパティ %}、{% popover_ja クラスタ %}、{% popover_ja ネームスペース %}、{% popover_ja トピック %}と同様に{% popover_ja サブスクリプション %}もURLで指定します:
{% endpoint ws://broker-service-url:8080/ws/consumer/persistent/:property/:cluster/:namespace/:topic/:subscription %}
WebSocketセッション上でサーバからメッセージがプッシュされます:
{ "messageId": "CAAQAw==", "payload": "SGVsbG8gV29ybGQ=", "properties": {"key1": "value1", "key2": "value2"}, "publishTime": "2016-08-30 16:45:57.785" }
キー | 型 | 必須? | 説明 |
---|---|---|---|
messageId | string | 必須 | メッセージID |
payload | string | 必須 | Base-64エンコードされたペイロード |
publishTime | string | 必須 | メッセージがproduceされた時間のタイムスタンプ |
properties | key-value pairs | 任意 | アプリケーションによって定義されたプロパティ |
key | string | 任意 | Producerによってセットされたオリジナルのルーティングキー |
Pulsar Brokerがメッセージを削除できるように、Consumerはメッセージが正常に処理できたというAckを返す必要があります。
{ "messageId": "CAAQAw==" }
キー | 型 | 必須? | 説明 |
---|---|---|---|
messageId | string | 必須 | 処理したメッセージのメッセージID |
エラーの場合、サーバは次のエラーコードを利用してWebSocketセッションをクローズします:
エラーコード | エラーメッセージ |
---|---|
1 | Producerの作成に失敗 |
2 | 購読に失敗 |
3 | JSONからのデシリアライズに失敗 |
4 | JSONへのシリアライズに失敗 |
5 | クライアントの認証に失敗 |
6 | クライアントが認可されていない |
7 | ペイロードの誤ったエンコード |
8 | 未知のエラー |
{% include admonition.html type=‘warning’ content=‘アプリケーションは、バックオフ期間後にWebSocketセッションを再確立する責任があります。’ %}
この例ではwebsocket-client
パッケージを利用します。pipを利用してインストールできます:
$ pip install websocket-client
PyPIから直接ダウンロードすることもできます。
Pulsarの{% popover_ja トピック %}に簡単なメッセージを送信するPython {% popover_ja Producer %}の実装例です。
import websocket, base64, json TOPIC = 'ws://localhost:8080/ws/producer/persistent/sample/standalone/ns1/my-topic' ws = websocket.create_connection(TOPIC) # Send one message as JSON ws.send(json.dumps({ 'payload' : base64.b64encode('Hello World'), 'properties': { 'key1' : 'value1', 'key2' : 'value2' }, 'context' : 5 })) response = json.loads(ws.recv()) if response['result'] == 'ok': print 'Message published successfully' else: print 'Failed to publish message:', response ws.close()
Pulsar {% popover_ja トピック %}をリッスンしてメッセージを受信するたびにそのメッセージIDを表示するPython {% popover_ja Consumer %}の実装例です:
import websocket, base64, json TOPIC = 'ws://localhost:8080/ws/consumer/persistent/sample/standalone/ns1/my-topic/my-sub' ws = websocket.create_connection(TOPIC) while True: msg = json.loads(ws.recv()) if not msg: break print "Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload'])) # 正常に処理できた事を応答します ws.send(json.dumps({'messageId' : msg['messageId']})) ws.close()
この例では ws
パッケージを利用します。npmを利用してインストールできます:
$ npm install ws
Pulsarの{% popover_ja トピック %}に簡単なメッセージを送信するNode.jsの実装例です。
var WebSocket = require('ws'), topic = "ws://localhost:8080/ws/producer/persistent/my-property/us-west/my-ns/my-topic1", ws = new WebSocket(topic); var message = { "payload" : new Buffer("Hello World").toString('base64'), "properties": { "key1" : "value1", "key2" : "value2" }, "context" : "1" }; ws.on('open', function() { // 1つのメッセージを送信します ws.send(JSON.stringify(message)); }); ws.on('message', function(message) { console.log('received ack: %s', message); });
var WebSocket = require('ws'), topic = "ws://localhost:8080/ws/consumer/persistent/my-property/us-west/my-ns/my-topic1/my-sub", ws = new WebSocket(topic); socket.onmessage = function(packet) { var receiveMsg = JSON.parse(packet.data); var ackMsg = {"messageId" : receiveMsg.messageId}; socket.send(JSON.stringify(ackMsg)); };