| /* |
| * Copyright 2015 Twitter, Inc. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <limits> |
| #include <map> |
| #include <vector> |
| #include <sstream> |
| #include <thread> |
| #include <string> |
| #include <unordered_set> |
| #include <unordered_map> |
| #include "gtest/gtest.h" |
| #include "glog/logging.h" |
| #include "proto/messages.h" |
| #include "basics/basics.h" |
| #include "errors/errors.h" |
| #include "threads/threads.h" |
| #include "network/network.h" |
| #include "basics/modinit.h" |
| #include "errors/modinit.h" |
| #include "threads/modinit.h" |
| #include "network/modinit.h" |
| #include "config/topology-config-vars.h" |
| #include "config/topology-config-helper.h" |
| #include "config/physical-plan-helper.h" |
| #include "config/heron-internals-config-reader.h" |
| #include "metrics/metrics-mgr-st.h" |
| #include "manager/stateful-restorer.h" |
| #include "server/dummy_ckptmgr_client.h" |
| #include "server/dummy_tuple_cache.h" |
| #include "server/dummy_stmgr_clientmgr.h" |
| #include "server/dummy_stmgr_server.h" |
| |
| const sp_string SPOUT_NAME = "spout"; |
| const sp_string BOLT_NAME = "bolt"; |
| const sp_string STREAM_NAME = "stream"; |
| const sp_string CONTAINER_INDEX = "0"; |
| const sp_string STMGR_NAME = "stmgr"; |
| const sp_string LOCALHOST = "127.0.0.1"; |
| |
| sp_string heron_internals_config_filename = |
| "../../../../../../../../heron/config/heron_internals.yaml"; |
| |
| // Generate a dummy topology |
| static heron::proto::api::Topology* GenerateDummyTopology( |
| const sp_string& topology_name, const sp_string& topology_id, int num_spouts, |
| int num_spout_instances, int num_bolts, int num_bolt_instances, |
| const heron::proto::api::Grouping& grouping) { |
| heron::proto::api::Topology* topology = new heron::proto::api::Topology(); |
| topology->set_id(topology_id); |
| topology->set_name(topology_name); |
| size_t spouts_size = num_spouts; |
| size_t bolts_size = num_bolts; |
| // Set spouts |
| for (size_t i = 0; i < spouts_size; ++i) { |
| heron::proto::api::Spout* spout = topology->add_spouts(); |
| // Set the component information |
| heron::proto::api::Component* component = spout->mutable_comp(); |
| sp_string compname = SPOUT_NAME; |
| compname += std::to_string(i); |
| component->set_name(compname); |
| heron::proto::api::ComponentObjectSpec compspec = heron::proto::api::JAVA_CLASS_NAME; |
| component->set_spec(compspec); |
| // Set the stream information |
| heron::proto::api::OutputStream* ostream = spout->add_outputs(); |
| heron::proto::api::StreamId* tstream = ostream->mutable_stream(); |
| sp_string streamid = STREAM_NAME; |
| streamid += std::to_string(i); |
| tstream->set_id(streamid); |
| tstream->set_component_name(compname); |
| heron::proto::api::StreamSchema* schema = ostream->mutable_schema(); |
| heron::proto::api::StreamSchema::KeyType* key_type = schema->add_keys(); |
| key_type->set_key("dummy"); |
| key_type->set_type(heron::proto::api::OBJECT); |
| // Set the config |
| heron::proto::api::Config* config = component->mutable_config(); |
| heron::proto::api::Config::KeyValue* kv = config->add_kvs(); |
| kv->set_key(heron::config::TopologyConfigVars::TOPOLOGY_COMPONENT_PARALLELISM); |
| kv->set_value(std::to_string(num_spout_instances)); |
| } |
| // Set bolts |
| for (size_t i = 0; i < bolts_size; ++i) { |
| heron::proto::api::Bolt* bolt = topology->add_bolts(); |
| // Set the component information |
| heron::proto::api::Component* component = bolt->mutable_comp(); |
| sp_string compname = BOLT_NAME; |
| compname += std::to_string(i); |
| component->set_name(compname); |
| heron::proto::api::ComponentObjectSpec compspec = heron::proto::api::JAVA_CLASS_NAME; |
| component->set_spec(compspec); |
| // Set the stream information |
| heron::proto::api::InputStream* istream = bolt->add_inputs(); |
| heron::proto::api::StreamId* tstream = istream->mutable_stream(); |
| sp_string streamid = STREAM_NAME; |
| streamid += std::to_string(i); |
| tstream->set_id(streamid); |
| sp_string input_compname = SPOUT_NAME; |
| input_compname += std::to_string(i); |
| tstream->set_component_name(input_compname); |
| istream->set_gtype(grouping); |
| // Set the config |
| heron::proto::api::Config* config = component->mutable_config(); |
| heron::proto::api::Config::KeyValue* kv = config->add_kvs(); |
| kv->set_key(heron::config::TopologyConfigVars::TOPOLOGY_COMPONENT_PARALLELISM); |
| kv->set_value(std::to_string(num_bolt_instances)); |
| } |
| // Set message timeout |
| heron::proto::api::Config* topology_config = topology->mutable_topology_config(); |
| heron::proto::api::Config::KeyValue* kv = topology_config->add_kvs(); |
| kv->set_key(heron::config::TopologyConfigVars::TOPOLOGY_RELIABILITY_MODE); |
| kv->set_value("ATLEAST_ONCE"); |
| |
| // Set state |
| topology->set_state(heron::proto::api::RUNNING); |
| |
| return topology; |
| } |
| |
| const sp_string CreateInstanceId(int32_t _global_index) { |
| std::ostringstream instanceid_stream; |
| instanceid_stream << "instance-" << _global_index; |
| return instanceid_stream.str(); |
| } |
| |
// Returns the stream manager id for the given container index: "stmgr-<_index>".
std::string GenerateStMgrId(int32_t _index) {
  return "stmgr-" + std::to_string(_index);
}
| |
| heron::proto::system::Instance* CreateInstance(int32_t _comp, int32_t _comp_instance, |
| int32_t _stmgr_id, |
| int32_t _global_index, bool _is_spout) { |
| heron::proto::system::Instance* imap = new heron::proto::system::Instance(); |
| imap->set_instance_id(CreateInstanceId(_global_index)); |
| imap->set_stmgr_id(GenerateStMgrId(_stmgr_id)); |
| heron::proto::system::InstanceInfo* inst = imap->mutable_info(); |
| inst->set_task_id(_global_index); |
| inst->set_component_index(_comp_instance); |
| if (_is_spout) { |
| inst->set_component_name("spout" + std::to_string(_comp)); |
| } else { |
| inst->set_component_name("bolt" + std::to_string(_comp)); |
| } |
| return imap; |
| } |
| |
| heron::proto::system::PhysicalPlan* CreatePplan(int32_t _ncontainers, |
| int32_t _nsbcomp, int32_t _ninstances) { |
| int32_t nContainers = _ncontainers; |
| int32_t nSpouts = _nsbcomp; |
| int32_t nSpoutInstances = _ninstances; |
| int32_t nBolts = _nsbcomp; |
| int32_t nBoltInstances = _ninstances; |
| auto topology = GenerateDummyTopology("TestTopology", "TestTopology-12345", |
| nSpouts, nSpoutInstances, nBolts, nBoltInstances, |
| heron::proto::api::SHUFFLE); |
| auto pplan = new heron::proto::system::PhysicalPlan(); |
| pplan->mutable_topology()->CopyFrom(*topology); |
| delete topology; |
| for (int32_t i = 0; i < nContainers; ++i) { |
| auto stmgr = pplan->add_stmgrs(); |
| stmgr->set_id(GenerateStMgrId(i)); |
| stmgr->set_host_name("127.0.0.1"); |
| stmgr->set_data_port(100); |
| stmgr->set_local_endpoint("101"); |
| } |
| int32_t stmgr_assignment = 0; |
| int32_t global_index = 1; |
| for (int spout = 0; spout < nSpouts; ++spout) { |
| for (int spout_instance = 0; spout_instance < nSpoutInstances; ++spout_instance) { |
| heron::proto::system::Instance* instance = |
| CreateInstance(spout, spout_instance, stmgr_assignment, global_index++, true); |
| if (++stmgr_assignment >= nContainers) { |
| stmgr_assignment = 0; |
| } |
| pplan->add_instances()->CopyFrom(*instance); |
| delete instance; |
| } |
| } |
| for (int bolt = 0; bolt < nBolts; ++bolt) { |
| for (int bolt_instance = 0; bolt_instance < nBoltInstances; ++bolt_instance) { |
| heron::proto::system::Instance* instance = |
| CreateInstance(bolt, bolt_instance, stmgr_assignment, global_index++, false); |
| if (++stmgr_assignment >= nContainers) { |
| stmgr_assignment = 0; |
| } |
| pplan->add_instances()->CopyFrom(*instance); |
| delete instance; |
| } |
| } |
| return pplan; |
| } |
| |
| DummyCkptMgrClient* CreateDummyCkptMgr(heron::proto::system::PhysicalPlan* _pplan, |
| EventLoop* _eventLoop) { |
| NetworkOptions options; |
| return new DummyCkptMgrClient(_eventLoop, options, GenerateStMgrId(1), _pplan); |
| } |
| |
| DummyStMgrServer* CreateDummyStMgrServer(EventLoop* _eventLoop, |
| const std::string& _stmgr, |
| heron::proto::system::PhysicalPlan* _pplan, |
| heron::common::MetricsMgrSt* _metrics) { |
| NetworkOptions options; |
| std::vector<std::string> dummy_instances; |
| return new DummyStMgrServer(_eventLoop, options, _stmgr, _pplan, dummy_instances, |
| _metrics); |
| } |
| |
// Restore-done callback used by the tests: flips the flag that
// `_restore_done` points to so the test can observe that the callback ran.
void RestoreDone(bool* _restore_done) {
  *_restore_done = true;
}
| |
| // Normal case. Start Restore, get ckpts from all, send restores, get restores from all |
| // and then the done watcher called |
| TEST(StatefulRestorer, normalcase) { |
| auto pplan = CreatePplan(2, 4, 3); |
| EventLoop* dummyLoop = new EventLoopImpl(); |
| auto ckptmgr_client = CreateDummyCkptMgr(pplan, dummyLoop); |
| auto tuple_cache = new DummyTupleCache(dummyLoop); |
| auto dummy_metrics_client = new heron::common::MetricsMgrSt(11001, 100, dummyLoop); |
| dummy_metrics_client->Start("127.0.0.1", 11000, "_stmgr", "_stmgr"); |
| auto dummy_stmgr_clientmgr = new DummyStMgrClientMgr(dummyLoop, dummy_metrics_client, |
| GenerateStMgrId(1), pplan); |
| auto dummy_stmgr_server = CreateDummyStMgrServer(dummyLoop, GenerateStMgrId(1), |
| pplan, dummy_metrics_client); |
| bool restore_done = false; |
| std::string ckpt_id = "ckpt1"; |
| auto restorer = new heron::stmgr::StatefulRestorer(ckptmgr_client, dummy_stmgr_clientmgr, |
| tuple_cache, dummy_stmgr_server, |
| dummy_metrics_client, |
| std::bind(&RestoreDone, &restore_done)); |
| // At the start the restore is not in progress |
| EXPECT_FALSE(restorer->InProgress()); |
| std::unordered_set<sp_int32> local_taskids; |
| heron::config::PhysicalPlanHelper::GetTasks(*pplan, GenerateStMgrId(1), local_taskids); |
| restorer->StartRestore(ckpt_id, 1, local_taskids, pplan); |
| // Now we are in restore |
| EXPECT_TRUE(restorer->InProgress()); |
| // Make sure that the ckpt messages were sent to our tasks |
| std::unordered_set<int32_t> local_tasks; |
| heron::config::PhysicalPlanHelper::GetTasks(*pplan, GenerateStMgrId(1), local_tasks); |
| for (auto task : local_tasks) { |
| EXPECT_TRUE(ckptmgr_client->GetCalled(ckpt_id, task)); |
| } |
| local_tasks.clear(); |
| // Make sure that the ckpt messages were not sent to some other tasks |
| heron::config::PhysicalPlanHelper::GetTasks(*pplan, GenerateStMgrId(0), local_tasks); |
| for (auto task : local_tasks) { |
| EXPECT_FALSE(ckptmgr_client->GetCalled(ckpt_id, task)); |
| } |
| local_tasks.clear(); |
| |
| // Responses from ckptmgr |
| heron::config::PhysicalPlanHelper::GetTasks(*pplan, GenerateStMgrId(1), local_tasks); |
| for (auto task : local_tasks) { |
| EXPECT_FALSE(dummy_stmgr_server->DidSendRestoreRequest(task)); |
| heron::proto::ckptmgr::InstanceStateCheckpoint c; |
| c.set_checkpoint_id(ckpt_id); |
| restorer->HandleCheckpointState(heron::proto::system::OK, task, ckpt_id, c); |
| // Make sure that restore is sent to the instances |
| EXPECT_TRUE(dummy_stmgr_server->DidSendRestoreRequest(task)); |
| } |
| EXPECT_TRUE(restorer->InProgress()); |
| |
| // Send notification that tasks have recovered |
| for (auto task : local_tasks) { |
| restorer->HandleInstanceRestoredState(task, heron::proto::system::OK, ckpt_id); |
| } |
| EXPECT_TRUE(restorer->InProgress()); |
| |
| restorer->HandleAllStMgrClientsConnected(); |
| EXPECT_TRUE(restorer->InProgress()); |
| restorer->HandleAllInstancesConnected(); |
| EXPECT_FALSE(restorer->InProgress()); |
| EXPECT_TRUE(restore_done); |
| |
| delete restorer; |
| delete dummy_stmgr_clientmgr; |
| delete dummy_metrics_client; |
| delete tuple_cache; |
| delete ckptmgr_client; |
| delete pplan; |
| delete dummyLoop; |
| } |
| |
| // If instances die in the middle of a restore, make sure that |
| // they have to be recovered again |
| TEST(StatefulRestorer, deadinstances) { |
| auto pplan = CreatePplan(2, 4, 3); |
| EventLoop* dummyLoop = new EventLoopImpl(); |
| auto ckptmgr_client = CreateDummyCkptMgr(pplan, dummyLoop); |
| auto tuple_cache = new DummyTupleCache(dummyLoop); |
| auto dummy_metrics_client = new heron::common::MetricsMgrSt(11001, 100, dummyLoop); |
| dummy_metrics_client->Start("127.0.0.1", 11000, "_stmgr", "_stmgr"); |
| auto dummy_stmgr_clientmgr = new DummyStMgrClientMgr(dummyLoop, dummy_metrics_client, |
| GenerateStMgrId(1), pplan); |
| auto dummy_stmgr_server = CreateDummyStMgrServer(dummyLoop, GenerateStMgrId(1), |
| pplan, dummy_metrics_client); |
| bool restore_done = false; |
| std::string ckpt_id = "ckpt1"; |
| auto restorer = new heron::stmgr::StatefulRestorer(ckptmgr_client, dummy_stmgr_clientmgr, |
| tuple_cache, dummy_stmgr_server, |
| dummy_metrics_client, |
| std::bind(&RestoreDone, &restore_done)); |
| // At the start the restore is not in progress |
| EXPECT_FALSE(restorer->InProgress()); |
| std::unordered_set<sp_int32> local_taskids; |
| heron::config::PhysicalPlanHelper::GetTasks(*pplan, GenerateStMgrId(1), local_taskids); |
| restorer->StartRestore(ckpt_id, 1, local_taskids, pplan); |
| // Now we are in restore |
| EXPECT_TRUE(restorer->InProgress()); |
| // Just have all stmgrs connected to us |
| restorer->HandleAllStMgrClientsConnected(); |
| |
| // Responses from ckptmgr |
| std::unordered_set<int32_t> local_tasks; |
| heron::config::PhysicalPlanHelper::GetTasks(*pplan, GenerateStMgrId(1), local_tasks); |
| for (auto task : local_tasks) { |
| heron::proto::ckptmgr::InstanceStateCheckpoint c; |
| c.set_checkpoint_id(ckpt_id); |
| restorer->HandleCheckpointState(heron::proto::system::OK, task, ckpt_id, c); |
| } |
| EXPECT_TRUE(restorer->InProgress()); |
| |
| // Send notification that some tasks have recovered |
| EXPECT_GT(local_tasks.size(), 1); |
| bool first = true; |
| int32_t troublesome_task; |
| for (auto task : local_tasks) { |
| if (first) { |
| first = false; |
| troublesome_task = task; |
| } else { |
| restorer->HandleInstanceRestoredState(task, heron::proto::system::OK, ckpt_id); |
| } |
| } |
| EXPECT_TRUE(restorer->InProgress()); |
| |
| // Notify that one instance is dead |
| restorer->HandleDeadInstanceConnection(troublesome_task); |
| dummy_stmgr_server->ClearSendRestoreRequest(troublesome_task); |
| ckptmgr_client->ClearGetCalled(ckpt_id, troublesome_task); |
| |
| EXPECT_TRUE(restorer->InProgress()); |
| EXPECT_FALSE(ckptmgr_client->GetCalled(ckpt_id, troublesome_task)); |
| EXPECT_FALSE(dummy_stmgr_server->DidSendRestoreRequest(troublesome_task)); |
| |
| // Now it is back on again |
| restorer->HandleAllInstancesConnected(); |
| // We are still in progress |
| EXPECT_TRUE(restorer->InProgress()); |
| // Make sure that ckpt request is sent again |
| EXPECT_TRUE(ckptmgr_client->GetCalled(ckpt_id, troublesome_task)); |
| // The state is fetched from ckptmgr |
| heron::proto::ckptmgr::InstanceStateCheckpoint c; |
| c.set_checkpoint_id(ckpt_id); |
| restorer->HandleCheckpointState(heron::proto::system::OK, troublesome_task, ckpt_id, c); |
| EXPECT_TRUE(restorer->InProgress()); |
| // make sure that we sent restore state |
| EXPECT_TRUE(dummy_stmgr_server->DidSendRestoreRequest(troublesome_task)); |
| restorer->HandleInstanceRestoredState(troublesome_task, heron::proto::system::OK, ckpt_id); |
| |
| // Now everything is done |
| EXPECT_FALSE(restorer->InProgress()); |
| EXPECT_TRUE(restore_done); |
| |
| delete restorer; |
| delete dummy_stmgr_clientmgr; |
| delete dummy_metrics_client; |
| delete tuple_cache; |
| delete ckptmgr_client; |
| delete pplan; |
| delete dummyLoop; |
| } |
| |
| // This tests the scenario where ckptmgr |
| // dies in the middle of a restore |
| TEST(StatefulRestorer, deadckptmgr) { |
| auto pplan = CreatePplan(2, 4, 3); |
| EventLoop* dummyLoop = new EventLoopImpl(); |
| auto ckptmgr_client = CreateDummyCkptMgr(pplan, dummyLoop); |
| auto tuple_cache = new DummyTupleCache(dummyLoop); |
| auto dummy_metrics_client = new heron::common::MetricsMgrSt(11001, 100, dummyLoop); |
| dummy_metrics_client->Start("127.0.0.1", 11000, "_stmgr", "_stmgr"); |
| auto dummy_stmgr_clientmgr = new DummyStMgrClientMgr(dummyLoop, dummy_metrics_client, |
| GenerateStMgrId(1), pplan); |
| dummy_stmgr_clientmgr->SetAllStMgrClientsRegistered(true); |
| auto dummy_stmgr_server = CreateDummyStMgrServer(dummyLoop, GenerateStMgrId(1), |
| pplan, dummy_metrics_client); |
| dummy_stmgr_server->SetAllInstancesConnectedToUs(true); |
| bool restore_done = false; |
| std::string ckpt_id = "ckpt1"; |
| auto restorer = new heron::stmgr::StatefulRestorer(ckptmgr_client, dummy_stmgr_clientmgr, |
| tuple_cache, dummy_stmgr_server, |
| dummy_metrics_client, |
| std::bind(&RestoreDone, &restore_done)); |
| // At the start the restore is not in progress |
| EXPECT_FALSE(restorer->InProgress()); |
| std::unordered_set<sp_int32> local_taskids; |
| heron::config::PhysicalPlanHelper::GetTasks(*pplan, GenerateStMgrId(1), local_taskids); |
| restorer->StartRestore(ckpt_id, 1, local_taskids, pplan); |
| // Now we are in restore |
| EXPECT_TRUE(restorer->InProgress()); |
| |
| // Responses from ckptmgr |
| std::unordered_set<int32_t> local_tasks; |
| heron::config::PhysicalPlanHelper::GetTasks(*pplan, GenerateStMgrId(1), local_tasks); |
| // Send notification that some tasks have recovered |
| EXPECT_GT(local_tasks.size(), 1); |
| bool first = true; |
| int32_t troublesome_task; |
| // ckpt delivers some checkpoints |
| for (auto task : local_tasks) { |
| if (first) { |
| first = false; |
| troublesome_task = task; |
| } else { |
| heron::proto::ckptmgr::InstanceStateCheckpoint c; |
| c.set_checkpoint_id(ckpt_id); |
| restorer->HandleCheckpointState(heron::proto::system::OK, task, ckpt_id, c); |
| restorer->HandleInstanceRestoredState(task, heron::proto::system::OK, ckpt_id); |
| } |
| } |
| EXPECT_TRUE(restorer->InProgress()); |
| |
| dummy_stmgr_server->ClearSendRestoreRequest(troublesome_task); |
| ckptmgr_client->ClearGetCalled(ckpt_id, troublesome_task); |
| EXPECT_FALSE(ckptmgr_client->GetCalled(ckpt_id, troublesome_task)); |
| |
| // Now ckpt mgr dies and comes back |
| restorer->HandleCkptMgrRestart(); |
| |
| // make sure that we sent the recover ckpt request |
| EXPECT_TRUE(ckptmgr_client->GetCalled(ckpt_id, troublesome_task)); |
| EXPECT_TRUE(restorer->InProgress()); |
| restorer->HandleInstanceRestoredState(troublesome_task, heron::proto::system::OK, ckpt_id); |
| |
| // Now everything is done |
| EXPECT_FALSE(restorer->InProgress()); |
| EXPECT_TRUE(restore_done); |
| |
| delete restorer; |
| delete dummy_stmgr_clientmgr; |
| delete dummy_metrics_client; |
| delete tuple_cache; |
| delete ckptmgr_client; |
| delete pplan; |
| delete dummyLoop; |
| } |
| |
| int main(int argc, char** argv) { |
| heron::common::Initialize(argv[0]); |
| testing::InitGoogleTest(&argc, argv); |
| if (argc > 1) { |
| std::cerr << "Using config file " << argv[1] << std::endl; |
| heron_internals_config_filename = argv[1]; |
| } |
| // Create the sington for heron_internals_config_reader, if it does not exist |
| if (!heron::config::HeronInternalsConfigReader::Exists()) { |
| heron::config::HeronInternalsConfigReader::Create(heron_internals_config_filename, ""); |
| } |
| return RUN_ALL_TESTS(); |
| } |