blob: dc48ae2addf0f505d581476732163ffa2cb52574 [file] [log] [blame]
/************************************************************
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*************************************************************/
#include "singa/proto/job.pb.h"
#include "singa/proto/singa.pb.h"
#include "./scheduler.pb.h"
#include "singa/utils/common.h"
#include <stdio.h>
#include <stdlib.h>
#include <string>
#include <iostream>
#include <fstream>
#include <hdfs.h>
#include <mesos/scheduler.hpp>
#include <google/protobuf/text_format.h>
#include <glog/logging.h>
/**
* \file singa_scheduler.cc implements a framework for managing SINGA jobs.
*
* The scheduler takes a job configuration file [file] and performs the following:
* 1. Parse the config file to determine the required resources.
* 2. [Optional] Copy singa.conf to the HDFS: /singa/singa.conf.
* 2.1. Raise error if singa.conf is NOT FOUND on HDFS
* 3. Wait for offers from the Mesos master until enough is acquired.
* 4. Keep an increasing counter of job ID.
* 5. Write [file] to HDFS: /singa/job_ID/job.conf
* 6. Start the task:
* + Set URI in the TaskInfo message to point to the config files on HDFS:
/singa/singa.conf
/singa/Job_ID/job.conf
* + Set the executable command: singa-run.sh -conf ./job.conf
*
* We assume that singa-run.sh is included in the $PATH variable on all nodes.
* (Else, we set the executable to its full path)
* ./job.conf is relative path pointing to the current sandbox directory created
* dynamically by Mesos.
*
*
* Scheduling:
* Each SINGA job requires certain resources represented by: (1) number of workers, (2) number of worker groups
* and (3) number of workers per process. The resources offered by Mesos contain (1) number of hosts, (2) number of CPUs
* at each host and (3) memory available at each host.
*
* Our scheduler performs a simple task assignment which guarantees that each process runs an entire worker group,
* and each takes all the memory offered by the slave. We assume that each slave runs ONE process, that is, the following
* condition holds:
* nCPUs_per_host >= nWorkersPerProcess+nServersPerProcess // if separate
>= max(nWorkerPerProcess, nServersPerProcess) // if attached
*/
using std::string;
using mesos::SchedulerDriver;
using std::vector;
using std::map;
// File name of the scheduler's own configuration, read when no
// -scheduler_conf flag is supplied on the command line.
const char DEFAULT_SCHEDULER_CONF[] = "scheduler.conf";
// HDFS path under which the SINGA global configuration is stored.
const char SINGA_CONFIG[] = "/singa/singa.conf";
// Command-line help text.
const char usage[] =
    " singa_scheduler <job_conf>"
    " [-scheduler_conf global_config]"
    " [-singa_conf singa_config] \n"
    " job_conf: job configuration file\n"
    " -scheduler_conf: optional, system-wide configuration file\n"
    " -singa_conf: optional, singa global configuration file\n";
