blob: b5dd45ddfb7b1c8362173de2764e67c105ec0bb1 [file] [log] [blame]
/*
* Copyright 2015 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef SRC_CPP_SVCS_STMGR_SRC_UTIL_TUPLE_CACHE_H_
#define SRC_CPP_SVCS_STMGR_SRC_UTIL_TUPLE_CACHE_H_
#include <deque>
#include <functional>
#include <map>
#include <unordered_map>
#include <vector>
#include "proto/messages.h"
#include "basics/basics.h"
#include "network/network.h"
namespace heron {
namespace stmgr {
class StMgr;
/**
 * Cache of tuple sets keyed by task id.
 *
 * Data, ack, fail and emit tuples are added per task through the add_*
 * methods; a callback registered with RegisterDrainer() receives a task id
 * together with a HeronTupleSet2*. The draining policy itself (threshold,
 * frequency) is implemented in the corresponding source file and is not
 * visible from this header.
 *
 * The class stores raw pointers (TupleList* values in cache_), so copying is
 * disabled: an implicit member-wise copy would leave two caches pointing at
 * the same TupleList objects.
 */
class TupleCache {
 public:
  // _drain_threshold: presumably a byte count that ends up in
  // drain_threshold_bytes_ -- TODO confirm against the constructor definition.
  TupleCache(EventLoop* eventLoop, sp_uint32 _drain_threshold);
  virtual ~TupleCache();

  // Non-copyable: the members are raw pointers and a copy would alias them.
  TupleCache(const TupleCache&) = delete;
  TupleCache& operator=(const TupleCache&) = delete;

  // Registers the member function `method` of object `_t` as the drainer.
  // The callback is stored in drainer_ and is invoked with
  // (task id, tuple set). `_t` is kept as a raw pointer inside the bound
  // callable, so it has to stay alive for as long as the drainer may be
  // called.
  template <class T>
  void RegisterDrainer(void (T::*method)(sp_int32, proto::system::HeronTupleSet2*), T* _t) {
    drainer_ = std::bind(method, _t, std::placeholders::_1, std::placeholders::_2);
  }

  // returns tuple key
  sp_int64 add_data_tuple(sp_int32 _task_id, const proto::api::StreamId& _streamid,
                          proto::system::HeronDataTuple* _tuple);
  // The three methods below add an AckTuple for `_task_id` as an ack, a fail
  // or an emit entry respectively.
  void add_ack_tuple(sp_int32 _task_id, const proto::system::AckTuple& _tuple);
  void add_fail_tuple(sp_int32 _task_id, const proto::system::AckTuple& _tuple);
  void add_emit_tuple(sp_int32 _task_id, const proto::system::AckTuple& _tuple);

 private:
  // Takes an EventLoop::Status, so presumably invoked as an event-loop
  // callback -- TODO confirm where it is registered.
  void drain(EventLoop::Status);
  void drain_impl();

  /**
   * Tuple sets accumulated for a single task.
   *
   * Every member, including the constructor and destructor, is private and
   * TupleCache is the only friend, so only TupleCache can create, use or
   * destroy a TupleList.
   */
  class TupleList {
    // not accessible to anyone else
    friend class TupleCache;

   private:
    TupleList();
    ~TupleList();

    // Non-copyable: tuples_ and current_ hold raw pointers that a
    // member-wise copy would duplicate.
    TupleList(const TupleList&) = delete;
    TupleList& operator=(const TupleList&) = delete;

    // The `total_size_` out-parameters below are pointers to a counter
    // supplied by the caller (presumably TupleCache::total_size_ -- TODO
    // confirm in the source file).
    sp_int64 add_data_tuple(const proto::api::StreamId& _streamid,
                            proto::system::HeronDataTuple* _tuple, sp_uint64* total_size_,
                            sp_uint64* _tuples_cache_max_tuple_size);
    void add_ack_tuple(const proto::system::AckTuple& _tuple, sp_uint64* total_size_);
    void add_fail_tuple(const proto::system::AckTuple& _tuple, sp_uint64* total_size_);
    void add_emit_tuple(const proto::system::AckTuple& _tuple, sp_uint64* total_size_);

    // Drains this list for `_task_id` using the supplied `_drainer` callback.
    void drain(sp_int32 _task_id,
               std::function<void(sp_int32, proto::system::HeronTupleSet2*)> _drainer);

    // Obtains a HeronTupleSet2 through __global_protobuf_pool_acquire__ and
    // clears it before returning it, so callers always start from an empty
    // message.
    proto::system::HeronTupleSet2* acquire_clean_set() {
      proto::system::HeronTupleSet2* set = nullptr;
      // The null pointer is passed only so the helper can pick up the
      // message type from its argument.
      set = __global_protobuf_pool_acquire__(set);
      set->Clear();
      return set;
    }

   private:
    // Queued tuple sets for this task.
    std::deque<proto::system::HeronTupleSet2*> tuples_;
    // Tuple set currently being worked with; its exact role is defined in
    // the source file.
    proto::system::HeronTupleSet2* current_;
    sp_uint64 current_size_;
    sp_int32 last_drained_count_;
  };

  // Looks up the TupleList for `_task_id` (whether a missing entry is created
  // on demand is decided in the source file).
  TupleList* get(sp_int32 _task_id);

  // map from task_id to the TupleList
  std::unordered_map<sp_int32, TupleList*> cache_;
  EventLoop* eventLoop_;
  // Callback installed by RegisterDrainer().
  std::function<void(sp_int32, proto::system::HeronTupleSet2*)> drainer_;
  sp_uint64 total_size_;
  sp_uint32 drain_threshold_bytes_;

  // Configs to be read
  sp_int32 cache_drain_frequency_ms_;
  sp_uint64 tuples_cache_max_tuple_size_;
};
} // namespace stmgr
} // namespace heron
#endif // SRC_CPP_SVCS_STMGR_SRC_UTIL_TUPLE_CACHE_H_