blob: d6d53a9e76af6f023a3669df6e67b1bdfccf4e66 [file] [log] [blame] [view]
---
title: "Scala API Programming Guide"
---
<section id="top">
Scala Programming Guide
=======================
This guide explains how to develop Flink programs with the Scala
programming interface.
Here we will look at the general structure of a Scala job. You will learn how to
write data sources, data sinks, and operators to create data flows that can be
executed using the Flink system.
Writing Scala jobs requires an understanding of Scala, there is excellent
documentation available [here](http://scala-lang.org/documentation/). Most
of the examples can be understood by someone with a good understanding
of programming in general, though.
[Back to top](#top)
<section id="intro-example">
Word Count Example
------------------
To start, let's look at a Word Count job implemented in Scala. This program is
very simple but it will give you a basic idea of what a Scala job looks like.
```scala
import org.apache.flinkclient.LocalExecutor
import org.apache.flinkapi.scala._
import org.apache.flinkapi.scala.operators._
object WordCount {
def main(args: Array[String]) {
val input = TextFile(textInput)
val words = input.flatMap { _.split(" ") map { (_, 1) } }
val counts = words.groupBy { case (word, _) => word }
.reduce { (w1, w2) => (w1._1, w1._2 + w2._2) }
val output = counts.write(wordsOutput, CsvOutputFormat())
val plan = new ScalaPlan(Seq(output))
LocalExecutor.execute(plan)
}
}
```
Same as any Flink job a Scala job consists of one or several data
sources, one or several data sinks and operators in between these that transform
data. Together these parts are referred to as the data flow graph. It dictates
the way data is passed when a job is executed.
When using Scala in Flink an important concept to grasp is that of the
`DataSet`. `DataSet` is an abstract concept that represents actual data sets at
runtime and which has operations that transform data to create a new transformed
data set. In this example the `TextFile("/some/input")` call creates a
`DataSet[String]` that represents the lines of text from the input. The
`flatMap` operation that looks like a regular Scala flatMap is in fact an
operation on `DataSet` that passes (at runtime) the data items through the
provided anonymous function to transform them. The result of the `flatMap`
operation is a new `DataSet` that represents the transformed data. On this other
operations be performed. Another such operation are `groupBy` and `reduce`, but
we will go into details of those later in this guide.
The `write` operation of `DataSet` is used to create a data sink. You provide it
with a path where the data is to be written to and an output format. This is
enough for now but we will discuss data formats (for sources and sinks) later.
To execute a data flow graph one or several sinks have to wrapped in a `Plan`
which can then be executed on a cluster using `RemoteExecutor`. Here, the
`LocalExecutor` is used to run the flow on the local computer. This is useful
for debugging your job before running it on an actual cluster.
[Back to top](#top)
<section id="intro-example">
Project Setup
-------------
We will only cover maven here but the concepts should work equivalently with
other build systems such as Gradle or sbt. When wanting to develop a Scala job
all that is needed as dependency is is `flink-scala` (and `flink-clients`, if
you want to execute your jobs). So all that needs to be done is to add the
following lines to your POM.
```xml
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala</artifactId>
<version>{{site.FLINK_VERSION_STABLE}}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>{{site.FLINK_VERSION_STABLE}}</version>
</dependency>
</dependencies>
```
To quickly get started you can use the Flink Scala quickstart available
[here]({{site.baseurl}}/quickstart/scala.html). This will give you a
completeMaven project with some working example code that you can use to explore
the system or as basis for your own projects.
These imports are normally enough for any project:
```scala
import org.apache.flinkapi.scala._
import org.apache.flinkapi.scala.operators._
import org.apache.flinkclient.LocalExecutor
import org.apache.flinkclient.RemoteExecutor
```
The first two imports contain things like `DataSet`, `Plan`, data sources, data
sinks, and the operations. The last two imports are required if you want to run
a data flow on your local machine, respectively cluster.
[Back to top](#top)
<section id="dataset">
The DataSet Abstraction
-----------------------
As already alluded to in the introductory example you write Scala jobs by using
operations on a `DataSet` to create new transformed `DataSet`. This concept is
the core of the Flink Scala API so it merits some more explanation. A
`DataSet` can look and behave like a regular Scala collection in your code but
it does not contain any actual data but only represents data. For example: when
you use `TextFile()` you get back a `DataSource[String]` that represents each
line of text in the input as a `String`. No data is actually loaded or available
at this point. The set is only used to apply further operations which themselves
are not executed until the data flow is executed. An operation on `DataSet`
creates a new `DataSet` that represents the transformation and has a pointer to
the `DataSet` that represents the data to be transformed. In this way a tree of
data sets is created that contains both the specification of the flow of data as
well as all the transformations. This graph can be wrapped in a `Plan` and
executed.
Working with the system is like working with lazy collections, where execution
is postponed until the user submits the job.
`DataSet` has a generic parameter, this is the type of each data item or record
that would be processed by further transformations. This is similar to how
`List[A]` in Scala would behave. For example in:
```scala
val input: DataSet[(String, Int)] = ...
val mapped = input map { a => (a._1, a._2 + 1)}
```
The anonymous function would retrieve in `a` tuples of type `(String, Int)`.
[Back to top](#top)
<section id="datatypes">
Data Types
----------
There are some restrictions regarding the data types that can be used in Scala
jobs (basically the generic parameter of `DataSet`). The usable types are
the primitive Scala types, case classes (which includes tuples), and custom
data types.
Custom data types must implement the interface
{% gh_link /flink-core/src/main/java/org/apache/flink/types/Value.java "Value" %}.
For custom data types that should also be used as a grouping key or join key
the {% gh_link /flink-core/src/main/java/org/apache/flink/types/Key.java "Key" %}
interface must be implemented.
[Back to top](#top)
<section id="data-sources">
Creating Data Sources
---------------------
To get an initial `DataSet` on which to perform operations to build a data flow
graph the following construct is used:
```scala
val input = DataSource("<file-path>", <input-format>)
```
The value `input` is now a `DataSet` with the generic type depending on the
input format.
The file path can be on of either `file:///some/file` to acces files on the
local machine or `hdfs://some/path` to read files from HDFS. The input
format can be one of our builtin formats or a custom input format. The builtin
formats are:
* [TextInputFormat](#text-input-format)
* [CsvInputFormat](#csv-input-format)
* [DelimitedInputFormat](#delimited-input-format)
* [BinaryInputFormat](#binary-input-format)
* [BinarySerializedInputFormat](#binary-serialized-input-format)
* [FixedLengthInputFormat](#fixed-length-input-format)
We will now have a look at each of them and show how they are employed and in
which situations.
[Back to top](#top)
<section id="text-input-format">
#### TextInputFormat
This input format simply reads a text file line wise and creates a `String`
for each line. It is used as:
```scala
TextInputFormat()
```
As you have already seen in the Word Count Example there is a shortcut for this.
Instead of using a `DataSource` with `TextInputFormat` you can simply write:
```scala
val input = TextFile("<file-path>")
```
The `input` would then be a `DataSet[String]`.
[Back to top](#top)
<section id="csv-input-format">
#### CsvInputFormat
This input format is mainly used to read Csv-Files, as the name suggests. Input
files must be text files. You can specify the `String` that should be used
as the separator between individual records (this would often be newline) and
also the separator between fields of a record (this would often be a comma).
The `CsvInputFormat` will automatically read the records and create
Scala tuples or custom case class objects for you. The format can be used
in one of the following ways:
```scala
CsvInputFormat[Out]()
CsvInputFormat[Out](recordDelim: String)
CsvInputFormat[Out](recordDelim: String, fieldDelim: Char)
CsvInputFormat[Out](fieldIndices: Seq[Int])
CsvInputFormat[Out](fieldIndices: Seq[Int], recordDelim: String)
CsvInputFormat[Out](fieldIndices: Seq[Int], recordDelim: String, fieldDelim: Char)
```
The default record delimiter is a newline, the default field delimiter is a
comma. The type parameter `Out` must be a case class type, which also includes
tuple types since they are internally case classes.
Normally, all the fields of a record are read. If you want to explicitly
specify which fields of the record should be read you can use one of the
tree variants with a `fieldIndices` parameter. Here you give a list
of the fields that should be read. Field indices start from zero.
An example usage could look as follows:
```scala
val input = DataSource("file:///some/file", CsvInputFormat[(Int, Int, String)](Seq(1, 17, 42), "\n", ','))
```
Here only the specified fields would be read and 3-tuples created for you.
The type of input would be `DataSet[(Int, Int, String)]`.
[Back to top](#top)
<section id="delimited-input-format">
#### DelimitedInputFormat
This input format is meant for textual records that are separated by
some delimiter. The delimiter could be a newline, for example. It is used like
this:
```scala
DelimitedInputFormat[Out](parseFunction: String => Out, delim: String = "\n")
```
The input files will be split on the supplied delimiter (or the default newline)
and the supplied parse function must parse the textual representation in the
`String` and return an object. The type of this object will then also be the
type of the `DataSet` created by the `DataSource` operation.
Just as with `BinaryInputFormat` the function can be an anonymous function, so
you could have:
```scala
val input = DataSource("file:///some/file", BinaryInputFormat( { line =>
line match {
case EdgeInputPattern(from, to) => Path(from.toInt, to.toInt, 1)
}
}))
```
In this example EdgeInputPattern is some regular expression used for parsing
a line of text and `Path` is a custom case class that is used to represent
the data. The type of input would in this case be `DataSet[Path]`.
[Back to top](#top)
<section id="binary-input-format">
#### BinaryInputFormat
This input format is best used when you have a custom binary format that
you store the data in. It is created using one of the following:
```scala
BinaryInputFormat[Out](readFunction: DataInput => Out)
BinaryInputFormat[Out](readFunction: DataInput => Out, blocksize: Long)
```
So you have to provide a function that gets a
[java.io.DataInput](http://docs.oracle.com/javase/7/docs/api/java/io/DataInput.html)
and returns the object that
contains the data. The type of this object will then also be the type of the
`DataSet` created by the `DataSource` operation.
The provided function can also be an anonymous function, so you could
have something like this:
```scala
val input = DataSource("file:///some/file", BinaryInputFormat( { input =>
val one = input.readInt
val two = input.readDouble
(one, two)
}))
```
Here `input` would be of type `DataSet[(Int, Double)]`.
[Back to top](#top)
<section id="binary-serialized-input-format">
#### BinarySerializedInputFormat
This input format is only meant to be used in conjunction with
`BinarySerializedOutputFormat`. You can use these to write elements to files using a
Flink-internal format that can efficiently be read again. You should only
use this when output is only meant to be consumed by other Flink jobs.
The format can be used on one of two ways:
```scala
BinarySerializedInputFormat[Out]()
BinarySerializedInputFormat[Out](blocksize: Long)
```
So if input files contain elements of type `(String, Int)` (a tuple type) you
could use:
```scala
val input = DataSource("file:///some/file", BinarySerializedInputFormat[(String, Int)]())
```
[Back to top](#top)
<section id="fixed-length-input-format">
#### FixedLengthInputFormat
This input format is for cases where you want to read binary blocks
of a fixed size. The size of a block must be specified and you must
provide code that reads elements from a byte array.
The format is used like this:
```scala
FixedLengthInputFormat[Out](readFunction: (Array[Byte], Int) => Out, recordLength: Int)
```
The specified function gets an array and a position at which it must start
reading the array and returns the element read from the binary data.
[Back to top](#top)
<section id="operations">
Operations on DataSet
---------------------
As explained in [Programming Model](pmodel.html#operators),
a Flink job is a graph of operators that process data coming from
sources that is finally written to sinks. When you use the Scala front end
these operators as well as the graph is created behind the scenes. For example,
when you write code like this:
```scala
val input = TextFile("file:///some/file")
val words = input.map { x => (x, 1) }
val output = counts.write(words, CsvOutputFormat()))
val plan = new ScalaPlan(Seq(output))
```
What you get is a graph that has a data source, a map operator (that contains
the code written inside the anonymous function block), and a data sink. You
do not have to know about this to be able to use the Scala front end but
it helps to remember, that when you are using Scala you are building
a data flow graph that processes data only when executed.
There are operations on `DataSet` that correspond to all the types of operators
that the Flink system supports. We will shortly go trough all of them with
some examples.
[Back to top](#top)
<section id="operator-templates">
#### Basic Operator Templates
Most of the operations have three similar versions and we will
explain them here for all of the operators together. The three versions are `map`,
`flatMap`, and `filter`. All of them accept an anonymous function that
defines what the operation does but the semantics are different.
The `map` version is a simple one to one mapping. Take a look at the following
code:
```scala
val input: DataSet[(String, Int)]
val mapped = input.map { x => (x._1, x._2 + 3) }
```
This defines a map operator that operates on tuples of String and Int and just
adds three to the Int (the second fields of the tuple). So, if the input set had
the tuples (a, 1), (b, 2), and (c, 3) the result after the operator would be
(a, 4), (b, 5), and (c, 6).
The `flatMap` version works a bit differently,
here you return something iterable from the anonymous function. The iterable
could be a list or an array. The elements in this iterable are unnested.
So for every element in the input data you get a list of elements. The
concatenation of those is the result of the operator. If you had
the following code:
```scala
val input: DataSet[(String, Int)]
val mapped = input.flatMap { x => List( (x._1, x._2), (x._1, x._2 + 1) ) }
```
and as input the tuples (a, 1) and (b, 1) you would get (a, 1), (a, 2), (b, 1),
and (b, 2) as result. It is one flat list, and not the individual lists returned
from the anonymous function.
The third template is `filter`. Here you give an anonymous function that
returns a Boolean. The elements for which this Boolean is true are part of the
result of the operation, the others are culled. An example for a filter is this
code:
```scala
val input: DataSet[(String, Int)]
val mapped = input.filter { x => x._2 >= 3 }
```
[Back to top](#top)
<section id="key-selectors">
#### Field/Key Selectors
For some operations (group, join, and cogroup) it is necessary to specify which
parts of a data type are to be considered the key. This key is used for grouping
elements together for reduce and for joining in case of a join or cogroup.
In Scala the key is specified using a special anonymous function called
a key selector. The key selector has as input an element of the type of
the `DataSet` and must return a single value or a tuple of values that should
be considered the key. This will become clear with some examples: (Note that
we use the reduce operation here as an example, we will have a look at
that further down):
```scala
val input: DataSet[(String, Int)]
val reduced = input groupBy { x => (x._1) } reduce { ... }
val reduced2 = input groupBy { case (w, c) => w } reduce { ... }
case class Test(a: String, b: Int, c: Int)
val input2: DataSet[Test]
val reduced3 = input2 groupBy { x => (x.a, x.b) } reduce { ... }
val reduced4 = input2 groupBy { case Test(x,y,z) => (x,y) } reduce { ... }
```
The anonymous function block passed to `groupBy` is the key selector. The first
two examples both specify the `String` field of the tuple as key. In the second
set of examples we see a custom case class and here we select the first two
fields as a compound key.
It is worth noting that the key selector function is not actually executed
at runtime but it is parsed at job creation time where the key information is
extracted and stored for efficient computation at runtime.
#### Map Operation
Map is an operation that gets one element at a time and can output one or
several elements. The operations that result in a `MapOperator` in the graph are exactly
those mentioned in the previous section. For completeness' sake we will mention
their signatures here (in this and the following such lists `In` is the
type of the input data set, `DataSet[In]`):
```scala
def map[Out](fun: In => Out): DataSet[Out]
def flatMap[Out](fun: In => Iterator[Out]): DataSet[Out]
def filter(fun: In => Boolean): DataSet[Out]
```
#### Reduce Operation
Reduce is an operation that looks
at groups of elements at a time and can, for one group, output one or several
elements. To specify how elements should be grouped you need to give
a key selection function, as explained [above](#key-selectors).
The basic template of the reduce operation is:
```scala
input groupBy { <key selector> } reduce { <reduce function> }
```
The signature of the reduce function depends on the variety of reduce operation
selected. There are right now three different versions:
```scala
def reduce(fun: (In, In) => In): DataSet[In]
def reduceGroup[Out](fun: Iterator[In] => Out): DataSet[Out]
def combinableReduceGroup(fun: Iterator[In] => In): DataSet[In]
```
The `reduce` variant is like a `reduceLeft` on a Scala collection with
the limitation that the output data type must be the same as the input data
type. You specify how to elements of the selection should be combined,
this is then used to reduce the elements in one group (of the same key)
down to one element. This can be used to implement aggregation operators,
for example:
```scala
val words: DataSet[(String, Int)]
val counts = words.groupBy { case (word, count) => word}
.reduce { (w1, w1) => (w1._1, w1._2 + w2._2) }
```
This would add up the Int fields of those tuples that have the same String
in the first fields. As is for example required in Word Count.
The `reduceGroup` variant can be used when more control is required. Here
your reduce function gets an `Iterator` that can be used to iterate over
all the elements in a group. With this type or reduce operation the
output data type can be different from the input data type. An example
of this kind of operation is this:
```scala
val words: DataSet[(String, Int)]
val minCounts = words.groupBy { case (word, count) => word}
.reduceGroup { words => words.minBy { _._2 } }
```
Here we use the minBy function of Scala collections to determine the
element with the minimum count in a group.
The `combinableGroupReduce` works like the `groupReduce` with the difference
that the reduce operation is combinable. This is an optimization one can use,
please have a look at [Programming Model](pmodel.html "Programming Model") for
the details.
#### Join Operation
The join operation is similar to a database equi-join. It is a two input
iteration where you have to specify a key selector for each of the inputs
and then the anonymous function is called for every pair of matching
elements from the two input sides.
The basic template is:
```scala
input1 join input2 where { <key selector 1> } isEqualTo { <key selector 2>} map { <join function> }
```
or, because lines will get to long fast:
```scala
input1.join(input2)
.where { <key selector 1> }
.isEqualTo { <key selector 2>}
.map { <join function> }
```
(Scala can sometimes be quite finicky about where you can omit dots and
parentheses, so it's best to use dots in multi-line code like this.)
As mentioned in [here](#operator-templates) there are three versions of
this operator, so you can use one of these in the last position:
```scala
def map[Out](fun: (LeftIn, RightIn) => Out): DataSet[Out]
def flatMap[Out](fun: (LeftIn, RightIn) => Iterator[Out]): DataSet[Out]
def filter(fun: (LeftIn, RightIn) => Boolean): DataSet[(LeftIn, RightIn)]
```
One example where this can be used is database-style joining with projection:
```scala
input1.join(input2)
.where { case (a, b, c) => (a, b) }
.isEqualTo { case (a, b, c, d) => (c, d) }
.map { (left, right) => (left._3, right._1) }
```
Here the join key for the left input is a compound of the first two tuple fields
while the key for the second input is a compound of the last two fields. We then
pick one field each from both sides as the result of the operation.
#### CoGroup Operation
The cogroup operation is a cross between join and reduce. It has two inputs
and you have to specify a key selector for each of them. This is where the
similarities with join stop. Instead of having one invocation of your user
code per pair of matching elements all elements from the left and from the right
are grouped together for one single invocation. In your function you get
an `Iterator` for the elements from the left input and another `Iterator`
for the elements from the right input.
The basic template is:
```scala
input1 cogroup input2 where { <key selector 1> } isEqualTo { <key selector 2>} map { <cogroup function> }
```
or, because lines will get to long fast:
```scala
input1.cogroup(input2)
.where { <key selector 1> }
.isEqualTo { <key selector 2>}
.map { <cogroup function> }
```
There are to variants you can use, with the semantics explained
[here](#operator-templates).
```scala
def map[Out](fun: (Iterator[LeftIn], Iterator[RightIn]) => Out): DataSet[Out]
def flatMap[Out](fun: (Iterator[LeftIn], Iterator[RightIn]) => Iterator[Out]): DataSet[Out]
```
#### Cross Operation
The cross operation is used to form the Cartesian product of the elements
from two inputs. The basic template is:
```scala
input1 cross input2 map { <cogroup function> }
```
Again there are three variants, with the semantics explained
[here](#operator-templates).
```scala
def map[Out](fun: (LeftIn, RightIn) => Out): DataSet[Out]
def flatMap[Out](fun: (LeftIn, RightIn) => Iterator[Out]): DataSet[Out]
def filter(fun: (LeftIn, RightIn) => Boolean): DataSet[(LeftIn, RightIn)]
```
#### Union
When you want to have the combination of several data sets as the input of
an operation you can use a union to combine them. It is used like this
```scala
val input1: DataSet[String]
val input2: DataSet[String]
val unioned = input1.union(input2)
```
The signature of union is:
```scala
def union(secondInput: DataSet[A])
```
Where `A` is the generic type of the `DataSet` on which you execute the `union`.
[Back to top](#top)
<section id="iterations">
Iterations
----------
Iterations allow you to implement *loops* in Flink programs.
[This page](iterations.html) gives a
general introduction to iterations. This section here provides quick examples
of how to use the concepts using the Scala API.
The iteration operators encapsulate a part of the program and execute it
repeatedly, feeding back the result of one iteration (the partial solution) into
the next iteration. Flink has two different types of iterations,
*Bulk Iteration* and *Delta Iteration*.
For both types of iterations you provide the iteration body as a function
that has data sets as input and returns a new data set. The difference is
that bulk iterations map from one data set two one new data set while
delta iterations map two data sets to two new data sets.
#### Bulk Iteration
The signature of the bulk iterate method is this:
```scala
def iterate(n: Int, stepFunction: DataSet[A] => DataSet[A])
```
where `A` is the type of the `DataSet` on which `iterate` is called. The number
of steps is given in `n`. This is how you use it in practice:
```scala
val dataPoints = DataSource(dataPointInput, DelimitedInputFormat(parseInput))
val clusterPoints = DataSource(clusterInput, DelimitedInputFormat(parseInput))
def kMeansStep(centers: DataSet[(Int, Point)]) = {
val distances = dataPoints cross centers map computeDistance
val nearestCenters = distances.groupBy { case (pid, _) => pid }
.reduceGroup { ds => ds.minBy(_._2.distance) } map asPointSum.tupled
val newCenters = nearestCenters.groupBy { case (cid, _) => cid }
.reduceGroup sumPointSums map { case (cid, pSum) => cid -> pSum.toPoint() }
newCenters
}
val finalCenters = clusterPoints.iterate(numIterations, kMeansStep)
val output = finalCenters.write(clusterOutput, DelimitedOutputFormat(formatOutput.tupled))
```
Not that we use some functions here which we don't show. If you want, you
can check out the complete code in our KMeans example.
#### Delta Iteration
The signature of the delta iterate method is this:
```scala
def iterateWithDelta(workset: DataSet[W], solutionSetKey: A => K, stepFunction: (DataSet[A], DataSet[W]) => (DataSet[A], DataSet[W]), maxIterations: Int)
```
where `A` is the type of the `DataSet` on which `iterateWithDelta` is called,
`W` is the type of the `DataSet` that represents the workset and `K` is the
key type. The maximum number of iterations must always be given.
For information on how delta iterations in general work on our system, please
refer to [iterations](iterations.html). A working example job is
available here:
[Scala Connected Components Example](examples_scala.html#connected_components)
[Back to top](#top)
<section id="data-sinks">
Creating Data Sinks
-------------------
The creation of data sinks is analog to the creation of data sources. `DataSet`
has a `write` method that is used to create a sink that writes the output
of the operation to a file in the local file system or HDFS. The general pattern
is this:
```scala
val sink = out.write("<file-path>", <output-format>)
```
Where `out` is some `DataSet`. Just as for data sources, the file path can be
on of either `file:///some/file` to acces files on the local machine or
`hdfs://some/path` to read files from HDFS. The output format can be one of our
builtin formats or a custom output format. The builtin formats are:
* [DelimitedOutputFormat](#delimited-output-format)
* [CsvOutputFormat](#csv-output-format)
* [RawOutputFormat](#raw-output-format)
* [BinaryOutputFormat](#binary-output-format)
* [BinarySerializedOutputFormat](#binary-serialized-output-format)
We will now have a look at each of them and show how they are employed and in
which situations.
[Back to top](#top)
<section id="delimited-output-format">
#### DelimitedOutputFormat
This output format is meant for writing textual records that are separated by
some delimiter. The delimiter could be a newline, for example. It is used like
this:
```scala
DelimitedOutputFormat[In](formatFunction: In => String, delim: String = "\n")
```
For every element in the `DataSet` the formatting function is called and
the result of that is appended to the output file. In between the elements
the `delim` string is inserted.
An example would be:
```scala
val out: DataSet[(String, Int)]
val sink = out.write("file:///some/file", DelimitedOutputFormat( { elem =>
"%s|%d".format(elem._1, elem._2)
}))
```
Here we use Scala String formatting to write the two fields of the tuple
separated by a pipe character. The default newline delimiter will be inserted
between the elements in the output files.
[Back to top](#top)
<section id="csv-output-format">
#### CsvOutputFormat
This output format can be used to automatically write fields of tuple
elements or case classes to CSV files. You can specify what separator should
be used between fields of an element and also the separator between elements.
```scala
CsvOutputFormat[In]()
CsvOutputFormat[In](recordDelim: String)
CsvOutputFormat[In](recordDelim: String, fieldDelim: Char)
```
The default record delimiter is a newline, the default field delimiter is a
comma.
An example usage could look as follows:
```scala
val out: DataSet[(String, Int)]
val sink = out.write("file:///some/file", CsvOutputFormat())
```
Notice how we don't need to specify the generic type here, it is inferred.
[Back to top](#top)
<section id="raw-output-format">
#### RawOutputFormat
This input format can be used when you want to have complete control over
what gets written. You get an
[OutputStream](http://docs.oracle.com/javase/7/docs/api/java/io/OutputStream.html)
and can write the elements of the `DataSet` exactly as you see fit.
A `RawOutputFormat` is created like this:
```scala
RawOutputFormat[In](writeFunction: (In, OutputStream) => Unit)
```
The function you pass in gets one element from the `DataSet` and must
write it to the given `OutputStream`. An example would be the following:
```scala
val out: DataSet[(String, Int)]
val sink = out.write("file:///some/file", RawOutputFormat( { (elem, output) =>
/* write elem._1 and elem._2 to output */
}))
```
<section id="binary-output-format">
#### BinaryOutputFormat
This format is very similar to the RawOutputFormat. The difference is that
instead of an [OutputStream](http://docs.oracle.com/javase/7/docs/api/java/io/OutputStream.html)
you get a [DataOutput](http://docs.oracle.com/javase/7/docs/api/java/io/DataOutput.html)
to which you can write binary data. You can also specify the block size for
the binary output file. When you don't specify a block size some default
is used.
A `BinaryOutputFormat` is created like this:
```scala
BinaryOutputFormat[In](writeFunction: (In, DataOutput) => Unit)
BinaryOutputFormat[In](writeFunction: (In, DataOutput) => Unit, blockSize: Long)
```
[Back to top](#top)
<section id="binary-serialized-output-format">
#### BinarySerializedOutputFormat
This output format is only meant to be used in conjunction with
`BinarySerializedInputFormat`. You can use these to write elements to files using a
Flink-internal format that can efficiently be read again. You should only
use this when output is only meant to be consumed by other Flink jobs.
The output format can be used on one of two ways:
```scala
BinarySerializedOutputFormat[In]()
BinarySerializedOutputFormat[In](blocksize: Long)
```
So to write elements of some `DataSet[A]` to a binary file you could use:
```scala
val out: DataSet[(String, Int)]
val sink = out.write("file:///some/file", BinarySerializedInputFormat())
```
As you can see the type of the elements need not be specified, it is inferred
by Scala.
[Back to top](#top)
<section id="execution">
Executing Jobs
--------------
To execute a data flow graph the sinks need to be wrapped in a {% gh_link /flink-scala/src/main/scala/org/apache/flink/api/scala/ScalaPlan.scala "ScalaPlan" %} object like this:
```scala
val out: DataSet[(String, Int)]
val sink = out.write("file:///some/file", CsvOutputFormat())
val plan = new ScalaPlan(Seq(sink))
```
You can put several sinks into the `Seq` that is passed to the constructor.
There are two ways one can execute a data flow plan: local execution and
remote/cluster execution. When using local execution the plan is executed on
the local computer. This is handy while developing jobs because you can
easily debug your code and iterate quickly. When a job is ready to be
used on bigger data sets it can be executed on a cluster. We will
now give an example for each of the two execution modes.
First up is local execution:
```scala
import org.apache.flinkclient.LocalExecutor
...
val plan: ScalaPlan = ...
LocalExecutor.execute(plan)
```
This is all there is to it.
Remote (or cluster) execution is a bit more complicated because you have
to package your code in a jar file so that it can be distributed on the cluster.
Have a look at the [scala quickstart](scala_api_quickstart.html) to see how you
can set up a maven project that does the packaging. Remote execution is done
using the {% gh_link /flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java "RemoteExecutor" %}, like this:
```scala
import org.apache.flinkclient.RemoteExecutor
...
val plan: ScalaPlan = ...
val ex = new RemoteExecutor("<job manager ip address>", <job manager port>, "your.jar");
ex.executePlan(plan);
```
The IP address and the port of the Flink job manager depend on your
setup. Have a look at [cluster quickstart](/quickstart/setup.html) for a quick
guide about how to set up a cluster. The default cluster port is 6123, so
if you run a job manger on your local computer you can give this and "localhost"
as the first to parameters to the `RemoteExecutor` constructor.
[Back to top](#top)
<section id="rich-functions">
Rich Functions
--------------
Sometimes having a single function that is passed to an operation is not enough.
Using Rich Functions it is possible to have state inside your operations and
have code executed before the first element is processed and after the last
element is processed. For example, instead of a simple function as in this
example:
```scala
val mapped = input map { x => x + 1 }
```
you can have a rich function like this:
```scala
val mapped = input map( new MapFunction[(String, Int), (String, Int)] {
val someState: SomeType = ...
override def open(config: Configuration) = {
// one-time initialization code
}
override def close() = {
// one-time clean-up code
}
override def apply(in: (String, Int)) = {
// do complex stuff
val result = ...
result
}
})
```
You could also create a custom class that derives from `MapFunction`
instead of the anonymous class we used here.
There are rich functions for all the various operator types. The basic
template is the some, though. The common interface that they implement
is {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java "Function" %}. The `open` and `close` methods can be overridden to run set-up
and tear-down code. The other methods can be used in a rich function to
work with the runtime context which gives information about the context
of the operator. Your operation code must now reside in an `apply` method
that has the same signature as the anonymous function you would normally
supply.
The rich functions reside in the package `org.apache.flinkapi.scala.functions`.
This is a list of all the rich functions can can be used instead of
simple functions in the respective operations:
```scala
abstract class MapFunction[In, Out]
abstract class FlatMapFunction[In, Out]
abstract class FilterFunction[In, Out]
abstract class ReduceFunction[In]
abstract class GroupReduceFunction[In, Out]
abstract class CombinableGroupReduceFunction[In, Out]
abstract class JoinFunction[LeftIn, RightIn, Out]
abstract class FlatJoinFunction[LeftIn, RightIn, Out]
abstract class CoGroupFunction[LeftIn, RightIn, Out]
abstract class FlatCoGroupFunction[LeftIn, RightIn, Out]
abstract class CrossFunction[LeftIn, RightIn, Out]
abstract class FlatCrossFunction[LeftIn, RightIn, Out]
```
Note that for all the rich stubs, you need to specify the generic type of
the input (or inputs) and the output type.
[Back to top](#top)