title: PulsarにおけるWebSocket API tags_ja: [websocket, nodejs, python]

PulsarにおけるWebSocket APIは公式のクライアントライブラリのない言語からもPulsarとメッセージをやり取りする簡単な方法を提供することを意味します。WebSocketを通してメッセージの送受信とJava, Python, C++クライアントライブラリから利用可能な全機能を利用できます。

{% include admonition.html type=“success” content=“PulsarのWebSocket APIはどのWebSocketクライアントライブラリからでも利用できます。PythonとNode.jsのサンプルはこちらにあります。” %}

WebSocketサービスの起動

ローカルでの開発で推奨しているPulsarの{% popover_ja スタンドアローン %}モードではすでにWebSocketサービスは利用可能な状態です。

スタンドアローン以外のモードではWebSocketサービスをデプロイする方法が2つあります:

Pulsar Brokerに組み込む

このモードでは、WebSocketサービスはBrokerですでに起動されている同じHTTPサービス内に起動します。このモードを有効にするには、 conf/broker.confファイルにwebSocketServiceEnabledパラメータをセットします。

webSocketServiceEnabled=true

独立したコンポーネントとして起動

このモードでは、Pulsar {% popover_ja Broker %}とは別サービスとして起動されます。このモードの設定はconf/websocket.confファイルでハンドリングされます。少なくとも以下のパラメータを設定する必要があります:

こちらは設定例です:

globalZookeeperServers=zk1:2181,zk2:2181,zk3:2181
webServicePort=8080
clusterName=my-cluster

WebSocketサービスの起動

設定を行なったら、pulsar-daemonツールを利用してサービスを起動できます:

$ bin/pulsar-daemon start websocket

APIリファレンス

PulsarのWebSocket APIは2つのエンドポイントを提供します。メッセージをproduceするためのエンドポイントとconsumeするためのエンドポイントです。

WebSocket APIを介した全てのやり取りはJSONで行われます。

Producerエンドポイント

ProducerエンドポイントはURLで{% popover_ja プロパティ %}、{% popover_ja クラスタ %}、{% popover_ja ネームスペース %}、{% popover_ja トピック %}を指定します:

