blob: 3b9c352d8864bb947760b220c9da7c821e49e9ae [file] [log] [blame]
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Spark Structured Streaming Akka</title>
<meta name="description" content="Spark Structured Streaming Akka">
<meta name="author" content="">
<!-- Enable responsive viewport -->
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<!-- Le HTML5 shim, for IE6-8 support of HTML elements -->
<!--[if lt IE 9]>
<script src="http://html5shim.googlecode.com/svn/trunk/html5.js"></script>
<![endif]-->
<!-- Le styles -->
<link href="/assets/themes/apache-clean/bootstrap/css/bootstrap.css" rel="stylesheet">
<link href="/assets/themes/apache-clean/css/style.css?body=1" rel="stylesheet" type="text/css">
<link href="/assets/themes/apache-clean/css/syntax.css" rel="stylesheet" type="text/css" media="screen" />
<!-- Le fav and touch icons -->
<!-- Update these with your own images
<link rel="shortcut icon" href="images/favicon.ico">
<link rel="apple-touch-icon" href="images/apple-touch-icon.png">
<link rel="apple-touch-icon" sizes="72x72" href="images/apple-touch-icon-72x72.png">
<link rel="apple-touch-icon" sizes="114x114" href="images/apple-touch-icon-114x114.png">
-->
<!-- make tables sortable by adding class tag "sortable" to table elements -->
<!-- Loaded over HTTPS: a plain-http script is blocked as mixed content when this page is served over HTTPS -->
<script src="https://www.kryogenix.org/code/browser/sorttable/sorttable.js"></script>
</head>
<body>
<!-- Navigation -->
<div id="nav-bar">
<nav id="nav-container" class="navbar navbar-inverse " role="navigation">
<div class="container">
<!-- Brand and toggle get grouped for better mobile display -->
<div class="navbar-header page-scroll">
<button type="button" class="navbar-toggle" data-toggle="collapse" data-target=".navbar-collapse">
<span class="sr-only">Toggle navigation</span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<a class="navbar-brand page-scroll" href="/#home">Home</a>
</div>
<!-- Collect the nav links, forms, and other content for toggling -->
<nav class="navbar-collapse collapse" role="navigation">
<ul class="nav navbar-nav">
<li id="download">
<a href="#" data-toggle="dropdown" class="dropdown-toggle">Download<b class="caret"></b></a>
<ul class="dropdown-menu dropdown-left">
<li><a href="/downloads/spark" target="_self">Bahir Spark Extensions</a></li>
<li><a href="/downloads/flink" target="_self">Bahir Flink Extensions</a></li>
</ul>
</li>
<!-- Community dropdown; the external issue tracker opens in a new tab with an explicit rel="noopener" -->
<li id="community">
<a href="#" data-toggle="dropdown" class="dropdown-toggle">Community<b class="caret"></b></a>
<ul class="dropdown-menu dropdown-left">
<li><a href="/community" target="_self">Get Involved</a></li>
<li><a href="/contributing" target="_self">Contributing</a></li>
<li><a href="/contributing-extensions" target="_self">Contributing Extensions</a></li>
<li><a href="https://issues.apache.org/jira/browse/BAHIR" target="_blank" rel="noopener">Issue Tracker</a></li>
<li><a href="/community#source-code" target="_self">Source Code</a></li>
<li><a href="/community-members" target="_self">Project Committers</a></li>
</ul>
</li>
<li id="documentation">
<a href="#" data-toggle="dropdown" class="dropdown-toggle">Documentation<b class="caret"></b></a>
<ul class="dropdown-menu dropdown-left">
<li><a href="/docs/spark/overview" target="_self">Bahir Spark Extensions</a></li>
<li><a href="/docs/flink/overview" target="_self">Bahir Flink Extensions</a></li>
</ul>
</li>
<!-- GitHub dropdown; every entry opens in a new tab, so each carries an explicit rel="noopener" -->
<li id="github">
<a href="#" data-toggle="dropdown" class="dropdown-toggle">GitHub<b class="caret"></b></a>
<ul class="dropdown-menu dropdown-left">
<li><a href="https://github.com/apache/bahir" target="_blank" rel="noopener">Bahir Spark Extensions</a></li>
<li><a href="https://github.com/apache/bahir-flink" target="_blank" rel="noopener">Bahir Flink Extensions</a></li>
<li><a href="https://github.com/apache/bahir-website" target="_blank" rel="noopener">Bahir Website</a></li>
</ul>
</li>
<!-- Apache dropdown; foundation links use HTTPS and open in a new tab with an explicit rel="noopener" -->
<li id="apache">
<a href="#" data-toggle="dropdown" class="dropdown-toggle">Apache<b class="caret"></b></a>
<ul class="dropdown-menu dropdown-left">
<li><a href="https://www.apache.org/foundation/how-it-works.html" target="_blank" rel="noopener">Apache Software Foundation</a></li>
<li><a href="https://www.apache.org/licenses/" target="_blank" rel="noopener">Apache License</a></li>
<li><a href="https://www.apache.org/foundation/sponsorship" target="_blank" rel="noopener">Sponsorship</a></li>
<li><a href="https://www.apache.org/foundation/thanks.html" target="_blank" rel="noopener">Thanks</a></li>
<li><a href="/privacy-policy" target="_self">Privacy Policy</a></li>
</ul>
</li>
</ul>
</nav><!--/.navbar-collapse -->
<!-- /.navbar-collapse -->
</div>
<!-- /.container -->
</nav>
</div>
<div class="container">
<!--<div class="hero-unit Spark Structured Streaming Akka">
<h1></h1>
</div>
-->
<div class="row">
<div class="col-md-12">
<!--
-->
<p>A library for reading data from Akka Actors using Spark SQL Streaming (or Structured Streaming).</p>
<h2 id="linking">Linking</h2>
<p>Using SBT:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-akka" % "2.2.1"
</code></pre></div></div>
<p>Using Maven:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>&lt;dependency&gt;
&lt;groupId&gt;org.apache.bahir&lt;/groupId&gt;
&lt;artifactId&gt;spark-sql-streaming-akka_2.11&lt;/artifactId&gt;
&lt;version&gt;2.2.1&lt;/version&gt;
&lt;/dependency&gt;
</code></pre></div></div>
<p>This library can also be added to Spark jobs launched through <code class="language-plaintext highlighter-rouge">spark-shell</code> or <code class="language-plaintext highlighter-rouge">spark-submit</code> by using the <code class="language-plaintext highlighter-rouge">--packages</code> command line option.
For example, to include it when starting the spark shell:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>$ bin/spark-shell --packages org.apache.bahir:spark-sql-streaming-akka_2.11:2.2.1
</code></pre></div></div>
<p>Unlike using <code class="language-plaintext highlighter-rouge">--jars</code>, using <code class="language-plaintext highlighter-rouge">--packages</code> ensures that this library and its dependencies will be added to the classpath.
The <code class="language-plaintext highlighter-rouge">--packages</code> argument can also be used with <code class="language-plaintext highlighter-rouge">bin/spark-submit</code>.</p>
<p>This library is compiled for Scala 2.11 only, and intends to support Spark 2.0 onwards.</p>
<h2 id="examples">Examples</h2>
<p>A SQL Stream can be created with data streams received from an Akka Feeder actor using:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code> sqlContext.readStream
.format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
.option("urlOfPublisher", "feederActorUri")
.load()
</code></pre></div></div>
<h2 id="enable-recovering-from-failures">Enable recovering from failures.</h2>
<p>Setting values for option <code class="language-plaintext highlighter-rouge">persistenceDirPath</code> helps in recovering in case of a restart, by restoring the state where it left off before the shutdown.</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code> sqlContext.readStream
.format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
.option("urlOfPublisher", "feederActorUri")
.option("persistenceDirPath", "/path/to/localdir")
.load()
</code></pre></div></div>
<h2 id="configuration-options">Configuration options.</h2>
<p>This source uses the <a href="http://doc.akka.io/api/akka/2.4/akka/actor/Actor.html">Akka Actor API</a>.</p>
<ul>
<li><code class="language-plaintext highlighter-rouge">urlOfPublisher</code> The url of Publisher or Feeder actor that the Receiver actor connects to. Set this as the tcp url of the Publisher or Feeder actor.</li>
<li><code class="language-plaintext highlighter-rouge">persistenceDirPath</code> By default it is used for storing incoming messages on disk.</li>
</ul>
<h3 id="scala-api">Scala API</h3>
<p>An example using the Scala API to count words from an incoming message stream:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code> // Create DataFrame representing the stream of input lines from connection
// to publisher or feeder actor
val lines = spark.readStream
.format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
.option("urlOfPublisher", urlOfPublisher)
.load().as[(String, Timestamp)]
// Split the lines into words
val words = lines.map(_._1).flatMap(_.split(" "))
// Generate running word count
val wordCounts = words.groupBy("value").count()
// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
</code></pre></div></div>
<p>Please see <code class="language-plaintext highlighter-rouge">AkkaStreamWordCount.scala</code> for the full example.</p>
<h3 id="java-api">Java API</h3>
<p>An example using the Java API to count words from an incoming message stream:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code> // Create DataFrame representing the stream of input lines from connection
// to publisher or feeder actor
Dataset&lt;String&gt; lines = spark
.readStream()
.format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
.option("urlOfPublisher", urlOfPublisher)
.load().select("value").as(Encoders.STRING());
// Split the lines into words
Dataset&lt;String&gt; words = lines.flatMap(new FlatMapFunction&lt;String, String&gt;() {
@Override
public Iterator&lt;String&gt; call(String s) throws Exception {
return Arrays.asList(s.split(" ")).iterator();
}
}, Encoders.STRING());
// Generate running word count
Dataset&lt;Row&gt; wordCounts = words.groupBy("value").count();
// Start running the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
.outputMode("complete")
.format("console")
.start();
query.awaitTermination();
</code></pre></div></div>
<p>Please see <code class="language-plaintext highlighter-rouge">JavaAkkaStreamWordCount.java</code> for the full example.</p>
</div>
</div>
<hr>
<!-- <p>&copy; 2021 </p>-->
<!-- Site footer: copyright range (end year written by the inline script at load time) and license notice; foundation links use HTTPS -->
<footer class="site-footer">
<div class="wrapper">
<div class="footer-col-wrapper">
<div style="text-align:center;">
<div>
Copyright &copy; 2016-<script>document.write(new Date().getFullYear());</script> <a href="https://www.apache.org">The Apache Software Foundation</a>.
Licensed under the <a href="https://www.apache.org/licenses/LICENSE-2.0">Apache License, Version 2.0</a>.
<br>
Apache and the Apache Feather logo are trademarks of The Apache Software Foundation.
</div>
</div>
</div>
</div>
</footer>
</div>
<script type="text/javascript">
// Google Analytics (analytics.js) loader.
// The IIFE records the global name 'ga' in window.GoogleAnalyticsObject, defines
// window.ga as a stub that queues its arguments in ga.q (unless ga already exists),
// stamps ga.l with the current time, then creates an async <script> element for the
// tracker and inserts it before the first <script> element in the document.
// The tracker URL is pinned to https:// (instead of protocol-relative //) so it
// resolves correctly even when the page is opened from a non-HTTP(S) origin such as file://.
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
(i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
})(window,document,'script','https://www.google-analytics.com/analytics.js','ga');
// Queued commands: create the tracker for this property and domain, request the
// 'linkid' plugin, and send a pageview hit for the current page.
ga('create', 'UA-79140859-1', 'bahir.apache.org');
ga('require', 'linkid', 'linkid.js');
ga('send', 'pageview');
</script>
<script src="/assets/themes/apache-clean/jquery/jquery-2.1.1.min.js"></script>
<script src="/assets/themes/apache-clean/bootstrap/js/bootstrap.min.js"></script>
</body>
</html>