<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags -->
<title>Apache Flink: Frequently Asked Questions (FAQ)</title>
<link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">
<link rel="icon" href="/favicon.ico" type="image/x-icon">
<!-- Bootstrap -->
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css">
<link rel="stylesheet" href="/css/flink.css">
<link rel="stylesheet" href="/css/syntax.css">
<!-- Blog RSS feed -->
<link href="/blog/feed.xml" rel="alternate" type="application/rss+xml" title="Apache Flink Blog: RSS feed" />
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
</head>
<body>
<!-- Top navbar. -->
<nav class="navbar navbar-default navbar-fixed-top">
<div class="container">
<!-- The logo. -->
<div class="navbar-header">
<button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1">
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<div class="navbar-logo">
<a href="/"><img alt="Apache Flink" src="/img/navbar-brand-logo.jpg" width="78px" height="40px"></a>
</div>
</div><!-- /.navbar-header -->
<!-- The navigation links. -->
<div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
<ul class="nav navbar-nav">
<!-- Overview -->
<li><a href="/index.html">Overview</a></li>
<!-- Quickstart -->
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Quickstart <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.9/quickstart/setup_quickstart.html">Setup</a></li>
<li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.9/quickstart/java_api_quickstart.html">Java API</a></li>
<li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.9/quickstart/scala_api_quickstart.html">Scala API</a></li>
<li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.9/quickstart/run_example_quickstart.html">Run Step-by-Step Example</a></li>
</ul>
</li>
<!-- Features -->
<li><a href="/features.html">Features</a></li>
<!-- Downloads -->
<li><a href="/downloads.html">Downloads</a></li>
<!-- Documentation -->
<li class="dropdown">
<a href="" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Documentation <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<!-- Latest stable release -->
<li role="presentation" class="dropdown-header"><strong>Latest Release</strong> (Stable)</li>
<li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.9">0.9.0 Documentation</a></li>
<li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.9/api/java" class="active">0.9.0 Javadocs</a></li>
<li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.9/api/scala/index.html" class="active">0.9.0 ScalaDocs</a></li>
<!-- Snapshot docs -->
<li class="divider"></li>
<li role="presentation" class="dropdown-header"><strong>Snapshot</strong> (Development)</li>
<li><a href="http://ci.apache.org/projects/flink/flink-docs-master">0.10 Documentation</a></li>
<li><a href="http://ci.apache.org/projects/flink/flink-docs-master/api/java" class="active">0.10 Javadocs</a></li>
<li><a href="http://ci.apache.org/projects/flink/flink-docs-master/api/scala/index.html" class="active">0.10 ScalaDocs</a></li>
<!-- Wiki -->
<li class="divider"></li>
<li><a href="https://cwiki.apache.org/confluence/display/FLINK/Apache+Flink+Home"><small><span class="glyphicon glyphicon-new-window"></span></small> Wiki</a></li>
</ul>
</li>
<!-- FAQ -->
<li class="hidden-sm active"><a href="/faq.html">FAQ</a></li>
</ul>
<ul class="nav navbar-nav navbar-right">
<!-- Blog -->
<li class=" hidden-md hidden-sm"><a href="/blog/">Blog</a></li>
<li class="dropdown hidden-md hidden-sm">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Community <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<!-- Community -->
<li role="presentation" class="dropdown-header"><strong>Community</strong></li>
<li><a href="/community.html#mailing-lists">Mailing Lists</a></li>
<li><a href="/community.html#irc">IRC</a></li>
<li><a href="/community.html#stack-overflow">Stack Overflow</a></li>
<li><a href="/community.html#issue-tracker">Issue Tracker</a></li>
<li><a href="/community.html#source-code">Source Code</a></li>
<li><a href="/community.html#people">People</a></li>
<!-- Contribute -->
<li class="divider"></li>
<li role="presentation" class="dropdown-header"><strong>Contribute</strong></li>
<li><a href="/how-to-contribute.html">How to Contribute</a></li>
<li><a href="/coding-guidelines.html">Coding Guidelines</a></li>
</ul>
</li>
<li class="dropdown hidden-md hidden-sm">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Project <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<!-- Project -->
<li role="presentation" class="dropdown-header"><strong>Project</strong></li>
<li><a href="/material.html">Material</a></li>
<li><a href="https://twitter.com/apacheflink"><small><span class="glyphicon glyphicon-new-window"></span></small> Twitter</a></li>
<li><a href="https://github.com/apache/flink"><small><span class="glyphicon glyphicon-new-window"></span></small> GitHub</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/FLINK/Apache+Flink+Home"><small><span class="glyphicon glyphicon-new-window"></span></small> Wiki</a></li>
</ul>
</li>
</ul>
</div><!-- /.navbar-collapse -->
</div><!-- /.container -->
</nav>
<!-- Main content. -->
<div class="container">
<div class="row">
<div class="col-sm-8 col-sm-offset-2">
<div class="row">
<div class="col-sm-12"><h1>Frequently Asked Questions (FAQ)</h1></div>
</div>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<p>The following questions are frequently asked with regard to the Flink project <strong>in general</strong>. If you have further questions, make sure to consult the <a href="http://ci.apache.org/projects/flink/flink-docs-release-0.9">documentation</a> or <a href="/community.html">ask the community</a>.</p>
<div class="page-toc">
<ul id="markdown-toc">
<li><a href="#general" id="markdown-toc-general">General</a> <ul>
<li><a href="#is-flink-a-hadoop-project" id="markdown-toc-is-flink-a-hadoop-project">Is Flink a Hadoop Project?</a></li>
<li><a href="#do-i-have-to-install-apache-hadoop-to-use-flink" id="markdown-toc-do-i-have-to-install-apache-hadoop-to-use-flink">Do I have to install Apache Hadoop to use Flink?</a></li>
</ul>
</li>
<li><a href="#usage" id="markdown-toc-usage">Usage</a> <ul>
<li><a href="#how-do-i-assess-the-progress-of-a-flink-program" id="markdown-toc-how-do-i-assess-the-progress-of-a-flink-program">How do I assess the progress of a Flink program?</a></li>
<li><a href="#how-can-i-figure-out-why-a-program-failed" id="markdown-toc-how-can-i-figure-out-why-a-program-failed">How can I figure out why a program failed?</a></li>
<li><a href="#how-do-i-debug-flink-programs" id="markdown-toc-how-do-i-debug-flink-programs">How do I debug Flink programs?</a></li>
<li><a href="#what-is-the-parallelism-how-do-i-set-it" id="markdown-toc-what-is-the-parallelism-how-do-i-set-it">What is the parallelism? How do I set it?</a></li>
</ul>
</li>
<li><a href="#errors" id="markdown-toc-errors">Errors</a> <ul>
<li><a href="#why-am-i-getting-a-nonserializableexception-" id="markdown-toc-why-am-i-getting-a-nonserializableexception-">Why am I getting a “NonSerializableException” ?</a></li>
<li><a href="#in-scala-api-i-get-an-error-about-implicit-values-and-evidence-parameters" id="markdown-toc-in-scala-api-i-get-an-error-about-implicit-values-and-evidence-parameters">In Scala API, I get an error about implicit values and evidence parameters</a></li>
<li><a href="#i-get-an-error-message-saying-that-not-enough-buffers-are-available-how-do-i-fix-this" id="markdown-toc-i-get-an-error-message-saying-that-not-enough-buffers-are-available-how-do-i-fix-this">I get an error message saying that not enough buffers are available. How do I fix this?</a></li>
<li><a href="#my-job-fails-early-with-a-javaioeofexception-what-could-be-the-cause" id="markdown-toc-my-job-fails-early-with-a-javaioeofexception-what-could-be-the-cause">My job fails early with a java.io.EOFException. What could be the cause?</a></li>
<li><a href="#my-job-fails-with-various-exceptions-from-the-hdfshadoop-code-what-can-i-do" id="markdown-toc-my-job-fails-with-various-exceptions-from-the-hdfshadoop-code-what-can-i-do">My job fails with various exceptions from the HDFS/Hadoop code. What can I do?</a></li>
<li><a href="#in-eclipse-i-get-compilation-errors-in-the-scala-projects" id="markdown-toc-in-eclipse-i-get-compilation-errors-in-the-scala-projects">In Eclipse, I get compilation errors in the Scala projects</a></li>
<li><a href="#my-program-does-not-compute-the-correct-result-why-are-my-custom-key-types" id="markdown-toc-my-program-does-not-compute-the-correct-result-why-are-my-custom-key-types">My program does not compute the correct result. Why are my custom key types</a></li>
<li><a href="#i-get-a-javalanginstantiationexception-for-my-data-type-what-is-wrong" id="markdown-toc-i-get-a-javalanginstantiationexception-for-my-data-type-what-is-wrong">I get a java.lang.InstantiationException for my data type, what is wrong?</a></li>
<li><a href="#i-cant-stop-flink-with-the-provided-stop-scripts-what-can-i-do" id="markdown-toc-i-cant-stop-flink-with-the-provided-stop-scripts-what-can-i-do">I can’t stop Flink with the provided stop-scripts. What can I do?</a></li>
<li><a href="#i-got-an-outofmemoryexception-what-can-i-do" id="markdown-toc-i-got-an-outofmemoryexception-what-can-i-do">I got an OutOfMemoryException. What can I do?</a></li>
<li><a href="#why-do-the-taskmanager-log-files-become-so-huge" id="markdown-toc-why-do-the-taskmanager-log-files-become-so-huge">Why do the TaskManager log files become so huge?</a></li>
</ul>
</li>
<li><a href="#yarn-deployment" id="markdown-toc-yarn-deployment">YARN Deployment</a> <ul>
<li><a href="#the-yarn-session-runs-only-for-a-few-seconds" id="markdown-toc-the-yarn-session-runs-only-for-a-few-seconds">The YARN session runs only for a few seconds</a></li>
<li><a href="#the-yarn-session-crashes-with-a-hdfs-permission-exception-during-startup" id="markdown-toc-the-yarn-session-crashes-with-a-hdfs-permission-exception-during-startup">The YARN session crashes with a HDFS permission exception during startup</a></li>
</ul>
</li>
<li><a href="#features" id="markdown-toc-features">Features</a> <ul>
<li><a href="#what-kind-of-fault-tolerance-does-flink-provide" id="markdown-toc-what-kind-of-fault-tolerance-does-flink-provide">What kind of fault-tolerance does Flink provide?</a></li>
<li><a href="#are-hadoop-like-utilities-such-as-counters-and-the-distributedcache-supported" id="markdown-toc-are-hadoop-like-utilities-such-as-counters-and-the-distributedcache-supported">Are Hadoop-like utilities, such as Counters and the DistributedCache supported?</a></li>
</ul>
</li>
</ul>
</div>
<h2 id="general">General</h2>
<h3 id="is-flink-a-hadoop-project">Is Flink a Hadoop Project?</h3>
<p>Flink is a data processing system and an <strong>alternative to Hadoop’s
MapReduce component</strong>. It comes with its <em>own runtime</em>, rather than building on top
of MapReduce. As such, it can work completely independently of the Hadoop
ecosystem. However, Flink can also access Hadoop’s distributed file
system (HDFS) to read and write data, and Hadoop’s next-generation resource
manager (YARN) to provision cluster resources. Since most Flink users are
using Hadoop HDFS to store their data, Flink already ships the required libraries to
access HDFS.</p>
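<p>For example (a minimal sketch, assuming the bundled Hadoop libraries and a reachable namenode; host, port, and paths are placeholders), reading from HDFS looks just like reading a local file:</p>
<div class="highlight"><pre><code class="language-scala">import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
// Paths with an hdfs:// scheme are handled by the shipped HDFS libraries.
val hdfsLines  = env.readTextFile("hdfs://namenode:9000/path/to/input")
val localLines = env.readTextFile("file:///path/to/local/input")</code></pre></div>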
<h3 id="do-i-have-to-install-apache-hadoop-to-use-flink">Do I have to install Apache Hadoop to use Flink?</h3>
<p><strong>No</strong>. Flink can run <strong>without</strong> a Hadoop installation. However, a <em>very common</em>
setup is to use Flink to analyze data stored in the Hadoop Distributed
File System (HDFS). To make these setups work out of the box, Flink bundles the
Hadoop client libraries by default.</p>
<p>Additionally, we provide a special YARN-enabled download of Flink for
users with an existing Hadoop YARN cluster. <a href="http://hadoop.apache.org/docs/r2.2.0/hadoop-yarn/hadoop-yarn-site/YARN.html">Apache Hadoop
YARN</a>
is Hadoop’s cluster resource manager and allows different
execution engines to run next to each other on a cluster.</p>
<h2 id="usage">Usage</h2>
<h3 id="how-do-i-assess-the-progress-of-a-flink-program">How do I assess the progress of a Flink program?</h3>
<p>There are multiple ways to track the progress of a Flink program:</p>
<ul>
<li>The JobManager (the master of the distributed system) starts a web interface
to observe program execution. It runs on port 8081 by default (configured in
<code>conf/flink-conf.yaml</code>).</li>
<li>When you start a program from the command line, it will print the status
changes of all operators as the program progresses through the operations.</li>
<li>All status changes are also logged to the JobManager’s log file.</li>
</ul>
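<p>For example, a job submitted with the command-line client (the jar path is a placeholder) prints status changes such as <code>SCHEDULED</code>, <code>DEPLOYING</code>, <code>RUNNING</code>, and <code>FINISHED</code> for each operator:</p>
<div class="highlight"><pre><code class="language-bash">./bin/flink run /path/to/your-job.jar</code></pre></div>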
<h3 id="how-can-i-figure-out-why-a-program-failed">How can I figure out why a program failed?</h3>
<ul>
<li>The JobManager web frontend (by default on port 8081) displays the exceptions
of failed tasks.</li>
<li>If you run the program from the command-line, task exceptions are printed to
the standard error stream and shown on the console.</li>
<li>Both the command line and the web interface allow you to figure out which
parallel task first failed and caused the other tasks to cancel the execution.</li>
<li>Failing tasks and the corresponding exceptions are reported in the log files
of the master and the worker where the exception occurred
(<code>log/flink-&lt;user&gt;-jobmanager-&lt;host&gt;.log</code> and
<code>log/flink-&lt;user&gt;-taskmanager-&lt;host&gt;.log</code>).</li>
</ul>
<h3 id="how-do-i-debug-flink-programs">How do I debug Flink programs?</h3>
<ul>
<li>When you start a program locally with the <a href="http://ci.apache.org/projects/flink/flink-docs-master/apis/local_execution.html">LocalExecutor</a>,
you can place breakpoints in your functions and debug them like normal
Java/Scala programs.</li>
<li>The <a href="http://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#accumulators--counters">Accumulators</a> are very helpful in
tracking the behavior of the parallel execution. They allow you to gather
information inside the program’s operations and show them after the program
execution.</li>
</ul>
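<p>As a minimal sketch of the accumulator pattern (names are illustrative), a rich function registers a counter in <code>open()</code> and updates it while processing:</p>
<div class="highlight"><pre><code class="language-scala">import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class CountingMapper extends RichMapFunction[String, String] {
  private val counter = new IntCounter()

  override def open(parameters: Configuration): Unit = {
    // register the accumulator under a job-wide unique name
    getRuntimeContext.addAccumulator("processed-records", counter)
  }

  override def map(value: String): String = {
    counter.add(1) // counted on every parallel instance, merged at the end
    value.toLowerCase
  }
}</code></pre></div>
<p>After the job finishes, the merged value can be read from the job execution result under the same name.</p>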
<h3 id="what-is-the-parallelism-how-do-i-set-it">What is the parallelism? How do I set it?</h3>
<p>In Flink programs, the parallelism determines how operations are split into
individual tasks which are assigned to task slots. Each node in a cluster has at
least one task slot. The total number of task slots is the sum of the task
slots on all machines. If the parallelism is set to <code>N</code>, Flink tries to divide an
operation into <code>N</code> parallel tasks which can be computed concurrently using the
available task slots. The number of task slots should be equal to the
parallelism to ensure that all tasks can be computed in a task slot concurrently.</p>
<p><strong>Note</strong>: Not all operations can be divided into multiple tasks. For example, a
<code>GroupReduce</code> operation without a grouping has to be performed with a
parallelism of 1 because the entire group needs to be present at exactly one
node to perform the reduce operation. Flink will determine whether the
parallelism has to be 1 and set it accordingly.</p>
<p>The parallelism can be set in numerous ways to ensure fine-grained control
over the execution of a Flink program. See
the <a href="http://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#common-options">Configuration guide</a> for detailed instructions on how to
set the parallelism. Also check out <a href="http://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-taskmanager-processing-slots">this figure</a> detailing
how the processing slots and parallelism are related to each other.</p>
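<p>As a minimal sketch (values are illustrative), the parallelism can be set for the whole program on the execution environment, or per operation:</p>
<div class="highlight"><pre><code class="language-scala">import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(8) // default parallelism for all operations of this program

val counts = env.readTextFile("hdfs:///path/to/input")
  .flatMap { _.toLowerCase.split("\\W+") }
  .map { (_, 1) }
  .groupBy(0)
  .sum(1)
  .setParallelism(4) // override the parallelism for this operator only</code></pre></div>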
<h2 id="errors">Errors</h2>
<h3 id="why-am-i-getting-a-nonserializableexception-">Why am I getting a “NonSerializableException” ?</h3>
<p>All functions in Flink must be serializable, as defined by <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">java.io.Serializable</a>.
Since all function interfaces are serializable, the exception means that one
of the fields used in your function is not serializable.</p>
<p>In particular, if your function is an inner class, or anonymous inner class,
it contains a hidden reference to the enclosing class (usually called <code>this$0</code>, if you look
at the function in the debugger). If the enclosing class is not serializable, this is probably
the source of the error. Solutions are to</p>
<ul>
<li>make the function a standalone class, or a static inner class (no more reference to the enclosing class)</li>
<li>make the enclosing class serializable</li>
<li>use a Java 8 lambda function.</li>
</ul>
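<p>As an illustration of the first solution (a sketch with made-up names), a standalone function class carries no hidden reference to an enclosing class:</p>
<div class="highlight"><pre><code class="language-scala">import org.apache.flink.api.common.functions.MapFunction

// Standalone class: only its own field (an Int) is serialized and shipped
// to the cluster; there is no hidden this$0 pointer to an outer class.
class MultiplyBy(factor: Int) extends MapFunction[Int, Int] {
  override def map(value: Int): Int = value * factor
}

// usage: dataSet.map(new MultiplyBy(10))</code></pre></div>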
<h3 id="in-scala-api-i-get-an-error-about-implicit-values-and-evidence-parameters">In Scala API, I get an error about implicit values and evidence parameters</h3>
<p>It means that the implicit value for the type information could not be provided.
Make sure that you have an <code>import org.apache.flink.api.scala._</code> statement in your code.</p>
<p>If you are using Flink operations inside functions or classes that take
generic parameters, a <code>TypeInformation</code> must be available for that parameter.
This can be achieved by using a context bound:</p>
<div class="highlight"><pre><code class="language-scala"><span class="k">def</span> <span class="n">myFunction</span><span class="o">[</span><span class="kt">T:</span> <span class="kt">TypeInformation</span><span class="o">](</span><span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">T</span><span class="o">])</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">Seq</span><span class="o">[</span><span class="kt">T</span><span class="o">]]</span> <span class="k">=</span> <span class="o">{</span>
<span class="n">input</span><span class="o">.</span><span class="n">reduceGroup</span><span class="o">(</span> <span class="n">i</span> <span class="k">=&gt;</span> <span class="n">i</span><span class="o">.</span><span class="n">toSeq</span> <span class="o">)</span>
<span class="o">}</span></code></pre></div>
<p>See <a href="http://ci.apache.org/projects/flink/flink-docs-master/internals/types_serialization.html">Type Extraction and Serialization</a> for
an in-depth discussion of how Flink handles types.</p>
<h3 id="i-get-an-error-message-saying-that-not-enough-buffers-are-available-how-do-i-fix-this">I get an error message saying that not enough buffers are available. How do I fix this?</h3>
<p>If you run Flink in a massively parallel setting (100+ parallel threads),
you need to adapt the number of network buffers via the config parameter
<code>taskmanager.network.numberOfBuffers</code>.
As a rule-of-thumb, the number of buffers should be at least
<code>4 * numberOfNodes * numberOfTasksPerNode^2</code>. See
<a href="http://ci.apache.org/projects/flink/flink-docs-master/setup/config.html">Configuration Reference</a> for details.</p>
<h3 id="my-job-fails-early-with-a-javaioeofexception-what-could-be-the-cause">My job fails early with a java.io.EOFException. What could be the cause?</h3>
<p>The most common cause of this exception is a Flink setup with the
wrong HDFS version. Because different HDFS versions are often not compatible
with each other, the connection between the filesystem master and the client
breaks.</p>
<div class="highlight"><pre><code class="language-bash">Call to &lt;host:port&gt; failed on <span class="nb">local </span>exception: java.io.EOFException
at org.apache.hadoop.ipc.Client.wrapException<span class="o">(</span>Client.java:775<span class="o">)</span>
at org.apache.hadoop.ipc.Client.call<span class="o">(</span>Client.java:743<span class="o">)</span>
at org.apache.hadoop.ipc.RPC<span class="nv">$Invoker</span>.invoke<span class="o">(</span>RPC.java:220<span class="o">)</span>
at <span class="nv">$Proxy0</span>.getProtocolVersion<span class="o">(</span>Unknown Source<span class="o">)</span>
at org.apache.hadoop.ipc.RPC.getProxy<span class="o">(</span>RPC.java:359<span class="o">)</span>
at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode<span class="o">(</span>DFSClient.java:106<span class="o">)</span>
at org.apache.hadoop.hdfs.DFSClient.&lt;init&gt;<span class="o">(</span>DFSClient.java:207<span class="o">)</span>
at org.apache.hadoop.hdfs.DFSClient.&lt;init&gt;<span class="o">(</span>DFSClient.java:170<span class="o">)</span>
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize<span class="o">(</span>DistributedFileSystem.java:82<span class="o">)</span>
at org.apache.flink.runtime.fs.hdfs.DistributedFileSystem.initialize<span class="o">(</span>DistributedFileSystem.java:276<span class="o">)</span></code></pre></div>
<p>Please refer to the <a href="/downloads.html#maven">download page</a> and
the <a href="https://github.com/apache/flink/tree/master/README.md">build instructions</a>
for details on how to set up Flink for different Hadoop and HDFS versions.</p>
<h3 id="my-job-fails-with-various-exceptions-from-the-hdfshadoop-code-what-can-i-do">My job fails with various exceptions from the HDFS/Hadoop code. What can I do?</h3>
<p>Flink ships with the Hadoop 2.2 binaries by default. These binaries are used
to connect to HDFS or YARN.
It seems that there are some bugs in the HDFS client which cause exceptions while writing to HDFS
(in particular under high load).
Among the exceptions are the following:</p>
<ul>
<li><code>HDFS client trying to connect to the standby Namenode "org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby"</code></li>
<li>
<p><code>java.io.IOException: Bad response ERROR for block BP-1335380477-172.22.5.37-1424696786673:blk_1107843111_34301064 from datanode 172.22.5.81:50010
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:732)</code></p>
</li>
<li><code>Caused by: org.apache.hadoop.ipc.RemoteException(java.lang.ArrayIndexOutOfBoundsException): 0
at org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.getDatanodeStorageInfos(DatanodeManager.java:478)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.updatePipelineInternal(FSNamesystem.java:6039)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.updatePipeline(FSNamesystem.java:6002)</code></li>
</ul>
<p>If you are experiencing any of these, we recommend using a Flink build with a Hadoop version matching
your local HDFS version.
You can also manually build Flink against the exact Hadoop version (for example
when using a Hadoop distribution with a custom patch level).</p>
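<p>A build against a specific Hadoop version looks roughly like this (the version number is a placeholder; see the linked build instructions for the exact flags of your release):</p>
<div class="highlight"><pre><code class="language-bash">mvn clean install -DskipTests -Dhadoop.version=2.6.0</code></pre></div>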
<h3 id="in-eclipse-i-get-compilation-errors-in-the-scala-projects">In Eclipse, I get compilation errors in the Scala projects</h3>
<p>Flink uses a new feature of the Scala compiler (called “quasiquotes”) that has not yet been properly
integrated with the Eclipse Scala plugin. In order to make this feature available in Eclipse, you
need to manually configure the <em>flink-scala</em> project to use a <em>compiler plugin</em>:</p>
<ul>
<li>Right click on <em>flink-scala</em> and choose “Properties”</li>
<li>Select “Scala Compiler” and click on the “Advanced” tab. (If you do not have that, you probably have not set up Eclipse for Scala properly.)</li>
<li>Check the box “Use Project Settings”</li>
<li>In the field “Xplugin”, put the path “/home/&lt;user-name&gt;/.m2/repository/org/scalamacros/paradise_2.10.4/2.0.1/paradise_2.10.4-2.0.1.jar”</li>
<li>NOTE: You have to build Flink with Maven on the command line first, to make sure the plugin is downloaded.</li>
</ul>
<h3 id="my-program-does-not-compute-the-correct-result-why-are-my-custom-key-types">My program does not compute the correct result. Why are my custom key types</h3>
<p>are not grouped/joined correctly?</p>
<p>Keys must correctly implement the methods <code>java.lang.Object#hashCode()</code>,
<code>java.lang.Object#equals(Object o)</code>, and <code>java.util.Comparable#compareTo(...)</code>.
If these methods are not overridden, they fall back to default implementations, which are usually
inadequate for keys. Therefore, all keys must override <code>hashCode()</code> and <code>equals(Object o)</code>.</p>
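<p>As a sketch of a well-behaved key type: a Scala case class generates consistent <code>equals()</code>/<code>hashCode()</code> automatically, while a plain class must override both by hand:</p>
<div class="highlight"><pre><code class="language-scala">// equals()/hashCode() are generated from the fields:
case class UserKey(userId: Long, region: String)

// A plain class must override both methods consistently itself:
class UserId(val id: Long) {
  override def equals(other: Any): Boolean = other match {
    case that: UserId =&gt; this.id == that.id
    case _            =&gt; false
  }
  override def hashCode(): Int = id.hashCode
}</code></pre></div>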
<h3 id="i-get-a-javalanginstantiationexception-for-my-data-type-what-is-wrong">I get a java.lang.InstantiationException for my data type, what is wrong?</h3>
<p>All data type classes must be public and have a public nullary constructor
(a constructor with no arguments). Furthermore, the classes must not be abstract
or interfaces. If the classes are nested classes, they must be public and
static.</p>
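<p>For example (a sketch), a valid data type provides a public nullary constructor and is neither abstract nor an interface:</p>
<div class="highlight"><pre><code class="language-scala">// Public, non-abstract, with an explicit no-argument constructor:
class WordCount(var word: String, var count: Int) {
  def this() = this(null, 0)
}</code></pre></div>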
<h3 id="i-cant-stop-flink-with-the-provided-stop-scripts-what-can-i-do">I can’t stop Flink with the provided stop-scripts. What can I do?</h3>
<p>Stopping the processes sometimes takes a few seconds, because the shutdown may
do some cleanup work.</p>
<p>In some error cases it happens that the JobManager or TaskManager cannot be
stopped with the provided stop-scripts (<code>bin/stop-local.sh</code> or
<code>bin/stop-cluster.sh</code>). You can kill their processes on Linux/Mac as follows:</p>
<ul>
<li>Determine the process id (pid) of the JobManager / TaskManager process. You
can use the <code>jps</code> command on Linux (if you have OpenJDK installed) or the command
<code>ps -ef | grep java</code> to find all Java processes.</li>
<li>Kill the process with <code>kill -9 &lt;pid&gt;</code>, where <code>pid</code> is the process id of the
affected JobManager or TaskManager process.</li>
</ul>
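<p>Putting the two steps together (a sketch; the process names reported by <code>jps</code> may vary between versions):</p>
<div class="highlight"><pre><code class="language-bash">jps                # look for the JobManager / TaskManager entries
kill &lt;pid&gt;         # try a regular termination first
kill -9 &lt;pid&gt;      # force-kill only if the process does not stop</code></pre></div>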
<p>On Windows, the Task Manager shows a table of all processes and allows you to
destroy a process by right-clicking its entry.</p>
<h3 id="i-got-an-outofmemoryexception-what-can-i-do">I got an OutOfMemoryException. What can I do?</h3>
<p>These exceptions usually occur when the functions in the program consume a lot
of memory by collecting large numbers of objects, for example in lists or maps.
OutOfMemoryExceptions in Java are tricky: the exception is not
necessarily thrown by the component that allocated most of the memory but by the
component that requested the last bit of memory that could not be
provided.</p>
<p>There are two ways to go about this:</p>
<ol>
<li>
<p>See whether you can use less memory inside the functions. For example, use
arrays of primitive types instead of object types.</p>
</li>
<li>
<p>Reduce the memory that Flink reserves for its own processing. The
TaskManager reserves a certain portion of the available memory for sorting,
hashing, caching, network buffering, etc. That part of the memory is unavailable
to the user-defined functions. By reserving it, the system can guarantee not to
run out of memory on large inputs, but to plan with the available memory and
spill operations to disk, if necessary. By default, the system reserves around
70% of the memory. If you frequently run applications that need more memory in
the user-defined functions, you can reduce that value using the configuration
entries <code>taskmanager.memory.fraction</code> or <code>taskmanager.memory.size</code>. See the
<a href="http://ci.apache.org/projects/flink/flink-docs-master/setup/config.html">Configuration Reference</a> for details. This will leave more memory for the JVM heap,
but may cause data processing tasks to go to disk more often.</p>
</li>
</ol>
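<p>Following the second approach, for example, lowering the reserved fraction in <code>conf/flink-conf.yaml</code> leaves more heap for user-defined functions (the value below is illustrative; the default is around 0.7):</p>
<div class="highlight"><pre><code>taskmanager.memory.fraction: 0.5</code></pre></div>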
<h3 id="why-do-the-taskmanager-log-files-become-so-huge">Why do the TaskManager log files become so huge?</h3>
<p>Check the logging behavior of your jobs. While emitting a log statement per record or
tuple may be helpful for debugging jobs in small setups with tiny data sets, it becomes very
inefficient and consumes a lot of disk space when used on large input data.</p>
<h2 id="yarn-deployment">YARN Deployment</h2>
<h3 id="the-yarn-session-runs-only-for-a-few-seconds">The YARN session runs only for a few seconds</h3>
<p>The <code>./bin/yarn-session.sh</code> script is intended to run while the YARN session is
open. In some error cases, however, the script stops running immediately. The
output looks like this:</p>
<div class="highlight"><pre><code>07:34:27,004 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1395604279745_273123 to ResourceManager at jobtracker-host
Flink JobManager is now running on worker1:6123
JobManager Web Interface: http://jobtracker-host:54311/proxy/application_1295604279745_273123/
07:34:51,528 INFO org.apache.flink.yarn.Client - Application application_1295604279745_273123 finished with state FINISHED at 1398152089553
07:34:51,529 INFO org.apache.flink.yarn.Client - Killing the Flink-YARN application.
07:34:51,529 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Killing application application_1295604279745_273123
07:34:51,534 INFO org.apache.flink.yarn.Client - Deleting files in hdfs://user/marcus/.flink/application_1295604279745_273123
07:34:51,559 INFO org.apache.flink.yarn.Client - YARN Client is shutting down
</code></pre></div>
<p>The problem here is that the Application Master (AM) is stopping and the YARN client assumes that the application has finished.</p>
<p>There are three possible reasons for that behavior:</p>
<ul>
<li>
<p>The ApplicationMaster exited with an exception. To debug that error, have a
look in the logfiles of the container. The <code>yarn-site.xml</code> file contains the
configured path. The key for the path is <code>yarn.nodemanager.log-dirs</code>, the
default value is <code>${yarn.log.dir}/userlogs</code>.</p>
</li>
<li>
<p>YARN has killed the container that runs the ApplicationMaster. This case
happens when the AM used too much memory or other resources beyond YARN’s
limits. In this case, you’ll find error messages in the nodemanager logs on
the host.</p>
</li>
<li>
<p>The operating system has shut down the JVM of the AM. This can happen if the
YARN configuration is wrong and more memory than physically available is
configured. Execute <code>dmesg</code> on the machine where the AM was running to see if
this happened. If so, you will see messages from Linux’s <a href="http://linux-mm.org/OOM_Killer">OOM killer</a>.</p>
</li>
</ul>
<h3 id="the-yarn-session-crashes-with-a-hdfs-permission-exception-during-startup">The YARN session crashes with a HDFS permission exception during startup</h3>
<p>While starting the YARN session, you may receive an exception like this:</p>
<div class="highlight"><pre><code>Exception in thread "main" org.apache.hadoop.security.AccessControlException: Permission denied: user=robert, access=WRITE, inode="/user/robert":hdfs:supergroup:drwxr-xr-x
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:234)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:214)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:158)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:5193)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:5175)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:5149)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2090)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2043)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1996)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:491)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:301)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59570)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2053)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2047)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1393)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1382)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1307)
at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:384)
at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:380)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:380)
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:324)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:886)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:783)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:365)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:338)
at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:2021)
at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1989)
at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1954)
at org.apache.flink.yarn.Utils.setupLocalResource(Utils.java:176)
at org.apache.flink.yarn.Client.run(Client.java:362)
at org.apache.flink.yarn.Client.main(Client.java:568)
</code></pre></div>
<p>The reason for this error is that the home directory of the user <strong>in HDFS</strong>
has the wrong permissions. The user (in this case <code>robert</code>) cannot create
directories in his own home directory.</p>
<p>Flink creates a <code>.flink/</code> directory in the user’s home directory
where it stores the Flink jar and configuration file.</p>
<h2 id="features">Features</h2>
<h3 id="what-kind-of-fault-tolerance-does-flink-provide">What kind of fault-tolerance does Flink provide?</h3>
<p>For streaming programs, Flink takes a novel approach: it draws periodic snapshots of the streaming dataflow state and uses those snapshots for recovery.
This mechanism is both efficient and flexible. See the documentation on <a href="http://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html">streaming fault tolerance</a> for details.</p>
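<p>For example (a sketch against the streaming API of recent versions; the interval is illustrative), the periodic snapshots are enabled on the streaming execution environment:</p>
<div class="highlight"><pre><code class="language-scala">import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// draw a consistent snapshot of the dataflow state every 5 seconds
env.enableCheckpointing(5000)</code></pre></div>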
<p>For batch processing programs, Flink remembers the program’s sequence of transformations and can restart failed jobs.</p>
<h3 id="are-hadoop-like-utilities-such-as-counters-and-the-distributedcache-supported">Are Hadoop-like utilities, such as Counters and the DistributedCache supported?</h3>
<p><a href="http://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#accumulators--counters">Flink’s Accumulators</a> work very similar like
Hadoop’s counters, but are more powerful.</p>
<p>Flink has a <a href="https://github.com/apache/flink/tree/master/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java">Distributed Cache</a> that is deeply integrated with the APIs. Please refer to the <a href="https://github.com/apache/flink/tree/master/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java#L831">JavaDocs</a> for details on how to use it.</p>
<p>In order to make data sets available on all tasks, we encourage you to use <a href="http://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#broadcast-variables">Broadcast Variables</a> instead. They are more efficient and easier to use than the distributed cache.</p>
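<p>As a minimal sketch of a broadcast variable (following the pattern from the programming guide; names are illustrative):</p>
<div class="highlight"><pre><code class="language-scala">import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

val env = ExecutionEnvironment.getExecutionEnvironment
val toBroadcast = env.fromElements(1, 2, 3)

env.fromElements("a", "b")
  .map(new RichMapFunction[String, String] {
    private var numbers: java.util.List[Int] = _

    override def open(parameters: Configuration): Unit = {
      // the broadcast data set is available on every parallel task by name
      numbers = getRuntimeContext.getBroadcastVariable[Int]("numbers")
    }

    override def map(value: String): String = value + "/" + numbers.size()
  })
  .withBroadcastSet(toBroadcast, "numbers")</code></pre></div>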
</div>
</div>
<hr />
<div class="footer text-center">
<p>Copyright © 2014-2015 <a href="http://apache.org">The Apache Software Foundation</a>. All Rights Reserved.</p>
<p>Apache Flink, Apache, and the Apache feather logo are trademarks of The Apache Software Foundation.</p>
<p><a href="/privacy-policy.html">Privacy Policy</a> &middot; <a href="/blog/feed.xml">RSS feed</a></p>
</div>
</div><!-- /.container -->
<!-- jQuery (necessary for Bootstrap's JavaScript plugins) -->
<script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.2/jquery.min.js"></script>
<!-- Include all compiled plugins (below), or include individual files as needed -->
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/js/bootstrap.min.js"></script>
<script src="/js/codetabs.js"></script>
<!-- Google Analytics -->
<script>
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
(i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
})(window,document,'script','//www.google-analytics.com/analytics.js','ga');
ga('create', 'UA-52545728-1', 'auto');
ga('send', 'pageview');
</script>
</body>
</html>