| /* |
| * Copyright 2015 Twitter, Inc. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "manager/checkpoint-gateway.h" |
| #include <functional> |
| #include <iostream> |
| #include <deque> |
| #include <unordered_map> |
| #include <unordered_set> |
| #include <vector> |
| #include "util/neighbour-calculator.h" |
| #include "metrics/metrics.h" |
| #include "proto/messages.h" |
| #include "basics/basics.h" |
| #include "errors/errors.h" |
| #include "threads/threads.h" |
| #include "network/network.h" |
| |
| namespace heron { |
| namespace stmgr { |
| |
| CheckpointGateway::CheckpointGateway(sp_uint64 _drain_threshold, |
| NeighbourCalculator* _neighbour_calculator, |
| common::MetricsMgrSt* _metrics_manager_client, |
| std::function<void(sp_int32, proto::system::HeronTupleSet2*)> _tupleset_drainer, |
| std::function<void(proto::stmgr::TupleStreamMessage*)> _tuplestream_drainer, |
| std::function<void(sp_int32, proto::ckptmgr::InitiateStatefulCheckpoint*)> _ckpt_drainer) |
| : drain_threshold_(_drain_threshold), current_size_(0), |
| neighbour_calculator_(_neighbour_calculator), |
| metrics_manager_client_(_metrics_manager_client), tupleset_drainer_(_tupleset_drainer), |
| tuplestream_drainer_(_tuplestream_drainer), ckpt_drainer_(_ckpt_drainer) { |
| size_metric_ = new common::AssignableMetric(current_size_); |
| metrics_manager_client_->register_metric("__stateful_gateway_size", size_metric_); |
| } |
| |
| CheckpointGateway::~CheckpointGateway() { |
| for (auto kv : pending_tuples_) { |
| delete kv.second; |
| } |
| metrics_manager_client_->unregister_metric("__stateful_gateway_size"); |
| delete size_metric_; |
| } |
| |
| void CheckpointGateway::SendToInstance(sp_int32 _task_id, |
| proto::system::HeronTupleSet2* _message) { |
| if (current_size_ > drain_threshold_) { |
| ForceDrain(); |
| } |
| CheckpointInfo* info = get_info(_task_id); |
| sp_uint64 size = _message->GetCachedSize(); |
| _message = info->SendToInstance(_message, size); |
| if (!_message) { |
| current_size_ += size; |
| } else { |
| tupleset_drainer_(_task_id, _message); |
| } |
| size_metric_->SetValue(current_size_); |
| } |
| |
| void CheckpointGateway::SendToInstance(proto::stmgr::TupleStreamMessage* _message) { |
| if (current_size_ > drain_threshold_) { |
| ForceDrain(); |
| } |
| sp_int32 task_id = _message->task_id(); |
| sp_uint64 size = _message->set().size(); |
| CheckpointInfo* info = get_info(task_id); |
| _message = info->SendToInstance(_message, size); |
| if (!_message) { |
| current_size_ += size; |
| } else { |
| tuplestream_drainer_(_message); |
| } |
| size_metric_->SetValue(current_size_); |
| } |
| |
| void CheckpointGateway::HandleUpstreamMarker(sp_int32 _src_task_id, sp_int32 _destination_task_id, |
| const sp_string& _checkpoint_id) { |
| LOG(INFO) << "Got checkpoint marker for triplet " |
| << _checkpoint_id << " " << _src_task_id << " " << _destination_task_id; |
| CheckpointInfo* info = get_info(_destination_task_id); |
| sp_uint64 size = 0; |
| std::deque<Tuple> tuples = info->HandleUpstreamMarker(_src_task_id, _checkpoint_id, &size); |
| for (auto tupl : tuples) { |
| DrainTuple(_destination_task_id, tupl); |
| } |
| current_size_ -= size; |
| size_metric_->SetValue(current_size_); |
| } |
| |
| void CheckpointGateway::DrainTuple(sp_int32 _dest, Tuple& _tuple) { |
| if (std::get<0>(_tuple)) { |
| tupleset_drainer_(_dest, std::get<0>(_tuple)); |
| } else if (std::get<1>(_tuple)) { |
| tuplestream_drainer_(std::get<1>(_tuple)); |
| } else { |
| ckpt_drainer_(_dest, std::get<2>(_tuple)); |
| } |
| } |
| |
| void CheckpointGateway::ForceDrain() { |
| for (auto kv : pending_tuples_) { |
| std::deque<Tuple> tuples = kv.second->ForceDrain(); |
| for (auto tupl : tuples) { |
| DrainTuple(kv.first, tupl); |
| } |
| } |
| current_size_ = 0; |
| size_metric_->SetValue(current_size_); |
| } |
| |
| CheckpointGateway::CheckpointInfo* |
| CheckpointGateway::get_info(sp_int32 _task_id) { |
| auto iter = pending_tuples_.find(_task_id); |
| if (iter == pending_tuples_.end()) { |
| CheckpointInfo* info = |
| new CheckpointInfo(_task_id, neighbour_calculator_->get_upstreamers(_task_id)); |
| pending_tuples_[_task_id] = info; |
| return info; |
| } else { |
| return iter->second; |
| } |
| } |
| |
| void CheckpointGateway::Clear() { |
| for (auto kv : pending_tuples_) { |
| kv.second->Clear(); |
| delete kv.second; |
| } |
| pending_tuples_.clear(); |
| current_size_ = 0; |
| size_metric_->SetValue(current_size_); |
| } |
| |
| CheckpointGateway::CheckpointInfo::CheckpointInfo(sp_int32 _this_task_id, |
| const std::unordered_set<sp_int32>& _all_upstream_dependencies) { |
| checkpoint_id_ = ""; |
| all_upstream_dependencies_ = _all_upstream_dependencies; |
| pending_upstream_dependencies_ = all_upstream_dependencies_; |
| current_size_ = 0; |
| this_task_id_ = _this_task_id; |
| } |
| |
| CheckpointGateway::CheckpointInfo::~CheckpointInfo() { |
| CHECK(pending_tuples_.empty()); |
| CHECK_EQ(current_size_, 0); |
| } |
| |
| proto::system::HeronTupleSet2* |
| CheckpointGateway::CheckpointInfo::SendToInstance(proto::system::HeronTupleSet2* _tuple, |
| sp_uint64 _size) { |
| if (checkpoint_id_.empty()) { |
| return _tuple; |
| } else { |
| if (pending_upstream_dependencies_.find(_tuple->src_task_id()) != |
| pending_upstream_dependencies_.end()) { |
| // This means that we still are expecting a checkpoint marker from this src task id |
| return _tuple; |
| } else { |
| add(std::make_tuple(_tuple, (proto::stmgr::TupleStreamMessage*)nullptr, |
| (proto::ckptmgr::InitiateStatefulCheckpoint*)nullptr), _size); |
| return nullptr; |
| } |
| } |
| } |
| |
| proto::stmgr::TupleStreamMessage* |
| CheckpointGateway::CheckpointInfo::SendToInstance(proto::stmgr::TupleStreamMessage* _tuple, |
| sp_uint64 _size) { |
| if (checkpoint_id_.empty()) { |
| return _tuple; |
| } else { |
| if (pending_upstream_dependencies_.find(_tuple->src_task_id()) != |
| pending_upstream_dependencies_.end()) { |
| // This means that we still are expecting a checkpoint marker from this src task id |
| return _tuple; |
| } else { |
| add(std::make_tuple((proto::system::HeronTupleSet2*)nullptr, _tuple, |
| (proto::ckptmgr::InitiateStatefulCheckpoint*)nullptr), _size); |
| return nullptr; |
| } |
| } |
| } |
| |
| std::deque<CheckpointGateway::Tuple> |
| CheckpointGateway::CheckpointInfo::HandleUpstreamMarker(sp_int32 _src_task_id, |
| const sp_string& _checkpoint_id, |
| sp_uint64* _size) { |
| if (_checkpoint_id == checkpoint_id_) { |
| pending_upstream_dependencies_.erase(_src_task_id); |
| } else if (checkpoint_id_.empty()) { |
| LOG(INFO) << "TaskId: " << this_task_id_ |
| << " Seeing the checkpoint marker " << _checkpoint_id |
| << " for the first time"; |
| checkpoint_id_ = _checkpoint_id; |
| pending_upstream_dependencies_.erase(_src_task_id); |
| } else if (_checkpoint_id > checkpoint_id_) { |
| LOG(INFO) << "TaskId: " << this_task_id_ |
| << " Seeing the checkpoint marker " << _checkpoint_id |
| << " while we were already amidst " << checkpoint_id_ |
| << " ..resetting"; |
| checkpoint_id_ = _checkpoint_id; |
| pending_upstream_dependencies_ = all_upstream_dependencies_; |
| pending_upstream_dependencies_.erase(_src_task_id); |
| } else { |
| LOG(WARNING) << "TaskId: " << this_task_id_ |
| << " Discarding older checkpoint_id message " |
| << _checkpoint_id << " from upstream task " |
| << _src_task_id; |
| } |
| if (pending_upstream_dependencies_.empty()) { |
| LOG(INFO) << "TaskId: " << this_task_id_ |
| << " All checkpoint markers received for checkpoint " |
| << _checkpoint_id; |
| // We need to add Initiate Checkpoint message before the current set |
| auto message = new proto::ckptmgr::InitiateStatefulCheckpoint(); |
| message->set_checkpoint_id(_checkpoint_id); |
| add_front(std::make_tuple((proto::system::HeronTupleSet2*)nullptr, |
| (proto::stmgr::TupleStreamMessage*)nullptr, message), |
| message->GetCachedSize()); |
| return ForceDrain(); |
| } else { |
| std::deque<Tuple> dummy; |
| return dummy; |
| } |
| } |
| |
| std::deque<CheckpointGateway::Tuple> |
| CheckpointGateway::CheckpointInfo::ForceDrain() { |
| checkpoint_id_ = ""; |
| current_size_ = 0; |
| std::deque<Tuple> tmp = pending_tuples_; |
| pending_tuples_.clear(); |
| pending_upstream_dependencies_ = all_upstream_dependencies_; |
| return tmp; |
| } |
| |
| void CheckpointGateway::CheckpointInfo::add(Tuple _tuple, sp_uint64 _size) { |
| pending_tuples_.push_back(_tuple); |
| current_size_ += _size; |
| } |
| |
| void CheckpointGateway::CheckpointInfo::add_front(Tuple _tuple, sp_uint64 _size) { |
| pending_tuples_.push_front(_tuple); |
| current_size_ += _size; |
| } |
| |
| void CheckpointGateway::CheckpointInfo::Clear() { |
| for (auto tupl : pending_tuples_) { |
| if (std::get<0>(tupl)) { |
| __global_protobuf_pool_release__(std::get<0>(tupl)); |
| } else if (std::get<1>(tupl)) { |
| __global_protobuf_pool_release__(std::get<1>(tupl)); |
| } else { |
| __global_protobuf_pool_release__(std::get<2>(tupl)); |
| } |
| } |
| pending_tuples_.clear(); |
| current_size_ = 0; |
| checkpoint_id_ = ""; |
| } |
| } // namespace stmgr |
| } // namespace heron |