| --- |
| title: "Python Programming Guide" |
| is_beta: true |
| --- |
| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| --> |
| |
| <a href="#top"></a> |
| |
| Analysis programs in Flink are regular programs that implement transformations on data sets |
| (e.g., filtering, mapping, joining, grouping). The data sets are initially created from certain |
| sources (e.g., by reading files, or from collections). Results are returned via sinks, which may for |
| example write the data to (distributed) files, or to standard output (for example the command line |
terminal). Flink programs run in a variety of contexts: standalone, or embedded in other programs.
| The execution can happen in a local JVM, or on clusters of many machines. |
| |
| In order to create your own Flink program, we encourage you to start with the |
| [program skeleton](#program-skeleton) and gradually add your own |
| [transformations](#transformations). The remaining sections act as references for additional |
| operations and advanced features. |
| |
| * This will be replaced by the TOC |
| {:toc} |
| |
| Example Program |
| --------------- |
| |
| The following program is a complete, working example of WordCount. You can copy & paste the code |
| to run it locally. |
| |
| {% highlight python %} |
| from flink.plan.Environment import get_environment |
| from flink.plan.Constants import INT, STRING |
| from flink.functions.GroupReduceFunction import GroupReduceFunction |
| |
| class Adder(GroupReduceFunction): |
| def reduce(self, iterator, collector): |
| count, word = iterator.next() |
| count += sum([x[0] for x in iterator]) |
| collector.collect((count, word)) |
| |
| if __name__ == "__main__": |
| env = get_environment() |
| data = env.from_elements("Who's there?", |
| "I think I hear them. Stand, ho! Who's there?") |
| |
| data \ |
| .flat_map(lambda x, c: [(1, word) for word in x.lower().split()], (INT, STRING)) \ |
| .group_by(1) \ |
| .reduce_group(Adder(), (INT, STRING), combinable=True) \ |
| .output() |
| |
| env.execute(local=True) |
| {% endhighlight %} |
| |
| [Back to top](#top) |
| |
| Program Skeleton |
| ---------------- |
| |
As we already saw in the example, Flink programs look like regular Python
programs with an `if __name__ == "__main__":` block. Each program consists of the same basic parts:
| |
| 1. Obtain an `Environment`, |
| 2. Load/create the initial data, |
| 3. Specify transformations on this data, |
| 4. Specify where to put the results of your computations, and |
| 5. Execute your program. |
| |
| We will now give an overview of each of those steps but please refer to the respective sections for |
| more details. |
| |
| |
The `Environment` is the basis for all Flink programs. You can
obtain one using the following method from `flink.plan.Environment`:
| |
| {% highlight python %} |
| get_environment() |
| {% endhighlight %} |
| |
For specifying data sources, the execution environment has several methods
to read from files. To just read a text file as a sequence of lines, you can use:
| |
| {% highlight python %} |
| env = get_environment() |
| text = env.read_text("file:///path/to/file") |
| {% endhighlight %} |
| |
| This will give you a DataSet on which you can then apply transformations. For |
| more information on data sources and input formats, please refer to |
| [Data Sources](#data-sources). |
| |
| Once you have a DataSet you can apply transformations to create a new |
| DataSet which you can then write to a file, transform again, or |
| combine with other DataSets. You apply transformations by calling |
| methods on DataSet with your own custom transformation function. For example, |
| a map transformation looks like this: |
| |
| {% highlight python %} |
| data.map(lambda x: x*2, INT) |
| {% endhighlight %} |
| |
| This will create a new DataSet by doubling every value in the original DataSet. |
| For more information and a list of all the transformations, |
| please refer to [Transformations](#transformations). |
| |
| Once you have a DataSet that needs to be written to disk you can call one |
| of these methods on DataSet: |
| |
| {% highlight python %} |
data.write_text("<file-path>", write_mode=Constants.NO_OVERWRITE)
data.write_csv("<file-path>", line_delimiter='\n', field_delimiter=',', write_mode=Constants.NO_OVERWRITE)
data.output()
| {% endhighlight %} |
| |
The last method is only useful for developing/debugging on a local machine;
it outputs the contents of the DataSet to standard output. (Note that on
a cluster, the result goes to the standard out stream of the cluster nodes and ends
up in the *.out* files of the workers.)
The first two methods do as their names suggest.
| Please refer to [Data Sinks](#data-sinks) for more information on writing to files. |
| |
| Once you specified the complete program you need to call `execute` on |
| the `Environment`. This will either execute on your local machine or submit your program |
| for execution on a cluster, depending on how Flink was started. You can force |
| a local execution by using `execute(local=True)`. |
| |
| [Back to top](#top) |
| |
| Project setup |
| --------------- |
| |
Apart from setting up Flink, no additional work is required. The Python package can be found in the /resource folder of your Flink distribution. The flink package, along with the plan and optional packages, is automatically distributed across the cluster via HDFS when a job is run.

The Python API has been tested on Linux systems that have Python 2.7 or 3.4 installed.
| |
| [Back to top](#top) |
| |
| Lazy Evaluation |
| --------------- |
| |
All Flink programs are executed lazily: when the program's main method is executed, the data loading
and transformations are not carried out immediately. Rather, each operation is created and added to the
program's plan. The operations are actually executed when one of the `execute()` methods is invoked
on the Environment object. Whether the program is executed locally or on a cluster depends
on the environment of the program.
| |
| The lazy evaluation lets you construct sophisticated programs that Flink executes as one |
| holistically planned unit. |
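
As a minimal sketch (using only calls that appear elsewhere in this guide), everything before the
`execute` call merely builds the plan:

{% highlight python %}
from flink.plan.Environment import get_environment
from flink.plan.Constants import INT

env = get_environment()

# these calls only add operations to the plan; no data is read or transformed yet
data = env.from_elements(1, 2, 3)
doubled = data.map(lambda x: x * 2, INT)
doubled.output()

# only now is the plan actually executed
env.execute(local=True)
{% endhighlight %}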
| |
| [Back to top](#top) |
| |
| |
| Transformations |
| --------------- |
| |
| Data transformations transform one or more DataSets into a new DataSet. Programs can combine |
| multiple transformations into sophisticated assemblies. |
| |
| This section gives a brief overview of the available transformations. The [transformations |
| documentation](dataset_transformations.html) has a full description of all transformations with |
| examples. |
| |
| <br /> |
| |
| <table class="table table-bordered"> |
| <thead> |
| <tr> |
| <th class="text-left" style="width: 20%">Transformation</th> |
| <th class="text-center">Description</th> |
| </tr> |
| </thead> |
| |
| <tbody> |
| <tr> |
| <td><strong>Map</strong></td> |
| <td> |
| <p>Takes one element and produces one element.</p> |
| {% highlight python %} |
| data.map(lambda x: x * 2, INT) |
| {% endhighlight %} |
| </td> |
| </tr> |
| |
| <tr> |
| <td><strong>FlatMap</strong></td> |
| <td> |
| <p>Takes one element and produces zero, one, or more elements. </p> |
| {% highlight python %} |
data.flat_map(
  lambda x,c: [(1, word) for word in x.lower().split()],
  (INT, STRING))
| {% endhighlight %} |
| </td> |
| </tr> |
| |
| <tr> |
| <td><strong>MapPartition</strong></td> |
| <td> |
      <p>Transforms a parallel partition in a single function call. The function gets the partition
        as an `Iterator` and can produce an arbitrary number of result values. The number of
        elements in each partition depends on the degree-of-parallelism and previous operations.</p>
| {% highlight python %} |
| data.map_partition(lambda x,c: [value * 2 for value in x], INT) |
| {% endhighlight %} |
| </td> |
| </tr> |
| |
| <tr> |
| <td><strong>Filter</strong></td> |
| <td> |
| <p>Evaluates a boolean function for each element and retains those for which the function |
| returns true.</p> |
| {% highlight python %} |
| data.filter(lambda x: x > 1000) |
| {% endhighlight %} |
| </td> |
| </tr> |
| |
| <tr> |
| <td><strong>Reduce</strong></td> |
| <td> |
| <p>Combines a group of elements into a single element by repeatedly combining two elements |
| into one. Reduce may be applied on a full data set, or on a grouped data set.</p> |
| {% highlight python %} |
| data.reduce(lambda x,y : x + y) |
| {% endhighlight %} |
| </td> |
| </tr> |
| |
| <tr> |
| <td><strong>ReduceGroup</strong></td> |
| <td> |
| <p>Combines a group of elements into one or more elements. ReduceGroup may be applied on a |
| full data set, or on a grouped data set.</p> |
| {% highlight python %} |
| class Adder(GroupReduceFunction): |
| def reduce(self, iterator, collector): |
| count, word = iterator.next() |
    count += sum([x[0] for x in iterator])
| collector.collect((count, word)) |
| |
| data.reduce_group(Adder(), (INT, STRING)) |
| {% endhighlight %} |
| </td> |
| </tr> |
| |
  <tr>
    <td><strong>Join</strong></td>
    <td>
      <p>Joins two data sets by creating all pairs of elements that are equal on their keys.
      Optionally uses a JoinFunction to turn the pair of elements into a single element.
      See <a href="#specifying-keys">keys</a> on how to define join keys.</p>
| {% highlight python %} |
| # In this case tuple fields are used as keys. |
| # "0" is the join field on the first tuple |
| # "1" is the join field on the second tuple. |
| result = input1.join(input2).where(0).equal_to(1) |
| {% endhighlight %} |
| </td> |
| </tr> |
| |
| <tr> |
| <td><strong>CoGroup</strong></td> |
| <td> |
| <p>The two-dimensional variant of the reduce operation. Groups each input on one or more |
| fields and then joins the groups. The transformation function is called per pair of groups. |
| See <a href="#specifying-keys">keys</a> on how to define coGroup keys.</p> |
| {% highlight python %} |
| data1.co_group(data2).where(0).equal_to(1) |
| {% endhighlight %} |
| </td> |
| </tr> |
| |
| <tr> |
| <td><strong>Cross</strong></td> |
| <td> |
| <p>Builds the Cartesian product (cross product) of two inputs, creating all pairs of |
| elements. Optionally uses a CrossFunction to turn the pair of elements into a single |
| element.</p> |
| {% highlight python %} |
| result = data1.cross(data2) |
| {% endhighlight %} |
| </td> |
| </tr> |
| <tr> |
| <td><strong>Union</strong></td> |
| <td> |
| <p>Produces the union of two data sets.</p> |
| {% highlight python %} |
| data.union(data2) |
| {% endhighlight %} |
| </td> |
| </tr> |
| </tbody> |
| </table> |
| |
| [Back to Top](#top) |
| |
| |
| Specifying Keys |
| ------------- |
| |
Some transformations (like Join or CoGroup) require that a key is defined on
their argument DataSets, while other transformations (Reduce, GroupReduce) allow the DataSet
to be grouped on a key before they are applied.
| |
A DataSet is grouped as follows:
| {% highlight python %} |
| reduced = data \ |
| .group_by(<define key here>) \ |
| .reduce_group(<do something>) |
| {% endhighlight %} |
| |
| The data model of Flink is not based on key-value pairs. Therefore, |
| you do not need to physically pack the data set types into keys and |
| values. Keys are "virtual": they are defined as functions over the |
| actual data to guide the grouping operator. |
| |
| ### Define keys for Tuples |
| {:.no_toc} |
| |
| The simplest case is grouping a data set of Tuples on one or more |
| fields of the Tuple: |
| {% highlight python %} |
| reduced = data \ |
| .group_by(0) \ |
| .reduce_group(<do something>) |
| {% endhighlight %} |
| |
| The data set is grouped on the first field of the tuples. |
| The group-reduce function will thus receive groups of tuples with |
| the same value in the first field. |
| |
| {% highlight python %} |
| grouped = data \ |
| .group_by(0,1) \ |
  .reduce(<do something>)
| {% endhighlight %} |
| |
| The data set is grouped on the composite key consisting of the first and the |
| second fields, therefore the reduce function will receive groups |
| with the same value for both fields. |
| |
A note on nested Tuples: If you have a DataSet with a nested tuple,
specifying `group_by(<index of tuple>)` will cause the system to use the full tuple as a key.
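
A hypothetical sketch (element layout and values are invented for illustration): each element is
`((word, length), count)`, and `group_by(0)` keys on the whole nested `(word, length)` tuple.

{% highlight python %}
nested = env.from_elements((("hello", 5), 1), (("hello", 5), 2), (("world", 5), 1))

# the nested tuple at position 0 is used in its entirety as the key
summed = nested \
  .group_by(0) \
  .reduce(lambda a, b: (a[0], a[1] + b[1]))
{% endhighlight %}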
| |
| [Back to top](#top) |
| |
| |
| Passing Functions to Flink |
| -------------------------- |
| |
Certain operations require user-defined functions; all of them accept both lambda functions and rich functions as arguments.
| |
| {% highlight python %} |
| data.filter(lambda x: x > 5) |
| {% endhighlight %} |
| |
| {% highlight python %} |
| class Filter(FilterFunction): |
| def filter(self, value): |
| return value > 5 |
| |
| data.filter(Filter()) |
| {% endhighlight %} |
| |
Rich functions allow the use of imported functions, provide access to broadcast variables,
can be parameterized using `__init__()`, and are the go-to option for complex functions.
They are also the only way to define an optional `combine` function for a reduce operation.
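
A hypothetical sketch of such a rich function, parameterized via `__init__()` and carrying a
`combine` method (assumed here to take the same `(iterator, collector)` arguments as `reduce`; the
`combinable=True` flag is the one used in the WordCount example above):

{% highlight python %}
class ThresholdAdder(GroupReduceFunction):
  def __init__(self, threshold):
    super(ThresholdAdder, self).__init__()
    self.threshold = threshold  # parameter supplied via __init__()

  def reduce(self, iterator, collector):
    count, word = iterator.next()
    count += sum([x[0] for x in iterator])
    if count >= self.threshold:
      collector.collect((count, word))

  # optional pre-aggregation; emits partial sums without applying the threshold
  def combine(self, iterator, collector):
    count, word = iterator.next()
    count += sum([x[0] for x in iterator])
    collector.collect((count, word))

data \
  .group_by(1) \
  .reduce_group(ThresholdAdder(2), (INT, STRING), combinable=True)
{% endhighlight %}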
| |
Lambda functions allow the easy insertion of one-liners. Note that a lambda function has to return
an iterable if the operation can return multiple values (i.e., all functions that receive a collector argument).
| |
Flink requires type information at the time when it prepares the program for execution
(when the main method of the program is called). This is done by passing an exemplary
object that has the desired type. This also holds for tuples.
| |
| {% highlight python %} |
| (INT, STRING) |
| {% endhighlight %} |
| |
This would denote a tuple containing an int and a string. Note that for operations that work strictly on tuples (like cross), no surrounding parentheses are required.
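
For example, assuming `data` is a DataSet of strings, a map that emits such a pair would declare its
output type like this:

{% highlight python %}
# the lambda returns (int, string) pairs, so the type hint is (INT, STRING)
pairs = data.map(lambda x: (len(x), x), (INT, STRING))
{% endhighlight %}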
| |
A few constants defined in flink.plan.Constants make declaring these types more readable.
| |
| [Back to top](#top) |
| |
| Data Types |
| ---------- |
| |
Flink's Python API currently supports only primitive Python types (int, float, bool, string) and byte arrays.
| |
| #### Tuples/Lists |
| |
You can use tuples (or lists) for composite types. Python tuples are mapped to the Flink Tuple type, which contains
a fixed number of fields of various types (up to 25). Every field of a tuple can be a primitive type, including further tuples, resulting in nested tuples.
| |
| {% highlight python %} |
| word_counts = env.from_elements(("hello", 1), ("world",2)) |
| |
| counts = word_counts.map(lambda x: x[1], INT) |
| {% endhighlight %} |
| |
When working with operators that require a key for grouping or matching records,
tuples let you simply specify the positions of the fields to be used as the key. You can specify more
than one position to use composite keys (see [Section Data Transformations](#transformations)).
| |
| {% highlight python %} |
word_counts \
| .group_by(0) \ |
| .reduce(MyReduceFunction()) |
| {% endhighlight %} |
| |
| [Back to top](#top) |
| |
| Data Sources |
| ------------ |
| |
| Data sources create the initial data sets, such as from files or from collections. |
| |
| File-based: |
| |
- `read_text(path)` - Reads files line-wise and returns them as Strings.
- `read_csv(path, type)` - Parses files of comma (or another character) delimited fields.
  Returns a DataSet of tuples. Supports the basic Java types and their Value counterparts as field
  types.
| |
| Collection-based: |
| |
- `from_elements(*args)` - Creates a data set from the given sequence of elements. All elements must be of the same type.
| |
| **Examples** |
| |
| {% highlight python %} |
env = get_environment()

# read text file from local file system
localLines = env.read_text("file:///path/to/my/textfile")

# read text file from an HDFS running at nnHost:nnPort
hdfsLines = env.read_text("hdfs://nnHost:nnPort/path/to/my/textfile")

# read a CSV file with three fields
csvInput = env.read_csv("hdfs:///the/CSV/file", (INT, STRING, DOUBLE))

# create a set from some given elements
values = env.from_elements("Foo", "bar", "foobar", "fubar")
| {% endhighlight %} |
| |
| [Back to top](#top) |
| |
| Data Sinks |
| ---------- |
| |
| Data sinks consume DataSets and are used to store or return them: |
| |
| - `write_text()` - Writes elements line-wise as Strings. The Strings are |
| obtained by calling the *str()* method of each element. |
| - `write_csv(...)` - Writes tuples as comma-separated value files. Row and field |
| delimiters are configurable. The value for each field comes from the *str()* method of the objects. |
- `output()` - Prints the *str()* value of each element to standard
  output.
| |
A DataSet can be the input to multiple operations. Programs can write or print a data set and at the
same time run additional transformations on it.
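
A minimal sketch of that (the file path is a placeholder): the same DataSet feeds a sink and a
further transformation.

{% highlight python %}
words = env.from_elements("foo", "bar", "foobar")

# write the set to a file ...
words.write_text("file:///tmp/words.txt")

# ... and keep transforming it at the same time
lengths = words.map(lambda w: len(w), INT)
lengths.output()
{% endhighlight %}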
| |
| **Examples** |
| |
| Standard data sink methods: |
| |
{% highlight python %}
# write DataSet to a file on the local file system
textData.write_text("file:///my/result/on/localFS")

# write DataSet to a file on an HDFS with a namenode running at nnHost:nnPort
textData.write_text("hdfs://nnHost:nnPort/my/result/on/localFS")

# write DataSet to a file and overwrite the file if it exists
textData.write_text("file:///my/result/on/localFS", WriteMode.OVERWRITE)

# tuples as lines with pipe as the separator "a|b|c"
values.write_csv("file:///path/to/the/result/file", line_delimiter="\n", field_delimiter="|")

# this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.write_text("file:///path/to/the/result/file")
| {% endhighlight %} |
| |
| [Back to top](#top) |
| |
| Broadcast Variables |
| ------------------- |
| |
| Broadcast variables allow you to make a data set available to all parallel instances of an |
| operation, in addition to the regular input of the operation. This is useful for auxiliary data |
| sets, or data-dependent parameterization. The data set will then be accessible at the operator as a |
| Collection. |
| |
| - **Broadcast**: broadcast sets are registered by name via `with_broadcast_set(DataSet, String)` |
| - **Access**: accessible via `self.context.get_broadcast_variable(String)` at the target operator |
| |
| {% highlight python %} |
| class MapperBcv(MapFunction): |
| def map(self, value): |
| factor = self.context.get_broadcast_variable("bcv")[0][0] |
| return value * factor |
| |
| # 1. The DataSet to be broadcasted |
| toBroadcast = env.from_elements(1, 2, 3) |
| data = env.from_elements("a", "b") |
| |
| # 2. Broadcast the DataSet |
| data.map(MapperBcv(), INT).with_broadcast_set("bcv", toBroadcast) |
| {% endhighlight %} |
| |
| Make sure that the names (`bcv` in the previous example) match when registering and |
| accessing broadcasted data sets. |
| |
| **Note**: As the content of broadcast variables is kept in-memory on each node, it should not become |
| too large. For simpler things like scalar values you can simply parameterize the rich function. |
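
A hypothetical sketch of that alternative: pass the scalar to the rich function's `__init__()`
instead of broadcasting it.

{% highlight python %}
class ScaledMapper(MapFunction):
  def __init__(self, factor):
    super(ScaledMapper, self).__init__()
    self.factor = factor  # scalar parameter, no broadcast set needed

  def map(self, value):
    return value * self.factor

data.map(ScaledMapper(3), INT)
{% endhighlight %}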
| |
| [Back to top](#top) |
| |
| Parallel Execution |
| ------------------ |
| |
| This section describes how the parallel execution of programs can be configured in Flink. A Flink |
| program consists of multiple tasks (operators, data sources, and sinks). A task is split into |
| several parallel instances for execution and each parallel instance processes a subset of the task's |
| input data. The number of parallel instances of a task is called its *parallelism* or *degree of |
| parallelism (DOP)*. |
| |
| The degree of parallelism of a task can be specified in Flink on different levels. |
| |
| ### Execution Environment Level |
| |
Flink programs are executed in the context of an [execution environment](#program-skeleton). An
execution environment defines a default parallelism for all operators, data sources, and data sinks
it executes. Execution environment parallelism can be overridden by explicitly configuring the
parallelism of an operator.
| |
| The default parallelism of an execution environment can be specified by calling the |
| `set_degree_of_parallelism()` method. To execute all operators, data sources, and data sinks of the |
| [WordCount](#example-program) example program with a parallelism of `3`, set the default parallelism of the |
| execution environment as follows: |
| |
| {% highlight python %} |
| env = get_environment() |
| env.set_degree_of_parallelism(3) |
| |
text.flat_map(lambda x,c: [(1, word) for word in x.lower().split()], (INT, STRING)) \
| .group_by(1) \ |
| .reduce_group(Adder(), (INT, STRING), combinable=True) \ |
| .output() |
| |
| env.execute() |
| {% endhighlight %} |
| |
| ### System Level |
| |
| A system-wide default parallelism for all execution environments can be defined by setting the |
| `parallelism.default` property in `./conf/flink-conf.yaml`. See the |
| [Configuration](config.html) documentation for details. |
| |
| [Back to top](#top) |
| |
| Executing Plans |
| --------------- |
| |
To run a plan with Flink, go to your Flink distribution and run the pyflink.sh script from the /bin folder:
use pyflink2.sh for Python 2.7 and pyflink3.sh for Python 3.4. The script containing the plan has to be passed
as the first argument, followed by a number of additional Python packages, and finally, separated by `-`, additional
arguments that will be fed to the script.
| |
{% highlight bash %}
| ./bin/pyflink<2/3>.sh <Script>[ <pathToPackage1>[ <pathToPackageX]][ - <param1>[ <paramX>]] |
| {% endhighlight %} |
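
For example, a hypothetical invocation that ships one extra package and passes two arguments to the
plan script (all paths and arguments are placeholders):

{% highlight bash %}
./bin/pyflink3.sh /path/to/plan.py /path/to/package - arg1 arg2
{% endhighlight %}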
| |
| [Back to top](#top) |
| |
| Debugging |
| --------------- |
| |
If you are running Flink programs locally, you can debug your program as follows.
First you have to enable debugging by setting the debug switch in the `env.execute(debug=True)` call. After
submitting your program, open the jobmanager log file and look for a line that says
`Waiting for external Process : <taskname>. Run python /tmp/flink/executor.py <port>`. Now open `/tmp/flink` in your Python
IDE and run `executor.py <port>`.
| |
| [Back to top](#top) |