<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags -->
<title>Apache Flink 0.9.0 Documentation: Python Programming Guide</title>
<link rel="shortcut icon" href="http://flink.apache.org/docs/0.9/page/favicon.ico" type="image/x-icon">
<link rel="icon" href="http://flink.apache.org/docs/0.9/page/favicon.ico" type="image/x-icon">
<!-- Bootstrap -->
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/0.9/page/css/flink.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/0.9/page/css/syntax.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/0.9/page/css/codetabs.css">
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
</head>
<body>
<!-- Top navbar. -->
<nav class="navbar navbar-default navbar-fixed-top">
<div class="container">
<!-- The logo. -->
<div class="navbar-header">
<button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1">
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<div class="navbar-logo">
<a href="http://flink.apache.org"><img alt="Apache Flink" src="http://flink.apache.org/docs/0.9/page/img/navbar-brand-logo.jpg"></a>
</div>
</div><!-- /.navbar-header -->
<!-- The navigation links. -->
<div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
<ul class="nav navbar-nav">
<li><a href="http://flink.apache.org/docs/0.9/index.html">Overview<span class="hidden-sm hidden-xs"> 0.9.0</span></a></li>
<!-- Setup -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/0.9/setup" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Setup <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/0.9/setup/building.html">Get Flink 0.9-SNAPSHOT</a></li>
<li class="divider"></li>
<li role="presentation" class="dropdown-header"><strong>Deployment</strong></li>
<li><a href="http://flink.apache.org/docs/0.9/setup/local_setup.html" class="active">Local</a></li>
<li><a href="http://flink.apache.org/docs/0.9/setup/cluster_setup.html">Cluster (Standalone)</a></li>
<li><a href="http://flink.apache.org/docs/0.9/setup/yarn_setup.html">YARN</a></li>
<li><a href="http://flink.apache.org/docs/0.9/setup/gce_setup.html">GCloud</a></li>
<li><a href="http://flink.apache.org/docs/0.9/setup/flink_on_tez.html">Flink on Tez <span class="badge">Beta</span></a></li>
<li class="divider"></li>
<li><a href="http://flink.apache.org/docs/0.9/setup/config.html">Configuration</a></li>
</ul>
</li>
<!-- Programming Guides -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/0.9/apis" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Programming Guides <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/0.9/apis/programming_guide.html"><strong>Batch: DataSet API</strong></a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/streaming_guide.html"><strong>Streaming: DataStream API</strong> <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/python.html">Python API <span class="badge">Beta</span></a></li>
<li class="divider"></li>
<li><a href="scala_shell.html">Interactive Scala Shell</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/dataset_transformations.html">Dataset Transformations</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/best_practices.html">Best Practices</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/example_connectors.html">Connectors</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/examples.html">Examples</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/local_execution.html">Local Execution</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/cluster_execution.html">Cluster Execution</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/cli.html">Command Line Interface</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/web_client.html">Web Client</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/iterations.html">Iterations</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/java8.html">Java 8</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/hadoop_compatibility.html">Hadoop Compatibility <span class="badge">Beta</span></a></li>
</ul>
</li>
<!-- Libraries -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/0.9/libs" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Libraries <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/0.9/libs/spargel_guide.html">Graphs: Spargel</a></li>
<li><a href="http://flink.apache.org/docs/0.9/libs/gelly_guide.html">Graphs: Gelly <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/0.9/libs/ml/">Machine Learning <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/0.9/libs/table.html">Relational: Table <span class="badge">Beta</span></a></li>
</ul>
</li>
<!-- Internals -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/0.9/internals" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Internals <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li role="presentation" class="dropdown-header"><strong>Contribute</strong></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/how_to_contribute.html">How to Contribute</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/coding_guidelines.html">Coding Guidelines</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/ide_setup.html">IDE Setup</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/logging.html">Logging</a></li>
<li class="divider"></li>
<li role="presentation" class="dropdown-header"><strong>Internals</strong></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/general_arch.html">Architecture &amp; Process Model</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/types_serialization.html">Type Extraction &amp; Serialization</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/job_scheduling.html">Jobs &amp; Scheduling</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/add_operator.html">How-To: Add an Operator</a></li>
</ul>
</li>
</ul>
<form class="navbar-form navbar-right hidden-sm hidden-md" role="search" action="http://flink.apache.org/docs/0.9/search-results.html">
<div class="form-group">
<input type="text" class="form-control" name="q" placeholder="Search all pages">
</div>
<button type="submit" class="btn btn-default">Search</button>
</form>
</div><!-- /.navbar-collapse -->
</div><!-- /.container -->
</nav>
<!-- Main content. -->
<div class="container">
<div class="row">
<div class="col-sm-10 col-sm-offset-1">
<h1>Python Programming Guide <span class="badge">Beta</span></h1>
<p><a href="#top"></a></p>
<p>Analysis programs in Flink are regular programs that implement transformations on data sets
(e.g., filtering, mapping, joining, grouping). The data sets are initially created from certain
sources (e.g., by reading files, or from collections). Results are returned via sinks, which may,
for example, write the data to (distributed) files or to standard output (such as the command-line
terminal). Flink programs run in a variety of contexts: standalone or embedded in other programs.
The execution can happen in a local JVM or on clusters of many machines.</p>
<p>In order to create your own Flink program, we encourage you to start with the
<a href="#program-skeleton">program skeleton</a> and gradually add your own
<a href="#transformations">transformations</a>. The remaining sections act as references for additional
operations and advanced features.</p>
<ul id="markdown-toc">
<li><a href="#example-program" id="markdown-toc-example-program">Example Program</a></li>
<li><a href="#program-skeleton" id="markdown-toc-program-skeleton">Program Skeleton</a></li>
<li><a href="#project-setup" id="markdown-toc-project-setup">Project setup</a></li>
<li><a href="#lazy-evaluation" id="markdown-toc-lazy-evaluation">Lazy Evaluation</a></li>
<li><a href="#transformations" id="markdown-toc-transformations">Transformations</a></li>
<li><a href="#specifying-keys" id="markdown-toc-specifying-keys">Specifying Keys</a></li>
<li><a href="#passing-functions-to-flink" id="markdown-toc-passing-functions-to-flink">Passing Functions to Flink</a></li>
<li><a href="#data-types" id="markdown-toc-data-types">Data Types</a></li>
<li><a href="#data-sources" id="markdown-toc-data-sources">Data Sources</a></li>
<li><a href="#data-sinks" id="markdown-toc-data-sinks">Data Sinks</a></li>
<li><a href="#broadcast-variables" id="markdown-toc-broadcast-variables">Broadcast Variables</a></li>
<li><a href="#parallel-execution" id="markdown-toc-parallel-execution">Parallel Execution</a> <ul>
<li><a href="#execution-environment-level" id="markdown-toc-execution-environment-level">Execution Environment Level</a></li>
<li><a href="#system-level" id="markdown-toc-system-level">System Level</a></li>
</ul>
</li>
<li><a href="#executing-plans" id="markdown-toc-executing-plans">Executing Plans</a></li>
<li><a href="#debugging" id="markdown-toc-debugging">Debugging</a></li>
</ul>
<h2 id="example-program">Example Program</h2>
<p>The following program is a complete, working example of WordCount. You can copy &amp; paste the code
to run it locally.</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="kn">from</span> <span class="nn">flink.plan.Environment</span> <span class="kn">import</span> <span class="n">get_environment</span>
<span class="kn">from</span> <span class="nn">flink.plan.Constants</span> <span class="kn">import</span> <span class="n">INT</span><span class="p">,</span> <span class="n">STRING</span>
<span class="kn">from</span> <span class="nn">flink.functions.GroupReduceFunction</span> <span class="kn">import</span> <span class="n">GroupReduceFunction</span>

<span class="k">class</span> <span class="nc">Adder</span><span class="p">(</span><span class="n">GroupReduceFunction</span><span class="p">):</span>
    <span class="k">def</span> <span class="nf">reduce</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">iterator</span><span class="p">,</span> <span class="n">collector</span><span class="p">):</span>
        <span class="n">count</span><span class="p">,</span> <span class="n">word</span> <span class="o">=</span> <span class="n">iterator</span><span class="o">.</span><span class="n">next</span><span class="p">()</span>
        <span class="n">count</span> <span class="o">+=</span> <span class="nb">sum</span><span class="p">([</span><span class="n">x</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> <span class="k">for</span> <span class="n">x</span> <span class="ow">in</span> <span class="n">iterator</span><span class="p">])</span>
        <span class="n">collector</span><span class="o">.</span><span class="n">collect</span><span class="p">((</span><span class="n">count</span><span class="p">,</span> <span class="n">word</span><span class="p">))</span>

<span class="k">if</span> <span class="n">__name__</span> <span class="o">==</span> <span class="s">&quot;__main__&quot;</span><span class="p">:</span>
    <span class="n">env</span> <span class="o">=</span> <span class="n">get_environment</span><span class="p">()</span>
    <span class="n">data</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="n">from_elements</span><span class="p">(</span><span class="s">&quot;Who&#39;s there?&quot;</span><span class="p">,</span>
        <span class="s">&quot;I think I hear them. Stand, ho! Who&#39;s there?&quot;</span><span class="p">)</span>

    <span class="n">data</span> \
        <span class="o">.</span><span class="n">flat_map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">,</span> <span class="n">c</span><span class="p">:</span> <span class="p">[(</span><span class="mi">1</span><span class="p">,</span> <span class="n">word</span><span class="p">)</span> <span class="k">for</span> <span class="n">word</span> <span class="ow">in</span> <span class="n">x</span><span class="o">.</span><span class="n">lower</span><span class="p">()</span><span class="o">.</span><span class="n">split</span><span class="p">()],</span> <span class="p">(</span><span class="n">INT</span><span class="p">,</span> <span class="n">STRING</span><span class="p">))</span> \
        <span class="o">.</span><span class="n">group_by</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span> \
        <span class="o">.</span><span class="n">reduce_group</span><span class="p">(</span><span class="n">Adder</span><span class="p">(),</span> <span class="p">(</span><span class="n">INT</span><span class="p">,</span> <span class="n">STRING</span><span class="p">),</span> <span class="n">combinable</span><span class="o">=</span><span class="bp">True</span><span class="p">)</span> \
        <span class="o">.</span><span class="n">output</span><span class="p">()</span>

    <span class="n">env</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span><span class="n">local</span><span class="o">=</span><span class="bp">True</span><span class="p">)</span></code></pre></div>
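<p>If you want to check what this job computes before setting up Flink, the following plain-Python sketch reproduces the same logic using only the standard library; it does not use the Flink API. The lambda given to <code>flat_map</code> becomes a list comprehension that emits <code>(1, word)</code> tuples, <code>group_by(1)</code> becomes a dictionary keyed on the word, and the <code>Adder</code> logic sums field 0 of each group. The function name <code>word_count</code> is hypothetical.</p>
<div class="highlight"><pre><code class="language-python" data-lang="python">from collections import defaultdict


def word_count(lines):
    """Plain-Python model of the flat_map, group_by(1), reduce_group pipeline."""
    # flat_map: emit a (1, word) tuple for every lower-cased word of every line
    pairs = [(1, word) for line in lines for word in line.lower().split()]

    # group_by(1): collect the tuples that share the same value in field 1
    groups = defaultdict(list)
    for pair in pairs:
        groups[pair[1]].append(pair)

    # reduce_group with the Adder logic: sum field 0 per group, keep the word
    return sorted((sum(count for count, _ in group), word)
                  for word, group in groups.items())


if __name__ == "__main__":
    text = ["Who's there?", "I think I hear them. Stand, ho! Who's there?"]
    for count, word in word_count(text):
        print(count, word)</code></pre></div>
<p>The sketch sorts its result only to make its own printout deterministic; it says nothing about the order in which Flink emits records. Because the words are split on whitespace only, punctuation such as <code>there?</code> stays part of the word, exactly as in the <code>flat_map</code> above.</p>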
<p><a href="#top">Back to top</a></p>
<h2 id="program-skeleton">Program Skeleton</h2>
<p>As we already saw in the example, Flink programs look like regular Python
programs with an <code>if __name__ == "__main__":</code> block. Each program consists of the same basic parts:</p>
<ol>
<li>Obtain an <code>Environment</code>,</li>
<li>Load/create the initial data,</li>
<li>Specify transformations on this data,</li>
<li>Specify where to put the results of your computations, and</li>
<li>Execute your program.</li>
</ol>
<p>We will now give an overview of each of those steps, but please refer to the respective sections for
more details.</p>
<p>The <code>Environment</code> is the basis for all Flink programs. You can
obtain one by calling the <code>get_environment()</code> function of the <code>flink.plan.Environment</code> module:</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">get_environment</span><span class="p">()</span></code></pre></div>
<p>For specifying data sources, the execution environment has several methods
to read from files. To read a text file as a sequence of lines, you can use:</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">env</span> <span class="o">=</span> <span class="n">get_environment</span><span class="p">()</span>
<span class="n">text</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="n">read_text</span><span class="p">(</span><span class="s">&quot;file:///path/to/file&quot;</span><span class="p">)</span></code></pre></div>
<p>This will give you a DataSet on which you can then apply transformations. For
more information on data sources and input formats, please refer to
<a href="#data-sources">Data Sources</a>.</p>
<p>Once you have a DataSet you can apply transformations to create a new
DataSet which you can then write to a file, transform again, or
combine with other DataSets. You apply transformations by calling
methods on DataSet with your own custom transformation function. For example,
a map transformation looks like this:</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">data</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span><span class="o">*</span><span class="mi">2</span><span class="p">,</span> <span class="n">INT</span><span class="p">)</span></code></pre></div>
<p>This will create a new DataSet by doubling every value in the original DataSet.
For more information and a list of all the transformations,
please refer to <a href="#transformations">Transformations</a>.</p>
<p>Once you have a DataSet that needs to be written out, you can call one
of these methods on DataSet:</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="kn">from</span> <span class="nn">flink.plan.Constants</span> <span class="kn">import</span> <span class="n">WriteMode</span>

<span class="n">data</span><span class="o">.</span><span class="n">write_text</span><span class="p">(</span><span class="s">&quot;&lt;file-path&gt;&quot;</span><span class="p">,</span> <span class="n">write_mode</span><span class="o">=</span><span class="n">WriteMode</span><span class="o">.</span><span class="n">NO_OVERWRITE</span><span class="p">)</span>
<span class="n">data</span><span class="o">.</span><span class="n">write_csv</span><span class="p">(</span><span class="s">&quot;&lt;file-path&gt;&quot;</span><span class="p">,</span> <span class="n">line_delimiter</span><span class="o">=</span><span class="s">&#39;</span><span class="se">\n</span><span class="s">&#39;</span><span class="p">,</span> <span class="n">field_delimiter</span><span class="o">=</span><span class="s">&#39;,&#39;</span><span class="p">,</span> <span class="n">write_mode</span><span class="o">=</span><span class="n">WriteMode</span><span class="o">.</span><span class="n">NO_OVERWRITE</span><span class="p">)</span>
<span class="n">data</span><span class="o">.</span><span class="n">output</span><span class="p">()</span></code></pre></div>
<p>The last method is only useful for developing and debugging on a local machine:
it prints the contents of the DataSet to standard output. (Note that on
a cluster, the result goes to the standard output stream of the cluster nodes and ends
up in the <em>.out</em> files of the workers.)
The first two methods write the DataSet as text and as CSV, respectively, to the given path.
Please refer to <a href="#data-sinks">Data Sinks</a> for more information on writing to files.</p>
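<p>The effect of the <code>line_delimiter</code> and <code>field_delimiter</code> parameters of <code>write_csv</code> can be modeled in plain Python. The sketch below assumes that each field is converted with <code>str()</code>, that fields are joined without quoting or escaping, and that every record, including the last, ends with the line delimiter; these are assumptions for illustration, not a description of Flink's actual CSV writer. The helper <code>to_csv_text</code> is hypothetical.</p>
<div class="highlight"><pre><code class="language-python" data-lang="python">def to_csv_text(records, line_delimiter="\n", field_delimiter=","):
    """Model of write_csv output, assuming str() fields and no quoting."""
    # Each record becomes one line: its fields joined by field_delimiter,
    # followed by line_delimiter (assumed to terminate every line).
    return "".join(
        field_delimiter.join(str(field) for field in record) + line_delimiter
        for record in records
    )


if __name__ == "__main__":
    records = [(2, "who"), (1, "hear")]
    print(repr(to_csv_text(records)))                       # '2,who\n1,hear\n'
    print(repr(to_csv_text(records, field_delimiter="|")))  # '2|who\n1|hear\n'</code></pre></div>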
<p>Once you have specified the complete program, you need to call <code>execute</code> on
the <code>Environment</code>. This will either execute on your local machine or submit your program
for execution on a cluster, depending on how Flink was started. You can force
a local execution by using <code>execute(local=True)</code>.</p>
<p><a href="#top">Back to top</a></p>
<h2 id="project-setup">Project setup</h2>
<p>Apart from setting up Flink, no additional work is required. The Python package can be found in the /resource folder of your Flink distribution. The flink package, along with the plan and any optional packages, is automatically distributed across the cluster via HDFS when a job is run.</p>
<p>The Python API was tested on Linux systems that have Python 2.7 or 3.4 installed.</p>
<p><a href="#top">Back to top</a></p>
<h2 id="lazy-evaluation">Lazy Evaluation</h2>
<p>All Flink programs are executed lazily: when the program’s main block runs, the data loading
and transformations do not happen directly. Rather, each operation is created and added to the
program’s plan. The operations are actually executed when <code>execute()</code> is invoked
on the Environment object. Whether the program is executed locally or on a cluster depends
on the environment of the program.</p>
<p>The lazy evaluation lets you construct sophisticated programs that Flink executes as one
holistically planned unit.</p>
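<p>The idea of deferred execution can be illustrated with a small plain-Python model. The hypothetical <code>LazyDataSet</code> class below only records each transformation in a plan, and nothing is computed until <code>execute()</code> walks that plan. It is a conceptual sketch, not how Flink builds or optimizes its plans.</p>
<div class="highlight"><pre><code class="language-python" data-lang="python">class LazyDataSet(object):
    """Toy model of lazy evaluation: transformations are recorded, not run."""

    def __init__(self, source, plan=None):
        self._source = source    # the initial data (any Python iterable)
        self._plan = plan or []  # recorded (name, function) steps

    def _add(self, name, fn):
        # Return a new data set with one more step in its plan; nothing runs here.
        return LazyDataSet(self._source, self._plan + [(name, fn)])

    def map(self, fn):
        return self._add("map", fn)

    def filter(self, fn):
        return self._add("filter", fn)

    def plan(self):
        return [name for name, _ in self._plan]

    def execute(self):
        # Only now is the data touched: apply each recorded step in order.
        data = list(self._source)
        for name, fn in self._plan:
            if name == "map":
                data = [fn(x) for x in data]
            elif name == "filter":
                data = [x for x in data if fn(x)]
        return data


if __name__ == "__main__":
    calls = []

    def double(x):
        calls.append(x)  # record when the function actually runs
        return x * 2

    ds = LazyDataSet([1, 2, 3]).map(double).filter(lambda x: x % 4 != 0)
    print(ds.plan())     # ['map', 'filter']
    print(calls)         # [] because nothing has run yet
    print(ds.execute())  # [2, 6]
    print(calls)         # [1, 2, 3]</code></pre></div>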
<p><a href="#top">Back to top</a></p>
<h2 id="transformations">Transformations</h2>
<p>Data transformations transform one or more DataSets into a new DataSet. Programs can combine
multiple transformations into sophisticated assemblies.</p>
<p>This section gives a brief overview of the available transformations. The <a href="dataset_transformations.html">transformations
documentation</a> has a full description of all transformations with
examples.</p>
<p><br /></p>
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Transformation</th>
<th class="text-center">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>Map</strong></td>
<td>
<p>Takes one element and produces one element.</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">data</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span> <span class="o">*</span> <span class="mi">2</span><span class="p">,</span> <span class="n">INT</span><span class="p">)</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>FlatMap</strong></td>
<td>
<p>Takes one element and produces zero, one, or more elements. </p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">data</span><span class="o">.</span><span class="n">flat_map</span><span class="p">(</span>
    <span class="k">lambda</span> <span class="n">x</span><span class="p">,</span> <span class="n">c</span><span class="p">:</span> <span class="p">[(</span><span class="mi">1</span><span class="p">,</span> <span class="n">word</span><span class="p">)</span> <span class="k">for</span> <span class="n">word</span> <span class="ow">in</span> <span class="n">x</span><span class="o">.</span><span class="n">lower</span><span class="p">()</span><span class="o">.</span><span class="n">split</span><span class="p">()],</span>
    <span class="p">(</span><span class="n">INT</span><span class="p">,</span> <span class="n">STRING</span><span class="p">))</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>MapPartition</strong></td>
<td>
<p>Transforms a parallel partition in a single function call. The function gets the partition
as an <code>Iterator</code> and can produce an arbitrary number of result values. The number of
elements in each partition depends on the degree of parallelism and previous operations.</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">data</span><span class="o">.</span><span class="n">map_partition</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">,</span><span class="n">c</span><span class="p">:</span> <span class="p">[</span><span class="n">value</span> <span class="o">*</span> <span class="mi">2</span> <span class="k">for</span> <span class="n">value</span> <span class="ow">in</span> <span class="n">x</span><span class="p">],</span> <span class="n">INT</span><span class="p">)</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>Filter</strong></td>
<td>
<p>Evaluates a boolean function for each element and retains those for which the function
returns true.</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">data</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span> <span class="o">&gt;</span> <span class="mi">1000</span><span class="p">)</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>Reduce</strong></td>
<td>
<p>Combines a group of elements into a single element by repeatedly combining two elements
into one. Reduce may be applied on a full data set, or on a grouped data set.</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">data</span><span class="o">.</span><span class="n">reduce</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">,</span><span class="n">y</span> <span class="p">:</span> <span class="n">x</span> <span class="o">+</span> <span class="n">y</span><span class="p">)</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>ReduceGroup</strong></td>
<td>
<p>Combines a group of elements into one or more elements. ReduceGroup may be applied on a
full data set, or on a grouped data set.</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="k">class</span> <span class="nc">Adder</span><span class="p">(</span><span class="n">GroupReduceFunction</span><span class="p">):</span>
    <span class="k">def</span> <span class="nf">reduce</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">iterator</span><span class="p">,</span> <span class="n">collector</span><span class="p">):</span>
        <span class="n">count</span><span class="p">,</span> <span class="n">word</span> <span class="o">=</span> <span class="n">iterator</span><span class="o">.</span><span class="n">next</span><span class="p">()</span>
        <span class="n">count</span> <span class="o">+=</span> <span class="nb">sum</span><span class="p">([</span><span class="n">x</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> <span class="k">for</span> <span class="n">x</span> <span class="ow">in</span> <span class="n">iterator</span><span class="p">])</span>
        <span class="n">collector</span><span class="o">.</span><span class="n">collect</span><span class="p">((</span><span class="n">count</span><span class="p">,</span> <span class="n">word</span><span class="p">))</span>

<span class="n">data</span><span class="o">.</span><span class="n">reduce_group</span><span class="p">(</span><span class="n">Adder</span><span class="p">(),</span> <span class="p">(</span><span class="n">INT</span><span class="p">,</span> <span class="n">STRING</span><span class="p">))</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>Join</strong></td>
<td>
<p>Joins two data sets by creating all pairs of elements that are equal on their keys.
Optionally uses a JoinFunction to turn the pair of elements into a single element.
See <a href="#specifying-keys">keys</a> on how to define join keys.</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="c"># In this case tuple fields are used as keys.</span>
<span class="c"># &quot;0&quot; is the join field on the first tuple</span>
<span class="c"># &quot;1&quot; is the join field on the second tuple.</span>
<span class="n">result</span> <span class="o">=</span> <span class="n">input1</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">input2</span><span class="p">)</span><span class="o">.</span><span class="n">where</span><span class="p">(</span><span class="mi">0</span><span class="p">)</span><span class="o">.</span><span class="n">equal_to</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>CoGroup</strong></td>
<td>
<p>The two-dimensional variant of the reduce operation. Groups each input on one or more
fields and then joins the groups. The transformation function is called per pair of groups.
See <a href="#specifying-keys">keys</a> on how to define coGroup keys.</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">data1</span><span class="o">.</span><span class="n">co_group</span><span class="p">(</span><span class="n">data2</span><span class="p">)</span><span class="o">.</span><span class="n">where</span><span class="p">(</span><span class="mi">0</span><span class="p">)</span><span class="o">.</span><span class="n">equal_to</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>Cross</strong></td>
<td>
<p>Builds the Cartesian product (cross product) of two inputs, creating all pairs of
elements. Optionally uses a CrossFunction to turn the pair of elements into a single
element.</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">result</span> <span class="o">=</span> <span class="n">data1</span><span class="o">.</span><span class="n">cross</span><span class="p">(</span><span class="n">data2</span><span class="p">)</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>Union</strong></td>
<td>
<p>Produces the union of two data sets.</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">data</span><span class="o">.</span><span class="n">union</span><span class="p">(</span><span class="n">data2</span><span class="p">)</span></code></pre></div>
</td>
</tr>
</tbody>
</table>
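<p>The difference between Join, CoGroup, and Cross is easiest to see on small in-memory lists. The plain-Python functions below are hypothetical models of the semantics described in the table, not the Flink API: <code>join_pairs</code> returns every pair of elements whose keys match, <code>co_group_pairs</code> returns one group from each input per key (an empty group when a key occurs in only one input), and <code>cross_pairs</code> returns all pairs. The key positions follow the <code>where(0).equal_to(1)</code> examples above.</p>
<div class="highlight"><pre><code class="language-python" data-lang="python">from collections import defaultdict
from itertools import product


def group_by_field(data, index):
    """Group tuples by the value at position `index`."""
    groups = defaultdict(list)
    for record in data:
        groups[record[index]].append(record)
    return groups


def join_pairs(left, right, left_key, right_key):
    """Inner join: every (a, b) pair with a[left_key] == b[right_key]."""
    right_groups = group_by_field(right, right_key)
    return [(a, b) for a in left for b in right_groups.get(a[left_key], [])]


def co_group_pairs(left, right, left_key, right_key):
    """One (key, left_group, right_group) triple per key seen in either input."""
    left_groups = group_by_field(left, left_key)
    right_groups = group_by_field(right, right_key)
    # Keys are sorted only to make the model's output deterministic.
    keys = sorted(set(left_groups) | set(right_groups))
    return [(k, left_groups.get(k, []), right_groups.get(k, [])) for k in keys]


def cross_pairs(left, right):
    """Cartesian product: every (a, b) pair."""
    return list(product(left, right))


if __name__ == "__main__":
    input1 = [(1, "a"), (2, "b"), (3, "c")]
    input2 = [("x", 1), ("y", 1), ("z", 4)]
    # where(0).equal_to(1): field 0 of input1 against field 1 of input2
    print(join_pairs(input1, input2, 0, 1))
    # [((1, 'a'), ('x', 1)), ((1, 'a'), ('y', 1))]
    for triple in co_group_pairs(input1, input2, 0, 1):
        print(triple)
    print(len(cross_pairs(input1, input2)))  # 9</code></pre></div>
<p>Note how key 4 appears in the CoGroup result with an empty group from the first input, while Join simply drops keys that have no partner.</p>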
<p><a href="#top">Back to top</a></p>
<h2 id="specifying-keys">Specifying Keys</h2>
<p>Some transformations (such as Join or CoGroup) require that a key be defined on
their argument DataSets, while other transformations (Reduce, GroupReduce) allow the DataSet to be grouped on a key before they are
applied.</p>
<p>A DataSet is grouped as follows:</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">reduced</span> <span class="o">=</span> <span class="n">data</span> \
<span class="o">.</span><span class="n">group_by</span><span class="p">(</span><span class="o">&lt;</span><span class="n">define</span> <span class="n">key</span> <span class="n">here</span><span class="o">&gt;</span><span class="p">)</span> \
<span class="o">.</span><span class="n">reduce_group</span><span class="p">(</span><span class="o">&lt;</span><span class="n">do</span> <span class="n">something</span><span class="o">&gt;</span><span class="p">)</span></code></pre></div>
<p>The data model of Flink is not based on key-value pairs. Therefore,
you do not need to physically pack the data set types into keys and
values. Keys are “virtual”: they are defined as functions over the
actual data to guide the grouping operator.</p>
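<p>To make “virtual” keys concrete, the plain-Python sketch below (not Flink code) groups complete tuples with a key function built from field positions. The records are never rewritten into key-value pairs; the key is computed from each record only to decide which group it joins. The helpers <code>field_key</code> and <code>group_records</code> are hypothetical.</p>
<div class="highlight"><pre><code class="language-python" data-lang="python">from collections import OrderedDict


def field_key(*indices):
    """Build a key function that reads the given tuple positions."""
    def key(record):
        return tuple(record[i] for i in indices)
    return key


def group_records(data, key):
    """Group whole records by key(record), keeping first-seen key order."""
    groups = OrderedDict()
    for record in data:
        # The record is stored unchanged; the key is only used for lookup.
        groups.setdefault(key(record), []).append(record)
    return groups


if __name__ == "__main__":
    data = [("a", 1, 10), ("b", 2, 20), ("a", 3, 30)]
    for k, group in group_records(data, field_key(0)).items():
        print(k, group)
    # ('a',) [('a', 1, 10), ('a', 3, 30)]
    # ('b',) [('b', 2, 20)]</code></pre></div>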
<h3 class="no_toc" id="define-keys-for-tuples">Define keys for Tuples</h3>
<p>The simplest case is grouping a data set of Tuples on one or more
fields of the Tuple:</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">reduced</span> <span class="o">=</span> <span class="n">data</span> \
<span class="o">.</span><span class="n">group_by</span><span class="p">(</span><span class="mi">0</span><span class="p">)</span> \
<span class="o">.</span><span class="n">reduce_group</span><span class="p">(</span><span class="o">&lt;</span><span class="n">do</span> <span class="n">something</span><span class="o">&gt;</span><span class="p">)</span></code></pre></div>
<p>The data set is grouped on the first field of the tuples.
The group-reduce function will thus receive groups of tuples with
the same value in the first field.</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">reduced</span> <span class="o">=</span> <span class="n">data</span> \
<span class="o">.</span><span class="n">group_by</span><span class="p">(</span><span class="mi">0</span><span class="p">,</span> <span class="mi">1</span><span class="p">)</span> \
<span class="o">.</span><span class="n">reduce</span><span class="p">(</span><span class="o">&lt;</span><span class="n">do</span> <span class="n">something</span><span class="o">&gt;</span><span class="p">)</span></code></pre></div>
<p>The data set is grouped on the composite key consisting of the first and the
second fields; therefore, the reduce function will receive groups
with the same value for both fields.</p>
<p>A note on nested tuples: if you have a DataSet with a nested tuple,
specifying <code>group_by(&lt;index of tuple&gt;)</code> will cause the system to use the full nested tuple as a key.</p>
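<p>The key semantics described in this section can be modeled with a small plain-Python helper. <code>key_from_positions</code> is hypothetical and not part of Flink; it only mirrors what the text says about single, composite, and nested-tuple keys:</p>

```python
def key_from_positions(record, *positions):
    """Return the key a record would get for group_by(*positions).

    A single position yields that field as-is, so a nested tuple becomes
    the whole key; several positions yield a composite key.
    """
    if len(positions) == 1:
        return record[positions[0]]
    return tuple(record[p] for p in positions)


# Composite key on the first and second fields.
composite = key_from_positions((1, "a", 10), 0, 1)  # (1, 'a')
# Nested tuple in the first field: the full inner tuple is the key.
nested = key_from_positions(((1, "x"), 5), 0)  # (1, 'x')
```

<p>Two records fall into the same group exactly when this key is equal for both, for example <code>(1, "a", 10)</code> and <code>(1, "a", 20)</code> under positions 0 and 1.</p>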
<p><a href="#top">Back to top</a></p>
<h2 id="passing-functions-to-flink">Passing Functions to Flink</h2>
<p>Certain operations require user-defined functions; all of them accept both lambda functions and rich functions as arguments.</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">data</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span> <span class="o">&gt;</span> <span class="mi">5</span><span class="p">)</span></code></pre></div>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="kn">from</span> <span class="nn">flink.functions.FilterFunction</span> <span class="kn">import</span> <span class="n">FilterFunction</span>

<span class="k">class</span> <span class="nc">Filter</span><span class="p">(</span><span class="n">FilterFunction</span><span class="p">):</span>
    <span class="k">def</span> <span class="nf">filter</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
        <span class="k">return</span> <span class="n">value</span> <span class="o">&gt;</span> <span class="mi">5</span>

<span class="n">data</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">Filter</span><span class="p">())</span></code></pre></div>
<p>Rich functions allow the use of imported functions, provide access to broadcast variables,
can be parameterized using <code>__init__()</code>, and are the go-to option for complex functions.
They are also the only way to define an optional <code>combine</code> function for a reduce operation.</p>
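<p>Constructor-based parameterization can be sketched in plain Python. To keep the sketch runnable without Flink, it uses a hypothetical stand-in base class <code>FilterFunctionStub</code> in place of Flink's <code>FilterFunction</code>, and applies the filter to a list instead of a DataSet:</p>

```python
class FilterFunctionStub(object):
    """Hypothetical stand-in for Flink's FilterFunction base class."""

    def filter(self, value):
        raise NotImplementedError


class ThresholdFilter(FilterFunctionStub):
    """A rich function whose behavior is configured through __init__."""

    def __init__(self, threshold):
        super(ThresholdFilter, self).__init__()
        self.threshold = threshold

    def filter(self, value):
        # Keep only values strictly above the configured threshold.
        return value > self.threshold


f = ThresholdFilter(5)
kept = [v for v in [3, 6, 9, 5] if f.filter(v)]
```

<p>With Flink, the same class would instead derive from <code>FilterFunction</code> and be passed as <code>data.filter(ThresholdFilter(5))</code>.</p>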
<p>Lambda functions make it easy to insert one-liners. Note that a lambda function has to return
an iterable if the operation can return multiple values (that is, for all functions that receive a collector argument).</p>
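<p>For example, a lambda that takes the value and the collector, as flat_map functions do, returns a list of results rather than calling the collector. Because the lambda is ordinary Python, it can be checked directly; passing <code>None</code> for the unused collector is an assumption of this sketch:</p>

```python
# A flat_map-style lambda: it receives the input value and a collector,
# but instead of using the collector it returns an iterable of results.
split_words = lambda line, collector: [(1, word) for word in line.lower().split()]

# Called directly outside Flink; None stands in for the unused collector.
pairs = split_words("Who's there? Who", None)
print(pairs)  # [(1, "who's"), (1, 'there?'), (1, 'who')]
```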
<p>Flink requires type information at the time when it prepares the program for execution
(when the main method of the program is called). This is done by passing an exemplary
object that has the desired type. This also holds for tuples.</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="p">(</span><span class="n">INT</span><span class="p">,</span> <span class="n">STRING</span><span class="p">)</span></code></pre></div>
<p>This denotes a tuple containing an int and a string. Note that for operations that work strictly on tuples (like cross), no parentheses are required.</p>
<p>A few constants defined in <code>flink.plan.Constants</code>, such as <code>INT</code> and <code>STRING</code>, express this in a more readable fashion.</p>
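<p>The idea of describing a type by an exemplary object can be illustrated in plain Python. The constants below are hypothetical stand-ins for those in <code>flink.plan.Constants</code> (their real values may differ; only their Python types matter here), and <code>matches_type</code> is an illustrative helper, not part of Flink:</p>

```python
# Hypothetical stand-ins for constants from flink.plan.Constants.
INT = 1
STRING = "type"
FLOAT = 2.5


def matches_type(value, example):
    """Return True if value has the same shape and types as the example.

    Tuples are compared field by field, so nested tuples are supported.
    """
    if isinstance(example, tuple):
        return (isinstance(value, tuple)
                and len(value) == len(example)
                and all(matches_type(v, e) for v, e in zip(value, example)))
    return type(value) is type(example)


print(matches_type((3, "abc"), (INT, STRING)))  # True
print(matches_type(("abc", 3), (INT, STRING)))  # False
```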
<p><a href="#top">Back to top</a></p>
<h2 id="data-types">Data Types</h2>
<p>Flink’s Python API currently only supports primitive Python types (int, float, bool, string) and byte arrays.</p>
<h4 id="tupleslists">Tuples/Lists</h4>
<p>You can use tuples (or lists) for composite types. Python tuples are mapped to the Flink Tuple type, which contains
a fixed number of fields of various types (up to 25). Every field of a tuple can be a primitive type or a further tuple, resulting in nested tuples.</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">word_counts</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="n">from_elements</span><span class="p">((</span><span class="s">&quot;hello&quot;</span><span class="p">,</span> <span class="mi">1</span><span class="p">),</span> <span class="p">(</span><span class="s">&quot;world&quot;</span><span class="p">,</span><span class="mi">2</span><span class="p">))</span>
<span class="n">counts</span> <span class="o">=</span> <span class="n">word_counts</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span><span class="p">[</span><span class="mi">1</span><span class="p">],</span> <span class="n">INT</span><span class="p">)</span></code></pre></div>
<p>When working with operators that require a key for grouping or matching records,
tuples let you simply specify the positions of the fields to be used as the key. You can specify more
than one position to use composite keys (see <a href="#transformations">Section Data Transformations</a>).</p>
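<p>Conceptually, grouping on field 0 and reducing combines tuples that share the same first field, two at a time. A plain-Python sketch of that behavior follows; <code>reduce_by_position</code> and <code>add_counts</code> are hypothetical, with <code>add_counts</code> standing in for <code>MyReduceFunction</code>:</p>

```python
from functools import reduce


def reduce_by_position(records, position, reduce_fn):
    """Group records on one tuple field and fold each group pairwise."""
    groups = {}
    for record in records:
        groups.setdefault(record[position], []).append(record)
    return {key: reduce(reduce_fn, group) for key, group in groups.items()}


def add_counts(a, b):
    # Combine two (word, count) tuples that share the same word.
    return (a[0], a[1] + b[1])


word_counts = [("hello", 1), ("world", 2), ("hello", 3)]
totals = reduce_by_position(word_counts, 0, add_counts)
```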
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">word_counts</span> \
<span class="o">.</span><span class="n">group_by</span><span class="p">(</span><span class="mi">0</span><span class="p">)</span> \
<span class="o">.</span><span class="n">reduce</span><span class="p">(</span><span class="n">MyReduceFunction</span><span class="p">())</span></code></pre></div>
<p><a href="#top">Back to top</a></p>
<h2 id="data-sources">Data Sources</h2>
<p>Data sources create the initial data sets, such as from files or from collections.</p>
<p>File-based:</p>
<ul>
<li><code>read_text(path)</code> - Reads files line-wise and returns them as strings.</li>
<li><code>read_csv(path, type)</code> - Parses files of comma (or another char) delimited fields.
Returns a DataSet of tuples whose field types are described by <code>type</code>, for example <code>(INT, STRING, DOUBLE)</code>.
Supports the basic Java types and their Value counterparts as field types.</li>
</ul>
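<p>To illustrate what <code>read_csv</code> produces, the following plain-Python sketch parses delimited lines into typed tuples. It is not Flink's parser: the field types are given as Python types (<code>int</code>, <code>str</code>, <code>float</code>) rather than Flink's type constants, and <code>parse_csv_lines</code> is a hypothetical helper:</p>

```python
import csv
import io


def parse_csv_lines(text, field_types, delimiter=","):
    """Parse delimited lines into tuples, converting each field by position.

    field_types is a tuple of Python types such as (int, str, float),
    one per field.
    """
    reader = csv.reader(io.StringIO(text), delimiter=delimiter)
    return [tuple(convert(field) for convert, field in zip(field_types, row))
            for row in reader]


rows = parse_csv_lines("1,foo,2.5\n2,bar,0.75\n", (int, str, float))
print(rows)  # [(1, 'foo', 2.5), (2, 'bar', 0.75)]
```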
<p>Collection-based:</p>
<ul>
<li><code>from_elements(*args)</code> - Creates a data set from the given elements. All elements must be of the same type.</li>
</ul>
<p><strong>Examples</strong></p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">env</span> <span class="o">=</span> <span class="n">get_environment</span><span class="p">()</span>
<span class="c"># read text file from the local file system</span>
<span class="n">localLines</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="n">read_text</span><span class="p">(</span><span class="s">&quot;file:///path/to/my/textfile&quot;</span><span class="p">)</span>
<span class="c"># read text file from an HDFS running at nnHost:nnPort</span>
<span class="n">hdfsLines</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="n">read_text</span><span class="p">(</span><span class="s">&quot;hdfs://nnHost:nnPort/path/to/my/textfile&quot;</span><span class="p">)</span>
<span class="c"># read a CSV file with three fields</span>
<span class="n">csvInput</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="n">read_csv</span><span class="p">(</span><span class="s">&quot;hdfs:///the/CSV/file&quot;</span><span class="p">,</span> <span class="p">(</span><span class="n">INT</span><span class="p">,</span> <span class="n">STRING</span><span class="p">,</span> <span class="n">DOUBLE</span><span class="p">))</span>
<span class="c"># create a set from some given elements</span>
<span class="n">values</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="n">from_elements</span><span class="p">(</span><span class="s">&quot;Foo&quot;</span><span class="p">,</span> <span class="s">&quot;bar&quot;</span><span class="p">,</span> <span class="s">&quot;foobar&quot;</span><span class="p">,</span> <span class="s">&quot;fubar&quot;</span><span class="p">)</span></code></pre></div>
<p><a href="#top">Back to top</a></p>
<h2 id="data-sinks">Data Sinks</h2>
<p>Data sinks consume DataSets and are used to store or return them:</p>
<ul>
<li><code>write_text()</code> - Writes elements line-wise as Strings. The Strings are
obtained by calling the <em>str()</em> method of each element.</li>
<li><code>write_csv(...)</code> - Writes tuples as comma-separated value files. Row and field
delimiters are configurable. The value for each field comes from the <em>str()</em> method of the objects.</li>
<li><code>output()</code> - Prints the <em>str()</em> value of each element on the
standard out.</li>
</ul>
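<p>All three sinks rely on <code>str()</code>. The plain-Python sketch below (hypothetical helpers, not Flink code) shows the difference for a single tuple: text output uses <code>str()</code> of the whole element, whereas CSV output applies <code>str()</code> to each field and joins the fields with the configured delimiters:</p>

```python
def to_text_line(element):
    # write_text() and output(): str() of the whole element.
    return str(element)


def to_csv_line(record, field_delimiter=",", line_delimiter="\n"):
    # write_csv(): str() of each field, joined by the field delimiter.
    return field_delimiter.join(str(field) for field in record) + line_delimiter


record = ("a", "b", 3)
print(to_text_line(record))  # ('a', 'b', 3)
print(repr(to_csv_line(record, field_delimiter="|")))  # 'a|b|3\n'
```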
<p>A DataSet can be input to multiple operations. Programs can write or print a data set and at the
same time run additional transformations on it.</p>
<p><strong>Examples</strong></p>
<p>Standard data sink methods:</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="c"># write DataSet to a file on the local file system</span>
<span class="n">textData</span><span class="o">.</span><span class="n">write_text</span><span class="p">(</span><span class="s">&quot;file:///my/result/on/localFS&quot;</span><span class="p">)</span>
<span class="c"># write DataSet to a file on an HDFS with a namenode running at nnHost:nnPort</span>
<span class="n">textData</span><span class="o">.</span><span class="n">write_text</span><span class="p">(</span><span class="s">&quot;hdfs://nnHost:nnPort/my/result/on/localFS&quot;</span><span class="p">)</span>
<span class="c"># write DataSet to a file and overwrite the file if it exists</span>
<span class="n">textData</span><span class="o">.</span><span class="n">write_text</span><span class="p">(</span><span class="s">&quot;file:///my/result/on/localFS&quot;</span><span class="p">,</span> <span class="n">WriteMode</span><span class="o">.</span><span class="n">OVERWRITE</span><span class="p">)</span>
<span class="c"># write tuples as lines with pipe as the field separator: &quot;a|b|c&quot;</span>
<span class="n">values</span><span class="o">.</span><span class="n">write_csv</span><span class="p">(</span><span class="s">&quot;file:///path/to/the/result/file&quot;</span><span class="p">,</span> <span class="n">line_delimiter</span><span class="o">=</span><span class="s">&quot;\n&quot;</span><span class="p">,</span> <span class="n">field_delimiter</span><span class="o">=</span><span class="s">&quot;|&quot;</span><span class="p">)</span>
<span class="c"># write tuples in the text format &quot;(a, b, c)&quot; rather than as CSV lines</span>
<span class="n">values</span><span class="o">.</span><span class="n">write_text</span><span class="p">(</span><span class="s">&quot;file:///path/to/the/result/file&quot;</span><span class="p">)</span></code></pre></div>
<p><a href="#top">Back to top</a></p>
<h2 id="broadcast-variables">Broadcast Variables</h2>
<p>Broadcast variables allow you to make a data set available to all parallel instances of an
operation, in addition to the regular input of the operation. This is useful for auxiliary data
sets or data-dependent parameterization. The data set will then be accessible at the operator as a
collection.</p>
<ul>
<li><strong>Broadcast</strong>: broadcast sets are registered by name via <code>with_broadcast_set(String, DataSet)</code></li>
<li><strong>Access</strong>: accessible via <code>self.context.get_broadcast_variable(String)</code> at the target operator</li>
</ul>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="k">class</span> <span class="nc">MapperBcv</span><span class="p">(</span><span class="n">MapFunction</span><span class="p">):</span>
    <span class="k">def</span> <span class="nf">map</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
        <span class="n">factor</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">context</span><span class="o">.</span><span class="n">get_broadcast_variable</span><span class="p">(</span><span class="s">&quot;bcv&quot;</span><span class="p">)[</span><span class="mi">0</span><span class="p">][</span><span class="mi">0</span><span class="p">]</span>
        <span class="k">return</span> <span class="n">value</span> <span class="o">*</span> <span class="n">factor</span>

<span class="c"># 1. The DataSet to be broadcasted</span>
<span class="n">toBroadcast</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="n">from_elements</span><span class="p">(</span><span class="mi">1</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="mi">3</span><span class="p">)</span>
<span class="n">data</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="n">from_elements</span><span class="p">(</span><span class="s">&quot;a&quot;</span><span class="p">,</span> <span class="s">&quot;b&quot;</span><span class="p">)</span>
<span class="c"># 2. Broadcast the DataSet</span>
<span class="n">data</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="n">MapperBcv</span><span class="p">(),</span> <span class="n">INT</span><span class="p">)</span><span class="o">.</span><span class="n">with_broadcast_set</span><span class="p">(</span><span class="s">&quot;bcv&quot;</span><span class="p">,</span> <span class="n">toBroadcast</span><span class="p">)</span></code></pre></div>
<p>Make sure that the names (<code>bcv</code> in the previous example) match when registering and
accessing broadcast data sets.</p>
<p><strong>Note</strong>: As the content of broadcast variables is kept in memory on each node, it should not become
too large. For scalar values, you can instead parameterize the rich function.</p>
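<p>The semantics can be pictured with a plain-Python simulation (hypothetical helpers, not Flink code): every parallel instance processes only its own partition of the regular input, but sees the complete broadcast data sets, looked up by name:</p>

```python
def run_with_broadcast(partitions, broadcast_sets, map_fn):
    """Simulate parallel map instances that share broadcast data sets.

    Each inner list of `partitions` stands for the regular input of one
    parallel instance; every instance receives the same, complete
    `broadcast_sets` mapping from names to lists.
    """
    results = []
    for partition in partitions:
        results.append([map_fn(value, broadcast_sets) for value in partition])
    return results


def scale(value, broadcast_sets):
    # The name used here must match the name used when registering the set.
    factor = broadcast_sets["bcv"][0]
    return value * factor


out = run_with_broadcast([[1, 2], [3]], {"bcv": [10, 20, 30]}, scale)
print(out)  # [[10, 20], [30]]
```

<p>In this sketch a mismatched name surfaces as a <code>KeyError</code>.</p>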
<p><a href="#top">Back to top</a></p>
<h2 id="parallel-execution">Parallel Execution</h2>
<p>This section describes how the parallel execution of programs can be configured in Flink. A Flink
program consists of multiple tasks (operators, data sources, and sinks). A task is split into
several parallel instances for execution and each parallel instance processes a subset of the task’s
input data. The number of parallel instances of a task is called its <em>parallelism</em> or <em>degree of
parallelism (DOP)</em>.</p>
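<p>The following plain-Python sketch is a conceptual model, not Flink's scheduler or partitioner. It shows how the input of one task might be split among parallel instances: round-robin when no key is involved, or by hashing a key so that records with equal keys land in the same instance, as grouping requires. The function name <code>partition</code> is hypothetical:</p>

```python
def partition(records, parallelism, key_fn=None):
    """Split records among `parallelism` parallel instances.

    Without key_fn, records are dealt out round-robin; with key_fn, each
    record goes to the instance chosen by hashing its key, so records with
    equal keys always end up in the same instance.
    """
    instances = [[] for _ in range(parallelism)]
    for index, record in enumerate(records):
        if key_fn is None:
            target = index % parallelism
        else:
            target = hash(key_fn(record)) % parallelism
        instances[target].append(record)
    return instances


print(partition(list(range(7)), 3))  # [[0, 3, 6], [1, 4], [2, 5]]
```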
<p>The degree of parallelism of a task can be specified in Flink at different levels.</p>
<h3 id="execution-environment-level">Execution Environment Level</h3>
<p>Flink programs are executed in the context of an <a href="#program-skeleton">execution environment</a>. An
execution environment defines a default parallelism for all operators, data sources, and data sinks
it executes. Execution environment parallelism can be overridden by explicitly configuring the
parallelism of an operator.</p>
<p>The default parallelism of an execution environment can be specified by calling the
<code>set_degree_of_parallelism()</code> method. To execute all operators, data sources, and data sinks of the
<a href="#example-program">WordCount</a> example program with a parallelism of <code>3</code>, set the default parallelism of the
execution environment as follows:</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="kn">from</span> <span class="nn">flink.plan.Environment</span> <span class="kn">import</span> <span class="n">get_environment</span>
<span class="kn">from</span> <span class="nn">flink.plan.Constants</span> <span class="kn">import</span> <span class="n">INT</span><span class="p">,</span> <span class="n">STRING</span>

<span class="n">env</span> <span class="o">=</span> <span class="n">get_environment</span><span class="p">()</span>
<span class="n">env</span><span class="o">.</span><span class="n">set_degree_of_parallelism</span><span class="p">(</span><span class="mi">3</span><span class="p">)</span>
<span class="c"># text is the input DataSet and Adder the GroupReduceFunction of the WordCount example</span>
<span class="n">text</span><span class="o">.</span><span class="n">flat_map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">,</span> <span class="n">c</span><span class="p">:</span> <span class="p">[(</span><span class="mi">1</span><span class="p">,</span> <span class="n">word</span><span class="p">)</span> <span class="k">for</span> <span class="n">word</span> <span class="ow">in</span> <span class="n">x</span><span class="o">.</span><span class="n">lower</span><span class="p">()</span><span class="o">.</span><span class="n">split</span><span class="p">()],</span> <span class="p">(</span><span class="n">INT</span><span class="p">,</span> <span class="n">STRING</span><span class="p">))</span> \
<span class="o">.</span><span class="n">group_by</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span> \
<span class="o">.</span><span class="n">reduce_group</span><span class="p">(</span><span class="n">Adder</span><span class="p">(),</span> <span class="p">(</span><span class="n">INT</span><span class="p">,</span> <span class="n">STRING</span><span class="p">),</span> <span class="n">combinable</span><span class="o">=</span><span class="bp">True</span><span class="p">)</span> \
<span class="o">.</span><span class="n">output</span><span class="p">()</span>
<span class="n">env</span><span class="o">.</span><span class="n">execute</span><span class="p">()</span></code></pre></div>
<h3 id="system-level">System Level</h3>
<p>A system-wide default parallelism for all execution environments can be defined by setting the
<code>parallelization.degree.default</code> property in <code>./conf/flink-conf.yaml</code>. See the
<a href="config.html">Configuration</a> documentation for details.</p>
<p><a href="#top">Back to top</a></p>
<h2 id="executing-plans">Executing Plans</h2>
<p>To run the plan with Flink, go to your Flink distribution and run the pyflink.sh script from the /bin folder:
use pyflink2.sh for Python 2.7 and pyflink3.sh for Python 3.4. The script containing the plan has to be passed
as the first argument, followed by any number of additional Python packages and, finally, separated by <code>-</code>, additional
arguments that will be fed to the script.</p>
<div class="highlight"><pre><code class="language-bash" data-lang="bash">./bin/pyflink&lt;2/3&gt;.sh &lt;Script&gt;[ &lt;pathToPackage1&gt;[ &lt;pathToPackageX&gt;]][ - &lt;param1&gt;[ &lt;paramX&gt;]]</code></pre></div>
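<p>To make the argument convention concrete, the plain-Python sketch below splits such a command line into its three parts. <code>split_pyflink_args</code> is a hypothetical helper that only models the convention described above; it is not how pyflink.sh is implemented:</p>

```python
def split_pyflink_args(args):
    """Split pyflink arguments into (script, packages, params).

    The plan script comes first, then any package paths, then an
    optional "-" followed by the parameters passed on to the script.
    """
    script, rest = args[0], args[1:]
    if "-" in rest:
        separator = rest.index("-")
        return script, rest[:separator], rest[separator + 1:]
    return script, rest, []


print(split_pyflink_args(["wordcount.py", "/libs/mypkg", "-", "in.txt", "out.txt"]))
# ('wordcount.py', ['/libs/mypkg'], ['in.txt', 'out.txt'])
```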
<p><a href="#top">Back to top</a></p>
<h2 id="debugging">Debugging</h2>
<p>If you are running Flink programs locally, you can debug your program as follows.
First, enable debugging by setting the debug switch in the <code>env.execute(debug=True)</code> call. After
submitting your program, open the JobManager log file and look for a line that says
<code>Waiting for external Process : &lt;taskname&gt;. Run python /tmp/flink/executor.py &lt;port&gt;</code>. Then open <code>/tmp/flink</code> in your Python
IDE and run <code>executor.py &lt;port&gt;</code>.</p>
<p><a href="#top">Back to top</a></p>
</div>
</div>
</div><!-- /.container -->
<!-- jQuery (necessary for Bootstrap's JavaScript plugins) -->
<script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.2/jquery.min.js"></script>
<!-- Include all compiled plugins (below), or include individual files as needed -->
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/js/bootstrap.min.js"></script>
<script src="http://flink.apache.org/docs/0.9/page/js/codetabs.js"></script>
</body>
</html>