blob: c158792ca9fca12f48831fbae0d05bdd3720a0b8 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
#include <list>
#include <map>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include <chrono>
#include <typeindex>
#include "proto/messages.h"
#include "network/network.h"
#include "basics/basics.h"
#include "config/topology-config-vars.h"
namespace heron {
namespace common {
class HeronStateMgr;
class MetricsMgrSt;
class MultiAssignableMetric;
class CountMetric;
class MultiCountMetric;
class TimeSpentMetric;
namespace heron {
namespace stmgr {
using std::shared_ptr;
class StMgrServer;
class InstanceServer;
class StMgrClientMgr;
class TMasterClient;
class StreamConsumers;
class XorManager;
class TupleCache;
class NeighbourCalculator;
class StatefulRestorer;
class CkptMgrClient;
class StMgr {
StMgr(shared_ptr<EventLoop> eventLoop, const sp_string& _myhost, sp_int32 _data_port,
sp_int32 _local_data_port,
const sp_string& _topology_name, const sp_string& _topology_id,
shared_ptr<proto::api::Topology> _topology, const sp_string& _stmgr_id,
const std::vector<sp_string>& _instances, const sp_string& _zkhostport,
const sp_string& _zkroot, sp_int32 _metricsmgr_port, sp_int32 _shell_port,
sp_int32 _ckptmgr_port, const sp_string& _ckptmgr_id,
sp_int64 _high_watermark, sp_int64 _low_watermark,
const sp_string& _metricscachemgr_mode);
virtual ~StMgr();
// All kinds of initialization like starting servers and clients
void Init();
// Called by tmaster client when a new physical plan is available
void NewPhysicalPlan(shared_ptr<proto::system::PhysicalPlan> pplan);
void HandleStreamManagerData(const sp_string& _stmgr_id,
unique_ptr<proto::stmgr::TupleStreamMessage> _message);
void HandleInstanceData(sp_int32 _task_id, bool _local_spout,
unique_ptr<proto::system::HeronTupleSet> _message);
// Called when an instance does checkpoint and sends its checkpoint
// to the stmgr to save it
void HandleStoreInstanceStateCheckpoint(
const proto::ckptmgr::InstanceStateCheckpoint& _message,
const proto::system::Instance& _instance);
void DrainInstanceData(sp_int32 _task_id, proto::system::HeronTupleSet2* _tuple);
// Send checkpoint message to this task_id
void DrainDownstreamCheckpoint(sp_int32 _task_id,
proto::ckptmgr::DownstreamStatefulCheckpoint* _message);
const shared_ptr<proto::system::PhysicalPlan> GetPhysicalPlan() const;
// Forward the call to the StmgrServer
virtual void StartBackPressureOnServer(const sp_string& _other_stmgr_id);
// Forward the call to the StmgrServer
virtual void StopBackPressureOnServer(const sp_string& _other_stmgr_id);
// Used by the server to tell the client to send the back pressure related
// messages
void SendStartBackPressureToOtherStMgrs();
void SendStopBackPressureToOtherStMgrs();
void StartBackPressureOnSpouts();
void AttemptStopBackPressureFromSpouts();
void StartTMasterClient();
bool DidAnnounceBackPressure();
bool DidOthersAnnounceBackPressure();
const NetworkOptions& GetStmgrServerNetworkOptions() const;
const NetworkOptions& GetInstanceServerNetworkOptions() const;
void HandleDeadStMgrConnection(const sp_string& _stmgr);
void HandleAllStMgrClientsRegistered();
void HandleDeadInstance(sp_int32 _task_id);
void HandleAllInstancesConnected();
void HandleCkptMgrRegistration();
// Handle checkpoint message coming from an upstream task to a downstream task
void HandleDownStreamStatefulCheckpoint(
const proto::ckptmgr::DownstreamStatefulCheckpoint& _message);
// Handle RestoreInstanceStateResponse message from local instance
void HandleRestoreInstanceStateResponse(sp_int32 _task_id, const proto::system::Status& _status,
const std::string& _checkpoint_id);
void OnTMasterLocationFetch(shared_ptr<proto::tmaster::TMasterLocation> _tmaster,
void OnMetricsCacheLocationFetch(
shared_ptr<proto::tmaster::MetricsCacheLocation> _tmaster, proto::system::StatusCode);
void FetchTMasterLocation();
void FetchMetricsCacheLocation();
// A wrapper that calls FetchTMasterLocation. Needed for RegisterTimer
void CheckTMasterLocation(EventLoop::Status);
void UpdateUptimeMetric();
void UpdateProcessMetrics(EventLoop::Status);
// Utility function to create checkpoint mgr client
void CreateCheckpointMgrClient();
// Called when ckpt mgr saves the state of an instance
void HandleSavedInstanceState(const proto::system::Instance& _instance,
const std::string& _checkpoint_id);
// Called when ckpt mgr reteives the state of an instance
void HandleGetInstanceState(proto::system::StatusCode _status, sp_int32 _task_id,
sp_string _checkpoint_id,
const proto::ckptmgr::InstanceStateCheckpoint& _msg);
void CleanupStreamConsumers();
void PopulateStreamConsumers(
proto::api::Topology* _topology,
const std::map<sp_string, std::vector<sp_int32> >& _component_to_task_ids);
void PopulateXorManagers(
const proto::api::Topology& _topology, sp_int32 _message_timeout,
const std::map<sp_string, std::vector<sp_int32> >& _component_to_task_ids);
void CleanupXorManagers();
void SendInBound(sp_int32 _task_id, proto::system::HeronTupleSet2* _message);
void ProcessAcksAndFails(sp_int32 _src_task_id,
sp_int32 _task_id, const proto::system::HeronControlTupleSet& _control);
void CopyDataOutBound(sp_int32 _src_task_id, bool _local_spout,
const proto::api::StreamId& _streamid,
proto::system::HeronDataTuple* _tuple,
const std::vector<sp_int32>& _out_tasks);
void CopyControlOutBound(sp_int32 _src_task_id,
const proto::system::AckTuple& _control, bool _is_fail);
sp_int32 ExtractTopologyTimeout(const proto::api::Topology& _topology);
void CreateTMasterClient(shared_ptr<proto::tmaster::TMasterLocation> tmasterLocation);
void StartStmgrServer();
void StartInstanceServer();
void CreateTupleCache();
// This is called when we receive a valid new Tmaster Location.
// Performs all the actions necessary to deal with new tmaster.
void HandleNewTmaster(shared_ptr<proto::tmaster::TMasterLocation> newTmasterLocation);
// Broadcast the tmaster location changes to other components. (MM for now)
void BroadcastTmasterLocation(shared_ptr<proto::tmaster::TMasterLocation> tmasterLocation);
void BroadcastMetricsCacheLocation(
shared_ptr<proto::tmaster::MetricsCacheLocation> tmasterLocation);
// Called when TMaster sends a InitiateStatefulCheckpoint message with a checkpoint_id
// This will send intiate checkpoint messages to local instances to capture their state.
void InitiateStatefulCheckpoint(sp_string checkpoint_id);
// Invoked when TMaster asks us to restore all our local instances state to
// the checkpoint represented by _checkpoint_id. This starts the
// Restore state machine
void RestoreTopologyState(sp_string _checkpoint_id, sp_int64 _restore_txid);
// Invoked when TMaster sends the StartStatefulProcessing request to kick
// start the computation. We send the StartStatefulProcessing to all our
// local instances so that they can start the processing.
void StartStatefulProcessing(sp_string _checkpoint_id);
// Called when Stateful Restorer restores the instance state
void HandleStatefulRestoreDone(proto::system::StatusCode _status,
std::string _checkpoint_id, sp_int64 _restore_txid);
// Patch new physical plan with internal hydrated topology but keep new topology data:
// - new topology state
// - new topology/component config
static void PatchPhysicalPlanWithHydratedTopology(shared_ptr<proto::system::PhysicalPlan> _pplan,
proto::api::Topology const& _topology);
shared_ptr<heron::common::HeronStateMgr> state_mgr_;
shared_ptr<proto::system::PhysicalPlan> pplan_;
sp_string topology_name_;
sp_string topology_id_;
sp_string stmgr_id_;
sp_string stmgr_host_;
sp_int32 data_port_;
sp_int32 local_data_port_;
std::vector<sp_string> instances_;
// Getting data from other streammgrs
shared_ptr<StMgrServer> stmgr_server_;
// Used to get/send data to local instances
shared_ptr<InstanceServer> instance_server_;
// Pushing data to other streammanagers
shared_ptr<StMgrClientMgr> clientmgr_;
shared_ptr<TMasterClient> tmaster_client_;
shared_ptr<EventLoop> eventLoop_;
// Map of task_id to stmgr_id
std::unordered_map<sp_int32, sp_string> task_id_to_stmgr_;
// map of <component, streamid> to its consumers
std::unordered_map<std::pair<sp_string, sp_string>,
unique_ptr<StreamConsumers>> stream_consumers_;
// xor managers
shared_ptr<XorManager> xor_mgrs_;
// Tuple Cache to optimize message building
shared_ptr<TupleCache> tuple_cache_;
// Neighbour Calculator for stateful processing
shared_ptr<NeighbourCalculator> neighbour_calculator_;
// Stateful Restorer
shared_ptr<StatefulRestorer> stateful_restorer_;
// This is the topology structure
// that contains the full component objects
shared_ptr<proto::api::Topology> hydrated_topology_;
// Metrics Manager
shared_ptr<heron::common::MetricsMgrSt> metrics_manager_client_;
// Checkpoint Manager
shared_ptr<CkptMgrClient> ckptmgr_client_;
// Process related metrics
shared_ptr<heron::common::MultiAssignableMetric> stmgr_process_metrics_;
// Stateful Restore metric
shared_ptr<heron::common::CountMetric> restore_initiated_metrics_;
shared_ptr<heron::common::MultiCountMetric> dropped_during_restore_metrics_;
// Instance related metrics
shared_ptr<heron::common::MultiCountMetric> instance_bytes_received_metrics_;
// Backpressure relarted metrics
shared_ptr<heron::common::TimeSpentMetric> back_pressure_metric_initiated_;
// The time at which the stmgr was started up
std::chrono::high_resolution_clock::time_point start_time_;
sp_string zkhostport_;
sp_string zkroot_;
sp_int32 metricsmgr_port_;
sp_int32 shell_port_;
sp_int32 ckptmgr_port_;
sp_string ckptmgr_id_;
std::vector<sp_int32> out_tasks_;
bool is_acking_enabled;
config::TopologyConfigVars::TopologyReliabilityMode reliability_mode_;
sp_int64 high_watermark_;
sp_int64 low_watermark_;
// whether MetricsCacheMgr is running
sp_string metricscachemgr_mode_;
} // namespace stmgr
} // namespace heron