Samza provides fault-tolerant processing of streams: Samza guarantees that messages won't be lost, even if your job crashes, a machine dies, there is a network fault, or something else goes wrong. In order to provide this guarantee, Samza expects the input system to meet a few requirements. Kafka meets these requirements, but they can also be implemented with other message broker systems.
As described in the section on SamzaContainer, each task instance of your job consumes one partition of an input stream. Each task has a current offset for each input stream: the offset of the next message to be read from that stream partition. Every time a message is read from the stream, the current offset moves forwards.
If a Samza container fails, it needs to be restarted (potentially on another machine) and resume processing where the failed container left off. In order to enable this, a container periodically checkpoints the current offset for each task instance.
When a Samza container starts up, it looks for the most recent checkpoint and starts consuming messages from the checkpointed offsets. If the previous container failed unexpectedly, the most recent checkpoint may be slightly behind the current offsets (i.e. the job may have consumed some more messages since the last checkpoint was written), but we can't know for sure. In that case, the job may process a few messages again.
This guarantee is called at-least-once processing: Samza ensures that your job doesn't miss any messages, even if containers need to be restarted. However, it is possible for your job to see the same message more than once when a container is restarted. We are planning to address this in a future version of Samza, but for now it is just something to be aware of: for example, if you are counting page views, a forcefully killed container could cause events to be slightly over-counted. You can reduce duplication by checkpointing more frequently, at a slight performance cost.
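For example, checkpoint frequency is controlled by the task.commit.ms property (discussed below). As a sketch, a job that wants tighter bounds on duplicate processing after a crash might lower it from the default of one minute (the value here is illustrative):

{% highlight jproperties %}
# Checkpoint every 10 seconds instead of the default 60000 ms.
# A smaller interval means fewer duplicated messages after a crash,
# at the cost of more frequent checkpoint writes.
task.commit.ms=10000
{% endhighlight %}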
For checkpoints to be effective, they need to be written somewhere where they will survive faults. Samza allows you to write checkpoints to the file system (using FileSystemCheckpointManager), but that doesn't help if the machine fails and the container needs to be restarted on another machine. The most common configuration is to use Kafka for checkpointing. You can enable this with the following job configuration:
{% highlight jproperties %}
job.name=example-job
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
task.commit.ms=60000
{% endhighlight %}
In this configuration, Samza writes checkpoints to a separate Kafka topic called __samza_checkpoint_<job-name>_<job-id> (in the example configuration above, the topic would be called __samza_checkpoint_example-job_1). Once per minute, Samza automatically sends a message to this topic, in which the current offsets of the input streams are encoded. When a Samza container starts up, it looks for the most recent offset message in this topic, and loads that checkpoint.
Sometimes it can be useful to use checkpoints only for some input streams, but not for others. In this case, you can tell Samza to ignore any checkpointed offsets for a particular stream name:
{% highlight jproperties %}
systems.kafka.streams.my-special-topic.samza.reset.offset=true
systems.kafka.streams.my-special-topic.samza.offset.default=oldest
{% endhighlight %}
These configuration parameters have the following meaning: setting samza.reset.offset=true tells Samza to ignore any checkpointed offset for the stream, and samza.offset.default determines where to start consuming instead (oldest starts from the oldest message available in the stream, while upcoming skips old messages and only consumes messages that arrive after the container starts up).
Note that the example configuration above causes your tasks to start consuming from the oldest offset every time a container starts up. This is useful in case you have some in-memory state in your tasks that you need to rebuild from source data in an input stream. If you are using streams in this way, you may also find bootstrap streams useful.
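As a sketch, marking such a stream as a bootstrap stream (in addition to resetting its offset) tells Samza to read it to the end before processing any other input. The stream name my-special-topic here is just an example:

{% highlight jproperties %}
systems.kafka.streams.my-special-topic.samza.reset.offset=true
systems.kafka.streams.my-special-topic.samza.offset.default=oldest
# Fully consume this stream on startup before processing other input
systems.kafka.streams.my-special-topic.samza.bootstrap=true
{% endhighlight %}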
If you want to make a one-off change to a job's consumer offsets, for example to force old messages to be processed again with a new version of your code, you can use CheckpointTool to inspect and manipulate the job's checkpoint. The tool is included in Samza's source repository.
To inspect a job's latest checkpoint, you need to specify your job's config file, so that the tool knows which job it is dealing with:
{% highlight bash %}
samza-example/target/bin/checkpoint-tool.sh \
  --config-path=/path/to/job/config.properties
{% endhighlight %}
This command prints out the latest checkpoint in a properties file format. You can save the output to a file, and edit it as you wish. For example, to jump back to the oldest possible point in time, you can set all the offsets to 0. Then you can feed that properties file back into checkpoint-tool.sh and save the modified checkpoint:
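For illustration, the printed checkpoint contains one property per task, system, stream, and partition; the task and stream names below are hypothetical, and setting each offset to 0 rewinds the corresponding partition to the beginning:

{% highlight jproperties %}
# Hypothetical edited offsets file: rewind both partitions to offset 0
tasknames.Partition 0.systems.kafka.streams.PageViewEvent.partitions.0=0
tasknames.Partition 1.systems.kafka.streams.PageViewEvent.partitions.1=0
{% endhighlight %}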
{% highlight bash %}
samza-example/target/bin/checkpoint-tool.sh \
  --config-path=/path/to/job/config.properties \
  --new-offsets=/path/to/new/offsets.properties
{% endhighlight %}
Note that Samza only reads checkpoints on container startup. In order for your checkpoint change to take effect, you need to first stop the job, then save the modified offsets, and then start the job again. If you write a checkpoint while the job is running, it will most likely have no effect.
Currently, Samza takes care of checkpointing for all systems. However, there are some use cases in which the consumer itself needs to be informed about each checkpoint that is written.
In order to use the checkpoint callback, a SystemConsumer needs to implement the CheckpointListener interface:

{% highlight java %}
public interface CheckpointListener {
  void onCheckpoint(Map<SystemStreamPartition, String> offsets);
}
{% endhighlight %}

For SystemConsumers which implement this interface, Samza invokes the onCheckpoint() callback every time the OffsetManager writes a checkpoint. Checkpoints are written per task, and offsets contains all the offsets Samza checkpoints for that task; these are the offsets that will be passed to the consumer on restart. Note that the callback happens after the checkpoint has been written, and it is not atomic with the checkpoint.
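As a rough, self-contained illustration of how a consumer might use this callback, the sketch below acknowledges (drops) buffered messages once their offsets have been checkpointed. It deliberately stands in for Samza's real types: SystemStreamPartition is replaced by a plain String key and only the listener interface shape is reproduced, so this is a pattern sketch, not a working Samza consumer:

{% highlight java %}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for Samza's CheckpointListener; the real interface receives a
// Map<SystemStreamPartition, String> of checkpointed offsets per task.
interface SimpleCheckpointListener {
    void onCheckpoint(Map<String, String> offsets);
}

// Hypothetical consumer that buffers message offsets until they are
// checkpointed, then purges everything at or below the checkpointed offset.
class AckingConsumer implements SimpleCheckpointListener {
    private final Map<String, List<Long>> pending = new HashMap<>();

    // Called as messages are delivered to the job.
    void record(String partition, long offset) {
        pending.computeIfAbsent(partition, p -> new ArrayList<>()).add(offset);
    }

    // The framework would invoke this after each checkpoint is written.
    @Override
    public void onCheckpoint(Map<String, String> offsets) {
        offsets.forEach((partition, offset) -> {
            long checkpointed = Long.parseLong(offset);
            pending.getOrDefault(partition, new ArrayList<>())
                   .removeIf(o -> o <= checkpointed);
        });
    }

    int pendingCount(String partition) {
        return pending.getOrDefault(partition, new ArrayList<>()).size();
    }
}

public class CheckpointCallbackDemo {
    public static void main(String[] args) {
        AckingConsumer consumer = new AckingConsumer();
        consumer.record("kafka.page-views.0", 1);
        consumer.record("kafka.page-views.0", 2);
        consumer.record("kafka.page-views.0", 3);
        // Simulate the callback after a checkpoint at offset 2:
        // offsets 1 and 2 are acknowledged, offset 3 remains pending.
        consumer.onCheckpoint(Map.of("kafka.page-views.0", "2"));
        System.out.println(consumer.pendingCount("kafka.page-views.0")); // prints 1
    }
}
{% endhighlight %}

Because the callback fires only after the checkpoint has been written (and is not atomic with it), any acknowledgement logic built this way must tolerate a crash between the checkpoint and the callback.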