package org.apache.samza.checkpoint
import java.util
import java.util.Properties
import java.util.regex.Pattern
import joptsimple.ArgumentAcceptingOptionSpec
import joptsimple.OptionSet
import org.apache.samza.checkpoint.CheckpointTool.TaskNameToCheckpointMap
import org.apache.samza.config._
import org.apache.samza.container.TaskName
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.util.{CommandLine, ConfigUtil, CoordinatorStreamUtil, Logging}
import org.apache.samza.Partition
import org.apache.samza.SamzaException
import scala.collection.JavaConverters._
import org.apache.samza.coordinator.JobModelManager
import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, NamespaceAwareCoordinatorStreamStore}
import org.apache.samza.execution.JobPlanner
import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
import scala.collection.mutable.ListBuffer
/**
 * Command-line tool for inspecting and manipulating the checkpoints for a job.
 * This can be used, for example, to force a job to re-process a stream from the
 * beginning.
 *
 * When running this tool, you need to provide the configuration URI of the job you
 * want to inspect/manipulate. The tool prints out the latest checkpoint for that
 * job (the latest offset of each partition of each input stream).
 *
 * To update the checkpoint, you need to provide a second properties file
 * containing the offsets you want. It must use the same format in which the tool
 * prints out the latest checkpoint:
 *
 *   tasknames.<taskname>.systems.<system>.streams.<topic>.partitions.<partition>=<offset>
 *
 * The provided offset definitions are grouped by <taskname> and written to
 * individual checkpoint entries, one per <taskname>.
 *
 * NOTE: A job only reads its checkpoint when it starts up. Therefore, if you want
 * your checkpoint change to take effect, you have to first stop the job, then
 * write a new checkpoint, then start it up again. Writing a new checkpoint while
 * the job is running may not have any effect.
 *
 * If you're building Samza from source, you can use the 'checkpointTool' Gradle
 * task as a shortcut to running this tool.
 */
object CheckpointTool {
  /**
   * Format in which a SystemStreamPartition offset key is represented in a properties file.
   *
   * Format arguments, in order: task name, system, stream, partition id (an integer, hence `%d`).
   * This must stay in sync with `SSP_REGEX`, which parses keys produced by this format, and with
   * the key layout documented for the `new-offsets` file:
   * `tasknames.<taskname>.systems.<system>.streams.<topic>.partitions.<partition>`.
   */
  val SSP_PATTERN: String = "tasknames.%s.systems.%s.streams.%s.partitions.%d"

  /**
   * Parses a properties key written in `SSP_PATTERN` format.
   * Capture groups: 1 = task name, 2 = system, 3 = stream, 4 = partition number.
   */
  val SSP_REGEX: Pattern = Pattern.compile("tasknames\\.(.+)\\.systems\\.(.+)\\.streams\\.(.+)\\.partitions\\.([0-9]+)")

