| |
| |
| <!DOCTYPE html> |
| <html lang="en"> |
| <head> |
| <meta charset="utf-8"> |
| <title>Spark Structured Streaming MQTT</title> |
| <meta name="description" content="Spark Structured Streaming MQTT"> |
| <meta name="author" content=""> |
| |
| <!-- Enable responsive viewport --> |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| |
| <!-- Le HTML5 shim, for IE6-8 support of HTML elements --> |
| <!--[if lt IE 9]> |
| <script src="http://html5shim.googlecode.com/svn/trunk/html5.js"></script> |
| <![endif]--> |
| |
| <!-- Le styles --> |
| <link href="/assets/themes/apache-clean/bootstrap/css/bootstrap.css" rel="stylesheet"> |
| <link href="/assets/themes/apache-clean/css/style.css?body=1" rel="stylesheet" type="text/css"> |
| <link href="/assets/themes/apache-clean/css/syntax.css" rel="stylesheet" type="text/css" media="screen" /> |
| <!-- Le fav and touch icons --> |
| <!-- Update these with your own images |
| <link rel="shortcut icon" href="images/favicon.ico"> |
| <link rel="apple-touch-icon" href="images/apple-touch-icon.png"> |
| <link rel="apple-touch-icon" sizes="72x72" href="images/apple-touch-icon-72x72.png"> |
| <link rel="apple-touch-icon" sizes="114x114" href="images/apple-touch-icon-114x114.png"> |
| --> |
| |
| <!-- make tables sortable by adding class tag "sortable" to table elements --> |
| <script src="/js/sorttable.js"></script> |
| |
| |
| </head> |
| |
| <body> |
| |
| |
| |
| <!-- Navigation --> |
| <div id="nav-bar"> |
| <nav id="nav-container" class="navbar navbar-inverse " role="navigation"> |
| <div class="container"> |
| <!-- Brand and toggle get grouped for better mobile display --> |
| |
| <div class="navbar-header page-scroll"> |
| <button type="button" class="navbar-toggle" data-toggle="collapse" data-target=".navbar-collapse"> |
| <span class="sr-only">Toggle navigation</span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| </button> |
| <a class="navbar-brand page-scroll" href="/#home">Home</a> |
| </div> |
| <!-- Collect the nav links, forms, and other content for toggling --> |
| <nav class="navbar-collapse collapse" role="navigation"> |
| <ul class="nav navbar-nav"> |
| |
| |
| |
| <li id="download"> |
| |
| <a href="#" data-toggle="dropdown" class="dropdown-toggle">Download<b class="caret"></b></a> |
| <ul class="dropdown-menu dropdown-left"> |
| |
| |
| <li><a href="/downloads/spark" target="_self">Bahir Spark Extensions</a></li> |
| |
| |
| <li><a href="/downloads/flink" target="_self">Bahir Flink Extensions</a></li> |
| |
| </ul> |
| |
| </li> |
| |
| |
| |
| |
| <li id="community"> |
| |
| <a href="#" data-toggle="dropdown" class="dropdown-toggle">Community<b class="caret"></b></a> |
| <ul class="dropdown-menu dropdown-left"> |
| |
| |
| <li><a href="/community" target="_self">Get Involved</a></li> |
| |
| |
| <li><a href="/contributing" target="_self">Contributing</a></li> |
| |
| |
| <li><a href="/contributing-extensions" target="_self">Contributing Extensions</a></li> |
| |
| |
| <li><a href="https://issues.apache.org/jira/browse/BAHIR" target="_blank">Issue Tracker</a></li> |
| |
| |
| <li><a href="/community#source-code" target="_self">Source Code</a></li> |
| |
| |
| <li><a href="/community-members" target="_self">Project Committers</a></li> |
| |
| </ul> |
| |
| </li> |
| |
| |
| |
| |
| <li id="documentation"> |
| |
| <a href="#" data-toggle="dropdown" class="dropdown-toggle">Documentation<b class="caret"></b></a> |
| <ul class="dropdown-menu dropdown-left"> |
| |
| |
| <li><a href="/docs/spark/overview" target="_self">Bahir Spark Extensions</a></li> |
| |
| |
| <li><a href="/docs/flink/overview" target="_self">Bahir Flink Extensions</a></li> |
| |
| </ul> |
| |
| </li> |
| |
| |
| |
| |
| <li id="github"> |
| |
| <a href="#" data-toggle="dropdown" class="dropdown-toggle">GitHub<b class="caret"></b></a> |
| <ul class="dropdown-menu dropdown-left"> |
| |
| |
| <li><a href="https://github.com/apache/bahir" target="_blank">Bahir Spark Extensions</a></li> |
| |
| |
| <li><a href="https://github.com/apache/bahir-flink" target="_blank">Bahir Flink Extensions</a></li> |
| |
| |
| <li><a href="https://github.com/apache/bahir-website" target="_blank">Bahir Website</a></li> |
| |
| </ul> |
| |
| </li> |
| |
| |
| |
| |
| <li id="apache"> |
| |
| <a href="#" data-toggle="dropdown" class="dropdown-toggle">Apache<b class="caret"></b></a> |
| <ul class="dropdown-menu dropdown-left"> |
| |
| |
| <li><a href="http://www.apache.org/foundation/how-it-works.html" target="_blank">Apache Software Foundation</a></li> |
| |
| |
| <li><a href="http://www.apache.org/licenses/" target="_blank">Apache License</a></li> |
| |
| |
| <li><a href="http://www.apache.org/foundation/sponsorship" target="_blank">Sponsorship</a></li> |
| |
| |
| <li><a href="http://www.apache.org/foundation/thanks.html" target="_blank">Thanks</a></li> |
| |
| |
| <li><a href="/privacy-policy" target="_self">Privacy Policy</a></li> |
| |
| </ul> |
| |
| </li> |
| |
| |
| </ul> |
| </nav><!--/.navbar-collapse --> |
| <!-- /.navbar-collapse --> |
| </div> |
| <!-- /.container --> |
| </nav> |
| </div> |
| |
| |
| <div class="container"> |
| |
| |
| |
| <!--<div class="hero-unit Spark Structured Streaming MQTT"> |
| <h1></h1> |
| </div> |
| --> |
| |
| <div class="row"> |
| <div class="col-md-12"> |
| <h1 id="spark-sql-streaming-mqtt-data-source">Spark SQL Streaming MQTT Data Source</h1> |
| |
| <p>A library for reading data from and writing data to MQTT servers using Spark SQL Streaming (Structured Streaming).</p> |
| |
| <h2 id="linking">Linking</h2> |
| |
| <p>Using SBT:</p> |
| |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-mqtt" % "2.4.0-SNAPSHOT" |
| </code></pre></div></div> |
| |
| <p>Using Maven:</p> |
| |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code><dependency> |
| <groupId>org.apache.bahir</groupId> |
| <artifactId>spark-sql-streaming-mqtt_2.11</artifactId> |
| <version>2.4.0-SNAPSHOT</version> |
| </dependency> |
| </code></pre></div></div> |
| |
| <p>This library can also be added to Spark jobs launched through <code class="language-plaintext highlighter-rouge">spark-shell</code> or <code class="language-plaintext highlighter-rouge">spark-submit</code> by using the <code class="language-plaintext highlighter-rouge">--packages</code> command line option. |
| For example, to include it when starting the Spark shell:</p> |
| |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>$ bin/spark-shell --packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.4.0-SNAPSHOT |
| </code></pre></div></div> |
| |
| <p>Unlike using <code class="language-plaintext highlighter-rouge">--jars</code>, using <code class="language-plaintext highlighter-rouge">--packages</code> ensures that this library and its dependencies will be added to the classpath. |
| The <code class="language-plaintext highlighter-rouge">--packages</code> argument can also be used with <code class="language-plaintext highlighter-rouge">bin/spark-submit</code>.</p> |
| |
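| <p>This library is cross-published for Scala 2.11 and Scala 2.12, so users should substitute the appropriate Scala version (for example, <code class="language-plaintext highlighter-rouge">_2.12</code> instead of <code class="language-plaintext highlighter-rouge">_2.11</code>) in the commands listed above.</p> |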
| |
| <h2 id="examples">Examples</h2> |
| |
| <p>A SQL stream can be created from data received from an MQTT server using:</p> |
| |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>sqlContext.readStream |
| .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") |
| .option("topic", "mytopic") |
| .load("tcp://localhost:1883") |
| </code></pre></div></div> |
| |
| <p>A streaming Dataset, here <code class="language-plaintext highlighter-rouge">df</code>, can also be published as MQTT messages using:</p> |
| |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>df.writeStream |
| .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSinkProvider") |
| .option("checkpointLocation", "/path/to/localdir") |
| .outputMode("complete") |
| .option("topic", "mytopic") |
| .start("tcp://localhost:1883") |
| </code></pre></div></div> |
| |
| <h2 id="source-recovering-from-failures">Source recovery from failures</h2> |
| |
| <p>Setting the <code class="language-plaintext highlighter-rouge">localStorage</code> and <code class="language-plaintext highlighter-rouge">clientId</code> options lets the source recover after a restart by restoring the state where it left off before shutdown.</p> |
| |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>sqlContext.readStream |
| .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") |
| .option("topic", "mytopic") |
| .option("localStorage", "/path/to/localdir") |
| .option("clientId", "some-client-id") |
| .load("tcp://localhost:1883") |
| </code></pre></div></div> |
| |
| <h2 id="configuration-options">Configuration options</h2> |
| |
| <p>This connector uses the <a href="https://eclipse.org/paho/clients/java/">Eclipse Paho Java Client</a>. The client API documentation is available in the <a href="http://www.eclipse.org/paho/files/javadoc/index.html">Paho Javadoc</a>.</p> |
| |
| <table> |
| <thead> |
| <tr> |
| <th>Parameter name</th> |
| <th>Description</th> |
| <th>Eclipse Paho reference</th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">brokerUrl</code></td> |
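| <td>URL the MQTT client connects to. Specify either this parameter or <em>path</em> (the argument passed to <code class="language-plaintext highlighter-rouge">load</code> or <code class="language-plaintext highlighter-rouge">start</code>). Example: <em>tcp://localhost:1883</em>, <em>ssl://localhost:1883</em>.</td> |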
| <td> </td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">persistence</code></td> |
| <td>Defines how incoming messages are stored. If set to <em>memory</em>, recovery on restart is not supported. Otherwise, messages are stored on disk, and the <em>localStorage</em> parameter can define the target directory.</td> |
| <td> </td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">topic</code></td> |
| <td>Topic the client subscribes to.</td> |
| <td> </td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">clientId</code></td> |
| <td>Uniquely identifies the client instance. Provide the same value to recover a stopped source client. The MQTT sink ignores the client identifier, because a Spark batch can be distributed across multiple workers, whereas an MQTT broker does not allow simultaneous connections with the same ID from multiple hosts.</td> |
| <td> </td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">QoS</code></td> |
| <td>The maximum quality of service at which to subscribe to each topic. Messages published at a lower quality of service are received at the published QoS. Messages published at a higher quality of service are received at the QoS specified on the subscription.</td> |
| <td> </td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">username</code></td> |
| <td>User name used to authenticate with the MQTT server. Do not set it if the server does not require authentication; setting it to an empty value may lead to errors.</td> |
| <td><code class="language-plaintext highlighter-rouge">MqttConnectOptions.setUserName</code></td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">password</code></td> |
| <td>User password.</td> |
| <td><code class="language-plaintext highlighter-rouge">MqttConnectOptions.setPassword</code></td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">cleanSession</code></td> |
| <td>Setting to <em>true</em> starts a clean session and removes all checkpointed messages persisted during the previous run. Defaults to <code class="language-plaintext highlighter-rouge">false</code>.</td> |
| <td><code class="language-plaintext highlighter-rouge">MqttConnectOptions.setCleanSession</code></td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">connectionTimeout</code></td> |
| <td>Sets the connection timeout in seconds. A value of <em>0</em> disables the timeout, so the client waits until the connection succeeds or fails.</td> |
| <td><code class="language-plaintext highlighter-rouge">MqttConnectOptions.setConnectionTimeout</code></td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">keepAlive</code></td> |
| <td>Sets the “keep alive” interval in seconds.</td> |
| <td><code class="language-plaintext highlighter-rouge">MqttConnectOptions.setKeepAliveInterval</code></td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">mqttVersion</code></td> |
| <td>MQTT protocol version to use.</td> |
| <td><code class="language-plaintext highlighter-rouge">MqttConnectOptions.setMqttVersion</code></td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">maxInflight</code></td> |
| <td>Sets the maximum number of in-flight messages. Increase it for high-volume traffic.</td> |
| <td><code class="language-plaintext highlighter-rouge">MqttConnectOptions.setMaxInflight</code></td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">autoReconnect</code></td> |
| <td>Sets whether the client will automatically attempt to reconnect to the server upon connectivity disruption.</td> |
| <td><code class="language-plaintext highlighter-rouge">MqttConnectOptions.setAutomaticReconnect</code></td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">ssl.protocol</code></td> |
| <td>SSL protocol. Example: <em>SSLv3</em>, <em>TLS</em>, <em>TLSv1</em>, <em>TLSv1.2</em>.</td> |
| <td><code class="language-plaintext highlighter-rouge">MqttConnectOptions.setSSLProperties</code>, <code class="language-plaintext highlighter-rouge">com.ibm.ssl.protocol</code></td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">ssl.key.store</code></td> |
| <td>Absolute path to key store file.</td> |
| <td><code class="language-plaintext highlighter-rouge">MqttConnectOptions.setSSLProperties</code>, <code class="language-plaintext highlighter-rouge">com.ibm.ssl.keyStore</code></td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">ssl.key.store.password</code></td> |
| <td>Key store password.</td> |
| <td><code class="language-plaintext highlighter-rouge">MqttConnectOptions.setSSLProperties</code>, <code class="language-plaintext highlighter-rouge">com.ibm.ssl.keyStorePassword</code></td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">ssl.key.store.type</code></td> |
| <td>Key store type. Example: <em>JKS</em>, <em>JCEKS</em>, <em>PKCS12</em>.</td> |
| <td><code class="language-plaintext highlighter-rouge">MqttConnectOptions.setSSLProperties</code>, <code class="language-plaintext highlighter-rouge">com.ibm.ssl.keyStoreType</code></td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">ssl.key.store.provider</code></td> |
| <td>Key store provider. Example: <em>IBMJCE</em>.</td> |
| <td><code class="language-plaintext highlighter-rouge">MqttConnectOptions.setSSLProperties</code>, <code class="language-plaintext highlighter-rouge">com.ibm.ssl.keyStoreProvider</code></td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">ssl.trust.store</code></td> |
| <td>Absolute path to trust store file.</td> |
| <td><code class="language-plaintext highlighter-rouge">MqttConnectOptions.setSSLProperties</code>, <code class="language-plaintext highlighter-rouge">com.ibm.ssl.trustStore</code></td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">ssl.trust.store.password</code></td> |
| <td>Trust store password.</td> |
| <td><code class="language-plaintext highlighter-rouge">MqttConnectOptions.setSSLProperties</code>, <code class="language-plaintext highlighter-rouge">com.ibm.ssl.trustStorePassword</code></td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">ssl.trust.store.type</code></td> |
| <td>Trust store type. Example: <em>JKS</em>, <em>JCEKS</em>, <em>PKCS12</em>.</td> |
| <td><code class="language-plaintext highlighter-rouge">MqttConnectOptions.setSSLProperties</code>, <code class="language-plaintext highlighter-rouge">com.ibm.ssl.trustStoreType</code></td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">ssl.trust.store.provider</code></td> |
| <td>Trust store provider. Example: <em>IBMJCEFIPS</em>.</td> |
| <td><code class="language-plaintext highlighter-rouge">MqttConnectOptions.setSSLProperties</code>, <code class="language-plaintext highlighter-rouge">com.ibm.ssl.trustStoreProvider</code></td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">ssl.ciphers</code></td> |
| <td>List of enabled cipher suites. Example: <em>SSL_RSA_WITH_AES_128_CBC_SHA</em>.</td> |
| <td><code class="language-plaintext highlighter-rouge">MqttConnectOptions.setSSLProperties</code>, <code class="language-plaintext highlighter-rouge">com.ibm.ssl.enabledCipherSuites</code></td> |
| </tr> |
| </tbody> |
| </table> |
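<p>The <code class="language-plaintext highlighter-rouge">ssl.*</code> rows above each correspond to a Paho <code class="language-plaintext highlighter-rouge">com.ibm.ssl.*</code> key passed through <code class="language-plaintext highlighter-rouge">MqttConnectOptions.setSSLProperties</code>. As an illustration only, the plain-Java sketch below performs that translation with <code class="language-plaintext highlighter-rouge">java.util.Properties</code>. The names <code class="language-plaintext highlighter-rouge">SslOptionMapping</code> and <code class="language-plaintext highlighter-rouge">toPahoSslProperties</code> are hypothetical; this is not the connector's actual code.</p>

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper (not the connector's code): translates the "ssl.*" options
// from the configuration table into the "com.ibm.ssl.*" keys that Paho reads from
// the Properties object given to MqttConnectOptions.setSSLProperties.
final class SslOptionMapping {
    // Option name -> Paho property key, copied from the configuration table.
    static final Map<String, String> OPTION_TO_PAHO_KEY = new LinkedHashMap<>();
    static {
        OPTION_TO_PAHO_KEY.put("ssl.protocol", "com.ibm.ssl.protocol");
        OPTION_TO_PAHO_KEY.put("ssl.key.store", "com.ibm.ssl.keyStore");
        OPTION_TO_PAHO_KEY.put("ssl.key.store.password", "com.ibm.ssl.keyStorePassword");
        OPTION_TO_PAHO_KEY.put("ssl.key.store.type", "com.ibm.ssl.keyStoreType");
        OPTION_TO_PAHO_KEY.put("ssl.key.store.provider", "com.ibm.ssl.keyStoreProvider");
        OPTION_TO_PAHO_KEY.put("ssl.trust.store", "com.ibm.ssl.trustStore");
        OPTION_TO_PAHO_KEY.put("ssl.trust.store.password", "com.ibm.ssl.trustStorePassword");
        OPTION_TO_PAHO_KEY.put("ssl.trust.store.type", "com.ibm.ssl.trustStoreType");
        OPTION_TO_PAHO_KEY.put("ssl.trust.store.provider", "com.ibm.ssl.trustStoreProvider");
        OPTION_TO_PAHO_KEY.put("ssl.ciphers", "com.ibm.ssl.enabledCipherSuites");
    }

    private SslOptionMapping() {}

    // Copies only the recognised ssl.* options; every other option is ignored.
    static Properties toPahoSslProperties(Map<String, String> options) {
        Properties props = new Properties();
        for (Map.Entry<String, String> e : OPTION_TO_PAHO_KEY.entrySet()) {
            String value = options.get(e.getKey());
            if (value != null) {
                props.setProperty(e.getValue(), value);
            }
        }
        return props;
    }
}
```

<p>For example, passing <code class="language-plaintext highlighter-rouge">ssl.protocol=TLSv1.2</code>, <code class="language-plaintext highlighter-rouge">ssl.trust.store=/path/to/truststore.jks</code> and <code class="language-plaintext highlighter-rouge">topic=mytopic</code> yields properties with only <code class="language-plaintext highlighter-rouge">com.ibm.ssl.protocol</code> and <code class="language-plaintext highlighter-rouge">com.ibm.ssl.trustStore</code> set.</p>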
| |
| <h2 id="environment-variables">Environment variables</h2> |
| |
| <p>Custom properties that control the MQTT connectivity of the sink connector:</p> |
| |
| <ul> |
| <li><code class="language-plaintext highlighter-rouge">spark.mqtt.client.connect.attempts</code> Number of attempts the sink makes to connect to the MQTT broker before failing.</li> |
| <li><code class="language-plaintext highlighter-rouge">spark.mqtt.client.connect.backoff</code> Delay in milliseconds to wait before retrying the connection to the server.</li> |
| <li><code class="language-plaintext highlighter-rouge">spark.mqtt.connection.cache.timeout</code> The sink connector caches MQTT connections; idle connections are closed after this timeout, in milliseconds.</li> |
| <li><code class="language-plaintext highlighter-rouge">spark.mqtt.client.publish.attempts</code> Number of attempts to publish a message before failing the task.</li> |
| <li><code class="language-plaintext highlighter-rouge">spark.mqtt.client.publish.backoff</code> Delay in milliseconds to wait before retrying a publish operation.</li> |
| </ul> |
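<p>Each attempts/backoff pair above describes a bounded retry loop. The plain-Java sketch below shows that general pattern under stated assumptions: it treats <code class="language-plaintext highlighter-rouge">attempts</code> as the total number of tries (at least one) and sleeps a fixed <code class="language-plaintext highlighter-rouge">backoffMs</code> between failed tries. The connector's exact semantics (for example, how it handles zero or negative values) are not described here, and <code class="language-plaintext highlighter-rouge">RetryWithBackoff</code> and <code class="language-plaintext highlighter-rouge">run</code> are hypothetical names.</p>

```java
import java.util.function.Supplier;

// Hypothetical sketch of a bounded retry loop with a fixed backoff, in the spirit of
// the *.attempts / *.backoff properties. Not the connector's implementation.
// Assumption: "attempts" is the total number of tries, including the first one.
final class RetryWithBackoff {
    private RetryWithBackoff() {}

    // Runs the action up to `attempts` times, sleeping `backoffMs` between failed tries.
    static <T> T run(Supplier<T> action, int attempts, long backoffMs) {
        if (attempts < 1) {
            throw new IllegalArgumentException("attempts must be >= 1");
        }
        RuntimeException last = null;
        for (int i = 1; i <= attempts; i++) {
            try {
                return action.get();              // success: return immediately
            } catch (RuntimeException e) {
                last = e;                         // remember the most recent failure
            }
            if (i < attempts) {
                try {
                    Thread.sleep(backoffMs);      // fixed delay before the next try
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while backing off", ie);
                }
            }
        }
        throw last;                               // every try failed: rethrow the last error
    }
}
```

<p>A fixed delay keeps the sketch simple; exponential backoff is a common alternative when many tasks may reconnect to the same broker at the same time.</p>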
| |
| <h3 id="scala-api">Scala API</h3> |
| |
| <p>An example using the Scala API to count words from an incoming message stream.</p> |
| |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>// Encoders needed for .as[String] and flatMap on a Dataset[String] |
| import spark.implicits._ |
| |
| // Create a Dataset[String] representing the stream of input lines from the MQTT server |
| val lines = spark.readStream |
| .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") |
| .option("topic", topic) |
| .load(brokerUrl).selectExpr("CAST(payload AS STRING)").as[String] |
| |
| // Split the lines into words |
| val words = lines.flatMap(_.split(" ")) |
| |
| // Generate running word count |
| val wordCounts = words.groupBy("value").count() |
| |
| // Start running the query that prints the running counts to the console |
| val query = wordCounts.writeStream |
| .outputMode("complete") |
| .format("console") |
| .start() |
| |
| query.awaitTermination() |
| </code></pre></div></div> |
| |
| <p>See <code class="language-plaintext highlighter-rouge">MQTTStreamWordCount.scala</code> for the full example, and <code class="language-plaintext highlighter-rouge">MQTTSinkWordCount.scala</code> for an example of publishing data to an MQTT broker.</p> |
| |
| <h3 id="java-api">Java API</h3> |
| |
| <p>An example using the Java API to count words from an incoming message stream.</p> |
| |
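| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>import java.util.Arrays; |
| import java.util.Iterator; |
| import org.apache.spark.api.java.function.FlatMapFunction; |
| import org.apache.spark.sql.Dataset; |
| import org.apache.spark.sql.Encoders; |
| import org.apache.spark.sql.Row; |
| import org.apache.spark.sql.streaming.StreamingQuery; |
| |
| // Create a Dataset representing the stream of input lines from the MQTT server. |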
| Dataset<String> lines = spark |
| .readStream() |
| .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") |
| .option("topic", topic) |
| .load(brokerUrl) |
| .selectExpr("CAST(payload AS STRING)").as(Encoders.STRING()); |
| |
| // Split the lines into words |
| Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() { |
| @Override |
| public Iterator<String> call(String x) { |
| return Arrays.asList(x.split(" ")).iterator(); |
| } |
| }, Encoders.STRING()); |
| |
| // Generate running word count |
| Dataset<Row> wordCounts = words.groupBy("value").count(); |
| |
| // Start running the query that prints the running counts to the console |
| StreamingQuery query = wordCounts.writeStream() |
| .outputMode("complete") |
| .format("console") |
| .start(); |
| |
| query.awaitTermination(); |
| </code></pre></div></div> |
| |
| <p>See <code class="language-plaintext highlighter-rouge">JavaMQTTStreamWordCount.java</code> for the full example, and <code class="language-plaintext highlighter-rouge">JavaMQTTSinkWordCount.java</code> for an example of publishing data to an MQTT broker.</p> |
| |
| <h2 id="best-practices">Best Practices</h2> |
| |
| <ol> |
| <li>Make MQTT a more reliable messaging service.</li> |
| </ol> |
| |
| <blockquote> |
| <p><em>MQTT is a machine-to-machine (M2M)/“Internet of Things” connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport.</em></p> |
| </blockquote> |
| |
| <p>MQTT's design suits the purpose it serves, but applications often place the highest value on reliability. MQTT is not a distributed message queue and therefore does not offer the strongest reliability features. To take advantage of a distributed message queue, MQTT traffic can be redirected through Kafka, a reliable and scalable message queue. Routing through Kafka also opens up many possibilities, including a single Kafka topic subscribed to several MQTT sources and a single MQTT stream publishing to multiple Kafka topics.</p> |
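<p>To make the fan-in and fan-out possibilities concrete, here is a toy in-memory model of a bridge routing table in plain Java. It uses no MQTT or Kafka client; the class <code class="language-plaintext highlighter-rouge">BridgeRoutes</code>, its methods, and all topic names are hypothetical and only illustrate the many-to-one and one-to-many mappings described above.</p>

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy routing table for an MQTT-to-Kafka bridge (hypothetical; no real clients involved).
// Each MQTT topic maps to one or more Kafka topics:
//  - fan-in:  several MQTT topics can point at the same Kafka topic
//  - fan-out: one MQTT topic can point at several Kafka topics
final class BridgeRoutes {
    private final Map<String, List<String>> routes = new LinkedHashMap<>();

    // Registers that messages from mqttTopic should be forwarded to kafkaTopic.
    BridgeRoutes route(String mqttTopic, String kafkaTopic) {
        routes.computeIfAbsent(mqttTopic, k -> new ArrayList<>()).add(kafkaTopic);
        return this;
    }

    // Kafka topics a message received on mqttTopic is forwarded to (empty if unrouted).
    List<String> destinations(String mqttTopic) {
        return Collections.unmodifiableList(routes.getOrDefault(mqttTopic, Collections.emptyList()));
    }

    // MQTT topics whose messages end up in the given Kafka topic, in registration order.
    List<String> sourcesOf(String kafkaTopic) {
        List<String> sources = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : routes.entrySet()) {
            if (e.getValue().contains(kafkaTopic)) {
                sources.add(e.getKey());
            }
        }
        return sources;
    }
}
```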
| |
| <ol start="2"> |
| <li>Often the message payload is not in the default character encoding, or contains binary data that must be parsed with a particular parser. In such cases, the Spark MQTT payload should be read as binary and processed with that external parser. For example:</li> |
| </ol> |
| |
| <ul> |
| <li>Scala API example: |
| <div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code> <span class="c1">// Create DataFrame representing the stream of binary messages</span> |
| <span class="k">val</span> <span class="nv">lines</span> <span class="k">=</span> <span class="nv">spark</span><span class="o">.</span><span class="py">readStream</span> |
| <span class="o">.</span><span class="py">format</span><span class="o">(</span><span class="s">"org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider"</span><span class="o">)</span> |
| <span class="o">.</span><span class="py">option</span><span class="o">(</span><span class="s">"topic"</span><span class="o">,</span> <span class="n">topic</span><span class="o">)</span> |
| <span class="o">.</span><span class="py">load</span><span class="o">(</span><span class="n">brokerUrl</span><span class="o">).</span><span class="py">select</span><span class="o">(</span><span class="s">"payload"</span><span class="o">).</span><span class="py">as</span><span class="o">[</span><span class="kt">Array</span><span class="o">[</span><span class="kt">Byte</span><span class="o">]].</span><span class="py">map</span><span class="o">(</span><span class="nf">externalParser</span><span class="o">(</span><span class="k">_</span><span class="o">))</span> |
| </code></pre></div> </div> |
| </li> |
| <li> |
| <p>Java API example:</p> |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code> // Create a Dataset representing the stream of binary messages |
| Dataset<byte[]> lines = spark |
| .readStream() |
| .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") |
| .option("topic", topic) |
| .load(brokerUrl).selectExpr("CAST(payload AS BINARY)").as(Encoders.BINARY()); |
| |
| // Decode each payload, then split the text into words |
| Dataset<String> words = lines.map(new MapFunction<byte[], String>() { |
| @Override |
| public String call(byte[] bytes) throws Exception { |
| // Plug in an external parser here. UTF-8 is assumed for illustration; an explicit |
| // charset (java.nio.charset.StandardCharsets) avoids relying on the JVM default. |
| return new String(bytes, StandardCharsets.UTF_8); |
| } |
| }, Encoders.STRING()).flatMap(new FlatMapFunction<String, String>() { |
| @Override |
| public Iterator<String> call(String x) { |
| return Arrays.asList(x.split(" ")).iterator(); |
| } |
| }, Encoders.STRING()); |
| </code></pre></div> </div> |
| </li> |
| </ul> |
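<p>The external parser plugged into the <code class="language-plaintext highlighter-rouge">map</code> step is just a function from <code class="language-plaintext highlighter-rouge">byte[]</code> to a value, so it can be written and tested without Spark. The sketch below assumes, purely for illustration, payloads encoded in ISO-8859-1 (Latin-1) rather than the JVM default; <code class="language-plaintext highlighter-rouge">PayloadParser</code> and <code class="language-plaintext highlighter-rouge">toWords</code> are hypothetical names. Decoding with an explicit <code class="language-plaintext highlighter-rouge">Charset</code> avoids the platform-dependent behaviour of <code class="language-plaintext highlighter-rouge">new String(bytes)</code>.</p>

```java
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

// Hypothetical external parser for MQTT payloads: decodes bytes with an explicit
// charset (instead of the JVM default) and splits the text into words.
final class PayloadParser {
    private final Charset charset;

    PayloadParser(Charset charset) {
        this.charset = charset;
    }

    // Decodes the raw payload bytes using the configured charset.
    String decode(byte[] payload) {
        return new String(payload, charset);
    }

    // Decodes the payload and splits it on runs of whitespace, skipping empty tokens.
    List<String> toWords(byte[] payload) {
        List<String> words = new ArrayList<>();
        for (String w : decode(payload).split("\\s+")) {
            if (!w.isEmpty()) {
                words.add(w);
            }
        }
        return words;
    }
}
```

<p>Spark ships the map function to executors, so everything it captures must be serializable. <code class="language-plaintext highlighter-rouge">Charset</code> is not serializable, so in a real job it may be simpler to capture the charset name and look it up inside the function.</p>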
| |
| |
| <ol start="3"> |
| <li>What is the solution when there is a large number of varied MQTT sources, each with a different schema and different throughput characteristics?</li> |
| </ol> |
| |
| <p>The usual approach is to create many streaming pipelines, one per source. This either requires a very sophisticated scheduling setup or wastes a lot of resources, because it is not known in advance which stream will carry more data.</p> |
| |
| <p>That approach is both suboptimal and cumbersome to operate: its many moving parts incur high maintenance overall. As an alternative, one can set up a single-topic Kafka-Spark stream in which each message from the varied sources carries a unique tag identifying its stream. At the processing end, messages can then be told apart and the right decoding and processing applied to each. Similarly, when storing, each message can be distinguished from the others by its tag.</p> |
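<p>The tagging scheme can be sketched independently of Kafka and Spark. The plain-Java example below assumes a hypothetical wire format of <code class="language-plaintext highlighter-rouge">tag:body</code> (the document does not prescribe one) and dispatches each message to the decoder registered for its tag; <code class="language-plaintext highlighter-rouge">TaggedDispatcher</code>, <code class="language-plaintext highlighter-rouge">register</code>, and <code class="language-plaintext highlighter-rouge">handle</code> are illustrative names only.</p>

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical dispatcher for a single topic carrying messages from many MQTT sources.
// Assumed wire format: "<tag>:<body>", where the tag identifies the originating stream.
final class TaggedDispatcher {
    private final Map<String, Function<String, String>> decoders = new HashMap<>();

    // Registers the decoder to use for messages carrying this tag.
    TaggedDispatcher register(String tag, Function<String, String> decoder) {
        decoders.put(tag, decoder);
        return this;
    }

    // Splits off the tag at the first ':', looks up its decoder, and applies it to the body.
    String handle(byte[] message) {
        String text = new String(message, StandardCharsets.UTF_8);
        int sep = text.indexOf(':');
        if (sep < 0) {
            throw new IllegalArgumentException("message has no tag: " + text);
        }
        String tag = text.substring(0, sep);
        Function<String, String> decoder = decoders.get(tag);
        if (decoder == null) {
            throw new IllegalArgumentException("no decoder registered for tag: " + tag);
        }
        return decoder.apply(text.substring(sep + 1));
    }
}
```

<p>Because the tag travels inside each message rather than in a separate topic per source, one stream can serve all sources, and unknown tags fail loudly instead of being decoded with the wrong schema.</p>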
| |
| </div> |
| </div> |
| |
| |
| |
| <hr> |
| |
| <!-- <p>© 2021 </p>--> |
| <footer class="site-footer"> |
| <div class="wrapper"> |
| <div class="footer-col-wrapper"> |
| |
| <div style="text-align:center;"> |
| |
| <div> |
| Copyright © 2016-<script>document.write(new Date().getFullYear());</script> <a href="http://www.apache.org">The Apache Software Foundation</a>. |
| Licensed under the <a href="http://www.apache.org/licenses/LICENSE-2.0">Apache License, Version 2.0</a>. |
| <br> |
| |
| Apache and the Apache Feather logo are trademarks of The Apache Software Foundation. |
| |
| </div> |
| </div> |
| </div> |
| </div> |
| </footer> |
| |
| </div> |
| |
| |
| |
| |
| |
| |
| |
| <script src="/assets/themes/apache-clean/jquery/jquery-2.1.1.min.js"></script> |
| |
| <script src="/assets/themes/apache-clean/bootstrap/js/bootstrap.min.js"></script> |
| |
| |
| </body> |
| </html> |
| |