|  | /** | 
|  | * Licensed to the Apache Software Foundation (ASF) under one | 
|  | * or more contributor license agreements.  See the NOTICE file | 
|  | * distributed with this work for additional information | 
|  | * regarding copyright ownership.  The ASF licenses this file | 
|  | * to you under the Apache License, Version 2.0 (the | 
|  | * "License"); you may not use this file except in compliance | 
|  | * with the License.  You may obtain a copy of the License at | 
|  | * | 
|  | *   http://www.apache.org/licenses/LICENSE-2.0 | 
|  | * | 
|  | * Unless required by applicable law or agreed to in writing, | 
|  | * software distributed under the License is distributed on an | 
|  | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
|  | * KIND, either express or implied.  See the License for the | 
|  | * specific language governing permissions and limitations | 
|  | * under the License. | 
|  | */ | 
|  |  | 
|  | #ifndef SRC_CPP_SVCS_STMGR_SRC_UTIL_TUPLE_CACHE_H_ | 
|  | #define SRC_CPP_SVCS_STMGR_SRC_UTIL_TUPLE_CACHE_H_ | 
|  |  | 
|  | #include <tsl/hopscotch_map.h> | 
|  | #include <deque> | 
|  | #include <vector> | 
|  | #include <map> | 
|  | #include "proto/messages.h" | 
|  | #include "basics/basics.h" | 
|  | #include "network/network.h" | 
|  |  | 
|  | namespace heron { | 
|  | namespace stmgr { | 
|  |  | 
|  | class StMgr; | 
|  |  | 
|  | class TupleCache { | 
|  | public: | 
|  | TupleCache(std::shared_ptr<EventLoop> eventLoop, sp_uint32 _drain_threshold); | 
|  | virtual ~TupleCache(); | 
|  |  | 
|  | template <class T> | 
|  | void RegisterDrainer(void (T::*method)(sp_int32, proto::system::HeronTupleSet2*), T* _t) { | 
|  | tuple_drainer_ = std::bind(method, _t, std::placeholders::_1, std::placeholders::_2); | 
|  | } | 
|  |  | 
|  | template <class T> | 
|  | void RegisterCheckpointDrainer(void (T::*method)(sp_int32, | 
|  | proto::ckptmgr::DownstreamStatefulCheckpoint*), T* _t) { | 
|  | checkpoint_drainer_ = std::bind(method, _t, std::placeholders::_1, std::placeholders::_2); | 
|  | } | 
|  |  | 
|  | // returns tuple key | 
|  | sp_int64 add_data_tuple(sp_int32 _src_task_id, | 
|  | sp_int32 _task_id, const proto::api::StreamId& _streamid, | 
|  | proto::system::HeronDataTuple* _tuple); | 
|  | void add_ack_tuple(sp_int32 _src_task_id, | 
|  | sp_int32 _task_id, const proto::system::AckTuple& _tuple); | 
|  | void add_fail_tuple(sp_int32 _src_task_id, | 
|  | sp_int32 _task_id, const proto::system::AckTuple& _tuple); | 
|  | void add_emit_tuple(sp_int32 _src_task_id, | 
|  | sp_int32 _task_id, const proto::system::AckTuple& _tuple); | 
|  | void add_checkpoint_tuple(sp_int32 _task_id, | 
|  | proto::ckptmgr::DownstreamStatefulCheckpoint* _message); | 
|  |  | 
|  | // Clear all data of all task_ids | 
|  | // This is different from the drain because while drain clears the messages | 
|  | // calling the drainer functions, this one just deletes the messages. | 
|  | virtual void clear(); | 
|  |  | 
|  | private: | 
|  | void drain(EventLoop::Status); | 
|  | void drain_impl(); | 
|  |  | 
|  | class TupleList { | 
|  | // not accessible to anyone else | 
|  | friend class TupleCache; | 
|  |  | 
|  | private: | 
|  | TupleList(); | 
|  | ~TupleList(); | 
|  |  | 
|  | sp_int64 add_data_tuple(sp_int32 _src_task_id, | 
|  | const proto::api::StreamId& _streamid, | 
|  | proto::system::HeronDataTuple* _tuple, sp_uint64* total_size_, | 
|  | sp_uint64* _tuples_cache_max_tuple_size); | 
|  | void add_ack_tuple(sp_int32 _src_task_id, | 
|  | const proto::system::AckTuple& _tuple, sp_uint64* total_size_); | 
|  | void add_fail_tuple(sp_int32 _src_task_id, | 
|  | const proto::system::AckTuple& _tuple, sp_uint64* total_size_); | 
|  | void add_emit_tuple(sp_int32 _src_task_id, | 
|  | const proto::system::AckTuple& _tuple, sp_uint64* total_size_); | 
|  | void add_checkpoint_tuple(proto::ckptmgr::DownstreamStatefulCheckpoint* _message, | 
|  | sp_uint64* total_size_); | 
|  |  | 
|  | void drain(sp_int32 _task_id, | 
|  | std::function<void(sp_int32, proto::system::HeronTupleSet2*)> _tuple_drainer, | 
|  | std::function<void(sp_int32, | 
|  | proto::ckptmgr::DownstreamStatefulCheckpoint*)> _checkpoint_drainer); | 
|  |  | 
|  | proto::system::HeronTupleSet2* acquire_clean_set() { | 
|  | proto::system::HeronTupleSet2* set = nullptr; | 
|  | set = __global_protobuf_pool_acquire__(set); | 
|  | return set; | 
|  | } | 
|  |  | 
|  | void clear(); | 
|  |  | 
|  | private: | 
|  | std::deque<google::protobuf::Message*> tuples_; | 
|  | proto::system::HeronTupleSet2* current_; | 
|  | sp_uint64 current_size_; | 
|  | sp_int32 last_drained_count_; | 
|  | }; | 
|  |  | 
|  | TupleList* get(sp_int32 _task_id); | 
|  |  | 
|  | // map from task_id to the TupleList | 
|  | tsl::hopscotch_map<sp_int32, TupleList*> cache_; | 
|  | std::shared_ptr<EventLoop> eventLoop_; | 
|  | std::function<void(sp_int32, proto::system::HeronTupleSet2*)> tuple_drainer_; | 
|  | std::function<void(sp_int32, proto::ckptmgr::DownstreamStatefulCheckpoint*)> | 
|  | checkpoint_drainer_; | 
|  | sp_uint64 total_size_; | 
|  | sp_uint32 drain_threshold_bytes_; | 
|  |  | 
|  | // Configs to be read | 
|  | sp_int32 cache_drain_frequency_ms_; | 
|  | sp_uint64 tuples_cache_max_tuple_size_; | 
|  | }; | 
|  |  | 
|  | }  // namespace stmgr | 
|  | }  // namespace heron | 
|  |  | 
|  | #endif  // SRC_CPP_SVCS_STMGR_SRC_UTIL_TUPLE_CACHE_H_ |