{% endpoint ws://broker-service-url:8080/ws/producer/persistent/:property/:cluster/:namespace/:topic %}

メッセージの送信

{
  "payload": "SGVsbG8gV29ybGQ=",
  "properties": {"key1": "value1", "key2": "value2"},
  "context": "1"
}
キー必須?説明
payloadstring必須Base-64エンコードされたペイロード
propertieskey-value pairs任意アプリケーションによって定義されたプロパティ
contextstring任意アプリケーションによって定義されたリクエスト識別子
keystring任意パーティションドトピックの場合、使用するパーティション
replicationClustersarray任意レプリケーションを行う{% popover_ja クラスタ %}をここで指定したものだけに制限
成功時のレスポンス例
{
   "result": "ok",
   "messageId": "CAAQAw==",
   "context": "1"
 }
失敗時のレスポンス例
 {
   "result": "send-error:3",
   "errorMsg": "Failed to de-serialize from JSON",
   "context": "1"
 }
キー必須?説明
resultstring必須成功時はok、失敗時はエラーメッセージ
messageIdstring必須produceされたメッセージに割り当てられたメッセージID
contextstring任意アプリケーションによって定義されたリクエスト識別子

Consumerエンドポイント

Consumerエンドポイントは{% popover_ja プロパティ %}、{% popover_ja クラスタ %}、{% popover_ja ネームスペース %}、{% popover_ja トピック %}と同様に{% popover_ja サブスクリプション %}もURLで指定します:

{% endpoint ws://broker-service-url:8080/ws/consumer/persistent/:property/:cluster/:namespace/:topic/:subscription %}

メッセージの受信

WebSocketセッション上でサーバからメッセージがプッシュされます:

{
  "messageId": "CAAQAw==",
  "payload": "SGVsbG8gV29ybGQ=",
  "properties": {"key1": "value1", "key2": "value2"},
  "publishTime": "2016-08-30 16:45:57.785"
}
キー必須?説明
messageIdstring必須メッセージID
payloadstring必須Base-64エンコードされたペイロード
publishTimestring必須メッセージがproduceされた時間のタイムスタンプ
propertieskey-value pairs任意アプリケーションによって定義されたプロパティ
keystring任意Producerによってセットされたオリジナルのルーティングキー

Ack (確認応答)

Pulsar Brokerがメッセージを削除できるように、Consumerはメッセージが正常に処理できたというAckを返す必要があります。

{
  "messageId": "CAAQAw=="
}
キー必須?説明
messageIdstring必須処理したメッセージのメッセージID

エラーコード

エラーの場合、サーバは次のエラーコードを利用してWebSocketセッションをクローズします:

エラーコードエラーメッセージ
1Producerの作成に失敗
2購読に失敗
3JSONからのデシリアライズに失敗
4JSONへのシリアライズに失敗
5クライアントの認証に失敗
6クライアントが認可されていない
7ペイロードの誤ったエンコード
8未知のエラー

{% include admonition.html type=‘warning’ content=‘アプリケーションは、バックオフ期間後にWebSocketセッションを再確立する責任があります。’ %}

クライアントの実装例

以下はPythonNode.jsのサンプルコードです。

Python

この例ではwebsocket-clientパッケージを利用します。pipを利用してインストールできます:

$ pip install websocket-client

PyPIから直接ダウンロードすることもできます。

Python Producer

Pulsarの{% popover_ja トピック %}に簡単なメッセージを送信するPython {% popover_ja Producer %}の実装例です。

import websocket, base64, json

TOPIC = 'ws://localhost:8080/ws/producer/persistent/sample/standalone/ns1/my-topic'

ws = websocket.create_connection(TOPIC)

# Send one message as JSON
ws.send(json.dumps({
    'payload' : base64.b64encode('Hello World'),
    'properties': {
        'key1' : 'value1',
        'key2' : 'value2'
    },
    'context' : 5
}))

response =  json.loads(ws.recv())
if response['result'] == 'ok':
    print 'Message published successfully'
else:
    print 'Failed to publish message:', response
ws.close()

Python Consumer

Pulsar {% popover_ja トピック %}をリッスンしてメッセージを受信するたびにそのメッセージIDを表示するPython {% popover_ja Consumer %}の実装例です:

import websocket, base64, json

TOPIC = 'ws://localhost:8080/ws/consumer/persistent/sample/standalone/ns1/my-topic/my-sub'

ws = websocket.create_connection(TOPIC)

while True:
    msg = json.loads(ws.recv())
    if not msg: break

    print "Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload']))

    # 正常に処理できた事を応答します
    ws.send(json.dumps({'messageId' : msg['messageId']}))

ws.close()

Node.js

この例では wsパッケージを利用します。npmを利用してインストールできます:

$ npm install ws

Node.js Producer

Pulsarの{% popover_ja トピック %}に簡単なメッセージを送信するNode.jsの実装例です。

var WebSocket = require('ws'),
    topic = "ws://localhost:8080/ws/producer/persistent/my-property/us-west/my-ns/my-topic1",
    ws = new WebSocket(topic);

var message = {
  "payload" : new Buffer("Hello World").toString('base64'),
  "properties": {
    "key1" : "value1",
    "key2" : "value2"
  },
  "context" : "1"
};

ws.on('open', function() {
  // 1つのメッセージを送信します
  ws.send(JSON.stringify(message));
});

ws.on('message', function(message) {
  console.log('received ack: %s', message);
});

NodeJS Consumer

var WebSocket = require('ws'),
    topic = "ws://localhost:8080/ws/consumer/persistent/my-property/us-west/my-ns/my-topic1/my-sub",
    ws = new WebSocket(topic);

socket.onmessage = function(packet) {
	var receiveMsg = JSON.parse(packet.data);
	var ackMsg = {"messageId" : receiveMsg.messageId};
	socket.send(JSON.stringify(ackMsg));      
};