// Source blob: 2e8e7544d1c0a84268e2b226bbfeeef3a974d3ae
/*
* Copyright 2015 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef SRC_CPP_SVCS_STMGR_SRC_MANAGER_STMGR_H_
#define SRC_CPP_SVCS_STMGR_SRC_MANAGER_STMGR_H_
#include <list>
#include <map>
#include <unordered_map>
#include <utility>
#include <vector>
#include <chrono>
#include <typeindex>
#include "proto/messages.h"
#include "network/network.h"
#include "basics/basics.h"
// Forward declarations of heron::common types. This header only refers to
// them through pointers, so their full definitions are not required here.
namespace heron {
namespace common {
class HeronStateMgr;
class MetricsMgrSt;
class MultiAssignableMetric;
}  // namespace common
}  // namespace heron
namespace heron {
namespace stmgr {
// Forward declarations of collaborators that StMgr only refers to through
// pointers in this header.
class StMgrServer;
class StMgrClientMgr;
class TMasterClient;
class StreamConsumers;
class XorManager;
class TupleCache;
// Stream manager (StMgr).
//
// Going by the declarations and comments in this header, the class accepts
// tuple data from local instances and from other stream managers, forwards
// back-pressure start/stop notifications, and reacts to new TMaster locations
// and physical plans. Behaviour is defined in the implementation file; this
// header only declares the interface and the state.
//
// The class keeps many raw pointers and has a user-declared destructor, so it
// is explicitly non-copyable: an implicit member-wise copy would leave two
// objects holding the same pointers.
class StMgr {
 public:
  // NOTE(review): parameter meanings below are read off the parameter names
  // and the members they appear to feed; confirm against the definition.
  //   eventLoop                    - event loop used by this stmgr
  //   _myhost, _myport             - host/port of this stmgr
  //   _topology_name, _topology_id - identity of the topology
  //   _topology                    - topology description
  //   _stmgr_id                    - id of this stmgr
  //   _instances                   - instance ids handled by this stmgr
  //   _zkhostport, _zkroot         - state manager location
  //   _metricsmgr_port             - metrics manager port
  //   _shell_port                  - shell port
  //   _high_watermark, _low_watermark - watermark thresholds
  StMgr(EventLoop* eventLoop, const sp_string& _myhost, sp_int32 _myport,
        const sp_string& _topology_name, const sp_string& _topology_id,
        proto::api::Topology* _topology, const sp_string& _stmgr_id,
        const std::vector<sp_string>& _instances, const sp_string& _zkhostport,
        const sp_string& _zkroot, sp_int32 _metricsmgr_port, sp_int32 _shell_port,
        sp_int64 _high_watermark, sp_int64 _low_watermark);
  virtual ~StMgr();

  // Non-copyable: the raw pointer members below must not be duplicated by an
  // implicit member-wise copy.
  StMgr(const StMgr&) = delete;
  StMgr& operator=(const StMgr&) = delete;

  // All kinds of initialization like starting servers and clients
  void Init();

  // Called by tmaster client when a new physical plan is available
  void NewPhysicalPlan(proto::system::PhysicalPlan* pplan);

  // Handles a tuple stream message associated with the stream manager
  // identified by _stmgr_id.
  void HandleStreamManagerData(const sp_string& _stmgr_id,
                               const proto::stmgr::TupleStreamMessage2& _message);

  // Handles a tuple set associated with task _task_id. _local_spout
  // presumably flags data that originates from a spout hosted by this
  // stmgr - confirm against the implementation.
  void HandleInstanceData(sp_int32 _task_id, bool _local_spout,
                          proto::system::HeronTupleSet* _message);

  // NOTE(review): judging by the name, hands an accumulated tuple set for
  // _task_id onwards; confirm direction and ownership of _tuple.
  void DrainInstanceData(sp_int32 _task_id, proto::system::HeronTupleSet2* _tuple);

  // Read-only access to a physical plan (presumably the current one, pplan_).
  const proto::system::PhysicalPlan* GetPhysicalPlan() const;

  // Forward the call to the StmgrServer
  virtual void StartBackPressureOnServer(const sp_string& _other_stmgr_id);
  // Forward the call to the StmgrServer
  virtual void StopBackPressureOnServer(const sp_string& _other_stmgr_id);

  // Used by the server to tell the client to send the back pressure related
  // messages
  void SendStartBackPressureToOtherStMgrs();
  void SendStopBackPressureToOtherStMgrs();

  void StartTMasterClient();
  bool DidAnnounceBackPressure();

 private:
  // Callbacks for the TMaster / MetricsCache location fetches started by
  // FetchTMasterLocation() and FetchMetricsCacheLocation() (pairing inferred
  // from the names). The unnamed parameter is the status code of the fetch.
  void OnTMasterLocationFetch(proto::tmaster::TMasterLocation* _tmaster,
                              proto::system::StatusCode);
  void OnMetricsCacheLocationFetch(proto::tmaster::MetricsCacheLocation* _metricscache,
                                   proto::system::StatusCode);
  void FetchTMasterLocation();
  void FetchMetricsCacheLocation();

  // A wrapper that calls FetchTMasterLocation. Needed for RegisterTimer
  void CheckTMasterLocation(EventLoop::Status);
  // Takes an EventLoop::Status like CheckTMasterLocation, so presumably also
  // driven by a timer - confirm.
  void UpdateProcessMetrics(EventLoop::Status);

  // Build / tear down stream_consumers_ and xor_mgrs_.
  // _component_to_task_ids maps a component name to its task ids.
  void CleanupStreamConsumers();
  void PopulateStreamConsumers(
      proto::api::Topology* _topology,
      const std::map<sp_string, std::vector<sp_int32> >& _component_to_task_ids);
  void PopulateXorManagers(
      const proto::api::Topology& _topology, sp_int32 _message_timeout,
      const std::map<sp_string, std::vector<sp_int32> >& _component_to_task_ids);
  void CleanupXorManagers();

  // Tuple routing helpers; semantics live in the implementation file.
  void SendInBound(sp_int32 _task_id, proto::system::HeronTupleSet2* _message);
  void ProcessAcksAndFails(sp_int32 _task_id,
                           const proto::system::HeronControlTupleSet& _control);
  void CopyDataOutBound(sp_int32 _src_task_id, bool _local_spout,
                        const proto::api::StreamId& _streamid,
                        proto::system::HeronDataTuple* _tuple,
                        const std::vector<sp_int32>& _out_tasks);
  void CopyControlOutBound(const proto::system::AckTuple& _control, bool _is_fail);

  // Returns a timeout derived from _topology (presumably the value later
  // passed as _message_timeout to PopulateXorManagers - confirm).
  sp_int32 ExtractTopologyTimeout(const proto::api::Topology& _topology);

  void CreateTMasterClient(proto::tmaster::TMasterLocation* tmasterLocation);
  void StartStmgrServer();
  void CreateTupleCache();

  // This is called when we receive a valid new Tmaster Location.
  // Performs all the actions necessary to deal with new tmaster.
  void HandleNewTmaster(proto::tmaster::TMasterLocation* newTmasterLocation);
  // Broadcast the tmaster location changes to other components. (MM for now)
  void BroadcastTmasterLocation(proto::tmaster::TMasterLocation* tmasterLocation);
  // Counterpart of BroadcastTmasterLocation for the metrics cache location.
  void BroadcastMetricsCacheLocation(
      proto::tmaster::MetricsCacheLocation* metricsCacheLocation);

  // Raw pointers default to nullptr so that a pointer the constructor does
  // not set is null rather than indeterminate; constructor initializers still
  // take precedence over these defaults.
  heron::common::HeronStateMgr* state_mgr_ = nullptr;
  proto::system::PhysicalPlan* pplan_ = nullptr;
  sp_string topology_name_;
  sp_string topology_id_;
  sp_string stmgr_id_;
  sp_string stmgr_host_;
  sp_int32 stmgr_port_;
  std::vector<sp_string> instances_;

  // Getting data from other streammgrs
  // Also used to get/send data to local instances
  StMgrServer* server_ = nullptr;
  // Pushing data to other streammanagers
  StMgrClientMgr* clientmgr_ = nullptr;
  TMasterClient* tmaster_client_ = nullptr;
  EventLoop* eventLoop_ = nullptr;

  // Map of task_id to stmgr_id
  std::unordered_map<sp_int32, sp_string> task_id_to_stmgr_;
  // map of <component, streamid> to its consumers
  // (relies on a std::hash specialization for std::pair being visible from
  // the included project headers; the standard library does not provide one)
  std::unordered_map<std::pair<sp_string, sp_string>, StreamConsumers*> stream_consumers_;
  // xor managers
  XorManager* xor_mgrs_ = nullptr;
  // Tuple Cache to optimize message building
  TupleCache* tuple_cache_ = nullptr;

  // This is the topology structure
  // that contains the full component objects
  proto::api::Topology* hydrated_topology_ = nullptr;

  // Metrics Manager
  heron::common::MetricsMgrSt* metrics_manager_client_ = nullptr;
  // Process related metrics
  heron::common::MultiAssignableMetric* stmgr_process_metrics_ = nullptr;
  // The time at which the stmgr was started up
  std::chrono::high_resolution_clock::time_point start_time_;

  sp_string zkhostport_;
  sp_string zkroot_;
  sp_int32 metricsmgr_port_;
  sp_int32 shell_port_;

  // NOTE(review): the next members look like scratch state reused between
  // calls to avoid re-allocation - confirm against the implementation.
  proto::system::HeronTupleSet2 current_control_tuple_set_;
  std::vector<sp_int32> out_tasks_;
  // NOTE(review): lacks the trailing underscore used by every other member;
  // the name is kept because the implementation file refers to it.
  bool is_acking_enabled;
  proto::system::HeronTupleSet2* tuple_set_from_other_stmgr_ = nullptr;
  // Fully qualified type name of HeronTupleSet2, kept as a string.
  sp_string heron_tuple_set_2_ = "heron.proto.system.HeronTupleSet2";

  // Watermark values supplied to the constructor (presumably back-pressure
  // thresholds - confirm).
  sp_int64 high_watermark_;
  sp_int64 low_watermark_;
};
} // namespace stmgr
} // namespace heron
#endif // SRC_CPP_SVCS_STMGR_SRC_MANAGER_STMGR_H_