This API is considered experimental and might change before it's finalized. We encourage to submit feedback on this WebSocket API.
Pulsar WebSocket API is meant to provide a simple way to interact with Pulsar from languages outside the JVM.
Through WebSocket you can publish and consume messages and use all the features available from the Java client library.
Apart from the Pulsar standalone that already has the WebSocket service enabled, there are 2 possible ways to deploy the WebSocket service:
Enable WebSocket in conf/broker.conf
:
webSocketServiceEnabled=true
This will run the service within the same HTTP server that is already running in the broker.
WebSocket service can be run as a separate component on its own.
Configuration is located at conf/websocket.conf
and the minimal parameters to change are:
globalZookeeperServers=... # Port to use to server HTTP request webServicePort=8080 # Name of the pulsar cluster to connect to clusterName=...
Then, to start the service:
$ bin/pulsar-daemon start websocket
There are 2 WebSocket endpoint, for publishing and consuming messages and all exchanges are done through JSON messages.
Open a WebSocket session to create a producer for a specific topic:
http://{serviceUrl}:8080/ws/producer/persistent/{property}/{cluster}/{namespace}/{topic}
{ "payload": "SGVsbG8gV29ybGQ=", "properties": {"key1": "value1", "key2": "value2"}, "context": "1" }
Key | Type | Requirement | Explanation |
---|---|---|---|
payload | String | Required | Base-64 encoded payload |
properties | Key-Value pairs | Optional | Application defined properties |
context | String | Optional | Application defined request identifier |
key | String | Optional | For partitioned topics, decides the partition to use |
replicationClusters | List | Optional | Restrict replication to these clusters |
{ "result": "ok", "messageId": "CAAQAw==", "context": "1" }
Key | Type | Requirement | Explanation |
---|---|---|---|
result | String | Required | ok if successful or error message |
messageId | String | Required | Message Id assigned to the published message |
context | String | Optional | Application defined request identifier |
Open a WebSocket session to create a producer for a specific topic:
http://{serviceUrl}:8080/ws/consumer/persistent/{property}/{cluster}/{namespace}/{topic}/{subscription}
Server will push messages on the WebSocket session:
{ "messageId": "CAAQAw==", "payload": "SGVsbG8gV29ybGQ=", "properties": {"key1": "value1", "key2": "value2"}, "publishTime": "2016-08-30 16:45:57.785", "context": "1" }
Key | Type | Requirement | Explanation |
---|---|---|---|
messageId | String | Required | Message Id |
payload | String | Required | Base-64 encoded payload |
properties | Key-Value pairs | Optional | Application defined properties |
publishTime | String | Required | Publish timestamp |
context | String | Optional | Application defined request identifier |
key | String | Optional | Original routing key set by producer |
Consumer needs to acknowledge the successful processing of the message to have the Pulsar broker delete it.
{ "messageId": "CAAQAw==" }
Key | Type | Requirement | Explanation |
---|---|---|---|
messageId | String | Required | Message Id of the processed message |
In case of error the server will close the WebSocket session using the following error codes:
Error Code | Error Message |
---|---|
1 | Failed to create producer |
2 | Failed to subscribe |
3 | Failed to de-serialize from JSON |
4 | Failed to serialize to JSON |
5 | Failed to authenticate client |
6 | Client is not authorized |
Application is responsible to re-establish a new WebSocket session after a backoff period.
in this example you need to install websocket-client
package , you can install it using pip install websocket-client
or download it from Pypi page .
import websocket, base64, json ws = websocket.create_connection( 'ws://localhost:8080/ws/producer/persistent/sample/standalone/ns1/my-topic') # Send one message ws.send(json.dumps({ 'payload' : base64.b64encode('Hello World'), 'properties': { 'key1' : 'value1', 'key2' : 'value2' }, 'context' : 5 })) response = json.loads(ws.recv()) if response['result'] == 'ok': print 'Message published successfully' else: print 'Failed to publish message:', response ws.close()
import websocket, base64, json ws = websocket.create_connection( 'ws://localhost:8080/ws/consumer/persistent/sample/standalone/ns1/my-topic/my-sub') while True: msg = json.loads(ws.recv()) if not msg: break print 'Received: ', msg, ' - payload:', base64.b64decode(msg['payload']) # Acknowledge successful processing ws.send(json.dumps({'messageId' : msg['messageId']})) ws.close()
var WebSocket = require('ws'); var ws = new WebSocket( "ws://localhost:8080/ws/producer/persistent/my-property/us-west/my-ns/my-topic1"); var message = { "payload" : new Buffer("Hello World").toString('base64'), "properties": { "key1" : "value1", "key2" : "value2" }, "context" : "1" }; ws.on('open', function() { // Send one message ws.send(JSON.stringify(message)); }); ws.on('message', function(message) { console.log('received ack: %s', message); });
var WebSocket = require('ws'); var socket = new WebSocket( "ws://localhost:6080/pubilsh/persistent/my-property/us-west/my-ns/my-topic1/my-sub-1") socket.onmessage = function(pckt){ var receiveMsg = pckt.data; var ackMsg = {"messageId" : receiveMsg.messageId} socket.send(JSON.stringify(ackMsg)); };