| <!DOCTYPE html> |
| <html> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge"> |
| <meta name="viewport" content="width=device-width, initial-scale=1"> |
| |
| <link rel="shortcut icon" href="/favicon.ico" type="image/x-icon"> |
| <link rel="icon" href="/favicon.ico" type="image/x-icon"> |
| |
| <title>Storm MQTT Integration</title> |
| |
| <!-- Bootstrap core CSS --> |
| <link href="/assets/css/bootstrap.min.css" rel="stylesheet"> |
| <!-- Bootstrap theme --> |
| <link href="/assets/css/bootstrap-theme.min.css" rel="stylesheet"> |
| |
| <!-- Custom styles for this template --> |
| <link rel="stylesheet" href="http://fortawesome.github.io/Font-Awesome/assets/font-awesome/css/font-awesome.css"> |
| <link href="/css/style.css" rel="stylesheet"> |
| <link href="/assets/css/owl.theme.css" rel="stylesheet"> |
| <link href="/assets/css/owl.carousel.css" rel="stylesheet"> |
| <script type="text/javascript" src="/assets/js/jquery.min.js"></script> |
| <script type="text/javascript" src="/assets/js/bootstrap.min.js"></script> |
| <script type="text/javascript" src="/assets/js/owl.carousel.min.js"></script> |
| <script type="text/javascript" src="/assets/js/storm.js"></script> |
| <!-- Just for debugging purposes. Don't actually copy these 2 lines! --> |
| <!--[if lt IE 9]><script src="../../assets/js/ie8-responsive-file-warning.js"></script><![endif]--> |
| |
| <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries --> |
| <!--[if lt IE 9]> |
| <script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script> |
| <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script> |
| <![endif]--> |
| </head> |
| |
| |
| <body> |
| <header> |
| <div class="container-fluid"> |
| <div class="row"> |
| <div class="col-md-5"> |
| <a href="/index.html"><img src="/images/logo.png" class="logo" /></a> |
| </div> |
| <div class="col-md-5"> |
| |
| <h1>Version: 2.3.0</h1> |
| |
| </div> |
| <div class="col-md-2"> |
| <a href="/downloads.html" class="btn-std btn-block btn-download">Download</a> |
| </div> |
| </div> |
| </div> |
| </header> |
| <!--Header End--> |
| <!--Navigation Begin--> |
| <div class="navbar" role="banner"> |
| <div class="container-fluid"> |
| <div class="navbar-header"> |
| <button class="navbar-toggle" type="button" data-toggle="collapse" data-target=".bs-navbar-collapse"> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| </button> |
| </div> |
| <nav class="collapse navbar-collapse bs-navbar-collapse" role="navigation"> |
| <ul class="nav navbar-nav"> |
| <li><a href="/index.html" id="home">Home</a></li> |
| <li><a href="/getting-help.html" id="getting-help">Getting Help</a></li> |
| <li><a href="/about/integrates.html" id="project-info">Project Information</a></li> |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown" id="documentation">Documentation <b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| |
| |
| <li><a href="/releases/2.3.0/index.html">2.3.0</a></li> |
| |
| |
| |
| <li><a href="/releases/2.2.1/index.html">2.2.1</a></li> |
| |
| |
| |
| <li><a href="/releases/2.2.0/index.html">2.2.0</a></li> |
| |
| |
| |
| <li><a href="/releases/2.1.1/index.html">2.1.1</a></li> |
| |
| |
| |
| <li><a href="/releases/2.1.0/index.html">2.1.0</a></li> |
| |
| |
| |
| <li><a href="/releases/2.0.0/index.html">2.0.0</a></li> |
| |
| |
| |
| <li><a href="/releases/1.2.4/index.html">1.2.4</a></li> |
| |
| |
| |
| <li><a href="/releases/1.2.3/index.html">1.2.3</a></li> |
| |
| |
| </ul> |
| </li> |
| <li><a href="/talksAndVideos.html">Talks and Slideshows</a></li> |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown" id="contribute">Community <b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="/contribute/Contributing-to-Storm.html">Contributing</a></li> |
| <li><a href="/contribute/People.html">People</a></li> |
| <li><a href="/contribute/BYLAWS.html">ByLaws</a></li> |
| <li><a href="/Powered-By.html">PoweredBy</a></li> |
| </ul> |
| </li> |
| <li><a href="/2021/10/14/storm211-released.html" id="news">News</a></li> |
| </ul> |
| </nav> |
| </div> |
| </div> |
| |
| |
| |
| <div class="container-fluid"> |
| <h1 class="page-title">Storm MQTT Integration</h1> |
| <div class="row"> |
| <div class="col-md-12"> |
| <!-- Documentation --> |
| |
| <p class="post-meta"></p> |
| |
| <div class="documentation-content"><h2 id="about">About</h2> |
| |
| <p>MQTT is a lightweight publish/subscribe protocol frequently used in IoT applications.</p> |
| |
| <p>Further information can be found at <a href="http://mqtt.org">http://mqtt.org</a>. The HiveMQ website has a great series on |
| <a href="http://www.hivemq.com/mqtt-essentials/">MQTT Essentials</a>.</p> |
| |
| <p>Features include:</p> |
| |
| <ul> |
| <li>Full MQTT support (e.g. last will, QoS 0-2, retain, etc.)</li> |
| <li>Spout implementation(s) for subscribing to MQTT topics</li> |
| <li>A bolt implementation for publishing MQTT messages</li> |
| <li>A trident function implementation for publishing MQTT messages</li> |
| <li>Authentication and TLS/SSL support</li> |
| <li>User-defined "mappers" for converting MQTT messages to tuples (subscribers)</li> |
| <li>User-defined "mappers" for converting tuples to MQTT messages (publishers)</li> |
| </ul> |
| |
| <h2 id="quick-start">Quick Start</h2> |
| |
| <p>To quickly see MQTT integration in action, follow the instructions below.</p> |
| |
| <p><strong>Start a MQTT broker and publisher</strong></p> |
| |
| <p>The command below will create an MQTT broker on port 1883, and start a publsher that will publish random |
| temperature/humidity values to an MQTT topic.</p> |
| |
| <p>Open a terminal and execute the following command (change the path as necessary):</p> |
| <div class="highlight"><pre><code class="language-bash" data-lang="bash">java <span class="nt">-cp</span> examples/target/storm-mqtt-examples-<span class="k">*</span><span class="nt">-SNAPSHOT</span>.jar org.apache.storm.mqtt.examples.MqttBrokerPublisher |
| </code></pre></div> |
| <p><strong>Run the example toplogy</strong></p> |
| |
| <p>Run the sample topology using Flux. This will start a local mode cluster and topology that consists of the MQTT Spout |
| publishing to a bolt that simply logs the information it receives.</p> |
| |
| <p>In a separate terminal, run the following command (Note that the <code>storm</code> executable must be on your PATH):</p> |
| <div class="highlight"><pre><code class="language-bash" data-lang="bash">storm jar ./examples/target/storm-mqtt-examples-<span class="k">*</span><span class="nt">-SNAPSHOT</span>.jar org.apache.storm.flux.Flux ./examples/src/main/flux/sample.yaml <span class="nt">--local</span> |
| </code></pre></div> |
| <p>You should see data from MQTT being logged by the bolt:</p> |
| <div class="highlight"><pre><code class="language-" data-lang="">27020 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=67.0, humidity=65.0} |
| 27030 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=47.0, humidity=85.0} |
| 27040 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=69.0, humidity=94.0} |
| 27049 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=4.0, humidity=98.0} |
| 27059 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=51.0, humidity=12.0} |
| 27069 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=27.0, humidity=65.0} |
| </code></pre></div> |
| <p>Either allow the local cluster to exit, or stop it by typing Cntrl-C.</p> |
| |
| <p><strong>MQTT Fault Tolerance In Action</strong></p> |
| |
| <p>After the toplogy has been shutdown, the MQTT subscription created by the MQTT spout will persist with the broker, |
| and it will continue to receive and queue messages (as long as the broker is running).</p> |
| |
| <p>If you run the toplogy again (while the broker is still running), when the spout initially connects to the MQTT broker, |
| it will receive all the messages it missed while it was down. You should see this as burst of messages, followed by a |
| rate of about two messages per second.</p> |
| |
| <p>This happens because, by default, the MQTT Spout creates a <em>session</em> when it subscribes -- that means it requests that |
| the broker hold onto and redeliver any messages it missed while offline. Another important factor is the the |
| <code>MqttBrokerPublisher</code> publishes messages with a MQTT QoS of <code>1</code>, meaning <em>at least once delivery</em>.</p> |
| |
| <p>For more information about MQTT fault tolerance, see the <strong>Delivery Guarantees</strong> section below.</p> |
| |
| <h2 id="delivery-guarantees">Delivery Guarantees</h2> |
| |
| <p>In Storm terms, <strong><em>the MQTT Spout provides at least once delivery</em></strong>, depending on the configuration of the publisher as |
| well as the MQTT spout.</p> |
| |
| <p>The MQTT protocol defines the following QoS levels:</p> |
| |
| <ul> |
| <li><code>0</code> - At Most Once (AKA "Fire and Forget")</li> |
| <li><code>1</code> - At Least Once</li> |
| <li><code>2</code> - Exactly Once</li> |
| </ul> |
| |
| <p>This can be a little confusing as the MQTT protocol specification does not really address the concept of a node being |
| completely incinerated by a catasrophic event. This is in stark contrast with Storm's reliability model, which expects |
| and embraces the concept of node failure.</p> |
| |
| <p>So resiliancy is ultimately dependent on the underlying MQTT implementation and infrastructure.</p> |
| |
| <h3 id="recommendations">Recommendations</h3> |
| |
| <p><em>You will never get at exactly once processing with this spout. It can be used with Trident, but it won't provide |
| transational semantics. You will only get at least once guarantees.</em></p> |
| |
| <p>If you need reliability guarantees (i.e. <em>at least once processing</em>):</p> |
| |
| <ol> |
| <li>For MQTT publishers (outside of Storm), publish messages with a QoS of <code>1</code> so the broker saves messages if/when the |
| spout is offline.</li> |
| <li>Use the spout defaults (<code>cleanSession = false</code> and <code>qos = 1</code>)</li> |
| <li>If you can, make sure any result of receiving and MQTT message is idempotent.</li> |
| <li>Make sure your MQTT brokers don't die or get isolated due to a network partition. Be prepared for natural and |
| man-made disasters and network partitions. Incineration and destruction happens.</li> |
| </ol> |
| |
| <h2 id="configuration">Configuration</h2> |
| |
| <p>For the full range of configuration options, see the JavaDoc for <code>org.apache.storm.mqtt.common.MqttOptions</code>.</p> |
| |
| <h3 id="message-mappers">Message Mappers</h3> |
| |
| <p>To define how MQTT messages are mapped to Storm tuples, you configure the MQTT spout with an implementation of the |
| <code>org.apache.storm.mqtt.MqttMessageMapper</code> interface, which looks like this:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">MqttMessageMapper</span> <span class="kd">extends</span> <span class="n">Serializable</span> <span class="o">{</span> |
| |
| <span class="n">Values</span> <span class="nf">toValues</span><span class="o">(</span><span class="n">MqttMessage</span> <span class="n">message</span><span class="o">);</span> |
| |
| <span class="n">Fields</span> <span class="nf">outputFields</span><span class="o">();</span> |
| <span class="o">}</span> |
| </code></pre></div> |
| <p>The <code>MqttMessage</code> class contains the topic to which the message was published (<code>String</code>) and the message payload |
| (<code>byte[]</code>). For example, here is a <code>MqttMessageMapper</code> implementation that produces tuples based on the content of both |
| the message topic and payload:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="cm">/** |
| * Given a topic name: "users/{user}/{location}/{deviceId}" |
| * and a payload of "{temperature}/{humidity}" |
| * emits a tuple containing user(String), deviceId(String), location(String), temperature(float), humidity(float) |
| * |
| */</span> |
| <span class="kd">public</span> <span class="kd">class</span> <span class="nc">CustomMessageMapper</span> <span class="kd">implements</span> <span class="n">MqttMessageMapper</span> <span class="o">{</span> |
| <span class="kd">private</span> <span class="kd">static</span> <span class="kd">final</span> <span class="n">Logger</span> <span class="n">LOG</span> <span class="o">=</span> <span class="n">LoggerFactory</span><span class="o">.</span><span class="na">getLogger</span><span class="o">(</span><span class="n">CustomMessageMapper</span><span class="o">.</span><span class="na">class</span><span class="o">);</span> |
| |
| |
| <span class="kd">public</span> <span class="n">Values</span> <span class="nf">toValues</span><span class="o">(</span><span class="n">MqttMessage</span> <span class="n">message</span><span class="o">)</span> <span class="o">{</span> |
| <span class="n">String</span> <span class="n">topic</span> <span class="o">=</span> <span class="n">message</span><span class="o">.</span><span class="na">getTopic</span><span class="o">();</span> |
| <span class="n">String</span><span class="o">[]</span> <span class="n">topicElements</span> <span class="o">=</span> <span class="n">topic</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">"/"</span><span class="o">);</span> |
| <span class="n">String</span><span class="o">[]</span> <span class="n">payloadElements</span> <span class="o">=</span> <span class="k">new</span> <span class="n">String</span><span class="o">(</span><span class="n">message</span><span class="o">.</span><span class="na">getMessage</span><span class="o">()).</span><span class="na">split</span><span class="o">(</span><span class="s">"/"</span><span class="o">);</span> |
| |
| <span class="k">return</span> <span class="k">new</span> <span class="nf">Values</span><span class="o">(</span><span class="n">topicElements</span><span class="o">[</span><span class="mi">2</span><span class="o">],</span> <span class="n">topicElements</span><span class="o">[</span><span class="mi">4</span><span class="o">],</span> <span class="n">topicElements</span><span class="o">[</span><span class="mi">3</span><span class="o">],</span> <span class="n">Float</span><span class="o">.</span><span class="na">parseFloat</span><span class="o">(</span><span class="n">payloadElements</span><span class="o">[</span><span class="mi">0</span><span class="o">]),</span> |
| <span class="n">Float</span><span class="o">.</span><span class="na">parseFloat</span><span class="o">(</span><span class="n">payloadElements</span><span class="o">[</span><span class="mi">1</span><span class="o">]));</span> |
| <span class="o">}</span> |
| |
| <span class="kd">public</span> <span class="n">Fields</span> <span class="nf">outputFields</span><span class="o">()</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="k">new</span> <span class="nf">Fields</span><span class="o">(</span><span class="s">"user"</span><span class="o">,</span> <span class="s">"deviceId"</span><span class="o">,</span> <span class="s">"location"</span><span class="o">,</span> <span class="s">"temperature"</span><span class="o">,</span> <span class="s">"humidity"</span><span class="o">);</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| </code></pre></div> |
| <h3 id="tuple-mappers">Tuple Mappers</h3> |
| |
| <p>When publishing MQTT messages with the MQTT bolt or Trident function, you need to map Storm tuple data to MQTT messages |
| (topic/payload). This is done by implementing the <code>org.apache.storm.mqtt.MqttTupleMapper</code> interface:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">MqttTupleMapper</span> <span class="kd">extends</span> <span class="n">Serializable</span><span class="o">{</span> |
| |
| <span class="n">MqttMessage</span> <span class="nf">toMessage</span><span class="o">(</span><span class="n">ITuple</span> <span class="n">tuple</span><span class="o">);</span> |
| |
| <span class="o">}</span> |
| </code></pre></div> |
| <p>For example, a simple <code>MqttTupleMapper</code> implementation might look like this:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyTupleMapper</span> <span class="kd">implements</span> <span class="n">MqttTupleMapper</span> <span class="o">{</span> |
| <span class="kd">public</span> <span class="n">MqttMessage</span> <span class="nf">toMessage</span><span class="o">(</span><span class="n">ITuple</span> <span class="n">tuple</span><span class="o">)</span> <span class="o">{</span> |
| <span class="n">String</span> <span class="n">topic</span> <span class="o">=</span> <span class="s">"users/"</span> <span class="o">+</span> <span class="n">tuple</span><span class="o">.</span><span class="na">getStringByField</span><span class="o">(</span><span class="s">"userId"</span><span class="o">)</span> <span class="o">+</span> <span class="s">"/"</span> <span class="o">+</span> <span class="n">tuple</span><span class="o">.</span><span class="na">getStringByField</span><span class="o">(</span><span class="s">"device"</span><span class="o">);</span> |
| <span class="kt">byte</span><span class="o">[]</span> <span class="n">payload</span> <span class="o">=</span> <span class="n">tuple</span><span class="o">.</span><span class="na">getStringByField</span><span class="o">(</span><span class="s">"message"</span><span class="o">).</span><span class="na">getBytes</span><span class="o">();</span> |
| <span class="k">return</span> <span class="k">new</span> <span class="nf">MqttMessage</span><span class="o">(</span><span class="n">topic</span><span class="o">,</span> <span class="n">payload</span><span class="o">);</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| </code></pre></div> |
| <h3 id="mqtt-spout-parallelism">MQTT Spout Parallelism</h3> |
| |
| <p>It's recommended that you use a parallelism of 1 for the MQTT spout, otherwise you will end up with multiple instances |
| of the spout subscribed to the same topic(s), resulting in duplicate messages.</p> |
| |
| <p>If you want to parallelize the spout, it's recommended that you use multiple instances of the spout in your topolgoy |
| and use MQTT topic selectors to parition the data. How you implement the partitioning strategy is ultimately determined |
| by your MQTT topic structure. As an example, if you had topics partitioned by region (e.g. east/west) you could do |
| something like the following:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">String</span> <span class="n">spout1Topic</span> <span class="o">=</span> <span class="s">"users/east/#"</span><span class="o">;</span> |
| <span class="n">String</span> <span class="n">spout2Topic</span> <span class="o">=</span> <span class="s">"users/west/#"</span><span class="o">;</span> |
| </code></pre></div> |
| <p>and then join the resulting streams together by subscribing a bolt to each stream.</p> |
| |
| <h3 id="using-flux">Using Flux</h3> |
| |
| <p>The following Flux YAML configuration creates the toplolgy used in the example:</p> |
| <div class="highlight"><pre><code class="language-yaml" data-lang="yaml"><span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">mqtt-topology"</span> |
| |
| <span class="na">components</span><span class="pi">:</span> |
| <span class="c1">########## MQTT Spout Config ############</span> |
| <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">mqtt-type"</span> |
| <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.mqtt.examples.CustomMessageMapper"</span> |
| |
| <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">mqtt-options"</span> |
| <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.mqtt.common.MqttOptions"</span> |
| <span class="na">properties</span><span class="pi">:</span> |
| <span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">url"</span> |
| <span class="na">value</span><span class="pi">:</span> <span class="s2">"</span><span class="s">tcp://localhost:1883"</span> |
| <span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">topics"</span> |
| <span class="na">value</span><span class="pi">:</span> |
| <span class="pi">-</span> <span class="s2">"</span><span class="s">/users/tgoetz/#"</span> |
| |
| <span class="c1"># topology configuration</span> |
| <span class="na">config</span><span class="pi">:</span> |
| <span class="s">topology.workers</span><span class="pi">:</span> <span class="s">1</span> |
| <span class="s">topology.max.spout.pending</span><span class="pi">:</span> <span class="s">1000</span> |
| |
| <span class="c1"># spout definitions</span> |
| <span class="na">spouts</span><span class="pi">:</span> |
| <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">mqtt-spout"</span> |
| <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.mqtt.spout.MqttSpout"</span> |
| <span class="na">constructorArgs</span><span class="pi">:</span> |
| <span class="pi">-</span> <span class="na">ref</span><span class="pi">:</span> <span class="s2">"</span><span class="s">mqtt-type"</span> |
| <span class="pi">-</span> <span class="na">ref</span><span class="pi">:</span> <span class="s2">"</span><span class="s">mqtt-options"</span> |
| <span class="na">parallelism</span><span class="pi">:</span> <span class="s">1</span> |
| |
| <span class="c1"># bolt definitions</span> |
| <span class="na">bolts</span><span class="pi">:</span> |
| <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">log"</span> |
| <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.flux.wrappers.bolts.LogInfoBolt"</span> |
| <span class="na">parallelism</span><span class="pi">:</span> <span class="s">1</span> |
| |
| |
| <span class="na">streams</span><span class="pi">:</span> |
| <span class="pi">-</span> <span class="na">from</span><span class="pi">:</span> <span class="s2">"</span><span class="s">mqtt-spout"</span> |
| <span class="na">to</span><span class="pi">:</span> <span class="s2">"</span><span class="s">log"</span> |
| <span class="na">grouping</span><span class="pi">:</span> |
| <span class="na">type</span><span class="pi">:</span> <span class="s">SHUFFLE</span> |
| |
| </code></pre></div> |
| <h3 id="using-java">Using Java</h3> |
| |
| <p>Similarly, you can create the same topology using the Storm Core Java API:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">TopologyBuilder</span> <span class="n">builder</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TopologyBuilder</span><span class="o">();</span> |
| <span class="n">MqttOptions</span> <span class="n">options</span> <span class="o">=</span> <span class="k">new</span> <span class="n">MqttOptions</span><span class="o">();</span> |
| <span class="n">options</span><span class="o">.</span><span class="na">setTopics</span><span class="o">(</span><span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="s">"/users/tgoetz/#"</span><span class="o">));</span> |
| <span class="n">options</span><span class="o">.</span><span class="na">setCleanConnection</span><span class="o">(</span><span class="kc">false</span><span class="o">);</span> |
| <span class="n">MqttSpout</span> <span class="n">spout</span> <span class="o">=</span> <span class="k">new</span> <span class="n">MqttSpout</span><span class="o">(</span><span class="k">new</span> <span class="n">StringMessageMapper</span><span class="o">(),</span> <span class="n">options</span><span class="o">);</span> |
| |
| <span class="n">MqttBolt</span> <span class="n">bolt</span> <span class="o">=</span> <span class="k">new</span> <span class="n">LogInfoBolt</span><span class="o">();</span> |
| |
| <span class="n">builder</span><span class="o">.</span><span class="na">setSpout</span><span class="o">(</span><span class="s">"mqtt-spout"</span><span class="o">,</span> <span class="n">spout</span><span class="o">);</span> |
| <span class="n">builder</span><span class="o">.</span><span class="na">setBolt</span><span class="o">(</span><span class="s">"log-bolt"</span><span class="o">,</span> <span class="n">bolt</span><span class="o">).</span><span class="na">shuffleGrouping</span><span class="o">(</span><span class="s">"mqtt-spout"</span><span class="o">);</span> |
| |
| <span class="k">return</span> <span class="n">builder</span><span class="o">.</span><span class="na">createTopology</span><span class="o">();</span> |
| </code></pre></div> |
| <h2 id="ssl-tls">SSL/TLS</h2> |
| |
| <p>If the MQTT broker you are connecting to requires SSL or SSL client authentication, you need to configure the spout |
| with an appropriate URI, and the location of keystore/truststore files containing the necessary certificates.</p> |
| |
| <h3 id="ssl-tls-uris">SSL/TLS URIs</h3> |
| |
| <p>To connect over SSL/TLS use a URI with a prefix of <code>ssl://</code> or <code>tls://</code> instead of <code>tcp://</code>. For further control over |
| the algorithm, you can specify a specific protocol:</p> |
| |
| <ul> |
| <li><code>ssl://</code> Use the JVM default version of the SSL protocol.</li> |
| <li><code>sslv*://</code> Use a specific version of the SSL protocol, where <code>*</code> is replaced by the version (e.g. <code>sslv3://</code>)</li> |
| <li><code>tls://</code> Use the JVM default version of the TLS protocol.</li> |
| <li><code>tlsv*://</code> Use a specific version of the TLS protocol, where <code>*</code> is replaced by the version (e.g. <code>tlsv1.1://</code>)</li> |
| </ul> |
| |
| <h3 id="specifying-keystore-truststore-locations">Specifying Keystore/Truststore Locations</h3> |
| |
| <p>The <code>MqttSpout</code>, <code>MqttBolt</code> and <code>MqttPublishFunction</code> all have constructors that take a <code>KeyStoreLoader</code> instance that |
| is used to load the certificates required for TLS/SSL connections. For example:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"> <span class="kd">public</span> <span class="nf">MqttSpout</span><span class="o">(</span><span class="n">MqttMessageMapper</span> <span class="n">type</span><span class="o">,</span> <span class="n">MqttOptions</span> <span class="n">options</span><span class="o">,</span> <span class="n">KeyStoreLoader</span> <span class="n">keyStoreLoader</span><span class="o">)</span> |
| </code></pre></div> |
| <p>The <code>DefaultKeyStoreLoader</code> class can be used to load certificates from the local filesystem. Note that the |
| keystore/truststore need to be available on all worker nodes where the spout/bolt might be executed. To use |
| <code>DefaultKeyStoreLoader</code> you specify the location of the keystore/truststore file(s), and set the necessary passwords:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DefaultKeyStoreLoader</span> <span class="n">ksl</span> <span class="o">=</span> <span class="k">new</span> <span class="n">DefaultKeyStoreLoader</span><span class="o">(</span><span class="s">"/path/to/keystore.jks"</span><span class="o">,</span> <span class="s">"/path/to/truststore.jks"</span><span class="o">);</span> |
| <span class="n">ksl</span><span class="o">.</span><span class="na">setKeyStorePassword</span><span class="o">(</span><span class="s">"password"</span><span class="o">);</span> |
| <span class="n">ksl</span><span class="o">.</span><span class="na">setTrustStorePassword</span><span class="o">(</span><span class="s">"password"</span><span class="o">);</span> |
| <span class="c1">//...</span> |
| </code></pre></div> |
| <p>If your keystore/truststore certificates are stored in a single file, you can use the one-argument constructor:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DefaultKeyStoreLoader</span> <span class="n">ksl</span> <span class="o">=</span> <span class="k">new</span> <span class="n">DefaultKeyStoreLoader</span><span class="o">(</span><span class="s">"/path/to/keystore.jks"</span><span class="o">);</span> |
| <span class="n">ksl</span><span class="o">.</span><span class="na">setKeyStorePassword</span><span class="o">(</span><span class="s">"password"</span><span class="o">);</span> |
| <span class="c1">//...</span> |
| </code></pre></div> |
| <p>SSL/TLS can also be configured using Flux:</p> |
| <div class="highlight"><pre><code class="language-yaml" data-lang="yaml"><span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">mqtt-topology"</span> |
| |
| <span class="na">components</span><span class="pi">:</span> |
| <span class="c1">########## MQTT Spout Config ############</span> |
| <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">mqtt-type"</span> |
| <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.mqtt.examples.CustomMessageMapper"</span> |
| |
| <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">keystore-loader"</span> |
| <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.mqtt.ssl.DefaultKeyStoreLoader"</span> |
| <span class="na">constructorArgs</span><span class="pi">:</span> |
| <span class="pi">-</span> <span class="s2">"</span><span class="s">keystore.jks"</span> |
| <span class="pi">-</span> <span class="s2">"</span><span class="s">truststore.jks"</span> |
| <span class="na">properties</span><span class="pi">:</span> |
| <span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">keyPassword"</span> |
| <span class="na">value</span><span class="pi">:</span> <span class="s2">"</span><span class="s">password"</span> |
| <span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">keyStorePassword"</span> |
| <span class="na">value</span><span class="pi">:</span> <span class="s2">"</span><span class="s">password"</span> |
| <span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">trustStorePassword"</span> |
| <span class="na">value</span><span class="pi">:</span> <span class="s2">"</span><span class="s">password"</span> |
| |
| <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">mqtt-options"</span> |
| <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.mqtt.common.MqttOptions"</span> |
| <span class="na">properties</span><span class="pi">:</span> |
| <span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">url"</span> |
| <span class="na">value</span><span class="pi">:</span> <span class="s2">"</span><span class="s">ssl://raspberrypi.local:8883"</span> |
| <span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">topics"</span> |
| <span class="na">value</span><span class="pi">:</span> |
| <span class="pi">-</span> <span class="s2">"</span><span class="s">/users/tgoetz/#"</span> |
| |
| <span class="c1"># topology configuration</span> |
| <span class="na">config</span><span class="pi">:</span> |
| <span class="s">topology.workers</span><span class="pi">:</span> <span class="s">1</span> |
| <span class="s">topology.max.spout.pending</span><span class="pi">:</span> <span class="s">1000</span> |
| |
| <span class="c1"># spout definitions</span> |
| <span class="na">spouts</span><span class="pi">:</span> |
| <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">mqtt-spout"</span> |
| <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.mqtt.spout.MqttSpout"</span> |
| <span class="na">constructorArgs</span><span class="pi">:</span> |
| <span class="pi">-</span> <span class="na">ref</span><span class="pi">:</span> <span class="s2">"</span><span class="s">mqtt-type"</span> |
| <span class="pi">-</span> <span class="na">ref</span><span class="pi">:</span> <span class="s2">"</span><span class="s">mqtt-options"</span> |
| <span class="pi">-</span> <span class="na">ref</span><span class="pi">:</span> <span class="s2">"</span><span class="s">keystore-loader"</span> |
| <span class="na">parallelism</span><span class="pi">:</span> <span class="s">1</span> |
| |
| <span class="c1"># bolt definitions</span> |
| <span class="na">bolts</span><span class="pi">:</span> |
| |
| <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">log"</span> |
| <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.flux.wrappers.bolts.LogInfoBolt"</span> |
| <span class="na">parallelism</span><span class="pi">:</span> <span class="s">1</span> |
| |
| |
| <span class="na">streams</span><span class="pi">:</span> |
| |
| <span class="pi">-</span> <span class="na">from</span><span class="pi">:</span> <span class="s2">"</span><span class="s">mqtt-spout"</span> |
| <span class="na">to</span><span class="pi">:</span> <span class="s2">"</span><span class="s">log"</span> |
| <span class="na">grouping</span><span class="pi">:</span> |
| <span class="na">type</span><span class="pi">:</span> <span class="s">SHUFFLE</span> |
| |
| </code></pre></div> |
| <h2 id="committer-sponsors">Committer Sponsors</h2> |
| |
| <ul> |
| <li>P. Taylor Goetz (<a href="mailto:ptgoetz@apache.org">ptgoetz@apache.org</a>)</li> |
| </ul> |
| </div> |
| |
| |
| </div> |
| </div> |
| </div> |
| <footer> |
| <div class="container-fluid"> |
| <div class="row"> |
| <div class="col-md-3"> |
| <div class="footer-widget"> |
| <h5>Meetups</h5> |
| <ul class="latest-news"> |
| |
| <li><a href="http://www.meetup.com/Apache-Storm-Apache-Kafka/">Apache Storm & Apache Kafka</a> <span class="small">(Sunnyvale, CA)</span></li> |
| |
| <li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Apache Storm & Kafka Users</a> <span class="small">(Seattle, WA)</span></li> |
| |
| <li><a href="http://www.meetup.com/New-York-City-Storm-User-Group/">NYC Storm User Group</a> <span class="small">(New York, NY)</span></li> |
| |
| <li><a href="http://www.meetup.com/Bay-Area-Stream-Processing">Bay Area Stream Processing</a> <span class="small">(Emeryville, CA)</span></li> |
| |
| <li><a href="http://www.meetup.com/Boston-Storm-Users/">Boston Realtime Data</a> <span class="small">(Boston, MA)</span></li> |
| |
| <li><a href="http://www.meetup.com/storm-london">London Storm User Group</a> <span class="small">(London, UK)</span></li> |
| |
| <!-- <li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Seatle, WA</a> <span class="small">(27 Jun 2015)</span></li> --> |
| </ul> |
| </div> |
| </div> |
| <div class="col-md-3"> |
| <div class="footer-widget"> |
| <h5>About Apache Storm</h5> |
| <p>Apache Storm integrates with any queueing system and any database system. Apache Storm's spout abstraction makes it easy to integrate a new queuing system. Likewise, integrating Apache Storm with database systems is easy.</p> |
| </div> |
| </div> |
| <div class="col-md-3"> |
| <div class="footer-widget"> |
| <h5>First Look</h5> |
| <ul class="footer-list"> |
| <li><a href="/releases/current/Rationale.html">Rationale</a></li> |
| <li><a href="/releases/current/Tutorial.html">Tutorial</a></li> |
| <li><a href="/releases/current/Setting-up-development-environment.html">Setting up development environment</a></li> |
| <li><a href="/releases/current/Creating-a-new-Storm-project.html">Creating a new Apache Storm project</a></li> |
| </ul> |
| </div> |
| </div> |
| <div class="col-md-3"> |
| <div class="footer-widget"> |
| <h5>Documentation</h5> |
| <ul class="footer-list"> |
| <li><a href="/releases/current/index.html">Index</a></li> |
| <li><a href="/releases/current/javadocs/index.html">Javadoc</a></li> |
| <li><a href="/releases/current/FAQ.html">FAQ</a></li> |
| </ul> |
| </div> |
| </div> |
| </div> |
| <hr/> |
| <div class="row"> |
| <div class="col-md-12"> |
| <p align="center">Copyright © 2021 <a href="http://www.apache.org">Apache Software Foundation</a>. All Rights Reserved. |
| <br>Apache Storm, Apache, the Apache feather logo, and the Apache Storm project logos are trademarks of The Apache Software Foundation. |
| <br>All other marks mentioned may be trademarks or registered trademarks of their respective owners.</p> |
| </div> |
| </div> |
| </div> |
| </footer> |
| <!--Footer End--> |
| <!-- Scroll to top --> |
| <span class="totop"><a href="#"><i class="fa fa-angle-up"></i></a></span> |
| |
| </body> |
| |
| </html> |
| |