| --- |
| title: Pulsar's WebSocket API |
| tags: [websocket, nodejs, python] |
| --- |
| |
| <!-- |
| |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| |
| --> |
| |
| Pulsar's [WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API) API is meant to provide a simple way to interact with Pulsar using languages that do not have an official [client library](../../getting-started/Clients). Through WebSockets you can publish and consume messages and use all the features available in the [Java](../Java), [Python](../Python), and [C++](../Cpp) client libraries. |
| |
| {% include admonition.html type="success" content="You can use Pulsar's WebSocket API with any WebSocket client library. See examples for Python and Node.js [below](#client-examples)." %} |
| |
| ## Running the WebSocket service |
| |
| The {% popover standalone %} variant of Pulsar that we recommend using for [local development](../../getting-started/LocalCluster) already has the WebSocket service enabled. |
| |
| In non-standalone mode, there are two ways to deploy the WebSocket service: |
| |
| * [embedded](#embedded-with-a-pulsar-broker) with a Pulsar {% popover broker %} |
| * as a [separate component](#as-a-separate-component) |
| |
| ### Embedded with a Pulsar broker |
| |
| In this mode, the WebSocket service will run within the same HTTP service that's already running in the broker. To enable this mode, set the [`webSocketServiceEnabled`](../../reference/Configuration#broker-webSocketServiceEnabled) parameter in the [`conf/broker.conf`](../../reference/Configuration#broker) configuration file in your installation. |
| |
| ```properties |
| webSocketServiceEnabled=true |
| ``` |
| |
| ### As a separate component |
| |
| In this mode, the WebSocket service runs as a separate service, independent of the Pulsar {% popover broker %}. Configuration for this mode is handled in the [`conf/websocket.conf`](../../reference/Configuration#websocket) configuration file. You'll need to set *at least* the following parameters: |
| |
| * [`globalZookeeperServers`](../../reference/Configuration#websocket-globalZookeeperServers) |
| * [`webServicePort`](../../reference/Configuration#websocket-webServicePort) |
| * [`clusterName`](../../reference/Configuration#websocket-clusterName) |
| |
| Here's an example: |
| |
| ```properties |
| globalZookeeperServers=zk1:2181,zk2:2181,zk3:2181 |
| webServicePort=8080 |
| clusterName=my-cluster |
| ``` |
| |
| ### Starting the broker |
| |
| Once the configuration is set, you can start the WebSocket service using the [`pulsar-daemon`](../../reference/CliTools#pulsar-daemon) tool: |
| |
| ```shell |
| $ bin/pulsar-daemon start websocket |
| ``` |
| |
| ## API Reference |
| |
| Pulsar's WebSocket API offers three endpoints, for [producing](#producer-endpoint), [consuming](#consumer-endpoint), and [reading](#reader-endpoint) messages. |
| |
| All exchanges via the WebSocket API use JSON. |
| |
| ### Producer endpoint |
| |
| The producer endpoint requires you to specify a {% popover property %}, {% popover cluster %}, {% popover namespace %}, and {% popover topic %} in the URL: |
| |
| {% endpoint ws://broker-service-url:8080/ws/producer/persistent/:property/:cluster/:namespace/:topic %} |
| |
| ##### Query parameters |
| |
| Key | Type | Required? | Explanation |
| :---|:-----|:----------|:----------- |
| `sendTimeoutMillis` | long | no | Send timeout (default: 30 secs) |
| `batchingEnabled` | boolean | no | Enable batching of messages (default: false) |
| `batchingMaxMessages` | int | no | Maximum number of messages permitted in a batch (default: 1000) |
| `maxPendingMessages` | int | no | Set the max size of the internal-queue holding the messages (default: 1000) |
| `batchingMaxPublishDelay` | long | no | Time period within which the messages will be batched (default: 10ms) |
| `messageRoutingMode` | string | no | Message [routing mode](https://pulsar.incubator.apache.org/api/client/index.html?org/apache/pulsar/client/api/ProducerConfiguration.MessageRoutingMode.html) for the partitioned producer: SinglePartition/RoundRobinPartition |
| `compressionType` | string | no | Compression [type](https://pulsar.incubator.apache.org/api/client/index.html?org/apache/pulsar/client/api/CompressionType.html): LZ4/ZLIB |
| |
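As a rough illustration of how the query parameters above attach to the endpoint, the following sketch builds a producer URL with Python's standard library. The helper name `producer_url` and the host `localhost:8080` are assumptions made for this example and are not part of Pulsar's API.

```python
from urllib.parse import quote, urlencode


def producer_url(host, prop, cluster, namespace, topic, **params):
    """Build a producer endpoint URL; `params` become query parameters."""
    # Percent-encode each path segment so unusual names cannot break the path
    path = "/".join(quote(part, safe="") for part in (prop, cluster, namespace, topic))
    url = "ws://{}/ws/producer/persistent/{}".format(host, path)
    if params:
        # Assumption: booleans are written as lowercase true/false
        query = urlencode({
            name: str(value).lower() if isinstance(value, bool) else value
            for name, value in sorted(params.items())
        })
        url = "{}?{}".format(url, query)
    return url


url = producer_url(
    "localhost:8080", "sample", "standalone", "ns1", "my-topic",
    batchingEnabled=True, batchingMaxMessages=500, compressionType="LZ4",
)
print(url)
```

Parameters are sorted only to make the resulting URL predictable; the order should not matter to the service.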
| |
| #### Publishing a message |
| |
| ```json |
| { |
| "payload": "SGVsbG8gV29ybGQ=", |
| "properties": {"key1": "value1", "key2": "value2"}, |
| "context": "1" |
| } |
| ``` |
| |
| Key | Type | Required? | Explanation |
| :---|:-----|:----------|:----------- |
| `payload` | string | yes | Base-64 encoded payload |
| `properties` | key-value pairs | no | Application-defined properties |
| `context` | string | no | Application-defined request identifier |
| `key` | string | no | For partitioned topics, decides which partition to use |
| `replicationClusters` | array | no | Restrict replication to this list of {% popover clusters %}, specified by name |
| |
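The fields in the table above can be assembled into a publish frame along these lines. This is a minimal sketch: the function name `build_publish_message` and its keyword arguments are hypothetical, and only the JSON keys come from the table.

```python
import base64
import json


def build_publish_message(payload, properties=None, context=None,
                          key=None, replication_clusters=None):
    """Return a JSON string for one publish request; `payload` is bytes."""
    # `payload` is the only required field and must be Base-64 encoded
    message = {"payload": base64.b64encode(payload).decode("ascii")}
    # Optional fields are included only when the caller provides them
    if properties:
        message["properties"] = dict(properties)
    if context is not None:
        message["context"] = str(context)
    if key is not None:
        message["key"] = key
    if replication_clusters:
        message["replicationClusters"] = list(replication_clusters)
    return json.dumps(message)


frame = build_publish_message(b"Hello World", {"key1": "value1"}, context=1, key="user-42")
print(frame)
```

The resulting string is what you would pass to your WebSocket client's send call.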
| |
| ##### Example success response |
| |
| ```json |
| { |
| "result": "ok", |
| "messageId": "CAAQAw==", |
| "context": "1" |
| } |
| ``` |
| |
| ##### Example failure response |
| |
| ```json |
| { |
| "result": "send-error:3", |
| "errorMsg": "Failed to de-serialize from JSON", |
| "context": "1" |
| } |
| ``` |
| |
| Key | Type | Required? | Explanation |
| :---|:-----|:----------|:----------- |
| `result` | string | yes | `ok` if successful, or an error identifier such as `send-error:3` if unsuccessful |
| `messageId` | string | on success | Message ID assigned to the published message |
| `errorMsg` | string | on failure | Description of the error |
| `context` | string | no | Application-defined request identifier |
| |
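A client will usually want to tell the two response shapes apart. The sketch below does so for the sample responses shown above; the function name `parse_publish_response` is hypothetical, and the `label:code` layout of the failure `result` is inferred from the single `send-error:3` example rather than from a specification.

```python
import json


def parse_publish_response(raw):
    """Summarize a producer response as a small dictionary."""
    response = json.loads(raw)
    result = response.get("result", "")
    if result == "ok":
        return {
            "ok": True,
            "message_id": response.get("messageId"),
            "context": response.get("context"),
        }
    # Assumption: failures look like "<label>:<code>", as in "send-error:3"
    label, _, code = result.partition(":")
    return {
        "ok": False,
        "error": label,
        "code": int(code) if code.isdigit() else None,
        "error_msg": response.get("errorMsg"),
        "context": response.get("context"),
    }


success = parse_publish_response('{"result": "ok", "messageId": "CAAQAw==", "context": "1"}')
failure = parse_publish_response(
    '{"result": "send-error:3", "errorMsg": "Failed to de-serialize from JSON", "context": "1"}'
)
print(success)
print(failure)
```

Returning the `context` in both cases lets the caller match each response to the request that produced it.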
| |
| ### Consumer endpoint |
| |
| The consumer endpoint requires you to specify a {% popover property %}, {% popover cluster %}, {% popover namespace %}, and {% popover topic %}, as well as a {% popover subscription %}, in the URL: |
| |
| {% endpoint ws://broker-service-url:8080/ws/consumer/persistent/:property/:cluster/:namespace/:topic/:subscription %} |
| |
| ##### Query parameters |
| |
| Key | Type | Required? | Explanation |
| :---|:-----|:----------|:----------- |
| `ackTimeoutMillis` | long | no | Set the timeout for unacked messages (default: 0) |
| `subscriptionType` | string | no | [Subscription type](https://pulsar.incubator.apache.org/api/client/index.html?org/apache/pulsar/client/api/SubscriptionType.html): Exclusive/Failover/Shared |
| `receiverQueueSize` | int | no | Size of the consumer receive queue (default: 1000) |
| `consumerName` | string | no | Consumer name |
| |
| ##### Receiving messages |
| |
| The server pushes messages over the WebSocket session: |
| |
| ```json |
| { |
| "messageId": "CAAQAw==", |
| "payload": "SGVsbG8gV29ybGQ=", |
| "properties": {"key1": "value1", "key2": "value2"}, |
| "publishTime": "2016-08-30 16:45:57.785" |
| } |
| ``` |
| |
| Key | Type | Required? | Explanation |
| :---|:-----|:----------|:----------- |
| `messageId` | string | yes | Message ID |
| `payload` | string | yes | Base-64 encoded payload |
| `publishTime` | string | yes | Publish timestamp |
| `properties` | key-value pairs | no | Application-defined properties |
| `key` | string | no | Original routing key set by producer |
| |
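To make the fields above concrete, here is a small sketch that decodes one received frame. The function name `decode_message` is hypothetical, and the timestamp format string is inferred from the sample value `2016-08-30 16:45:57.785`; the sample carries no time zone, so the parsed value is a naive `datetime`.

```python
import base64
import json
from datetime import datetime

# Assumption: publishTime always follows the layout of the sample message
PUBLISH_TIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f"


def decode_message(raw):
    """Turn one pushed JSON frame into native Python values."""
    msg = json.loads(raw)
    return {
        "message_id": msg["messageId"],
        # The payload arrives Base-64 encoded; decoding yields raw bytes
        "payload": base64.b64decode(msg["payload"]),
        "publish_time": datetime.strptime(msg["publishTime"], PUBLISH_TIME_FORMAT),
        # Optional fields fall back to empty values when absent
        "properties": msg.get("properties", {}),
        "key": msg.get("key"),
    }


sample = json.dumps({
    "messageId": "CAAQAw==",
    "payload": "SGVsbG8gV29ybGQ=",
    "properties": {"key1": "value1", "key2": "value2"},
    "publishTime": "2016-08-30 16:45:57.785",
})
decoded = decode_message(sample)
print(decoded["payload"], decoded["publish_time"])
```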
| #### Acknowledging the message |
| |
| The consumer must acknowledge successful processing of each message so that the Pulsar broker can delete it. |
| |
| ```json |
| { |
| "messageId": "CAAQAw==" |
| } |
| ``` |
| |
| Key | Type | Required? | Explanation |
| :---|:-----|:----------|:----------- |
| `messageId`| string | yes | Message ID of the processed message |
| |
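One way to structure the receive, process, and acknowledge cycle is sketched below. The names `process_and_ack`, `send`, and `handler` are hypothetical; `send` stands in for whatever send method your WebSocket client exposes, which keeps the example runnable without a live connection.

```python
import base64
import json


def process_and_ack(raw, send, handler):
    """Run `handler` on the decoded payload, then acknowledge the message."""
    msg = json.loads(raw)
    handler(base64.b64decode(msg["payload"]))
    # The acknowledgement is sent only if the handler returned without raising
    send(json.dumps({"messageId": msg["messageId"]}))


# Stand-ins for a real connection and a real message handler
sent = []
seen = []
frame = json.dumps({
    "messageId": "CAAQAw==",
    "payload": "SGVsbG8gV29ybGQ=",
    "publishTime": "2016-08-30 16:45:57.785",
})
process_and_ack(frame, sent.append, seen.append)
print(seen, sent)
```

Acknowledging after the handler finishes means a message whose processing failed is not acknowledged by this code path.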
| |
| ### Reader endpoint |
| |
| The reader endpoint requires you to specify a {% popover property %}, {% popover cluster %}, {% popover namespace %}, and {% popover topic %} in the URL: |
| |
| {% endpoint ws://broker-service-url:8080/ws/reader/persistent/:property/:cluster/:namespace/:topic %} |
| |
| ##### Query parameters |
| |
| Key | Type | Required? | Explanation |
| :---|:-----|:----------|:----------- |
| `readerName` | string | no | Reader name |
| `receiverQueueSize` | int | no | Size of the consumer receive queue (default: 1000) |
| |
| ##### Receiving messages |
| |
| The server pushes messages over the WebSocket session: |
| |
| ```json |
| { |
| "messageId": "CAAQAw==", |
| "payload": "SGVsbG8gV29ybGQ=", |
| "properties": {"key1": "value1", "key2": "value2"}, |
| "publishTime": "2016-08-30 16:45:57.785" |
| } |
| ``` |
| |
| Key | Type | Required? | Explanation |
| :---|:-----|:----------|:----------- |
| `messageId` | string | yes | Message ID |
| `payload` | string | yes | Base-64 encoded payload |
| `publishTime` | string | yes | Publish timestamp |
| `properties` | key-value pairs | no | Application-defined properties |
| `key` | string | no | Original routing key set by producer |
| |
| #### Acknowledging the message |
| |
| **In WebSocket**, the reader must acknowledge successful processing of each message so that the Pulsar WebSocket service can update its count of pending messages. If you don't send acknowledgements, the Pulsar WebSocket service stops sending messages once the `pendingMessages` limit is reached. |
| |
| ```json |
| { |
| "messageId": "CAAQAw==" |
| } |
| ``` |
| |
| Key | Type | Required? | Explanation |
| :---|:-----|:----------|:----------- |
| `messageId`| string | yes | Message ID of the processed message |
| |
| |
| ### Error codes |
| |
| If an error occurs, the server closes the WebSocket session using one of the following error codes: |
| |
| Error Code | Error Message |
| :----------|:------------- |
| 1 | Failed to create producer |
| 2 | Failed to subscribe |
| 3 | Failed to deserialize from JSON |
| 4 | Failed to serialize to JSON |
| 5 | Failed to authenticate client |
| 6 | Client is not authorized |
| 7 | Invalid payload encoding |
| 8 | Unknown error |
| |
| {% include admonition.html type='warning' content='The application is responsible for establishing a new WebSocket session after a backoff period.' %} |
| |
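The error table and the backoff advice above could be combined roughly as follows. This is only a sketch: the names `describe_error` and `backoff_delays` and the delay values (1 second base, doubling, 30 second cap) are assumptions chosen for illustration, and how the error code reaches your code depends on the WebSocket client library you use.

```python
# Error codes and messages copied from the table above
ERROR_MESSAGES = {
    1: "Failed to create producer",
    2: "Failed to subscribe",
    3: "Failed to deserialize from JSON",
    4: "Failed to serialize to JSON",
    5: "Failed to authenticate client",
    6: "Client is not authorized",
    7: "Invalid payload encoding",
    8: "Unknown error",
}


def describe_error(code):
    """Look up the message for an error code from the table."""
    return ERROR_MESSAGES.get(code, "Unrecognized error code {}".format(code))


def backoff_delays(base=1.0, factor=2.0, cap=30.0, attempts=6):
    """Yield exponentially growing wait times in seconds, capped at `cap`."""
    delay = base
    for _ in range(attempts):
        yield min(delay, cap)
        delay *= factor


print(describe_error(2))
print(list(backoff_delays()))
```

A reconnect loop would sleep for each yielded delay before opening a new session and stop iterating once a connection succeeds. Adding a small random jitter to each delay is a common refinement when many clients reconnect at once.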
| ## Client examples |
| |
| Below you'll find code examples for the Pulsar WebSocket API in [Python](#python) and [Node.js](#nodejs). |
| |
| ### Python |
| |
| This example uses the [`websocket-client`](https://pypi.python.org/pypi/websocket-client) package. You can install it using [pip](https://pypi.python.org/pypi/pip): |
| |
| ```shell |
| $ pip install websocket-client |
| ``` |
| |
| You can also download it from [PyPI](https://pypi.python.org/pypi/websocket-client). |
| |
| #### Python producer |
| |
| Here's an example Python {% popover producer %} that sends a simple message to a Pulsar {% popover topic %}: |
| |
| ```python |
| import websocket, base64, json |
| |
| TOPIC = 'ws://localhost:8080/ws/producer/persistent/sample/standalone/ns1/my-topic' |
| |
| ws = websocket.create_connection(TOPIC) |
| |
| # Send one message as JSON; the payload must be a Base-64 encoded string |
| ws.send(json.dumps({ |
|     'payload' : base64.b64encode(b'Hello World').decode('utf-8'), |
|     'properties': { |
|         'key1' : 'value1', |
|         'key2' : 'value2' |
|     }, |
|     'context' : '5' |
| })) |
| |
| # The service replies with a JSON acknowledgement |
| response = json.loads(ws.recv()) |
| if response['result'] == 'ok': |
|     print('Message published successfully') |
| else: |
|     print('Failed to publish message:', response) |
| ws.close() |
| ``` |
| |
| #### Python consumer |
| |
| Here's an example Python {% popover consumer %} that listens on a Pulsar {% popover topic %}, prints each message and its decoded payload as it arrives, and then acknowledges it: |
| |
| ```python |
| import websocket, base64, json |
| |
| TOPIC = 'ws://localhost:8080/ws/consumer/persistent/sample/standalone/ns1/my-topic/my-sub' |
| |
| ws = websocket.create_connection(TOPIC) |
| |
| while True: |
|     msg = json.loads(ws.recv()) |
|     if not msg: break |
| |
|     # The payload is Base-64 encoded; decoding it yields bytes |
|     print("Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload']))) |
| |
|     # Acknowledge successful processing |
|     ws.send(json.dumps({'messageId' : msg['messageId']})) |
| |
| ws.close() |
| ``` |
| |
| #### Python reader |
| |
| Here's an example Python reader that listens on a Pulsar {% popover topic %}, prints each message and its decoded payload as it arrives, and then acknowledges it: |
| |
| ```python |
| import websocket, base64, json |
| |
| TOPIC = 'ws://localhost:8080/ws/reader/persistent/sample/standalone/ns1/my-topic' |
| |
| ws = websocket.create_connection(TOPIC) |
| |
| while True: |
|     msg = json.loads(ws.recv()) |
|     if not msg: break |
| |
|     # The payload is Base-64 encoded; decoding it yields bytes |
|     print("Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload']))) |
| |
|     # Acknowledge so the service keeps sending messages |
|     ws.send(json.dumps({'messageId' : msg['messageId']})) |
| |
| ws.close() |
| ``` |
| |
| ### Node.js |
| |
| This example uses the [`ws`](https://websockets.github.io/ws/) package. You can install it using [npm](https://www.npmjs.com/): |
| |
| ```shell |
| $ npm install ws |
| ``` |
| |
| #### Node.js producer |
| |
| Here's an example Node.js {% popover producer %} that sends a simple message to a Pulsar {% popover topic %}: |
| |
| ```javascript |
| var WebSocket = require('ws'), |
|     topic = "ws://localhost:8080/ws/producer/persistent/my-property/us-west/my-ns/my-topic1", |
|     ws = new WebSocket(topic); |
| |
| var message = { |
|   // Buffer.from replaces the deprecated `new Buffer(...)` constructor |
|   "payload" : Buffer.from("Hello World").toString('base64'), |
|   "properties": { |
|     "key1" : "value1", |
|     "key2" : "value2" |
|   }, |
|   "context" : "1" |
| }; |
| |
| ws.on('open', function() { |
|   // Send one message |
|   ws.send(JSON.stringify(message)); |
| }); |
| |
| ws.on('message', function(response) { |
|   console.log('received ack: %s', response); |
| }); |
| ``` |
| |
| #### Node.js consumer |
| |
| Here's an example Node.js {% popover consumer %} that listens on the same topic used by the producer above: |
| |
| ```javascript |
| var WebSocket = require('ws'), |
|     topic = "ws://localhost:8080/ws/consumer/persistent/my-property/us-west/my-ns/my-topic1/my-sub", |
|     ws = new WebSocket(topic); |
| |
| ws.on('message', function(message) { |
|   var receiveMsg = JSON.parse(message); |
|   // Decode the Base-64 payload before printing it |
|   console.log('Received: %s - payload: %s', message, Buffer.from(receiveMsg.payload, 'base64').toString()); |
|   // Acknowledge successful processing |
|   var ackMsg = {"messageId" : receiveMsg.messageId}; |
|   ws.send(JSON.stringify(ackMsg)); |
| }); |
| ``` |
| |
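| #### Node.js reader |
| |
| Here's an example Node.js reader that listens on the same topic used by the producer above: |
| |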
| ```javascript |
| var WebSocket = require('ws'), |
|     topic = "ws://localhost:8080/ws/reader/persistent/my-property/us-west/my-ns/my-topic1", |
|     ws = new WebSocket(topic); |
| |
| ws.on('message', function(message) { |
|   var receiveMsg = JSON.parse(message); |
|   // Decode the Base-64 payload before printing it |
|   console.log('Received: %s - payload: %s', message, Buffer.from(receiveMsg.payload, 'base64').toString()); |
|   // Acknowledge so the service keeps sending messages |
|   var ackMsg = {"messageId" : receiveMsg.messageId}; |
|   ws.send(JSON.stringify(ackMsg)); |
| }); |
| ``` |