Merge branch 'v1-rc0'
diff --git a/tool/mesos/singa_scheduler.cc b/tool/mesos/singa_scheduler.cc
index c9b72d4..dc48ae2 100644
--- a/tool/mesos/singa_scheduler.cc
+++ b/tool/mesos/singa_scheduler.cc
@@ -180,7 +180,7 @@
// store in temporary map
new_tasks->push_back(task);
tasks_[offer.id().value()] = new_tasks;
-
+ hostnames_[offer.id().value()] = offer.hostname();
nhosts_++;
}
@@ -197,7 +197,7 @@
// launch tasks
for (map<string, vector<mesos::TaskInfo>*>::iterator it =
tasks_.begin(); it != tasks_.end(); ++it) {
- prepare_tasks(it->second, job_counter_, path);
+ prepare_tasks(it->second, hostnames_[it->first], job_counter_, path);
mesos::OfferID newId;
newId.set_value(it->first);
LOG(INFO) << "Launching task with offer ID = " << newId.value();
@@ -219,12 +219,11 @@
virtual void statusUpdate(SchedulerDriver* driver,
const mesos::TaskStatus& status) {
if (status.state() == mesos::TASK_FINISHED)
- task_counter_--;
+ task_counter_--;
- if (task_counter_ == 0)
+ if (task_counter_ == 0) {
driver->stop();
-
- else if (status.state() == mesos::TASK_FAILED) {
+ } else if (status.state() == mesos::TASK_FAILED) {
LOG(ERROR) << "TASK FAILED !!!!";
driver->abort();
}
@@ -252,14 +251,14 @@
/**
* Helper function that initialize TaskInfo with the correct URI and command
*/
- void prepare_tasks(vector<mesos::TaskInfo> *tasks, int job_id, string job_conf) {
+ void prepare_tasks(vector<mesos::TaskInfo> *tasks, string hostname, int job_id, string job_conf) {
char path_sys_config[512], path_job_config[512];
// path to singa.conf
snprintf(path_sys_config, 512, "hdfs://%s%s", namenode_.c_str(), SINGA_CONFIG);
snprintf(path_job_config, 512, "hdfs://%s%s", namenode_.c_str(), job_conf.c_str());
char command[512];
- snprintf(command, 512, "singa -conf ./job.conf -singa_conf ./singa.conf -singa_job %d", job_id);
+ snprintf(command, 512, "singa -conf ./job.conf -singa_conf ./singa.conf -singa_job %d -host %s", job_id, hostname.c_str());
for (int i=0; i < tasks->size(); i++) {
mesos::CommandInfo *comm = (tasks->at(i)).mutable_command();
@@ -366,6 +365,8 @@
int nhosts_;
// temporary map of tasks: <offerID, TaskInfo>
map<string, vector<mesos::TaskInfo>*> tasks_;
+ // temporary map of offerID to the hostname of the slave that made the offer
+ map<string, string> hostnames_;
// SINGA job config file
string job_conf_file_;
// HDFS namenode