Pulsar WebSocket API

This API is considered experimental and might change before it's finalized. We encourage to submit feedback on this WebSocket API.

Pulsar WebSocket API is meant to provide a simple way to interact with Pulsar from languages outside the JVM.

Through WebSocket you can publish and consume messages and use all the features available from the Java client library.

Running WebSocket service

Apart from the Pulsar standalone that already has the WebSocket service enabled, there are 2 possible ways to deploy the WebSocket service:

Embedded with Pulsar broker

Enable WebSocket in conf/broker.conf:

webSocketServiceEnabled=true

This will run the service within the same HTTP server that is already running in the broker.

WebSocket as a separate component

WebSocket service can be run as a separate component on its own.

Configuration is located at conf/websocket.conf and the minimal parameters to change are:

globalZookeeperServers=...

# Port to use to server HTTP request
webServicePort=8080

# Name of the pulsar cluster to connect to
clusterName=...

Then, to start the service:

$ bin/pulsar-daemon start websocket

API Reference

There are 2 WebSocket endpoint, for publishing and consuming messages and all exchanges are done through JSON messages.

Producer

Open a WebSocket session to create a producer for a specific topic:

http://{serviceUrl}:8080/ws/producer/persistent/{property}/{cluster}/{namespace}/{topic}
Publishing a message
{
  "payload": "SGVsbG8gV29ybGQ=",
  "properties": {"key1": "value1", "key2": "value2"},
  "context": "1"
}
KeyTypeRequirementExplanation
payloadStringRequiredBase-64 encoded payload
propertiesKey-Value pairsOptionalApplication defined properties
contextStringOptionalApplication defined request identifier
keyStringOptionalFor partitioned topics, decides the
partition to use
replicationClustersListOptionalRestrict replication to these clusters
Acknowledgement from server
Success response
{
   "result": "ok",
   "messageId": "CAAQAw==",
   "context": "1"
 }
Failure response
 {
   "result": "send-error:3",
   "errorMsg": "Failed to de-serialize from JSON",
   "context": "1"
 }
KeyTypeRequirementExplanation
resultStringRequiredok if successful or error message
messageIdStringRequiredMessage Id assigned to the published message
contextStringOptionalApplication defined request identifier

Consumer

Open a WebSocket session to create a producer for a specific topic:

http://{serviceUrl}:8080/ws/consumer/persistent/{property}/{cluster}/{namespace}/{topic}/{subscription}
Receiving messages

Server will push messages on the WebSocket session:

{
  "messageId": "CAAQAw==",
  "payload": "SGVsbG8gV29ybGQ=",
  "properties": {"key1": "value1", "key2": "value2"},
  "publishTime": "2016-08-30 16:45:57.785",
  "context": "1"
}
KeyTypeRequirementExplanation
messageIdStringRequiredMessage Id
payloadStringRequiredBase-64 encoded payload
propertiesKey-Value pairsOptionalApplication defined properties
publishTimeStringRequiredPublish timestamp
contextStringOptionalApplication defined request identifier
keyStringOptionalOriginal routing key set by producer
Acknowledging the message

Consumer needs to acknowledge the successful processing of the message to have the Pulsar broker delete it.

{
  "messageId": "CAAQAw=="
}
KeyTypeRequirementExplanation
messageIdStringRequiredMessage Id of the processed message

Error codes

In case of error the server will close the WebSocket session using the following error codes:

Different Possible Error Types

Error CodeError Message
1Failed to create producer
2Failed to subscribe
3Failed to de-serialize from JSON
4Failed to serialize to JSON
5Failed to authenticate client
6Client is not authorized
7Invalid payload encoding
8Unknown error

Application is responsible to re-establish a new WebSocket session after a backoff period.

Examples

Python

in this example you need to install websocket-client package , you can install it using pip install websocket-client or download it from Pypi page .

Python producer

import websocket, base64, json

ws = websocket.create_connection(
   'ws://localhost:8080/ws/producer/persistent/sample/standalone/ns1/my-topic')

# Send one message
ws.send(json.dumps({
    'payload' : base64.b64encode('Hello World'),
    'properties': {
       'key1' : 'value1',
       'key2' : 'value2'
    },
    'context' : 5
}))

response =  json.loads(ws.recv())
if response['result'] == 'ok':
    print 'Message published successfully'
else:
    print 'Failed to publish message:', response
ws.close()

Python consumer

import websocket, base64, json

ws = websocket.create_connection(
   'ws://localhost:8080/ws/consumer/persistent/sample/standalone/ns1/my-topic/my-sub')

while True:
    msg = json.loads(ws.recv())
    if not msg: break

    print 'Received: ', msg, ' - payload:', base64.b64decode(msg['payload'])

    # Acknowledge successful processing
    ws.send(json.dumps({'messageId' : msg['messageId']}))

ws.close()

NodeJS

NodeJS producer

var WebSocket = require('ws');
var ws = new WebSocket(
	  "ws://localhost:8080/ws/producer/persistent/my-property/us-west/my-ns/my-topic1");

var message = {
  "payload" : new Buffer("Hello World").toString('base64'),
  "properties": {
    "key1" : "value1",
    "key2" : "value2"
  },
  "context" : "1"
};

ws.on('open', function() {
  // Send one message
  ws.send(JSON.stringify(message));
});

ws.on('message', function(message) {
  console.log('received ack: %s', message);
});

NodeJS consumer

var WebSocket = require('ws');
var socket = new WebSocket(
	"ws://localhost:8080/ws/consumer/persistent/my-property/us-west/my-ns/my-topic1/my-sub1")

socket.onmessage = function(pckt){
	var receiveMsg = JSON.parse(pckt.data);
	var ackMsg = {"messageId" : receiveMsg.messageId}
	socket.send(JSON.stringify(ackMsg));      
};