Pulsar's WebSocket API is meant to provide a simple way to interact with Pulsar using languages that do not have an official client library. Through WebSockets you can publish and consume messages and use all the features available in the Java, Python, and C++ client libraries.
{% include admonition.html type=“success” content=“You can use Pulsar's WebSocket API with any WebSocket client library. See examples for Python and Node.js below.” %}
The {% popover standalone %} variant of Pulsar that we recommend using for local development already has the WebSocket service enabled.
In non-standalone mode, there are two ways to deploy the WebSocket service:
In this mode, the WebSocket service will run within the same HTTP service that's already running in the broker. To enable this mode, set the webSocketServiceEnabled
parameter in the conf/broker.conf
configuration file in your installation.
webSocketServiceEnabled=true
In this mode, the WebSocket service will be run from a Pulsar {% popover broker %} as a separate service. Configuration for this mode is handled in the conf/websocket.conf
configuration file. You'll need to set at least the following parameters:
Here's an example:
globalZookeeperServers=zk1:2181,zk2:2181,zk3:2181 webServicePort=8080 clusterName=my-cluster
When the configuration is set, you can start the service using the pulsar-daemon
tool:
$ bin/pulsar-daemon start websocket
Pulsar's WebSocket API offers three endpoints for producing messages, consuming messages and reading messages.
All exchanges via the WebSocket API use JSON.
The producer endpoint requires you to specify a {% popover property %}, {% popover cluster %}, {% popover namespace %}, and {% popover topic %} in the URL:
{% endpoint ws://broker-service-url:8080/ws/producer/persistent/:property/:cluster/:namespace/:topic %}
Key | Type | Required? | Explanation |
---|---|---|---|
sendTimeoutMillis | long | no | Send timeout (default: 30 secs) |
batchingEnabled | boolean | no | Enable batching of messages (default: false) |
batchingMaxMessages | int | no | Maximum number of messages permitted in a batch (default: 1000) |
maxPendingMessages | int | no | Set the max size of the internal-queue holding the messages (default: 1000) |
batchingMaxPublishDelay | long | no | Time period within which the messages will be batched (default: 10ms) |
messageRoutingMode | string | no | Message routing mode for the partitioned producer: SinglePartition/RoundRobinPartition |
compressionType | string | no | Compression type: LZ4/ZLIB |
{ "payload": "SGVsbG8gV29ybGQ=", "properties": {"key1": "value1", "key2": "value2"}, "context": "1" }
Key | Type | Required? | Explanation |
---|---|---|---|
payload | string | yes | Base-64 encoded payload |
properties | key-value pairs | no | Application-defined properties |
context | string | no | Application-defined request identifier |
key | string | no | For partitioned topics, decides which partition to use |
replicationClusters | array | no | Restrict replication to this list of {% popover clusters %}, specified by name |
{ "result": "ok", "messageId": "CAAQAw==", "context": "1" }
{ "result": "send-error:3", "errorMsg": "Failed to de-serialize from JSON", "context": "1" }
Key | Type | Required? | Explanation |
---|---|---|---|
result | string | yes | ok if successful or an error message if unsuccessful |
messageId | string | yes | Message ID assigned to the published message |
context | string | no | Application-defined request identifier |
The consumer endpoint requires you to specify a {% popover property %}, {% popover cluster %}, {% popover namespace %}, and {% popover topic %}, as well as a {% popover subscription %}, in the URL:
{% endpoint ws://broker-service-url:8080/ws/consumer/persistent/:property/:cluster/:namespace/:topic/:subscription %}
Key | Type | Required? | Explanation |
---|---|---|---|
ackTimeoutMillis | long | no | Set the timeout for unacked messages (default: 0) |
subscriptionType | string | no | Subscription type: Exclusive/Failover/Shared |
receiverQueueSize | int | no | Size of the consumer receive queue (default: 1000) |
consumerName | string | no | Consumer name |
Server will push messages on the WebSocket session:
{ "messageId": "CAAQAw==", "payload": "SGVsbG8gV29ybGQ=", "properties": {"key1": "value1", "key2": "value2"}, "publishTime": "2016-08-30 16:45:57.785" }
Key | Type | Required? | Explanation |
---|---|---|---|
messageId | string | yes | Message ID |
payload | string | yes | Base-64 encoded payload |
publishTime | string | yes | Publish timestamp |
properties | key-value pairs | no | Application-defined properties |
key | string | no | Original routing key set by producer |
Consumer needs to acknowledge the successful processing of the message to have the Pulsar broker delete it.
{ "messageId": "CAAQAw==" }
Key | Type | Required? | Explanation |
---|---|---|---|
messageId | string | yes | Message ID of the processed message |
The reader endpoint requires you to specify a {% popover property %}, {% popover cluster %}, {% popover namespace %}, and {% popover topic %} in the URL:
{% endpoint ws://broker-service-url:8080/ws/reader/persistent/:property/:cluster/:namespace/:topic %}
Key | Type | Required? | Explanation |
---|---|---|---|
readerName | string | no | Reader name |
receiverQueueSize | int | no | Size of the consumer receive queue (default: 1000) |
Server will push messages on the WebSocket session:
{ "messageId": "CAAQAw==", "payload": "SGVsbG8gV29ybGQ=", "properties": {"key1": "value1", "key2": "value2"}, "publishTime": "2016-08-30 16:45:57.785" }
Key | Type | Required? | Explanation |
---|---|---|---|
messageId | string | yes | Message ID |
payload | string | yes | Base-64 encoded payload |
publishTime | string | yes | Publish timestamp |
properties | key-value pairs | no | Application-defined properties |
key | string | no | Original routing key set by producer |
In WebSocket, Reader needs to acknowledge the successful processing of the message to have the Pulsar WebSocket service update the number of pending messages. If you don't send acknowledgements, Pulsar WebSocket service will stop sending messages after reaching the pendingMessages limit.
{ "messageId": "CAAQAw==" }
Key | Type | Required? | Explanation |
---|---|---|---|
messageId | string | yes | Message ID of the processed message |
In case of error the server will close the WebSocket session using the following error codes:
Error Code | Error Message |
---|---|
1 | Failed to create producer |
2 | Failed to subscribe |
3 | Failed to deserialize from JSON |
4 | Failed to serialize to JSON |
5 | Failed to authenticate client |
6 | Client is not authorized |
7 | Invalid payload encoding |
8 | Unknown error |
{% include admonition.html type=‘warning’ content=‘The application is responsible for re-establishing a new WebSocket session after a backoff period.’ %}
Below you'll find code examples for the Pulsar WebSocket API in Python and Node.js.
This example uses the websocket-client
package. You can install it using pip:
$ pip install websocket-client
You can also download it from PyPI.
Here's an example Python {% popover producer %} that sends a simple message to a Pulsar {% popover topic %}:
import websocket, base64, json TOPIC = 'ws://localhost:8080/ws/producer/persistent/sample/standalone/ns1/my-topic' ws = websocket.create_connection(TOPIC) # Send one message as JSON ws.send(json.dumps({ 'payload' : base64.b64encode('Hello World'), 'properties': { 'key1' : 'value1', 'key2' : 'value2' }, 'context' : 5 })) response = json.loads(ws.recv()) if response['result'] == 'ok': print 'Message published successfully' else: print 'Failed to publish message:', response ws.close()
Here's an example Python {% popover consumer %} that listens on a Pulsar {% popover topic %} and prints the message ID whenever a message arrives:
import websocket, base64, json TOPIC = 'ws://localhost:8080/ws/consumer/persistent/sample/standalone/ns1/my-topic/my-sub' ws = websocket.create_connection(TOPIC) while True: msg = json.loads(ws.recv()) if not msg: break print "Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload'])) # Acknowledge successful processing ws.send(json.dumps({'messageId' : msg['messageId']})) ws.close()
Here's an example Python reader that listens on a Pulsar {% popover topic %} and prints the message ID whenever a message arrives:
import websocket, base64, json TOPIC = 'ws://localhost:8080/ws/reader/persistent/sample/standalone/ns1/my-topic' ws = websocket.create_connection(TOPIC) while True: msg = json.loads(ws.recv()) if not msg: break print "Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload'])) # Acknowledge successful processing ws.send(json.dumps({'messageId' : msg['messageId']})) ws.close()
This example uses the ws
package. You can install it using npm:
$ npm install ws
Here's an example Node.js {% popover producer %} that sends a simple message to a Pulsar {% popover topic %}:
var WebSocket = require('ws'), topic = "ws://localhost:8080/ws/producer/persistent/my-property/us-west/my-ns/my-topic1", ws = new WebSocket(topic); var message = { "payload" : new Buffer("Hello World").toString('base64'), "properties": { "key1" : "value1", "key2" : "value2" }, "context" : "1" }; ws.on('open', function() { // Send one message ws.send(JSON.stringify(message)); }); ws.on('message', function(message) { console.log('received ack: %s', message); });
Here's an example Node.js {% popover consumer %} that listens on the same topic used by the producer above:
var WebSocket = require('ws'), topic = "ws://localhost:8080/ws/consumer/persistent/my-property/us-west/my-ns/my-topic1/my-sub", ws = new WebSocket(topic); ws.on('message', function(message) { var receiveMsg = JSON.parse(message); console.log('Received: %s - payload: %s', message, new Buffer(receiveMsg.payload, 'base64').toString()); var ackMsg = {"messageId" : receiveMsg.messageId}; ws.send(JSON.stringify(ackMsg)); });
var WebSocket = require('ws'), topic = "ws://localhost:8080/ws/reader/persistent/my-property/us-west/my-ns/my-topic1", ws = new WebSocket(topic); ws.on('message', function(message) { var receiveMsg = JSON.parse(message); console.log('Received: %s - payload: %s', message, new Buffer(receiveMsg.payload, 'base64').toString()); var ackMsg = {"messageId" : receiveMsg.messageId}; ws.send(JSON.stringify(ackMsg)); });