<!DOCTYPE html>
<!--[if lt IE 7]> <html class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]-->
<!--[if IE 7]> <html class="no-js lt-ie9 lt-ie8"> <![endif]-->
<!--[if IE 8]> <html class="no-js lt-ie9"> <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js"> <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
<meta name="viewport" content="width=device-width,initial-scale=1,maximum-scale=1"/>
<title>Gearpump Non-Streaming Example - GearPump 0.6.2 Documentation</title>
<link rel="stylesheet" href="css/bootstrap-3.3.5.min.css">
<style>
body {
padding-top: 60px;
padding-bottom: 40px;
}
</style>
<link rel="stylesheet" href="css/main.css">
<link rel="stylesheet" href="css/pygments-default.css">
<script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script>
</head>
<body>
<!--[if lt IE 7]>
<p class="chromeframe">You are using an outdated browser. <a href="http://browsehappy.com/">Upgrade your browser today</a> or <a href="http://www.google.com/chromeframe/?redirect=true">install Google Chrome Frame</a> to better experience this site.</p>
<![endif]-->
<div class="navbar navbar-inverse navbar-fixed-top" id="topbar">
<div class="container">
<div class="navbar-header">
<a class="navbar-brand" href="/">GearPump
<span class="label label-primary" style="font-size: .6em">0.6.2</span>
</a>
</div>
<div class="collapse navbar-collapse">
<ul class="nav navbar-nav">
<li><a href="index.html">Overview</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">Introduction<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="submit-your-1st-application.html">Submit Your 1st Application</a></li>
<li><a href="commandline.html">Client Command Line</a></li>
<li class="divider"></li>
<li><a href="basic-concepts.html">Basic Concepts</a></li>
<li><a href="features.html">Technical Highlights</a></li>
<li><a href="message-delivery.html">Reliable Message Delivery</a></li>
<li><a href="performance-report.html">Performance</a></li>
<li><a href="gearpump-internals.html">Gearpump Internals</a></li>
</ul>
</li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">Deploying<b class="caret"></b></a>
<ul class="dropdown-menu">
<li class="dropdown-header">Deployment</li>
<li><a href="deployment-docker.html">Docker</a></li>
<li><a href="deployment-local.html">Local</a></li>
<li><a href="deployment-standalone.html">Standalone</a></li>
<li><a href="deployment-yarn.html">YARN</a></li>
<li class="divider"></li>
<li><a href="deployment-ha.html">High Availability</a></li>
<li><a href="deployment-msg-delivery.html">Reliable Message Delivery</a></li>
<li><a href="deployment-configuration.html">Configuration</a></li>
<li class="divider"></li>
<li><a href="deployment-security.html">Security</a></li>
</ul>
</li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">Programming Guide<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="dev-write-1st-app.html">Write Your 1st App</a></li>
<li><a href="dev-custom-serializer.html">Customized Message Passing</a></li>
<li class="divider"></li>
<li><a href="api/scala/index.html">Scala API</a></li>
<li><a href="api/java/index.html">Java API</a></li>
<li><a href="dev-rest-api.html">RESTful API</a></li>
<li class="divider"></li>
<li><a href="dev-connectors.html">Gearpump Connectors</a></li>
<li class="divider"></li>
<li><a href="dev-storm.html">Storm Compatibility</a></li>
<!--
<li><a href="dev-samoa.html">Samoa Compatibility</a></li>
<li class="divider"></li>
<li><a href="dev-iot.html">Gearpump with IoT</a></li>
-->
</ul>
</li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">More<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="how-to-contribute.html">How to Contribute</a></li>
<li><a href="coding-style.html">Coding Style</a></li>
<li class="divider"></li>
<li><a href="faq.html">FAQ</a></li>
<li><a href="about.html">About</a></li>
</ul>
</li>
</ul>
</div>
</div>
</div>
<div class="container" id="content">
<h1 class="title">Gearpump Non-Streaming Example</h1>
<p>This page uses <a href="https://github.com/gearpump/gearpump/tree/master/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell">Distributed Shell</a> as an example to illustrate how to write a non-streaming application on Gearpump.</p>
<p>With Distributed Shell, a user sends a shell command to the cluster, the command is executed on each node, and the results are returned to the user.</p>
<h3 id="mavensbt-settings">Maven/Sbt Settings</h3>
<p>Repository and library dependencies can be found at <a href="maven-setting.html">Maven Setting</a>.</p>
<h3 id="define-executor-class">Define Executor Class</h3>
<div class="highlight"><pre><code class="language-scala"><span class="k">class</span> <span class="nc">ShellExecutor</span><span class="o">(</span><span class="n">executorContext</span><span class="k">:</span> <span class="kt">ExecutorContext</span><span class="o">,</span> <span class="n">userConf</span> <span class="k">:</span> <span class="kt">UserConfig</span><span class="o">)</span> <span class="k">extends</span> <span class="nc">Actor</span><span class="o">{</span>
<span class="k">import</span> <span class="nn">executorContext._</span>
<span class="k">override</span> <span class="k">def</span> <span class="n">receive</span><span class="k">:</span> <span class="kt">Receive</span> <span class="o">=</span> <span class="o">{</span>
<span class="k">case</span> <span class="nc">ShellCommand</span><span class="o">(</span><span class="n">command</span><span class="o">,</span> <span class="n">args</span><span class="o">)</span> <span class="k">=&gt;</span>
<span class="k">val</span> <span class="n">process</span> <span class="k">=</span> <span class="nc">Try</span><span class="o">(</span><span class="n">s</span><span class="s">&quot;$command $args&quot;</span> <span class="o">!!)</span>
<span class="k">val</span> <span class="n">result</span> <span class="k">=</span> <span class="n">process</span> <span class="k">match</span> <span class="o">{</span>
<span class="k">case</span> <span class="nc">Success</span><span class="o">(</span><span class="n">msg</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="n">msg</span>
<span class="k">case</span> <span class="nc">Failure</span><span class="o">(</span><span class="n">ex</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="n">ex</span><span class="o">.</span><span class="n">getMessage</span>
<span class="o">}</span>
<span class="n">sender</span> <span class="o">!</span> <span class="nc">ShellCommandResult</span><span class="o">(</span><span class="n">executorId</span><span class="o">,</span> <span class="n">result</span><span class="o">)</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
<p>ShellExecutor simply receives a <code>ShellCommand</code>, tries to execute it, and returns the output (or the failure message) to the sender.</p>
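<p>The core of the handler, running a command and folding success or failure into a single string, can be tried without a cluster. The following is a minimal sketch in plain Scala, assuming a Unix-like host where <code>echo</code> is on the path. <code>RunCommandSketch</code> and <code>run</code> are hypothetical names used only for illustration and are not part of Gearpump.</p>
<div class="highlight"><pre><code class="language-scala">import scala.sys.process._
import scala.util.{Failure, Success, Try}

// Hypothetical helper, not part of Gearpump: mirrors the body of the
// ShellCommand case in ShellExecutor without any actor machinery.
object RunCommandSketch {

  // Runs "command args" and returns its standard output, or the
  // exception message if the command cannot be started or exits non-zero.
  def run(command: String, args: String): String =
    Try(s&quot;$command $args&quot;.!!) match {
      case Success(output) =&gt; output
      case Failure(ex)     =&gt; ex.getMessage
    }

  def main(argv: Array[String]): Unit = {
    // Assumes `echo` is available on the host.
    print(run(&quot;echo&quot;, &quot;hello&quot;))
  }
}</code></pre></div>
<p>Note that <code>scala.sys.process</code> splits the string on whitespace and starts the program directly rather than through a shell, so pipes and redirections in the command string are not interpreted.</p>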
<h3 id="define-appmaster-class">Define AppMaster Class</h3>
<p>For a non-streaming application, you have to write your own AppMaster.</p>
<p>Here is a typical user-defined AppMaster. Note that some trivial code is omitted.</p>
<div class="highlight"><pre><code class="language-scala"><span class="k">class</span> <span class="nc">DistShellAppMaster</span><span class="o">(</span><span class="n">appContext</span> <span class="k">:</span> <span class="kt">AppMasterContext</span><span class="o">,</span> <span class="n">app</span> <span class="k">:</span> <span class="kt">Application</span><span class="o">)</span> <span class="k">extends</span> <span class="nc">ApplicationMaster</span> <span class="o">{</span>
<span class="k">protected</span> <span class="k">var</span> <span class="n">currentExecutorId</span> <span class="k">=</span> <span class="mi">0</span>
<span class="k">override</span> <span class="k">def</span> <span class="n">preStart</span><span class="o">()</span><span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span>
<span class="nc">ActorUtil</span><span class="o">.</span><span class="n">launchExecutorOnEachWorker</span><span class="o">(</span><span class="n">masterProxy</span><span class="o">,</span> <span class="n">getExecutorJvmConfig</span><span class="o">,</span> <span class="n">self</span><span class="o">)</span>
<span class="o">}</span>
<span class="k">override</span> <span class="k">def</span> <span class="n">receive</span><span class="k">:</span> <span class="kt">Receive</span> <span class="o">=</span> <span class="o">{</span>
<span class="k">case</span> <span class="nc">ExecutorSystemStarted</span><span class="o">(</span><span class="n">executorSystem</span><span class="o">)</span> <span class="k">=&gt;</span>
<span class="k">import</span> <span class="nn">executorSystem.</span><span class="o">{</span><span class="n">address</span><span class="o">,</span> <span class="n">worker</span><span class="o">,</span> <span class="n">resource</span> <span class="k">=&gt;</span> <span class="n">executorResource</span><span class="o">}</span>
<span class="k">val</span> <span class="n">executorContext</span> <span class="k">=</span> <span class="nc">ExecutorContext</span><span class="o">(</span><span class="n">currentExecutorId</span><span class="o">,</span> <span class="n">worker</span><span class="o">.</span><span class="n">workerId</span><span class="o">,</span> <span class="n">appId</span><span class="o">,</span> <span class="n">self</span><span class="o">,</span> <span class="n">executorResource</span><span class="o">)</span>
<span class="k">val</span> <span class="n">executor</span> <span class="k">=</span> <span class="n">context</span><span class="o">.</span><span class="n">actorOf</span><span class="o">(</span><span class="nc">Props</span><span class="o">(</span><span class="n">classOf</span><span class="o">[</span><span class="kt">ShellExecutor</span><span class="o">],</span> <span class="n">executorContext</span><span class="o">,</span> <span class="n">app</span><span class="o">.</span><span class="n">userConfig</span><span class="o">)</span>
<span class="o">.</span><span class="n">withDeploy</span><span class="o">(</span><span class="nc">Deploy</span><span class="o">(</span><span class="n">scope</span> <span class="k">=</span> <span class="nc">RemoteScope</span><span class="o">(</span><span class="n">address</span><span class="o">))),</span> <span class="n">currentExecutorId</span><span class="o">.</span><span class="n">toString</span><span class="o">)</span>
<span class="n">executorSystem</span><span class="o">.</span><span class="n">bindLifeCycleWith</span><span class="o">(</span><span class="n">executor</span><span class="o">)</span>
<span class="n">currentExecutorId</span> <span class="o">+=</span> <span class="mi">1</span>
<span class="k">case</span> <span class="nc">StartExecutorSystemTimeout</span> <span class="k">=&gt;</span>
<span class="n">masterProxy</span> <span class="o">!</span> <span class="nc">ShutdownApplication</span><span class="o">(</span><span class="n">appId</span><span class="o">)</span>
<span class="n">context</span><span class="o">.</span><span class="n">stop</span><span class="o">(</span><span class="n">self</span><span class="o">)</span>
<span class="k">case</span> <span class="n">msg</span><span class="k">:</span> <span class="kt">ShellCommand</span> <span class="o">=&gt;</span>
<span class="nc">Future</span><span class="o">.</span><span class="n">fold</span><span class="o">(</span><span class="n">context</span><span class="o">.</span><span class="n">children</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="k">_</span> <span class="o">?</span> <span class="n">msg</span><span class="o">))(</span><span class="k">new</span> <span class="nc">ShellCommandResultAggregator</span><span class="o">)</span> <span class="o">{</span> <span class="o">(</span><span class="n">aggregator</span><span class="o">,</span> <span class="n">response</span><span class="o">)</span> <span class="k">=&gt;</span>
<span class="n">aggregator</span><span class="o">.</span><span class="n">aggregate</span><span class="o">(</span><span class="n">response</span><span class="o">.</span><span class="n">asInstanceOf</span><span class="o">[</span><span class="kt">ShellCommandResult</span><span class="o">])</span>
<span class="o">}.</span><span class="n">map</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">toString</span><span class="o">())</span> <span class="n">pipeTo</span> <span class="n">sender</span>
<span class="o">}</span>
<span class="k">private</span> <span class="k">def</span> <span class="n">getExecutorJvmConfig</span><span class="k">:</span> <span class="kt">ExecutorSystemJvmConfig</span> <span class="o">=</span> <span class="o">{</span>
<span class="k">val</span> <span class="n">config</span><span class="k">:</span> <span class="kt">Config</span> <span class="o">=</span> <span class="nc">Option</span><span class="o">(</span><span class="n">app</span><span class="o">.</span><span class="n">clusterConfig</span><span class="o">).</span><span class="n">map</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">getConfig</span><span class="o">).</span><span class="n">getOrElse</span><span class="o">(</span><span class="nc">ConfigFactory</span><span class="o">.</span><span class="n">empty</span><span class="o">())</span>
<span class="k">val</span> <span class="n">jvmSetting</span> <span class="k">=</span> <span class="nc">Util</span><span class="o">.</span><span class="n">resolveJvmSetting</span><span class="o">(</span><span class="n">config</span><span class="o">.</span><span class="n">withFallback</span><span class="o">(</span><span class="n">context</span><span class="o">.</span><span class="n">system</span><span class="o">.</span><span class="n">settings</span><span class="o">.</span><span class="n">config</span><span class="o">)).</span><span class="n">executor</span>
<span class="nc">ExecutorSystemJvmConfig</span><span class="o">(</span><span class="n">jvmSetting</span><span class="o">.</span><span class="n">classPath</span><span class="o">,</span> <span class="n">jvmSetting</span><span class="o">.</span><span class="n">vmargs</span><span class="o">,</span>
<span class="n">appJar</span><span class="o">,</span> <span class="n">username</span><span class="o">,</span> <span class="n">config</span><span class="o">)</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
<p>When DistShellAppMaster starts, it first requests resources to launch one executor on each node. This is done in the <code>preStart</code> method.</p>
<p>The DistShellAppMaster&#8217;s receive handler then uses each allocated resource, reported by an <code>ExecutorSystemStarted</code> message, to launch the <code>ShellExecutor</code>. You can reuse this part of the code in your own application; the only change needed is to replace the executor class.</p>
<p>If resource allocation fails, the AppMaster receives a <code>StartExecutorSystemTimeout</code> message. The usual way to handle it is what is done here: shut down the application.</p>
<p>The real application logic is in the <code>ShellCommand</code> message handler, and this part differs from one application to another. Here the shell command is distributed to every executor, and the aggregated results are sent back to the client.</p>
<p>The <code>getExecutorJvmConfig</code> method can be reused as-is in your own application.</p>
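<p>The fan-out-and-aggregate step in the <code>ShellCommand</code> handler can be illustrated with plain Scala futures. This is a sketch under stated assumptions, not the Gearpump implementation: <code>Reply</code> is a hypothetical stand-in for <code>ShellCommandResult</code>, already-completed futures stand in for the replies that <code>_ ? msg</code> would return, and <code>Future.sequence</code> followed by a <code>foldLeft</code> is used in place of <code>Future.fold</code>, which is deprecated in newer Scala versions.</p>
<div class="highlight"><pre><code class="language-scala">import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for ShellCommandResult (executor id plus output).
final case class Reply(executorId: Int, output: String)

object AggregateSketch {

  // Waits for every reply, then folds them into one report string.
  // If any future fails, the combined future fails as well.
  def aggregate(replies: Seq[Future[Reply]]): Future[String] =
    Future.sequence(replies).map { all =&gt;
      all.foldLeft(new StringBuilder) { (report, reply) =&gt;
        report.append(s&quot;executor ${reply.executorId}: ${reply.output}\n&quot;)
      }.toString
    }

  def main(argv: Array[String]): Unit = {
    // Completed futures play the role of the executors' answers.
    val replies = Seq(
      Future.successful(Reply(0, &quot;node-a&quot;)),
      Future.successful(Reply(1, &quot;node-b&quot;))
    )
    print(Await.result(aggregate(replies), 5.seconds))
  }
}</code></pre></div>
<p>As in the handler above, a single failed or timed-out reply fails the whole aggregate; whether that is acceptable depends on the application.</p>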
<h3 id="define-application">Define Application</h3>
<p>Now it&#8217;s time to launch the application.</p>
<div class="highlight"><pre><code class="language-scala"><span class="k">object</span> <span class="nc">DistributedShell</span> <span class="k">extends</span> <span class="nc">App</span> <span class="k">with</span> <span class="nc">ArgumentsParser</span> <span class="o">{</span>
<span class="k">private</span> <span class="k">val</span> <span class="nc">LOG</span><span class="k">:</span> <span class="kt">Logger</span> <span class="o">=</span> <span class="nc">LogUtil</span><span class="o">.</span><span class="n">getLogger</span><span class="o">(</span><span class="n">getClass</span><span class="o">)</span>
<span class="k">override</span> <span class="k">val</span> <span class="n">options</span><span class="k">:</span> <span class="kt">Array</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">CLIOption</span><span class="o">[</span><span class="kt">Any</span><span class="o">])]</span> <span class="k">=</span> <span class="nc">Array</span><span class="o">.</span><span class="n">empty</span>
<span class="nc">LOG</span><span class="o">.</span><span class="n">info</span><span class="o">(</span><span class="n">s</span><span class="s">&quot;Distributed shell submitting application...&quot;</span><span class="o">)</span>
<span class="k">val</span> <span class="n">context</span> <span class="k">=</span> <span class="nc">ClientContext</span><span class="o">()</span>
<span class="k">val</span> <span class="n">appId</span> <span class="k">=</span> <span class="n">context</span><span class="o">.</span><span class="n">submit</span><span class="o">(</span><span class="nc">Application</span><span class="o">[</span><span class="kt">DistShellAppMaster</span><span class="o">](</span><span class="s">&quot;DistributedShell&quot;</span><span class="o">,</span> <span class="nc">UserConfig</span><span class="o">.</span><span class="n">empty</span><span class="o">))</span>
<span class="n">context</span><span class="o">.</span><span class="n">close</span><span class="o">()</span>
<span class="nc">LOG</span><span class="o">.</span><span class="n">info</span><span class="o">(</span><span class="n">s</span><span class="s">&quot;Distributed Shell Application started with appId $appId !&quot;</span><span class="o">)</span>
<span class="o">}</span></code></pre></div>
<p>The application object extends <code>App</code> and <code>ArgumentsParser</code>, which makes it easier to parse arguments and run the main function. This part is similar to streaming applications.</p>
<p>The main object DistributedShell submits an Application, whose AppMaster is DistShellAppMaster, to the Master.</p>
<h3 id="define-an-optional-client-class">Define an optional Client class</h3>
<p>Now we can define a client that talks to the AppMaster and passes our commands to it.</p>
<div class="highlight"><pre><code class="language-scala"><span class="k">object</span> <span class="nc">DistributedShellClient</span> <span class="k">extends</span> <span class="nc">App</span> <span class="k">with</span> <span class="nc">ArgumentsParser</span> <span class="o">{</span>
<span class="k">implicit</span> <span class="k">val</span> <span class="n">timeout</span> <span class="k">=</span> <span class="nc">Constants</span><span class="o">.</span><span class="nc">FUTURE_TIMEOUT</span>
<span class="k">import</span> <span class="nn">scala.concurrent.ExecutionContext.Implicits.global</span>
<span class="k">private</span> <span class="k">val</span> <span class="nc">LOG</span><span class="k">:</span> <span class="kt">Logger</span> <span class="o">=</span> <span class="nc">LoggerFactory</span><span class="o">.</span><span class="n">getLogger</span><span class="o">(</span><span class="n">getClass</span><span class="o">)</span>
<span class="k">override</span> <span class="k">val</span> <span class="n">options</span><span class="k">:</span> <span class="kt">Array</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">CLIOption</span><span class="o">[</span><span class="kt">Any</span><span class="o">])]</span> <span class="k">=</span> <span class="nc">Array</span><span class="o">(</span>
<span class="s">&quot;master&quot;</span> <span class="o">-&gt;</span> <span class="nc">CLIOption</span><span class="o">[</span><span class="kt">String</span><span class="o">](</span><span class="s">&quot;&lt;host1:port1,host2:port2,host3:port3&gt;&quot;</span><span class="o">,</span> <span class="n">required</span> <span class="k">=</span> <span class="kc">true</span><span class="o">),</span>
<span class="s">&quot;appid&quot;</span> <span class="o">-&gt;</span> <span class="nc">CLIOption</span><span class="o">[</span><span class="kt">Int</span><span class="o">](</span><span class="s">&quot;&lt;the distributed shell appid&gt;&quot;</span><span class="o">,</span> <span class="n">required</span> <span class="k">=</span> <span class="kc">true</span><span class="o">),</span>
<span class="s">&quot;command&quot;</span> <span class="o">-&gt;</span> <span class="nc">CLIOption</span><span class="o">[</span><span class="kt">String</span><span class="o">](</span><span class="s">&quot;&lt;shell command&gt;&quot;</span><span class="o">,</span> <span class="n">required</span> <span class="k">=</span> <span class="kc">true</span><span class="o">),</span>
<span class="s">&quot;args&quot;</span> <span class="o">-&gt;</span> <span class="nc">CLIOption</span><span class="o">[</span><span class="kt">String</span><span class="o">](</span><span class="s">&quot;&lt;shell arguments&gt;&quot;</span><span class="o">,</span> <span class="n">required</span> <span class="k">=</span> <span class="kc">true</span><span class="o">)</span>
<span class="o">)</span>
<span class="k">val</span> <span class="n">config</span> <span class="k">=</span> <span class="n">parse</span><span class="o">(</span><span class="n">args</span><span class="o">)</span>
<span class="k">val</span> <span class="n">context</span> <span class="k">=</span> <span class="nc">ClientContext</span><span class="o">(</span><span class="n">config</span><span class="o">.</span><span class="n">getString</span><span class="o">(</span><span class="s">&quot;master&quot;</span><span class="o">))</span>
<span class="k">val</span> <span class="n">appid</span> <span class="k">=</span> <span class="n">config</span><span class="o">.</span><span class="n">getInt</span><span class="o">(</span><span class="s">&quot;appid&quot;</span><span class="o">)</span>
<span class="k">val</span> <span class="n">command</span> <span class="k">=</span> <span class="n">config</span><span class="o">.</span><span class="n">getString</span><span class="o">(</span><span class="s">&quot;command&quot;</span><span class="o">)</span>
<span class="k">val</span> <span class="n">arguments</span> <span class="k">=</span> <span class="n">config</span><span class="o">.</span><span class="n">getString</span><span class="o">(</span><span class="s">&quot;args&quot;</span><span class="o">)</span>
<span class="k">val</span> <span class="n">appMaster</span> <span class="k">=</span> <span class="n">context</span><span class="o">.</span><span class="n">resolveAppID</span><span class="o">(</span><span class="n">appid</span><span class="o">)</span>
<span class="o">(</span><span class="n">appMaster</span> <span class="o">?</span> <span class="nc">ShellCommand</span><span class="o">(</span><span class="n">command</span><span class="o">,</span> <span class="n">arguments</span><span class="o">)).</span><span class="n">map</span> <span class="o">{</span> <span class="n">result</span> <span class="k">=&gt;</span>
<span class="nc">LOG</span><span class="o">.</span><span class="n">info</span><span class="o">(</span><span class="n">s</span><span class="s">&quot;Result: $result&quot;</span><span class="o">)</span>
<span class="n">context</span><span class="o">.</span><span class="n">close</span><span class="o">()</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
<p>DistributedShellClient resolves the appid to the actual AppMaster (the application id is printed when DistributedShell is launched).</p>
<p>Once we have the AppMaster, we can send a <code>ShellCommand</code> to it and wait for the result.</p>
<h3 id="submit-application">Submit application</h3>
<p>After all of this, package everything into an uber jar and submit the jar to the Gearpump cluster. See the <a href="commandline.html">Application submission tool</a> for the command line syntax.</p>
</div> <!-- /container -->
<script src="js/vendor/jquery-2.1.4.min.js"></script>
<script src="js/vendor/bootstrap-3.3.5.min.js"></script>
<script src="js/vendor/anchor-1.1.1.min.js"></script>
<script src="js/main.js"></script>
<!-- MathJax Section -->
<script type="text/x-mathjax-config">
MathJax.Hub.Config({
TeX: { equationNumbers: { autoNumber: "AMS" } }
});
</script>
<script>
// Note that we load MathJax this way to work with local file (file://), HTTP and HTTPS.
// We could use "//cdn.mathjax...", but that won't support "file://".
(function(d, script) {
script = d.createElement('script');
script.type = 'text/javascript';
script.async = true;
script.onload = function(){
MathJax.Hub.Config({
tex2jax: {
inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ],
displayMath: [ ["$$","$$"], ["\\[", "\\]"] ],
processEscapes: true,
skipTags: ['script', 'noscript', 'style', 'textarea', 'pre']
}
});
};
script.src = ('https:' == document.location.protocol ? 'https://' : 'http://') +
'cdn.mathjax.org/mathjax/latest/MathJax.js?config=TeX-AMS-MML_HTMLorMML';
d.getElementsByTagName('head')[0].appendChild(script);
}(document));
</script>
</body>
</html>