<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">
<link rel="icon" href="/favicon.ico" type="image/x-icon">
<title>Storm MQTT Integration</title>
<!-- Bootstrap core CSS -->
<link href="/assets/css/bootstrap.min.css" rel="stylesheet">
<!-- Bootstrap theme -->
<link href="/assets/css/bootstrap-theme.min.css" rel="stylesheet">
<!-- Custom styles for this template -->
<link rel="stylesheet" href="http://fortawesome.github.io/Font-Awesome/assets/font-awesome/css/font-awesome.css">
<link href="/css/style.css" rel="stylesheet">
<link href="/assets/css/owl.theme.css" rel="stylesheet">
<link href="/assets/css/owl.carousel.css" rel="stylesheet">
<script type="text/javascript" src="/assets/js/jquery.min.js"></script>
<script type="text/javascript" src="/assets/js/bootstrap.min.js"></script>
<script type="text/javascript" src="/assets/js/owl.carousel.min.js"></script>
<script type="text/javascript" src="/assets/js/storm.js"></script>
<!-- Just for debugging purposes. Don't actually copy these 2 lines! -->
<!--[if lt IE 9]><script src="../../assets/js/ie8-responsive-file-warning.js"></script><![endif]-->
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
</head>
<body>
<header>
<div class="container-fluid">
<div class="row">
<div class="col-md-5">
<a href="/index.html"><img src="/images/logo.png" class="logo" /></a>
</div>
<div class="col-md-5">
<h1>Version: 2.2.0</h1>
</div>
<div class="col-md-2">
<a href="/downloads.html" class="btn-std btn-block btn-download">Download</a>
</div>
</div>
</div>
</header>
<!--Header End-->
<!--Navigation Begin-->
<div class="navbar" role="banner">
<div class="container-fluid">
<div class="navbar-header">
<button class="navbar-toggle" type="button" data-toggle="collapse" data-target=".bs-navbar-collapse">
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
</div>
<nav class="collapse navbar-collapse bs-navbar-collapse" role="navigation">
<ul class="nav navbar-nav">
<li><a href="/index.html" id="home">Home</a></li>
<li><a href="/getting-help.html" id="getting-help">Getting Help</a></li>
<li><a href="/about/integrates.html" id="project-info">Project Information</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" id="documentation">Documentation <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="/releases/2.3.0/index.html">2.3.0</a></li>
<li><a href="/releases/2.2.0/index.html">2.2.0</a></li>
<li><a href="/releases/2.1.0/index.html">2.1.0</a></li>
<li><a href="/releases/2.0.0/index.html">2.0.0</a></li>
<li><a href="/releases/1.2.3/index.html">1.2.3</a></li>
</ul>
</li>
<li><a href="/talksAndVideos.html">Talks and Slideshows</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" id="contribute">Community <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="/contribute/Contributing-to-Storm.html">Contributing</a></li>
<li><a href="/contribute/People.html">People</a></li>
<li><a href="/contribute/BYLAWS.html">ByLaws</a></li>
</ul>
</li>
<li><a href="/2021/09/27/storm230-released.html" id="news">News</a></li>
</ul>
</nav>
</div>
</div>
<div class="container-fluid">
<h1 class="page-title">Storm MQTT Integration</h1>
<div class="row">
<div class="col-md-12">
<!-- Documentation -->
<p class="post-meta"></p>
<div class="documentation-content"><h2 id="about">About</h2>
<p>MQTT is a lightweight publish/subscribe protocol frequently used in IoT applications.</p>
<p>Further information can be found at <a href="http://mqtt.org">http://mqtt.org</a>. The HiveMQ website has a great series on
<a href="http://www.hivemq.com/mqtt-essentials/">MQTT Essentials</a>.</p>
<p>Features include:</p>
<ul>
<li>Full MQTT support (e.g. last will, QoS 0-2, retain, etc.)</li>
<li>Spout implementation(s) for subscribing to MQTT topics</li>
<li>A bolt implementation for publishing MQTT messages</li>
<li>A trident function implementation for publishing MQTT messages</li>
<li>Authentication and TLS/SSL support</li>
<li>User-defined &quot;mappers&quot; for converting MQTT messages to tuples (subscribers)</li>
<li>User-defined &quot;mappers&quot; for converting tuples to MQTT messages (publishers)</li>
</ul>
<h2 id="quick-start">Quick Start</h2>
<p>To quickly see MQTT integration in action, follow the instructions below.</p>
<p><strong>Start an MQTT broker and publisher</strong></p>
<p>The command below starts an MQTT broker on port 1883 and a publisher that publishes random
temperature/humidity values to an MQTT topic.</p>
<p>Open a terminal and execute the following command (change the path as necessary):</p>
<div class="highlight"><pre><code class="language-bash" data-lang="bash">java <span class="nt">-cp</span> examples/target/storm-mqtt-examples-<span class="k">*</span><span class="nt">-SNAPSHOT</span>.jar org.apache.storm.mqtt.examples.MqttBrokerPublisher
</code></pre></div>
<p><strong>Run the example topology</strong></p>
<p>Run the sample topology using Flux. This starts a local mode cluster and a topology in which the MQTT spout
emits to a bolt that simply logs the information it receives.</p>
<p>In a separate terminal, run the following command (Note that the <code>storm</code> executable must be on your PATH):</p>
<div class="highlight"><pre><code class="language-bash" data-lang="bash">storm jar ./examples/target/storm-mqtt-examples-<span class="k">*</span><span class="nt">-SNAPSHOT</span>.jar org.apache.storm.flux.Flux ./examples/src/main/flux/sample.yaml <span class="nt">--local</span>
</code></pre></div>
<p>You should see data from MQTT being logged by the bolt:</p>
<div class="highlight"><pre><code class="language-" data-lang="">27020 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=67.0, humidity=65.0}
27030 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=47.0, humidity=85.0}
27040 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=69.0, humidity=94.0}
27049 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=4.0, humidity=98.0}
27059 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=51.0, humidity=12.0}
27069 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=27.0, humidity=65.0}
</code></pre></div>
<p>Either allow the local cluster to exit, or stop it by typing Ctrl-C.</p>
<p><strong>MQTT Fault Tolerance In Action</strong></p>
<p>After the topology has been shut down, the MQTT subscription created by the MQTT spout will persist with the broker,
and it will continue to receive and queue messages (as long as the broker is running).</p>
<p>If you run the topology again (while the broker is still running), when the spout initially connects to the MQTT broker,
it will receive all the messages it missed while it was down. You should see this as a burst of messages, followed by a
rate of about two messages per second.</p>
<p>This happens because, by default, the MQTT spout creates a <em>session</em> when it subscribes, which means it requests that
the broker hold onto and redeliver any messages it missed while offline. Another important factor is that the
<code>MqttBrokerPublisher</code> publishes messages with an MQTT QoS of <code>1</code>, meaning <em>at least once delivery</em>.</p>
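<p>The queue-and-redeliver behaviour described above can be pictured with a toy, in-memory model. This is only a sketch:
<code>SessionQueueSketch</code> and its methods are hypothetical names, not part of storm-mqtt or of any broker, and the rule
that QoS <code>0</code> messages are dropped while the subscriber is offline is an assumption that matches common broker behaviour
rather than a guarantee.</p>

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of one broker-side subscription. Hypothetical; not a storm-mqtt class. */
final class SessionQueueSketch {
    // true models a persistent session (the spout default of not requesting a clean session)
    private final boolean persistentSession;
    private final Deque<String> backlog = new ArrayDeque<>();
    private boolean subscribed;
    private boolean connected;

    SessionQueueSketch(boolean persistentSession) {
        this.persistentSession = persistentSession;
    }

    /** The subscriber connects and receives whatever was queued while it was away. */
    List<String> connect() {
        subscribed = true;
        connected = true;
        List<String> missed = new ArrayList<>(backlog);
        backlog.clear();
        return missed;
    }

    void disconnect() {
        connected = false;
    }

    /** Returns true if the message is delivered immediately, false if it is queued or dropped. */
    boolean publish(String payload, int qos) {
        if (connected) {
            return true;
        }
        // Assumption: only QoS 1 and 2 messages are held for an offline persistent session.
        if (subscribed && persistentSession && qos >= 1) {
            backlog.add(payload);
        }
        return false;
    }
}
```

<p>In this model, messages published with QoS <code>1</code> while the subscriber is disconnected come back as a burst on the next
<code>connect()</code>, which mirrors what the quick start shows when the topology is restarted.</p>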
<p>For more information about MQTT fault tolerance, see the <strong>Delivery Guarantees</strong> section below.</p>
<h2 id="delivery-guarantees">Delivery Guarantees</h2>
<p>In Storm terms, <strong><em>the MQTT Spout provides at least once delivery</em></strong>, depending on the configuration of the publisher as
well as the MQTT spout.</p>
<p>The MQTT protocol defines the following QoS levels:</p>
<ul>
<li><code>0</code> - At Most Once (AKA &quot;Fire and Forget&quot;)</li>
<li><code>1</code> - At Least Once</li>
<li><code>2</code> - Exactly Once</li>
</ul>
<p>This can be a little confusing, as the MQTT protocol specification does not really address the concept of a node being
completely incinerated by a catastrophic event. This is in stark contrast with Storm&#39;s reliability model, which expects
and embraces the concept of node failure.</p>
<p>So resiliency is ultimately dependent on the underlying MQTT implementation and infrastructure.</p>
<h3 id="recommendations">Recommendations</h3>
<p><em>You will never get exactly once processing with this spout. It can be used with Trident, but it won&#39;t provide
transactional semantics. You will only get at least once guarantees.</em></p>
<p>If you need reliability guarantees (i.e. <em>at least once processing</em>):</p>
<ol>
<li>For MQTT publishers (outside of Storm), publish messages with a QoS of <code>1</code> so the broker saves messages if/when the
spout is offline.</li>
<li>Use the spout defaults (<code>cleanSession = false</code> and <code>qos = 1</code>)</li>
<li>If you can, make sure any result of receiving an MQTT message is idempotent.</li>
<li>Make sure your MQTT brokers don&#39;t die or get isolated due to a network partition. Be prepared for natural and
man-made disasters and network partitions. Incineration and destruction happen.</li>
</ol>
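<p>One way to approach the idempotency recommendation is to ignore messages that have already been applied. The sketch below
assumes every message carries a unique, publisher-assigned id, which the example payloads in this document do not; the
class <code>DedupingCounter</code> and its methods are hypothetical and not part of storm-mqtt.</p>

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical downstream state that tolerates redelivery under at least once semantics. */
final class DedupingCounter {
    // Assumption: each message has a unique id assigned by the publisher.
    private final Set<String> seenIds = new HashSet<>();
    private long total;

    /** Applies the amount once per message id; returns false for a duplicate delivery. */
    boolean add(String messageId, long amount) {
        if (!seenIds.add(messageId)) {
            return false; // already applied, so a replay leaves the total unchanged
        }
        total += amount;
        return true;
    }

    long total() {
        return total;
    }
}
```

<p>The set of seen ids grows without bound in this sketch. A real deployment would need to expire old ids or keep them in
an external store.</p>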
<h2 id="configuration">Configuration</h2>
<p>For the full range of configuration options, see the JavaDoc for <code>org.apache.storm.mqtt.common.MqttOptions</code>.</p>
<h3 id="message-mappers">Message Mappers</h3>
<p>To define how MQTT messages are mapped to Storm tuples, you configure the MQTT spout with an implementation of the
<code>org.apache.storm.mqtt.MqttMessageMapper</code> interface, which looks like this:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">MqttMessageMapper</span> <span class="kd">extends</span> <span class="n">Serializable</span> <span class="o">{</span>
<span class="n">Values</span> <span class="nf">toValues</span><span class="o">(</span><span class="n">MqttMessage</span> <span class="n">message</span><span class="o">);</span>
<span class="n">Fields</span> <span class="nf">outputFields</span><span class="o">();</span>
<span class="o">}</span>
</code></pre></div>
<p>The <code>MqttMessage</code> class contains the topic to which the message was published (<code>String</code>) and the message payload
(<code>byte[]</code>). For example, here is a <code>MqttMessageMapper</code> implementation that produces tuples based on the content of both
the message topic and payload:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="cm">/**
* Given a topic name: "/users/{user}/{location}/{deviceId}"
* and a payload of "{temperature}/{humidity}"
* emits a tuple containing user(String), deviceId(String), location(String), temperature(float), humidity(float)
*
*/</span>
<span class="kd">public</span> <span class="kd">class</span> <span class="nc">CustomMessageMapper</span> <span class="kd">implements</span> <span class="n">MqttMessageMapper</span> <span class="o">{</span>
<span class="kd">private</span> <span class="kd">static</span> <span class="kd">final</span> <span class="n">Logger</span> <span class="n">LOG</span> <span class="o">=</span> <span class="n">LoggerFactory</span><span class="o">.</span><span class="na">getLogger</span><span class="o">(</span><span class="n">CustomMessageMapper</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
<span class="kd">public</span> <span class="n">Values</span> <span class="nf">toValues</span><span class="o">(</span><span class="n">MqttMessage</span> <span class="n">message</span><span class="o">)</span> <span class="o">{</span>
<span class="n">String</span> <span class="n">topic</span> <span class="o">=</span> <span class="n">message</span><span class="o">.</span><span class="na">getTopic</span><span class="o">();</span>
<span class="n">String</span><span class="o">[]</span> <span class="n">topicElements</span> <span class="o">=</span> <span class="n">topic</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">"/"</span><span class="o">);</span>
<span class="n">String</span><span class="o">[]</span> <span class="n">payloadElements</span> <span class="o">=</span> <span class="k">new</span> <span class="n">String</span><span class="o">(</span><span class="n">message</span><span class="o">.</span><span class="na">getMessage</span><span class="o">()).</span><span class="na">split</span><span class="o">(</span><span class="s">"/"</span><span class="o">);</span>
<span class="k">return</span> <span class="k">new</span> <span class="nf">Values</span><span class="o">(</span><span class="n">topicElements</span><span class="o">[</span><span class="mi">2</span><span class="o">],</span> <span class="n">topicElements</span><span class="o">[</span><span class="mi">4</span><span class="o">],</span> <span class="n">topicElements</span><span class="o">[</span><span class="mi">3</span><span class="o">],</span> <span class="n">Float</span><span class="o">.</span><span class="na">parseFloat</span><span class="o">(</span><span class="n">payloadElements</span><span class="o">[</span><span class="mi">0</span><span class="o">]),</span>
<span class="n">Float</span><span class="o">.</span><span class="na">parseFloat</span><span class="o">(</span><span class="n">payloadElements</span><span class="o">[</span><span class="mi">1</span><span class="o">]));</span>
<span class="o">}</span>
<span class="kd">public</span> <span class="n">Fields</span> <span class="nf">outputFields</span><span class="o">()</span> <span class="o">{</span>
<span class="k">return</span> <span class="k">new</span> <span class="nf">Fields</span><span class="o">(</span><span class="s">"user"</span><span class="o">,</span> <span class="s">"deviceId"</span><span class="o">,</span> <span class="s">"location"</span><span class="o">,</span> <span class="s">"temperature"</span><span class="o">,</span> <span class="s">"humidity"</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span>
</code></pre></div>
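<p>The index arithmetic in the mapper depends on the topic starting with a slash, as in the <code>/users/tgoetz/#</code>
subscription used elsewhere on this page: splitting such a topic on <code>/</code> produces an empty first element. The following
standalone sketch shows the same parsing without any Storm types and with basic validation added. The class
<code>TopicParsingSketch</code> is hypothetical, and UTF-8 payload encoding is an assumption.</p>

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, Storm-free version of the parsing done in CustomMessageMapper. */
final class TopicParsingSketch {

    /** Returns [user, deviceId, location, temperature, humidity]. */
    static List<Object> parse(String topic, byte[] payload) {
        // "/users/u/loc/dev".split("/") yields ["", "users", "u", "loc", "dev"]
        String[] t = topic.split("/");
        if (t.length != 5 || !t[0].isEmpty() || !"users".equals(t[1])) {
            throw new IllegalArgumentException("unexpected topic: " + topic);
        }
        // Assumption: the payload is UTF-8 text of the form "{temperature}/{humidity}".
        String[] p = new String(payload, StandardCharsets.UTF_8).split("/");
        if (p.length != 2) {
            throw new IllegalArgumentException("unexpected payload");
        }
        return Arrays.<Object>asList(
                t[2], t[4], t[3], Float.parseFloat(p[0]), Float.parseFloat(p[1]));
    }
}
```

<p>Rejecting malformed input explicitly avoids an <code>ArrayIndexOutOfBoundsException</code> when a message arrives on a topic
with fewer levels than expected.</p>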
<h3 id="tuple-mappers">Tuple Mappers</h3>
<p>When publishing MQTT messages with the MQTT bolt or Trident function, you need to map Storm tuple data to MQTT messages
(topic/payload). This is done by implementing the <code>org.apache.storm.mqtt.MqttTupleMapper</code> interface:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">MqttTupleMapper</span> <span class="kd">extends</span> <span class="n">Serializable</span><span class="o">{</span>
<span class="n">MqttMessage</span> <span class="nf">toMessage</span><span class="o">(</span><span class="n">ITuple</span> <span class="n">tuple</span><span class="o">);</span>
<span class="o">}</span>
</code></pre></div>
<p>For example, a simple <code>MqttTupleMapper</code> implementation might look like this:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyTupleMapper</span> <span class="kd">implements</span> <span class="n">MqttTupleMapper</span> <span class="o">{</span>
<span class="kd">public</span> <span class="n">MqttMessage</span> <span class="nf">toMessage</span><span class="o">(</span><span class="n">ITuple</span> <span class="n">tuple</span><span class="o">)</span> <span class="o">{</span>
<span class="n">String</span> <span class="n">topic</span> <span class="o">=</span> <span class="s">"users/"</span> <span class="o">+</span> <span class="n">tuple</span><span class="o">.</span><span class="na">getStringByField</span><span class="o">(</span><span class="s">"userId"</span><span class="o">)</span> <span class="o">+</span> <span class="s">"/"</span> <span class="o">+</span> <span class="n">tuple</span><span class="o">.</span><span class="na">getStringByField</span><span class="o">(</span><span class="s">"device"</span><span class="o">);</span>
<span class="kt">byte</span><span class="o">[]</span> <span class="n">payload</span> <span class="o">=</span> <span class="n">tuple</span><span class="o">.</span><span class="na">getStringByField</span><span class="o">(</span><span class="s">"message"</span><span class="o">).</span><span class="na">getBytes</span><span class="o">();</span>
<span class="k">return</span> <span class="k">new</span> <span class="nf">MqttMessage</span><span class="o">(</span><span class="n">topic</span><span class="o">,</span> <span class="n">payload</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span>
</code></pre></div>
<h3 id="mqtt-spout-parallelism">MQTT Spout Parallelism</h3>
<p>It&#39;s recommended that you use a parallelism of 1 for the MQTT spout, otherwise you will end up with multiple instances
of the spout subscribed to the same topic(s), resulting in duplicate messages.</p>
<p>If you want to parallelize the spout, it&#39;s recommended that you use multiple instances of the spout in your topology
and use MQTT topic selectors to partition the data. How you implement the partitioning strategy is ultimately determined
by your MQTT topic structure. As an example, if you had topics partitioned by region (e.g. east/west) you could do
something like the following:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">String</span> <span class="n">spout1Topic</span> <span class="o">=</span> <span class="s">"users/east/#"</span><span class="o">;</span>
<span class="n">String</span> <span class="n">spout2Topic</span> <span class="o">=</span> <span class="s">"users/west/#"</span><span class="o">;</span>
</code></pre></div>
<p>and then join the resulting streams together by subscribing a bolt to each stream.</p>
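<p>When partitioning by topic selector, it helps to confirm that each published topic is matched by exactly one spout,
otherwise duplicates reappear. The sketch below is a simplified matcher for the MQTT <code>#</code> and <code>+</code> wildcards. It is
a hypothetical helper, not part of storm-mqtt, and it does not validate filters or handle topics beginning with
<code>$</code>.</p>

```java
/** Hypothetical, simplified MQTT topic filter matcher for checking a partitioning scheme. */
final class TopicFilterSketch {

    static boolean matches(String filter, String topic) {
        // A limit of -1 keeps empty levels, so a leading or trailing "/" is significant.
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return true; // multi-level wildcard: matches the parent level and everything below
            }
            if (i >= t.length) {
                return false; // the filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // a literal level that differs
            }
        }
        return f.length == t.length;
    }
}
```

<p>With the two selectors above, a topic such as <code>users/east/alice/1234</code> matches only the first selector, so only one
spout instance would receive it.</p>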
<h3 id="using-flux">Using Flux</h3>
<p>The following Flux YAML configuration creates the topology used in the example:</p>
<div class="highlight"><pre><code class="language-yaml" data-lang="yaml"><span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">mqtt-topology"</span>
<span class="na">components</span><span class="pi">:</span>
<span class="c1">########## MQTT Spout Config ############</span>
<span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">mqtt-type"</span>
<span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.mqtt.examples.CustomMessageMapper"</span>
<span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">mqtt-options"</span>
<span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.mqtt.common.MqttOptions"</span>
<span class="na">properties</span><span class="pi">:</span>
<span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">url"</span>
<span class="na">value</span><span class="pi">:</span> <span class="s2">"</span><span class="s">tcp://localhost:1883"</span>
<span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">topics"</span>
<span class="na">value</span><span class="pi">:</span>
<span class="pi">-</span> <span class="s2">"</span><span class="s">/users/tgoetz/#"</span>
<span class="c1"># topology configuration</span>
<span class="na">config</span><span class="pi">:</span>
<span class="s">topology.workers</span><span class="pi">:</span> <span class="s">1</span>
<span class="s">topology.max.spout.pending</span><span class="pi">:</span> <span class="s">1000</span>
<span class="c1"># spout definitions</span>
<span class="na">spouts</span><span class="pi">:</span>
<span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">mqtt-spout"</span>
<span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.mqtt.spout.MqttSpout"</span>
<span class="na">constructorArgs</span><span class="pi">:</span>
<span class="pi">-</span> <span class="na">ref</span><span class="pi">:</span> <span class="s2">"</span><span class="s">mqtt-type"</span>
<span class="pi">-</span> <span class="na">ref</span><span class="pi">:</span> <span class="s2">"</span><span class="s">mqtt-options"</span>
<span class="na">parallelism</span><span class="pi">:</span> <span class="s">1</span>
<span class="c1"># bolt definitions</span>
<span class="na">bolts</span><span class="pi">:</span>
<span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">log"</span>
<span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.flux.wrappers.bolts.LogInfoBolt"</span>
<span class="na">parallelism</span><span class="pi">:</span> <span class="s">1</span>
<span class="na">streams</span><span class="pi">:</span>
<span class="pi">-</span> <span class="na">from</span><span class="pi">:</span> <span class="s2">"</span><span class="s">mqtt-spout"</span>
<span class="na">to</span><span class="pi">:</span> <span class="s2">"</span><span class="s">log"</span>
<span class="na">grouping</span><span class="pi">:</span>
<span class="na">type</span><span class="pi">:</span> <span class="s">SHUFFLE</span>
</code></pre></div>
<h3 id="using-java">Using Java</h3>
<p>Similarly, you can create the same topology using the Storm Core Java API:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">TopologyBuilder</span> <span class="n">builder</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TopologyBuilder</span><span class="o">();</span>
<span class="n">MqttOptions</span> <span class="n">options</span> <span class="o">=</span> <span class="k">new</span> <span class="n">MqttOptions</span><span class="o">();</span>
<span class="n">options</span><span class="o">.</span><span class="na">setTopics</span><span class="o">(</span><span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="s">"/users/tgoetz/#"</span><span class="o">));</span>
<span class="n">options</span><span class="o">.</span><span class="na">setCleanConnection</span><span class="o">(</span><span class="kc">false</span><span class="o">);</span>
<span class="n">MqttSpout</span> <span class="n">spout</span> <span class="o">=</span> <span class="k">new</span> <span class="n">MqttSpout</span><span class="o">(</span><span class="k">new</span> <span class="n">StringMessageMapper</span><span class="o">(),</span> <span class="n">options</span><span class="o">);</span>
<span class="n">LogInfoBolt</span> <span class="n">bolt</span> <span class="o">=</span> <span class="k">new</span> <span class="n">LogInfoBolt</span><span class="o">();</span>
<span class="n">builder</span><span class="o">.</span><span class="na">setSpout</span><span class="o">(</span><span class="s">"mqtt-spout"</span><span class="o">,</span> <span class="n">spout</span><span class="o">);</span>
<span class="n">builder</span><span class="o">.</span><span class="na">setBolt</span><span class="o">(</span><span class="s">"log-bolt"</span><span class="o">,</span> <span class="n">bolt</span><span class="o">).</span><span class="na">shuffleGrouping</span><span class="o">(</span><span class="s">"mqtt-spout"</span><span class="o">);</span>
<span class="k">return</span> <span class="n">builder</span><span class="o">.</span><span class="na">createTopology</span><span class="o">();</span>
</code></pre></div>
<h2 id="ssl-tls">SSL/TLS</h2>
<p>If the MQTT broker you are connecting to requires SSL or SSL client authentication, you need to configure the spout
with an appropriate URI, and the location of keystore/truststore files containing the necessary certificates.</p>
<h3 id="ssl-tls-uris">SSL/TLS URIs</h3>
<p>To connect over SSL/TLS use a URI with a prefix of <code>ssl://</code> or <code>tls://</code> instead of <code>tcp://</code>. For further control over
the protocol, you can specify a particular version:</p>
<ul>
<li><code>ssl://</code> Use the JVM default version of the SSL protocol.</li>
<li><code>sslv*://</code> Use a specific version of the SSL protocol, where <code>*</code> is replaced by the version (e.g. <code>sslv3://</code>)</li>
<li><code>tls://</code> Use the JVM default version of the TLS protocol.</li>
<li><code>tlsv*://</code> Use a specific version of the TLS protocol, where <code>*</code> is replaced by the version (e.g. <code>tlsv1.1://</code>)</li>
</ul>
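<p>A configuration check can use these prefixes to catch a broker URL that would silently connect in plain text. The sketch
below only inspects the URI scheme with <code>java.net.URI</code>. The class <code>MqttUriSketch</code> is hypothetical, and treating any
scheme that starts with <code>ssl</code> or <code>tls</code> as encrypted is an assumption based on the list above, not a description of how
storm-mqtt parses URLs.</p>

```java
import java.net.URI;
import java.util.Locale;

/** Hypothetical helper that classifies a broker URL by its scheme prefix. */
final class MqttUriSketch {

    static boolean isEncrypted(String url) {
        String scheme = URI.create(url).getScheme();
        if (scheme == null) {
            throw new IllegalArgumentException("missing scheme: " + url);
        }
        String s = scheme.toLowerCase(Locale.ROOT);
        // Assumption: ssl://, sslv*://, tls:// and tlsv*:// all denote an encrypted transport.
        return s.startsWith("ssl") || s.startsWith("tls");
    }
}
```

<p>For example, <code>tcp://localhost:1883</code> is reported as unencrypted, while <code>ssl://raspberrypi.local:8883</code> and
<code>tlsv1.1://raspberrypi.local:8883</code> are reported as encrypted.</p>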
<h3 id="specifying-keystore-truststore-locations">Specifying Keystore/Truststore Locations</h3>
<p>The <code>MqttSpout</code>, <code>MqttBolt</code> and <code>MqttPublishFunction</code> all have constructors that take a <code>KeyStoreLoader</code> instance that
is used to load the certificates required for TLS/SSL connections. For example:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"> <span class="kd">public</span> <span class="nf">MqttSpout</span><span class="o">(</span><span class="n">MqttMessageMapper</span> <span class="n">type</span><span class="o">,</span> <span class="n">MqttOptions</span> <span class="n">options</span><span class="o">,</span> <span class="n">KeyStoreLoader</span> <span class="n">keyStoreLoader</span><span class="o">)</span>
</code></pre></div>
<p>The <code>DefaultKeyStoreLoader</code> class can be used to load certificates from the local filesystem. Note that the
keystore/truststore need to be available on all worker nodes where the spout/bolt might be executed. To use
<code>DefaultKeyStoreLoader</code> you specify the location of the keystore/truststore file(s), and set the necessary passwords:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DefaultKeyStoreLoader</span> <span class="n">ksl</span> <span class="o">=</span> <span class="k">new</span> <span class="n">DefaultKeyStoreLoader</span><span class="o">(</span><span class="s">"/path/to/keystore.jks"</span><span class="o">,</span> <span class="s">"/path/to/truststore.jks"</span><span class="o">);</span>
<span class="n">ksl</span><span class="o">.</span><span class="na">setKeyStorePassword</span><span class="o">(</span><span class="s">"password"</span><span class="o">);</span>
<span class="n">ksl</span><span class="o">.</span><span class="na">setTrustStorePassword</span><span class="o">(</span><span class="s">"password"</span><span class="o">);</span>
<span class="c1">//...</span>
</code></pre></div>
<p>If your keystore/truststore certificates are stored in a single file, you can use the one-argument constructor:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DefaultKeyStoreLoader</span> <span class="n">ksl</span> <span class="o">=</span> <span class="k">new</span> <span class="n">DefaultKeyStoreLoader</span><span class="o">(</span><span class="s">"/path/to/keystore.jks"</span><span class="o">);</span>
<span class="n">ksl</span><span class="o">.</span><span class="na">setKeyStorePassword</span><span class="o">(</span><span class="s">"password"</span><span class="o">);</span>
<span class="c1">//...</span>
</code></pre></div>
<p>SSL/TLS can also be configured using Flux:</p>
<div class="highlight"><pre><code class="language-yaml" data-lang="yaml"><span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">mqtt-topology"</span>
<span class="na">components</span><span class="pi">:</span>
<span class="c1">########## MQTT Spout Config ############</span>
<span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">mqtt-type"</span>
<span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.mqtt.examples.CustomMessageMapper"</span>
<span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">keystore-loader"</span>
<span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.mqtt.ssl.DefaultKeyStoreLoader"</span>
<span class="na">constructorArgs</span><span class="pi">:</span>
<span class="pi">-</span> <span class="s2">"</span><span class="s">keystore.jks"</span>
<span class="pi">-</span> <span class="s2">"</span><span class="s">truststore.jks"</span>
<span class="na">properties</span><span class="pi">:</span>
<span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">keyPassword"</span>
<span class="na">value</span><span class="pi">:</span> <span class="s2">"</span><span class="s">password"</span>
<span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">keyStorePassword"</span>
<span class="na">value</span><span class="pi">:</span> <span class="s2">"</span><span class="s">password"</span>
<span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">trustStorePassword"</span>
        <span class="na">value</span><span class="pi">:</span> <span class="s2">"</span><span class="s">password"</span>
  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">mqtt-options"</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.mqtt.common.MqttOptions"</span>
    <span class="na">properties</span><span class="pi">:</span>
      <span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">url"</span>
        <span class="na">value</span><span class="pi">:</span> <span class="s2">"</span><span class="s">ssl://raspberrypi.local:8883"</span>
      <span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">topics"</span>
        <span class="na">value</span><span class="pi">:</span>
          <span class="pi">-</span> <span class="s2">"</span><span class="s">/users/tgoetz/#"</span>
<span class="c1"># topology configuration</span>
<span class="na">config</span><span class="pi">:</span>
  <span class="s">topology.workers</span><span class="pi">:</span> <span class="s">1</span>
  <span class="s">topology.max.spout.pending</span><span class="pi">:</span> <span class="s">1000</span>
<span class="c1"># spout definitions</span>
<span class="na">spouts</span><span class="pi">:</span>
  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">mqtt-spout"</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.mqtt.spout.MqttSpout"</span>
    <span class="na">constructorArgs</span><span class="pi">:</span>
      <span class="pi">-</span> <span class="na">ref</span><span class="pi">:</span> <span class="s2">"</span><span class="s">mqtt-type"</span>
      <span class="pi">-</span> <span class="na">ref</span><span class="pi">:</span> <span class="s2">"</span><span class="s">mqtt-options"</span>
      <span class="pi">-</span> <span class="na">ref</span><span class="pi">:</span> <span class="s2">"</span><span class="s">keystore-loader"</span>
    <span class="na">parallelism</span><span class="pi">:</span> <span class="s">1</span>
<span class="c1"># bolt definitions</span>
<span class="na">bolts</span><span class="pi">:</span>
  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">log"</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.flux.wrappers.bolts.LogInfoBolt"</span>
    <span class="na">parallelism</span><span class="pi">:</span> <span class="s">1</span>
<span class="na">streams</span><span class="pi">:</span>
  <span class="pi">-</span> <span class="na">from</span><span class="pi">:</span> <span class="s2">"</span><span class="s">mqtt-spout"</span>
    <span class="na">to</span><span class="pi">:</span> <span class="s2">"</span><span class="s">log"</span>
    <span class="na">grouping</span><span class="pi">:</span>
      <span class="na">type</span><span class="pi">:</span> <span class="s">SHUFFLE</span>
</code></pre></div>
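<p>The <code>topics</code> property takes a list of MQTT topic filters, so one spout can subscribe to several filters at once. As a minimal sketch, the fragment below extends the <code>mqtt-options</code> component with a second filter. The <code>/devices/+/status</code> filter is a hypothetical topic added purely for illustration; it is not part of the example project. In MQTT, <code>#</code> is the multi-level wildcard and must be the last character of a filter, while <code>+</code> matches exactly one topic level.</p>
<div class="language-yaml highlighter-rouge"><pre class="highlight"><code>  - id: "mqtt-options"
    className: "org.apache.storm.mqtt.common.MqttOptions"
    properties:
      - name: "url"
        value: "ssl://raspberrypi.local:8883"
      - name: "topics"
        value:
          # every topic below /users/tgoetz/ (multi-level wildcard)
          - "/users/tgoetz/#"
          # hypothetical: the "status" topic of any single device (single-level wildcard)
          - "/devices/+/status"
</code></pre></div>
<p>With this sketch, a message published to <code>/devices/sensor1/status</code> would match the second filter, whereas <code>/devices/sensor1/room/status</code> would not, because <code>+</code> spans only one level.</p>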
<h2 id="committer-sponsors">Committer Sponsors</h2>
<ul>
<li>P. Taylor Goetz (<a href="mailto:ptgoetz@apache.org">ptgoetz@apache.org</a>)</li>
</ul>
</div>
</div>
</div>
</div>
<footer>
<div class="container-fluid">
<div class="row">
<div class="col-md-3">
<div class="footer-widget">
<h5>Meetups</h5>
<ul class="latest-news">
<li><a href="http://www.meetup.com/Apache-Storm-Apache-Kafka/">Apache Storm &amp; Apache Kafka</a> <span class="small">(Sunnyvale, CA)</span></li>
<li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Apache Storm &amp; Kafka Users</a> <span class="small">(Seattle, WA)</span></li>
<li><a href="http://www.meetup.com/New-York-City-Storm-User-Group/">NYC Storm User Group</a> <span class="small">(New York, NY)</span></li>
<li><a href="http://www.meetup.com/Bay-Area-Stream-Processing">Bay Area Stream Processing</a> <span class="small">(Emeryville, CA)</span></li>
<li><a href="http://www.meetup.com/Boston-Storm-Users/">Boston Realtime Data</a> <span class="small">(Boston, MA)</span></li>
<li><a href="http://www.meetup.com/storm-london">London Storm User Group</a> <span class="small">(London, UK)</span></li>
</ul>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>About Apache Storm</h5>
<p>Apache Storm integrates with any queueing system and any database system. Its spout abstraction makes it easy to integrate a new queueing system, and integrating Apache Storm with database systems is similarly straightforward.</p>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>First Look</h5>
<ul class="footer-list">
<li><a href="/releases/current/Rationale.html">Rationale</a></li>
<li><a href="/releases/current/Tutorial.html">Tutorial</a></li>
<li><a href="/releases/current/Setting-up-development-environment.html">Setting up development environment</a></li>
<li><a href="/releases/current/Creating-a-new-Storm-project.html">Creating a new Apache Storm project</a></li>
</ul>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>Documentation</h5>
<ul class="footer-list">
<li><a href="/releases/current/index.html">Index</a></li>
<li><a href="/releases/current/javadocs/index.html">Javadoc</a></li>
<li><a href="/releases/current/FAQ.html">FAQ</a></li>
</ul>
</div>
</div>
</div>
<hr/>
<div class="row">
<div class="col-md-12">
<p align="center">Copyright © 2019 <a href="http://www.apache.org">Apache Software Foundation</a>. All Rights Reserved.
<br>Apache Storm, Apache, the Apache feather logo, and the Apache Storm project logos are trademarks of The Apache Software Foundation.
<br>All other marks mentioned may be trademarks or registered trademarks of their respective owners.</p>
</div>
</div>
</div>
</footer>
<!--Footer End-->
<!-- Scroll to top -->
<span class="totop"><a href="#"><i class="fa fa-angle-up"></i></a></span>
</body>
</html>