blob: d57e11765de21cb46404cef7564e904395a4d64c [file] [log] [blame] [view]
---
title: "Python Programming Guide"
is_beta: true
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<a href="#top"></a>
Analysis programs in Flink are regular programs that implement transformations on data sets
(e.g., filtering, mapping, joining, grouping). The data sets are initially created from certain
sources (e.g., by reading files, or from collections). Results are returned via sinks, which may for
example write the data to (distributed) files, or to standard output (for example the command line
terminal). Flink programs run in a variety of contexts, standalone, or embedded in other programs.
The execution can happen in a local JVM, or on clusters of many machines.
In order to create your own Flink program, we encourage you to start with the
[program skeleton](#program-skeleton) and gradually add your own
[transformations](#transformations). The remaining sections act as references for additional
operations and advanced features.
* This will be replaced by the TOC
{:toc}
Example Program
---------------
The following program is a complete, working example of WordCount. You can copy &amp; paste the code
to run it locally.
{% highlight python %}
from flink.plan.Environment import get_environment
from flink.plan.Constants import INT, STRING
from flink.functions.GroupReduceFunction import GroupReduceFunction
class Adder(GroupReduceFunction):
def reduce(self, iterator, collector):
count, word = iterator.next()
count += sum([x[0] for x in iterator])
collector.collect((count, word))
if __name__ == "__main__":
env = get_environment()
data = env.from_elements("Who's there?",
"I think I hear them. Stand, ho! Who's there?")
data \
.flat_map(lambda x, c: [(1, word) for word in x.lower().split()], (INT, STRING)) \
.group_by(1) \
.reduce_group(Adder(), (INT, STRING), combinable=True) \
.output()
env.execute(local=True)
{% endhighlight %}
[Back to top](#top)
Program Skeleton
----------------
As we already saw in the example, Flink programs look like regular python
programs with a `if __name__ == "__main__":` block. Each program consists of the same basic parts:
1. Obtain an `Environment`,
2. Load/create the initial data,
3. Specify transformations on this data,
4. Specify where to put the results of your computations, and
5. Execute your program.
We will now give an overview of each of those steps but please refer to the respective sections for
more details.
The `Environment` is the basis for all Flink programs. You can
obtain one using these static methods on class `Environment`:
{% highlight python %}
get_environment()
{% endhighlight %}
For specifying data sources the execution environment has several methods
to read from files. To just read a text file as a sequence of lines, you can use:
{% highlight python %}
env = get_environment()
text = env.read_text("file:///path/to/file")
{% endhighlight %}
This will give you a DataSet on which you can then apply transformations. For
more information on data sources and input formats, please refer to
[Data Sources](#data-sources).
Once you have a DataSet you can apply transformations to create a new
DataSet which you can then write to a file, transform again, or
combine with other DataSets. You apply transformations by calling
methods on DataSet with your own custom transformation function. For example,
a map transformation looks like this:
{% highlight python %}
data.map(lambda x: x*2, INT)
{% endhighlight %}
This will create a new DataSet by doubling every value in the original DataSet.
For more information and a list of all the transformations,
please refer to [Transformations](#transformations).
Once you have a DataSet that needs to be written to disk you can call one
of these methods on DataSet:
{% highlight python %}
data.write_text("<file-path>", WriteMode=Constants.NO_OVERWRITE)
write_csv("<file-path>", line_delimiter='\n', field_delimiter=',', write_mode=Constants.NO_OVERWRITE)
output()
{% endhighlight %}
The last method is only useful for developing/debugging on a local machine,
it will output the contents of the DataSet to standard output. (Note that in
a cluster, the result goes to the standard out stream of the cluster nodes and ends
up in the *.out* files of the workers).
The first two do as the name suggests.
Please refer to [Data Sinks](#data-sinks) for more information on writing to files.
Once you specified the complete program you need to call `execute` on
the `Environment`. This will either execute on your local machine or submit your program
for execution on a cluster, depending on how Flink was started. You can force
a local execution by using `execute(local=True)`.
[Back to top](#top)
Project setup
---------------
Apart from setting up Flink, no additional work is required. The python package can be found in the /resource folder of your Flink distribution. The flink package, along with the plan and optional packages are automatically distributed among the cluster via HDFS when running a job.
The Python API was tested on Linux systems that have Python 2.7 or 3.4 installed.
[Back to top](#top)
Lazy Evaluation
---------------
All Flink programs are executed lazily: When the program's main method is executed, the data loading
and transformations do not happen directly. Rather, each operation is created and added to the
program's plan. The operations are actually executed when one of the `execute()` methods is invoked
on the Environment object. Whether the program is executed locally or on a cluster depends
on the environment of the program.
The lazy evaluation lets you construct sophisticated programs that Flink executes as one
holistically planned unit.
[Back to top](#top)
Transformations
---------------
Data transformations transform one or more DataSets into a new DataSet. Programs can combine
multiple transformations into sophisticated assemblies.
This section gives a brief overview of the available transformations. The [transformations
documentation](dataset_transformations.html) has a full description of all transformations with
examples.
<br />
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Transformation</th>
<th class="text-center">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>Map</strong></td>
<td>
<p>Takes one element and produces one element.</p>
{% highlight python %}
data.map(lambda x: x * 2, INT)
{% endhighlight %}
</td>
</tr>
<tr>
<td><strong>FlatMap</strong></td>
<td>
<p>Takes one element and produces zero, one, or more elements. </p>
{% highlight python %}
data.flat_map(
lambda x,c: [(1,word) for word in line.lower().split() for line in x],
(INT, STRING))
{% endhighlight %}
</td>
</tr>
<tr>
<td><strong>MapPartition</strong></td>
<td>
<p>Transforms a parallel partition in a single function call. The function get the partition
as an `Iterator` and can produce an arbitrary number of result values. The number of
elements in each partition depends on the degree-of-parallelism and previous operations.</p>
{% highlight python %}
data.map_partition(lambda x,c: [value * 2 for value in x], INT)
{% endhighlight %}
</td>
</tr>
<tr>
<td><strong>Filter</strong></td>
<td>
<p>Evaluates a boolean function for each element and retains those for which the function
returns true.</p>
{% highlight python %}
data.filter(lambda x: x > 1000)
{% endhighlight %}
</td>
</tr>
<tr>
<td><strong>Reduce</strong></td>
<td>
<p>Combines a group of elements into a single element by repeatedly combining two elements
into one. Reduce may be applied on a full data set, or on a grouped data set.</p>
{% highlight python %}
data.reduce(lambda x,y : x + y)
{% endhighlight %}
</td>
</tr>
<tr>
<td><strong>ReduceGroup</strong></td>
<td>
<p>Combines a group of elements into one or more elements. ReduceGroup may be applied on a
full data set, or on a grouped data set.</p>
{% highlight python %}
class Adder(GroupReduceFunction):
def reduce(self, iterator, collector):
count, word = iterator.next()
count += sum([x[0] for x in iterator)
collector.collect((count, word))
data.reduce_group(Adder(), (INT, STRING))
{% endhighlight %}
</td>
</tr>
</tr>
<td><strong>Join</strong></td>
<td>
Joins two data sets by creating all pairs of elements that are equal on their keys.
Optionally uses a JoinFunction to turn the pair of elements into a single element.
See <a href="#specifying-keys">keys</a> on how to define join keys.
{% highlight python %}
# In this case tuple fields are used as keys.
# "0" is the join field on the first tuple
# "1" is the join field on the second tuple.
result = input1.join(input2).where(0).equal_to(1)
{% endhighlight %}
</td>
</tr>
<tr>
<td><strong>CoGroup</strong></td>
<td>
<p>The two-dimensional variant of the reduce operation. Groups each input on one or more
fields and then joins the groups. The transformation function is called per pair of groups.
See <a href="#specifying-keys">keys</a> on how to define coGroup keys.</p>
{% highlight python %}
data1.co_group(data2).where(0).equal_to(1)
{% endhighlight %}
</td>
</tr>
<tr>
<td><strong>Cross</strong></td>
<td>
<p>Builds the Cartesian product (cross product) of two inputs, creating all pairs of
elements. Optionally uses a CrossFunction to turn the pair of elements into a single
element.</p>
{% highlight python %}
result = data1.cross(data2)
{% endhighlight %}
</td>
</tr>
<tr>
<td><strong>Union</strong></td>
<td>
<p>Produces the union of two data sets.</p>
{% highlight python %}
data.union(data2)
{% endhighlight %}
</td>
</tr>
</tbody>
</table>
[Back to Top](#top)
Specifying Keys
-------------
Some transformations (like Join or CoGroup) require that a key is defined on
its argument DataSets, and other transformations (Reduce, GroupReduce) allow that the DataSet is grouped on a key before they are
applied.
A DataSet is grouped as
{% highlight python %}
reduced = data \
.group_by(<define key here>) \
.reduce_group(<do something>)
{% endhighlight %}
The data model of Flink is not based on key-value pairs. Therefore,
you do not need to physically pack the data set types into keys and
values. Keys are "virtual": they are defined as functions over the
actual data to guide the grouping operator.
### Define keys for Tuples
{:.no_toc}
The simplest case is grouping a data set of Tuples on one or more
fields of the Tuple:
{% highlight python %}
reduced = data \
.group_by(0) \
.reduce_group(<do something>)
{% endhighlight %}
The data set is grouped on the first field of the tuples.
The group-reduce function will thus receive groups of tuples with
the same value in the first field.
{% highlight python %}
grouped = data \
.group_by(0,1) \
.reduce(/*do something*/)
{% endhighlight %}
The data set is grouped on the composite key consisting of the first and the
second fields, therefore the reduce function will receive groups
with the same value for both fields.
A note on nested Tuples: If you have a DataSet with a nested tuple
specifying `group_by(<index of tuple>)` will cause the system to use the full tuple as a key.
[Back to top](#top)
Passing Functions to Flink
--------------------------
Certain operations require user-defined functions, whereas all of them accept lambda functions and rich functions as arguments.
{% highlight python %}
data.filter(lambda x: x > 5)
{% endhighlight %}
{% highlight python %}
class Filter(FilterFunction):
def filter(self, value):
return value > 5
data.filter(Filter())
{% endhighlight %}
Rich functions allow the use of imported functions, provide access to broadcast-variables,
can be parameterized using __init__(), and are the go-to-option for complex functions.
They are also the only way to define an optional `combine` function for a reduce operation.
Lambda functions allow the easy insertion of one-liners. Note that a lambda function has to return
an iterable, if the operation can return multiple values. (All functions receiving a collector argument)
Flink requires type information at the time when it prepares the program for execution
(when the main method of the program is called). This is done by passing an exemplary
object that has the desired type. This holds also for tuples.
{% highlight python %}
(INT, STRING)
{% endhighlight %}
Would denote a tuple containing an int and a string. Note that for Operations that work strictly on tuples (like cross), no braces are required.
There are a few Constants defined in flink.plan.Constants that allow this in a more readable fashion.
[Back to top](#top)
Data Types
----------
Flink's Python API currently only supports primitive python types (int, float, bool, string) and byte arrays.
#### Tuples/Lists
You can use the tuples (or lists) for composite types. Python tuples are mapped to the Flink Tuple type, that contain
a fix number of fields of various types (up to 25). Every field of a tuple can be a primitive type - including further tuples, resulting in nested tuples.
{% highlight python %}
word_counts = env.from_elements(("hello", 1), ("world",2))
counts = word_counts.map(lambda x: x[1], INT)
{% endhighlight %}
When working with operators that require a Key for grouping or matching records,
Tuples let you simply specify the positions of the fields to be used as key. You can specify more
than one position to use composite keys (see [Section Data Transformations](#transformations)).
{% highlight python %}
wordCounts \
.group_by(0) \
.reduce(MyReduceFunction())
{% endhighlight %}
[Back to top](#top)
Data Sources
------------
Data sources create the initial data sets, such as from files or from collections.
File-based:
- `read_text(path)` - Reads files line wise and returns them as Strings.
- `read_csv(path, type)` - Parses files of comma (or another char) delimited fields.
Returns a DataSet of tuples. Supports the basic java types and their Value counterparts as field
types.
Collection-based:
- `from_elements(*args)` - Creates a data set from a Seq. All elements
**Examples**
{% highlight python %}
env = get_environment
# read text file from local files system
localLiens = env.read_text("file:#/path/to/my/textfile")
read text file from a HDFS running at nnHost:nnPort
hdfsLines = env.read_text("hdfs://nnHost:nnPort/path/to/my/textfile")
read a CSV file with three fields
csvInput = env.read_csv("hdfs:///the/CSV/file", (INT, STRING, DOUBLE))
create a set from some given elements
values = env.from_elements("Foo", "bar", "foobar", "fubar")
{% endhighlight %}
[Back to top](#top)
Data Sinks
----------
Data sinks consume DataSets and are used to store or return them:
- `write_text()` - Writes elements line-wise as Strings. The Strings are
obtained by calling the *str()* method of each element.
- `write_csv(...)` - Writes tuples as comma-separated value files. Row and field
delimiters are configurable. The value for each field comes from the *str()* method of the objects.
- `output()` - Prints the *str()* value of each element on the
standard out.
A DataSet can be input to multiple operations. Programs can write or print a data set and at the
same time run additional transformations on them.
**Examples**
Standard data sink methods:
{% highlight scala %}
write DataSet to a file on the local file system
textData.write_text("file:///my/result/on/localFS")
write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.write_text("hdfs://nnHost:nnPort/my/result/on/localFS")
write DataSet to a file and overwrite the file if it exists
textData.write_text("file:///my/result/on/localFS", WriteMode.OVERWRITE)
tuples as lines with pipe as the separator "a|b|c"
values.write_csv("file:///path/to/the/result/file", line_delimiter="\n", field_delimiter="|")
this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.write_text("file:///path/to/the/result/file")
{% endhighlight %}
[Back to top](#top)
Broadcast Variables
-------------------
Broadcast variables allow you to make a data set available to all parallel instances of an
operation, in addition to the regular input of the operation. This is useful for auxiliary data
sets, or data-dependent parameterization. The data set will then be accessible at the operator as a
Collection.
- **Broadcast**: broadcast sets are registered by name via `with_broadcast_set(DataSet, String)`
- **Access**: accessible via `self.context.get_broadcast_variable(String)` at the target operator
{% highlight python %}
class MapperBcv(MapFunction):
def map(self, value):
factor = self.context.get_broadcast_variable("bcv")[0][0]
return value * factor
# 1. The DataSet to be broadcasted
toBroadcast = env.from_elements(1, 2, 3)
data = env.from_elements("a", "b")
# 2. Broadcast the DataSet
data.map(MapperBcv(), INT).with_broadcast_set("bcv", toBroadcast)
{% endhighlight %}
Make sure that the names (`bcv` in the previous example) match when registering and
accessing broadcasted data sets.
**Note**: As the content of broadcast variables is kept in-memory on each node, it should not become
too large. For simpler things like scalar values you can simply parameterize the rich function.
[Back to top](#top)
Parallel Execution
------------------
This section describes how the parallel execution of programs can be configured in Flink. A Flink
program consists of multiple tasks (operators, data sources, and sinks). A task is split into
several parallel instances for execution and each parallel instance processes a subset of the task's
input data. The number of parallel instances of a task is called its *parallelism* or *degree of
parallelism (DOP)*.
The degree of parallelism of a task can be specified in Flink on different levels.
### Execution Environment Level
Flink programs are executed in the context of an [execution environmentt](#program-skeleton). An
execution environment defines a default parallelism for all operators, data sources, and data sinks
it executes. Execution environment parallelism can be overwritten by explicitly configuring the
parallelism of an operator.
The default parallelism of an execution environment can be specified by calling the
`set_degree_of_parallelism()` method. To execute all operators, data sources, and data sinks of the
[WordCount](#example-program) example program with a parallelism of `3`, set the default parallelism of the
execution environment as follows:
{% highlight python %}
env = get_environment()
env.set_degree_of_parallelism(3)
text.flat_map(lambda x,c: x.lower().split(), (INT, STRING)) \
.group_by(1) \
.reduce_group(Adder(), (INT, STRING), combinable=True) \
.output()
env.execute()
{% endhighlight %}
### System Level
A system-wide default parallelism for all execution environments can be defined by setting the
`parallelism.default` property in `./conf/flink-conf.yaml`. See the
[Configuration](config.html) documentation for details.
[Back to top](#top)
Executing Plans
---------------
To run the plan with Flink, go to your Flink distribution, and run the pyflink.sh script from the /bin folder.
use pyflink2.sh for python 2.7, and pyflink3.sh for python 3.4. The script containing the plan has to be passed
as the first argument, followed by a number of additional python packages, and finally, separated by - additional
arguments that will be fed to the script.
{% highlight python %}
./bin/pyflink<2/3>.sh <Script>[ <pathToPackage1>[ <pathToPackageX]][ - <param1>[ <paramX>]]
{% endhighlight %}
[Back to top](#top)
Debugging
---------------
If you are running Flink programs locally, you can debug your program following this guide.
First you have to enable debugging by setting the debug switch in the `env.execute(debug=True)` call. After
submitting your program, open the jobmanager log file, and look for a line that says
`Waiting for external Process : <taskname>. Run python /tmp/flink/executor.py <port>` Now open `/tmp/flink` in your python
IDE and run the `executor.py <port>`.
[Back to top](#top)