blob: 789215bd5657f9326c94962cebb31ab1674e0872 [file] [log] [blame]
/************************************************************
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*************************************************************/
#ifndef SINGA_UTILS_ZK_SERVICE_H_
#define SINGA_UTILS_ZK_SERVICE_H_
#include <zookeeper/zookeeper.h>
#include <string>
#include <vector>
#include "singa/utils/cluster_rt.h"
namespace singa {
const int kZKBufSize = 100;
// following paths are global
const std::string kZKPathSinga = "/singa";
const std::string kZKPathSys = "/singa/sys";
const std::string kZKPathJLock = "/singa/sys/job-lock";
const std::string kZKPathHostIdx = "/singa/sys/host-idx";
const std::string kZKPathApp = "/singa/app";
const std::string kZKPathJob = "/singa/app/job-";
// following paths are local under /singa/app/job-X
const std::string kZKPathJobGroup = "/group";
const std::string kZKPathJobProc = "/proc";
const std::string kZKPathJobPLock = "/proc-lock";
inline std::string GetZKJobWorkspace(int job_id) {
char buf[kZKBufSize];
snprintf(buf, kZKBufSize, "%010d", job_id);
return kZKPathJob + buf;
}
/*
* A wrapper for zookeeper service which handles error code and reconnections
*/
class ZKService {
public:
static void ChildChanges(zhandle_t* zh, int type, int state,
const char *path, void* watcherCtx);
~ZKService();
bool Init(const std::string& host, int timeout);
bool CreateNode(const char* path, const char* val, int flag, char* output);
bool DeleteNode(const char* path);
bool Exist(const char* path);
bool UpdateNode(const char* path, const char* val);
bool GetNode(const char* path, char* output);
bool GetChild(const char* path, std::vector<std::string>* vt);
bool WGetChild(const char* path, std::vector<std::string>* vt,
RTCallback *cb);
private:
const int kNumRetry = 5;
const int kSleepSec = 1;
static void WatcherGlobal(zhandle_t* zh, int type, int state,
const char *path, void* watcherCtx);
zhandle_t* zkhandle_ = nullptr;
};
/*
* A ClusterRuntime implementation using zookeeper
*/
class ZKClusterRT : public ClusterRuntime {
public:
ZKClusterRT(const std::string& host, int job_id);
~ZKClusterRT();
bool Init() override;
int RegistProc(const std::string& host_addr, int pid) override;
std::string GetProcHost(int proc_id) override;
bool WatchSGroup(int gid, int sid, rt_callback fn, void* ctx) override;
bool JoinSGroup(int gid, int wid, int s_group) override;
bool LeaveSGroup(int gid, int wid, int s_group) override;
private:
inline std::string groupPath(int gid) {
return group_path_ + "/sg" + std::to_string(gid);
}
inline std::string workerPath(int gid, int wid) {
return "/g" + std::to_string(gid) + "_w" + std::to_string(wid);
}
int timeout_ = 30000;
std::string host_ = "";
ZKService zk_;
std::string workspace_ = "";
std::string group_path_ = "";
std::string proc_path_ = "";
std::string proc_lock_path_ = "";
std::vector<RTCallback*> cb_vec_;
};
} // namespace singa
#endif // SINGA_UTILS_ZK_SERVICE_H_