| /* |
| * Copyright 2015 Twitter, Inc. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "manager/stateful-controller.h" |
| #include <iostream> |
| #include <sstream> |
| #include <chrono> |
| #include <string> |
| #include <vector> |
| #include "manager/stateful-checkpointer.h" |
| #include "manager/stateful-restorer.h" |
| #include "manager/tmaster.h" |
| #include "manager/stmgrstate.h" |
| #include "metrics/metrics.h" |
| #include "basics/basics.h" |
| #include "errors/errors.h" |
| |
| namespace heron { |
| namespace tmaster { |
| |
| const sp_string METRIC_RESTORE_START = "__restore_start"; |
| const sp_string METRIC_RESTORE_STMGR_RESPONSE = "__restore_stmgr_response"; |
| const sp_string METRIC_RESTORE_STMGR_RESPONSE_IGNORED = "__restore_stmgr_response_ignored"; |
| const sp_string METRIC_RESTORE_STMGR_RESPONSE_NOTOK = "__restore_stmgr_response_notok"; |
| const sp_string METRIC_CKPTMARKER_REQUESTS_SENT = "__ckptmarker_requests_sent"; |
| const sp_string METRIC_CKPTMARKER_REQUESTS_NOTSENT = "__ckptmarker_requests_notsent"; |
| const sp_string METRIC_INSTANCE_CKPT_SAVED = "__instance_ckpt_saved"; |
| const sp_string METRIC_INSTANCE_CKPT_SAVED_IGNORED = "__instance_ckpt_saved_ignored"; |
| const sp_string METRIC_GLOBAL_CONSISTENT_CKPT = "__globally_consistent_ckpt"; |
| |
| |
| // TODO(nlu): make this number from config |
| const int32_t MOST_CHECKPOINTS_NUMBER = 5; |
| |
| StatefulController::StatefulController(const std::string& _topology_name, |
| proto::ckptmgr::StatefulConsistentCheckpoints* _ckpt, |
| heron::common::HeronStateMgr* _state_mgr, |
| std::chrono::high_resolution_clock::time_point _tmaster_start_time, |
| common::MetricsMgrSt* _metrics_manager_client, |
| std::function<void(std::string)> _ckpt_save_watcher) |
| : topology_name_(_topology_name), ckpt_record_(_ckpt), state_mgr_(_state_mgr), |
| metrics_manager_client_(_metrics_manager_client) { |
| checkpointer_ = new StatefulCheckpointer(_tmaster_start_time); |
| restorer_ = new StatefulRestorer(); |
| count_metrics_ = new common::MultiCountMetric(); |
| metrics_manager_client_->register_metric("__stateful_controller", count_metrics_); |
| ckpt_save_watcher_ = _ckpt_save_watcher; |
| } |
| |
| StatefulController::~StatefulController() { |
| delete ckpt_record_; |
| delete checkpointer_; |
| delete restorer_; |
| metrics_manager_client_->unregister_metric("__stateful_controller"); |
| delete count_metrics_; |
| } |
| |
| void StatefulController::StartRestore(const StMgrMap& _stmgrs, bool _ignore_prev_state) { |
| count_metrics_->scope(METRIC_RESTORE_START)->incr(); |
| // TODO(sanjeev): Do we really need to start from most_recent_checkpoint? |
| if (_ignore_prev_state) { |
| restorer_->StartRestore("", _stmgrs); |
| } else { |
| restorer_->StartRestore(ckpt_record_->consistent_checkpoints(0).checkpoint_id(), _stmgrs); |
| } |
| } |
| |
| void StatefulController::HandleStMgrRestored(const std::string& _stmgr_id, |
| const std::string& _checkpoint_id, |
| int64_t _restore_txid, |
| proto::system::StatusCode _status, |
| const StMgrMap& _stmgrs) { |
| count_metrics_->scope(METRIC_RESTORE_STMGR_RESPONSE)->incr(); |
| if (!restorer_->IsInProgress()) { |
| LOG(WARNING) << "Got a Restored Topology State from stmgr " |
| << _stmgr_id << " for checkpoint " << _checkpoint_id |
| << " with txid " << _restore_txid << " when " |
| << " we are not in restore"; |
| count_metrics_->scope(METRIC_RESTORE_STMGR_RESPONSE_IGNORED)->incr(); |
| return; |
| } else if (restorer_->GetRestoreTxid() != _restore_txid || |
| restorer_->GetCheckpointIdInProgress() != _checkpoint_id) { |
| LOG(WARNING) << "Got a Restored Topology State from stmgr " |
| << _stmgr_id << " for checkpoint " << _checkpoint_id |
| << " with txid " << _restore_txid << " when " |
| << " we are in progress with checkpoint " |
| << restorer_->GetCheckpointIdInProgress() << " and txid " |
| << restorer_->GetRestoreTxid(); |
| count_metrics_->scope(METRIC_RESTORE_STMGR_RESPONSE_IGNORED)->incr(); |
| return; |
| } else if (_status != proto::system::OK) { |
| LOG(INFO) << "Got a Cannot Restore Topology State from stmgr " |
| << _stmgr_id << " for checkpoint " << _checkpoint_id |
| << " with txid " << _restore_txid << " because of " |
| << _status; |
| const std::string& new_ckpt_id = GetNextInLineCheckpointId(_checkpoint_id); |
| if (new_ckpt_id.empty()) { |
| LOG(INFO) << "Next viable checkpoint id is empty"; |
| } |
| count_metrics_->scope(METRIC_RESTORE_STMGR_RESPONSE_NOTOK)->incr(); |
| restorer_->StartRestore(new_ckpt_id, _stmgrs); |
| } else { |
| LOG(INFO) << "Got a Restored Topology State from stmgr " |
| << _stmgr_id << " for checkpoint " << _checkpoint_id |
| << " with txid " << _restore_txid; |
| restorer_->HandleStMgrRestored(_stmgr_id, _checkpoint_id, _restore_txid, _stmgrs); |
| } |
| } |
| |
| void StatefulController::RegisterNewPhysicalPlan(const proto::system::PhysicalPlan& _pplan) { |
| checkpointer_->RegisterNewPhysicalPlan(_pplan); |
| } |
| |
| void StatefulController::StartCheckpoint(const StMgrMap& _stmgrs) { |
| if (restorer_->IsInProgress()) { |
| LOG(INFO) << "Will not send checkpoint messages to stmgr because " |
| << "we are in restore"; |
| count_metrics_->scope(METRIC_CKPTMARKER_REQUESTS_NOTSENT)->incr(); |
| return; |
| } |
| count_metrics_->scope(METRIC_CKPTMARKER_REQUESTS_SENT)->incr(); |
| checkpointer_->StartCheckpoint(_stmgrs); |
| } |
| |
| void StatefulController::HandleInstanceStateStored(const std::string& _checkpoint_id, |
| const std::string& _packing_plan_id, |
| const proto::system::Instance& _instance) { |
| count_metrics_->scope(METRIC_INSTANCE_CKPT_SAVED)->incr(); |
| if (restorer_->IsInProgress()) { |
| LOG(INFO) << "Ignoring the Instance State because we are in Restore"; |
| count_metrics_->scope(METRIC_INSTANCE_CKPT_SAVED_IGNORED)->incr(); |
| return; |
| } |
| if (checkpointer_->HandleInstanceStateStored(_checkpoint_id, _instance)) { |
| // This is now a globally consistent checkpoint |
| count_metrics_->scope(METRIC_GLOBAL_CONSISTENT_CKPT)->incr(); |
| auto new_ckpt_record = AddNewConsistentCheckpoint(_checkpoint_id, _packing_plan_id); |
| state_mgr_->SetStatefulCheckpoints(topology_name_, *new_ckpt_record, |
| std::bind(&StatefulController::HandleCheckpointSave, this, |
| new_ckpt_record, std::placeholders::_1)); |
| } |
| } |
| |
| void StatefulController::HandleCheckpointSave( |
| proto::ckptmgr::StatefulConsistentCheckpoints* _new_ckpt, |
| proto::system::StatusCode _status) { |
| if (_status == proto::system::OK) { |
| LOG(INFO) << "Successfully saved " << _new_ckpt->consistent_checkpoints(0).checkpoint_id() |
| << " as the new globally consistent checkpoint"; |
| delete ckpt_record_; |
| ckpt_record_ = _new_ckpt; |
| std::string oldest_ckpt = |
| ckpt_record_->consistent_checkpoints(ckpt_record_->consistent_checkpoints_size() - 1) |
| .checkpoint_id(); |
| ckpt_save_watcher_(oldest_ckpt); |
| } else { |
| LOG(ERROR) << "Error saving " << _new_ckpt->consistent_checkpoints(0).checkpoint_id() |
| << " as the new globally consistent checkpoint " |
| << _status; |
| delete _new_ckpt; |
| } |
| } |
| |
| const std::string& StatefulController::GetNextInLineCheckpointId(const std::string& _ckpt_id) { |
| if (_ckpt_id.empty()) { |
| // There cannot be any checkpoints that are older than empty checkpoint |
| LOG(FATAL) << "Could not recover even from the empty state"; |
| } |
| for (int32_t i = 0; i < ckpt_record_->consistent_checkpoints_size(); ++i) { |
| if (ckpt_record_->consistent_checkpoints(i).checkpoint_id() == _ckpt_id) { |
| if (i < ckpt_record_->consistent_checkpoints_size() - 1) { |
| return ckpt_record_->consistent_checkpoints(i + 1).checkpoint_id(); |
| } else { |
| return EMPTY_STRING; |
| } |
| } |
| } |
| return EMPTY_STRING; |
| } |
| |
| proto::ckptmgr::StatefulConsistentCheckpoints* |
| StatefulController::AddNewConsistentCheckpoint(const std::string& _new_checkpoint, |
| const std::string& _packing_plan) { |
| auto new_record = new proto::ckptmgr::StatefulConsistentCheckpoints(); |
| auto new_consistent_checkpoint = new_record->add_consistent_checkpoints(); |
| new_consistent_checkpoint->set_checkpoint_id(_new_checkpoint); |
| new_consistent_checkpoint->set_packing_plan_id(_packing_plan); |
| for (int32_t i = 0; i < ckpt_record_->consistent_checkpoints_size() && |
| new_record->consistent_checkpoints_size() < MOST_CHECKPOINTS_NUMBER; ++i) { |
| new_consistent_checkpoint = new_record->add_consistent_checkpoints(); |
| new_consistent_checkpoint->set_checkpoint_id( |
| ckpt_record_->consistent_checkpoints(i).checkpoint_id()); |
| new_consistent_checkpoint->set_packing_plan_id( |
| ckpt_record_->consistent_checkpoints(i).packing_plan_id()); |
| } |
| |
| return new_record; |
| } |
| |
| bool StatefulController::GotRestoreResponse(const std::string& _stmgr) const { |
| CHECK(restorer_->IsInProgress()); |
| return restorer_->GotResponse(_stmgr); |
| } |
| |
| bool StatefulController::RestoreInProgress() const { |
| return restorer_->IsInProgress(); |
| } |
| } // namespace tmaster |
| } // namespace heron |