Programs written in the [Data Stream API]({{ site.baseurl }}/apis/streaming_guide.html) often hold state in various forms:
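
- Windows gather elements or aggregates until they are triggered
- Transformation functions may use the key/value state interface to store values
- Transformation functions may implement the `Checkpointed` interface to make their local variables fault tolerant

See also [Working with State]({{ site.baseurl }}/apis/streaming_guide.html#working_with_state) in the streaming API guide.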
When checkpointing is enabled, this state is persisted with each checkpoint to guard against data loss and to allow consistent recovery. How the state is represented internally, and how and where it is persisted upon checkpoints, depends on the chosen state backend.
Out of the box, Flink bundles two state backends: MemoryStateBackend and FsStateBackend. If nothing else is configured, the system uses the MemoryStateBackend.
The MemoryStateBackend holds data internally as objects on the Java heap. Key/value state and window operators hold hash tables that store the values, triggers, etc.
Upon checkpoints, this state backend will snapshot the state and send it as part of the checkpoint acknowledgement messages to the JobManager (master), which stores it on its heap as well.
Limitations of the MemoryStateBackend:
The MemoryStateBackend is encouraged for:
The FsStateBackend (FileSystemStateBackend) is configured with a file system URL (type, address, path), for example `hdfs://namenode:40010/flink/checkpoints` or `file:///data/flink/checkpoints`.
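To make the "(type, address, path)" breakdown concrete, the sketch below splits the two example URLs with the JDK's `java.net.URI`. It is plain Java, not Flink code; the class `CheckpointUriParts` and its `parts` helper are hypothetical names used only to show which part of the URL plays which role.

```java
import java.net.URI;

public class CheckpointUriParts {

    // Hypothetical helper (plain JDK, not Flink): returns
    // { file system type (URI scheme), address (URI authority, null if empty), path }.
    static String[] parts(String checkpointDir) {
        URI uri = URI.create(checkpointDir);
        return new String[] { uri.getScheme(), uri.getAuthority(), uri.getPath() };
    }

    public static void main(String[] args) {
        String[] examples = {
            "hdfs://namenode:40010/flink/checkpoints",
            "file:///data/flink/checkpoints"
        };
        for (String dir : examples) {
            String[] p = parts(dir);
            System.out.println("type=" + p[0] + " address=" + p[1] + " path=" + p[2]);
        }
    }
}
```

For the `file:///` URL the address part is empty (`getAuthority()` returns `null`), because a local file system needs no host.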
The FsStateBackend holds in-flight data in the TaskManager's memory. Upon checkpoints, it writes state snapshots into files in the configured file system and directory. Minimal metadata is stored in the JobManager's memory (or, in high-availability mode, in the metadata checkpoint).
The FsStateBackend is encouraged for:
State backends can be configured per job. In addition, you can define a default state backend to be used when the job does not explicitly define a state backend.
The per-job state backend is set on the job's `StreamExecutionEnvironment`.
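A minimal sketch of the per-job setting, assuming a Flink 1.x release in which `FsStateBackend` and `StreamExecutionEnvironment#setStateBackend` are available and the Flink streaming dependencies are on the classpath. The class name `StateBackendExample`, the `configure` helper, and the 10-second checkpoint interval are illustrative choices, not part of the original text.

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendExample {

    // Hypothetical helper: enables checkpointing and makes this job snapshot
    // its state into the given file system directory via the FsStateBackend.
    static void configure(StreamExecutionEnvironment env, String checkpointDir) {
        // Checkpoint every 10 seconds (illustrative interval).
        env.enableCheckpointing(10_000L);
        try {
            env.setStateBackend(new FsStateBackend(checkpointDir));
        } catch (Exception e) {
            // Some Flink versions declare IOException on this constructor.
            throw new IllegalArgumentException("Invalid checkpoint directory: " + checkpointDir, e);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        configure(env, "hdfs://namenode:40010/flink/checkpoints");
        // Define sources, transformations and sinks here, then call env.execute(...).
    }
}
```

Passing `new MemoryStateBackend()` (from `org.apache.flink.runtime.state.memory`) instead would select the JobManager-heap backend for that job only.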
A default state backend can be configured in `flink-conf.yaml`, using the configuration key `state.backend`.
Possible values for the config entry are `jobmanager` (MemoryStateBackend), `filesystem` (FsStateBackend), or the fully qualified class name of a class that implements the state backend factory interface, such as FsStateBackendFactory.
If the default state backend is set to `filesystem`, the entry `state.backend.fs.checkpointdir` defines the directory where the checkpoint data is stored.
A sample section in the configuration file could look as follows:
```yaml
# The backend that will be used to store operator state checkpoints
state.backend: filesystem

# Directory for storing checkpoints
state.backend.fs.checkpointdir: hdfs://namenode:40010/flink/checkpoints
```
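The lookup rules for `state.backend` can be sketched in plain Java. This is a hypothetical illustration, not how Flink itself reads `flink-conf.yaml`: it borrows `java.util.Properties`, which happens to accept the simple `key: value` lines and `#` comments of the sample configuration, and the class and method names are made up. A missing `state.backend` entry falls back to `jobmanager`, matching the documented MemoryStateBackend default.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class DefaultBackendLookup {

    // Hypothetical helper, not Flink code: reports which default state
    // backend a flink-conf.yaml-style text selects.
    static String resolve(String confText) {
        Properties conf = new Properties();
        try {
            // Properties accepts ':' as a key/value separator and skips lines
            // starting with '#', which is enough for the simple sample file.
            conf.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        // With no entry, the MemoryStateBackend ("jobmanager") is the default.
        String backend = conf.getProperty("state.backend", "jobmanager");
        switch (backend) {
            case "jobmanager":
                return "MemoryStateBackend";
            case "filesystem": {
                String dir = conf.getProperty("state.backend.fs.checkpointdir");
                if (dir == null) {
                    // The filesystem backend needs a directory for checkpoint data.
                    throw new IllegalArgumentException(
                        "state.backend.fs.checkpointdir is required for 'filesystem'");
                }
                return "FsStateBackend -> " + dir;
            }
            default:
                // Any other value is taken as the fully qualified class name
                // of a state backend factory.
                return "factory " + backend;
        }
    }

    public static void main(String[] args) {
        String sample = String.join("\n",
            "# The backend that will be used to store operator state checkpoints",
            "state.backend: filesystem",
            "",
            "# Directory for storing checkpoints",
            "state.backend.fs.checkpointdir: hdfs://namenode:40010/flink/checkpoints");
        System.out.println(resolve(sample));
    }
}
```

Running `main` prints `FsStateBackend -> hdfs://namenode:40010/flink/checkpoints`.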