| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| /*! |
| * \file mshadow_ps.h |
| * \brief parameter server abstraction for mshadow tensor |
| * this is a plugin of mshadow that can be used to synchronize |
| * parameters across devices and machines |
| * |
| * \author Tianqi Chen, Mu Li |
| */ |
| #ifndef MSHADOW_PS_H_ // NOLINT(*) |
| #define MSHADOW_PS_H_ // NOLINT(*) |
#include <cstring>
#include <vector>
// optionally support of lambda function in C++11, if available
#if __cplusplus >= 201103L
#include <functional>
#endif  // C++11
#include "../mshadow/tensor.h"
| |
/*!
 * \brief whether to adapt distributed PS from parameter-server;
 *  set to 1 here unless the macro was already defined before this point
 */
#if !defined(MSHADOW_DIST_PS)
#define MSHADOW_DIST_PS 1
#endif  // !defined(MSHADOW_DIST_PS)
| |
/*!
 * \brief whether to support BSP rabit API of PS;
 *  set to 1 here unless the macro was already defined before this point
 */
#if !defined(MSHADOW_RABIT_PS)
#define MSHADOW_RABIT_PS 1
#endif  // !defined(MSHADOW_RABIT_PS)
| |
| namespace mshadow { |
| /*! \brief namespace of mshadow-ps */ |
| namespace ps { |
| /*! |
| * \brief interface of parameter server |
| * \tparam xpu the device of the data lies |
| * \tparam DType the type of element in the tensor |
| */ |
| template<typename xpu, |
| typename DType MSHADOW_DEFAULT_DTYPE> |
| class ISharedModel { |
| public: |
| /*! |
| * \brief callback function that will be executed when pull request finishes |
| * before calling the callback, the thread context is already switched |
| * to the device of pullrequest |
| * \param stream the stream of callback thread, it is recommended to operate using this stream |
| * \param arg the argument of callback function |
| */ |
| typedef void (CallbackFunction) (Stream<xpu> *stream, void *arg); |
| /*! \brief virtual destructor */ |
| virtual ~ISharedModel(void) {} |
| /*! |
| * \brief Set param for the layer from string |
| * \param name parameter name |
| * \param val string for configuration |
| */ |
| virtual void SetParam(const char *name, const char *val) {} |
| /*! |
| * \brief initialize the paramerver server client |
| * \param devices specifies the possible device id |
| * to be input from Push and Pull, |
| */ |
| virtual void Init(const std::vector<int> &devices) {} |
| /*! |
| * \brief initialize the paramerver server client |
| * without specifying the devices, only device 0 is allowed |
| */ |
| inline void Init(void) { |
| std::vector<int> dev; |
| dev.push_back(0); |
| this->Init(dev); |
| } |
| /*! |
| * \brief initialize a key with certain shape |
| * must be called before using Push/PullReq/PullWait |
| * on the corresponding key |
| * \param shape the shape content of the key |
| * \param key the unique key to indicate the tensor |
| * this is unique per device |
| * \param devid the device id this tensor lies in |
| */ |
| template<int dim> |
| inline void InitKey(Shape<dim> shape, |
| int key, int devid) { |
| this->InitKey_(shape.FlatTo2D(), key, devid); |
| } |
| /*! |
| * \brief wait until the pull event finishes |
| * if there was no pull request, wait will directly returns |
| * \param key the unique key to indicate the tensor |
| * this is unique per device |
| * \param devid the device id this tensor lies in |
| */ |
| virtual void PullWait(int key, int devid) = 0; |
| /*! |
| * \brief check if the weight was correct on the current device |
| * |
| * \param data the data |
| * \param key the unique key to indicate the tensor |
| * this is unique per device |
| * \param devid the device id this tensor lies in |
| */ |
| template<int dim> |
| inline void CheckWeight(Tensor<xpu, dim, DType> data, |
| int key, |
| int devid) { |
| this->CheckWeight_(data.FlatTo2D(), key, devid); |
| } |
| /*! |
| * \brief push out a tensor to parameter server |
| * this call is asynchronize and returns immediately |
| * |
| * \param data the data |
| * \param key the unique key to indicate the tensor |
| * this is unique per device |
| * \param devid the device id this tensor lies in |
| * \param priority the priority of this operation, |
| * the bigger the number is the higher the priority will be |
| */ |
| template<int dim> |
| inline void Push(Tensor<xpu, dim, DType> data, |
| int key, |
| int devid, |
| int priority = 0) { |
| this->Push_(data.FlatTo2D(), key, devid, priority); |
| } |
| /*! |
| * \brief send a pull request, to pull parameter into data |
| * this call is asynchronize and returns immediately |
| * use PullWait to wait the event of copy finish |
| * |
| * \param data the data |
| * \param key the unique key to indicate the tensor, |
| * this is unique per device |
| * \param devid the device id this tensor lies in |
| * \param priority the priority of this operation, |
| * the bigger the number is the higher the priority will be |
| * \param callback the callback function that will |
| * be invoked when the request finishes |
| * \param callback_arg the argument to pass to callback |
| */ |
| template<int dim> |
| inline void PullReq(Tensor<xpu, dim, DType> data, |
| int key, |
| int devid, |
| int priority = 0, |
| CallbackFunction callback = NULL, |
| void *callback_arg = NULL) { |
| this->PullReq_(data.FlatTo2D(), key, |
| devid, priority, callback, callback_arg); |
| } |
| #if __cplusplus >= 201103L |
| /*! |
| * \brief send a pull request, to pull parameter into data |
| * this call is asynchronize and returns immediately |
| * use PullWait to wait the event of copy finish |
| * this is the c++11 version that allows lambda function as callback |
| * \param data the data |
| * \param key the unique key to indicate the tensor, |
| * this is unique per device |
| * \param devid the device id this tensor lies in |
| * \param priority the priority of this operation, |
| * the bigger the number is the higher the priority will be |
| * \param callback the callback function |
| */ |
| template<int dim> |
| inline void PullReq(Tensor<xpu, dim, DType> data, |
| int key, |
| int devid, |
| int priority, |
| std::function<void(Stream<xpu> *stream)> callback) { |
| // need to allocate space, because callback can happen latter.. |
| auto calbk = new std::function<void(Stream<xpu> *stream)>(); |
| *calbk = callback; |
| this->PullReq(data, key, devid, priority, InvokeLambda_, calbk); |
| } |
| #endif // C++11 |
| |
| /*! |
| * \brief set weight of corresponding key in server |
| * this is a debug function that was not necessarily |
| * implemented by the server |
| * \param data the data to set |
| * \param key the unique key to indicate the tensor |
| * this is unique per device |
| * \param devid the device id this tensor lies in |
| */ |
| virtual void SetWeight_(Tensor<xpu, 2, DType> data, |
| int key, |
| int devid) = 0; |
| /*! |
| * \brief check if the weight matches the server side |
| * this is a debug function that was not necessarily |
| * implemented by the server |
| * \param data the data to set |
| * \param key the unique key to indicate the tensor |
| * this is unique per device |
| * \param devid the device id this tensor lies in |
| */ |
| virtual void CheckWeight_(Tensor<xpu, 2, DType> data, |
| int key, |
| int devid) = 0; |
| |
| protected: |
| /*! |
| * \brief initialize a key with certain shape |
| * \param shape the shape content of the key |
| * \param key the unique key to indicate the tensor |
| * this is unique per device |
| * \param devid the device id this tensor lies in |
| */ |
| virtual void InitKey_(Shape<2> shape, |
| int key, int devid) = 0; |
| /*! |
| * \brief push out a tensor to parameter server |
| * this call is asynchronize and returns immediately |
| * |
| * \param data the data |
| * \param key the unique key to indicate the tensor |
| * this is unique per device |
| * \param devid the device id this tensor lies in |
| * \param priority the priority of this operation, |
| * the bigger the number is the higher the priority will be |
| */ |
| virtual void Push_(Tensor<xpu, 2, DType> data, |
| int key, |
| int devid, |
| int priority = 0) = 0; |
| /*! |
| * \brief send a pull request, to pull parameter into data |
| * this call is asynchronize and returns immediately |
| * use PullWait to wait the event of copy finish |
| * |
| * \param data the data |
| * \param key the unique key to indicate the tensor, |
| * this is unique per device |
| * \param devid the device id this tensor lies in |
| * \param priority the priority of this operation, |
| * the bigger the number is the higher the priority will be |
| * \param callback the callback function that will |
| * be invoked when the request finishes |
| * \param callback_arg the argument to pass to callback |
| */ |
| virtual void PullReq_(Tensor<xpu, 2, DType> data, |
| int key, |
| int devid, |
| int priority, |
| CallbackFunction callback, |
| void *callback_arg) = 0; |
| |
| private: |
| // C++11 support for lambda prepare function |
| #if __cplusplus >= 201103L |
| /*! \brief hack function to convert lambda to callback function */ |
| inline static void InvokeLambda_(Stream<xpu> *stream, void *fun) { |
| auto *fp = static_cast<std::function<void(Stream<xpu> *stream)>*>(fun); |
| (*fp)(stream); |
| delete fp; |
| } |
| #endif // C++11 |
| }; |
| /*! \brief interface for customized mshadow server */ |
| template<typename DType> |
| class IModelUpdater { |
| public: |
| virtual ~IModelUpdater(void) {} |
| /*! |
| * \brief set parameters from outside |
| * \param name name of parameter |
| * \param val value of parameter |
| */ |
| virtual void SetParam(const char *name, const char *val) {} |
| /*! |
| * \brief init the model updater |
| * \param rank the rank of the node |
| * \param argc number of arguments |
| * \param argv arguments |
| */ |
| virtual void InitUpdater(int rank, int argc, char *argv[]) {} |
| /*! |
| * \brief initialize the model |
| * \param key the key of data we point to |
| * \param dptr the data pointer |
| * \param size size of the parameter key |
| */ |
| virtual void InitModel(int key, DType *dptr, size_t size) { |
| this->InitModel_(key, Tensor<cpu, 1, DType>(dptr, Shape1(size))); |
| } |
| /*! |
| * update the model |
| * \param key the key of data we point to |
| * \param dptr the data pointer |
| * \param size size of the parameter key |
| */ |
| virtual void Update(int key, DType *dptr, size_t size) { |
| this->Update_(key, Tensor<cpu, 1, DType>(dptr, Shape1(size))); |
| } |
| |
| protected: |
| /*! |
| * \brief initialize the model, user can implement this one |
| * to take advantage of tensor operations |
| * \param key the key of data we point to |
| * \param data the tensor data corresponding to the data we want to initialize |
| */ |
| virtual void InitModel_(int key, Tensor<cpu, 1, DType> data) { |
| LOG(FATAL) << "InitModel: not implemented"; |
| } |
| /*! |
| * \brief update the model, user can implement this one |
| * to take advantage of tensor operations |
| * \param key the key of data we point to |
| * \param data the tensor data corresponding to the data we want to initialize |
| */ |
| virtual void Update_(int key, Tensor<cpu, 1, DType> data) { |
| LOG(FATAL) << "InitModel: not implemented"; |
| } |
| }; |
| /*! |
| * \brief create customized server |
| * this is a server defined by user |
| * \return new server |
| */ |
| template<typename DType> |
| IModelUpdater<DType> *CreateModelUpdater(void); |
| } // namespace ps |
| } // namespace mshadow |
| |
| #include "./ps_local-inl.h" |
| #include "./ps_dist-inl.h" |
| #include "./ps_rabit-inl.h" |
| namespace mshadow { |
| namespace ps { |
| /*! |
| * \brief create a parameter server implementation |
| * \param type the type of paramerver server |
| * can either be "local" or "dist" |
| * \return the ISharedModel that can be used to synchronize weights |
| */ |
| template<typename xpu, typename DType> |
| inline ISharedModel<xpu, DType> *CreateSharedModel(const char *type) { |
| if (!strcmp("local", type)) { |
| #if MSHADOW_RABIT_PS |
| // allreduce on one machine pays no cost |
| if (rabit::IsDistributed()) { |
| return new RabitModel<xpu, DType>(); |
| } |
| #endif |
| return new LocalModel<xpu, DType>(); |
| } |
| #if MSHADOW_DIST_PS |
| if (!strcmp("dist", type)) return new DistModel<xpu, DType>(); |
| #endif |
| LOG(FATAL) << "unknown server type " << type; |
| return NULL; |
| } |
| } // namespace ps |
| } // namespace mshadow |
| #endif // MSHADOW_PS_H_ NOLINT(*) |