blob: 0fbfae6351d0ca8001ebc91ad638e7eaef3dc567 [file] [log] [blame]
/*
* Copyright 2015 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
///////////////////////////////////////////////////////////////////////////////
// Implements the ZKClient using the ZooKeeper C client library as the underlying
// transport implementation. See zkclient.h for API details.
///////////////////////////////////////////////////////////////////////////////
#include "zookeeper/zkclient.h"
#include <fcntl.h>
#include <errno.h>
#include <string>
#include <vector>
#include "glog/logging.h"
// Per-request state for asynchronous get calls (zoo_aget / zoo_awget).
// Allocated by ZKClient::Get and released by GetCompletionWatcher.
struct ZKClientGetStructure {
  // Caller-owned buffer that receives the node data.
  std::string* result_ = nullptr;
  // Optional caller-owned slot for the node version; may be NULL.
  sp_int32* version_ = nullptr;
  // Completion callback that receives the zookeeper return code.
  CallBack1<sp_int32>* cb_ = nullptr;
};
// Per-request state for asynchronous get-children calls (zoo_aget_children).
// Allocated by ZKClient::GetChildren and released by GetChildrenCompletionWatcher.
struct ZKClientGetChildrenStructure {
  // Caller-owned vector to which child names are appended.
  std::vector<std::string>* result_ = nullptr;
  // Completion callback that receives the zookeeper return code.
  CallBack1<sp_int32>* cb_ = nullptr;
};
// Delivers a zookeeper return code to the user's completion callback.
// Queued by ZKClient::ZkActionCb and executed from OnZkActionResponse.
void RunUserCb(sp_int32 rc, VCallback<sp_int32> cb) {
  cb(rc);
}
// Invokes a user watcher callback.
// Queued by ZKClient::ZkWatcherCb and executed from OnZkActionResponse.
void RunWatcherCb(VCallback<> cb) {
  cb();
}
// Hands a global watch event to the client-supplied global watcher.
// Queued by ZKClient::SendWatchEvent and executed from OnZkActionResponse.
void RunWatchEventCb(VCallback<ZKClient::ZkWatchEvent> watch_cb, ZKClient::ZkWatchEvent event) {
  watch_cb(event);
}
// 'C' style trampoline for zookeeper global watcher events.
// InitZKHandle registers it with zookeeper_init(), passing the owning ZKClient
// as the context, so the call is simply forwarded to ZKClient::GlobalWatcher.
void CallGlobalWatcher(zhandle_t* _zh, sp_int32 _type, sp_int32 _state, const char* _path,
                       void* _context) {
  static_cast<ZKClient*>(_context)->GlobalWatcher(_zh, _type, _state, _path);
}
// 'C' style trampoline for per-node watches registered through zoo_awexists /
// zoo_awget. The context is the CallBack created from ZKClient::ZkWatcherCb.
//
// The zookeeper client also delivers *session* events (type ZOO_SESSION_EVENT)
// to every outstanding node watcher, for example when a connection is
// re-established, and the watch stays registered afterwards. Running the
// CallBack for such an event would fire the user watcher without any node
// change. The library also keeps the same context pointer for the real node
// event later. (CallBack objects are presumably one-shot and free themselves on
// Run -- confirm in callback.h.) Only genuine node events received while
// connected are forwarded; session state is handled by the global watcher.
void CallWatcher(zhandle_t*, sp_int32 _type, sp_int32 _state, const char* _path, void* _context) {
  LOG(INFO) << "ZKClient CallWatcher called with type " << ZKClient::type2String(_type)
            << " and state " << ZKClient::state2String(_state);
  if (_path && strlen(_path) > 0) {
    LOG(INFO) << " for path " << _path;
  }
  if (_type == ZOO_SESSION_EVENT) {
    // Not a node change: leave the watch armed and its context untouched.
    return;
  }
  if (_state == ZOO_CONNECTED_STATE) {
    // Only handle watches when in connected state
    CallBack* cb = static_cast<CallBack*>(_context);
    cb->Run();
  }
}
// Completion for zoo_acreate (see ZKClient::CreateNode). The created path is
// ignored; only the return code goes to the CallBack1 carried in _data.
void StringCompletionWatcher(sp_int32 _rc, const char*, const void* _data) {
  auto* done = static_cast<CallBack1<sp_int32>*>(const_cast<void*>(_data));
  done->Run(_rc);
}
// Completion for zoo_adelete (see ZKClient::DeleteNode). Forwards the return
// code to the CallBack1 carried in _data.
void VoidCompletionWatcher(sp_int32 _rc, const void* _data) {
  auto* done = static_cast<CallBack1<sp_int32>*>(const_cast<void*>(_data));
  done->Run(_rc);
}
// Completion for zoo_aget / zoo_awget (see ZKClient::Get).
// On success it copies the node data into the caller's string and, when
// requested, the node version into the caller's slot. It then frees the
// per-request ZKClientGetStructure and forwards the return code to the request
// callback.
//
// @param _rc        zookeeper return code (0 == ZOK on success)
// @param _value     node data; may be NULL for a node without data
// @param _value_len number of bytes in _value
// @param _stat      node stat, consulted only on success
// @param _data      the ZKClientGetStructure allocated by ZKClient::Get
void GetCompletionWatcher(int _rc, const char* _value, int _value_len, const struct Stat* _stat,
                          const void* _data) {
  const ZKClientGetStructure* get_structure = static_cast<const ZKClientGetStructure*>(_data);
  if (_rc == 0) {
    std::string* result = get_structure->result_;
    if (result) {
      // A node without data may be reported with a NULL buffer. Clear the
      // result in that case instead of leaving stale contents behind.
      if (_value && _value_len > 0) {
        result->assign(_value, _value_len);
      } else {
        result->clear();
      }
    }
    // The stat describes the node even when it carries no data, so the version
    // must be reported independently of _value.
    if (get_structure->version_ && _stat) {
      *(get_structure->version_) = _stat->version;
    }
  }
  CallBack1<sp_int32>* cb = get_structure->cb_;
  delete get_structure;
  cb->Run(_rc);
}
// Completion for zoo_aset (see ZKClient::Set). The resulting stat is ignored;
// only the return code goes to the CallBack1 carried in _data.
void SetCompletionWatcher(sp_int32 _rc, const struct Stat*, const void* _data) {
  auto* done = static_cast<CallBack1<sp_int32>*>(const_cast<void*>(_data));
  done->Run(_rc);
}
// Completion for zoo_aget_children (see ZKClient::GetChildren).
// On success it appends every returned child name to the caller's vector. It
// then frees the per-request structure and forwards the return code.
void GetChildrenCompletionWatcher(sp_int32 _rc, const struct String_vector* _strings,
                                  const void* _data) {
  const auto* request = static_cast<const ZKClientGetChildrenStructure*>(_data);
  if (_rc == 0 && _strings && _strings->count > 0) {
    std::vector<std::string>* children = request->result_;
    children->insert(children->end(), _strings->data, _strings->data + _strings->count);
  }
  CallBack1<sp_int32>* done = request->cb_;
  delete request;
  done->Run(_rc);
}
// Completion for zoo_aexists / zoo_awexists (see ZKClient::Exists). The stat is
// ignored; only the return code goes to the CallBack1 carried in _data.
void ExistsCompletionHandler(sp_int32 _rc, const struct Stat*, const void* _data) {
  auto* done = static_cast<CallBack1<sp_int32>*>(const_cast<void*>(_data));
  done->Run(_rc);
}
// Constructor. We create a new event_base.
ZKClient::ZKClient(const std::string& hostportlist, EventLoop* eventLoop)
: eventLoop_(eventLoop), hostportlist_(hostportlist) {
Init();
}
ZKClient::ZKClient(const std::string& hostportlist, EventLoop* eventLoop,
VCallback<ZkWatchEvent> global_watcher_cb)
: eventLoop_(eventLoop),
hostportlist_(hostportlist),
client_global_watcher_cb_(std::move(global_watcher_cb)) {
CHECK(client_global_watcher_cb_);
Init();
}
void ZKClient::Init() {
zkaction_responses_ = new PCQueue<CallBack*>();
auto zkaction_response_cb = [this](EventLoop::Status status) {
this->OnZkActionResponse(status);
};
if (pipe(pipers_) < 0) {
LOG(FATAL) << "Pipe failed in ZKClient";
}
sp_int32 flags;
if ((flags = fcntl(pipers_[0], F_GETFL, 0)) < 0 ||
fcntl(pipers_[0], F_SETFL, flags | O_NONBLOCK) < 0 ||
eventLoop_->registerForRead(pipers_[0], std::move(zkaction_response_cb), true) != 0) {
LOG(FATAL) << "fcntl failed in ZKClient";
}
zoo_deterministic_conn_order(0); // even distribution of clients on the server
InitZKHandle();
}
// Destructor.
// The zookeeper handle is closed FIRST. Until zookeeper_close() returns, the
// client library's threads may still invoke completion/watcher callbacks. Those
// callbacks enqueue into zkaction_responses_ and write to pipers_[1]
// (ZkActionCb / ZkWatcherCb -> SignalMainThread). Tearing those down first
// would let a late callback touch freed memory or a closed (possibly reused) fd.
ZKClient::~ZKClient() {
  zookeeper_close(zk_handle_);
  if (eventLoop_) {
    CHECK_EQ(eventLoop_->unRegisterForRead(pipers_[0]), 0);
  }
  close(pipers_[0]);
  close(pipers_[1]);
  delete zkaction_responses_;
}
//
// Client implementations
//
void ZKClient::Exists(const std::string& _node, VCallback<sp_int32> cb) {
Exists(_node, VCallback<>(), std::move(cb));
}
void ZKClient::Exists(const std::string& _node, VCallback<> watcher, VCallback<sp_int32> cb) {
LOG(INFO) << "Checking if " << _node << " exists";
sp_int32 rc;
if (!watcher) {
rc = zoo_aexists(zk_handle_, _node.c_str(), 0, ExistsCompletionHandler,
CreateCallback(this, &ZKClient::ZkActionCb, std::move(cb)));
} else {
rc = zoo_awexists(zk_handle_, _node.c_str(), CallWatcher,
CreateCallback(this, &ZKClient::ZkWatcherCb, std::move(watcher)),
ExistsCompletionHandler,
CreateCallback(this, &ZKClient::ZkActionCb, std::move(cb)));
}
if (rc) {
// There is nothing we can do here. Continuing will only make
// other things fail
LOG(FATAL) << "zoo_aexists/awexists returned non-zero " << rc << " errno: " << errno
<< " while checking for node " << _node << "\n";
}
}
// Creates a node
void ZKClient::CreateNode(const std::string& _node, const std::string& _value, bool _is_ephimeral,
VCallback<sp_int32> cb) {
sp_int32 flags = 0;
if (_is_ephimeral) {
flags |= ZOO_EPHEMERAL;
}
LOG(INFO) << "Creating zknode " << _node << std::endl;
sp_int32 rc = zoo_acreate(zk_handle_, _node.c_str(), _value.c_str(), _value.size(),
&ZOO_OPEN_ACL_UNSAFE, flags, StringCompletionWatcher,
CreateCallback(this, &ZKClient::ZkActionCb, std::move(cb)));
if (rc) {
// There is nothing we can do here. Continuing will only make
// other things fail
LOG(FATAL) << "zoo_acreate returned non-zero " << rc << " errno: " << errno
<< " while creating node " << _node << "\n";
}
}
// Deletes a node
void ZKClient::DeleteNode(const std::string& _node, VCallback<sp_int32> cb) {
LOG(INFO) << "Deleting zknode " << _node << std::endl;
sp_int32 rc = zoo_adelete(zk_handle_, _node.c_str(), -1, VoidCompletionWatcher,
CreateCallback(this, &ZKClient::ZkActionCb, std::move(cb)));
if (rc) {
// There is nothing we can do here. Continuing will only make
// other things fail
LOG(FATAL) << "zoo_adelete returned non-zero " << rc << " errno: " << errno
<< " while deleting node " << _node << "\n";
}
}
void ZKClient::Get(const std::string& _node, std::string* _data, VCallback<sp_int32> cb) {
Get(_node, _data, NULL, std::move(cb));
}
void ZKClient::Get(const std::string& _node, std::string* _data, sp_int32* _version,
VCallback<sp_int32> cb) {
Get(_node, _data, _version, VCallback<>(), std::move(cb));
}
void ZKClient::Get(const std::string& _node, std::string* _data, sp_int32* _version,
VCallback<> watcher, VCallback<sp_int32> cb) {
LOG(INFO) << "Getting zknode " << _node << std::endl;
ZKClientGetStructure* get_structure = new ZKClientGetStructure();
get_structure->result_ = _data;
get_structure->version_ = _version;
get_structure->cb_ = CreateCallback(this, &ZKClient::ZkActionCb, std::move(cb));
sp_int32 rc;
if (!watcher) {
rc = zoo_aget(zk_handle_, _node.c_str(), 0, GetCompletionWatcher, get_structure);
} else {
rc = zoo_awget(zk_handle_, _node.c_str(), CallWatcher,
CreateCallback(this, &ZKClient::ZkWatcherCb, std::move(watcher)),
GetCompletionWatcher, get_structure);
}
if (rc) {
// There is nothing we can do here. Continuing will only make
// other things fail
LOG(FATAL) << "zoo_aget/zoo_awget returned non-zero " << rc << " errno: " << errno
<< " while getting " << _node << "\n";
}
}
void ZKClient::Set(const std::string& _node, const std::string& _data, VCallback<sp_int32> cb) {
Set(_node, _data, -1, std::move(cb));
}
void ZKClient::Set(const std::string& _node, const std::string& _data, sp_int32 _version,
VCallback<sp_int32> cb) {
LOG(INFO) << "Setting zknode " << _node << std::endl;
sp_int32 rc =
zoo_aset(zk_handle_, _node.c_str(), _data.c_str(), _data.size(), _version,
SetCompletionWatcher, CreateCallback(this, &ZKClient::ZkActionCb, std::move(cb)));
if (rc) {
// There is nothing we can do here. Continuing will only make
// other things fail
LOG(FATAL) << "zoo_aset returned non-zero " << rc << " errno: " << errno << " while setting "
<< _node << "\n";
}
}
void ZKClient::GetChildren(const std::string& _node, std::vector<std::string>* _children,
VCallback<sp_int32> cb) {
LOG(INFO) << "Getting children for zknode " << _node << std::endl;
ZKClientGetChildrenStructure* get_structure = new ZKClientGetChildrenStructure();
get_structure->result_ = _children;
get_structure->cb_ = CreateCallback(this, &ZKClient::ZkActionCb, std::move(cb));
sp_int32 rc =
zoo_aget_children(zk_handle_, _node.c_str(), 0, GetChildrenCompletionWatcher, get_structure);
if (rc) {
// There is nothing we can do here. Continuing will only make
// other things fail
LOG(FATAL) << "zoo_aget_children returned non-zero " << rc << " errno: " << errno
<< " while getting children of " << _node << "\n";
}
}
//
// Internal functions
//
// TODO(vikasr): move internal functions to use std::function
// Called when there is some state change with respect to the zk handle.
// Note: this is called in the context of the zk thread, so anything done here
// must be thread-safe.
//
// Only session events are acted on:
//  - CONNECTED:       records (and logs) a new session id
//  - AUTH_FAILED:     fatal
//  - EXPIRED_SESSION: forwarded to client_global_watcher_cb_ if set, else fatal
//  - CONNECTING:      logged
void ZKClient::GlobalWatcher(zhandle_t* _zh, sp_int32 _type, sp_int32 _state, const char* _path) {
  // Use _zh rather than zk_handle_ here: the client lib may call the watcher
  // before zookeeper_init returns and zk_handle_ is assigned.
  LOG(INFO) << "ZKClient GlobalWatcher called with type " << type2String(_type) << " and state "
            << state2String(_state);
  if (_path && strlen(_path) > 0) {
    LOG(INFO) << " for path " << _path;
  }
  if (_type != ZOO_SESSION_EVENT) {
    return;
  }
  if (_state == ZOO_CONNECTED_STATE) {
    const clientid_t* id = zoo_client_id(_zh);
    if (zk_clientid_.client_id == 0 || zk_clientid_.client_id != id->client_id) {
      zk_clientid_ = *id;
      LOG(INFO) << "Got a new session id: " << zk_clientid_.client_id << "\n";
    }
  }
  if (_state == ZOO_AUTH_FAILED_STATE) {
    LOG(FATAL) << "ZKClient Authentication failure. Shutting down...\n";
  } else if (_state == ZOO_EXPIRED_SESSION_STATE) {
    // If a client watcher is set, notify it about the session expiry instead of
    // shutting down.
    if (client_global_watcher_cb_) {
      // Never hand a NULL path to the event (constructing a std::string from
      // NULL is undefined behavior).
      // NOTE(review): if ZkWatchEvent stores the raw pointer rather than a
      // copy, it will dangle once this watcher returns -- confirm in zkclient.h.
      const ZkWatchEvent event = {_type, _state, _path ? _path : ""};
      SendWatchEvent(event);
    } else {
      // Watches etc. would need to be set again on a new session, so the
      // simpler option is to shut down.
      LOG(FATAL) << "Session expired. Shutting down...\n";
    }
  } else if (_state == ZOO_CONNECTING_STATE) {
    // We are still in the process of connecting
    LOG(INFO) << "Re-connecting to the zookeeper\n";
  } else if (_state == ZOO_ASSOCIATING_STATE) {
    // Connection process still ongoing
  }
}
// Opens the zookeeper handle for hostportlist_ with a 30000 ms session timeout
// and no previous client id. CallGlobalWatcher is the global watcher, with this
// ZKClient as its context. Failure to create the handle is fatal.
void ZKClient::InitZKHandle() {
  zk_clientid_.client_id = 0;
  const int session_timeout_ms = 30000;
  zk_handle_ = zookeeper_init(hostportlist_.c_str(), CallGlobalWatcher, session_timeout_ms,
                              nullptr, this, 0);
  if (zk_handle_ == nullptr) {
    LOG(FATAL) << "zookeeper_init failed with error " << errno << "\n";
  }
}
// Wakes the event-loop thread by writing one byte to the self-pipe.
// OnZkActionResponse dequeues one queued callback per byte it reads.
// This need not be protected by any mutex: a single-byte write to a pipe is
// atomic. A write interrupted by a signal before transferring anything is
// retried instead of being treated as fatal.
void ZKClient::SignalMainThread() {
  ssize_t rc;
  do {
    rc = write(pipers_[1], "a", 1);
  } while (rc < 0 && errno == EINTR);
  if (rc != 1) {
    LOG(FATAL) << "Write to pipe failed in ZkClient with return code: " << rc
               << " errno: " << errno;
  }
}
// Event-loop handler for the read end of the self-pipe.
// For every byte read, one callback queued by a zookeeper thread
// (ZkActionCb / ZkWatcherCb / SendWatchEvent) is dequeued and run here on the
// event-loop thread. The read end is non-blocking, so EAGAIN/EINTR are expected
// transient outcomes and are not logged as errors.
void ZKClient::OnZkActionResponse(EventLoop::Status _status) {
  if (_status != EventLoop::READ_EVENT) {
    return;
  }
  char buf[1];
  const ssize_t readcount = read(pipers_[0], buf, 1);
  if (readcount == 1) {
    bool dequeued = false;
    CallBack* cb = zkaction_responses_->trydequeue(dequeued);
    if (cb) {
      cb->Run();
    }
    return;
  }
  if (readcount < 0 && (errno == EAGAIN || errno == EINTR)) {
    // Never mind; the event loop will call us again.
    return;
  }
  // EOF or a hard read error.
  LOG(ERROR) << "In Server read from pipers returned " << readcount << " errno " << errno
             << std::endl;
  // We really don't know what to do here.
  // TODO(kramasamy): Figure out a way to get the hell out of here
}
// Called from completion handlers (off the event-loop thread). Queues the user
// callback together with its return code and wakes the event loop to run it.
void ZKClient::ZkActionCb(sp_int32 rc, VCallback<sp_int32> cb) {
  auto pending = CreateCallback(&RunUserCb, rc, std::move(cb));
  zkaction_responses_->enqueue(pending);
  SignalMainThread();
}
// Called from CallWatcher (off the event-loop thread). Queues the user watcher
// and wakes the event loop to run it.
void ZKClient::ZkWatcherCb(VCallback<> cb) {
  auto pending = CreateCallback(&RunWatcherCb, std::move(cb));
  zkaction_responses_->enqueue(pending);
  SignalMainThread();
}
// Queues delivery of `event` to the client's global watcher (which must be set)
// and wakes the event loop to run it. The watcher callback is copied into the
// queued item.
void ZKClient::SendWatchEvent(const ZkWatchEvent& event) {
  CHECK(client_global_watcher_cb_);
  auto pending = CreateCallback(&RunWatchEventCb, client_global_watcher_cb_, event);
  zkaction_responses_->enqueue(pending);
  SignalMainThread();
}
// Returns a printable name for a zookeeper connection state, for logging.
// A state of 0 is reported as CLOSED_STATE; unrecognized values yield
// INVALID_STATE.
const std::string ZKClient::state2String(sp_int32 _state) {
  if (_state == 0) return "CLOSED_STATE";
  if (_state == ZOO_CONNECTING_STATE) return "CONNECTING_STATE";
  if (_state == ZOO_ASSOCIATING_STATE) return "ASSOCIATING_STATE";
  if (_state == ZOO_CONNECTED_STATE) return "CONNECTED_STATE";
  if (_state == ZOO_EXPIRED_SESSION_STATE) return "EXPIRED_SESSION_STATE";
  if (_state == ZOO_AUTH_FAILED_STATE) return "AUTH_FAILED_STATE";
  return "INVALID_STATE";
}
// Returns a printable name for a zookeeper watch event type, for logging.
// Unrecognized values yield UNKNOWN_EVENT_TYPE.
const std::string ZKClient::type2String(sp_int32 _type) {
  if (_type == ZOO_CREATED_EVENT) return "CREATED_EVENT";
  if (_type == ZOO_DELETED_EVENT) return "DELETED_EVENT";
  if (_type == ZOO_CHANGED_EVENT) return "CHANGED_EVENT";
  if (_type == ZOO_CHILD_EVENT) return "CHILD_EVENT";
  if (_type == ZOO_SESSION_EVENT) return "SESSION_EVENT";
  if (_type == ZOO_NOTWATCHING_EVENT) return "NOTWATCHING_EVENT";
  return "UNKNOWN_EVENT_TYPE";
}