blob: 0c6daab2b4b52c9611acdc9c6df3856bfe82e84c [file] [log] [blame]
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags -->
<title>Apache Flink 0.10-SNAPSHOT Documentation: Configuration</title>
<link rel="shortcut icon" href="http://flink.apache.org/docs/master/page/favicon.ico" type="image/x-icon">
<link rel="icon" href="http://flink.apache.org/docs/master/page/favicon.ico" type="image/x-icon">
<!-- Bootstrap -->
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/master/page/css/flink.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/master/page/css/syntax.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/master/page/css/codetabs.css">
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
</head>
<body>
<!-- Top navbar. -->
<nav class="navbar navbar-default navbar-fixed-top">
<div class="container">
<!-- The logo. -->
<div class="navbar-header">
<button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1">
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<div class="navbar-logo">
<a href="http://flink.apache.org"><img alt="Apache Flink" src="http://flink.apache.org/docs/master/page/img/navbar-brand-logo.jpg"></a>
</div>
</div><!-- /.navbar-header -->
<!-- The navigation links. -->
<div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
<ul class="nav navbar-nav">
<li><a href="http://flink.apache.org/docs/master/index.html">Overview<span class="hidden-sm hidden-xs"> 0.10</span></a></li>
<!-- Setup -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/master/setup" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Setup <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/master/setup/building.html">Get Flink 0.10-SNAPSHOT</a></li>
<li class="divider"></li>
<li role="presentation" class="dropdown-header"><strong>Deployment</strong></li>
<li><a href="http://flink.apache.org/docs/master/setup/local_setup.html" class="active">Local</a></li>
<li><a href="http://flink.apache.org/docs/master/setup/cluster_setup.html">Cluster (Standalone)</a></li>
<li><a href="http://flink.apache.org/docs/master/setup/yarn_setup.html">YARN</a></li>
<li><a href="http://flink.apache.org/docs/master/setup/gce_setup.html">GCloud</a></li>
<li><a href="http://flink.apache.org/docs/master/setup/flink_on_tez.html">Flink on Tez <span class="badge">Beta</span></a></li>
<li class="divider"></li>
<li><a href="http://flink.apache.org/docs/master/setup/config.html">Configuration</a></li>
</ul>
</li>
<!-- Programming Guides -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/master/apis" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Programming Guides <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/master/apis/programming_guide.html"><strong>Batch: DataSet API</strong></a></li>
<li><a href="http://flink.apache.org/docs/master/apis/streaming_guide.html"><strong>Streaming: DataStream API</strong> <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/master/apis/python.html">Python API <span class="badge">Beta</span></a></li>
<li class="divider"></li>
<li><a href="scala_shell.html">Interactive Scala Shell</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/dataset_transformations.html">Dataset Transformations</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/best_practices.html">Best Practices</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/example_connectors.html">Connectors</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/examples.html">Examples</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/local_execution.html">Local Execution</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/cluster_execution.html">Cluster Execution</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/cli.html">Command Line Interface</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/web_client.html">Web Client</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/iterations.html">Iterations</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/java8.html">Java 8</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/hadoop_compatibility.html">Hadoop Compatability <span class="badge">Beta</span></a></li>
</ul>
</li>
<!-- Libraries -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/master/libs" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Libraries <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/master/libs/spargel_guide.html">Graphs: Spargel</a></li>
<li><a href="http://flink.apache.org/docs/master/libs/gelly_guide.html">Graphs: Gelly <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/master/libs/ml/">Machine Learning <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/master/libs/table.html">Relational: Table <span class="badge">Beta</span></a></li>
</ul>
</li>
<!-- Internals -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/master/internals" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Internals <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li role="presentation" class="dropdown-header"><strong>Contribute</strong></li>
<li><a href="http://flink.apache.org/docs/master/internals/how_to_contribute.html">How to Contribute</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/coding_guidelines.html">Coding Guidelines</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/ide_setup.html">IDE Setup</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/logging.html">Logging</a></li>
<li class="divider"></li>
<li role="presentation" class="dropdown-header"><strong>Internals</strong></li>
<li><a href="http://flink.apache.org/docs/master/internals/general_arch.html">Architecture &amp; Process Model</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/types_serialization.html">Type Extraction &amp; Serialization</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/job_scheduling.html">Jobs &amp; Scheduling</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/add_operator.html">How-To: Add an Operator</a></li>
</ul>
</li>
</ul>
<form class="navbar-form navbar-right hidden-sm hidden-md" role="search" action="http://flink.apache.org/docs/master/search-results.html">
<div class="form-group">
<input type="text" class="form-control" name="q" placeholder="Search all pages">
</div>
<button type="submit" class="btn btn-default">Search</button>
</form>
</div><!-- /.navbar-collapse -->
</div><!-- /.container -->
</nav>
<!-- Main content. -->
<div class="container">
<div class="row">
<div class="col-sm-10 col-sm-offset-1">
<h1>Configuration</h1>
<h2 id="overview">Overview</h2>
<p>The default configuration parameters allow Flink to run out-of-the-box
in single node setups.</p>
<p>This page lists the most common options that are typically needed to set
up a well performing (distributed) installation. In addition a full
list of all available configuration parameters is listed here.</p>
<p>All configuration is done in <code>conf/flink-conf.yaml</code>, which is expected to be
a flat collection of <a href="http://www.yaml.org/spec/1.2/spec.html">YAML key value pairs</a>
with format <code>key: value</code>.</p>
<p>The system and run scripts parse the config at startup time. Changes to the configuration
file require restarting the Flink JobManager and TaskManagers.</p>
<p>The configuration files for the TaskManagers can be different, Flink does not assume
uniform machines in the cluster.</p>
<ul id="markdown-toc">
<li><a href="#overview" id="markdown-toc-overview">Overview</a></li>
<li><a href="#common-options" id="markdown-toc-common-options">Common Options</a></li>
<li><a href="#advanced-options" id="markdown-toc-advanced-options">Advanced Options</a></li>
<li><a href="#full-reference" id="markdown-toc-full-reference">Full Reference</a> <ul>
<li><a href="#hdfs" id="markdown-toc-hdfs">HDFS</a></li>
<li><a href="#jobmanager-amp-taskmanager" id="markdown-toc-jobmanager-amp-taskmanager">JobManager &amp; TaskManager</a></li>
<li><a href="#distributed-coordination-via-akka" id="markdown-toc-distributed-coordination-via-akka">Distributed Coordination (via Akka)</a></li>
<li><a href="#jobmanager-web-frontend" id="markdown-toc-jobmanager-web-frontend">JobManager Web Frontend</a></li>
<li><a href="#webclient" id="markdown-toc-webclient">Webclient</a></li>
<li><a href="#file-systems" id="markdown-toc-file-systems">File Systems</a></li>
<li><a href="#compileroptimizer" id="markdown-toc-compileroptimizer">Compiler/Optimizer</a></li>
</ul>
</li>
<li><a href="#yarn" id="markdown-toc-yarn">YARN</a></li>
<li><a href="#background" id="markdown-toc-background">Background</a> <ul>
<li><a href="#configuring-the-network-buffers" id="markdown-toc-configuring-the-network-buffers">Configuring the Network Buffers</a></li>
<li><a href="#configuring-temporary-io-directories" id="markdown-toc-configuring-temporary-io-directories">Configuring Temporary I/O Directories</a></li>
<li><a href="#configuring-taskmanager-processing-slots" id="markdown-toc-configuring-taskmanager-processing-slots">Configuring TaskManager processing slots</a></li>
</ul>
</li>
</ul>
<h2 id="common-options">Common Options</h2>
<ul>
<li>
<p><code>env.java.home</code>: The path to the Java installation to use (DEFAULT: system’s
default Java installation, if found). Needs to be specified if the startup
scipts fail to automatically resolve the java home directory. Can be specified
to point to a specific java installation or version. If this option is not
specified, the startup scripts also evaluate the <code>$JAVA_HOME</code> environment variable.</p>
</li>
<li>
<p><code>jobmanager.rpc.address</code>: The IP address of the JobManager, which is the
master/coordinator of the distributed system (DEFAULT: localhost).</p>
</li>
<li>
<p><code>jobmanager.rpc.port</code>: The port number of the JobManager (DEFAULT: 6123).</p>
</li>
<li>
<p><code>jobmanager.heap.mb</code>: JVM heap size (in megabytes) for the JobManager. You may have to increase the heap size for the JobManager if you are running
very large applications (with many operators), or if you are keeping a long history of them.</p>
</li>
<li>
<p><code>taskmanager.heap.mb</code>: JVM heap size (in megabytes) for the TaskManagers,
which are the parallel workers of the system. In
contrast to Hadoop, Flink runs operators (e.g., join, aggregate) and
user-defined functions (e.g., Map, Reduce, CoGroup) inside the TaskManager
(including sorting/hashing/caching), so this value should be as
large as possible. If the cluster is exclusively running Flink,
the total amount of available memory per machine minus some memory for the
operating system (maybe 1-2 GB) is a good value.
On YARN setups, this value is automatically configured to the size of
the TaskManager’s YARN container, minus a certain tolerance value.</p>
</li>
<li>
<p><code>taskmanager.numberOfTaskSlots</code>: The number of parallel operator or
user function instances that a single TaskManager can run (DEFAULT: 1).
If this value is larger than 1, a single TaskManager takes multiple instances of
a function or operator. That way, the TaskManager can utilize multiple CPU cores,
but at the same time, the available memory is divided between the different
operator or function instances.
This value is typically proportional to the number of physical CPU cores that
the TaskManager’s machine has (e.g., equal to the number of cores, or half the
number of cores). <a href="config.html#configuring-taskmanager-processing-slots">More about task slots</a>.</p>
</li>
<li>
<p><code>parallelism.default</code>: The default parallelism to use for programs that have
no parallelism specified. (DEFAULT: 1). For setups that have no concurrent jobs
running, setting this value to NumTaskManagers * NumSlotsPerTaskManager will
cause the system to use all available execution resources for the program’s
execution. <strong>Note</strong>: The default parallelism can be overwriten for an entire
job by calling <code>setParallelism(int parallelism)</code> on the <code>ExecutionEnvironment</code>
or by passing <code>-p &lt;parallelism&gt;</code> to the Flink Command-line frontend. It can be
overwritten for single transformations by calling <code>setParallelism(int
parallelism)</code> on an operator. See the <a href="http://flink.apache.org/docs/master/apis/programming_guide.html#parallel-execution">programming
guide</a> for more information about the
parallelism.</p>
</li>
<li>
<p><code>fs.hdfs.hadoopconf</code>: The absolute path to the Hadoop File System’s (HDFS)
configuration directory (OPTIONAL VALUE).
Specifying this value allows programs to reference HDFS files using short URIs
(<code>hdfs:///path/to/files</code>, without including the address and port of the NameNode
in the file URI). Without this option, HDFS files can be accessed, but require
fully qualified URIs like <code>hdfs://address:port/path/to/files</code>.
This option also causes file writers to pick up the HDFS’s default values for block sizes
and replication factors. Flink will look for the “core-site.xml” and
“hdfs-site.xml” files in teh specified directory.</p>
</li>
</ul>
<h2 id="advanced-options">Advanced Options</h2>
<ul>
<li>
<p><code>taskmanager.tmp.dirs</code>: The directory for temporary files, or a list of
directories separated by the systems directory delimiter (for example ‘:’
(colon) on Linux/Unix). If multiple directories are specified, then the temporary
files will be distributed across the directories in a round-robin fashion. The
I/O manager component will spawn one reading and one writing thread per
directory. A directory may be listed multiple times to have the I/O manager use
multiple threads for it (for example if it is physically stored on a very fast
disc or RAID) (DEFAULT: The system’s tmp dir).</p>
</li>
<li>
<p><code>jobmanager.web.port</code>: Port of the JobManager’s web interface (DEFAULT: 8081).</p>
</li>
<li>
<p><code>fs.overwrite-files</code>: Specifies whether file output writers should overwrite
existing files by default. Set to <em>true</em> to overwrite by default, <em>false</em> otherwise.
(DEFAULT: false)</p>
</li>
<li>
<p><code>fs.output.always-create-directory</code>: File writers running with a parallelism
larger than one create a directory for the output file path and put the different
result files (one per parallel writer task) into that directory. If this option
is set to <em>true</em>, writers with a parallelism of 1 will also create a directory
and place a single result file into it. If the option is set to <em>false</em>, the
writer will directly create the file directly at the output path, without
creating a containing directory. (DEFAULT: false)</p>
</li>
<li>
<p><code>taskmanager.network.numberOfBuffers</code>: The number of buffers available to the
network stack. This number determines how many streaming data exchange channels
a TaskManager can have at the same time and how well buffered the channels are.
If a job is rejected or you get a warning that the system has not enough buffers
available, increase this value (DEFAULT: 2048).</p>
</li>
<li>
<p><code>taskmanager.memory.size</code>: The amount of memory (in megabytes) that the task
manager reserves on the JVM’s heap space for sorting, hash tables, and caching
of intermediate results. If unspecified (-1), the memory manager will take a fixed
ratio of the heap memory available to the JVM, as specified by
<code>taskmanager.memory.fraction</code>. (DEFAULT: -1)</p>
</li>
<li>
<p><code>taskmanager.memory.fraction</code>: The relative amount of memory that the task
manager reserves for sorting, hash tables, and caching of intermediate results.
For example, a value of 0.8 means that TaskManagers reserve 80% of the
JVM’s heap space for internal data buffers, leaving 20% of the JVM’s heap space
free for objects created by user-defined functions. (DEFAULT: 0.7)
This parameter is only evaluated, if <code>taskmanager.memory.size</code> is not set.</p>
</li>
</ul>
<h2 id="full-reference">Full Reference</h2>
<h3 id="hdfs">HDFS</h3>
<p>These parameters configure the default HDFS used by Flink. Setups that do not
specify a HDFS configuration have to specify the full path to
HDFS files (<code>hdfs://address:port/path/to/files</code>) Files will also be written
with default HDFS parameters (block size, replication factor).</p>
<ul>
<li><code>fs.hdfs.hadoopconf</code>: The absolute path to the Hadoop configuration directory.
The system will look for the “core-site.xml” and “hdfs-site.xml” files in that
directory (DEFAULT: null).</li>
<li><code>fs.hdfs.hdfsdefault</code>: The absolute path of Hadoop’s own configuration file
“hdfs-default.xml” (DEFAULT: null).</li>
<li><code>fs.hdfs.hdfssite</code>: The absolute path of Hadoop’s own configuration file
“hdfs-site.xml” (DEFAULT: null).</li>
</ul>
<h3 id="jobmanager-amp-taskmanager">JobManager &amp; TaskManager</h3>
<p>The following parameters configure Flink’s JobManager and TaskManagers.</p>
<ul>
<li><code>jobmanager.rpc.address</code>: The IP address of the JobManager, which is the
master/coordinator of the distributed system (DEFAULT: localhost).</li>
<li><code>jobmanager.rpc.port</code>: The port number of the JobManager (DEFAULT: 6123).</li>
<li><code>taskmanager.rpc.port</code>: The task manager’s IPC port (DEFAULT: 6122).</li>
<li><code>taskmanager.data.port</code>: The task manager’s port used for data exchange
operations (DEFAULT: 6121).</li>
<li><code>jobmanager.heap.mb</code>: JVM heap size (in megabytes) for the JobManager
(DEFAULT: 256).</li>
<li><code>taskmanager.heap.mb</code>: JVM heap size (in megabytes) for the TaskManagers,
which are the parallel workers of the system. In
contrast to Hadoop, Flink runs operators (e.g., join, aggregate) and
user-defined functions (e.g., Map, Reduce, CoGroup) inside the TaskManager
(including sorting/hashing/caching), so this value should be as
large as possible (DEFAULT: 512). On YARN setups, this value is automatically
configured to the size of the TaskManager’s YARN container, minus a
certain tolerance value.</li>
<li><code>taskmanager.numberOfTaskSlots</code>: The number of parallel operator or
user function instances that a single TaskManager can run (DEFAULT: 1).
If this value is larger than 1, a single TaskManager takes multiple instances of
a function or operator. That way, the TaskManager can utilize multiple CPU cores,
but at the same time, the available memory is divided between the different
operator or function instances.
This value is typically proportional to the number of physical CPU cores that
the TaskManager’s machine has (e.g., equal to the number of cores, or half the
number of cores).</li>
<li><code>taskmanager.tmp.dirs</code>: The directory for temporary files, or a list of
directories separated by the systems directory delimiter (for example ‘:’
(colon) on Linux/Unix). If multiple directories are specified, then the temporary
files will be distributed across the directories in a round robin fashion. The
I/O manager component will spawn one reading and one writing thread per
directory. A directory may be listed multiple times to have the I/O manager use
multiple threads for it (for example if it is physically stored on a very fast
disc or RAID) (DEFAULT: The system’s tmp dir).</li>
<li><code>taskmanager.network.numberOfBuffers</code>: The number of buffers available to the
network stack. This number determines how many streaming data exchange channels
a TaskManager can have at the same time and how well buffered the channels are.
If a job is rejected or you get a warning that the system has not enough buffers
available, increase this value (DEFAULT: 2048).</li>
<li><code>taskmanager.network.bufferSizeInBytes</code>: The size of the network buffers, in
bytes (DEFAULT: 32768 (= 32 KiBytes)).</li>
<li><code>taskmanager.memory.size</code>: The amount of memory (in megabytes) that the task
manager reserves on the JVM’s heap space for sorting, hash tables, and caching
of intermediate results. If unspecified (-1), the memory manager will take a fixed
ratio of the heap memory available to the JVM, as specified by
<code>taskmanager.memory.fraction</code>. (DEFAULT: -1)</li>
<li><code>taskmanager.memory.fraction</code>: The relative amount of memory that the task
manager reserves for sorting, hash tables, and caching of intermediate results.
For example, a value of 0.8 means that TaskManagers reserve 80% of the
JVM’s heap space for internal data buffers, leaving 20% of the JVM’s heap space
free for objects created by user-defined functions. (DEFAULT: 0.7)
This parameter is only evaluated, if <code>taskmanager.memory.size</code> is not set.</li>
<li><code>jobclient.polling.interval</code>: The interval (in seconds) in which the client
polls the JobManager for the status of its job (DEFAULT: 2).</li>
<li><code>taskmanager.runtime.max-fan</code>: The maximal fan-in for external merge joins and
fan-out for spilling hash tables. Limits the number of file handles per operator,
but may cause intermediate merging/partitioning, if set too small (DEFAULT: 128).</li>
<li><code>taskmanager.runtime.sort-spilling-threshold</code>: A sort operation starts spilling
when this fraction of its memory budget is full (DEFAULT: 0.8).</li>
<li><code>taskmanager.heartbeat-interval</code>: The interval in which the TaskManager sends
heartbeats to the JobManager.</li>
<li><code>jobmanager.max-heartbeat-delay-before-failure.msecs</code>: The maximum time that a
TaskManager hearbeat may be missing before the TaskManager is considered failed.</li>
</ul>
<h3 id="distributed-coordination-via-akka">Distributed Coordination (via Akka)</h3>
<ul>
<li><code>akka.ask.timeout</code>: Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts then you should try to increase this value. Timeouts can be caused by slow machines or a congested network. The timeout value requires a time-unit specifier (ms/s/min/h/d) (DEFAULT: <strong>100 s</strong>).</li>
<li><code>akka.lookup.timeout</code>: Timeout used for the lookup of the JobManager. The timeout value has to contain a time-unit specifier (ms/s/min/h/d) (DEFAULT: <strong>10 s</strong>).</li>
<li><code>akka.framesize</code>: Maximum size of messages which are sent between the JobManager and the TaskManagers. If Flink fails because messages exceed this limit, then you should increase it. The message size requires a size-unit specifier (DEFAULT: <strong>10485760b</strong>).</li>
<li><code>akka.watch.heartbeat.interval</code>: Heartbeat interval for Akka’s DeathWatch mechanism to detect dead TaskManagers. If TaskManagers are wrongly marked dead because of lost or delayed heartbeat messages, then you should increase this value. A thorough description of Akka’s DeathWatch can be found <a href="http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector">here</a> (DEFAULT: <strong>akka.ask.timeout/10</strong>).</li>
<li><code>akka.watch.heartbeat.pause</code>: Acceptable heartbeat pause for Akka’s DeathWatch mechanism. A low value does not allow a irregular heartbeat. A thorough description of Akka’s DeathWatch can be found <a href="http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector">here</a> (DEFAULT: <strong>akka.ask.timeout</strong>).</li>
<li><code>akka.watch.threshold</code>: Threshold for the DeathWatch failure detector. A low value is prone to false positives whereas a high value increases the time to detect a dead TaskManager. A thorough description of Akka’s DeathWatch can be found <a href="http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector">here</a> (DEFAULT: <strong>12</strong>).</li>
<li><code>akka.transport.heartbeat.interval</code>: Heartbeat interval for Akka’s transport failure detector. Since Flink uses TCP, the detector is not necessary. Therefore, the detector is disabled by setting the interval to a very high value. In case you should need the transport failure detector, set the interval to some reasonable value. The interval value requires a time-unit specifier (ms/s/min/h/d) (DEFAULT: <strong>1000 s</strong>).</li>
<li><code>akka.transport.heartbeat.pause</code>: Acceptable heartbeat pause for Akka’s transport failure detector. Since Flink uses TCP, the detector is not necessary. Therefore, the detector is disabled by setting the pause to a very high value. In case you should need the transport failure detector, set the pause to some reasonable value. The pause value requires a time-unit specifier (ms/s/min/h/d) (DEFAULT: <strong>6000 s</strong>).</li>
<li><code>akka.transport.threshold</code>: Threshold for the transport failure detector. Since Flink uses TCP, the detector is not necessary and, thus, the threshold is set to a high value (DEFAULT: <strong>300</strong>).</li>
<li><code>akka.tcp.timeout</code>: Timeout for all outbound connections. If you should experience problems with connecting to a TaskManager due to a slow network, you should increase this value (DEFAULT: <strong>akka.ask.timeout</strong>).</li>
<li><code>akka.throughput</code>: Number of messages that are processed in a batch before returning the thread to the pool. Low values denote a fair scheduling whereas high values can increase the performance at the cost of unfairness (DEFAULT: <strong>15</strong>).</li>
<li><code>akka.log.lifecycle.events</code>: Turns on the Akka’s remote logging of events. Set this value to ‘on’ in case of debugging (DEFAULT: <strong>off</strong>).</li>
<li><code>akka.startup-timeout</code>: Timeout after which the startup of a remote component is considered being failed (DEFAULT: <strong>akka.ask.timeout</strong>).</li>
</ul>
<h3 id="jobmanager-web-frontend">JobManager Web Frontend</h3>
<ul>
<li><code>jobmanager.web.port</code>: Port of the JobManager’s web interface that displays
status of running jobs and execution time breakdowns of finished jobs
(DEFAULT: 8081). Setting this value to <code>-1</code> disables the web frontend.</li>
<li><code>jobmanager.web.history</code>: The number of latest jobs that the JobManager’s web
front-end in its history (DEFAULT: 5).</li>
</ul>
<h3 id="webclient">Webclient</h3>
<p>These parameters configure the web interface that can be used to submit jobs and
review the compiler’s execution plans.</p>
<ul>
<li><code>webclient.port</code>: The port of the webclient server (DEFAULT: 8080).</li>
<li><code>webclient.tempdir</code>: The temp directory for the web server. Used for example
for caching file fragments during file-uploads (DEFAULT: The system’s temp
directory).</li>
<li><code>webclient.uploaddir</code>: The directory into which the web server will store
uploaded programs (DEFAULT: ${webclient.tempdir}/webclient-jobs/).</li>
<li><code>webclient.plandump</code>: The directory into which the web server will dump
temporary JSON files describing the execution plans
(DEFAULT: ${webclient.tempdir}/webclient-plans/).</li>
</ul>
<h3 id="file-systems">File Systems</h3>
<p>The parameters define the behavior of tasks that create result files.</p>
<ul>
<li><code>fs.overwrite-files</code>: Specifies whether file output writers should overwrite
existing files by default. Set to <em>true</em> to overwrite by default, <em>false</em> otherwise.
(DEFAULT: false)</li>
<li><code>fs.output.always-create-directory</code>: File writers running with a parallelism
larger than one create a directory for the output file path and put the different
result files (one per parallel writer task) into that directory. If this option
is set to <em>true</em>, writers with a parallelism of 1 will also create a directory
and place a single result file into it. If the option is set to <em>false</em>, the
writer will directly create the file directly at the output path, without
creating a containing directory. (DEFAULT: false)</li>
</ul>
<h3 id="compileroptimizer">Compiler/Optimizer</h3>
<ul>
<li><code>compiler.delimited-informat.max-line-samples</code>: The maximum number of line
samples taken by the compiler for delimited inputs. The samples are used to
estimate the number of records. This value can be overridden for a specific
input with the input format’s parameters (DEFAULT: 10).</li>
<li><code>compiler.delimited-informat.min-line-samples</code>: The minimum number of line
samples taken by the compiler for delimited inputs. The samples are used to
estimate the number of records. This value can be overridden for a specific
input with the input format’s parameters (DEFAULT: 2).</li>
<li><code>compiler.delimited-informat.max-sample-len</code>: The maximal length of a line
sample that the compiler takes for delimited inputs. If the length of a single
sample exceeds this value (possible because of misconfiguration of the parser),
the sampling aborts. This value can be overridden for a specific input with the
input format’s parameters (DEFAULT: 2097152 (= 2 MiBytes)).</li>
</ul>
<h2 id="yarn">YARN</h2>
<ul>
<li><code>yarn.heap-cutoff-ratio</code>: (Default 0.15) Percentage of heap space to remove from containers started by YARN.
When a user requests a certain amount of memory for each TaskManager container (for example 4 GB),
we can not pass this amount as the maximum heap space for the JVM (<code>-Xmx</code> argument) because the JVM
is also allocating memory outside the heap. YARN is very strict with killing containers which are using
more memory than requested.
Therefore, we remove a 15% of the memory from the requested heap as a safety margin.</li>
<li>
<p><code>yarn.heap-cutoff-min</code>: (Default 384 MB) Minimum amount of memory to cut off the requested heap size.</p>
</li>
<li>
<p><code>yarn.reallocate-failed</code> (Default ‘true’) Controls whether YARN should reallocate failed containers</p>
</li>
<li>
<p><code>yarn.maximum-failed-containers</code> (Default: number of requested containers). Maximum number of containers the system
is going to reallocate in case of a failure.</p>
</li>
<li>
<p><code>yarn.application-attempts</code> (Default: 1). Number of ApplicationMaster restarts. Note that that the entire Flink cluster
will restart and the YARN Client will loose the connection. Also, the JobManager address will change and you’ll need
to set the JM host:port manually. It is recommended to leave this option at 1.</p>
</li>
<li><code>yarn.heartbeat-delay</code> (Default: 5 seconds). Time between heartbeats with the ResourceManager.</li>
</ul>
<h2 id="background">Background</h2>
<h3 id="configuring-the-network-buffers">Configuring the Network Buffers</h3>
<p>Network buffers are a critical resource for the communication layers. They are
used to buffer records before transmission over a network, and to buffer
incoming data before dissecting it into records and handing them to the
application. A sufficient number of network buffers is critical to achieve a
good throughput.</p>
<p>In general, configure the task manager to have enough buffers that each logical
network connection on you expect to be open at the same time has a dedicated
buffer. A logical network connection exists for each point-to-point exchange of
data over the network, which typically happens at repartitioning- or
broadcasting steps. In those, each parallel task inside the TaskManager has to
be able to talk to all other parallel tasks. Hence, the required number of
buffers on a task manager is <em>total-degree-of-parallelism</em> (number of targets)
* <em>intra-node-parallelism</em> (number of sources in one task manager) * <em>n</em>.
Here, <em>n</em> is a constant that defines how many repartitioning-/broadcasting steps
you expect to be active at the same time.</p>
<p>Since the <em>intra-node-parallelism</em> is typically the number of cores, and more
than 4 repartitioning or broadcasting channels are rarely active in parallel, it
frequently boils down to <em>#cores\^2\^</em> * <em>#machines</em> * 4. To support for
example a cluster of 20 8-core machines, you should use roughly 5000 network
buffers for optimal throughput.</p>
<p>Each network buffer has by default a size of 32 KiBytes. In the above example, the
system would allocate roughly 300 MiBytes for network buffers.</p>
<p>The number and size of network buffers can be configured with the following
parameters:</p>
<ul>
<li><code>taskmanager.network.numberOfBuffers</code>, and</li>
<li><code>taskmanager.network.bufferSizeInBytes</code>.</li>
</ul>
<h3 id="configuring-temporary-io-directories">Configuring Temporary I/O Directories</h3>
<p>Although Flink aims to process as much data in main memory as possible,
it is not uncommon that more data needs to be processed than memory is
available. Flink’s runtime is designed to write temporary data to disk
to handle these situations.</p>
<p>The <code>taskmanager.tmp.dirs</code> parameter specifies a list of directories into which
Flink writes temporary files. The paths of the directories need to be
separated by ‘:’ (colon character). Flink will concurrently write (or
read) one temporary file to (from) each configured directory. This way,
temporary I/O can be evenly distributed over multiple independent I/O devices
such as hard disks to improve performance. To leverage fast I/O devices (e.g.,
SSD, RAID, NAS), it is possible to specify a directory multiple times.</p>
<p>If the <code>taskmanager.tmp.dirs</code> parameter is not explicitly specified,
Flink writes temporary data to the temporary directory of the operating
system, such as <em>/tmp</em> in Linux systems.</p>
<h3 id="configuring-taskmanager-processing-slots">Configuring TaskManager processing slots</h3>
<p>Flink executes a program in parallel by splitting it into subtasks and scheduling these subtasks to processing slots.</p>
<p>Each Flink TaskManager provides processing slots in the cluster. The number of slots
is typically proportional to the number of available CPU cores <strong>of each</strong> TaskManager.
As a general recommendation, the number of available CPU cores is a good default for
<code>taskmanager.numberOfTaskSlots</code>.</p>
<p>When starting a Flink application, users can supply the default number of slots to use for that job.
The command line value therefore is called <code>-p</code> (for parallelism). In addition, it is possible
to <a href="http://flink.apache.org/docs/master/apis/programming_guide.html#parallel-execution">set the number of slots in the programming APIs</a> for
the whole application and individual operators.</p>
<p><img src="fig/slots_parallelism.svg" class="img-responsive" /></p>
</div>
<div class="col-sm-10 col-sm-offset-1">
<!-- Disqus thread and some vertical offset -->
<div style="margin-top: 75px; margin-bottom: 50px" id="disqus_thread"></div>
</div>
</div>
</div><!-- /.container -->
<!-- jQuery (necessary for Bootstrap's JavaScript plugins) -->
<script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.2/jquery.min.js"></script>
<!-- Include all compiled plugins (below), or include individual files as needed -->
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/js/bootstrap.min.js"></script>
<script src="http://flink.apache.org/docs/master/page/js/codetabs.js"></script>
<!-- Google Analytics -->
<script>
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
(i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
})(window,document,'script','//www.google-analytics.com/analytics.js','ga');
ga('create', 'UA-52545728-1', 'auto');
ga('send', 'pageview');
</script>
<!-- Disqus -->
<script type="text/javascript">
var disqus_shortname = 'stratosphere-eu';
(function() {
var dsq = document.createElement('script'); dsq.type = 'text/javascript'; dsq.async = true;
dsq.src = '//' + disqus_shortname + '.disqus.com/embed.js';
(document.getElementsByTagName('head')[0] || document.getElementsByTagName('body')[0]).appendChild(dsq);
})();
</script>
</body>
</html>