---
title: Storm MQTT Integration
layout: documentation
documentation: true
---

## About

MQTT is a lightweight publish/subscribe protocol frequently used in IoT applications.

Further information can be found at http://mqtt.org. The HiveMQ website has a great series on 
[MQTT Essentials](http://www.hivemq.com/mqtt-essentials/).

Features include:

* Full MQTT support (e.g. last will, QoS 0-2, retain, etc.)
* Spout implementation(s) for subscribing to MQTT topics
* A bolt implementation for publishing MQTT messages
* A trident function implementation for publishing MQTT messages
* Authentication and TLS/SSL support
* User-defined "mappers" for converting MQTT messages to tuples (subscribers)
* User-defined "mappers" for converting tuples to MQTT messages (publishers)


## Quick Start
To quickly see MQTT integration in action, follow the instructions below.

**Start a MQTT broker and publisher**

The command below will create an MQTT broker on port 1883, and start a publsher that will publish random 
temperature/humidity values to an MQTT topic.

Open a terminal and execute the following command (change the path as necessary):

```bash
java -cp examples/target/storm-mqtt-examples-*-SNAPSHOT.jar org.apache.storm.mqtt.examples.MqttBrokerPublisher
```

**Run the example toplogy**

Run the sample topology using Flux. This will start a local mode cluster and topology that consists of the MQTT Spout
publishing to a bolt that simply logs the information it receives.

In a separate terminal, run the following command (Note that the `storm` executable must be on your PATH):

```bash
storm jar ./examples/target/storm-mqtt-examples-*-SNAPSHOT.jar org.apache.storm.flux.Flux ./examples/src/main/flux/sample.yaml --local
```

You should see data from MQTT being logged by the bolt:

```
27020 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=67.0, humidity=65.0}
27030 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=47.0, humidity=85.0}
27040 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=69.0, humidity=94.0}
27049 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=4.0, humidity=98.0}
27059 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=51.0, humidity=12.0}
27069 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=27.0, humidity=65.0}
```

Either allow the local cluster to exit, or stop it by typing Cntrl-C.

**MQTT Fault Tolerance In Action**

After the toplogy has been shutdown, the MQTT subscription created by the MQTT spout will persist with the broker,
and it will continue to receive and queue messages (as long as the broker is running).

If you run the toplogy again (while the broker is still running), when the spout initially connects to the MQTT broker,
it will receive all the messages it missed while it was down. You should see this as burst of messages, followed by a 
rate of about two messages per second.

This happens because, by default, the MQTT Spout creates a *session* when it subscribes -- that means it requests that
the broker hold onto and redeliver any messages it missed while offline. Another important factor is the the 
`MqttBrokerPublisher` publishes messages with a MQTT QoS of `1`, meaning *at least once delivery*.

For more information about MQTT fault tolerance, see the **Delivery Guarantees** section below.



## Delivery Guarantees
In Storm terms, ***the MQTT Spout provides at least once delivery***, depending on the configuration of the publisher as
well as the MQTT spout.

The MQTT protocol defines the following QoS levels:

* `0` - At Most Once (AKA "Fire and Forget")
* `1` - At Least Once
* `2` - Exactly Once

This can be a little confusing as the MQTT protocol specification does not really address the concept of a node being 
completely incinerated by a catasrophic event. This is in stark contrast with Storm's reliability model, which expects 
and embraces the concept of node failure.

So resiliancy is ultimately dependent on the underlying MQTT implementation and infrastructure.

###Recommendations

*You will never get at exactly once processing with this spout. It can be used with Trident, but it won't provide 
transational semantics. You will only get at least once guarantees.*

If you need reliability guarantees (i.e. *at least once processing*):

1. For MQTT publishers (outside of Storm), publish messages with a QoS of `1` so the broker saves messages if/when the 
spout is offline.
2. Use the spout defaults (`cleanSession = false` and `qos = 1`)
3. If you can, make sure any result of receiving and MQTT message is idempotent.
4. Make sure your MQTT brokers don't die or get isolated due to a network partition. Be prepared for natural and 
man-made disasters and network partitions. Incineration and destruction happens.





## Configuration
For the full range of configuration options, see the JavaDoc for `org.apache.storm.mqtt.common.MqttOptions`.

### Message Mappers
To define how MQTT messages are mapped to Storm tuples, you configure the MQTT spout with an implementation of the 
`org.apache.storm.mqtt.MqttMessageMapper` interface, which looks like this:

```java
public interface MqttMessageMapper extends Serializable {

    Values toValues(MqttMessage message);

    Fields outputFields();
}
```

The `MqttMessage` class contains the topic to which the message was published (`String`) and the message payload 
(`byte[]`). For example, here is a `MqttMessageMapper` implementation that produces tuples based on the content of both
the message topic and payload:

```java
/**
 * Given a topic name: "users/{user}/{location}/{deviceId}"
 * and a payload of "{temperature}/{humidity}"
 * emits a tuple containing user(String), deviceId(String), location(String), temperature(float), humidity(float)
 *
 */
public class CustomMessageMapper implements MqttMessageMapper {
    private static final Logger LOG = LoggerFactory.getLogger(CustomMessageMapper.class);


    public Values toValues(MqttMessage message) {
        String topic = message.getTopic();
        String[] topicElements = topic.split("/");
        String[] payloadElements = new String(message.getMessage()).split("/");

        return new Values(topicElements[2], topicElements[4], topicElements[3], Float.parseFloat(payloadElements[0]), 
                Float.parseFloat(payloadElements[1]));
    }

    public Fields outputFields() {
        return new Fields("user", "deviceId", "location", "temperature", "humidity");
    }
}
```

### Tuple Mappers
When publishing MQTT messages with the MQTT bolt or Trident function, you need to map Storm tuple data to MQTT messages 
(topic/payload). This is done by implementing the `org.apache.storm.mqtt.MqttTupleMapper` interface:

```java
public interface MqttTupleMapper extends Serializable{

    MqttMessage toMessage(ITuple tuple);

}
```

For example, a simple `MqttTupleMapper` implementation might look like this:

```java
public class MyTupleMapper implements MqttTupleMapper {
    public MqttMessage toMessage(ITuple tuple) {
        String topic = "users/" + tuple.getStringByField("userId") + "/" + tuple.getStringByField("device");
        byte[] payload = tuple.getStringByField("message").getBytes();
        return new MqttMessage(topic, payload);
    }
}
```

### MQTT Spout Parallelism
It's recommended that you use a parallelism of 1 for the MQTT spout, otherwise you will end up with multiple instances
of the spout subscribed to the same topic(s), resulting in duplicate messages.

If you want to parallelize the spout, it's recommended that you use multiple instances of the spout in your topolgoy 
and use MQTT topic selectors to parition the data. How you implement the partitioning strategy is ultimately determined 
by your MQTT topic structure. As an example, if you had topics partitioned by region (e.g. east/west) you could do 
something like the following:

```java
String spout1Topic = "users/east/#";
String spout2Topic = "users/west/#";
```

and then join the resulting streams together by subscribing a bolt to each stream.


### Using Flux

The following Flux YAML configuration creates the toplolgy used in the example:

```yaml
name: "mqtt-topology"

components:
   ########## MQTT Spout Config ############
  - id: "mqtt-type"
    className: "org.apache.storm.mqtt.examples.CustomMessageMapper"

  - id: "mqtt-options"
    className: "org.apache.storm.mqtt.common.MqttOptions"
    properties:
      - name: "url"
        value: "tcp://localhost:1883"
      - name: "topics"
        value:
          - "/users/tgoetz/#"

# topology configuration
config:
  topology.workers: 1
  topology.max.spout.pending: 1000

# spout definitions
spouts:
  - id: "mqtt-spout"
    className: "org.apache.storm.mqtt.spout.MqttSpout"
    constructorArgs:
      - ref: "mqtt-type"
      - ref: "mqtt-options"
    parallelism: 1

# bolt definitions
bolts:
  - id: "log"
    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
    parallelism: 1


streams:
  - from: "mqtt-spout"
    to: "log"
    grouping:
      type: SHUFFLE

```


### Using Java

Similarly, you can create the same topology using the Storm Core Java API:

```java
TopologyBuilder builder = new TopologyBuilder();
MqttOptions options = new MqttOptions();
options.setTopics(Arrays.asList("/users/tgoetz/#"));
options.setCleanConnection(false);
MqttSpout spout = new MqttSpout(new StringMessageMapper(), options);

MqttBolt bolt = new LogInfoBolt();

builder.setSpout("mqtt-spout", spout);
builder.setBolt("log-bolt", bolt).shuffleGrouping("mqtt-spout");

return builder.createTopology();
```

## SSL/TLS
If the MQTT broker you are connecting to requires SSL or SSL client authentication, you need to configure the spout 
with an appropriate URI, and the location of keystore/truststore files containing the necessary certificates.

### SSL/TLS URIs
To connect over SSL/TLS use a URI with a prefix of `ssl://` or `tls://` instead of `tcp://`. For further control over
the algorithm, you can specify a specific protocol:

 * `ssl://` Use the JVM default version of the SSL protocol.
 * `sslv*://` Use a specific version of the SSL protocol, where `*` is replaced by the version (e.g. `sslv3://`)
 * `tls://` Use the JVM default version of the TLS protocol.
 * `tlsv*://` Use a specific version of the TLS protocol, where `*` is replaced by the version (e.g. `tlsv1.1://`)
 
 
### Specifying Keystore/Truststore Locations
 
 The `MqttSpout`, `MqttBolt` and `MqttPublishFunction` all have constructors that take a `KeyStoreLoader` instance that
 is used to load the certificates required for TLS/SSL connections. For example:
 
```java
 public MqttSpout(MqttMessageMapper type, MqttOptions options, KeyStoreLoader keyStoreLoader)
```
 
The `DefaultKeyStoreLoader` class can be used to load certificates from the local filesystem. Note that the 
keystore/truststore need to be available on all worker nodes where the spout/bolt might be executed. To use 
`DefaultKeyStoreLoader` you specify the location of the keystore/truststore file(s), and set the necessary passwords:

```java
DefaultKeyStoreLoader ksl = new DefaultKeyStoreLoader("/path/to/keystore.jks", "/path/to/truststore.jks");
ksl.setKeyStorePassword("password");
ksl.setTrustStorePassword("password");
//...
```

If your keystore/truststore certificates are stored in a single file, you can use the one-argument constructor:

```java
DefaultKeyStoreLoader ksl = new DefaultKeyStoreLoader("/path/to/keystore.jks");
ksl.setKeyStorePassword("password");
//...
```

SSL/TLS can also be configured using Flux:

```yaml
name: "mqtt-topology"

components:
   ########## MQTT Spout Config ############
  - id: "mqtt-type"
    className: "org.apache.storm.mqtt.examples.CustomMessageMapper"

  - id: "keystore-loader"
    className: "org.apache.storm.mqtt.ssl.DefaultKeyStoreLoader"
    constructorArgs:
      - "keystore.jks"
      - "truststore.jks"
    properties:
      - name: "keyPassword"
        value: "password"
      - name: "keyStorePassword"
        value: "password"
      - name: "trustStorePassword"
        value: "password"

  - id: "mqtt-options"
    className: "org.apache.storm.mqtt.common.MqttOptions"
    properties:
      - name: "url"
        value: "ssl://raspberrypi.local:8883"
      - name: "topics"
        value:
          - "/users/tgoetz/#"

# topology configuration
config:
  topology.workers: 1
  topology.max.spout.pending: 1000

# spout definitions
spouts:
  - id: "mqtt-spout"
    className: "org.apache.storm.mqtt.spout.MqttSpout"
    constructorArgs:
      - ref: "mqtt-type"
      - ref: "mqtt-options"
      - ref: "keystore-loader"
    parallelism: 1

# bolt definitions
bolts:

  - id: "log"
    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
    parallelism: 1


streams:

  - from: "mqtt-spout"
    to: "log"
    grouping:
      type: SHUFFLE

```

## Committer Sponsors

 * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))