blob: 124981e9841b398e6dcffff54747a3c9f2317be7 [file] [log] [blame]
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags -->
<title>Apache Flink 0.10-SNAPSHOT Documentation: Iterations</title>
<link rel="shortcut icon" href="http://flink.apache.org/docs/master/page/favicon.ico" type="image/x-icon">
<link rel="icon" href="http://flink.apache.org/docs/master/page/favicon.ico" type="image/x-icon">
<!-- Bootstrap -->
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/master/page/css/flink.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/master/page/css/syntax.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/master/page/css/codetabs.css">
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
</head>
<body>
<!-- Top navbar. -->
<nav class="navbar navbar-default navbar-fixed-top">
<div class="container">
<!-- The logo. -->
<div class="navbar-header">
<button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1">
<span class="sr-only">Toggle navigation</span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<div class="navbar-logo">
<a href="http://flink.apache.org"><img alt="Apache Flink" src="http://flink.apache.org/docs/master/page/img/navbar-brand-logo.jpg"></a>
</div>
</div><!-- /.navbar-header -->
<!-- The navigation links. -->
<div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
<ul class="nav navbar-nav">
<li><a href="http://flink.apache.org/docs/master/index.html">Overview<span class="hidden-sm hidden-xs"> 0.10</span></a></li>
<!-- Setup -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/master/setup" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Setup <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/master/setup/building.html">Get Flink 0.10-SNAPSHOT</a></li>
<li class="divider"></li>
<li role="presentation" class="dropdown-header"><strong>Deployment</strong></li>
<li><a href="http://flink.apache.org/docs/master/setup/local_setup.html" class="active">Local</a></li>
<li><a href="http://flink.apache.org/docs/master/setup/cluster_setup.html">Cluster (Standalone)</a></li>
<li><a href="http://flink.apache.org/docs/master/setup/yarn_setup.html">YARN</a></li>
<li><a href="http://flink.apache.org/docs/master/setup/gce_setup.html">GCloud</a></li>
<li><a href="http://flink.apache.org/docs/master/setup/flink_on_tez.html">Flink on Tez <span class="badge">Beta</span></a></li>
<li class="divider"></li>
<li><a href="http://flink.apache.org/docs/master/setup/config.html">Configuration</a></li>
</ul>
</li>
<!-- Programming Guides -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/master/apis" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Programming Guides <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/master/apis/programming_guide.html"><strong>Batch: DataSet API</strong></a></li>
<li><a href="http://flink.apache.org/docs/master/apis/streaming_guide.html"><strong>Streaming: DataStream API</strong> <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/master/apis/python.html">Python API <span class="badge">Beta</span></a></li>
<li class="divider"></li>
<li><a href="scala_shell.html">Interactive Scala Shell</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/dataset_transformations.html">Dataset Transformations</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/best_practices.html">Best Practices</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/example_connectors.html">Connectors</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/examples.html">Examples</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/local_execution.html">Local Execution</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/cluster_execution.html">Cluster Execution</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/cli.html">Command Line Interface</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/web_client.html">Web Client</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/iterations.html">Iterations</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/java8.html">Java 8</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/hadoop_compatibility.html">Hadoop Compatibility <span class="badge">Beta</span></a></li>
</ul>
</li>
<!-- Libraries -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/master/libs" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Libraries <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/master/libs/spargel_guide.html">Graphs: Spargel</a></li>
<li><a href="http://flink.apache.org/docs/master/libs/gelly_guide.html">Graphs: Gelly <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/master/libs/ml/">Machine Learning <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/master/libs/table.html">Relational: Table <span class="badge">Beta</span></a></li>
</ul>
</li>
<!-- Internals -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/master/internals" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Internals <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li role="presentation" class="dropdown-header"><strong>Contribute</strong></li>
<li><a href="http://flink.apache.org/docs/master/internals/how_to_contribute.html">How to Contribute</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/coding_guidelines.html">Coding Guidelines</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/ide_setup.html">IDE Setup</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/logging.html">Logging</a></li>
<li class="divider"></li>
<li role="presentation" class="dropdown-header"><strong>Internals</strong></li>
<li><a href="http://flink.apache.org/docs/master/internals/general_arch.html">Architecture &amp; Process Model</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/types_serialization.html">Type Extraction &amp; Serialization</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/job_scheduling.html">Jobs &amp; Scheduling</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/add_operator.html">How-To: Add an Operator</a></li>
</ul>
</li>
</ul>
<form class="navbar-form navbar-right hidden-sm hidden-md" role="search" action="http://flink.apache.org/docs/master/search-results.html">
<div class="form-group">
<input type="text" class="form-control" name="q" placeholder="Search all pages">
</div>
<button type="submit" class="btn btn-default">Search</button>
</form>
</div><!-- /.navbar-collapse -->
</div><!-- /.container -->
</nav>
<!-- Main content. -->
<div class="container">
<div class="row">
<div class="col-sm-10 col-sm-offset-1">
<h1>Iterations</h1>
<p>Iterative algorithms occur in many domains of data analysis, such as <em>machine learning</em> or <em>graph analysis</em>. Such algorithms are crucial in order to realize the promise of Big Data to extract meaningful information out of your data. With increasing interest in running these kinds of algorithms on very large data sets, there is a need to execute iterations in a massively parallel fashion.</p>
<p>Flink programs implement iterative algorithms by defining a <strong>step function</strong> and embedding it into a special iteration operator. There are two variants of this operator: <strong>Iterate</strong> and <strong>Delta Iterate</strong>. Both operators repeatedly invoke the step function on the current iteration state until a certain termination condition is reached.</p>
<p>Here, we provide background on both operator variants and outline their usage. The <a href="programming_guide.html">programming guide</a> explains how to implement the operators in both Scala and Java. We also support both <strong>vertex-centric and gather-sum-apply iterations</strong> through Flink’s graph processing API, <a href="http://flink.apache.org/docs/master/libs/gelly_guide.html">Gelly</a>.</p>
<p>The following table provides an overview of both operators:</p>
<table class="table table-striped table-hover table-bordered">
<thead>
<tr>
<th></th>
<th class="text-center">Iterate</th>
<th class="text-center">Delta Iterate</th>
</tr>
</thead>
<tr>
<td class="text-center" width="20%"><strong>Iteration Input</strong></td>
<td class="text-center" width="40%"><strong>Partial Solution</strong></td>
<td class="text-center" width="40%"><strong>Workset</strong> and <strong>Solution Set</strong></td>
</tr>
<tr>
<td class="text-center"><strong>Step Function</strong></td>
<td colspan="2" class="text-center">Arbitrary Data Flows</td>
</tr>
<tr>
<td class="text-center"><strong>State Update</strong></td>
<td class="text-center">Next <strong>partial solution</strong></td>
<td>
<ul>
<li>Next workset</li>
<li><strong>Changes to solution set</strong></li>
</ul>
</td>
</tr>
<tr>
<td class="text-center"><strong>Iteration Result</strong></td>
<td class="text-center">Last partial solution</td>
<td class="text-center">Solution set state after last iteration</td>
</tr>
<tr>
<td class="text-center"><strong>Termination</strong></td>
<td>
<ul>
<li><strong>Maximum number of iterations</strong> (default)</li>
<li>Custom aggregator convergence</li>
</ul>
</td>
<td>
<ul>
<li><strong>Maximum number of iterations or empty workset</strong> (default)</li>
<li>Custom aggregator convergence</li>
</ul>
</td>
</tr>
</table>
<ul id="markdown-toc">
<li><a href="#iterate-operator" id="markdown-toc-iterate-operator">Iterate Operator</a> <ul>
<li><a href="#example-incrementing-numbers" id="markdown-toc-example-incrementing-numbers">Example: Incrementing Numbers</a></li>
</ul>
</li>
<li><a href="#delta-iterate-operator" id="markdown-toc-delta-iterate-operator">Delta Iterate Operator</a> <ul>
<li><a href="#example-propagate-minimum-in-graph" id="markdown-toc-example-propagate-minimum-in-graph">Example: Propagate Minimum in Graph</a></li>
</ul>
</li>
<li><a href="#superstep-synchronization" id="markdown-toc-superstep-synchronization">Superstep Synchronization</a></li>
</ul>
<h2 id="iterate-operator">Iterate Operator</h2>
<p>The <strong>iterate operator</strong> covers the <em>simple form of iterations</em>: in each iteration, the <strong>step function</strong> (an arbitrary data flow of operators such as <code>map</code>, <code>reduce</code>, <code>join</code>, etc.) consumes the <strong>entire input</strong> (the <em>result of the previous iteration</em>, or the <em>initial data set</em>) and computes the <strong>next version of the partial solution</strong>.</p>
<p class="text-center">
<img alt="Iterate Operator" width="60%" src="fig/iterations_iterate_operator.png" />
</p>
<ol>
<li><strong>Iteration Input</strong>: Initial input for the <em>first iteration</em> from a <em>data source</em> or <em>previous operators</em>.</li>
<li><strong>Step Function</strong>: The step function will be executed in each iteration. It is an arbitrary data flow consisting of operators like <code>map</code>, <code>reduce</code>, <code>join</code>, etc. and depends on your specific task at hand.</li>
<li><strong>Next Partial Solution</strong>: In each iteration, the output of the step function will be fed back into the <em>next iteration</em>.</li>
<li><strong>Iteration Result</strong>: Output of the <em>last iteration</em> is written to a <em>data sink</em> or used as input to the <em>following operators</em>.</li>
</ol>
<p>There are multiple options to specify <strong>termination conditions</strong> for an iteration:</p>
<ul>
<li><strong>Maximum number of iterations</strong>: Without any further conditions, the iteration will be executed this many times.</li>
<li><strong>Custom aggregator convergence</strong>: Iterations allow you to specify <em>custom aggregators</em> and <em>convergence criteria</em>, for example summing the number of emitted records (aggregator) and terminating if this number is zero (convergence criterion).</li>
</ul>
<p>You can also think about the iterate operator in pseudo-code:</p>
<div class="highlight"><pre><code class="language-java"><span class="n">IterationState</span> <span class="n">state</span> <span class="o">=</span> <span class="n">getInitialState</span><span class="o">();</span>
<span class="k">while</span> <span class="o">(!</span><span class="n">terminationCriterion</span><span class="o">())</span> <span class="o">{</span>
<span class="n">state</span> <span class="o">=</span> <span class="n">step</span><span class="o">(</span><span class="n">state</span><span class="o">);</span>
<span class="o">}</span>
<span class="n">setFinalState</span><span class="o">(</span><span class="n">state</span><span class="o">);</span></code></pre></div>
<div class="panel panel-default">
<div class="panel-body">
See the <strong><a href="programming_guide.html">Programming Guide</a> </strong> for details and code examples.</div>
</div>
<h3 id="example-incrementing-numbers">Example: Incrementing Numbers</h3>
<p>In the following example, we <strong>iteratively increment a set of numbers</strong>:</p>
<p class="text-center">
<img alt="Iterate Operator Example" width="60%" src="fig/iterations_iterate_operator_example.png" />
</p>
<ol>
<li><strong>Iteration Input</strong>: The initial input is read from a data source and consists of five single-field records (integers <code>1</code> to <code>5</code>).</li>
<li><strong>Step function</strong>: The step function is a single <code>map</code> operator, which increments the integer field from <code>i</code> to <code>i+1</code>. It will be applied to every record of the input.</li>
<li><strong>Next Partial Solution</strong>: The output of the step function will be the output of the map operator, i.e. records with incremented integers.</li>
<li><strong>Iteration Result</strong>: After ten iterations, the initial numbers will have been incremented ten times, resulting in integers <code>11</code> to <code>15</code>.</li>
</ol>
<div class="highlight"><pre><code>// 1st 2nd 10th
map(1) -&gt; 2 map(2) -&gt; 3 ... map(10) -&gt; 11
map(2) -&gt; 3 map(3) -&gt; 4 ... map(11) -&gt; 12
map(3) -&gt; 4 map(4) -&gt; 5 ... map(12) -&gt; 13
map(4) -&gt; 5 map(5) -&gt; 6 ... map(13) -&gt; 14
map(5) -&gt; 6 map(6) -&gt; 7 ... map(14) -&gt; 15
</code></pre></div>
<p>Note that <strong>1</strong>, <strong>2</strong>, and <strong>4</strong> can be arbitrary data flows.</p>
<h2 id="delta-iterate-operator">Delta Iterate Operator</h2>
<p>The <strong>delta iterate operator</strong> covers the case of <strong>incremental iterations</strong>. Incremental iterations <strong>selectively modify elements</strong> of their <strong>solution</strong> and evolve the solution rather than fully recompute it.</p>
<p>Where applicable, this leads to <strong>more efficient algorithms</strong>, because not every element in the solution set changes in each iteration. This allows the computation to <strong>focus on the hot parts</strong> of the solution and leave the <strong>cold parts untouched</strong>. Frequently, the majority of the solution cools down comparatively fast and the later iterations operate only on a small subset of the data.</p>
<p class="text-center">
<img alt="Delta Iterate Operator" width="60%" src="fig/iterations_delta_iterate_operator.png" />
</p>
<ol>
<li><strong>Iteration Input</strong>: The initial workset and solution set are read from <em>data sources</em> or <em>previous operators</em> as input to the first iteration.</li>
<li><strong>Step Function</strong>: The step function will be executed in each iteration. It is an arbitrary data flow consisting of operators like <code>map</code>, <code>reduce</code>, <code>join</code>, etc. and depends on your specific task at hand.</li>
<li><strong>Next Workset/Update Solution Set</strong>: The <em>next workset</em> drives the iterative computation and will be fed back into the <em>next iteration</em>. Furthermore, the solution set will be updated and implicitly forwarded (it does not need to be rebuilt). Both data sets can be updated by different operators of the step function.</li>
<li><strong>Iteration Result</strong>: After the <em>last iteration</em>, the <em>solution set</em> is written to a <em>data sink</em> or used as input to the <em>following operators</em>.</li>
</ol>
<p>The default <strong>termination condition</strong> for delta iterations is specified by the <strong>empty workset convergence criterion</strong> and a <strong>maximum number of iterations</strong>. The iteration will terminate when a produced <em>next workset</em> is empty or when the maximum number of iterations is reached. It is also possible to specify a <strong>custom aggregator</strong> and <strong>convergence criterion</strong>.</p>
<p>You can also think about the delta iterate operator in pseudo-code:</p>
<div class="highlight"><pre><code class="language-java"><span class="n">IterationState</span> <span class="n">workset</span> <span class="o">=</span> <span class="n">getInitialState</span><span class="o">();</span>
<span class="n">IterationState</span> <span class="n">solution</span> <span class="o">=</span> <span class="n">getInitialSolution</span><span class="o">();</span>
<span class="k">while</span> <span class="o">(!</span><span class="n">terminationCriterion</span><span class="o">())</span> <span class="o">{</span>
<span class="o">(</span><span class="n">delta</span><span class="o">,</span> <span class="n">workset</span><span class="o">)</span> <span class="o">=</span> <span class="n">step</span><span class="o">(</span><span class="n">workset</span><span class="o">,</span> <span class="n">solution</span><span class="o">);</span>
<span class="n">solution</span><span class="o">.</span><span class="na">update</span><span class="o">(</span><span class="n">delta</span><span class="o">);</span>
<span class="o">}</span>
<span class="n">setFinalState</span><span class="o">(</span><span class="n">solution</span><span class="o">);</span></code></pre></div>
<div class="panel panel-default">
<div class="panel-body">
See the <strong><a href="programming_guide.html">programming guide</a></strong> for details and code examples.</div>
</div>
<h3 id="example-propagate-minimum-in-graph">Example: Propagate Minimum in Graph</h3>
<p>In the following example, every vertex has an <strong>ID</strong> and a <strong>coloring</strong>. Each vertex will propagate its vertex ID to neighboring vertices. The <strong>goal</strong> is to <em>assign the minimum ID to every vertex in a subgraph</em>. If a received ID is smaller than the current one, the vertex changes to the color of the vertex with the received ID. One application of this can be found in <em>community analysis</em> or <em>connected components</em> computation.</p>
<p class="text-center">
<img alt="Delta Iterate Operator Example" width="100%" src="fig/iterations_delta_iterate_operator_example.png" />
</p>
<p>The <strong>initial input</strong> is set as <strong>both workset and solution set.</strong> In the above figure, the colors visualize the <strong>evolution of the solution set</strong>. With each iteration, the color of the minimum ID is spreading in the respective subgraph. At the same time, the amount of work (exchanged and compared vertex IDs) decreases with each iteration. This corresponds to the <strong>decreasing size of the workset</strong>, which goes from all seven vertices to zero after three iterations, at which time the iteration terminates. The <strong>important observation</strong> is that <em>the lower subgraph converges before the upper half</em> does and the delta iteration is able to capture this with the workset abstraction.</p>
<p>In the upper subgraph <strong>ID 1</strong> (<em>orange</em>) is the <strong>minimum ID</strong>. In the <strong>first iteration</strong>, it will get propagated to vertex 2, which will subsequently change its color to orange. Vertices 3 and 4 will receive <strong>ID 2</strong> (in <em>yellow</em>) as their current minimum ID and change to yellow. Because the color of <em>vertex 1</em> didn’t change in the first iteration, it can be skipped in the next workset.</p>
<p>In the lower subgraph <strong>ID 5</strong> (<em>cyan</em>) is the <strong>minimum ID</strong>. All vertices of the lower subgraph will receive it in the first iteration. Again, we can skip the unchanged vertices (<em>vertex 5</em>) for the next workset.</p>
<p>In the <strong>2nd iteration</strong>, the workset size has already decreased from seven to five elements (vertices 2, 3, 4, 6, and 7). These are part of the iteration and further propagate their current minimum IDs. After this iteration, the lower subgraph has already converged (<strong>cold part</strong> of the graph), as it has no elements in the workset, whereas the upper half needs a further iteration (<strong>hot part</strong> of the graph) for the two remaining workset elements (vertices 3 and 4).</p>
<p>The iteration <strong>terminates</strong> when the workset is empty after the <strong>3rd iteration</strong>.</p>
<p><a id="supersteps"></a></p>
<h2 id="superstep-synchronization">Superstep Synchronization</h2>
<p>We refer to each execution of the step function of an iteration operator as <em>a single iteration</em>. In parallel setups, <strong>multiple instances of the step function are evaluated in parallel</strong> on different partitions of the iteration state. In many settings, one evaluation of the step function on all parallel instances forms a so-called <strong>superstep</strong>, which is also the granularity of synchronization. Therefore, <em>all</em> parallel tasks of an iteration need to complete the superstep before the next superstep is initialized. <strong>Termination criteria</strong> will also be evaluated at superstep barriers.</p>
<p class="text-center">
<img alt="Supersteps" width="50%" src="fig/iterations_supersteps.png" />
</p>
</div>
<div class="col-sm-10 col-sm-offset-1">
<!-- Disqus thread and some vertical offset -->
<div style="margin-top: 75px; margin-bottom: 50px" id="disqus_thread"></div>
</div>
</div>
</div><!-- /.container -->
<!-- jQuery (necessary for Bootstrap's JavaScript plugins) -->
<script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.2/jquery.min.js"></script>
<!-- Include all compiled plugins (below), or include individual files as needed -->
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/js/bootstrap.min.js"></script>
<script src="http://flink.apache.org/docs/master/page/js/codetabs.js"></script>
<!-- Google Analytics -->
<script>
// Google Analytics (analytics.js) loader.
// The IIFE installs a stub function at window.ga that pushes every call into
// window.ga.q (so commands issued before analytics.js arrives are queued),
// stamps window.ga.l with the load time, and inserts an async <script> for
// analytics.js before the first existing <script> element on the page.
// An explicit https:// URL is used: a protocol-relative '//' URL resolves to
// file:// when this page is opened locally, and the script would not load.
// NOTE(review): 'UA-' (Universal Analytics) properties were retired by Google;
// confirm this tracker is still wanted before keeping it.
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
(i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
})(window,document,'script','https://www.google-analytics.com/analytics.js','ga');
// Queue a 'create' command for the property and a 'pageview' hit for this page.
ga('create', 'UA-52545728-1', 'auto');
ga('send', 'pageview');
</script>
<!-- Disqus -->
<script type="text/javascript">
// Disqus comment thread loader.
// A top-level `var` in a classic script makes disqus_shortname a global on
// window; the value also selects the forum's embed.js host below.
var disqus_shortname = 'stratosphere-eu';
(function() {
  // Build an async <script> element for this forum's embed.js.
  // An explicit https:// scheme is used: a protocol-relative '//' URL resolves
  // to file:// when the page is opened locally, and the script would not load.
  var dsq = document.createElement('script');
  dsq.type = 'text/javascript';
  dsq.async = true;
  dsq.src = 'https://' + disqus_shortname + '.disqus.com/embed.js';
  // Append to <head>, falling back to <body> if no <head> element is found.
  (document.getElementsByTagName('head')[0] || document.getElementsByTagName('body')[0]).appendChild(dsq);
})();
</script>
</body>
</html>