title: Pulsar adaptor for Apache Storm
tags: [storm, java]

Pulsar Storm is an adaptor for integrating Pulsar with Apache Storm topologies. It provides core Storm implementations, a spout and a bolt, for sending and receiving data.

An application can inject data into a Storm topology via a generic Pulsar spout, as well as consume data from a Storm topology via a generic Pulsar bolt.

Using the Pulsar Storm Adaptor

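Add the Pulsar Storm adaptor to your build as a dependency. It is published under the Maven group org.apache.pulsar with the artifact ID pulsar-storm.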


Pulsar Spout

The Pulsar Spout allows for the data published on a {% popover topic %} to be consumed by a Storm topology. It emits a Storm tuple based on the message received and the MessageToValuesMapper provided by the client.

Tuples that fail to be processed by downstream bolts are re-injected by the spout with exponential backoff until either a configurable timeout (60 seconds by default) or a configurable number of retries is reached, whichever comes first. After that, the message is {% popover acknowledged %} by the consumer. Here's an example construction of a spout:

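// Configure a Pulsar Client
ClientConfiguration clientConf = new ClientConfiguration();

// Configure a Pulsar Consumer
ConsumerConfiguration consumerConf = new ConsumerConfiguration();

// Map each Pulsar message to a single-field Storm tuple
MessageToValuesMapper messageToValuesMapper = new MessageToValuesMapper() {

    @Override
    public Values toValues(Message msg) {
        // interpret the message payload as a string
        return new Values(new String(msg.getData()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // declare the output fields
        declarer.declare(new Fields("string"));
    }
};

// Configure a Pulsar Spout (the URL, topic, and subscription are placeholder values)
PulsarSpoutConfiguration spoutConf = new PulsarSpoutConfiguration();
spoutConf.setServiceUrl("pulsar://broker.example.com:6650");
spoutConf.setTopic("persistent://my-property/my-cluster/my-ns/my-topic1");
spoutConf.setSubscriptionName("my-subscriber-name1");
spoutConf.setMessageToValuesMapper(messageToValuesMapper);

// Create a Pulsar Spout
PulsarSpout spout = new PulsarSpout(spoutConf, clientConf, consumerConf);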
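
The retry behavior described above (exponential backoff, bounded by a timeout or a retry count, whichever comes first) can be illustrated with a small standalone sketch. This is not the adaptor's actual implementation: the class RetryBackoffSketch, its method names, the 100 ms base delay, and the doubling factor are all assumptions made for illustration.

```java
import java.time.Duration;

// Hypothetical helper for illustration only; not part of the Pulsar Storm adaptor.
final class RetryBackoffSketch {
    private final Duration timeout;   // stop retrying once this much time has elapsed
    private final int maxRetries;     // stop retrying after this many attempts
    private final Duration baseDelay; // delay before the first retry (assumed value)

    RetryBackoffSketch(Duration timeout, int maxRetries, Duration baseDelay) {
        this.timeout = timeout;
        this.maxRetries = maxRetries;
        this.baseDelay = baseDelay;
    }

    // Delay before retry number `attempt` (1-based): baseDelay * 2^(attempt - 1).
    Duration delayFor(int attempt) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        int shift = Math.min(attempt - 1, 20); // cap the exponent to avoid overflow
        return baseDelay.multipliedBy(1L << shift);
    }

    // True while both limits hold; false means give up, at which point
    // the message would be acknowledged instead of being re-injected.
    boolean shouldRetry(int failedAttempts, Duration elapsed) {
        return failedAttempts < maxRetries && elapsed.compareTo(timeout) < 0;
    }

    public static void main(String[] args) {
        RetryBackoffSketch policy =
                new RetryBackoffSketch(Duration.ofSeconds(60), 5, Duration.ofMillis(100));
        Duration elapsed = Duration.ZERO;
        int failures = 0;
        // Simulate a tuple that fails every time it is replayed.
        while (policy.shouldRetry(failures, elapsed)) {
            failures++;
            Duration delay = policy.delayFor(failures);
            elapsed = elapsed.plus(delay);
            System.out.println("retry " + failures + " after " + delay.toMillis() + " ms");
        }
        System.out.println("giving up after " + failures + " retries");
    }
}
```

Checking both limits in a single predicate is what makes "whichever comes first" hold: the loop ends as soon as either the retry budget or the time budget is exhausted.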

Pulsar Bolt

The Pulsar bolt allows data in a Storm topology to be published on a {% popover topic %}. It publishes messages based on the Storm tuple received and the TupleToMessageMapper provided by the client.

A partitioned topic can also be used to spread messages across multiple partitions, each of which is backed by its own internal topic. In the TupleToMessageMapper implementation, set a “key” on each message; messages with the same key are sent to the same partition. Here's an example bolt:

// Configure a Pulsar Client
ClientConfiguration clientConf = new ClientConfiguration();

// Configure a Pulsar Producer
ProducerConfiguration producerConf = new ProducerConfiguration();

// Map each incoming Storm tuple to a Pulsar message
TupleToMessageMapper tupleToMessageMapper = new TupleToMessageMapper() {

    @Override
    public Message toMessage(Tuple tuple) {
        String receivedMessage = tuple.getString(0);
        // message processing
        String processedMsg = receivedMessage + "-processed";
        return MessageBuilder.create().setContent(processedMsg.getBytes()).build();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // declare the output fields
    }
};

// Configure a Pulsar Bolt (the URL and topic are placeholder values)
PulsarBoltConfiguration boltConf = new PulsarBoltConfiguration();
boltConf.setServiceUrl("pulsar://broker.example.com:6650");
boltConf.setTopic("persistent://my-property/my-cluster/my-ns/my-topic2");
boltConf.setTupleToMessageMapper(tupleToMessageMapper);

// Create a Pulsar Bolt
PulsarBolt bolt = new PulsarBolt(boltConf, clientConf);
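
To make the key-based routing on a partitioned topic more concrete, here is a minimal standalone sketch of how a key can be mapped deterministically to a partition. It is an illustration under stated assumptions, not Pulsar's actual routing code: the class KeyRoutingSketch, the method partitionFor, and the use of String.hashCode are hypothetical, and Pulsar's own hash function may differ.

```java
// Hypothetical illustration of key-based routing; not Pulsar's implementation.
final class KeyRoutingSketch {

    // Maps a key to a partition index in the range [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 4;
        // The same key always maps to the same partition index.
        for (String key : new String[] {"user-1", "user-2", "user-1"}) {
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```

Because the mapping depends only on the key and the partition count, all messages for a given key arrive in order on a single partition, which is the property the mapper relies on when it sets a key on each message.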


You can find a complete example [here]({{ site.pulsar_repo }}/pulsar-storm/src/test/java/org/apache/pulsar/storm/example/StormExample.java).