Stream processing applications are typically long-running, and they may accumulate state over extended periods of time.
When running a distributed system over a long period of time, you should expect:
In each of these situations, some or all of the S4 nodes will be shut down. The system may therefore be partly unavailable, and in-memory state accumulated during execution may be lost.
In order to deal with this kind of situation, S4 provides:
In this document, we first describe the high-availability mechanism implemented in S4, then the checkpointing and recovery mechanism and how to customize it, and finally planned improvements.
In order to guarantee availability in the presence of sudden node failures, S4 provides a mechanism to automatically detect failed nodes and redirect messages to a standby node.
The following figure illustrates this fail-over mechanism:
This technique provides high availability but does not prevent state loss.
S4 clusters are defined with a fixed number of tasks (~ partitions). If you have n partitions and start m nodes, with m>n, you get m-n standby nodes.
ZooKeeper considers a node dead when it cannot reach it within the session timeout. The session timeout is specified by the client upon connection and is at minimum twice the tickTime (heartbeat interval) specified in the ZooKeeper ensemble configuration. For instance, with a typical tickTime of 2 seconds, the minimum session timeout is 4 seconds.
Upon a node crash, the fail-over mechanism brings a fresh standby node into the cluster. When this node joins the cluster, it has no state and no instantiated PEs. Messages start arriving at this node and trigger keyed PE instantiations.
If there is no checkpointing and recovery mechanism, those PEs start with an empty state.
For PEs to recover a previous state, the technique we use is to:
This means that if a previous state was checkpointed and a new PE is instantiated because a new key is seen, the PE instance fetches the corresponding checkpoint, recovers that state, and only then starts processing events. State loss is minimal!
In order to minimize the latency, checkpointing is uncoordinated and asynchronous. Uncoordinated checkpointing means that each checkpoint is taken independently, without aiming at global consistency. Asynchronous checkpointing aims at minimizing the impact on the event processing execution path.
Taking a checkpoint is a two-step operation, with both steps handled outside of the event processing path:

1. serialize the state of the PE instance
2. pass the serialized state to the storage backend
The following figure shows the various components involved: the checkpointing framework handles the serialization and passes serialized state to a pluggable storage backend:
In order to optimize the usage of resources, recovery is lazy, which means it only happens when necessary. When a message for a new key arrives at the recovered S4 node, a new PE instance is created, and the system tries to fetch a previous checkpoint from storage. If a previous state exists, it is copied to the newly created PE instance (this implies deserializing the checkpointed object and copying its fields).
A PE can be checkpointed only if its state can be serialized: fields that are part of the state must be serializable, and fields that are not part of the state must be marked transient. For example, one must make sure fields of type `Stream` (used for sending messages downstream) are transient!
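To make this concrete, here is a minimal sketch of a keyed PE whose state can be checkpointed. The `CounterPE` class is hypothetical and the package names assume S4 0.6; check them against your version.

    #!java
    import org.apache.s4.base.Event;
    import org.apache.s4.core.ProcessingElement;
    import org.apache.s4.core.Stream;

    // Illustrative sketch only: a keyed PE with checkpointable state.
    public class CounterPE extends ProcessingElement {

        // non-transient field: part of the serialized checkpoint
        private long count = 0;

        // Streams are infrastructure, not state: mark them transient
        // so they are excluded from serialization
        private transient Stream<Event> downstream;

        public void onEvent(Event event) {
            count++;
        }

        @Override
        protected void onCreate() {
            // nothing to initialize in this sketch
        }

        @Override
        protected void onRemove() {
        }
    }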
Checkpointing frequencies are defined per PE prototype, as time intervals or event counts (for now). They are specified in the application module, using API methods of the ProcessingElement class and passing a CheckpointingConfig object. Please refer to the API documentation.
The twitter example application shipped in the distribution is already configured for enabling checkpointing. See the TwitterCounterApp class.
For instance, here is how to specify a checkpointing frequency of 20s on the TopNTopicPE prototype:
    topNTopicPE.setCheckpointingConfig(
            new CheckpointingConfig.Builder(CheckpointingMode.TIME)
                    .frequency(20)
                    .timeUnit(TimeUnit.SECONDS)
                    .build());
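An event-count based frequency can be configured in the same way. The sketch below assumes the event-count mode is exposed as CheckpointingMode.EVENT_COUNT; verify the enum value against the CheckpointingConfig class of your version.

    #!java
    // sketch: checkpoint the prototype every 1000 processed events
    // (assumes the mode is named CheckpointingMode.EVENT_COUNT)
    topNTopicPE.setCheckpointingConfig(
            new CheckpointingConfig.Builder(CheckpointingMode.EVENT_COUNT)
                    .frequency(1000)
                    .build());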
This is a node-level configuration. You need to inject a checkpointing module that specifies a CheckpointingFramework implementation (please use org.apache.s4.core.ft.SafeKeeper) and a backend storage implementation. The storage backend implements the StateStorage interface.
We provide a default module (FileSystemBackendCheckpointingModule) that uses a file system backend (DefaultFileSystemStateStorage). It can be used with an NFS setup and introduces no dependency. You may use it by starting an S4 node in the following manner:
    ./s4 node -c=cluster1 -emc=org.apache.s4.core.ft.FileSystemBackendCheckpointingModule
It is quite straightforward to implement backends for other kinds of storage (key-value stores, data grids, caches, RDBMS). Writing a checkpointing backend consists of implementing a simple interface (`StateStorage`) matching your infrastructure or system.
Using an alternative backend is as simple as providing a new module to the S4 node. Here is an example of a module using a ‘Cool’ backend implementation:
    #!java
    public class CoolBackendCheckpointingModule extends AbstractModule {

        @Override
        protected void configure() {
            bind(StateStorage.class).to(CoolStateStorage.class);
            bind(CheckpointingFramework.class).to(SafeKeeper.class);
        }
    }
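To complete the picture, here is a rough sketch of what the hypothetical CoolStateStorage could look like, backed here by a simple in-memory map. The StateStorage method names shown (saveState, fetchState, fetchStoredKeys) reflect the 0.6 interface and should be checked against the version you build against.

    #!java
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    import org.apache.s4.core.ft.CheckpointId;
    import org.apache.s4.core.ft.StateStorage;
    import org.apache.s4.core.ft.StorageCallback;

    // Illustrative sketch only: keeps checkpoints in memory (not a durable backend!)
    public class CoolStateStorage implements StateStorage {

        private final ConcurrentMap<CheckpointId, byte[]> checkpoints =
                new ConcurrentHashMap<CheckpointId, byte[]>();

        @Override
        public void saveState(CheckpointId key, byte[] state, StorageCallback callback) {
            checkpoints.put(key, state);
            // a real implementation would report success or failure through the callback
        }

        @Override
        public byte[] fetchState(CheckpointId key) {
            return checkpoints.get(key);
        }

        @Override
        public Set<CheckpointId> fetchStoredKeys() {
            return checkpoints.keySet();
        }
    }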
By default, S4 keeps all non-transient fields as part of the state and uses kryo to serialize and deserialize checkpoints, but it is possible to use a different mechanism by overriding the `checkpoint()`, `serializeState()` and `restoreState()` methods of the `ProcessingElement` class.
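As an illustration, here is a sketch of a PE that replaces the default kryo serialization with a hand-rolled encoding of its single field. The byte-array based signatures of serializeState() and restoreState() are an assumption on our part; verify them against the ProcessingElement javadoc of your S4 version.

    #!java
    import java.nio.ByteBuffer;

    import org.apache.s4.base.Event;
    import org.apache.s4.core.ProcessingElement;

    // Illustrative sketch only: custom (non-kryo) checkpoint serialization.
    public class CustomSerializationPE extends ProcessingElement {

        private long count = 0;

        public void onEvent(Event event) {
            count++;
        }

        @Override
        protected byte[] serializeState() {
            // serialize only the count field instead of the whole instance
            return ByteBuffer.allocate(8).putLong(count).array();
        }

        @Override
        protected void restoreState(byte[] state) {
            count = ByteBuffer.wrap(state).getLong();
        }

        @Override
        protected void onCreate() {
        }

        @Override
        protected void onRemove() {
        }
    }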
PEs are eligible for checkpointing when their state is ‘dirty’. The dirty flag is checked through the `isDirty()` method and cleared by calling the `clearDirty()` method. In some cases, depending on the application code, only some of the events actually change the state of the PE. You should override these methods in order to avoid unnecessary checkpointing operations.
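Building on the hypothetical CounterPE sketch above, here is how such an override might look, assuming isDirty() and clearDirty() are protected and overridable in ProcessingElement:

    #!java
    // Illustrative sketch only: report the PE as dirty only when an event
    // actually modified its state, so unchanged PEs are not re-checkpointed.
    public class CounterPE extends ProcessingElement {

        private long count = 0;
        private transient boolean dirty = false;

        public void onEvent(Event event) {
            // hypothetical application logic: only some events change the state
            if (shouldCount(event)) {
                count++;
                dirty = true;
            }
        }

        private boolean shouldCount(Event event) {
            // application-specific filtering, irrelevant to the example
            return true;
        }

        @Override
        protected boolean isDirty() {
            return dirty;
        }

        @Override
        protected void clearDirty() {
            dirty = false;
        }

        @Override
        protected void onCreate() {
        }

        @Override
        protected void onRemove() {
        }
    }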
The checkpointing framework has a number of overridable parameters, mostly for sizing thread pools:
* Serialization thread pool
* Storage backend thread pool
* Fetching thread pool: fetching is a blocking operation, which can time out
* In case the backend is unresponsive, it can be bypassed