| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| --> |
| <!DOCTYPE html> |
| |
| |
| |
| <html lang="en"> |
| <head> |
| <!-- Global site tag (gtag.js) - Google Analytics --> |
| <script async src="https://www.googletagmanager.com/gtag/js?id=UA-1382082-1"></script> |
| <script> |
| window.dataLayer = window.dataLayer || []; |
| function gtag(){dataLayer.push(arguments);} |
| gtag('js', new Date()); |
| |
| gtag('config', 'UA-61232409-1'); |
| </script> |
| |
| |
| |
| <meta charset="UTF-8"> |
| <title>MQTT Streamer | Ignite Documentation</title> |
| |
| <link rel="canonical" href="/docs/mqtt/mqtt-streamer" /> |
| |
| |
| <link rel="stylesheet" href="/assets/css/styles.css?1651672546"> |
| <link rel="stylesheet" href="/assets/css/asciidoc-pygments.css"> |
| <link rel="shortcut icon" href="/favicon.ico"> |
| <meta name='viewport' content='width=device-width, height=device-height, initial-scale=1.0, minimum-scale=1.0'> |
| |
| <link rel="stylesheet" |
| href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/4.7.0/css/font-awesome.min.css"> |
| |
| <script type="text/javascript" src="https://ajax.googleapis.com/ajax/libs/jquery/1.7.1/jquery.min.js"></script> |
| <script type="text/javascript" src="/assets/js/jquery.swiftype.autocomplete.js?1651672546"></script> |
| <script type="text/javascript" src="/assets/js/anchor.min.js?1651672546"></script> |
| |
| |
| </head> |
| <body> |
| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| --> |
| |
| <header> |
| <div class="container"> |
| <button type='button' class='menu' title='Docs menu'> |
| <img src="/assets/images/menu-icon.svg"/> |
| </button> |
| |
| <div class='home'> |
| <a href="/" class='home' title='Apache Ignite home'> |
| <img src="/assets/images/apache_ignite_logo.svg" alt="Apache Ignite logo" width="103" height="36" > |
| </a> |
| </div> |
| |
| <nav> |
| |
| </nav> |
| <form class='search'> |
| <button class="search-close" type='button'><img src='/assets/images/cancel.svg'></button> |
| <input type="search" placeholder="Search…" id="search-input"> |
| </form> |
| <button type='button' class='search-toggle'><img src='/assets/images/search.svg'></button> |
| <button type='button' class='top-nav-toggle'>⋮</button> |
| <a href="https://github.com/ignite" title='GitHub' class='github' target="_blank"> |
| <img src="/assets/images/github-gray.svg" alt="GitHub logo"> |
| </a> |
| </div> |
| </header> |
| |
| |
| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| --> |
| <link rel="stylesheet" href="/assets/css/docs.css"> |
| <section class='page-docs'> |
| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| --> |
| |
| |
| |
| |
| |
| |
| |
| <nav class='left-nav' data-swiftype-index='false'> |
| |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/aws/aws" class='' >Amazon S3 IP Finder</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/camel/camel-streamer" class='' >Apache Camel Streamer</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/flink/flink-streamer" class='' >Apache Flink Streamer</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/flume/flume-sink" class='' >Apache Flume Sink</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/azure/azure" class='' >Apache Ignite Azure Module</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/gce/gce" class='' >Apache Ignite GCE Module</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/pub-sub/pub-sub" class='' >Apache Ignite Pub/Sub Module</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/spring/spring-boot" class='' >Apache Ignite and Spring Boot</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/spring/spring-data" class='' >Apache Ignite and Spring Data</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/spring/spring-caching" class='' >Apache Ignite and Spring Cache</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/spring/spring-tx" class='' >Apache Ignite and Spring Transactions</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/kafka/kafka-streamer" class='' >Apache Kafka Streamer</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/storm/storm-streamer" class='' >Apache Storm Streamer</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/cdc/change-data-capture-extensions" class='' >Change Data Capture Extension</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/jms/jms-streamer" class='' >JMS Streamer</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/mqtt/mqtt-streamer" class='' >MQTT Streamer</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/perf-statistics/performance-statistics" class='' >Performance Statistics Extension</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/rocketmq/rocketmq-streamer" class='' >RocketMQ Streamer</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/topology-validator/topology-validator" class='' >Topology Validator</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/twitter/twitter-streamer" class='' >Twitter Streamer</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/zeromq/zeromq-streamer" class='' >ZeroMQ Streamer</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/zookeeper/zookeeper-ip" class='' >ZooKeeper IP Finder</a> |
| |
| </li> |
| |
| </nav> |
| <div class="left-nav__overlay"></div> |
| |
| |
| <article data-swiftype-index='true'> |
| <a class='edit-link' href="/_docs/mqtt/mqtt-streamer.adoc" target="_blank">Edit</a> |
| |
| <h1>MQTT Streamer</h1> |
| |
| <div class="sect1"> |
| <h2 id="overview">Overview</h2> |
| <div class="sectionbody"> |
| <div class="paragraph"> |
| <p>This streamer consumes from an MQTT topic and feeds key-value pairs into an <code>IgniteDataStreamer</code> instance, using |
| <a href="https://eclipse.org/paho/" target="_blank" rel="noopener">Eclipse Paho</a> as an MQTT client.</p> |
| </div> |
| <div class="paragraph"> |
| <p>You need to provide a stream tuple extractor (either a single-entry or multiple-entries extractor) to process the incoming |
| message and extract the tuple to insert.</p> |
| </div> |
| <div class="paragraph"> |
| <p>This streamer supports:</p> |
| </div> |
| <div class="ulist"> |
| <ul> |
| <li> |
| <p>Subscribing to a single topic or multiple topics at once.</p> |
| </li> |
| <li> |
| <p>Specifying the subscriber’s QoS for a single topic or for multiple topics.</p> |
| </li> |
| <li> |
| <p>Setting <a href="https://www.eclipse.org/paho/files/javadoc/org/eclipse/paho/client/mqttv3/MqttConnectOptions.html" target="<em>blank">MqttConnectOptions</a> |
| to enable features like _last will testament</em>, <em>persistent sessions</em>, etc.</p> |
| </li> |
| <li> |
| <p>Specifying the client ID. A random one will be generated and maintained throughout reconnections if the user does not provide one.</p> |
| </li> |
| <li> |
| <p>(Re-)Connection retries powered by the <a href="https://github.com/rholder/guava-retrying" target="<em>blank">guava-retrying library</a>. |
| _Retry wait</em> and <em>retry stop</em> policies can be configured.</p> |
| </li> |
| <li> |
| <p>Blocking the start() method until the client is connected for the first time.</p> |
| </li> |
| </ul> |
| </div> |
| </div> |
| </div> |
| <div class="sect1"> |
| <h2 id="example">Example</h2> |
| <div class="sectionbody"> |
| <div class="paragraph"> |
| <p>Here’s a trivial code sample showing how to use this streamer:</p> |
| </div> |
| <code-tabs><code-tab data-tab='Java'><div class="listingblock"> |
| <div class="content"> |
| <pre class="rouge highlight"><code data-lang="java"><span class="c1">// Start Ignite.</span> |
| <span class="nc">Ignite</span> <span class="n">ignite</span> <span class="o">=</span> <span class="nc">Ignition</span><span class="o">.</span><span class="na">start</span><span class="o">();</span> |
| |
| <span class="c1">// Get a data streamer reference.</span> |
| <span class="nc">IgniteDataStreamer</span><span class="o"><</span><span class="nc">Integer</span><span class="o">,</span> <span class="nc">String</span><span class="o">></span> <span class="n">dataStreamer</span> <span class="o">=</span> <span class="n">grid</span><span class="o">().</span><span class="na">dataStreamer</span><span class="o">(</span><span class="s">"mycache"</span><span class="o">);</span> |
| |
| <span class="c1">// Create an MQTT data streamer</span> |
| <span class="nc">MqttStreamer</span><span class="o"><</span><span class="nc">Integer</span><span class="o">,</span> <span class="nc">String</span><span class="o">></span> <span class="n">streamer</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">MqttStreamer</span><span class="o"><>();</span> |
| <span class="n">streamer</span><span class="o">.</span><span class="na">setIgnite</span><span class="o">(</span><span class="n">ignite</span><span class="o">);</span> |
| <span class="n">streamer</span><span class="o">.</span><span class="na">setStreamer</span><span class="o">(</span><span class="n">dataStreamer</span><span class="o">);</span> |
| <span class="n">streamer</span><span class="o">.</span><span class="na">setBrokerUrl</span><span class="o">(</span><span class="n">brokerUrl</span><span class="o">);</span> |
| <span class="n">streamer</span><span class="o">.</span><span class="na">setBlockUntilConnected</span><span class="o">(</span><span class="kc">true</span><span class="o">);</span> |
| |
| <span class="c1">// Set a single tuple extractor to extract items in the format 'key,value' where key => Int, and value => String</span> |
| <span class="c1">// (using Guava here).</span> |
| <span class="n">streamer</span><span class="o">.</span><span class="na">setSingleTupleExtractor</span><span class="o">(</span><span class="k">new</span> <span class="nc">StreamSingleTupleExtractor</span><span class="o"><</span><span class="nc">MqttMessage</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">,</span> <span class="nc">String</span><span class="o">>()</span> <span class="o">{</span> |
| <span class="nd">@Override</span> <span class="kd">public</span> <span class="nc">Map</span><span class="o">.</span><span class="na">Entry</span><span class="o"><</span><span class="nc">Integer</span><span class="o">,</span> <span class="nc">String</span><span class="o">></span> <span class="nf">extract</span><span class="o">(</span><span class="nc">MqttMessage</span> <span class="n">msg</span><span class="o">)</span> <span class="o">{</span> |
| <span class="nc">List</span><span class="o"><</span><span class="nc">String</span><span class="o">></span> <span class="n">s</span> <span class="o">=</span> <span class="nc">Splitter</span><span class="o">.</span><span class="na">on</span><span class="o">(</span><span class="s">","</span><span class="o">).</span><span class="na">splitToList</span><span class="o">(</span><span class="k">new</span> <span class="nc">String</span><span class="o">(</span><span class="n">msg</span><span class="o">.</span><span class="na">getPayload</span><span class="o">()));</span> |
| |
| <span class="k">return</span> <span class="k">new</span> <span class="nc">GridMapEntry</span><span class="o"><>(</span><span class="nc">Integer</span><span class="o">.</span><span class="na">parseInt</span><span class="o">(</span><span class="n">s</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="mi">0</span><span class="o">)),</span> <span class="n">s</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="mi">1</span><span class="o">));</span> |
| <span class="o">}</span> |
| <span class="o">});</span> |
| |
| <span class="c1">// Consume from multiple topics at once.</span> |
| <span class="n">streamer</span><span class="o">.</span><span class="na">setTopics</span><span class="o">(</span><span class="nc">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="s">"def"</span><span class="o">,</span> <span class="s">"ghi"</span><span class="o">,</span> <span class="s">"jkl"</span><span class="o">,</span> <span class="s">"mno"</span><span class="o">));</span> |
| |
| <span class="c1">// Start the MQTT Streamer.</span> |
| <span class="n">streamer</span><span class="o">.</span><span class="na">start</span><span class="o">();</span></code></pre> |
| </div> |
| </div></code-tab></code-tabs> |
| <div class="paragraph"> |
| <p>Refer to the Javadocs of the <code>ignite-mqtt-ext</code> module for more info on the available options.</p> |
| </div> |
| </div> |
| </div> |
| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| --> |
| |
| <div class="copyright"> |
| © 2022 The Apache Software Foundation.<br/> |
| Apache, Apache Ignite, the Apache feather and the Apache Ignite logo are either registered trademarks or trademarks of The Apache Software Foundation. |
| |
| </div> |
| |
| </article> |
| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| --> |
| |
| <nav class="right-nav" data-swiftype-index='false'> |
| <ul class="sectlevel1"> |
| <li><a href="#overview">Overview</a></li> |
| <li><a href="#example">Example</a></li> |
| </ul> |
| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| --> |
| |
| |
| <footer> |
| </footer> |
| |
| </nav> |
| |
| </section> |
| <script type='module' src='/assets/js/code-copy-to-clipboard.js' async></script> |
| |
| <script> |
| // inits deep anchors -- needs to be done here because of https://www.bryanbraun.com/anchorjs/#dont-run-it-too-late |
| anchors.add('.page-docs h1, .page-docs h2, .page-docs h3:not(.discrete), .page-docs h4, .page-docs h5'); |
| anchors.options = { |
| placement: 'right', |
| visible: 'always' |
| }; |
| </script> |
| </body> |
| <script type='module' src='/assets/js/index.js?1651672546' async></script> |
| </html> |