| <!DOCTYPE html> |
| <!--[if lt IE 7]> |
| <html class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]--> |
| <!--[if IE 7]> |
| <html class="no-js lt-ie9 lt-ie8"> <![endif]--> |
| <!--[if IE 8]> |
| <html class="no-js lt-ie9"> <![endif]--> |
| <!--[if gt IE 8]><!--> |
| <html class="no-js"> <!--<![endif]--> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1"> |
| <meta name="viewport" content="width=device-width,initial-scale=1,maximum-scale=1"/> |
| <title>Write Your 1st Gearpump App - Gearpump 0.8.1 Documentation</title> |
| |
| <meta name="description" |
| content="Write Your 1st Gearpump App"> |
| |
| |
| |
| |
| <link rel="stylesheet" href="css/bootstrap-3.3.5.min.css"> |
| <style> |
| body { |
| padding-top: 60px; |
| padding-bottom: 40px; |
| } |
| </style> |
| <link rel="stylesheet" href="css/main.css"> |
| <link rel="stylesheet" href="css/pygments-default.css"> |
| <script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script> |
| </head> |
| <body> |
| <!--[if lt IE 7]> |
| <p class="chromeframe">You are using an outdated browser. <a href="http://browsehappy.com/">Upgrade |
| your browser today</a> or <a href="http://www.google.com/chromeframe/?redirect=true">install |
| Google Chrome Frame</a> to better experience this site.</p> |
| <![endif]--> |
| |
| <div class="navbar navbar-inverse navbar-fixed-top" id="topbar"> |
| <div class="container"> |
| <div class="navbar-header"> |
| <button type="button" class="navbar-toggle collapsed" data-toggle="collapse" |
| data-target="#navbar" aria-expanded="false" aria-controls="navbar"> |
| <span class="sr-only">Toggle navigation</span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| </button> |
| <a class="navbar-brand" href="http://gearpump.apache.org">Gearpump |
| <span class="label label-primary" style="font-size: .6em">0.8.1</span> |
| </a> |
| </div> |
| <div id="navbar" class="collapse navbar-collapse"> |
| <ul class="nav navbar-nav"> |
| <li><a href="index.html">Overview</a></li> |
| |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown">Introduction<b |
| class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="submit-your-1st-application.html">Submit Your 1st Application</a></li> |
| <li><a href="commandline.html">Client Command Line</a></li> |
| <li class="divider"></li> |
| <li><a href="basic-concepts.html">Basic Concepts</a></li> |
| <li><a href="features.html">Technical Highlights</a></li> |
| <li><a href="message-delivery.html">Reliable Message Delivery</a></li> |
| <li><a href="performance-report.html">Performance</a></li> |
| <li><a href="gearpump-internals.html">Gearpump Internals</a></li> |
| </ul> |
| </li> |
| |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown">Deploying<b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li class="dropdown-header">Deployment</li> |
| <li><a href="deployment-local.html">Local Mode</a> |
| <li> |
| <li><a href="deployment-standalone.html">Standalone Mode</a></li> |
| <li><a href="deployment-yarn.html">YARN Mode</a></li> |
| <li><a href="deployment-docker.html">Docker Mode</a> |
| <li> |
| <li class="divider"></li> |
| <li><a href="deployment-ui-authentication.html">UI Authentication</a></li> |
| <li><a href="deployment-ha.html">High Availability</a></li> |
| <li><a href="deployment-msg-delivery.html">Reliable Message Delivery</a></li> |
| <li><a href="deployment-configuration.html">Configuration</a></li> |
| <li><a href="deployment-resource-isolation.html">Resource Isolation</a></li> |
| <li class="divider"></li> |
| <li><a href="deployment-security.html">YARN Security Guide</a></li> |
| </ul> |
| </li> |
| |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown">Programming Guide<b |
| class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="dev-write-1st-app.html">Write Your 1st App</a></li> |
| <li><a href="dev-custom-serializer.html">Customized Message Passing</a></li> |
| <li class="divider"></li> |
| <li><a href="api/scala/index.html">Scala API</a></li> |
| <li><a href="api/java/index.html">Java API</a></li> |
| <li><a href="dev-rest-api.html">RESTful API</a></li> |
| <li class="divider"></li> |
| <li><a href="dev-connectors.html">Gearpump Connectors</a></li> |
| <li class="divider"></li> |
| <li><a href="dev-storm.html">Storm Compatibility</a></li> |
| <!-- |
| <li><a href="dev-samoa.html">Samoa Compatibility</a></li> |
| <li class="divider"></li> |
| <li><a href="dev-iot.html">Gearpump with IoT</a></li> |
| --> |
| </ul> |
| </li> |
| |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown">More<b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="faq.html">FAQ</a></li> |
| </ul> |
| </li> |
| </ul> |
| </div> |
| </div> |
| </div> |
| |
| <div class="container" id="content"> |
| |
| <h1 class="title">Write Your 1st Gearpump App</h1> |
| |
| |
| <p>We’ll use <a href="https://github.com/apache/incubator-gearpump/tree/master/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount">wordcount</a> as an example to illustrate how to write Gearpump applications.</p> |
| |
| <h3 id="mavensbt-settings">Maven/Sbt Settings</h3> |
| |
| <p>Repository and library dependencies can be found at <a href="maven-setting.html">Maven Setting</a>.</p> |
| |
| <h3 id="ide-setup-optional">IDE Setup (Optional)</h3> |
| <p>You can get your preferred IDE ready for Gearpump by following <a href="dev-ide-setup.html">this guide</a>.</p> |
| |
| <h3 id="decide-which-language-and-api-to-use-for-writing">Decide which language and API to use for writing</h3> |
| <p>Gearpump supports two level APIs:</p> |
| |
| <ol> |
| <li> |
| <p>Low level API, which is more similar to Akka programming, operating on each event. The API document can be found at <a href="http://gearpump.apache.org/releases/latest/api/scala/index.html#org.apache.gearpump.streaming.package">Low Level API Doc</a>.</p> |
| </li> |
| <li> |
| <p>High level API (aka DSL), which is operating on streaming instead of individual event. The API document can be found at <a href="http://gearpump.apache.org/releases/latest/api/scala/index.html#org.apache.gearpump.streaming.dsl.package">DSL API Doc</a>.</p> |
| </li> |
| </ol> |
| |
| <p>And both APIs have their Java version and Scala version.</p> |
| |
| <p>So, before you writing your first Gearpump application, you need to decide which API to use and which language to use.</p> |
| |
| <h2 id="dsl-version-for-wordcount">DSL version for Wordcount</h2> |
| |
| <p>The easiest way to write your streaming application is to write it with Gearpump DSL. |
| Below will demostrate how to write WordCount application via Gearpump DSL.</p> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="language-scala"><span class="cm">/** WordCount with High level DSL */</span> |
| <span class="k">object</span> <span class="nc">WordCount</span> <span class="k">extends</span> <span class="nc">AkkaApp</span> <span class="k">with</span> <span class="nc">ArgumentsParser</span> <span class="o">{</span> |
| |
| <span class="k">override</span> <span class="k">val</span> <span class="n">options</span><span class="k">:</span> <span class="kt">Array</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">CLIOption</span><span class="o">[</span><span class="kt">Any</span><span class="o">])]</span> <span class="k">=</span> <span class="nc">Array</span><span class="o">.</span><span class="n">empty</span> |
| |
| <span class="k">override</span> <span class="k">def</span> <span class="n">main</span><span class="o">(</span><span class="n">akkaConf</span><span class="k">:</span> <span class="kt">Config</span><span class="o">,</span> <span class="n">args</span><span class="k">:</span> <span class="kt">Array</span><span class="o">[</span><span class="kt">String</span><span class="o">])</span><span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span> |
| <span class="k">val</span> <span class="n">context</span> <span class="k">=</span> <span class="nc">ClientContext</span><span class="o">(</span><span class="n">akkaConf</span><span class="o">)</span> |
| <span class="k">val</span> <span class="n">app</span> <span class="k">=</span> <span class="nc">StreamApp</span><span class="o">(</span><span class="s">"dsl"</span><span class="o">,</span> <span class="n">context</span><span class="o">)</span> |
| <span class="k">val</span> <span class="n">data</span> <span class="k">=</span> <span class="s">"This is a good start, bingo!! bingo!!"</span> |
| |
| <span class="c1">//count for each word and output to log</span> |
| <span class="n">app</span><span class="o">.</span><span class="n">source</span><span class="o">(</span><span class="n">data</span><span class="o">.</span><span class="n">lines</span><span class="o">.</span><span class="n">toList</span><span class="o">,</span> <span class="mi">1</span><span class="o">,</span> <span class="s">"source"</span><span class="o">).</span> |
| <span class="c1">// word => (word, count)</span> |
| <span class="n">flatMap</span><span class="o">(</span><span class="n">line</span> <span class="k">=></span> <span class="n">line</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">"[\\s]+"</span><span class="o">)).</span><span class="n">map</span><span class="o">((</span><span class="k">_</span><span class="o">,</span> <span class="mi">1</span><span class="o">)).</span> |
| <span class="c1">// (word, count1), (word, count2) => (word, count1 + count2)</span> |
| <span class="n">groupByKey</span><span class="o">().</span><span class="n">sum</span><span class="o">.</span><span class="n">log</span> |
| |
| <span class="k">val</span> <span class="n">appId</span> <span class="k">=</span> <span class="n">context</span><span class="o">.</span><span class="n">submit</span><span class="o">(</span><span class="n">app</span><span class="o">)</span> |
| <span class="n">context</span><span class="o">.</span><span class="n">close</span><span class="o">()</span> |
| <span class="o">}</span> |
| <span class="o">}</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <div class="highlight"><pre><code class="language-java"><span class="cm">/** Java version of WordCount with high level DSL API */</span> |
| <span class="kd">public</span> <span class="kd">class</span> <span class="nc">WordCount</span> <span class="o">{</span> |
| |
| <span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">InterruptedException</span> <span class="o">{</span> |
| <span class="n">main</span><span class="o">(</span><span class="n">ClusterConfig</span><span class="o">.</span><span class="na">defaultConfig</span><span class="o">(),</span> <span class="n">args</span><span class="o">);</span> |
| <span class="o">}</span> |
| |
| <span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">Config</span> <span class="n">akkaConf</span><span class="o">,</span> <span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">InterruptedException</span> <span class="o">{</span> |
| <span class="n">ClientContext</span> <span class="n">context</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">ClientContext</span><span class="o">(</span><span class="n">akkaConf</span><span class="o">);</span> |
| <span class="n">JavaStreamApp</span> <span class="n">app</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">JavaStreamApp</span><span class="o">(</span><span class="s">"JavaDSL"</span><span class="o">,</span> <span class="n">context</span><span class="o">,</span> <span class="n">UserConfig</span><span class="o">.</span><span class="na">empty</span><span class="o">());</span> |
| <span class="n">List</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">source</span> <span class="o">=</span> <span class="n">Lists</span><span class="o">.</span><span class="na">newArrayList</span><span class="o">(</span><span class="s">"This is a good start, bingo!! bingo!!"</span><span class="o">);</span> |
| |
| <span class="c1">//create a stream from the string list.</span> |
| <span class="n">JavaStream</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">sentence</span> <span class="o">=</span> <span class="n">app</span><span class="o">.</span><span class="na">source</span><span class="o">(</span><span class="n">source</span><span class="o">,</span> <span class="mi">1</span><span class="o">,</span> <span class="n">UserConfig</span><span class="o">.</span><span class="na">empty</span><span class="o">(),</span> <span class="s">"source"</span><span class="o">);</span> |
| |
| <span class="c1">//tokenize the strings and create a new stream</span> |
| <span class="n">JavaStream</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">words</span> <span class="o">=</span> <span class="n">sentence</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="n">FlatMapFunction</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>()</span> <span class="o">{</span> |
| <span class="nd">@Override</span> |
| <span class="kd">public</span> <span class="n">Iterator</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="nf">apply</span><span class="o">(</span><span class="n">String</span> <span class="n">s</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="n">Lists</span><span class="o">.</span><span class="na">newArrayList</span><span class="o">(</span><span class="n">s</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">"\\s+"</span><span class="o">)).</span><span class="na">iterator</span><span class="o">();</span> |
| <span class="o">}</span> |
| <span class="o">},</span> <span class="s">"flatMap"</span><span class="o">);</span> |
| |
| <span class="c1">//map each string as (string, 1) pair</span> |
| <span class="n">JavaStream</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">ones</span> <span class="o">=</span> <span class="n">words</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">MapFunction</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>>()</span> <span class="o">{</span> |
| <span class="nd">@Override</span> |
| <span class="kd">public</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="nf">apply</span><span class="o">(</span><span class="n">String</span> <span class="n">s</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="k">new</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>(</span><span class="n">s</span><span class="o">,</span> <span class="mi">1</span><span class="o">);</span> |
| <span class="o">}</span> |
| <span class="o">},</span> <span class="s">"map"</span><span class="o">);</span> |
| |
| <span class="c1">//group by according to string</span> |
| <span class="n">JavaStream</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">groupedOnes</span> <span class="o">=</span> <span class="n">ones</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="k">new</span> <span class="n">GroupByFunction</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>,</span> <span class="n">String</span><span class="o">>()</span> <span class="o">{</span> |
| <span class="nd">@Override</span> |
| <span class="kd">public</span> <span class="n">String</span> <span class="nf">apply</span><span class="o">(</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">tuple</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="n">tuple</span><span class="o">.</span><span class="na">_1</span><span class="o">();</span> |
| <span class="o">}</span> |
| <span class="o">},</span> <span class="mi">1</span><span class="o">,</span> <span class="s">"groupBy"</span><span class="o">);</span> |
| |
| <span class="c1">//for each group, make the sum</span> |
| <span class="n">JavaStream</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">wordcount</span> <span class="o">=</span> <span class="n">groupedOnes</span><span class="o">.</span><span class="na">reduce</span><span class="o">(</span><span class="k">new</span> <span class="n">ReduceFunction</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>>()</span> <span class="o">{</span> |
| <span class="nd">@Override</span> |
| <span class="kd">public</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="nf">apply</span><span class="o">(</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">t1</span><span class="o">,</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">t2</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="k">new</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>(</span><span class="n">t1</span><span class="o">.</span><span class="na">_1</span><span class="o">(),</span> <span class="n">t1</span><span class="o">.</span><span class="na">_2</span><span class="o">()</span> <span class="o">+</span> <span class="n">t2</span><span class="o">.</span><span class="na">_2</span><span class="o">());</span> |
| <span class="o">}</span> |
| <span class="o">},</span> <span class="s">"reduce"</span><span class="o">);</span> |
| |
| <span class="c1">//output result using log</span> |
| <span class="n">wordcount</span><span class="o">.</span><span class="na">log</span><span class="o">();</span> |
| |
| <span class="n">app</span><span class="o">.</span><span class="na">run</span><span class="o">();</span> |
| <span class="n">context</span><span class="o">.</span><span class="na">close</span><span class="o">();</span> |
| <span class="o">}</span> |
| <span class="o">}</span></code></pre></div> |
| |
| </div> |
| |
| </div> |
| |
| <h2 id="low-level-api-based-wordcount">Low level API based Wordcount</h2> |
| |
| <h3 id="define-processortask-class-and-partitioner-class">Define Processor(Task) class and Partitioner class</h3> |
| |
| <p>An application is a Directed Acyclic Graph (DAG) of processors. In the wordcount example, We will firstly define two processors <code>Split</code> and <code>Sum</code>, and then weave them together.</p> |
| |
| <h4 id="split-processor">Split processor</h4> |
| |
| <p>In the <code>Split</code> processor, we simply split a predefined text (the content is simplified for conciseness) and send out each split word to <code>Sum</code>.</p> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="language-scala"><span class="k">class</span> <span class="nc">Split</span><span class="o">(</span><span class="n">taskContext</span> <span class="k">:</span> <span class="kt">TaskContext</span><span class="o">,</span> <span class="n">conf</span><span class="k">:</span> <span class="kt">UserConfig</span><span class="o">)</span> <span class="k">extends</span> <span class="nc">Task</span><span class="o">(</span><span class="n">taskContext</span><span class="o">,</span> <span class="n">conf</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">import</span> <span class="nn">taskContext.output</span> |
| |
| <span class="k">override</span> <span class="k">def</span> <span class="n">onStart</span><span class="o">(</span><span class="n">startTime</span> <span class="k">:</span> <span class="kt">StartTime</span><span class="o">)</span> <span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span> |
| <span class="n">self</span> <span class="o">!</span> <span class="nc">Message</span><span class="o">(</span><span class="s">"start"</span><span class="o">)</span> |
| <span class="o">}</span> |
| |
| <span class="k">override</span> <span class="k">def</span> <span class="n">onNext</span><span class="o">(</span><span class="n">msg</span> <span class="k">:</span> <span class="kt">Message</span><span class="o">)</span> <span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span> |
| <span class="nc">Split</span><span class="o">.</span><span class="nc">TEXT_TO_SPLIT</span><span class="o">.</span><span class="n">lines</span><span class="o">.</span><span class="n">foreach</span> <span class="o">{</span> <span class="n">line</span> <span class="k">=></span> |
| <span class="n">line</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">"[\\s]+"</span><span class="o">).</span><span class="n">filter</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">nonEmpty</span><span class="o">).</span><span class="n">foreach</span> <span class="o">{</span> <span class="n">msg</span> <span class="k">=></span> |
| <span class="n">output</span><span class="o">(</span><span class="k">new</span> <span class="nc">Message</span><span class="o">(</span><span class="n">msg</span><span class="o">,</span> <span class="nc">System</span><span class="o">.</span><span class="n">currentTimeMillis</span><span class="o">()))</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| <span class="n">self</span> <span class="o">!</span> <span class="nc">Message</span><span class="o">(</span><span class="s">"continue"</span><span class="o">,</span> <span class="nc">System</span><span class="o">.</span><span class="n">currentTimeMillis</span><span class="o">())</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| |
| <span class="k">object</span> <span class="nc">Split</span> <span class="o">{</span> |
| <span class="k">val</span> <span class="nc">TEXT_TO_SPLIT</span> <span class="k">=</span> <span class="s">"some text"</span> |
| <span class="o">}</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="java"> |
| <div class="highlight"><pre><code class="language-java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">Split</span> <span class="kd">extends</span> <span class="n">Task</span> <span class="o">{</span> |
| |
| <span class="kd">public</span> <span class="kd">static</span> <span class="n">String</span> <span class="n">TEXT</span> <span class="o">=</span> <span class="s">"This is a good start for java! bingo! bingo! "</span><span class="o">;</span> |
| |
| <span class="kd">public</span> <span class="nf">Split</span><span class="o">(</span><span class="n">TaskContext</span> <span class="n">taskContext</span><span class="o">,</span> <span class="n">UserConfig</span> <span class="n">userConf</span><span class="o">)</span> <span class="o">{</span> |
| <span class="kd">super</span><span class="o">(</span><span class="n">taskContext</span><span class="o">,</span> <span class="n">userConf</span><span class="o">);</span> |
| <span class="o">}</span> |
| |
| <span class="kd">private</span> <span class="n">Long</span> <span class="nf">now</span><span class="o">()</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="n">System</span><span class="o">.</span><span class="na">currentTimeMillis</span><span class="o">();</span> |
| <span class="o">}</span> |
| |
| <span class="nd">@Override</span> |
| <span class="kd">public</span> <span class="kt">void</span> <span class="nf">onStart</span><span class="o">(</span><span class="n">StartTime</span> <span class="n">startTime</span><span class="o">)</span> <span class="o">{</span> |
| <span class="n">self</span><span class="o">().</span><span class="na">tell</span><span class="o">(</span><span class="k">new</span> <span class="nf">Message</span><span class="o">(</span><span class="s">"start"</span><span class="o">,</span> <span class="n">now</span><span class="o">()),</span> <span class="n">self</span><span class="o">());</span> |
| <span class="o">}</span> |
| |
| <span class="nd">@Override</span> |
| <span class="kd">public</span> <span class="kt">void</span> <span class="nf">onNext</span><span class="o">(</span><span class="n">Message</span> <span class="n">msg</span><span class="o">)</span> <span class="o">{</span> |
| |
| <span class="c1">// Split the TEXT to words</span> |
| <span class="n">String</span><span class="o">[]</span> <span class="n">words</span> <span class="o">=</span> <span class="n">TEXT</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">" "</span><span class="o">);</span> |
| <span class="k">for</span> <span class="o">(</span><span class="kt">int</span> <span class="n">i</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> <span class="n">i</span> <span class="o"><</span> <span class="n">words</span><span class="o">.</span><span class="na">length</span><span class="o">;</span> <span class="n">i</span><span class="o">++)</span> <span class="o">{</span> |
| <span class="n">context</span><span class="o">.</span><span class="na">output</span><span class="o">(</span><span class="k">new</span> <span class="nf">Message</span><span class="o">(</span><span class="n">words</span><span class="o">[</span><span class="n">i</span><span class="o">],</span> <span class="n">now</span><span class="o">()));</span> |
| <span class="o">}</span> |
| <span class="n">self</span><span class="o">().</span><span class="na">tell</span><span class="o">(</span><span class="k">new</span> <span class="nf">Message</span><span class="o">(</span><span class="s">"next"</span><span class="o">,</span> <span class="n">now</span><span class="o">()),</span> <span class="n">self</span><span class="o">());</span> |
| <span class="o">}</span> |
| <span class="o">}</span></code></pre></div> |
| |
| </div> |
| |
| </div> |
| |
| <p>Essentially, each processor consists of two descriptions:</p> |
| |
| <ol> |
| <li> |
| <p>A <code>Task</code> to define the operation.</p> |
| </li> |
| <li> |
| <p>A parallelism level to define the number of tasks of this processor in parallel.</p> |
| </li> |
| </ol> |
| |
| <p>Just like <code>Split</code>, every processor extends <code>Task</code>. The <code>onStart</code> method is called once before any message comes in; <code>onNext</code> method is called to process every incoming message. Note that Gearpump employs the message-driven model and that’s why Split sends itself a message at the end of <code>onStart</code> and <code>onNext</code> to trigger next message processing.</p> |
| |
| <h4 id="sum-processor">Sum Processor</h4> |
| |
| <p>The structure of <code>Sum</code> processor looks much alike. <code>Sum</code> does not need to send messages to itself since it receives messages from <code>Split</code>.</p> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="language-scala"><span class="k">class</span> <span class="nc">Sum</span> <span class="o">(</span><span class="n">taskContext</span> <span class="k">:</span> <span class="kt">TaskContext</span><span class="o">,</span> <span class="n">conf</span><span class="k">:</span> <span class="kt">UserConfig</span><span class="o">)</span> <span class="k">extends</span> <span class="nc">Task</span><span class="o">(</span><span class="n">taskContext</span><span class="o">,</span> <span class="n">conf</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">private</span><span class="o">[</span><span class="kt">wordcount</span><span class="o">]</span> <span class="k">val</span> <span class="n">map</span> <span class="k">:</span> <span class="kt">mutable.HashMap</span><span class="o">[</span><span class="kt">String</span>, <span class="kt">Long</span><span class="o">]</span> <span class="k">=</span> <span class="k">new</span> <span class="n">mutable</span><span class="o">.</span><span class="nc">HashMap</span><span class="o">[</span><span class="kt">String</span>, <span class="kt">Long</span><span class="o">]()</span> |
| |
| <span class="k">private</span><span class="o">[</span><span class="kt">wordcount</span><span class="o">]</span> <span class="k">var</span> <span class="n">wordCount</span> <span class="k">:</span> <span class="kt">Long</span> <span class="o">=</span> <span class="mi">0</span> |
| <span class="k">private</span> <span class="k">var</span> <span class="n">snapShotTime</span> <span class="k">:</span> <span class="kt">Long</span> <span class="o">=</span> <span class="nc">System</span><span class="o">.</span><span class="n">currentTimeMillis</span><span class="o">()</span> |
| <span class="k">private</span> <span class="k">var</span> <span class="n">snapShotWordCount</span> <span class="k">:</span> <span class="kt">Long</span> <span class="o">=</span> <span class="mi">0</span> |
| |
| <span class="k">private</span> <span class="k">var</span> <span class="n">scheduler</span> <span class="k">:</span> <span class="kt">Cancellable</span> <span class="o">=</span> <span class="kc">null</span> |
| |
| <span class="k">override</span> <span class="k">def</span> <span class="n">onStart</span><span class="o">(</span><span class="n">startTime</span> <span class="k">:</span> <span class="kt">StartTime</span><span class="o">)</span> <span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span> |
| <span class="n">scheduler</span> <span class="k">=</span> <span class="n">taskContext</span><span class="o">.</span><span class="n">schedule</span><span class="o">(</span><span class="k">new</span> <span class="nc">FiniteDuration</span><span class="o">(</span><span class="mi">5</span><span class="o">,</span> <span class="nc">TimeUnit</span><span class="o">.</span><span class="nc">SECONDS</span><span class="o">),</span> |
| <span class="k">new</span> <span class="nc">FiniteDuration</span><span class="o">(</span><span class="mi">5</span><span class="o">,</span> <span class="nc">TimeUnit</span><span class="o">.</span><span class="nc">SECONDS</span><span class="o">))(</span><span class="n">reportWordCount</span><span class="o">)</span> |
| <span class="o">}</span> |
| |
| <span class="k">override</span> <span class="k">def</span> <span class="n">onNext</span><span class="o">(</span><span class="n">msg</span> <span class="k">:</span> <span class="kt">Message</span><span class="o">)</span> <span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span> |
| <span class="k">if</span> <span class="o">(</span><span class="kc">null</span> <span class="o">==</span> <span class="n">msg</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">return</span> |
| <span class="o">}</span> |
| <span class="k">val</span> <span class="n">current</span> <span class="k">=</span> <span class="n">map</span><span class="o">.</span><span class="n">getOrElse</span><span class="o">(</span><span class="n">msg</span><span class="o">.</span><span class="n">msg</span><span class="o">.</span><span class="n">asInstanceOf</span><span class="o">[</span><span class="kt">String</span><span class="o">],</span> <span class="mi">0L</span><span class="o">)</span> |
| <span class="n">wordCount</span> <span class="o">+=</span> <span class="mi">1</span> |
| <span class="n">map</span><span class="o">.</span><span class="n">put</span><span class="o">(</span><span class="n">msg</span><span class="o">.</span><span class="n">msg</span><span class="o">.</span><span class="n">asInstanceOf</span><span class="o">[</span><span class="kt">String</span><span class="o">],</span> <span class="n">current</span> <span class="o">+</span> <span class="mi">1</span><span class="o">)</span> |
| <span class="o">}</span> |
| |
| <span class="k">override</span> <span class="k">def</span> <span class="n">onStop</span><span class="o">()</span> <span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span> |
| <span class="k">if</span> <span class="o">(</span><span class="n">scheduler</span> <span class="o">!=</span> <span class="kc">null</span><span class="o">)</span> <span class="o">{</span> |
| <span class="n">scheduler</span><span class="o">.</span><span class="n">cancel</span><span class="o">()</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| |
| <span class="k">def</span> <span class="n">reportWordCount</span><span class="o">()</span> <span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span> |
| <span class="k">val</span> <span class="n">current</span> <span class="k">:</span> <span class="kt">Long</span> <span class="o">=</span> <span class="nc">System</span><span class="o">.</span><span class="n">currentTimeMillis</span><span class="o">()</span> |
| <span class="nc">LOG</span><span class="o">.</span><span class="n">info</span><span class="o">(</span><span class="n">s</span><span class="s">"Task ${taskContext.taskId} Throughput: ${(wordCount - snapShotWordCount, (current - snapShotTime) / 1000)} (words, second)"</span><span class="o">)</span> |
| <span class="n">snapShotWordCount</span> <span class="k">=</span> <span class="n">wordCount</span> |
| <span class="n">snapShotTime</span> <span class="k">=</span> <span class="n">current</span> |
| <span class="o">}</span> |
| <span class="o">}</span></code></pre></div> |
| |
| </div> |
| <div data-lang="java"> |
| |
| <div class="highlight"><pre><code class="language-java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">Sum</span> <span class="kd">extends</span> <span class="n">Task</span> <span class="o">{</span> |
| |
| <span class="kd">private</span> <span class="n">Logger</span> <span class="n">LOG</span> <span class="o">=</span> <span class="kd">super</span><span class="o">.</span><span class="na">LOG</span><span class="o">();</span> |
| <span class="kd">private</span> <span class="n">HashMap</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">wordCount</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HashMap</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>();</span> |
| |
| <span class="kd">public</span> <span class="nf">Sum</span><span class="o">(</span><span class="n">TaskContext</span> <span class="n">taskContext</span><span class="o">,</span> <span class="n">UserConfig</span> <span class="n">userConf</span><span class="o">)</span> <span class="o">{</span> |
| <span class="kd">super</span><span class="o">(</span><span class="n">taskContext</span><span class="o">,</span> <span class="n">userConf</span><span class="o">);</span> |
| <span class="o">}</span> |
| |
| <span class="nd">@Override</span> |
| <span class="kd">public</span> <span class="kt">void</span> <span class="nf">onStart</span><span class="o">(</span><span class="n">StartTime</span> <span class="n">startTime</span><span class="o">)</span> <span class="o">{</span> |
| <span class="c1">//skip</span> |
| <span class="o">}</span> |
| |
| <span class="nd">@Override</span> |
| <span class="kd">public</span> <span class="kt">void</span> <span class="nf">onNext</span><span class="o">(</span><span class="n">Message</span> <span class="n">messagePayLoad</span><span class="o">)</span> <span class="o">{</span> |
| <span class="n">String</span> <span class="n">word</span> <span class="o">=</span> <span class="o">(</span><span class="n">String</span><span class="o">)</span> <span class="o">(</span><span class="n">messagePayLoad</span><span class="o">.</span><span class="na">msg</span><span class="o">());</span> |
| <span class="n">Integer</span> <span class="n">current</span> <span class="o">=</span> <span class="n">wordCount</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="n">word</span><span class="o">);</span> |
| <span class="k">if</span> <span class="o">(</span><span class="n">current</span> <span class="o">==</span> <span class="kc">null</span><span class="o">)</span> <span class="o">{</span> |
| <span class="n">current</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> |
| <span class="o">}</span> |
| <span class="n">Integer</span> <span class="n">newCount</span> <span class="o">=</span> <span class="n">current</span> <span class="o">+</span> <span class="mi">1</span><span class="o">;</span> |
| <span class="n">wordCount</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">word</span><span class="o">,</span> <span class="n">newCount</span><span class="o">);</span> |
| <span class="o">}</span> |
| <span class="o">}</span></code></pre></div> |
| |
| </div> |
| |
| </div> |
| |
| <p>Besides counting the sum, in Scala version, we also define a scheduler to report throughput every 5 seconds. The scheduler should be cancelled when the computation completes, which could be accomplished overriding the <code>onStop</code> method. The default implementation of <code>onStop</code> is a no-op.</p> |
| |
| <h4 id="partitioner">Partitioner</h4> |
| |
| <p>A processor could be parallelized to a list of tasks. A <code>Partitioner</code> defines how the data is shuffled among tasks of Split and Sum. Gearpump has already provided two partitioners</p> |
| |
| <ul> |
| <li><code>HashPartitioner</code>: partitions data based on the message’s hashcode</li> |
| <li><code>ShufflePartitioner</code>: partitions data in a round-robin way.</li> |
| </ul> |
| |
| <p>You could define your own partitioner by extending the <code>Partitioner</code> trait/interface and overriding the <code>getPartition</code> method.</p> |
| |
| <div class="highlight"><pre><code class="language-scala"><span class="k">trait</span> <span class="nc">Partitioner</span> <span class="k">extends</span> <span class="nc">Serializable</span> <span class="o">{</span> |
| <span class="k">def</span> <span class="n">getPartition</span><span class="o">(</span><span class="n">msg</span> <span class="k">:</span> <span class="kt">Message</span><span class="o">,</span> <span class="n">partitionNum</span> <span class="k">:</span> <span class="kt">Int</span><span class="o">)</span> <span class="k">:</span> <span class="kt">Int</span> |
| <span class="o">}</span></code></pre></div> |
| |
| <h3 id="wrap-up-as-an-application">Wrap up as an application</h3> |
| |
| <p>Now, we are able to write our application class, weaving the above components together.</p> |
| |
| <p>The application class extends <code>App</code> and `ArgumentsParser which make it easier to parse arguments and run main functions.</p> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="language-scala"><span class="k">object</span> <span class="nc">WordCount</span> <span class="k">extends</span> <span class="nc">App</span> <span class="k">with</span> <span class="nc">ArgumentsParser</span> <span class="o">{</span> |
| <span class="k">private</span> <span class="k">val</span> <span class="nc">LOG</span><span class="k">:</span> <span class="kt">Logger</span> <span class="o">=</span> <span class="nc">LogUtil</span><span class="o">.</span><span class="n">getLogger</span><span class="o">(</span><span class="n">getClass</span><span class="o">)</span> |
| <span class="k">val</span> <span class="nc">RUN_FOR_EVER</span> <span class="k">=</span> <span class="o">-</span><span class="mi">1</span> |
| |
| <span class="k">override</span> <span class="k">val</span> <span class="n">options</span><span class="k">:</span> <span class="kt">Array</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">CLIOption</span><span class="o">[</span><span class="kt">Any</span><span class="o">])]</span> <span class="k">=</span> <span class="nc">Array</span><span class="o">(</span> |
| <span class="s">"split"</span> <span class="o">-></span> <span class="nc">CLIOption</span><span class="o">[</span><span class="kt">Int</span><span class="o">](</span><span class="s">"<how many split tasks>"</span><span class="o">,</span> <span class="n">required</span> <span class="k">=</span> <span class="kc">false</span><span class="o">,</span> <span class="n">defaultValue</span> <span class="k">=</span> <span class="nc">Some</span><span class="o">(</span><span class="mi">1</span><span class="o">)),</span> |
| <span class="s">"sum"</span> <span class="o">-></span> <span class="nc">CLIOption</span><span class="o">[</span><span class="kt">Int</span><span class="o">](</span><span class="s">"<how many sum tasks>"</span><span class="o">,</span> <span class="n">required</span> <span class="k">=</span> <span class="kc">false</span><span class="o">,</span> <span class="n">defaultValue</span> <span class="k">=</span> <span class="nc">Some</span><span class="o">(</span><span class="mi">1</span><span class="o">))</span> |
| <span class="o">)</span> |
| |
| <span class="k">def</span> <span class="n">application</span><span class="o">(</span><span class="n">config</span><span class="k">:</span> <span class="kt">ParseResult</span><span class="o">)</span> <span class="k">:</span> <span class="kt">StreamApplication</span> <span class="o">=</span> <span class="o">{</span> |
| <span class="k">val</span> <span class="n">splitNum</span> <span class="k">=</span> <span class="n">config</span><span class="o">.</span><span class="n">getInt</span><span class="o">(</span><span class="s">"split"</span><span class="o">)</span> |
| <span class="k">val</span> <span class="n">sumNum</span> <span class="k">=</span> <span class="n">config</span><span class="o">.</span><span class="n">getInt</span><span class="o">(</span><span class="s">"sum"</span><span class="o">)</span> |
| <span class="k">val</span> <span class="n">partitioner</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">HashPartitioner</span><span class="o">()</span> |
| <span class="k">val</span> <span class="n">split</span> <span class="k">=</span> <span class="nc">Processor</span><span class="o">[</span><span class="kt">Split</span><span class="o">](</span><span class="n">splitNum</span><span class="o">)</span> |
| <span class="k">val</span> <span class="n">sum</span> <span class="k">=</span> <span class="nc">Processor</span><span class="o">[</span><span class="kt">Sum</span><span class="o">](</span><span class="n">sumNum</span><span class="o">)</span> |
| <span class="k">val</span> <span class="n">app</span> <span class="k">=</span> <span class="nc">StreamApplication</span><span class="o">(</span><span class="s">"wordCount"</span><span class="o">,</span> <span class="nc">Graph</span><span class="o">[</span><span class="kt">Processor</span><span class="o">[</span><span class="k">_</span> <span class="k"><:</span> <span class="kt">Task</span><span class="o">]</span>, <span class="kt">Partitioner</span><span class="o">](</span><span class="n">split</span> <span class="o">~</span> <span class="n">partitioner</span> <span class="o">~></span> <span class="n">sum</span><span class="o">),</span> <span class="nc">UserConfig</span><span class="o">.</span><span class="n">empty</span><span class="o">)</span> |
| <span class="n">app</span> |
| <span class="o">}</span> |
| |
| <span class="k">val</span> <span class="n">config</span> <span class="k">=</span> <span class="n">parse</span><span class="o">(</span><span class="n">args</span><span class="o">)</span> |
| <span class="k">val</span> <span class="n">context</span> <span class="k">=</span> <span class="nc">ClientContext</span><span class="o">()</span> |
| <span class="k">val</span> <span class="n">appId</span> <span class="k">=</span> <span class="n">context</span><span class="o">.</span><span class="n">submit</span><span class="o">(</span><span class="n">application</span><span class="o">(</span><span class="n">config</span><span class="o">))</span> |
| <span class="n">context</span><span class="o">.</span><span class="n">close</span><span class="o">()</span> |
| <span class="o">}</span></code></pre></div> |
| |
| <p>We override <code>options</code> value and define an array of command line arguments to parse. We want application users to pass in masters’ hosts and ports, the parallelism of split and sum tasks, and how long to run the example. We also specify whether an option is <code>required</code> and provide <code>defaultValue</code> for some arguments.</p> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <div class="highlight"><pre><code class="language-java"><span class="cm">/** Java version of WordCount with Processor Graph API */</span> |
| <span class="kd">public</span> <span class="kd">class</span> <span class="nc">WordCount</span> <span class="o">{</span> |
| |
| <span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">InterruptedException</span> <span class="o">{</span> |
| <span class="n">main</span><span class="o">(</span><span class="n">ClusterConfig</span><span class="o">.</span><span class="na">defaultConfig</span><span class="o">(),</span> <span class="n">args</span><span class="o">);</span> |
| <span class="o">}</span> |
| |
| <span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">Config</span> <span class="n">akkaConf</span><span class="o">,</span> <span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">InterruptedException</span> <span class="o">{</span> |
| |
| <span class="c1">// For split task, we config to create two tasks</span> |
| <span class="kt">int</span> <span class="n">splitTaskNumber</span> <span class="o">=</span> <span class="mi">2</span><span class="o">;</span> |
| <span class="n">Processor</span> <span class="n">split</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">Processor</span><span class="o">(</span><span class="n">Split</span><span class="o">.</span><span class="na">class</span><span class="o">).</span><span class="na">withParallelism</span><span class="o">(</span><span class="n">splitTaskNumber</span><span class="o">);</span> |
| |
| <span class="c1">// For sum task, we have two summer.</span> |
| <span class="kt">int</span> <span class="n">sumTaskNumber</span> <span class="o">=</span> <span class="mi">2</span><span class="o">;</span> |
| <span class="n">Processor</span> <span class="n">sum</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">Processor</span><span class="o">(</span><span class="n">Sum</span><span class="o">.</span><span class="na">class</span><span class="o">).</span><span class="na">withParallelism</span><span class="o">(</span><span class="n">sumTaskNumber</span><span class="o">);</span> |
| |
| <span class="c1">// construct the graph</span> |
| <span class="n">Graph</span> <span class="n">graph</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">Graph</span><span class="o">();</span> |
| <span class="n">graph</span><span class="o">.</span><span class="na">addVertex</span><span class="o">(</span><span class="n">split</span><span class="o">);</span> |
| <span class="n">graph</span><span class="o">.</span><span class="na">addVertex</span><span class="o">(</span><span class="n">sum</span><span class="o">);</span> |
| |
| <span class="n">Partitioner</span> <span class="n">partitioner</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">HashPartitioner</span><span class="o">();</span> |
| <span class="n">graph</span><span class="o">.</span><span class="na">addEdge</span><span class="o">(</span><span class="n">split</span><span class="o">,</span> <span class="n">partitioner</span><span class="o">,</span> <span class="n">sum</span><span class="o">);</span> |
| |
| <span class="n">UserConfig</span> <span class="n">conf</span> <span class="o">=</span> <span class="n">UserConfig</span><span class="o">.</span><span class="na">empty</span><span class="o">();</span> |
| <span class="n">StreamApplication</span> <span class="n">app</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">StreamApplication</span><span class="o">(</span><span class="s">"wordcountJava"</span><span class="o">,</span> <span class="n">conf</span><span class="o">,</span> <span class="n">graph</span><span class="o">);</span> |
| |
| <span class="c1">// create master client</span> |
| <span class="c1">// It will read the master settings under gearpump.cluster.masters</span> |
| <span class="n">ClientContext</span> <span class="n">masterClient</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">ClientContext</span><span class="o">(</span><span class="n">akkaConf</span><span class="o">);</span> |
| |
| <span class="n">masterClient</span><span class="o">.</span><span class="na">submit</span><span class="o">(</span><span class="n">app</span><span class="o">);</span> |
| |
| <span class="n">masterClient</span><span class="o">.</span><span class="na">close</span><span class="o">();</span> |
| <span class="o">}</span> |
| <span class="o">}</span></code></pre></div> |
| |
| </div> |
| |
| </div> |
| |
| <h2 id="submit-application">Submit application</h2> |
| |
| <p>After all these, you need to package everything into a uber jar and submit the jar to Gearpump Cluster. Please check <a href="commandline.html">Application submission tool</a> to command line tool syntax.</p> |
| |
| <h2 id="advanced-topic">Advanced topic</h2> |
| <p>For a real application, you definitely need to define your own customized message passing between processors. |
| Customized message needs customized serializer to help message passing over wire. |
| Check <a href="dev-custom-serializer.html">this guide</a> for how to customize serializer.</p> |
| |
| <h3 id="gearpump-for-non-streaming-usage">Gearpump for Non-Streaming Usage</h3> |
| <p>Gearpump is also able to as a base platform to develop non-streaming applications. See <a href="dev-non-streaming-example.html">this guide</a> on how to use Gearpump to develop a distributed shell.</p> |
| |
| |
| </div> |
| <!-- /container --> |
| |
| <script src="js/vendor/jquery-2.1.4.min.js"></script> |
| <script src="js/vendor/bootstrap-3.3.5.min.js"></script> |
| <script src="js/vendor/anchor-1.1.1.min.js"></script> |
| <script src="js/main.js"></script> |
| |
| <!-- MathJax Section --> |
| <script type="text/x-mathjax-config"> |
| MathJax.Hub.Config({ |
| TeX: { equationNumbers: { autoNumber: "AMS" } } |
| }); |
| |
| </script> |
| <script> |
| // Note that we load MathJax this way to work with local file (file://), HTTP and HTTPS. |
| // We could use "//cdn.mathjax...", but that won't support "file://". |
| (function (d, script) { |
| script = d.createElement('script'); |
| script.type = 'text/javascript'; |
| script.async = true; |
| script.onload = function () { |
| MathJax.Hub.Config({ |
| tex2jax: { |
| inlineMath: [["$", "$"], ["\\\\(", "\\\\)"]], |
| displayMath: [["$$", "$$"], ["\\[", "\\]"]], |
| processEscapes: true, |
| skipTags: ['script', 'noscript', 'style', 'textarea', 'pre'] |
| } |
| }); |
| }; |
| script.src = ('https:' == document.location.protocol ? 'https://' : 'http://') + |
| 'cdn.mathjax.org/mathjax/latest/MathJax.js?config=TeX-AMS-MML_HTMLorMML'; |
| d.getElementsByTagName('head')[0].appendChild(script); |
| }(document)); |
| </script> |
| </body> |
| </html> |