class SingaScheduler: public mesos::Scheduler {
public:
/**
* Constructor, used when [sing_conf] is not given. Raise error if /singa/singa.conf is not found
* on HDFS.
*
* @param namenode address of HDFS namenode
* @param job_conf_file job configuration file
* @param jc job counter
*/
SingaScheduler(string namenode, string job_conf_file, int jc):
job_conf_file_(job_conf_file), nhosts_(0), namenode_(namenode), is_running_(false), job_counter_(jc), task_counter_(0) {
hdfs_handle_ = hdfs_connect(namenode.c_str());
if (hdfs_handle_) {
if (hdfsExists(hdfs_handle_, SINGA_CONFIG) != 0)
LOG(ERROR) << SINGA_CONFIG << " is not found on HDFS. Please use -singa_conf flag to upload the file";
} else {
LOG(ERROR) << "Failed to connect to HDFS";
}
ReadProtoFromTextFile(job_conf_file_.c_str(), &job_conf_);
}
/**
* Constructor. It overwrites /singa/singa.conf on HDFS (created a new one if necessary).
* The file contains zookeeper_host and log_dir values
* It also parses the JobProto from job_config file
*
* @param namenode address of HDFS namenode
* @param singa_conf singa global configuration file
* @param job_conf_file job configuration file
*/
SingaScheduler(string namenode, string job_conf_file, string singa_conf, int jc)
: job_conf_file_(job_conf_file), nhosts_(0), namenode_(namenode), is_running_(false), job_counter_(jc), task_counter_(0) {
hdfs_handle_ = hdfs_connect(namenode);
if (!hdfs_handle_ || !hdfs_overwrite(hdfs_handle_, SINGA_CONFIG, singa_conf))
LOG(ERROR) << "Failed to connect to HDFS";
ReadProtoFromTextFile(job_conf_file_.c_str(), &job_conf_);
}
virtual void registered(SchedulerDriver *driver,
const mesos::FrameworkID& frameworkId,
const mesos::MasterInfo& masterInfo) {
}
virtual void reregistered(SchedulerDriver *driver,
const mesos::MasterInfo& masterInfo) {
}
virtual void disconnected(SchedulerDriver *driver) {
}
/**
* Handle resource offering from Mesos scheduler. It implements the simple/naive
* scheduler:
* + For each offer that contains enough CPUs, adds new tasks to the list
* + Launch all the tasks when reaching the required number of tasks (nworkers_groups + nserver_groups).
*/
virtual void resourceOffers(SchedulerDriver* driver, const std::vector<mesos::Offer>& offers) {
// do nothing if the task is already running
if (is_running_)
return;
for (int i = 0; i < offers.size(); i++) {
const mesos::Offer offer = offers[i];
// check for resource and create temporary tasks
int cpus = 0, mem = 0;
int nresources = offer.resources().size();
for (int r = 0; r < nresources; r++) {
const mesos::Resource& resource = offer.resources(r);
if (resource.name() == "cpus"
&& resource.type() == mesos::Value::SCALAR)
cpus = resource.scalar().value();
else if (resource.name() == "mem"
&& resource.type() == mesos::Value::SCALAR)
mem = resource.scalar().value();
}
if (!check_resources(cpus))
break;
vector<mesos::TaskInfo> *new_tasks = new vector<mesos::TaskInfo>();
mesos::TaskInfo task;
task.set_name("SINGA");
char string_id[100];
snprintf(string_id, 100, "SINGA_%d", nhosts_);
task.mutable_task_id()->set_value(string_id);
task.mutable_slave_id()->MergeFrom(offer.slave_id());
mesos::Resource *resource = task.add_resources();
resource->set_name("cpus");
resource->set_type(mesos::Value::SCALAR);
// take only nworkers_per_group CPUs
resource->mutable_scalar()->set_value(job_conf_.cluster().nworkers_per_group());
resource = task.add_resources();
resource->set_name("mem");
resource->set_type(mesos::Value::SCALAR);
// take all the memory
resource->mutable_scalar()->set_value(mem);
// store in temporary map
new_tasks->push_back(task);
tasks_[offer.id().value()] = new_tasks;
hostnames_[offer.id().value()] = offer.hostname();
nhosts_++;
}
if (nhosts_>= job_conf_.cluster().nworker_groups()) {
LOG(INFO) << "Acquired enough resources: "
<< job_conf_.cluster().nworker_groups()*job_conf_.cluster().nworkers_per_group()
<< " CPUs over " << job_conf_.cluster().nworker_groups() << " hosts. Launching tasks ... ";
// write job_conf_file_ to /singa/job_id/job.conf
char path[512];
snprintf(path, 512, "/singa/%d/job.conf", job_counter_);
hdfs_overwrite(hdfs_handle_, path, job_conf_file_);
// launch tasks
for (map<string, vector<mesos::TaskInfo>*>::iterator it =
tasks_.begin(); it != tasks_.end(); ++it) {
prepare_tasks(it->second, hostnames_[it->first], job_counter_, path);
mesos::OfferID newId;
newId.set_value(it->first);
LOG(INFO) << "Launching task with offer ID = " << newId.value();
driver->launchTasks(newId, *(it->second));
task_counter_++;
if (task_counter_>= job_conf_.cluster().nworker_groups())
break;
}
job_counter_++;
is_running_ = true;
}
}
virtual void offerRescinded(SchedulerDriver *driver,
const mesos::OfferID& offerId) {
}
virtual void statusUpdate(SchedulerDriver* driver,
const mesos::TaskStatus& status) {
if (status.state() == mesos::TASK_FINISHED)
task_counter_--;
if (task_counter_ == 0) {
driver->stop();
} else if (status.state() == mesos::TASK_FAILED) {
LOG(ERROR) << "TASK FAILED !!!!";
driver->abort();
}
}
virtual void frameworkMessage(SchedulerDriver* driver,
const mesos::ExecutorID& executorId, const mesos::SlaveID& slaveId,
const string& data) {
}
virtual void slaveLost(SchedulerDriver* driver,
const mesos::SlaveID& slaveId) {
}
virtual void executorLost(SchedulerDriver* driver,
const mesos::ExecutorID& executorId, const mesos::SlaveID& slaveId,
int status) {
}
virtual void error(SchedulerDriver* driver, const string& message) {
LOG(ERROR) << "ERROR !!! " << message;
}
private:
/**
* Helper function that initialize TaskInfo with the correct URI and command
*/
void prepare_tasks(vector<mesos::TaskInfo> *tasks, string hostname, int job_id, string job_conf) {
char path_sys_config[512], path_job_config[512];
// path to singa.conf
snprintf(path_sys_config, 512, "hdfs://%s%s", namenode_.c_str(), SINGA_CONFIG);
snprintf(path_job_config, 512, "hdfs://%s%s", namenode_.c_str(), job_conf.c_str());
char command[512];
snprintf(command, 512, "singa -conf ./job.conf -singa_conf ./singa.conf -singa_job %d -host %s", job_id, hostname.c_str());
for (int i=0; i < tasks->size(); i++) {
mesos::CommandInfo *comm = (tasks->at(i)).mutable_command();
comm->add_uris()->set_value(path_sys_config);
comm->add_uris()->set_value(path_job_config);
comm->set_value(command);
}
}
/**
* Helper function to connect to HDFS
*/
hdfsFS hdfs_connect(string namenode) {
string path(namenode);
int idx = path.find_first_of(":");
string host = path.substr(0, idx);
int port = atoi(path.substr(idx+1).c_str());
return hdfsConnect(host.c_str(), port);
}
/**
* Helper function to read HDFS file content into a string.
* It assumes the file exists.
* @return NULL if there's error.
*/
string hdfs_read(hdfsFS hdfs_handle, string filename) {
hdfsFileInfo* stat = hdfsGetPathInfo(hdfs_handle, filename.c_str());
int file_size = stat->mSize;
string buffer;
buffer.resize(file_size);
hdfsFile file = hdfsOpenFile(hdfs_handle, filename.c_str(), O_RDONLY, 0, 0, 0);
int status = hdfsRead(hdfs_handle, file, const_cast<char*>(buffer.c_str()), stat->mSize);
hdfsFreeFileInfo(stat, 1);
hdfsCloseFile(hdfs_handle, file);
if (status != -1)
return string(buffer);
else
return NULL;
}
/**
* Helper function that write content of source_file to filename, overwritting the latter
* if it exists.
* @return 1 if sucessfull, 0 if fail.
*/
int hdfs_overwrite(hdfsFS hdfs_handle, string filename, string source_file) {
hdfsFile file;
if (hdfsExists(hdfs_handle, filename.c_str()) == 0) {
file = hdfsOpenFile(hdfs_handle, filename.c_str(), O_WRONLY, 0, 0, 0);
} else {
// create directory and file
int last_idx = filename.find_last_of("/");
string dir = filename.substr(0, last_idx);
hdfsCreateDirectory(hdfs_handle, dir.c_str());
file = hdfsOpenFile(hdfs_handle, filename.c_str(), O_WRONLY, 0, 0, 0);
}
FILE *fh = fopen(source_file.c_str(), "r");
if (!fh) {
LOG(ERROR) << "Cannot open " << source_file;
return 0;
}
if (file) {
fseek(fh, 0, SEEK_END);
int len = ftell(fh);
rewind(fh);
string buf;
buf.resize(len);
fread(const_cast<char*>(buf.c_str()), len, 1, fh);
fclose(fh);
hdfsWrite(hdfs_handle, file, buf.c_str(), len);
hdfsFlush(hdfs_handle, file);
hdfsCloseFile(hdfs_handle, file);
} else {
LOG(ERROR) << "ERROR openng file on HDFS " << filename;
return 0;
}
return 1;
}
/**
* Helper function, check if the offered CPUs satisfies the resource requirements
* @param ncpus: number of cpus offer at this host
* @return true when ncpus >= (nWorkersPerProcess + nServersPerProcess) if workers and servers are separated
* or when cpus >= max(nWorkersPerProcess, nServersPerProcess) if they are not.
*/
bool check_resources(int ncpus) {
int n1 = job_conf_.cluster().nworkers_per_procs();
int n2 = job_conf_.cluster().nservers_per_procs();
LOG(INFO) << "n1 = " << n1 << " n2 = " << n2 << " ncpus = " << ncpus;
return job_conf_.cluster().server_worker_separate()? ncpus >= (n1+n2) : ncpus >= (n1 > n2 ? n1 : n2);
}
int job_counter_, task_counter_;
// true if the job has been launched
bool is_running_;
singa::JobProto job_conf_;
// total number of hosts required
int nhosts_;
// temporary map of tasks: <offerID, TaskInfo>
map<string, vector<mesos::TaskInfo>*> tasks_;
// temporary map of offerID to slave IP addresses
map<string, string> hostnames_;
// SINGA job config file
string job_conf_file_;
// HDFS namenode
string namenode_;
// handle to HDFS
hdfsFS hdfs_handle_;
};
int main(int argc, char** argv) {
FLAGS_logtostderr = 1;
int status = mesos::DRIVER_RUNNING;
SingaScheduler *scheduler;
if (!(argc == 2 || argc == 4 || argc == 6)) {
std::cout << usage << std::endl;
return 1;
}
int scheduler_conf_idx = 0;
int singa_conf_idx = 0;
for (int i=1; i < argc-1; i++) {
if (strcmp(argv[i], "-scheduler_conf") == 0)
scheduler_conf_idx = i+1;
if (strcmp(argv[i], "-singa_conf") == 0)
singa_conf_idx = i+1;
}
SchedulerProto msg;
if (scheduler_conf_idx)
singa::ReadProtoFromTextFile((const char*)argv[scheduler_conf_idx], &msg);
else
singa::ReadProtoFromTextFile(DEFAULT_SCHEDULER_CONF, &msg);
if (!singa_conf_idx)
scheduler = new SingaScheduler(msg.namenode(), string(argv[1]), msg.job_counter());
else
scheduler = new SingaScheduler(msg.namenode(), string(argv[1]), string(argv[singa_conf_idx]), msg.job_counter());
msg.set_job_counter(msg.job_counter()+1);
if (scheduler_conf_idx)
singa::WriteProtoToTextFile(msg, (const char*)argv[scheduler_conf_idx]);
else
singa::WriteProtoToTextFile(msg, DEFAULT_SCHEDULER_CONF);
LOG(INFO) << "Scheduler initialized";
mesos::FrameworkInfo framework;
framework.set_user("");
framework.set_name("SINGA");
SchedulerDriver *driver = new mesos::MesosSchedulerDriver(scheduler, framework, msg.master().c_str());
LOG(INFO) << "Starting SINGA framework...";
status = driver->run();
driver->stop();
LOG(INFO) << "Stoping SINGA framework...";
return status == mesos::DRIVER_STOPPED ? 0 : 1;
}