| /* |
| * Copyright 2015 Twitter, Inc. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
#include <algorithm>
#include <limits>
#include <map>
#include <sstream>
#include <thread>
#include <vector>
#include "gtest/gtest.h"
#include "glog/logging.h"
#include "proto/messages.h"
#include "basics/basics.h"
#include "errors/errors.h"
#include "threads/threads.h"
#include "network/network.h"
#include "basics/modinit.h"
#include "errors/modinit.h"
#include "threads/modinit.h"
#include "network/modinit.h"
#include "config/heron-internals-config-reader.h"
#include "config/topology-config-vars.h"
#include "config/topology-config-helper.h"
#include "config/physical-plan-helper.h"
#include "statemgr/heron-statemgr.h"
#include "statemgr/heron-localfilestatemgr.h"
#include "manager/tmaster.h"
#include "manager/stmgr.h"
#include "server/dummy_instance.h"
#include "server/dummy_stmgr.h"
#include "server/dummy_metricsmgr.h"
| |
// Base names used to construct the components and streams of the dummy topology.
const sp_string SPOUT_NAME = "spout";
const sp_string BOLT_NAME = "bolt";
const sp_string STREAM_NAME = "stream";
// Container index prefixed onto every generated instance id (see CreateInstanceId).
const sp_string CONTAINER_INDEX = "0";
const sp_string STMGR_NAME = "stmgr";
const sp_string MESSAGE_TIMEOUT = "30"; // seconds
const sp_string LOCALHOST = "127.0.0.1";
// Test configuration files, resolved relative to the repository root.
sp_string heron_internals_config_filename =
    "heron/config/src/yaml/conf/test/test_heron_internals.yaml";
sp_string metrics_sinks_config_filename =
    "heron/config/src/yaml/conf/test/test_metrics_sinks.yaml";
| |
| // Generate a dummy topology |
| static heron::proto::api::Topology* GenerateDummyTopology( |
| const sp_string& topology_name, const sp_string& topology_id, int num_spouts, |
| int num_spout_instances, int num_bolts, int num_bolt_instances, |
| const heron::proto::api::Grouping& grouping) { |
| heron::proto::api::Topology* topology = new heron::proto::api::Topology(); |
| topology->set_id(topology_id); |
| topology->set_name(topology_name); |
| size_t spouts_size = num_spouts; |
| size_t bolts_size = num_bolts; |
| // Set spouts |
| for (size_t i = 0; i < spouts_size; ++i) { |
| heron::proto::api::Spout* spout = topology->add_spouts(); |
| // Set the component information |
| heron::proto::api::Component* component = spout->mutable_comp(); |
| sp_string compname = SPOUT_NAME; |
| compname += std::to_string(i); |
| component->set_name(compname); |
| heron::proto::api::ComponentObjectSpec compspec = heron::proto::api::JAVA_CLASS_NAME; |
| component->set_spec(compspec); |
| // Set the stream information |
| heron::proto::api::OutputStream* ostream = spout->add_outputs(); |
| heron::proto::api::StreamId* tstream = ostream->mutable_stream(); |
| sp_string streamid = STREAM_NAME; |
| streamid += std::to_string(i); |
| tstream->set_id(streamid); |
| tstream->set_component_name(compname); |
| heron::proto::api::StreamSchema* schema = ostream->mutable_schema(); |
| heron::proto::api::StreamSchema::KeyType* key_type = schema->add_keys(); |
| key_type->set_key("dummy"); |
| key_type->set_type(heron::proto::api::OBJECT); |
| // Set the config |
| heron::proto::api::Config* config = component->mutable_config(); |
| heron::proto::api::Config::KeyValue* kv = config->add_kvs(); |
| kv->set_key(heron::config::TopologyConfigVars::TOPOLOGY_COMPONENT_PARALLELISM); |
| kv->set_value(std::to_string(num_spout_instances)); |
| } |
| // Set bolts |
| for (size_t i = 0; i < bolts_size; ++i) { |
| heron::proto::api::Bolt* bolt = topology->add_bolts(); |
| // Set the component information |
| heron::proto::api::Component* component = bolt->mutable_comp(); |
| sp_string compname = BOLT_NAME; |
| compname += std::to_string(i); |
| component->set_name(compname); |
| heron::proto::api::ComponentObjectSpec compspec = heron::proto::api::JAVA_CLASS_NAME; |
| component->set_spec(compspec); |
| // Set the stream information |
| heron::proto::api::InputStream* istream = bolt->add_inputs(); |
| heron::proto::api::StreamId* tstream = istream->mutable_stream(); |
| sp_string streamid = STREAM_NAME; |
| streamid += std::to_string(i); |
| tstream->set_id(streamid); |
| sp_string input_compname = SPOUT_NAME; |
| input_compname += std::to_string(i); |
| tstream->set_component_name(input_compname); |
| istream->set_gtype(grouping); |
| // Set the config |
| heron::proto::api::Config* config = component->mutable_config(); |
| heron::proto::api::Config::KeyValue* kv = config->add_kvs(); |
| kv->set_key(heron::config::TopologyConfigVars::TOPOLOGY_COMPONENT_PARALLELISM); |
| kv->set_value(std::to_string(num_bolt_instances)); |
| } |
| // Set message timeout |
| heron::proto::api::Config* topology_config = topology->mutable_topology_config(); |
| heron::proto::api::Config::KeyValue* kv = topology_config->add_kvs(); |
| kv->set_key(heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS); |
| kv->set_value(MESSAGE_TIMEOUT); |
| |
| // Set state |
| topology->set_state(heron::proto::api::RUNNING); |
| |
| return topology; |
| } |
| |
| static heron::proto::system::PackingPlan* GenerateDummyPackingPlan(int num_stmgrs_, int num_spouts, |
| int num_spout_instances, int num_bolts, int num_bolt_instances) { |
| size_t spouts_size = num_spouts * num_spout_instances; |
| size_t bolts_size = num_bolts * num_bolt_instances; |
| |
| heron::proto::system::Resource* instanceResource = new heron::proto::system::Resource(); |
| instanceResource->set_ram(1); |
| instanceResource->set_cpu(1); |
| instanceResource->set_disk(1); |
| |
| heron::proto::system::Resource* containerRequiredResource = new heron::proto::system::Resource(); |
| containerRequiredResource->set_ram(10); |
| containerRequiredResource->set_cpu(10); |
| containerRequiredResource->set_disk(10); |
| |
| sp_int32 task_id = 0; |
| sp_int32 component_index = 0; |
| sp_int32 container_index = 0; |
| |
| heron::proto::system::PackingPlan* packingPlan = new heron::proto::system::PackingPlan(); |
| packingPlan->set_id("dummy_packing_plan_id"); |
| |
| std::map<size_t, heron::proto::system::ContainerPlan*> container_map_; |
| for (size_t i = 0; i < num_stmgrs_; ++i) { |
| heron::proto::system::ContainerPlan* containerPlan = packingPlan->add_container_plans(); |
| containerPlan->set_id(i); |
| heron::proto::system::Resource* requiredResource = containerPlan->mutable_requiredresource(); |
| requiredResource->set_cpu(containerRequiredResource->cpu()); |
| requiredResource->set_ram(containerRequiredResource->ram()); |
| requiredResource->set_disk(containerRequiredResource->disk()); |
| container_map_[i] = containerPlan; |
| } |
| |
| // Set spouts |
| for (size_t i = 0; i < spouts_size; ++i) { |
| heron::proto::system::ContainerPlan* containerPlan = container_map_[container_index]; |
| heron::proto::system::InstancePlan* instancePlan = containerPlan->add_instance_plans(); |
| instancePlan->set_component_name("spout_x"); |
| instancePlan->set_task_id(task_id++); |
| instancePlan->set_component_index(component_index++); |
| heron::proto::system::Resource* resource = instancePlan->mutable_resource(); |
| resource->set_cpu(instanceResource->cpu()); |
| resource->set_ram(instanceResource->ram()); |
| resource->set_disk(instanceResource->disk()); |
| if (++container_index == num_stmgrs_) { |
| container_index = 0; |
| } |
| } |
| |
| // Set bolts |
| component_index = 0; |
| for (size_t i = 0; i < bolts_size; ++i) { |
| heron::proto::system::ContainerPlan* containerPlan = container_map_[container_index]; |
| heron::proto::system::InstancePlan* instancePlan = containerPlan->add_instance_plans(); |
| instancePlan->set_component_name("bolt_x"); |
| instancePlan->set_task_id(task_id++); |
| instancePlan->set_component_index(component_index++); |
| heron::proto::system::Resource* resource = instancePlan->mutable_resource(); |
| resource->set_cpu(instanceResource->cpu()); |
| resource->set_ram(instanceResource->ram()); |
| resource->set_disk(instanceResource->disk()); |
| if (++container_index == num_stmgrs_) { |
| container_index = 0; |
| } |
| } |
| |
| return packingPlan; |
| } |
| |
// Method to create the local zk state on the filesystem
//
// Writes `topology` and `packingPlan` into the directory `dpath` through the
// local-file state manager, standing in for what would normally live in zk.
// The event loop is only needed by the state manager's constructor; NULL
// callbacks are passed to the create calls (presumably synchronous for the
// local-file implementation -- confirm if this is ever ported).
void CreateLocalStateOnFS(heron::proto::api::Topology* topology,
                          heron::proto::system::PackingPlan* packingPlan, sp_string dpath) {
  EventLoopImpl ss;

  // Write the dummy topology/tmaster location out to the local file system via the state mgr
  heron::common::HeronLocalFileStateMgr state_mgr(dpath, &ss);
  state_mgr.CreateTopology(*topology, NULL);
  state_mgr.CreatePackingPlan(topology->name(), *packingPlan, NULL);
}
| |
| const sp_string CreateInstanceId(sp_int8 type, sp_int8 instance, bool spout) { |
| std::ostringstream instanceid_stream; |
| if (spout) { |
| instanceid_stream << CONTAINER_INDEX << "_" << SPOUT_NAME << static_cast<int>(type); |
| } else { |
| instanceid_stream << CONTAINER_INDEX << "_" << BOLT_NAME << static_cast<int>(type); |
| } |
| instanceid_stream << "_" << static_cast<int>(instance); |
| return instanceid_stream.str(); |
| } |
| |
| heron::proto::system::Instance* CreateInstanceMap(sp_int8 type, sp_int8 instance, sp_int32 stmgr_id, |
| sp_int32 global_index, bool spout) { |
| heron::proto::system::Instance* imap = new heron::proto::system::Instance(); |
| imap->set_instance_id(CreateInstanceId(type, instance, spout)); |
| |
| imap->set_stmgr_id(STMGR_NAME + "-" + std::to_string(stmgr_id)); |
| heron::proto::system::InstanceInfo* inst = imap->mutable_info(); |
| inst->set_task_id(global_index); |
| inst->set_component_index(instance); |
| if (spout) |
| inst->set_component_name(SPOUT_NAME + std::to_string(type)); |
| else |
| inst->set_component_name(BOLT_NAME + std::to_string(type)); |
| return imap; |
| } |
| |
| // Dummy timer cb. See StartServer for explanation. |
| void DummyTimerCb(EventLoop::Status) { |
| // Do nothing |
| } |
| |
| // Function to start the threads |
| void StartServer(EventLoopImpl* ss) { |
| // In the ss register a dummy timer. This is to make sure that the |
| // exit from the loop happens timely. If there are no timers registered |
| // with the select server and also no activity on any of the fds registered |
| // for read/write then after the loopExit the loop continues indefinetly. |
| ss->registerTimer([](EventLoop::Status status) { DummyTimerCb(status); }, true, 1 * 1000 * 1000); |
| ss->loop(); |
| } |
| |
// Creates a real TMaster on a fresh event loop and runs it on a new thread.
// ss, tmaster and tmaster_thread are heap-allocated here and owned by the
// caller. Note: the TMaster constructor takes the controller port BEFORE the
// master port, i.e. the opposite order of this function's parameters.
void StartTMaster(EventLoopImpl*& ss, heron::tmaster::TMaster*& tmaster,
                  std::thread*& tmaster_thread, const sp_string& zkhostportlist,
                  const sp_string& topology_name, const sp_string& topology_id,
                  const sp_string& dpath,
                  sp_int32 tmaster_port, sp_int32 tmaster_controller_port,
                  sp_int32 tmaster_stats_port, sp_int32 metrics_mgr_port,
                  sp_int32 ckptmgr_port) {
  ss = new EventLoopImpl();
  tmaster = new heron::tmaster::TMaster(zkhostportlist, topology_name, topology_id, dpath,
                                        tmaster_controller_port, tmaster_port, tmaster_stats_port,
                                        metrics_mgr_port, ckptmgr_port,
                                        metrics_sinks_config_filename, LOCALHOST, ss);
  tmaster_thread = new std::thread(StartServer, ss);
}
| |
// Creates a real StMgr on a fresh event loop and runs it on a new thread.
//
// stmgr_port and local_data_port are in/out: they must be 0 on entry
// (wildcard bind, asserted below) and are overwritten with the ports the
// stmgr's servers actually bound. ss, mgr and stmgr_thread are heap-allocated
// here and owned by the caller; the copied topology is owned by the stmgr.
void StartStMgr(EventLoopImpl*& ss, heron::stmgr::StMgr*& mgr, std::thread*& stmgr_thread,
                const sp_string& stmgr_host, sp_int32& stmgr_port, sp_int32& local_data_port,
                const sp_string& topology_name,
                const sp_string& topology_id, const heron::proto::api::Topology* topology,
                const std::vector<sp_string>& workers, const sp_string& stmgr_id,
                const sp_string& zkhostportlist, const sp_string& dpath, sp_int32 metricsmgr_port,
                sp_int32 shell_port, sp_int32 ckptmgr_port, const sp_string& ckptmgr_id,
                sp_int64 _high_watermark, sp_int64 _low_watermark) {
  // The topology will be owned and deleted by the strmgr
  heron::proto::api::Topology* stmgr_topology = new heron::proto::api::Topology();
  stmgr_topology->CopyFrom(*topology);
  // Create the select server for this stmgr to use
  ss = new EventLoopImpl();
  mgr = new heron::stmgr::StMgr(ss, stmgr_host, stmgr_port, local_data_port, topology_name,
                                topology_id,
                                stmgr_topology, stmgr_id, workers, zkhostportlist, dpath,
                                metricsmgr_port, shell_port, ckptmgr_port, ckptmgr_id,
                                _high_watermark, _low_watermark);
  EXPECT_EQ(0, stmgr_port);
  EXPECT_EQ(0, local_data_port);
  mgr->Init();
  // Read back the ports the kernel actually assigned for the wildcard binds
  stmgr_port = mgr->GetStmgrServerNetworkOptions().get_port();
  local_data_port = mgr->GetInstanceServerNetworkOptions().get_port();
  EXPECT_GT(stmgr_port, 0);
  stmgr_thread = new std::thread(StartServer, ss);
}
| |
// Starts a DummyStMgr (test stand-in for a stream manager) on its own event
// loop/thread. stmgr_port is in/out: it must be 0 on entry (wildcard bind,
// asserted below) and is replaced by the port actually bound. ss, mgr and
// stmgr_thread are heap-allocated here and owned by the caller.
void StartDummyStMgr(EventLoopImpl*& ss, DummyStMgr*& mgr, std::thread*& stmgr_thread,
                     sp_int32& stmgr_port, sp_int32 tmaster_port, sp_int32 shell_port,
                     const sp_string& stmgr_id,
                     const std::vector<heron::proto::system::Instance*>& instances) {
  // Create the select server for this stmgr to use
  ss = new EventLoopImpl();

  NetworkOptions options;
  options.set_host(LOCALHOST);
  options.set_port(stmgr_port);
  options.set_max_packet_size(1_MB);
  options.set_socket_family(PF_INET);

  mgr = new DummyStMgr(ss, options, stmgr_id, LOCALHOST, stmgr_port, LOCALHOST, tmaster_port,
                       shell_port, instances);
  EXPECT_EQ(0, stmgr_port);
  // Start() returns 0 on success; the streamed message is shown on failure
  EXPECT_EQ(0, mgr->Start()) << "DummyStMgr bind " << LOCALHOST << ":" << stmgr_port;
  stmgr_port = mgr->get_serveroptions().get_port();
  EXPECT_GT(stmgr_port, 0);
  stmgr_thread = new std::thread(StartServer, ss);
}
| |
// Starts a DummyMtrMgr (test stand-in for a metrics manager) on its own event
// loop/thread. mtmgr_port is in/out: its entry value is the requested port
// (tests pass 0 for a wildcard bind) and it is replaced by the port actually
// bound. The latches may be NULL; they are forwarded to the DummyMtrMgr.
// ss, mgr and mtmgr_thread are heap-allocated here and owned by the caller.
void StartDummyMtrMgr(EventLoopImpl*& ss, DummyMtrMgr*& mgr, std::thread*& mtmgr_thread,
                      sp_int32& mtmgr_port, const sp_string& stmgr_id, CountDownLatch* tmasterLatch,
                      CountDownLatch* connectionCloseLatch) {
  // Create the select server for this stmgr to use
  ss = new EventLoopImpl();

  NetworkOptions options;
  options.set_host(LOCALHOST);
  options.set_port(mtmgr_port);
  options.set_max_packet_size(10_MB);
  options.set_socket_family(PF_INET);

  mgr = new DummyMtrMgr(ss, options, stmgr_id, tmasterLatch, connectionCloseLatch);
  // Start() returns 0 on success; the streamed message is shown on failure
  EXPECT_EQ(0, mgr->Start()) << "DummyMtrMgr bind " << LOCALHOST << ":" << mtmgr_port;
  mtmgr_port = mgr->get_serveroptions().get_port();
  EXPECT_GT(mtmgr_port, 0);
  mtmgr_thread = new std::thread(StartServer, ss);
}
| |
// Starts a DummySpoutInstance on its own event loop/thread, configured to
// connect to the stmgr listening on stmgr_port and with max_msgs_to_send and
// the custom-grouping flag forwarded to the instance. ss, worker and
// worker_thread are heap-allocated here and owned by the caller.
void StartDummySpoutInstance(EventLoopImpl*& ss, DummySpoutInstance*& worker,
                             std::thread*& worker_thread, sp_int32 stmgr_port,
                             const sp_string& topology_name, const sp_string& topology_id,
                             const sp_string& instance_id, const sp_string& component_name,
                             sp_int32 task_id, sp_int32 component_index, const sp_string& stmgr_id,
                             const sp_string& stream_id, sp_int32 max_msgs_to_send,
                             bool _do_custom_grouping) {
  // Create the select server for this worker to use
  ss = new EventLoopImpl();
  // Create the network option
  NetworkOptions options;
  options.set_host(LOCALHOST);
  options.set_port(stmgr_port);
  options.set_max_packet_size(10_MB);
  options.set_socket_family(PF_INET);

  worker = new DummySpoutInstance(ss, options, topology_name, topology_id, instance_id,
                                  component_name, task_id, component_index, stmgr_id, stream_id,
                                  max_msgs_to_send, _do_custom_grouping);
  worker->Start();
  worker_thread = new std::thread(StartServer, ss);
}
| |
// Starts a DummyBoltInstance on its own event loop/thread, configured to
// connect to the stmgr listening on stmgr_port and with
// expected_msgs_to_recv forwarded to the instance. ss, worker and
// worker_thread are heap-allocated here and owned by the caller.
void StartDummyBoltInstance(EventLoopImpl*& ss, DummyBoltInstance*& worker,
                            std::thread*& worker_thread, sp_int32 stmgr_port,
                            const sp_string& topology_name, const sp_string& topology_id,
                            const sp_string& instance_id, const sp_string& component_name,
                            sp_int32 task_id, sp_int32 component_index, const sp_string& stmgr_id,
                            sp_int32 expected_msgs_to_recv) {
  // Create the select server for this worker to use
  ss = new EventLoopImpl();
  // Create the network option
  NetworkOptions options;
  options.set_host(LOCALHOST);
  options.set_port(stmgr_port);
  options.set_max_packet_size(10_MB);
  options.set_socket_family(PF_INET);

  worker =
      new DummyBoltInstance(ss, options, topology_name, topology_id, instance_id, component_name,
                            task_id, component_index, stmgr_id, expected_msgs_to_recv);
  worker->Start();
  worker_thread = new std::thread(StartServer, ss);
}
| |
| struct CommonResources { |
| // arguments |
| sp_string tmaster_host_; |
| sp_int32 tmaster_port_; |
| sp_int32 tmaster_controller_port_; |
| sp_int32 tmaster_stats_port_; |
| sp_int32 metricsmgr_port_; |
| sp_int32 shell_port_; |
| sp_int32 ckptmgr_port_; |
| sp_string ckptmgr_id_; |
| sp_string zkhostportlist_; |
| sp_string topology_name_; |
| sp_string topology_id_; |
| size_t num_stmgrs_; |
| size_t num_spouts_; |
| size_t num_spout_instances_; |
| size_t num_bolts_; |
| size_t num_bolt_instances_; |
| |
| // store the stmgr server port returned by bind/listen 0 |
| std::vector<sp_int32> stmgr_ports_; |
| std::vector<sp_int32> local_data_ports_; |
| |
| heron::proto::api::Grouping grouping_; |
| |
| // returns - filled in by init |
| sp_string dpath_; |
| std::vector<EventLoopImpl*> ss_list_; |
| std::vector<sp_string> stmgrs_id_list_; |
| heron::proto::api::Topology* topology_; |
| heron::proto::system::PackingPlan* packing_plan_; |
| |
| heron::tmaster::TMaster* tmaster_; |
| std::thread* tmaster_thread_; |
| |
| // Component |
| std::vector<DummySpoutInstance*> spout_workers_list_; |
| std::vector<std::thread*> spout_workers_threads_list_; |
| std::vector<DummyBoltInstance*> bolt_workers_list_; |
| std::vector<std::thread*> bolt_workers_threads_list_; |
| |
| // metrics mgr |
| DummyMtrMgr* metrics_mgr_; |
| std::thread* metrics_mgr_thread_; |
| |
| // Stmgr |
| std::vector<heron::stmgr::StMgr*> stmgrs_list_; |
| std::vector<std::thread*> stmgrs_threads_list_; |
| |
| // Stmgr to instance ids |
| std::map<sp_int32, std::vector<sp_string> > stmgr_instance_id_list_; |
| |
| // Stmgr to Instance |
| std::map<sp_int32, std::vector<heron::proto::system::Instance*> > stmgr_instance_list_; |
| |
| // Instanceid to instance |
| std::map<sp_string, heron::proto::system::Instance*> instanceid_instance_; |
| |
| std::map<sp_string, sp_int32> instanceid_stmgr_; |
| |
| sp_int64 high_watermark_; |
| sp_int64 low_watermark_; |
| |
| CommonResources() : topology_(NULL), tmaster_(NULL), tmaster_thread_(NULL) { |
| // Create the sington for heron_internals_config_reader |
| // if it does not exist |
| if (!heron::config::HeronInternalsConfigReader::Exists()) { |
| heron::config::HeronInternalsConfigReader::Create(heron_internals_config_filename, ""); |
| } |
| |
| // Create a temporary directory to write out the state |
| char dpath[255]; |
| snprintf(dpath, sizeof(dpath), "%s", "/tmp/XXXXXX"); |
| mkdtemp(dpath); |
| dpath_ = sp_string(dpath); |
| // Lets change the Connection buffer HWM and LWN for back pressure to get the |
| // test case done faster |
| high_watermark_ = 10_MB; |
| low_watermark_ = 5_MB; |
| } |
| |
| void setNumStmgrs(sp_int32 numStmgrs) { |
| num_stmgrs_ = numStmgrs; |
| while (numStmgrs > 0) { |
| stmgr_ports_.push_back(0); |
| local_data_ports_.push_back(0); |
| numStmgrs--; |
| } |
| } |
| }; |
| |
| void StartTMaster(CommonResources& common) { |
| // Generate a dummy topology |
| common.topology_ = GenerateDummyTopology( |
| common.topology_name_, common.topology_id_, common.num_spouts_, common.num_spout_instances_, |
| common.num_bolts_, common.num_bolt_instances_, common.grouping_); |
| common.packing_plan_ = GenerateDummyPackingPlan(common.num_stmgrs_, common.num_spouts_, |
| common.num_spout_instances_, common.num_bolts_, common.num_bolt_instances_); |
| |
| // Create the zk state on the local file system |
| CreateLocalStateOnFS(common.topology_, common.packing_plan_, common.dpath_); |
| |
| // Populate the list of stmgrs |
| for (int i = 0; i < common.num_stmgrs_; ++i) { |
| sp_string id = STMGR_NAME + "-"; |
| id += std::to_string(i); |
| common.stmgrs_id_list_.push_back(id); |
| } |
| |
| // Start the tmaster |
| EventLoopImpl* tmaster_eventLoop; |
| |
| StartTMaster(tmaster_eventLoop, common.tmaster_, common.tmaster_thread_, common.zkhostportlist_, |
| common.topology_name_, common.topology_id_, common.dpath_, |
| common.tmaster_port_, common.tmaster_controller_port_, common.tmaster_stats_port_, |
| common.metricsmgr_port_, common.ckptmgr_port_); |
| common.ss_list_.push_back(tmaster_eventLoop); |
| } |
| |
// Round-robins every spout instance, then every bolt instance, across the
// stream managers, recording the assignment in common's lookup maps:
// stmgr -> instance ids, stmgr -> Instance protos, instance id -> Instance,
// and instance id -> stmgr. Task ids (global_index) are assigned
// sequentially across spouts then bolts; the Instance protos created here
// end up owned by common.instanceid_instance_ and are freed in
// TearCommonResources.
void DistributeWorkersAcrossStmgrs(CommonResources& common) {
  // which stmgr is this component going to get assigned to
  sp_int32 stmgr_assignment_round = 0;
  sp_int32 global_index = 0;
  // Distribute the spouts
  for (int spout = 0; spout < common.num_spouts_; ++spout) {
    for (int spout_instance = 0; spout_instance < common.num_spout_instances_; ++spout_instance) {
      heron::proto::system::Instance* imap =
          CreateInstanceMap(spout, spout_instance, stmgr_assignment_round, global_index++, true);
      common.stmgr_instance_id_list_[stmgr_assignment_round].push_back(imap->instance_id());
      common.stmgr_instance_list_[stmgr_assignment_round].push_back(imap);
      common.instanceid_instance_[imap->instance_id()] = imap;
      common.instanceid_stmgr_[imap->instance_id()] = stmgr_assignment_round;
      // Have we completed a round of distribution of components
      if (++stmgr_assignment_round == common.num_stmgrs_) {
        stmgr_assignment_round = 0;
      }
    }
  }

  // The round-robin restarts at stmgr 0 for the bolts
  stmgr_assignment_round = 0;

  // Distribute the bolts
  for (int bolt = 0; bolt < common.num_bolts_; ++bolt) {
    for (int bolt_instance = 0; bolt_instance < common.num_bolt_instances_; ++bolt_instance) {
      heron::proto::system::Instance* imap =
          CreateInstanceMap(bolt, bolt_instance, stmgr_assignment_round, global_index++, false);
      // Have we completed a round of distribution of components
      common.stmgr_instance_id_list_[stmgr_assignment_round].push_back(imap->instance_id());
      common.stmgr_instance_list_[stmgr_assignment_round].push_back(imap);
      common.instanceid_instance_[imap->instance_id()] = imap;
      common.instanceid_stmgr_[imap->instance_id()] = stmgr_assignment_round;
      if (++stmgr_assignment_round == common.num_stmgrs_) {
        stmgr_assignment_round = 0;
      }
    }
  }
}
| |
// Starts one dummy spout instance, resolving its stmgr's local data port and
// its instance metadata from the lookup maps in `common`. Custom grouping is
// enabled iff the test's grouping is CUSTOM. The new event loop, worker and
// thread are recorded in `common`.
void StartDummySpoutInstanceHelper(CommonResources& common, sp_int8 spout, sp_int8 spout_instance,
                                   sp_int32 num_msgs_sent_by_spout_instance) {
  // Start the spout
  EventLoopImpl* worker_ss = NULL;
  DummySpoutInstance* worker = NULL;
  std::thread* worker_thread = NULL;
  sp_string streamid = STREAM_NAME;
  streamid += std::to_string(spout);
  sp_string instanceid = CreateInstanceId(spout, spout_instance, true);
  StartDummySpoutInstance(worker_ss, worker, worker_thread,
                          common.local_data_ports_[common.instanceid_stmgr_[instanceid]],
                          common.topology_name_, common.topology_id_, instanceid,
                          common.instanceid_instance_[instanceid]->info().component_name(),
                          common.instanceid_instance_[instanceid]->info().task_id(),
                          common.instanceid_instance_[instanceid]->info().component_index(),
                          common.instanceid_instance_[instanceid]->stmgr_id(), streamid,
                          num_msgs_sent_by_spout_instance,
                          common.grouping_ == heron::proto::api::CUSTOM);
  common.ss_list_.push_back(worker_ss);
  common.spout_workers_list_.push_back(worker);
  common.spout_workers_threads_list_.push_back(worker_thread);
}
| |
| void StartWorkerComponents(CommonResources& common, sp_int32 num_msgs_sent_by_spout_instance, |
| sp_int32 num_msgs_to_expect_in_bolt) { |
| // try to find the lowest bolt task id |
| sp_int32 min_bolt_task_id = std::numeric_limits<sp_int32>::max() - 1; |
| for (int bolt = 0; bolt < common.num_bolts_; ++bolt) { |
| for (int bolt_instance = 0; bolt_instance < common.num_bolt_instances_; ++bolt_instance) { |
| sp_string instanceid = CreateInstanceId(bolt, bolt_instance, false); |
| if (common.instanceid_instance_[instanceid]->info().task_id() < min_bolt_task_id) { |
| min_bolt_task_id = common.instanceid_instance_[instanceid]->info().task_id(); |
| } |
| } |
| } |
| |
| // Start the spouts |
| for (int spout = 0; spout < common.num_spouts_; ++spout) { |
| for (int spout_instance = 0; spout_instance < common.num_spout_instances_; ++spout_instance) { |
| StartDummySpoutInstanceHelper(common, spout, spout_instance, num_msgs_sent_by_spout_instance); |
| } |
| } |
| // Start the bolts |
| std::vector<DummyBoltInstance*> bolt_workers_list; |
| std::vector<std::thread*> bolt_workers_threads_list; |
| for (int bolt = 0; bolt < common.num_bolts_; ++bolt) { |
| for (int bolt_instance = 0; bolt_instance < common.num_bolt_instances_; ++bolt_instance) { |
| EventLoopImpl* worker_ss = NULL; |
| DummyBoltInstance* worker = NULL; |
| std::thread* worker_thread = NULL; |
| sp_string instanceid = CreateInstanceId(bolt, bolt_instance, false); |
| sp_int32 nmessages_to_expect = num_msgs_to_expect_in_bolt; |
| if (common.grouping_ == heron::proto::api::CUSTOM) { |
| if (common.instanceid_instance_[instanceid]->info().task_id() == min_bolt_task_id) { |
| nmessages_to_expect = |
| num_msgs_sent_by_spout_instance * common.num_spouts_ * common.num_spout_instances_; |
| } else { |
| nmessages_to_expect = 0; |
| } |
| } |
| StartDummyBoltInstance(worker_ss, worker, worker_thread, |
| common.local_data_ports_[common.instanceid_stmgr_[instanceid]], |
| common.topology_name_, common.topology_id_, instanceid, |
| common.instanceid_instance_[instanceid]->info().component_name(), |
| common.instanceid_instance_[instanceid]->info().task_id(), |
| common.instanceid_instance_[instanceid]->info().component_index(), |
| common.instanceid_instance_[instanceid]->stmgr_id(), |
| nmessages_to_expect); |
| common.ss_list_.push_back(worker_ss); |
| common.bolt_workers_list_.push_back(worker); |
| common.bolt_workers_threads_list_.push_back(worker_thread); |
| } |
| } |
| } |
| |
| void StartStMgrs(CommonResources& common) { |
| // Spawn and start the stmgrs |
| for (int i = 0; i < common.num_stmgrs_; ++i) { |
| EventLoopImpl* stmgr_ss = NULL; |
| heron::stmgr::StMgr* mgr = NULL; |
| std::thread* stmgr_thread = NULL; |
| |
| StartStMgr(stmgr_ss, mgr, stmgr_thread, common.tmaster_host_, common.stmgr_ports_[i], |
| common.local_data_ports_[i], |
| common.topology_name_, common.topology_id_, common.topology_, |
| common.stmgr_instance_id_list_[i], common.stmgrs_id_list_[i], common.zkhostportlist_, |
| common.dpath_, common.metricsmgr_port_, common.shell_port_, common.ckptmgr_port_, |
| common.ckptmgr_id_, common.high_watermark_, common.low_watermark_); |
| |
| common.ss_list_.push_back(stmgr_ss); |
| common.stmgrs_list_.push_back(mgr); |
| common.stmgrs_threads_list_.push_back(stmgr_thread); |
| } |
| } |
| |
| void StartMetricsMgr(CommonResources& common, CountDownLatch* tmasterLatch, |
| CountDownLatch* connectionCloseLatch) { |
| EventLoopImpl* ss = NULL; |
| DummyMtrMgr* mgr = NULL; |
| std::thread* metrics_mgr = NULL; |
| StartDummyMtrMgr(ss, mgr, metrics_mgr, common.metricsmgr_port_, "stmgr", tmasterLatch, |
| connectionCloseLatch); |
| common.ss_list_.push_back(ss); |
| common.metrics_mgr_ = mgr; |
| common.metrics_mgr_thread_ = metrics_mgr; |
| } |
| |
| void StartMetricsMgr(CommonResources& common) { StartMetricsMgr(common, NULL, NULL); } |
| |
| void TearCommonResources(CommonResources& common) { |
| delete common.topology_; |
| delete common.packing_plan_; |
| delete common.tmaster_thread_; |
| delete common.tmaster_; |
| delete common.metrics_mgr_thread_; |
| delete common.metrics_mgr_; |
| |
| // Cleanup the stream managers |
| for (size_t i = 0; i < common.stmgrs_list_.size(); ++i) { |
| delete common.stmgrs_list_[i]; |
| delete common.stmgrs_threads_list_[i]; |
| } |
| |
| for (size_t w = 0; w < common.spout_workers_list_.size(); ++w) { |
| delete common.spout_workers_list_[w]; |
| delete common.spout_workers_threads_list_[w]; |
| } |
| |
| for (size_t w = 0; w < common.bolt_workers_list_.size(); ++w) { |
| delete common.bolt_workers_list_[w]; |
| delete common.bolt_workers_threads_list_[w]; |
| } |
| |
| for (size_t i = 0; i < common.ss_list_.size(); ++i) delete common.ss_list_[i]; |
| |
| for (auto itr = common.instanceid_instance_.begin(); |
| itr != common.instanceid_instance_.end(); ++itr) |
| delete itr->second; |
| |
| // Clean up the local filesystem state |
| FileUtils::removeRecursive(common.dpath_, true); |
| } |
| |
// Asserts that the dummy metrics manager received a tmaster location whose
// topology name/id, host and ports all match the test configuration in
// `common`.
void VerifyMetricsMgrTMaster(CommonResources& common) {
  EXPECT_NE(common.metrics_mgr_->get_tmaster(), (heron::proto::tmaster::TMasterLocation*)NULL);
  EXPECT_EQ(common.metrics_mgr_->get_tmaster()->topology_name(), common.topology_name_);
  EXPECT_EQ(common.metrics_mgr_->get_tmaster()->topology_id(), common.topology_id_);
  EXPECT_EQ(common.metrics_mgr_->get_tmaster()->host(), LOCALHOST);
  EXPECT_EQ(common.metrics_mgr_->get_tmaster()->controller_port(), common.tmaster_controller_port_);
  EXPECT_EQ(common.metrics_mgr_->get_tmaster()->master_port(), common.tmaster_port_);
  EXPECT_EQ(common.metrics_mgr_->get_tmaster()->stats_port(), common.tmaster_stats_port_);
}
| |
| // Test to make sure that the stmgr can decode the pplan |
| TEST(StMgr, test_pplan_decode) { |
| CommonResources common; |
| // Initialize dummy params |
| common.tmaster_host_ = LOCALHOST; |
| common.tmaster_port_ = 10000; |
| common.tmaster_controller_port_ = 10001; |
| common.tmaster_stats_port_ = 10002; |
| common.metricsmgr_port_ = 0; |
| common.shell_port_ = 40000; |
| common.ckptmgr_port_ = 50000; |
| common.ckptmgr_id_ = "ckptmgr"; |
| common.topology_name_ = "mytopology"; |
| common.topology_id_ = "abcd-9999"; |
| common.setNumStmgrs(2); |
| common.num_spouts_ = 5; |
| common.num_spout_instances_ = 2; |
| common.num_bolts_ = 5; |
| common.num_bolt_instances_ = 2; |
| common.grouping_ = heron::proto::api::SHUFFLE; |
| // Empty so that we don't attempt to connect to the zk |
| // but instead connect to the local filesytem |
| common.zkhostportlist_ = ""; |
| |
| sp_int8 num_workers_per_stmgr_ = (((common.num_spouts_ * common.num_spout_instances_) + |
| (common.num_bolts_ * common.num_bolt_instances_)) / |
| common.num_stmgrs_); |
| // Start the metrics mgr |
| StartMetricsMgr(common); |
| |
| // Start the tmaster etc. |
| StartTMaster(common); |
| |
| // Distribute workers across stmgrs |
| DistributeWorkersAcrossStmgrs(common); |
| |
| // Start the stream managers |
| StartStMgrs(common); |
| |
| // Start dummy worker to make the stmgr connect to the tmaster |
| StartWorkerComponents(common, 0, 0); |
| |
| // Wait till we get the physical plan populated on atleast one of the stmgrs |
| // We just pick the first one |
| while (!common.stmgrs_list_[0]->GetPhysicalPlan()) sleep(1); |
| |
| // Stop the schedulers |
| for (size_t i = 0; i < common.ss_list_.size(); ++i) { |
| common.ss_list_[i]->loopExit(); |
| } |
| |
| // Wait for the threads to terminate |
| common.tmaster_thread_->join(); |
| common.metrics_mgr_thread_->join(); |
| for (size_t i = 0; i < common.stmgrs_threads_list_.size(); ++i) { |
| common.stmgrs_threads_list_[i]->join(); |
| } |
| for (size_t i = 0; i < common.spout_workers_threads_list_.size(); ++i) { |
| common.spout_workers_threads_list_[i]->join(); |
| } |
| for (size_t i = 0; i < common.bolt_workers_threads_list_.size(); ++i) { |
| common.bolt_workers_threads_list_[i]->join(); |
| } |
| |
| // Verify that the pplan was decoded properly |
| const heron::proto::system::PhysicalPlan* pplan0 = common.stmgrs_list_[0]->GetPhysicalPlan(); |
| EXPECT_EQ(pplan0->stmgrs_size(), common.num_stmgrs_); |
| for (int i=0; i < common.num_stmgrs_; i++) { |
| EXPECT_NE(common.stmgr_ports_.end(), |
| std::find(common.stmgr_ports_.begin(), common.stmgr_ports_.end(), |
| pplan0->stmgrs(i).data_port())); |
| EXPECT_NE(common.local_data_ports_.end(), |
| std::find(common.stmgr_ports_.begin(), common.stmgr_ports_.end(), |
| pplan0->stmgrs(i).local_data_port())); |
| } |
| EXPECT_EQ(pplan0->instances_size(), common.num_stmgrs_ * num_workers_per_stmgr_); |
| std::map<sp_string, heron::config::PhysicalPlanHelper::TaskData> tasks; |
| heron::config::PhysicalPlanHelper::GetLocalTasks(*pplan0, common.stmgrs_id_list_[0], tasks); |
| EXPECT_EQ((int)tasks.size(), (common.num_spouts_ * common.num_spout_instances_ + |
| common.num_bolts_ * common.num_bolt_instances_) / |
| common.num_stmgrs_); |
| |
| // Delete the common resources |
| TearCommonResources(common); |
| } |
| |
| // Test to make sure that the stmgr can route data properly |
| TEST(StMgr, test_tuple_route) { |
| CommonResources common; |
| |
| // Initialize dummy params |
| common.tmaster_host_ = LOCALHOST; |
| common.tmaster_port_ = 15000; |
| common.tmaster_controller_port_ = 15001; |
| common.tmaster_stats_port_ = 15002; |
| common.metricsmgr_port_ = 0; |
| common.shell_port_ = 45000; |
| common.ckptmgr_port_ = 55000; |
| common.ckptmgr_id_ = "ckptmgr"; |
| common.topology_name_ = "mytopology"; |
| common.topology_id_ = "abcd-9999"; |
| common.setNumStmgrs(2); |
| common.num_spouts_ = 1; |
| common.num_spout_instances_ = 2; |
| common.num_bolts_ = 1; |
| common.num_bolt_instances_ = 4; |
| common.grouping_ = heron::proto::api::SHUFFLE; |
| // Empty so that we don't attempt to connect to the zk |
| // but instead connect to the local filesytem |
| common.zkhostportlist_ = ""; |
| |
| // Start the metrics mgr |
| StartMetricsMgr(common); |
| |
| // Start the tmaster etc. |
| StartTMaster(common); |
| |
| int num_msgs_sent_by_spout_instance = 8; |
| |
| // Distribute workers across stmgrs |
| DistributeWorkersAcrossStmgrs(common); |
| |
| // Start the stream managers |
| StartStMgrs(common); |
| |
| // Start the dummy workers |
| StartWorkerComponents( |
| common, num_msgs_sent_by_spout_instance, |
| (num_msgs_sent_by_spout_instance * common.num_spouts_ * common.num_spout_instances_) / |
| (common.num_bolts_ * common.num_bolt_instances_)); |
| |
| // Wait for the bolt thread to complete receiving |
| for (size_t i = 0; i < common.bolt_workers_threads_list_.size(); ++i) { |
| common.bolt_workers_threads_list_[i]->join(); |
| } |
| |
| // Stop the schedulers |
| for (size_t i = 0; i < common.ss_list_.size(); ++i) { |
| common.ss_list_[i]->loopExit(); |
| } |
| |
| // Wait for the threads to terminate. We have already waited for the bolt |
| // threads |
| common.tmaster_thread_->join(); |
| common.metrics_mgr_thread_->join(); |
| for (size_t i = 0; i < common.stmgrs_threads_list_.size(); ++i) { |
| common.stmgrs_threads_list_[i]->join(); |
| } |
| for (size_t i = 0; i < common.spout_workers_threads_list_.size(); ++i) { |
| common.spout_workers_threads_list_[i]->join(); |
| } |
| |
| // Verification |
| // Verify that the worker got the right stream manager id |
| // EXPECT_EQ(common.stmgrs_id_list_[0], common.spout_workers_list_[0]->RecvdStMgrId()); |
| // Make sure shuffle grouping worked |
| for (size_t w = 0; w < common.bolt_workers_list_.size(); ++w) { |
| EXPECT_EQ((sp_uint64)common.bolt_workers_list_[w]->MsgsRecvd(), |
| (num_msgs_sent_by_spout_instance * common.num_spouts_ * common.num_spout_instances_) / |
| common.bolt_workers_list_.size()); |
| } |
| |
| TearCommonResources(common); |
| } |
| |
| // Test to make sure that custom grouping routing works |
| TEST(StMgr, test_custom_grouping_route) { |
| CommonResources common; |
| |
| // Initialize dummy params |
| common.tmaster_host_ = LOCALHOST; |
| common.tmaster_port_ = 15500; |
| common.tmaster_controller_port_ = 15501; |
| common.tmaster_stats_port_ = 15502; |
| common.metricsmgr_port_ = 0; |
| common.shell_port_ = 45500; |
| common.ckptmgr_port_ = 55500; |
| common.ckptmgr_id_ = "ckptmgr"; |
| common.topology_name_ = "mytopology"; |
| common.topology_id_ = "abcd-9999"; |
| common.setNumStmgrs(2); |
| common.num_spouts_ = 1; // This test will only work with 1 type of spout |
| common.num_spout_instances_ = 2; |
| common.num_bolts_ = 1; |
| common.num_bolt_instances_ = 4; |
| common.grouping_ = heron::proto::api::CUSTOM; |
| // Empty so that we don't attempt to connect to the zk |
| // but instead connect to the local filesytem |
| common.zkhostportlist_ = ""; |
| |
| // Start the metrics mgr |
| StartMetricsMgr(common); |
| |
| // Start the tmaster etc. |
| StartTMaster(common); |
| |
| int num_msgs_sent_by_spout_instance = 8; |
| |
| // Distribute workers across stmgrs |
| DistributeWorkersAcrossStmgrs(common); |
| |
| // Start the stream managers |
| StartStMgrs(common); |
| |
| // Start the dummy workers |
| StartWorkerComponents( |
| common, num_msgs_sent_by_spout_instance, |
| (num_msgs_sent_by_spout_instance * common.num_spouts_ * common.num_spout_instances_) / |
| (common.num_bolts_ * common.num_bolt_instances_)); |
| |
| // Wait for the bolt thread to complete receiving |
| for (size_t i = 0; i < common.bolt_workers_threads_list_.size(); ++i) { |
| common.bolt_workers_threads_list_[i]->join(); |
| } |
| |
| // Stop the schedulers |
| for (size_t i = 0; i < common.ss_list_.size(); ++i) { |
| common.ss_list_[i]->loopExit(); |
| } |
| |
| // Wait for the threads to terminate. We have already waited for the bolt |
| // threads |
| common.tmaster_thread_->join(); |
| common.metrics_mgr_thread_->join(); |
| for (size_t i = 0; i < common.stmgrs_threads_list_.size(); ++i) { |
| common.stmgrs_threads_list_[i]->join(); |
| } |
| for (size_t i = 0; i < common.spout_workers_threads_list_.size(); ++i) { |
| common.spout_workers_threads_list_[i]->join(); |
| } |
| |
| // Verification |
| // Make sure custom grouping worked |
| sp_int32 lowest_bolt_task_id = std::numeric_limits<sp_int32>::max() - 1; |
| sp_int32 non_zero_msgs_task_id = -1; |
| for (size_t w = 0; w < common.bolt_workers_list_.size(); ++w) { |
| if (common.bolt_workers_list_[w]->get_task_id() < lowest_bolt_task_id) { |
| lowest_bolt_task_id = common.bolt_workers_list_[w]->get_task_id(); |
| } |
| if (common.bolt_workers_list_[w]->MsgsRecvd() != 0) { |
| EXPECT_EQ( |
| common.bolt_workers_list_[w]->MsgsRecvd(), |
| (num_msgs_sent_by_spout_instance * common.num_spouts_ * common.num_spout_instances_)); |
| EXPECT_EQ(non_zero_msgs_task_id, -1); |
| non_zero_msgs_task_id = common.bolt_workers_list_[w]->get_task_id(); |
| } |
| } |
| |
| EXPECT_EQ(lowest_bolt_task_id, non_zero_msgs_task_id); |
| |
| TearCommonResources(common); |
| } |
| |
// Tests that a stmgr raises back pressure when one of its local instances
// (a bolt) stops draining its queue, and relieves it once that instance is
// gone. A dummy peer stmgr observes the start/stop BP notifications.
TEST(StMgr, test_back_pressure_instance) {
  CommonResources common;

  // Initialize dummy params
  common.tmaster_host_ = LOCALHOST;
  common.tmaster_port_ = 17000;
  common.tmaster_controller_port_ = 17001;
  common.tmaster_stats_port_ = 17002;
  common.metricsmgr_port_ = 0;
  common.shell_port_ = 47000;
  common.ckptmgr_port_ = 57000;
  common.ckptmgr_id_ = "ckptmgr";
  common.topology_name_ = "mytopology";
  common.topology_id_ = "abcd-9999";
  common.setNumStmgrs(2);
  common.num_spouts_ = 2;
  common.num_spout_instances_ = 1;
  common.num_bolts_ = 2;
  common.num_bolt_instances_ = 1;
  common.grouping_ = heron::proto::api::SHUFFLE;
  // Empty so that we don't attempt to connect to the zk
  // but instead connect to the local filesystem
  common.zkhostportlist_ = "";

  // Large enough that the spouts keep producing for the whole test,
  // so the buffers fill up once the bolts stop reading
  int num_msgs_sent_by_spout_instance = 100 * 1000 * 1000; // 100M

  // Start the metrics mgr
  StartMetricsMgr(common);

  // Start the tmaster etc.
  StartTMaster(common);

  // Distribute workers across stmgrs
  DistributeWorkersAcrossStmgrs(common);

  // We'll start one regular stmgr and one dummy stmgr
  EventLoopImpl* regular_stmgr_ss = NULL;
  heron::stmgr::StMgr* regular_stmgr = NULL;
  std::thread* regular_stmgr_thread = NULL;
  StartStMgr(regular_stmgr_ss, regular_stmgr, regular_stmgr_thread, common.tmaster_host_,
             common.stmgr_ports_[0], common.local_data_ports_[0],
             common.topology_name_, common.topology_id_, common.topology_,
             common.stmgr_instance_id_list_[0], common.stmgrs_id_list_[0], common.zkhostportlist_,
             common.dpath_, common.metricsmgr_port_, common.shell_port_, common.ckptmgr_port_,
             common.ckptmgr_id_, common.high_watermark_, common.low_watermark_);
  common.ss_list_.push_back(regular_stmgr_ss);

  // Start a dummy stmgr
  EventLoopImpl* dummy_stmgr_ss = NULL;
  DummyStMgr* dummy_stmgr = NULL;

  std::thread* dummy_stmgr_thread = NULL;
  StartDummyStMgr(dummy_stmgr_ss, dummy_stmgr, dummy_stmgr_thread, common.stmgr_ports_[1],
                  common.tmaster_port_, common.shell_port_, common.stmgrs_id_list_[1],
                  common.stmgr_instance_list_[1]);
  common.ss_list_.push_back(dummy_stmgr_ss);

  // Start the dummy workers
  StartWorkerComponents(common, num_msgs_sent_by_spout_instance, num_msgs_sent_by_spout_instance);

  // Wait till we get the physical plan populated on the stmgr. That way we know the
  // workers have connected
  while (!regular_stmgr->GetPhysicalPlan()) sleep(1);

  // Stop the bolt schedulers at this point so that they stop receiving
  // This will build up back pressure
  for (size_t i = 0; i < common.bolt_workers_list_.size(); ++i) {
    common.bolt_workers_list_[i]->getEventLoop()->loopExit();
  }

  // Wait till we get the back pressure start notification
  while (dummy_stmgr->NumStartBPMsgs() == 0) sleep(1);

  // Now kill the bolts - at that point the back pressure should be removed
  for (size_t i = 0; i < common.bolt_workers_threads_list_.size(); ++i) {
    common.bolt_workers_threads_list_[i]->join();
  }
  for (size_t w = 0; w < common.bolt_workers_list_.size(); ++w) {
    delete common.bolt_workers_list_[w];
  }
  // Clear the list so that we don't double delete in TearCommonResources
  common.bolt_workers_list_.clear();

  // Wait till we get the back pressure stop notification
  while (dummy_stmgr->NumStopBPMsgs() == 0) sleep(1);

  // Back pressure must be relieved exactly once
  EXPECT_EQ(dummy_stmgr->NumStopBPMsgs(), 1);

  // Stop the schedulers
  for (size_t i = 0; i < common.ss_list_.size(); ++i) {
    common.ss_list_[i]->loopExit();
  }

  // Wait for the threads to terminate
  common.tmaster_thread_->join();
  common.metrics_mgr_thread_->join();
  regular_stmgr_thread->join();
  dummy_stmgr_thread->join();
  for (size_t i = 0; i < common.spout_workers_threads_list_.size(); ++i) {
    common.spout_workers_threads_list_[i]->join();
  }

  // Delete the common resources
  delete regular_stmgr_thread;
  delete regular_stmgr;
  delete dummy_stmgr_thread;
  delete dummy_stmgr;
  TearCommonResources(common);
}
| |
// Tests that spout deaths during backpressure are handled correctly:
// while back pressure is on, the replacement spout's first register
// attempt is rejected (NOTOK, and the stale connection is closed) and
// only the subsequent retry is accepted (OK).
TEST(StMgr, test_spout_death_under_backpressure) {
  CommonResources common;

  // Initialize dummy params
  common.tmaster_host_ = LOCALHOST;
  common.tmaster_port_ = 17300;
  common.tmaster_controller_port_ = 17301;
  common.tmaster_stats_port_ = 17302;
  common.metricsmgr_port_ = 0;
  common.shell_port_ = 47300;
  common.ckptmgr_port_ = 57300;
  common.ckptmgr_id_ = "ckptmgr";
  common.topology_name_ = "mytopology";
  common.topology_id_ = "abcd-9999";
  common.setNumStmgrs(2);
  common.num_spouts_ = 1;
  common.num_spout_instances_ = 2;
  common.num_bolts_ = 2;
  common.num_bolt_instances_ = 1;
  common.grouping_ = heron::proto::api::SHUFFLE;
  // Empty so that we don't attempt to connect to the zk
  // but instead connect to the local filesystem
  common.zkhostportlist_ = "";

  // Large enough that the spouts never finish during the test
  int num_msgs_sent_by_spout_instance = 100 * 1000 * 1000; // 100M

  // Start the metrics mgr
  StartMetricsMgr(common);

  // Start the tmaster etc.
  StartTMaster(common);

  // Distribute workers across stmgrs
  DistributeWorkersAcrossStmgrs(common);

  // We'll start one regular stmgr and one dummy stmgr
  EventLoopImpl* regular_stmgr_ss = NULL;
  heron::stmgr::StMgr* regular_stmgr = NULL;
  std::thread* regular_stmgr_thread = NULL;
  StartStMgr(regular_stmgr_ss, regular_stmgr, regular_stmgr_thread, common.tmaster_host_,
             common.stmgr_ports_[0], common.local_data_ports_[0],
             common.topology_name_, common.topology_id_, common.topology_,
             common.stmgr_instance_id_list_[0], common.stmgrs_id_list_[0], common.zkhostportlist_,
             common.dpath_, common.metricsmgr_port_, common.shell_port_, common.ckptmgr_port_,
             common.ckptmgr_id_, common.high_watermark_, common.low_watermark_);
  common.ss_list_.push_back(regular_stmgr_ss);

  // Start a dummy stmgr
  EventLoopImpl* dummy_stmgr_ss = NULL;
  DummyStMgr* dummy_stmgr = NULL;
  std::thread* dummy_stmgr_thread = NULL;
  StartDummyStMgr(dummy_stmgr_ss, dummy_stmgr, dummy_stmgr_thread, common.stmgr_ports_[1],
                  common.tmaster_port_, common.shell_port_, common.stmgrs_id_list_[1],
                  common.stmgr_instance_list_[1]);
  common.ss_list_.push_back(dummy_stmgr_ss);

  // Start the dummy workers
  StartWorkerComponents(common, num_msgs_sent_by_spout_instance, num_msgs_sent_by_spout_instance);

  // Wait till we get the physical plan populated on the stmgr. That way we know the
  // workers have connected
  while (!regular_stmgr->GetPhysicalPlan()) sleep(1);

  // Stop the bolt schedulers at this point so that they stop receiving
  // This will build up back pressure
  for (size_t i = 0; i < common.bolt_workers_list_.size(); ++i) {
    common.bolt_workers_list_[i]->getEventLoop()->loopExit();
  }

  // Wait till we get the back pressure notification
  while (dummy_stmgr->NumStartBPMsgs() == 0) sleep(1);

  // Now that we are in back pressure, kill the current spout
  common.spout_workers_list_[0]->getEventLoop()->loopExit();
  common.spout_workers_threads_list_[0]->join();

  // Create a new spout now
  StartDummySpoutInstanceHelper(common, 0, 0, num_msgs_sent_by_spout_instance);
  // NOTE(review): fixed sleep gives the register round-trip time to
  // complete; a latch would be more robust if this ever flakes
  sleep(1);

  // First time the SM return NOTOK, and closes the old spout connection
  EXPECT_EQ(common.spout_workers_list_.back()->GetRegisterResponseStatus(),
            heron::proto::system::NOTOK);

  // Upon receiving NOTOK, the new spout dies the first time
  common.spout_workers_list_.back()->getEventLoop()->loopExit();
  common.spout_workers_threads_list_.back()->join();

  // Bring up the new spout again
  StartDummySpoutInstanceHelper(common, 0, 0, num_msgs_sent_by_spout_instance);
  sleep(1);

  // This time we expect SM to accept the new spout connection correctly
  EXPECT_EQ(common.spout_workers_list_.back()->GetRegisterResponseStatus(),
            heron::proto::system::OK);

  // Now kill the bolts - at that point the back pressure should be removed
  for (size_t i = 0; i < common.bolt_workers_threads_list_.size(); ++i) {
    common.bolt_workers_threads_list_[i]->join();
  }
  for (size_t w = 0; w < common.bolt_workers_list_.size(); ++w) {
    delete common.bolt_workers_list_[w];
  }

  // Clear the list so that we don't double delete in TearCommonResources
  common.bolt_workers_list_.clear();

  // Wait till we get the back pressure stop notification
  while (dummy_stmgr->NumStopBPMsgs() == 0) sleep(1);

  EXPECT_EQ(dummy_stmgr->NumStopBPMsgs(), 1);

  // Stop the schedulers
  for (size_t i = 0; i < common.ss_list_.size(); ++i) {
    common.ss_list_[i]->loopExit();
  }

  // Wait for the threads to terminate
  common.tmaster_thread_->join();
  common.metrics_mgr_thread_->join();
  regular_stmgr_thread->join();
  dummy_stmgr_thread->join();

  // Some spout threads were already joined above, hence the joinable() guard
  for (size_t i = 0; i < common.spout_workers_threads_list_.size(); ++i) {
    if (common.spout_workers_threads_list_[i]->joinable())
      common.spout_workers_threads_list_[i]->join();
  }

  // Delete the common resources
  delete regular_stmgr_thread;
  delete regular_stmgr;
  delete dummy_stmgr_thread;
  delete dummy_stmgr;
  TearCommonResources(common);
}
| |
// Tests that a stmgr raises back pressure towards its peers when another
// stmgr it sends data to stops reading, and relieves it once the dead
// peer's connection is cleaned up.
TEST(StMgr, test_back_pressure_stmgr) {
  CommonResources common;

  // Initialize dummy params
  common.tmaster_host_ = LOCALHOST;
  common.tmaster_port_ = 18000;
  common.tmaster_controller_port_ = 18001;
  common.tmaster_stats_port_ = 18002;
  common.metricsmgr_port_ = 0;
  common.shell_port_ = 48000;
  common.ckptmgr_port_ = 58000;
  common.ckptmgr_id_ = "ckptmgr";
  common.topology_name_ = "mytopology";
  common.topology_id_ = "abcd-9999";
  common.setNumStmgrs(3);
  common.num_spouts_ = 1;
  common.num_spout_instances_ = 3;
  common.num_bolts_ = 1;
  common.num_bolt_instances_ = 3;
  common.grouping_ = heron::proto::api::SHUFFLE;
  // Empty so that we don't attempt to connect to the zk
  // but instead connect to the local filesystem
  common.zkhostportlist_ = "";
  // Overwrite the default values for back pressure so the buffers
  // fill up quickly once stmgr2 stops reading
  common.high_watermark_ = 1_MB;
  common.low_watermark_ = 500_KB;

  int num_msgs_sent_by_spout_instance = 500 * 1000 * 1000; // 500M

  // Start the metrics mgr
  StartMetricsMgr(common);

  // Start the tmaster etc.
  StartTMaster(common);

  // Distribute workers across stmgrs
  DistributeWorkersAcrossStmgrs(common);

  // We'll start two regular stmgrs and one dummy stmgr
  EventLoopImpl* regular_stmgr_ss1 = NULL;
  heron::stmgr::StMgr* regular_stmgr1 = NULL;
  std::thread* regular_stmgr_thread1 = NULL;

  StartStMgr(regular_stmgr_ss1, regular_stmgr1, regular_stmgr_thread1, common.tmaster_host_,
             common.stmgr_ports_[0], common.local_data_ports_[0],
             common.topology_name_, common.topology_id_, common.topology_,
             common.stmgr_instance_id_list_[0], common.stmgrs_id_list_[0], common.zkhostportlist_,
             common.dpath_, common.metricsmgr_port_, common.shell_port_, common.ckptmgr_port_,
             common.ckptmgr_id_, common.high_watermark_, common.low_watermark_);
  common.ss_list_.push_back(regular_stmgr_ss1);

  EventLoopImpl* regular_stmgr_ss2 = NULL;
  heron::stmgr::StMgr* regular_stmgr2 = NULL;
  std::thread* regular_stmgr_thread2 = NULL;

  StartStMgr(regular_stmgr_ss2, regular_stmgr2, regular_stmgr_thread2, common.tmaster_host_,
             common.stmgr_ports_[1], common.local_data_ports_[1],
             common.topology_name_, common.topology_id_,
             common.topology_, common.stmgr_instance_id_list_[1], common.stmgrs_id_list_[1],
             common.zkhostportlist_, common.dpath_, common.metricsmgr_port_,
             common.shell_port_, common.ckptmgr_port_,
             common.ckptmgr_id_, common.high_watermark_, common.low_watermark_);

  common.ss_list_.push_back(regular_stmgr_ss2);

  // Start a dummy stmgr
  EventLoopImpl* dummy_stmgr_ss = NULL;
  DummyStMgr* dummy_stmgr = NULL;
  std::thread* dummy_stmgr_thread = NULL;
  StartDummyStMgr(dummy_stmgr_ss, dummy_stmgr, dummy_stmgr_thread, common.stmgr_ports_[2],
                  common.tmaster_port_, common.shell_port_, common.stmgrs_id_list_[2],
                  common.stmgr_instance_list_[2]);
  common.ss_list_.push_back(dummy_stmgr_ss);

  // Start the dummy workers
  StartWorkerComponents(common, num_msgs_sent_by_spout_instance, num_msgs_sent_by_spout_instance);

  // Wait till we get the physical plan populated on the stmgr. That way we know the
  // workers have connected
  while (!regular_stmgr1->GetPhysicalPlan()) sleep(1);

  // Stop regular stmgr2; at this point regular stmgr1 should send a start bp notification msg
  regular_stmgr_ss2->loopExit();

  // Wait till we get the back pressure notification
  while (dummy_stmgr->NumStartBPMsgs() == 0) sleep(1);

  // Now kill the regular stmgr2, at this point regular stmgr1 should send a stop bp notification
  regular_stmgr_thread2->join();
  delete regular_stmgr2;
  delete regular_stmgr_thread2;

  // Wait till we get the back pressure stop notification
  while (dummy_stmgr->NumStopBPMsgs() == 0) sleep(1);

  EXPECT_EQ(dummy_stmgr->NumStopBPMsgs(), 1);

  // Stop the schedulers
  for (size_t i = 0; i < common.ss_list_.size(); ++i) {
    common.ss_list_[i]->loopExit();
  }

  // Wait for the threads to terminate
  common.tmaster_thread_->join();
  common.metrics_mgr_thread_->join();
  regular_stmgr_thread1->join();
  dummy_stmgr_thread->join();
  for (size_t i = 0; i < common.spout_workers_threads_list_.size(); ++i) {
    common.spout_workers_threads_list_[i]->join();
  }
  for (size_t i = 0; i < common.bolt_workers_threads_list_.size(); ++i) {
    common.bolt_workers_threads_list_[i]->join();
  }
  // Delete the common resources
  delete regular_stmgr_thread1;
  delete regular_stmgr1;
  delete dummy_stmgr_thread;
  delete dummy_stmgr;
  TearCommonResources(common);
}
| |
// Tests that a peer stmgr which disconnects and then reconnects while
// back pressure is still in effect receives the start-back-pressure
// notification again on reconnect.
TEST(StMgr, test_back_pressure_stmgr_reconnect) {
  CommonResources common;

  // Initialize dummy params
  common.tmaster_host_ = LOCALHOST;
  common.tmaster_port_ = 18500;
  common.tmaster_controller_port_ = 18501;
  common.tmaster_stats_port_ = 18502;
  common.metricsmgr_port_ = 0;
  common.shell_port_ = 49000;
  common.ckptmgr_port_ = 59000;
  common.ckptmgr_id_ = "ckptmgr";
  common.topology_name_ = "mytopology";
  common.topology_id_ = "abcd-9999";
  common.setNumStmgrs(2);
  common.num_spouts_ = 2;
  common.num_spout_instances_ = 1;
  common.num_bolts_ = 2;
  common.num_bolt_instances_ = 1;
  common.grouping_ = heron::proto::api::SHUFFLE;
  // Empty so that we don't attempt to connect to the zk
  // but instead connect to the local filesystem
  common.zkhostportlist_ = "";

  // Large enough that the spouts keep producing for the whole test
  int num_msgs_sent_by_spout_instance = 100 * 1000 * 1000; // 100M

  // Start the metrics mgr
  StartMetricsMgr(common);

  // Start the tmaster etc.
  StartTMaster(common);

  // Distribute workers across stmgrs
  DistributeWorkersAcrossStmgrs(common);

  // We'll start one regular stmgr and one dummy stmgr
  EventLoopImpl* regular_stmgr_ss = NULL;
  heron::stmgr::StMgr* regular_stmgr = NULL;
  std::thread* regular_stmgr_thread = NULL;
  StartStMgr(regular_stmgr_ss, regular_stmgr, regular_stmgr_thread, common.tmaster_host_,
             common.stmgr_ports_[0], common.local_data_ports_[0],
             common.topology_name_, common.topology_id_, common.topology_,
             common.stmgr_instance_id_list_[0], common.stmgrs_id_list_[0], common.zkhostportlist_,
             common.dpath_, common.metricsmgr_port_, common.shell_port_, common.ckptmgr_port_,
             common.ckptmgr_id_, common.high_watermark_, common.low_watermark_);
  common.ss_list_.push_back(regular_stmgr_ss);

  // Start a dummy stmgr
  EventLoopImpl* dummy_stmgr_ss = NULL;
  DummyStMgr* dummy_stmgr = NULL;
  std::thread* dummy_stmgr_thread = NULL;
  StartDummyStMgr(dummy_stmgr_ss, dummy_stmgr, dummy_stmgr_thread, common.stmgr_ports_[1],
                  common.tmaster_port_, common.shell_port_, common.stmgrs_id_list_[1],
                  common.stmgr_instance_list_[1]);
  common.ss_list_.push_back(dummy_stmgr_ss);

  // Start the dummy workers
  StartWorkerComponents(common, num_msgs_sent_by_spout_instance, num_msgs_sent_by_spout_instance);

  // Wait till we get the physical plan populated on the stmgr. That way we know the
  // workers have connected
  while (!regular_stmgr->GetPhysicalPlan()) sleep(1);

  // Stop the bolt schedulers at this point so that they stop receiving
  // This will build up back pressure
  for (size_t i = 0; i < common.bolt_workers_list_.size(); ++i) {
    common.bolt_workers_list_[i]->getEventLoop()->loopExit();
  }

  // Wait till we get the back pressure notification
  while (dummy_stmgr->NumStartBPMsgs() == 0) sleep(1);

  // Now disconnect the dummy stmgr and reconnect; it should get the back pressure
  // notification again
  dummy_stmgr_ss->loopExit();
  dummy_stmgr_thread->join();
  delete dummy_stmgr_thread;
  delete dummy_stmgr;

  // Port 0 so the restarted dummy stmgr binds a fresh port rather than
  // racing the old one's socket in TIME_WAIT
  common.stmgr_ports_[1] = 0;
  StartDummyStMgr(dummy_stmgr_ss, dummy_stmgr, dummy_stmgr_thread, common.stmgr_ports_[1],
                  common.tmaster_port_, common.shell_port_, common.stmgrs_id_list_[1],
                  common.stmgr_instance_list_[1]);
  common.ss_list_.push_back(dummy_stmgr_ss);

  // Wait till we get the back pressure notification
  while (dummy_stmgr->NumStartBPMsgs() == 0) sleep(1);

  // The reconnected stmgr must have been told exactly once
  EXPECT_EQ(dummy_stmgr->NumStartBPMsgs(), 1);

  // Stop the schedulers
  for (size_t i = 0; i < common.ss_list_.size(); ++i) {
    common.ss_list_[i]->loopExit();
  }

  // Wait for the threads to terminate
  common.tmaster_thread_->join();
  common.metrics_mgr_thread_->join();
  regular_stmgr_thread->join();
  dummy_stmgr_thread->join();
  for (size_t i = 0; i < common.spout_workers_threads_list_.size(); ++i) {
    common.spout_workers_threads_list_[i]->join();
  }
  for (size_t i = 0; i < common.bolt_workers_threads_list_.size(); ++i) {
    common.bolt_workers_threads_list_[i]->join();
  }

  // Delete the common resources
  delete regular_stmgr_thread;
  delete regular_stmgr;
  delete dummy_stmgr_thread;
  delete dummy_stmgr;
  TearCommonResources(common);
}
| |
// Tests that when the tmaster dies and restarts on a different port, the
// surviving stmgr discovers the new location, re-registers, pushes the new
// tmaster location to the metrics manager, and picks up the new physical plan.
TEST(StMgr, test_tmaster_restart_on_new_address) {
  CommonResources common;

  // Initialize dummy params
  common.tmaster_host_ = LOCALHOST;
  common.tmaster_port_ = 18500;
  common.tmaster_controller_port_ = 18501;
  common.tmaster_stats_port_ = 18502;
  common.metricsmgr_port_ = 0;
  common.shell_port_ = 49001;
  common.ckptmgr_port_ = 59001;
  common.ckptmgr_id_ = "ckptmgr";
  common.topology_name_ = "mytopology";
  common.topology_id_ = "abcd-9999";
  common.setNumStmgrs(2);
  common.num_spouts_ = 2;
  common.num_spout_instances_ = 1;
  common.num_bolts_ = 2;
  common.num_bolt_instances_ = 1;
  common.grouping_ = heron::proto::api::SHUFFLE;
  // Empty so that we don't attempt to connect to the zk
  // but instead connect to the local filesystem
  common.zkhostportlist_ = "";

  // Large enough that the spouts keep producing for the whole test
  int num_msgs_sent_by_spout_instance = 100 * 1000 * 1000; // 100M

  // A countdown latch to wait on, until metric mgr receives tmaster location
  // The count is 4 here, since we need to ensure it is sent twice for stmgr: once at
  // start, and once after receiving new tmaster location. Plus 2 from tmaster, total 4.
  // 5-4=1 is used to avoid countdown on 0
  CountDownLatch* metricsMgrTmasterLatch = new CountDownLatch(5);

  // Start the metrics mgr, common.ss_list_[0]
  StartMetricsMgr(common, metricsMgrTmasterLatch, NULL);

  // Start the tmaster etc. common.ss_list_[1]
  StartTMaster(common);

  // Check the count: should be 5-1=4
  // The Tmaster sends its location to MetricsMgr when MetricsMgrClient initializes.
  EXPECT_TRUE(metricsMgrTmasterLatch->wait(4, std::chrono::seconds(5)));
  EXPECT_EQ(static_cast<sp_uint32>(4), metricsMgrTmasterLatch->getCount());

  // Distribute workers across stmgrs
  DistributeWorkersAcrossStmgrs(common);

  // We'll start one regular stmgr and one dummy stmgr
  EventLoopImpl* regular_stmgr_ss = NULL;
  heron::stmgr::StMgr* regular_stmgr = NULL;
  std::thread* regular_stmgr_thread = NULL;
  StartStMgr(regular_stmgr_ss, regular_stmgr, regular_stmgr_thread, common.tmaster_host_,
             common.stmgr_ports_[0], common.local_data_ports_[0],
             common.topology_name_, common.topology_id_, common.topology_,
             common.stmgr_instance_id_list_[0], common.stmgrs_id_list_[0], common.zkhostportlist_,
             common.dpath_, common.metricsmgr_port_, common.shell_port_, common.ckptmgr_port_,
             common.ckptmgr_id_, common.high_watermark_, common.low_watermark_);
  // common.ss_list_[2]
  common.ss_list_.push_back(regular_stmgr_ss);

  // Check the count: should be 4-1=3
  // The Stmgr sends Tmaster location to MetricsMgr when MetricsMgrClient initializes
  EXPECT_TRUE(metricsMgrTmasterLatch->wait(3, std::chrono::seconds(5)));
  EXPECT_EQ(static_cast<sp_uint32>(3), metricsMgrTmasterLatch->getCount());

  // Start a dummy stmgr
  EventLoopImpl* dummy_stmgr_ss = NULL;
  DummyStMgr* dummy_stmgr = NULL;
  std::thread* dummy_stmgr_thread = NULL;
  StartDummyStMgr(dummy_stmgr_ss, dummy_stmgr, dummy_stmgr_thread, common.stmgr_ports_[1],
                  common.tmaster_port_, common.shell_port_, common.stmgrs_id_list_[1],
                  common.stmgr_instance_list_[1]);
  // common.ss_list_[3]
  common.ss_list_.push_back(dummy_stmgr_ss);

  // Start the dummy workers
  StartWorkerComponents(common, num_msgs_sent_by_spout_instance, num_msgs_sent_by_spout_instance);

  // Wait till we get the physical plan populated on the stmgr. That way we know the
  // workers have connected
  while (!regular_stmgr->GetPhysicalPlan()) sleep(1);

  // Kill current tmaster
  common.ss_list_[1]->loopExit();
  common.tmaster_thread_->join();
  delete common.tmaster_;
  delete common.tmaster_thread_;

  // Killing dummy stmgr so that we can restart it on another port, to change
  // the physical plan.
  dummy_stmgr_ss->loopExit();
  dummy_stmgr_thread->join();
  delete dummy_stmgr_thread;
  delete dummy_stmgr;

  // Change the tmaster port
  common.tmaster_port_ = 18511;

  // Start new dummy stmgr at different port, to generate a different pplan that we
  // can verify
  // NOTE(review): port 0 here presumably means StartDummyStMgr binds an
  // ephemeral port and writes the actual value back into stmgr_ports_[1]
  // (the comparison below relies on that) — confirm against the helper
  common.stmgr_ports_[1] = 0;
  StartDummyStMgr(dummy_stmgr_ss, dummy_stmgr, dummy_stmgr_thread, common.stmgr_ports_[1],
                  common.tmaster_port_, common.shell_port_, common.stmgrs_id_list_[1],
                  common.stmgr_instance_list_[1]);
  common.ss_list_.push_back(dummy_stmgr_ss);

  // Start tmaster on a different port
  StartTMaster(common);

  // This confirms that metrics manager received the new tmaster location
  // Tmaster sends its location to MetricsMgr when MetricsMgrClient initialize: 3-1=2
  // Stmgr-0 watches new tmaster location and sends it to MetricsMgr: 2-1=1
  EXPECT_TRUE(metricsMgrTmasterLatch->wait(1, std::chrono::seconds(5)));
  EXPECT_EQ(static_cast<sp_uint32>(1), metricsMgrTmasterLatch->getCount());

  // Now wait until stmgr receives the new physical plan
  // No easy way to avoid sleep here.
  sleep(2);

  // Ensure that Stmgr connected to the new tmaster and has received new physical plan
  // (one of the two stmgr entries must carry the restarted dummy stmgr's data port)
  if (regular_stmgr->GetPhysicalPlan()->stmgrs(1).data_port() != common.stmgr_ports_[1]) {
    CHECK_EQ(regular_stmgr->GetPhysicalPlan()->stmgrs(0).data_port(), common.stmgr_ports_[1]);
  }

  // Stop the schedulers
  for (size_t i = 0; i < common.ss_list_.size(); ++i) {
    common.ss_list_[i]->loopExit();
  }

  // Wait for the threads to terminate
  common.tmaster_thread_->join();
  common.metrics_mgr_thread_->join();
  regular_stmgr_thread->join();
  dummy_stmgr_thread->join();
  for (size_t i = 0; i < common.spout_workers_threads_list_.size(); ++i) {
    common.spout_workers_threads_list_[i]->join();
  }
  for (size_t i = 0; i < common.bolt_workers_threads_list_.size(); ++i) {
    common.bolt_workers_threads_list_[i]->join();
  }

  // Delete the common resources
  delete regular_stmgr_thread;
  delete regular_stmgr;
  delete dummy_stmgr_thread;
  delete dummy_stmgr;
  delete metricsMgrTmasterLatch;
  TearCommonResources(common);
}
| |
// Tests that when the tmaster dies and is restarted on the SAME host/ports,
// a running stmgr rediscovers it, reconnects, and receives the updated
// physical plan (changed here by restarting the dummy stmgr on a new port).
TEST(StMgr, test_tmaster_restart_on_same_address) {
  CommonResources common;

  // Initialize dummy params
  common.tmaster_host_ = LOCALHOST;
  common.tmaster_port_ = 18500;
  common.tmaster_controller_port_ = 18501;
  common.tmaster_stats_port_ = 18502;
  common.metricsmgr_port_ = 0;
  common.shell_port_ = 49002;
  common.ckptmgr_port_ = 59002;
  common.ckptmgr_id_ = "ckptmgr";
  common.topology_name_ = "mytopology";
  common.topology_id_ = "abcd-9999";
  common.setNumStmgrs(2);
  common.num_spouts_ = 2;
  common.num_spout_instances_ = 1;
  common.num_bolts_ = 2;
  common.num_bolt_instances_ = 1;
  common.grouping_ = heron::proto::api::SHUFFLE;
  // Empty so that we don't attempt to connect to the zk
  // but instead connect to the local filesytem
  common.zkhostportlist_ = "";

  int num_msgs_sent_by_spout_instance = 100 * 1000 * 1000;  // 100M

  // A countdown latch to wait on, until the metrics mgr receives tmaster locations.
  // The count starts at 5; four decrements are expected over the whole test:
  // tmaster startup, stmgr-0 startup, tmaster restart, and stmgr-0 re-sending
  // the new location (the latch is asserted at counts 4, 3 and 1 below).
  CountDownLatch* metricsMgrTmasterLatch = new CountDownLatch(5);

  // Start the metrics mgr
  StartMetricsMgr(common, metricsMgrTmasterLatch, NULL);

  // Start the tmaster etc.
  StartTMaster(common);

  // Check the count: should be 5-1=4
  // Tmaster sends its location to MetricsMgr when MetricsMgrClient initializes
  EXPECT_TRUE(metricsMgrTmasterLatch->wait(4, std::chrono::seconds(5)));
  EXPECT_EQ(static_cast<sp_uint32>(4), metricsMgrTmasterLatch->getCount());

  // Distribute workers across stmgrs
  DistributeWorkersAcrossStmgrs(common);

  // We'll start one regular stmgr and one dummy stmgr
  EventLoopImpl* regular_stmgr_ss = NULL;
  heron::stmgr::StMgr* regular_stmgr = NULL;
  std::thread* regular_stmgr_thread = NULL;
  StartStMgr(regular_stmgr_ss, regular_stmgr, regular_stmgr_thread, common.tmaster_host_,
             common.stmgr_ports_[0], common.local_data_ports_[0],
             common.topology_name_, common.topology_id_, common.topology_,
             common.stmgr_instance_id_list_[0], common.stmgrs_id_list_[0], common.zkhostportlist_,
             common.dpath_, common.metricsmgr_port_, common.shell_port_, common.ckptmgr_port_,
             common.ckptmgr_id_, common.high_watermark_, common.low_watermark_);
  common.ss_list_.push_back(regular_stmgr_ss);

  // Check the count: should be 4-1=3
  // Stmgr-0 sends tmaster location to MetricsMgr when MetricsMgrClient initializes.
  EXPECT_TRUE(metricsMgrTmasterLatch->wait(3, std::chrono::seconds(5)));
  EXPECT_EQ(static_cast<sp_uint32>(3), metricsMgrTmasterLatch->getCount());

  // Start a dummy stmgr
  EventLoopImpl* dummy_stmgr_ss = NULL;
  DummyStMgr* dummy_stmgr = NULL;
  std::thread* dummy_stmgr_thread = NULL;
  StartDummyStMgr(dummy_stmgr_ss, dummy_stmgr, dummy_stmgr_thread, common.stmgr_ports_[1],
                  common.tmaster_port_, common.shell_port_, common.stmgrs_id_list_[1],
                  common.stmgr_instance_list_[1]);
  common.ss_list_.push_back(dummy_stmgr_ss);

  // Start the dummy workers
  StartWorkerComponents(common, num_msgs_sent_by_spout_instance, num_msgs_sent_by_spout_instance);

  // Wait till we get the physical plan populated on the stmgr. That way we know the
  // workers have connected
  while (!regular_stmgr->GetPhysicalPlan()) sleep(1);

  // Kill current tmaster; ss_list_[1] is the tmaster's event loop
  // (ss_list_[0] belongs to the metrics mgr started first).
  common.ss_list_[1]->loopExit();
  common.tmaster_thread_->join();
  delete common.tmaster_;
  delete common.tmaster_thread_;

  // Killing dummy stmgr so that we can restart it on another port, to change
  // the physical plan.
  dummy_stmgr_ss->loopExit();
  dummy_stmgr_thread->join();
  delete dummy_stmgr_thread;
  delete dummy_stmgr;

  // Start new dummy stmgr at a different port, to generate a different pplan that we
  // can verify. Remember the old port so we can poll for the pplan change below.
  // NOTE(review): port 0 presumably makes StartDummyStMgr pick a free port and
  // write it back into common.stmgr_ports_[1] — verify against StartDummyStMgr.
  sp_int32 stmgr_port_old = common.stmgr_ports_[1];
  common.stmgr_ports_[1] = 0;
  StartDummyStMgr(dummy_stmgr_ss, dummy_stmgr, dummy_stmgr_thread, common.stmgr_ports_[1],
                  common.tmaster_port_, common.shell_port_, common.stmgrs_id_list_[1],
                  common.stmgr_instance_list_[1]);
  common.ss_list_.push_back(dummy_stmgr_ss);

  // Restart the tmaster on the SAME host/ports (common.tmaster_*_port_ are unchanged)
  StartTMaster(common);

  // This confirms that metrics manager received the new tmaster location
  // Check the count: should be 3-2=1
  // Tmaster sends its location when MetricsMgrClient initializes
  // Stmgr-0 watches and re-sends the tmaster location
  EXPECT_TRUE(metricsMgrTmasterLatch->wait(1, std::chrono::seconds(5)));
  EXPECT_EQ(static_cast<sp_uint32>(1), metricsMgrTmasterLatch->getCount());

  // Now wait until stmgr receives the new physical plan.
  // No easy way to avoid polling here.
  // Note: we poll longer than in the previous test because the stmgr's
  // tmaster client may take up to 1 second (configured in
  // test_heron_internals.yaml) between reconnect attempts.
  int retries = 30;
  while (regular_stmgr->GetPhysicalPlan()->stmgrs(1).data_port() == stmgr_port_old
         && retries--)
    sleep(1);

  // Ensure that Stmgr connected to the new tmaster and has received new physical plan
  CHECK_EQ(regular_stmgr->GetPhysicalPlan()->stmgrs(1).data_port(), common.stmgr_ports_[1]);
  CHECK_EQ(regular_stmgr->GetPhysicalPlan()->stmgrs(1).local_data_port(),
           common.local_data_ports_[1]);

  // Stop the schedulers
  for (size_t i = 0; i < common.ss_list_.size(); ++i) {
    common.ss_list_[i]->loopExit();
  }

  // Wait for the threads to terminate
  common.tmaster_thread_->join();
  common.metrics_mgr_thread_->join();
  regular_stmgr_thread->join();
  dummy_stmgr_thread->join();
  for (size_t i = 0; i < common.spout_workers_threads_list_.size(); ++i) {
    common.spout_workers_threads_list_[i]->join();
  }
  for (size_t i = 0; i < common.bolt_workers_threads_list_.size(); ++i) {
    common.bolt_workers_threads_list_[i]->join();
  }

  // Delete the common resources
  delete regular_stmgr_thread;
  delete regular_stmgr;
  delete dummy_stmgr_thread;
  delete dummy_stmgr;
  delete metricsMgrTmasterLatch;
  TearCommonResources(common);
}
| |
// This tests to make sure that the metrics mgr, upon reconnect,
// will get the tmaster location again (i.e. stmgr re-registers the
// tmaster location with a freshly restarted metrics mgr).
TEST(StMgr, test_metricsmgr_reconnect) {
  CommonResources common;
  // Initialize dummy params
  common.tmaster_host_ = LOCALHOST;
  common.tmaster_port_ = 19000;
  common.tmaster_controller_port_ = 19001;
  common.tmaster_stats_port_ = 19002;
  common.metricsmgr_port_ = 0;
  common.shell_port_ = 49500;
  common.ckptmgr_port_ = 59500;
  common.ckptmgr_id_ = "ckptmgr";
  common.topology_name_ = "mytopology";
  common.topology_id_ = "abcd-9999";
  common.setNumStmgrs(2);
  common.num_spouts_ = 2;
  common.num_spout_instances_ = 1;
  common.num_bolts_ = 2;
  common.num_bolt_instances_ = 1;
  common.grouping_ = heron::proto::api::SHUFFLE;
  // Empty so that we don't attempt to connect to the zk
  // but instead connect to the local filesytem
  common.zkhostportlist_ = "";

  int num_msgs_sent_by_spout_instance = 100 * 1000 * 1000;  // 100M

  // A countdown latch to wait on, until the metrics mgr receives the tmaster location
  CountDownLatch* metricsMgrTmasterLatch = new CountDownLatch(1);
  // A countdown latch to wait on the metrics manager closing its connections
  CountDownLatch* metricsMgrConnectionCloseLatch = new CountDownLatch(1);
  // Start the metrics mgr
  StartMetricsMgr(common, metricsMgrTmasterLatch, metricsMgrConnectionCloseLatch);

  // Remember the metrics mgr's event loop: we must remove it from ss_list_
  // before killing the metrics mgr so the final teardown doesn't touch it twice.
  EventLoopImpl* mmgr_ss = common.ss_list_.back();

  // Start the tmaster etc.
  StartTMaster(common);

  // Distribute workers across stmgrs
  DistributeWorkersAcrossStmgrs(common);

  // We'll start one regular stmgr and one dummy stmgr
  EventLoopImpl* regular_stmgr_ss = NULL;
  heron::stmgr::StMgr* regular_stmgr = NULL;
  std::thread* regular_stmgr_thread = NULL;
  StartStMgr(regular_stmgr_ss, regular_stmgr, regular_stmgr_thread, common.tmaster_host_,
             common.stmgr_ports_[0], common.local_data_ports_[0],
             common.topology_name_, common.topology_id_, common.topology_,
             common.stmgr_instance_id_list_[0], common.stmgrs_id_list_[0], common.zkhostportlist_,
             common.dpath_, common.metricsmgr_port_, common.shell_port_, common.ckptmgr_port_,
             common.ckptmgr_id_, common.high_watermark_, common.low_watermark_);
  common.ss_list_.push_back(regular_stmgr_ss);

  // Start a dummy stmgr
  EventLoopImpl* dummy_stmgr_ss = NULL;
  DummyStMgr* dummy_stmgr = NULL;
  std::thread* dummy_stmgr_thread = NULL;
  StartDummyStMgr(dummy_stmgr_ss, dummy_stmgr, dummy_stmgr_thread, common.stmgr_ports_[1],
                  common.tmaster_port_, common.shell_port_, common.stmgrs_id_list_[1],
                  common.stmgr_instance_list_[1]);
  common.ss_list_.push_back(dummy_stmgr_ss);

  // Start the dummy workers
  StartWorkerComponents(common, num_msgs_sent_by_spout_instance, num_msgs_sent_by_spout_instance);

  // Wait till we get the physical plan populated on the stmgr. That way we know the
  // workers have connected
  while (!regular_stmgr->GetPhysicalPlan()) sleep(1);

  // Wait until the metrics mgr also gets time to receive the tmaster location
  EXPECT_TRUE(metricsMgrTmasterLatch->wait(0, std::chrono::seconds(5)));

  // Check that metricsmgr got it
  VerifyMetricsMgrTMaster(common);

  // Kill the metrics mgr: first remove its event loop from ss_list_ so the
  // generic teardown at the end of the test doesn't loopExit it a second time
  for (auto iter = common.ss_list_.begin(); iter != common.ss_list_.end(); ++iter) {
    if (*iter == mmgr_ss) {
      common.ss_list_.erase(iter);
      break;
    }
  }

  // Exiting the loop without stopping, will not close the connection
  // between the stmgr and metrics mgr (since it is run in a dummy thread
  // instead of an actual process). When a new metrics mgr comes up,
  // it will miss any register requests from stmgr, while stmgr keeps trying.
  // Hence explicitly stopping to force connection close.
  common.metrics_mgr_->Stop();

  // Stopping the server will enqueue connection cleanup callbacks
  // to the select server. To ensure they get executed gracefully,
  // need to wait until metrics mgr notifies us.
  EXPECT_TRUE(metricsMgrConnectionCloseLatch->wait(0, std::chrono::seconds(5)));
  mmgr_ss->loopExit();
  common.metrics_mgr_thread_->join();
  delete common.metrics_mgr_thread_;
  common.metrics_mgr_thread_ = NULL;
  delete common.metrics_mgr_;
  common.metrics_mgr_ = NULL;
  delete metricsMgrTmasterLatch;
  delete metricsMgrConnectionCloseLatch;

  // Fresh latches for the restarted metrics mgr
  metricsMgrTmasterLatch = new CountDownLatch(1);
  metricsMgrConnectionCloseLatch = new CountDownLatch(1);
  // Start the metrics mgr again
  StartMetricsMgr(common, metricsMgrTmasterLatch, metricsMgrConnectionCloseLatch);
  EXPECT_TRUE(metricsMgrTmasterLatch->wait(0, std::chrono::seconds(5)));

  // Check that metricsmgr got it
  VerifyMetricsMgrTMaster(common);

  // Stop the schedulers (includes the restarted metrics mgr's loop,
  // pushed back onto ss_list_ by StartMetricsMgr)
  for (size_t i = 0; i < common.ss_list_.size(); ++i) {
    common.ss_list_[i]->loopExit();
  }

  // Wait for the threads to terminate
  common.tmaster_thread_->join();
  regular_stmgr_thread->join();
  dummy_stmgr_thread->join();
  common.metrics_mgr_thread_->join();
  for (size_t i = 0; i < common.spout_workers_threads_list_.size(); ++i) {
    common.spout_workers_threads_list_[i]->join();
  }
  for (size_t i = 0; i < common.bolt_workers_threads_list_.size(); ++i) {
    common.bolt_workers_threads_list_[i]->join();
  }

  // Delete the common resources
  delete regular_stmgr_thread;
  delete regular_stmgr;
  delete dummy_stmgr_thread;
  delete dummy_stmgr;
  delete metricsMgrTmasterLatch;
  delete metricsMgrConnectionCloseLatch;
  TearCommonResources(common);
}
| |
| // Test PatchPhysicalPlanWithHydratedTopology function |
| TEST(StMgr, test_PatchPhysicalPlanWithHydratedTopology) { |
| int32_t nSpouts = 2; |
| int32_t nSpoutInstances = 1; |
| int32_t nBolts = 3; |
| int32_t nBoltInstances = 1; |
| heron::proto::api::Topology* topology = |
| GenerateDummyTopology("topology_name", |
| "topology_id", |
| nSpouts, nSpoutInstances, nBolts, nBoltInstances, |
| heron::proto::api::SHUFFLE); |
| |
| heron::proto::system::PhysicalPlan* pplan = new heron::proto::system::PhysicalPlan(); |
| pplan->mutable_topology()->CopyFrom(*topology); |
| |
| // Verify initial values |
| EXPECT_EQ( |
| heron::config::TopologyConfigHelper::GetTopologyConfigValue( |
| *topology, |
| heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS, |
| ""), |
| "30"); |
| EXPECT_EQ( |
| heron::config::TopologyConfigHelper::GetTopologyConfigValue( |
| pplan->topology(), |
| heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS, |
| ""), |
| "30"); |
| // Change runtime data in PhysicalPlan and patch it |
| std::map<std::string, std::string> update; |
| update["conf.new"] = "test"; |
| update[heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS] = "10"; |
| heron::config::TopologyConfigHelper::SetTopologyRuntimeConfig(pplan->mutable_topology(), update); |
| |
| // Verify updated runtime data is still in the patched physical plan |
| // The topology in the physical plan should have the old name |
| EXPECT_EQ( |
| heron::config::TopologyConfigHelper::GetTopologyConfigValue( |
| *topology, |
| heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS, |
| ""), |
| "30"); // The internal topology object should still have the initial value |
| EXPECT_EQ( |
| heron::config::TopologyConfigHelper::GetTopologyConfigValue( |
| pplan->topology(), |
| heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS, |
| ""), |
| "10"); // The topology object in the physical plan should have the new value |
| EXPECT_EQ( |
| heron::config::TopologyConfigHelper::GetTopologyConfigValue( |
| pplan->topology(), "conf.new", ""), |
| "test"); // The topology object in the physical plan should have the new config |
| |
| delete pplan; |
| } |
| |
| int main(int argc, char** argv) { |
| heron::common::Initialize(argv[0]); |
| std::cout << "Current working directory (to find stmgr logs) " |
| << ProcessUtils::getCurrentWorkingDirectory() << std::endl; |
| testing::InitGoogleTest(&argc, argv); |
| if (argc > 1) { |
| std::cerr << "Using config file " << argv[1] << std::endl; |
| heron_internals_config_filename = argv[1]; |
| } |
| return RUN_ALL_TESTS(); |
| } |