blob: ca7e09a4ac7d488d489d17a97504409cbd564963 [file] [log] [blame]
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!DOCTYPE html>
<html lang="en">
<head>
<!-- Global site tag (gtag.js) - Google Analytics -->
<script async src="https://www.googletagmanager.com/gtag/js?id=UA-1382082-1"></script>
<script>
window.dataLayer = window.dataLayer || [];
function gtag(){dataLayer.push(arguments);}
gtag('js', new Date());
gtag('config', 'UA-61232409-1');
</script>
<meta charset="UTF-8">
<title>MQTT Streamer | Ignite Documentation</title>
<link rel="canonical" href="/docs/mqtt/mqtt-streamer" />
<link rel="stylesheet" href="/assets/css/styles.css?1658382975">
<link rel="stylesheet" href="/assets/css/asciidoc-pygments.css">
<link rel="shortcut icon" href="/favicon.ico">
<meta name='viewport' content='width=device-width, height=device-height, initial-scale=1.0, minimum-scale=1.0'>
<link rel="stylesheet"
href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/4.7.0/css/font-awesome.min.css">
<script type="text/javascript" src="https://ajax.googleapis.com/ajax/libs/jquery/1.7.1/jquery.min.js"></script>
<script type="text/javascript" src="/assets/js/jquery.swiftype.autocomplete.js?1658382975"></script>
<script type="text/javascript" src="/assets/js/anchor.min.js?1658382975"></script>
</head>
<body>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!--header>
<button type='button' class='menu' title='Docs menu'>
<img src="/assets/images/menu-icon.svg"/>
</button>
<nav>
</nav>
<form class='search'>
<button class="search-close" type='button'><img src='/assets/images/cancel.svg'></button>
<input type="search" placeholder="Search…" id="search-input">
</form>
<button type='button' class='search-toggle'><img src='/assets/images/search.svg'></button>
<button type='button' class='top-nav-toggle'>⋮</button>
<a href="https://github.com/ignite" title='GitHub' class='github' target="_blank">
<img src="/assets/images/github-gray.svg" alt="GitHub logo">
</a>
</header-->
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<link rel="stylesheet" href="/assets/css/docs.css">
<section class='page-docs'>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<nav class='left-nav' data-swiftype-index='false'>
<li>
<a href="/docs/extensions/aws/aws" class=''>Amazon S3 IP Finder</a>
</li>
<li>
<a href="/docs/extensions/camel/camel-streamer" class=''>Apache Camel Streamer</a>
</li>
<li>
<a href="/docs/extensions/flink/flink-streamer" class=''>Apache Flink Streamer</a>
</li>
<li>
<a href="/docs/extensions/flume/flume-sink" class=''>Apache Flume Sink</a>
</li>
<li>
<a href="/docs/extensions/azure/azure" class=''>Apache Ignite Azure Module</a>
</li>
<li>
<a href="/docs/extensions/gce/gce" class=''>Apache Ignite GCE Module</a>
</li>
<li>
<a href="/docs/extensions/pub-sub/pub-sub" class=''>Apache Ignite Pub/Sub Module</a>
</li>
<li>
<a href="/docs/extensions/spring/spring-boot" class=''>Apache Ignite and Spring Boot</a>
</li>
<li>
<a href="/docs/extensions/spring/spring-data" class=''>Apache Ignite and Spring Data</a>
</li>
<li>
<a href="/docs/extensions/spring/spring-caching" class=''>Apache Ignite and Spring Cache</a>
</li>
<li>
<a href="/docs/extensions/spring/spring-sessions" class=''>Apache Ignite and Spring Session</a>
</li>
<li>
<a href="/docs/extensions/spring/spring-tx" class=''>Apache Ignite and Spring Transactions</a>
</li>
<li>
<a href="/docs/extensions/kafka/kafka-streamer" class=''>Apache Kafka Streamer</a>
</li>
<li>
<a href="/docs/extensions/storm/storm-streamer" class=''>Apache Storm Streamer</a>
</li>
<li>
<a href="/docs/extensions/cdc/change-data-capture-extensions" class=''>Change Data Capture Extension</a>
</li>
<li>
<a href="/docs/extensions/jms/jms-streamer" class=''>JMS Streamer</a>
</li>
<li>
<a href="/docs/extensions/mqtt/mqtt-streamer" class=''>MQTT Streamer</a>
</li>
<li>
<a href="/docs/extensions/perf-statistics/performance-statistics" class=''>Performance Statistics Extension</a>
</li>
<li>
<a href="/docs/extensions/rocketmq/rocketmq-streamer" class=''>RocketMQ Streamer</a>
</li>
<li>
<a href="/docs/extensions/topology-validator/topology-validator" class=''>Topology Validator</a>
</li>
<li>
<a href="/docs/extensions/twitter/twitter-streamer" class=''>Twitter Streamer</a>
</li>
<li>
<a href="/docs/extensions/zeromq/zeromq-streamer" class=''>ZeroMQ Streamer</a>
</li>
<li>
<a href="/docs/extensions/zookeeper/zookeeper-ip" class=''>ZooKeeper IP Finder</a>
</li>
</nav>
<div class="left-nav__overlay"></div>
<article data-swiftype-index='true'>
<a class='edit-link' href="/_docs/mqtt/mqtt-streamer.adoc" target="_blank">Edit</a>
<h1>MQTT Streamer</h1>
<div class="sect1">
<h2 id="overview">Overview</h2>
<div class="sectionbody">
<div class="paragraph">
<p>This streamer consumes from an MQTT topic and feeds key-value pairs into an <code>IgniteDataStreamer</code> instance, using
<a href="https://eclipse.org/paho/" target="_blank" rel="noopener">Eclipse Paho</a> as an MQTT client.</p>
</div>
<div class="paragraph">
<p>You need to provide a stream tuple extractor (either a single-entry or multiple-entries extractor) to process the incoming
message and extract the tuple to insert.</p>
</div>
<div class="paragraph">
<p>This streamer supports:</p>
</div>
<div class="ulist">
<ul>
<li>
<p>Subscribing to a single topic or multiple topics at once.</p>
</li>
<li>
<p>Specifying the subscriber&#8217;s QoS for a single topic or for multiple topics.</p>
</li>
<li>
<p>Setting <a href="https://www.eclipse.org/paho/files/javadoc/org/eclipse/paho/client/mqttv3/MqttConnectOptions.html" target="<em>blank">MqttConnectOptions</a>
to enable features like _last will testament</em>, <em>persistent sessions</em>, etc.</p>
</li>
<li>
<p>Specifying the client ID. A random one will be generated and maintained throughout reconnections if the user does not provide one.</p>
</li>
<li>
<p>(Re-)Connection retries powered by the <a href="https://github.com/rholder/guava-retrying" target="<em>blank">guava-retrying library</a>.
_Retry wait</em> and <em>retry stop</em> policies can be configured.</p>
</li>
<li>
<p>Blocking the start() method until the client is connected for the first time.</p>
</li>
</ul>
</div>
</div>
</div>
<div class="sect1">
<h2 id="example">Example</h2>
<div class="sectionbody">
<div class="paragraph">
<p>Here&#8217;s a trivial code sample showing how to use this streamer:</p>
</div>
<code-tabs><code-tab data-tab='Java'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="java"><span class="c1">// Start Ignite.</span>
<span class="nc">Ignite</span> <span class="n">ignite</span> <span class="o">=</span> <span class="nc">Ignition</span><span class="o">.</span><span class="na">start</span><span class="o">();</span>
<span class="c1">// Get a data streamer reference.</span>
<span class="nc">IgniteDataStreamer</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;</span> <span class="n">dataStreamer</span> <span class="o">=</span> <span class="n">grid</span><span class="o">().</span><span class="na">dataStreamer</span><span class="o">(</span><span class="s">"mycache"</span><span class="o">);</span>
<span class="c1">// Create an MQTT data streamer</span>
<span class="nc">MqttStreamer</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;</span> <span class="n">streamer</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">MqttStreamer</span><span class="o">&lt;&gt;();</span>
<span class="n">streamer</span><span class="o">.</span><span class="na">setIgnite</span><span class="o">(</span><span class="n">ignite</span><span class="o">);</span>
<span class="n">streamer</span><span class="o">.</span><span class="na">setStreamer</span><span class="o">(</span><span class="n">dataStreamer</span><span class="o">);</span>
<span class="n">streamer</span><span class="o">.</span><span class="na">setBrokerUrl</span><span class="o">(</span><span class="n">brokerUrl</span><span class="o">);</span>
<span class="n">streamer</span><span class="o">.</span><span class="na">setBlockUntilConnected</span><span class="o">(</span><span class="kc">true</span><span class="o">);</span>
<span class="c1">// Set a single tuple extractor to extract items in the format 'key,value' where key =&gt; Int, and value =&gt; String</span>
<span class="c1">// (using Guava here).</span>
<span class="n">streamer</span><span class="o">.</span><span class="na">setSingleTupleExtractor</span><span class="o">(</span><span class="k">new</span> <span class="nc">StreamSingleTupleExtractor</span><span class="o">&lt;</span><span class="nc">MqttMessage</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;()</span> <span class="o">{</span>
<span class="nd">@Override</span> <span class="kd">public</span> <span class="nc">Map</span><span class="o">.</span><span class="na">Entry</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;</span> <span class="nf">extract</span><span class="o">(</span><span class="nc">MqttMessage</span> <span class="n">msg</span><span class="o">)</span> <span class="o">{</span>
<span class="nc">List</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">&gt;</span> <span class="n">s</span> <span class="o">=</span> <span class="nc">Splitter</span><span class="o">.</span><span class="na">on</span><span class="o">(</span><span class="s">","</span><span class="o">).</span><span class="na">splitToList</span><span class="o">(</span><span class="k">new</span> <span class="nc">String</span><span class="o">(</span><span class="n">msg</span><span class="o">.</span><span class="na">getPayload</span><span class="o">()));</span>
<span class="k">return</span> <span class="k">new</span> <span class="nc">GridMapEntry</span><span class="o">&lt;&gt;(</span><span class="nc">Integer</span><span class="o">.</span><span class="na">parseInt</span><span class="o">(</span><span class="n">s</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="mi">0</span><span class="o">)),</span> <span class="n">s</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="mi">1</span><span class="o">));</span>
<span class="o">}</span>
<span class="o">});</span>
<span class="c1">// Consume from multiple topics at once.</span>
<span class="n">streamer</span><span class="o">.</span><span class="na">setTopics</span><span class="o">(</span><span class="nc">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="s">"def"</span><span class="o">,</span> <span class="s">"ghi"</span><span class="o">,</span> <span class="s">"jkl"</span><span class="o">,</span> <span class="s">"mno"</span><span class="o">));</span>
<span class="c1">// Start the MQTT Streamer.</span>
<span class="n">streamer</span><span class="o">.</span><span class="na">start</span><span class="o">();</span></code></pre>
</div>
</div></code-tab></code-tabs>
<div class="paragraph">
<p>Refer to the Javadocs of the <code>ignite-mqtt-ext</code> module for more info on the available options.</p>
</div>
</div>
</div>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<div class="copyright">
© 2022 The Apache Software Foundation.<br/>
Apache, Apache Ignite, the Apache feather and the Apache Ignite logo are either registered trademarks or trademarks of The Apache Software Foundation.
</div>
</article>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<nav class="right-nav" data-swiftype-index='false'>
<ul class="sectlevel1">
<li><a href="#overview">Overview</a></li>
<li><a href="#example">Example</a></li>
</ul>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<footer>
</footer>
</nav>
</section>
<script type='module' src='/assets/js/code-copy-to-clipboard.js' async></script>
<script>
// inits deep anchors -- needs to be done here because of https://www.bryanbraun.com/anchorjs/#dont-run-it-too-late
anchors.add('.page-docs h1, .page-docs h2, .page-docs h3:not(.discrete), .page-docs h4, .page-docs h5');
anchors.options = {
placement: 'right',
visible: 'always'
};
</script>
</body>
<script type='module' src='/assets/js/index.js?1658382975' async></script>
</html>