blob: 352f6f71d26d107b8b88cc0030e9e30fe1362bb6 [file] [log] [blame]
/************************************************************
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*************************************************************/
#include "singa/utils/zk_service.h"
#include <unistd.h>
#include <algorithm>
#include <cstdio>
#include <cstring>
#include <string>
#include <vector>
#include <glog/logging.h>
using std::string;
using std::to_string;
using std::vector;
namespace singa {
// Child watcher registered through zoo_wget_children() (see WGetChild).
// watcherCtx is the RTCallback that was passed at registration time; its fn
// is reset to nullptr once it has fired, so the user callback runs at most
// once. On a child event the watch is re-armed (ZooKeeper watches are
// one-shot). If the watched node has no children left, the user callback is
// invoked.
void ZKService::ChildChanges(zhandle_t *zh, int type, int state,
                             const char *path, void *watcherCtx) {
  RTCallback *cb = static_cast<RTCallback*>(watcherCtx);
  // the callback has already been fired; nothing left to do
  if (cb->fn == nullptr) return;
  if (type == ZOO_SESSION_EVENT) {
    // Session-state changes are also delivered to outstanding watchers.
    // A transient disconnect is not a child change, so just keep waiting.
    // An expired session has lost its watches and ephemeral nodes, which
    // cannot be recovered from here.
    if (state == ZOO_EXPIRED_SESSION_STATE)
      LOG(FATAL) << "zookeeper session expired while watching children";
    LOG(WARNING) << "ignore zookeeper session event (state = " << state
                 << ") in child watcher";
    return;
  }
  if (type != ZOO_CHILD_EVENT) {
    LOG(FATAL) << "Unhandled callback type code: " << type;
    return;
  }
  struct String_vector child;
  // check the child list and put another watcher
  int ret = zoo_wget_children(zh, path, ChildChanges, watcherCtx, &child);
  if (ret != ZOK) {
    LOG(FATAL) << "Unhandled ZK error code: " << ret
               << " (zoo_wget_children " << path << ")";
    return;
  }
  const bool no_child = (child.count == 0);
  // the child list is allocated by the ZK client; release it to avoid a leak
  deallocate_String_vector(&child);
  if (no_child) {
    LOG(INFO) << "child.count = 0 in path: " << path;
    // all workers leave, we do callback now
    (*cb->fn)(cb->ctx);
    cb->fn = nullptr;
  }
}
// Closes the ZooKeeper session handle obtained in Init(), if there is one.
// NOTE(review): relies on zkhandle_ being nullptr when Init() was never
// called; confirm the in-class initializer in the header.
ZKService::~ZKService() {
  if (zkhandle_ != nullptr) zookeeper_close(zkhandle_);
}
// Context pointer handed to zookeeper_init() for the global watcher.
char zk_cxt[] = "ZKClusterRT";
// Creates the ZooKeeper client handle for the comma-separated `host` list,
// using `timeout` as the session timeout passed to zookeeper_init().
// Returns false (after logging a hint) if no handle could be created.
bool ZKService::Init(const string& host, int timeout) {
  zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
  zkhandle_ = zookeeper_init(host.c_str(), WatcherGlobal, timeout, 0,
                             static_cast<void *>(zk_cxt), 0);
  const bool created = (zkhandle_ != nullptr);
  if (!created) {
    LOG(ERROR) << "Error when connecting to zookeeper servers...";
    LOG(ERROR) << "Please ensure zookeeper service is up in host(s):";
    LOG(ERROR) << host.c_str();
  }
  return created;
}
// Creates znode `path` holding `val` (nullptr creates a node with no data),
// using the ZooKeeper creation flags in `flag`. Creation is retried up to
// kNumRetry times when the parent node is missing or the connection is lost.
// If `output` is non-null (kZKBufSize bytes), it receives the name of the
// created node. That name can differ from `path` for sequential nodes. If
// the node already exists, `output` receives `path` itself.
// Returns true on success or if the node already exists, and false on
// connection loss. Any other ZK error is fatal.
bool ZKService::CreateNode(const char* path, const char* val, int flag,
                           char* output) {
  CHECK(zkhandle_) << "zk handler not initialized";
  // zoo_create only fills buf on success, so zero-init it so it is never
  // read uninitialized
  char buf[kZKBufSize] = {};
  // a length of -1 tells ZooKeeper the node has no data
  const int val_len = (val == nullptr) ? -1 : static_cast<int>(strlen(val));
  int ret = 0;
  // send the zk request
  for (int i = 0; i < kNumRetry; ++i) {
    ret = zoo_create(zkhandle_, path, val, val_len, &ZOO_OPEN_ACL_UNSAFE, flag,
                     buf, kZKBufSize);
    if (ret == ZNONODE) {
      LOG(WARNING) << "zookeeper parent node of " << path
                   << " not exist, retry later";
    } else if (ret == ZCONNECTIONLOSS) {
      LOG(WARNING) << "zookeeper disconnected, retry later";
    } else {
      break;
    }
    // no point in waiting after the final attempt
    if (i + 1 < kNumRetry) sleep(kSleepSec);
  }
  // copy the node name to output (snprintf bounds the copy)
  if (output != nullptr) {
    if (ret == ZOK) {
      snprintf(output, kZKBufSize, "%s", buf);
    } else if (ret == ZNODEEXISTS) {
      // buf is not filled by zoo_create in this case; report the requested
      // path instead of uninitialized memory
      snprintf(output, kZKBufSize, "%s", path);
    }
  }
  if (ret == ZOK) {
    LOG(INFO) << "created zookeeper node " << buf
              << " (" << (val == nullptr ? "NULL" : val) << ")";
    return true;
  } else if (ret == ZNODEEXISTS) {
    LOG(WARNING) << "zookeeper node " << path << " already exists";
    return true;
  } else if (ret == ZCONNECTIONLOSS) {
    LOG(ERROR) << "Cannot connect to zookeeper, "
               << "please ensure it is running properly...\n"
               << "If want to use zookeeper in our thirdparty folder, "
               << "you can start it by:\n"
               << "$ ./bin/zk-service.sh start";
    return false;
  }
  LOG(FATAL) << "Unhandled ZK error code: " << ret
             << " (zoo_create " << path << ")";
  return false;
}
bool ZKService::DeleteNode(const char* path) {
CHECK(zkhandle_) << "zk handler not initialized";
int ret = zoo_delete(zkhandle_, path, -1);
if (ret == ZOK) {
LOG(INFO) << "deleted zookeeper node " << path;
return true;
} else if (ret == ZNONODE) {
LOG(WARNING) << "try to delete an non-existing zookeeper node " << path;
return true;
}
LOG(FATAL) << "Unhandled ZK error code: " << ret
<< " (zoo_delete " << path << ")";
return false;
}
bool ZKService::Exist(const char* path) {
CHECK(zkhandle_) << "zk handler not initialized";
struct Stat stat;
int ret = zoo_exists(zkhandle_, path, 0, &stat);
if (ret == ZOK) return true;
else if (ret == ZNONODE) return false;
LOG(WARNING) << "Unhandled ZK error code: " << ret << " (zoo_exists)";
return false;
}
// Overwrites the data of znode `path` with `val`; nullptr clears the data,
// matching how CreateNode treats a null value.
// Returns false if the node does not exist; other ZK errors are fatal.
bool ZKService::UpdateNode(const char* path, const char* val) {
  CHECK(zkhandle_) << "zk handler not initialized";
  // zoo_set takes a NULL buffer with length -1 to store "no data"
  const int val_len = (val == nullptr) ? -1 : static_cast<int>(strlen(val));
  // set version = -1, do not check content version
  int ret = zoo_set(zkhandle_, path, val, val_len, -1);
  if (ret == ZOK) {
    return true;
  } else if (ret == ZNONODE) {
    LOG(ERROR) << "zk node " << path << " does not exist";
    return false;
  }
  LOG(FATAL) << "Unhandled ZK error code: " << ret
             << " (zoo_set " << path << ")";
  return false;
}
// Reads the data of znode `path` into `output` as a NUL-terminated string.
// `output` must provide at least kZKBufSize bytes. At most kZKBufSize - 1
// data bytes are read, leaving room for the terminator; presumably longer
// data is truncated by the ZK client. A node with no data yields "".
// Returns false if the node does not exist; other ZK errors are fatal.
bool ZKService::GetNode(const char* path, char* output) {
  CHECK(zkhandle_) << "zk handler not initialized";
  struct Stat stat;
  // reserve one byte for the terminating '\0'
  int val_len = kZKBufSize - 1;
  int ret = zoo_get(zkhandle_, path, 0, output, &val_len, &stat);
  if (ret == ZOK) {
    // zoo_get reports -1 when the node holds no (NULL) data
    output[val_len < 0 ? 0 : val_len] = '\0';
    return true;
  } else if (ret == ZNONODE) {
    LOG(ERROR) << "zk node " << path << " does not exist";
    return false;
  }
  LOG(FATAL) << "Unhandled ZK error code: " << ret
             << " (zoo_get " << path << ")";
  return false;
}
bool ZKService::GetChild(const char* path, vector<string>* vt) {
CHECK(zkhandle_) << "zk handler not initialized";
struct String_vector child;
int ret = zoo_get_children(zkhandle_, path, 0, &child);
if (ret == ZOK) {
vt->clear();
for (int i = 0; i < child.count; ++i) vt->push_back(child.data[i]);
return true;
}
LOG(FATAL) << "Unhandled ZK error code: " << ret
<< " (zoo_get_children " << path << ")";
return false;
}
bool ZKService::WGetChild(const char* path, vector<string>* vt,
RTCallback *cb) {
CHECK(zkhandle_) << "zk handler not initialized";
struct String_vector child;
int ret = zoo_wget_children(zkhandle_, path, ChildChanges, cb, &child);
if (ret == ZOK) {
vt->clear();
for (int i = 0; i < child.count; ++i) vt->push_back(child.data[i]);
return true;
}
LOG(FATAL) << "Unhandled ZK error code: " << ret
<< " (zoo_get_children " << path << ")";
return false;
}
// Default watcher installed by Init(). It only logs session-state
// transitions (connected / expired); all other events are ignored.
// The ZOO_* event and state values are not compile-time constants in the C
// client, so this uses if/else rather than a switch.
void ZKService::WatcherGlobal(zhandle_t * zh, int type, int state,
                              const char *path, void *watcherCtx) {
  if (type != ZOO_SESSION_EVENT) return;
  if (state == ZOO_CONNECTED_STATE) {
    LOG(INFO) << "GLOBAL_WATCHER connected to zookeeper successfully!";
  } else if (state == ZOO_EXPIRED_SESSION_STATE) {
    LOG(INFO) << "GLOBAL_WATCHER zookeeper session expired!";
  }
}
// Records the ZooKeeper host list and derives the znode paths of job
// `job_id`. No connection is made here; see Init().
ZKClusterRT::ZKClusterRT(const string& host, int job_id)
    : host_(host), workspace_(GetZKJobWorkspace(job_id)) {
  // every per-job node lives directly under the job workspace
  group_path_ = workspace_ + kZKPathJobGroup;
  proc_path_ = workspace_ + kZKPathJobProc;
  proc_lock_path_ = workspace_ + kZKPathJobPLock;
}
// Frees the RTCallback objects allocated by WatchSGroup().
// NOTE(review): members (including zk_, whose destructor closes the ZK
// session) are destroyed only after this body runs. If the ZK client can
// dispatch a child watcher in that window, ChildChanges would dereference a
// freed RTCallback. Confirm, and consider closing the session first.
ZKClusterRT::~ZKClusterRT() {
  std::for_each(cb_vec_.begin(), cb_vec_.end(),
                [](RTCallback* cb) { delete cb; });
}
// Connects to ZooKeeper and makes sure the job's znode hierarchy exists.
// Returns false as soon as the connection or any node creation fails.
bool ZKClusterRT::Init() {
  if (!zk_.Init(host_, timeout_)) return false;
  // listed parent-first, because a znode can only be created once its
  // parent exists
  const char* const hierarchy[] = {
      kZKPathSinga.c_str(), kZKPathApp.c_str(),  workspace_.c_str(),
      group_path_.c_str(),  proc_path_.c_str(),  proc_lock_path_.c_str()};
  for (const char* node : hierarchy) {
    if (!zk_.CreateNode(node, nullptr, 0, nullptr)) return false;
  }
  return true;
}
int ZKClusterRT::RegistProc(const string& host_addr, int pid) {
char buf[kZKBufSize];
string lock = proc_lock_path_ + "/lock-";
if (!zk_.CreateNode(lock.c_str(), nullptr,
ZOO_EPHEMERAL | ZOO_SEQUENCE, buf)) {
return -1;
}
// get all children in lock folder
vector<string> vt;
if (!zk_.GetChild(proc_lock_path_.c_str(), &vt)) {
return -1;
}
// find own position among all locks
int id = -1;
std::sort(vt.begin(), vt.end());
for (int i = 0; i < static_cast<int>(vt.size()); ++i) {
if (proc_lock_path_+"/"+vt[i] == buf) {
id = i;
break;
}
}
if (id == -1) {
LOG(ERROR) << "cannot find own node " << buf;
return -1;
}
// create a new node in proc path
string path = proc_path_ + "/proc-" + to_string(id);
string content = host_addr + "|" + to_string(pid);
if (!zk_.CreateNode(path.c_str(), content.c_str(), ZOO_EPHEMERAL,
nullptr)) {
return -1;
}
return id;
}
std::string ZKClusterRT::GetProcHost(int proc_id) {
char val[kZKBufSize];
// construct file name
string path = proc_path_ + "/proc-" + to_string(proc_id);
if (!zk_.GetNode(path.c_str(), val)) return "";
int len = strlen(val) - 1;
while (len && val[len] != '|') --len;
CHECK(len);
val[len] = '\0';
return string(val);
}
bool ZKClusterRT::WatchSGroup(int gid, int sid, rt_callback fn, void *ctx) {
CHECK_NOTNULL(fn);
string path = groupPath(gid);
// create zk node
if (!zk_.CreateNode(path.c_str(), nullptr, 0, nullptr)) return false;
vector<string> child;
// store the callback function and context for later usage
RTCallback *cb = new RTCallback;
cb->fn = fn;
cb->ctx = ctx;
cb_vec_.push_back(cb);
// start to watch on the zk node, does not care about the first return value
return zk_.WGetChild(path.c_str(), &child, cb);
}
bool ZKClusterRT::JoinSGroup(int gid, int wid, int s_group) {
string path = groupPath(s_group) + workerPath(gid, wid);
// try to create an ephemeral node under server group path
return zk_.CreateNode(path.c_str(), nullptr, ZOO_EPHEMERAL, nullptr);
}
bool ZKClusterRT::LeaveSGroup(int gid, int wid, int s_group) {
string path = groupPath(s_group) + workerPath(gid, wid);
return zk_.DeleteNode(path.c_str());
}
} // namespace singa