/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* @brief Simple test of KVLayer
*/
#include "ps.h"
#include "parameter/kv_layer.h"
#include <cstdio>
#include <iostream>
#include <omp.h>
#include <map>
#include <mshadow/tensor.h>
#include <mshadow-ps/mshadow_ps.h>
#include "dbstr.h"
#include "glog/logging.h"
namespace mshadow {
namespace ps {
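// server-side updater used by this test: it keeps one 1-D tensor per key,
// zero-initializes it, and aggregates every pushed value by summation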
template<typename DType>
class Updater : public IModelUpdater<DType> {
protected:
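  // called once per key to set up the server-side weight;
  // zero-initialize it and keep a handle for later updates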
void InitModel_(int key, Tensor<cpu, 1, DType> data) {
data = 0;
data_[key] = data;
}
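  // called whenever a worker pushes data for this key;
  // this test simply accumulates the pushed values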
void Update_(int key, Tensor<cpu, 1, DType> data) {
data_[key] += data;
// LOG(ERROR) << dbstr(data_[key]);
}
std::map<int, Tensor<cpu, 1, DType> > data_;
};
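// factory used by mshadow-ps to construct the server-side updater
// when update_on_server is enabled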
template<typename DType>
IModelUpdater<DType> *CreateModelUpdater(void) {
return new Updater<DType>();
}
} // namespace ps
} // namespace mshadow
// this function is run by each worker thread, one thread per device
template<typename xpu>
inline void RunWorkerThread(int devid,
mshadow::ps::ISharedModel<xpu, float> *ps) {
// initialize tensor engine
mshadow::InitTensorEngine<xpu>(devid);
mshadow::Stream<xpu> *stream = mshadow::NewStream<xpu>();
// allocate tensor on xpu
mshadow::TensorContainer<xpu, 2> data(mshadow::Shape2(2, 3));
  // set the computation stream to the newly allocated stream;
  // subsequent computations whose target is data will use this stream,
  // which is needed for asynchronous execution on the GPU
data.set_stream(stream);
  // initialize the keys and register their shapes on the parameter server
ps->InitKey(data[0].shape_, 0, devid);
ps->InitKey(data[1].shape_, 1, devid);
// first step, pull the data back from server
ps->PullReq(data[0], 0, devid);
ps->PullReq(data[1], 1, devid);
  // PullWait blocks until these requests finish
ps->PullWait(0, devid);
ps->PullWait(1, devid);
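  // local computation: make data[1] device-dependent so that each
  // worker pushes a different value to be aggregated on the server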
data[1] = devid + data[0];
LOG(ERROR) << "node " << ::ps::MyNodeID() << ", dev " << devid << ": before sync\n"
<< dbstr(data);
  // push data[0] out for update or aggregation;
  // 0 is the key of the data and devid is the current device id
ps->Push(data[0], 0, devid);
  // a pull request asks for the data to be copied back
  // once the server-side computation is done
ps->PullReq(data[0], 0, devid);
  // computation can be done here...
  // handling of the pull request will be overlapped with it,
  // similar to the previous call
ps->PullWait(0, devid);
ps->Push(data[1], 1, devid);
ps->PullReq(data[1], 1, devid);
// more computation can be done here...
  // the computation will be overlapped with the communication
  // PullWait blocks until these requests finish
ps->PullWait(1, devid);
LOG(ERROR) << "node " << ::ps::MyNodeID() << ", dev " << devid
<< ": after sync\n" << dbstr(data);
mshadow::DeleteStream(stream);
mshadow::ShutdownTensorEngine<xpu>();
}
template<typename xpu>
inline int Run(int argc, char *argv[]) {
if (argc < 2) {
printf("Usage: device list\n"\
"\tfor CPU the device list can be arbitrary\n"\
"\tfor GPU the device list need to be actual device index\n");
return 0;
}
// list of device ids
std::vector<int> devs;
// initialization
for (int i = 1; i < argc; ++i) {
// record the device id
devs.push_back(atoi(argv[i]));
}
mshadow::ps::ISharedModel<xpu, float>
*ps = mshadow::ps::CreateSharedModel<xpu, float>("dist");
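  // "dist" selects the distributed parameter-server implementation of ISharedModel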
  // initialize the ps
ps->SetParam("update_on_server", "1");
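  // with update_on_server set, pushed values are applied on the server
  // through the Updater defined above instead of on the workers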
ps->Init(devs);
  // use OpenMP to launch one worker thread per device
#pragma omp parallel num_threads(devs.size())
{
int tid = omp_get_thread_num();
RunWorkerThread<xpu>(devs[tid], ps);
}
delete ps;
return 0;
}
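// A minimal entry point is sketched below for illustration only; the actual
// driver for this test lives in a separate translation unit, so its exact
// form is an assumption here (a GPU build would dispatch to
// Run<mshadow::gpu> instead):
//
//   int main(int argc, char *argv[]) {
//     return Run<mshadow::cpu>(argc, argv);
//   }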