| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| --> |
| <!DOCTYPE html> |
| |
| <html lang="en"> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge"> |
| <meta name="viewport" content="width=device-width, initial-scale=1"> |
| <!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags --> |
| |
| <title>Apache Flink 0.10-SNAPSHOT Documentation: Python Programming Guide</title> |
| |
| <link rel="shortcut icon" href="http://flink.apache.org/docs/master/page/favicon.ico" type="image/x-icon"> |
| <link rel="icon" href="http://flink.apache.org/docs/master/page/favicon.ico" type="image/x-icon"> |
| |
| <!-- Bootstrap --> |
| <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css"> |
| <link rel="stylesheet" href="http://flink.apache.org/docs/master/page/css/flink.css"> |
| <link rel="stylesheet" href="http://flink.apache.org/docs/master/page/css/syntax.css"> |
| <link rel="stylesheet" href="http://flink.apache.org/docs/master/page/css/codetabs.css"> |
| |
| <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries --> |
| <!-- WARNING: Respond.js doesn't work if you view the page via file:// --> |
| <!--[if lt IE 9]> |
| <script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script> |
| <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script> |
| <![endif]--> |
| </head> |
| <body> |
| |
| |
| |
| |
| |
| |
| <!-- Top navbar. --> |
| <nav class="navbar navbar-default navbar-fixed-top"> |
| <div class="container"> |
| <!-- The logo. --> |
| <div class="navbar-header"> |
| <button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1"> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| </button> |
| <div class="navbar-logo"> |
| <a href="http://flink.apache.org"><img alt="Apache Flink" src="http://flink.apache.org/docs/master/page/img/navbar-brand-logo.jpg"></a> |
| </div> |
| </div><!-- /.navbar-header --> |
| |
| <!-- The navigation links. --> |
| <div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1"> |
| <ul class="nav navbar-nav"> |
| <li><a href="http://flink.apache.org/docs/master/index.html">Overview<span class="hidden-sm hidden-xs"> 0.10</span></a></li> |
| |
| <!-- Setup --> |
| <li class="dropdown"> |
| <a href="http://flink.apache.org/docs/master/setup" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Setup <span class="caret"></span></a> |
| <ul class="dropdown-menu" role="menu"> |
| <li><a href="http://flink.apache.org/docs/master/setup/building.html">Get Flink 0.10-SNAPSHOT</a></li> |
| |
| <li class="divider"></li> |
| <li role="presentation" class="dropdown-header"><strong>Deployment</strong></li> |
| <li><a href="http://flink.apache.org/docs/master/setup/local_setup.html" class="active">Local</a></li> |
| <li><a href="http://flink.apache.org/docs/master/setup/cluster_setup.html">Cluster (Standalone)</a></li> |
| <li><a href="http://flink.apache.org/docs/master/setup/yarn_setup.html">YARN</a></li> |
| <li><a href="http://flink.apache.org/docs/master/setup/gce_setup.html">GCloud</a></li> |
| <li><a href="http://flink.apache.org/docs/master/setup/flink_on_tez.html">Flink on Tez <span class="badge">Beta</span></a></li> |
| |
| <li class="divider"></li> |
| <li><a href="http://flink.apache.org/docs/master/setup/config.html">Configuration</a></li> |
| </ul> |
| </li> |
| |
| <!-- Programming Guides --> |
| <li class="dropdown"> |
| <a href="http://flink.apache.org/docs/master/apis" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Programming Guides <span class="caret"></span></a> |
| <ul class="dropdown-menu" role="menu"> |
| <li><a href="http://flink.apache.org/docs/master/apis/programming_guide.html"><strong>Batch: DataSet API</strong></a></li> |
| <li><a href="http://flink.apache.org/docs/master/apis/streaming_guide.html"><strong>Streaming: DataStream API</strong> <span class="badge">Beta</span></a></li> |
| <li><a href="http://flink.apache.org/docs/master/apis/python.html">Python API <span class="badge">Beta</span></a></li> |
| |
| <li class="divider"></li> |
| <li><a href="http://flink.apache.org/docs/master/apis/scala_shell.html">Interactive Scala Shell</a></li> |
| <li><a href="http://flink.apache.org/docs/master/apis/dataset_transformations.html">Dataset Transformations</a></li> |
| <li><a href="http://flink.apache.org/docs/master/apis/best_practices.html">Best Practices</a></li> |
| <li><a href="http://flink.apache.org/docs/master/apis/example_connectors.html">Connectors</a></li> |
| <li><a href="http://flink.apache.org/docs/master/apis/examples.html">Examples</a></li> |
| <li><a href="http://flink.apache.org/docs/master/apis/local_execution.html">Local Execution</a></li> |
| <li><a href="http://flink.apache.org/docs/master/apis/cluster_execution.html">Cluster Execution</a></li> |
| <li><a href="http://flink.apache.org/docs/master/apis/cli.html">Command Line Interface</a></li> |
| <li><a href="http://flink.apache.org/docs/master/apis/web_client.html">Web Client</a></li> |
| <li><a href="http://flink.apache.org/docs/master/apis/iterations.html">Iterations</a></li> |
| <li><a href="http://flink.apache.org/docs/master/apis/java8.html">Java 8</a></li> |
| <li><a href="http://flink.apache.org/docs/master/apis/hadoop_compatibility.html">Hadoop Compatibility <span class="badge">Beta</span></a></li> |
| </ul> |
| </li> |
| |
| <!-- Libraries --> |
| <li class="dropdown"> |
| <a href="http://flink.apache.org/docs/master/libs" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Libraries <span class="caret"></span></a> |
| <ul class="dropdown-menu" role="menu"> |
| <li><a href="http://flink.apache.org/docs/master/libs/spargel_guide.html">Graphs: Spargel</a></li> |
| <li><a href="http://flink.apache.org/docs/master/libs/gelly_guide.html">Graphs: Gelly <span class="badge">Beta</span></a></li> |
| <li><a href="http://flink.apache.org/docs/master/libs/ml/">Machine Learning <span class="badge">Beta</span></a></li> |
| <li><a href="http://flink.apache.org/docs/master/libs/table.html">Relational: Table <span class="badge">Beta</span></a></li> |
| </ul> |
| </li> |
| |
| <!-- Internals --> |
| <li class="dropdown"> |
| <a href="http://flink.apache.org/docs/master/internals" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Internals <span class="caret"></span></a> |
| <ul class="dropdown-menu" role="menu"> |
| <li role="presentation" class="dropdown-header"><strong>Contribute</strong></li> |
| <li><a href="http://flink.apache.org/docs/master/internals/how_to_contribute.html">How to Contribute</a></li> |
| <li><a href="http://flink.apache.org/docs/master/internals/coding_guidelines.html">Coding Guidelines</a></li> |
| <li><a href="http://flink.apache.org/docs/master/internals/ide_setup.html">IDE Setup</a></li> |
| <li><a href="http://flink.apache.org/docs/master/internals/logging.html">Logging</a></li> |
| <li class="divider"></li> |
| <li role="presentation" class="dropdown-header"><strong>Internals</strong></li> |
| <li><a href="http://flink.apache.org/docs/master/internals/general_arch.html">Architecture &amp; Process Model</a></li> |
| <li><a href="http://flink.apache.org/docs/master/internals/types_serialization.html">Type Extraction &amp; Serialization</a></li> |
| <li><a href="http://flink.apache.org/docs/master/internals/job_scheduling.html">Jobs &amp; Scheduling</a></li> |
| <li><a href="http://flink.apache.org/docs/master/internals/add_operator.html">How-To: Add an Operator</a></li> |
| </ul> |
| </li> |
| </ul> |
| <form class="navbar-form navbar-right hidden-sm hidden-md" role="search" action="http://flink.apache.org/docs/master/search-results.html"> |
| <div class="form-group"> |
| <input type="text" class="form-control" name="q" placeholder="Search all pages"> |
| </div> |
| <button type="submit" class="btn btn-default">Search</button> |
| </form> |
| </div><!-- /.navbar-collapse --> |
| </div><!-- /.container --> |
| </nav> |
| |
| |
| |
| |
| <!-- Main content. --> |
| <div class="container"> |
| |
| |
| <div class="row"> |
| <div class="col-sm-10 col-sm-offset-1"> |
| <h1>Python Programming Guide <span class="badge">Beta</span></h1> |
| |
| |
| |
| <p><a href="#top"></a></p> |
| |
| <p>Analysis programs in Flink are regular programs that implement transformations on data sets |
| (e.g., filtering, mapping, joining, grouping). The data sets are initially created from certain |
| sources (e.g., by reading files, or from collections). Results are returned via sinks, which may for |
| example write the data to (distributed) files, or to standard output (for example the command line |
| terminal). Flink programs run in a variety of contexts, standalone, or embedded in other programs. |
| The execution can happen in a local JVM, or on clusters of many machines.</p> |
| |
| <p>In order to create your own Flink program, we encourage you to start with the |
| <a href="#program-skeleton">program skeleton</a> and gradually add your own |
| <a href="#transformations">transformations</a>. The remaining sections act as references for additional |
| operations and advanced features.</p> |
| |
| <ul id="markdown-toc"> |
| <li><a href="#example-program" id="markdown-toc-example-program">Example Program</a></li> |
| <li><a href="#program-skeleton" id="markdown-toc-program-skeleton">Program Skeleton</a></li> |
| <li><a href="#project-setup" id="markdown-toc-project-setup">Project setup</a></li> |
| <li><a href="#lazy-evaluation" id="markdown-toc-lazy-evaluation">Lazy Evaluation</a></li> |
| <li><a href="#transformations" id="markdown-toc-transformations">Transformations</a></li> |
| <li><a href="#specifying-keys" id="markdown-toc-specifying-keys">Specifying Keys</a></li> |
| <li><a href="#passing-functions-to-flink" id="markdown-toc-passing-functions-to-flink">Passing Functions to Flink</a></li> |
| <li><a href="#data-types" id="markdown-toc-data-types">Data Types</a></li> |
| <li><a href="#data-sources" id="markdown-toc-data-sources">Data Sources</a></li> |
| <li><a href="#data-sinks" id="markdown-toc-data-sinks">Data Sinks</a></li> |
| <li><a href="#broadcast-variables" id="markdown-toc-broadcast-variables">Broadcast Variables</a></li> |
| <li><a href="#parallel-execution" id="markdown-toc-parallel-execution">Parallel Execution</a> <ul> |
| <li><a href="#execution-environment-level" id="markdown-toc-execution-environment-level">Execution Environment Level</a></li> |
| <li><a href="#system-level" id="markdown-toc-system-level">System Level</a></li> |
| </ul> |
| </li> |
| <li><a href="#executing-plans" id="markdown-toc-executing-plans">Executing Plans</a></li> |
| <li><a href="#debugging" id="markdown-toc-debugging">Debugging</a></li> |
| </ul> |
| |
| <h2 id="example-program">Example Program</h2> |
| |
| <p>The following program is a complete, working example of WordCount. You can copy &amp; paste the code |
| to run it locally.</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="kn">from</span> <span class="nn">flink.plan.Environment</span> <span class="kn">import</span> <span class="n">get_environment</span> |
| <span class="kn">from</span> <span class="nn">flink.plan.Constants</span> <span class="kn">import</span> <span class="n">INT</span><span class="p">,</span> <span class="n">STRING</span> |
| <span class="kn">from</span> <span class="nn">flink.functions.GroupReduceFunction</span> <span class="kn">import</span> <span class="n">GroupReduceFunction</span> |
| |
| <span class="k">class</span> <span class="nc">Adder</span><span class="p">(</span><span class="n">GroupReduceFunction</span><span class="p">):</span> |
| <span class="k">def</span> <span class="nf">reduce</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">iterator</span><span class="p">,</span> <span class="n">collector</span><span class="p">):</span> |
| <span class="n">count</span><span class="p">,</span> <span class="n">word</span> <span class="o">=</span> <span class="n">iterator</span><span class="o">.</span><span class="n">next</span><span class="p">()</span> |
| <span class="n">count</span> <span class="o">+=</span> <span class="nb">sum</span><span class="p">([</span><span class="n">x</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> <span class="k">for</span> <span class="n">x</span> <span class="ow">in</span> <span class="n">iterator</span><span class="p">])</span> |
| <span class="n">collector</span><span class="o">.</span><span class="n">collect</span><span class="p">((</span><span class="n">count</span><span class="p">,</span> <span class="n">word</span><span class="p">))</span> |
| |
| <span class="k">if</span> <span class="n">__name__</span> <span class="o">==</span> <span class="s">"__main__"</span><span class="p">:</span> |
| <span class="n">env</span> <span class="o">=</span> <span class="n">get_environment</span><span class="p">()</span> |
| <span class="n">data</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="n">from_elements</span><span class="p">(</span><span class="s">"Who's there?"</span><span class="p">,</span> |
| <span class="s">"I think I hear them. Stand, ho! Who's there?"</span><span class="p">)</span> |
| |
| <span class="n">data</span> \ |
| <span class="o">.</span><span class="n">flat_map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">,</span> <span class="n">c</span><span class="p">:</span> <span class="p">[(</span><span class="mi">1</span><span class="p">,</span> <span class="n">word</span><span class="p">)</span> <span class="k">for</span> <span class="n">word</span> <span class="ow">in</span> <span class="n">x</span><span class="o">.</span><span class="n">lower</span><span class="p">()</span><span class="o">.</span><span class="n">split</span><span class="p">()],</span> <span class="p">(</span><span class="n">INT</span><span class="p">,</span> <span class="n">STRING</span><span class="p">))</span> \ |
| <span class="o">.</span><span class="n">group_by</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span> \ |
| <span class="o">.</span><span class="n">reduce_group</span><span class="p">(</span><span class="n">Adder</span><span class="p">(),</span> <span class="p">(</span><span class="n">INT</span><span class="p">,</span> <span class="n">STRING</span><span class="p">),</span> <span class="n">combinable</span><span class="o">=</span><span class="bp">True</span><span class="p">)</span> \ |
| <span class="o">.</span><span class="n">output</span><span class="p">()</span> |
| |
| <span class="n">env</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span><span class="n">local</span><span class="o">=</span><span class="bp">True</span><span class="p">)</span></code></pre></div> |
| |
| <p><a href="#top">Back to top</a></p> |
| |
| <h2 id="program-skeleton">Program Skeleton</h2> |
| |
| <p>As we already saw in the example, Flink programs look like regular Python |
| programs with an <code>if __name__ == "__main__":</code> block. Each program consists of the same basic parts:</p> |
| |
| <ol> |
| <li>Obtain an <code>Environment</code>,</li> |
| <li>Load/create the initial data,</li> |
| <li>Specify transformations on this data,</li> |
| <li>Specify where to put the results of your computations, and</li> |
| <li>Execute your program.</li> |
| </ol> |
| |
| <p>We will now give an overview of each of those steps but please refer to the respective sections for |
| more details.</p> |
| |
| <p>The <code>Environment</code> is the basis for all Flink programs. You can |
| obtain one by calling the <code>get_environment</code> function from the <code>flink.plan.Environment</code> module:</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">get_environment</span><span class="p">()</span></code></pre></div> |
| |
| <p>For specifying data sources the execution environment has several methods |
| to read from files. To just read a text file as a sequence of lines, you can use:</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">env</span> <span class="o">=</span> <span class="n">get_environment</span><span class="p">()</span> |
| <span class="n">text</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="n">read_text</span><span class="p">(</span><span class="s">"file:///path/to/file"</span><span class="p">)</span></code></pre></div> |
| |
| <p>This will give you a DataSet on which you can then apply transformations. For |
| more information on data sources and input formats, please refer to |
| <a href="#data-sources">Data Sources</a>.</p> |
| |
| <p>Once you have a DataSet you can apply transformations to create a new |
| DataSet which you can then write to a file, transform again, or |
| combine with other DataSets. You apply transformations by calling |
| methods on DataSet with your own custom transformation function. For example, |
| a map transformation looks like this:</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">data</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span><span class="o">*</span><span class="mi">2</span><span class="p">,</span> <span class="n">INT</span><span class="p">)</span></code></pre></div> |
| |
| <p>This will create a new DataSet by doubling every value in the original DataSet. |
| For more information and a list of all the transformations, |
| please refer to <a href="#transformations">Transformations</a>.</p> |
| |
| <p>Once you have a DataSet that needs to be written to disk you can call one |
| of these methods on DataSet:</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">data</span><span class="o">.</span><span class="n">write_text</span><span class="p">(</span><span class="s">"&lt;file-path&gt;"</span><span class="p">,</span> <span class="n">WriteMode</span><span class="o">=</span><span class="n">Constants</span><span class="o">.</span><span class="n">NO_OVERWRITE</span><span class="p">)</span> |
| <span class="n">write_csv</span><span class="p">(</span><span class="s">"&lt;file-path&gt;"</span><span class="p">,</span> <span class="n">line_delimiter</span><span class="o">=</span><span class="s">'</span><span class="se">\n</span><span class="s">'</span><span class="p">,</span> <span class="n">field_delimiter</span><span class="o">=</span><span class="s">','</span><span class="p">,</span> <span class="n">write_mode</span><span class="o">=</span><span class="n">Constants</span><span class="o">.</span><span class="n">NO_OVERWRITE</span><span class="p">)</span> |
| <span class="n">output</span><span class="p">()</span></code></pre></div> |
| |
| <p>The last method is only useful for developing/debugging on a local machine; |
| it outputs the contents of the DataSet to standard output. (Note that in |
| a cluster, the result goes to the standard out stream of the cluster nodes and ends |
| up in the <em>.out</em> files of the workers.) |
| The first two do as their names suggest. |
| Please refer to <a href="#data-sinks">Data Sinks</a> for more information on writing to files.</p> |
| |
| <p>Once you have specified the complete program you need to call <code>execute</code> on |
| the <code>Environment</code>. This will either execute on your local machine or submit your program |
| for execution on a cluster, depending on how Flink was started. You can force |
| a local execution by using <code>execute(local=True)</code>.</p> |
| |
| <p><a href="#top">Back to top</a></p> |
| |
| <h2 id="project-setup">Project setup</h2> |
| |
| <p>Apart from setting up Flink, no additional work is required. The Python package can be found in the /resource folder of your Flink distribution. The flink package, along with the plan and optional packages, is automatically distributed among the cluster via HDFS when running a job.</p> |
| |
| <p>The Python API was tested on Linux systems that have Python 2.7 or 3.4 installed.</p> |
| |
| <p><a href="#top">Back to top</a></p> |
| |
| <h2 id="lazy-evaluation">Lazy Evaluation</h2> |
| |
| <p>All Flink programs are executed lazily: When the program’s main method is executed, the data loading |
| and transformations do not happen directly. Rather, each operation is created and added to the |
| program’s plan. The operations are actually executed when one of the <code>execute()</code> methods is invoked |
| on the Environment object. Whether the program is executed locally or on a cluster depends |
| on the environment of the program.</p> |
| |
| <p>The lazy evaluation lets you construct sophisticated programs that Flink executes as one |
| holistically planned unit.</p> |
| |
| <p><a href="#top">Back to top</a></p> |
| |
| <h2 id="transformations">Transformations</h2> |
| |
| <p>Data transformations transform one or more DataSets into a new DataSet. Programs can combine |
| multiple transformations into sophisticated assemblies.</p> |
| |
| <p>This section gives a brief overview of the available transformations. The <a href="dataset_transformations.html">transformations |
| documentation</a> has a full description of all transformations with |
| examples.</p> |
| |
| <p><br /></p> |
| |
| <table class="table table-bordered"> |
| <thead> |
| <tr> |
| <th class="text-left" style="width: 20%">Transformation</th> |
| <th class="text-center">Description</th> |
| </tr> |
| </thead> |
| |
| <tbody> |
| <tr> |
| <td><strong>Map</strong></td> |
| <td> |
| <p>Takes one element and produces one element.</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">data</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span> <span class="o">*</span> <span class="mi">2</span><span class="p">,</span> <span class="n">INT</span><span class="p">)</span></code></pre></div> |
| |
| </td> |
| </tr> |
| |
| <tr> |
| <td><strong>FlatMap</strong></td> |
| <td> |
| <p>Takes one element and produces zero, one, or more elements. </p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">data</span><span class="o">.</span><span class="n">flat_map</span><span class="p">(</span> |
| <span class="k">lambda</span> <span class="n">x</span><span class="p">,</span><span class="n">c</span><span class="p">:</span> <span class="p">[(</span><span class="mi">1</span><span class="p">,</span><span class="n">word</span><span class="p">)</span> <span class="k">for</span> <span class="n">word</span> <span class="ow">in</span> <span class="n">line</span><span class="o">.</span><span class="n">lower</span><span class="p">()</span><span class="o">.</span><span class="n">split</span><span class="p">()</span> <span class="k">for</span> <span class="n">line</span> <span class="ow">in</span> <span class="n">x</span><span class="p">],</span> |
| <span class="p">(</span><span class="n">INT</span><span class="p">,</span> <span class="n">STRING</span><span class="p">))</span></code></pre></div> |
| |
| </td> |
| </tr> |
| |
| <tr> |
| <td><strong>MapPartition</strong></td> |
| <td> |
| <p>Transforms a parallel partition in a single function call. The function gets the partition |
| as an <code>Iterator</code> and can produce an arbitrary number of result values. The number of |
| elements in each partition depends on the degree-of-parallelism and previous operations.</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">data</span><span class="o">.</span><span class="n">map_partition</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">,</span><span class="n">c</span><span class="p">:</span> <span class="p">[</span><span class="n">value</span> <span class="o">*</span> <span class="mi">2</span> <span class="k">for</span> <span class="n">value</span> <span class="ow">in</span> <span class="n">x</span><span class="p">],</span> <span class="n">INT</span><span class="p">)</span></code></pre></div> |
| |
| </td> |
| </tr> |
| |
| <tr> |
| <td><strong>Filter</strong></td> |
| <td> |
| <p>Evaluates a boolean function for each element and retains those for which the function |
| returns true.</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">data</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span> <span class="o">></span> <span class="mi">1000</span><span class="p">)</span></code></pre></div> |
| |
| </td> |
| </tr> |
| |
| <tr> |
| <td><strong>Reduce</strong></td> |
| <td> |
| <p>Combines a group of elements into a single element by repeatedly combining two elements |
| into one. Reduce may be applied on a full data set, or on a grouped data set.</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">data</span><span class="o">.</span><span class="n">reduce</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">,</span><span class="n">y</span> <span class="p">:</span> <span class="n">x</span> <span class="o">+</span> <span class="n">y</span><span class="p">)</span></code></pre></div> |
| |
| </td> |
| </tr> |
| |
| <tr> |
| <td><strong>ReduceGroup</strong></td> |
| <td> |
| <p>Combines a group of elements into one or more elements. ReduceGroup may be applied on a |
| full data set, or on a grouped data set.</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="k">class</span> <span class="nc">Adder</span><span class="p">(</span><span class="n">GroupReduceFunction</span><span class="p">):</span> |
| <span class="k">def</span> <span class="nf">reduce</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">iterator</span><span class="p">,</span> <span class="n">collector</span><span class="p">):</span> |
| <span class="n">count</span><span class="p">,</span> <span class="n">word</span> <span class="o">=</span> <span class="n">iterator</span><span class="o">.</span><span class="n">next</span><span class="p">()</span> |
| <span class="n">count</span> <span class="o">+=</span> <span class="nb">sum</span><span class="p">([</span><span class="n">x</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> <span class="k">for</span> <span class="n">x</span> <span class="ow">in</span> <span class="n">iterator</span><span class="p">])</span> |
| <span class="n">collector</span><span class="o">.</span><span class="n">collect</span><span class="p">((</span><span class="n">count</span><span class="p">,</span> <span class="n">word</span><span class="p">))</span> |
| |
| <span class="n">data</span><span class="o">.</span><span class="n">reduce_group</span><span class="p">(</span><span class="n">Adder</span><span class="p">(),</span> <span class="p">(</span><span class="n">INT</span><span class="p">,</span> <span class="n">STRING</span><span class="p">))</span></code></pre></div> |
| |
| </td> |
| </tr> |
| |
| |
| <tr> |
| <td><strong>Join</strong></td> |
| <td> |
| <p>Joins two data sets by creating all pairs of elements that are equal on their keys. |
| Optionally uses a JoinFunction to turn the pair of elements into a single element. |
| See <a href="#specifying-keys">keys</a> on how to define join keys.</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="c"># In this case tuple fields are used as keys. </span> |
| <span class="c"># "0" is the join field on the first tuple</span> |
| <span class="c"># "1" is the join field on the second tuple.</span> |
| <span class="n">result</span> <span class="o">=</span> <span class="n">input1</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">input2</span><span class="p">)</span><span class="o">.</span><span class="n">where</span><span class="p">(</span><span class="mi">0</span><span class="p">)</span><span class="o">.</span><span class="n">equal_to</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span></code></pre></div> |
| |
| </td> |
| </tr> |
| |
| |
| <tr> |
| <td><strong>CoGroup</strong></td> |
| <td> |
| <p>The two-dimensional variant of the reduce operation. Groups each input on one or more |
| fields and then joins the groups. The transformation function is called per pair of groups. |
| See <a href="#specifying-keys">keys</a> on how to define coGroup keys.</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">data1</span><span class="o">.</span><span class="n">co_group</span><span class="p">(</span><span class="n">data2</span><span class="p">)</span><span class="o">.</span><span class="n">where</span><span class="p">(</span><span class="mi">0</span><span class="p">)</span><span class="o">.</span><span class="n">equal_to</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span></code></pre></div> |
| |
| </td> |
| </tr> |
| |
| <tr> |
| <td><strong>Cross</strong></td> |
| <td> |
| <p>Builds the Cartesian product (cross product) of two inputs, creating all pairs of |
| elements. Optionally uses a CrossFunction to turn the pair of elements into a single |
| element.</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">result</span> <span class="o">=</span> <span class="n">data1</span><span class="o">.</span><span class="n">cross</span><span class="p">(</span><span class="n">data2</span><span class="p">)</span></code></pre></div> |
| |
| </td> |
| </tr> |
| <tr> |
| <td><strong>Union</strong></td> |
| <td> |
| <p>Produces the union of two data sets.</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">data</span><span class="o">.</span><span class="n">union</span><span class="p">(</span><span class="n">data2</span><span class="p">)</span></code></pre></div> |
| |
| </td> |
| </tr> |
| </tbody> |
| </table> |
| |
| <p><a href="#top">Back to top</a></p> |
| |
| <h2 id="specifying-keys">Specifying Keys</h2> |
| |
| <p>Some transformations (like Join or CoGroup) require that a key be defined on |
| their argument DataSets, and other transformations (Reduce, GroupReduce) allow the DataSet to be grouped on a key before they are |
| applied.</p> |
| |
| <p>A DataSet is grouped as</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">reduced</span> <span class="o">=</span> <span class="n">data</span> \ |
| <span class="o">.</span><span class="n">group_by</span><span class="p">(</span><span class="o"><</span><span class="n">define</span> <span class="n">key</span> <span class="n">here</span><span class="o">></span><span class="p">)</span> \ |
| <span class="o">.</span><span class="n">reduce_group</span><span class="p">(</span><span class="o"><</span><span class="n">do</span> <span class="n">something</span><span class="o">></span><span class="p">)</span></code></pre></div> |
| |
| <p>The data model of Flink is not based on key-value pairs. Therefore, |
| you do not need to physically pack the data set types into keys and |
| values. Keys are “virtual”: they are defined as functions over the |
| actual data to guide the grouping operator.</p> |
| |
| <h3 class="no_toc" id="define-keys-for-tuples">Define keys for Tuples</h3> |
| |
| <p>The simplest case is grouping a data set of Tuples on one or more |
| fields of the Tuple:</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">reduced</span> <span class="o">=</span> <span class="n">data</span> \ |
| <span class="o">.</span><span class="n">group_by</span><span class="p">(</span><span class="mi">0</span><span class="p">)</span> \ |
| <span class="o">.</span><span class="n">reduce_group</span><span class="p">(</span><span class="o"><</span><span class="n">do</span> <span class="n">something</span><span class="o">></span><span class="p">)</span></code></pre></div> |
| |
| <p>The data set is grouped on the first field of the tuples. |
| The group-reduce function will thus receive groups of tuples with |
| the same value in the first field.</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">grouped</span> <span class="o">=</span> <span class="n">data</span> \ |
| <span class="o">.</span><span class="n">group_by</span><span class="p">(</span><span class="mi">0</span><span class="p">,</span><span class="mi">1</span><span class="p">)</span> \ |
| <span class="o">.</span><span class="n">reduce</span><span class="p">(</span><span class="o">&lt;</span><span class="n">do</span> <span class="n">something</span><span class="o">&gt;</span><span class="p">)</span></code></pre></div> |
| |
| <p>The data set is grouped on the composite key consisting of the first and the |
| second fields, therefore the reduce function will receive groups |
| with the same value for both fields.</p> |
| |
| <p>A note on nested Tuples: If you have a DataSet with a nested tuple, |
| specifying <code>group_by(&lt;index of tuple&gt;)</code> will cause the system to use the full tuple as a key.</p> |
| |
| <p><a href="#top">Back to top</a></p> |
| |
| <h2 id="passing-functions-to-flink">Passing Functions to Flink</h2> |
| |
| <p>Certain operations require user-defined functions. All of these operations accept both lambda functions and rich functions as arguments.</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">data</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span> <span class="o">></span> <span class="mi">5</span><span class="p">)</span></code></pre></div> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="k">class</span> <span class="nc">Filter</span><span class="p">(</span><span class="n">FilterFunction</span><span class="p">):</span> |
| <span class="k">def</span> <span class="nf">filter</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">value</span> <span class="o">></span> <span class="mi">5</span> |
| |
| <span class="n">data</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">Filter</span><span class="p">())</span></code></pre></div> |
| |
| <p>Rich functions allow the use of imported functions, provide access to broadcast variables, |
| can be parameterized using <code>__init__()</code>, and are the go-to option for complex functions. |
| They are also the only way to define an optional <code>combine</code> function for a reduce operation.</p> |
| |
| <p>Lambda functions allow the easy insertion of one-liners. Note that a lambda function has to return |
| an iterable if the operation can return multiple values (that is, for all functions that receive a collector argument).</p> |
| |
| <p>Flink requires type information at the time when it prepares the program for execution |
| (when the main method of the program is called). This is done by passing an exemplary |
| object that has the desired type. This holds also for tuples.</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="p">(</span><span class="n">INT</span><span class="p">,</span> <span class="n">STRING</span><span class="p">)</span></code></pre></div> |
| |
| <p>This would denote a tuple containing an int and a string. Note that for operations that work strictly on tuples (like cross), no parentheses are required.</p> |
| |
| <p>There are a few Constants defined in flink.plan.Constants that allow this in a more readable fashion.</p> |
| |
| <p><a href="#top">Back to top</a></p> |
| |
| <h2 id="data-types">Data Types</h2> |
| |
| <p>Flink’s Python API currently only supports primitive python types (int, float, bool, string) and byte arrays.</p> |
| |
| <h4 id="tupleslists">Tuples/Lists</h4> |
| |
| <p>You can use tuples (or lists) for composite types. Python tuples are mapped to the Flink Tuple type, which contains |
| a fixed number of fields of various types (up to 25). Every field of a tuple can be a primitive type or a further tuple, resulting in nested tuples.</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">word_counts</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="n">from_elements</span><span class="p">((</span><span class="s">"hello"</span><span class="p">,</span> <span class="mi">1</span><span class="p">),</span> <span class="p">(</span><span class="s">"world"</span><span class="p">,</span><span class="mi">2</span><span class="p">))</span> |
| |
| <span class="n">counts</span> <span class="o">=</span> <span class="n">word_counts</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span><span class="p">[</span><span class="mi">1</span><span class="p">],</span> <span class="n">INT</span><span class="p">)</span></code></pre></div> |
| |
| <p>When working with operators that require a Key for grouping or matching records, |
| Tuples let you simply specify the positions of the fields to be used as key. You can specify more |
| than one position to use composite keys (see <a href="#transformations">Section Data Transformations</a>).</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">word_counts</span> \ |
| <span class="o">.</span><span class="n">group_by</span><span class="p">(</span><span class="mi">0</span><span class="p">)</span> \ |
| <span class="o">.</span><span class="n">reduce</span><span class="p">(</span><span class="n">MyReduceFunction</span><span class="p">())</span></code></pre></div> |
| |
| <p><a href="#top">Back to top</a></p> |
| |
| <h2 id="data-sources">Data Sources</h2> |
| |
| <p>Data sources create the initial data sets, such as from files or from collections.</p> |
| |
| <p>File-based:</p> |
| |
| <ul> |
| <li><code>read_text(path)</code> - Reads files line wise and returns them as Strings.</li> |
| <li><code>read_csv(path, type)</code> - Parses files of comma (or another char) delimited fields. |
| Returns a DataSet of tuples. Supports the basic java types and their Value counterparts as field |
| types.</li> |
| </ul> |
| |
| <p>Collection-based:</p> |
| |
| <ul> |
| <li><code>from_elements(*args)</code> - Creates a data set from the given elements. All elements must be of the same type.</li> |
| </ul> |
| |
| <p><strong>Examples</strong></p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">env</span> <span class="o">=</span> <span class="n">get_environment</span><span class="p">()</span> |
| |
| <span class="c"># read text file from local file system</span> |
| <span class="n">localLines</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="n">read_text</span><span class="p">(</span><span class="s">"file:///path/to/my/textfile"</span><span class="p">)</span> |
| |
| <span class="c"># read text file from a HDFS running at nnHost:nnPort</span> |
| <span class="n">hdfsLines</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="n">read_text</span><span class="p">(</span><span class="s">"hdfs://nnHost:nnPort/path/to/my/textfile"</span><span class="p">)</span> |
| |
| <span class="c"># read a CSV file with three fields</span> |
| <span class="n">csvInput</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="n">read_csv</span><span class="p">(</span><span class="s">"hdfs:///the/CSV/file"</span><span class="p">,</span> <span class="p">(</span><span class="n">INT</span><span class="p">,</span> <span class="n">STRING</span><span class="p">,</span> <span class="n">DOUBLE</span><span class="p">))</span> |
| |
| <span class="c"># create a set from some given elements</span> |
| <span class="n">values</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="n">from_elements</span><span class="p">(</span><span class="s">"Foo"</span><span class="p">,</span> <span class="s">"bar"</span><span class="p">,</span> <span class="s">"foobar"</span><span class="p">,</span> <span class="s">"fubar"</span><span class="p">)</span></code></pre></div> |
| |
| <p><a href="#top">Back to top</a></p> |
| |
| <h2 id="data-sinks">Data Sinks</h2> |
| |
| <p>Data sinks consume DataSets and are used to store or return them:</p> |
| |
| <ul> |
| <li><code>write_text()</code> - Writes elements line-wise as Strings. The Strings are |
| obtained by calling the <em>str()</em> method of each element.</li> |
| <li><code>write_csv(...)</code> - Writes tuples as comma-separated value files. Row and field |
| delimiters are configurable. The value for each field comes from the <em>str()</em> method of the objects.</li> |
| <li><code>output()</code> - Prints the <em>str()</em> value of each element on the |
| standard out.</li> |
| </ul> |
| |
| <p>A DataSet can be input to multiple operations. Programs can write or print a data set and at the |
| same time run additional transformations on them.</p> |
| |
| <p><strong>Examples</strong></p> |
| |
| <p>Standard data sink methods:</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="c"># write DataSet to a file on the local file system</span> |
| <span class="n">textData</span><span class="o">.</span><span class="n">write_text</span><span class="o">(</span><span class="s">"file:///my/result/on/localFS"</span><span class="o">)</span> |
| |
| <span class="c"># write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort</span> |
| <span class="n">textData</span><span class="o">.</span><span class="n">write_text</span><span class="o">(</span><span class="s">"hdfs://nnHost:nnPort/my/result/on/localFS"</span><span class="o">)</span> |
| |
| <span class="c"># write DataSet to a file and overwrite the file if it exists</span> |
| <span class="n">textData</span><span class="o">.</span><span class="n">write_text</span><span class="o">(</span><span class="s">"file:///my/result/on/localFS"</span><span class="o">,</span> <span class="nc">WriteMode</span><span class="o">.</span><span class="nc">OVERWRITE</span><span class="o">)</span> |
| |
| <span class="c"># write tuples as lines with pipe as the separator "a|b|c"</span> |
| <span class="n">values</span><span class="o">.</span><span class="n">write_csv</span><span class="o">(</span><span class="s">"file:///path/to/the/result/file"</span><span class="o">,</span> <span class="n">line_delimiter</span><span class="o">=</span><span class="s">"\n"</span><span class="o">,</span> <span class="n">field_delimiter</span><span class="o">=</span><span class="s">"|"</span><span class="o">)</span> |
| |
| <span class="c"># this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines</span> |
| <span class="n">values</span><span class="o">.</span><span class="n">write_text</span><span class="o">(</span><span class="s">"file:///path/to/the/result/file"</span><span class="o">)</span></code></pre></div> |
| |
| <p><a href="#top">Back to top</a></p> |
| |
| <h2 id="broadcast-variables">Broadcast Variables</h2> |
| |
| <p>Broadcast variables allow you to make a data set available to all parallel instances of an |
| operation, in addition to the regular input of the operation. This is useful for auxiliary data |
| sets, or data-dependent parameterization. The data set will then be accessible at the operator as a |
| Collection.</p> |
| |
| <ul> |
| <li><strong>Broadcast</strong>: broadcast sets are registered by name via <code>with_broadcast_set(String, DataSet)</code></li> |
| <li><strong>Access</strong>: accessible via <code>self.context.get_broadcast_variable(String)</code> at the target operator</li> |
| </ul> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="k">class</span> <span class="nc">MapperBcv</span><span class="p">(</span><span class="n">MapFunction</span><span class="p">):</span> |
| <span class="k">def</span> <span class="nf">map</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span> |
| <span class="n">factor</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">context</span><span class="o">.</span><span class="n">get_broadcast_variable</span><span class="p">(</span><span class="s">"bcv"</span><span class="p">)[</span><span class="mi">0</span><span class="p">][</span><span class="mi">0</span><span class="p">]</span> |
| <span class="k">return</span> <span class="n">value</span> <span class="o">*</span> <span class="n">factor</span> |
| |
| <span class="c"># 1. The DataSet to be broadcasted</span> |
| <span class="n">toBroadcast</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="n">from_elements</span><span class="p">(</span><span class="mi">1</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="mi">3</span><span class="p">)</span> |
| <span class="n">data</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="n">from_elements</span><span class="p">(</span><span class="s">"a"</span><span class="p">,</span> <span class="s">"b"</span><span class="p">)</span> |
| |
| <span class="c"># 2. Broadcast the DataSet</span> |
| <span class="n">data</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="n">MapperBcv</span><span class="p">(),</span> <span class="n">INT</span><span class="p">)</span><span class="o">.</span><span class="n">with_broadcast_set</span><span class="p">(</span><span class="s">"bcv"</span><span class="p">,</span> <span class="n">toBroadcast</span><span class="p">)</span></code></pre></div> |
| |
| <p>Make sure that the names (<code>bcv</code> in the previous example) match when registering and |
| accessing broadcasted data sets.</p> |
| |
| <p><strong>Note</strong>: As the content of broadcast variables is kept in-memory on each node, it should not become |
| too large. For simpler things like scalar values you can simply parameterize the rich function.</p> |
| |
| <p><a href="#top">Back to top</a></p> |
| |
| <h2 id="parallel-execution">Parallel Execution</h2> |
| |
| <p>This section describes how the parallel execution of programs can be configured in Flink. A Flink |
| program consists of multiple tasks (operators, data sources, and sinks). A task is split into |
| several parallel instances for execution and each parallel instance processes a subset of the task’s |
| input data. The number of parallel instances of a task is called its <em>parallelism</em> or <em>degree of |
| parallelism (DOP)</em>.</p> |
| |
| <p>The degree of parallelism of a task can be specified in Flink on different levels.</p> |
| |
| <h3 id="execution-environment-level">Execution Environment Level</h3> |
| |
| <p>Flink programs are executed in the context of an <a href="#program-skeleton">execution environment</a>. An |
| execution environment defines a default parallelism for all operators, data sources, and data sinks |
| it executes. Execution environment parallelism can be overridden by explicitly configuring the |
| parallelism of an operator.</p> |
| |
| <p>The default parallelism of an execution environment can be specified by calling the |
| <code>set_degree_of_parallelism()</code> method. To execute all operators, data sources, and data sinks of the |
| <a href="#example-program">WordCount</a> example program with a parallelism of <code>3</code>, set the default parallelism of the |
| execution environment as follows:</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">env</span> <span class="o">=</span> <span class="n">get_environment</span><span class="p">()</span> |
| <span class="n">env</span><span class="o">.</span><span class="n">set_degree_of_parallelism</span><span class="p">(</span><span class="mi">3</span><span class="p">)</span> |
| |
| <span class="n">text</span><span class="o">.</span><span class="n">flat_map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">,</span><span class="n">c</span><span class="p">:</span> <span class="n">x</span><span class="o">.</span><span class="n">lower</span><span class="p">()</span><span class="o">.</span><span class="n">split</span><span class="p">(),</span> <span class="p">(</span><span class="n">INT</span><span class="p">,</span> <span class="n">STRING</span><span class="p">))</span> \ |
| <span class="o">.</span><span class="n">group_by</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span> \ |
| <span class="o">.</span><span class="n">reduce_group</span><span class="p">(</span><span class="n">Adder</span><span class="p">(),</span> <span class="p">(</span><span class="n">INT</span><span class="p">,</span> <span class="n">STRING</span><span class="p">),</span> <span class="n">combinable</span><span class="o">=</span><span class="bp">True</span><span class="p">)</span> \ |
| <span class="o">.</span><span class="n">output</span><span class="p">()</span> |
| |
| <span class="n">env</span><span class="o">.</span><span class="n">execute</span><span class="p">()</span></code></pre></div> |
| |
| <h3 id="system-level">System Level</h3> |
| |
| <p>A system-wide default parallelism for all execution environments can be defined by setting the |
| <code>parallelization.degree.default</code> property in <code>./conf/flink-conf.yaml</code>. See the |
| <a href="config.html">Configuration</a> documentation for details.</p> |
| |
| <p><a href="#top">Back to top</a></p> |
| |
| <h2 id="executing-plans">Executing Plans</h2> |
| |
| <p>To run the plan with Flink, go to your Flink distribution, and run the pyflink.sh script from the /bin folder. |
| Use pyflink2.sh for Python 2.7, and pyflink3.sh for Python 3.4. The script containing the plan has to be passed |
| as the first argument, followed by a number of additional Python packages, and finally, separated by <code>-</code>, additional |
| arguments that will be fed to the script.</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="o">./</span><span class="nb">bin</span><span class="o">/</span><span class="n">pyflink</span><span class="o"><</span><span class="mi">2</span><span class="o">/</span><span class="mi">3</span><span class="o">>.</span><span class="n">sh</span> <span class="o"><</span><span class="n">Script</span><span class="o">></span><span class="p">[</span> <span class="o"><</span><span class="n">pathToPackage1</span><span class="o">></span><span class="p">[</span> <span class="o"><</span><span class="n">pathToPackageX</span><span class="o">></span><span class="p">]][</span> <span class="o">-</span> <span class="o"><</span><span class="n">param1</span><span class="o">></span><span class="p">[</span> <span class="o"><</span><span class="n">paramX</span><span class="o">></span><span class="p">]]</span></code></pre></div> |
| |
| <p><a href="#top">Back to top</a></p> |
| |
| <h2 id="debugging">Debugging</h2> |
| |
| <p>If you are running Flink programs locally, you can debug your program following this guide. |
| First you have to enable debugging by setting the debug switch in the <code>env.execute(debug=True)</code> call. After |
| submitting your program, open the jobmanager log file, and look for a line that says |
| <code>Waiting for external Process : &lt;taskname&gt;. Run python /tmp/flink/executor.py &lt;port&gt;</code>. Now open <code>/tmp/flink</code> in your Python |
| IDE and run <code>executor.py &lt;port&gt;</code>.</p> |
| |
| <p><a href="#top">Back to top</a></p> |
| |
| </div> |
| |
| <div class="col-sm-10 col-sm-offset-1"> |
| <!-- Disqus thread and some vertical offset --> |
| <div style="margin-top: 75px; margin-bottom: 50px" id="disqus_thread"></div> |
| </div> |
| </div> |
| |
| </div><!-- /.container --> |
| |
| <!-- jQuery (necessary for Bootstrap's JavaScript plugins) --> |
| <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.2/jquery.min.js"></script> |
| <!-- Include all compiled plugins (below), or include individual files as needed --> |
| <script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/js/bootstrap.min.js"></script> |
| <script src="http://flink.apache.org/docs/master/page/js/codetabs.js"></script> |
| |
| <!-- Google Analytics --> |
| <script> |
| // Google Analytics (analytics.js) loader. Registers a queuing stub under window.ga, |
| // records the load timestamp, then creates an async script element and inserts it |
| // before the first script element already in the document. |
| // The library URL is pinned to https so it also resolves when this page is opened |
| // from file:// and is never requested over plain http. |
| (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){ |
| (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o), |
| m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m) |
| })(window,document,'script','https://www.google-analytics.com/analytics.js','ga'); |
| |
| // Queue tracker creation for property UA-52545728-1 and a pageview hit. |
| ga('create', 'UA-52545728-1', 'auto'); |
| ga('send', 'pageview'); |
| </script> |
| |
| <!-- Disqus --> |
| <script type="text/javascript"> |
| // Disqus forum shortname; kept as a global because the embed URL below is built from it |
| // (NOTE(review): embed.js presumably reads this global as well - confirm before renaming). |
| var disqus_shortname = 'stratosphere-eu'; |
| // Create an async script element for the Disqus embed and append it to head |
| // (or to body when no head element is found). |
| (function() { |
| var dsq = document.createElement('script'); dsq.type = 'text/javascript'; dsq.async = true; |
| // Explicit https instead of a protocol-relative URL, so the embed also loads from file:// |
| // and is never fetched over plain http. |
| dsq.src = 'https://' + disqus_shortname + '.disqus.com/embed.js'; |
| (document.getElementsByTagName('head')[0] || document.getElementsByTagName('body')[0]).appendChild(dsq); |
| })(); |
| </script> |
| </body> |
| </html> |