SINGA-79 Fix bug in singatool that cannot parse -confdir flag
singa-env.sh
- change commandline arg from -confdir=XXX to -confdir XXX
tool.cc
- parse -confdir flag to read the corresponding singa configuration
cluster_rt.h/cc
- add detailed documentation for JobManager
- add checks to zk-related operations that fail if the zk handler is not initialized.
diff --git a/bin/singa-env.sh b/bin/singa-env.sh
index 98a0abc..0a3db9e 100755
--- a/bin/singa-env.sh
+++ b/bin/singa-env.sh
@@ -51,7 +51,7 @@
# set SINGA_LOG
if [ -z $SINGA_LOG ]; then
# add -confdir arg, so no need to run under SINGA_HOME
- SINGA_LOG=`"$SINGA_HOME"/singatool getlogdir -confdir="$SINGA_CONF"`
+ SINGA_LOG=`"$SINGA_HOME"/singatool getlogdir -confdir "$SINGA_CONF"`
[ $? == 0 ] || exit 1
fi
diff --git a/conf/singa.conf b/conf/singa.conf
index fad37d5..20cff98 100644
--- a/conf/singa.conf
+++ b/conf/singa.conf
@@ -1,4 +1,6 @@
# point to your active zookeeper service
+# this is comma separated host:port pairs, each corresponding to a zk server
+# e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper_host: "localhost:2181"
# set if you want to change log directory
diff --git a/include/utils/cluster_rt.h b/include/utils/cluster_rt.h
index bdfa8fd..5de6c16 100644
--- a/include/utils/cluster_rt.h
+++ b/include/utils/cluster_rt.h
@@ -151,16 +151,27 @@
class JobManager {
public:
+ // host is comma separated host:port pairs, each corresponding to a zk server.
+ // e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
explicit JobManager(const std::string& host);
JobManager(const std::string& host, int timeout);
+ // NOTICE: Init must be called once, before start to use other functions
bool Init();
+ // generate a unique job id
bool GenerateJobID(int* id);
- bool GenerateHostList(const char* job_file, std::vector<std::string>* list);
+ // generate a list of hosts for a job conf
+ bool GenerateHostList(const char* host_file, const char* job_file,
+ std::vector<std::string>* list);
+ // list all jobs recorded in zk
bool ListJobs(std::vector<JobInfo>* jobs);
+ // list running processes for a job
bool ListJobProcs(int job, std::vector<std::string>* procs);
+ // remove a job path in zk
bool Remove(int job);
+ // remove all job paths in zk
bool RemoveAllJobs();
+ // remove all singa related paths in zk
bool CleanUp();
private:
diff --git a/src/utils/cluster_rt.cc b/src/utils/cluster_rt.cc
index e51ac97..493e1e2 100644
--- a/src/utils/cluster_rt.cc
+++ b/src/utils/cluster_rt.cc
@@ -83,6 +83,7 @@
bool ZKService::CreateNode(const char* path, const char* val, int flag,
char* output) {
+ CHECK(zkhandle_) << "zk handler not initialized";
char buf[kZKBufSize];
int ret = 0;
// send the zk request
@@ -126,6 +127,7 @@
}
bool ZKService::DeleteNode(const char* path) {
+ CHECK(zkhandle_) << "zk handler not initialized";
int ret = zoo_delete(zkhandle_, path, -1);
if (ret == ZOK) {
LOG(INFO) << "deleted zookeeper node " << path;
@@ -140,6 +142,7 @@
}
bool ZKService::Exist(const char* path) {
+ CHECK(zkhandle_) << "zk handler not initialized";
struct Stat stat;
int ret = zoo_exists(zkhandle_, path, 0, &stat);
if (ret == ZOK) return true;
@@ -149,6 +152,7 @@
}
bool ZKService::UpdateNode(const char* path, const char* val) {
+ CHECK(zkhandle_) << "zk handler not initialized";
// set version = -1, do not check content version
int ret = zoo_set(zkhandle_, path, val, strlen(val), -1);
if (ret == ZOK) {
@@ -163,6 +167,7 @@
}
bool ZKService::GetNode(const char* path, char* output) {
+ CHECK(zkhandle_) << "zk handler not initialized";
struct Stat stat;
int val_len = kZKBufSize;
int ret = zoo_get(zkhandle_, path, 0, output, &val_len, &stat);
@@ -179,6 +184,7 @@
}
bool ZKService::GetChild(const char* path, vector<string>* vt) {
+ CHECK(zkhandle_) << "zk handler not initialized";
struct String_vector child;
int ret = zoo_get_children(zkhandle_, path, 0, &child);
if (ret == ZOK) {
@@ -193,6 +199,7 @@
bool ZKService::WGetChild(const char* path, vector<string>* vt,
RTCallback *cb) {
+ CHECK(zkhandle_) << "zk handler not initialized";
struct String_vector child;
int ret = zoo_wget_children(zkhandle_, path, ChildChanges, cb, &child);
if (ret == ZOK) {
@@ -358,7 +365,8 @@
return true;
}
-bool JobManager::GenerateHostList(const char* job_file, vector<string>* list) {
+bool JobManager::GenerateHostList(const char* host_file, const char* job_file,
+ vector<string>* list) {
// compute required #process from job conf
ClusterProto cluster;
google::protobuf::TextFormat::ParseFromString(ExtractClusterConf(job_file),
@@ -373,9 +381,9 @@
else
nprocs = std::max(nworker_procs, nserver_procs);
// get available host list from global conf
- std::ifstream hostfile("conf/hostfile");
+ std::ifstream hostfile(host_file);
if (!hostfile.is_open()) {
- LOG(FATAL) << "Cannot open file: " << "conf/hostfile";
+ LOG(FATAL) << "Cannot open file: " << host_file;
}
vector<string> hosts;
string host;
diff --git a/src/utils/tool.cc b/src/utils/tool.cc
index 435129c..3a6563c 100644
--- a/src/utils/tool.cc
+++ b/src/utils/tool.cc
@@ -27,6 +27,7 @@
#include "utils/cluster_rt.h"
#include "utils/common.h"
+std::string conf_dir;
singa::SingaProto global;
const int SUCCESS = 0;
const int ARG_ERR = 1;
@@ -55,7 +56,8 @@
singa::JobManager mngr(global.zookeeper_host());
if (!mngr.Init()) return RUN_ERR;
std::vector<std::string> list;
- if (!mngr.GenerateHostList(job_conf, &list)) return RUN_ERR;
+ if (!mngr.GenerateHostList((conf_dir+"/hostfile").c_str(), job_conf, &list))
+ return RUN_ERR;
// output selected hosts
for (std::string host : list)
printf("%s\n", host.c_str());
@@ -123,12 +125,19 @@
" view <job id> : view procs of a singa job\n"
" remove <job id> : remove a job path in zookeeper\n"
" removeall : remova all job paths in zookeeper\n"
- " cleanup : clean all singa data in zookeeper\n";
+ " cleanup : clean all singa data in zookeeper\n"
+ "[optional arguments] NOTICE: must put at end of a command\n"
+ " -confdir <dir> : path to singa global conf dir";
+
// set logging level to ERROR and log to STDERR only
google::LogToStderr();
google::SetStderrLogging(google::ERROR);
google::InitGoogleLogging(argv[0]);
- singa::ReadProtoFromTextFile("conf/singa.conf", &global);
+ // parse -confdir argument
+ int arg_pos = singa::ArgPos(argc, argv, "-confdir");
+ conf_dir = arg_pos == -1 ? "conf" : argv[arg_pos+1];
+ if (arg_pos != -1) argc -= 2;
+ singa::ReadProtoFromTextFile((conf_dir+"/singa.conf").c_str(), &global);
// stat code: ARG_ERR for wrong argument, RUN_ERR for runtime error
int stat = (argc <= 1) ? ARG_ERR : SUCCESS;