

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="utf-8">
    <title>Spark Structured Streaming MQTT</title>
    <meta name="description" content="Spark Structured Streaming MQTT">
    <meta name="author" content="">

    <!-- Enable responsive viewport -->
    <meta name="viewport" content="width=device-width, initial-scale=1.0">

    <!-- HTML5 shim, for IE6-8 support of HTML5 elements.
         Only evaluated by IE < 9 (conditional comment); loaded over HTTPS so
         it is not blocked as mixed content when the page is served securely. -->
    <!--[if lt IE 9]>
      <script src="https://cdnjs.cloudflare.com/ajax/libs/html5shiv/3.7.3/html5shiv.min.js"></script>
    <![endif]-->

    <!-- Theme styles: Bootstrap first, then site overrides, then syntax highlighting -->
    <link href="/assets/themes/apache-clean/bootstrap/css/bootstrap.css" rel="stylesheet">
    <link href="/assets/themes/apache-clean/css/style.css?body=1" rel="stylesheet">
    <link href="/assets/themes/apache-clean/css/syntax.css" rel="stylesheet" media="screen">
    <!-- Le fav and touch icons -->
    <!-- Update these with your own images
    <link rel="shortcut icon" href="images/favicon.ico">
    <link rel="apple-touch-icon" href="images/apple-touch-icon.png">
    <link rel="apple-touch-icon" sizes="72x72" href="images/apple-touch-icon-72x72.png">
    <link rel="apple-touch-icon" sizes="114x114" href="images/apple-touch-icon-114x114.png">
  -->

    <!-- make tables sortable by adding class tag "sortable" to table elements -->
    <!-- Fetched over HTTPS: a plain-http script is blocked as mixed content on an HTTPS page. -->
    <script src="https://www.kryogenix.org/code/browser/sorttable/sorttable.js"></script>


  </head>

  <body>

    

<!-- Navigation -->
<!--
  Site-wide top navigation bar (Bootstrap navbar markup).
  - Each top-level item is a dropdown toggled through data-toggle="dropdown";
    the toggles carry role="button" and aria-haspopup so assistive technology
    announces them as menu openers rather than as links to "#".
  - Links that open a new tab (target="_blank") carry rel="noopener" so the
    opened page cannot reach back through window.opener.
  - External Apache links use https.
-->
<div id="nav-bar">
  <nav id="nav-container" class="navbar navbar-inverse " role="navigation">
    <div class="container">
      <!-- Brand and toggle get grouped for better mobile display -->

      <div class="navbar-header page-scroll">
        <button type="button" class="navbar-toggle" data-toggle="collapse" data-target=".navbar-collapse">
          <span class="sr-only">Toggle navigation</span>
          <span class="icon-bar"></span>
          <span class="icon-bar"></span>
          <span class="icon-bar"></span>
        </button>
        <a class="navbar-brand page-scroll" href="/#home">Home</a>
      </div>
      <!-- Collect the nav links, forms, and other content for toggling -->
      <nav class="navbar-collapse collapse" role="navigation">
        <ul class="nav navbar-nav">

          <li id="download">
            <a href="#" data-toggle="dropdown" class="dropdown-toggle" role="button" aria-haspopup="true">Download<b class="caret"></b></a>
            <ul class="dropdown-menu dropdown-left">
              <li><a href="/downloads/spark" target="_self">Bahir Spark Extensions</a></li>
              <li><a href="/downloads/flink" target="_self">Bahir Flink Extensions</a></li>
            </ul>
          </li>

          <li id="community">
            <a href="#" data-toggle="dropdown" class="dropdown-toggle" role="button" aria-haspopup="true">Community<b class="caret"></b></a>
            <ul class="dropdown-menu dropdown-left">
              <li><a href="/community" target="_self">Get Involved</a></li>
              <li><a href="/contributing" target="_self">Contributing</a></li>
              <li><a href="/contributing-extensions" target="_self">Contributing Extensions</a></li>
              <li><a href="https://issues.apache.org/jira/browse/BAHIR" target="_blank" rel="noopener">Issue Tracker</a></li>
              <li><a href="/community#source-code" target="_self">Source Code</a></li>
              <li><a href="/community-members" target="_self">Project Committers</a></li>
            </ul>
          </li>

          <li id="documentation">
            <a href="#" data-toggle="dropdown" class="dropdown-toggle" role="button" aria-haspopup="true">Documentation<b class="caret"></b></a>
            <ul class="dropdown-menu dropdown-left">
              <li><a href="/docs/spark/overview" target="_self">Bahir Spark Extensions</a></li>
              <li><a href="/docs/flink/overview" target="_self">Bahir Flink Extensions</a></li>
            </ul>
          </li>

          <li id="github">
            <a href="#" data-toggle="dropdown" class="dropdown-toggle" role="button" aria-haspopup="true">GitHub<b class="caret"></b></a>
            <ul class="dropdown-menu dropdown-left">
              <li><a href="https://github.com/apache/bahir" target="_blank" rel="noopener">Bahir Spark Extensions</a></li>
              <li><a href="https://github.com/apache/bahir-flink" target="_blank" rel="noopener">Bahir Flink Extensions</a></li>
              <li><a href="https://github.com/apache/bahir-website" target="_blank" rel="noopener">Bahir Website</a></li>
            </ul>
          </li>

          <li id="apache">
            <a href="#" data-toggle="dropdown" class="dropdown-toggle" role="button" aria-haspopup="true">Apache<b class="caret"></b></a>
            <ul class="dropdown-menu dropdown-left">
              <li><a href="https://www.apache.org/foundation/how-it-works.html" target="_blank" rel="noopener">Apache Software Foundation</a></li>
              <li><a href="https://www.apache.org/licenses/" target="_blank" rel="noopener">Apache License</a></li>
              <li><a href="https://www.apache.org/foundation/sponsorship" target="_blank" rel="noopener">Sponsorship</a></li>
              <li><a href="https://www.apache.org/foundation/thanks.html" target="_blank" rel="noopener">Thanks</a></li>
              <li><a href="/privacy-policy" target="_self">Privacy Policy</a></li>
            </ul>
          </li>

        </ul>
      </nav><!--/.navbar-collapse -->
      <!-- /.navbar-collapse -->
    </div>
    <!-- /.container -->
  </nav>
