// blob: 28b7415d52214dd8bf5171db8f8d2beff8debb3c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "util/tuple-cache.h"
#include <iostream>
#include <map>
#include <string>
#include "proto/messages.h"
#include "basics/basics.h"
#include "errors/errors.h"
#include "threads/threads.h"
#include "network/network.h"
#include "config/heron-internals-config-reader.h"
namespace heron {
namespace stmgr {
// Builds a tuple cache bound to `eventLoop`.
//
// @param eventLoop        loop on which the recurring drain timer is registered.
// @param _drain_threshold cached byte count at which the add_* methods drain
//                         before caching more.
//
// The drain frequency and the per-tuple-set size cap are read from the Heron
// internals config.
//
// NOTE(review): the timer callback captures `this` and nothing in this file
// unregisters it - presumably the cache outlives the timer; confirm.
TupleCache::TupleCache(std::shared_ptr<EventLoop> eventLoop, sp_uint32 _drain_threshold)
    : eventLoop_(eventLoop), drain_threshold_bytes_(_drain_threshold) {
  total_size_ = 0;

  cache_drain_frequency_ms_ =
      config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrCacheDrainFrequencyMs();

  // Cap a tuple set at the maximum packet size, minus 1024 bytes left for the headers.
  const auto max_packet_mb = config::HeronInternalsConfigReader::Instance()
                                 ->GetHeronStreammgrNetworkOptionsMaximumPacketMb();
  tuples_cache_max_tuple_size_ = max_packet_mb * 1_MB - 1024;

  // Recurring timer that drains the cache.
  // NOTE(review): the * 1000 presumably converts milliseconds to the unit
  // registerTimer expects - confirm against EventLoop.
  eventLoop_->registerTimer([this](EventLoop::Status _status) { this->drain(_status); }, true,
                            cache_drain_frequency_ms_ * 1000);
}
// Releases every cached message and every per-task TupleList.
//
// clear() releases the messages held by each list (needed because
// ~TupleList() CHECKs that the list is empty), deletes the list itself and
// then empties cache_. A second per-entry delete loop here would never
// execute, and would be a double delete if it did, so none is performed.
TupleCache::~TupleCache() {
  clear();
}
// Caches a data tuple destined for `_task_id`.
//
// If the cached byte count has already reached the drain threshold, the whole
// cache is drained before the tuple is added.
//
// @param _src_task_id task that produced the tuple.
// @param _task_id     destination task whose list receives the tuple.
// @param _streamid    stream the tuple belongs to.
// @param _tuple       tuple to cache; its key is overwritten by the list.
// @return the key returned by TupleList::add_data_tuple.
sp_int64 TupleCache::add_data_tuple(sp_int32 _src_task_id, sp_int32 _task_id,
                                    const proto::api::StreamId& _streamid,
                                    proto::system::HeronDataTuple* _tuple) {
  if (total_size_ >= drain_threshold_bytes_) {
    drain_impl();
  }
  return get(_task_id)->add_data_tuple(_src_task_id, _streamid, _tuple, &total_size_,
                                       &tuples_cache_max_tuple_size_);
}
// Caches an ack for `_task_id`, draining the whole cache first when the cached
// byte count has reached the drain threshold.
//
// @param _src_task_id task the ack originates from.
// @param _task_id     destination task whose list receives the ack.
// @param _tuple       ack to copy into the cache.
void TupleCache::add_ack_tuple(sp_int32 _src_task_id, sp_int32 _task_id,
                               const proto::system::AckTuple& _tuple) {
  if (total_size_ >= drain_threshold_bytes_) {
    drain_impl();
  }
  get(_task_id)->add_ack_tuple(_src_task_id, _tuple, &total_size_);
}
// Caches a fail for `_task_id`, draining the whole cache first when the cached
// byte count has reached the drain threshold.
//
// @param _src_task_id task the fail originates from.
// @param _task_id     destination task whose list receives the fail.
// @param _tuple       fail to copy into the cache.
void TupleCache::add_fail_tuple(sp_int32 _src_task_id, sp_int32 _task_id,
                                const proto::system::AckTuple& _tuple) {
  if (total_size_ >= drain_threshold_bytes_) {
    drain_impl();
  }
  get(_task_id)->add_fail_tuple(_src_task_id, _tuple, &total_size_);
}
// Caches an emit for `_task_id`, draining the whole cache first when the cached
// byte count has reached the drain threshold.
//
// @param _src_task_id task the emit originates from.
// @param _task_id     destination task whose list receives the emit.
// @param _tuple       emit to copy into the cache.
void TupleCache::add_emit_tuple(sp_int32 _src_task_id, sp_int32 _task_id,
                                const proto::system::AckTuple& _tuple) {
  if (total_size_ >= drain_threshold_bytes_) {
    drain_impl();
  }
  get(_task_id)->add_emit_tuple(_src_task_id, _tuple, &total_size_);
}
// Caches a checkpoint marker for `_task_id`, draining the whole cache first
// when the cached byte count has reached the drain threshold.
//
// @param _task_id destination task whose list receives the marker.
// @param _message checkpoint message; the pointer itself is queued in the list.
void TupleCache::add_checkpoint_tuple(sp_int32 _task_id,
                                      proto::ckptmgr::DownstreamStatefulCheckpoint* _message) {
  if (total_size_ >= drain_threshold_bytes_) {
    drain_impl();
  }
  get(_task_id)->add_checkpoint_tuple(_message, &total_size_);
}
// Returns the TupleList for `_task_id`, creating and registering an empty one
// on first use. The list stays owned by cache_.
TupleCache::TupleList* TupleCache::get(sp_int32 _task_id) {
  auto found = cache_.find(_task_id);
  if (found != cache_.end()) {
    return found->second;
  }

  TupleList* fresh_list = new TupleList();
  cache_[_task_id] = fresh_list;
  return fresh_list;
}
void TupleCache::clear() {
for (auto kv : cache_) {
kv.second->clear();
delete kv.second;
}
cache_.clear();
total_size_ = 0;
}
// Timer callback registered by the constructor; the event loop status is
// ignored and the cache is simply drained.
void TupleCache::drain(EventLoop::Status) {
  drain_impl();
}
void TupleCache::drain_impl() {
for (auto iter = cache_.begin(); iter != cache_.end(); ++iter) {
iter->second->drain(iter->first, tuple_drainer_, checkpoint_drainer_);
}
total_size_ = 0;
}
// Starts out with no in-progress tuple set, zero accumulated bytes and a zero
// "drained last time" count.
TupleCache::TupleList::TupleList() {
  last_drained_count_ = 0;
  current_size_ = 0;
  current_ = nullptr;
}
// Destroying a list that still holds messages is treated as a programming
// error: the owner has to drain or clear() the list first.
TupleCache::TupleList::~TupleList() {
// No queued messages may remain.
CHECK(tuples_.empty());
// No in-progress tuple set may remain either.
CHECK(!current_);
}
// Discards everything held by this list without draining it. The in-progress
// set and every queued message are handed to __global_protobuf_pool_release__,
// and the counters are reset.
void TupleCache::TupleList::clear() {
  if (current_ != nullptr) {
    __global_protobuf_pool_release__(current_);
    current_ = nullptr;
  }

  for (; !tuples_.empty(); tuples_.pop_front()) {
    __global_protobuf_pool_release__(tuples_.front());
  }

  last_drained_count_ = 0;
  current_size_ = 0;
}
// Serializes `_tuple` into the in-progress tuple set and returns its key.
//
// A new set is started (queueing the previous one, if any) when there is no
// in-progress set, when it carries control tuples, when it belongs to another
// source task or stream (id or component name), or when its accumulated size
// already exceeds `*_tuples_cache_max_tuple_size`.
//
// @param _src_task_id task that produced the tuple.
// @param _streamid    stream the tuple belongs to.
// @param _tuple       tuple to add; its key is overwritten in place.
// @param _total_size  running byte count of the whole cache; incremented by
//                     the tuple's serialized size.
// @param _tuples_cache_max_tuple_size size cap for one tuple set.
// @return 0 for a tuple without roots, otherwise a value from RandUtils::lrand().
sp_int64 TupleCache::TupleList::add_data_tuple(sp_int32 _src_task_id,
                                               const proto::api::StreamId& _streamid,
                                               proto::system::HeronDataTuple* _tuple,
                                               sp_uint64* _total_size,
                                               sp_uint64* _tuples_cache_max_tuple_size) {
  const bool start_new_set =
      !current_ || current_->has_control() || current_->src_task_id() != _src_task_id ||
      current_->data().stream().id() != _streamid.id() ||
      current_->data().stream().component_name() != _streamid.component_name() ||
      current_size_ > *_tuples_cache_max_tuple_size;

  if (start_new_set) {
    if (current_ != nullptr) {
      tuples_.push_front(current_);
    }
    current_ = acquire_clean_set();
    current_->set_src_task_id(_src_task_id);
    current_->mutable_data()->mutable_stream()->MergeFrom(_streamid);
    current_size_ = 0;
  }

  // Only tuples that have roots get a random key; the key is written in place.
  const sp_int64 assigned_key = (_tuple->roots_size() > 0) ? RandUtils::lrand() : 0;
  _tuple->set_key(assigned_key);

  std::string* serialized_slot = current_->mutable_data()->add_tuples();
  _tuple->SerializePartialToString(serialized_slot);

  // The cached size is read only after serialization has run.
  const sp_int64 serialized_bytes = _tuple->GetCachedSize();
  current_size_ += serialized_bytes;
  *_total_size += serialized_bytes;
  return assigned_key;
}
// Copies an ack into the in-progress tuple set.
//
// A new set is started (queueing the previous one, if any) when there is no
// in-progress set, when it belongs to another source task, when it carries
// data tuples, or when it already carries emits.
//
// @param _src_task_id task the ack originates from.
// @param _tuple       ack to copy.
// @param _total_size  running byte count of the whole cache; incremented by
//                     the ack's byte size.
void TupleCache::TupleList::add_ack_tuple(sp_int32 _src_task_id,
                                          const proto::system::AckTuple& _tuple,
                                          sp_uint64* _total_size) {
  const bool start_new_set = !current_ || current_->src_task_id() != _src_task_id ||
                             current_->has_data() || current_->control().emits_size() > 0;
  if (start_new_set) {
    if (current_ != nullptr) {
      tuples_.push_front(current_);
    }
    current_ = acquire_clean_set();
    current_size_ = 0;
    current_->set_src_task_id(_src_task_id);
  }

  const sp_int64 ack_bytes = _tuple.ByteSize();
  *_total_size += ack_bytes;
  current_size_ += ack_bytes;
  current_->mutable_control()->add_acks()->CopyFrom(_tuple);
}
// Copies a fail into the in-progress tuple set.
//
// A new set is started (queueing the previous one, if any) when there is no
// in-progress set, when it belongs to another source task, when it carries
// data tuples, or when it already carries emits.
//
// @param _src_task_id task the fail originates from.
// @param _tuple       fail to copy.
// @param _total_size  running byte count of the whole cache; incremented by
//                     the fail's byte size.
void TupleCache::TupleList::add_fail_tuple(sp_int32 _src_task_id,
                                           const proto::system::AckTuple& _tuple,
                                           sp_uint64* _total_size) {
  const bool start_new_set = !current_ || current_->src_task_id() != _src_task_id ||
                             current_->has_data() || current_->control().emits_size() > 0;
  if (start_new_set) {
    if (current_ != nullptr) {
      tuples_.push_front(current_);
    }
    current_ = acquire_clean_set();
    current_size_ = 0;
    current_->set_src_task_id(_src_task_id);
  }

  const sp_int64 fail_bytes = _tuple.ByteSize();
  *_total_size += fail_bytes;
  current_size_ += fail_bytes;
  current_->mutable_control()->add_fails()->CopyFrom(_tuple);
}
// Copies an emit into the in-progress tuple set.
//
// A new set is started (queueing the previous one, if any) when there is no
// in-progress set, when it belongs to another source task, when it carries
// data tuples, or when it already carries acks or fails.
//
// @param _src_task_id task the emit originates from.
// @param _tuple       emit to copy.
// @param _total_size  running byte count of the whole cache; incremented by
//                     the emit's byte size.
void TupleCache::TupleList::add_emit_tuple(sp_int32 _src_task_id,
                                           const proto::system::AckTuple& _tuple,
                                           sp_uint64* _total_size) {
  const bool start_new_set = !current_ || current_->src_task_id() != _src_task_id ||
                             current_->has_data() || current_->control().acks_size() > 0 ||
                             current_->control().fails_size() > 0;
  if (start_new_set) {
    if (current_ != nullptr) {
      tuples_.push_front(current_);
    }
    current_ = acquire_clean_set();
    current_size_ = 0;
    current_->set_src_task_id(_src_task_id);
  }

  const sp_int64 emit_bytes = _tuple.ByteSize();
  *_total_size += emit_bytes;
  current_size_ += emit_bytes;
  current_->mutable_control()->add_emits()->CopyFrom(_tuple);
}
// Queues a checkpoint marker behind everything cached so far.
//
// Any in-progress tuple set is queued first, so the marker is never batched
// together with tuples; the marker pointer itself is then queued.
//
// @param _message    checkpoint message to queue.
// @param _total_size running byte count of the whole cache; incremented by
//                    the message's byte size.
void TupleCache::TupleList::add_checkpoint_tuple(
    proto::ckptmgr::DownstreamStatefulCheckpoint* _message, sp_uint64* _total_size) {
  if (current_ != nullptr) {
    tuples_.push_front(current_);
    current_size_ = 0;
    current_ = nullptr;
  }

  const sp_int64 checkpoint_bytes = _message->ByteSize();
  *_total_size += checkpoint_bytes;
  tuples_.push_front(_message);
}
// Hands every queued message of this list to the matching drainer.
//
// Messages are queued with push_front, so they are taken from the back to
// preserve insertion order. The in-progress set is handed over only when
// nothing was drained in this call and nothing was drained in the previous
// call either.
//
// @param _task_id            destination task, passed through to the drainers.
// @param _tuple_drainer      receives HeronTupleSet2 messages.
// @param _checkpoint_drainer receives every other queued message, cast to
//                            DownstreamStatefulCheckpoint.
void TupleCache::TupleList::drain(
    sp_int32 _task_id, std::function<void(sp_int32, proto::system::HeronTupleSet2*)> _tuple_drainer,
    std::function<void(sp_int32, proto::ckptmgr::DownstreamStatefulCheckpoint*)>
        _checkpoint_drainer) {
  sp_int32 num_drained = 0;

  // We have to drain from the back.
  for (; !tuples_.empty(); ++num_drained) {
    auto oldest = tuples_.back();
    if (oldest->GetTypeName() == "heron.proto.system.HeronTupleSet2") {
      // The drainer cleans up the structure.
      _tuple_drainer(_task_id, static_cast<proto::system::HeronTupleSet2*>(oldest));
    } else {
      // The drainer cleans up the structure.
      _checkpoint_drainer(
          _task_id, static_cast<proto::ckptmgr::DownstreamStatefulCheckpoint*>(oldest));
    }
    tuples_.pop_back();
  }

  const bool idle_twice_in_a_row = (num_drained == 0) && (last_drained_count_ == 0);
  if (current_ && idle_twice_in_a_row) {
    // We didn't drain anything last time either, so better do it now.
    // TODO(vikasr) : Add metric
    _tuple_drainer(_task_id, current_);
    ++num_drained;
    current_ = nullptr;
    current_size_ = 0;
  }

  last_drained_count_ = num_drained;
}
} // namespace stmgr
} // namespace heron