| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| syntax = "proto2"; |
| package heron.proto.ckptmgr; |
| option java_package = "org.apache.heron.proto.ckptmgr"; |
| option java_outer_classname = "CheckpointManager"; |
| |
| import "common.proto"; |
| import "physical_plan.proto"; |
| |
| // The messages in this file are for stateful processing and providing |
| // effectively-once semantics. Below are the sequences of events that |
| // happen in a few (non-exhaustive) scenarios. |
| // |
| // On Startup |
| // For topologies needing effectively once, the startup sequence of |
| // tmaster/stmgr/instance is a little different. Normally, upon |
| // startup, all stmgrs connect to tmaster and then tmaster computes pplan |
| // and distributes to stmgrs, which in turn send it to all their |
| // local instances. As soon as instances get their pplan, they can |
| // start processing(spouts emitting tuples, etc). However for |
| // stateful topologies, this is a little different. First |
| // in addition to the pplan, tmaster needs to send the last |
| // consistent checkpoint id(if any) to recover from as well. |
| // Secondly instances need to recover their prev state before |
| // they can process any tuples. |
| // WRT messages below, the startup sequence is the following. |
| // From the StatefulConsistentCheckpoints message stored in the |
| // state manager, Tmaster picks the latest checkpoint_id and |
| // sends RestoreTopologyStateRequest to all stmgrs. Each stmgr |
| // does a GetInstanceStateRequest/GetInstanceStateResponse |
| // dance with its local checkpoint manager for all of its local |
| // instances to fetch the state. Upon fetch, stmgr does a |
| // RestoreInstanceStateRequest/RestoreInstanceStateResponse |
| // dance with the corresponding instances to restore the |
| // state. Completion of which, stmgr sends out the |
| // RestoreTopologyStateResponse message to the tmaster. |
| // Once the tmaster receives this from all stmgrs, it sends |
| // out the StartStmgrStatefulProcessing message to all |
| // stmgrs, which in turn send out StartInstanceStatefulProcessing |
| // to all their local instances. This gets the topology |
| // humming. |
| // |
| // Periodic checkpoints |
| // Every so often(as dictated by TOPOLOGY_STATEFUL_CHECKPOINT_INTERVAL) |
| // tmaster sends out StartStatefulCheckpoint message to all |
| // stmgrs. Each stmgr sends out InitiateStatefulCheckpoint |
| // message to each of its local spout instances. Because of the best effort |
| // nature, this is delivered as a message. After instance does its |
| // checkpoint, it sends StoreInstanceStateCheckpoint to stmgr to store |
| // its checkpoint. Stmgr passes this to ckptmgr via |
| // SaveInstanceStateRequest/SaveInstanceStateResponse dance with its |
| // local checkpoint manager. Upon getting a successful |
| // SaveInstanceStateResponse message, stmgr sends out a |
| // InstanceStateStored message notifying tmaster that the |
| // state of a particular instance has been saved. In parallel, the |
| // stmgr also sends out a DownstreamStatefulCheckpoint message |
| // to each of the downstream components to do the distributed |
| // Lamport-style checkpointing. Meanwhile at the tmaster, when it |
| // receives InstanceStateStored from all instances for a particular |
| // checkpoint_id, it has reached a globally consistent checkpoint |
| // and it stores it in the state manager as a StatefulConsistentCheckpoint. |
| // |
| // Failure Recovery |
| // Either when stmgr dies, or when an instance dies(which is notified |
| // by the stmgr using the ResetTopologyState message), tmaster initiates |
| // the recovery mechanism. The recovery mechanism is exactly the same |
| // as the initial startup described above |
| |
| // Metadata identifying one globally consistent checkpoint: its |
| // checkpoint_id and the packing_plan_id recorded with it (both required). |
| message StatefulConsistentCheckpoint { |
| required string checkpoint_id = 1; |
| required string packing_plan_id = 2; |
| // Further metadata about this checkpoint can be added here later. |
| } |
| |
| // List of consistent checkpoints that the tmaster stores in the state manager. |
| message StatefulConsistentCheckpoints { |
| // Ordered list of the globally consistent checkpoints that have been |
| // snapshotted and can be recovered from, newest first: element 0 is |
| // the most recent checkpoint, element 1 the next most recent, |
| // and so on. |
| repeated StatefulConsistentCheckpoint consistent_checkpoints = 1; |
| } |
| |
| /* |
| * tmaster <-> stmgr messages |
| */ |
| |
| // Sent by the tmaster to the stmgrs to initiate the checkpoint named by checkpoint_id. |
| message StartStatefulCheckpoint { |
| required string checkpoint_id = 1; |
| } |
| |
| // Sent by the tmaster to the stmgrs, asking them to reset their instances |
| // to the checkpoint named by checkpoint_id. |
| message RestoreTopologyStateRequest { |
| required string checkpoint_id = 1; |
| // Every attempt made by the tmaster to restore to a globally consistent |
| // checkpoint is tagged with a unique id, which the stmgrs include in their |
| // RestoreTopologyStateResponse. This lets the tmaster retry a restore |
| // to the same checkpoint_id multiple times and still tell which attempt |
| // a given response belongs to. |
| required int64 restore_txid = 2; |
| } |
| |
| // Sent by a stmgr to the tmaster after it has restored all of its local |
| // instances to checkpoint_id; restore_txid is the attempt id taken from the request. |
| message RestoreTopologyStateResponse { |
| required heron.proto.system.Status status = 1; |
| required string checkpoint_id = 2; |
| required int64 restore_txid = 3; |
| } |
| |
| // Sent by a stmgr to the tmaster asking it to reset the topology |
| // to some valid checkpoint. It is sent either when a stmgr dies |
| // and comes back up, or when an instance dies. |
| message ResetTopologyState { |
| // The dead/recovered stmgr connection, if any, that was the reason |
| // for this request. |
| optional string dead_stmgr = 1; |
| // The dead/recovered local instance connection, if any, that was the reason |
| // for this request. |
| optional int32 dead_taskid = 2; |
| // Describes the reason for sending the message. |
| // Used primarily for debugging. |
| required string reason = 3; |
| } |
| |
| // Sent by a stmgr to the tmaster to report that the checkpoint |
| // checkpoint_id belonging to `instance` has been stored; the stmgr sends it |
| // upon getting a successful SaveInstanceStateResponse from its ckptmgr. |
| message InstanceStateStored { |
| required string checkpoint_id = 1; |
| required heron.proto.system.Instance instance = 2; |
| } |
| |
| // Sent by the tmaster to the stmgrs to start processing. |
| // For stateful processing, all the topology components must first be |
| // rolled back to a consistent state; the tmaster sends this once it has |
| // received RestoreTopologyStateResponse from all stmgrs. |
| message StartStmgrStatefulProcessing { |
| required string checkpoint_id = 1; |
| } |
| |
| /* |
| * tmaster <-> ckptmgr messages |
| */ |
| |
| // Sent by the tmaster when it wants to |
| // register itself with the checkpoint manager; |
| // carries the topology's name and id. |
| message RegisterTMasterRequest { |
| required string topology_name = 1; |
| required string topology_id = 2; |
| } |
| |
| // Response the checkpoint manager sends |
| // back when it receives a |
| // RegisterTMasterRequest from the tmaster. |
| message RegisterTMasterResponse { |
| required heron.proto.system.Status status = 1; |
| } |
| |
| // Sent by the tmaster to the ckptmgr to clean up |
| // old checkpoint state. Both fields are optional. |
| message CleanStatefulCheckpointRequest { |
| // Any checkpoint older than this one can be cleaned. |
| optional string oldest_checkpoint_preserved = 1; |
| optional bool clean_all_checkpoints = 2; |
| } |
| |
| // Sent by the ckptmgr to the tmaster in reply to a CleanStatefulCheckpointRequest. |
| message CleanStatefulCheckpointResponse { |
| required heron.proto.system.Status status = 1; |
| repeated string cleaned_checkpoint_ids = 2; |
| } |
| |
| /* |
| * stmgr <-> ckptmgr messages |
| */ |
| |
| // Encapsulates the info associated with the state of an |
| // instance/partition for checkpoint_id. Only one of `state` and |
| // `state_location` should be set. |
| message InstanceStateCheckpoint { |
| required string checkpoint_id = 1; |
| optional bytes state = 2; |
| optional string state_location = 3; |
| } |
| |
| // Encapsulates the checkpoint metadata of a single component: |
| // its component_name and its parallelism. |
| message CheckpointComponentMetadata { |
| required string component_name = 1; |
| required int32 parallelism = 2; |
| } |
| |
| // Sent by a stmgr when it wants to register itself with |
| // the checkpoint manager; carries the topology's name and id, |
| // the stmgr's id and the physical plan. |
| message RegisterStMgrRequest { |
| required string topology_name = 1; |
| required string topology_id = 2; |
| required string stmgr_id = 3; |
| required heron.proto.system.PhysicalPlan physical_plan = 4; |
| } |
| |
| // Response the checkpoint manager sends |
| // back when it receives a |
| // RegisterStMgrRequest from a stmgr. |
| message RegisterStMgrResponse { |
| required heron.proto.system.Status status = 1; |
| } |
| |
| // Request a stmgr sends to the checkpoint manager |
| // to store an instance's checkpoint data. |
| message SaveInstanceStateRequest { |
| // The instance whose state is being saved. |
| required heron.proto.system.Instance instance = 1; |
| // TODO(nwang): repartitioning is not supported yet, so each request |
| // contains a single partition. |
| required InstanceStateCheckpoint checkpoint = 2; |
| } |
| |
| // Response the checkpoint manager sends back to a SaveInstanceStateRequest |
| // after successfully saving state; carries the instance and checkpoint_id. |
| message SaveInstanceStateResponse { |
| required heron.proto.system.Status status = 1; |
| required heron.proto.system.Instance instance = 2; |
| required string checkpoint_id = 3; |
| } |
| |
| // Request a stmgr sends to the checkpoint manager to fetch |
| // the state data of `instance` for checkpoint_id. |
| message GetInstanceStateRequest { |
| required heron.proto.system.Instance instance = 1; |
| required string checkpoint_id = 2; |
| } |
| |
| // Response the checkpoint manager sends to a stmgr for its |
| // GetInstanceStateRequest; the `checkpoint` field is optional. |
| message GetInstanceStateResponse { |
| required heron.proto.system.Status status = 1; |
| required heron.proto.system.Instance instance = 2; |
| required string checkpoint_id = 3; |
| optional InstanceStateCheckpoint checkpoint = 4; |
| } |
| |
| /* |
| * stmgr <-> Instance messages |
| */ |
| |
| // Sent by a stmgr to its local tasks to make them |
| // begin initiating the checkpoint named by checkpoint_id. |
| message InitiateStatefulCheckpoint { |
| required string checkpoint_id = 1; |
| } |
| |
| // Sent by an instance, after it has done its checkpoint, |
| // when it wants the stmgr to store its state. |
| message StoreInstanceStateCheckpoint { |
| required InstanceStateCheckpoint state = 1; |
| } |
| |
| // Sent by a stmgr to one of its instances, |
| // asking it to restore its state from `state`. |
| message RestoreInstanceStateRequest { |
| required InstanceStateCheckpoint state = 1; |
| } |
| |
| // Sent by an instance to its stmgr |
| // when it is done restoring the state. |
| message RestoreInstanceStateResponse { |
| required heron.proto.system.Status status = 1; |
| required string checkpoint_id = 2; |
| } |
| |
| // Sent by a stmgr to an instance to start processing. |
| // This is the stmgr -> instance counterpart of the |
| // StartStmgrStatefulProcessing message (tmaster -> stmgr). |
| message StartInstanceStatefulProcessing { |
| required string checkpoint_id = 1; |
| } |
| |
| /* |
| * stmgr -> stmgr messages |
| */ |
| |
| // Once a stmgr receives StoreInstanceStateCheckpoint from an instance, |
| // it sends this message to the stmgrs of all of that instance's |
| // downstream components to propagate the checkpoint marker. |
| message DownstreamStatefulCheckpoint { |
| required int32 origin_task_id = 1; |
| required int32 destination_task_id = 2; |
| required string checkpoint_id = 3; |
| } |