blob: 35bf4ea37784d2f0223c74dca1963eb41fac89dd [file] [log] [blame]
/*
* Copyright 2015 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//////////////////////////////////////////////////////////////////////////////
//
// heron-zkstatemgr.h
// Author:- Sanjeev Kulkarni(skulkarni@twitter.com)
//
// This file defines the ZK implementation of the HeronStateMgr interface.
// The details are:
// 1. TopologyMasterLocation is kept as an ephemeral node. This way, if the
// master goes away, the node is not there, and thus a create would
// succeed. If there is another tmaster, the createnode would fail,
// which would indicate that another tmaster was running.
// Thus TMasterServer can use just the set method to see whether it is the
// only tmaster.
// 2. Once #1 is ensured, Topology and Assignment are straightforward
// create/set operations
// 3. The Topology node always exists. However, the assignment may or
// may not exist. Currently TMaster always does a get of the assignment
// to see whether some assignment exists. We also keep track
// of this, so that the next time SetAssignment is called,
// we know whether to do createnode or setnode.
//////////////////////////////////////////////////////////////////////////////
#ifndef __HERON_ZKSTATE_H
#define __HERON_ZKSTATE_H
#include <string>
#include <vector>
#include <utility>
#include "zookeeper/zkclient.h"
#include "zookeeper/zkclient_factory.h"
#include "statemgr/heron-statemgr.h"
class ZKClient;
namespace heron {
namespace common {
class HeronZKStateMgr : public HeronStateMgr {
 public:
  // Creates a ZooKeeper-backed state manager connected to `zkhostport`,
  // rooted at `topleveldir`, driven by `eventLoop`. When
  // `exitOnSessionExpiry` is true the process exits on a ZooKeeper
  // session-expired event (see exitOnSessionExpiry_).
  HeronZKStateMgr(const std::string& zkhostport, const std::string& topleveldir,
                  EventLoop* eventLoop, bool exitOnSessionExpiry);
  virtual ~HeronZKStateMgr();

  // Not copyable: this class owns raw resources (it is responsible for
  // deleting zkclient_factory_, and holds raw pointers to the client and
  // watch infos), so a member-wise copy would lead to double deletion.
  HeronZKStateMgr(const HeronZKStateMgr&) = delete;
  HeronZKStateMgr& operator=(const HeronZKStateMgr&) = delete;

  //
  // Interface implementations
  //
  void InitTree();

  // Sets up a watch on tmaster location changes. `_watcher` is invoked when
  // the location changes.
  void SetTMasterLocationWatch(const std::string& _topology_name, VCallback<> _watcher);
  // Same as above, for the metrics cache location.
  void SetMetricsCacheLocationWatch(const std::string& _topology_name, VCallback<> _watcher);
  // Same as above, for the packing plan.
  void SetPackingPlanWatch(const std::string& _topology_name, VCallback<> _watcher);

  // Sets/Gets the tmaster location
  void SetTMasterLocation(const proto::tmaster::TMasterLocation& _location,
                          VCallback<proto::system::StatusCode> _cb);
  void GetTMasterLocation(const std::string& _topology_name,
                          proto::tmaster::TMasterLocation* _return,
                          VCallback<proto::system::StatusCode> _cb);
  // Sets/Gets the metrics cache location
  void SetMetricsCacheLocation(const proto::tmaster::MetricsCacheLocation& _location,
                               VCallback<proto::system::StatusCode> _cb);
  void GetMetricsCacheLocation(const std::string& _topology_name,
                               proto::tmaster::MetricsCacheLocation* _return,
                               VCallback<proto::system::StatusCode> _cb);

  // Creates/Deletes/Sets/Gets the Topology
  void CreateTopology(const proto::api::Topology& _top, VCallback<proto::system::StatusCode> _cb);
  void DeleteTopology(const std::string& _topology_name, VCallback<proto::system::StatusCode> _cb);
  void SetTopology(const proto::api::Topology& _top, VCallback<proto::system::StatusCode> _cb);
  void GetTopology(const std::string& _topology_name, proto::api::Topology* _return,
                   VCallback<proto::system::StatusCode> _cb);

  // Creates/Deletes/Sets/Gets the physical plan
  void CreatePhysicalPlan(const proto::system::PhysicalPlan& _plan,
                          VCallback<proto::system::StatusCode> _cb);
  void DeletePhysicalPlan(const std::string& _topology_name,
                          VCallback<proto::system::StatusCode> _cb);
  void SetPhysicalPlan(const proto::system::PhysicalPlan& _pplan,
                       VCallback<proto::system::StatusCode> _cb);
  void GetPhysicalPlan(const std::string& _topology_name, proto::system::PhysicalPlan* _return,
                       VCallback<proto::system::StatusCode> _cb);

  // Gets the packing plan
  void GetPackingPlan(const std::string& _topology_name, proto::system::PackingPlan* _return,
                      VCallback<proto::system::StatusCode> _cb);

  // Creates/Deletes/Gets/Sets the execution state
  void CreateExecutionState(const proto::system::ExecutionState& _state,
                            VCallback<proto::system::StatusCode> _cb);
  void DeleteExecutionState(const std::string& _topology_name,
                            VCallback<proto::system::StatusCode> _cb);
  void GetExecutionState(const std::string& _topology_name, proto::system::ExecutionState* _return,
                         VCallback<proto::system::StatusCode> _cb);
  void SetExecutionState(const proto::system::ExecutionState& _state,
                         VCallback<proto::system::StatusCode> _cb);

  // Creates/Deletes/Sets/Gets the stateful checkpoints
  void CreateStatefulCheckpoints(const std::string& _topology_name,
                                 const proto::ckptmgr::StatefulConsistentCheckpoints& _ckpt,
                                 VCallback<proto::system::StatusCode> _cb);
  void DeleteStatefulCheckpoints(const std::string& _topology_name,
                                 VCallback<proto::system::StatusCode> _cb);
  void SetStatefulCheckpoints(const std::string& _topology_name,
                              const proto::ckptmgr::StatefulConsistentCheckpoints& _ckpt,
                              VCallback<proto::system::StatusCode> _cb);
  void GetStatefulCheckpoints(const std::string& _topology_name,
                              proto::ckptmgr::StatefulConsistentCheckpoints* _return,
                              VCallback<proto::system::StatusCode> _cb);

  // Listing of topology names
  void ListTopologies(std::vector<sp_string>* _return, VCallback<proto::system::StatusCode> _cb);
  void ListExecutionStateTopologies(std::vector<sp_string>* _return,
                                    VCallback<proto::system::StatusCode> _cb);

  // Returns the ZooKeeper host:port string this instance was created with.
  virtual std::string GetStateLocation() { return zkhostport_; }

 protected:
  // A test-ONLY constructor used to pass a ZKClientFactory, which could
  // return a MockZKClient.
  HeronZKStateMgr(const std::string& zkhostport, const std::string& topleveldir,
                  EventLoop* eventLoop, ZKClientFactory* zkclient_factory,
                  bool exitOnSessionExpiry = false);

 private:
  // Completion handlers ("Done" methods) invoked with the ZKClient return
  // code `_rc`; each reports its outcome through `_cb`.
  void SetTMasterLocationDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
  void SetMetricsCacheLocationDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
  void GetTMasterLocationDone(std::string* _contents, proto::tmaster::TMasterLocation* _return,
                              VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
  void GetMetricsCacheLocationDone(std::string* _contents,
                                   proto::tmaster::MetricsCacheLocation* _return,
                                   VCallback<proto::system::StatusCode> _cb,
                                   sp_int32 _rc);
  void CreateTopologyDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
  void DeleteTopologyDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
  void SetTopologyDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
  void GetTopologyDone(std::string* _contents, proto::api::Topology* _return,
                       VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
  void CreatePhysicalPlanDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
  void DeletePhysicalPlanDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
  void SetPhysicalPlanDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
  void GetPhysicalPlanDone(std::string* _contents, proto::system::PhysicalPlan* _return,
                           VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
  void GetPackingPlanDone(std::string* _contents, proto::system::PackingPlan* _return,
                          VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
  void CreateExecutionStateDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
  void DeleteExecutionStateDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
  void SetExecutionStateDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
  void GetExecutionStateDone(std::string* _contents, proto::system::ExecutionState* _return,
                             VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
  void CreateStatefulCheckpointsDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
  void DeleteStatefulCheckpointsDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
  void SetStatefulCheckpointsDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
  void GetStatefulCheckpointsDone(std::string* _contents,
                                  proto::ckptmgr::StatefulConsistentCheckpoints* _return,
                                  VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
  void ListTopologiesDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
  void ListExecutionStateTopologiesDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);

  // These are the callbacks passed to ZKClient to handle tmaster location
  // (and metrics cache location / packing plan) changes. Each in turn
  // calls the client-supplied watcher to notify it about the change.
  void TMasterLocationWatch();
  void MetricsCacheLocationWatch();
  void PackingPlanWatch();

  // Handles global events from ZKClient. For now, it handles the session
  // expired event by deleting the current client, creating a new one,
  // setting the tmaster location watch, and notifying the client of a
  // possible tmaster location change.
  void GlobalWatchEventHandler(const ZKClient::ZkWatchEvent event);

  // Sets a tmaster location watch through the ZKClient Exists method.
  // The metrics cache / packing plan variants do the same for their nodes.
  void SetTMasterLocationWatchInternal();
  void SetMetricsCacheLocationWatchInternal();
  void SetPackingPlanWatchInternal();

  // Wrappers to be passed to the event loop's timer registration.
  // They ignore the status and call the corresponding *WatchInternal method.
  void CallSetTMasterLocationWatch(EventLoop::Status status);
  void CallSetMetricsCacheLocationWatch(EventLoop::Status status);
  void CallSetPackingPlanWatch(EventLoop::Status status);

  // Handler callbacks that get called by ZKClient upon completion of
  // setting a watch. If the return code indicates failure, we retry
  // after SET_WATCH_RETRY_INTERVAL_S seconds.
  void SetTMasterWatchCompletionHandler(sp_int32 rc);
  void SetMetricsCacheWatchCompletionHandler(sp_int32 rc);
  void SetPackingPlanWatchCompletionHandler(sp_int32 rc);

  // Tells whether SetTMasterLocationWatch (or the metrics cache / packing
  // plan equivalent) has been called by the client. It gets this info
  // from the corresponding *_watcher_info_ member.
  bool IsTmasterWatchDefined();
  bool IsMetricsCacheWatchDefined();
  bool IsPackingPlanWatchDefined();

  // Common functionality for the constructors. Should be called only
  // once, from a constructor.
  void Init();

  // Tells whether the failure to set a zk node watch is retryable.
  // Currently returns true on connection-related errors.
  static bool ShouldRetrySetWatch(sp_int32 rc);

  // ZooKeeper host:port string, as passed to the constructor.
  const std::string zkhostport_;
  ZKClient* zkclient_;

  // A factory for creating ZKClient. It defaults to DefaultZKClientFactory.
  // For tests it could be overridden with a factory that returns a
  // MockZkClient. This class owns the factory and is responsible for
  // deleting it.
  ZKClientFactory* const zkclient_factory_;
  EventLoop* eventLoop_;

  // A permanent callback initialized to wrap the WatchEventHandler.
  VCallback<ZKClient::ZkWatchEvent> watch_event_cb_;

  // Holds the watch callback and the topology name passed by the client.
  // Needed for recreating the watch on session expiry. Only set after the
  // corresponding Set*Watch method is called. The same struct is reused for
  // the metrics cache location and packing plan watches.
  struct TMasterLocationWatchInfo {
    VCallback<> watcher_cb;
    std::string topology_name;
    // Both arguments are sink parameters: taken by value and moved in.
    TMasterLocationWatchInfo(VCallback<> watcher, std::string name)
        : watcher_cb(std::move(watcher)), topology_name(std::move(name)) {}
  };
  const TMasterLocationWatchInfo* tmaster_location_watcher_info_;
  const TMasterLocationWatchInfo* metricscache_location_watcher_info_;
  const TMasterLocationWatchInfo* packing_plan_watcher_info_;

  // If true, we exit on a zookeeper session expired event.
  const bool exitOnSessionExpiry_;

  // Retry interval, in seconds, if setting a watch on a zk node fails.
  static const sp_int32 SET_WATCH_RETRY_INTERVAL_S;

  // For easier unit testing, to allow access to private methods.
  friend class HeronZKStateMgrTest;
};
} // namespace common
} // namespace heron
#endif