</div>


    <div class="container">

      

<!--<div class="hero-unit Spark Structured Streaming MQTT">
  <h1></h1>
</div>
-->

<div class="row">
  <div class="col-md-12">
    <!--

-->

<p>A library for reading data from MQTT Servers using Spark SQL Streaming (or Structured Streaming).</p>

<h2 id="linking">Linking</h2>

<p>Using SBT:</p>

<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-mqtt" % "2.1.2"
</code></pre></div></div>

<p>Using Maven:</p>

<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>&lt;dependency&gt;
    &lt;groupId&gt;org.apache.bahir&lt;/groupId&gt;
    &lt;artifactId&gt;spark-sql-streaming-mqtt_2.11&lt;/artifactId&gt;
    &lt;version&gt;2.1.2&lt;/version&gt;
&lt;/dependency&gt;
</code></pre></div></div>

<p>This library can also be added to Spark jobs launched through <code class="language-plaintext highlighter-rouge">spark-shell</code> or <code class="language-plaintext highlighter-rouge">spark-submit</code> by using the <code class="language-plaintext highlighter-rouge">--packages</code> command line option.
For example, to include it when starting the spark shell:</p>

<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>$ bin/spark-shell --packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.1.2
</code></pre></div></div>

<p>Unlike using <code class="language-plaintext highlighter-rouge">--jars</code>, using <code class="language-plaintext highlighter-rouge">--packages</code> ensures that this library and its dependencies will be added to the classpath.
The <code class="language-plaintext highlighter-rouge">--packages</code> argument can also be used with <code class="language-plaintext highlighter-rouge">bin/spark-submit</code>.</p>

<p>This library is compiled for Scala 2.11 only, and intends to support Spark 2.0 onwards.</p>

<h2 id="examples">Examples</h2>

<p>A SQL Stream can be created with data streams received through MQTT Server using,</p>

<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>sqlContext.readStream
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    .option("topic", "mytopic")
    .load("tcp://localhost:1883")
</code></pre></div></div>

<h2 id="enable-recovering-from-failures">Enable recovering from failures.</h2>

<p>Setting values for option <code class="language-plaintext highlighter-rouge">localStorage</code> and <code class="language-plaintext highlighter-rouge">clientId</code> helps in recovering in case of a restart, by restoring the state where it left off before the shutdown.</p>

<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>sqlContext.readStream
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    .option("topic", "mytopic")
    .option("localStorage", "/path/to/localdir")
    .option("clientId", "some-client-id")
    .load("tcp://localhost:1883")
</code></pre></div></div>

<h2 id="configuration-options">Configuration options.</h2>

<p>This source uses <a href="https://eclipse.org/paho/clients/java/">Eclipse Paho Java Client</a>. Client API documentation is located <a href="http://www.eclipse.org/paho/files/javadoc/index.html">here</a>.</p>

