<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<link rel="shortcut icon" href="../../img/favicon.ico">
<title>Write Your 1st App - Apache Gearpump(incubating)</title>
<link href='https://fonts.googleapis.com/css?family=Lato:400,700|Roboto+Slab:400,700|Inconsolata:400,700' rel='stylesheet' type='text/css'>
<link rel="stylesheet" href="../../css/theme.css" type="text/css" />
<link rel="stylesheet" href="../../css/theme_extra.css" type="text/css" />
<link rel="stylesheet" href="../../css/highlight.css">
<script>
// Current page data
var mkdocs_page_name = "Write Your 1st App";
var mkdocs_page_input_path = "dev/dev-write-1st-app.md";
var mkdocs_page_url = "/dev/dev-write-1st-app/index.html";
</script>
<script src="../../js/jquery-2.1.1.min.js"></script>
<script src="../../js/modernizr-2.8.3.min.js"></script>
<script type="text/javascript" src="../../js/highlight.pack.js"></script>
</head>
<body class="wy-body-for-nav" role="document">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side stickynav">
<div class="wy-side-nav-search">
<a href="../../index.html" class="icon icon-home"> Apache Gearpump(incubating)</a>
<div role="search">
<form id ="rtd-search-form" class="wy-form" action="../../search.html" method="get">
<input type="text" name="q" placeholder="Search docs" />
</form>
</div>
</div>
<div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
<ul class="current">
<li>
<li class="toctree-l1 ">
<a class="" href="../../index.html">Overview</a>
</li>
<li>
<li>
<ul class="subnav">
<li><span>Introduction</span></li>
<li class="toctree-l1 ">
<a class="" href="../../introduction/submit-your-1st-application/index.html">Submit Your 1st Application</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../introduction/commandline/index.html">Client Command Line</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../introduction/basic-concepts/index.html">Basic Concepts</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../introduction/features/index.html">Technical Highlights</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../introduction/message-delivery/index.html">Reliable Message Delivery</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../introduction/performance-report/index.html">Performance</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../introduction/gearpump-internals/index.html">Gearpump Internals</a>
</li>
</ul>
<li>
<li>
<ul class="subnav">
<li><span>Deployment</span></li>
<li class="toctree-l1 ">
<a class="" href="../../deployment/deployment-local/index.html">Local Mode</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../deployment/deployment-standalone/index.html">Standalone Mode</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../deployment/deployment-yarn/index.html">YARN Mode</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../deployment/deployment-docker/index.html">Docker Mode</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../deployment/deployment-ui-authentication/index.html">UI Authentication</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../deployment/deployment-ha/index.html">High Availability</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../deployment/deployment-msg-delivery/index.html">Reliable Message Delivery</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../deployment/deployment-configuration/index.html">Configuration</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../deployment/deployment-resource-isolation/index.html">Resource Isolation</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../deployment/deployment-security/index.html">YARN Security Guide</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../deployment/get-gearpump-distribution/index.html">How to Get Your Gearpump Distribution</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../deployment/hardware-requirement/index.html">Hardware Requirement</a>
</li>
</ul>
<li>
<li>
<ul class="subnav">
<li><span>Programming Guide</span></li>
<li class="toctree-l1 current">
<a class="current" href="index.html">Write Your 1st App</a>
<ul>
<li class="toctree-l3"><a href="#mavensbt-settings">Maven/Sbt Settings</a></li>
<li class="toctree-l3"><a href="#ide-setup-optional">IDE Setup (Optional)</a></li>
<li class="toctree-l3"><a href="#decide-which-language-and-api-to-use-for-writing">Decide which language and API to use for writing</a></li>
<li class="toctree-l3"><a href="#dsl-version-for-wordcount">DSL version for Wordcount</a></li>
<li><a class="toctree-l4" href="#in-scala">In Scala</a></li>
<li><a class="toctree-l4" href="#in-java">In Java</a></li>
<li class="toctree-l3"><a href="#low-level-api-based-wordcount">Low level API based Wordcount</a></li>
<li><a class="toctree-l4" href="#define-processortask-class-and-partitioner-class">Define Processor(Task) class and Partitioner class</a></li>
<li><a class="toctree-l4" href="#wrap-up-as-an-application">Wrap up as an application</a></li>
<li class="toctree-l3"><a href="#submit-application">Submit application</a></li>
<li class="toctree-l3"><a href="#advanced-topic">Advanced topic</a></li>
<li><a class="toctree-l4" href="#gearpump-for-non-streaming-usage">Gearpump for Non-Streaming Usage</a></li>
</ul>
</li>
<li class="toctree-l1 ">
<a class="" href="../dev-custom-serializer/index.html">Customized Message Passing</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../dev-connectors/index.html">Gearpump Connectors</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../dev-storm/index.html">Storm Compatibility</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../dev-ide-setup/index.html">IDE Setup</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../dev-non-streaming-example/index.html">Non Streaming Examples</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../dev-rest-api/index.html">REST API</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../api/scala/index.html">Scala API</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../api/java/index.html">Java API</a>
</li>
</ul>
<li>
</ul>
</div>
&nbsp;
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" role="navigation" aria-label="top navigation">
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="../../index.html">Apache Gearpump(incubating)</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
<ul class="wy-breadcrumbs">
<li><a href="../../index.html">Docs</a> &raquo;</li>
<li>Programming Guide &raquo;</li>
<li>Write Your 1st App</li>
</ul>
<hr/>
</div>
<div role="main">
<div class="section">
<p>We'll use <a href="https://github.com/apache/incubator-gearpump/tree/master/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount">wordcount</a> as an example to illustrate how to write Gearpump applications.</p>
<h3 id="mavensbt-settings">Maven/Sbt Settings</h3>
<p>Repository and library dependencies can be found at <a href="http://gearpump.apache.org/downloads.html#maven-dependencies">Maven Setting</a>.</p>
<h3 id="ide-setup-optional">IDE Setup (Optional)</h3>
<p>You can get your preferred IDE ready for Gearpump by following <a href="../dev-ide-setup">this guide</a>.</p>
<h3 id="decide-which-language-and-api-to-use-for-writing">Decide which language and API to use for writing</h3>
<p>Gearpump provides APIs at two levels:</p>
<ol>
<li>
<p>Low level API, which is closer to Akka programming and operates on each individual event. The API documentation can be found at <a href="http://gearpump.apache.org/releases/latest/api/scala/index.html#org.apache.gearpump.streaming.package">Low Level API Doc</a>.</p>
</li>
<li>
<p>High level API (aka DSL), which operates on streams rather than on individual events. The API documentation can be found at <a href="http://gearpump.apache.org/releases/latest/api/scala/index.html#org.apache.gearpump.streaming.dsl.package">DSL API Doc</a>.</p>
</li>
</ol>
<p>Both APIs are available in Java and Scala.</p>
<p>So, before writing your first Gearpump application, you need to decide which API and which language to use.</p>
<h2 id="dsl-version-for-wordcount">DSL version for Wordcount</h2>
<p>The easiest way to write a streaming application is with the Gearpump DSL.
The following examples demonstrate how to write the WordCount application with the Gearpump DSL.</p>
<h4 id="in-scala">In Scala</h4>
<pre class="codehilite"><code class="language-scala">/** WordCount with High level DSL */
object WordCount extends AkkaApp with ArgumentsParser {
  override val options: Array[(String, CLIOption[Any])] = Array.empty
  override def main(akkaConf: Config, args: Array[String]): Unit = {
    val context = ClientContext(akkaConf)
    val app = StreamApp(&quot;dsl&quot;, context)
    val data = &quot;This is a good start, bingo!! bingo!!&quot;
    // count each word and output to log
    app.source(data.lines.toList, 1, &quot;source&quot;).
      // word =&gt; (word, count)
      flatMap(line =&gt; line.split(&quot;[\\s]+&quot;)).map((_, 1)).
      // (word, count1), (word, count2) =&gt; (word, count1 + count2)
      groupByKey().sum.log
    val appId = context.submit(app)
    context.close()
  }
}</code></pre>
<h4 id="in-java">In Java</h4>
<pre class="codehilite"><code class="language-java">/** Java version of WordCount with high level DSL API */
public class WordCount {
  public static void main(String[] args) throws InterruptedException {
    main(ClusterConfig.defaultConfig(), args);
  }
  public static void main(Config akkaConf, String[] args) throws InterruptedException {
    ClientContext context = new ClientContext(akkaConf);
    JavaStreamApp app = new JavaStreamApp(&quot;JavaDSL&quot;, context, UserConfig.empty());
    List&lt;String&gt; source = Lists.newArrayList(&quot;This is a good start, bingo!! bingo!!&quot;);
    // create a stream from the string list
    JavaStream&lt;String&gt; sentence = app.source(source, 1, UserConfig.empty(), &quot;source&quot;);
    // tokenize the strings and create a new stream
    JavaStream&lt;String&gt; words = sentence.flatMap(new FlatMapFunction&lt;String, String&gt;() {
      @Override
      public Iterator&lt;String&gt; apply(String s) {
        return Lists.newArrayList(s.split(&quot;\\s+&quot;)).iterator();
      }
    }, &quot;flatMap&quot;);
    // map each string to a (string, 1) pair
    JavaStream&lt;Tuple2&lt;String, Integer&gt;&gt; ones = words.map(new MapFunction&lt;String, Tuple2&lt;String, Integer&gt;&gt;() {
      @Override
      public Tuple2&lt;String, Integer&gt; apply(String s) {
        return new Tuple2&lt;String, Integer&gt;(s, 1);
      }
    }, &quot;map&quot;);
    // group by the string
    JavaStream&lt;Tuple2&lt;String, Integer&gt;&gt; groupedOnes = ones.groupBy(new GroupByFunction&lt;Tuple2&lt;String, Integer&gt;, String&gt;() {
      @Override
      public String apply(Tuple2&lt;String, Integer&gt; tuple) {
        return tuple._1();
      }
    }, 1, &quot;groupBy&quot;);
    // for each group, compute the sum
    JavaStream&lt;Tuple2&lt;String, Integer&gt;&gt; wordcount = groupedOnes.reduce(new ReduceFunction&lt;Tuple2&lt;String, Integer&gt;&gt;() {
      @Override
      public Tuple2&lt;String, Integer&gt; apply(Tuple2&lt;String, Integer&gt; t1, Tuple2&lt;String, Integer&gt; t2) {
        return new Tuple2&lt;String, Integer&gt;(t1._1(), t1._2() + t2._2());
      }
    }, &quot;reduce&quot;);
    // output the result using log
    wordcount.log();
    app.run();
    context.close();
  }
}</code></pre>
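<p>To make the result of the pipeline concrete, the same split, group and sum steps can be sketched over an in-memory list with the standard <code>java.util.stream</code> API. This sketch does not use Gearpump at all; the class name <code>WordCountSketch</code> and the method <code>countWords</code> are made up for illustration, and the extra empty-string filter is an assumption borrowed from the low level Scala example.</p>
<pre class="codehilite"><code class="language-java">import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Hypothetical, Gearpump-free sketch of what the DSL word count computes. */
final class WordCountSketch {
    static Map&lt;String, Integer&gt; countWords(List&lt;String&gt; lines) {
        return lines.stream()
            // flatMap: line =&gt; words
            .flatMap(line -&gt; Arrays.stream(line.split(&quot;\\s+&quot;)))
            .filter(word -&gt; !word.isEmpty())
            // map each word to 1, group by word, and sum the ones per group
            .collect(Collectors.toMap(word -&gt; word, word -&gt; 1, Integer::sum, TreeMap::new));
    }

    public static void main(String[] args) {
        System.out.println(countWords(List.of(&quot;This is a good start, bingo!! bingo!!&quot;)));
    }
}</code></pre>
<p>For the sample sentence, <code>bingo!!</code> maps to 2 and every other token maps to 1. Unlike this batch computation, the Gearpump versions above process a stream and emit updated counts as messages arrive.</p>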
<h2 id="low-level-api-based-wordcount">Low level API based Wordcount</h2>
<h3 id="define-processortask-class-and-partitioner-class">Define Processor(Task) class and Partitioner class</h3>
<p>An application is a Directed Acyclic Graph (DAG) of processors. In the wordcount example, we will first define two processors, <code>Split</code> and <code>Sum</code>, and then weave them together.</p>
<h4 id="split-processor">Split processor</h4>
<p>In the <code>Split</code> processor, we simply split a predefined text (the content is simplified for conciseness) and send out each split word to <code>Sum</code>.</p>
<h4 id="in-scala_1">In Scala</h4>
<pre class="codehilite"><code class="language-scala">class Split(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
  import taskContext.output
  override def onStart(startTime : StartTime) : Unit = {
    self ! Message(&quot;start&quot;)
  }
  override def onNext(msg : Message) : Unit = {
    Split.TEXT_TO_SPLIT.lines.foreach { line =&gt;
      line.split(&quot;[\\s]+&quot;).filter(_.nonEmpty).foreach { msg =&gt;
        output(new Message(msg, System.currentTimeMillis()))
      }
    }
    self ! Message(&quot;continue&quot;, System.currentTimeMillis())
  }
}
object Split {
  val TEXT_TO_SPLIT = &quot;some text&quot;
}</code></pre>
<h4 id="in-java_1">In Java</h4>
<pre class="codehilite"><code class="language-java">public class Split extends Task {
  public static String TEXT = &quot;This is a good start for java! bingo! bingo! &quot;;
  public Split(TaskContext taskContext, UserConfig userConf) {
    super(taskContext, userConf);
  }
  private Long now() {
    return System.currentTimeMillis();
  }
  @Override
  public void onStart(StartTime startTime) {
    self().tell(new Message(&quot;start&quot;, now()), self());
  }
  @Override
  public void onNext(Message msg) {
    // Split the TEXT to words
    String[] words = TEXT.split(&quot; &quot;);
    for (int i = 0; i &lt; words.length; i++) {
      context.output(new Message(words[i], now()));
    }
    self().tell(new Message(&quot;next&quot;, now()), self());
  }
}</code></pre>
<p>Essentially, each processor consists of two descriptions:</p>
<ol>
<li>
<p>A <code>Task</code> to define the operation.</p>
</li>
<li>
<p>A parallelism level to define the number of tasks of this processor in parallel. </p>
</li>
</ol>
<p>Just like <code>Split</code>, every processor extends <code>Task</code>. The <code>onStart</code> method is called once before any message comes in; the <code>onNext</code> method is called to process every incoming message. Note that Gearpump employs a message-driven model, which is why <code>Split</code> sends itself a message at the end of <code>onStart</code> and <code>onNext</code> to trigger processing of the next message.</p>
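<p>The self-message pattern can be illustrated without Gearpump. The following is only a single-threaded model under simplifying assumptions: the class <code>SelfMessageSketch</code>, its <code>mailbox</code> queue and the <code>run</code> method are hypothetical stand-ins for the runtime that delivers messages to a task.</p>
<pre class="codehilite"><code class="language-java">import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical model of a source task that keeps itself running via self-messages. */
final class SelfMessageSketch {
    private final Queue&lt;String&gt; mailbox = new ArrayDeque&lt;&gt;();
    final List&lt;String&gt; emitted = new ArrayList&lt;&gt;();

    /** Called once; queues the first self-message so that onNext gets triggered. */
    void onStart() {
        mailbox.add(&quot;start&quot;);
    }

    /** Called per message; emits the words, then queues another self-message. */
    void onNext(String msg) {
        for (String word : &quot;some text&quot;.split(&quot;\\s+&quot;)) {
            emitted.add(word);          // stands in for output(...)
        }
        mailbox.add(&quot;continue&quot;);        // without this, onNext would never run again
    }

    /** Delivers at most maxMessages messages, the way a runtime drains a mailbox. */
    void run(int maxMessages) {
        onStart();
        for (int i = 0; i &lt; maxMessages &amp;&amp; !mailbox.isEmpty(); i++) {
            onNext(mailbox.poll());
        }
    }

    public static void main(String[] args) {
        SelfMessageSketch task = new SelfMessageSketch();
        task.run(3);
        System.out.println(task.emitted);
    }
}</code></pre>
<p>Each delivered message produces one batch of words and exactly one follow-up message, so the task keeps emitting for as long as the runtime keeps delivering.</p>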
<h4 id="sum-processor">Sum Processor</h4>
<p>The <code>Sum</code> processor has a similar structure. <code>Sum</code> does not need to send messages to itself since it receives messages from <code>Split</code>.</p>
<h4 id="in-scala_2">In Scala</h4>
<pre class="codehilite"><code class="language-scala">class Sum (taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
  private[wordcount] val map : mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]()
  private[wordcount] var wordCount : Long = 0
  private var snapShotTime : Long = System.currentTimeMillis()
  private var snapShotWordCount : Long = 0
  private var scheduler : Cancellable = null
  override def onStart(startTime : StartTime) : Unit = {
    scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
      new FiniteDuration(5, TimeUnit.SECONDS))(reportWordCount)
  }
  override def onNext(msg : Message) : Unit = {
    if (null == msg) {
      return
    }
    val current = map.getOrElse(msg.msg.asInstanceOf[String], 0L)
    wordCount += 1
    map.put(msg.msg.asInstanceOf[String], current + 1)
  }
  override def onStop() : Unit = {
    if (scheduler != null) {
      scheduler.cancel()
    }
  }
  def reportWordCount() : Unit = {
    val current : Long = System.currentTimeMillis()
    LOG.info(s&quot;Task ${taskContext.taskId} Throughput: ${(wordCount - snapShotWordCount, (current - snapShotTime) / 1000)} (words, second)&quot;)
    snapShotWordCount = wordCount
    snapShotTime = current
  }
}</code></pre>
<h4 id="in-java_2">In Java</h4>
<pre class="codehilite"><code class="language-java">public class Sum extends Task {
  private Logger LOG = super.LOG();
  private HashMap&lt;String, Integer&gt; wordCount = new HashMap&lt;String, Integer&gt;();
  public Sum(TaskContext taskContext, UserConfig userConf) {
    super(taskContext, userConf);
  }
  @Override
  public void onStart(StartTime startTime) {
    //skip
  }
  @Override
  public void onNext(Message messagePayLoad) {
    String word = (String) (messagePayLoad.msg());
    Integer current = wordCount.get(word);
    if (current == null) {
      current = 0;
    }
    Integer newCount = current + 1;
    wordCount.put(word, newCount);
  }
}</code></pre>
<p>Besides counting words, the Scala version also defines a scheduler that reports throughput every 5 seconds. The scheduler should be cancelled when the computation completes, which is done by overriding the <code>onStop</code> method. The default implementation of <code>onStop</code> is a no-op.</p>
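<p>The Java <code>Sum</code> above has no such scheduler. As a rough illustration of the same start, report and cancel lifecycle, here is a sketch built on the JDK's <code>ScheduledExecutorService</code> rather than on Gearpump's <code>taskContext.schedule</code>. The class <code>ThroughputReporterSketch</code> and all of its members are hypothetical, and the period is passed in milliseconds only to keep the demo short.</p>
<pre class="codehilite"><code class="language-java">import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a task that reports throughput periodically; not Gearpump API. */
final class ThroughputReporterSketch {
    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor();
    private final AtomicLong wordCount = new AtomicLong();
    private long snapshotWordCount = 0;   // only touched by the reporting thread
    private ScheduledFuture&lt;?&gt; scheduler;

    /** Mirrors onStart: begin reporting every periodMillis milliseconds. */
    void onStart(long periodMillis) {
        scheduler = executor.scheduleAtFixedRate(
            this::reportWordCount, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Mirrors onNext: count one incoming word. */
    void onNext(String word) {
        wordCount.incrementAndGet();
    }

    /** Mirrors onStop: cancel the periodic report so nothing keeps running. */
    void onStop() {
        if (scheduler != null) {
            scheduler.cancel(false);
        }
        executor.shutdown();
    }

    boolean isCancelled() {
        return scheduler != null &amp;&amp; scheduler.isCancelled();
    }

    private void reportWordCount() {
        long current = wordCount.get();
        System.out.println(&quot;words since last report: &quot; + (current - snapshotWordCount));
        snapshotWordCount = current;
    }

    public static void main(String[] args) throws InterruptedException {
        ThroughputReporterSketch task = new ThroughputReporterSketch();
        task.onStart(100);
        for (String word : &quot;some text to count&quot;.split(&quot; &quot;)) {
            task.onNext(word);
        }
        Thread.sleep(250);   // let a couple of reports fire
        task.onStop();
    }
}</code></pre>
<p>If <code>onStop</code> did not cancel the report and shut the executor down, the reporting thread would outlive the task, which is the leak the cancellation in the Scala <code>onStop</code> avoids.</p>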
<h4 id="partitioner">Partitioner</h4>
<p>A processor can be parallelized into a list of tasks. A <code>Partitioner</code> defines how data is shuffled between the tasks of <code>Split</code> and <code>Sum</code>. Gearpump already provides two partitioners:</p>
<ul>
<li><code>HashPartitioner</code>: partitions data based on the message's hashcode</li>
<li><code>ShufflePartitioner</code>: partitions data in a round-robin way.</li>
</ul>
<p>You could define your own partitioner by extending the <code>Partitioner</code> trait/interface and overriding the <code>getPartition</code> method.</p>
<pre class="codehilite"><code class="language-scala">trait Partitioner extends Serializable {
  def getPartition(msg : Message, partitionNum : Int) : Int
}</code></pre>
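<p>The two strategies differ in what decides the target partition. The sketch below shows the idea in plain Java; it is not the code of Gearpump's <code>HashPartitioner</code> or <code>ShufflePartitioner</code>, and the class <code>PartitionerSketch</code> with its two methods is hypothetical.</p>
<pre class="codehilite"><code class="language-java">/** Hypothetical illustration of hash-based versus round-robin partitioning. */
final class PartitionerSketch {
    private int next = -1;

    /** Hash style: the key alone decides, so equal keys always share a partition. */
    static int hashPartition(Object key, int partitionNum) {
        // floorMod keeps the result in [0, partitionNum) even for negative hash codes
        return Math.floorMod(key.hashCode(), partitionNum);
    }

    /** Round-robin style: ignores the message and cycles through the partitions. */
    int roundRobinPartition(int partitionNum) {
        next = (next + 1) % partitionNum;
        return next;
    }

    public static void main(String[] args) {
        PartitionerSketch shuffle = new PartitionerSketch();
        for (String word : new String[] {&quot;bingo&quot;, &quot;start&quot;, &quot;bingo&quot;}) {
            System.out.println(word + &quot; hash=&quot; + hashPartition(word, 2)
                + &quot; roundRobin=&quot; + shuffle.roundRobinPartition(2));
        }
    }
}</code></pre>
<p>For word counting, a hash-based scheme is the natural fit: every occurrence of a word reaches the same <code>Sum</code> task, so each task holds the complete count for its words. Round-robin spreads load evenly but would scatter one word's occurrences across tasks.</p>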
<h3 id="wrap-up-as-an-application">Wrap up as an application</h3>
<p>Now, we are able to write our application class, weaving the above components together.</p>
<p>The application class extends <code>App</code> and <code>ArgumentsParser</code>, which make it easier to parse arguments and run the main function.</p>
<h4 id="in-scala_3">In Scala</h4>
<pre class="codehilite"><code class="language-scala">object WordCount extends App with ArgumentsParser {
  private val LOG: Logger = LogUtil.getLogger(getClass)
  val RUN_FOR_EVER = -1
  override val options: Array[(String, CLIOption[Any])] = Array(
    &quot;split&quot; -&gt; CLIOption[Int](&quot;&lt;how many split tasks&gt;&quot;, required = false, defaultValue = Some(1)),
    &quot;sum&quot; -&gt; CLIOption[Int](&quot;&lt;how many sum tasks&gt;&quot;, required = false, defaultValue = Some(1))
  )
  def application(config: ParseResult) : StreamApplication = {
    val splitNum = config.getInt(&quot;split&quot;)
    val sumNum = config.getInt(&quot;sum&quot;)
    val partitioner = new HashPartitioner()
    val split = Processor[Split](splitNum)
    val sum = Processor[Sum](sumNum)
    val app = StreamApplication(&quot;wordCount&quot;, Graph[Processor[_ &lt;: Task], Partitioner](split ~ partitioner ~&gt; sum), UserConfig.empty)
    app
  }
  val config = parse(args)
  val context = ClientContext()
  val appId = context.submit(application(config))
  context.close()
}</code></pre>
<p>We override the <code>options</code> value to define the command line arguments to parse. The listing above defines two of them, the parallelism of the split tasks and of the sum tasks. For each option we specify whether it is <code>required</code> and, where it makes sense, provide a <code>defaultValue</code>.</p>
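<p>The effect of optional arguments with default values can be sketched in plain Java. This is not how <code>ArgumentsParser</code> is implemented; the class <code>OptionsSketch</code>, its <code>parse</code> method and the <code>-name value</code> argument syntax are assumptions made for the illustration.</p>
<pre class="codehilite"><code class="language-java">import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of optional integer options that fall back to defaults. */
final class OptionsSketch {
    static Map&lt;String, Integer&gt; parse(String[] args) {
        Map&lt;String, Integer&gt; options = new LinkedHashMap&lt;&gt;();
        options.put(&quot;split&quot;, 1);   // default parallelism of the split tasks
        options.put(&quot;sum&quot;, 1);     // default parallelism of the sum tasks

        // Arguments are assumed to come in pairs: -name value
        for (int i = 0; i + 1 &lt; args.length; i += 2) {
            String name = args[i].replaceFirst(&quot;^-+&quot;, &quot;&quot;);
            if (!options.containsKey(name)) {
                throw new IllegalArgumentException(&quot;Unknown option: &quot; + args[i]);
            }
            options.put(name, Integer.parseInt(args[i + 1]));
        }
        return options;
    }

    public static void main(String[] args) {
        System.out.println(parse(new String[] {&quot;-sum&quot;, &quot;3&quot;}));
    }
}</code></pre>
<p>Because neither option is required, omitting <code>-split</code> leaves it at its default of 1 while <code>-sum 3</code> overrides the other default.</p>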
<h4 id="in-java_3">In Java</h4>
<pre class="codehilite"><code class="language-java">/** Java version of WordCount with Processor Graph API */
public class WordCount {
  public static void main(String[] args) throws InterruptedException {
    main(ClusterConfig.defaultConfig(), args);
  }
  public static void main(Config akkaConf, String[] args) throws InterruptedException {
    // For the split processor, we configure two tasks
    int splitTaskNumber = 2;
    Processor split = new Processor(Split.class).withParallelism(splitTaskNumber);
    // For the sum processor, we also configure two tasks
    int sumTaskNumber = 2;
    Processor sum = new Processor(Sum.class).withParallelism(sumTaskNumber);
    // construct the graph
    Graph graph = new Graph();
    graph.addVertex(split);
    graph.addVertex(sum);
    Partitioner partitioner = new HashPartitioner();
    graph.addEdge(split, partitioner, sum);
    UserConfig conf = UserConfig.empty();
    StreamApplication app = new StreamApplication(&quot;wordcountJava&quot;, conf, graph);
    // create master client
    // It will read the master settings under gearpump.cluster.masters
    ClientContext masterClient = new ClientContext(akkaConf);
    masterClient.submit(app);
    masterClient.close();
  }
}</code></pre>
<h2 id="submit-application">Submit application</h2>
<p>After all this, you need to package everything into an uber jar and submit the jar to the Gearpump cluster. Please check the <a href="../../introduction/commandline">Application submission tool</a> for the command line syntax.</p>
<h2 id="advanced-topic">Advanced topic</h2>
<p>A real application will usually need to pass its own message types between processors.
A customized message type needs a customized serializer so that messages can be passed over the wire.
Check <a href="../dev-custom-serializer">this guide</a> for how to customize the serializer.</p>
<h3 id="gearpump-for-non-streaming-usage">Gearpump for Non-Streaming Usage</h3>
<p>Gearpump can also serve as a base platform for developing non-streaming applications. See <a href="../dev-non-streaming-example">this guide</a> on how to use Gearpump to develop a distributed shell.</p>
</div>
</div>
<footer>
<div class="rst-footer-buttons" role="navigation" aria-label="footer navigation">
<a href="../dev-custom-serializer/index.html" class="btn btn-neutral float-right" title="Customized Message Passing">Next <span class="icon icon-circle-arrow-right"></span></a>
<a href="../../deployment/hardware-requirement/index.html" class="btn btn-neutral" title="Hardware Requirement"><span class="icon icon-circle-arrow-left"></span> Previous</a>
</div>
<hr/>
<div role="contentinfo">
<!-- Copyright etc -->
</div>
Built with <a href="http://www.mkdocs.org">MkDocs</a> using a <a href="https://github.com/snide/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<div class="rst-versions" role="note" style="cursor: pointer">
<span class="rst-current-version" data-toggle="rst-current-version">
<a href="https://github.com/apache/incubator-gearpump" class="icon icon-github" style="float: left; color: #fcfcfc"> GitHub</a>
<span><a href="../../deployment/hardware-requirement/index.html" style="color: #fcfcfc;">&laquo; Previous</a></span>
<span style="margin-left: 15px"><a href="../dev-custom-serializer/index.html" style="color: #fcfcfc">Next &raquo;</a></span>
</span>
</div>
<script src="../../js/theme.js"></script>
</body>
</html>