title: Pulsar's WebSocket API tags: [websocket, nodejs, python]

Pulsar's WebSocket API is meant to provide a simple way to interact with Pulsar using languages that do not have an official client library. Through WebSockets you can publish and consume messages and use all the features available in the Java, Python, and C++ client libraries.

{% include admonition.html type=“success” content=“You can use Pulsar's WebSocket API with any WebSocket client library. See examples for Python and Node.js below.” %}

Running the WebSocket service

The {% popover standalone %} variant of Pulsar that we recommend using for local development already has the WebSocket service enabled.

In non-standalone mode, there are two ways to deploy the WebSocket service:

Embedded with a Pulsar broker

In this mode, the WebSocket service will run within the same HTTP service that's already running in the broker. To enable this mode, set the webSocketServiceEnabled parameter in the conf/broker.conf configuration file in your installation.

webSocketServiceEnabled=true

As a separate component

In this mode, the WebSocket service will be run from a Pulsar {% popover broker %} as a separate service. Configuration for this mode is handled in the conf/websocket.conf configuration file. You'll need to set at least the following parameters:

Here's an example:

globalZookeeperServers=zk1:2181,zk2:2181,zk3:2181
webServicePort=8080
clusterName=my-cluster

Starting the broker

When the configuration is set, you can start the service using the pulsar-daemon tool:

$ bin/pulsar-daemon start websocket

API Reference

Pulsar's WebSocket API offers three endpoints for producing messages, consuming messages and reading messages.

All exchanges via the WebSocket API use JSON.

Producer endpoint

The producer endpoint requires you to specify a {% popover property %}, {% popover cluster %}, {% popover namespace %}, and {% popover topic %} in the URL:

{% endpoint ws://broker-service-url:8080/ws/producer/persistent/:property/:cluster/:namespace/:topic %}

Publishing a message

{
  "payload": "SGVsbG8gV29ybGQ=",
  "properties": {"key1": "value1", "key2": "value2"},
  "context": "1"
}
KeyTypeRequired?Explanation
payloadstringyesBase-64 encoded payload
propertieskey-value pairsnoApplication-defined properties
contextstringnoApplication-defined request identifier
keystringnoFor partitioned topics, decides which partition to use
replicationClustersarraynoRestrict replication to this list of {% popover clusters %}, specified by name
Example success response
{
   "result": "ok",
   "messageId": "CAAQAw==",
   "context": "1"
 }
Example failure response
 {
   "result": "send-error:3",
   "errorMsg": "Failed to de-serialize from JSON",
   "context": "1"
 }
KeyTypeRequired?Explanation
resultstringyesok if successful or an error message if unsuccessful
messageIdstringyesMessage ID assigned to the published message
contextstringnoApplication-defined request identifier

Consumer endpoint

The consumer endpoint requires you to specify a {% popover property %}, {% popover cluster %}, {% popover namespace %}, and {% popover topic %}, as well as a {% popover subscription %}, in the URL:

{% endpoint ws://broker-service-url:8080/ws/consumer/persistent/:property/:cluster/:namespace/:topic/:subscription %}

Receiving messages

Server will push messages on the WebSocket session:

{
  "messageId": "CAAQAw==",
  "payload": "SGVsbG8gV29ybGQ=",
  "properties": {"key1": "value1", "key2": "value2"},
  "publishTime": "2016-08-30 16:45:57.785"
}
KeyTypeRequired?Explanation
messageIdstringyesMessage ID
payloadstringyesBase-64 encoded payload
publishTimestringyesPublish timestamp
propertieskey-value pairsnoApplication-defined properties
keystringnoOriginal routing key set by producer

Acknowledging the message

Consumer needs to acknowledge the successful processing of the message to have the Pulsar broker delete it.

{
  "messageId": "CAAQAw=="
}
KeyTypeRequired?Explanation
messageIdstringyesMessage ID of the processed message

Reader endpoint

The reader endpoint requires you to specify a {% popover property %}, {% popover cluster %}, {% popover namespace %}, and {% popover topic %} in the URL:

{% endpoint ws://broker-service-url:8080/ws/reader/persistent/:property/:cluster/:namespace/:topic %}

Receiving messages

Server will push messages on the WebSocket session:

{
  "messageId": "CAAQAw==",
  "payload": "SGVsbG8gV29ybGQ=",
  "properties": {"key1": "value1", "key2": "value2"},
  "publishTime": "2016-08-30 16:45:57.785"
}
KeyTypeRequired?Explanation
messageIdstringyesMessage ID
payloadstringyesBase-64 encoded payload
publishTimestringyesPublish timestamp
propertieskey-value pairsnoApplication-defined properties
keystringnoOriginal routing key set by producer

Acknowledging the message

In WebSocket, Reader needs to acknowledge the successful processing of the message to have the Pulsar WebSocket service update the number of pending messages. If you don't send acknowledgements, Pulsar WebSocket service will stop sending messages after reaching the pendingMessages limit.

{
  "messageId": "CAAQAw=="
}
KeyTypeRequired?Explanation
messageIdstringyesMessage ID of the processed message

Error codes

In case of error the server will close the WebSocket session using the following error codes:

Error CodeError Message
1Failed to create producer
2Failed to subscribe
3Failed to deserialize from JSON
4Failed to serialize to JSON
5Failed to authenticate client
6Client is not authorized
7Invalid payload encoding
8Unknown error

{% include admonition.html type=‘warning’ content=‘The application is responsible for re-establishing a new WebSocket session after a backoff period.’ %}

Client examples

Below you'll find code examples for the Pulsar WebSocket API in Python and Node.js.

Python

This example uses the websocket-client package. You can install it using pip:

$ pip install websocket-client

You can also download it from PyPI.

Python producer

Here's an example Python {% popover producer %} that sends a simple message to a Pulsar {% popover topic %}:

import websocket, base64, json

TOPIC = 'ws://localhost:8080/ws/producer/persistent/sample/standalone/ns1/my-topic'

ws = websocket.create_connection(TOPIC)

# Send one message as JSON
ws.send(json.dumps({
    'payload' : base64.b64encode('Hello World'),
    'properties': {
        'key1' : 'value1',
        'key2' : 'value2'
    },
    'context' : 5
}))

response =  json.loads(ws.recv())
if response['result'] == 'ok':
    print 'Message published successfully'
else:
    print 'Failed to publish message:', response
ws.close()

Python consumer

Here's an example Python {% popover consumer %} that listens on a Pulsar {% popover topic %} and prints the message ID whenever a message arrives:

import websocket, base64, json

TOPIC = 'ws://localhost:8080/ws/consumer/persistent/sample/standalone/ns1/my-topic/my-sub'

ws = websocket.create_connection(TOPIC)

while True:
    msg = json.loads(ws.recv())
    if not msg: break

    print "Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload'])

    # Acknowledge successful processing
    ws.send(json.dumps({'messageId' : msg['messageId']}))

ws.close()

Python reader

Here's an example Python reader that listens on a Pulsar {% popover topic %} and prints the message ID whenever a message arrives:

import websocket, base64, json

TOPIC = 'ws://localhost:8080/ws/reader/persistent/sample/standalone/ns1/my-topic'

ws = websocket.create_connection(TOPIC)

while True:
    msg = json.loads(ws.recv())
    if not msg: break

    print "Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload'])

    # Acknowledge successful processing
    ws.send(json.dumps({'messageId' : msg['messageId']}))

ws.close()

Node.js

This example uses the ws package. You can install it using npm:

$ npm install ws

Node.js producer

Here's an example Node.js {% popover producer %} that sends a simple message to a Pulsar {% popover topic %}:

var WebSocket = require('ws'),
    topic = "ws://localhost:8080/ws/producer/persistent/my-property/us-west/my-ns/my-topic1",
    ws = new WebSocket(topic);

var message = {
  "payload" : new Buffer("Hello World").toString('base64'),
  "properties": {
    "key1" : "value1",
    "key2" : "value2"
  },
  "context" : "1"
};

ws.on('open', function() {
  // Send one message
  ws.send(JSON.stringify(message));
});

ws.on('message', function(message) {
  console.log('received ack: %s', message);
});

Node.js consumer

Here's an example Node.js {% popover consumer %} that listens on the same topic used by the producer above:

var WebSocket = require('ws'),
    topic = "ws://localhost:8080/ws/consumer/persistent/my-property/us-west/my-ns/my-topic1/my-sub",
    ws = new WebSocket(topic);

ws.on('message', function(message) {
    var receiveMsg = JSON.parse(message);
    console.log('Received: %s - payload: %s', message, new Buffer(receiveMsg.payload, 'base64').toString());
    var ackMsg = {"messageId" : receiveMsg.messageId};
    ws.send(JSON.stringify(ackMsg));
});

NodeJS reader

var WebSocket = require('ws'),
    topic = "ws://localhost:8080/ws/reader/persistent/my-property/us-west/my-ns/my-topic1",
    ws = new WebSocket(topic);

ws.on('message', function(message) {
    var receiveMsg = JSON.parse(message);
    console.log('Received: %s - payload: %s', message, new Buffer(receiveMsg.payload, 'base64').toString());
    var ackMsg = {"messageId" : receiveMsg.messageId};
    ws.send(JSON.stringify(ackMsg));
});