/************************************************************
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*************************************************************/
#include "singa/driver.h"
#include <glog/logging.h>
#include <set>
#include <string>
#include <vector>
#include "singa/neuralnet/layer.h"
#include "singa/utils/common.h"
#include "singa/utils/tinydir.h"
#include "singa/utils/cluster.h"
#include "singa/utils/context.h"
#include "singa/proto/job.pb.h"
#include "singa/server.h"
#include "singa/stub.h"
#include "singa/worker.h"
#include "singa/neuralnet/connection_layer.h"
#include "singa/neuralnet/input_layer.h"
#include "singa/neuralnet/loss_layer.h"
#include "singa/neuralnet/neuron_layer.h"
#include "singa/neuralnet/output_layer.h"
extern "C" void openblas_set_num_threads(int num);
namespace singa {
void Driver::Init(int argc, char **argv) {
// unique job ID generated from singa-run.sh, passed in as "-singa_job <id>"
int arg_pos = ArgPos(argc, argv, "-singa_job");
job_id_ = (arg_pos != -1) ? atoi(argv[arg_pos+1]) : -1;
// global singa conf passed by singa-run.sh as "-singa_conf <path>"
arg_pos = ArgPos(argc, argv, "-singa_conf");
if (arg_pos != -1)
ReadProtoFromTextFile(argv[arg_pos+1], &singa_conf_);
else
ReadProtoFromTextFile("conf/singa.conf", &singa_conf_);
// set log path
if (singa_conf_.has_log_dir())
SetupLog(singa_conf_.log_dir(), "driver");
// job conf passed by users as "-conf <path>"
arg_pos = ArgPos(argc, argv, "-conf");
if (arg_pos != -1)
ReadProtoFromTextFile(argv[arg_pos+1], &job_conf_);
// register layers
// input and output layers
RegisterLayer<RecordInputLayer, int>(kRecordInput);
RegisterLayer<CSVInputLayer, int>(kCSVInput);
RegisterLayer<ImagePreprocessLayer, int>(kImagePreprocess);
RegisterLayer<RecordOutputLayer, int>(kRecordOutput);
RegisterLayer<CSVOutputLayer, int>(kCSVOutput);
RegisterLayer<CharRNNInputLayer, int>(kCharRNN);
RegisterLayer<RNNLabelLayer, int>(kRNNLabel);
RegisterLayer<OneHotLayer, int>(kOneHot);
RegisterLayer<CharRNNOutputLayer, int>(kCharRNNOutput);
// connection layers
RegisterLayer<BridgeDstLayer, int>(kBridgeDst);
RegisterLayer<BridgeSrcLayer, int>(kBridgeSrc);
RegisterLayer<ConcateLayer, int>(kConcate);
RegisterLayer<SliceLayer, int>(kSlice);
RegisterLayer<SplitLayer, int>(kSplit);
RegisterLayer<RNNDummyLayer, int>(kRNNDummy);
RegisterLayer<AccuracyLayer, int>(kAccuracy);
RegisterLayer<ArgSortLayer, int>(kArgSort);
RegisterLayer<ConvolutionLayer, int>(kConvolution);
RegisterLayer<CConvolutionLayer, int>(kCConvolution);
RegisterLayer<CPoolingLayer, int>(kCPooling);
RegisterLayer<EmbeddingLayer, int>(kEmbedding);
#ifdef USE_CUDNN
RegisterLayer<CudnnActivationLayer, int>(kCudnnActivation);
RegisterLayer<CudnnConvLayer, int>(kCudnnConv);
RegisterLayer<CudnnPoolLayer, int>(kCudnnPool);
RegisterLayer<CudnnLRNLayer, int>(kCudnnLRN);
RegisterLayer<CudnnSoftmaxLayer, int>(kCudnnSoftmax);
RegisterLayer<CudnnSoftmaxLossLayer, int>(kCudnnSoftmaxLoss);
#endif
RegisterLayer<DropoutLayer, int>(kDropout);
RegisterLayer<DummyLayer, int>(kDummy);
RegisterLayer<EuclideanLossLayer, int>(kEuclideanLoss);
RegisterLayer<InnerProductLayer, int>(kInnerProduct);
RegisterLayer<LabelLayer, int>(kLabel);
RegisterLayer<LRNLayer, int>(kLRN);
RegisterLayer<MnistLayer, int>(kMnist);
RegisterLayer<PrefetchLayer, int>(kPrefetch);
RegisterLayer<PoolingLayer, int>(kPooling);
RegisterLayer<RBMHidLayer, int>(kRBMHid);
RegisterLayer<RBMVisLayer, int>(kRBMVis);
RegisterLayer<RGBImageLayer, int>(kRGBImage);
RegisterLayer<ReLULayer, int>(kReLU);
RegisterLayer<ShardDataLayer, int>(kShardData);
RegisterLayer<SigmoidLayer, int>(kSigmoid);
RegisterLayer<SoftmaxLossLayer, int>(kSoftmaxLoss);
RegisterLayer<STanhLayer, int>(kSTanh);
RegisterLayer<SoftmaxLayer, int>(kSoftmax);
RegisterLayer<GRULayer, int>(kGRU);
#ifdef USE_LMDB
RegisterLayer<LMDBDataLayer, int>(kLMDBData);
#endif
// register updaters
RegisterUpdater<AdaGradUpdater>(kAdaGrad);
RegisterUpdater<NesterovUpdater>(kNesterov);
RegisterUpdater<RMSPropUpdater>(kRMSProp);
RegisterUpdater<SGDUpdater>(kSGD);
// register learning rate change methods
RegisterLRGenerator<LRGenerator>(kFixed);
RegisterLRGenerator<FixedStepLRGen>(kFixedStep);
RegisterLRGenerator<StepLRGen>(kStep);
RegisterLRGenerator<LinearLRGen>(kLinear);
RegisterLRGenerator<ExpLRGen>(kExponential);
RegisterLRGenerator<InvLRGen>(kInverse);
RegisterLRGenerator<InvTLRGen>(kInverseT);
// register workers
RegisterWorker<BPWorker>(kBP);
RegisterWorker<BPTTWorker>(kBPTT);
RegisterWorker<CDWorker>(kCD);
// register params
RegisterParam<Param>(0);
// register param init methods
RegisterParamGenerator<ParamGenerator>(kConstant);
RegisterParamGenerator<GaussianGen>(kGaussian);
RegisterParamGenerator<UniformGen>(kUniform);
RegisterParamGenerator<GaussianSqrtFanInGen>(kGaussianSqrtFanIn);
RegisterParamGenerator<UniformSqrtFanInGen>(kUniformSqrtFanIn);
RegisterParamGenerator<UniformSqrtFanInOutGen>(kUniformSqrtFanInOut);
}
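// Typical usage (a minimal sketch, assuming driver.h exposes a job_conf()
// accessor; the flags are normally supplied by singa-run.sh):
//
//   int main(int argc, char** argv) {
//     singa::Driver driver;
//     driver.Init(argc, argv);  // parses -singa_job, -singa_conf and -conf
//     singa::JobProto job_conf = driver.job_conf();
//     driver.Train(false /* resume */, job_conf);
//     return 0;
//   }
//
// initializes glog with the given program name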
void Driver::InitLog(char* arg) {
google::InitGoogleLogging(arg);
}
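// parses a serialized JobProto from str and forwards to the typed overload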
void Driver::Train(bool resume, const std::string str) {
JobProto job_conf;
job_conf.ParseFromString(str);
Train(resume, job_conf);
}
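// sets up logging and the cluster, validates the workspace, configures
// OpenBLAS threading, and (on resume) patches the job conf with the latest
// checkpoint before starting the actual training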
void Driver::Train(bool resume, const JobProto& job_conf) {
if (singa_conf_.has_log_dir())
SetupLog(singa_conf_.log_dir(),
std::to_string(job_id_) + "-" + job_conf.name());
Cluster::Setup(job_id_, singa_conf_, job_conf.cluster());
tinydir_dir workspace;
if (tinydir_open(&workspace, job_conf.cluster().workspace().c_str()) == -1)
LOG(FATAL) << "workspace not exist: " << job_conf.cluster().workspace();
if (job_conf.num_openblas_threads() != 1)
LOG(WARNING) << "openblas luanches "
<< job_conf.num_openblas_threads() << " threads";
openblas_set_num_threads(job_conf.num_openblas_threads());
JobProto job;
job.CopyFrom(job_conf);
if (resume)
SetupForResume(&job);
job.set_id(job_id_);
Train(job);
}
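// parse-and-forward overload of Test(), mirroring Train(bool, string) above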
void Driver::Test(const std::string str) {
JobProto job_conf;
job_conf.ParseFromString(str);
Test(job_conf);
}
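// runs testing with a single worker: builds the test net, restores params
// from the checkpoint files listed in the job conf, then calls Worker::Test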
void Driver::Test(const JobProto& job_conf) {
Cluster::Setup(job_id_, singa_conf_, job_conf.cluster());
Cluster::Get()->Register(getpid(), "localhost");
// TODO(wangwei) extend to a group with multiple workers
auto worker = Worker::Create(job_conf.train_one_batch());
worker->Setup(0, 0, job_conf, nullptr, nullptr, nullptr);
auto net = NeuralNet::Create(job_conf.neuralnet(), kTest, 1);
WriteStringToTextFile(Cluster::Get()->vis_folder() + "/test_net.json",
net->ToGraph(true).ToJson());
vector<string> paths;
for (const auto& p : job_conf.checkpoint_path())
paths.push_back(p);
net->Load(paths);
worker->Test(job_conf.test_steps(), kTest, net);
}
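// the internal training entry: creates the Stub (only for multi-worker or
// server runs), the shared neural net, and all local workers and servers;
// runs each executor in its own thread and frees everything after joining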
void Driver::Train(const JobProto& job_conf) {
auto cluster = Cluster::Get();
int nserver_grps = cluster->nserver_groups();
int grp_size = cluster->nworkers_per_group();
Stub stub;
// no need to create a Stub if there is only a single worker and no servers,
// i.e., the training is conducted entirely by that single worker.
if (grp_size > 1 || nserver_grps > 0) {
stub.Setup();
// TODO(wangwei) register endpoint to zookeeper if > 1 procs;
cluster->Register(getpid(), stub.endpoint()); // getpid() is from unistd.h
}
NeuralNet* net = NeuralNet::Create(job_conf.neuralnet(), kTrain, grp_size);
WriteStringToTextFile(cluster->vis_folder() + "/train_net.json",
net->ToGraph(true).ToJson());
const vector<Worker*> workers = CreateWorkers(job_conf, net);
const vector<Server*> servers = CreateServers(job_conf, net);
#ifdef USE_MPI
int nthreads = workers.size() + servers.size() + 1;
for (int i = 0; i < nthreads; i++)
MPIQueues.push_back(make_shared<SafeQueue>());
#endif
vector<std::thread> threads;
for (auto server : servers)
threads.push_back(std::thread(&Server::Run, server));
int gpu = 0;
auto context = Singleton<Context>::Instance();
// CHECK_LE(workers.size(), job_conf.gpu_size());
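// map each worker thread to a device: the first job_conf.gpu_size() workers
// get the configured GPUs, the remaining ones run on CPU (device_id = -1)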
for (auto worker : workers) {
threads.push_back(std::thread(&Worker::Run, worker));
int device_id = -1;
if (gpu < job_conf.gpu_size()) {
device_id = job_conf.gpu(gpu++);
}
context->SetupDevice(threads.back().get_id(), device_id);
}
if (grp_size > 1 || nserver_grps > 0) {
int nservers_per_grp = cluster->nservers_per_group();
int lcm = LeastCommonMultiple(nservers_per_grp, nserver_grps);
auto slices = Param::ComputeSlices(lcm, net->params());
auto slice2server = PartitionSlices(nservers_per_grp, slices);
stub.Run(slice2server, workers, servers);
}
for (auto& thread : threads)
thread.join();
for (auto server : servers)
delete server;
delete net;
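// track nets that are already freed (plus nullptr) so nets shared across
// workers are not deleted twice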
std::set<NeuralNet*> deleted{net, nullptr};
for (auto worker : workers) {
for (auto ptr : worker->GetNets())
if (deleted.find(ptr) == deleted.end()) {
delete ptr;
deleted.insert(ptr);
}
delete worker;
}
}
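// scans the checkpoint folder for the files of the latest step and rewrites
// job_conf so that training resumes from that checkpoint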
void Driver::SetupForResume(JobProto* job_conf) {
tinydir_dir dir;
std::string folder = Cluster::Get()->checkpoint_folder();
tinydir_open(&dir, folder.c_str());
int latest_step = 0;
// there may be multiple checkpoint files (from different workers) for one step
vector<std::string> ck_files;
// iterate over all files to collect those belonging to the latest checkpoint
while (dir.has_next) {
tinydir_file file;
tinydir_readfile(&dir, &file);
tinydir_next(&dir);
char* ch = strstr(file.name, "step");
if (ch == nullptr) {
if (file.name[0] != '.')
LOG(INFO) << "Irregular file in checkpoint folder: " << file.name;
continue;
}
LOG(INFO) << "Add checkpoint file for resume: " << ch;
int step = atoi(ch+4);
if (step == latest_step) {
ck_files.push_back(file.name);
} else if (step > latest_step) {
latest_step = step;
ck_files.clear();
ck_files.push_back(std::string(file.name));
}
}
if (latest_step > 0) {
job_conf->set_step(latest_step);
if (!job_conf->has_reset_param_version())
job_conf->set_reset_param_version(false);
job_conf->clear_checkpoint_path();
for (const auto& ck_file : ck_files)
job_conf->add_checkpoint_path(folder + "/" + ck_file);
}
tinydir_close(&dir);
}
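// instantiates the workers assigned to this process; the first group trains
// on the given net (group 0 additionally gets test/validation nets), while
// later groups create their own nets, sharing params when share_memory is set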
const vector<Worker*> Driver::CreateWorkers(const JobProto& job_conf,
NeuralNet* net) {
auto cluster = Cluster::Get();
vector<Worker*> workers;
if (!cluster->has_worker()) return workers;
int wgrp_size = cluster->nworkers_per_group();
int nservers_per_grp = cluster->nservers_per_group();
int nserver_grps = cluster->nserver_groups();
int lcm = LeastCommonMultiple(nserver_grps, nservers_per_grp);
const vector<int> rng = cluster->ExecutorRng(cluster->procs_id(),
cluster->nworkers_per_group(), cluster->nworkers_per_procs());
int gstart = rng[0], gend = rng[1], wstart = rng[2], wend = rng[3];
for (int gid = gstart; gid < gend; gid++) {
NeuralNet* train_net = nullptr, *test_net = nullptr, *val_net = nullptr;
if (gid == gstart) {
train_net = net;
Param::SliceParams(lcm, train_net->params());
// test and validation are performed by the 1st group.
if (gid == 0 && job_conf.test_steps() > 0) {
test_net = NeuralNet::Create(job_conf.neuralnet(), kTest, 1);
test_net->ShareParamsFrom(train_net, false);
}
if (gid == 0 && job_conf.validate_steps() > 0) {
val_net = NeuralNet::Create(job_conf.neuralnet(), kVal, 1);
val_net->ShareParamsFrom(train_net, false);
}
} else {
train_net = NeuralNet::Create(job_conf.neuralnet(), kTrain, wgrp_size);
if (cluster->share_memory()) {
train_net->ShareParamsFrom(net, true);
} else {
Param::SliceParams(lcm, train_net->params());
}
}
for (int wid = wstart; wid < wend; wid++) {
auto *worker = Worker::Create(job_conf.train_one_batch());
// TODO(wangwei) extend to test among workers in a grp
if (wid == 0)
worker->Setup(gid, wid, job_conf, train_net, val_net, test_net);
else
worker->Setup(gid, wid, job_conf, train_net, nullptr, nullptr);
workers.push_back(worker);
}
}
return workers;
}
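// instantiates the servers assigned to this process and precomputes the
// slice-to-group and slice-to-server mappings used for updates and syncing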
const vector<Server*> Driver::CreateServers(const JobProto& job_conf,
NeuralNet* net) {
auto cluster = Cluster::Get();
vector<Server*> servers;
if (!cluster->has_server()) return servers;
int nservers_per_grp = cluster->nservers_per_group();
int nserver_grps = cluster->nserver_groups();
int lcm = LeastCommonMultiple(nserver_grps, nservers_per_grp);
auto slices = Param::ComputeSlices(lcm, net->params());
// partition among server groups, each group maintains one sub-set for sync
auto slice2group = PartitionSlices(nserver_grps, slices);
// partition within one server group, each server updates for one sub-set
auto slice2server = PartitionSlices(nservers_per_grp, slices);
int server_procs = cluster->procs_id();
// if servers run in separate processes, their logical proc ids start after the worker procs
if (cluster->server_worker_separate())
server_procs -= cluster->nworker_procs();
const vector<int> rng = cluster->ExecutorRng(server_procs,
cluster->nservers_per_group(), cluster->nservers_per_procs());
int gstart = rng[0], gend = rng[1], start = rng[2], end = rng[3];
for (int gid = gstart; gid < gend; gid++) {
for (int sid = start; sid < end; sid++) {
auto server = new Server(gid, sid, job_conf, slice2group, slice2server);
servers.push_back(server);
}
}
return servers;
}
} // namespace singa