The following questions are frequently asked with regard to the Flink project in general. If you have further questions, make sure to consult the documentation or ask the community.
{% toc %}
Flink is a data processing system and an alternative to Hadoop's MapReduce component. It comes with its own runtime, rather than building on top of MapReduce. As such, it can work completely independently of the Hadoop ecosystem. However, Flink can also access Hadoop's distributed file system (HDFS) to read and write data, and Hadoop's next-generation resource manager (YARN) to provision cluster resources. Since most Flink users are using Hadoop HDFS to store their data, Flink already ships the required libraries to access HDFS.
No. Flink can run without a Hadoop installation. However, a very common setup is to use Flink to analyze data stored in the Hadoop Distributed File System (HDFS). To make these setups work out of the box, Flink bundles the Hadoop client libraries by default.
Additionally, we provide a special YARN Enabled download of Flink for users with an existing Hadoop YARN cluster. Apache Hadoop YARN is Hadoop's cluster resource manager that allows running different execution engines next to each other on a cluster.
There are multiple ways to track the progress of a Flink program:

- The JobManager (the master of the distributed system) starts a web interface to observe program execution. It runs on port 8081 by default (configured in `conf/flink-config.yml`).
- When you start a program from the command line, it will print status updates as the program's operations are executed.
- Failing tasks and the corresponding exceptions are reported in the log files of the master and the workers (`log/flink-<user>-jobmanager-<host>.log` and `log/flink-<user>-taskmanager-<host>.log`).
In Flink programs, the parallelism determines how operations are split into individual tasks, which are assigned to task slots. Each node in a cluster has at least one task slot. The total number of task slots is the sum of the task slots on all machines. If the parallelism is set to `N`, Flink tries to divide an operation into `N` parallel tasks, which can be computed concurrently using the available task slots. The number of task slots should be equal to the parallelism to ensure that all tasks can be computed in a task slot concurrently.
Note: Not all operations can be divided into multiple tasks. For example, a `GroupReduce` operation without a grouping has to be performed with a parallelism of 1, because the entire group needs to be present at exactly one node to perform the reduce operation. Flink will determine whether the parallelism has to be 1 and set it accordingly.
The parallelism can be set in numerous ways to allow fine-grained control over the execution of a Flink program. See the [Configuration guide]({{ site.docs-snapshot }}/setup/config.html#common-options) for detailed instructions on how to set the parallelism. Also check out [this figure]({{ site.docs-snapshot }}/setup/config.html#configuring-taskmanager-processing-slots) detailing how the processing slots and parallelism are related to each other.
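For illustration, a minimal sketch of setting the parallelism with the Java DataSet API (the class name and data are made up; the calls used are `ExecutionEnvironment#setParallelism` and the per-operator `setParallelism`):

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class ParallelismExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Default parallelism for all operators of this program.
        env.setParallelism(4);

        DataSet<String> words = env.fromElements("to", "be", "or", "not", "to", "be");

        words.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value.toUpperCase();
            }
        })
        .setParallelism(2) // override the parallelism for this operator only
        .print();
    }
}
```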
All functions in Flink must be serializable, as defined by `java.io.Serializable`. Since all function interfaces are serializable, the exception means that one of the fields used in your function is not serializable.
In particular, if your function is an inner class or anonymous inner class, it contains a hidden reference to the enclosing class (usually called `this$0`, if you look at the function in the debugger). If the enclosing class is not serializable, this is probably the source of the error. Solutions are to

- make the function a standalone class or a static inner class (no more reference to the enclosing class; see the sketch below),
- use a Java 8 lambda function, or
- make the enclosing class serializable as well.
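As a hedged illustration of the hidden reference and the static-class fix (the class names are made up):

```java
import org.apache.flink.api.common.functions.MapFunction;

public class MyJob {  // assume this class is not serializable

    // Problematic: an anonymous inner class keeps a hidden reference
    // (this$0) to the enclosing MyJob instance, so serializing the
    // function drags the non-serializable MyJob along with it.
    public MapFunction<Integer, Integer> brokenFunction() {
        return new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer value) {
                return value + 1;
            }
        };
    }

    // Fix: a static nested class (or a standalone class) holds no
    // reference to the enclosing instance and serializes cleanly.
    public static class AddOne implements MapFunction<Integer, Integer> {
        @Override
        public Integer map(Integer value) {
            return value + 1;
        }
    }
}
```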
It means that the implicit value for the type information could not be provided. Make sure that you have an `import org.apache.flink.api.scala._` statement in your code.
If you are using Flink operations inside functions or classes that take generic parameters, a `TypeInformation` must be available for that parameter. This can be achieved by using a context bound:
```scala
def myFunction[T: TypeInformation](input: DataSet[T]): DataSet[Seq[T]] = {
  input.reduceGroup( i => i.toSeq )
}
```
See [Type Extraction and Serialization]({{ site.docs-snapshot }}/internals/types_serialization.html) for an in-depth discussion of how Flink handles types.
If you run Flink in a massively parallel setting (100+ parallel threads), you need to adapt the number of network buffers via the config parameter `taskmanager.network.numberOfBuffers`. As a rule-of-thumb, the number of buffers should be at least `4 * numberOfNodes * numberOfTasksPerNode^2`. See the [Configuration Reference]({{ site.docs-snapshot }}/setup/config.html) for details.
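For example, a hypothetical cluster of 4 nodes with 8 processing slots per node would need at least `4 * 4 * 8^2 = 1024` buffers, configured in `conf/flink-config.yml` like this (the value is illustrative):

```
taskmanager.network.numberOfBuffers: 1024
```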
The most common cause of this exception is that Flink is set up with the wrong HDFS version. Because different HDFS versions are often not compatible with each other, the connection between the filesystem master and the client breaks.
```
Call to <host:port> failed on local exception: java.io.EOFException
    at org.apache.hadoop.ipc.Client.wrapException(Client.java:775)
    at org.apache.hadoop.ipc.Client.call(Client.java:743)
    at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:220)
    at $Proxy0.getProtocolVersion(Unknown Source)
    at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:359)
    at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:106)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:207)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:170)
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:82)
    at org.apache.flink.runtime.fs.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:276)
```
Please refer to the [download page]({{ site.baseurl }}/downloads.html#maven) and the {% github README.md master "build instructions" %} for details on how to set up Flink for different Hadoop and HDFS versions.
Flink ships with the Hadoop 2.2 binaries by default. These binaries are used to connect to HDFS or YARN. It seems that there are some bugs in the HDFS client which cause exceptions while writing to HDFS (in particular under high load). Among the exceptions are the following:
HDFS client trying to connect to the standby NameNode:

```
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby
```

```
java.io.IOException: Bad response ERROR for block BP-1335380477-172.22.5.37-1424696786673:blk_1107843111_34301064 from datanode 172.22.5.81:50010
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:732)
```

```
Caused by: org.apache.hadoop.ipc.RemoteException(java.lang.ArrayIndexOutOfBoundsException): 0
    at org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.getDatanodeStorageInfos(DatanodeManager.java:478)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.updatePipelineInternal(FSNamesystem.java:6039)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.updatePipeline(FSNamesystem.java:6002)
```
If you are experiencing any of these, we recommend using a Flink build with a Hadoop version matching your local HDFS version. You can also manually build Flink against the exact Hadoop version (for example, when using a Hadoop distribution with a custom patch level).
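As a rough sketch, building from source against a specific Hadoop version might look like the following (the version number is a placeholder; see the build instructions linked above for the exact flags supported by your Flink version):

```
mvn clean install -DskipTests -Dhadoop.version=2.6.0
```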
Flink uses a new feature of the Scala compiler (called "quasiquotes") that has not yet been properly integrated with the Eclipse Scala plugin. In order to make this feature available in Eclipse, you need to manually configure the flink-scala project to use a compiler plugin:
Why are my custom key types not grouped/joined correctly?
Keys must correctly implement the methods `java.lang.Object#hashCode()`, `java.lang.Object#equals(Object o)`, and `java.lang.Comparable#compareTo(...)`. The first two are inherited from `Object` with default implementations that are usually inadequate. Therefore, all keys must override `hashCode()` and `equals(Object o)`.
All data type classes must be public and have a public nullary constructor (a constructor with no arguments). Furthermore, the classes must not be abstract or interfaces. If the classes are nested classes, they must be public and static.
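A hedged sketch of a custom type that satisfies both the key and the data type requirements (the class and its field are made up): it is public, has a public nullary constructor, and overrides `hashCode()`, `equals(Object o)`, and `compareTo(...)`:

```java
import java.util.Objects;

public class WordKey implements Comparable<WordKey> {

    public String word;

    // Public nullary constructor, required so Flink can instantiate the type.
    public WordKey() {}

    public WordKey(String word) {
        this.word = word;
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(word);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof WordKey && Objects.equals(word, ((WordKey) o).word);
    }

    @Override
    public int compareTo(WordKey other) {
        return word.compareTo(other.word);
    }
}
```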
Stopping the processes sometimes takes a few seconds, because the shutdown may do some cleanup work.
In some error cases, it happens that the JobManager or TaskManager cannot be stopped with the provided stop scripts (`bin/stop-local.sh` or `bin/stop-cluster.sh`). You can kill their processes on Linux/Mac as follows:
1. Determine the process id (pid) of the JobManager / TaskManager process. You can use the `jps` command on Linux (if you have OpenJDK installed) or the command `ps -ef | grep java` to find all Java processes.
2. Kill the process with `kill -9 <pid>`, where `pid` is the process id of the affected JobManager or TaskManager process.

On Windows, the Task Manager shows a table of all processes and allows you to destroy a process by right-clicking its entry.
These exceptions usually occur when the functions in the program consume a lot of memory by collecting large numbers of objects, for example in lists or maps. OutOfMemoryExceptions in Java are somewhat tricky: the exception is not necessarily thrown by the component that allocated most of the memory, but by the component that requested the last bit of memory that could not be provided.
There are two ways to go about this:
- See whether you can use less memory inside the functions. For example, use arrays of primitive types instead of object types.
- Reduce the memory that Flink reserves for its own processing. The TaskManager reserves a certain portion of the available memory for sorting, hashing, caching, network buffering, etc. That part of the memory is unavailable to the user-defined functions. By reserving it, the system can guarantee not to run out of memory on large inputs, but to plan with the available memory and destage operations to disk, if necessary. By default, the system reserves around 70% of the memory. If you frequently run applications that need more memory in the user-defined functions, you can reduce that value using the configuration entries `taskmanager.memory.fraction` or `taskmanager.memory.size` (see the sketch below). See the [Configuration Reference]({{ site.docs-snapshot }}/setup/config.html) for details. This will leave more memory for the JVM heap, but may cause data processing tasks to go to disk more often.
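A sketch of the second option in `conf/flink-config.yml` (the value is illustrative, not a recommendation):

```
# Reduce the fraction of managed memory from the ~0.7 default,
# leaving more heap for user-defined functions.
taskmanager.memory.fraction: 0.5
```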
Check the logging behavior of your jobs. While emitting a log statement per object or tuple may be helpful to debug jobs in small setups with tiny data sets, it becomes very inefficient and disk-space consuming when used on large input data.
The `./bin/yarn-session.sh` script is intended to run while the YARN session is open. In some error cases, however, the script stops running immediately. The output looks like this:
```
07:34:27,004 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1395604279745_273123 to ResourceManager at jobtracker-host
Flink JobManager is now running on worker1:6123
JobManager Web Interface: http://jobtracker-host:54311/proxy/application_1295604279745_273123/
07:34:51,528 INFO  org.apache.flink.yarn.Client - Application application_1295604279745_273123 finished with state FINISHED at 1398152089553
07:34:51,529 INFO  org.apache.flink.yarn.Client - Killing the Flink-YARN application.
07:34:51,529 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Killing application application_1295604279745_273123
07:34:51,534 INFO  org.apache.flink.yarn.Client - Deleting files in hdfs://user/marcus/.flink/application_1295604279745_273123
07:34:51,559 INFO  org.apache.flink.yarn.Client - YARN Client is shutting down
```
The problem here is that the Application Master (AM) is stopping and the YARN client assumes that the application has finished.
There are three possible reasons for that behavior:
- The ApplicationMaster exited with an exception. To debug that error, have a look at the log files of the container. The `yarn-site.xml` file contains the configured path. The key for the path is `yarn.nodemanager.log-dirs`; the default value is `${yarn.log.dir}/userlogs`.
- YARN has killed the container that runs the ApplicationMaster. This happens when the AM has used too much memory or other resources beyond YARN's limits. In this case, you'll find error messages in the NodeManager logs on the host.
- The operating system has shut down the JVM of the AM. This can happen if the YARN configuration is wrong and more memory than physically available is configured. Execute `dmesg` on the machine where the AM was running to see if this happened. If so, you will see messages from the Linux OOM killer.
While starting the YARN session, you are receiving an exception like this:
```
Exception in thread "main" org.apache.hadoop.security.AccessControlException: Permission denied: user=robert, access=WRITE, inode="/user/robert":hdfs:supergroup:drwxr-xr-x
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:234)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:214)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:158)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:5193)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:5175)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:5149)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2090)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2043)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1996)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:491)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:301)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59570)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2053)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2047)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
    at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1393)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1382)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1307)
    at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:384)
    at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:380)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:380)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:324)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:886)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:783)
    at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:365)
    at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:338)
    at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:2021)
    at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1989)
    at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1954)
    at org.apache.flink.yarn.Utils.setupLocalResource(Utils.java:176)
    at org.apache.flink.yarn.Client.run(Client.java:362)
    at org.apache.flink.yarn.Client.main(Client.java:568)
```
The reason for this error is that the home directory of the user in HDFS has the wrong permissions. The user (in this case `robert`) cannot create directories in his own home directory.
Flink creates a `.flink/` directory in the user's home directory, where it stores the Flink jar and configuration file.
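One possible fix, assuming you can run commands as the HDFS superuser, is to create the home directory and hand ownership to the user (the user name and path follow the example above):

```
hdfs dfs -mkdir -p /user/robert
hdfs dfs -chown robert /user/robert
```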
For streaming programs, Flink has a novel approach to draw periodic snapshots of the streaming dataflow state and use those for recovery. This mechanism is both efficient and flexible. See the documentation on [streaming fault tolerance]({{ site.docs-snapshot }}/internals/stream_checkpointing.html) for details.
For batch processing programs, Flink remembers the program's sequence of transformations and can restart failed jobs.
[Flink's Accumulators]({{ site.docs-snapshot }}/apis/programming_guide.html#accumulators--counters) work very similarly to Hadoop's counters, but are more powerful.
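As a hedged sketch of how an accumulator is registered and incremented in a rich function (the names are made up; see the linked documentation for details):

```java
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class LineCounter extends RichFlatMapFunction<String, String> {

    private final IntCounter numLines = new IntCounter();

    @Override
    public void open(Configuration parameters) {
        // Register the accumulator; its aggregated value can be
        // retrieved from the JobExecutionResult after the job finishes.
        getRuntimeContext().addAccumulator("num-lines", numLines);
    }

    @Override
    public void flatMap(String value, Collector<String> out) {
        numLines.add(1);
        out.collect(value);
    }
}
```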
Flink has a Distributed Cache that is deeply integrated with the APIs. Please refer to the JavaDocs for details on how to use it.
In order to make data sets available on all tasks, we encourage you to use [Broadcast Variables]({{ site.docs-snapshot }}/apis/programming_guide.html#broadcast-variables) instead. They are more efficient and easier to use than the distributed cache.
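A minimal sketch with the Java DataSet API (the data sets and the broadcast name are made up):

```java
import java.util.Collection;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class BroadcastExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);
        DataSet<String> data = env.fromElements("a", "b", "c");

        data.map(new RichMapFunction<String, String>() {
            private Collection<Integer> broadcastSet;

            @Override
            public void open(Configuration parameters) {
                // Access the broadcast variable by the name it was registered under.
                broadcastSet = getRuntimeContext().getBroadcastVariable("myBroadcastSet");
            }

            @Override
            public String map(String value) {
                return value + ":" + broadcastSet.size();
            }
        })
        .withBroadcastSet(toBroadcast, "myBroadcastSet")
        .print();
    }
}
```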