| /* |
| * Copyright 2015 Twitter, Inc. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "manager/stmgr-clientmgr.h" |
| #include <iostream> |
| #include <map> |
| #include <unordered_set> |
| #include "manager/stmgr.h" |
| #include "manager/stmgr-client.h" |
| #include "proto/messages.h" |
| #include "basics/basics.h" |
| #include "errors/errors.h" |
| #include "threads/threads.h" |
| #include "network/network.h" |
| #include "config/heron-internals-config-reader.h" |
| #include "metrics/metrics.h" |
| |
| namespace heron { |
| namespace stmgr { |
| |
| // New connections made with other stream managers. |
| const sp_string METRIC_STMGR_NEW_CONNECTIONS = "__stmgr_new_connections"; |
| |
| StMgrClientMgr::StMgrClientMgr(EventLoop* eventLoop, const sp_string& _topology_name, |
| const sp_string& _topology_id, const sp_string& _stmgr_id, |
| StMgr* _stream_manager, |
| heron::common::MetricsMgrSt* _metrics_manager_client, |
| sp_int64 _high_watermark, sp_int64 _low_watermark) |
| : topology_name_(_topology_name), |
| topology_id_(_topology_id), |
| stmgr_id_(_stmgr_id), |
| eventLoop_(eventLoop), |
| stream_manager_(_stream_manager), |
| metrics_manager_client_(_metrics_manager_client), |
| high_watermark_(_high_watermark), |
| low_watermark_(_low_watermark) { |
| stmgr_clientmgr_metrics_ = new heron::common::MultiCountMetric(); |
| metrics_manager_client_->register_metric("__clientmgr", stmgr_clientmgr_metrics_); |
| } |
| |
| StMgrClientMgr::~StMgrClientMgr() { |
| // This should not be called |
| metrics_manager_client_->unregister_metric("__clientmgr"); |
| delete stmgr_clientmgr_metrics_; |
| } |
| |
| void StMgrClientMgr::NewPhysicalPlan(const proto::system::PhysicalPlan* _pplan) { |
| // TODO(vikasr) : Currently we establish connections with all streammanagers |
| // In the next iteration we might want to make it better |
| std::unordered_set<sp_string> all_stmgrs; |
| for (sp_int32 i = 0; i < _pplan->stmgrs_size(); ++i) { |
| const proto::system::StMgr& s = _pplan->stmgrs(i); |
| if (s.id() == stmgr_id_) { |
| continue; // dont want to connect to ourselves |
| } |
| all_stmgrs.insert(s.id()); |
| if (clients_.find(s.id()) != clients_.end()) { |
| // We already have a connection for this stmgr. |
| // Just make sure we have it for the same host/port |
| const NetworkOptions& o = clients_[s.id()]->get_clientoptions(); |
| if (o.get_host() != s.host_name() || o.get_port() != s.data_port()) { |
| LOG(INFO) << "Stmgr " << s.id() << " changed from " << o.get_host() << ":" << o.get_port() |
| << " to " << s.host_name() << ":" << s.data_port(); |
| // This stmgr has actually moved to a different host/port |
| clients_[s.id()]->Quit(); // this will delete itself. |
| clients_[s.id()] = CreateClient(s.id(), s.host_name(), s.data_port()); |
| instance_stats_[s.id()].clear(); |
| } else { |
| // This stmgr has remained the same. Don't do anything |
| } |
| } else { |
| // We don't have any connection to this stmgr. |
| LOG(INFO) << "Stmgr " << s.id() << " came on " << s.host_name() << ":" << s.data_port(); |
| clients_[s.id()] = CreateClient(s.id(), s.host_name(), s.data_port()); |
| } |
| } |
| |
| // We need to remove any unused ports |
| std::unordered_set<sp_string> to_remove; |
| for (auto iter = clients_.begin(); iter != clients_.end(); ++iter) { |
| if (all_stmgrs.find(iter->first) == all_stmgrs.end()) { |
| // This stmgr is no longer there in the physical map |
| to_remove.insert(iter->first); |
| } |
| } |
| |
| // Now go over to_remove to remove all the unused stmgrs |
| for (auto iter = to_remove.begin(); iter != to_remove.end(); ++iter) { |
| LOG(INFO) << "Stmgr " << *iter << " no longer required"; |
| clients_[*iter]->Quit(); // This will delete itself. |
| clients_.erase(*iter); |
| instance_stats_.erase(*iter); |
| } |
| } |
| |
| StMgrClient* StMgrClientMgr::CreateClient(const sp_string& _other_stmgr_id, |
| const sp_string& _hostname, sp_int32 _port) { |
| stmgr_clientmgr_metrics_->scope(METRIC_STMGR_NEW_CONNECTIONS)->incr(); |
| NetworkOptions options; |
| options.set_host(_hostname); |
| options.set_port(_port); |
| options.set_max_packet_size(config::HeronInternalsConfigReader::Instance() |
| ->GetHeronStreammgrNetworkOptionsMaximumPacketMb() * 1_MB); |
| options.set_high_watermark(high_watermark_); |
| options.set_low_watermark(low_watermark_); |
| options.set_socket_family(PF_INET); |
| StMgrClient* client = new StMgrClient(eventLoop_, options, topology_name_, topology_id_, |
| stmgr_id_, _other_stmgr_id, this, metrics_manager_client_); |
| client->Start(); |
| return client; |
| } |
| |
| sp_int32 StMgrClientMgr::FindBusiestTaskOnStmgr(const sp_string& _stmgr_id) { |
| CHECK(instance_stats_.find(_stmgr_id) != instance_stats_.end()); |
| sp_int32 task_id; |
| sp_int64 max = 0; |
| for (auto iter = instance_stats_[_stmgr_id].begin(); |
| iter!= instance_stats_[_stmgr_id].end(); |
| iter++) { |
| if (iter->second > max) { |
| task_id = iter->first; |
| max = iter->second; |
| } |
| } |
| return task_id; |
| } |
| |
| void StMgrClientMgr::SendTupleStreamMessage(sp_int32 _task_id, const sp_string& _stmgr_id, |
| const proto::system::HeronTupleSet2& _msg) { |
| auto iter = clients_.find(_stmgr_id); |
| CHECK(iter != clients_.end()); |
| |
| instance_stats_[_stmgr_id][_task_id] += _msg.GetCachedSize(); |
| // Acquire the message |
| proto::stmgr::TupleStreamMessage2* out = nullptr; |
| out = clients_[_stmgr_id]->acquire(out); |
| out->set_task_id(_task_id); |
| _msg.SerializePartialToString(out->mutable_set()); |
| |
| clients_[_stmgr_id]->SendTupleStreamMessage(*out); |
| |
| // Release the message |
| clients_[_stmgr_id]->release(out); |
| } |
| |
| void StMgrClientMgr::StartBackPressureOnServer(const sp_string& _other_stmgr_id) { |
| stream_manager_->StartBackPressureOnServer(_other_stmgr_id); |
| } |
| |
| void StMgrClientMgr::StopBackPressureOnServer(const sp_string& _other_stmgr_id) { |
| // Call the StMgrServers removeBackPressure method |
| stream_manager_->StopBackPressureOnServer(_other_stmgr_id); |
| instance_stats_.clear(); |
| } |
| |
| void StMgrClientMgr::SendStartBackPressureToOtherStMgrs(const sp_int32 _task_id) { |
| for (auto iter = clients_.begin(); iter != clients_.end(); ++iter) { |
| iter->second->SendStartBackPressureMessage(_task_id); |
| } |
| } |
| |
| void StMgrClientMgr::SendStopBackPressureToOtherStMgrs(const sp_int32 _task_id) { |
| for (auto iter = clients_.begin(); iter != clients_.end(); ++iter) { |
| iter->second->SendStopBackPressureMessage(_task_id); |
| } |
| } |
| |
| } // namespace stmgr |
| } // namespace heron |