blob: 0d3cccd2af2133a3ed1a25bfa349871725cc1d2d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef __TMANAGER_H
#define __TMANAGER_H
#include <chrono>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <vector>
#include "statemgr/heron-statemgr.h"
#include "metrics/metrics-mgr-st.h"
#include "metrics/metrics.h"
#include "network/network.h"
#include "proto/tmanager.pb.h"
#include "basics/basics.h"
namespace heron {
namespace tmanager {
// Smart-pointer names used unqualified throughout this header.
using std::shared_ptr;
using std::unique_ptr;

// Collaborators that are only referenced through pointers here; their
// definitions live in other headers of the tmanager package.
class CkptMgrClient;
class StatefulController;
class StatsInterface;
class StMgrState;
class TController;
class TManagerServer;
class TMetricsCollector;

// Stream manager id -> shared state kept for that stream manager.
using StMgrMap = std::map<std::string, shared_ptr<StMgrState>>;
using StMgrMapIter = StMgrMap::iterator;

// Config key -> config value (both kept as strings).
using ConfigValueMap = std::map<std::string, std::string>;

// Component name -> (config key -> config value) pairs for that component.
using ComponentConfigMap = std::map<std::string, std::map<std::string, std::string>>;
class TManager {
public:
TManager(const std::string& _zk_hostport, const std::string& _topology_name,
const std::string& _topology_id, const std::string& _topdir,
sp_int32 _tmanager_controller_port, sp_int32 _server_port,
sp_int32 _stats_port, sp_int32 metricsMgrPort, sp_int32 _ckptmgr_port,
const std::string& metrics_sinks_yaml,
const std::string& _myhost_name, shared_ptr<EventLoop> eventLoop);
virtual ~TManager();
const std::string& GetTopologyId() const { return current_pplan_->topology().id(); }
const std::string& GetTopologyName() const { return current_pplan_->topology().name(); }
proto::api::TopologyState GetTopologyState() const { return current_pplan_->topology().state(); }
void ActivateTopology(VCallback<proto::system::StatusCode> cb);
void DeActivateTopology(VCallback<proto::system::StatusCode> cb);
// Update runtime configs in a topology.
// Return true if successful; false otherwise and the callback function won't be invoked.
bool UpdateRuntimeConfig(const ComponentConfigMap& _config,
VCallback<proto::system::StatusCode> cb);
// Validate runtime config. Return false if any issue is found.
bool ValidateRuntimeConfig(const ComponentConfigMap& _config) const;
proto::system::Status* RegisterStMgr(const proto::system::StMgr& _stmgr,
const std::vector<shared_ptr<proto::system::Instance>>& _instances,
Connection* _conn, shared_ptr<proto::system::PhysicalPlan>& _pplan);
// function to update heartbeat for a nodemgr
proto::system::Status* UpdateStMgrHeartbeat(Connection* _conn, sp_int64 _time,
proto::system::StMgrStats* _stats);
// When stmgr disconnects from us
proto::system::StatusCode RemoveStMgrConnection(Connection* _conn);
// Called by http server upon receiving a user message to cleanup the state
void CleanAllStatefulCheckpoint();
// Called by ckptmgr client upon receiving CleanStatefulCheckpointResponse
void HandleCleanStatefulCheckpointResponse(proto::system::StatusCode);
// Get stream managers registration summary
std::unique_ptr<proto::tmanager::StmgrsRegistrationSummaryResponse> GetStmgrsRegSummary();
// Accessors
const shared_ptr<proto::system::PhysicalPlan> getPhysicalPlan() const { return current_pplan_; }
// TODO(mfu): Should we provide this?
// topology_ should only be used to construct physical plan when TManager first starts
// Providing an accessor is bug prone.
// Now used in GetMetrics function in tmetrics-collector
const proto::api::Topology& getInitialTopology() const { return *topology_; }
// Timer function to start the stateful checkpoint process
void SendCheckpointMarker();
// Called by tmanager server when it gets InstanceStateStored message
void HandleInstanceStateStored(const std::string& _checkpoint_id,
const proto::system::Instance& _instance);
// Called by tmanager server when it gets RestoreTopologyStateResponse message
void HandleRestoreTopologyStateResponse(Connection* _conn,
const std::string& _checkpoint_id,
int64_t _restore_txid,
proto::system::StatusCode _status);
// Called by tmanager server when it gets ResetTopologyState message
void ResetTopologyState(Connection* _conn, const std::string& _dead_stmgr,
int32_t _dead_instance, const std::string& _reason);
private:
// Helper function to fetch physical plan
void FetchPhysicalPlan();
// Function to be called that calls MakePhysicalPlan and sends it to all stmgrs
void DoPhysicalPlan(EventLoop::Status _code);
// Log config object
void LogConfig(const ComponentConfigMap& _config);
// Big brother function that does the assignment to the workers
// If _new_stmgr is null, this means that there was a plan
// existing, but a _new_stmgr joined us. So redo his part
// If _new_stmgr is empty, this means do pplan from scratch
shared_ptr<proto::system::PhysicalPlan> MakePhysicalPlan();
// Check to see if the topology is of correct format
bool ValidateTopology(const proto::api::Topology& _topology);
// Check to see if the topology and stmgrs match
// in terms of workers
bool ValidateStMgrsWithPackingPlan();
// Check to see if the stmgrs and pplan match
// in terms of workers
bool ValidateStMgrsWithPhysicalPlan(shared_ptr<proto::system::PhysicalPlan> _pplan);
// Check if incoming runtime configs are valid or not.
// All incoming configurations must exist. If there is any non-existing
// configuration, or the data type is wrong, return false.
bool ValidateRuntimeConfigNames(const ComponentConfigMap& _config) const;
// If the assignment is already done, then:
// 1. Distribute physical plan to all active stmgrs
bool DistributePhysicalPlan();
// Function called after we set the tmanagerlocation
void SetTManagerLocationDone(proto::system::StatusCode _code);
// Function called after we get the topology
void GetTopologyDone(proto::system::StatusCode _code);
// Function called after we get StatefulConsistentCheckpoints
void GetStatefulCheckpointsDone(shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
proto::system::StatusCode _code);
// Function called after we set an initial StatefulConsistentCheckpoints
void SetStatefulCheckpointsDone(proto::system::StatusCode _code,
shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt);
// Helper function to setup stateful coordinator
void SetupStatefulController(shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt);
// Function called after we try to get assignment
void GetPhysicalPlanDone(shared_ptr<proto::system::PhysicalPlan> _pplan,
proto::system::StatusCode _code);
// Function called after we try to commit a new assignment
void SetPhysicalPlanDone(shared_ptr<proto::system::PhysicalPlan> _pplan,
proto::system::StatusCode _code);
// Function called when we want to setup ourselves as tmanager
void EstablishTManager(EventLoop::Status);
void EstablishPackingPlan(EventLoop::Status);
void FetchPackingPlan();
void OnPackingPlanFetch(shared_ptr<proto::system::PackingPlan> newPackingPlan,
proto::system::StatusCode _status);
// Metrics updates
void UpdateUptimeMetric();
void UpdateProcessMetrics(EventLoop::Status);
// Update configurations in physical plan.
bool UpdateRuntimeConfigInTopology(proto::api::Topology* _topology,
const ComponentConfigMap& _config);
// Function called when a new stateful ckpt record is saved
void HandleStatefulCheckpointSave(
const proto::ckptmgr::StatefulConsistentCheckpoints &new_ckpts);
// Function called to kill container
void KillContainer(const std::string& host_name,
sp_int32 port,
const std::string& stmgr_id);
// map of active stmgr id to stmgr state
StMgrMap stmgrs_;
// map of connection to stmgr id
std::map<Connection*, std::string> connection_to_stmgr_id_;
// set of nodemanagers that have not yet connected to us
std::set<std::string> absent_stmgrs_;
// The current physical plan
shared_ptr<proto::system::PhysicalPlan> current_pplan_;
// The topology as first submitted by the user
// It shall only be used to construct the physical plan when TManager first time starts
// Any runtime changes shall be made to current_pplan_->topology
unique_ptr<proto::api::Topology> topology_;
shared_ptr<proto::system::PackingPlan> packing_plan_;
// The statemgr where we store/retrieve our state
shared_ptr<heron::common::HeronStateMgr> state_mgr_;
// Our copy of the tmanagerlocation
unique_ptr<proto::tmanager::TManagerLocation> tmanager_location_;
// When we are in the middle of doing assignment
// we set this to true
bool assignment_in_progress_;
bool do_reassign_;
// State information
std::string zk_hostport_;
std::string topdir_;
// Servers that implement our services
unique_ptr<TController> tmanager_controller_;
sp_int32 tmanager_controller_port_;
unique_ptr<TManagerServer> server_;
sp_int32 server_port_;
unique_ptr<StatsInterface> stats_;
sp_int32 stats_port_;
std::string myhost_name_;
// how many times have we tried to establish
// ourselves as server
sp_int32 server_establish_attempts_;
// collector
shared_ptr<TMetricsCollector> metrics_collector_;
sp_int32 mMetricsMgrPort;
// Metrics Manager
shared_ptr<heron::common::MetricsMgrSt> mMetricsMgrClient;
// Ckpt Manager
unique_ptr<CkptMgrClient> ckptmgr_client_;
sp_int32 ckptmgr_port_;
// Process related metrics
shared_ptr<heron::common::MultiAssignableMetric> tmanagerProcessMetrics;
// The time at which the stmgr was started up
std::chrono::high_resolution_clock::time_point start_time_;
// Stateful Controller
unique_ptr<StatefulController> stateful_controller_;
// HTTP client
AsyncDNS* dns_;
HTTPClient* http_client_;
// Copy of the EventLoop
shared_ptr<EventLoop> eventLoop_;
};
} // namespace tmanager
} // namespace heron
#endif