---
title: "Parallel Execution"
nav-parent_id: execution
nav-pos: 30
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
This section describes how the parallel execution of programs can be configured in Flink. A Flink
program consists of multiple tasks (transformations/operators, data sources, and sinks). A task is split into
several parallel instances for execution and each parallel instance processes a subset of the task's
input data. The number of parallel instances of a task is called its *parallelism*.
If you want to use [savepoints]({{ site.baseurl }}/ops/state/savepoints.html), you should also consider
setting a maximum parallelism (or `max parallelism`). When restoring from a savepoint, you can
change the parallelism of specific operators or of the whole program, and this setting specifies
an upper bound on that parallelism. This is required because Flink internally partitions state
into key-groups, and the number of key-groups must be finite: an unbounded number would be
detrimental to performance.
* toc
{:toc}
## Setting the Parallelism
The parallelism of a task can be specified in Flink on different levels:
### Operator Level
The parallelism of an individual operator, data source, or data sink can be defined by calling its
`setParallelism()` method. For example:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
    .flatMap(new LineSplitter())
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1).setParallelism(5);
wordCounts.print();
env.execute("Word Count Example");
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = [...]
val wordCounts = text
  .flatMap{ _.split(" ") map { (_, 1) } }
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .sum(1).setParallelism(5)
wordCounts.print()
env.execute("Word Count Example")
{% endhighlight %}
</div>
</div>
### Execution Environment Level
As mentioned [here]({{ site.baseurl }}/dev/api_concepts.html#anatomy-of-a-flink-program), Flink
programs are executed in the context of an execution environment. An
execution environment defines a default parallelism for all operators, data sources, and data sinks
it executes. The execution environment's parallelism can be overridden by explicitly configuring the
parallelism of an operator.
The default parallelism of an execution environment can be specified by calling the
`setParallelism()` method. To execute all operators, data sources, and data sinks with a parallelism
of `3`, set the default parallelism of the execution environment as follows:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();
env.execute("Word Count Example");
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)
val text = [...]
val wordCounts = text
  .flatMap{ _.split(" ") map { (_, 1) } }
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .sum(1)
wordCounts.print()
env.execute("Word Count Example")
{% endhighlight %}
</div>
</div>
### Client Level
The parallelism can be set at the Client when submitting jobs to Flink. The
Client can either be a Java or a Scala program. One example of such a Client is
Flink's Command-line Interface (CLI).
For the CLI client, the parallelism parameter can be specified with `-p`. For
example:

    ./bin/flink run -p 10 ../examples/*WordCount-java*.jar

In a Java/Scala program, the parallelism is set as follows:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
try {
    PackagedProgram program = new PackagedProgram(file, args);
    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
    Configuration config = new Configuration();
    Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());
    // set the parallelism to 10 here
    client.run(program, 10, true);
} catch (ProgramInvocationException e) {
    e.printStackTrace();
}
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
try {
  val program = new PackagedProgram(file, args)
  val jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123")
  val config = new Configuration()
  val client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader())
  // set the parallelism to 10 here
  client.run(program, 10, true)
} catch {
  case e: Exception => e.printStackTrace()
}
{% endhighlight %}
</div>
</div>
### System Level
A system-wide default parallelism for all execution environments can be defined by setting the
`parallelism.default` property in `./conf/flink-conf.yaml`. See the
[Configuration]({{ site.baseurl }}/ops/config.html) documentation for details.
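
As a minimal illustration, the corresponding entry in `./conf/flink-conf.yaml` might look like the following. The value `4` is only a placeholder chosen for this sketch, not a recommendation.

```yaml
# ./conf/flink-conf.yaml
# System-wide default parallelism; 4 is an illustrative value.
parallelism.default: 4
```

Jobs that do not set a parallelism at the operator, execution environment, or client level would then fall back to this value.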
## Setting the Maximum Parallelism
The maximum parallelism can be set wherever the parallelism can be set, except at the client
level and the system level. Instead of calling `setParallelism()`, you call
`setMaxParallelism()`.
The default maximum parallelism is roughly `operatorParallelism + (operatorParallelism / 2)`, with
a lower bound of `128` and an upper bound of `32768`.
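
The rule above can be sketched in plain Java as follows. This is an illustration under stated assumptions, not Flink's own code: the class and method names are hypothetical, the bounds are taken to be the powers of two `128` and `32768`, and the "roughly" is assumed to mean that the value is rounded up to the next power of two before it is clamped.

```java
// Hypothetical sketch of the default max parallelism rule; not Flink's implementation.
final class DefaultMaxParallelismSketch {

    // Assumed bounds: 2^7 and 2^15.
    static final int LOWER_BOUND = 1 << 7;   // 128
    static final int UPPER_BOUND = 1 << 15;  // 32768

    // Rounds a positive int up to the nearest power of two (assumed rounding step).
    static int roundUpToPowerOfTwo(int x) {
        int highest = Integer.highestOneBit(x);
        return highest == x ? x : highest << 1;
    }

    // operatorParallelism + operatorParallelism / 2, rounded and clamped to the bounds.
    static int computeDefaultMaxParallelism(int operatorParallelism) {
        if (operatorParallelism < 1 || operatorParallelism > UPPER_BOUND) {
            throw new IllegalArgumentException(
                "parallelism must be between 1 and " + UPPER_BOUND);
        }
        int candidate = roundUpToPowerOfTwo(operatorParallelism + operatorParallelism / 2);
        return Math.min(Math.max(candidate, LOWER_BOUND), UPPER_BOUND);
    }

    public static void main(String[] args) {
        for (int p : new int[] {1, 100, 200, 30000}) {
            System.out.println(p + " -> " + computeDefaultMaxParallelism(p));
        }
    }
}
```

Under these assumptions, small parallelisms all land on the lower bound, while a parallelism of `200` yields `512` (300 rounded up to the next power of two). Setting the maximum parallelism explicitly avoids depending on this derived value when a job is rescaled later.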
<span class="label label-danger">Attention</span> Setting the maximum parallelism to a very large
value can be detrimental to performance because some state backends have to keep internal data
structures that scale with the number of key-groups (which are the internal implementation mechanism for
rescalable state).
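
To make the role of key-groups more concrete, the following plain Java sketch shows one way keys could be mapped to key-groups and key-groups to parallel operator instances. The class and method names are hypothetical, and the hashing is deliberately simplified (a plain `hashCode()` modulo the maximum parallelism); Flink's actual assignment may differ in detail.

```java
import java.util.Arrays;

// Hypothetical sketch of key-group assignment; simplified, not Flink's implementation.
final class KeyGroupSketch {

    // The number of key-groups equals the maximum parallelism.
    // Assumption: a plain hashCode() is used here for brevity.
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Each parallel instance owns a contiguous range of key-groups.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        // Rescaling from 2 to 4 instances only redistributes whole key-groups.
        for (int parallelism : new int[] {2, 4}) {
            int[] groupsPerInstance = new int[parallelism];
            for (int kg = 0; kg < maxParallelism; kg++) {
                groupsPerInstance[operatorIndexForKeyGroup(maxParallelism, parallelism, kg)]++;
            }
            System.out.println(
                "parallelism " + parallelism + ": " + Arrays.toString(groupsPerInstance));
        }
    }
}
```

In this sketch a key's key-group never changes when the parallelism changes; only the mapping from key-groups to instances does. That is also why the parallelism cannot usefully exceed the maximum parallelism: there would be more instances than key-groups to hand out.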
{% top %}