<ul>
  <li><code class="language-plaintext highlighter-rouge">brokerUrl</code> A url MqttClient connects to. Set this or <code class="language-plaintext highlighter-rouge">path</code> as the url of the Mqtt Server. e.g. tcp://localhost:1883.</li>
  <li><code class="language-plaintext highlighter-rouge">persistence</code> By default it is used for storing incoming messages on disk. If <code class="language-plaintext highlighter-rouge">memory</code> is provided as value for this option, then recovery on restart is not supported.</li>
  <li><code class="language-plaintext highlighter-rouge">topic</code> Topic MqttClient subscribes to.</li>
  <li><code class="language-plaintext highlighter-rouge">clientId</code> The clientId this client is associated with. Provide the same value to recover a stopped client.</li>
  <li><code class="language-plaintext highlighter-rouge">QoS</code> The maximum quality of service to subscribe each topic at. Messages published at a lower quality of service will be received at the published QoS. Messages published at a higher quality of service will be received using the QoS specified on the subscribe.</li>
  <li><code class="language-plaintext highlighter-rouge">username</code> Sets the user name to use for the connection to Mqtt Server. Do not set it, if server does not need this. Setting it empty will lead to errors.</li>
  <li><code class="language-plaintext highlighter-rouge">password</code> Sets the password to use for the connection.</li>
  <li><code class="language-plaintext highlighter-rouge">cleanSession</code> Setting it true starts a clean session, removes all checkpointed messages by a previous run of this source. This is set to false by default.</li>
  <li><code class="language-plaintext highlighter-rouge">connectionTimeout</code> Sets the connection timeout, a value of 0 is interpreted as wait until client connects. See <code class="language-plaintext highlighter-rouge">MqttConnectOptions.setConnectionTimeout</code> for more information.</li>
  <li><code class="language-plaintext highlighter-rouge">keepAlive</code> Same as <code class="language-plaintext highlighter-rouge">MqttConnectOptions.setKeepAliveInterval</code>.</li>
  <li><code class="language-plaintext highlighter-rouge">mqttVersion</code> Same as <code class="language-plaintext highlighter-rouge">MqttConnectOptions.setMqttVersion</code>.</li>
</ul>

<h3 id="scala-api">Scala API</h3>

<p>An example, for scala API to count words from incoming message stream.</p>

<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>// Create DataFrame representing the stream of input lines from connection to mqtt server
val lines = spark.readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("topic", topic)
  .load(brokerUrl).as[(String, Timestamp)]

// Split the lines into words
val words = lines.map(_._1).flatMap(_.split(" "))

// Generate running word count
val wordCounts = words.groupBy("value").count()

// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
</code></pre></div></div>

<p>Please see <code class="language-plaintext highlighter-rouge">MQTTStreamWordCount.scala</code> for full example.</p>

<h3 id="java-api">Java API</h3>

<p>An example, for Java API to count words from incoming message stream.</p>

<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>// Create DataFrame representing the stream of input lines from connection to mqtt server.
Dataset&lt;String&gt; lines = spark
        .readStream()
        .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
        .option("topic", topic)
        .load(brokerUrl).select("value").as(Encoders.STRING());

// Split the lines into words
Dataset&lt;String&gt; words = lines.flatMap(new FlatMapFunction&lt;String, String&gt;() {
    @Override
    public Iterator&lt;String&gt; call(String x) {
        return Arrays.asList(x.split(" ")).iterator();
    }
}, Encoders.STRING());

// Generate running word count
Dataset&lt;Row&gt; wordCounts = words.groupBy("value").count();

// Start running the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
        .outputMode("complete")
        .format("console")
        .start();

query.awaitTermination();
</code></pre></div></div>

<p>Please see <code class="language-plaintext highlighter-rouge">JavaMQTTStreamWordCount.java</code> for full example.</p>


  </div>
</div>



      <hr>

      <!-- <p>&copy; 2021 </p>-->
      <!-- Site footer: copyright notice (end year filled in at render time by the
           inline script), license link and trademark statement. -->
      <footer class="site-footer">
        <div class="wrapper">
          <div class="footer-col-wrapper">

            <div style="text-align:center;">

              <div>
                Copyright &copy; 2016-<script>document.write(new Date().getFullYear());</script> <a href="https://www.apache.org">The Apache Software Foundation</a>.
                Licensed under the <a href="https://www.apache.org/licenses/LICENSE-2.0">Apache License, Version 2.0</a>.
                <br>

                Apache and the Apache Feather logo are trademarks of The Apache Software Foundation.

              </div>
            </div>
          </div>
        </div>
      </footer>

    </div>

    


  <!--
    Google Analytics (analytics.js) bootstrap.
    The loader below defines a global `ga` function that queues its arguments
    until the library arrives, then inserts an async <script> element for
    analytics.js ahead of the first script in the document. The library URL is
    given with an explicit https scheme rather than a protocol-relative "//"
    so it also resolves when the page is opened from a non-http(s) origin.
  -->
  <script>
  (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
  (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
  m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
  })(window,document,'script','https://www.google-analytics.com/analytics.js','ga');

  // Queue the tracker setup and the initial pageview hit.
  ga('create', 'UA-79140859-1', 'bahir.apache.org');
  ga('require', 'linkid', 'linkid.js');
  ga('send', 'pageview');

  </script>



    <!-- Theme scripts, loaded at the end of the body. jQuery comes first;
         NOTE(review): bootstrap.min.js presumably depends on it (the nav bar
         uses data-toggle dropdown/collapse), so keep this order. -->
    <script src="/assets/themes/apache-clean/jquery/jquery-2.1.1.min.js"></script>

    <script src="/assets/themes/apache-clean/bootstrap/js/bootstrap.min.js"></script>


  </body>
</html>

