

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="utf-8">
    <title>Spark Structured Streaming MQTT</title>
    <meta name="description" content="Spark Structured Streaming MQTT">
    <meta name="author" content="">

    <!-- Enable responsive viewport -->
    <meta name="viewport" content="width=device-width, initial-scale=1.0">

    <!-- Le HTML5 shim, for IE6-8 support of HTML elements -->
    <!--[if lt IE 9]>
      <script src="http://html5shim.googlecode.com/svn/trunk/html5.js"></script>
    <![endif]-->

    <!-- Le styles -->
    <link href="/assets/themes/apache-clean/bootstrap/css/bootstrap.css" rel="stylesheet">
    <link href="/assets/themes/apache-clean/css/style.css?body=1" rel="stylesheet" type="text/css">
    <link href="/assets/themes/apache-clean/css/syntax.css" rel="stylesheet"  type="text/css" media="screen" />
    <!-- Le fav and touch icons -->
    <!-- Update these with your own images
    <link rel="shortcut icon" href="images/favicon.ico">
    <link rel="apple-touch-icon" href="images/apple-touch-icon.png">
    <link rel="apple-touch-icon" sizes="72x72" href="images/apple-touch-icon-72x72.png">
    <link rel="apple-touch-icon" sizes="114x114" href="images/apple-touch-icon-114x114.png">
  -->

    <!-- make tables sortable by adding class tag "sortable" to table elements -->
    <script src="http://www.kryogenix.org/code/browser/sorttable/sorttable.js"></script>


  </head>

  <body>

    



    <div class="container">

      


<div class="row">
  <div class="col-md-12">

<p><a href="http://mqtt.org/">MQTT</a> is a machine-to-machine (M2M)/“Internet of Things” connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.</p>

<h2 id="linking">Linking</h2>

<p>Using SBT:</p>

<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>libraryDependencies += "org.apache.bahir" %% "spark-streaming-mqtt" % "2.0.1"
</code></pre></div></div>

<p>Using Maven:</p>

<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>&lt;dependency&gt;
    &lt;groupId&gt;org.apache.bahir&lt;/groupId&gt;
    &lt;artifactId&gt;spark-streaming-mqtt_2.11&lt;/artifactId&gt;
    &lt;version&gt;2.0.1&lt;/version&gt;
&lt;/dependency&gt;
</code></pre></div></div>

<p>This library can also be added to Spark jobs launched through <code class="language-plaintext highlighter-rouge">spark-shell</code> or <code class="language-plaintext highlighter-rouge">spark-submit</code> by using the <code class="language-plaintext highlighter-rouge">--packages</code> command line option.
For example, to include it when starting the spark shell:</p>

<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>$ bin/spark-shell --packages org.apache.bahir:spark-streaming-mqtt_2.11:2.0.1
</code></pre></div></div>

<p>Unlike using <code class="language-plaintext highlighter-rouge">--jars</code>, using <code class="language-plaintext highlighter-rouge">--packages</code> ensures that this library and its dependencies will be added to the classpath.
The <code class="language-plaintext highlighter-rouge">--packages</code> argument can also be used with <code class="language-plaintext highlighter-rouge">bin/spark-submit</code>.</p>

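<p>This library is cross-published for Scala 2.10 and Scala 2.11, so users should substitute the appropriate Scala version (2.10 or 2.11) in the commands listed above, for example <code class="language-plaintext highlighter-rouge">spark-streaming-mqtt_2.10</code> for Scala 2.10.</p>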

<h2 id="configuration-options">Configuration options</h2>

<p>This source uses the <a href="https://eclipse.org/paho/clients/java/">Eclipse Paho Java Client</a>. Client API documentation is located <a href="http://www.eclipse.org/paho/files/javadoc/index.html">here</a>.</p>

<ul>
  <li><code class="language-plaintext highlighter-rouge">brokerUrl</code> The URL the MqttClient connects to. Set this to the URL of the MQTT server, e.g. tcp://localhost:1883.</li>
  <li><code class="language-plaintext highlighter-rouge">storageLevel</code> The storage level used for incoming messages. By default, incoming messages are stored on disk.</li>
  <li><code class="language-plaintext highlighter-rouge">topic</code> The topic the MqttClient subscribes to.</li>
  <li><code class="language-plaintext highlighter-rouge">clientId</code> The client ID this client is associated with. Provide the same value to recover a stopped client.</li>
  <li><code class="language-plaintext highlighter-rouge">QoS</code> The maximum quality of service at which to subscribe to each topic. Messages published at a lower quality of service will be received at the published QoS. Messages published at a higher quality of service will be received using the QoS specified on the subscription.</li>
  <li><code class="language-plaintext highlighter-rouge">username</code> Sets the user name to use for the connection to the MQTT server. Do not set it if the server does not require one; setting it to an empty value will lead to errors.</li>
  <li><code class="language-plaintext highlighter-rouge">password</code> Sets the password to use for the connection.</li>
  <li><code class="language-plaintext highlighter-rouge">cleanSession</code> Setting it to true starts a clean session and removes all messages checkpointed by a previous run of this source. It is set to false by default.</li>
  <li><code class="language-plaintext highlighter-rouge">connectionTimeout</code> Sets the connection timeout; a value of 0 is interpreted as waiting until the client connects. See <code class="language-plaintext highlighter-rouge">MqttConnectOptions.setConnectionTimeout</code> for more information.</li>
  <li><code class="language-plaintext highlighter-rouge">keepAlive</code> Same as <code class="language-plaintext highlighter-rouge">MqttConnectOptions.setKeepAliveInterval</code>.</li>
  <li><code class="language-plaintext highlighter-rouge">mqttVersion</code> Same as <code class="language-plaintext highlighter-rouge">MqttConnectOptions.setMqttVersion</code>.</li>
</ul>
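
<p>The <code class="language-plaintext highlighter-rouge">QoS</code> rule described above amounts to delivering each message at the lower of its published QoS and the QoS requested on the subscription. The following is a minimal Java sketch of that rule, for illustration only: the class <code class="language-plaintext highlighter-rouge">QosExample</code> and the method <code class="language-plaintext highlighter-rouge">effectiveQos</code> are hypothetical names and are not part of this library or of the Paho client.</p>

<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>// Hypothetical helper illustrating the QoS rule; not part of Bahir or Paho.
class QosExample {
    // A message is delivered at the lower of the QoS it was published with
    // and the maximum QoS requested on the subscription.
    static int effectiveQos(int publishedQos, int subscribedQos) {
        return Math.min(publishedQos, subscribedQos);
    }

    public static void main(String[] args) {
        // Published at QoS 2, subscribed at QoS 1: delivered at QoS 1.
        System.out.println(effectiveQos(2, 1));
        // Published at QoS 0, subscribed at QoS 1: delivered at QoS 0.
        System.out.println(effectiveQos(0, 1));
    }
}
</code></pre></div></div>

<p>Under this reading, subscribing with a QoS of 2 never upgrades a message that was published at QoS 0 or 1; it only sets the ceiling.</p>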

<h2 id="examples">Examples</h2>

<h3 id="scala-api">Scala API</h3>

<p>Create an input stream with <code class="language-plaintext highlighter-rouge">MQTTUtils.createStream</code>, passing the <code class="language-plaintext highlighter-rouge">StreamingContext</code>, the broker URL, and the topic to subscribe to:</p>

<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>val lines = MQTTUtils.createStream(ssc, brokerUrl, topic)
</code></pre></div></div>

<p>Additional MQTT connection options can be provided:</p>

<pre><code class="language-Scala">val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
</code></pre>

<h3 id="java-api">Java API</h3>

<p>Create an input stream with <code class="language-plaintext highlighter-rouge">MQTTUtils.createStream</code>, passing the <code class="language-plaintext highlighter-rouge">JavaStreamingContext</code>, the broker URL, and the topic to subscribe to:</p>

<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>JavaDStream&lt;String&gt; lines = MQTTUtils.createStream(jssc, brokerUrl, topic);
</code></pre></div></div>

<p>See end-to-end examples at <a href="https://github.com/apache/bahir/tree/master/streaming-mqtt/examples">MQTT Examples</a>.</p>

  </div>
</div>



      <hr>

      <!-- <p>&copy; 2021 </p>-->
      <footer class="site-footer">
    <div class="wrapper">
        <div class="footer-col-wrapper">
            
            <div style="text-align:center;">
                
                <div>
                    Copyright &copy; 2016-<script>document.write(new Date().getFullYear());</script> <a href="http://www.apache.org">The Apache Software Foundation</a>.
                    Licensed under the <a href="http://www.apache.org/licenses/LICENSE-2.0">Apache License, Version 2.0</a>.
                    <br>
                    
                    Apache and the Apache Feather logo are trademarks of The Apache Software Foundation.
                    
                </div>
            </div>
        </div>
    </div>
</footer>

    </div>

    





    <script src="/assets/themes/apache-clean/jquery/jquery-2.1.1.min.js"></script>

    <script src="/assets/themes/apache-clean/bootstrap/js/bootstrap.min.js"></script>


  </body>
</html>

