blob: 88696180e6ac22b74bda70367b38434849ecff79 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef __TMASTER_H
#define __TMASTER_H
#include <map>
#include <set>
#include <string>
#include <vector>
#include "statemgr/heron-statemgr.h"
#include "metrics/metrics-mgr-st.h"
#include "metrics/metrics.h"
#include "network/network.h"
#include "proto/tmaster.pb.h"
#include "basics/basics.h"
namespace heron {
namespace tmaster {
class StMgrState;
class TController;
class StatsInterface;
class TMasterServer;
class TMetricsCollector;
class StatefulController;
class CkptMgrClient;
typedef std::map<std::string, StMgrState*> StMgrMap;
typedef StMgrMap::iterator StMgrMapIter;
typedef std::map<std::string, std::string> ConfigValueMap;
// From component name to config/value pairs
typedef std::map<std::string, std::map<std::string, std::string>> ComponentConfigMap;
class TMaster {
public:
TMaster(const std::string& _zk_hostport, const std::string& _topology_name,
const std::string& _topology_id, const std::string& _topdir,
sp_int32 _tmaster_controller_port, sp_int32 _master_port,
sp_int32 _stats_port, sp_int32 metricsMgrPort, sp_int32 _ckptmgr_port,
const std::string& metrics_sinks_yaml,
const std::string& _myhost_name, EventLoop* eventLoop);
virtual ~TMaster();
const std::string& GetTopologyId() const { return current_pplan_->topology().id(); }
const std::string& GetTopologyName() const { return current_pplan_->topology().name(); }
proto::api::TopologyState GetTopologyState() const { return current_pplan_->topology().state(); }
void ActivateTopology(VCallback<proto::system::StatusCode> cb);
void DeActivateTopology(VCallback<proto::system::StatusCode> cb);
// Update runtime configs in a topology.
// Return true if successful; false otherwise and the callback function won't be invoked.
bool UpdateRuntimeConfig(const ComponentConfigMap& _config,
VCallback<proto::system::StatusCode> cb);
// Validate runtime config. Return false if any issue is found.
bool ValidateRuntimeConfig(const ComponentConfigMap& _config) const;
proto::system::Status* RegisterStMgr(const proto::system::StMgr& _stmgr,
const std::vector<proto::system::Instance*>& _instances,
Connection* _conn, proto::system::PhysicalPlan*& _pplan);
// function to update heartbeat for a nodemgr
proto::system::Status* UpdateStMgrHeartbeat(Connection* _conn, sp_int64 _time,
proto::system::StMgrStats* _stats);
// When stmgr disconnects from us
proto::system::StatusCode RemoveStMgrConnection(Connection* _conn);
// Called by http server upon receiving a user message to cleanup the state
void CleanAllStatefulCheckpoint();
// Called by ckptmgr client upon receiving CleanStatefulCheckpointResponse
void HandleCleanStatefulCheckpointResponse(proto::system::StatusCode);
// Get stream managers registration summary
std::unique_ptr<proto::tmaster::StmgrsRegistrationSummaryResponse> GetStmgrsRegSummary();
// Accessors
const proto::system::PhysicalPlan* getPhysicalPlan() const { return current_pplan_; }
// TODO(mfu): Should we provide this?
// topology_ should only be used to construct physical plan when TMaster first starts
// Providing an accessor is bug prone.
// Now used in GetMetrics function in tmetrics-collector
const proto::api::Topology* getInitialTopology() const { return topology_; }
// Timer function to start the stateful checkpoint process
void SendCheckpointMarker();
// Called by tmaster server when it gets InstanceStateStored message
void HandleInstanceStateStored(const std::string& _checkpoint_id,
const proto::system::Instance& _instance);
// Called by tmaster server when it gets RestoreTopologyStateResponse message
void HandleRestoreTopologyStateResponse(Connection* _conn,
const std::string& _checkpoint_id,
int64_t _restore_txid,
proto::system::StatusCode _status);
// Called by tmaster server when it gets ResetTopologyState message
void ResetTopologyState(Connection* _conn, const std::string& _dead_stmgr,
int32_t _dead_instance, const std::string& _reason);
private:
// Helper function to fetch physical plan
void FetchPhysicalPlan();
// Function to be called that calls MakePhysicalPlan and sends it to all stmgrs
void DoPhysicalPlan(EventLoop::Status _code);
// Log config object
void LogConfig(const ComponentConfigMap& _config);
// Big brother function that does the assignment to the workers
// If _new_stmgr is null, this means that there was a plan
// existing, but a _new_stmgr joined us. So redo his part
// If _new_stmgr is empty, this means do pplan from scratch
proto::system::PhysicalPlan* MakePhysicalPlan();
// Check to see if the topology is of correct format
bool ValidateTopology(const proto::api::Topology& _topology);
// Check to see if the topology and stmgrs match
// in terms of workers
bool ValidateStMgrsWithPackingPlan();
// Check to see if the stmgrs and pplan match
// in terms of workers
bool ValidateStMgrsWithPhysicalPlan(proto::system::PhysicalPlan* _pplan);
// Check if incoming runtime configs are valid or not.
// All incoming configurations must exist. If there is any non-existing
// configuration, or the data type is wrong, return false.
bool ValidateRuntimeConfigNames(const ComponentConfigMap& _config) const;
// If the assignment is already done, then:
// 1. Distribute physical plan to all active stmgrs
bool DistributePhysicalPlan();
// Function called after we set the tmasterlocation
void SetTMasterLocationDone(proto::system::StatusCode _code);
// Function called after we get the topology
void GetTopologyDone(proto::system::StatusCode _code);
// Function called after we get StatefulConsistentCheckpoints
void GetStatefulCheckpointsDone(proto::ckptmgr::StatefulConsistentCheckpoints* _ckpt,
proto::system::StatusCode _code);
// Function called after we set an initial StatefulConsistentCheckpoints
void SetStatefulCheckpointsDone(proto::system::StatusCode _code,
proto::ckptmgr::StatefulConsistentCheckpoints* _ckpt);
// Helper function to setup stateful coordinator
void SetupStatefulController(proto::ckptmgr::StatefulConsistentCheckpoints* _ckpt);
// Function called after we try to get assignment
void GetPhysicalPlanDone(proto::system::PhysicalPlan* _pplan, proto::system::StatusCode _code);
// Function called after we try to commit a new assignment
void SetPhysicalPlanDone(proto::system::PhysicalPlan* _pplan, proto::system::StatusCode _code);
// Function called when we want to setup ourselves as tmaster
void EstablishTMaster(EventLoop::Status);
void EstablishPackingPlan(EventLoop::Status);
void FetchPackingPlan();
void OnPackingPlanFetch(proto::system::PackingPlan* newPackingPlan,
proto::system::StatusCode _status);
// Metrics updates
void UpdateUptimeMetric();
void UpdateProcessMetrics(EventLoop::Status);
// Update configurations in physical plan.
bool UpdateRuntimeConfigInTopology(proto::api::Topology* _topology,
const ComponentConfigMap& _config);
// Function called when a new stateful ckpt record is saved
void HandleStatefulCheckpointSave(const std::string& _oldest_ckpt);
// Function called to kill container
void KillContainer(const std::string& host_name,
sp_int32 port,
const std::string& stmgr_id);
// map of active stmgr id to stmgr state
StMgrMap stmgrs_;
// map of connection to stmgr id
std::map<Connection*, std::string> connection_to_stmgr_id_;
// set of nodemanagers that have not yet connected to us
std::set<std::string> absent_stmgrs_;
// The current physical plan
proto::system::PhysicalPlan* current_pplan_;
// The topology as first submitted by the user
// It shall only be used to construct the physical plan when TMaster first time starts
// Any runtime changes shall be made to current_pplan_->topology
proto::api::Topology* topology_;
proto::system::PackingPlan* packing_plan_;
// The statemgr where we store/retrieve our state
heron::common::HeronStateMgr* state_mgr_;
// Our copy of the tmasterlocation
proto::tmaster::TMasterLocation* tmaster_location_;
// When we are in the middle of doing assignment
// we set this to true
bool assignment_in_progress_;
bool do_reassign_;
// State information
std::string zk_hostport_;
std::string topdir_;
// Servers that implement our services
TController* tmaster_controller_;
sp_int32 tmaster_controller_port_;
TMasterServer* master_;
sp_int32 master_port_;
StatsInterface* stats_;
sp_int32 stats_port_;
std::string myhost_name_;
// how many times have we tried to establish
// ourselves as master
sp_int32 master_establish_attempts_;
// collector
TMetricsCollector* metrics_collector_;
sp_int32 mMetricsMgrPort;
// Metrics Manager
heron::common::MetricsMgrSt* mMetricsMgrClient;
// Ckpt Manager
CkptMgrClient* ckptmgr_client_;
sp_int32 ckptmgr_port_;
// Process related metrics
heron::common::MultiAssignableMetric* tmasterProcessMetrics;
// The time at which the stmgr was started up
std::chrono::high_resolution_clock::time_point start_time_;
// Stateful Controller
StatefulController* stateful_controller_;
// HTTP client
AsyncDNS* dns_;
HTTPClient* http_client_;
// Copy of the EventLoop
EventLoop* eventLoop_;
};
} // namespace tmaster
} // namespace heron
#endif