// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
syntax = "proto2";
package heron.proto.ckptmgr;
option java_package = "org.apache.heron.proto.ckptmgr";
option java_outer_classname = "CheckpointManager";
import "common.proto";
import "physical_plan.proto";
// The messages in this file are for stateful processing and providing
// effectively once semantics. Below are the sequences of events that
// happen in a few (non-exhaustive) scenarios.
//
// On Startup
// For topologies needing effectively once, the startup sequence of
// tmaster/stmgr/instance is a little different. Normally, upon
// startup, all stmgrs connect to tmaster, and then tmaster computes the
// pplan and distributes it to the stmgrs, which in turn send it to all
// their local instances. As soon as instances get their pplan, they can
// start processing (spouts emitting tuples, etc.). For stateful
// topologies this differs in two ways. First, in addition to the pplan,
// tmaster also needs to send the last consistent checkpoint id (if any)
// to recover from. Second, instances need to recover their previous
// state before they can process any tuples.
// In terms of the messages below, the startup sequence is the following.
// From the StatefulConsistentCheckpoints message stored in the
// state manager, tmaster picks the latest checkpoint_id and
// sends RestoreTopologyStateRequest to all stmgrs. Each stmgr
// does a GetInstanceStateRequest/GetInstanceStateResponse
// dance with its local checkpoint manager for all of its local
// instances to fetch their state. Upon fetch, the stmgr does a
// RestoreInstanceStateRequest/RestoreInstanceStateResponse
// dance with the corresponding instances to restore the
// state. Upon completion, the stmgr sends out the
// RestoreTopologyStateResponse message to the tmaster.
// Once the tmaster receives this from all stmgrs, it sends
// out the StartStmgrStatefulProcessing message to all
// stmgrs, which in turn send out StartInstanceStatefulProcessing
// to all their local instances. This gets the topology
// humming.
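//
// A rough sketch of the restore handshake for a single instance, with
// hypothetical checkpoint/txid values; sub-messages such as Status and
// Instance are elided as <...>:
//
//   tmaster  -> stmgr     RestoreTopologyStateRequest   checkpoint_id: "ckpt-41"  restore_txid: 7
//   stmgr    -> ckptmgr   GetInstanceStateRequest       instance: <...>  checkpoint_id: "ckpt-41"
//   ckptmgr  -> stmgr     GetInstanceStateResponse      status: <...>  checkpoint: <...>
//   stmgr    -> instance  RestoreInstanceStateRequest   state: <fetched checkpoint>
//   instance -> stmgr     RestoreInstanceStateResponse  status: <...>  checkpoint_id: "ckpt-41"
//   stmgr    -> tmaster   RestoreTopologyStateResponse  status: <...>  checkpoint_id: "ckpt-41"  restore_txid: 7
//
// and, once every stmgr has responded:
//
//   tmaster  -> stmgr     StartStmgrStatefulProcessing     checkpoint_id: "ckpt-41"
//   stmgr    -> instance  StartInstanceStatefulProcessing  checkpoint_id: "ckpt-41"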
//
// Periodic checkpoints
// Every so often (as dictated by TOPOLOGY_STATEFUL_CHECKPOINT_INTERVAL)
// tmaster sends out a StartStatefulCheckpoint message to all
// stmgrs. Each stmgr sends out an InitiateStatefulCheckpoint
// message to each of its local spout instances. Because of its best-effort
// nature, this is delivered as a message. After an instance takes its
// checkpoint, it sends StoreInstanceStateCheckpoint to its stmgr to store
// the checkpoint. The stmgr passes this to ckptmgr via a
// SaveInstanceStateRequest/SaveInstanceStateResponse dance with its
// local checkpoint manager. Upon getting a successful
// SaveInstanceStateResponse message, the stmgr sends out an
// InstanceStateStored message notifying tmaster that the
// state of a particular instance has been saved. In parallel, the
// stmgr also sends out a DownstreamStatefulCheckpoint message
// to each of the downstream components to do the distributed
// Lamport-style checkpointing. Meanwhile, when tmaster
// receives InstanceStateStored from all instances for a particular
// checkpoint_id, a globally consistent checkpoint has been reached,
// and tmaster stores it in the state manager as a StatefulConsistentCheckpoint.
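//
// A rough sketch of one checkpoint round, as seen from one spout instance
// (task 3) and one of its downstream instances (task 7), with hypothetical
// ids; sub-messages are elided as <...>:
//
//   tmaster -> stmgr             StartStatefulCheckpoint       checkpoint_id: "ckpt-42"
//   stmgr   -> spout (task 3)    InitiateStatefulCheckpoint    checkpoint_id: "ckpt-42"
//   spout   -> stmgr             StoreInstanceStateCheckpoint  state { checkpoint_id: "ckpt-42" ... }
//   stmgr   -> ckptmgr           SaveInstanceStateRequest      instance: <...>  checkpoint: <...>
//   ckptmgr -> stmgr             SaveInstanceStateResponse     status: <...>  checkpoint_id: "ckpt-42"
//   stmgr   -> tmaster           InstanceStateStored           checkpoint_id: "ckpt-42"  instance: <...>
//   stmgr   -> downstream stmgr  DownstreamStatefulCheckpoint  origin_task_id: 3
//                                                              destination_task_id: 7
//                                                              checkpoint_id: "ckpt-42"
//
// The downstream instance (task 7) then checkpoints in the same way
// (Lamport-style marker propagation), and once tmaster holds an
// InstanceStateStored for every instance, "ckpt-42" becomes a
// StatefulConsistentCheckpoint.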
//
// Failure Recovery
// Either when a stmgr dies, or when an instance dies (which its stmgr
// reports using the ResetTopologyState message), tmaster initiates
// the recovery mechanism. The recovery mechanism is exactly the same
// as the initial startup described above.
// Message representing information about a globally consistent
// checkpoint
message StatefulConsistentCheckpoint {
required string checkpoint_id = 1;
required string packing_plan_id = 2;
// One can add more meta data about this ckpt later
}
// Message stored in the state manager by the tmaster
message StatefulConsistentCheckpoints {
// An ordered list of the globally consistent checkpoints
// that have been snapshotted and can be recovered from.
// The 0th element is the most recent one, the 1st is the next
// most recent, and so on
repeated StatefulConsistentCheckpoint consistent_checkpoints = 1;
}
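// As an illustration, a record with two recoverable checkpoints might look
// like this in protobuf text format (the ids here are made up):
//
//   consistent_checkpoints {
//     checkpoint_id: "ckpt-42"
//     packing_plan_id: "packing-plan-1"
//   }
//   consistent_checkpoints {
//     checkpoint_id: "ckpt-41"
//     packing_plan_id: "packing-plan-1"
//   }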
/*
* tmaster <-> stmgr messages
*/
// Message sent to stmgrs by the tmaster to initiate checkpointing
message StartStatefulCheckpoint {
required string checkpoint_id = 1;
}
// Message broadcast to stmgrs (and then forwarded to instances) when a
// checkpoint becomes "globally consistent"
message StatefulConsistentCheckpointSaved {
required StatefulConsistentCheckpoint consistent_checkpoint = 1;
}
// Message sent by tmaster to stmgrs asking them to restore their instances
// to this checkpoint
message RestoreTopologyStateRequest {
required string checkpoint_id = 1;
// Every attempt made by tmaster to restore to a globally consistent
// checkpoint is tagged with a unique id. The stmgrs include this id in
// their RestoreTopologyStateResponse below. This way tmaster can try
// restoring to the same checkpoint_id multiple times and still tell
// which attempt each response belongs to.
required int64 restore_txid = 2;
}
// Message that stmgr sends to tmaster after it restores
// all of its local instances to a checkpoint_id
message RestoreTopologyStateResponse {
required heron.proto.system.Status status = 1;
required string checkpoint_id = 2;
required int64 restore_txid = 3;
}
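// For example (hypothetical values), two restore attempts for the same
// checkpoint carry distinct restore_txids, and each response echoes the
// txid of the attempt it answers:
//
//   attempt 1:  RestoreTopologyStateRequest   checkpoint_id: "ckpt-41"  restore_txid: 7
//               RestoreTopologyStateResponse  status: <...>  checkpoint_id: "ckpt-41"  restore_txid: 7
//   attempt 2:  RestoreTopologyStateRequest   checkpoint_id: "ckpt-41"  restore_txid: 8
//               ...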
// Message sent by stmgr to tmaster asking it to reset the topology
// to some valid checkpoint. This is sent either when a stmgr dies
// and comes back up or when an instance dies.
message ResetTopologyState {
// Was there a dead/recovered stmgr connection that was the reason
// for this request?
optional string dead_stmgr = 1;
// Was there a dead/recovered local instance connection that was the reason
// for this request?
optional int32 dead_taskid = 2;
// This one describes the reason for sending the message.
// Used primarily for debugging
required string reason = 3;
}
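// For instance (hypothetical values), a stmgr that lost its connection to
// a local instance with task id 12 might send:
//
//   dead_taskid: 12
//   reason: "local instance connection closed"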
// Message sent by stmgr to tmaster informing it that
// the checkpoint belonging to a particular instance
// has been stored
message InstanceStateStored {
required string checkpoint_id = 1;
required heron.proto.system.Instance instance = 2;
}
// Message sent by tmaster to stmgr to start processing.
// For stateful processing, all the topology components
// should be rolled back to a consistent state before they can
// start processing.
message StartStmgrStatefulProcessing {
required string checkpoint_id = 1;
}
/*
* tmaster -> ckptmgr messages
*/
// This is the message that a tmaster sends
// when it wants to register itself
// with the checkpoint manager
message RegisterTMasterRequest {
required string topology_name = 1;
required string topology_id = 2;
}
// This is the message that checkpoint manager
// sends when it receives the register request
// from tmaster
message RegisterTMasterResponse {
required heron.proto.system.Status status = 1;
}
// Message sent by tmaster to ckptmgr to clean up
// old checkpoint state.
message CleanStatefulCheckpointRequest {
// Any checkpoints older than this can be cleaned
optional string oldest_checkpoint_preserved = 1;
optional bool clean_all_checkpoints = 2;
}
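// For example (hypothetical id), to delete everything older than "ckpt-40"
// while preserving "ckpt-40" itself, tmaster could send:
//
//   oldest_checkpoint_preserved: "ckpt-40"
//
// whereas setting clean_all_checkpoints: true asks for all stored
// checkpoints to be removed.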
// Message sent by ckptmgr to tmaster about the cleanup request
message CleanStatefulCheckpointResponse {
required heron.proto.system.Status status = 1;
repeated string cleaned_checkpoint_ids = 2;
}
/*
* stmgr -> ckptmgr messages
*/
// This message encapsulates the info associated with
// the state of an instance/partition. Only one of state_location
// and state should be set
message InstanceStateCheckpoint {
required string checkpoint_id = 1;
optional bytes state = 2;
optional string state_location = 3;
}
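// As an illustration (hypothetical values), the state can be carried inline:
//
//   checkpoint_id: "ckpt-42"
//   state: "<serialized instance state bytes>"
//
// or referenced by location, e.g. when the state is stored out of band:
//
//   checkpoint_id: "ckpt-42"
//   state_location: "<uri of the externally stored state>"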
// This message encapsulates the info associated with
// checkpoint metadata of a component
message CheckpointComponentMetadata {
required string component_name = 1;
required int32 parallelism = 2;
}
// This is the message that a stmgr sends
// when it wants to register itself with
// the checkpoint manager
message RegisterStMgrRequest {
required string topology_name = 1;
required string topology_id = 2;
required string stmgr_id = 3;
required heron.proto.system.PhysicalPlan physical_plan = 4;
}
// This is the message that checkpoint manager
// sends when it receives the register request
// from stmgr
message RegisterStMgrResponse {
required heron.proto.system.Status status = 1;
}
// This is the request that StMgr sends to Checkpoint Mgr
// to store instance checkpoint data
message SaveInstanceStateRequest {
// Information about the instance whose state this is
required heron.proto.system.Instance instance = 1;
// TODO(nwang): currently repartition is not supported and each request
// contains a single partition.
required InstanceStateCheckpoint checkpoint = 2;
}
// After successfully saving state, this is what
// Checkpoint Mgr sends back as response
message SaveInstanceStateResponse {
required heron.proto.system.Status status = 1;
required heron.proto.system.Instance instance = 2;
required string checkpoint_id = 3;
}
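// For example (hypothetical values), one save exchange looks like this,
// with sub-messages elided as <...>:
//
//   SaveInstanceStateRequest   instance: <...>  checkpoint { checkpoint_id: "ckpt-42" ... }
//   SaveInstanceStateResponse  status: <...>  instance: <...>  checkpoint_id: "ckpt-42"
//
// The response echoes the instance and the checkpoint_id of the request.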
// This is the request that StMgr sends to Checkpoint Mgr
// to fetch the instance state data
message GetInstanceStateRequest {
required heron.proto.system.Instance instance = 1;
required string checkpoint_id = 2;
}
// This is the response that Checkpoint Mgr sends to StMgr
// for its get instance state request
message GetInstanceStateResponse {
required heron.proto.system.Status status = 1;
required heron.proto.system.Instance instance = 2;
required string checkpoint_id = 3;
optional InstanceStateCheckpoint checkpoint = 4;
}
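// For example (hypothetical values), fetching one instance's state for a
// given checkpoint, with sub-messages elided as <...>:
//
//   GetInstanceStateRequest   instance: <...>  checkpoint_id: "ckpt-41"
//   GetInstanceStateResponse  status: <...>  instance: <...>  checkpoint_id: "ckpt-41"  checkpoint: <...>
//
// Note that checkpoint is optional, so it may be left unset, e.g. when the
// fetch did not succeed.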
/*
* stmgr -> Instance messages
*/
// This is the message that the stmgr sends to its
// local tasks to initiate a checkpoint
message InitiateStatefulCheckpoint {
required string checkpoint_id = 1;
}
// This is the message that the instance sends
// when it wants stmgr to store its state
message StoreInstanceStateCheckpoint {
required InstanceStateCheckpoint state = 1;
}
// This is the message that stmgr sends to its instances
// asking them to restore their state
message RestoreInstanceStateRequest {
required InstanceStateCheckpoint state = 1;
}
// This is the message that an instance sends to its stmgr
// when it is done restoring the state
message RestoreInstanceStateResponse {
required heron.proto.system.Status status = 1;
required string checkpoint_id = 2;
}
// Message sent by stmgr to an instance to start processing.
// This is the counterpart of the StartStmgrStatefulProcessing message
// above, but between stmgr -> instance.
message StartInstanceStatefulProcessing {
required string checkpoint_id = 1;
}
/*
* stmgr -> stmgr messages
*/
// Once stmgr receives StoreInstanceStateCheckpoint from an instance,
// it sends this message to all of that instance's
// downstream components' stmgrs to propagate the checkpoint marker
message DownstreamStatefulCheckpoint {
required int32 origin_task_id = 1;
required int32 destination_task_id = 2;
required string checkpoint_id = 3;
}
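// For example (hypothetical task ids), a marker propagated from task 3 to
// its downstream task 7 for checkpoint "ckpt-42" carries:
//
//   origin_task_id: 3
//   destination_task_id: 7
//   checkpoint_id: "ckpt-42"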