  /** Offsets keyed by task name, then by SystemStreamPartition; offsets are kept as strings. */
  type TaskNameToCheckpointMap = Map[TaskName, Map[SystemStreamPartition, String]]
// Command-line parser for the checkpoint tool. It registers a `new-offsets` option on the
// inherited `parser` and holds the offsets parsed from that file (see loadConfig).
class CheckpointToolCommandLine extends CommandLine with Logging {
// Optional URI of a properties file whose offsets should be written to the job's checkpoint.
// NOTE(review): the description string below looks truncated (the example path and its closing
// parenthesis are missing). `parser.accepts(...)` alone does not produce an
// ArgumentAcceptingOptionSpec[URI]; presumably `.withRequiredArg.ofType(classOf[URI])` is missing,
// and `java.net.URI` is not among the visible imports. Confirm against the full source.
val newOffsetsOpt: ArgumentAcceptingOptionSpec[URI] =
parser.accepts("new-offsets", "URI of file (e.g. file:///some/local/ " +
"containing offsets to write to the job's checkpoint topic. " +
"If not given, this tool prints out the current offsets.")
// Offsets parsed from the `new-offsets` file. Defaults to null (`= _`). It is assigned only when
// that option is present, and parseOffsets itself returns null when no key is recognised.
var newOffsets: TaskNameToCheckpointMap = _
// Parses offset definitions from `propertiesFile` into a per-task map of SSP -> offset.
// Every key is matched against SSP_REGEX. Matching keys contribute one (task, SSP -> offset) entry;
// any other key is logged as a warning and ignored.
// Returns null (not an empty map) when no key matches, which callers treat as "nothing to write".
def parseOffsets(propertiesFile: Properties): TaskNameToCheckpointMap = {
var checkpoints : ListBuffer[(TaskName, Map[SystemStreamPartition, String])] = ListBuffer()
propertiesFile.asScala.foreach { case (key, value) =>
val matcher = SSP_REGEX.matcher(key)
if (matcher.matches) {
// NOTE(review): the next three lines are truncated in this copy: the TaskName, partition-number
// and SystemStreamPartition constructor arguments are missing. Presumably they come from the regex
// groups (1 = task name, 2 = system, 3 = stream, 4 = partition). Confirm against the full source.
val taskname = new TaskName(
val partition = new Partition(Integer.parseInt(
val ssp = new SystemStreamPartition(,, partition)
// One single-entry map per property; entries belonging to the same task are merged below.
val tuple = taskname -> Map(ssp -> value)
checkpoints += tuple
} else {
warn("Warning: ignoring unrecognised property: %s = %s" format (key, value))
val taskNameSSPPairs = checkpoints.toList
if(taskNameSSPPairs.isEmpty) {
return null
// NOTE(review): the first mapValues body below is truncated in this copy; presumably it keeps only
// the maps (e.g. `m.map(_._2)`). Also, `mapValues` returns a lazy view (deprecated in 2.13).
// Consider materialising the result. Confirm against the full source.
// Need to turn taskNameSSPPairs List[(taskname, Map[SystemStreamPartition, Offset])] to Map[TaskName, Map[SSP, Offset]]
taskNameSSPPairs // List[(taskname, Map[SystemStreamPartition, Offset])]
.groupBy(_._1) // Group by taskname
.mapValues(m => // Drop the extra taskname that we grouped on
.mapValues(m => m.reduce( _ ++ _)) // Merge all the maps of SSPs->Offset into one for the whole taskname
/**
 * Loads the job config through `CommandLine.loadConfig`. When the `new-offsets` option is present,
 * also reads the referenced properties file and stores the parsed offsets in `newOffsets`.
 *
 * `newOffsets` stays null when the option is absent. It is also null when the file contains no key
 * recognised by `parseOffsets`.
 *
 * @param options parsed command-line options
 * @return the config produced by the superclass, unchanged
 */
override def loadConfig(options: OptionSet): Config = {
  val config = super.loadConfig(options)
  if (options.has(newOffsetsOpt)) {
    val newOffsetsInputStream = new FileInputStream(options.valueOf(newOffsetsOpt).getPath)
    val properties = new Properties()
    try {
      // Without this load the Properties stay empty, and every requested offset would be
      // silently dropped.
      properties.load(newOffsetsInputStream)
    } finally {
      // Release the file handle even if the properties file is malformed.
      newOffsetsInputStream.close()
    }
    newOffsets = parseOffsets(properties)
  }
  config
}
/**
 * Creates a `CheckpointTool` backed by a new `CoordinatorStreamStore` built from `config`.
 *
 * @param config  config used to build the coordinator stream store, and passed on as the tool's
 *                user-defined config
 * @param offsets offsets to write, or null to only print the current checkpoints
 * @return a tool ready to `run()`
 */
def apply(config: Config, offsets: TaskNameToCheckpointMap): CheckpointTool = {
  val metadataStore: CoordinatorStreamStore = new CoordinatorStreamStore(config, new MetricsRegistryMap())
  new CheckpointTool(offsets, metadataStore, config)
}
/**
 * Entry point of the tool.
 *
 * Steps:
 * 1. Parse the command line and load the user config.
 * 2. Plan and rewrite it into the job config.
 * 3. Run the tool. Running logs the current checkpoints and, when `new-offsets` was supplied,
 *    writes the requested offsets.
 *
 * @param args command-line arguments (config options plus the optional `new-offsets` URI)
 */
def main(args: Array[String]): Unit = {
  val cmdline = new CheckpointToolCommandLine
  val options = cmdline.parser.parse(args: _*)
  val userConfig = cmdline.loadConfig(options)
  val jobConfig = JobPlanner.generateSingleJobConfig(userConfig)
  val rewrittenConfig = ConfigUtil.rewriteConfig(jobConfig)
  info(s"Using the rewritten config: $rewrittenConfig")
  val tool = CheckpointTool(rewrittenConfig, cmdline.newOffsets)
  // run() is what actually reads and writes the checkpoints; without it the tool does nothing.
  tool.run()
}
// Reads the last checkpoint of every task of a job and logs it. When `newOffsets` is non-null, it
// then writes those offsets as the new checkpoints of the corresponding tasks.
//
// newOffsets             offsets to write, or null to only print the current checkpoints
// coordinatorStreamStore store from which the job config and the changelog mapping are read; it is
//                        also handed to JobModelManager
// userDefinedConfig      config supplied by the caller (not referenced in the visible body of run)
class CheckpointTool(newOffsets: TaskNameToCheckpointMap, coordinatorStreamStore: CoordinatorStreamStore, userDefinedConfig: Config) extends Logging {
// Logs the current checkpoint of each task, then writes `newOffsets` if it is non-null.
// Throws SamzaException when no checkpoint manager is configured (task.checkpoint.factory).
def run() {
val configFromCoordinatorStream: Config = getConfigFromCoordinatorStream(coordinatorStreamStore)
println("Configuration read from the coordinator stream")
// NOTE(review): combinedConfigMap is never populated in this copy, so combinedConfig is empty and
// configFromCoordinatorStream is unused. Presumably the coordinator-stream config and
// userDefinedConfig are meant to be merged into it (e.g. via putAll) before use. Confirm.
val combinedConfigMap: util.Map[String, String] = new util.HashMap[String, String]()
val combinedConfig: Config = new MapConfig(combinedConfigMap)
val taskConfig = new TaskConfig(combinedConfig)
// Instantiate the checkpoint manager with coordinator stream configuration.
val checkpointManager: CheckpointManager =
JavaOptionals.toRichOptional(taskConfig.getCheckpointManager(new MetricsRegistryMap))
.getOrElse(throw new SamzaException("Configuration: task.checkpoint.factory is not defined."))
try {
// Find all the TaskNames that would be generated for this job config
val changelogManager = new ChangelogStreamManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetChangelogMapping.TYPE))
val jobModelManager = JobModelManager(combinedConfig, changelogManager.readPartitionMapping(),
coordinatorStreamStore, new MetricsRegistryMap())
// NOTE(review): the next two statements are truncated in this copy. The task-name extraction from
// jobModelManager and the collection being mapped over to build lastCheckpoints are missing.
// logCheckpoint also expects a Map[SSP, String], whereas the value built below is a Checkpoint.
// Confirm against the full source.
val taskNames = jobModelManager
val lastCheckpoints = => {
// Tasks with no stored checkpoint (null from readLastCheckpoint) get an empty Checkpoint, so every
// task is still logged.
taskName -> Option(checkpointManager.readLastCheckpoint(taskName))
.getOrElse(new Checkpoint(new java.util.HashMap[SystemStreamPartition, String]()))
lastCheckpoints.foreach(lcp => logCheckpoint(lcp._1, lcp._2, "Current checkpoint for task: "+ lcp._1))
// A null newOffsets means "inspect only": nothing is written.
if (newOffsets != null) {
newOffsets.foreach {
// NOTE(review): the `Map[SystemStreamPartition, String]` type pattern is unchecked due to erasure;
// the declared element type already guarantees it, so `case (taskName, offsets)` would suffice.
case (taskName: TaskName, offsets: Map[SystemStreamPartition, String]) =>
logCheckpoint(taskName, offsets, "New offset to be written for task: " + taskName)
val checkpoint = new Checkpoint(offsets.asJava)
checkpointManager.writeCheckpoint(taskName, checkpoint)
info(s"Updated the checkpoint of the task: $taskName to: $offsets")
// NOTE(review): the finally clause is empty in this copy. Presumably it should release the
// checkpoint manager and the coordinator stream store. Confirm.
} finally {
// Presumably returns the job config stored in `coordinatorStreamStore`, based on its name and on
// run() logging "Configuration read from the coordinator stream" right after calling it.
// NOTE(review): the body of this method is missing in this copy. Confirm against the full source.
def getConfigFromCoordinatorStream(coordinatorStreamStore: CoordinatorStreamStore): Config = {
// Logs one info line per SSP in `checkpoint`, in sorted SSP order.
// Each line is `prefix`, then ": ", then the entry formatted with CheckpointTool.SSP_PATTERN,
// followed by " = <offset>".
def logCheckpoint(tn: TaskName, checkpoint: Map[SystemStreamPartition, String], prefix: String) {
// Formats a single entry from five arguments: task name, system, stream, partition id (Int), offset.
// NOTE(review): SSP_PATTERN is expected to hold four placeholders (task, system, stream, partition
// id) ahead of the trailing " = %s"; otherwise the arguments will not line up. Confirm.
def logLine(tn:TaskName, ssp:SystemStreamPartition, offset:String) = (prefix + ": " + CheckpointTool.SSP_PATTERN + " = %s") format (tn.toString, ssp.getSystem, ssp.getStream, ssp.getPartition.getPartitionId, offset)
checkpoint.keys.toList.sorted.foreach(ssp => info(logLine(tn, ssp, checkpoint(ssp))))