blob: d25ae9cbea93dd5a3d144afae13ebb6f761ff98c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef SRC_CPP_SVCS_STMGR_SRC_UTIL_TUPLE_CACHE_H_
#define SRC_CPP_SVCS_STMGR_SRC_UTIL_TUPLE_CACHE_H_
#include <tsl/hopscotch_map.h>
#include <deque>
#include <vector>
#include <map>
#include "proto/messages.h"
#include "basics/basics.h"
#include "network/network.h"
namespace heron {
namespace stmgr {
class StMgr;
class TupleCache {
public:
TupleCache(std::shared_ptr<EventLoop> eventLoop, sp_uint32 _drain_threshold);
virtual ~TupleCache();
template <class T>
void RegisterDrainer(void (T::*method)(sp_int32, proto::system::HeronTupleSet2*), T* _t) {
tuple_drainer_ = std::bind(method, _t, std::placeholders::_1, std::placeholders::_2);
}
template <class T>
void RegisterCheckpointDrainer(void (T::*method)(sp_int32,
proto::ckptmgr::DownstreamStatefulCheckpoint*), T* _t) {
checkpoint_drainer_ = std::bind(method, _t, std::placeholders::_1, std::placeholders::_2);
}
// returns tuple key
sp_int64 add_data_tuple(sp_int32 _src_task_id,
sp_int32 _task_id, const proto::api::StreamId& _streamid,
proto::system::HeronDataTuple* _tuple);
void add_ack_tuple(sp_int32 _src_task_id,
sp_int32 _task_id, const proto::system::AckTuple& _tuple);
void add_fail_tuple(sp_int32 _src_task_id,
sp_int32 _task_id, const proto::system::AckTuple& _tuple);
void add_emit_tuple(sp_int32 _src_task_id,
sp_int32 _task_id, const proto::system::AckTuple& _tuple);
void add_checkpoint_tuple(sp_int32 _task_id,
proto::ckptmgr::DownstreamStatefulCheckpoint* _message);
// Clear all data of all task_ids
// This is different from the drain because while drain clears the messages
// calling the drainer functions, this one just deletes the messages.
virtual void clear();
private:
void drain(EventLoop::Status);
void drain_impl();
class TupleList {
// not accessible to anyone else
friend class TupleCache;
private:
TupleList();
~TupleList();
sp_int64 add_data_tuple(sp_int32 _src_task_id,
const proto::api::StreamId& _streamid,
proto::system::HeronDataTuple* _tuple, sp_uint64* total_size_,
sp_uint64* _tuples_cache_max_tuple_size);
void add_ack_tuple(sp_int32 _src_task_id,
const proto::system::AckTuple& _tuple, sp_uint64* total_size_);
void add_fail_tuple(sp_int32 _src_task_id,
const proto::system::AckTuple& _tuple, sp_uint64* total_size_);
void add_emit_tuple(sp_int32 _src_task_id,
const proto::system::AckTuple& _tuple, sp_uint64* total_size_);
void add_checkpoint_tuple(proto::ckptmgr::DownstreamStatefulCheckpoint* _message,
sp_uint64* total_size_);
void drain(sp_int32 _task_id,
std::function<void(sp_int32, proto::system::HeronTupleSet2*)> _tuple_drainer,
std::function<void(sp_int32,
proto::ckptmgr::DownstreamStatefulCheckpoint*)> _checkpoint_drainer);
proto::system::HeronTupleSet2* acquire_clean_set() {
proto::system::HeronTupleSet2* set = nullptr;
set = __global_protobuf_pool_acquire__(set);
return set;
}
void clear();
private:
std::deque<google::protobuf::Message*> tuples_;
proto::system::HeronTupleSet2* current_;
sp_uint64 current_size_;
sp_int32 last_drained_count_;
};
TupleList* get(sp_int32 _task_id);
// map from task_id to the TupleList
tsl::hopscotch_map<sp_int32, TupleList*> cache_;
std::shared_ptr<EventLoop> eventLoop_;
std::function<void(sp_int32, proto::system::HeronTupleSet2*)> tuple_drainer_;
std::function<void(sp_int32, proto::ckptmgr::DownstreamStatefulCheckpoint*)>
checkpoint_drainer_;
sp_uint64 total_size_;
sp_uint32 drain_threshold_bytes_;
// Configs to be read
sp_int32 cache_drain_frequency_ms_;
sp_uint64 tuples_cache_max_tuple_size_;
};
} // namespace stmgr
} // namespace heron
#endif // SRC_CPP_SVCS_STMGR_SRC_UTIL_TUPLE_CACHE_H_