blob: 9e9eb71224a0004abd755ede7dcfa57dddbe084e [file] [log] [blame]
/*
* Copyright 2015 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "util/tuple-cache.h"
#include <iostream>
#include <map>
#include <string>
#include "proto/messages.h"
#include "basics/basics.h"
#include "errors/errors.h"
#include "threads/threads.h"
#include "network/network.h"
#include "config/heron-internals-config-reader.h"
namespace heron {
namespace stmgr {
TupleCache::TupleCache(EventLoop* eventLoop, sp_uint32 _drain_threshold)
: eventLoop_(eventLoop), drain_threshold_bytes_(_drain_threshold) {
cache_drain_frequency_ms_ =
config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrCacheDrainFrequencyMs();
tuples_cache_max_tuple_size_ =
config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrPacketMaximumSizeBytes();
total_size_ = 0;
auto drain_cb = [this](EventLoop::Status status) { this->drain(status); };
eventLoop_->registerTimer(std::move(drain_cb), true, cache_drain_frequency_ms_ * 1000);
}
TupleCache::~TupleCache() {
// Drain the cache first
drain_impl();
for (auto iter = cache_.begin(); iter != cache_.end(); ++iter) {
delete iter->second;
}
}
sp_int64 TupleCache::add_data_tuple(sp_int32 _task_id, const proto::api::StreamId& _streamid,
proto::system::HeronDataTuple* _tuple) {
if (total_size_ >= drain_threshold_bytes_) drain_impl();
TupleList* l = get(_task_id);
return l->add_data_tuple(_streamid, _tuple, &total_size_, &tuples_cache_max_tuple_size_);
}
void TupleCache::add_ack_tuple(sp_int32 _task_id, const proto::system::AckTuple& _tuple) {
if (total_size_ >= drain_threshold_bytes_) drain_impl();
TupleList* l = get(_task_id);
return l->add_ack_tuple(_tuple, &total_size_);
}
void TupleCache::add_fail_tuple(sp_int32 _task_id, const proto::system::AckTuple& _tuple) {
if (total_size_ >= drain_threshold_bytes_) drain_impl();
TupleList* l = get(_task_id);
return l->add_fail_tuple(_tuple, &total_size_);
}
void TupleCache::add_emit_tuple(sp_int32 _task_id, const proto::system::AckTuple& _tuple) {
if (total_size_ >= drain_threshold_bytes_) drain_impl();
TupleList* l = get(_task_id);
return l->add_emit_tuple(_tuple, &total_size_);
}
TupleCache::TupleList* TupleCache::get(sp_int32 _task_id) {
TupleList* l = NULL;
auto iter = cache_.find(_task_id);
if (iter == cache_.end()) {
l = new TupleList();
cache_[_task_id] = l;
} else {
l = iter->second;
}
return l;
}
void TupleCache::drain(EventLoop::Status) { drain_impl(); }
void TupleCache::drain_impl() {
for (auto iter = cache_.begin(); iter != cache_.end(); ++iter) {
iter->second->drain(iter->first, drainer_);
}
total_size_ = 0;
}
TupleCache::TupleList::TupleList() {
current_ = NULL;
current_size_ = 0;
last_drained_count_ = 0;
}
TupleCache::TupleList::~TupleList() {
CHECK(tuples_.empty());
}
sp_int64 TupleCache::TupleList::add_data_tuple(const proto::api::StreamId& _streamid,
proto::system::HeronDataTuple* _tuple,
sp_uint64* _total_size,
sp_uint64* _tuples_cache_max_tuple_size) {
if (!current_ || current_->has_control() || current_->data().stream().id() != _streamid.id() ||
current_->data().stream().component_name() != _streamid.component_name() ||
current_size_ > *_tuples_cache_max_tuple_size) {
if (current_) {
tuples_.push_front(current_);
}
current_ = acquire_clean_set();
current_->mutable_data()->mutable_stream()->MergeFrom(_streamid);
current_size_ = 0;
}
sp_int64 tuple_key = 0;
if (_tuple->roots_size() > 0) {
tuple_key = RandUtils::lrand();
}
// Override in place
_tuple->set_key(tuple_key);
std::string* added_tuple = current_->mutable_data()->add_tuples();
_tuple->SerializePartialToString(added_tuple);
sp_int64 tuple_size = _tuple->GetCachedSize();
current_size_ += tuple_size;
*_total_size += tuple_size;
return tuple_key;
}
void TupleCache::TupleList::add_ack_tuple(const proto::system::AckTuple& _tuple,
sp_uint64* _total_size) {
if (!current_ || current_->has_data() || current_->control().emits_size() > 0) {
if (current_) {
tuples_.push_front(current_);
}
current_ = acquire_clean_set();
current_size_ = 0;
}
sp_int64 tuple_size = _tuple.ByteSize();
current_size_ += tuple_size;
*_total_size += tuple_size;
current_->mutable_control()->add_acks()->CopyFrom(_tuple);
}
void TupleCache::TupleList::add_fail_tuple(const proto::system::AckTuple& _tuple,
sp_uint64* _total_size) {
if (!current_ || current_->has_data() || current_->control().emits_size() > 0) {
if (current_) {
tuples_.push_front(current_);
}
current_ = acquire_clean_set();
current_size_ = 0;
}
sp_int64 tuple_size = _tuple.ByteSize();
current_size_ += tuple_size;
*_total_size += tuple_size;
current_->mutable_control()->add_fails()->CopyFrom(_tuple);
}
void TupleCache::TupleList::add_emit_tuple(const proto::system::AckTuple& _tuple,
sp_uint64* _total_size) {
if (!current_ || current_->has_data() || current_->control().acks_size() > 0 ||
current_->control().fails_size() > 0) {
if (current_) {
tuples_.push_front(current_);
}
current_ = acquire_clean_set();
current_size_ = 0;
}
sp_int64 tuple_size = _tuple.ByteSize();
current_size_ += tuple_size;
*_total_size += tuple_size;
current_->mutable_control()->add_emits()->CopyFrom(_tuple);
}
void TupleCache::TupleList::drain(
sp_int32 _task_id, std::function<void(sp_int32, proto::system::HeronTupleSet2*)> _drainer) {
sp_int32 drained = 0;
// we have to drain from back
while (!tuples_.empty()) {
_drainer(_task_id, tuples_.back()); // Drain cleans up the structure
tuples_.pop_back();
drained++;
}
if (current_ && drained == 0 && last_drained_count_ == 0) {
// We didn;t drain anything last time. Better do it now
// TODO(vikasr) : Add metric
_drainer(_task_id, current_);
drained++;
current_ = NULL;
current_size_ = 0;
}
last_drained_count_ = drained;
}
} // namespace stmgr
} // namespace heron