blob: 26e2a57c13f1bf7b4e522a36482f805f30b9c9e5 [file] [log] [blame]
/*
* Copyright 2015 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <algorithm>
#include <map>
#include "gtest/gtest.h"
#include "proto/messages.h"
#include "basics/basics.h"
#include "errors/errors.h"
#include "threads/threads.h"
#include "network/network.h"
#include "basics/modinit.h"
#include "errors/modinit.h"
#include "threads/modinit.h"
#include "network/modinit.h"
#include "config/heron-internals-config-reader.h"
#include "util/tuple-cache.h"
// Path of the Heron internals config file handed to HeronInternalsConfigReader
// in main(). This relative path is the default; main() overwrites it with
// argv[1] when an extra command-line argument is supplied.
sp_string heron_internals_config_filename(
    "../../../../../../../../heron/config/heron_internals.yaml");
// Test helper that is registered with a TupleCache as its drain callbacks.
// It tallies, per task id, how many data / ack / fail tuples were drained and
// remembers whether a checkpoint message was drained, so that a test can
// compare those tallies against the counts it expected.
class Drainer {
 public:
  // Each map is keyed by task id and holds the number of tuples of that kind
  // the test expects to see drained for that task.
  Drainer(const std::map<sp_int32, sp_int32>& _num_data_tuples_expected,
          const std::map<sp_int32, sp_int32>& _num_ack_tuples_expected,
          const std::map<sp_int32, sp_int32>& _num_fail_tuples_expected)
      : num_data_tuples_expected_(_num_data_tuples_expected),
        num_ack_tuples_expected_(_num_ack_tuples_expected),
        num_fail_tuples_expected_(_num_fail_tuples_expected),
        ckpt_message_seen_(false) {}

  ~Drainer() {}

  // Drain callback for tuple sets. Adds the set's tuple counts to the tallies
  // for _task_id and then deletes _t.
  void Drain(sp_int32 _task_id, heron::proto::system::HeronTupleSet2* _t) {
    if (!_t->has_data()) {
      // NOTE(review): this check can never fail inside this branch; it was
      // possibly meant to assert has_control() instead — confirm before
      // changing.
      EXPECT_FALSE(_t->has_data());
      add_actual(_task_id, _t->control().acks_size(), num_ack_tuples_actual_);
      add_actual(_task_id, _t->control().fails_size(), num_fail_tuples_actual_);
    } else {
      // A set carrying data is not expected to carry control tuples too.
      EXPECT_FALSE(_t->has_control());
      add_actual(_task_id, _t->data().tuples_size(), num_data_tuples_actual_);
    }
    delete _t;
  }

  // Drain callback for checkpoint messages. Only records that one was seen;
  // the message itself is neither inspected nor freed here.
  void CheckpointDrain(sp_int32 _task_id,
                       heron::proto::ckptmgr::DownstreamStatefulCheckpoint* _t) {
    ckpt_message_seen_ = true;
  }

  // Returns true when every tally equals its expectation and the
  // "checkpoint seen" flag equals _ckpt.
  bool Verify(bool _ckpt) {
    if (!verify(num_data_tuples_expected_, num_data_tuples_actual_)) return false;
    if (!verify(num_ack_tuples_expected_, num_ack_tuples_actual_)) return false;
    if (!verify(num_fail_tuples_expected_, num_fail_tuples_actual_)) return false;
    return ckpt_message_seen_ == _ckpt;
  }

 private:
  // Adds _increment to the tally stored for _task_id in _which_map.
  void add_actual(sp_int32 _task_id, sp_int32 _increment,
                  std::map<sp_int32, sp_int32>& _which_map) {
    // operator[] value-initializes a missing entry to 0, so the first
    // increment for a task stores exactly _increment.
    _which_map[_task_id] += _increment;
  }

  // Returns true when both maps hold the same task ids with the same counts.
  bool verify(const std::map<sp_int32, sp_int32>& _first,
              const std::map<sp_int32, sp_int32>& _second) {
    // Map equality compares sizes and then the (key, value) pairs in order.
    return _first == _second;
  }

  // Expected per-task counts, copied in at construction.
  std::map<sp_int32, sp_int32> num_data_tuples_expected_;
  std::map<sp_int32, sp_int32> num_ack_tuples_expected_;
  std::map<sp_int32, sp_int32> num_fail_tuples_expected_;
  // Per-task counts accumulated by Drain().
  std::map<sp_int32, sp_int32> num_data_tuples_actual_;
  std::map<sp_int32, sp_int32> num_ack_tuples_actual_;
  std::map<sp_int32, sp_int32> num_fail_tuples_actual_;
  // Set once CheckpointDrain() has been called.
  bool ckpt_message_seen_;
};
// Timer callback used by the tests below: asks the given event loop to exit.
// The status argument is ignored.
void DoneHandler(EventLoopImpl* _ss, EventLoopImpl::Status) {
  _ss->loopExit();
}
// Test simple data tuples drain: adds data tuples for a single task and
// checks that the drainer saw exactly that many, with no acks, fails or
// checkpoint message.
TEST(TupleCache, test_simple_data_drain) {
  const sp_int32 total_data_tuples = 23354;
  sp_uint32 drain_threshold = 1024 * 1024;

  EventLoopImpl event_loop;
  heron::stmgr::TupleCache* cache = new heron::stmgr::TupleCache(&event_loop, drain_threshold);

  // Expected tallies: all data tuples on task 1, nothing else.
  std::map<sp_int32, sp_int32> expected_data;
  expected_data[1] = total_data_tuples;
  std::map<sp_int32, sp_int32> expected_acks;
  std::map<sp_int32, sp_int32> expected_fails;
  Drainer* drainer = new Drainer(expected_data, expected_acks, expected_fails);
  cache->RegisterDrainer(&Drainer::Drain, drainer);

  heron::proto::api::StreamId stream;
  stream.set_id("stream");
  stream.set_component_name("comp");

  for (sp_int32 remaining = total_data_tuples; remaining > 0; --remaining) {
    heron::proto::system::HeronDataTuple tuple;
    tuple.set_key(RandUtils::lrand());
    cache->add_data_tuple(1, 1, stream, &tuple);
  }

  // Run the event loop until a 300 ms timer fires and DoneHandler exits it.
  event_loop.registerTimer(
      [&event_loop](EventLoopImpl::Status status) { DoneHandler(&event_loop, status); },
      false, 300_ms);
  event_loop.loop();

  EXPECT_TRUE(drainer->Verify(false));

  delete drainer;
  delete cache;
}
// Test data/ack/fail mix: interleaves data, ack and fail tuples for a single
// task and checks that the drainer saw the expected number of each kind.
TEST(TupleCache, test_data_ack_fail_mix) {
  const sp_int32 total_data_tuples = 23354;
  const sp_int32 total_ack_tuples = 3543;
  const sp_int32 total_fail_tuples = 6564;
  sp_uint32 drain_threshold = 1024 * 1024;

  EventLoopImpl event_loop;
  heron::stmgr::TupleCache* cache = new heron::stmgr::TupleCache(&event_loop, drain_threshold);

  // Expected tallies: everything lands on task 1.
  std::map<sp_int32, sp_int32> expected_data;
  expected_data[1] = total_data_tuples;
  std::map<sp_int32, sp_int32> expected_acks;
  expected_acks[1] = total_ack_tuples;
  std::map<sp_int32, sp_int32> expected_fails;
  expected_fails[1] = total_fail_tuples;
  Drainer* drainer = new Drainer(expected_data, expected_acks, expected_fails);
  cache->RegisterDrainer(&Drainer::Drain, drainer);

  heron::proto::api::StreamId stream;
  stream.set_id("stream");
  stream.set_component_name("comp");

  // One pass over the largest count; each kind stops being added once its own
  // count is reached, so the three kinds are interleaved in the cache.
  const sp_int32 rounds = std::max({total_data_tuples, total_ack_tuples, total_fail_tuples});
  for (sp_int32 round = 0; round < rounds; ++round) {
    if (round < total_data_tuples) {
      heron::proto::system::HeronDataTuple data;
      data.set_key(RandUtils::lrand());
      cache->add_data_tuple(1, 1, stream, &data);
    }
    if (round < total_ack_tuples) {
      heron::proto::system::AckTuple ack;
      ack.set_ackedtuple(RandUtils::lrand());
      cache->add_ack_tuple(1, 1, ack);
    }
    if (round < total_fail_tuples) {
      heron::proto::system::AckTuple fail;
      fail.set_ackedtuple(RandUtils::lrand());
      cache->add_fail_tuple(1, 1, fail);
    }
  }

  // Run the event loop until the timer fires and DoneHandler exits it.
  // NOTE(review): 300000 is presumably microseconds (i.e. 300 ms) — confirm
  // against registerTimer.
  event_loop.registerTimer(
      [&event_loop](EventLoopImpl::Status status) { DoneHandler(&event_loop, status); },
      false, 300000);
  event_loop.loop();

  EXPECT_TRUE(drainer->Verify(false));

  delete drainer;
  delete cache;
}
// Test different stream mix: alternates data, ack and fail tuples between two
// destination tasks (data on two different streams) and checks that each
// task received half of every kind.
TEST(TupleCache, test_different_stream_mix) {
  // All three counts must be even so that they split equally across two tasks.
  const sp_int32 total_data_tuples = 23354;
  const sp_int32 total_ack_tuples = 3544;
  const sp_int32 total_fail_tuples = 6564;
  sp_uint32 drain_threshold = 1024 * 1024;

  EventLoopImpl event_loop;
  heron::stmgr::TupleCache* cache = new heron::stmgr::TupleCache(&event_loop, drain_threshold);

  // Expected tallies: tasks 1 and 2 each get half of every kind.
  std::map<sp_int32, sp_int32> expected_data;
  expected_data[1] = total_data_tuples / 2;
  expected_data[2] = total_data_tuples / 2;
  std::map<sp_int32, sp_int32> expected_acks;
  expected_acks[1] = total_ack_tuples / 2;
  expected_acks[2] = total_ack_tuples / 2;
  std::map<sp_int32, sp_int32> expected_fails;
  expected_fails[1] = total_fail_tuples / 2;
  expected_fails[2] = total_fail_tuples / 2;
  Drainer* drainer = new Drainer(expected_data, expected_acks, expected_fails);
  cache->RegisterDrainer(&Drainer::Drain, drainer);

  heron::proto::api::StreamId stream1;
  stream1.set_id("stream1");
  stream1.set_component_name("comp1");
  heron::proto::api::StreamId stream2;
  stream2.set_id("stream2");
  stream2.set_component_name("comp2");

  const sp_int32 rounds = std::max({total_data_tuples, total_ack_tuples, total_fail_tuples});
  for (sp_int32 round = 0; round < rounds; ++round) {
    // Even rounds target task 1 / stream1, odd rounds task 2 / stream2.
    const bool even_round = (round % 2 == 0);
    const sp_int32 dest_task = even_round ? 1 : 2;

    if (round < total_data_tuples) {
      heron::proto::system::HeronDataTuple data;
      data.set_key(RandUtils::lrand());
      const heron::proto::api::StreamId& stream = even_round ? stream1 : stream2;
      cache->add_data_tuple(1, dest_task, stream, &data);
    }
    if (round < total_ack_tuples) {
      heron::proto::system::AckTuple ack;
      ack.set_ackedtuple(RandUtils::lrand());
      cache->add_ack_tuple(1, dest_task, ack);
    }
    if (round < total_fail_tuples) {
      heron::proto::system::AckTuple fail;
      fail.set_ackedtuple(RandUtils::lrand());
      cache->add_fail_tuple(1, dest_task, fail);
    }
  }

  // Run the event loop until the timer fires and DoneHandler exits it.
  // NOTE(review): 300000 is presumably microseconds (i.e. 300 ms) — confirm
  // against registerTimer.
  event_loop.registerTimer(
      [&event_loop](EventLoopImpl::Status status) { DoneHandler(&event_loop, status); },
      false, 300000);
  event_loop.loop();

  EXPECT_TRUE(drainer->Verify(false));

  delete drainer;
  delete cache;
}
// Test drain with checkpoint marker: adds half of the data tuples, then a
// checkpoint message, then the other half, and checks that the drainer saw
// all data tuples as well as the checkpoint message.
TEST(TupleCache, test_checkpoint_drain) {
  const sp_int32 total_data_tuples = 23354;
  sp_uint32 drain_threshold = 1024 * 1024;

  EventLoopImpl event_loop;
  heron::stmgr::TupleCache* cache = new heron::stmgr::TupleCache(&event_loop, drain_threshold);

  // Expected tallies: all data tuples on task 1, no acks or fails.
  std::map<sp_int32, sp_int32> expected_data;
  expected_data[1] = total_data_tuples;
  std::map<sp_int32, sp_int32> expected_acks;
  std::map<sp_int32, sp_int32> expected_fails;
  Drainer* drainer = new Drainer(expected_data, expected_acks, expected_fails);
  cache->RegisterDrainer(&Drainer::Drain, drainer);
  cache->RegisterCheckpointDrainer(&Drainer::CheckpointDrain, drainer);

  heron::proto::api::StreamId stream;
  stream.set_id("stream");
  stream.set_component_name("comp");

  // Adds half of the data tuples to task 1.
  const sp_int32 half = total_data_tuples / 2;
  auto add_half_of_data = [&]() {
    for (sp_int32 added = 0; added < half; ++added) {
      heron::proto::system::HeronDataTuple tuple;
      tuple.set_key(RandUtils::lrand());
      cache->add_data_tuple(1, 1, stream, &tuple);
    }
  };

  add_half_of_data();
  // The checkpoint message lives on this stack frame; Drainer::CheckpointDrain
  // does not free it.
  heron::proto::ckptmgr::DownstreamStatefulCheckpoint ckpt_message;
  cache->add_checkpoint_tuple(1, &ckpt_message);
  add_half_of_data();

  // Run the event loop until a 300 ms timer fires and DoneHandler exits it.
  event_loop.registerTimer(
      [&event_loop](EventLoopImpl::Status status) { DoneHandler(&event_loop, status); },
      false, 300_ms);
  event_loop.loop();

  EXPECT_TRUE(drainer->Verify(true));

  delete drainer;
  delete cache;
}
// Test entry point.
//
// Initializes the Heron common library and GoogleTest, optionally takes the
// path of the Heron internals config file from argv[1] (checked after
// InitGoogleTest has processed the argument list), makes sure the
// HeronInternalsConfigReader singleton exists and then runs all tests.
// Returns the result of RUN_ALL_TESTS().
int main(int argc, char** argv) {
  heron::common::Initialize(argv[0]);
  testing::InitGoogleTest(&argc, argv);
  if (argc > 1) {
    std::cerr << "Using config file " << argv[1] << std::endl;
    heron_internals_config_filename = argv[1];
  }

  // Log the working directory, since the default config path is relative.
  // getcwd() returns null on failure and leaves the buffer contents
  // unspecified, so only print the buffer when the call succeeded.
  char path1[1024];
  if (getcwd(path1, sizeof(path1)) != nullptr) {
    std::cout << "Current working directory " << path1 << std::endl;
  } else {
    std::cerr << "Unable to determine current working directory" << std::endl;
  }

  // Create the singleton for heron_internals_config_reader, if it does not exist
  if (!heron::config::HeronInternalsConfigReader::Exists()) {
    heron::config::HeronInternalsConfigReader::Create(heron_internals_config_filename, "");
  }
  return RUN_ALL_TESTS();
}